1use crate::error::{QueryResultsParseError, QueryResultsSyntaxError, TextPosition};
4use memchr::memchr;
5use oxrdf::vocab::xsd;
6use oxrdf::*;
7use std::io::{self, Read, Write};
8use std::str::{self, FromStr};
9#[cfg(feature = "async-tokio")]
10use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
11
/// Largest size, in bytes, the line buffer of the reader-based TSV parsers
/// is allowed to reach before reading fails with an error (16 MiB).
const MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;
13
/// Writes a boolean query result as the bare text `true` or `false`.
///
/// The writer is handed back to the caller once the bytes are written.
///
/// # Errors
///
/// Returns the I/O error reported by `writer`, if any.
pub fn write_boolean_csv_result<W: Write>(mut writer: W, value: bool) -> io::Result<W> {
    let payload: &[u8] = if value { b"true" } else { b"false" };
    writer.write_all(payload)?;
    Ok(writer)
}
18
/// Asynchronous counterpart of [`write_boolean_csv_result`]: writes the bare
/// text `true` or `false` and hands the writer back.
///
/// # Errors
///
/// Returns the I/O error reported by `writer`, if any.
#[cfg(feature = "async-tokio")]
pub async fn tokio_async_write_boolean_csv_result<W: AsyncWrite + Unpin>(
    mut writer: W,
    value: bool,
) -> io::Result<W> {
    let payload: &[u8] = if value { b"true" } else { b"false" };
    writer.write_all(payload).await?;
    Ok(writer)
}
29
30pub struct WriterCsvSolutionsSerializer<W: Write> {
31 inner: InnerCsvSolutionsSerializer,
32 writer: W,
33 buffer: String,
34}
35
36impl<W: Write> WriterCsvSolutionsSerializer<W> {
37 pub fn start(mut writer: W, variables: Vec<Variable>) -> io::Result<Self> {
38 let mut buffer = String::new();
39 let inner = InnerCsvSolutionsSerializer::start(&mut buffer, variables);
40 writer.write_all(buffer.as_bytes())?;
41 buffer.clear();
42 Ok(Self {
43 inner,
44 writer,
45 buffer,
46 })
47 }
48
49 pub fn serialize<'a>(
50 &mut self,
51 solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
52 ) -> io::Result<()> {
53 self.inner.write(&mut self.buffer, solution);
54 self.writer.write_all(self.buffer.as_bytes())?;
55 self.buffer.clear();
56 Ok(())
57 }
58
59 pub fn finish(self) -> W {
60 self.writer
61 }
62}
63
/// Serializes query solutions as CSV rows into a Tokio [`AsyncWrite`] sink.
///
/// Every row is first rendered into an internal `String` that is reused
/// between calls, then handed to the writer with a single `write_all`.
#[cfg(feature = "async-tokio")]
pub struct TokioAsyncWriterCsvSolutionsSerializer<W: AsyncWrite + Unpin> {
    inner: InnerCsvSolutionsSerializer,
    writer: W,
    buffer: String,
}

