1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
use std::pin::Pin;
use std::task::{ready, Context, Poll};

use bytes::BytesMut;
use futures_util::{AsyncRead, Stream};
use pin_project_lite::pin_project;
use vortex_error::VortexResult;

use crate::messages::{DecoderMessage, MessageDecoder, PollRead};

pin_project! {
    /// An IPC message reader backed by an `AsyncRead` stream.
    ///
    /// Through its `Stream` implementation it yields `VortexResult<DecoderMessage>` items:
    /// bytes are pulled from `read` into `buffer`, and `decoder` turns the buffered bytes
    /// into messages.
    pub struct AsyncMessageReader<R> {
        // Underlying byte source. Pinned structurally so that `poll_read` can be called on it
        // through the projection.
        #[pin]
        read: R,
        // Staging buffer shared with the decoder: it is resized to the length the decoder
        // reports via `PollRead::NeedMore` and filled from `read`.
        buffer: BytesMut,
        // Decoder that either produces the next message from `buffer` or reports how many
        // bytes it needs.
        decoder: MessageDecoder,
        // Offset into `buffer` at which the next read from `read` is written.
        bytes_read: usize,
    }
}

impl<R> AsyncMessageReader<R> {
    /// Creates a message reader that pulls its bytes from `read`.
    ///
    /// The reader starts with an empty buffer, a default-constructed decoder and a read
    /// offset of zero; nothing is read from `read` until the stream is polled.
    pub fn new(read: R) -> Self {
        let buffer = BytesMut::new();
        let decoder = MessageDecoder::default();
        Self {
            read,
            buffer,
            decoder,
            bytes_read: 0,
        }
    }
}

impl<R: AsyncRead> Stream for AsyncMessageReader<R> {
    type Item = VortexResult<DecoderMessage>;

    /// Polls for the next decoded message.
    ///
    /// Returns `Poll::Ready(Some(Ok(msg)))` when the decoder produces a message,
    /// `Poll::Ready(Some(Err(_)))` when the decoder or the underlying reader fails,
    /// `Poll::Ready(None)` when the underlying reader reports a zero-length read (end of
    /// input), and `Poll::Pending` while the underlying reader has no data available.
    ///
    /// An `AsyncRead` may return fewer bytes than requested and may return `Pending` at any
    /// point, so filling the buffer can span several reads and several calls to `poll_next`.
    /// The fill progress is therefore kept in `bytes_read`, and the decoder is only consulted
    /// once the buffer it asked for has been filled completely.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            // A fill is in progress exactly while `bytes_read < buffer.len()`: the buffer has
            // been resized to the requested length but only its first `bytes_read` bytes hold
            // real data. Once a fill completes `bytes_read == buffer.len()`, and consuming
            // bytes from the buffer can only shrink it, so the condition stays false until the
            // decoder requests more data.
            let fill_in_progress = *this.bytes_read < this.buffer.len();

            if !fill_in_progress {
                match this.decoder.read_next(this.buffer)? {
                    PollRead::Some(msg) => return Poll::Ready(Some(Ok(msg))),
                    PollRead::NeedMore(target_len) => {
                        // Start a new fill of `target_len` bytes from the front of the buffer.
                        // NOTE(review): this assumes the decoder has drained the buffer before
                        // asking for more -- confirm against `MessageDecoder::read_next`.
                        this.buffer.resize(target_len, 0x00);
                        *this.bytes_read = 0;
                    }
                }
            }

            // Read into the part of the buffer that has not been filled yet. On `Pending`,
            // `ready!` returns early and the fill state is kept for the next poll.
            let unfilled: &mut [u8] = &mut this.buffer.as_mut()[*this.bytes_read..];
            match ready!(this.read.as_mut().poll_read(cx, unfilled)) {
                Ok(0) => {
                    // End of file (or a zero-length request): terminate the stream.
                    return Poll::Ready(None);
                }
                Ok(nread) => {
                    // Record the progress; the next loop iteration either keeps filling or,
                    // if the buffer is now full, hands it to the decoder.
                    *this.bytes_read += nread;
                }
                Err(e) => return Poll::Ready(Some(Err(e.into()))),
            }
        }
    }
}