1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::future::Future;
use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
use futures::StreamExt;
use vortex_array::arrow::infer_schema;
use vortex_array::ArrayData;
use vortex_error::{VortexError, VortexResult};
use vortex_io::VortexReadAt;

use super::VortexFileArrayStream;

/// Wraps a [`VortexError`] in [`ArrowError::ExternalError`] so it can be surfaced
/// through Arrow's error type.
fn vortex_to_arrow_error(error: VortexError) -> ArrowError {
    // Unsized to `Box<dyn Error + Send + Sync>` at the variant constructor call.
    let boxed = Box::new(error);
    ArrowError::ExternalError(boxed)
}

/// Converts one item produced by the Vortex stream into an Arrow [`RecordBatch`].
///
/// An `Err` already present in `result` and a failed `ArrayData` -> `RecordBatch`
/// conversion are both reported as [`ArrowError::ExternalError`].
fn vortex_to_arrow(result: VortexResult<ArrayData>) -> Result<RecordBatch, ArrowError> {
    let array = result.map_err(vortex_to_arrow_error)?;
    RecordBatch::try_from(array).map_err(vortex_to_arrow_error)
}

/// An executor that can synchronously drive a future to completion.
///
/// `VortexRecordBatchReader` relies on this to expose an async stream through the
/// blocking `Iterator` / `RecordBatchReader` interfaces.
pub trait AsyncRuntime {
    /// Blocks the calling thread until `future` completes, then returns its output.
    fn block_on<Fut: Future>(&self, future: Fut) -> Fut::Output;
}

impl AsyncRuntime for tokio::runtime::Runtime {
    /// Delegates to tokio's own inherent `Runtime::block_on`.
    fn block_on<F: Future>(&self, future: F) -> F::Output {
        // Inherent associated functions take precedence over trait items in a
        // `Type::name` path, so this calls tokio's method rather than recursing
        // into this trait impl.
        tokio::runtime::Runtime::block_on(self, future)
    }
}

/// Synchronous Arrow [`RecordBatchReader`] adapter over a [`VortexFileArrayStream`].
///
/// Each call to `next` blocks on `runtime` to pull the next array from `stream`
/// and converts it into a [`RecordBatch`].
pub struct VortexRecordBatchReader<'a, R, AR> {
    /// Async source of Vortex arrays.
    stream: VortexFileArrayStream<R>,
    /// Arrow schema inferred once from the stream's dtype in `try_new`.
    arrow_schema: SchemaRef,
    /// Executor borrowed for lifetime `'a`, used to drive `stream`.
    runtime: &'a AR,
}

impl<'a, R, AR> VortexRecordBatchReader<'a, R, AR>
where
    R: VortexReadAt + Unpin + 'static,
    AR: AsyncRuntime,
{
    /// Builds a reader over `stream` that is driven by `runtime`.
    ///
    /// The Arrow schema is derived up front from the stream's Vortex dtype, so
    /// `schema()` never has to touch the stream.
    ///
    /// # Errors
    ///
    /// Returns the error from `infer_schema` if the stream's dtype cannot be
    /// converted into an Arrow schema.
    pub fn try_new(stream: VortexFileArrayStream<R>, runtime: &'a AR) -> VortexResult<Self> {
        let schema = infer_schema(stream.dtype())?;
        Ok(Self {
            arrow_schema: Arc::new(schema),
            stream,
            runtime,
        })
    }
}

impl<R, AR> Iterator for VortexRecordBatchReader<'_, R, AR>
where
    R: VortexReadAt + Unpin + 'static,
    AR: AsyncRuntime,
{
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        let maybe_result = self.runtime.block_on(self.stream.next());
        maybe_result.map(vortex_to_arrow)
    }
}

impl<R, AR> RecordBatchReader for VortexRecordBatchReader<'_, R, AR>
where
    R: VortexReadAt + Unpin + 'static,
    AR: AsyncRuntime,
{
    fn schema(&self) -> SchemaRef {
        self.arrow_schema.clone()
    }

    fn next_batch(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        self.next().transpose()
    }
}