#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> TokioAsyncWriterCsvSolutionsSerializer<W> {
    /// Writes the header row for `variables` to `writer` and returns a
    /// serializer ready to receive solutions.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while writing the header.
    pub async fn start(mut writer: W, variables: Vec<Variable>) -> io::Result<Self> {
        let mut header = String::new();
        let inner = InnerCsvSolutionsSerializer::start(&mut header, variables);
        writer.write_all(header.as_bytes()).await?;
        // Keep the allocation: it becomes the per-row scratch buffer.
        header.clear();
        Ok(Self {
            inner,
            writer,
            buffer: header,
        })
    }

    /// Renders one solution as a CSV row and writes it out.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while writing the row.
    pub async fn serialize<'a>(
        &mut self,
        solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
    ) -> io::Result<()> {
        let Self {
            inner,
            writer,
            buffer,
        } = self;
        inner.write(buffer, solution);
        writer.write_all(buffer.as_bytes()).await?;
        buffer.clear();
        Ok(())
    }

    /// Consumes the serializer and gives the underlying writer back.
    pub fn finish(self) -> W {
        let Self { writer, .. } = self;
        writer
    }
}
99
100struct InnerCsvSolutionsSerializer {
101 variables: Vec<Variable>,
102}
103
104impl InnerCsvSolutionsSerializer {
105 fn start(output: &mut String, variables: Vec<Variable>) -> Self {
106 let mut start_vars = true;
107 for variable in &variables {
108 if start_vars {
109 start_vars = false;
110 } else {
111 output.push(',');
112 }
113 output.push_str(variable.as_str());
114 }
115 output.push_str("\r\n");
116 Self { variables }
117 }
118
119 fn write<'a>(
120 &self,
121 output: &mut String,
122 solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
123 ) {
124 let mut values = vec![None; self.variables.len()];
125 for (variable, value) in solution {
126 if let Some(position) = self.variables.iter().position(|v| *v == variable) {
127 values[position] = Some(value);
128 }
129 }
130 let mut start_binding = true;
131 for value in values {
132 if start_binding {
133 start_binding = false;
134 } else {
135 output.push(',');
136 }
137 if let Some(value) = value {
138 write_csv_term(output, value);
139 }
140 }
141 output.push_str("\r\n");
142 }
143}
144
145fn write_csv_term<'a>(output: &mut String, term: impl Into<TermRef<'a>>) {
146 match term.into() {
147 TermRef::NamedNode(uri) => output.push_str(uri.as_str()),
148 TermRef::BlankNode(bnode) => {
149 output.push_str("_:");
150 output.push_str(bnode.as_str())
151 }
152 TermRef::Literal(literal) => write_escaped_csv_string(output, literal.value()),
153 #[cfg(feature = "rdf-star")]
154 TermRef::Triple(triple) => {
155 write_csv_term(output, &triple.subject);
156 output.push(' ');
157 write_csv_term(output, &triple.predicate);
158 output.push(' ');
159 write_csv_term(output, &triple.object)
160 }
161 }
162}
163
/// Appends `s` to `output` as a CSV field.
///
/// The value is copied verbatim unless it contains a double quote, a comma,
/// a line feed or a carriage return; in that case it is wrapped in double
/// quotes and every embedded double quote is doubled.
fn write_escaped_csv_string(output: &mut String, s: &str) {
    let needs_quoting = s.contains(|c: char| matches!(c, '"' | ',' | '\n' | '\r'));
    if !needs_quoting {
        output.push_str(s);
        return;
    }
    output.push('"');
    // Splitting on `"` and re-joining with `""` doubles every quote.
    let mut segments = s.split('"');
    if let Some(first) = segments.next() {
        output.push_str(first);
    }
    for segment in segments {
        output.push_str("\"\"");
        output.push_str(segment);
    }
    output.push('"');
}
180
181pub struct WriterTsvSolutionsSerializer<W: Write> {
182 inner: InnerTsvSolutionsSerializer,
183 writer: W,
184 buffer: String,
185}
186
187impl<W: Write> WriterTsvSolutionsSerializer<W> {
188 pub fn start(mut writer: W, variables: Vec<Variable>) -> io::Result<Self> {
189 let mut buffer = String::new();
190 let inner = InnerTsvSolutionsSerializer::start(&mut buffer, variables);
191 writer.write_all(buffer.as_bytes())?;
192 buffer.clear();
193 Ok(Self {
194 inner,
195 writer,
196 buffer,
197 })
198 }
199
200 pub fn serialize<'a>(
201 &mut self,
202 solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
203 ) -> io::Result<()> {
204 self.inner.write(&mut self.buffer, solution);
205 self.writer.write_all(self.buffer.as_bytes())?;
206 self.buffer.clear();
207 Ok(())
208 }
209
210 pub fn finish(self) -> W {
211 self.writer
212 }
213}
214
/// Serializes query solutions as TSV rows into a Tokio [`AsyncWrite`] sink.
///
/// Every row is first rendered into an internal `String` that is reused
/// between calls, then handed to the writer with a single `write_all`.
#[cfg(feature = "async-tokio")]
pub struct TokioAsyncWriterTsvSolutionsSerializer<W: AsyncWrite + Unpin> {
    inner: InnerTsvSolutionsSerializer,
    writer: W,
    buffer: String,
}

#[cfg(feature = "async-tokio")]
impl<W: AsyncWrite + Unpin> TokioAsyncWriterTsvSolutionsSerializer<W> {
    /// Writes the header row for `variables` to `writer` and returns a
    /// serializer ready to receive solutions.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while writing the header.
    pub async fn start(mut writer: W, variables: Vec<Variable>) -> io::Result<Self> {
        let mut header = String::new();
        let inner = InnerTsvSolutionsSerializer::start(&mut header, variables);
        writer.write_all(header.as_bytes()).await?;
        // Keep the allocation: it becomes the per-row scratch buffer.
        header.clear();
        Ok(Self {
            inner,
            writer,
            buffer: header,
        })
    }

