wasm_streams/readable/
into_underlying_source.rs1use std::cell::RefCell;
2use std::pin::Pin;
3use std::rc::Rc;
4
5use futures_util::future::{abortable, AbortHandle, TryFutureExt};
6use futures_util::stream::{Stream, TryStreamExt};
7use js_sys::Promise;
8use wasm_bindgen::prelude::*;
9use wasm_bindgen_futures::future_to_promise;
10
11use super::sys;
12
13type JsValueStream = dyn Stream<Item = Result<JsValue, JsValue>>;
14
15#[wasm_bindgen]
16pub(crate) struct IntoUnderlyingSource {
17 inner: Rc<RefCell<Inner>>,
18 pull_handle: Option<AbortHandle>,
19}
20
21impl IntoUnderlyingSource {
22 pub fn new(stream: Box<JsValueStream>) -> Self {
23 IntoUnderlyingSource {
24 inner: Rc::new(RefCell::new(Inner::new(stream))),
25 pull_handle: None,
26 }
27 }
28}
29
30#[allow(clippy::await_holding_refcell_ref)]
31#[wasm_bindgen]
32impl IntoUnderlyingSource {
33 pub fn pull(&mut self, controller: sys::ReadableStreamDefaultController) -> Promise {
34 let inner = self.inner.clone();
35 let fut = async move {
36 let mut inner = inner.try_borrow_mut().unwrap_throw();
39 inner.pull(controller).await
40 };
41
42 let (fut, handle) = abortable(fut);
44 let fut = fut.unwrap_or_else(|_| Ok(JsValue::undefined()));
46
47 self.pull_handle = Some(handle);
48 future_to_promise(fut)
49 }
50
51 pub fn cancel(self) {
52 drop(self);
54 }
55}
56
57impl Drop for IntoUnderlyingSource {
58 fn drop(&mut self) {
59 if let Some(handle) = self.pull_handle.take() {
61 handle.abort();
62 }
63 }
64}
65
66struct Inner {
67 stream: Option<Pin<Box<JsValueStream>>>,
68}
69
70impl Inner {
71 fn new(stream: Box<JsValueStream>) -> Self {
72 Inner {
73 stream: Some(stream.into()),
74 }
75 }
76
77 async fn pull(
78 &mut self,
79 controller: sys::ReadableStreamDefaultController,
80 ) -> Result<JsValue, JsValue> {
81 let stream = self.stream.as_mut().unwrap_throw();
84 match stream.try_next().await {
85 Ok(Some(chunk)) => controller.enqueue(&chunk),
86 Ok(None) => {
87 self.stream = None;
89 controller.close();
90 }
91 Err(err) => {
92 self.stream = None;
94 return Err(err);
95 }
96 };
97 Ok(JsValue::undefined())
98 }
99}