wasm_streams/readable/byob_reader.rs
1use std::marker::PhantomData;
2
3use js_sys::Uint8Array;
4use wasm_bindgen::{throw_val, JsCast, JsValue};
5use wasm_bindgen_futures::JsFuture;
6
7use crate::util::{checked_cast_to_usize, clamp_to_u32, promise_to_void_future};
8
9use super::{sys, IntoAsyncRead, ReadableStream};
10
11/// A [`ReadableStreamBYOBReader`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamBYOBReader)
12/// that can be used to read chunks from a [`ReadableStream`](ReadableStream).
13///
14/// This is returned by the [`get_byob_reader`](ReadableStream::get_byob_reader) method.
15///
16/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
17/// If the reader still has a pending read request at this point (i.e. if a future returned
18/// by [`read`](Self::read) is not yet ready), then this will **panic**. You must either `await`
19/// all `read` futures, or [`cancel`](Self::cancel) the stream to discard any pending `read` futures.
20#[derive(Debug)]
21pub struct ReadableStreamBYOBReader<'stream> {
22 raw: sys::ReadableStreamBYOBReader,
23 _stream: PhantomData<&'stream mut ReadableStream>,
24}
25
26impl<'stream> ReadableStreamBYOBReader<'stream> {
27 pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
28 Ok(Self {
29 raw: stream.as_raw().get_reader_with_options(
30 sys::ReadableStreamGetReaderOptions::new(sys::ReadableStreamReaderMode::BYOB),
31 )?,
32 _stream: PhantomData,
33 })
34 }
35
36 /// Acquires a reference to the underlying [JavaScript reader](sys::ReadableStreamBYOBReader).
37 #[inline]
38 pub fn as_raw(&self) -> &sys::ReadableStreamBYOBReader {
39 &self.raw
40 }
41
42 /// Waits for the stream to become closed.
43 ///
44 /// This returns an error if the stream ever errors, or if the reader's lock is
45 /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
46 /// closing.
47 pub async fn closed(&self) -> Result<(), JsValue> {
48 promise_to_void_future(self.as_raw().closed()).await
49 }
50
51 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
52 /// signaling a loss of interest in the stream by a consumer.
53 ///
54 /// Equivalent to [`ReadableStream.cancel`](ReadableStream::cancel).
55 pub async fn cancel(&mut self) -> Result<(), JsValue> {
56 promise_to_void_future(self.as_raw().cancel()).await
57 }
58
59 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
60 /// signaling a loss of interest in the stream by a consumer.
61 ///
62 /// Equivalent to [`ReadableStream.cancel_with_reason`](ReadableStream::cancel_with_reason).
63 pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
64 promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
65 }
66
67 /// Reads the next chunk from the stream's internal queue into `dst`,
68 /// and returns the number of bytes read.
69 ///
70 /// * If some bytes were read into `dst`, this returns `Ok(bytes_read)`.
71 /// * If the stream closes and no more bytes are available, this returns `Ok(0)`.
72 /// * If the stream cancels, this returns `Ok(0)`.
73 /// * If the stream encounters an `error`, this returns `Err(error)`.
74 ///
75 /// This always allocated a new temporary `Uint8Array` with the same size as `dst` to hold
76 /// the result before copying to `dst`. We cannot pass a view on the backing WebAssembly memory
77 /// directly, because:
78 /// * `reader.read(view)` needs to transfer `view.buffer`, but `WebAssembly.Memory` buffers
79 /// are non-transferable.
80 /// * `view.buffer` can be invalidated if the WebAssembly memory grows while `read(view)`
81 /// is still in progress.
82 ///
83 /// Therefore, it is necessary to use a separate buffer living in the JavaScript heap.
84 /// To avoid repeated allocations for repeated reads,
85 /// use [`read_with_buffer`](Self::read_with_buffer).
86 pub async fn read(&mut self, dst: &mut [u8]) -> Result<usize, JsValue> {
87 let buffer = Uint8Array::new_with_length(clamp_to_u32(dst.len()));
88 let (bytes_read, _) = self.read_with_buffer(dst, buffer).await?;
89 Ok(bytes_read)
90 }
91
92 /// Reads the next chunk from the stream's internal queue into `dst`,
93 /// and returns the number of bytes read.
94 ///
95 /// The given `buffer` is used to store the bytes before they are copied to `dst`.
96 /// This buffer is returned back together with the result, so it can be re-used for subsequent
97 /// reads without extra allocations. Note that the underlying `ArrayBuffer` is transferred
98 /// in the process, so any other views on the original buffer will become unusable.
99 ///
100 /// * If some bytes were read into `dst`, this returns `Ok((bytes_read, Some(buffer)))`.
101 /// * If the stream closes and no more bytes are available, this returns `Ok((0, Some(buffer)))`.
102 /// * If the stream cancels, this returns `Ok((0, None))`. In this case, the given buffer is
103 /// not returned.
104 /// * If the stream encounters an `error`, this returns `Err(error)`.
105 pub async fn read_with_buffer(
106 &mut self,
107 dst: &mut [u8],
108 buffer: Uint8Array,
109 ) -> Result<(usize, Option<Uint8Array>), JsValue> {
110 // Save the original buffer's byte offset and length.
111 let buffer_offset = buffer.byte_offset();
112 let buffer_len = buffer.byte_length();
113 // Limit view to destination slice's length.
114 let dst_len = clamp_to_u32(dst.len());
115 let view = buffer
116 .subarray(0, dst_len)
117 .unchecked_into::<sys::ArrayBufferView>();
118 // Read into view. This transfers `buffer.buffer()`.
119 let promise = self.as_raw().read(&view);
120 let js_value = JsFuture::from(promise).await?;
121 let result = sys::ReadableStreamBYOBReadResult::from(js_value);
122 let filled_view = match result.value() {
123 Some(view) => view,
124 None => {
125 // No new view was returned. The stream must have been canceled.
126 assert!(result.is_done());
127 return Ok((0, None));
128 }
129 };
130 let filled_len = checked_cast_to_usize(filled_view.byte_length());
131 debug_assert!(filled_len <= dst.len());
132 // Re-construct the original Uint8Array with the new ArrayBuffer.
133 let new_buffer = Uint8Array::new_with_byte_offset_and_length(
134 &filled_view.buffer(),
135 buffer_offset,
136 buffer_len,
137 );
138 if result.is_done() {
139 debug_assert_eq!(filled_len, 0);
140 } else {
141 filled_view.copy_to(&mut dst[0..filled_len]);
142 }
143 Ok((filled_len, Some(new_buffer)))
144 }
145
146 /// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
147 /// corresponding stream.
148 ///
149 /// **Panics** if the reader still has a pending read request, i.e. if a future returned
150 /// by [`read`](Self::read) is not yet ready. For a non-panicking variant,
151 /// use [`try_release_lock`](Self::try_release_lock).
152 #[inline]
153 pub fn release_lock(mut self) {
154 self.release_lock_mut()
155 }
156
157 fn release_lock_mut(&mut self) {
158 self.as_raw()
159 .release_lock()
160 .unwrap_or_else(|error| throw_val(error.into()))
161 }
162
163 /// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
164 /// corresponding stream.
165 ///
166 /// The lock cannot be released while the reader still has a pending read request, i.e.
167 /// if a future returned by [`read`](Self::read) is not yet ready. Attempting to do so will
168 /// return an error and leave the reader locked to the stream.
169 #[inline]
170 pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
171 self.as_raw().release_lock().map_err(|error| (error, self))
172 }
173
174 /// Converts this `ReadableStreamBYOBReader` into an [`AsyncRead`].
175 ///
176 /// This is similar to [`ReadableStream.into_async_read`](ReadableStream::into_async_read),
177 /// except that after the returned `AsyncRead` is dropped, the original `ReadableStream` is
178 /// still usable. This allows reading only a few bytes from the `AsyncRead`, while still
179 /// allowing another reader to read the remaining bytes later on.
180 ///
181 /// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
182 #[inline]
183 pub fn into_async_read(self) -> IntoAsyncRead<'stream> {
184 IntoAsyncRead::new(self, false)
185 }
186}
187
188impl Drop for ReadableStreamBYOBReader<'_> {
189 fn drop(&mut self) {
190 self.release_lock_mut();
191 }
192}