// vortex_layout/segments/mod.rs
use std::ops::Deref;
use async_trait::async_trait;
use bytes::Bytes;
use vortex_array::ArrayData;
use vortex_error::VortexResult;
use vortex_ipc::messages::{EncoderMessage, MessageEncoder};
/// Identifier of a single segment, represented as a `u32`.
///
/// A `SegmentId` is built from a raw `u32` through [`From`], and the wrapped
/// value can be read back by dereferencing (see the [`Deref`] impl).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SegmentId(u32);

impl From<u32> for SegmentId {
    /// Wraps a raw `u32` as a `SegmentId`.
    fn from(raw: u32) -> Self {
        SegmentId(raw)
    }
}

impl Deref for SegmentId {
    type Target = u32;

    /// Borrows the wrapped `u32`.
    fn deref(&self) -> &u32 {
        let SegmentId(raw) = self;
        raw
    }
}
/// Asynchronous read access to segments, addressed by [`SegmentId`].
///
/// Implementors must be `Send + Sync`, as required by the supertrait bounds.
#[async_trait]
pub trait AsyncSegmentReader: Send + Sync {
/// Fetches the bytes of the segment identified by `id`.
///
/// Returns a `VortexResult`; which conditions produce an error is decided by
/// the implementor.
async fn get(&self, id: SegmentId) -> VortexResult<Bytes>;
}
/// Write access for segments.
pub trait SegmentWriter {
    /// Writes the buffers in `data` and returns the [`SegmentId`] assigned to them.
    ///
    /// How the individual buffers are combined is left to the implementor.
    fn put(&mut self, data: Vec<Bytes>) -> SegmentId;

    /// Encodes `array` as an [`EncoderMessage::Array`] using a default
    /// [`MessageEncoder`] and forwards the encoded output to [`SegmentWriter::put`].
    fn put_chunk(&mut self, array: ArrayData) -> SegmentId {
        let message = EncoderMessage::Array(&array);
        let encoded = MessageEncoder::default().encode(message);
        self.put(encoded)
    }
}
#[cfg(test)]
pub mod test {
    use bytes::{Bytes, BytesMut};
    use vortex_error::{vortex_err, VortexExpect};

    use super::*;

    /// In-memory segment store for tests.
    ///
    /// Each segment is kept as one contiguous `Bytes` value, and a segment's id
    /// is its position in the backing vector.
    #[derive(Default)]
    pub struct TestSegments {
        segments: Vec<Bytes>,
    }

    impl SegmentWriter for TestSegments {
        /// Concatenates `data` into a single buffer, appends it to the store, and
        /// returns the index it was stored at as a `SegmentId`.
        fn put(&mut self, data: Vec<Bytes>) -> SegmentId {
            // The id is the index the new segment is about to occupy; compute it
            // (and fail) before doing any copying.
            let next_index = self.segments.len();
            let id = u32::try_from(next_index)
                .vortex_expect("Cannot store more than u32::MAX segments");

            // Size the buffer up front so the copy below never reallocates.
            let total_len: usize = data.iter().map(|chunk| chunk.len()).sum();
            let mut combined = BytesMut::with_capacity(total_len);
            data.iter()
                .for_each(|chunk| combined.extend_from_slice(chunk));

            self.segments.push(combined.freeze());
            SegmentId::from(id)
        }
    }

    #[async_trait]
    impl AsyncSegmentReader for TestSegments {
        /// Returns a clone of the stored segment at index `id`, or a
        /// "Segment not found" error when no segment exists at that index.
        async fn get(&self, id: SegmentId) -> VortexResult<Bytes> {
            let index = *id as usize;
            match self.segments.get(index) {
                Some(segment) => Ok(segment.clone()),
                None => Err(vortex_err!("Segment not found")),
            }
        }
    }
}