wasm_streams/readable/mod.rs
1//! Bindings and conversions for
2//! [readable streams](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
3use futures_util::io::AsyncRead;
4use futures_util::Stream;
5use wasm_bindgen::prelude::*;
6use wasm_bindgen::JsCast;
7
8pub use byob_reader::ReadableStreamBYOBReader;
9pub use default_reader::ReadableStreamDefaultReader;
10pub use into_async_read::IntoAsyncRead;
11pub use into_stream::IntoStream;
12use into_underlying_source::IntoUnderlyingSource;
13pub use pipe_options::PipeOptions;
14
15use crate::queuing_strategy::QueuingStrategy;
16use crate::readable::into_underlying_byte_source::IntoUnderlyingByteSource;
17use crate::util::promise_to_void_future;
18use crate::writable::WritableStream;
19
20mod byob_reader;
21mod default_reader;
22mod into_async_read;
23mod into_stream;
24mod into_underlying_byte_source;
25mod into_underlying_source;
26mod pipe_options;
27pub mod sys;
28
29/// A [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
30///
31/// `ReadableStream`s can be created from a [raw JavaScript stream](sys::ReadableStream) with
32/// [`from_raw`](Self::from_raw), or from a Rust [`Stream`] with [`from_stream`](Self::from_stream).
33///
34/// They can be converted into a [raw JavaScript stream](sys::ReadableStream) with
35/// [`into_raw`](Self::into_raw), or into a Rust [`Stream`] with [`into_stream`](Self::into_stream).
36///
37/// If the browser supports [readable byte streams](https://streams.spec.whatwg.org/#readable-byte-stream),
38/// then they can be created from a Rust [`AsyncRead`] with [`from_async_read`](Self::from_async_read),
39/// or converted into one with [`into_async_read`](Self::into_async_read).
40///
41/// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
42/// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
43#[derive(Debug)]
44pub struct ReadableStream {
45 raw: sys::ReadableStream,
46}
47
48impl ReadableStream {
49 /// Creates a new `ReadableStream` from a [JavaScript stream](sys::ReadableStream).
50 #[inline]
51 pub fn from_raw(raw: sys::ReadableStream) -> Self {
52 Self { raw }
53 }
54
55 /// Creates a new `ReadableStream` from a [`Stream`].
56 ///
57 /// Items and errors must be represented as raw [`JsValue`](JsValue)s.
58 /// Use [`map`], [`map_ok`] and/or [`map_err`] to convert a stream's items to a `JsValue`
59 /// before passing it to this function.
60 ///
61 /// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
62 /// [`map`]: https://docs.rs/futures/0.3.18/futures/stream/trait.StreamExt.html#method.map
63 /// [`map_ok`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_ok
64 /// [`map_err`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_err
65 pub fn from_stream<St>(stream: St) -> Self
66 where
67 St: Stream<Item = Result<JsValue, JsValue>> + 'static,
68 {
69 let source = IntoUnderlyingSource::new(Box::new(stream));
70 // Set HWM to 0 to prevent the JS ReadableStream from buffering chunks in its queue,
71 // since the original Rust stream is better suited to handle that.
72 let strategy = QueuingStrategy::new(0.0);
73 let raw = sys::ReadableStream::new_with_source(source, strategy);
74 Self { raw }
75 }
76
77 /// Creates a new `ReadableStream` from an [`AsyncRead`].
78 ///
79 /// This creates a readable byte stream whose `autoAllocateChunkSize` is `default_buffer_len`.
80 /// Therefore, if a default reader is used to consume the stream, the given `async_read`
81 /// will be [polled][AsyncRead::poll_read] with a buffer of this size. If a BYOB reader is used,
82 /// then it will be polled with a buffer of the same size as the BYOB read request instead.
83 ///
84 /// **Panics** if readable byte streams are not supported by the browser.
85 ///
86 /// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
87 /// [AsyncRead::poll_read]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html#tymethod.poll_read
88 // TODO Non-panicking variant?
89 pub fn from_async_read<R>(async_read: R, default_buffer_len: usize) -> Self
90 where
91 R: AsyncRead + 'static,
92 {
93 let source = IntoUnderlyingByteSource::new(Box::new(async_read), default_buffer_len);
94 let raw = sys::ReadableStream::new_with_byte_source(source)
95 .expect_throw("readable byte streams not supported");
96 Self { raw }
97 }
98
99 /// Acquires a reference to the underlying [JavaScript stream](sys::ReadableStream).
100 #[inline]
101 pub fn as_raw(&self) -> &sys::ReadableStream {
102 &self.raw
103 }
104
105 /// Consumes this `ReadableStream`, returning the underlying [JavaScript stream](sys::ReadableStream).
106 #[inline]
107 pub fn into_raw(self) -> sys::ReadableStream {
108 self.raw
109 }
110
111 /// Returns `true` if the stream is [locked to a reader](https://streams.spec.whatwg.org/#lock).
112 #[inline]
113 pub fn is_locked(&self) -> bool {
114 self.as_raw().is_locked()
115 }
116
117 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
118 /// signaling a loss of interest in the stream by a consumer.
119 ///
120 /// If the stream is currently locked to a reader, then this returns an error.
121 pub async fn cancel(&mut self) -> Result<(), JsValue> {
122 promise_to_void_future(self.as_raw().cancel()).await
123 }
124
125 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
126 /// signaling a loss of interest in the stream by a consumer.
127 ///
128 /// The supplied `reason` will be given to the underlying source, which may or may not use it.
129 ///
130 /// If the stream is currently locked to a reader, then this returns an error.
131 pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
132 promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
133 }
134
135 /// Creates a [default reader](ReadableStreamDefaultReader) and
136 /// [locks](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
137 ///
138 /// While the stream is locked, no other reader can be acquired until this one is released.
139 ///
140 /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
141 /// use [`try_get_reader`](Self::try_get_reader).
142 #[inline]
143 pub fn get_reader(&mut self) -> ReadableStreamDefaultReader {
144 self.try_get_reader()
145 .expect_throw("already locked to a reader")
146 }
147
148 /// Try to create a [default reader](ReadableStreamDefaultReader) and
149 /// [lock](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
150 ///
151 /// While the stream is locked, no other reader can be acquired until this one is released.
152 ///
153 /// If the stream is already locked to a reader, then this returns an error.
154 pub fn try_get_reader(&mut self) -> Result<ReadableStreamDefaultReader, js_sys::Error> {
155 ReadableStreamDefaultReader::new(self)
156 }
157
158 /// Creates a [BYOB reader](ReadableStreamBYOBReader) and
159 /// [locks](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
160 ///
161 /// While the stream is locked, no other reader can be acquired until this one is released.
162 ///
163 /// **Panics** if the stream is already locked to a reader, or if this stream is not a readable
164 /// byte stream. For a non-panicking variant, use [`try_get_reader`](Self::try_get_reader).
165 pub fn get_byob_reader(&mut self) -> ReadableStreamBYOBReader {
166 self.try_get_byob_reader()
167 .expect_throw("already locked to a reader, or not a readable byte stream")
168 }
169
170 /// Try to create a [BYOB reader](ReadableStreamBYOBReader) and
171 /// [lock](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
172 ///
173 /// While the stream is locked, no other reader can be acquired until this one is released.
174 ///
175 /// If the stream is already locked to a reader, then this returns an error.
176 pub fn try_get_byob_reader(&mut self) -> Result<ReadableStreamBYOBReader, js_sys::Error> {
177 ReadableStreamBYOBReader::new(self)
178 }
179
180 /// [Pipes](https://streams.spec.whatwg.org/#piping) this readable stream to a given
181 /// writable stream.
182 ///
183 /// Piping a stream will [lock](https://streams.spec.whatwg.org/#lock) it for the duration
184 /// of the pipe, preventing any other consumer from acquiring a reader.
185 ///
186 /// This returns `()` if the pipe completes successfully, or `Err(error)` if any `error`
187 /// was encountered during the process.
188 pub async fn pipe_to<'a>(&'a mut self, dest: &'a mut WritableStream) -> Result<(), JsValue> {
189 self.pipe_to_with_options(dest, &PipeOptions::default())
190 .await
191 }
192
193 /// [Pipes](https://streams.spec.whatwg.org/#piping) this readable stream to a given
194 /// writable stream.
195 ///
196 /// Piping a stream will [lock](https://streams.spec.whatwg.org/#lock) it for the duration
197 /// of the pipe, preventing any other consumer from acquiring a reader.
198 ///
199 /// Errors and closures of the source and destination streams propagate as follows:
200 /// * An error in the source readable stream will [abort](https://streams.spec.whatwg.org/#abort-a-writable-stream)
201 /// the destination writable stream, unless [`options.prevent_abort`](PipeOptions::prevent_abort)
202 /// is `true`.
203 /// * An error in the destination writable stream will [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream)
204 /// the source readable stream, unless [`options.prevent_cancel`](PipeOptions::prevent_cancel)
205 /// is `true`.
206 /// * When the source readable stream closes, the destination writable stream will be closed,
207 /// unless [`options.prevent_close`](PipeOptions::prevent_close) is `true`.
208 /// * If the destination writable stream starts out closed or closing, the source readable stream
209 /// will be [canceled](https://streams.spec.whatwg.org/#cancel-a-readable-stream),
210 /// unless unless [`options.prevent_cancel`](PipeOptions::prevent_cancel) is `true`.
211 ///
212 /// This returns `()` if the pipe completes successfully, or `Err(error)` if any `error`
213 /// was encountered during the process.
214 pub async fn pipe_to_with_options<'a>(
215 &'a mut self,
216 dest: &'a mut WritableStream,
217 options: &PipeOptions,
218 ) -> Result<(), JsValue> {
219 let promise = self
220 .as_raw()
221 .pipe_to(dest.as_raw(), options.clone().into_raw());
222 promise_to_void_future(promise).await
223 }
224
225 /// [Tees](https://streams.spec.whatwg.org/#tee-a-readable-stream) this readable stream,
226 /// returning the two resulting branches as new [`ReadableStream`](ReadableStream) instances.
227 ///
228 /// Teeing a stream will [lock](https://streams.spec.whatwg.org/#lock) it, preventing any other
229 /// consumer from acquiring a reader.
230 /// To [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
231 /// cancel both of the resulting branches; a composite cancellation reason will then be
232 /// propagated to the stream's underlying source.
233 ///
234 /// Note that the chunks seen in each branch will be the same object.
235 /// If the chunks are not immutable, this could allow interference between the two branches.
236 ///
237 /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
238 /// use [`try_tee`](Self::try_tee).
239 pub fn tee(self) -> (ReadableStream, ReadableStream) {
240 self.try_tee().expect_throw("already locked to a reader")
241 }
242
243 /// Tries to [tee](https://streams.spec.whatwg.org/#tee-a-readable-stream) this readable stream,
244 /// returning the two resulting branches as new [`ReadableStream`](ReadableStream) instances.
245 ///
246 /// Teeing a stream will [lock](https://streams.spec.whatwg.org/#lock) it, preventing any other
247 /// consumer from acquiring a reader.
248 /// To [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
249 /// cancel both of the resulting branches; a composite cancellation reason will then be
250 /// propagated to the stream's underlying source.
251 ///
252 /// Note that the chunks seen in each branch will be the same object.
253 /// If the chunks are not immutable, this could allow interference between the two branches.
254 ///
255 /// If the stream is already locked to a reader, then this returns an error
256 /// along with the original `ReadableStream`.
257 pub fn try_tee(self) -> Result<(ReadableStream, ReadableStream), (js_sys::Error, Self)> {
258 let branches = self.as_raw().tee().map_err(|err| (err, self))?;
259 debug_assert_eq!(branches.length(), 2);
260 let (left, right) = (branches.get(0), branches.get(1));
261 Ok((
262 Self::from_raw(left.unchecked_into()),
263 Self::from_raw(right.unchecked_into()),
264 ))
265 }
266
267 /// Converts this `ReadableStream` into a [`Stream`].
268 ///
269 /// Items and errors are represented by their raw [`JsValue`](JsValue).
270 /// Use [`map`], [`map_ok`] and/or [`map_err`] on the returned stream to convert them to a more
271 /// appropriate type.
272 ///
273 /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
274 /// use [`try_into_stream`](Self::try_into_stream).
275 ///
276 /// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
277 /// [`map`]: https://docs.rs/futures/0.3.18/futures/stream/trait.StreamExt.html#method.map
278 /// [`map_ok`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_ok
279 /// [`map_err`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_err
280 #[inline]
281 pub fn into_stream(self) -> IntoStream<'static> {
282 self.try_into_stream()
283 .expect_throw("already locked to a reader")
284 }
285
286 /// Try to convert this `ReadableStream` into a [`Stream`].
287 ///
288 /// Items and errors are represented by their raw [`JsValue`](JsValue).
289 /// Use [`map`], [`map_ok`] and/or [`map_err`] on the returned stream to convert them to a more
290 /// appropriate type.
291 ///
292 /// If the stream is already locked to a reader, then this returns an error
293 /// along with the original `ReadableStream`.
294 ///
295 /// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
296 /// [`map`]: https://docs.rs/futures/0.3.18/futures/stream/trait.StreamExt.html#method.map
297 /// [`map_ok`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_ok
298 /// [`map_err`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_err
299 pub fn try_into_stream(mut self) -> Result<IntoStream<'static>, (js_sys::Error, Self)> {
300 let reader = ReadableStreamDefaultReader::new(&mut self).map_err(|err| (err, self))?;
301 Ok(IntoStream::new(reader, true))
302 }
303
304 /// Converts this `ReadableStream` into an [`AsyncRead`].
305 ///
306 /// **Panics** if the stream is already locked to a reader, or if this stream is not a readable
307 /// byte stream. For a non-panicking variant, use [`try_into_async_read`](Self::try_into_async_read).
308 ///
309 /// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
310 #[inline]
311 pub fn into_async_read(self) -> IntoAsyncRead<'static> {
312 self.try_into_async_read()
313 .expect_throw("already locked to a reader, or not a readable byte stream")
314 }
315
316 /// Try to convert this `ReadableStream` into an [`AsyncRead`].
317 ///
318 /// If the stream is already locked to a reader, or if this stream is not a readable byte
319 /// stream, then this returns an error along with the original `ReadableStream`.
320 ///
321 /// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
322 pub fn try_into_async_read(mut self) -> Result<IntoAsyncRead<'static>, (js_sys::Error, Self)> {
323 let reader = ReadableStreamBYOBReader::new(&mut self).map_err(|err| (err, self))?;
324 Ok(IntoAsyncRead::new(reader, true))
325 }
326}
327
328impl<St> From<St> for ReadableStream
329where
330 St: Stream<Item = Result<JsValue, JsValue>> + 'static,
331{
332 /// Equivalent to [`from_stream`](Self::from_stream).
333 #[inline]
334 fn from(stream: St) -> Self {
335 Self::from_stream(stream)
336 }
337}