wasm_streams/readable/
mod.rs

1//! Bindings and conversions for
2//! [readable streams](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
3use futures_util::io::AsyncRead;
4use futures_util::Stream;
5use wasm_bindgen::prelude::*;
6use wasm_bindgen::JsCast;
7
8pub use byob_reader::ReadableStreamBYOBReader;
9pub use default_reader::ReadableStreamDefaultReader;
10pub use into_async_read::IntoAsyncRead;
11pub use into_stream::IntoStream;
12use into_underlying_source::IntoUnderlyingSource;
13pub use pipe_options::PipeOptions;
14
15use crate::queuing_strategy::QueuingStrategy;
16use crate::readable::into_underlying_byte_source::IntoUnderlyingByteSource;
17use crate::util::promise_to_void_future;
18use crate::writable::WritableStream;
19
20mod byob_reader;
21mod default_reader;
22mod into_async_read;
23mod into_stream;
24mod into_underlying_byte_source;
25mod into_underlying_source;
26mod pipe_options;
27pub mod sys;
28
29/// A [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream).
30///
31/// `ReadableStream`s can be created from a [raw JavaScript stream](sys::ReadableStream) with
32/// [`from_raw`](Self::from_raw), or from a Rust [`Stream`] with [`from_stream`](Self::from_stream).
33///
34/// They can be converted into a [raw JavaScript stream](sys::ReadableStream) with
35/// [`into_raw`](Self::into_raw), or into a Rust [`Stream`] with [`into_stream`](Self::into_stream).
36///
37/// If the browser supports [readable byte streams](https://streams.spec.whatwg.org/#readable-byte-stream),
38/// then they can be created from a Rust [`AsyncRead`] with [`from_async_read`](Self::from_async_read),
39/// or converted into one with [`into_async_read`](Self::into_async_read).
40///
41/// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
42/// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
43#[derive(Debug)]
44pub struct ReadableStream {
45    raw: sys::ReadableStream,
46}
47
48impl ReadableStream {
49    /// Creates a new `ReadableStream` from a [JavaScript stream](sys::ReadableStream).
50    #[inline]
51    pub fn from_raw(raw: sys::ReadableStream) -> Self {
52        Self { raw }
53    }
54
55    /// Creates a new `ReadableStream` from a [`Stream`].
56    ///
57    /// Items and errors must be represented as raw [`JsValue`](JsValue)s.
58    /// Use [`map`], [`map_ok`] and/or [`map_err`] to convert a stream's items to a `JsValue`
59    /// before passing it to this function.
60    ///
61    /// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
62    /// [`map`]: https://docs.rs/futures/0.3.18/futures/stream/trait.StreamExt.html#method.map
63    /// [`map_ok`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_ok
64    /// [`map_err`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_err
65    pub fn from_stream<St>(stream: St) -> Self
66    where
67        St: Stream<Item = Result<JsValue, JsValue>> + 'static,
68    {
69        let source = IntoUnderlyingSource::new(Box::new(stream));
70        // Set HWM to 0 to prevent the JS ReadableStream from buffering chunks in its queue,
71        // since the original Rust stream is better suited to handle that.
72        let strategy = QueuingStrategy::new(0.0);
73        let raw = sys::ReadableStream::new_with_source(source, strategy);
74        Self { raw }
75    }
76
77    /// Creates a new `ReadableStream` from an [`AsyncRead`].
78    ///
79    /// This creates a readable byte stream whose `autoAllocateChunkSize` is `default_buffer_len`.
80    /// Therefore, if a default reader is used to consume the stream, the given `async_read`
81    /// will be [polled][AsyncRead::poll_read] with a buffer of this size. If a BYOB reader is used,
82    /// then it will be polled with a buffer of the same size as the BYOB read request instead.
83    ///
84    /// **Panics** if readable byte streams are not supported by the browser.
85    ///
86    /// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
87    /// [AsyncRead::poll_read]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html#tymethod.poll_read
88    // TODO Non-panicking variant?
89    pub fn from_async_read<R>(async_read: R, default_buffer_len: usize) -> Self
90    where
91        R: AsyncRead + 'static,
92    {
93        let source = IntoUnderlyingByteSource::new(Box::new(async_read), default_buffer_len);
94        let raw = sys::ReadableStream::new_with_byte_source(source)
95            .expect_throw("readable byte streams not supported");
96        Self { raw }
97    }
98
99    /// Acquires a reference to the underlying [JavaScript stream](sys::ReadableStream).
100    #[inline]
101    pub fn as_raw(&self) -> &sys::ReadableStream {
102        &self.raw
103    }
104
105    /// Consumes this `ReadableStream`, returning the underlying [JavaScript stream](sys::ReadableStream).
106    #[inline]
107    pub fn into_raw(self) -> sys::ReadableStream {
108        self.raw
109    }
110
111    /// Returns `true` if the stream is [locked to a reader](https://streams.spec.whatwg.org/#lock).
112    #[inline]
113    pub fn is_locked(&self) -> bool {
114        self.as_raw().is_locked()
115    }
116
117    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
118    /// signaling a loss of interest in the stream by a consumer.
119    ///
120    /// If the stream is currently locked to a reader, then this returns an error.
121    pub async fn cancel(&mut self) -> Result<(), JsValue> {
122        promise_to_void_future(self.as_raw().cancel()).await
123    }
124
125    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
126    /// signaling a loss of interest in the stream by a consumer.
127    ///
128    /// The supplied `reason` will be given to the underlying source, which may or may not use it.
129    ///
130    /// If the stream is currently locked to a reader, then this returns an error.
131    pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
132        promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
133    }
134
135    /// Creates a [default reader](ReadableStreamDefaultReader) and
136    /// [locks](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
137    ///
138    /// While the stream is locked, no other reader can be acquired until this one is released.
139    ///
140    /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
141    /// use [`try_get_reader`](Self::try_get_reader).
142    #[inline]
143    pub fn get_reader(&mut self) -> ReadableStreamDefaultReader {
144        self.try_get_reader()
145            .expect_throw("already locked to a reader")
146    }
147
148    /// Try to create a [default reader](ReadableStreamDefaultReader) and
149    /// [lock](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
150    ///
151    /// While the stream is locked, no other reader can be acquired until this one is released.
152    ///
153    /// If the stream is already locked to a reader, then this returns an error.
154    pub fn try_get_reader(&mut self) -> Result<ReadableStreamDefaultReader, js_sys::Error> {
155        ReadableStreamDefaultReader::new(self)
156    }
157
158    /// Creates a [BYOB reader](ReadableStreamBYOBReader) and
159    /// [locks](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
160    ///
161    /// While the stream is locked, no other reader can be acquired until this one is released.
162    ///
163    /// **Panics** if the stream is already locked to a reader, or if this stream is not a readable
164    /// byte stream. For a non-panicking variant, use [`try_get_reader`](Self::try_get_reader).
165    pub fn get_byob_reader(&mut self) -> ReadableStreamBYOBReader {
166        self.try_get_byob_reader()
167            .expect_throw("already locked to a reader, or not a readable byte stream")
168    }
169
170    /// Try to create a [BYOB reader](ReadableStreamBYOBReader) and
171    /// [lock](https://streams.spec.whatwg.org/#lock) the stream to the new reader.
172    ///
173    /// While the stream is locked, no other reader can be acquired until this one is released.
174    ///
175    /// If the stream is already locked to a reader, then this returns an error.
176    pub fn try_get_byob_reader(&mut self) -> Result<ReadableStreamBYOBReader, js_sys::Error> {
177        ReadableStreamBYOBReader::new(self)
178    }
179
180    /// [Pipes](https://streams.spec.whatwg.org/#piping) this readable stream to a given
181    /// writable stream.
182    ///
183    /// Piping a stream will [lock](https://streams.spec.whatwg.org/#lock) it for the duration
184    /// of the pipe, preventing any other consumer from acquiring a reader.
185    ///
186    /// This returns `()` if the pipe completes successfully, or `Err(error)` if any `error`
187    /// was encountered during the process.
188    pub async fn pipe_to<'a>(&'a mut self, dest: &'a mut WritableStream) -> Result<(), JsValue> {
189        self.pipe_to_with_options(dest, &PipeOptions::default())
190            .await
191    }
192
193    /// [Pipes](https://streams.spec.whatwg.org/#piping) this readable stream to a given
194    /// writable stream.
195    ///
196    /// Piping a stream will [lock](https://streams.spec.whatwg.org/#lock) it for the duration
197    /// of the pipe, preventing any other consumer from acquiring a reader.
198    ///
199    /// Errors and closures of the source and destination streams propagate as follows:
200    /// * An error in the source readable stream will [abort](https://streams.spec.whatwg.org/#abort-a-writable-stream)
201    ///   the destination writable stream, unless [`options.prevent_abort`](PipeOptions::prevent_abort)
202    ///   is `true`.
203    /// * An error in the destination writable stream will [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream)
204    ///   the source readable stream, unless [`options.prevent_cancel`](PipeOptions::prevent_cancel)
205    ///   is `true`.
206    /// * When the source readable stream closes, the destination writable stream will be closed,
207    ///   unless [`options.prevent_close`](PipeOptions::prevent_close) is `true`.
208    /// * If the destination writable stream starts out closed or closing, the source readable stream
209    ///   will be [canceled](https://streams.spec.whatwg.org/#cancel-a-readable-stream),
210    ///   unless unless [`options.prevent_cancel`](PipeOptions::prevent_cancel) is `true`.
211    ///
212    /// This returns `()` if the pipe completes successfully, or `Err(error)` if any `error`
213    /// was encountered during the process.
214    pub async fn pipe_to_with_options<'a>(
215        &'a mut self,
216        dest: &'a mut WritableStream,
217        options: &PipeOptions,
218    ) -> Result<(), JsValue> {
219        let promise = self
220            .as_raw()
221            .pipe_to(dest.as_raw(), options.clone().into_raw());
222        promise_to_void_future(promise).await
223    }
224
225    /// [Tees](https://streams.spec.whatwg.org/#tee-a-readable-stream) this readable stream,
226    /// returning the two resulting branches as new [`ReadableStream`](ReadableStream) instances.
227    ///
228    /// Teeing a stream will [lock](https://streams.spec.whatwg.org/#lock) it, preventing any other
229    /// consumer from acquiring a reader.
230    /// To [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
231    /// cancel both of the resulting branches; a composite cancellation reason will then be
232    /// propagated to the stream's underlying source.
233    ///
234    /// Note that the chunks seen in each branch will be the same object.
235    /// If the chunks are not immutable, this could allow interference between the two branches.
236    ///
237    /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
238    /// use [`try_tee`](Self::try_tee).
239    pub fn tee(self) -> (ReadableStream, ReadableStream) {
240        self.try_tee().expect_throw("already locked to a reader")
241    }
242
243    /// Tries to [tee](https://streams.spec.whatwg.org/#tee-a-readable-stream) this readable stream,
244    /// returning the two resulting branches as new [`ReadableStream`](ReadableStream) instances.
245    ///
246    /// Teeing a stream will [lock](https://streams.spec.whatwg.org/#lock) it, preventing any other
247    /// consumer from acquiring a reader.
248    /// To [cancel](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
249    /// cancel both of the resulting branches; a composite cancellation reason will then be
250    /// propagated to the stream's underlying source.
251    ///
252    /// Note that the chunks seen in each branch will be the same object.
253    /// If the chunks are not immutable, this could allow interference between the two branches.
254    ///
255    /// If the stream is already locked to a reader, then this returns an error
256    /// along with the original `ReadableStream`.
257    pub fn try_tee(self) -> Result<(ReadableStream, ReadableStream), (js_sys::Error, Self)> {
258        let branches = self.as_raw().tee().map_err(|err| (err, self))?;
259        debug_assert_eq!(branches.length(), 2);
260        let (left, right) = (branches.get(0), branches.get(1));
261        Ok((
262            Self::from_raw(left.unchecked_into()),
263            Self::from_raw(right.unchecked_into()),
264        ))
265    }
266
267    /// Converts this `ReadableStream` into a [`Stream`].
268    ///
269    /// Items and errors are represented by their raw [`JsValue`](JsValue).
270    /// Use [`map`], [`map_ok`] and/or [`map_err`] on the returned stream to convert them to a more
271    /// appropriate type.
272    ///
273    /// **Panics** if the stream is already locked to a reader. For a non-panicking variant,
274    /// use [`try_into_stream`](Self::try_into_stream).
275    ///
276    /// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
277    /// [`map`]: https://docs.rs/futures/0.3.18/futures/stream/trait.StreamExt.html#method.map
278    /// [`map_ok`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_ok
279    /// [`map_err`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_err
280    #[inline]
281    pub fn into_stream(self) -> IntoStream<'static> {
282        self.try_into_stream()
283            .expect_throw("already locked to a reader")
284    }
285
286    /// Try to convert this `ReadableStream` into a [`Stream`].
287    ///
288    /// Items and errors are represented by their raw [`JsValue`](JsValue).
289    /// Use [`map`], [`map_ok`] and/or [`map_err`] on the returned stream to convert them to a more
290    /// appropriate type.
291    ///
292    /// If the stream is already locked to a reader, then this returns an error
293    /// along with the original `ReadableStream`.
294    ///
295    /// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
296    /// [`map`]: https://docs.rs/futures/0.3.18/futures/stream/trait.StreamExt.html#method.map
297    /// [`map_ok`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_ok
298    /// [`map_err`]: https://docs.rs/futures/0.3.18/futures/stream/trait.TryStreamExt.html#method.map_err
299    pub fn try_into_stream(mut self) -> Result<IntoStream<'static>, (js_sys::Error, Self)> {
300        let reader = ReadableStreamDefaultReader::new(&mut self).map_err(|err| (err, self))?;
301        Ok(IntoStream::new(reader, true))
302    }
303
304    /// Converts this `ReadableStream` into an [`AsyncRead`].
305    ///
306    /// **Panics** if the stream is already locked to a reader, or if this stream is not a readable
307    /// byte stream. For a non-panicking variant, use [`try_into_async_read`](Self::try_into_async_read).
308    ///
309    /// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
310    #[inline]
311    pub fn into_async_read(self) -> IntoAsyncRead<'static> {
312        self.try_into_async_read()
313            .expect_throw("already locked to a reader, or not a readable byte stream")
314    }
315
316    /// Try to convert this `ReadableStream` into an [`AsyncRead`].
317    ///
318    /// If the stream is already locked to a reader, or if this stream is not a readable byte
319    /// stream, then this returns an error along with the original `ReadableStream`.
320    ///
321    /// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
322    pub fn try_into_async_read(mut self) -> Result<IntoAsyncRead<'static>, (js_sys::Error, Self)> {
323        let reader = ReadableStreamBYOBReader::new(&mut self).map_err(|err| (err, self))?;
324        Ok(IntoAsyncRead::new(reader, true))
325    }
326}
327
328impl<St> From<St> for ReadableStream
329where
330    St: Stream<Item = Result<JsValue, JsValue>> + 'static,
331{
332    /// Equivalent to [`from_stream`](Self::from_stream).
333    #[inline]
334    fn from(stream: St) -> Self {
335        Self::from_stream(stream)
336    }
337}