// vortex_array/data/viewed.rs
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use enum_iterator::all;
use flatbuffers::Follow;
use itertools::Itertools;
use vortex_buffer::ByteBuffer;
use vortex_dtype::{DType, Nullability, PType};
use vortex_error::{vortex_err, VortexExpect as _, VortexResult};
use vortex_flatbuffers::FlatBuffer;
use vortex_scalar::{Scalar, ScalarValue};
use crate::encoding::opaque::OpaqueEncoding;
use crate::encoding::EncodingRef;
use crate::stats::{Stat, Statistics, StatsSet};
use crate::{flatbuffers as fb, ArrayData, ArrayMetadata, ChildrenCollector, ContextRef};
/// A view over an array serialized as a flatbuffer `Array` table.
///
/// Structural information (children, buffer indices, statistics) is not copied out of the
/// flatbuffer: the accessors in `impl ViewedArrayData` re-read it from `flatbuffer` at
/// `flatbuffer_loc` each time they are called.
#[derive(Clone)]
pub(super) struct ViewedArrayData {
    /// Encoding used to interpret this array. For child views it is resolved through `ctx`,
    /// falling back to an `OpaqueEncoding` when the encoding id is unknown.
    pub(super) encoding: EncodingRef,
    /// Logical type of the array; `child` takes it from its caller rather than reading it
    /// from the flatbuffer.
    pub(super) dtype: DType,
    /// Number of elements; like `dtype`, supplied by the caller of `child`.
    pub(super) len: usize,
    /// Encoding-specific metadata, produced by `encoding.load_metadata`.
    pub(super) metadata: Arc<dyn ArrayMetadata>,
    /// Buffer holding the serialized array tree; child views receive a clone of it.
    pub(super) flatbuffer: FlatBuffer,
    /// Location of this array's `Array` table inside `flatbuffer`.
    pub(super) flatbuffer_loc: usize,
    /// Data buffers, addressed by the buffer indices stored in the flatbuffer; shared with
    /// child views.
    pub(super) buffers: Arc<[ByteBuffer]>,
    /// Context used to map child encoding ids to encodings.
    pub(super) ctx: ContextRef,
    /// NOTE(review): presumably counts how often this array is canonicalized, for
    /// diagnostics — confirm. Each child view starts its own counter at 0.
    #[cfg(feature = "canonical_counter")]
    pub(super) canonical_counter: Arc<std::sync::atomic::AtomicUsize>,
}
/// Debug output for a viewed array.
///
/// The value is printed under the name `ArrayView` and shows only the encoding, dtype,
/// buffers and context; `len`, `metadata` and the flatbuffer location are not included.
impl Debug for ViewedArrayData {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            encoding,
            dtype,
            buffers,
            ctx,
            ..
        } = self;

        let mut builder = formatter.debug_struct("ArrayView");
        builder.field("encoding", encoding);
        builder.field("dtype", dtype);
        builder.field("buffers", buffers);
        builder.field("ctx", ctx);
        builder.finish()
    }
}
impl ViewedArrayData {
    /// Returns the flatbuffer `Array` table describing this array.
    pub fn flatbuffer(&self) -> fb::Array {
        // SAFETY: `flatbuffer_loc` must be the offset of an `Array` table inside
        // `self.flatbuffer`. Child views take it from a child table's own location
        // (`child._tab.loc()` in `child`). NOTE(review): for the root view this relies on the
        // buffer having been verified when the view was constructed — confirm at that site.
        unsafe { fb::Array::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns the serialized metadata bytes, or `None` if the flatbuffer stores no metadata.
    pub fn metadata_bytes(&self) -> Option<&[u8]> {
        self.flatbuffer().metadata().map(|m| m.bytes())
    }

    /// Creates a view over the child at position `idx`.
    ///
    /// `dtype` and `len` are taken from the caller rather than read from the flatbuffer. The
    /// child view shares this view's flatbuffer, buffers and context.
    ///
    /// # Errors
    ///
    /// Returns an error if there is no child at `idx`, or if the child's encoding fails to
    /// load its metadata.
    pub fn child(&self, idx: usize, dtype: &DType, len: usize) -> VortexResult<Self> {
        let child = self
            .array_child(idx)
            .ok_or_else(|| vortex_err!("ArrayView: array_child({idx}) not found"))?;
        let flatbuffer_loc = child._tab.loc();
        // Unknown encoding ids fall back to an opaque placeholder.
        // NOTE(review): `Box::leak` allocates and leaks a fresh `OpaqueEncoding` on every such
        // lookup; presumably `EncodingRef` needs a `'static` reference — consider caching one
        // placeholder per encoding id.
        let encoding = self
            .ctx
            .lookup_encoding(child.encoding())
            .unwrap_or_else(|| Box::leak(Box::new(OpaqueEncoding(child.encoding()))));
        let metadata = encoding.load_metadata(child.metadata().map(|m| m.bytes()))?;
        Ok(Self {
            encoding,
            dtype: dtype.clone(),
            len,
            metadata,
            flatbuffer: self.flatbuffer.clone(),
            flatbuffer_loc,
            buffers: self.buffers.clone(),
            ctx: self.ctx.clone(),
            #[cfg(feature = "canonical_counter")]
            canonical_counter: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
        })
    }

    /// Returns the child `Array` table at `idx`, or `None` if the array has no children or
    /// `idx` is out of range.
    fn array_child(&self, idx: usize) -> Option<fb::Array> {
        let children = self.flatbuffer().children()?;
        (idx < children.len()).then(|| children.get(idx))
    }

    /// Returns the number of children recorded in the flatbuffer (0 if none are recorded).
    pub fn nchildren(&self) -> usize {
        self.flatbuffer().children().map_or(0, |c| c.len())
    }

    /// Collects this array's children by visiting it with its encoding.
    ///
    /// # Panics
    ///
    /// Panics if the encoding's `accept` visitor returns an error.
    pub fn children(&self) -> Vec<ArrayData> {
        let mut collector = ChildrenCollector::default();
        self.encoding
            .accept(&ArrayData::from(self.clone()), &mut collector)
            .vortex_expect("Failed to get children");
        collector.children()
    }

    /// Returns the number of buffers recorded in the flatbuffer (0 if none are recorded).
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer().buffers().map_or(0, |b| b.len())
    }

    /// Returns the buffer at position `index` of this array.
    ///
    /// For each position, the flatbuffer stores an index into `self.buffers`. This returns
    /// `None` in three cases: the flatbuffer has no buffer list, `index` is past the end of
    /// that list, or the stored index does not refer to an available buffer.
    pub fn buffer(&self, index: usize) -> Option<&ByteBuffer> {
        let buffer_indices = self.flatbuffer().buffers()?;
        // Out-of-range positions yield `None` (like `slice::get`) instead of panicking, so an
        // empty buffer list behaves the same as an absent one.
        if index >= buffer_indices.len() {
            return None;
        }
        // The stored index comes from serialized data; use `get` so a malformed flatbuffer
        // cannot cause an out-of-bounds panic here.
        self.buffers.get(buffer_indices.get(index) as usize)
    }
}
/// Statistics of a viewed array, read from the `stats` table of its flatbuffer.
///
/// The view is read-only. `set`, `clear` and `retain_only` do nothing, and values produced
/// by `compute` are returned but not stored.
impl Statistics for ViewedArrayData {
    fn get(&self, stat: Stat) -> Option<Scalar> {
        // Every statistic lives in the optional stats table; without it nothing is known.
        let stats = self.flatbuffer().stats()?;
        match stat {
            // Min/Max values that fail to convert are reported as absent.
            Stat::Max => stats
                .max()
                .and_then(|raw| ScalarValue::try_from(raw).ok())
                .map(|value| Scalar::new(self.dtype.clone(), value)),
            Stat::Min => stats
                .min()
                .and_then(|raw| ScalarValue::try_from(raw).ok())
                .map(|value| Scalar::new(self.dtype.clone(), value)),
            Stat::IsConstant => stats.is_constant().map(bool::into),
            Stat::IsSorted => stats.is_sorted().map(bool::into),
            Stat::IsStrictSorted => stats.is_strict_sorted().map(bool::into),
            Stat::RunCount => stats.run_count().map(u64::into),
            Stat::TrueCount => stats.true_count().map(u64::into),
            Stat::NullCount => stats.null_count().map(u64::into),
            Stat::BitWidthFreq => stats.bit_width_freq().map(|freqs| {
                let element_dtype =
                    Arc::new(DType::Primitive(PType::U64, Nullability::NonNullable));
                let elements = freqs.iter().map(Scalar::from).collect_vec();
                Scalar::list(element_dtype, elements, Nullability::NonNullable)
            }),
            Stat::TrailingZeroFreq => stats
                .trailing_zero_freq()
                .map(|freqs| freqs.iter().collect_vec().into()),
            Stat::UncompressedSizeInBytes => stats.uncompressed_size_in_bytes().map(u64::into),
        }
    }

    /// Gathers every statistic that is present in the flatbuffer into a `StatsSet`.
    fn to_set(&self) -> StatsSet {
        let mut collected = StatsSet::default();
        let present = all::<Stat>().filter_map(|stat| self.get(stat).map(|value| (stat, value)));
        for (stat, value) in present {
            collected.set(stat, value);
        }
        collected
    }

    /// No-op: a viewed array's statistics are fixed by its flatbuffer.
    fn set(&self, _stat: Stat, _value: Scalar) {}

    /// No-op: a viewed array's statistics are fixed by its flatbuffer.
    fn clear(&self, _stat: Stat) {}

    /// No-op: a viewed array's statistics are fixed by its flatbuffer.
    fn retain_only(&self, _stats: &[Stat]) {}

    /// Returns the serialized statistic if present. Otherwise it asks the encoding to compute
    /// the statistic; an error from the encoding yields `None`. The result is not cached.
    fn compute(&self, stat: Stat) -> Option<Scalar> {
        self.get(stat).or_else(|| {
            let computed = self
                .encoding
                .compute_statistics(&ArrayData::from(self.clone()), stat)
                .ok()?;
            computed.get(stat).cloned()
        })
    }
}