#[cfg(feature = "compio")]
mod compio;
#[cfg(not(target_arch = "wasm32"))]
mod tokio;
#[cfg(target_arch = "wasm32")]
mod wasm;
use std::future::Future;
use std::task::Poll;
use cfg_if::cfg_if;
use futures::channel::oneshot;
use futures::FutureExt;
use vortex_error::{vortex_err, VortexResult};
#[cfg(feature = "compio")]
use self::compio::*;
#[cfg(not(target_arch = "wasm32"))]
use self::tokio::*;
#[cfg(target_arch = "wasm32")]
use self::wasm::*;
// Private module holding the marker trait that `Dispatch` requires as a
// supertrait. Because this module is not `pub`, code outside this module tree
// cannot name `Sealed` and therefore cannot implement `Dispatch`.
mod sealed {
/// Marker trait with no methods; it exists only to gate `Dispatch` impls.
pub trait Sealed {}
// The façade type is always compiled in.
impl Sealed for super::IoDispatcher {}
// Each backend impl is gated by the same cfg that gates the backend module.
#[cfg(feature = "compio")]
impl Sealed for super::CompioDispatcher {}
#[cfg(not(target_arch = "wasm32"))]
impl Sealed for super::TokioDispatcher {}
#[cfg(target_arch = "wasm32")]
impl Sealed for super::WasmDispatcher {}
}
/// Interface for submitting work to an I/O dispatcher.
///
/// The `sealed::Sealed` supertrait lives in a private module, so code outside
/// this module tree cannot implement this trait.
pub trait Dispatch: sealed::Sealed {
/// Submits `task` and returns a `JoinHandle` that resolves to the output `R`
/// of the future built by `task`.
///
/// `task` is a `Send + 'static` closure that constructs the future. The
/// future itself is not required to be `Send`; only its output `R` is.
/// NOTE(review): presumably the closure is invoked on the dispatcher's own
/// thread so that a `!Send` future can be driven there — confirm in the
/// backend implementations.
///
/// # Errors
///
/// The failure conditions are defined by each implementation.
fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<JoinHandle<R>>
where
F: (FnOnce() -> Fut) + Send + 'static,
Fut: Future<Output = R> + 'static,
R: Send + 'static;
/// Shuts the dispatcher down, consuming it.
///
/// # Errors
///
/// The failure conditions are defined by each implementation.
fn shutdown(self) -> VortexResult<()>;
}
/// Dispatcher façade that wraps one of the backend dispatchers compiled in for
/// the current target and feature set (see the private `Inner` enum) and
/// forwards `Dispatch` calls to it.
#[derive(Debug)]
pub struct IoDispatcher(Inner);
/// Handle to the result of a task submitted through [`Dispatch::dispatch`].
///
/// The handle wraps the receiving half of a oneshot channel. Awaiting it
/// yields `Ok(value)` once a value arrives, or an error if the channel
/// reports that no value will ever be sent.
pub struct JoinHandle<R>(oneshot::Receiver<R>);

impl<R> Future for JoinHandle<R> {
    type Output = VortexResult<R>;

    /// Polls the wrapped receiver, translating a receive failure into a
    /// "Task was canceled" error and passing `Pending` through untouched.
    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        // `Poll::map` only touches the `Ready` case; the error value is built
        // lazily, and only when the receiver actually reported a failure.
        self.0
            .poll_unpin(cx)
            .map(|received| received.map_err(|_| vortex_err!("Task was canceled")))
    }
}
/// Backend actually held by an `IoDispatcher`.
///
/// Each variant is gated by the same cfg as the module that defines its
/// payload, so only the backends available for the current build exist here.
#[derive(Debug)]
enum Inner {
/// Tokio-based backend; present on every target except `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
Tokio(TokioDispatcher),
/// Compio-based backend; present only when the `compio` feature is enabled.
#[cfg(feature = "compio")]
Compio(CompioDispatcher),
/// Backend used when compiling for `wasm32`.
#[cfg(target_arch = "wasm32")]
Wasm(WasmDispatcher),
}
impl Default for IoDispatcher {
    /// Builds the default dispatcher for the current build configuration.
    ///
    /// The backend is chosen at compile time:
    /// * `wasm32` targets use the wasm backend;
    /// * otherwise, builds with the `compio` feature use the compio backend;
    /// * all remaining builds use the tokio backend.
    ///
    /// The tokio and compio backends are created with `new(1)`
    /// (presumably a single worker thread — confirm in the backends).
    fn default() -> Self {
        cfg_if! {
            if #[cfg(target_arch = "wasm32")] {
                Self(Inner::Wasm(WasmDispatcher::new()))
            } else if #[cfg(feature = "compio")] {
                Self(Inner::Compio(CompioDispatcher::new(1)))
            } else {
                Self(Inner::Tokio(TokioDispatcher::new(1)))
            }
        }
    }
}
impl Dispatch for IoDispatcher {
    /// Forwards `task` to whichever backend this dispatcher wraps and returns
    /// that backend's result unchanged.
    // NOTE(review): every compiled match arm consumes `task`, so this allow
    // looks redundant — confirm before removing.
    #[allow(unused_variables)]
    fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<JoinHandle<R>>
    where
        F: (FnOnce() -> Fut) + Send + 'static,
        Fut: Future<Output = R> + 'static,
        R: Send + 'static,
    {
        // Matching on a reference borrows the backend without moving it.
        match &self.0 {
            #[cfg(not(target_arch = "wasm32"))]
            Inner::Tokio(backend) => backend.dispatch(task),
            #[cfg(feature = "compio")]
            Inner::Compio(backend) => backend.dispatch(task),
            #[cfg(target_arch = "wasm32")]
            Inner::Wasm(backend) => backend.dispatch(task),
        }
    }

    /// Consumes the dispatcher and shuts down the wrapped backend, returning
    /// that backend's result unchanged.
    fn shutdown(self) -> VortexResult<()> {
        let Self(inner) = self;
        match inner {
            #[cfg(not(target_arch = "wasm32"))]
            Inner::Tokio(backend) => backend.shutdown(),
            #[cfg(feature = "compio")]
            Inner::Compio(backend) => backend.shutdown(),
            #[cfg(target_arch = "wasm32")]
            Inner::Wasm(backend) => backend.shutdown(),
        }
    }
}
impl IoDispatcher {
    /// Creates a dispatcher backed by `TokioDispatcher`.
    ///
    /// `num_threads` is passed straight through to `TokioDispatcher::new`
    /// (presumably the number of worker threads — confirm in the backend).
    /// Available on every target except `wasm32`.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn new_tokio(num_threads: usize) -> Self {
        Self(Inner::Tokio(TokioDispatcher::new(num_threads)))
    }

    /// Creates a dispatcher backed by `CompioDispatcher`.
    ///
    /// `num_threads` is passed straight through to `CompioDispatcher::new`
    /// (presumably the number of worker threads — confirm in the backend).
    /// Available only when the `compio` feature is enabled.
    #[cfg(feature = "compio")]
    pub fn new_compio(num_threads: usize) -> Self {
        Self(Inner::Compio(CompioDispatcher::new(num_threads)))
    }

    /// Creates a dispatcher backed by `WasmDispatcher`.
    ///
    /// Available only when compiling for `wasm32`.
    #[cfg(target_arch = "wasm32")]
    pub fn new_wasm() -> Self {
        // Go through the constructor, as `Default` does, rather than relying
        // on `WasmDispatcher` being a unit struct.
        Self(Inner::Wasm(WasmDispatcher::new()))
    }
}