use std::io::Write;
use futures_util::StreamExt;
use vortex_array::iter::ArrayIterator;
use vortex_array::stream::ArrayStream;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt};
use vortex_io::VortexWrite;
use vortex_layout::strategies::LayoutStrategy;
use crate::v2::footer::{FileLayout, Postscript, Segment};
use crate::v2::segments::writer::BufferedSegmentWriter;
use crate::v2::strategy::VortexLayoutStrategy;
use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};
/// Options controlling how a Vortex file is written.
///
/// Currently this only holds the [`LayoutStrategy`] that decides how incoming array chunks
/// are laid out into segments and a root layout.
pub struct WriteOptions {
    /// Strategy used to create the layout writer for the stream's dtype when writing.
    strategy: Box<dyn LayoutStrategy>,
}
impl Default for WriteOptions {
fn default() -> Self {
Self {
strategy: Box::new(VortexLayoutStrategy),
}
}
}
impl WriteOptions {
    /// Replaces the layout strategy used when writing and returns the updated options.
    ///
    /// This consumes `self` builder-style. Discarding the returned value therefore silently
    /// loses the configuration, which is why the result is `#[must_use]`.
    #[must_use]
    pub fn with_strategy(mut self, strategy: Box<dyn LayoutStrategy>) -> Self {
        self.strategy = strategy;
        self
    }
}
impl WriteOptions {
    /// Writes a Vortex file from a synchronous [`ArrayIterator`] into a [`Write`] sink.
    ///
    /// # Errors
    ///
    /// Synchronous writing is not implemented yet. This always returns an error rather than
    /// panicking, so callers that already handle the `VortexResult` are not aborted.
    pub fn write_sync<W: Write, I: ArrayIterator>(self, _write: W, _iter: I) -> VortexResult<()> {
        vortex_bail!("WriteOptions::write_sync is not yet implemented; use write_async instead");
    }

    /// Writes `stream` as a Vortex file into `write` and returns the writer afterwards.
    ///
    /// Bytes are emitted in this order:
    /// 1. `MAGIC_BYTES`;
    /// 2. the data segments produced by the layout writer (flushed after every chunk, then
    ///    once more after the layout writer finishes);
    /// 3. the stream's dtype as a flatbuffer;
    /// 4. the `FileLayout` flatbuffer (root layout plus the collected segment list);
    /// 5. the `Postscript` flatbuffer, which records where items 3 and 4 live;
    /// 6. the EOF trailer: version (`u16` LE), postscript length (`u16` LE), `MAGIC_BYTES`.
    ///
    /// Segment offsets are measured from the first byte written by this call.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - the layout strategy fails to create or drive the layout writer;
    /// - the stream yields an error chunk;
    /// - any write to `write` fails;
    /// - the serialized postscript exceeds `MAX_FOOTER_SIZE` bytes.
    pub async fn write_async<W: VortexWrite, S: ArrayStream + Unpin>(
        self,
        write: W,
        mut stream: S,
    ) -> VortexResult<W> {
        let mut layout_writer = self.strategy.new_writer(stream.dtype())?;

        // The cursor tracks how many bytes have been written, so each segment's absolute
        // offset can be read from `position()`.
        let mut write = futures_util::io::Cursor::new(write);
        write.write_all(MAGIC_BYTES).await?;

        let mut segment_writer = BufferedSegmentWriter::default();
        // Collects the segment descriptors produced by each flush. These become
        // `FileLayout::segments`.
        let mut segments = vec![];

        while let Some(chunk) = stream.next().await {
            layout_writer.push_chunk(&mut segment_writer, chunk?)?;
            // Flush after every chunk so buffered segment data does not accumulate
            // for the whole stream.
            segment_writer
                .flush_async(&mut write, &mut segments)
                .await?;
        }

        // Finishing may emit further segments (e.g. layout metadata), so flush once more.
        let root_layout = layout_writer.finish(&mut segment_writer)?;
        segment_writer
            .flush_async(&mut write, &mut segments)
            .await?;

        let dtype_segment = self.write_flatbuffer(&mut write, stream.dtype()).await?;
        let file_layout_segment = self
            .write_flatbuffer(
                &mut write,
                &FileLayout {
                    root_layout,
                    segments: segments.into(),
                },
            )
            .await?;

        let postscript = Postscript {
            dtype: dtype_segment,
            file_layout: file_layout_segment,
        };
        let postscript_buffer = postscript.write_flatbuffer_bytes();
        if postscript_buffer.len() > MAX_FOOTER_SIZE as usize {
            vortex_bail!(
                "Postscript is too large ({} bytes); max postscript size is {}",
                postscript_buffer.len(),
                MAX_FOOTER_SIZE
            );
        }
        // The trailer stores the postscript length in a 2-byte field.
        let postscript_len = u16::try_from(postscript_buffer.len())
            .vortex_expect("Postscript already verified to fit into u16");
        write.write_all(postscript_buffer).await?;

        // EOF trailer: [version: u16 LE][postscript length: u16 LE][magic: 4 bytes].
        let mut eof = [0u8; EOF_SIZE];
        eof[0..2].copy_from_slice(&VERSION.to_le_bytes());
        eof[2..4].copy_from_slice(&postscript_len.to_le_bytes());
        eof[4..8].copy_from_slice(&MAGIC_BYTES);
        write.write_all(eof).await?;

        Ok(write.into_inner())
    }

    /// Serializes `flatbuffer` and appends it to `write`.
    ///
    /// Returns a [`Segment`] describing where the bytes landed: the cursor position before
    /// the write, and the number of bytes written.
    ///
    /// # Errors
    ///
    /// Returns an error if the write fails, or if the written length does not fit in `usize`.
    async fn write_flatbuffer<W: VortexWrite, F: FlatBufferRoot + WriteFlatBuffer>(
        &self,
        write: &mut futures_util::io::Cursor<W>,
        flatbuffer: &F,
    ) -> VortexResult<Segment> {
        let layout_offset = write.position();
        write.write_all(flatbuffer.write_flatbuffer_bytes()).await?;
        Ok(Segment {
            offset: layout_offset,
            length: usize::try_from(write.position() - layout_offset)
                .map_err(|_| vortex_err!("segment length exceeds maximum usize"))?,
        })
    }
}