use std::fmt::{Debug, Display};
pub use compress::*;
use serde::{Deserialize, Serialize};
use vortex_array::array::PrimitiveArray;
use vortex_array::encoding::ids;
use vortex_array::stats::{StatisticsVTable, StatsSet};
use vortex_array::validity::{LogicalValidity, Validity, ValidityMetadata, ValidityVTable};
use vortex_array::variants::{PrimitiveArrayTrait, VariantsVTable};
use vortex_array::visitor::{ArrayVisitor, VisitorVTable};
use vortex_array::{
impl_encoding, ArrayDType, ArrayData, ArrayLen, ArrayTrait, Canonical, IntoArrayData,
IntoCanonical,
};
use vortex_dtype::{match_each_unsigned_integer_ptype, NativePType};
use vortex_error::{vortex_bail, vortex_panic, VortexExpect as _, VortexResult};
mod compress;
mod compute;
impl_encoding!("fastlanes.delta", ids::FL_DELTA, Delta);
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaMetadata {
validity: ValidityMetadata,
deltas_len: u64,
offset: u16, }
impl Display for DeltaMetadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(self, f)
}
}
impl DeltaArray {
pub fn try_from_vec<T: NativePType>(vec: Vec<T>) -> VortexResult<Self> {
Self::try_from_primitive_array(&PrimitiveArray::from(vec))
}
pub fn try_from_primitive_array(array: &PrimitiveArray) -> VortexResult<Self> {
let (bases, deltas) = delta_compress(array)?;
Self::try_from_delta_compress_parts(
bases.into_array(),
deltas.into_array(),
Validity::NonNullable,
)
}
pub fn try_from_delta_compress_parts(
bases: ArrayData,
deltas: ArrayData,
validity: Validity,
) -> VortexResult<Self> {
let logical_len = deltas.len();
Self::try_new(bases, deltas, validity, 0, logical_len)
}
pub fn try_new(
bases: ArrayData,
deltas: ArrayData,
validity: Validity,
offset: usize,
logical_len: usize,
) -> VortexResult<Self> {
if offset >= 1024 {
vortex_bail!("offset must be less than 1024: {}", offset);
}
if offset + logical_len > deltas.len() {
vortex_bail!(
"offset + logical_len, {} + {}, must be less than or equal to the size of deltas: {}",
offset,
logical_len,
deltas.len()
)
}
if bases.dtype() != deltas.dtype() {
vortex_bail!(
"DeltaArray: bases and deltas must have the same dtype, got {:?} and {:?}",
bases.dtype(),
deltas.dtype()
);
}
let dtype = bases.dtype().clone();
if !dtype.is_int() {
vortex_bail!("DeltaArray: dtype must be an integer, got {}", dtype);
}
let metadata = DeltaMetadata {
validity: validity.to_metadata(logical_len)?,
deltas_len: deltas.len() as u64,
offset: offset as u16,
};
let mut children = vec![bases, deltas];
if let Some(varray) = validity.into_array() {
children.push(varray)
}
let delta = Self::try_from_parts(
dtype,
logical_len,
metadata,
children.into(),
StatsSet::default(),
)?;
if delta.bases().len() != delta.bases_len() {
vortex_bail!(
"DeltaArray: bases.len() ({}) != expected_bases_len ({}), based on len ({}) and lane count ({})",
delta.bases().len(),
delta.bases_len(),
logical_len,
delta.lanes()
);
}
if (delta.deltas_len() % 1024 == 0) != (delta.bases_len() % delta.lanes() == 0) {
vortex_bail!(
"deltas length ({}) is a multiple of 1024 iff bases length ({}) is a multiple of LANES ({})",
delta.deltas_len(),
delta.bases_len(),
delta.lanes(),
);
}
Ok(delta)
}
#[inline]
pub fn bases(&self) -> ArrayData {
self.as_ref()
.child(0, self.dtype(), self.bases_len())
.vortex_expect("DeltaArray is missing bases child array")
}
#[inline]
pub fn deltas(&self) -> ArrayData {
self.as_ref()
.child(1, self.dtype(), self.deltas_len())
.vortex_expect("DeltaArray is missing deltas child array")
}
#[inline]
fn lanes(&self) -> usize {
let ptype = self.dtype().try_into().unwrap_or_else(|err| {
vortex_panic!(
err,
"Failed to convert DeltaArray DType {} to PType",
self.dtype()
)
});
match_each_unsigned_integer_ptype!(ptype, |$T| {
<$T as fastlanes::FastLanes>::LANES
})
}
#[inline]
pub fn offset(&self) -> usize {
self.metadata().offset as usize
}
pub fn validity(&self) -> Validity {
self.metadata().validity.to_validity(|| {
self.as_ref()
.child(2, &Validity::DTYPE, self.len())
.vortex_expect("DeltaArray: validity child")
})
}
fn bases_len(&self) -> usize {
let num_chunks = self.deltas().len() / 1024;
let remainder_base_size = if self.deltas().len() % 1024 > 0 { 1 } else { 0 };
num_chunks * self.lanes() + remainder_base_size
}
fn deltas_len(&self) -> usize {
self.metadata().deltas_len as usize
}
}
impl ArrayTrait for DeltaArray {}
impl VariantsVTable<DeltaArray> for DeltaEncoding {
fn as_primitive_array<'a>(&self, array: &'a DeltaArray) -> Option<&'a dyn PrimitiveArrayTrait> {
Some(array)
}
}
impl PrimitiveArrayTrait for DeltaArray {}
impl IntoCanonical for DeltaArray {
fn into_canonical(self) -> VortexResult<Canonical> {
delta_decompress(self).map(Canonical::Primitive)
}
}
impl ValidityVTable<DeltaArray> for DeltaEncoding {
fn is_valid(&self, array: &DeltaArray, index: usize) -> bool {
array.validity().is_valid(index)
}
fn logical_validity(&self, array: &DeltaArray) -> LogicalValidity {
array.validity().to_logical(array.len())
}
}
impl VisitorVTable<DeltaArray> for DeltaEncoding {
fn accept(&self, array: &DeltaArray, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("bases", &array.bases())?;
visitor.visit_child("deltas", &array.deltas())
}
}
impl StatisticsVTable<DeltaArray> for DeltaEncoding {}