// wasm_streams/readable/byob_reader.rs

1use std::marker::PhantomData;
2
3use js_sys::Uint8Array;
4use wasm_bindgen::{throw_val, JsCast, JsValue};
5use wasm_bindgen_futures::JsFuture;
6
7use crate::util::{checked_cast_to_usize, clamp_to_u32, promise_to_void_future};
8
9use super::{sys, IntoAsyncRead, ReadableStream};
10
11/// A [`ReadableStreamBYOBReader`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader)
12/// that can be used to read chunks from a [`ReadableStream`](ReadableStream).
13///
14/// This is returned by the [`get_byob_reader`](ReadableStream::get_byob_reader) method.
15///
16/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
17/// If the reader still has a pending read request at this point (i.e. if a future returned
18/// by [`read`](Self::read) is not yet ready), then this will **panic**. You must either `await`
19/// all `read` futures, or [`cancel`](Self::cancel) the stream to discard any pending `read` futures.
20#[derive(Debug)]
21pub struct ReadableStreamBYOBReader<'stream> {
22    raw: sys::ReadableStreamBYOBReader,
23    _stream: PhantomData<&'stream mut ReadableStream>,
24}
25
26impl<'stream> ReadableStreamBYOBReader<'stream> {
27    pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
28        Ok(Self {
29            raw: stream.as_raw().get_reader_with_options(
30                sys::ReadableStreamGetReaderOptions::new(sys::ReadableStreamReaderMode::BYOB),
31            )?,
32            _stream: PhantomData,
33        })
34    }
35
36    /// Acquires a reference to the underlying [JavaScript reader](sys::ReadableStreamBYOBReader).
37    #[inline]
38    pub fn as_raw(&self) -> &sys::ReadableStreamBYOBReader {
39        &self.raw
40    }
41
42    /// Waits for the stream to become closed.
43    ///
44    /// This returns an error if the stream ever errors, or if the reader's lock is
45    /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
46    /// closing.
47    pub async fn closed(&self) -> Result<(), JsValue> {
48        promise_to_void_future(self.as_raw().closed()).await
49    }
50
51    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
52    /// signaling a loss of interest in the stream by a consumer.
53    ///
54    /// Equivalent to [`ReadableStream.cancel`](ReadableStream::cancel).
55    pub async fn cancel(&mut self) -> Result<(), JsValue> {
56        promise_to_void_future(self.as_raw().cancel()).await
57    }
58
59    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
60    /// signaling a loss of interest in the stream by a consumer.
61    ///
62    /// Equivalent to [`ReadableStream.cancel_with_reason`](ReadableStream::cancel_with_reason).
63    pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
64        promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
65    }
66
67    /// Reads the next chunk from the stream's internal queue into `dst`,
68    /// and returns the number of bytes read.
69    ///
70    /// * If some bytes were read into `dst`, this returns `Ok(bytes_read)`.
71    /// * If the stream closes and no more bytes are available, this returns `Ok(0)`.
72    /// * If the stream cancels, this returns `Ok(0)`.
73    /// * If the stream encounters an `error`, this returns `Err(error)`.
74    ///
75    /// This always allocated a new temporary `Uint8Array` with the same size as `dst` to hold
76    /// the result before copying to `dst`. We cannot pass a view on the backing WebAssembly memory
77    /// directly, because:
78    /// * `reader.read(view)` needs to transfer `view.buffer`, but `WebAssembly.Memory` buffers
79    ///    are non-transferable.
80    /// * `view.buffer` can be invalidated if the WebAssembly memory grows while `read(view)`
81    ///    is still in progress.
82    ///
83    /// Therefore, it is necessary to use a separate buffer living in the JavaScript heap.
84    /// To avoid repeated allocations for repeated reads,
85    /// use [`read_with_buffer`](Self::read_with_buffer).
86    pub async fn read(&mut self, dst: &mut [u8]) -> Result<usize, JsValue> {
87        let buffer = Uint8Array::new_with_length(clamp_to_u32(dst.len()));
88        let (bytes_read, _) = self.read_with_buffer(dst, buffer).await?;
89        Ok(bytes_read)
90    }
91
92    /// Reads the next chunk from the stream's internal queue into `dst`,
93    /// and returns the number of bytes read.
94    ///
95    /// The given `buffer` is used to store the bytes before they are copied to `dst`.
96    /// This buffer is returned back together with the result, so it can be re-used for subsequent
97    /// reads without extra allocations. Note that the underlying `ArrayBuffer` is transferred
98    /// in the process, so any other views on the original buffer will become unusable.
99    ///
100    /// * If some bytes were read into `dst`, this returns `Ok((bytes_read, Some(buffer)))`.
101    /// * If the stream closes and no more bytes are available, this returns `Ok((0, Some(buffer)))`.
102    /// * If the stream cancels, this returns `Ok((0, None))`. In this case, the given buffer is
103    ///   not returned.
104    /// * If the stream encounters an `error`, this returns `Err(error)`.
105    pub async fn read_with_buffer(
106        &mut self,
107        dst: &mut [u8],
108        buffer: Uint8Array,
109    ) -> Result<(usize, Option<Uint8Array>), JsValue> {
110        // Save the original buffer's byte offset and length.
111        let buffer_offset = buffer.byte_offset();
112        let buffer_len = buffer.byte_length();
113        // Limit view to destination slice's length.
114        let dst_len = clamp_to_u32(dst.len());
115        let view = buffer
116            .subarray(0, dst_len)
117            .unchecked_into::<sys::ArrayBufferView>();
118        // Read into view. This transfers `buffer.buffer()`.
119        let promise = self.as_raw().read(&view);
120        let js_value = JsFuture::from(promise).await?;
121        let result = sys::ReadableStreamBYOBReadResult::from(js_value);
122        let filled_view = match result.value() {
123            Some(view) => view,
124            None => {
125                // No new view was returned. The stream must have been canceled.
126                assert!(result.is_done());
127                return Ok((0, None));
128            }
129        };
130        let filled_len = checked_cast_to_usize(filled_view.byte_length());
131        debug_assert!(filled_len <= dst.len());
132        // Re-construct the original Uint8Array with the new ArrayBuffer.
133        let new_buffer = Uint8Array::new_with_byte_offset_and_length(
134            &filled_view.buffer(),
135            buffer_offset,
136            buffer_len,
137        );
138        if result.is_done() {
139            debug_assert_eq!(filled_len, 0);
140        } else {
141            filled_view.copy_to(&mut dst[0..filled_len]);
142        }
143        Ok((filled_len, Some(new_buffer)))
144    }
145
146    /// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
147    /// corresponding stream.
148    ///
149    /// **Panics** if the reader still has a pending read request, i.e. if a future returned
150    /// by [`read`](Self::read) is not yet ready. For a non-panicking variant,
151    /// use [`try_release_lock`](Self::try_release_lock).
152    #[inline]
153    pub fn release_lock(mut self) {
154        self.release_lock_mut()
155    }
156
157    fn release_lock_mut(&mut self) {
158        self.as_raw()
159            .release_lock()
160            .unwrap_or_else(|error| throw_val(error.into()))
161    }
162
163    /// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
164    /// corresponding stream.
165    ///
166    /// The lock cannot be released while the reader still has a pending read request, i.e.
167    /// if a future returned by [`read`](Self::read) is not yet ready. Attempting to do so will
168    /// return an error and leave the reader locked to the stream.
169    #[inline]
170    pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
171        self.as_raw().release_lock().map_err(|error| (error, self))
172    }
173
174    /// Converts this `ReadableStreamBYOBReader` into an [`AsyncRead`].
175    ///
176    /// This is similar to [`ReadableStream.into_async_read`](ReadableStream::into_async_read),
177    /// except that after the returned `AsyncRead` is dropped, the original `ReadableStream` is
178    /// still usable. This allows reading only a few bytes from the `AsyncRead`, while still
179    /// allowing another reader to read the remaining bytes later on.
180    ///
181    /// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
182    #[inline]
183    pub fn into_async_read(self) -> IntoAsyncRead<'stream> {
184        IntoAsyncRead::new(self, false)
185    }
186}
187
188impl Drop for ReadableStreamBYOBReader<'_> {
189    fn drop(&mut self) {
190        self.release_lock_mut();
191    }
192}