// vortex_layout/layouts/chunked/evaluator.rs

use async_trait::async_trait;
use futures::future::{ready, try_join_all};
use futures::FutureExt;
use vortex_array::array::{ChunkedArray, ConstantArray};
use vortex_array::{ArrayDType, ArrayData, Canonical, IntoArrayData, IntoArrayVariant};
use vortex_error::VortexResult;
use vortex_expr::pruning::PruningPredicate;
use vortex_expr::ExprRef;
use vortex_scalar::Scalar;
use vortex_scan::{AsyncEvaluator, RowMask};
use crate::layouts::chunked::reader::ChunkedReader;
use crate::reader::LayoutScanExt;
#[async_trait(?Send)]
impl AsyncEvaluator for ChunkedReader {
    /// Evaluate `expr` over the rows selected by `row_mask`, chunk by chunk.
    ///
    /// Chunks that are disjoint from the mask are skipped entirely, and chunks
    /// that the pruning predicate proves cannot match are short-circuited into
    /// an all-`false` constant array (avoiding any segment I/O for them). The
    /// remaining chunks are evaluated concurrently and the results are
    /// reassembled into a single `ChunkedArray`.
    async fn evaluate(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult<ArrayData> {
        // Compute the result dtype of the expression by evaluating it against
        // an empty canonical array of this layout's dtype.
        let dtype = expr
            .evaluate(&Canonical::empty(self.dtype())?.into_array())?
            .dtype()
            .clone();

        // Try to derive a pruning predicate from the expression. If one exists
        // and a stats table is available, evaluate it to obtain a per-chunk
        // boolean buffer where `true` means "this chunk cannot match".
        let pruning_predicate = PruningPredicate::try_new(&expr);
        let pruning_mask = if let Some(predicate) = pruning_predicate {
            if let Some(stats_table) = self.stats_table().await? {
                predicate
                    .evaluate(stats_table.array())?
                    .map(|mask| mask.into_bool())
                    .transpose()?
                    .map(|mask| mask.boolean_buffer())
            } else {
                None
            }
        } else {
            None
        };

        // Set up a lazy future for each chunk the mask touches.
        let mut chunks = Vec::with_capacity(self.nchunks());
        let mut row_offset = 0;
        for chunk_idx in 0..self.nchunks() {
            let chunk_reader = self.child(chunk_idx)?;
            let chunk_len = chunk_reader.layout().row_count();
            let chunk_range = row_offset..row_offset + chunk_len;
            row_offset += chunk_len;

            // Skip chunks entirely outside the row mask.
            if row_mask.is_disjoint(chunk_range.clone()) {
                continue;
            }

            // Restrict the row mask to this chunk and re-base it to the
            // chunk's local coordinates.
            let chunk_mask = row_mask
                .slice(chunk_range.start, chunk_range.end)?
                .shift(chunk_range.start)?;

            // If the pruning mask proves this chunk cannot match, emit an
            // all-false constant sized to the rows selected *within this
            // chunk*. (Previously this used `row_mask.true_count()` — the true
            // count of the entire mask — which produced oversized chunks
            // whenever more than one chunk was selected.)
            if let Some(pruning_mask) = &pruning_mask {
                if pruning_mask.value(chunk_idx) {
                    let false_array = ConstantArray::new(
                        Scalar::bool(false, dtype.nullability()),
                        chunk_mask.true_count(),
                    );
                    chunks.push(ready(Ok(false_array.into_array())).boxed_local());
                    continue;
                }
            }

            chunks.push(chunk_reader.evaluate(chunk_mask, expr.clone()).boxed_local());
        }

        // Await all chunk futures concurrently and stitch the results back
        // together in chunk order.
        let chunks = try_join_all(chunks).await?;
        Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
    }
}
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use futures::executor::block_on;
    use vortex_array::array::{BoolArray, ChunkedArray, ConstantArray};
    use vortex_array::{ArrayLen, IntoArrayData, IntoArrayVariant};
    use vortex_buffer::buffer;
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::{DType, PType};
    use vortex_error::VortexExpect;
    use vortex_expr::{gt, lit, Identity};
    use vortex_scan::RowMask;

    use crate::layouts::chunked::writer::ChunkedLayoutWriter;
    use crate::segments::test::TestSegments;
    use crate::strategies::LayoutWriterExt;
    use crate::LayoutData;

    /// Write three I32 chunks of three rows each into an in-memory segment
    /// store, returning the store together with the resulting layout.
    fn chunked_layout() -> (Arc<TestSegments>, LayoutData) {
        let mut segments = TestSegments::default();
        let dtype = DType::Primitive(PType::I32, NonNullable);
        let layout = ChunkedLayoutWriter::new(&dtype, Default::default())
            .push_all(
                &mut segments,
                [
                    Ok(buffer![1, 2, 3].into_array()),
                    Ok(buffer![4, 5, 6].into_array()),
                    Ok(buffer![7, 8, 9].into_array()),
                ],
            )
            .unwrap();
        (Arc::new(segments), layout)
    }

    /// Scanning the identity expression over the full row range should yield
    /// all nine values in chunk order.
    #[test]
    fn test_chunked_scan() {
        block_on(async {
            let (segments, layout) = chunked_layout();
            let mask = RowMask::new_valid_between(0, layout.row_count());
            let reader = layout.reader(segments, Default::default()).unwrap();

            let array = reader
                .evaluate(mask, Identity::new_expr())
                .await
                .unwrap()
                .into_primitive()
                .unwrap();

            assert_eq!(array.len(), 9);
            assert_eq!(array.as_slice::<i32>(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
        })
    }

    /// With the predicate `x > 7`, the first two chunks (whose maxima are 3
    /// and 6) should be pruned into constant-false arrays, while the third
    /// chunk must actually be evaluated.
    #[test]
    fn test_chunked_pruning_mask() {
        block_on(async {
            let (segments, layout) = chunked_layout();
            let row_count = layout.row_count();
            let reader = layout.reader(segments, Default::default()).unwrap();

            let predicate = gt(Identity::new_expr(), lit(7));
            let evaluated = reader
                .evaluate(RowMask::new_valid_between(0, row_count), predicate.clone())
                .await
                .unwrap();
            let evaluated = ChunkedArray::try_from(evaluated).unwrap();

            assert_eq!(evaluated.nchunks(), 3);
            ConstantArray::try_from(evaluated.chunk(0).unwrap())
                .vortex_expect("Expected first chunk to be pruned");
            ConstantArray::try_from(evaluated.chunk(1).unwrap())
                .vortex_expect("Expected second chunk to be pruned");
            BoolArray::try_from(evaluated.chunk(2).unwrap())
                .vortex_expect("Expected third chunk to be evaluated");
        })
    }
}