wasm_streams/readable/default_reader.rs
1use std::marker::PhantomData;
2
3use wasm_bindgen::{throw_val, JsValue};
4use wasm_bindgen_futures::JsFuture;
5
6use crate::util::promise_to_void_future;
7
8use super::{sys, IntoStream, ReadableStream};
9
10/// A [`ReadableStreamDefaultReader`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader)
11/// that can be used to read chunks from a [`ReadableStream`](ReadableStream).
12///
13/// This is returned by the [`get_reader`](ReadableStream::get_reader) method.
14///
15/// When the reader is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
16/// If the reader still has a pending read request at this point (i.e. if a future returned
17/// by [`read`](Self::read) is not yet ready), then this will **panic**. You must either `await`
18/// all `read` futures, or [`cancel`](Self::cancel) the stream to discard any pending `read` futures.
19#[derive(Debug)]
20pub struct ReadableStreamDefaultReader<'stream> {
21 raw: sys::ReadableStreamDefaultReader,
22 _stream: PhantomData<&'stream mut ReadableStream>,
23}
24
25impl<'stream> ReadableStreamDefaultReader<'stream> {
26 pub(crate) fn new(stream: &mut ReadableStream) -> Result<Self, js_sys::Error> {
27 Ok(Self {
28 raw: stream.as_raw().get_reader()?,
29 _stream: PhantomData,
30 })
31 }
32
33 /// Acquires a reference to the underlying [JavaScript reader](sys::ReadableStreamDefaultReader).
34 #[inline]
35 pub fn as_raw(&self) -> &sys::ReadableStreamDefaultReader {
36 &self.raw
37 }
38
39 /// Waits for the stream to become closed.
40 ///
41 /// This returns an error if the stream ever errors, or if the reader's lock is
42 /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
43 /// closing.
44 pub async fn closed(&self) -> Result<(), JsValue> {
45 promise_to_void_future(self.as_raw().closed()).await
46 }
47
48 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
49 /// signaling a loss of interest in the stream by a consumer.
50 ///
51 /// Equivalent to [`ReadableStream.cancel`](ReadableStream::cancel).
52 pub async fn cancel(&mut self) -> Result<(), JsValue> {
53 promise_to_void_future(self.as_raw().cancel()).await
54 }
55
56 /// [Cancels](https://streams.spec.whatwg.org/#cancel-a-readable-stream) the stream,
57 /// signaling a loss of interest in the stream by a consumer.
58 ///
59 /// Equivalent to [`ReadableStream.cancel_with_reason`](ReadableStream::cancel_with_reason).
60 pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
61 promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
62 }
63
64 /// Reads the next chunk from the stream's internal queue.
65 ///
66 /// * If a next `chunk` becomes available, this returns `Ok(Some(chunk))`.
67 /// * If the stream closes and no more chunks are available, this returns `Ok(None)`.
68 /// * If the stream encounters an `error`, this returns `Err(error)`.
69 pub async fn read(&mut self) -> Result<Option<JsValue>, JsValue> {
70 let promise = self.as_raw().read();
71 let js_value = JsFuture::from(promise).await?;
72 let result = sys::ReadableStreamDefaultReadResult::from(js_value);
73 if result.is_done() {
74 Ok(None)
75 } else {
76 Ok(Some(result.value()))
77 }
78 }
79
80 /// [Releases](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
81 /// corresponding stream.
82 ///
83 /// **Panics** if the reader still has a pending read request, i.e. if a future returned
84 /// by [`read`](Self::read) is not yet ready. For a non-panicking variant,
85 /// use [`try_release_lock`](Self::try_release_lock).
86 #[inline]
87 pub fn release_lock(mut self) {
88 self.release_lock_mut()
89 }
90
91 fn release_lock_mut(&mut self) {
92 self.as_raw()
93 .release_lock()
94 .unwrap_or_else(|error| throw_val(error.into()))
95 }
96
97 /// Try to [release](https://streams.spec.whatwg.org/#release-a-lock) this reader's lock on the
98 /// corresponding stream.
99 ///
100 /// The lock cannot be released while the reader still has a pending read request, i.e.
101 /// if a future returned by [`read`](Self::read) is not yet ready. Attempting to do so will
102 /// return an error and leave the reader locked to the stream.
103 #[inline]
104 pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
105 self.as_raw().release_lock().map_err(|error| (error, self))
106 }
107
108 /// Converts this `ReadableStreamDefaultReader` into a [`Stream`].
109 ///
110 /// This is similar to [`ReadableStream.into_stream`](ReadableStream::into_stream),
111 /// except that after the returned `Stream` is dropped, the original `ReadableStream` is still
112 /// usable. This allows reading only a few chunks from the `Stream`, while still allowing
113 /// another reader to read the remaining chunks later on.
114 ///
115 /// [`Stream`]: https://docs.rs/futures/0.3.18/futures/stream/trait.Stream.html
116 #[inline]
117 pub fn into_stream(self) -> IntoStream<'stream> {
118 IntoStream::new(self, false)
119 }
120}
121
122impl Drop for ReadableStreamDefaultReader<'_> {
123 fn drop(&mut self) {
124 self.release_lock_mut();
125 }
126}