use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer, ScalarBuffer};
use vortex_dtype::{DType, Nullability, PType, StructDType};
use vortex_error::{vortex_bail, vortex_err, ErrString, VortexExpect, VortexResult};
use crate::array::chunked::ChunkedArray;
use crate::array::extension::ExtensionArray;
use crate::array::null::NullArray;
use crate::array::primitive::PrimitiveArray;
use crate::array::struct_::StructArray;
use crate::array::{BinaryView, BoolArray, ListArray, VarBinViewArray};
use crate::compute::{scalar_at, slice, try_cast};
use crate::validity::Validity;
use crate::{
ArrayDType, ArrayData, ArrayLen, ArrayValidity, Canonical, IntoArrayData, IntoArrayVariant,
IntoCanonical,
};
impl IntoCanonical for ChunkedArray {
fn into_canonical(self) -> VortexResult<Canonical> {
let validity = if self.dtype().is_nullable() {
self.logical_validity().into_validity()
} else {
Validity::NonNullable
};
try_canonicalize_chunks(self.chunks().collect(), validity, self.dtype())
}
}
/// Concatenate `chunks` — all of which must already have dtype `dtype` — into a
/// single canonical array of the appropriate kind.
///
/// # Errors
///
/// Returns `MismatchedTypes` (listing every offending chunk) if any chunk's dtype
/// differs from `dtype`, and propagates any error from the per-dtype packing routines.
pub(crate) fn try_canonicalize_chunks(
    chunks: Vec<ArrayData>,
    validity: Validity,
    dtype: &DType,
) -> VortexResult<Canonical> {
    // Collect *all* mismatched chunks up front so the error message shows the full set,
    // not just the first offender.
    let mismatched = chunks
        .iter()
        .filter(|chunk| !chunk.dtype().eq(dtype))
        .collect::<Vec<_>>();
    if !mismatched.is_empty() {
        vortex_bail!(MismatchedTypes: dtype.clone(), ErrString::from(format!("{:?}", mismatched)))
    }

    match dtype {
        // Structs are swizzled: we build one chunked array per field rather than
        // canonicalizing chunk-by-chunk.
        DType::Struct(struct_dtype, _) => {
            let struct_array = swizzle_struct_chunks(chunks.as_slice(), validity, struct_dtype)?;
            Ok(Canonical::Struct(struct_array))
        }
        // Extension arrays stay wrapped: gather each chunk's storage, re-chunk the
        // storage, and re-wrap it in the extension dtype.
        DType::Extension(ext_dtype) => {
            let storage_chunks: Vec<ArrayData> = chunks
                .iter()
                .map(|chunk| chunk.clone().into_extension().map(|ext| ext.storage()))
                .collect::<VortexResult<Vec<ArrayData>>>()?;
            let storage_dtype = ext_dtype.storage_dtype().clone();
            let chunked_storage =
                ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array();
            Ok(Canonical::Extension(ExtensionArray::new(
                ext_dtype.clone(),
                chunked_storage,
            )))
        }
        DType::List(..) => {
            let list = pack_lists(chunks.as_slice(), validity, dtype)?;
            Ok(Canonical::List(list))
        }
        DType::Bool(_) => {
            let bool_array = pack_bools(chunks.as_slice(), validity)?;
            Ok(Canonical::Bool(bool_array))
        }
        DType::Primitive(ptype, _) => {
            let prim_array = pack_primitives(chunks.as_slice(), *ptype, validity)?;
            Ok(Canonical::Primitive(prim_array))
        }
        // Utf8 and Binary share the VarBinView canonical form, so the two arms are
        // merged rather than duplicated.
        DType::Utf8(_) | DType::Binary(_) => {
            let varbin_array = pack_views(chunks.as_slice(), dtype, validity)?;
            Ok(Canonical::VarBinView(varbin_array))
        }
        // Null arrays carry no data; only the combined length matters.
        DType::Null => {
            let len = chunks.iter().map(|chunk| chunk.len()).sum();
            let null_array = NullArray::new(len);
            Ok(Canonical::Null(null_array))
        }
    }
}
/// Concatenate a sequence of list chunks into a single [`ListArray`].
///
/// Offsets from every chunk are cast to i64 and rebased so that each chunk's offsets
/// continue where the previous chunk's ended; each chunk's element array is sliced down
/// to just the element range its (possibly previously sliced) offsets actually reference.
fn pack_lists(chunks: &[ArrayData], validity: Validity, dtype: &DType) -> VortexResult<ListArray> {
    let len: usize = chunks.iter().map(|c| c.len()).sum();
    // One offset per list plus the leading zero.
    let mut offsets = Vec::with_capacity(len + 1);
    offsets.push(0);
    let mut elements = Vec::new();
    let elem_dtype = dtype
        .as_list_element()
        .vortex_expect("ListArray must have List dtype");
    for chunk in chunks {
        let chunk = chunk.clone().into_list()?;
        // Normalize offsets to non-nullable i64 so all chunks share one offset type.
        let offsets_arr = try_cast(
            chunk.offsets(),
            &DType::Primitive(PType::I64, Nullability::NonNullable),
        )?
        .into_primitive()?;
        // A sliced chunk may not start at element 0 — capture the element window
        // [first_offset_value, last_offset_value) that this chunk's offsets cover.
        let first_offset_value: usize = usize::try_from(&scalar_at(offsets_arr.as_ref(), 0)?)?;
        let last_offset_value: usize =
            usize::try_from(&scalar_at(offsets_arr.as_ref(), offsets_arr.len() - 1)?)?;
        // Keep only the elements this chunk's offsets actually reference.
        elements.push(slice(
            chunk.elements(),
            first_offset_value,
            last_offset_value,
        )?);
        // Running total: the last offset written so far, which this chunk's offsets
        // must continue from.
        let adjustment_from_previous = *offsets
            .last()
            .ok_or_else(|| vortex_err!("List offsets must have at least one element"))?;
        // Rebase this chunk's offsets onto the running total. The first offset is
        // skipped because it coincides with the previous chunk's final offset; the
        // subtraction of first_offset_value re-anchors sliced chunks at zero.
        offsets.extend(
            offsets_arr
                .maybe_null_slice::<i64>()
                .iter()
                .skip(1)
                .map(|off| off + adjustment_from_previous - first_offset_value as i64),
        );
    }
    let chunked_elements = ChunkedArray::try_new(elements, elem_dtype.clone())?.into_array();
    let offsets = PrimitiveArray::from_vec(offsets, Validity::NonNullable);
    ListArray::try_new(chunked_elements, offsets.into_array(), validity)
}
/// Transpose a sequence of struct chunks into a single [`StructArray`]: for every
/// field of the struct dtype, gather that field from each chunk into one chunked
/// column, then reassemble the columns into a struct of the combined length.
fn swizzle_struct_chunks(
    chunks: &[ArrayData],
    validity: Validity,
    struct_dtype: &StructDType,
) -> VortexResult<StructArray> {
    let len = chunks.iter().map(|chunk| chunk.len()).sum();
    let field_arrays = struct_dtype
        .dtypes()
        .iter()
        .enumerate()
        .map(|(field_idx, field_dtype)| {
            // Pull field `field_idx` out of every chunk; a missing field means the
            // chunk's dtype disagreed with the struct dtype.
            let field_chunks = chunks
                .iter()
                .map(|c| {
                    c.as_struct_array()
                        .vortex_expect("Chunk was not a StructArray")
                        .field(field_idx)
                        .ok_or_else(|| vortex_err!("All chunks must have same dtype; missing field at index {}, current chunk dtype: {}", field_idx, c.dtype()))
                })
                .collect::<VortexResult<Vec<_>>>()?;
            ChunkedArray::try_new(field_chunks, field_dtype.clone())
                .map(|column| column.into_array())
        })
        .collect::<VortexResult<Vec<_>>>()?;
    StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)
}
/// Concatenate boolean chunks into a single [`BoolArray`] backed by one bitmap.
fn pack_bools(chunks: &[ArrayData], validity: Validity) -> VortexResult<BoolArray> {
    // Pre-size the bitmap builder so appends never reallocate.
    let total_bits: usize = chunks.iter().map(|c| c.len()).sum();
    let mut bits = BooleanBufferBuilder::new(total_bits);
    for c in chunks {
        bits.append_buffer(&c.clone().into_bool()?.boolean_buffer());
    }
    BoolArray::try_new(bits.finish(), validity)
}
/// Concatenate primitive chunks of a single `ptype` into one contiguous
/// [`PrimitiveArray`] by copying each chunk's bytes into a single buffer.
fn pack_primitives(
    chunks: &[ArrayData],
    ptype: PType,
    validity: Validity,
) -> VortexResult<PrimitiveArray> {
    // Reserve the exact byte total up front so the copy loop never reallocates.
    let total_bytes = chunks.iter().map(|c| c.len()).sum::<usize>() * ptype.byte_width();
    let mut bytes = MutableBuffer::with_capacity(total_bytes);
    for c in chunks {
        bytes.extend_from_slice(c.clone().into_primitive()?.buffer());
    }
    Ok(PrimitiveArray::new(Buffer::from(bytes).into(), ptype, validity))
}
/// Concatenate VarBinView chunks into a single [`VarBinViewArray`].
///
/// Every chunk's data buffers are appended to one combined buffer list, and every
/// non-inlined view is rewritten so its buffer index points into that combined list.
/// Inlined views (values short enough to live inside the 16-byte view itself) are
/// copied through unchanged.
fn pack_views(
    chunks: &[ArrayData],
    dtype: &DType,
    validity: Validity,
) -> VortexResult<VarBinViewArray> {
    let total_len = chunks.iter().map(|a| a.len()).sum();
    let mut views: Vec<u128> = Vec::with_capacity(total_len);
    let mut buffers = Vec::new();
    for chunk in chunks {
        // Index of the first buffer contributed by this chunk; every buffer index in
        // this chunk's views must be shifted by this amount.
        let buffers_offset = u32::try_from(buffers.len())?;
        let canonical_chunk = chunk.clone().into_varbinview()?;
        for buffer in canonical_chunk.buffers() {
            let canonical_buffer = buffer.into_canonical()?.into_primitive()?.into_array();
            buffers.push(canonical_buffer);
        }
        for view in canonical_chunk.binary_views()? {
            if view.is_inlined() {
                // Inlined views carry their bytes directly — no rewriting needed.
                views.push(view.as_u128());
            } else {
                // Rebuild the view with its buffer index rebased into the merged
                // buffer list; length, prefix, and offset are preserved as-is.
                let view_ref = view.as_view();
                views.push(
                    BinaryView::new_view(
                        view.len(),
                        *view_ref.prefix(),
                        buffers_offset + view_ref.buffer_index(),
                        view_ref.offset(),
                    )
                    .as_u128(),
                );
            }
        }
    }
    let views_buffer: Buffer = ScalarBuffer::<u128>::from(views).into_inner();
    VarBinViewArray::try_new(
        ArrayData::from(views_buffer),
        buffers,
        dtype.clone(),
        validity,
    )
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use vortex_dtype::DType;
    use vortex_dtype::DType::{List, Primitive};
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::PType::I32;
    use crate::accessor::ArrayAccessor;
    use crate::array::chunked::canonical::pack_views;
    use crate::array::{ChunkedArray, ListArray, StructArray, VarBinViewArray};
    use crate::compute::{scalar_at, slice};
    use crate::validity::Validity;
    use crate::variants::StructArrayTrait;
    use crate::{ArrayDType, ArrayLen, IntoArrayData, IntoArrayVariant, ToArrayData};

    // Shared fixture: a 4-element string-view array.
    fn stringview_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"])
    }

    // pack_views must correctly rebase buffer indices even when the input chunks
    // are slices of larger arrays (i.e. their views don't start at buffer offset 0).
    #[test]
    pub fn pack_sliced_varbin() {
        let array1 = slice(stringview_array().as_ref(), 1, 3).unwrap();
        let array2 = slice(stringview_array().as_ref(), 2, 4).unwrap();
        let packed = pack_views(
            &[array1, array2],
            &DType::Utf8(NonNullable),
            Validity::NonNullable,
        )
        .unwrap();
        assert_eq!(packed.len(), 4);
        let values = packed
            .with_iterator(|iter| {
                iter.flatten()
                    // Fixture strings are known-valid UTF-8, so the unchecked
                    // conversion is safe here.
                    .map(|v| unsafe { String::from_utf8_unchecked(v.to_vec()) })
                    .collect::<Vec<_>>()
            })
            .unwrap();
        assert_eq!(values, &["bar", "baz", "baz", "quak"]);
    }

    // Canonicalizing a chunked-of-chunked struct must swizzle all the way down and
    // preserve the values of each field.
    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            vec!["a".into()].into(),
            vec![stringview_array().into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        // Nest the struct inside a chunked array that is itself a chunk of another
        // chunked array, to exercise recursive canonicalization.
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.into_struct().unwrap();
        let canonical_varbin = canonical_struct
            .field(0)
            .unwrap()
            .into_varbinview()
            .unwrap();
        let original_varbin = struct_array.field(0).unwrap().into_varbinview().unwrap();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }

    // pack_lists must rebase offsets across chunks. Note l1 has a trailing unused
    // element (offsets stop at 3 but elements has 4), exercising the element-slicing
    // path in pack_lists.
    #[test]
    pub fn pack_nested_lists() {
        let l1 = ListArray::try_new(
            vec![1, 2, 3, 4].into_array(),
            vec![0, 3].into_array(),
            Validity::NonNullable,
        )
        .unwrap();
        let l2 = ListArray::try_new(
            vec![5, 6].into_array(),
            vec![0, 2].into_array(),
            Validity::NonNullable,
        )
        .unwrap();
        let chunked_list = ChunkedArray::try_new(
            vec![l1.clone().into_array(), l2.clone().into_array()],
            List(Arc::new(Primitive(I32, NonNullable)), NonNullable),
        );
        let canon_values = chunked_list.unwrap().into_list().unwrap();
        // Each original chunk's first list must survive canonicalization in order.
        assert_eq!(
            scalar_at(l1, 0).unwrap(),
            scalar_at(canon_values.clone(), 0).unwrap()
        );
        assert_eq!(
            scalar_at(l2, 0).unwrap(),
            scalar_at(canon_values, 1).unwrap()
        );
    }
}