sparesults/
csv.rs

1//! Implementation of [SPARQL 1.1 Query Results CSV and TSV Formats](https://www.w3.org/TR/sparql11-results-csv-tsv/)
2
3use crate::error::{QueryResultsParseError, QueryResultsSyntaxError, TextPosition};
4use memchr::memchr;
5use oxrdf::vocab::xsd;
6use oxrdf::*;
7use std::io::{self, Read, Write};
8use std::str::{self, FromStr};
9#[cfg(feature = "async-tokio")]
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
/// Largest size in bytes (16 MiB) the line buffer of the readers is allowed to grow to.
const MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;
13
/// Writes a boolean query result as the bare text `true` or `false`.
///
/// No line terminator is emitted. The writer is handed back on success so the caller
/// can keep using it.
///
/// # Errors
///
/// Returns the I/O error raised by the underlying writer, if any.
pub fn write_boolean_csv_result<W: Write>(mut writer: W, value: bool) -> io::Result<W> {
    let text: &[u8] = match value {
        true => b"true",
        false => b"false",
    };
    writer.write_all(text)?;
    Ok(writer)
}
18
/// Asynchronous counterpart of the boolean result writer: writes the bare text `true`
/// or `false` (without any line terminator) and hands the writer back on success.
///
/// # Errors
///
/// Returns the I/O error raised by the underlying writer, if any.
#[cfg(feature = "async-tokio")]
pub async fn tokio_async_write_boolean_csv_result<W: AsyncWrite + Unpin>(
    mut writer: W,
    value: bool,
) -> io::Result<W> {
    let text: &[u8] = match value {
        true => b"true",
        false => b"false",
    };
    writer.write_all(text).await?;
    Ok(writer)
}
29
30pub struct WriterCsvSolutionsSerializer<W: Write> {
31    inner: InnerCsvSolutionsSerializer,
32    writer: W,
33    buffer: String,
34}
35
36impl<W: Write> WriterCsvSolutionsSerializer<W> {
37    pub fn start(mut writer: W, variables: Vec<Variable>) -> io::Result<Self> {
38        let mut buffer = String::new();
39        let inner = InnerCsvSolutionsSerializer::start(&mut buffer, variables);
40        writer.write_all(buffer.as_bytes())?;
41        buffer.clear();
42        Ok(Self {
43            inner,
44            writer,
45            buffer,
46        })
47    }
48
49    pub fn serialize<'a>(
50        &mut self,
51        solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
52    ) -> io::Result<()> {
53        self.inner.write(&mut self.buffer, solution);
54        self.writer.write_all(self.buffer.as_bytes())?;
55        self.buffer.clear();
56        Ok(())
57    }
58
59    pub fn finish(self) -> W {
60        self.writer
61    }
62}
63
/// Streaming CSV serializer of query solutions on top of a Tokio [`AsyncWrite`].
#[cfg(feature = "async-tokio")]
pub struct TokioAsyncWriterCsvSolutionsSerializer<W: AsyncWrite + Unpin> {
    /// Formatting logic, independent of the output sink.
    inner: InnerCsvSolutionsSerializer,
    /// Sink receiving the serialized bytes.
    writer: W,
    /// Scratch space reused between rows to avoid reallocating.
    buffer: String,
}

