wasm_streams/readable/
into_underlying_source.rs

1use std::cell::RefCell;
2use std::pin::Pin;
3use std::rc::Rc;
4
5use futures_util::future::{abortable, AbortHandle, TryFutureExt};
6use futures_util::stream::{Stream, TryStreamExt};
7use js_sys::Promise;
8use wasm_bindgen::prelude::*;
9use wasm_bindgen_futures::future_to_promise;
10
11use super::sys;
12
13type JsValueStream = dyn Stream<Item = Result<JsValue, JsValue>>;
14
15#[wasm_bindgen]
16pub(crate) struct IntoUnderlyingSource {
17    inner: Rc<RefCell<Inner>>,
18    pull_handle: Option<AbortHandle>,
19}
20
21impl IntoUnderlyingSource {
22    pub fn new(stream: Box<JsValueStream>) -> Self {
23        IntoUnderlyingSource {
24            inner: Rc::new(RefCell::new(Inner::new(stream))),
25            pull_handle: None,
26        }
27    }
28}
29
30#[allow(clippy::await_holding_refcell_ref)]
31#[wasm_bindgen]
32impl IntoUnderlyingSource {
33    pub fn pull(&mut self, controller: sys::ReadableStreamDefaultController) -> Promise {
34        let inner = self.inner.clone();
35        let fut = async move {
36            // This mutable borrow can never panic, since the ReadableStream always queues
37            // each operation on the underlying source.
38            let mut inner = inner.try_borrow_mut().unwrap_throw();
39            inner.pull(controller).await
40        };
41
42        // Allow aborting the future from cancel().
43        let (fut, handle) = abortable(fut);
44        // Ignore errors from aborting the future.
45        let fut = fut.unwrap_or_else(|_| Ok(JsValue::undefined()));
46
47        self.pull_handle = Some(handle);
48        future_to_promise(fut)
49    }
50
51    pub fn cancel(self) {
52        // The stream has been canceled, drop everything.
53        drop(self);
54    }
55}
56
57impl Drop for IntoUnderlyingSource {
58    fn drop(&mut self) {
59        // Abort the pending pull, if any.
60        if let Some(handle) = self.pull_handle.take() {
61            handle.abort();
62        }
63    }
64}
65
66struct Inner {
67    stream: Option<Pin<Box<JsValueStream>>>,
68}
69
70impl Inner {
71    fn new(stream: Box<JsValueStream>) -> Self {
72        Inner {
73            stream: Some(stream.into()),
74        }
75    }
76
77    async fn pull(
78        &mut self,
79        controller: sys::ReadableStreamDefaultController,
80    ) -> Result<JsValue, JsValue> {
81        // The stream should still exist, since pull() will not be called again
82        // after the stream has closed or encountered an error.
83        let stream = self.stream.as_mut().unwrap_throw();
84        match stream.try_next().await {
85            Ok(Some(chunk)) => controller.enqueue(&chunk),
86            Ok(None) => {
87                // The stream has closed, drop it.
88                self.stream = None;
89                controller.close();
90            }
91            Err(err) => {
92                // The stream encountered an error, drop it.
93                self.stream = None;
94                return Err(err);
95            }
96        };
97        Ok(JsValue::undefined())
98    }
99}