// vortex_file/v2/writer.rs
use std::io::Write;

use futures_util::StreamExt;
use vortex_array::iter::ArrayIterator;
use vortex_array::stream::ArrayStream;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt};
use vortex_io::VortexWrite;
use vortex_layout::strategies::LayoutStrategy;

use crate::v2::footer::{FileLayout, Postscript, Segment};
use crate::v2::segments::writer::BufferedSegmentWriter;
use crate::v2::strategy::VortexLayoutStrategy;
use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};

/// Options for writing a Vortex file.
///
/// Construct with [`Default::default`] and optionally override the layout strategy with
/// `with_strategy` before calling one of the `write_*` methods.
pub struct WriteOptions {
    /// Layout strategy used to create the root layout writer for the stream being written.
    strategy: Box<dyn LayoutStrategy>,
}

impl Default for WriteOptions {
    fn default() -> Self {
        Self {
            strategy: Box::new(VortexLayoutStrategy),
        }
    }
}

impl WriteOptions {
    /// Swap in `strategy` as the layout strategy, discarding the one configured before.
    ///
    /// Takes the options by value and hands them back, so calls can be chained.
    pub fn with_strategy(mut self, strategy: Box<dyn LayoutStrategy>) -> Self {
        // The previously configured strategy is no longer needed; drop it right away.
        drop(std::mem::replace(&mut self.strategy, strategy));
        self
    }
}

impl WriteOptions {
    /// Blocking variant: write the provided iterator of `ArrayData` to `_write`.
    ///
    /// # Panics
    ///
    /// Not implemented yet; every call currently panics via `todo!()`.
    pub fn write_sync<W: Write, I: ArrayIterator>(self, _write: W, _iter: I) -> VortexResult<()> {
        todo!()
    }

    /// Asynchronously write the provided stream of `ArrayData` to `write`.
    ///
    /// Bytes are emitted in this order:
    /// 1. `MAGIC_BYTES`,
    /// 2. the segments produced by the layout writer, flushed after every chunk and once more
    ///    after the layout writer is finished,
    /// 3. the stream's dtype as a flatbuffer,
    /// 4. the `FileLayout` (root layout plus the list of written segments) as a flatbuffer,
    /// 5. the `Postscript` flatbuffer locating items 3 and 4,
    /// 6. an `EOF_SIZE`-byte trailer: `VERSION` (little-endian), the postscript length as a
    ///    little-endian `u16`, then `MAGIC_BYTES`.
    ///
    /// On success the original writer is handed back to the caller.
    ///
    /// # Errors
    ///
    /// Returns an error if the layout strategy or layout writer fails, if the stream yields an
    /// error, if any write to the sink fails, or if the serialized postscript is larger than
    /// `MAX_FOOTER_SIZE`.
    pub async fn write_async<W: VortexWrite, S: ArrayStream + Unpin>(
        self,
        write: W,
        mut stream: S,
    ) -> VortexResult<W> {
        // The root layout writer is created up front, before anything touches the sink.
        let mut layout_writer = self.strategy.new_writer(stream.dtype())?;

        // Wrap the sink in a cursor so the current byte position can be read back later, then
        // open the file with the magic number.
        let mut write = futures_util::io::Cursor::new(write);
        write.write_all(MAGIC_BYTES).await?;

        // Segments are buffered by `segment_writer` per chunk; `segments` collects an entry for
        // everything that has been flushed to the file.
        let mut segments = vec![];
        let mut segment_writer = BufferedSegmentWriter::default();

        // Feed every chunk of the stream through the root layout, flushing as we go.
        while let Some(next) = stream.next().await {
            let chunk = next?;
            layout_writer.push_chunk(&mut segment_writer, chunk)?;
            // NOTE(ngates): we could spawn this task and continue to compress the next chunk.
            segment_writer
                .flush_async(&mut write, &mut segments)
                .await?;
        }

        // Finishing the layout may buffer more segments; flush those too.
        let root_layout = layout_writer.finish(&mut segment_writer)?;
        segment_writer
            .flush_async(&mut write, &mut segments)
            .await?;

        // The DType goes first, then the FileLayout describing everything written so far.
        let dtype_segment = self.write_flatbuffer(&mut write, stream.dtype()).await?;
        let file_layout = FileLayout {
            root_layout,
            segments: segments.into(),
        };
        let file_layout_segment = self.write_flatbuffer(&mut write, &file_layout).await?;

        // The postscript is serialized and written by hand so that no framing is added.
        let postscript_buffer = Postscript {
            dtype: dtype_segment,
            file_layout: file_layout_segment,
        }
        .write_flatbuffer_bytes();
        let postscript_size = postscript_buffer.len();
        if postscript_size > MAX_FOOTER_SIZE as usize {
            vortex_bail!(
                "Postscript is too large ({} bytes); max postscript size is {}",
                postscript_size,
                MAX_FOOTER_SIZE
            );
        }
        let postscript_len = u16::try_from(postscript_size)
            .vortex_expect("Postscript already verified to fit into u16");
        write.write_all(postscript_buffer).await?;

        // Trailer layout: [version | postscript length | magic], integers in little-endian.
        let version_bytes = VERSION.to_le_bytes();
        let length_bytes = postscript_len.to_le_bytes();
        let mut eof = [0u8; EOF_SIZE];
        eof[0..2].copy_from_slice(&version_bytes);
        eof[2..4].copy_from_slice(&length_bytes);
        eof[4..8].copy_from_slice(&MAGIC_BYTES);
        write.write_all(eof).await?;

        Ok(write.into_inner())
    }

    /// Serialize `flatbuffer`, write it at the cursor's current position, and return a
    /// [`Segment`] holding the starting offset and the number of bytes the cursor advanced.
    ///
    /// # Errors
    ///
    /// Returns an error if the write fails or if the written length does not fit in `usize`.
    async fn write_flatbuffer<W: VortexWrite, F: FlatBufferRoot + WriteFlatBuffer>(
        &self,
        write: &mut futures_util::io::Cursor<W>,
        flatbuffer: &F,
    ) -> VortexResult<Segment> {
        let start = write.position();
        write.write_all(flatbuffer.write_flatbuffer_bytes()).await?;

        // The cursor position delta is the number of bytes written for this flatbuffer.
        let written = write.position() - start;
        let length = usize::try_from(written)
            .map_err(|_| vortex_err!("segment length exceeds maximum usize"))?;

        Ok(Segment {
            offset: start,
            length,
        })
    }
}