    /// Renders one solution as a TSV row and writes it out.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while writing the row.
    pub async fn serialize<'a>(
        &mut self,
        solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
    ) -> io::Result<()> {
        let Self {
            inner,
            writer,
            buffer,
        } = self;
        inner.write(buffer, solution);
        writer.write_all(buffer.as_bytes()).await?;
        buffer.clear();
        Ok(())
    }

    /// Consumes the serializer and gives the underlying writer back.
    pub fn finish(self) -> W {
        let Self { writer, .. } = self;
        writer
    }
}
250
251struct InnerTsvSolutionsSerializer {
252 variables: Vec<Variable>,
253}
254
255impl InnerTsvSolutionsSerializer {
256 fn start(output: &mut String, variables: Vec<Variable>) -> Self {
257 let mut start_vars = true;
258 for variable in &variables {
259 if start_vars {
260 start_vars = false;
261 } else {
262 output.push('\t');
263 }
264 output.push('?');
265 output.push_str(variable.as_str());
266 }
267 output.push('\n');
268 Self { variables }
269 }
270
271 fn write<'a>(
272 &self,
273 output: &mut String,
274 solution: impl IntoIterator<Item = (VariableRef<'a>, TermRef<'a>)>,
275 ) {
276 let mut values = vec![None; self.variables.len()];
277 for (variable, value) in solution {
278 if let Some(position) = self.variables.iter().position(|v| *v == variable) {
279 values[position] = Some(value);
280 }
281 }
282 let mut start_binding = true;
283 for value in values {
284 if start_binding {
285 start_binding = false;
286 } else {
287 output.push('\t');
288 }
289 if let Some(value) = value {
290 write_tsv_term(output, value);
291 }
292 }
293 output.push('\n');
294 }
295}
296
297fn write_tsv_term<'a>(output: &mut String, term: impl Into<TermRef<'a>>) {
298 match term.into() {
299 TermRef::NamedNode(node) => {
300 output.push('<');
301 output.push_str(node.as_str());
302 output.push('>');
303 }
304 TermRef::BlankNode(node) => {
305 output.push_str("_:");
306 output.push_str(node.as_str());
307 }
308 TermRef::Literal(literal) => {
309 let value = literal.value();
310 if let Some(language) = literal.language() {
311 write_tsv_quoted_str(output, value);
312 output.push('@');
313 output.push_str(language);
314 } else {
315 match literal.datatype() {
316 xsd::BOOLEAN if is_turtle_boolean(value) => output.push_str(value),
317 xsd::INTEGER if is_turtle_integer(value) => output.push_str(value),
318 xsd::DECIMAL if is_turtle_decimal(value) => output.push_str(value),
319 xsd::DOUBLE if is_turtle_double(value) => output.push_str(value),
320 xsd::STRING => write_tsv_quoted_str(output, value),
321 datatype => {
322 write_tsv_quoted_str(output, value);
323 output.push_str("^^");
324 write_tsv_term(output, datatype);
325 }
326 }
327 }
328 }
329 #[cfg(feature = "rdf-star")]
330 TermRef::Triple(triple) => {
331 output.push_str("<< ");
332 write_tsv_term(output, &triple.subject);
333 output.push(' ');
334 write_tsv_term(output, &triple.predicate);
335 output.push(' ');
336 write_tsv_term(output, &triple.object);
337 output.push_str(" >>");
338 }
339 }
340}
341
/// Appends `string` to `output` wrapped in double quotes.
///
/// Tab, line feed, carriage return, double quote and backslash are written
/// as the two-character escapes `\t`, `\n`, `\r`, `\"` and `\\`; every other
/// character is copied unchanged.
fn write_tsv_quoted_str(output: &mut String, string: &str) {
    output.push('"');
    for c in string.chars() {
        let escape = match c {
            '\t' => r"\t",
            '\n' => r"\n",
            '\r' => r"\r",
            '"' => r#"\""#,
            '\\' => r"\\",
            _ => {
                output.push(c);
                continue;
            }
        };
        output.push_str(escape);
    }
    output.push('"');
}
356
/// Returns `true` when `value` is exactly `true` or `false`.
fn is_turtle_boolean(value: &str) -> bool {
    value == "true" || value == "false"
}
360
/// Returns `true` when `value` is an optional single `+`/`-` sign followed
/// by one or more ASCII digits.
fn is_turtle_integer(value: &str) -> bool {
    let digits = value.strip_prefix(['+', '-']).unwrap_or(value);
    !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit())
}
371
/// Returns `true` when `value` is an optional single `+`/`-` sign, zero or
/// more ASCII digits, a `.`, and then one or more ASCII digits.
fn is_turtle_decimal(value: &str) -> bool {
    let all_digits = |s: &str| s.bytes().all(|b| b.is_ascii_digit());
    let unsigned = value.strip_prefix(['+', '-']).unwrap_or(value);
    let Some((integral, fractional)) = unsigned.split_once('.') else {
        return false;
    };
    all_digits(integral) && !fractional.is_empty() && all_digits(fractional)
}
388
/// Returns `true` when `value` has the shape
/// `[+-]? digits* ('.' digits*)? [eE] [+-]? digits+`
/// with at least one digit before the exponent marker.
fn is_turtle_double(value: &str) -> bool {
    let all_digits = |s: &str| s.bytes().all(|b| b.is_ascii_digit());
    let unsigned = value.strip_prefix(['+', '-']).unwrap_or(value);
    // The exponent marker is mandatory; digits and `.` can never contain it,
    // so the first `e`/`E` is the only valid split point.
    let Some((mantissa, exponent)) = unsigned.split_once(['e', 'E']) else {
        return false;
    };
    let (integral, fractional) = mantissa.split_once('.').unwrap_or((mantissa, ""));
    let exponent_digits = exponent.strip_prefix(['+', '-']).unwrap_or(exponent);
    let has_mantissa_digit = !(integral.is_empty() && fractional.is_empty());
    has_mantissa_digit
        && all_digits(integral)
        && all_digits(fractional)
        && !exponent_digits.is_empty()
        && all_digits(exponent_digits)
}
425
426pub enum ReaderTsvQueryResultsParserOutput<R: Read> {
427 Solutions {
428 variables: Vec<Variable>,
429 solutions: ReaderTsvSolutionsParser<R>,
430 },
431 Boolean(bool),
432}
433
434impl<R: Read> ReaderTsvQueryResultsParserOutput<R> {
435 pub fn read(mut reader: R) -> Result<Self, QueryResultsParseError> {
436 let mut line_reader = LineReader::new();
437 let mut buffer = Vec::new();
438 let line = line_reader.next_line_from_reader(&mut buffer, &mut reader)?;
439 Ok(match inner_read_first_line(line_reader, line)? {
440 TsvInnerQueryResults::Solutions {
441 variables,
442 solutions,
443 } => Self::Solutions {
444 variables,
445 solutions: ReaderTsvSolutionsParser {
446 reader,
447 inner: solutions,
448 buffer,
449 },
450 },
451 TsvInnerQueryResults::Boolean(value) => Self::Boolean(value),
452 })
453 }
454}
455
456pub struct ReaderTsvSolutionsParser<R: Read> {
457 reader: R,
458 inner: TsvInnerSolutionsParser,
459 buffer: Vec<u8>,
460}
461
462impl<R: Read> ReaderTsvSolutionsParser<R> {
463 pub fn parse_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
464 let line = self
465 .inner
466 .line_reader
467 .next_line_from_reader(&mut self.buffer, &mut self.reader)?;
468 Ok(self.inner.parse_next(line)?)
469 }
470}
471
/// Result of reading the first line of a TSV document from a Tokio
/// [`AsyncRead`] source.
#[cfg(feature = "async-tokio")]
pub enum TokioAsyncReaderTsvQueryResultsParserOutput<R: AsyncRead + Unpin> {
    /// The first line declared variables; rows can be pulled from `solutions`.
    Solutions {
        variables: Vec<Variable>,
        solutions: TokioAsyncReaderTsvSolutionsParser<R>,
    },
    /// The first line was `true` or `false` (compared case-insensitively).
    Boolean(bool),
}

