wasm_streams/writable/
into_sink.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::Sink;
5use futures_util::{ready, FutureExt};
6use wasm_bindgen::prelude::*;
7use wasm_bindgen_futures::JsFuture;
8
9use super::WritableStreamDefaultWriter;
10
11/// A [`Sink`] for the [`into_sink`](super::WritableStream::into_sink) method.
12///
13/// This sink holds a writer, and therefore locks the [`WritableStream`](super::WritableStream).
14/// When this sink is dropped, it also drops its writer which in turn
15/// [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
16///
17/// [`Sink`]: https://docs.rs/futures/0.3.18/futures/sink/trait.Sink.html
18#[must_use = "sinks do nothing unless polled"]
19#[derive(Debug)]
20pub struct IntoSink<'writer> {
21    writer: Option<WritableStreamDefaultWriter<'writer>>,
22    ready_fut: Option<JsFuture>,
23    write_fut: Option<JsFuture>,
24    close_fut: Option<JsFuture>,
25}
26
27impl<'writer> IntoSink<'writer> {
28    #[inline]
29    pub(super) fn new(writer: WritableStreamDefaultWriter) -> IntoSink {
30        IntoSink {
31            writer: Some(writer),
32            ready_fut: None,
33            write_fut: None,
34            close_fut: None,
35        }
36    }
37
38    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream,
39    /// signaling that the producer can no longer successfully write to the stream.
40    pub async fn abort(mut self) -> Result<(), JsValue> {
41        match self.writer.take() {
42            Some(mut writer) => writer.abort().await,
43            None => Ok(()),
44        }
45    }
46
47    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream,
48    /// signaling that the producer can no longer successfully write to the stream.
49    pub async fn abort_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
50        match self.writer.take() {
51            Some(mut writer) => writer.abort_with_reason(reason).await,
52            None => Ok(()),
53        }
54    }
55}
56
57impl<'writer> Sink<JsValue> for IntoSink<'writer> {
58    type Error = JsValue;
59
60    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61        let ready_fut = match self.ready_fut.as_mut() {
62            Some(fut) => fut,
63            None => match &self.writer {
64                Some(writer) => {
65                    // No pending ready future yet, create one from ready promise
66                    let fut = JsFuture::from(writer.as_raw().ready());
67                    self.ready_fut.insert(fut)
68                }
69                None => {
70                    // Writer was already dropped
71                    // TODO Return error?
72                    return Poll::Ready(Ok(()));
73                }
74            },
75        };
76
77        // Poll the ready future
78        let js_result = ready!(ready_fut.poll_unpin(cx));
79        self.ready_fut = None;
80
81        // Ready future completed
82        Poll::Ready(match js_result {
83            Ok(js_value) => {
84                debug_assert!(js_value.is_undefined());
85                Ok(())
86            }
87            Err(js_value) => {
88                // Error, drop writer
89                self.writer = None;
90                Err(js_value)
91            }
92        })
93    }
94
95    fn start_send(mut self: Pin<&mut Self>, item: JsValue) -> Result<(), Self::Error> {
96        match &self.writer {
97            Some(writer) => {
98                let fut = JsFuture::from(writer.as_raw().write(item));
99                // Set or replace the pending write future
100                self.write_fut = Some(fut);
101                Ok(())
102            }
103            None => {
104                // TODO Return error?
105                Ok(())
106            }
107        }
108    }
109
110    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
111        let write_fut = match self.write_fut.as_mut() {
112            Some(fut) => fut,
113            None => {
114                // If we're not writing, then there's nothing to flush
115                return Poll::Ready(Ok(()));
116            }
117        };
118
119        // Poll the write future
120        let js_result = ready!(write_fut.poll_unpin(cx));
121        self.write_fut = None;
122
123        // Write future completed
124        Poll::Ready(match js_result {
125            Ok(js_value) => {
126                debug_assert!(js_value.is_undefined());
127                Ok(())
128            }
129            Err(js_value) => {
130                // Error, drop writer
131                self.writer = None;
132                Err(js_value)
133            }
134        })
135    }
136
137    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138        let close_fut = match self.close_fut.as_mut() {
139            Some(fut) => fut,
140            None => match &self.writer {
141                Some(writer) => {
142                    // No pending close future
143                    // Start closing the stream and create future from close promise
144                    let fut = JsFuture::from(writer.as_raw().close());
145                    self.close_fut.insert(fut)
146                }
147                None => {
148                    // Writer was already dropped
149                    // TODO Return error?
150                    return Poll::Ready(Ok(()));
151                }
152            },
153        };
154
155        // Poll the close future
156        let js_result = ready!(close_fut.poll_unpin(cx));
157        self.close_fut = None;
158
159        // Close future completed
160        self.writer = None;
161        Poll::Ready(match js_result {
162            Ok(js_value) => {
163                debug_assert!(js_value.is_undefined());
164                Ok(())
165            }
166            Err(js_value) => Err(js_value),
167        })
168    }
169}