1use crate::model::Trusted;
10use sophia_api::source::{StreamError, StreamError::*, StreamResult};
11use std::error::Error;
12
13pub struct StrictRioSource<T>(pub T);
19
20impl<T> sophia_api::source::TripleSource for StrictRioSource<T>
21where
22 T: rio_api::parser::TriplesParser,
23 T::Error: Error + 'static,
24{
25 type Triple<'x> = Trusted<rio_api::model::Triple<'x>>;
26
27 type Error = T::Error;
28
29 fn try_for_some_triple<EF, F>(&mut self, mut f: F) -> StreamResult<bool, T::Error, EF>
30 where
31 EF: Error,
32 F: FnMut(Self::Triple<'_>) -> Result<(), EF>,
33 {
34 let parser = &mut self.0;
35 if parser.is_end() {
36 return Ok(false);
37 }
38 parser
39 .parse_step(&mut |t| -> Result<(), RioStreamError<T::Error, EF>> {
40 f(Trusted(t)).map_err(RioStreamError::Sink)
41 })
44 .map_err(StreamError::from)
45 .and(Ok(true))
46 }
47}
48
49impl<T> sophia_api::source::QuadSource for StrictRioSource<T>
50where
51 T: rio_api::parser::QuadsParser,
52 T::Error: Error + 'static,
53{
54 type Quad<'x> = Trusted<rio_api::model::Quad<'x>>;
55
56 type Error = T::Error;
57
58 fn try_for_some_quad<EF, F>(&mut self, mut f: F) -> StreamResult<bool, T::Error, EF>
59 where
60 EF: Error,
61 F: FnMut(Self::Quad<'_>) -> Result<(), EF>,
62 {
63 let parser = &mut self.0;
64 if parser.is_end() {
65 return Ok(false);
66 }
67 parser
68 .parse_step(&mut |q| -> Result<(), RioStreamError<T::Error, EF>> {
69 f(Trusted(q)).map_err(RioStreamError::Sink)
70 })
73 .map_err(StreamError::from)
74 .and(Ok(true))
75 }
76}
77
78pub struct GeneralizedRioSource<T>(pub T);
81
82impl<T> sophia_api::source::QuadSource for GeneralizedRioSource<T>
83where
84 T: rio_api::parser::GeneralizedQuadsParser,
85 T::Error: Error + 'static,
86{
87 type Quad<'x> = Trusted<rio_api::model::GeneralizedQuad<'x>>;
88
89 type Error = T::Error;
90
91 fn try_for_some_quad<EF, F>(&mut self, mut f: F) -> StreamResult<bool, T::Error, EF>
92 where
93 EF: Error,
94 F: FnMut(Self::Quad<'_>) -> Result<(), EF>,
95 {
96 let parser = &mut self.0;
97 if parser.is_end() {
98 return Ok(false);
99 }
100 parser
101 .parse_step(&mut |q| -> Result<(), RioStreamError<T::Error, EF>> {
102 f(Trusted(q)).map_err(RioStreamError::Sink)
103 })
106 .map_err(StreamError::from)
107 .and(Ok(true))
108 }
109}
110
111enum RioStreamError<E1, E2> {
120 Source(E1),
122 Sink(E2),
124}
125impl<E1, E2> From<E1> for RioStreamError<E1, E2>
126where
127 E1: Error,
128 E2: Error,
129{
130 fn from(other: E1) -> Self {
131 RioStreamError::Source(other)
132 }
133}
134impl<E1, E2> From<RioStreamError<E1, E2>> for StreamError<E1, E2>
135where
136 E1: Error,
137 E2: Error,
138{
139 fn from(other: RioStreamError<E1, E2>) -> Self {
140 match other {
141 RioStreamError::Source(err) => SourceError(err),
142 RioStreamError::Sink(err) => SinkError(err),
143 }
144 }
145}