wasm_streams/readable/
into_underlying_byte_source.rs

1use std::cell::RefCell;
2use std::pin::Pin;
3use std::rc::Rc;
4
5use futures_util::future::{abortable, AbortHandle, TryFutureExt};
6use futures_util::io::{AsyncRead, AsyncReadExt};
7use js_sys::{Error as JsError, Promise, Uint8Array};
8use wasm_bindgen::prelude::*;
9use wasm_bindgen_futures::future_to_promise;
10
11use crate::util::{checked_cast_to_u32, clamp_to_usize};
12
13use super::sys;
14
15#[wasm_bindgen]
16pub(crate) struct IntoUnderlyingByteSource {
17    inner: Rc<RefCell<Inner>>,
18    default_buffer_len: usize,
19    controller: Option<sys::ReadableByteStreamController>,
20    pull_handle: Option<AbortHandle>,
21}
22
23impl IntoUnderlyingByteSource {
24    pub fn new(async_read: Box<dyn AsyncRead>, default_buffer_len: usize) -> Self {
25        IntoUnderlyingByteSource {
26            inner: Rc::new(RefCell::new(Inner::new(async_read))),
27            default_buffer_len,
28            controller: None,
29            pull_handle: None,
30        }
31    }
32}
33
34#[wasm_bindgen(inline_js = "export function bytes_literal() { return \"bytes\"; }")]
35extern "C" {
36    fn bytes_literal() -> JsValue;
37}
38
39#[allow(clippy::await_holding_refcell_ref)]
40#[wasm_bindgen]
41impl IntoUnderlyingByteSource {
42    // Chromium has a bug where it only recognizes `new ReadableStream({ type: "bytes" })`,
43    // not `new ReadableStream({ type: "by" + "tes" })` or any other non-literal string
44    // that equals "bytes". Therefore, we cannot return a Rust `String` here, since
45    // that needs to be converted to a JavaScript string at runtime.
46    // Instead, we call a function that returns the desired string literal as a `JsValue`
47    // and pass that value around. It looks silly, but it works.
48    // See https://crbug.com/1187774
49    #[wasm_bindgen(getter, js_name = type)]
50    pub fn type_(&self) -> JsValue {
51        bytes_literal()
52    }
53
54    #[wasm_bindgen(getter, js_name = autoAllocateChunkSize)]
55    pub fn auto_allocate_chunk_size(&self) -> usize {
56        self.default_buffer_len
57    }
58
59    pub fn start(&mut self, controller: sys::ReadableByteStreamController) {
60        self.controller = Some(controller);
61    }
62
63    pub fn pull(&mut self, controller: sys::ReadableByteStreamController) -> Promise {
64        let inner = self.inner.clone();
65        let fut = async move {
66            // This mutable borrow can never panic, since the ReadableStream always queues
67            // each operation on the underlying source.
68            let mut inner = inner.try_borrow_mut().unwrap_throw();
69            inner.pull(controller).await
70        };
71
72        // Allow aborting the future from cancel().
73        let (fut, handle) = abortable(fut);
74        // Ignore errors from aborting the future.
75        let fut = fut.unwrap_or_else(|_| Ok(JsValue::undefined()));
76
77        self.pull_handle = Some(handle);
78        future_to_promise(fut)
79    }
80
81    pub fn cancel(self) {
82        // The stream has been canceled, drop everything.
83        drop(self);
84    }
85}
86
87impl Drop for IntoUnderlyingByteSource {
88    fn drop(&mut self) {
89        // Abort the pending pull, if any.
90        if let Some(handle) = self.pull_handle.take() {
91            handle.abort();
92        }
93        // Close the pending BYOB request, if any. This is necessary for cancellation.
94        if let Some(request) = self.controller.take().and_then(|c| c.byob_request()) {
95            request.respond(0);
96        }
97    }
98}
99
100struct Inner {
101    async_read: Option<Pin<Box<dyn AsyncRead>>>,
102    buffer: Vec<u8>,
103}
104
105impl Inner {
106    fn new(async_read: Box<dyn AsyncRead>) -> Self {
107        Inner {
108            async_read: Some(async_read.into()),
109            buffer: Vec::new(),
110        }
111    }
112
113    async fn pull(
114        &mut self,
115        controller: sys::ReadableByteStreamController,
116    ) -> Result<JsValue, JsValue> {
117        // The AsyncRead should still exist, since pull() will not be called again
118        // after the stream has closed or encountered an error.
119        let async_read = self.async_read.as_mut().unwrap_throw();
120        // We set autoAllocateChunkSize, so there should always be a BYOB request.
121        let mut request = ByobRequestGuard::new(controller.byob_request().unwrap_throw());
122        // Resize the buffer to fit the BYOB request.
123        let request_view = request.view();
124        let request_len = clamp_to_usize(request_view.byte_length());
125        if self.buffer.len() < request_len {
126            self.buffer.resize(request_len, 0);
127        }
128        match async_read.read(&mut self.buffer[0..request_len]).await {
129            Ok(0) => {
130                // The stream has closed, drop it.
131                self.discard();
132                controller.close();
133                request.respond(0);
134            }
135            Ok(bytes_read) => {
136                // Copy read bytes from buffer to BYOB request view
137                debug_assert!(bytes_read <= request_len);
138                let bytes_read_u32 = checked_cast_to_u32(bytes_read);
139                let dest = Uint8Array::new_with_byte_offset_and_length(
140                    &request_view.buffer(),
141                    request_view.byte_offset(),
142                    bytes_read_u32,
143                );
144                dest.copy_from(&self.buffer[0..bytes_read]);
145                // Respond to BYOB request
146                request.respond(bytes_read_u32);
147            }
148            Err(err) => {
149                // The stream encountered an error, drop it.
150                self.discard();
151                return Err(JsError::new(&err.to_string()).into());
152            }
153        };
154        Ok(JsValue::undefined())
155    }
156
157    #[inline]
158    fn discard(&mut self) {
159        self.async_read = None;
160        self.buffer = Vec::new();
161    }
162}
163
164#[derive(Debug)]
165struct ByobRequestGuard(Option<sys::ReadableStreamBYOBRequest>);
166
167impl ByobRequestGuard {
168    fn new(request: sys::ReadableStreamBYOBRequest) -> Self {
169        Self(Some(request))
170    }
171
172    fn view(&mut self) -> sys::ArrayBufferView {
173        self.0.as_mut().unwrap_throw().view().unwrap_throw()
174    }
175
176    fn respond(mut self, bytes_read: u32) {
177        self.0.take().unwrap_throw().respond(bytes_read);
178    }
179}
180
181impl Drop for ByobRequestGuard {
182    fn drop(&mut self) {
183        // Close the BYOB request, if still pending. This is necessary for cancellation.
184        if let Some(request) = self.0.take() {
185            request.respond(0);
186        }
187    }
188}