//! Asynchronous positional reads: the [`VortexReadAt`] trait, with implementations for
//! `Arc<T>` and [`Buffer`].
use std::future::{self, Future};
use std::io;
use std::sync::Arc;
use vortex_buffer::Buffer;
use vortex_error::{vortex_err, VortexUnwrap};
/// Abstraction over data sources that can serve asynchronous, positional reads.
///
/// Implementors must be [`Send`] and [`Sync`] so that references to a reader can be shared
/// across threads. The futures handed back by the methods, however, are not required to be
/// `Send`, which keeps thread-per-core implementations possible.
///
/// Implementors must also be [`Clone`]; cloning is expected to be cheap so that a reader can
/// easily be shared between tasks or threads.
pub trait VortexReadAt: Send + Sync + Clone + 'static {
    /// Start an asynchronous read of `len` bytes beginning at byte offset `pos`.
    ///
    /// On success the future resolves to a [`Buffer`] holding the requested bytes.
    ///
    /// ## Errors
    ///
    /// When the reader cannot supply the requested number of bytes, the future completes with
    /// an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error.
    ///
    /// ## Thread Safety
    ///
    /// The returned future is `'static` but need not be [`Send`], so implementations backed by
    /// thread-per-core executors are allowed.
    fn read_byte_range(
        &self,
        pos: u64,
        len: u64,
    ) -> impl Future<Output = io::Result<Buffer>> + 'static;

    /// Asynchronously report how many bytes of data can be read.
    ///
    /// For a file this is its size in bytes; for an object in an `ObjectStore` it is the
    /// `ObjectMeta::size`.
    fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static;

    /// Performance hint for this reader; the default implementation returns `0`.
    ///
    /// NOTE(review): the unit and meaning of the returned value are not defined here; confirm
    /// with callers before relying on it.
    // TODO(ngates): the read implementation should be able to hint at its latency/throughput
    //  allowing the caller to make better decisions about how to coalesce reads.
    fn performance_hint(&self) -> usize {
        0
    }
}
/// A shared [`Arc`] handle to a reader is itself a reader: every method forwards to the
/// wrapped `T`.
impl<T: VortexReadAt> VortexReadAt for Arc<T> {
    /// Forward the positional read to the wrapped reader.
    fn read_byte_range(
        &self,
        pos: u64,
        len: u64,
    ) -> impl Future<Output = io::Result<Buffer>> + 'static {
        // Dereference explicitly down to `T`; calling the method on `self` directly would
        // resolve to this very impl again.
        (**self).read_byte_range(pos, len)
    }

    /// Forward the size query to the wrapped reader.
    fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
        (**self).size()
    }

    /// Report the wrapped reader's performance hint.
    fn performance_hint(&self) -> usize {
        (**self).performance_hint()
    }
}
/// An in-memory [`Buffer`] serves positional reads by slicing itself.
impl VortexReadAt for Buffer {
    /// Return the bytes in the range `pos..pos + len` as a slice of this buffer.
    ///
    /// The returned future is already resolved.
    ///
    /// ## Errors
    ///
    /// Resolves to an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error when the
    /// requested range extends past the end of the buffer. This includes requests whose end
    /// offset `pos + len` does not fit in a `u64`.
    fn read_byte_range(
        &self,
        pos: u64,
        len: u64,
    ) -> impl Future<Output = io::Result<Buffer>> + 'static {
        // Validate the range in `u64` space first: `checked_add` rejects an overflowing end
        // offset instead of panicking (debug) or wrapping around to a bogus range (release).
        let result = match pos.checked_add(len) {
            Some(end) if end <= self.len() as u64 => {
                // `pos <= end <= self.len()`, so both bounds are guaranteed to fit in `usize`
                // and these conversions cannot fail.
                let read_start: usize = pos.try_into().vortex_unwrap();
                let read_end: usize = end.try_into().vortex_unwrap();
                Ok(self.slice(read_start..read_end))
            }
            _ => Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                vortex_err!("unexpected eof"),
            )),
        };
        future::ready(result)
    }

    /// Report the buffer's length in bytes; the returned future is already resolved.
    fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
        future::ready(Ok(self.len() as u64))
    }
}