1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#[cfg(feature = "compio")]
mod compio;
#[cfg(not(target_arch = "wasm32"))]
mod tokio;
#[cfg(target_arch = "wasm32")]
mod wasm;

use std::future::Future;
use std::task::Poll;

use cfg_if::cfg_if;
use futures::channel::oneshot;
use futures::FutureExt;
use vortex_error::{vortex_err, VortexResult};

#[cfg(feature = "compio")]
use self::compio::*;
#[cfg(not(target_arch = "wasm32"))]
use self::tokio::*;
#[cfg(target_arch = "wasm32")]
use self::wasm::*;

mod sealed {
    pub trait Sealed {}

    impl Sealed for super::IoDispatcher {}

    #[cfg(feature = "compio")]
    impl Sealed for super::CompioDispatcher {}

    #[cfg(not(target_arch = "wasm32"))]
    impl Sealed for super::TokioDispatcher {}

    #[cfg(target_arch = "wasm32")]
    impl Sealed for super::WasmDispatcher {}
}

/// A trait for types that may be dispatched.
pub trait Dispatch: sealed::Sealed {
    /// Dispatch a new asynchronous task.
    ///
    /// The function spawning the task must be `Send` as it will be sent to
    /// the driver thread.
    ///
    /// The returned `Future` will be executed to completion on a single thread,
    /// thus it may be `!Send`.
    fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<JoinHandle<R>>
    where
        F: (FnOnce() -> Fut) + Send + 'static,
        Fut: Future<Output = R> + 'static,
        R: Send + 'static;

    /// Gracefully shutdown the dispatcher, consuming it.
    ///
    /// Existing tasks are awaited before exiting.
    fn shutdown(self) -> VortexResult<()>;
}

/// <div class="warning">IoDispatcher is unstable and may change in the future.</div>
///
/// A cross-thread, cross-runtime dispatcher of async IO workloads.
///
/// `IoDispatcher`s are handles to an async runtime that can handle work submissions and
/// multiplexes them across a set of worker threads. Unlike an async runtime, which is free
/// to balance tasks as they see fit, the purpose of the Dispatcher is to enable the spawning
/// of asynchronous, `!Send` tasks across potentially many worker threads, and allowing work
/// submission from any other runtime.
///
#[derive(Debug)]
pub struct IoDispatcher(Inner);

pub struct JoinHandle<R>(oneshot::Receiver<R>);

impl<R> Future for JoinHandle<R> {
    type Output = VortexResult<R>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        match self.0.poll_unpin(cx) {
            Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
            Poll::Ready(Err(_)) => Poll::Ready(Err(vortex_err!("Task was canceled"))),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[derive(Debug)]
enum Inner {
    #[cfg(not(target_arch = "wasm32"))]
    Tokio(TokioDispatcher),
    #[cfg(feature = "compio")]
    Compio(CompioDispatcher),
    #[cfg(target_arch = "wasm32")]
    Wasm(WasmDispatcher),
}

impl Default for IoDispatcher {
    fn default() -> Self {
        cfg_if! {
            if #[cfg(target_arch = "wasm32")] {
                Self(Inner::Wasm(WasmDispatcher::new()))
            } else if #[cfg(not(feature = "compio"))] {
                Self(Inner::Tokio(TokioDispatcher::new(1)))
            } else {
                Self(Inner::Compio(CompioDispatcher::new(1)))
            }
        }
    }
}

impl Dispatch for IoDispatcher {
    #[allow(unused_variables)] // If no features are enabled `task` ends up being unused
    fn dispatch<F, Fut, R>(&self, task: F) -> VortexResult<JoinHandle<R>>
    where
        F: (FnOnce() -> Fut) + Send + 'static,
        Fut: Future<Output = R> + 'static,
        R: Send + 'static,
    {
        match self.0 {
            #[cfg(not(target_arch = "wasm32"))]
            Inner::Tokio(ref tokio_dispatch) => tokio_dispatch.dispatch(task),
            #[cfg(feature = "compio")]
            Inner::Compio(ref compio_dispatch) => compio_dispatch.dispatch(task),
            #[cfg(target_arch = "wasm32")]
            Inner::Wasm(ref wasm_dispatch) => wasm_dispatch.dispatch(task),
        }
    }

    fn shutdown(self) -> VortexResult<()> {
        match self.0 {
            #[cfg(not(target_arch = "wasm32"))]
            Inner::Tokio(tokio_dispatch) => tokio_dispatch.shutdown(),
            #[cfg(feature = "compio")]
            Inner::Compio(compio_dispatch) => compio_dispatch.shutdown(),
            #[cfg(target_arch = "wasm32")]
            Inner::Wasm(wasm_dispatch) => wasm_dispatch.shutdown(),
        }
    }
}

impl IoDispatcher {
    /// Create a new IO dispatcher that uses a set of Tokio `current_thread` runtimes to
    /// execute both `Send` and `!Send` futures.
    ///
    /// A handle to the dispatcher can be passed freely among threads, allowing multiple parties to
    /// perform dispatching across different threads.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn new_tokio(num_thread: usize) -> Self {
        Self(Inner::Tokio(TokioDispatcher::new(num_thread)))
    }

    #[cfg(feature = "compio")]
    pub fn new_compio(num_threads: usize) -> Self {
        Self(Inner::Compio(CompioDispatcher::new(num_threads)))
    }

    #[cfg(target_arch = "wasm32")]
    pub fn new_wasm() -> Self {
        Self(Inner::Wasm(WasmDispatcher))
    }
}