vortex_datafusion/persistent/cache.rs

use std::sync::Arc;

use chrono::{DateTime, Utc};
use moka::future::Cache;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use vortex_error::{vortex_err, VortexError, VortexResult};
use vortex_file::{read_initial_bytes, InitialRead};
use vortex_io::ObjectStoreReadAt;

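/// A size-bounded, in-memory cache over the result of [`read_initial_bytes`],
/// so that repeated scans of the same file skip re-reading its metadata.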
#[derive(Debug, Clone)]
pub struct InitialReadCache {
    inner: Cache<Key, InitialRead>,
}

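/// Cache key: an object's path plus its last-modified time, so a file that is
/// overwritten in place misses the cache rather than serving stale bytes.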
#[derive(Hash, Eq, PartialEq, Debug)]
pub struct Key {
    location: Path,
    m_time: DateTime<Utc>,
}

impl From<&ObjectMeta> for Key {
    fn from(value: &ObjectMeta) -> Self {
        Self {
            location: value.location.clone(),
            m_time: value.last_modified,
        }
    }
}

impl InitialReadCache {
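    /// Builds a cache bounded to roughly `size_mb` megabytes of weighted
    /// entries.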
    pub fn new(size_mb: usize) -> Self {
        let inner = Cache::builder()
            // Weigh entries by their approximate in-memory size: the key's
            // path string plus the cached initial-read buffer.
            .weigher(|k: &Key, v: &InitialRead| (k.location.as_ref().len() + v.buf.len()) as u32)
            // Convert the megabyte budget into bytes (1 MiB = 1 << 20).
            .max_capacity(size_mb as u64 * (1 << 20))
            .eviction_listener(|k, _v, cause| {
                log::trace!("Removed {} due to {:?}", k.location, cause);
            })
            .build();

        Self { inner }
    }

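    /// Returns the cached [`InitialRead`] for `object`, performing the read
    /// against `store` and caching the result on a miss.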
    pub async fn try_get(
        &self,
        object: &ObjectMeta,
        store: Arc<dyn ObjectStore>,
    ) -> VortexResult<InitialRead> {
        self.inner
            // `try_get_with` coalesces concurrent lookups for the same key,
            // so the initial read runs at most once per (path, mtime).
            .try_get_with(Key::from(object), async {
                let os_read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
                let initial_read = read_initial_bytes(&os_read_at, object.size as u64).await?;
                VortexResult::Ok(initial_read)
            })
            .await
            // Moka shares a failure among all waiters as `Arc<VortexError>`;
            // unwrap the Arc when we are the sole owner, otherwise re-wrap
            // the error message.
            .map_err(|e: Arc<VortexError>| match Arc::try_unwrap(e) {
                Ok(e) => e,
                Err(e) => vortex_err!("{}", e.to_string()),
            })
    }
}
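
// A minimal usage sketch (not part of the original file): resolve `ObjectMeta`
// with `head`, then let the cache perform the initial read at most once per
// (path, mtime). The function name, the 64 MB budget, and the object path are
// illustrative assumptions.
#[allow(dead_code)]
async fn example_usage(store: Arc<dyn ObjectStore>) -> VortexResult<InitialRead> {
    let cache = InitialReadCache::new(64);
    let meta = store
        .head(&Path::from("data/example.vortex"))
        .await
        .map_err(|e| vortex_err!("{}", e.to_string()))?;
    // A second call with the same key would be served from the cache.
    cache.try_get(&meta, store).await
}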