wasm_streams/writable/
default_writer.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use std::marker::PhantomData;

use wasm_bindgen::JsValue;

use crate::util::promise_to_void_future;

use super::{sys, IntoAsyncWrite, IntoSink, WritableStream};

/// A [`WritableStreamDefaultWriter`](https://developer.mozilla.org/en-US/docs/Web/API/WritableStreamDefaultWriter)
/// that can be used to write chunks to a [`WritableStream`](WritableStream).
///
/// This is returned by the [`get_writer`](WritableStream::get_writer) method.
///
/// When the writer is dropped, it automatically [releases its lock](https://streams.spec.whatwg.org/#release-a-lock).
#[derive(Debug)]
pub struct WritableStreamDefaultWriter<'stream> {
    raw: sys::WritableStreamDefaultWriter,
    _stream: PhantomData<&'stream mut WritableStream>,
}

impl<'stream> WritableStreamDefaultWriter<'stream> {
    pub(crate) fn new(stream: &mut WritableStream) -> Result<Self, js_sys::Error> {
        Ok(Self {
            raw: stream.as_raw().get_writer()?,
            _stream: PhantomData,
        })
    }

    /// Acquires a reference to the underlying [JavaScript writer](sys::WritableStreamDefaultWriter).
    #[inline]
    pub fn as_raw(&self) -> &sys::WritableStreamDefaultWriter {
        &self.raw
    }

    /// Waits for the stream to become closed.
    ///
    /// This returns an error if the stream ever errors, or if the writer's lock is
    /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
    /// closing.
    pub async fn closed(&self) -> Result<(), JsValue> {
        promise_to_void_future(self.as_raw().closed()).await
    }

    /// Returns the desired size to fill the stream's internal queue.
    ///
    /// * It can be negative, if the queue is over-full.
    ///   A producer can use this information to determine the right amount of data to write.
    /// * It will be `None` if the stream cannot be successfully written to
    ///   (due to either being errored, or having an abort queued up).
    /// * It will return zero if the stream is closed.
    #[inline]
    pub fn desired_size(&self) -> Option<f64> {
        self.as_raw().desired_size()
    }

    /// Waits until the desired size to fill the stream's internal queue transitions
    /// from non-positive to positive, signaling that it is no longer applying backpressure.
    ///
    /// Once the desired size to fill the stream's internal queue dips back to zero or below,
    /// this will return a new future that stays pending until the next transition.
    ///
    /// This returns an error if the stream ever errors, or if the writer's lock is
    /// [released](https://streams.spec.whatwg.org/#release-a-lock) before the stream finishes
    /// closing.
    pub async fn ready(&self) -> Result<(), JsValue> {
        promise_to_void_future(self.as_raw().ready()).await
    }

    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream,
    /// signaling that the producer can no longer successfully write to the stream.
    ///
    /// Equivalent to [`WritableStream.abort`](WritableStream::abort).
    pub async fn abort(&mut self) -> Result<(), JsValue> {
        promise_to_void_future(self.as_raw().abort()).await
    }

    /// [Aborts](https://streams.spec.whatwg.org/#abort-a-writable-stream) the stream with the
    /// given `reason`, signaling that the producer can no longer successfully write to the stream.
    ///
    /// Equivalent to [`WritableStream.abort_with_reason`](WritableStream::abort_with_reason).
    pub async fn abort_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
        promise_to_void_future(self.as_raw().abort_with_reason(reason)).await
    }

    /// Writes the given `chunk` to the writable stream, by waiting until any previous writes
    /// have finished successfully, and then sending the chunk to the underlying sink's `write()`
    /// method.
    ///
    /// This returns `Ok(())` upon a successful write, or `Err(error)` if the write fails or stream
    /// becomes errored before the writing process is initiated.
    ///
    /// Note that what "success" means is up to the underlying sink; it might indicate simply
    /// that the chunk has been accepted, and not necessarily that it is safely saved to
    /// its ultimate destination.
    pub async fn write(&mut self, chunk: JsValue) -> Result<(), JsValue> {
        promise_to_void_future(self.as_raw().write(chunk)).await
    }

    /// Closes the stream.
    ///
    /// The underlying sink will finish processing any previously-written chunks, before invoking
    /// its close behavior. During this time any further attempts to write will fail
    /// (without erroring the stream).
    ///
    /// This returns `Ok(())` if all remaining chunks are successfully written and the stream
    /// successfully closes, or `Err(error)` if an error is encountered during this process.
    pub async fn close(&mut self) -> Result<(), JsValue> {
        promise_to_void_future(self.as_raw().close()).await
    }

    /// Converts this `WritableStreamDefaultWriter` into a [`Sink`].
    ///
    /// This is similar to [`WritableStream.into_sink`](WritableStream::into_sink),
    /// except that after the returned `Sink` is dropped, the original `WritableStream` is still
    /// usable. This allows writing only a few chunks through the `Sink`, while still allowing
    /// another writer to write more chunks later on.
    ///
    /// [`Sink`]: https://docs.rs/futures/0.3.18/futures/sink/trait.Sink.html
    #[inline]
    pub fn into_sink(self) -> IntoSink<'stream> {
        IntoSink::new(self)
    }

    /// Converts this `WritableStreamDefaultWriter` into an [`AsyncWrite`].
    ///
    /// The writable stream must accept [`Uint8Array`](js_sys::Uint8Array) chunks.
    ///
    /// This is similar to [`WritableStream.into_async_write`](WritableStream::into_async_write),
    /// except that after the returned `AsyncWrite` is dropped, the original `WritableStream` is
    /// still usable. This allows writing only a few bytes through the `AsyncWrite`, while still
    /// allowing another writer to write more bytes later on.
    ///
    /// [`AsyncWrite`]: https://docs.rs/futures/0.3.18/futures/io/trait.AsyncWrite.html
    #[inline]
    pub fn into_async_write(self) -> IntoAsyncWrite<'stream> {
        IntoAsyncWrite::new(self.into_sink())
    }
}

impl Drop for WritableStreamDefaultWriter<'_> {
    fn drop(&mut self) {
        self.as_raw().release_lock()
    }
}