// vortex_layout/layouts/chunked/writer.rs

use bytes::Bytes;
use vortex_array::stats::{as_stat_bitset_bytes, Stat, PRUNING_STATS};
use vortex_array::{ArrayDType, ArrayData};
use vortex_dtype::DType;
use vortex_error::VortexResult;

use crate::data::LayoutData;
use crate::layouts::chunked::stats_table::StatsAccumulator;
use crate::layouts::chunked::ChunkedLayout;
use crate::layouts::flat::writer::FlatLayoutWriter;
use crate::layouts::flat::FlatLayout;
use crate::segments::SegmentWriter;
use crate::strategies::{LayoutStrategy, LayoutWriter, LayoutWriterExt};

/// Configuration consumed by [`ChunkedLayoutWriter`].
pub struct ChunkedLayoutOptions {
    /// Which statistics to compute for every chunk pushed into the writer.
    pub chunk_stats: Vec<Stat>,
    /// Strategy used to construct a fresh child writer for each chunk.
    pub chunk_strategy: Box<dyn LayoutStrategy>,
}

impl Default for ChunkedLayoutOptions {
    fn default() -> Self {
        Self {
            chunk_stats: PRUNING_STATS.to_vec(),
            chunk_strategy: Box::new(FlatLayout),
        }
    }
}

/// Straightforward chunked layout writer: every pushed batch becomes exactly one chunk.
///
/// TODO(ngates): introduce more sophisticated layout writers with different chunking strategies.
pub struct ChunkedLayoutWriter {
    /// Per-chunk statistics selection and child layout strategy.
    options: ChunkedLayoutOptions,
    /// One child writer per pushed chunk, kept un-finished until `finish` is called.
    chunks: Vec<Box<dyn LayoutWriter>>,
    /// Accumulates the configured statistics across all pushed chunks.
    stats_accumulator: StatsAccumulator,
    /// Data type of the whole layout.
    dtype: DType,
    /// Total number of rows pushed so far.
    row_count: u64,
}

impl ChunkedLayoutWriter {
    /// Builds an empty writer for arrays of `dtype`.
    ///
    /// The statistics accumulator is primed with `dtype` and with the statistics listed in
    /// `options.chunk_stats`; no chunks have been written and the row count starts at zero.
    pub fn new(dtype: &DType, options: ChunkedLayoutOptions) -> Self {
        let dtype = dtype.clone();
        let stats_accumulator = StatsAccumulator::new(dtype.clone(), options.chunk_stats.clone());
        Self {
            stats_accumulator,
            chunks: Vec::new(),
            row_count: 0,
            dtype,
            options,
        }
    }
}

impl LayoutWriter for ChunkedLayoutWriter {
    /// Adds `chunk` to the running row count and statistics, then writes it through a fresh
    /// child writer created by the configured chunk strategy.
    ///
    /// The child writer is retained instead of being finished here, so that every chunk still
    /// has an opportunity to write messages at the end of the file when `finish` runs.
    fn push_chunk(
        &mut self,
        segments: &mut dyn SegmentWriter,
        chunk: ArrayData,
    ) -> VortexResult<()> {
        let rows = chunk.len() as u64;
        self.row_count += rows;
        self.stats_accumulator.push_chunk(&chunk)?;

        let mut child = self.options.chunk_strategy.new_writer(chunk.dtype())?;
        child.push_chunk(segments, chunk)?;
        self.chunks.push(child);
        Ok(())
    }

    /// Finishes every retained child writer in push order, then appends the accumulated
    /// statistics table (when one exists) as a trailing flat child.
    ///
    /// When a statistics table is written, the layout metadata carries a bit-set of the
    /// statistics it contains, so readers can infer the statistics array's schema.
    ///
    /// # Errors
    /// Returns the first error raised by a child writer, by building the statistics table,
    /// or by writing that table.
    fn finish(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<LayoutData> {
        // Short-circuits on the first failing child, exactly like a manual loop with `?`.
        let mut children = self
            .chunks
            .iter_mut()
            .map(|child| child.finish(segments))
            .collect::<VortexResult<Vec<_>>>()?;

        let metadata: Option<Bytes> =
            if let Some(stats_table) = self.stats_accumulator.as_stats_table()? {
                let stats_array = stats_table.array();
                // The stats array is written last, after all chunk children.
                let stats_child = FlatLayoutWriter::new(stats_array.dtype().clone())
                    .push_one(segments, stats_array.clone())?;
                children.push(stats_child);
                Some(as_stat_bitset_bytes(stats_table.present_stats()).into())
            } else {
                None
            };

        Ok(LayoutData::new_owned(
            &ChunkedLayout,
            self.dtype.clone(),
            self.row_count,
            None,
            Some(children),
            metadata,
        ))
    }
}