reqwest/async_impl/
body.rs

1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10//use sync_wrapper::SyncWrapper;
11use pin_project_lite::pin_project;
12#[cfg(feature = "stream")]
13use tokio::fs::File;
14use tokio::time::Sleep;
15#[cfg(feature = "stream")]
16use tokio_util::io::ReaderStream;
17
18/// An asynchronous request body.
19pub struct Body {
20    inner: Inner,
21}
22
23enum Inner {
24    Reusable(Bytes),
25    Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
26}
27
28pin_project! {
29    /// A body with a total timeout.
30    ///
31    /// The timeout does not reset upon each chunk, but rather requires the whole
32    /// body be streamed before the deadline is reached.
33    pub(crate) struct TotalTimeoutBody<B> {
34        #[pin]
35        inner: B,
36        timeout: Pin<Box<Sleep>>,
37    }
38}
39
40pin_project! {
41    pub(crate) struct ReadTimeoutBody<B> {
42        #[pin]
43        inner: B,
44        #[pin]
45        sleep: Option<Sleep>,
46        timeout: Duration,
47    }
48}
49
50/// Converts any `impl Body` into a `impl Stream` of just its DATA frames.
51#[cfg(any(feature = "stream", feature = "multipart",))]
52pub(crate) struct DataStream<B>(pub(crate) B);
53
54impl Body {
55    /// Returns a reference to the internal data of the `Body`.
56    ///
57    /// `None` is returned, if the underlying data is a stream.
58    pub fn as_bytes(&self) -> Option<&[u8]> {
59        match &self.inner {
60            Inner::Reusable(bytes) => Some(bytes.as_ref()),
61            Inner::Streaming(..) => None,
62        }
63    }
64
65    /// Wrap a futures `Stream` in a box inside `Body`.
66    ///
67    /// # Example
68    ///
69    /// ```
70    /// # use reqwest::Body;
71    /// # use futures_util;
72    /// # fn main() {
73    /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
74    ///     Ok("hello"),
75    ///     Ok(" "),
76    ///     Ok("world"),
77    /// ];
78    ///
79    /// let stream = futures_util::stream::iter(chunks);
80    ///
81    /// let body = Body::wrap_stream(stream);
82    /// # }
83    /// ```
84    ///
85    /// # Optional
86    ///
87    /// This requires the `stream` feature to be enabled.
88    #[cfg(feature = "stream")]
89    #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
90    pub fn wrap_stream<S>(stream: S) -> Body
91    where
92        S: futures_core::stream::TryStream + Send + 'static,
93        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
94        Bytes: From<S::Ok>,
95    {
96        Body::stream(stream)
97    }
98
99    #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
100    pub(crate) fn stream<S>(stream: S) -> Body
101    where
102        S: futures_core::stream::TryStream + Send + 'static,
103        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
104        Bytes: From<S::Ok>,
105    {
106        use futures_util::TryStreamExt;
107        use http_body::Frame;
108        use http_body_util::StreamBody;
109
110        let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
111            stream
112                .map_ok(|d| Frame::data(Bytes::from(d)))
113                .map_err(Into::into),
114        )));
115        Body {
116            inner: Inner::Streaming(body),
117        }
118    }
119
120    pub(crate) fn empty() -> Body {
121        Body::reusable(Bytes::new())
122    }
123
124    pub(crate) fn reusable(chunk: Bytes) -> Body {
125        Body {
126            inner: Inner::Reusable(chunk),
127        }
128    }
129
130    /// Wrap a [`HttpBody`] in a box inside `Body`.
131    ///
132    /// # Example
133    ///
134    /// ```
135    /// # use reqwest::Body;
136    /// # use futures_util;
137    /// # fn main() {
138    /// let content = "hello,world!".to_string();
139    ///
140    /// let body = Body::wrap(content);
141    /// # }
142    /// ```
143    pub fn wrap<B>(inner: B) -> Body
144    where
145        B: HttpBody + Send + Sync + 'static,
146        B::Data: Into<Bytes>,
147        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
148    {
149        use http_body_util::BodyExt;
150
151        let boxed = inner
152            .map_frame(|f| f.map_data(Into::into))
153            .map_err(Into::into)
154            .boxed();
155
156        Body {
157            inner: Inner::Streaming(boxed),
158        }
159    }
160
161    pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
162        let reuse = match self.inner {
163            Inner::Reusable(ref chunk) => Some(chunk.clone()),
164            Inner::Streaming { .. } => None,
165        };
166
167        (reuse, self)
168    }
169
170    pub(crate) fn try_clone(&self) -> Option<Body> {
171        match self.inner {
172            Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
173            Inner::Streaming { .. } => None,
174        }
175    }
176
177    #[cfg(feature = "multipart")]
178    pub(crate) fn into_stream(self) -> DataStream<Body> {
179        DataStream(self)
180    }
181
182    #[cfg(feature = "multipart")]
183    pub(crate) fn content_length(&self) -> Option<u64> {
184        match self.inner {
185            Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
186            Inner::Streaming(ref body) => body.size_hint().exact(),
187        }
188    }
189}
190
191impl Default for Body {
192    #[inline]
193    fn default() -> Body {
194        Body::empty()
195    }
196}
197
198/*
199impl From<hyper::Body> for Body {
200    #[inline]
201    fn from(body: hyper::Body) -> Body {
202        Self {
203            inner: Inner::Streaming {
204                body: Box::pin(WrapHyper(body)),
205            },
206        }
207    }
208}
209*/
210
211impl From<Bytes> for Body {
212    #[inline]
213    fn from(bytes: Bytes) -> Body {
214        Body::reusable(bytes)
215    }
216}
217
218impl From<Vec<u8>> for Body {
219    #[inline]
220    fn from(vec: Vec<u8>) -> Body {
221        Body::reusable(vec.into())
222    }
223}
224
225impl From<&'static [u8]> for Body {
226    #[inline]
227    fn from(s: &'static [u8]) -> Body {
228        Body::reusable(Bytes::from_static(s))
229    }
230}
231
232impl From<String> for Body {
233    #[inline]
234    fn from(s: String) -> Body {
235        Body::reusable(s.into())
236    }
237}
238
239impl From<&'static str> for Body {
240    #[inline]
241    fn from(s: &'static str) -> Body {
242        s.as_bytes().into()
243    }
244}
245
246#[cfg(feature = "stream")]
247#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
248impl From<File> for Body {
249    #[inline]
250    fn from(file: File) -> Body {
251        Body::wrap_stream(ReaderStream::new(file))
252    }
253}
254
255impl fmt::Debug for Body {
256    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
257        f.debug_struct("Body").finish()
258    }
259}
260
261impl HttpBody for Body {
262    type Data = Bytes;
263    type Error = crate::Error;
264
265    fn poll_frame(
266        mut self: Pin<&mut Self>,
267        cx: &mut Context,
268    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
269        match self.inner {
270            Inner::Reusable(ref mut bytes) => {
271                let out = bytes.split_off(0);
272                if out.is_empty() {
273                    Poll::Ready(None)
274                } else {
275                    Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
276                }
277            }
278            Inner::Streaming(ref mut body) => Poll::Ready(
279                futures_core::ready!(Pin::new(body).poll_frame(cx))
280                    .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
281            ),
282        }
283    }
284
285    fn size_hint(&self) -> http_body::SizeHint {
286        match self.inner {
287            Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
288            Inner::Streaming(ref body) => body.size_hint(),
289        }
290    }
291
292    fn is_end_stream(&self) -> bool {
293        match self.inner {
294            Inner::Reusable(ref bytes) => bytes.is_empty(),
295            Inner::Streaming(ref body) => body.is_end_stream(),
296        }
297    }
298}
299
300// ===== impl TotalTimeoutBody =====
301
302pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
303    TotalTimeoutBody {
304        inner: body,
305        timeout,
306    }
307}
308
309pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
310    ReadTimeoutBody {
311        inner: body,
312        sleep: None,
313        timeout,
314    }
315}
316
317impl<B> hyper::body::Body for TotalTimeoutBody<B>
318where
319    B: hyper::body::Body,
320    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
321{
322    type Data = B::Data;
323    type Error = crate::Error;
324
325    fn poll_frame(
326        self: Pin<&mut Self>,
327        cx: &mut Context,
328    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
329        let this = self.project();
330        if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
331            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
332        }
333        Poll::Ready(
334            futures_core::ready!(this.inner.poll_frame(cx))
335                .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
336        )
337    }
338
339    #[inline]
340    fn size_hint(&self) -> http_body::SizeHint {
341        self.inner.size_hint()
342    }
343
344    #[inline]
345    fn is_end_stream(&self) -> bool {
346        self.inner.is_end_stream()
347    }
348}
349
350impl<B> hyper::body::Body for ReadTimeoutBody<B>
351where
352    B: hyper::body::Body,
353    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
354{
355    type Data = B::Data;
356    type Error = crate::Error;
357
358    fn poll_frame(
359        self: Pin<&mut Self>,
360        cx: &mut Context,
361    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
362        let mut this = self.project();
363
364        // Start the `Sleep` if not active.
365        let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
366            some
367        } else {
368            this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
369            this.sleep.as_mut().as_pin_mut().unwrap()
370        };
371
372        // Error if the timeout has expired.
373        if let Poll::Ready(()) = sleep_pinned.poll(cx) {
374            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
375        }
376
377        let item = futures_core::ready!(this.inner.poll_frame(cx))
378            .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
379        // a ready frame means timeout is reset
380        this.sleep.set(None);
381        Poll::Ready(item)
382    }
383
384    #[inline]
385    fn size_hint(&self) -> http_body::SizeHint {
386        self.inner.size_hint()
387    }
388
389    #[inline]
390    fn is_end_stream(&self) -> bool {
391        self.inner.is_end_stream()
392    }
393}
394
395pub(crate) type ResponseBody =
396    http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
397
398pub(crate) fn boxed<B>(body: B) -> ResponseBody
399where
400    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
401    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
402{
403    use http_body_util::BodyExt;
404
405    body.map_err(box_err).boxed()
406}
407
408pub(crate) fn response<B>(
409    body: B,
410    deadline: Option<Pin<Box<Sleep>>>,
411    read_timeout: Option<Duration>,
412) -> ResponseBody
413where
414    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
415    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
416{
417    use http_body_util::BodyExt;
418
419    match (deadline, read_timeout) {
420        (Some(total), Some(read)) => {
421            let body = with_read_timeout(body, read).map_err(box_err);
422            total_timeout(body, total).map_err(box_err).boxed()
423        }
424        (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
425        (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
426        (None, None) => body.map_err(box_err).boxed(),
427    }
428}
429
430fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
431where
432    E: Into<Box<dyn std::error::Error + Send + Sync>>,
433{
434    err.into()
435}
436
437// ===== impl DataStream =====
438
439#[cfg(any(feature = "stream", feature = "multipart",))]
440impl<B> futures_core::Stream for DataStream<B>
441where
442    B: HttpBody<Data = Bytes> + Unpin,
443{
444    type Item = Result<Bytes, B::Error>;
445
446    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
447        loop {
448            return match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) {
449                Some(Ok(frame)) => {
450                    // skip non-data frames
451                    if let Ok(buf) = frame.into_data() {
452                        Poll::Ready(Some(Ok(buf)))
453                    } else {
454                        continue;
455                    }
456                }
457                Some(Err(err)) => Poll::Ready(Some(Err(err))),
458                None => Poll::Ready(None),
459            };
460        }
461    }
462}
463
464#[cfg(test)]
465mod tests {
466    use http_body::Body as _;
467
468    use super::Body;
469
470    #[test]
471    fn test_as_bytes() {
472        let test_data = b"Test body";
473        let body = Body::from(&test_data[..]);
474        assert_eq!(body.as_bytes(), Some(&test_data[..]));
475    }
476
477    #[test]
478    fn body_exact_length() {
479        let empty_body = Body::empty();
480        assert!(empty_body.is_end_stream());
481        assert_eq!(empty_body.size_hint().exact(), Some(0));
482
483        let bytes_body = Body::reusable("abc".into());
484        assert!(!bytes_body.is_end_stream());
485        assert_eq!(bytes_body.size_hint().exact(), Some(3));
486
487        let stream_body = Body::wrap(bytes_body);
488        assert!(!stream_body.is_end_stream());
489        assert_eq!(stream_body.size_hint().exact(), None);
490    }
491}