sophia_api/source/
_triple.rs

1use super::*;
2use crate::graph::{CollectibleGraph, Graph, MutableGraph};
3use crate::triple::Triple;
4
5/// A triple source produces [triples](Triple), and may also fail in the process.
6///
7/// see [module documentation](super) for the rationale of his trait.
8///
9/// # Common implementors
10///
11/// Any iterator yielding [results](std::result::Result) of [`Triple`]
12/// implements the [`TripleSource`] trait.
13///
14/// Any iterator of [`Triple`] can also be converted to an [`Infallible`] [`TripleSource`]
15/// thanks to the [`IntoTripleSource`] extension trait.
16pub trait TripleSource {
17    /// The type of triples this source yields.
18    type Triple<'x>: Triple;
19    /// The type of errors produced by this source.
20    type Error: Error + 'static;
21
22    /// Call f for some triple(s) (possibly zero) from this source, if any.
23    ///
24    /// Return `Ok(false)` if there are no more triples in this source.
25    ///
26    /// Return an error if either the source or `f` errs.
27    fn try_for_some_triple<E, F>(&mut self, f: F) -> StreamResult<bool, Self::Error, E>
28    where
29        E: Error,
30        F: FnMut(Self::Triple<'_>) -> Result<(), E>;
31
32    /// Call f for all triples from this source.
33    ///
34    /// Return an error if either the source or `f` errs.
35    #[inline]
36    fn try_for_each_triple<F, E>(&mut self, mut f: F) -> StreamResult<(), Self::Error, E>
37    where
38        F: FnMut(Self::Triple<'_>) -> Result<(), E>,
39        E: Error,
40    {
41        while self.try_for_some_triple(&mut f)? {}
42        Ok(())
43    }
44
45    /// Call f for some triple(s) (possibly zero) from this source, if any.
46    ///
47    /// Return false if there are no more triples in this source.
48    ///
49    /// Return an error if either the source errs.
50    #[inline]
51    fn for_some_triple<F>(&mut self, f: &mut F) -> Result<bool, Self::Error>
52    where
53        F: FnMut(Self::Triple<'_>),
54    {
55        self.try_for_some_triple(|t| -> Result<(), Self::Error> {
56            f(t);
57            Ok(())
58        })
59        .map_err(StreamError::inner_into)
60    }
61
62    /// Call f for all triples from this source.
63    ///
64    /// Return an error if either the source errs.
65    #[inline]
66    fn for_each_triple<F>(&mut self, f: F) -> Result<(), Self::Error>
67    where
68        F: FnMut(Self::Triple<'_>),
69    {
70        let mut f = f;
71        while self.for_some_triple(&mut f)? {}
72        Ok(())
73    }
74
75    /// Returns a source which uses `predicate` to determine if an triple should be yielded.
76    #[inline]
77    fn filter_triples<F>(self, predicate: F) -> filter::FilterTripleSource<Self, F>
78    where
79        Self: Sized,
80        F: FnMut(&Self::Triple<'_>) -> bool,
81    {
82        filter::FilterTripleSource {
83            source: self,
84            predicate,
85        }
86    }
87
88    /// Returns a source that both filters and maps.
89    ///
90    /// See also [`TripleSource::filter_triples`] and [`TripleSource::map_triples`].
91    #[inline]
92    fn filter_map_triples<F, T>(self, filter_map: F) -> filter_map::FilterMapTripleSource<Self, F>
93    where
94        Self: Sized,
95        F: FnMut(Self::Triple<'_>) -> Option<T>,
96    {
97        filter_map::FilterMapTripleSource {
98            source: self,
99            filter_map,
100        }
101    }
102
103    /// Returns a source which yield the result of `map` for each triple.
104    ///
105    /// See also [`TripleSource::to_quads`].
106    ///
107    /// NB: due to [some limitations in GATsĀ (Generic) Associated Types](https://blog.rust-lang.org/2022/10/28/gats-stabilization.html),
108    /// the `map` function is currently restricted in what it can return.
109    /// In particular, passing functions as trivial as `|t| t` or `|t| t.to_spo()`
110    /// currently do not compile on all implementations of [`TripleSource`].
111    /// Furthermore, some functions returning a [`Triple`] are accepted,
112    /// but fail to make the resulting [`map::MapTripleSource`] recognized as a [`TripleSource`].
113    ///
114    /// As a rule of thumb,
115    /// whenever `map` returns something satisfying the `'static` lifetime,
116    /// things should work as expected.
117    #[inline]
118    fn map_triples<F, T>(self, map: F) -> map::MapTripleSource<Self, F>
119    where
120        Self: Sized,
121        F: FnMut(Self::Triple<'_>) -> T,
122    {
123        map::MapTripleSource { source: self, map }
124    }
125
126    /// Convert of triples in this source to quads (belonging to the default graph).
127    #[inline]
128    fn to_quads(self) -> convert::ToQuads<Self>
129    where
130        Self: Sized,
131    {
132        convert::ToQuads(self)
133    }
134
135    /// Returns the bounds on the remaining length of the source.
136    ///
137    /// This method has the same contract as [`Iterator::size_hint`].
138    fn size_hint_triples(&self) -> (usize, Option<usize>) {
139        (0, None)
140    }
141
142    /// Collect these triples into a new graph.
143    #[inline]
144    fn collect_triples<G>(self) -> StreamResult<G, Self::Error, <G as Graph>::Error>
145    where
146        Self: Sized,
147        for<'x> Self::Triple<'x>: Triple,
148        G: CollectibleGraph,
149    {
150        G::from_triple_source(self)
151    }
152
153    /// Insert all triples from this source into the given [MutableGraph].
154    ///
155    /// Stop on the first error (in the source or in the graph).
156    #[inline]
157    fn add_to_graph<G: MutableGraph>(
158        self,
159        graph: &mut G,
160    ) -> StreamResult<usize, Self::Error, <G as MutableGraph>::MutationError>
161    where
162        Self: Sized,
163        for<'x> Self::Triple<'x>: Triple,
164    {
165        graph.insert_all(self)
166    }
167}
168
169impl<'a, I, T, E> TripleSource for I
170where
171    I: Iterator<Item = Result<T, E>> + 'a,
172    T: Triple,
173    E: Error + 'static,
174{
175    type Triple<'x> = T;
176    type Error = E;
177
178    fn try_for_some_triple<E2, F>(&mut self, mut f: F) -> StreamResult<bool, Self::Error, E2>
179    where
180        E2: Error,
181        F: FnMut(Self::Triple<'_>) -> Result<(), E2>,
182    {
183        match self.next() {
184            Some(Err(e)) => Err(SourceError(e)),
185            Some(Ok(t)) => {
186                f(t).map_err(SinkError)?;
187                Ok(true)
188            }
189            None => Ok(false),
190        }
191    }
192
193    fn size_hint_triples(&self) -> (usize, Option<usize>) {
194        self.size_hint()
195    }
196}
197
198/// An extension trait for iterators,
199/// converting them to an [`Infallible`] [`TripleSource`].
200pub trait IntoTripleSource: Iterator + Sized {
201    /// Convert this iterator into an [`Infallible`] [`TripleSource`].
202    #[allow(clippy::type_complexity)]
203    fn into_triple_source(
204        self,
205    ) -> std::iter::Map<
206        Self,
207        fn(<Self as Iterator>::Item) -> Result<<Self as Iterator>::Item, Infallible>,
208    > {
209        self.map(Ok::<_, Infallible>)
210    }
211}
212
213impl<I> IntoTripleSource for I
214where
215    I: Iterator,
216    I::Item: Triple,
217{
218}
219
220#[cfg(test)]
221mod check_triple_source {
222    use super::*;
223    use crate::term::{SimpleTerm, Term};
224    use sophia_iri::IriRef;
225    use std::convert::Infallible;
226    use std::fmt::Write;
227
228    #[allow(dead_code)] // only checks that this compiles
229    pub fn check_for_each<TS>(ts: TS)
230    where
231        TS: TripleSource,
232    {
233        ts.filter_triples(|t| t.s().is_iri())
234            .for_each_triple(|t| println!("{:?}", t.s()))
235            .unwrap();
236    }
237
238    #[allow(dead_code)] // only checks that this compiles
239    fn check_triple_source_impl_generic_graph<G: Graph>(g: &G) {
240        g.triples()
241            .filter_triples(|t| t.s().is_iri())
242            .for_each_triple(|t| println!("{:?}", t.s()))
243            .unwrap();
244    }
245
246    #[allow(dead_code)] // only checks that this compiles
247    fn check_triple_source_impl_concrete_graph(g: &[[SimpleTerm; 3]]) {
248        g.triples()
249            .filter_triples(|t| t.s().is_iri())
250            .for_each_triple(|t| println!("{:?}", t.s()))
251            .unwrap();
252    }
253
254    // checking that TripleSource can be implemented
255    struct DummyParser<'a> {
256        tokens: &'a [usize],
257        pos: usize,
258        buffers: [String; 3],
259    }
260
261    impl<'a> TripleSource for DummyParser<'a> {
262        type Triple<'x> = [SimpleTerm<'x>; 3];
263        type Error = Infallible;
264
265        fn try_for_some_triple<E2, F>(&mut self, mut f: F) -> StreamResult<bool, Self::Error, E2>
266        where
267            E2: Error,
268            F: FnMut(Self::Triple<'_>) -> Result<(), E2>,
269        {
270            if self.tokens.len() - self.pos < 3 {
271                Ok(false)
272            } else {
273                for i in 0..3 {
274                    write!(&mut self.buffers[i], "b{}", self.tokens[self.pos + i]).unwrap();
275                }
276                let t = [
277                    IriRef::new_unchecked(&self.buffers[0][..]).into_term(),
278                    IriRef::new_unchecked(&self.buffers[1][..]).into_term(),
279                    IriRef::new_unchecked(&self.buffers[2][..]).into_term(),
280                ];
281                f(t).map_err(SinkError).map(|_| true)
282            }
283        }
284    }
285
286    #[allow(dead_code)] // only checks that this compiles
287    fn check_triple_source_impl_by_iterator(v: Vec<Result<[SimpleTerm; 3], std::io::Error>>) {
288        v.into_iter()
289            .for_each_triple(|t| println!("{:?}", t.s()))
290            .unwrap();
291    }
292}