// vortex_datafusion/memory/stream.rs
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult};
use datafusion_execution::RecordBatchStream;
use futures::Stream;
use vortex_array::array::ChunkedArray;
use vortex_array::IntoArrayVariant;
use vortex_dtype::Field;

/// A stream of Arrow [`RecordBatch`]es produced from the chunks of a Vortex
/// [`ChunkedArray`].
///
/// Each poll takes the next chunk, converts it to a struct array, applies
/// `projection`, and converts the result into a record batch.
pub(crate) struct VortexRecordBatchStream {
    /// Schema handed out by `RecordBatchStream::schema`.
    // NOTE(review): presumably this is the schema *after* projection — confirm at the construction site.
    pub(crate) schema_ref: SchemaRef,

    /// Index of the next chunk to read from `chunks`.
    pub(crate) idx: usize,
    /// The stream ends once `idx` reaches this value.
    // NOTE(review): presumably equal to the number of chunks in `chunks` — confirm at the construction site.
    pub(crate) num_chunks: usize,
    /// Source array whose chunks are emitted one per poll.
    pub(crate) chunks: ChunkedArray,

    /// Fields passed to the struct array's `project` call for every chunk.
    pub(crate) projection: Vec<Field>,
}

impl Stream for VortexRecordBatchStream {
    type Item = DFResult<RecordBatch>;

    /// Yields the next chunk as a projected Arrow [`RecordBatch`].
    ///
    /// Every return path is `Poll::Ready`; the waker in `_cx` is never
    /// registered because the chunks are read directly from `self.chunks`.
    ///
    /// Returns `Ready(None)` once `idx` has reached `num_chunks`. Otherwise
    /// returns `Ready(Some(Ok(batch)))`, or `Ready(Some(Err(_)))` if fetching
    /// the chunk, converting it to a struct array, projecting it, or
    /// converting it to a record batch fails.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.idx >= self.num_chunks {
            return Poll::Ready(None);
        }

        // Advance the cursor before doing any fallible work so that every
        // yielded item (`Ok` or `Err`) consumes exactly one chunk slot. This
        // keeps `size_hint` exact and prevents a failing chunk fetch from
        // being retried (and re-reported) on every subsequent poll.
        let chunk_idx = self.idx;
        self.idx += 1;

        // Grab the chunk, project and convert to Arrow. `?` on a `Result`
        // here short-circuits into `Poll::Ready(Some(Err(_)))`.
        let chunk = self.chunks.chunk(chunk_idx)?;

        let struct_array = chunk
            .into_struct()
            .map_err(|vortex_error| DataFusionError::Execution(vortex_error.to_string()))?;

        let projected_struct = struct_array
            .project(&self.projection)
            .map_err(|vortex_err| {
                exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}")
            })?;

        Poll::Ready(Some(Ok(projected_struct.into_record_batch()?)))
    }

    /// Reports the number of items still to be yielded.
    ///
    /// One item is produced per remaining chunk slot, so both bounds are
    /// `num_chunks - idx` (saturating at zero).
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.num_chunks.saturating_sub(self.idx);
        (remaining, Some(remaining))
    }
}

/// Lets DataFusion query the schema of the batches this stream yields.
impl RecordBatchStream for VortexRecordBatchStream {
    /// Returns a new shared handle to the stream's stored schema.
    fn schema(&self) -> SchemaRef {
        // Cloning the `Arc` hands out another reference to the same schema
        // rather than copying the schema itself.
        let shared_schema = &self.schema_ref;
        Arc::clone(shared_schema)
    }
}