vortex_ipc/messages/
reader_sync.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
use std::io::Read;

use bytes::BytesMut;
use vortex_error::VortexResult;

use crate::messages::{DecoderMessage, MessageDecoder, PollRead};

/// An IPC message reader backed by a `Read` stream.
pub struct SyncMessageReader<R> {
    read: R,
    buffer: BytesMut,
    decoder: MessageDecoder,
}

impl<R: Read> SyncMessageReader<R> {
    pub fn new(read: R) -> Self {
        SyncMessageReader {
            read,
            buffer: BytesMut::new(),
            decoder: MessageDecoder::default(),
        }
    }
}

impl<R: Read> Iterator for SyncMessageReader<R> {
    type Item = VortexResult<DecoderMessage>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.decoder.read_next(&mut self.buffer) {
                Ok(PollRead::Some(msg)) => {
                    return Some(Ok(msg));
                }
                Ok(PollRead::NeedMore(nbytes)) => {
                    self.buffer.resize(nbytes, 0x00);
                    match self.read.read(&mut self.buffer) {
                        Ok(0) => {
                            // EOF
                            return None;
                        }
                        Ok(_nbytes) => {
                            // Continue in the loop
                        }
                        Err(e) => return Some(Err(e.into())),
                    }
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }
}