#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> TokioAsyncReaderTsvQueryResultsParserOutput<R> {
    /// Reads the first line from `reader` and decides which kind of result
    /// the document holds.
    ///
    /// # Errors
    ///
    /// Fails when reading the line fails or when the first line is rejected
    /// by `inner_read_first_line`.
    pub async fn read(mut reader: R) -> Result<Self, QueryResultsParseError> {
        let mut buffer = Vec::new();
        let mut line_reader = LineReader::new();
        let first_line = line_reader
            .next_line_from_tokio_async_read(&mut buffer, &mut reader)
            .await?;
        match inner_read_first_line(line_reader, first_line)? {
            TsvInnerQueryResults::Boolean(value) => Ok(Self::Boolean(value)),
            TsvInnerQueryResults::Solutions {
                variables,
                solutions: inner,
            } => Ok(Self::Solutions {
                variables,
                solutions: TokioAsyncReaderTsvSolutionsParser {
                    reader,
                    inner,
                    buffer,
                },
            }),
        }
    }
}
505
/// Pulls TSV solution rows, one line at a time, from a Tokio [`AsyncRead`]
/// source.
#[cfg(feature = "async-tokio")]
pub struct TokioAsyncReaderTsvSolutionsParser<R: AsyncRead + Unpin> {
    reader: R,
    inner: TsvInnerSolutionsParser,
    buffer: Vec<u8>,
}

