vortex_file/v2/open/mod.rs

mod split_by;

use std::io::Read;
use std::sync::Arc;

use flatbuffers::root;
use itertools::Itertools;
pub use split_by::*;
use vortex_array::ContextRef;
use vortex_buffer::{ByteBuffer, ByteBufferMut};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_flatbuffers::{dtype as fbd, footer2 as fb, FlatBuffer, ReadFlatBuffer};
use vortex_io::VortexReadAt;
use vortex_layout::segments::SegmentId;
use vortex_layout::{LayoutContextRef, LayoutData, LayoutId};

use crate::v2::footer::{FileLayout, Postscript, Segment};
use crate::v2::segments::cache::SegmentCache;
use crate::v2::VortexFile;
use crate::{EOF_SIZE, MAGIC_BYTES, VERSION};

const INITIAL_READ_SIZE: u64 = 1 << 20; // 1 MB

/// Open options for a Vortex file reader.
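///
/// A minimal usage sketch (how `ctx` and an async `reader` are obtained is assumed to
/// happen elsewhere; error handling is elided):
///
/// ```ignore
/// let file = OpenOptions::new(ctx)
///     .with_split_by(SplitBy::Layout)
///     .open(reader)
///     .await?;
/// ```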
pub struct OpenOptions {
    /// The Vortex Array encoding context.
    ctx: ContextRef,
    /// The Vortex Layout encoding context.
    layout_ctx: LayoutContextRef,
    /// An optional, externally provided, file layout.
    file_layout: Option<FileLayout>,
    /// An optional, externally provided, dtype.
    dtype: Option<DType>,
    // TODO(ngates): also support a messages_middleware that can wrap a message cache to provide
    //  additional caching, metrics, or other intercepts, etc. It should support synchronous
    //  read + write of Map<MessageId, ByteBuffer> or similar.
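    /// The number of bytes to read from the end of the file in the initial read.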
    initial_read_size: u64,
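    /// How to split the file into batches for reading.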
    split_by: SplitBy,
}

impl OpenOptions {
    pub fn new(ctx: ContextRef) -> Self {
        Self {
            ctx,
            layout_ctx: LayoutContextRef::default(),
            file_layout: None,
            dtype: None,
            initial_read_size: INITIAL_READ_SIZE,
            split_by: SplitBy::Layout,
        }
    }

    /// Configure the initial read size for the Vortex file.
    pub fn with_initial_read_size(mut self, initial_read_size: u64) -> VortexResult<Self> {
        if initial_read_size < u16::MAX as u64 {
            vortex_bail!("initial_read_size must be at least u16::MAX");
        }
        self.initial_read_size = initial_read_size;
        Ok(self)
    }

    /// Configure how to split the file into batches for reading.
    ///
    /// Defaults to [`SplitBy::Layout`].
    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
        self.split_by = split_by;
        self
    }
}

impl OpenOptions {
    /// Open the Vortex file using synchronous IO.
    pub fn open_sync<R: Read>(self, _read: R) -> VortexResult<VortexFile<R>> {
        todo!()
    }

    /// Open the Vortex file using asynchronous IO.
    pub async fn open<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile<R>> {
        // Fetch the file size and perform the initial read.
        let file_size = read.size().await?;
        let initial_read_size = self.initial_read_size.min(file_size);
        let initial_offset = file_size - initial_read_size;
        let initial_read: ByteBuffer = read
            .read_byte_range(initial_offset, initial_read_size)
            .await?
            .into();

        // We know the initial read _must_ contain at least the Postscript.
        let postscript = self.parse_postscript(&initial_read)?;

        // Check if we need to read more bytes.
        // NOTE(ngates): for now, we assume the dtype and layout segments are adjacent.
        let (initial_offset, initial_read) = if (self.dtype.is_none()
            && postscript.dtype.offset < initial_offset)
            || (self.file_layout.is_none() && postscript.file_layout.offset < initial_offset)
        {
            let offset = postscript.dtype.offset.min(postscript.file_layout.offset);
            let mut new_initial_read =
                ByteBufferMut::with_capacity(usize::try_from(file_size - offset)?);
            new_initial_read.extend_from_slice(
                &read
                    .read_byte_range(offset, initial_offset - offset)
                    .await?,
            );
            new_initial_read.extend_from_slice(&initial_read);
            (offset, new_initial_read.freeze())
        } else {
            (initial_offset, initial_read)
        };

        // Now we try to read the DType and Layout segments.
        let dtype = self.dtype.clone().unwrap_or_else(|| {
            self.parse_dtype(initial_offset, &initial_read, postscript.dtype)
                .vortex_expect("Failed to parse dtype")
        });
        let file_layout = self.file_layout.clone().unwrap_or_else(|| {
            self.parse_file_layout(
                initial_offset,
                &initial_read,
                postscript.file_layout,
                dtype.clone(),
            )
            .vortex_expect("Failed to parse file layout")
        });

        // Set up our segment cache and, for good measure, populate it with any segments
        // that were covered by the initial read.
        let mut segment_cache = SegmentCache::<R>::new(read, file_layout.segments.clone());
        self.populate_segments(
            initial_offset,
            &initial_read,
            &file_layout,
            &mut segment_cache,
        )?;

        // Compute the splits of the file.
        let splits = self.split_by.splits(&file_layout.root_layout)?.into();

        // Finally, create the VortexFile.
        Ok(VortexFile {
            ctx: self.ctx.clone(),
            layout: file_layout.root_layout,
            segments: Arc::new(segment_cache),
            splits,
        })
    }

    /// Parse the postscript from the initial read.
    fn parse_postscript(&self, initial_read: &[u8]) -> VortexResult<Postscript> {
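        // Per the reads below, the fixed-size EOF block at the end of the file is laid
        // out as [version: u16 LE][postscript length: u16 LE][magic bytes], with the
        // postscript flatbuffer immediately preceding it.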
        let eof_loc = initial_read.len() - EOF_SIZE;
        let magic_bytes_loc = eof_loc + (EOF_SIZE - MAGIC_BYTES.len());

        let magic_number = &initial_read[magic_bytes_loc..];
        if magic_number != MAGIC_BYTES {
            vortex_bail!("Malformed file, invalid magic bytes, got {magic_number:?}")
        }

        let version = u16::from_le_bytes(
            initial_read[eof_loc..eof_loc + 2]
                .try_into()
                .map_err(|e| vortex_err!("Version was not a u16 {e}"))?,
        );
        if version != VERSION {
            vortex_bail!("Malformed file, unsupported version {version}")
        }

        let ps_size = u16::from_le_bytes(
            initial_read[eof_loc + 2..eof_loc + 4]
                .try_into()
                .map_err(|e| vortex_err!("Postscript size was not a u16 {e}"))?,
        ) as usize;

        Postscript::read_flatbuffer_bytes(&initial_read[eof_loc - ps_size..eof_loc])
    }

    /// Parse the DType from the initial read.
    fn parse_dtype(
        &self,
        initial_offset: u64,
        initial_read: &ByteBuffer,
        dtype: Segment,
    ) -> VortexResult<DType> {
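        // Slice the dtype segment out of the initial read and re-align it before handing
        // it to the flatbuffer root parser.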
        let offset = usize::try_from(dtype.offset - initial_offset)?;
        let sliced_buffer =
            FlatBuffer::align_from(initial_read.slice(offset..offset + dtype.length));
        let fbd_dtype = root::<fbd::DType>(&sliced_buffer)?;

        DType::try_from_view(fbd_dtype, sliced_buffer.clone())
    }

    /// Parse the FileLayout from the initial read.
    fn parse_file_layout(
        &self,
        initial_offset: u64,
        initial_read: &ByteBuffer,
        segment: Segment,
        dtype: DType,
    ) -> VortexResult<FileLayout> {
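        // Slice the file layout flatbuffer out of the initial read, resolve the root
        // layout's encoding from the layout context, and rebuild the segment map.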
        let offset = usize::try_from(segment.offset - initial_offset)?;
        let bytes = initial_read.slice(offset..offset + segment.length);

        let fb = root::<fb::FileLayout>(&bytes)?;
        let fb_root_layout = fb
            .root_layout()
            .ok_or_else(|| vortex_err!("FileLayout missing root layout"))?;

        let root_encoding = self
            .layout_ctx
            .lookup_layout(LayoutId(fb_root_layout.encoding()))
            .ok_or_else(|| {
                vortex_err!(
                    "FileLayout root layout encoding {} not found",
                    fb_root_layout.encoding()
                )
            })?;
        let _fb_encoding_id = fb_root_layout.encoding();
        let root_layout = LayoutData::try_new_viewed(
            root_encoding,
            dtype,
            bytes.clone(),
            fb_root_layout._tab.loc(),
            self.layout_ctx.clone(),
        )?;

        let fb_segments = fb
            .segments()
            .ok_or_else(|| vortex_err!("FileLayout missing segments"))?;
        let segments = fb_segments
            .iter()
            .map(|s| Segment::read_flatbuffer(&s))
            .try_collect()?;

        Ok(FileLayout {
            root_layout,
            segments,
        })
    }

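    /// Populate the segment cache with any segments that were covered by the initial
    /// read, avoiding a second fetch for those bytes.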
    fn populate_segments<R>(
        &self,
        initial_offset: u64,
        initial_read: &ByteBuffer,
        file_layout: &FileLayout,
        segments: &mut SegmentCache<R>,
    ) -> VortexResult<()> {
        for (idx, segment) in file_layout.segments.iter().enumerate() {
            if segment.offset < initial_offset {
                // Skip segments that aren't in the initial read.
                continue;
            }

            let segment_id = SegmentId::from(u32::try_from(idx)?);

            let offset = usize::try_from(segment.offset - initial_offset)?;
            let bytes = initial_read.slice(offset..offset + segment.length);

            segments.set(segment_id, bytes.into_inner())?;
        }
        Ok(())
    }
}