async_std/stream/stream/
cycle.rs1use core::pin::Pin;
2
3use futures_core::ready;
4use pin_project_lite::pin_project;
5
6use crate::stream::Stream;
7use crate::task::{Context, Poll};
8
9pin_project! {
10 #[derive(Debug)]
12 pub struct Cycle<S> {
13 orig: S,
14 #[pin]
15 source: S,
16 }
17}
18
19impl<S> Cycle<S>
20where
21 S: Stream + Clone,
22{
23 pub(crate) fn new(source: S) -> Self {
24 Self {
25 orig: source.clone(),
26 source,
27 }
28 }
29}
30
31impl<S> Stream for Cycle<S>
32where
33 S: Stream + Clone,
34{
35 type Item = S::Item;
36
37 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38 let mut this = self.project();
39
40 match ready!(this.source.as_mut().poll_next(cx)) {
41 None => {
42 this.source.set(this.orig.clone());
43 this.source.poll_next(cx)
44 }
45 item => Poll::Ready(item),
46 }
47 }
48}