wasm_streams/writable/
into_underlying_sink.rs

1use std::cell::RefCell;
2use std::pin::Pin;
3use std::rc::Rc;
4
5use futures_util::{Sink, SinkExt};
6use js_sys::Promise;
7use wasm_bindgen::prelude::*;
8use wasm_bindgen_futures::future_to_promise;
9
10#[wasm_bindgen]
11pub(crate) struct IntoUnderlyingSink {
12    inner: Rc<RefCell<Inner>>,
13}
14
15impl IntoUnderlyingSink {
16    pub fn new(sink: Box<dyn Sink<JsValue, Error = JsValue>>) -> Self {
17        IntoUnderlyingSink {
18            inner: Rc::new(RefCell::new(Inner::new(sink))),
19        }
20    }
21}
22
23#[allow(clippy::await_holding_refcell_ref)]
24#[wasm_bindgen]
25impl IntoUnderlyingSink {
26    pub fn write(&mut self, chunk: JsValue) -> Promise {
27        let inner = self.inner.clone();
28        future_to_promise(async move {
29            // This mutable borrow can never panic, since the WritableStream always queues
30            // each operation on the underlying sink.
31            let mut inner = inner.try_borrow_mut().unwrap_throw();
32            inner.write(chunk).await.map(|_| JsValue::undefined())
33        })
34    }
35
36    pub fn close(self) -> Promise {
37        future_to_promise(async move {
38            let mut inner = self.inner.try_borrow_mut().unwrap_throw();
39            inner.close().await.map(|_| JsValue::undefined())
40        })
41    }
42
43    pub fn abort(self, reason: JsValue) -> Promise {
44        future_to_promise(async move {
45            let mut inner = self.inner.try_borrow_mut().unwrap_throw();
46            inner.abort(reason).await.map(|_| JsValue::undefined())
47        })
48    }
49}
50
51struct Inner {
52    sink: Option<Pin<Box<dyn Sink<JsValue, Error = JsValue>>>>,
53}
54
55impl Inner {
56    fn new(sink: Box<dyn Sink<JsValue, Error = JsValue>>) -> Self {
57        Inner {
58            sink: Some(sink.into()),
59        }
60    }
61
62    async fn write(&mut self, chunk: JsValue) -> Result<(), JsValue> {
63        // The stream should still exist, since write() will not be called again
64        // after the sink has closed, aborted or encountered an error.
65        let sink = self.sink.as_mut().unwrap_throw();
66        match sink.send(chunk).await {
67            Ok(()) => Ok(()),
68            Err(err) => {
69                // The stream encountered an error, drop it.
70                self.sink = None;
71                Err(err)
72            }
73        }
74    }
75
76    async fn close(&mut self) -> Result<(), JsValue> {
77        let sink = self.sink.as_mut().unwrap_throw();
78        let result = sink.close().await;
79        self.sink = None;
80        result
81    }
82
83    async fn abort(&mut self, _reason: JsValue) -> Result<(), JsValue> {
84        self.sink = None;
85        Ok(())
86    }
87}