async_std/stream/stream/
for_each.rs

1use core::pin::Pin;
2use core::future::Future;
3
4use pin_project_lite::pin_project;
5
6use crate::stream::Stream;
7use crate::task::{Context, Poll};
8
9pin_project! {
10    #[doc(hidden)]
11    #[allow(missing_debug_implementations)]
12    pub struct ForEachFuture<S, F> {
13        #[pin]
14        stream: S,
15        f: F,
16    }
17}
18
19impl<S, F> ForEachFuture<S, F> {
20    pub(super) fn new(stream: S, f: F) -> Self {
21        Self {
22            stream,
23            f,
24        }
25    }
26}
27
28impl<S, F> Future for ForEachFuture<S, F>
29where
30    S: Stream + Sized,
31    F: FnMut(S::Item),
32{
33    type Output = ();
34
35    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36        let mut this = self.project();
37        loop {
38            let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
39
40            match next {
41                Some(v) => (this.f)(v),
42                None => return Poll::Ready(()),
43            }
44        }
45    }
46}