#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> TokioAsyncWriterCsvSolutionsSerializer<W> {
    /// Writes the header row listing `variables` and returns a serializer ready to
    /// accept solutions.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while writing the header.
    pub async fn start(mut writer: W, variables: Vec<Variable>) -> io::Result<Self> {
        let mut scratch = String::new();
        let inner = InnerCsvSolutionsSerializer::start(&mut scratch, variables);
        writer.write_all(scratch.as_bytes()).await?;
        // Keep the allocation around for the rows to come.
        scratch.clear();
        Ok(Self {
            inner,
            writer,
            buffer: scratch,
        })
    }

    /// Serializes one solution as a CSV row and writes it to the underlying writer.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while writing the row.
    pub async fn serialize<'a>(
        &mut self,
        solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
    ) -> io::Result<()> {
        let Self {
            inner,
            writer,
            buffer,
        } = self;
        inner.write(buffer, solution);
        writer.write_all(buffer.as_bytes()).await?;
        buffer.clear();
        Ok(())
    }

    /// Ends the serialization and gives back the underlying writer.
    pub fn finish(self) -> W {
        let Self { writer, .. } = self;
        writer
    }
}
99
100struct InnerCsvSolutionsSerializer {
101    variables: Vec<Variable>,
102}
103
104impl InnerCsvSolutionsSerializer {
105    fn start(output: &mut String, variables: Vec<Variable>) -> Self {
106        let mut start_vars = true;
107        for variable in &variables {
108            if start_vars {
109                start_vars = false;
110            } else {
111                output.push(',');
112            }
113            output.push_str(variable.as_str());
114        }
115        output.push_str("\r\n");
116        Self { variables }
117    }
118
119    fn write<'a>(
120        &self,
121        output: &mut String,
122        solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
123    ) {
124        let mut values = vec![None; self.variables.len()];
125        for (variable, value) in solution {
126            if let Some(position) = self.variables.iter().position(|v| *v == variable) {
127                values[position] = Some(value);
128            }
129        }
130        let mut start_binding = true;
131        for value in values {
132            if start_binding {
133                start_binding = false;
134            } else {
135                output.push(',');
136            }
137            if let Some(value) = value {
138                write_csv_term(output, value);
139            }
140        }
141        output.push_str("\r\n");
142    }
143}
144
145fn write_csv_term<'a>(output: &mut String, term: impl Into<TermRef<'a>>) {
146    match term.into() {
147        TermRef::NamedNode(uri) => output.push_str(uri.as_str()),
148        TermRef::BlankNode(bnode) => {
149            output.push_str("_:");
150            output.push_str(bnode.as_str())
151        }
152        TermRef::Literal(literal) => write_escaped_csv_string(output, literal.value()),
153        #[cfg(feature = "rdf-star")]
154        TermRef::Triple(triple) => {
155            write_csv_term(output, &triple.subject);
156            output.push(' ');
157            write_csv_term(output, &triple.predicate);
158            output.push(' ');
159            write_csv_term(output, &triple.object)
160        }
161    }
162}
163
/// Appends `s` to `output` as a CSV field.
///
/// The value is copied verbatim unless it contains a double quote, a comma, a line feed
/// or a carriage return. In that case it is wrapped in double quotes and every embedded
/// double quote is doubled.
fn write_escaped_csv_string(output: &mut String, s: &str) {
    let needs_quoting = s.chars().any(|c| matches!(c, '"' | ',' | '\n' | '\r'));
    if !needs_quoting {
        output.push_str(s);
        return;
    }
    output.push('"');
    // Splitting on the quote and re-joining with a doubled quote escapes every occurrence.
    for (index, piece) in s.split('"').enumerate() {
        if index > 0 {
            output.push_str("\"\"");
        }
        output.push_str(piece);
    }
    output.push('"');
}
180
181pub struct WriterTsvSolutionsSerializer<W: Write> {
182    inner: InnerTsvSolutionsSerializer,
183    writer: W,
184    buffer: String,
185}
186
187impl<W: Write> WriterTsvSolutionsSerializer<W> {
188    pub fn start(mut writer: W, variables: Vec<Variable>) -> io::Result<Self> {
189        let mut buffer = String::new();
190        let inner = InnerTsvSolutionsSerializer::start(&mut buffer, variables);
191        writer.write_all(buffer.as_bytes())?;
192        buffer.clear();
193        Ok(Self {
194            inner,
195            writer,
196            buffer,
197        })
198    }
199
200    pub fn serialize<'a>(
201        &mut self,
202        solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
203    ) -> io::Result<()> {
204        self.inner.write(&mut self.buffer, solution);
205        self.writer.write_all(self.buffer.as_bytes())?;
206        self.buffer.clear();
207        Ok(())
208    }
209
210    pub fn finish(self) -> W {
211        self.writer
212    }
213}
214
/// Streaming TSV serializer of query solutions on top of a Tokio [`AsyncWrite`].
#[cfg(feature = "async-tokio")]
pub struct TokioAsyncWriterTsvSolutionsSerializer<W: AsyncWrite + Unpin> {
    /// Formatting logic, independent of the output sink.
    inner: InnerTsvSolutionsSerializer,
    /// Sink receiving the serialized bytes.
    writer: W,
    /// Scratch space reused between rows to avoid reallocating.
    buffer: String,
}

