// vortex_file/read/handle.rs
use std::collections::BTreeSet;
use std::sync::Arc;
use futures::stream;
use itertools::Itertools;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_io::{IoDispatcher, VortexReadAt};
use super::splits::SplitsAccumulator;
use super::{LayoutMessageCache, LayoutReader, RowMask, VortexReadArrayStream};
use crate::read::buffered::{BufferedLayoutReader, ReadArray};
use crate::read::splits::ReadRowMask;
/// A cloneable handle bundling everything needed to stream arrays out of an input `R`.
///
/// Instances are built by `try_new`, which precomputes `splits`, and are turned into a
/// `VortexReadArrayStream` by `into_stream` or `stream_range`.
#[derive(Clone)]
pub struct VortexReadHandle<R> {
/// The underlying input; handed to the buffered layout readers in `into_stream`.
input: R,
/// The data type exposed by `dtype()` and passed to the resulting stream.
dtype: Arc<DType>,
/// The row count exposed by `row_count()`; also used as the final split boundary in `try_new`.
row_count: u64,
/// Consecutive pairs of split boundaries, computed once in `try_new`.
splits: Vec<(usize, usize)>,
/// Layout reader wrapped in `ReadArray` to produce the arrays of the stream.
layout_reader: Arc<dyn LayoutReader>,
/// Optional layout reader; when present it is wrapped in `ReadRowMask` and its output feeds
/// the array reader in `into_stream`.
filter_reader: Option<Arc<dyn LayoutReader>>,
/// Message cache passed to every buffered layout reader created from this handle.
messages_cache: LayoutMessageCache,
/// Optional row mask given to the `SplitsAccumulator`; `stream_range` narrows it to a range.
row_mask: Option<RowMask>,
/// I/O dispatcher passed to every buffered layout reader created from this handle.
io_dispatcher: Arc<IoDispatcher>,
}
impl<R: VortexReadAt + Unpin> VortexReadHandle<R> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn try_new(
input: R,
layout_reader: Arc<dyn LayoutReader>,
filter_reader: Option<Arc<dyn LayoutReader>>,
messages_cache: LayoutMessageCache,
dtype: Arc<DType>,
row_count: u64,
row_mask: Option<RowMask>,
io_dispatcher: Arc<IoDispatcher>,
) -> VortexResult<Self> {
let mut reader_splits = BTreeSet::new();
layout_reader.add_splits(0, &mut reader_splits)?;
if let Some(ref fr) = filter_reader {
fr.add_splits(0, &mut reader_splits)?;
}
reader_splits.insert(0);
reader_splits.insert(row_count as usize);
let splits = reader_splits.into_iter().tuple_windows().collect();
Ok(Self {
input,
dtype,
row_count,
splits,
layout_reader,
filter_reader,
messages_cache,
row_mask,
io_dispatcher,
})
}
pub fn dtype(&self) -> &DType {
self.dtype.as_ref()
}
pub fn row_count(&self) -> u64 {
self.row_count
}
pub fn splits(&self) -> &[(usize, usize)] {
&self.splits
}
pub fn into_stream(self) -> VortexReadArrayStream<R> {
let splits_vec = Vec::from_iter(self.splits().iter().copied());
let split_accumulator = SplitsAccumulator::new(splits_vec.into_iter(), self.row_mask);
let splits_stream = stream::iter(split_accumulator);
let mask_iterator = if let Some(fr) = &self.filter_reader {
Box::new(BufferedLayoutReader::new(
self.input.clone(),
self.io_dispatcher.clone(),
splits_stream,
ReadRowMask::new(fr.clone()),
self.messages_cache.clone(),
)) as _
} else {
Box::new(splits_stream) as _
};
let array_reader = BufferedLayoutReader::new(
self.input,
self.io_dispatcher,
mask_iterator,
ReadArray::new(self.layout_reader),
self.messages_cache,
);
VortexReadArrayStream::new(self.dtype, self.row_count, array_reader)
}
pub fn stream_range(
mut self,
begin: usize,
end: usize,
) -> VortexResult<VortexReadArrayStream<R>> {
self.row_mask = match self.row_mask {
Some(mask) => Some(mask.and_rowmask(RowMask::new_valid_between(begin, end))?),
None => Some(RowMask::new_valid_between(begin, end)),
};
Ok(self.into_stream())
}
}