use std::future::Future;
use futures_util::TryStreamExt;
use vortex_error::VortexResult;
use crate::array::ChunkedArray;
use crate::stream::take_rows::TakeRows;
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::{ArrayData, IntoArrayData};
/// Convenience combinators layered on top of [`ArrayStream`].
///
/// Every method is a provided method, so implementors get them without writing any code.
pub trait ArrayStreamExt: ArrayStream {
    /// Drains the stream and assembles everything it yielded into one [`ArrayData`].
    ///
    /// When the stream produced exactly one chunk, that chunk is returned as-is. For any
    /// other chunk count (zero included) the chunks are handed to [`ChunkedArray::try_new`]
    /// together with the stream's dtype.
    ///
    /// # Errors
    ///
    /// Resolves to an error if collecting the stream fails or if [`ChunkedArray::try_new`]
    /// rejects the collected chunks.
    fn into_array_data(self) -> impl Future<Output = VortexResult<ArrayData>>
    where
        Self: Sized,
    {
        async move {
            // The dtype has to be captured first: collecting consumes the stream.
            let stream_dtype = self.dtype().clone();
            let mut collected = self.try_collect::<Vec<ArrayData>>().await?;
            match collected.len() {
                // A lone chunk is returned directly instead of being wrapped.
                1 => Ok(collected.swap_remove(0)),
                _ => {
                    let chunked = ChunkedArray::try_new(collected, stream_dtype)?;
                    Ok(chunked.into_array())
                }
            }
        }
    }

    /// Wraps this stream in a [`TakeRows`] built from `indices` and exposes the result as an
    /// [`ArrayStream`] that reports the same dtype as the original stream.
    ///
    /// # Errors
    ///
    /// Returns an error if [`TakeRows::try_new`] fails for the given stream and `indices`.
    fn take_rows(self, indices: ArrayData) -> VortexResult<impl ArrayStream>
    where
        Self: Sized,
    {
        // Clone the dtype before `self` is moved into the `TakeRows` wrapper.
        let stream_dtype = self.dtype().clone();
        let taken = TakeRows::try_new(self, indices)?;
        Ok(ArrayStreamAdapter::new(stream_dtype, taken))
    }
}
/// Blanket implementation: every type implementing [`ArrayStream`] also implements
/// [`ArrayStreamExt`], relying entirely on the trait's provided methods.
impl<S> ArrayStreamExt for S where S: ArrayStream {}