wasm_streams/readable/
into_async_read.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::io::{AsyncRead, Error};
5use futures_util::ready;
6use futures_util::FutureExt;
7use js_sys::Uint8Array;
8use wasm_bindgen::prelude::*;
9use wasm_bindgen::JsCast;
10use wasm_bindgen_futures::JsFuture;
11
12use crate::util::{checked_cast_to_usize, clamp_to_u32, js_to_io_error};
13
14use super::sys::{ArrayBufferView, ReadableStreamBYOBReadResult};
15use super::ReadableStreamBYOBReader;
16
17/// An [`AsyncRead`] for the [`into_async_read`](super::ReadableStream::into_async_read) method.
18///
19/// This `AsyncRead` holds a reader, and therefore locks the [`ReadableStream`](super::ReadableStream).
20/// When this `AsyncRead` is dropped, it also drops its reader which in turn
21/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
22///
23/// When used through [`ReadableStream::into_async_read`](super::ReadableStream::into_async_read),
24/// the stream is automatically cancelled before dropping the reader, discarding any pending read requests.
25/// When used through [`ReadableStreamBYOBReader::into_async_read`](super::ReadableStreamBYOBReader::into_async_read),
26/// it is up to the user to either manually [cancel](Self::cancel) the stream,
27/// or to ensure that there are no pending read requests when dropped.
28/// See the documentation on [`ReadableStreamBYOBReader`] for more details on the drop behavior.
29///
30/// [`AsyncRead`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncRead.html
31#[must_use = "readers do nothing unless polled"]
32#[derive(Debug)]
33pub struct IntoAsyncRead<'reader> {
34    reader: Option<ReadableStreamBYOBReader<'reader>>,
35    buffer: Option<Uint8Array>,
36    fut: Option<JsFuture>,
37    cancel_on_drop: bool,
38}
39
40impl<'reader> IntoAsyncRead<'reader> {
41    #[inline]
42    pub(super) fn new(reader: ReadableStreamBYOBReader, cancel_on_drop: bool) -> IntoAsyncRead {
43        IntoAsyncRead {
44            reader: Some(reader),
45            buffer: None,
46            fut: None,
47            cancel_on_drop,
48        }
49    }
50
51    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
52    /// signaling a loss of interest in the stream by a consumer.
53    pub async fn cancel(mut self) -> Result<(), JsValue> {
54        match self.reader.take() {
55            Some(mut reader) => reader.cancel().await,
56            None => Ok(()),
57        }
58    }
59
60    /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
61    /// signaling a loss of interest in the stream by a consumer.
62    pub async fn cancel_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
63        match self.reader.take() {
64            Some(mut reader) => reader.cancel_with_reason(reason).await,
65            None => Ok(()),
66        }
67    }
68
69    #[inline]
70    fn discard_reader(mut self: Pin<&mut Self>) {
71        self.reader = None;
72        self.buffer = None;
73    }
74}
75
76impl<'reader> AsyncRead for IntoAsyncRead<'reader> {
77    fn poll_read(
78        mut self: Pin<&mut Self>,
79        cx: &mut Context<'_>,
80        buf: &mut [u8],
81    ) -> Poll<Result<usize, Error>> {
82        let read_fut = match self.fut.as_mut() {
83            Some(fut) => fut,
84            None => {
85                // No pending read, start reading the next bytes
86                let buf_len = clamp_to_u32(buf.len());
87                let buffer = match self.buffer.take() {
88                    // Re-use the internal buffer if it is large enough,
89                    // otherwise allocate a new one
90                    Some(buffer) if buffer.byte_length() >= buf_len => buffer,
91                    _ => Uint8Array::new_with_length(buf_len),
92                };
93                // Limit to output buffer size
94                let buffer = buffer
95                    .subarray(0, buf_len)
96                    .unchecked_into::<ArrayBufferView>();
97                match &self.reader {
98                    Some(reader) => {
99                        // Read into internal buffer and store its future
100                        let fut = JsFuture::from(reader.as_raw().read(&buffer));
101                        self.fut.insert(fut)
102                    }
103                    None => {
104                        // Reader was already dropped
105                        return Poll::Ready(Ok(0));
106                    }
107                }
108            }
109        };
110
111        // Poll the future for the pending read
112        let js_result = ready!(read_fut.poll_unpin(cx));
113        self.fut = None;
114
115        // Read completed
116        Poll::Ready(match js_result {
117            Ok(js_value) => {
118                let result = ReadableStreamBYOBReadResult::from(js_value);
119                if result.is_done() {
120                    // End of stream
121                    self.discard_reader();
122                    Ok(0)
123                } else {
124                    // Cannot be canceled, so view must exist
125                    let filled_view = result.value().unwrap_throw();
126                    // Copy bytes to output buffer
127                    let filled_len = checked_cast_to_usize(filled_view.byte_length());
128                    debug_assert!(filled_len <= buf.len());
129                    filled_view.copy_to(&mut buf[0..filled_len]);
130                    // Re-construct internal buffer with the new ArrayBuffer
131                    self.buffer = Some(Uint8Array::new(&filled_view.buffer()));
132                    Ok(filled_len)
133                }
134            }
135            Err(js_value) => {
136                // Error
137                self.discard_reader();
138                Err(js_to_io_error(js_value))
139            }
140        })
141    }
142}
143
144impl<'reader> Drop for IntoAsyncRead<'reader> {
145    fn drop(&mut self) {
146        if self.cancel_on_drop {
147            if let Some(reader) = self.reader.take() {
148                let _ = reader.as_raw().cancel().catch(&Closure::once(|_| {}));
149            }
150        }
151    }
152}