async_std/stream/stream/
max.rs1use core::cmp::{Ord, Ordering};
2use core::future::Future;
3use core::pin::Pin;
4
5use pin_project_lite::pin_project;
6
7use crate::stream::Stream;
8use crate::task::{Context, Poll};
9
10pin_project! {
11 #[doc(hidden)]
12 #[allow(missing_debug_implementations)]
13 pub struct MaxFuture<S, T> {
14 #[pin]
15 stream: S,
16 max: Option<T>,
17 }
18}
19
20impl<S, T> MaxFuture<S, T> {
21 pub(super) fn new(stream: S) -> Self {
22 Self { stream, max: None }
23 }
24}
25
26impl<S> Future for MaxFuture<S, S::Item>
27where
28 S: Stream,
29 S::Item: Ord,
30{
31 type Output = Option<S::Item>;
32
33 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
34 let this = self.project();
35 let next = futures_core::ready!(this.stream.poll_next(cx));
36
37 match next {
38 Some(new) => {
39 cx.waker().wake_by_ref();
40 match this.max.take() {
41 None => *this.max = Some(new),
42
43 Some(old) => match new.cmp(&old) {
44 Ordering::Greater => *this.max = Some(new),
45 _ => *this.max = Some(old),
46 },
47 }
48 Poll::Pending
49 }
50 None => Poll::Ready(this.max.take()),
51 }
52 }
53}