1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use vortex_dtype::DType;
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_scalar::Scalar;

use crate::array::{ConstantArray, ConstantEncoding};
use crate::compute::{BinaryBooleanFn, BinaryOperator};
use crate::{ArrayDType, ArrayData, ArrayLen, IntoArrayData};

impl BinaryBooleanFn<ConstantArray> for ConstantEncoding {
    /// Evaluates a binary boolean operator between a constant array and another array.
    ///
    /// Only the constant <-> constant case is handled here. When `rhs` is not constant this
    /// returns `Ok(None)` so the caller can fall back to another implementation (the Arrow
    /// one, per the original author's note).
    ///
    /// The result is a [`ConstantArray`] with the same length as `lhs`. It is nullable if
    /// either input dtype is nullable, and holds a null scalar when the operator yields no
    /// value.
    ///
    /// # Errors
    ///
    /// Returns an error if either side's constant scalar is not a boolean, or if `rhs`
    /// reports itself as constant but no constant scalar can be extracted from it.
    fn binary_boolean(
        &self,
        lhs: &ConstantArray,
        rhs: &ArrayData,
        op: BinaryOperator,
    ) -> VortexResult<Option<ArrayData>> {
        // Anything other than constant <-> constant is left to the fallback implementation.
        if !rhs.is_constant() {
            return Ok(None);
        }

        let length = lhs.len();
        let nullable = lhs.dtype().is_nullable() || rhs.dtype().is_nullable();

        // Use the fallible accessor on both sides so a non-boolean operand surfaces as an
        // error rather than a panic.
        let lhs_value = lhs
            .scalar()
            .as_bool_opt()
            .ok_or_else(|| vortex_err!("expected lhs to be boolean"))?
            .value();

        let Some(rhs_scalar) = rhs.as_constant() else {
            vortex_bail!("Binary boolean operation requires both sides to be constant");
        };
        let rhs_value = rhs_scalar
            .as_bool_opt()
            .ok_or_else(|| vortex_err!("expected rhs to be boolean"))?
            .value();

        let result = match op {
            BinaryOperator::And => and(lhs_value, rhs_value),
            BinaryOperator::AndKleene => kleene_and(lhs_value, rhs_value),
            BinaryOperator::Or => or(lhs_value, rhs_value),
            BinaryOperator::OrKleene => kleene_or(lhs_value, rhs_value),
        };

        // `None` means the operation produced a null, which is encoded as a null bool scalar.
        let scalar = match result {
            Some(value) => Scalar::bool(value, nullable.into()),
            None => Scalar::null(DType::Bool(nullable.into())),
        };

        Ok(Some(ConstantArray::new(scalar, length).into_array()))
    }
}

/// Null-propagating logical AND: yields `None` whenever either operand is `None`,
/// otherwise the conjunction of the two values.
fn and(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    match (left, right) {
        (Some(l), Some(r)) => Some(l & r),
        _ => None,
    }
}

/// Kleene (three-valued) logical AND, with `None` standing for "unknown".
///
/// A definite `false` on either side decides the result as `false`, even if the other side
/// is unknown. Otherwise any unknown operand makes the result unknown.
fn kleene_and(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    if left == Some(false) || right == Some(false) {
        return Some(false);
    }
    // No definite `false` remains, so an unknown operand propagates as `None`.
    Some(left? & right?)
}

/// Null-propagating logical OR: yields `None` whenever either operand is `None`,
/// otherwise the disjunction of the two values.
fn or(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    match (left, right) {
        (Some(l), Some(r)) => Some(l | r),
        _ => None,
    }
}

/// Kleene (three-valued) logical OR, with `None` standing for "unknown".
///
/// A definite `true` on either side decides the result as `true`, even if the other side
/// is unknown. Otherwise any unknown operand makes the result unknown.
fn kleene_or(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    if left == Some(true) || right == Some(true) {
        return Some(true);
    }
    // No definite `true` remains, so an unknown operand propagates as `None`.
    Some(left? | right?)
}

#[cfg(test)]
mod test {
    use rstest::rstest;

    use crate::array::constant::ConstantArray;
    use crate::array::BoolArray;
    use crate::compute::{and, or, scalar_at};
    use crate::{ArrayData, IntoArrayData, IntoArrayVariant};

    /// OR-ing a constant `true` array with a mixed boolean array must give `true` at every
    /// position, whichever side the constant is on.
    #[rstest]
    #[case(
        ConstantArray::new(true, 4).into_array(),
        BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array()
    )]
    #[case(
        BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array(),
        ConstantArray::new(true, 4).into_array()
    )]
    fn test_or(#[case] lhs: ArrayData, #[case] rhs: ArrayData) {
        let result = or(&lhs, &rhs).unwrap().into_bool().unwrap().into_array();

        for idx in 0..4 {
            let value = scalar_at(&result, idx).unwrap().as_bool().value();
            assert!(value.unwrap());
        }
    }

    /// AND-ing a constant `true` array with a mixed boolean array must reproduce the mixed
    /// array's values, whichever side the constant is on.
    #[rstest]
    #[case(
        ConstantArray::new(true, 4).into_array(),
        BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array()
    )]
    #[case(
        BoolArray::from_iter([Some(true), Some(false), Some(true), Some(false)].into_iter()).into_array(),
        ConstantArray::new(true, 4).into_array()
    )]
    fn test_and(#[case] lhs: ArrayData, #[case] rhs: ArrayData) {
        let result = and(&lhs, &rhs).unwrap().into_bool().unwrap().into_array();

        for (idx, expected) in [true, false, true, false].iter().enumerate() {
            let value = scalar_at(&result, idx).unwrap().as_bool().value();
            assert!(value.unwrap() == *expected);
        }
    }
}