oxigraph/storage/
mod.rs

1use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef};
2pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError};
3use crate::storage::memory::{
4    MemoryDecodingGraphIterator, MemoryStorage, MemoryStorageBulkLoader, MemoryStorageReader,
5    MemoryStorageWriter, QuadIterator,
6};
7use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup};
8#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
9use crate::storage::rocksdb::{
10    RocksDbChainedDecodingQuadIterator, RocksDbDecodingGraphIterator, RocksDbStorage,
11    RocksDbStorageBulkLoader, RocksDbStorageReader, RocksDbStorageWriter,
12};
13use oxrdf::Quad;
14use std::error::Error;
15#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
16use std::path::Path;
17
18#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
19mod binary_encoder;
20mod error;
21mod memory;
22pub mod numeric_encoder;
23#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
24mod rocksdb;
25#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
26mod rocksdb_wrapper;
27pub mod small_string;
28
/// Low level storage primitives.
///
/// A backend-agnostic handle: every method dispatches on the wrapped
/// `StorageKind`, which is either the in-memory store or, when compiled with
/// the `rocksdb` feature on a non-wasm target, a RocksDB store.
#[derive(Clone)]
pub struct Storage {
    // The backend this handle forwards every operation to.
    kind: StorageKind,
}
34
/// Backend selected for a [`Storage`] handle.
#[derive(Clone)]
enum StorageKind {
    /// RocksDB backend opened from a filesystem path; only compiled on
    /// non-wasm targets with the `rocksdb` feature enabled.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    RocksDb(RocksDbStorage),
    /// In-memory backend; always available.
    Memory(MemoryStorage),
}
41
42impl Storage {
43    #[allow(clippy::unnecessary_wraps)]
44    pub fn new() -> Result<Self, StorageError> {
45        Ok(Self {
46            kind: StorageKind::Memory(MemoryStorage::new()),
47        })
48    }
49
50    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
51    pub fn open(path: &Path) -> Result<Self, StorageError> {
52        Ok(Self {
53            kind: StorageKind::RocksDb(RocksDbStorage::open(path)?),
54        })
55    }
56
57    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
58    pub fn open_read_only(path: &Path) -> Result<Self, StorageError> {
59        Ok(Self {
60            kind: StorageKind::RocksDb(RocksDbStorage::open_read_only(path)?),
61        })
62    }
63
64    pub fn snapshot(&self) -> StorageReader {
65        match &self.kind {
66            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
67            StorageKind::RocksDb(storage) => StorageReader {
68                kind: StorageReaderKind::RocksDb(storage.snapshot()),
69            },
70            StorageKind::Memory(storage) => StorageReader {
71                kind: StorageReaderKind::Memory(storage.snapshot()),
72            },
73        }
74    }
75
76    pub fn transaction<T, E: Error + 'static + From<StorageError>>(
77        &self,
78        f: impl for<'a> Fn(StorageWriter<'a>) -> Result<T, E>,
79    ) -> Result<T, E> {
80        match &self.kind {
81            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
82            StorageKind::RocksDb(storage) => storage.transaction(|transaction| {
83                f(StorageWriter {
84                    kind: StorageWriterKind::RocksDb(transaction),
85                })
86            }),
87            StorageKind::Memory(storage) => storage.transaction(|transaction| {
88                f(StorageWriter {
89                    kind: StorageWriterKind::Memory(transaction),
90                })
91            }),
92        }
93    }
94
95    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
96    pub fn flush(&self) -> Result<(), StorageError> {
97        match &self.kind {
98            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
99            StorageKind::RocksDb(storage) => storage.flush(),
100            StorageKind::Memory(_) => Ok(()),
101        }
102    }
103
104    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
105    pub fn compact(&self) -> Result<(), StorageError> {
106        match &self.kind {
107            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
108            StorageKind::RocksDb(storage) => storage.compact(),
109            StorageKind::Memory(_) => Ok(()),
110        }
111    }
112
113    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
114    pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> {
115        match &self.kind {
116            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
117            StorageKind::RocksDb(storage) => storage.backup(target_directory),
118            StorageKind::Memory(_) => Err(StorageError::Other(
119                "It is not possible to backup an in-memory database".into(),
120            )),
121        }
122    }
123
124    pub fn bulk_loader(&self) -> StorageBulkLoader {
125        match &self.kind {
126            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
127            StorageKind::RocksDb(storage) => StorageBulkLoader {
128                kind: StorageBulkLoaderKind::RocksDb(storage.bulk_loader()),
129            },
130            StorageKind::Memory(storage) => StorageBulkLoader {
131                kind: StorageBulkLoaderKind::Memory(storage.bulk_loader()),
132            },
133        }
134    }
135}
136
/// Read access to the storage, produced by `Storage::snapshot` or
/// `StorageWriter::reader`.
pub struct StorageReader {
    // Backend-specific reader every query is forwarded to.
    kind: StorageReaderKind,
}
140
/// Backend-specific reader wrapped by [`StorageReader`].
enum StorageReaderKind {
    /// RocksDB reader; only present with the `rocksdb` feature on non-wasm targets.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    RocksDb(RocksDbStorageReader),
    /// In-memory reader.
    Memory(MemoryStorageReader),
}
146
147#[allow(clippy::unnecessary_wraps)]
148impl StorageReader {
149    pub fn len(&self) -> Result<usize, StorageError> {
150        match &self.kind {
151            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
152            StorageReaderKind::RocksDb(reader) => reader.len(),
153            StorageReaderKind::Memory(reader) => Ok(reader.len()),
154        }
155    }
156
157    pub fn is_empty(&self) -> Result<bool, StorageError> {
158        match &self.kind {
159            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
160            StorageReaderKind::RocksDb(reader) => reader.is_empty(),
161            StorageReaderKind::Memory(reader) => Ok(reader.is_empty()),
162        }
163    }
164
165    pub fn contains(&self, quad: &EncodedQuad) -> Result<bool, StorageError> {
166        match &self.kind {
167            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
168            StorageReaderKind::RocksDb(reader) => reader.contains(quad),
169            StorageReaderKind::Memory(reader) => Ok(reader.contains(quad)),
170        }
171    }
172
173    pub fn quads_for_pattern(
174        &self,
175        subject: Option<&EncodedTerm>,
176        predicate: Option<&EncodedTerm>,
177        object: Option<&EncodedTerm>,
178        graph_name: Option<&EncodedTerm>,
179    ) -> DecodingQuadIterator {
180        DecodingQuadIterator {
181            kind: match &self.kind {
182                #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
183                StorageReaderKind::RocksDb(reader) => DecodingQuadIteratorKind::RocksDb(
184                    reader.quads_for_pattern(subject, predicate, object, graph_name),
185                ),
186                StorageReaderKind::Memory(reader) => DecodingQuadIteratorKind::Memory(
187                    reader.quads_for_pattern(subject, predicate, object, graph_name),
188                ),
189            },
190        }
191    }
192
193    pub fn named_graphs(&self) -> DecodingGraphIterator {
194        match &self.kind {
195            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
196            StorageReaderKind::RocksDb(reader) => DecodingGraphIterator {
197                kind: DecodingGraphIteratorKind::RocksDb(reader.named_graphs()),
198            },
199            StorageReaderKind::Memory(reader) => DecodingGraphIterator {
200                kind: DecodingGraphIteratorKind::Memory(reader.named_graphs()),
201            },
202        }
203    }
204
205    pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> Result<bool, StorageError> {
206        match &self.kind {
207            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
208            StorageReaderKind::RocksDb(reader) => reader.contains_named_graph(graph_name),
209            StorageReaderKind::Memory(reader) => Ok(reader.contains_named_graph(graph_name)),
210        }
211    }
212
213    pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
214        match &self.kind {
215            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
216            StorageReaderKind::RocksDb(reader) => reader.contains_str(key),
217            StorageReaderKind::Memory(reader) => Ok(reader.contains_str(key)),
218        }
219    }
220
221    /// Validates that all the storage invariants held in the data
222    pub fn validate(&self) -> Result<(), StorageError> {
223        match &self.kind {
224            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
225            StorageReaderKind::RocksDb(reader) => reader.validate(),
226            StorageReaderKind::Memory(reader) => reader.validate(),
227        }
228    }
229}
230
/// Iterator returned by `StorageReader::quads_for_pattern`.
///
/// Items are `Result`s: the RocksDB iterator yields them directly, while
/// in-memory quads are wrapped in `Ok`.
pub struct DecodingQuadIterator {
    // Backend-specific iterator being adapted.
    kind: DecodingQuadIteratorKind,
}
234
/// Backend-specific quad iterator wrapped by [`DecodingQuadIterator`].
enum DecodingQuadIteratorKind {
    /// RocksDB iterator; only present with the `rocksdb` feature on non-wasm targets.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    RocksDb(RocksDbChainedDecodingQuadIterator),
    /// In-memory iterator.
    Memory(QuadIterator),
}
240
241impl Iterator for DecodingQuadIterator {
242    type Item = Result<EncodedQuad, StorageError>;
243
244    fn next(&mut self) -> Option<Self::Item> {
245        match &mut self.kind {
246            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
247            DecodingQuadIteratorKind::RocksDb(iter) => iter.next(),
248            DecodingQuadIteratorKind::Memory(iter) => iter.next().map(Ok),
249        }
250    }
251}
252
/// Iterator returned by `StorageReader::named_graphs`.
///
/// Items are `Result`s: the RocksDB iterator yields them directly, while
/// in-memory graph names are wrapped in `Ok`.
pub struct DecodingGraphIterator {
    // Backend-specific iterator being adapted.
    kind: DecodingGraphIteratorKind,
}
256
/// Backend-specific named-graph iterator wrapped by [`DecodingGraphIterator`].
enum DecodingGraphIteratorKind {
    /// RocksDB iterator; only present with the `rocksdb` feature on non-wasm targets.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    RocksDb(RocksDbDecodingGraphIterator),
    /// In-memory iterator.
    Memory(MemoryDecodingGraphIterator),
}
262
263impl Iterator for DecodingGraphIterator {
264    type Item = Result<EncodedTerm, StorageError>;
265
266    fn next(&mut self) -> Option<Self::Item> {
267        match &mut self.kind {
268            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
269            DecodingGraphIteratorKind::RocksDb(iter) => iter.next(),
270            DecodingGraphIteratorKind::Memory(iter) => iter.next().map(Ok),
271        }
272    }
273}
274
275impl StrLookup for StorageReader {
276    fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
277        match &self.kind {
278            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
279            StorageReaderKind::RocksDb(reader) => reader.get_str(key),
280            StorageReaderKind::Memory(reader) => reader.get_str(key),
281        }
282    }
283}
284
/// Write access to the storage, handed to the closure passed to
/// `Storage::transaction`.
pub struct StorageWriter<'a> {
    // Backend-specific writer every mutation is forwarded to.
    kind: StorageWriterKind<'a>,
}
288
/// Backend-specific writer wrapped by [`StorageWriter`].
enum StorageWriterKind<'a> {
    /// RocksDB writer; only present with the `rocksdb` feature on non-wasm targets.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    RocksDb(RocksDbStorageWriter<'a>),
    /// In-memory writer.
    Memory(MemoryStorageWriter<'a>),
}
294
295#[allow(clippy::unnecessary_wraps)]
296impl StorageWriter<'_> {
297    pub fn reader(&self) -> StorageReader {
298        match &self.kind {
299            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
300            StorageWriterKind::RocksDb(writer) => StorageReader {
301                kind: StorageReaderKind::RocksDb(writer.reader()),
302            },
303            StorageWriterKind::Memory(writer) => StorageReader {
304                kind: StorageReaderKind::Memory(writer.reader()),
305            },
306        }
307    }
308
309    pub fn insert(&mut self, quad: QuadRef<'_>) -> Result<bool, StorageError> {
310        match &mut self.kind {
311            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
312            StorageWriterKind::RocksDb(writer) => writer.insert(quad),
313            StorageWriterKind::Memory(writer) => Ok(writer.insert(quad)),
314        }
315    }
316
317    pub fn insert_named_graph(
318        &mut self,
319        graph_name: NamedOrBlankNodeRef<'_>,
320    ) -> Result<bool, StorageError> {
321        match &mut self.kind {
322            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
323            StorageWriterKind::RocksDb(writer) => writer.insert_named_graph(graph_name),
324            StorageWriterKind::Memory(writer) => Ok(writer.insert_named_graph(graph_name)),
325        }
326    }
327
328    pub fn remove(&mut self, quad: QuadRef<'_>) -> Result<bool, StorageError> {
329        match &mut self.kind {
330            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
331            StorageWriterKind::RocksDb(writer) => writer.remove(quad),
332            StorageWriterKind::Memory(writer) => Ok(writer.remove(quad)),
333        }
334    }
335
336    pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<(), StorageError> {
337        match &mut self.kind {
338            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
339            StorageWriterKind::RocksDb(writer) => writer.clear_graph(graph_name),
340            StorageWriterKind::Memory(writer) => {
341                writer.clear_graph(graph_name);
342                Ok(())
343            }
344        }
345    }
346
347    pub fn clear_all_named_graphs(&mut self) -> Result<(), StorageError> {
348        match &mut self.kind {
349            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
350            StorageWriterKind::RocksDb(writer) => writer.clear_all_named_graphs(),
351            StorageWriterKind::Memory(writer) => {
352                writer.clear_all_named_graphs();
353                Ok(())
354            }
355        }
356    }
357
358    pub fn clear_all_graphs(&mut self) -> Result<(), StorageError> {
359        match &mut self.kind {
360            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
361            StorageWriterKind::RocksDb(writer) => writer.clear_all_graphs(),
362            StorageWriterKind::Memory(writer) => {
363                writer.clear_all_graphs();
364                Ok(())
365            }
366        }
367    }
368
369    pub fn remove_named_graph(
370        &mut self,
371        graph_name: NamedOrBlankNodeRef<'_>,
372    ) -> Result<bool, StorageError> {
373        match &mut self.kind {
374            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
375            StorageWriterKind::RocksDb(writer) => writer.remove_named_graph(graph_name),
376            StorageWriterKind::Memory(writer) => Ok(writer.remove_named_graph(graph_name)),
377        }
378    }
379
380    pub fn remove_all_named_graphs(&mut self) -> Result<(), StorageError> {
381        match &mut self.kind {
382            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
383            StorageWriterKind::RocksDb(writer) => writer.remove_all_named_graphs(),
384            StorageWriterKind::Memory(writer) => {
385                writer.remove_all_named_graphs();
386                Ok(())
387            }
388        }
389    }
390
391    pub fn clear(&mut self) -> Result<(), StorageError> {
392        match &mut self.kind {
393            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
394            StorageWriterKind::RocksDb(writer) => writer.clear(),
395            StorageWriterKind::Memory(writer) => {
396                writer.clear();
397                Ok(())
398            }
399        }
400    }
401}
402
/// Builder-style bulk loader obtained from `Storage::bulk_loader`.
///
/// The configuration methods consume and return `self`, so dropping their
/// result would discard the configuration (hence `#[must_use]`).
#[must_use]
pub struct StorageBulkLoader {
    // Backend-specific loader being configured and driven.
    kind: StorageBulkLoaderKind,
}
407
/// Backend-specific loader wrapped by [`StorageBulkLoader`].
enum StorageBulkLoaderKind {
    /// RocksDB loader; only present with the `rocksdb` feature on non-wasm targets.
    #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
    RocksDb(RocksDbStorageBulkLoader),
    /// In-memory loader.
    Memory(MemoryStorageBulkLoader),
}
413impl StorageBulkLoader {
414    #[allow(unused_variables)]
415    pub fn with_num_threads(self, num_threads: usize) -> Self {
416        match self.kind {
417            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
418            StorageBulkLoaderKind::RocksDb(loader) => Self {
419                kind: StorageBulkLoaderKind::RocksDb(loader.with_num_threads(num_threads)),
420            },
421            StorageBulkLoaderKind::Memory(loader) => Self {
422                kind: StorageBulkLoaderKind::Memory(loader),
423            },
424        }
425    }
426
427    #[allow(unused_variables)]
428    pub fn with_max_memory_size_in_megabytes(self, max_memory_size: usize) -> Self {
429        match self.kind {
430            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
431            StorageBulkLoaderKind::RocksDb(loader) => Self {
432                kind: StorageBulkLoaderKind::RocksDb(
433                    loader.with_max_memory_size_in_megabytes(max_memory_size),
434                ),
435            },
436            StorageBulkLoaderKind::Memory(loader) => Self {
437                kind: StorageBulkLoaderKind::Memory(loader),
438            },
439        }
440    }
441
442    pub fn on_progress(self, callback: impl Fn(u64) + 'static) -> Self {
443        match self.kind {
444            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
445            StorageBulkLoaderKind::RocksDb(loader) => Self {
446                kind: StorageBulkLoaderKind::RocksDb(loader.on_progress(callback)),
447            },
448            StorageBulkLoaderKind::Memory(loader) => Self {
449                kind: StorageBulkLoaderKind::Memory(loader.on_progress(callback)),
450            },
451        }
452    }
453
454    #[allow(clippy::trait_duplication_in_bounds)]
455    pub fn load<EI, EO: From<StorageError> + From<EI>>(
456        &self,
457        quads: impl IntoIterator<Item = Result<Quad, EI>>,
458    ) -> Result<(), EO> {
459        match &self.kind {
460            #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
461            StorageBulkLoaderKind::RocksDb(loader) => loader.load(quads),
462            StorageBulkLoaderKind::Memory(loader) => loader.load(quads),
463        }
464    }
465}