#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> TokioAsyncWriterTsvSolutionsSerializer<W> {
    /// Writes the header row listing `variables` and returns a serializer ready to
    /// accept solutions.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while writing the header.
    pub async fn start(mut writer: W, variables: Vec<Variable>) -> io::Result<Self> {
        let mut scratch = String::new();
        let inner = InnerTsvSolutionsSerializer::start(&mut scratch, variables);
        writer.write_all(scratch.as_bytes()).await?;
        // Keep the allocation around for the rows to come.
        scratch.clear();
        Ok(Self {
            inner,
            writer,
            buffer: scratch,
        })
    }

    /// Serializes one solution as a TSV row and writes it to the underlying writer.
    ///
    /// # Errors
    ///
    /// Returns the I/O error raised while writing the row.
    pub async fn serialize<'a>(
        &mut self,
        solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
    ) -> io::Result<()> {
        let Self {
            inner,
            writer,
            buffer,
        } = self;
        inner.write(buffer, solution);
        writer.write_all(buffer.as_bytes()).await?;
        buffer.clear();
        Ok(())
    }

    /// Ends the serialization and gives back the underlying writer.
    pub fn finish(self) -> W {
        let Self { writer, .. } = self;
        writer
    }
}
250
251struct InnerTsvSolutionsSerializer {
252    variables: Vec<Variable>,
253}
254
255impl InnerTsvSolutionsSerializer {
256    fn start(output: &mut String, variables: Vec<Variable>) -> Self {
257        let mut start_vars = true;
258        for variable in &variables {
259            if start_vars {
260                start_vars = false;
261            } else {
262                output.push('\t');
263            }
264            output.push('?');
265            output.push_str(variable.as_str());
266        }
267        output.push('\n');
268        Self { variables }
269    }
270
271    fn write<'a>(
272        &self,
273        output: &mut String,
274        solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
275    ) {
276        let mut values = vec![None; self.variables.len()];
277        for (variable, value) in solution {
278            if let Some(position) = self.variables.iter().position(|v| *v == variable) {
279                values[position] = Some(value);
280            }
281        }
282        let mut start_binding = true;
283        for value in values {
284            if start_binding {
285                start_binding = false;
286            } else {
287                output.push('\t');
288            }
289            if let Some(value) = value {
290                write_tsv_term(output, value);
291            }
292        }
293        output.push('\n');
294    }
295}
296
297fn write_tsv_term<'a>(output: &mut String, term: impl Into<TermRef<'a>>) {
298    match term.into() {
299        TermRef::NamedNode(node) => {
300            output.push('<');
301            output.push_str(node.as_str());
302            output.push('>');
303        }
304        TermRef::BlankNode(node) => {
305            output.push_str("_:");
306            output.push_str(node.as_str());
307        }
308        TermRef::Literal(literal) => {
309            let value = literal.value();
310            if let Some(language) = literal.language() {
311                write_tsv_quoted_str(output, value);
312                output.push('@');
313                output.push_str(language);
314            } else {
315                match literal.datatype() {
316                    xsd::BOOLEAN if is_turtle_boolean(value) => output.push_str(value),
317                    xsd::INTEGER if is_turtle_integer(value) => output.push_str(value),
318                    xsd::DECIMAL if is_turtle_decimal(value) => output.push_str(value),
319                    xsd::DOUBLE if is_turtle_double(value) => output.push_str(value),
320                    xsd::STRING => write_tsv_quoted_str(output, value),
321                    datatype => {
322                        write_tsv_quoted_str(output, value);
323                        output.push_str("^^");
324                        write_tsv_term(output, datatype);
325                    }
326                }
327            }
328        }
329        #[cfg(feature = "rdf-star")]
330        TermRef::Triple(triple) => {
331            output.push_str("<< ");
332            write_tsv_term(output, &triple.subject);
333            output.push(' ');
334            write_tsv_term(output, &triple.predicate);
335            output.push(' ');
336            write_tsv_term(output, &triple.object);
337            output.push_str(" >>");
338        }
339    }
340}
341
/// Appends `string` to `output` as a double-quoted string.
///
/// Tab, line feed, carriage return, double quote and backslash are written as the
/// two-character escapes `\t`, `\n`, `\r`, `\"` and `\\`; every other character is
/// copied unchanged.
fn write_tsv_quoted_str(output: &mut String, string: &str) {
    output.push('"');
    for c in string.chars() {
        let escape = match c {
            '\t' => "\\t",
            '\n' => "\\n",
            '\r' => "\\r",
            '"' => "\\\"",
            '\\' => "\\\\",
            _ => {
                output.push(c);
                continue;
            }
        };
        output.push_str(escape);
    }
    output.push('"');
}
356
/// Returns `true` if `value` is exactly `true` or `false` (case-sensitive).
fn is_turtle_boolean(value: &str) -> bool {
    value == "true" || value == "false"
}
360
/// Returns `true` if `value` matches the Turtle `INTEGER` production.
fn is_turtle_integer(value: &str) -> bool {
    // [19]  INTEGER  ::=  [+-]? [0-9]+
    let digits = value
        .strip_prefix('+')
        .or_else(|| value.strip_prefix('-'))
        .unwrap_or(value);
    !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit())
}
371
/// Returns `true` if `value` matches the Turtle `DECIMAL` production.
fn is_turtle_decimal(value: &str) -> bool {
    // [20]  DECIMAL  ::=  [+-]? [0-9]* '.' [0-9]+
    let all_digits = |s: &str| s.bytes().all(|b| b.is_ascii_digit());
    let unsigned = value
        .strip_prefix('+')
        .or_else(|| value.strip_prefix('-'))
        .unwrap_or(value);
    // The dot is mandatory: split on the first one and validate both sides.
    match unsigned.split_once('.') {
        Some((integral, fraction)) => {
            all_digits(integral) && !fraction.is_empty() && all_digits(fraction)
        }
        None => false,
    }
}
388
/// Returns `true` if `value` matches the Turtle `DOUBLE` production.
fn is_turtle_double(value: &str) -> bool {
    // [21]    DOUBLE    ::=  [+-]? ([0-9]+ '.' [0-9]* EXPONENT | '.' [0-9]+ EXPONENT | [0-9]+ EXPONENT)
    // [154s]  EXPONENT  ::=  [eE] [+-]? [0-9]+
    let all_digits = |s: &str| s.bytes().all(|b| b.is_ascii_digit());
    let without_sign = |s: &'_ str| -> usize {
        // Number of leading bytes taken by an optional sign.
        usize::from(s.starts_with('+') || s.starts_with('-'))
    };

    let unsigned = &value[without_sign(value)..];
    // The exponent marker is mandatory: everything before the first one is the mantissa.
    let Some((mantissa, exponent)) = unsigned.split_once(|c: char| c == 'e' || c == 'E') else {
        return false;
    };

    // Mantissa: digits, optionally followed by a dot and more digits, with at least one
    // digit overall.
    let (integral, fraction) = mantissa.split_once('.').unwrap_or((mantissa, ""));
    if !all_digits(integral) || !all_digits(fraction) {
        return false;
    }
    if integral.is_empty() && fraction.is_empty() {
        return false;
    }

    // Exponent: optional sign followed by at least one digit.
    let exponent_digits = &exponent[without_sign(exponent)..];
    !exponent_digits.is_empty() && all_digits(exponent_digits)
}
425
426pub enum ReaderTsvQueryResultsParserOutput<R: Read> {
427    Solutions {
428        variables: Vec<Variable>,
429        solutions: ReaderTsvSolutionsParser<R>,
430    },
431    Boolean(bool),
432}
433
434impl<R: Read> ReaderTsvQueryResultsParserOutput<R> {
435    pub fn read(mut reader: R) -> Result<Self, QueryResultsParseError> {
436        let mut line_reader = LineReader::new();
437        let mut buffer = Vec::new();
438        let line = line_reader.next_line_from_reader(&mut buffer, &mut reader)?;
439        Ok(match inner_read_first_line(line_reader, line)? {
440            TsvInnerQueryResults::Solutions {
441                variables,
442                solutions,
443            } => Self::Solutions {
444                variables,
445                solutions: ReaderTsvSolutionsParser {
446                    reader,
447                    inner: solutions,
448                    buffer,
449                },
450            },
451            TsvInnerQueryResults::Boolean(value) => Self::Boolean(value),
452        })
453    }
454}
455
456pub struct ReaderTsvSolutionsParser<R: Read> {
457    reader: R,
458    inner: TsvInnerSolutionsParser,
459    buffer: Vec<u8>,
460}
461
462impl<R: Read> ReaderTsvSolutionsParser<R> {
463    pub fn parse_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
464        let line = self
465            .inner
466            .line_reader
467            .next_line_from_reader(&mut self.buffer, &mut self.reader)?;
468        Ok(self.inner.parse_next(line)?)
469    }
470}
471
/// Outcome of reading the first line of TSV query results from a Tokio [`AsyncRead`].
#[cfg(feature = "async-tokio")]
pub enum TokioAsyncReaderTsvQueryResultsParserOutput<R: AsyncRead + Unpin> {
    /// A table of solutions.
    Solutions {
        /// Variables declared by the header row, in column order.
        variables: Vec<Variable>,
        /// Parser giving access to the rows following the header.
        solutions: TokioAsyncReaderTsvSolutionsParser<R>,
    },
    /// A boolean result.
    Boolean(bool),
}

