use std::fs::File;
use std::future::{self, Future};
use std::io;
use std::ops::Deref;
use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use vortex_buffer::Buffer;
use vortex_error::VortexUnwrap;
use crate::{IoBuf, VortexReadAt, VortexWrite};
pub struct TokioAdapter<IO>(pub IO);
impl<W: AsyncWrite + Unpin> VortexWrite for TokioAdapter<W> {
async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
self.0.write_all(buffer.as_slice()).await?;
Ok(buffer)
}
async fn flush(&mut self) -> io::Result<()> {
self.0.flush().await
}
async fn shutdown(&mut self) -> io::Result<()> {
self.0.shutdown().await
}
}
/// A cloneable handle to a [`std::fs::File`].
///
/// The file lives inside an [`Arc`], so every clone refers to the same `File` value.
#[derive(Debug, Clone)]
pub struct TokioFile(Arc<File>);

impl TokioFile {
    /// Opens the file at `path` with [`File::open`] and wraps it in a shareable handle.
    ///
    /// # Errors
    ///
    /// Returns the error from [`File::open`] unchanged.
    pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
        File::open(path).map(Arc::new).map(Self)
    }
}
impl Deref for TokioFile {
type Target = File;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Positional reads and size queries against the shared file handle.
///
/// Both methods go through the `std::fs::File` reached via `Deref`: `read_byte_range`
/// performs its read before returning, while `size` performs the metadata lookup inside
/// the returned future.
impl VortexReadAt for TokioFile {
    /// Reads exactly `len` bytes starting at byte offset `pos`.
    ///
    /// The read runs synchronously inside this call; the returned future is already
    /// resolved and only carries the outcome.
    ///
    /// # Errors
    ///
    /// The future yields whatever error `read_exact_at` reported.
    ///
    /// # Panics
    ///
    /// NOTE(review): `vortex_unwrap` presumably panics when `len` does not fit in
    /// `usize` — confirm against `vortex_error`.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    fn read_byte_range(
        &self,
        pos: u64,
        len: u64,
    ) -> impl Future<Output = io::Result<Buffer>> + 'static {
        let mut buffer = vec![0u8; len.try_into().vortex_unwrap()];
        // The read completes before the future is built, so the returned value holds
        // nothing borrowed from `self` and the handle does not need to be cloned.
        let outcome = self
            .read_exact_at(&mut buffer, pos)
            .map(|()| Buffer::from(buffer));
        future::ready(outcome)
    }

    /// Reports the file length in bytes, taken from the file's metadata.
    ///
    /// # Errors
    ///
    /// The future yields the error from `metadata()` if the lookup fails.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
    fn size(&self) -> impl Future<Output = io::Result<u64>> + 'static {
        // The future has to be `'static`, so it owns its own clone of the handle.
        let this = self.clone();
        async move { this.metadata().map(|metadata| metadata.len()) }
    }
}
impl VortexWrite for tokio::fs::File {
async fn write_all<B: IoBuf>(&mut self, buffer: B) -> io::Result<B> {
AsyncWriteExt::write_all(self, buffer.as_slice()).await?;
Ok(buffer)
}
async fn flush(&mut self) -> io::Result<()> {
AsyncWriteExt::flush(self).await
}
async fn shutdown(&mut self) -> io::Result<()> {
AsyncWriteExt::shutdown(self).await
}
}
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::io::Write;
    use std::ops::Deref;
    use std::os::unix::fs::FileExt;

    use tempfile::NamedTempFile;

    use crate::{TokioFile, VortexReadAt};

    /// Two disjoint byte ranges read through one handle return the matching bytes.
    #[tokio::test]
    async fn test_shared_file() {
        let mut backing = NamedTempFile::new().unwrap();
        write!(backing, "0123456789").unwrap();

        let reader = TokioFile::open(backing.path()).unwrap();

        let head = reader.read_byte_range(0, 5).await.unwrap();
        let tail = reader.read_byte_range(5, 5).await.unwrap();

        assert_eq!(head.as_ref(), "01234".as_bytes());
        assert_eq!(tail.as_ref(), "56789".as_bytes());
    }

    /// The file stays readable through any live clone of the handle after its path
    /// has been removed, and the path is still absent once every handle is dropped.
    #[test]
    fn test_drop_semantics() {
        /// Reports whether the first seven bytes of `file` can be read.
        fn can_read(file: &File) -> bool {
            let mut scratch = vec![0; 7];
            file.read_exact_at(&mut scratch, 0).is_ok()
        }

        let mut backing = NamedTempFile::new().unwrap();
        write!(backing, "test123").unwrap();

        let original = TokioFile::open(backing.path()).unwrap();

        // Remove the path while the handle is still open.
        std::fs::remove_file(backing.path()).unwrap();
        assert!(can_read(original.deref()));

        // A clone must keep working after the first handle is gone.
        let duplicate = original.clone();
        drop(original);
        assert!(can_read(duplicate.deref()));

        drop(duplicate);
        assert!(!std::fs::exists(backing.path()).unwrap());
    }
}