async_std/stream/stream/
max.rs

1use core::cmp::{Ord, Ordering};
2use core::future::Future;
3use core::pin::Pin;
4
5use pin_project_lite::pin_project;
6
7use crate::stream::Stream;
8use crate::task::{Context, Poll};
9
10pin_project! {
11    #[doc(hidden)]
12    #[allow(missing_debug_implementations)]
13    pub struct MaxFuture<S, T> {
14        #[pin]
15        stream: S,
16        max: Option<T>,
17    }
18}
19
20impl<S, T> MaxFuture<S, T> {
21    pub(super) fn new(stream: S) -> Self {
22        Self { stream, max: None }
23    }
24}
25
26impl<S> Future for MaxFuture<S, S::Item>
27where
28    S: Stream,
29    S::Item: Ord,
30{
31    type Output = Option<S::Item>;
32
33    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
34        let this = self.project();
35        let next = futures_core::ready!(this.stream.poll_next(cx));
36
37        match next {
38            Some(new) => {
39                cx.waker().wake_by_ref();
40                match this.max.take() {
41                    None => *this.max = Some(new),
42
43                    Some(old) => match new.cmp(&old) {
44                        Ordering::Greater => *this.max = Some(new),
45                        _ => *this.max = Some(old),
46                    },
47                }
48                Poll::Pending
49            }
50            None => Poll::Ready(this.max.take()),
51        }
52    }
53}