#[cfg(feature = "async-tokio")]
impl<R: AsyncRead + Unpin> TokioAsyncReaderTsvSolutionsParser<R> {
    /// Reads the next line and parses it as a row.
    ///
    /// Returns `Ok(None)` once the input is exhausted.
    ///
    /// # Errors
    ///
    /// Fails when reading the line fails or when the row is rejected by the
    /// inner parser.
    pub async fn parse_next(
        &mut self,
    ) -> Result<Option<Vec<Option<Term>>>, QueryResultsParseError> {
        let Self {
            reader,
            inner,
            buffer,
        } = self;
        let line = inner
            .line_reader
            .next_line_from_tokio_async_read(buffer, reader)
            .await?;
        inner.parse_next(line).map_err(Into::into)
    }
}
526
527pub enum SliceTsvQueryResultsParserOutput<'a> {
528 Solutions {
529 variables: Vec<Variable>,
530 solutions: SliceTsvSolutionsParser<'a>,
531 },
532 Boolean(bool),
533}
534
535impl<'a> SliceTsvQueryResultsParserOutput<'a> {
536 pub fn read(slice: &'a [u8]) -> Result<Self, QueryResultsSyntaxError> {
537 let mut reader = LineReader::new();
538 let line = reader.next_line_from_slice(slice)?;
539 Ok(match inner_read_first_line(reader, line)? {
540 TsvInnerQueryResults::Solutions {
541 variables,
542 solutions,
543 } => Self::Solutions {
544 variables,
545 solutions: SliceTsvSolutionsParser {
546 slice,
547 inner: solutions,
548 },
549 },
550 TsvInnerQueryResults::Boolean(value) => Self::Boolean(value),
551 })
552 }
553}
554
555pub struct SliceTsvSolutionsParser<'a> {
556 slice: &'a [u8],
557 inner: TsvInnerSolutionsParser,
558}
559
560impl SliceTsvSolutionsParser<'_> {
561 pub fn parse_next(&mut self) -> Result<Option<Vec<Option<Term>>>, QueryResultsSyntaxError> {
562 let line = self.inner.line_reader.next_line_from_slice(self.slice)?;
563 self.inner.parse_next(line)
564 }
565}
566
567enum TsvInnerQueryResults {
568 Solutions {
569 variables: Vec<Variable>,
570 solutions: TsvInnerSolutionsParser,
571 },
572 Boolean(bool),
573}
574
575fn inner_read_first_line(
576 reader: LineReader,
577 line: &str,
578) -> Result<TsvInnerQueryResults, QueryResultsSyntaxError> {
579 let line = line.trim_matches(|c| matches!(c, ' ' | '\r' | '\n'));
580 if line.eq_ignore_ascii_case("true") {
581 return Ok(TsvInnerQueryResults::Boolean(true));
582 }
583 if line.eq_ignore_ascii_case("false") {
584 return Ok(TsvInnerQueryResults::Boolean(false));
585 }
586 let mut variables = Vec::new();
587 if !line.is_empty() {
588 for v in line.split('\t') {
589 let v = v.trim();
590 if v.is_empty() {
591 return Err(QueryResultsSyntaxError::msg("Empty column on the first row. The first row should be a list of variables like ?foo or $bar"));
592 }
593 let variable = Variable::from_str(v).map_err(|e| {
594 QueryResultsSyntaxError::msg(format!("Invalid variable declaration '{v}': {e}"))
595 })?;
596 if variables.contains(&variable) {
597 return Err(QueryResultsSyntaxError::msg(format!(
598 "The variable {variable} is declared twice"
599 )));
600 }
601 variables.push(variable);
602 }
603 }
604 let column_len = variables.len();
605 Ok(TsvInnerQueryResults::Solutions {
606 variables,
607 solutions: TsvInnerSolutionsParser {
608 line_reader: reader,
609 column_len,
610 },
611 })
612}
613
614struct TsvInnerSolutionsParser {
615 line_reader: LineReader,
616 column_len: usize,
617}
618
619impl TsvInnerSolutionsParser {
620 #[allow(clippy::unwrap_in_result)]
621 pub fn parse_next(
622 &self,
623 line: &str,
624 ) -> Result<Option<Vec<Option<Term>>>, QueryResultsSyntaxError> {
625 if line.is_empty() {
626 return Ok(None); }
628 let elements = line
629 .split('\t')
630 .enumerate()
631 .map(|(i, v)| {
632 let v = v.trim();
633 if v.is_empty() {
634 Ok(None)
635 } else {
636 Ok(Some(Term::from_str(v).map_err(|e| {
637 let start_position_char = line
638 .split('\t')
639 .take(i)
640 .map(|c| c.chars().count() + 1)
641 .sum::<usize>();
642 let start_position_bytes =
643 line.split('\t').take(i).map(|c| c.len() + 1).sum::<usize>();
644 QueryResultsSyntaxError::term(
645 e,
646 v.into(),
647 TextPosition {
648 line: self.line_reader.line_count - 1,
649 column: start_position_char.try_into().unwrap(),
650 offset: self.line_reader.last_line_start
651 + u64::try_from(start_position_bytes).unwrap(),
652 }..TextPosition {
653 line: self.line_reader.line_count - 1,
654 column: (start_position_char + v.chars().count())
655 .try_into()
656 .unwrap(),
657 offset: self.line_reader.last_line_start
658 + u64::try_from(start_position_bytes + v.len()).unwrap(),
659 },
660 )
661 })?))
662 }
663 })
664 .collect::<Result<Vec<_>, QueryResultsSyntaxError>>()?;
665 if elements.len() == self.column_len {
666 Ok(Some(elements))
667 } else if self.column_len == 0 && elements == [None] {
668 Ok(Some(Vec::new())) } else {
670 Err(QueryResultsSyntaxError::located_message(
671 format!(
672 "This TSV files has {} columns but we found a row on line {} with {} columns: {}",
673 self.column_len,
674 self.line_reader.line_count - 1,
675 elements.len(),
676 line
677 ),
678 TextPosition {
679 line: self.line_reader.line_count - 1,
680 column: 0,
681 offset: self.line_reader.last_line_start,
682 }..TextPosition {
683 line: self.line_reader.line_count - 1,
684 column: line.chars().count().try_into().unwrap(),
685 offset: self.line_reader.last_line_end,
686 },
687 ))
688 }
689 }
690}
691
692struct LineReader {
693 buffer_start: usize,
694 buffer_end: usize,
695 line_count: u64,
696 last_line_start: u64,
697 last_line_end: u64,
698}
699
700impl LineReader {
701 fn new() -> Self {
702 Self {
703 buffer_start: 0,
704 buffer_end: 0,
705 line_count: 0,
706 last_line_start: 0,
707 last_line_end: 0,
708 }
709 }
710
711 #[allow(clippy::unwrap_in_result)]
712 fn next_line_from_reader<'a>(
713 &mut self,
714 buffer: &'a mut Vec<u8>,
715 reader: &mut impl Read,
716 ) -> Result<&'a str, QueryResultsParseError> {
717 let line_end = loop {
718 if let Some(eol) = memchr(b'\n', &buffer[self.buffer_start..self.buffer_end]) {
719 break self.buffer_start + eol + 1;
720 }
721 if self.buffer_start > 0 {
722 buffer.copy_within(self.buffer_start..self.buffer_end, 0);
723 self.buffer_end -= self.buffer_start;
724 self.buffer_start = 0;
725 }
726 if self.buffer_end + 1024 > buffer.len() {
727 if self.buffer_end + 1024 > MAX_BUFFER_SIZE {
728 return Err(io::Error::new(
729 io::ErrorKind::OutOfMemory,
730 format!("Reached the buffer maximal size of {MAX_BUFFER_SIZE}"),
731 )
732 .into());
733 }
734 buffer.resize(self.buffer_end + 1024, b'\0');
735 }
736 let read = reader.read(&mut buffer[self.buffer_end..])?;
737 if read == 0 {
738 break self.buffer_end;
739 }
740 self.buffer_end += read;
741 };
742 let result = str::from_utf8(&buffer[self.buffer_start..line_end]).map_err(|e| {
743 QueryResultsSyntaxError::msg(format!("Invalid UTF-8 in the TSV file: {e}")).into()
744 });
745 self.line_count += 1;
746 self.last_line_start = self.last_line_end;
747 self.last_line_end += u64::try_from(line_end - self.buffer_start).unwrap();
748 self.buffer_start = line_end;
749 result
750 }
751
752 #[cfg(feature = "async-tokio")]
753 #[allow(clippy::unwrap_in_result)]
754 async fn next_line_from_tokio_async_read<'a>(
755 &mut self,
756 buffer: &'a mut Vec<u8>,
757 reader: &mut (impl AsyncRead + Unpin),
758 ) -> Result<&'a str, QueryResultsParseError> {
759 let line_end = loop {
760 if let Some(eol) = memchr(b'\n', &buffer[self.buffer_start..self.buffer_end]) {
761 break self.buffer_start + eol + 1;
762 }
763 if self.buffer_start > 0 {
764 buffer.copy_within(self.buffer_start..self.buffer_end, 0);
765 self.buffer_end -= self.buffer_start;
766 self.buffer_start = 0;
767 }
768 if self.buffer_end + 1024 > buffer.len() {
769 if self.buffer_end + 1024 > MAX_BUFFER_SIZE {
770 return Err(io::Error::new(
771 io::ErrorKind::OutOfMemory,
772 format!("Reached the buffer maximal size of {MAX_BUFFER_SIZE}"),
773 )
774 .into());
775 }
776 buffer.resize(self.buffer_end + 1024, b'\0');
777 }
778 let read = reader.read(&mut buffer[self.buffer_end..]).await?;
779 if read == 0 {
780 break self.buffer_end;
781 }
782 self.buffer_end += read;
783 };
784 let result = str::from_utf8(&buffer[self.buffer_start..line_end]).map_err(|e| {
785 QueryResultsSyntaxError::msg(format!("Invalid UTF-8 in the TSV file: {e}")).into()
786 });
787 self.line_count += 1;
788 self.last_line_start = self.last_line_end;
789 self.last_line_end += u64::try_from(line_end - self.buffer_start).unwrap();
790 self.buffer_start = line_end;
791 result
792 }
793
794 #[allow(clippy::unwrap_in_result)]
795 fn next_line_from_slice<'a>(
796 &mut self,
797 slice: &'a [u8],
798 ) -> Result<&'a str, QueryResultsSyntaxError> {
799 let line_end = memchr(b'\n', &slice[self.buffer_start..])
800 .map_or_else(|| slice.len(), |eol| self.buffer_start + eol + 1);
801 let result = str::from_utf8(&slice[self.buffer_start..line_end]).map_err(|e| {
802 QueryResultsSyntaxError::msg(format!("Invalid UTF-8 in the TSV file: {e}"))
803 });
804 self.line_count += 1;
805 self.last_line_start = self.last_line_end;
806 self.last_line_end += u64::try_from(line_end - self.buffer_start).unwrap();
807 self.buffer_start = line_end;
808 result
809 }
810}
811
#[cfg(test)]
#[allow(clippy::panic_in_result_fn)]
mod tests {
    use super::*;
    use std::error::Error;

