vortex_layout/layouts/chunked/
stats_table.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use std::sync::Arc;

use itertools::Itertools;
use vortex_array::array::StructArray;
use vortex_array::builders::{builder_with_capacity, ArrayBuilder, ArrayBuilderExt};
use vortex_array::stats::{ArrayStatistics as _, Stat};
use vortex_array::validity::{ArrayValidity, Validity};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
use vortex_dtype::{DType, Nullability, StructDType};
use vortex_error::{vortex_bail, VortexResult};

/// A table of statistics for a column.
/// Each row of the stats table corresponds to a chunk of the column.
///
/// Note that it's possible for the stats table to have no statistics.
#[derive(Clone)]
pub struct StatsTable {
    // The DType of the column for which these stats are computed.
    column_dtype: DType,
    // The struct array backing the stats table
    array: ArrayData,
    // The statistics that are included in the table.
    stats: Arc<[Stat]>,
}

impl StatsTable {
    pub fn try_new(
        column_dtype: DType,
        array: ArrayData,
        stats: Arc<[Stat]>,
    ) -> VortexResult<Self> {
        if &Self::dtype_for_stats_table(&column_dtype, &stats) != array.dtype() {
            vortex_bail!("Array dtype does not match expected stats table dtype");
        }
        Ok(Self {
            column_dtype,
            array,
            stats,
        })
    }

    /// Returns the DType of the statistics table given a set of statistics and column [`DType`].
    pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
        let dtypes = present_stats
            .iter()
            .map(|s| s.dtype(column_dtype).as_nullable())
            .collect();
        DType::Struct(
            StructDType::new(
                present_stats.iter().map(|s| s.name().into()).collect(),
                dtypes,
            ),
            Nullability::NonNullable,
        )
    }

    /// The DType of the column for which these stats are computed.
    pub fn column_dtype(&self) -> &DType {
        &self.column_dtype
    }

    /// The struct array backing the stats table
    pub fn array(&self) -> &ArrayData {
        &self.array
    }

    /// The statistics that are included in the table.
    pub fn present_stats(&self) -> &[Stat] {
        &self.stats
    }
}

/// Accumulates statistics for a column.
///
/// TODO(ngates): we should make it such that the stats table stores a mirror of the DType
///  underneath each stats column. For example, `min: i32` for an `i32` array.
///  Or `min: {a: i32, b: i32}` for a struct array of type `{a: i32, b: i32}`.
///  See: <https://github.com/spiraldb/vortex/issues/1835>
pub struct StatsAccumulator {
    column_dtype: DType,
    stats: Vec<Stat>,
    builders: Vec<Box<dyn ArrayBuilder>>,
    length: usize,
}

impl StatsAccumulator {
    pub fn new(dtype: DType, mut stats: Vec<Stat>) -> Self {
        // Sort stats by their ordinal so we can recreate their dtype from bitset
        stats.sort_by_key(|s| u8::from(*s));
        let builders = stats
            .iter()
            .map(|s| builder_with_capacity(&s.dtype(&dtype).as_nullable(), 1024))
            .collect();
        Self {
            column_dtype: dtype,
            stats,
            builders,
            length: 0,
        }
    }

    pub fn push_chunk(&mut self, array: &ArrayData) -> VortexResult<()> {
        for (s, builder) in self.stats.iter().zip_eq(self.builders.iter_mut()) {
            if let Some(v) = array.statistics().compute(*s) {
                builder.append_scalar(&v.cast(builder.dtype())?)?;
            } else {
                builder.append_null();
            }
        }
        self.length += 1;
        Ok(())
    }

    /// Finishes the accumulator into a [`StatsTable`].
    ///
    /// Returns `None` if none of the requested statistics can be computed, for example they are
    /// not applicable to the column's data type.
    pub fn as_stats_table(&mut self) -> VortexResult<Option<StatsTable>> {
        let mut names = Vec::new();
        let mut fields = Vec::new();
        let mut stats = Vec::new();

        for (stat, builder) in self.stats.iter().zip(self.builders.iter_mut()) {
            let values = builder
                .finish()
                .map_err(|e| e.with_context(format!("Failed to finish stat builder for {stat}")))?;

            // We drop any all-null stats columns
            if values.logical_validity().null_count()? == values.len() {
                continue;
            }

            stats.push(*stat);
            names.push(stat.to_string().into());
            fields.push(values);
        }

        if names.is_empty() {
            return Ok(None);
        }

        Ok(Some(StatsTable {
            column_dtype: self.column_dtype.clone(),
            array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)?
                .into_array(),
            stats: stats.into(),
        }))
    }
}