1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use std::future::Future;

use futures_util::TryStreamExt;
use vortex_error::VortexResult;

use crate::array::ChunkedArray;
use crate::stream::take_rows::TakeRows;
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::{ArrayData, IntoArrayData};

pub trait ArrayStreamExt: ArrayStream {
    /// Collect the stream into a single `ArrayData`.
    ///
    /// If the stream yields multiple chunks, they will be returned as a [`ChunkedArray`].
    fn into_array_data(self) -> impl Future<Output = VortexResult<ArrayData>>
    where
        Self: Sized,
    {
        async move {
            let dtype = self.dtype().clone();
            let mut chunks: Vec<ArrayData> = self.try_collect().await?;
            if chunks.len() == 1 {
                Ok(chunks.remove(0))
            } else {
                Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
            }
        }
    }

    /// Perform a row-wise selection on the stream from an array of sorted indicessss.
    fn take_rows(self, indices: ArrayData) -> VortexResult<impl ArrayStream>
    where
        Self: Sized,
    {
        Ok(ArrayStreamAdapter::new(
            self.dtype().clone(),
            TakeRows::try_new(self, indices)?,
        ))
    }
}

impl<S: ArrayStream> ArrayStreamExt for S {}