1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10use pin_project_lite::pin_project;
12#[cfg(feature = "stream")]
13use tokio::fs::File;
14use tokio::time::Sleep;
15#[cfg(feature = "stream")]
16use tokio_util::io::ReaderStream;
17
18pub struct Body {
20 inner: Inner,
21}
22
23enum Inner {
24 Reusable(Bytes),
25 Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
26}
27
28pin_project! {
29 pub(crate) struct TotalTimeoutBody<B> {
34 #[pin]
35 inner: B,
36 timeout: Pin<Box<Sleep>>,
37 }
38}
39
40pin_project! {
41 pub(crate) struct ReadTimeoutBody<B> {
42 #[pin]
43 inner: B,
44 #[pin]
45 sleep: Option<Sleep>,
46 timeout: Duration,
47 }
48}
49
/// Newtype that exposes the data frames of a body `B` as a stream of
/// `Bytes` chunks (see its `Stream` implementation).
#[cfg(any(feature = "stream", feature = "multipart"))]
pub(crate) struct DataStream<B>(pub(crate) B);
53
54impl Body {
55 pub fn as_bytes(&self) -> Option<&[u8]> {
59 match &self.inner {
60 Inner::Reusable(bytes) => Some(bytes.as_ref()),
61 Inner::Streaming(..) => None,
62 }
63 }
64
65 #[cfg(feature = "stream")]
89 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
90 pub fn wrap_stream<S>(stream: S) -> Body
91 where
92 S: futures_core::stream::TryStream + Send + 'static,
93 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
94 Bytes: From<S::Ok>,
95 {
96 Body::stream(stream)
97 }
98
99 #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
100 pub(crate) fn stream<S>(stream: S) -> Body
101 where
102 S: futures_core::stream::TryStream + Send + 'static,
103 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
104 Bytes: From<S::Ok>,
105 {
106 use futures_util::TryStreamExt;
107 use http_body::Frame;
108 use http_body_util::StreamBody;
109
110 let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
111 stream
112 .map_ok(|d| Frame::data(Bytes::from(d)))
113 .map_err(Into::into),
114 )));
115 Body {
116 inner: Inner::Streaming(body),
117 }
118 }
119
120 pub(crate) fn empty() -> Body {
121 Body::reusable(Bytes::new())
122 }
123
124 pub(crate) fn reusable(chunk: Bytes) -> Body {
125 Body {
126 inner: Inner::Reusable(chunk),
127 }
128 }
129
130 pub fn wrap<B>(inner: B) -> Body
144 where
145 B: HttpBody + Send + Sync + 'static,
146 B::Data: Into<Bytes>,
147 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
148 {
149 use http_body_util::BodyExt;
150
151 let boxed = inner
152 .map_frame(|f| f.map_data(Into::into))
153 .map_err(Into::into)
154 .boxed();
155
156 Body {
157 inner: Inner::Streaming(boxed),
158 }
159 }
160
161 pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
162 let reuse = match self.inner {
163 Inner::Reusable(ref chunk) => Some(chunk.clone()),
164 Inner::Streaming { .. } => None,
165 };
166
167 (reuse, self)
168 }
169
170 pub(crate) fn try_clone(&self) -> Option<Body> {
171 match self.inner {
172 Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
173 Inner::Streaming { .. } => None,
174 }
175 }
176
177 #[cfg(feature = "multipart")]
178 pub(crate) fn into_stream(self) -> DataStream<Body> {
179 DataStream(self)
180 }
181
182 #[cfg(feature = "multipart")]
183 pub(crate) fn content_length(&self) -> Option<u64> {
184 match self.inner {
185 Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
186 Inner::Streaming(ref body) => body.size_hint().exact(),
187 }
188 }
189}
190
191impl Default for Body {
192 #[inline]
193 fn default() -> Body {
194 Body::empty()
195 }
196}
197
198impl From<Bytes> for Body {
212 #[inline]
213 fn from(bytes: Bytes) -> Body {
214 Body::reusable(bytes)
215 }
216}
217
218impl From<Vec<u8>> for Body {
219 #[inline]
220 fn from(vec: Vec<u8>) -> Body {
221 Body::reusable(vec.into())
222 }
223}
224
225impl From<&'static [u8]> for Body {
226 #[inline]
227 fn from(s: &'static [u8]) -> Body {
228 Body::reusable(Bytes::from_static(s))
229 }
230}
231
232impl From<String> for Body {
233 #[inline]
234 fn from(s: String) -> Body {
235 Body::reusable(s.into())
236 }
237}
238
239impl From<&'static str> for Body {
240 #[inline]
241 fn from(s: &'static str) -> Body {
242 s.as_bytes().into()
243 }
244}
245
/// Streams the contents of a `tokio::fs::File` as the body, reading it
/// through a `ReaderStream`. Only available with the `stream` feature.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    #[inline]
    fn from(file: File) -> Body {
        let chunks = ReaderStream::new(file);
        Body::wrap_stream(chunks)
    }
}
254
255impl fmt::Debug for Body {
256 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
257 f.debug_struct("Body").finish()
258 }
259}
260
261impl HttpBody for Body {
262 type Data = Bytes;
263 type Error = crate::Error;
264
265 fn poll_frame(
266 mut self: Pin<&mut Self>,
267 cx: &mut Context,
268 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
269 match self.inner {
270 Inner::Reusable(ref mut bytes) => {
271 let out = bytes.split_off(0);
272 if out.is_empty() {
273 Poll::Ready(None)
274 } else {
275 Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
276 }
277 }
278 Inner::Streaming(ref mut body) => Poll::Ready(
279 futures_core::ready!(Pin::new(body).poll_frame(cx))
280 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
281 ),
282 }
283 }
284
285 fn size_hint(&self) -> http_body::SizeHint {
286 match self.inner {
287 Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
288 Inner::Streaming(ref body) => body.size_hint(),
289 }
290 }
291
292 fn is_end_stream(&self) -> bool {
293 match self.inner {
294 Inner::Reusable(ref bytes) => bytes.is_empty(),
295 Inner::Streaming(ref body) => body.is_end_stream(),
296 }
297 }
298}
299
300pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
303 TotalTimeoutBody {
304 inner: body,
305 timeout,
306 }
307}
308
309pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
310 ReadTimeoutBody {
311 inner: body,
312 sleep: None,
313 timeout,
314 }
315}
316
317impl<B> hyper::body::Body for TotalTimeoutBody<B>
318where
319 B: hyper::body::Body,
320 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
321{
322 type Data = B::Data;
323 type Error = crate::Error;
324
325 fn poll_frame(
326 self: Pin<&mut Self>,
327 cx: &mut Context,
328 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
329 let this = self.project();
330 if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
331 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
332 }
333 Poll::Ready(
334 futures_core::ready!(this.inner.poll_frame(cx))
335 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
336 )
337 }
338
339 #[inline]
340 fn size_hint(&self) -> http_body::SizeHint {
341 self.inner.size_hint()
342 }
343
344 #[inline]
345 fn is_end_stream(&self) -> bool {
346 self.inner.is_end_stream()
347 }
348}
349
350impl<B> hyper::body::Body for ReadTimeoutBody<B>
351where
352 B: hyper::body::Body,
353 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
354{
355 type Data = B::Data;
356 type Error = crate::Error;
357
358 fn poll_frame(
359 self: Pin<&mut Self>,
360 cx: &mut Context,
361 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
362 let mut this = self.project();
363
364 let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
366 some
367 } else {
368 this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
369 this.sleep.as_mut().as_pin_mut().unwrap()
370 };
371
372 if let Poll::Ready(()) = sleep_pinned.poll(cx) {
374 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
375 }
376
377 let item = futures_core::ready!(this.inner.poll_frame(cx))
378 .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
379 this.sleep.set(None);
381 Poll::Ready(item)
382 }
383
384 #[inline]
385 fn size_hint(&self) -> http_body::SizeHint {
386 self.inner.size_hint()
387 }
388
389 #[inline]
390 fn is_end_stream(&self) -> bool {
391 self.inner.is_end_stream()
392 }
393}
394
395pub(crate) type ResponseBody =
396 http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
397
398pub(crate) fn boxed<B>(body: B) -> ResponseBody
399where
400 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
401 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
402{
403 use http_body_util::BodyExt;
404
405 body.map_err(box_err).boxed()
406}
407
408pub(crate) fn response<B>(
409 body: B,
410 deadline: Option<Pin<Box<Sleep>>>,
411 read_timeout: Option<Duration>,
412) -> ResponseBody
413where
414 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
415 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
416{
417 use http_body_util::BodyExt;
418
419 match (deadline, read_timeout) {
420 (Some(total), Some(read)) => {
421 let body = with_read_timeout(body, read).map_err(box_err);
422 total_timeout(body, total).map_err(box_err).boxed()
423 }
424 (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
425 (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
426 (None, None) => body.map_err(box_err).boxed(),
427 }
428}
429
/// Converts any compatible error into a boxed `std::error::Error`.
///
/// It exists as a named function so it can be passed directly to `map_err`.
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let boxed: Box<dyn std::error::Error + Send + Sync> = err.into();
    boxed
}
436
/// Yields the data chunks of the wrapped body, skipping every frame that
/// does not carry data.
#[cfg(any(feature = "stream", feature = "multipart"))]
impl<B> futures_core::Stream for DataStream<B>
where
    B: HttpBody<Data = Bytes> + Unpin,
{
    type Item = Result<Bytes, B::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let frame = match futures_core::ready!(Pin::new(&mut self.0).poll_frame(cx)) {
                Some(Ok(frame)) => frame,
                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
                None => return Poll::Ready(None),
            };

            // A frame without data has nothing to yield here, so poll again.
            if let Ok(buf) = frame.into_data() {
                return Poll::Ready(Some(Ok(buf)));
            }
        }
    }
}
463
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// A body built from a static slice exposes exactly those bytes.
    #[test]
    fn test_as_bytes() {
        let payload: &'static [u8] = b"Test body";
        let body = Body::from(payload);
        assert_eq!(body.as_bytes(), Some(payload));
    }

    /// Checks the end-of-stream flag and exact size hint for empty, buffered
    /// and wrapped bodies.
    #[test]
    fn body_exact_length() {
        let empty = Body::empty();
        assert!(empty.is_end_stream());
        assert_eq!(empty.size_hint().exact(), Some(0));

        let buffered = Body::reusable("abc".into());
        assert!(!buffered.is_end_stream());
        assert_eq!(buffered.size_hint().exact(), Some(3));

        // Once wrapped, the body no longer reports an exact size.
        let wrapped = Body::wrap(buffered);
        assert!(!wrapped.is_end_stream());
        assert_eq!(wrapped.size_hint().exact(), None);
    }
}