wasm_streams/writable/
into_sink.rs1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::Sink;
5use futures_util::{ready, FutureExt};
6use wasm_bindgen::prelude::*;
7use wasm_bindgen_futures::JsFuture;
8
9use super::WritableStreamDefaultWriter;
10
11#[must_use = "sinks do nothing unless polled"]
19#[derive(Debug)]
20pub struct IntoSink<'writer> {
21 writer: Option<WritableStreamDefaultWriter<'writer>>,
22 ready_fut: Option<JsFuture>,
23 write_fut: Option<JsFuture>,
24 close_fut: Option<JsFuture>,
25}
26
27impl<'writer> IntoSink<'writer> {
28 #[inline]
29 pub(super) fn new(writer: WritableStreamDefaultWriter) -> IntoSink {
30 IntoSink {
31 writer: Some(writer),
32 ready_fut: None,
33 write_fut: None,
34 close_fut: None,
35 }
36 }
37
38 pub async fn abort(mut self) -> Result<(), JsValue> {
41 match self.writer.take() {
42 Some(mut writer) => writer.abort().await,
43 None => Ok(()),
44 }
45 }
46
47 pub async fn abort_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
50 match self.writer.take() {
51 Some(mut writer) => writer.abort_with_reason(reason).await,
52 None => Ok(()),
53 }
54 }
55}
56
57impl<'writer> Sink<JsValue> for IntoSink<'writer> {
58 type Error = JsValue;
59
60 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
61 let ready_fut = match self.ready_fut.as_mut() {
62 Some(fut) => fut,
63 None => match &self.writer {
64 Some(writer) => {
65 let fut = JsFuture::from(writer.as_raw().ready());
67 self.ready_fut.insert(fut)
68 }
69 None => {
70 return Poll::Ready(Ok(()));
73 }
74 },
75 };
76
77 let js_result = ready!(ready_fut.poll_unpin(cx));
79 self.ready_fut = None;
80
81 Poll::Ready(match js_result {
83 Ok(js_value) => {
84 debug_assert!(js_value.is_undefined());
85 Ok(())
86 }
87 Err(js_value) => {
88 self.writer = None;
90 Err(js_value)
91 }
92 })
93 }
94
95 fn start_send(mut self: Pin<&mut Self>, item: JsValue) -> Result<(), Self::Error> {
96 match &self.writer {
97 Some(writer) => {
98 let fut = JsFuture::from(writer.as_raw().write(item));
99 self.write_fut = Some(fut);
101 Ok(())
102 }
103 None => {
104 Ok(())
106 }
107 }
108 }
109
110 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
111 let write_fut = match self.write_fut.as_mut() {
112 Some(fut) => fut,
113 None => {
114 return Poll::Ready(Ok(()));
116 }
117 };
118
119 let js_result = ready!(write_fut.poll_unpin(cx));
121 self.write_fut = None;
122
123 Poll::Ready(match js_result {
125 Ok(js_value) => {
126 debug_assert!(js_value.is_undefined());
127 Ok(())
128 }
129 Err(js_value) => {
130 self.writer = None;
132 Err(js_value)
133 }
134 })
135 }
136
137 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 let close_fut = match self.close_fut.as_mut() {
139 Some(fut) => fut,
140 None => match &self.writer {
141 Some(writer) => {
142 let fut = JsFuture::from(writer.as_raw().close());
145 self.close_fut.insert(fut)
146 }
147 None => {
148 return Poll::Ready(Ok(()));
151 }
152 },
153 };
154
155 let js_result = ready!(close_fut.poll_unpin(cx));
157 self.close_fut = None;
158
159 self.writer = None;
161 Poll::Ready(match js_result {
162 Ok(js_value) => {
163 debug_assert!(js_value.is_undefined());
164 Ok(())
165 }
166 Err(js_value) => Err(js_value),
167 })
168 }
169}