oxttl/
chunker.rs

1// Adapted from: https://github.com/pola-rs/polars/blob/main/crates/polars-io/src/csv/read/utils.rs#L10
2// and https://github.com/pola-rs/polars/blob/main/crates/polars-io/src/csv/read/parser.rs#L124
3// and https://github.com/pola-rs/polars/blob/main/crates/polars-io/src/csv/read/parser.rs#L310
4// Which has the following license:
5// Copyright (c) 2020 Ritchie Vink
6// Some portions Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
7//
8// Permission is hereby granted, free of charge, to any person obtaining a copy
9// of this software and associated documentation files (the "Software"), to deal
10// in the Software without restriction, including without limitation the rights
11// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12// copies of the Software, and to permit persons to whom the Software is
13// furnished to do so, subject to the following conditions:
14//
15// The above copyright notice and this permission notice shall be included in all
16// copies or substantial portions of the Software.
17//
18// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24// SOFTWARE.
25
26use crate::TurtleParser;
27use memchr::memchr;
28
29// Given a number of desired chunks, corresponding to threads, find offsets that break the file into chunks that can be read in parallel.
30// For NTriples, this can be done simply on newlines.
31pub fn get_ntriples_file_chunks(bytes: &[u8], n_chunks: usize) -> Vec<(usize, usize)> {
32    let mut last_pos = 0;
33    let total_len = bytes.len();
34    let chunk_size = total_len / n_chunks;
35    let mut offsets = Vec::with_capacity(n_chunks);
36    for _ in 0..n_chunks {
37        let search_pos = last_pos + chunk_size;
38
39        if search_pos >= bytes.len() {
40            break;
41        }
42
43        let Some(pos) = next_newline_position(&bytes[search_pos..]) else {
44            // We keep the valid chunks we found, and add (outside the loop) the rest of the bytes as a chunk.
45            break;
46        };
47        let end_pos = search_pos + pos;
48        offsets.push((last_pos, end_pos));
49        last_pos = end_pos;
50    }
51    if last_pos < total_len {
52        offsets.push((last_pos, total_len));
53    }
54    offsets
55}
56
57// Finds the first newline in input that is preceded by something that is not an escape char.
58// Such newlines split the triples in the NTriples format.
59fn next_newline_position(input: &[u8]) -> Option<usize> {
60    Some(memchr(b'\n', input)? + 1)
61}
62
63// Given a number of desired chunks, corresponding to threads find offsets that break the file into chunks that can be read in parallel.
64// A Turtle parser will be used to check (heuristically) if an offset starting with a period actually splits the file properly.
65// The parser should not be reused, hence it is passed by value.
66pub fn get_turtle_file_chunks(
67    bytes: &[u8],
68    n_chunks: usize,
69    parser: &TurtleParser,
70) -> Vec<(usize, usize)> {
71    let mut last_pos = 0;
72    let total_len = bytes.len();
73    let chunk_size = total_len / n_chunks;
74    let mut offsets = Vec::with_capacity(n_chunks);
75    for _ in 0..n_chunks {
76        let search_pos = last_pos + chunk_size;
77
78        if search_pos >= bytes.len() {
79            break;
80        }
81
82        let Some(pos) = next_terminating_char(parser, &bytes[search_pos..]) else {
83            // We keep the valid chunks we found,
84            // and add (outside the loop) the rest of the bytes as a chunk.
85            break;
86        };
87        let end_pos = search_pos + pos;
88        offsets.push((last_pos, end_pos));
89        last_pos = end_pos;
90    }
91    if last_pos < total_len {
92        offsets.push((last_pos, total_len));
93    }
94    offsets
95}
96
97// Heuristically, we assume that a period is terminating (a triple) if we can start immediately after it and parse three triples.
98// Parser should not be reused, hence it is passed by value.
99// If no such period can be found, looking at 1000 consecutive periods, we give up.
100// Important to keep this number this high, as some TTL files can have a lot of periods.
101fn next_terminating_char(parser: &TurtleParser, mut input: &[u8]) -> Option<usize> {
102    fn accept(parser: TurtleParser, input: &[u8]) -> bool {
103        let mut f = parser.for_slice(input);
104        for _ in 0..3 {
105            if let Some(r) = f.next() {
106                if r.is_err() {
107                    return false;
108                }
109            } else {
110                return false;
111            }
112        }
113        true
114    }
115
116    let mut total_pos = 0;
117    for _ in 0..1_000 {
118        let pos = memchr(b'.', input)? + 1;
119        if pos >= input.len() {
120            return None;
121        }
122        let new_input = &input[pos..];
123        let p = parser.clone();
124        let accepted = accept(p, new_input);
125        if accepted {
126            return Some(total_pos + pos);
127        }
128        input = &input[pos + 1..];
129        total_pos += pos + 1;
130    }
131    None
132}