    /// Pairs each header variable with the term bound in `row`, skipping
    /// unbound columns.
    fn bindings<'a>(
        variables: &'a [Variable],
        row: &'a [Option<Term>],
    ) -> impl Iterator<Item = (VariableRef<'a>, TermRef<'a>)> {
        variables
            .iter()
            .zip(row)
            .filter_map(|(variable, term)| Some((variable.as_ref(), term.as_ref()?.as_ref())))
    }

    /// Two-column fixture covering IRIs, blank nodes, plain / language-tagged
    /// / typed literals, unbound cells and characters that need escaping.
    fn build_example() -> (Vec<Variable>, Vec<Vec<Option<Term>>>) {
        let iri = |value: &str| -> Option<Term> { Some(NamedNode::new_unchecked(value).into()) };
        let blank = |id: &str| -> Option<Term> { Some(BlankNode::new_unchecked(id).into()) };
        let simple =
            |value: &str| -> Option<Term> { Some(Literal::new_simple_literal(value).into()) };

        let variables = vec![
            Variable::new_unchecked("x"),
            Variable::new_unchecked("literal"),
        ];
        let solutions = vec![
            vec![iri("http://example/x"), simple("String")],
            vec![iri("http://example/x"), simple("String-with-dquote\"")],
            vec![blank("b0"), simple("Blank node")],
            vec![None, simple("Missing 'x'")],
            vec![None, None],
            vec![iri("http://example/x"), None],
            vec![
                blank("b1"),
                Some(
                    Literal::new_language_tagged_literal_unchecked("String-with-lang", "en")
                        .into(),
                ),
            ],
            vec![
                blank("b1"),
                Some(Literal::new_typed_literal("123", xsd::INTEGER).into()),
            ],
            vec![None, simple("escape,\t\r\n")],
        ];
        (variables, solutions)
    }

    #[test]
    fn test_csv_serialization() {
        let (variables, solutions) = build_example();
        let mut rendered = String::new();
        let serializer = InnerCsvSolutionsSerializer::start(&mut rendered, variables.clone());
        for solution in solutions {
            serializer.write(&mut rendered, bindings(&variables, &solution));
        }
        assert_eq!(rendered, "x,literal\r\nhttp://example/x,String\r\nhttp://example/x,\"String-with-dquote\"\"\"\r\n_:b0,Blank node\r\n,Missing 'x'\r\n,\r\nhttp://example/x,\r\n_:b1,String-with-lang\r\n_:b1,123\r\n,\"escape,\t\r\n\"\r\n");
    }

    #[test]
    fn test_tsv_roundtrip() -> Result<(), Box<dyn Error>> {
        let (variables, solutions) = build_example();

        // Serialize the fixture.
        let mut rendered = String::new();
        let serializer = InnerTsvSolutionsSerializer::start(&mut rendered, variables.clone());
        for solution in &solutions {
            serializer.write(&mut rendered, bindings(&variables, solution));
        }
        assert_eq!(rendered, "?x\t?literal\n<http://example/x>\t\"String\"\n<http://example/x>\t\"String-with-dquote\\\"\"\n_:b0\t\"Blank node\"\n\t\"Missing 'x'\"\n\t\n<http://example/x>\t\n_:b1\t\"String-with-lang\"@en\n_:b1\t123\n\t\"escape,\\t\\r\\n\"\n");

        // Parse it back and compare with the fixture.
        let SliceTsvQueryResultsParserOutput::Solutions {
            solutions: mut solutions_iter,
            variables: actual_variables,
        } = SliceTsvQueryResultsParserOutput::read(rendered.as_bytes())?
        else {
            unreachable!()
        };
        assert_eq!(actual_variables.as_slice(), variables.as_slice());
        let mut rows = Vec::new();
        while let Some(row) = solutions_iter.parse_next()? {
            rows.push(row);
        }
        assert_eq!(rows, solutions);

        Ok(())
    }

    /// Malformed inputs must be rejected or drained without panicking.
    #[test]
    fn test_bad_tsv() {
        let a_lot_of_strings = format!("?p\n{}\n", "<".repeat(100_000));
        let bad_tsvs = [
            "?",
            "?p",
            "?p?o",
            "?p\n<",
            "?p\n_",
            "?p\n_:",
            "?p\n\"",
            "?p\n<<",
            "?p\n1\t2\n",
            "?p\n\n",
            a_lot_of_strings.as_str(),
        ];
        for bad_tsv in bad_tsvs {
            let Ok(ReaderTsvQueryResultsParserOutput::Solutions { mut solutions, .. }) =
                ReaderTsvQueryResultsParserOutput::read(bad_tsv.as_bytes())
            else {
                continue;
            };
            while let Ok(Some(_)) = solutions.parse_next() {}
        }
    }

    #[test]
    fn test_no_columns_csv_serialization() {
        let mut rendered = String::new();
        let serializer = InnerCsvSolutionsSerializer::start(&mut rendered, Vec::new());
        serializer.write(&mut rendered, []);
        assert_eq!(rendered, "\r\n\r\n");
    }

    #[test]
    fn test_no_columns_tsv_serialization() {
        let mut rendered = String::new();
        let serializer = InnerTsvSolutionsSerializer::start(&mut rendered, Vec::new());
        serializer.write(&mut rendered, []);
        assert_eq!(rendered, "\n\n");
    }

    #[test]
    fn test_no_columns_tsv_parsing() -> io::Result<()> {
        let ReaderTsvQueryResultsParserOutput::Solutions {
            mut solutions,
            variables,
        } = ReaderTsvQueryResultsParserOutput::read(b"\n\n".as_slice())?
        else {
            unreachable!()
        };
        assert_eq!(variables, Vec::<Variable>::new());
        assert_eq!(solutions.parse_next()?, Some(Vec::new()));
        assert_eq!(solutions.parse_next()?, None);
        Ok(())
    }

    #[test]
    fn test_no_results_csv_serialization() {
        let mut rendered = String::new();
        InnerCsvSolutionsSerializer::start(&mut rendered, vec![Variable::new_unchecked("a")]);
        assert_eq!(rendered, "a\r\n");
    }

    #[test]
    fn test_no_results_tsv_serialization() {
        let mut rendered = String::new();
        InnerTsvSolutionsSerializer::start(&mut rendered, vec![Variable::new_unchecked("a")]);
        assert_eq!(rendered, "?a\n");
    }

    #[test]
    fn test_no_results_tsv_parsing() -> io::Result<()> {
        let ReaderTsvQueryResultsParserOutput::Solutions {
            mut solutions,
            variables,
        } = ReaderTsvQueryResultsParserOutput::read(b"?a\n".as_slice())?
        else {
            unreachable!()
        };
        assert_eq!(variables, vec![Variable::new_unchecked("a")]);
        assert_eq!(solutions.parse_next()?, None);
        Ok(())
    }
}