vortex_layout/layouts/struct_/
writer.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use itertools::Itertools;
use vortex_array::ArrayData;
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult};

use crate::data::LayoutData;
use crate::layouts::struct_::StructLayout;
use crate::segments::SegmentWriter;
use crate::strategies::{LayoutStrategy, LayoutWriter};

/// A [`LayoutWriter`] for struct-typed data that fans each StructArray batch out to one child
/// layout writer per field.
///
/// The child writers are held in struct-field order. When writing finishes, their layouts are
/// combined under a [`StructLayout`].
pub struct StructLayoutWriter {
    /// Logical type of the data being written; construction requires this to be a struct dtype.
    dtype: DType,
    /// Running count of rows taken from the chunks pushed into this writer.
    row_count: u64,
    /// One child writer per struct field, indexed by field position.
    column_strategies: Vec<Box<dyn LayoutWriter>>,
}

impl StructLayoutWriter {
    pub fn new(dtype: DType, column_layout_writers: Vec<Box<dyn LayoutWriter>>) -> Self {
        let struct_dtype = dtype.as_struct().vortex_expect("dtype is not a struct");
        if struct_dtype.dtypes().len() != column_layout_writers.len() {
            vortex_panic!(
                "number of fields in struct dtype does not match number of column layout writers"
            );
        }
        Self {
            column_strategies: column_layout_writers,
            dtype,
            row_count: 0,
        }
    }

    pub fn try_new_with_factory<F: LayoutStrategy>(
        dtype: &DType,
        factory: F,
    ) -> VortexResult<Self> {
        let struct_dtype = dtype.as_struct().vortex_expect("dtype is not a struct");
        Ok(Self::new(
            dtype.clone(),
            struct_dtype
                .dtypes()
                .map(|dtype| factory.new_writer(&dtype))
                .try_collect()?,
        ))
    }
}

impl LayoutWriter for StructLayoutWriter {
    /// Splits a struct-array `chunk` into its fields and pushes field `i` to child writer `i`.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `chunk` is not a struct array;
    /// - its field count differs from the number of child writers;
    /// - any child writer fails to accept its column.
    fn push_chunk(
        &mut self,
        segments: &mut dyn SegmentWriter,
        chunk: ArrayData,
    ) -> VortexResult<()> {
        let struct_array = chunk
            .as_struct_array()
            .ok_or_else(|| vortex_err!("batch is not a struct array"))?;

        if struct_array.nfields() != self.column_strategies.len() {
            vortex_bail!(
                "number of fields in struct array does not match number of column layout writers"
            );
        }

        for (idx, writer) in self.column_strategies.iter_mut().enumerate() {
            let column = struct_array
                .maybe_null_field_by_idx(idx)
                .vortex_expect("bounds already checked");
            writer.push_chunk(segments, column)?;
        }

        // Count the rows only once every child writer has accepted its column, so a failed push
        // does not leave `row_count` ahead of the data the columns actually hold.
        self.row_count += struct_array.len() as u64;
        Ok(())
    }

    /// Finishes every child writer in field order and wraps their layouts in a [`StructLayout`]
    /// spanning all rows counted so far.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by a child writer's `finish`.
    fn finish(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<LayoutData> {
        let column_layouts: Vec<LayoutData> = self
            .column_strategies
            .iter_mut()
            .map(|writer| writer.finish(segments))
            .try_collect()?;
        Ok(LayoutData::new_owned(
            &StructLayout,
            self.dtype.clone(),
            self.row_count,
            None,
            Some(column_layouts),
            None,
        ))
    }
}