#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> TokioAsyncReaderTsvQueryResultsParserOutput<R> {
    /// Reads the first line from `reader` and interprets it either as a boolean result
    /// or as the header of a solutions table.
    ///
    /// # Errors
    ///
    /// Fails if the line cannot be read or if it is not a valid first line.
    pub async fn read(mut reader: R) -> Result<Self, QueryResultsParseError> {
        let mut buffer = Vec::new();
        let mut line_reader = LineReader::new();
        let first_line = line_reader
            .next_line_from_tokio_async_read(&mut buffer, &mut reader)
            .await?;
        let parsed = inner_read_first_line(line_reader, first_line)?;
        Ok(match parsed {
            TsvInnerQueryResults::Boolean(value) => Self::Boolean(value),
            TsvInnerQueryResults::Solutions {
                variables,
                solutions: inner,
            } => Self::Solutions {
                variables,
                // The buffer may already hold bytes of the following rows: hand it over.
                solutions: TokioAsyncReaderTsvSolutionsParser {
                    reader,
                    inner,
                    buffer,
                },
            },
        })
    }
}
505
/// Row-by-row parser of TSV solutions read from a Tokio [`AsyncRead`].
#[cfg(feature = "async-tokio")]
pub struct TokioAsyncReaderTsvSolutionsParser<R: AsyncRead + Unpin> {
    /// Source of the bytes.
    reader: R,
    /// Line splitting state and row parsing logic.
    inner: TsvInnerSolutionsParser,
    /// Bytes read from `reader` and not yet fully consumed.
    buffer: Vec<u8>,
}

