wasm_streams/readable/
into_async_read.rs1use core::pin::Pin;
2use core::task::{Context, Poll};
3
4use futures_util::io::{AsyncRead, Error};
5use futures_util::ready;
6use futures_util::FutureExt;
7use js_sys::Uint8Array;
8use wasm_bindgen::prelude::*;
9use wasm_bindgen::JsCast;
10use wasm_bindgen_futures::JsFuture;
11
12use crate::util::{checked_cast_to_usize, clamp_to_u32, js_to_io_error};
13
14use super::sys::{ArrayBufferView, ReadableStreamBYOBReadResult};
15use super::ReadableStreamBYOBReader;
16
17#[must_use = "readers do nothing unless polled"]
32#[derive(Debug)]
33pub struct IntoAsyncRead<'reader> {
34 reader: Option<ReadableStreamBYOBReader<'reader>>,
35 buffer: Option<Uint8Array>,
36 fut: Option<JsFuture>,
37 cancel_on_drop: bool,
38}
39
40impl<'reader> IntoAsyncRead<'reader> {
41 #[inline]
42 pub(super) fn new(reader: ReadableStreamBYOBReader, cancel_on_drop: bool) -> IntoAsyncRead {
43 IntoAsyncRead {
44 reader: Some(reader),
45 buffer: None,
46 fut: None,
47 cancel_on_drop,
48 }
49 }
50
51 pub async fn cancel(mut self) -> Result<(), JsValue> {
54 match self.reader.take() {
55 Some(mut reader) => reader.cancel().await,
56 None => Ok(()),
57 }
58 }
59
60 pub async fn cancel_with_reason(mut self, reason: &JsValue) -> Result<(), JsValue> {
63 match self.reader.take() {
64 Some(mut reader) => reader.cancel_with_reason(reason).await,
65 None => Ok(()),
66 }
67 }
68
69 #[inline]
70 fn discard_reader(mut self: Pin<&mut Self>) {
71 self.reader = None;
72 self.buffer = None;
73 }
74}
75
76impl<'reader> AsyncRead for IntoAsyncRead<'reader> {
77 fn poll_read(
78 mut self: Pin<&mut Self>,
79 cx: &mut Context<'_>,
80 buf: &mut [u8],
81 ) -> Poll<Result<usize, Error>> {
82 let read_fut = match self.fut.as_mut() {
83 Some(fut) => fut,
84 None => {
85 let buf_len = clamp_to_u32(buf.len());
87 let buffer = match self.buffer.take() {
88 Some(buffer) if buffer.byte_length() >= buf_len => buffer,
91 _ => Uint8Array::new_with_length(buf_len),
92 };
93 let buffer = buffer
95 .subarray(0, buf_len)
96 .unchecked_into::<ArrayBufferView>();
97 match &self.reader {
98 Some(reader) => {
99 let fut = JsFuture::from(reader.as_raw().read(&buffer));
101 self.fut.insert(fut)
102 }
103 None => {
104 return Poll::Ready(Ok(0));
106 }
107 }
108 }
109 };
110
111 let js_result = ready!(read_fut.poll_unpin(cx));
113 self.fut = None;
114
115 Poll::Ready(match js_result {
117 Ok(js_value) => {
118 let result = ReadableStreamBYOBReadResult::from(js_value);
119 if result.is_done() {
120 self.discard_reader();
122 Ok(0)
123 } else {
124 let filled_view = result.value().unwrap_throw();
126 let filled_len = checked_cast_to_usize(filled_view.byte_length());
128 debug_assert!(filled_len <= buf.len());
129 filled_view.copy_to(&mut buf[0..filled_len]);
130 self.buffer = Some(Uint8Array::new(&filled_view.buffer()));
132 Ok(filled_len)
133 }
134 }
135 Err(js_value) => {
136 self.discard_reader();
138 Err(js_to_io_error(js_value))
139 }
140 })
141 }
142}
143
144impl<'reader> Drop for IntoAsyncRead<'reader> {
145 fn drop(&mut self) {
146 if self.cancel_on_drop {
147 if let Some(reader) = self.reader.take() {
148 let _ = reader.as_raw().cancel().catch(&Closure::once(|_| {}));
149 }
150 }
151 }
152}