use std::ops::Range;
use std::sync::Arc;
use std::task::Poll;
use futures::pin_mut;
use futures_util::future::poll_fn;
use futures_util::{stream, TryFutureExt};
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::ContextRef;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_io::VortexReadAt;
use vortex_layout::{LayoutData, LayoutReader};
use vortex_scan::Scan;
use crate::v2::segments::cache::SegmentCache;
/// Handle to a Vortex file whose segments are read through a reader of type `R`.
///
/// Bundles the file's layout with the context and segment cache needed to build
/// a layout reader and evaluate scans against it.
pub struct VortexFile<R> {
/// Context handed to the layout when constructing a layout reader.
pub(crate) ctx: ContextRef,
/// Layout of the file; the source of its row count, dtype and layout reader.
pub(crate) layout: LayoutData,
/// Shared segment cache; passed to the layout reader and driven while a scan is polled.
pub(crate) segments: Arc<SegmentCache<R>>,
// NOTE(review): `splits` is not read anywhere in the visible code, which is
// presumably why dead-code warnings are suppressed here — confirm it is still needed.
/// Ranges associated with the file (presumably row ranges at which a scan
/// may be split — TODO confirm against the code that populates this field).
#[allow(dead_code)]
pub(crate) splits: Arc<[Range<u64>]>,
}
// No methods are defined for an unconstrained `R` in this block; it is currently empty.
impl<R> VortexFile<R> {}
impl<R: VortexReadAt + Unpin> VortexFile<R> {
    /// Returns the number of rows reported by the file's layout.
    pub fn row_count(&self) -> u64 {
        self.layout.row_count()
    }

    /// Returns the data type reported by the file's layout.
    pub fn dtype(&self) -> &DType {
        self.layout.dtype()
    }

    /// Evaluates `scan` over every row of the file (`0..row_count`).
    ///
    /// The returned stream borrows `self` and yields exactly one item: the
    /// result of evaluating the scan over the full row range.
    ///
    /// # Errors
    ///
    /// Returns an error immediately if the layout reader cannot be constructed
    /// or if the scan's result dtype cannot be derived from the file's dtype.
    /// Failures while building the range scan, evaluating it, or driving the
    /// segment cache are reported as the stream's item.
    pub fn scan(&self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + '_> {
        let reader: Arc<dyn LayoutReader> = self
            .layout
            .reader(self.segments.clone(), self.ctx.clone())?;
        let result_dtype = scan.result_dtype(self.dtype())?;

        let stream = stream::once(async move {
            let row_range = 0..self.layout.row_count();
            let eval = scan
                .range_scan(row_range)?
                .evaluate_async(reader.evaluator());
            pin_mut!(eval);

            // Alternate between polling the evaluation and driving the segment
            // cache until the evaluation produces its result.
            poll_fn(|cx| {
                loop {
                    if let Poll::Ready(array) = eval.try_poll_unpin(cx) {
                        return Poll::Ready(array);
                    }

                    // NOTE(review): a fresh drive future is created on every
                    // iteration and dropped when it reports `Pending`; this
                    // assumes `drive()` can safely be restarted — confirm.
                    let drive = self.segments.drive();
                    pin_mut!(drive);
                    match drive.try_poll_unpin(cx) {
                        // The cache made progress: give the evaluation another poll.
                        Poll::Ready(Ok(_)) => {}
                        // Surface drive failures instead of discarding them;
                        // ignoring the error would send the loop straight back
                        // to polling, where it could spin if the failure persists.
                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                        Poll::Pending => return Poll::Pending,
                    }
                }
            })
            .await
        });

        Ok(ArrayStreamAdapter::new(result_dtype, stream))
    }
}