// vortex_layout/layouts/chunked/writer.rs
use bytes::Bytes;
use vortex_array::stats::{as_stat_bitset_bytes, Stat, PRUNING_STATS};
use vortex_array::{ArrayDType, ArrayData};
use vortex_dtype::DType;
use vortex_error::VortexResult;
use crate::data::LayoutData;
use crate::layouts::chunked::stats_table::StatsAccumulator;
use crate::layouts::chunked::ChunkedLayout;
use crate::layouts::flat::writer::FlatLayoutWriter;
use crate::layouts::flat::FlatLayout;
use crate::segments::SegmentWriter;
use crate::strategies::{LayoutStrategy, LayoutWriter, LayoutWriterExt};
/// Configuration for a [`ChunkedLayoutWriter`].
pub struct ChunkedLayoutOptions {
/// The statistics handed to the writer's `StatsAccumulator` when it is constructed.
pub chunk_stats: Vec<Stat>,
/// The strategy asked for a fresh `LayoutWriter` each time a chunk is pushed.
pub chunk_strategy: Box<dyn LayoutStrategy>,
}
impl Default for ChunkedLayoutOptions {
fn default() -> Self {
Self {
chunk_stats: PRUNING_STATS.to_vec(),
chunk_strategy: Box::new(FlatLayout),
}
}
}
/// A `LayoutWriter` that writes every pushed chunk through its own child writer and
/// assembles the results into a single `ChunkedLayout`.
pub struct ChunkedLayoutWriter {
// Options supplied at construction; `chunk_strategy` creates the per-chunk writers.
options: ChunkedLayoutOptions,
// One child writer per pushed chunk, in push order; each is finished in `finish`.
chunks: Vec<Box<dyn LayoutWriter>>,
// Receives every pushed chunk and yields the optional stats table in `finish`.
stats_accumulator: StatsAccumulator,
// The dtype given to `new`; recorded on the resulting layout.
dtype: DType,
// Running sum of the lengths of all chunks pushed so far.
row_count: u64,
}
impl ChunkedLayoutWriter {
    /// Creates an empty writer for arrays of the given `dtype`.
    ///
    /// The stats accumulator is set up with a copy of `dtype` and of
    /// `options.chunk_stats`; no chunks are recorded and the row count starts at zero.
    pub fn new(dtype: &DType, options: ChunkedLayoutOptions) -> Self {
        let dtype = dtype.clone();
        let tracked_stats = options.chunk_stats.clone();
        let stats_accumulator = StatsAccumulator::new(dtype.clone(), tracked_stats);

        Self {
            options,
            chunks: Vec::new(),
            stats_accumulator,
            dtype,
            row_count: 0,
        }
    }
}
impl LayoutWriter for ChunkedLayoutWriter {
    /// Records `chunk` as the next child of the chunked layout.
    ///
    /// The row count is bumped by the chunk's length, the chunk is fed to the stats
    /// accumulator, and a new writer obtained from `options.chunk_strategy` receives the
    /// chunk. That writer is stored and only finished later, in [`Self::finish`].
    ///
    /// # Errors
    ///
    /// Propagates any error from the stats accumulator, from creating the child writer,
    /// or from the child writer's own `push_chunk`.
    fn push_chunk(
        &mut self,
        segments: &mut dyn SegmentWriter,
        chunk: ArrayData,
    ) -> VortexResult<()> {
        let chunk_rows = chunk.len() as u64;
        self.row_count += chunk_rows;
        self.stats_accumulator.push_chunk(&chunk)?;

        // Each chunk gets its own writer; finishing is deferred until `finish`.
        let mut child = self.options.chunk_strategy.new_writer(chunk.dtype())?;
        child.push_chunk(segments, chunk)?;
        self.chunks.push(child);

        Ok(())
    }

    /// Finishes every child writer and builds the `LayoutData` for the chunked layout.
    ///
    /// Children appear in push order. When the stats accumulator produces a stats table,
    /// that table is written through a `FlatLayoutWriter` and appended as the last child,
    /// and the bitset of present stats becomes the layout metadata; otherwise the layout
    /// has no metadata.
    ///
    /// # Errors
    ///
    /// Propagates the first error from finishing a child writer, from building the stats
    /// table, or from writing the stats table.
    fn finish(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<LayoutData> {
        // Stops at the first child whose `finish` fails, exactly like an explicit loop.
        let mut children = self
            .chunks
            .iter_mut()
            .map(|writer| writer.finish(segments))
            .collect::<VortexResult<Vec<_>>>()?;

        let stats_table = self.stats_accumulator.as_stats_table()?;
        let metadata: Option<Bytes> = if let Some(table) = stats_table {
            let stats_array = table.array().clone();
            let stats_layout = FlatLayoutWriter::new(stats_array.dtype().clone())
                .push_one(segments, stats_array)?;
            children.push(stats_layout);
            Some(as_stat_bitset_bytes(table.present_stats()).into())
        } else {
            None
        };

        let dtype = self.dtype.clone();
        Ok(LayoutData::new_owned(
            &ChunkedLayout,
            dtype,
            self.row_count,
            None,
            Some(children),
            metadata,
        ))
    }
}