#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> TokioAsyncReaderTsvSolutionsParser<R> {
    /// Reads and parses the next row.
    ///
    /// Returns `Ok(None)` once the end of the input is reached.
    ///
    /// # Errors
    ///
    /// Fails if the line cannot be read or if the row is malformed.
    pub async fn parse_next(
        &mut self,
    ) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
        let Self {
            reader,
            inner,
            buffer,
        } = self;
        let line = inner
            .line_reader
            .next_line_from_tokio_async_read(buffer, reader)
            .await?;
        inner
            .parse_next(line)
            .map_err(QueryResultsParseError::from)
    }
}
526
527pub enum SliceTsvQueryResultsParserOutput<'a> {
528    Solutions {
529        variables: Vec<Variable>,
530        solutions: SliceTsvSolutionsParser<'a>,
531    },
532    Boolean(bool),
533}
534
535impl<'a> SliceTsvQueryResultsParserOutput<'a> {
536    pub fn read(slice: &'a [u8]) -> Result<Self, QueryResultsSyntaxError> {
537        let mut reader = LineReader::new();
538        let line = reader.next_line_from_slice(slice)?;
539        Ok(match inner_read_first_line(reader, line)? {
540            TsvInnerQueryResults::Solutions {
541                variables,
542                solutions,
543            } => Self::Solutions {
544                variables,
545                solutions: SliceTsvSolutionsParser {
546                    slice,
547                    inner: solutions,
548                },
549            },
550            TsvInnerQueryResults::Boolean(value) => Self::Boolean(value),
551        })
552    }
553}
554
555pub struct SliceTsvSolutionsParser<'a> {
556    slice: &'a [u8],
557    inner: TsvInnerSolutionsParser,
558}
559
560impl SliceTsvSolutionsParser<'_> {
561    pub fn parse_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsSyntaxError> {
562        let line = self.inner.line_reader.next_line_from_slice(self.slice)?;
563        self.inner.parse_next(line)
564    }
565}
566
567enum TsvInnerQueryResults {
568    Solutions {
569        variables: Vec<Variable>,
570        solutions: TsvInnerSolutionsParser,
571    },
572    Boolean(bool),
573}
574
575fn inner_read_first_line(
576    reader: LineReader,
577    line: &str,
578) -> Result<TsvInnerQueryResults, QueryResultsSyntaxError> {
579    let line = line.trim_matches(|c| matches!(c, ' ' | '\r' | '\n'));
580    if line.eq_ignore_ascii_case("true") {
581        return Ok(TsvInnerQueryResults::Boolean(true));
582    }
583    if line.eq_ignore_ascii_case("false") {
584        return Ok(TsvInnerQueryResults::Boolean(false));
585    }
586    let mut variables = Vec::new();
587    if !line.is_empty() {
588        for v in line.split('\t') {
589            let v = v.trim();
590            if v.is_empty() {
591                return Err(QueryResultsSyntaxError::msg("Empty column on the first row. The first row should be a list of variables like ?foo or $bar"));
592            }
593            let variable = Variable::from_str(v).map_err(|e| {
594                QueryResultsSyntaxError::msg(format!("Invalid variable declaration '{v}': {e}"))
595            })?;
596            if variables.contains(&variable) {
597                return Err(QueryResultsSyntaxError::msg(format!(
598                    "The variable {variable} is declared twice"
599                )));
600            }
601            variables.push(variable);
602        }
603    }
604    let column_len = variables.len();
605    Ok(TsvInnerQueryResults::Solutions {
606        variables,
607        solutions: TsvInnerSolutionsParser {
608            line_reader: reader,
609            column_len,
610        },
611    })
612}
613
614struct TsvInnerSolutionsParser {
615    line_reader: LineReader,
616    column_len: usize,
617}
618
619impl TsvInnerSolutionsParser {
620    #[allow(clippy::unwrap_in_result)]
621    pub fn parse_next(
622        &self,
623        line: &str,
624    ) -> Result<Option<Vec<Option<Term>>>, QueryResultsSyntaxError> {
625        if line.is_empty() {
626            return Ok(None); // EOF
627        }
628        let elements = line
629            .split('\t')
630            .enumerate()
631            .map(|(i, v)| {
632                let v = v.trim();
633                if v.is_empty() {
634                    Ok(None)
635                } else {
636                    Ok(Some(Term::from_str(v).map_err(|e| {
637                        let start_position_char = line
638                            .split('\t')
639                            .take(i)
640                            .map(|c| c.chars().count() + 1)
641                            .sum::<usize>();
642                        let start_position_bytes =
643                            line.split('\t').take(i).map(|c| c.len() + 1).sum::<usize>();
644                        QueryResultsSyntaxError::term(
645                            e,
646                            v.into(),
647                            TextPosition {
648                                line: self.line_reader.line_count - 1,
649                                column: start_position_char.try_into().unwrap(),
650                                offset: self.line_reader.last_line_start
651                                    + u64::try_from(start_position_bytes).unwrap(),
652                            }..TextPosition {
653                                line: self.line_reader.line_count - 1,
654                                column: (start_position_char + v.chars().count())
655                                    .try_into()
656                                    .unwrap(),
657                                offset: self.line_reader.last_line_start
658                                    + u64::try_from(start_position_bytes + v.len()).unwrap(),
659                            },
660                        )
661                    })?))
662                }
663            })
664            .collect::<Result<Vec<_>, QueryResultsSyntaxError>>()?;
665        if elements.len() == self.column_len {
666            Ok(Some(elements))
667        } else if self.column_len == 0 && elements == [None] {
668            Ok(Some(Vec::new())) // Zero columns case
669        } else {
670            Err(QueryResultsSyntaxError::located_message(
671                format!(
672                    "This TSV files has {} columns but we found a row on line {} with {} columns: {}",
673                    self.column_len,
674                    self.line_reader.line_count - 1,
675                    elements.len(),
676                    line
677                ),
678                TextPosition {
679                    line: self.line_reader.line_count - 1,
680                    column: 0,
681                    offset: self.line_reader.last_line_start,
682                }..TextPosition {
683                    line: self.line_reader.line_count - 1,
684                    column: line.chars().count().try_into().unwrap(),
685                    offset: self.line_reader.last_line_end,
686                },
687            ))
688        }
689    }
690}
691
692struct LineReader {
693    buffer_start: usize,
694    buffer_end: usize,
695    line_count: u64,
696    last_line_start: u64,
697    last_line_end: u64,
698}
699
700impl LineReader {
701    fn new() -> Self {
702        Self {
703            buffer_start: 0,
704            buffer_end: 0,
705            line_count: 0,
706            last_line_start: 0,
707            last_line_end: 0,
708        }
709    }
710
711    #[allow(clippy::unwrap_in_result)]
712    fn next_line_from_reader<'a>(
713        &mut self,
714        buffer: &'a mut Vec<u8>,
715        reader: &mut impl Read,
716    ) -> Result<&'a str, QueryResultsParseError> {
717        let line_end = loop {
718            if let Some(eol) = memchr(b'\n', &buffer[self.buffer_start..self.buffer_end]) {
719                break self.buffer_start + eol + 1;
720            }
721            if self.buffer_start > 0 {
722                buffer.copy_within(self.buffer_start..self.buffer_end, 0);
723                self.buffer_end -= self.buffer_start;
724                self.buffer_start = 0;
725            }
726            if self.buffer_end + 1024 > buffer.len() {
727                if self.buffer_end + 1024 > MAX_BUFFER_SIZE {
728                    return Err(io::Error::new(
729                        io::ErrorKind::OutOfMemory,
730                        format!("Reached the buffer maximal size of {MAX_BUFFER_SIZE}"),
731                    )
732                    .into());
733                }
734                buffer.resize(self.buffer_end + 1024, b'\0');
735            }
736            let read = reader.read(&mut buffer[self.buffer_end..])?;
737            if read == 0 {
738                break self.buffer_end;
739            }
740            self.buffer_end += read;
741        };
742        let result = str::from_utf8(&buffer[self.buffer_start..line_end]).map_err(|e| {
743            QueryResultsSyntaxError::msg(format!("Invalid UTF-8 in the TSV file: {e}")).into()
744        });
745        self.line_count += 1;
746        self.last_line_start = self.last_line_end;
747        self.last_line_end += u64::try_from(line_end - self.buffer_start).unwrap();
748        self.buffer_start = line_end;
749        result
750    }
751
752    #[cfg(feature = "async-tokio")]
753    #[allow(clippy::unwrap_in_result)]
754    async fn next_line_from_tokio_async_read<'a>(
755        &mut self,
756        buffer: &'a mut Vec<u8>,
757        reader: &mut (impl AsyncRead + Unpin),
758    ) -> Result<&'a str, QueryResultsParseError> {
759        let line_end = loop {
760            if let Some(eol) = memchr(b'\n', &buffer[self.buffer_start..self.buffer_end]) {
761                break self.buffer_start + eol + 1;
762            }
763            if self.buffer_start > 0 {
764                buffer.copy_within(self.buffer_start..self.buffer_end, 0);
765                self.buffer_end -= self.buffer_start;
766                self.buffer_start = 0;
767            }
768            if self.buffer_end + 1024 > buffer.len() {
769                if self.buffer_end + 1024 > MAX_BUFFER_SIZE {
770                    return Err(io::Error::new(
771                        io::ErrorKind::OutOfMemory,
772                        format!("Reached the buffer maximal size of {MAX_BUFFER_SIZE}"),
773                    )
774                    .into());
775                }
776                buffer.resize(self.buffer_end + 1024, b'\0');
777            }
778            let read = reader.read(&mut buffer[self.buffer_end..]).await?;
779            if read == 0 {
780                break self.buffer_end;
781            }
782            self.buffer_end += read;
783        };
784        let result = str::from_utf8(&buffer[self.buffer_start..line_end]).map_err(|e| {
785            QueryResultsSyntaxError::msg(format!("Invalid UTF-8 in the TSV file: {e}")).into()
786        });
787        self.line_count += 1;
788        self.last_line_start = self.last_line_end;
789        self.last_line_end += u64::try_from(line_end - self.buffer_start).unwrap();
790        self.buffer_start = line_end;
791        result
792    }
793
794    #[allow(clippy::unwrap_in_result)]
795    fn next_line_from_slice<'a>(
796        &mut self,
797        slice: &'a [u8],
798    ) -> Result<&'a str, QueryResultsSyntaxError> {
799        let line_end = memchr(b'\n', &slice[self.buffer_start..])
800            .map_or_else(|| slice.len(), |eol| self.buffer_start + eol + 1);
801        let result = str::from_utf8(&slice[self.buffer_start..line_end]).map_err(|e| {
802            QueryResultsSyntaxError::msg(format!("Invalid UTF-8 in the TSV file: {e}"))
803        });
804        self.line_count += 1;
805        self.last_line_start = self.last_line_end;
806        self.last_line_end += u64::try_from(line_end - self.buffer_start).unwrap();
807        self.buffer_start = line_end;
808        result
809    }
810}
811
812#[cfg(test)]
813#[allow(clippy::panic_in_result_fn)]
814mod tests {
815    use super::*;
816    use std::error::Error;
817
818    fn build_example() -> (Vec<Variable>, Vec<Vec<Option<Term>>>) {
819        (
820            vec![
821                Variable::new_unchecked("x"),
822                Variable::new_unchecked("literal"),
823            ],
824            vec![
825                vec![
826                    Some(NamedNode::new_unchecked("http://example/x").into()),
827                    Some(Literal::new_simple_literal("String").into()),
828                ],
829                vec![
830                    Some(NamedNode::new_unchecked("http://example/x").into()),
831                    Some(Literal::new_simple_literal("String-with-dquote\"").into()),
832                ],
833                vec![
834                    Some(BlankNode::new_unchecked("b0").into()),
835                    Some(Literal::new_simple_literal("Blank node").into()),
836                ],
837                vec![
838                    None,
839                    Some(Literal::new_simple_literal("Missing 'x'").into()),
840                ],
841                vec![None, None],
842                vec![
843                    Some(NamedNode::new_unchecked("http://example/x").into()),
844                    None,
845                ],
846                vec![
847                    Some(BlankNode::new_unchecked("b1").into()),
848                    Some(
849                        Literal::new_language_tagged_literal_unchecked("String-with-lang", "en")
850                            .into(),
851                    ),
852                ],
853                vec![
854                    Some(BlankNode::new_unchecked("b1").into()),
855                    Some(Literal::new_typed_literal("123", xsd::INTEGER).into()),
856                ],
857                vec![
858                    None,
859                    Some(Literal::new_simple_literal("escape,\t\r\n").into()),
860                ],
861            ],
862        )
863    }
864
865    #[test]
866    fn test_csv_serialization() {
867        let (variables, solutions) = build_example();
868        let mut buffer = String::new();
869        let serializer = InnerCsvSolutionsSerializer::start(&mut buffer, variables.clone());
870        for solution in solutions {
871            serializer.write(
872                &mut buffer,
873                variables
874                    .iter()
875                    .zip(&solution)
876                    .filter_map(|(v, s)| s.as_ref().map(|s| (v.as_ref(), s.as_ref()))),
877            );
878        }
879        assert_eq!(buffer, "x,literal\r\nhttp://example/x,String\r\nhttp://example/x,\"String-with-dquote\"\"\"\r\n_:b0,Blank node\r\n,Missing 'x'\r\n,\r\nhttp://example/x,\r\n_:b1,String-with-lang\r\n_:b1,123\r\n,\"escape,\t\r\n\"\r\n");
880    }
881
882    #[test]
883    fn test_tsv_roundtrip() -> Result<(), Box<dyn Error>> {
884        let (variables, solutions) = build_example();
885
886        // Write
887        let mut buffer = String::new();
888        let serializer = InnerTsvSolutionsSerializer::start(&mut buffer, variables.clone());
889        for solution in &solutions {
890            serializer.write(
891                &mut buffer,
892                variables
893                    .iter()
894                    .zip(solution)
895                    .filter_map(|(v, s)| s.as_ref().map(|s| (v.as_ref(), s.as_ref()))),
896            );
897        }
898        assert_eq!(buffer, "?x\t?literal\n<http://example/x>\t\"String\"\n<http://example/x>\t\"String-with-dquote\\\"\"\n_:b0\t\"Blank node\"\n\t\"Missing 'x'\"\n\t\n<http://example/x>\t\n_:b1\t\"String-with-lang\"@en\n_:b1\t123\n\t\"escape,\\t\\r\\n\"\n");
899
900        // Read
901        if let SliceTsvQueryResultsParserOutput::Solutions {
902            solutions: mut solutions_iter,
903            variables: actual_variables,
904        } = SliceTsvQueryResultsParserOutput::read(buffer.as_bytes())?
905        {
906            assert_eq!(actual_variables.as_slice(), variables.as_slice());
907            let mut rows = Vec::new();
908            while let Some(row) = solutions_iter.parse_next()? {
909                rows.push(row);
910            }
911            assert_eq!(rows, solutions);
912        } else {
913            unreachable!()
914        }
915
916        Ok(())
917    }
918
919    #[test]
920    fn test_bad_tsv() {
921        let mut bad_tsvs = vec![
922            "?",
923            "?p",
924            "?p?o",
925            "?p\n<",
926            "?p\n_",
927            "?p\n_:",
928            "?p\n\"",
929            "?p\n<<",
930            "?p\n1\t2\n",
931            "?p\n\n",
932        ];
933        let a_lot_of_strings = format!("?p\n{}\n", "<".repeat(100_000));
934        bad_tsvs.push(&a_lot_of_strings);
935        for bad_tsv in bad_tsvs {
936            if let Ok(ReaderTsvQueryResultsParserOutput::Solutions { mut solutions, .. }) =
937                ReaderTsvQueryResultsParserOutput::read(bad_tsv.as_bytes())
938            {
939                while let Ok(Some(_)) = solutions.parse_next() {}
940            }
941        }
942    }
943
944    #[test]
945    fn test_no_columns_csv_serialization() {
946        let mut buffer = String::new();
947        let serializer = InnerCsvSolutionsSerializer::start(&mut buffer, Vec::new());
948        serializer.write(&mut buffer, []);
949        assert_eq!(buffer, "\r\n\r\n");
950    }
951
952    #[test]
953    fn test_no_columns_tsv_serialization() {
954        let mut buffer = String::new();
955        let serializer = InnerTsvSolutionsSerializer::start(&mut buffer, Vec::new());
956        serializer.write(&mut buffer, []);
957        assert_eq!(buffer, "\n\n");
958    }
959
960    #[test]
961    fn test_no_columns_tsv_parsing() -> io::Result<()> {
962        if let ReaderTsvQueryResultsParserOutput::Solutions {
963            mut solutions,
964            variables,
965        } = ReaderTsvQueryResultsParserOutput::read(b"\n\n".as_slice())?
966        {
967            assert_eq!(variables, Vec::<Variable>::new());
968            assert_eq!(solutions.parse_next()?, Some(Vec::new()));
969            assert_eq!(solutions.parse_next()?, None);
970        } else {
971            unreachable!()
972        }
973        Ok(())
974    }
975
976    #[test]
977    fn test_no_results_csv_serialization() {
978        let mut buffer = String::new();
979        InnerCsvSolutionsSerializer::start(&mut buffer, vec![Variable::new_unchecked("a")]);
980        assert_eq!(buffer, "a\r\n");
981    }
982
983    #[test]
984    fn test_no_results_tsv_serialization() {
985        let mut buffer = String::new();
986        InnerTsvSolutionsSerializer::start(&mut buffer, vec![Variable::new_unchecked("a")]);
987        assert_eq!(buffer, "?a\n");
988    }
989
990    #[test]
991    fn test_no_results_tsv_parsing() -> io::Result<()> {
992        if let ReaderTsvQueryResultsParserOutput::Solutions {
993            mut solutions,
994            variables,
995        } = ReaderTsvQueryResultsParserOutput::read(b"?a\n".as_slice())?
996        {
997            assert_eq!(variables, vec![Variable::new_unchecked("a")]);
998            assert_eq!(solutions.parse_next()?, None);
999        } else {
1000            unreachable!()
1001        }
1002        Ok(())
1003    }
1004}