wasm_streams/readable/
into_underlying_byte_source.rs1use std::cell::RefCell;
2use std::pin::Pin;
3use std::rc::Rc;
4
5use futures_util::future::{abortable, AbortHandle, TryFutureExt};
6use futures_util::io::{AsyncRead, AsyncReadExt};
7use js_sys::{Error as JsError, Promise, Uint8Array};
8use wasm_bindgen::prelude::*;
9use wasm_bindgen_futures::future_to_promise;
10
11use crate::util::{checked_cast_to_u32, clamp_to_usize};
12
13use super::sys;
14
15#[wasm_bindgen]
16pub(crate) struct IntoUnderlyingByteSource {
17 inner: Rc<RefCell<Inner>>,
18 default_buffer_len: usize,
19 controller: Option<sys::ReadableByteStreamController>,
20 pull_handle: Option<AbortHandle>,
21}
22
23impl IntoUnderlyingByteSource {
24 pub fn new(async_read: Box<dyn AsyncRead>, default_buffer_len: usize) -> Self {
25 IntoUnderlyingByteSource {
26 inner: Rc::new(RefCell::new(Inner::new(async_read))),
27 default_buffer_len,
28 controller: None,
29 pull_handle: None,
30 }
31 }
32}
33
34#[wasm_bindgen(inline_js = "export function bytes_literal() { return \"bytes\"; }")]
35extern "C" {
36 fn bytes_literal() -> JsValue;
37}
38
39#[allow(clippy::await_holding_refcell_ref)]
40#[wasm_bindgen]
41impl IntoUnderlyingByteSource {
42 #[wasm_bindgen(getter, js_name = type)]
50 pub fn type_(&self) -> JsValue {
51 bytes_literal()
52 }
53
54 #[wasm_bindgen(getter, js_name = autoAllocateChunkSize)]
55 pub fn auto_allocate_chunk_size(&self) -> usize {
56 self.default_buffer_len
57 }
58
59 pub fn start(&mut self, controller: sys::ReadableByteStreamController) {
60 self.controller = Some(controller);
61 }
62
63 pub fn pull(&mut self, controller: sys::ReadableByteStreamController) -> Promise {
64 let inner = self.inner.clone();
65 let fut = async move {
66 let mut inner = inner.try_borrow_mut().unwrap_throw();
69 inner.pull(controller).await
70 };
71
72 let (fut, handle) = abortable(fut);
74 let fut = fut.unwrap_or_else(|_| Ok(JsValue::undefined()));
76
77 self.pull_handle = Some(handle);
78 future_to_promise(fut)
79 }
80
81 pub fn cancel(self) {
82 drop(self);
84 }
85}
86
87impl Drop for IntoUnderlyingByteSource {
88 fn drop(&mut self) {
89 if let Some(handle) = self.pull_handle.take() {
91 handle.abort();
92 }
93 if let Some(request) = self.controller.take().and_then(|c| c.byob_request()) {
95 request.respond(0);
96 }
97 }
98}
99
100struct Inner {
101 async_read: Option<Pin<Box<dyn AsyncRead>>>,
102 buffer: Vec<u8>,
103}
104
105impl Inner {
106 fn new(async_read: Box<dyn AsyncRead>) -> Self {
107 Inner {
108 async_read: Some(async_read.into()),
109 buffer: Vec::new(),
110 }
111 }
112
113 async fn pull(
114 &mut self,
115 controller: sys::ReadableByteStreamController,
116 ) -> Result<JsValue, JsValue> {
117 let async_read = self.async_read.as_mut().unwrap_throw();
120 let mut request = ByobRequestGuard::new(controller.byob_request().unwrap_throw());
122 let request_view = request.view();
124 let request_len = clamp_to_usize(request_view.byte_length());
125 if self.buffer.len() < request_len {
126 self.buffer.resize(request_len, 0);
127 }
128 match async_read.read(&mut self.buffer[0..request_len]).await {
129 Ok(0) => {
130 self.discard();
132 controller.close();
133 request.respond(0);
134 }
135 Ok(bytes_read) => {
136 debug_assert!(bytes_read <= request_len);
138 let bytes_read_u32 = checked_cast_to_u32(bytes_read);
139 let dest = Uint8Array::new_with_byte_offset_and_length(
140 &request_view.buffer(),
141 request_view.byte_offset(),
142 bytes_read_u32,
143 );
144 dest.copy_from(&self.buffer[0..bytes_read]);
145 request.respond(bytes_read_u32);
147 }
148 Err(err) => {
149 self.discard();
151 return Err(JsError::new(&err.to_string()).into());
152 }
153 };
154 Ok(JsValue::undefined())
155 }
156
157 #[inline]
158 fn discard(&mut self) {
159 self.async_read = None;
160 self.buffer = Vec::new();
161 }
162}
163
164#[derive(Debug)]
165struct ByobRequestGuard(Option<sys::ReadableStreamBYOBRequest>);
166
167impl ByobRequestGuard {
168 fn new(request: sys::ReadableStreamBYOBRequest) -> Self {
169 Self(Some(request))
170 }
171
172 fn view(&mut self) -> sys::ArrayBufferView {
173 self.0.as_mut().unwrap_throw().view().unwrap_throw()
174 }
175
176 fn respond(mut self, bytes_read: u32) {
177 self.0.take().unwrap_throw().respond(bytes_read);
178 }
179}
180
181impl Drop for ByobRequestGuard {
182 fn drop(&mut self) {
183 if let Some(request) = self.0.take() {
185 request.respond(0);
186 }
187 }
188}