futures_lite/
stream.rs

1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(all(not(feature = "std"), feature = "alloc"))]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34#[cfg(feature = "race")]
35use fastrand::Rng;
36
37use pin_project_lite::pin_project;
38
39use crate::ready;
40
41/// Converts a stream into a blocking iterator.
42///
43/// # Examples
44///
45/// ```
46/// use futures_lite::{pin, stream};
47///
48/// let stream = stream::once(7);
49/// pin!(stream);
50///
51/// let mut iter = stream::block_on(stream);
52/// assert_eq!(iter.next(), Some(7));
53/// assert_eq!(iter.next(), None);
54/// ```
55#[cfg(feature = "std")]
56pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
57    BlockOn(stream)
58}
59
60/// Iterator for the [`block_on()`] function.
61#[derive(Debug)]
62pub struct BlockOn<S>(S);
63
64#[cfg(feature = "std")]
65impl<S: Stream + Unpin> Iterator for BlockOn<S> {
66    type Item = S::Item;
67
68    fn next(&mut self) -> Option<Self::Item> {
69        crate::future::block_on(self.0.next())
70    }
71
72    fn size_hint(&self) -> (usize, Option<usize>) {
73        self.0.size_hint()
74    }
75
76    fn count(self) -> usize {
77        crate::future::block_on(self.0.count())
78    }
79
80    fn last(self) -> Option<Self::Item> {
81        crate::future::block_on(self.0.last())
82    }
83
84    fn nth(&mut self, n: usize) -> Option<Self::Item> {
85        crate::future::block_on(self.0.nth(n))
86    }
87
88    fn fold<B, F>(self, init: B, f: F) -> B
89    where
90        F: FnMut(B, Self::Item) -> B,
91    {
92        crate::future::block_on(self.0.fold(init, f))
93    }
94
95    fn for_each<F>(self, f: F) -> F::Output
96    where
97        F: FnMut(Self::Item),
98    {
99        crate::future::block_on(self.0.for_each(f))
100    }
101
102    fn all<F>(&mut self, f: F) -> bool
103    where
104        F: FnMut(Self::Item) -> bool,
105    {
106        crate::future::block_on(self.0.all(f))
107    }
108
109    fn any<F>(&mut self, f: F) -> bool
110    where
111        F: FnMut(Self::Item) -> bool,
112    {
113        crate::future::block_on(self.0.any(f))
114    }
115
116    fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
117    where
118        P: FnMut(&Self::Item) -> bool,
119    {
120        crate::future::block_on(self.0.find(predicate))
121    }
122
123    fn find_map<B, F>(&mut self, f: F) -> Option<B>
124    where
125        F: FnMut(Self::Item) -> Option<B>,
126    {
127        crate::future::block_on(self.0.find_map(f))
128    }
129
130    fn position<P>(&mut self, predicate: P) -> Option<usize>
131    where
132        P: FnMut(Self::Item) -> bool,
133    {
134        crate::future::block_on(self.0.position(predicate))
135    }
136}
137
138/// Creates an empty stream.
139///
140/// # Examples
141///
142/// ```
143/// use futures_lite::stream::{self, StreamExt};
144///
145/// # spin_on::spin_on(async {
146/// let mut s = stream::empty::<i32>();
147/// assert_eq!(s.next().await, None);
148/// # })
149/// ```
150pub fn empty<T>() -> Empty<T> {
151    Empty {
152        _marker: PhantomData,
153    }
154}
155
156/// Stream for the [`empty()`] function.
157#[derive(Clone, Debug)]
158#[must_use = "streams do nothing unless polled"]
159pub struct Empty<T> {
160    _marker: PhantomData<T>,
161}
162
163impl<T> Unpin for Empty<T> {}
164
165impl<T> Stream for Empty<T> {
166    type Item = T;
167
168    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
169        Poll::Ready(None)
170    }
171
172    fn size_hint(&self) -> (usize, Option<usize>) {
173        (0, Some(0))
174    }
175}
176
177/// Creates a stream from an iterator.
178///
179/// # Examples
180///
181/// ```
182/// use futures_lite::stream::{self, StreamExt};
183///
184/// # spin_on::spin_on(async {
185/// let mut s = stream::iter(vec![1, 2]);
186///
187/// assert_eq!(s.next().await, Some(1));
188/// assert_eq!(s.next().await, Some(2));
189/// assert_eq!(s.next().await, None);
190/// # })
191/// ```
192pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
193    Iter {
194        iter: iter.into_iter(),
195    }
196}
197
198/// Stream for the [`iter()`] function.
199#[derive(Clone, Debug)]
200#[must_use = "streams do nothing unless polled"]
201pub struct Iter<I> {
202    iter: I,
203}
204
205impl<I> Unpin for Iter<I> {}
206
207impl<I: Iterator> Stream for Iter<I> {
208    type Item = I::Item;
209
210    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211        Poll::Ready(self.iter.next())
212    }
213
214    fn size_hint(&self) -> (usize, Option<usize>) {
215        self.iter.size_hint()
216    }
217}
218
219/// Creates a stream that yields a single item.
220///
221/// # Examples
222///
223/// ```
224/// use futures_lite::stream::{self, StreamExt};
225///
226/// # spin_on::spin_on(async {
227/// let mut s = stream::once(7);
228///
229/// assert_eq!(s.next().await, Some(7));
230/// assert_eq!(s.next().await, None);
231/// # })
232/// ```
233pub fn once<T>(t: T) -> Once<T> {
234    Once { value: Some(t) }
235}
236
237pin_project! {
238    /// Stream for the [`once()`] function.
239    #[derive(Clone, Debug)]
240    #[must_use = "streams do nothing unless polled"]
241    pub struct Once<T> {
242        value: Option<T>,
243    }
244}
245
246impl<T> Stream for Once<T> {
247    type Item = T;
248
249    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
250        Poll::Ready(self.project().value.take())
251    }
252
253    fn size_hint(&self) -> (usize, Option<usize>) {
254        if self.value.is_some() {
255            (1, Some(1))
256        } else {
257            (0, Some(0))
258        }
259    }
260}
261
262/// Creates a stream that is always pending.
263///
264/// # Examples
265///
266/// ```no_run
267/// use futures_lite::stream::{self, StreamExt};
268///
269/// # spin_on::spin_on(async {
270/// let mut s = stream::pending::<i32>();
271/// s.next().await;
272/// unreachable!();
273/// # })
274/// ```
275pub fn pending<T>() -> Pending<T> {
276    Pending {
277        _marker: PhantomData,
278    }
279}
280
281/// Stream for the [`pending()`] function.
282#[derive(Clone, Debug)]
283#[must_use = "streams do nothing unless polled"]
284pub struct Pending<T> {
285    _marker: PhantomData<T>,
286}
287
288impl<T> Unpin for Pending<T> {}
289
290impl<T> Stream for Pending<T> {
291    type Item = T;
292
293    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
294        Poll::Pending
295    }
296
297    fn size_hint(&self) -> (usize, Option<usize>) {
298        (0, Some(0))
299    }
300}
301
302/// Creates a stream from a function returning [`Poll`].
303///
304/// # Examples
305///
306/// ```
307/// use futures_lite::stream::{self, StreamExt};
308/// use std::task::{Context, Poll};
309///
310/// # spin_on::spin_on(async {
311/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
312///     Poll::Ready(Some(7))
313/// }
314///
315/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
316/// # })
317/// ```
318pub fn poll_fn<T, F>(f: F) -> PollFn<F>
319where
320    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
321{
322    PollFn { f }
323}
324
325/// Stream for the [`poll_fn()`] function.
326#[derive(Clone)]
327#[must_use = "streams do nothing unless polled"]
328pub struct PollFn<F> {
329    f: F,
330}
331
332impl<F> Unpin for PollFn<F> {}
333
334impl<F> fmt::Debug for PollFn<F> {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336        f.debug_struct("PollFn").finish()
337    }
338}
339
340impl<T, F> Stream for PollFn<F>
341where
342    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
343{
344    type Item = T;
345
346    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
347        (&mut self.f)(cx)
348    }
349}
350
351/// Creates an infinite stream that yields the same item repeatedly.
352///
353/// # Examples
354///
355/// ```
356/// use futures_lite::stream::{self, StreamExt};
357///
358/// # spin_on::spin_on(async {
359/// let mut s = stream::repeat(7);
360///
361/// assert_eq!(s.next().await, Some(7));
362/// assert_eq!(s.next().await, Some(7));
363/// # })
364/// ```
365pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
366    Repeat { item }
367}
368
369/// Stream for the [`repeat()`] function.
370#[derive(Clone, Debug)]
371#[must_use = "streams do nothing unless polled"]
372pub struct Repeat<T> {
373    item: T,
374}
375
376impl<T> Unpin for Repeat<T> {}
377
378impl<T: Clone> Stream for Repeat<T> {
379    type Item = T;
380
381    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
382        Poll::Ready(Some(self.item.clone()))
383    }
384
385    fn size_hint(&self) -> (usize, Option<usize>) {
386        (usize::MAX, None)
387    }
388}
389
390/// Creates an infinite stream from a closure that generates items.
391///
392/// # Examples
393///
394/// ```
395/// use futures_lite::stream::{self, StreamExt};
396///
397/// # spin_on::spin_on(async {
398/// let mut s = stream::repeat_with(|| 7);
399///
400/// assert_eq!(s.next().await, Some(7));
401/// assert_eq!(s.next().await, Some(7));
402/// # })
403/// ```
404pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
405where
406    F: FnMut() -> T,
407{
408    RepeatWith { f: repeater }
409}
410
411/// Stream for the [`repeat_with()`] function.
412#[derive(Clone, Debug)]
413#[must_use = "streams do nothing unless polled"]
414pub struct RepeatWith<F> {
415    f: F,
416}
417
418impl<F> Unpin for RepeatWith<F> {}
419
420impl<T, F> Stream for RepeatWith<F>
421where
422    F: FnMut() -> T,
423{
424    type Item = T;
425
426    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
427        let item = (&mut self.f)();
428        Poll::Ready(Some(item))
429    }
430
431    fn size_hint(&self) -> (usize, Option<usize>) {
432        (usize::MAX, None)
433    }
434}
435
436/// Creates a stream from a seed value and an async closure operating on it.
437///
438/// # Examples
439///
440/// ```
441/// use futures_lite::stream::{self, StreamExt};
442///
443/// # spin_on::spin_on(async {
444/// let s = stream::unfold(0, |mut n| async move {
445///     if n < 2 {
446///         let m = n + 1;
447///         Some((n, m))
448///     } else {
449///         None
450///     }
451/// });
452///
453/// let v: Vec<i32> = s.collect().await;
454/// assert_eq!(v, [0, 1]);
455/// # })
456/// ```
457pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
458where
459    F: FnMut(T) -> Fut,
460    Fut: Future<Output = Option<(Item, T)>>,
461{
462    Unfold {
463        f,
464        state: Some(seed),
465        fut: None,
466    }
467}
468
469pin_project! {
470    /// Stream for the [`unfold()`] function.
471    #[derive(Clone)]
472    #[must_use = "streams do nothing unless polled"]
473    pub struct Unfold<T, F, Fut> {
474        f: F,
475        state: Option<T>,
476        #[pin]
477        fut: Option<Fut>,
478    }
479}
480
481impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
482where
483    T: fmt::Debug,
484    Fut: fmt::Debug,
485{
486    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487        f.debug_struct("Unfold")
488            .field("state", &self.state)
489            .field("fut", &self.fut)
490            .finish()
491    }
492}
493
494impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
495where
496    F: FnMut(T) -> Fut,
497    Fut: Future<Output = Option<(Item, T)>>,
498{
499    type Item = Item;
500
501    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
502        let mut this = self.project();
503
504        if let Some(state) = this.state.take() {
505            this.fut.set(Some((this.f)(state)));
506        }
507
508        let step = ready!(this
509            .fut
510            .as_mut()
511            .as_pin_mut()
512            .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
513            .poll(cx));
514        this.fut.set(None);
515
516        if let Some((item, next_state)) = step {
517            *this.state = Some(next_state);
518            Poll::Ready(Some(item))
519        } else {
520            Poll::Ready(None)
521        }
522    }
523}
524
525/// Creates a stream from a seed value and a fallible async closure operating on it.
526///
527/// # Examples
528///
529/// ```
530/// use futures_lite::stream::{self, StreamExt};
531///
532/// # spin_on::spin_on(async {
533/// let s = stream::try_unfold(0, |mut n| async move {
534///     if n < 2 {
535///         let m = n + 1;
536///         Ok(Some((n, m)))
537///     } else {
538///         std::io::Result::Ok(None)
539///     }
540/// });
541///
542/// let v: Vec<i32> = s.try_collect().await?;
543/// assert_eq!(v, [0, 1]);
544/// # std::io::Result::Ok(()) });
545/// ```
546pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
547where
548    F: FnMut(T) -> Fut,
549    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
550{
551    TryUnfold {
552        f,
553        state: Some(init),
554        fut: None,
555    }
556}
557
558pin_project! {
559    /// Stream for the [`try_unfold()`] function.
560    #[derive(Clone)]
561    #[must_use = "streams do nothing unless polled"]
562    pub struct TryUnfold<T, F, Fut> {
563        f: F,
564        state: Option<T>,
565        #[pin]
566        fut: Option<Fut>,
567    }
568}
569
570impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
571where
572    T: fmt::Debug,
573    Fut: fmt::Debug,
574{
575    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576        f.debug_struct("TryUnfold")
577            .field("state", &self.state)
578            .field("fut", &self.fut)
579            .finish()
580    }
581}
582
583impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
584where
585    F: FnMut(T) -> Fut,
586    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
587{
588    type Item = Result<Item, E>;
589
590    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
591        let mut this = self.project();
592
593        if let Some(state) = this.state.take() {
594            this.fut.set(Some((this.f)(state)));
595        }
596
597        match this.fut.as_mut().as_pin_mut() {
598            None => {
599                // The future previously errored
600                Poll::Ready(None)
601            }
602            Some(future) => {
603                let step = ready!(future.poll(cx));
604                this.fut.set(None);
605
606                match step {
607                    Ok(Some((item, next_state))) => {
608                        *this.state = Some(next_state);
609                        Poll::Ready(Some(Ok(item)))
610                    }
611                    Ok(None) => Poll::Ready(None),
612                    Err(e) => Poll::Ready(Some(Err(e))),
613                }
614            }
615        }
616    }
617}
618
619/// Creates a stream that invokes the given future as its first item, and then
620/// produces no more items.
621///
622/// # Example
623///
624/// ```
625/// use futures_lite::{stream, prelude::*};
626///
627/// # spin_on::spin_on(async {
628/// let mut stream = Box::pin(stream::once_future(async { 1 }));
629/// assert_eq!(stream.next().await, Some(1));
630/// assert_eq!(stream.next().await, None);
631/// # });
632/// ```
633pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
634    OnceFuture {
635        future: Some(future),
636    }
637}
638
639pin_project! {
640    /// Stream for the [`once_future()`] method.
641    #[derive(Debug)]
642    #[must_use = "futures do nothing unless you `.await` or poll them"]
643    pub struct OnceFuture<F> {
644        #[pin]
645        future: Option<F>,
646    }
647}
648
649impl<F: Future> Stream for OnceFuture<F> {
650    type Item = F::Output;
651
652    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
653        let mut this = self.project();
654
655        match this.future.as_mut().as_pin_mut().map(|f| f.poll(cx)) {
656            Some(Poll::Ready(t)) => {
657                this.future.set(None);
658                Poll::Ready(Some(t))
659            }
660            Some(Poll::Pending) => Poll::Pending,
661            None => Poll::Ready(None),
662        }
663    }
664}
665
666/// Take elements from this stream until the provided future resolves.
667///
668/// This function will take elements from the stream until the provided
669/// stopping future `fut` resolves. Once the `fut` future becomes ready,
670/// this stream combinator will always return that the stream is done.
671///
672/// The stopping future may return any type. Once the stream is stopped
673/// the result of the stopping future may be accessed with `StopAfterFuture::take_result()`.
674/// The stream may also be resumed with `StopAfterFuture::take_future()`.
675/// See the documentation of [`StopAfterFuture`] for more information.
676///
677/// ```
678/// use futures_lite::stream::{self, StreamExt, stop_after_future};
679/// use futures_lite::future;
680/// use std::task::Poll;
681///
682/// let stream = stream::iter(1..=10);
683///
684/// # spin_on::spin_on(async {
685/// let mut i = 0;
686/// let stop_fut = future::poll_fn(|_cx| {
687///     i += 1;
688///     if i <= 5 {
689///         Poll::Pending
690///     } else {
691///         Poll::Ready(())
692///     }
693/// });
694///
695/// let stream = stop_after_future(stream, stop_fut);
696///
697/// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
698/// # });
699pub fn stop_after_future<S, F>(stream: S, future: F) -> StopAfterFuture<S, F>
700where
701    S: Sized + Stream,
702    F: Future,
703{
704    StopAfterFuture {
705        stream,
706        fut: Some(future),
707        fut_result: None,
708        free: false,
709    }
710}
711
712pin_project! {
713    /// Stream for the [`StreamExt::stop_after_future()`] method.
714    #[derive(Clone, Debug)]
715    #[must_use = "streams do nothing unless polled"]
716    pub struct StopAfterFuture<S: Stream, Fut: Future> {
717        #[pin]
718        stream: S,
719        // Contains the inner Future on start and None once the inner Future is resolved
720        // or taken out by the user.
721        #[pin]
722        fut: Option<Fut>,
723        // Contains fut's return value once fut is resolved
724        fut_result: Option<Fut::Output>,
725        // Whether the future was taken out by the user.
726        free: bool,
727    }
728}
729
730impl<St, Fut> StopAfterFuture<St, Fut>
731where
732    St: Stream,
733    Fut: Future,
734{
735    /// Extract the stopping future out of the combinator.
736    ///
737    /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
738    /// Taking out the future means the combinator will be yielding
739    /// elements from the wrapped stream without ever stopping it.
740    pub fn take_future(&mut self) -> Option<Fut> {
741        if self.fut.is_some() {
742            self.free = true;
743        }
744
745        self.fut.take()
746    }
747
748    /// Once the stopping future is resolved, this method can be used
749    /// to extract the value returned by the stopping future.
750    ///
751    /// This may be used to retrieve arbitrary data from the stopping
752    /// future, for example a reason why the stream was stopped.
753    ///
754    /// This method will return `None` if the future isn't resolved yet,
755    /// or if the result was already taken out.
756    ///
757    /// # Examples
758    ///
759    /// ```
760    /// # spin_on::spin_on(async {
761    /// use futures_lite::stream::{self, StreamExt, stop_after_future};
762    /// use futures_lite::future;
763    /// use std::task::Poll;
764    ///
765    /// let stream = stream::iter(1..=10);
766    ///
767    /// let mut i = 0;
768    /// let stop_fut = future::poll_fn(|_cx| {
769    ///     i += 1;
770    ///     if i <= 5 {
771    ///         Poll::Pending
772    ///     } else {
773    ///         Poll::Ready("reason")
774    ///     }
775    /// });
776    ///
777    /// let mut stream = stop_after_future(stream, stop_fut);
778    /// let _ = (&mut stream).collect::<Vec<_>>().await;
779    ///
780    /// let result = stream.take_result().unwrap();
781    /// assert_eq!(result, "reason");
782    /// # });
783    /// ```
784    pub fn take_result(&mut self) -> Option<Fut::Output> {
785        self.fut_result.take()
786    }
787
788    /// Whether the stream was stopped yet by the stopping future
789    /// being resolved.
790    pub fn is_stopped(&self) -> bool {
791        !self.free && self.fut.is_none()
792    }
793}
794
795impl<St, Fut> Stream for StopAfterFuture<St, Fut>
796where
797    St: Stream,
798    Fut: Future,
799{
800    type Item = St::Item;
801
802    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
803        let mut this = self.project();
804
805        if let Some(f) = this.fut.as_mut().as_pin_mut() {
806            if let Poll::Ready(result) = f.poll(cx) {
807                this.fut.set(None);
808                *this.fut_result = Some(result);
809            }
810        }
811
812        if !*this.free && this.fut.is_none() {
813            // Future resolved, inner stream stopped
814            Poll::Ready(None)
815        } else {
816            // Future either not resolved yet or taken out by the user
817            let item = ready!(this.stream.poll_next(cx));
818            if item.is_none() {
819                this.fut.set(None);
820            }
821            Poll::Ready(item)
822        }
823    }
824
825    fn size_hint(&self) -> (usize, Option<usize>) {
826        if self.is_stopped() {
827            return (0, Some(0));
828        }
829
830        // Original stream can be truncated at any moment, so the lower bound isn't reliable.
831        let (_, upper_bound) = self.stream.size_hint();
832        (0, upper_bound)
833    }
834}
835
836/// Extension trait for [`Stream`].
837pub trait StreamExt: Stream {
838    /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
839    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
840    where
841        Self: Unpin,
842    {
843        Stream::poll_next(Pin::new(self), cx)
844    }
845
846    /// Retrieves the next item in the stream.
847    ///
848    /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
849    /// resume iteration after that.
850    ///
851    /// # Examples
852    ///
853    /// ```
854    /// use futures_lite::stream::{self, StreamExt};
855    ///
856    /// # spin_on::spin_on(async {
857    /// let mut s = stream::iter(1..=3);
858    ///
859    /// assert_eq!(s.next().await, Some(1));
860    /// assert_eq!(s.next().await, Some(2));
861    /// assert_eq!(s.next().await, Some(3));
862    /// assert_eq!(s.next().await, None);
863    /// # });
864    /// ```
865    fn next(&mut self) -> NextFuture<'_, Self>
866    where
867        Self: Unpin,
868    {
869        NextFuture { stream: self }
870    }
871
872    /// Retrieves the next item in the stream.
873    ///
874    /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
875    /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
876    ///
877    /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
878    ///
879    /// # Examples
880    ///
881    /// ```
882    /// use futures_lite::stream::{self, StreamExt};
883    ///
884    /// # spin_on::spin_on(async {
885    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
886    ///
887    /// assert_eq!(s.try_next().await, Ok(Some(1)));
888    /// assert_eq!(s.try_next().await, Ok(Some(2)));
889    /// assert_eq!(s.try_next().await, Err("error"));
890    /// assert_eq!(s.try_next().await, Ok(None));
891    /// # });
892    /// ```
893    fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
894    where
895        Self: Stream<Item = Result<T, E>> + Unpin,
896    {
897        TryNextFuture { stream: self }
898    }
899
900    /// Counts the number of items in the stream.
901    ///
902    /// # Examples
903    ///
904    /// ```
905    /// use futures_lite::stream::{self, StreamExt};
906    ///
907    /// # spin_on::spin_on(async {
908    /// let s1 = stream::iter(vec![0]);
909    /// let s2 = stream::iter(vec![1, 2, 3]);
910    ///
911    /// assert_eq!(s1.count().await, 1);
912    /// assert_eq!(s2.count().await, 3);
913    /// # });
914    /// ```
915    fn count(self) -> CountFuture<Self>
916    where
917        Self: Sized,
918    {
919        CountFuture {
920            stream: self,
921            count: 0,
922        }
923    }
924
925    /// Maps items of the stream to new values using a closure.
926    ///
927    /// # Examples
928    ///
929    /// ```
930    /// use futures_lite::stream::{self, StreamExt};
931    ///
932    /// # spin_on::spin_on(async {
933    /// let s = stream::iter(vec![1, 2, 3]);
934    /// let mut s = s.map(|x| 2 * x);
935    ///
936    /// assert_eq!(s.next().await, Some(2));
937    /// assert_eq!(s.next().await, Some(4));
938    /// assert_eq!(s.next().await, Some(6));
939    /// assert_eq!(s.next().await, None);
940    /// # });
941    /// ```
942    fn map<T, F>(self, f: F) -> Map<Self, F>
943    where
944        Self: Sized,
945        F: FnMut(Self::Item) -> T,
946    {
947        Map { stream: self, f }
948    }
949
950    /// Maps items to streams and then concatenates them.
951    ///
952    /// # Examples
953    ///
954    /// ```
955    /// use futures_lite::stream::{self, StreamExt};
956    ///
957    /// # spin_on::spin_on(async {
958    /// let words = stream::iter(vec!["one", "two"]);
959    ///
960    /// let s: String = words
961    ///     .flat_map(|s| stream::iter(s.chars()))
962    ///     .collect()
963    ///     .await;
964    ///
965    /// assert_eq!(s, "onetwo");
966    /// # });
967    /// ```
968    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
969    where
970        Self: Sized,
971        U: Stream,
972        F: FnMut(Self::Item) -> U,
973    {
974        FlatMap {
975            stream: self.map(f),
976            inner_stream: None,
977        }
978    }
979
980    /// Concatenates inner streams.
981    ///
982    /// # Examples
983    ///
984    /// ```
985    /// use futures_lite::stream::{self, StreamExt};
986    ///
987    /// # spin_on::spin_on(async {
988    /// let s1 = stream::iter(vec![1, 2, 3]);
989    /// let s2 = stream::iter(vec![4, 5]);
990    ///
991    /// let s = stream::iter(vec![s1, s2]);
992    /// let v: Vec<_> = s.flatten().collect().await;
993    /// assert_eq!(v, [1, 2, 3, 4, 5]);
994    /// # });
995    /// ```
996    fn flatten(self) -> Flatten<Self>
997    where
998        Self: Sized,
999        Self::Item: Stream,
1000    {
1001        Flatten {
1002            stream: self,
1003            inner_stream: None,
1004        }
1005    }
1006
1007    /// Maps items of the stream to new values using an async closure.
1008    ///
1009    /// # Examples
1010    ///
1011    /// ```
1012    /// use futures_lite::pin;
1013    /// use futures_lite::stream::{self, StreamExt};
1014    ///
1015    /// # spin_on::spin_on(async {
1016    /// let s = stream::iter(vec![1, 2, 3]);
1017    /// let mut s = s.then(|x| async move { 2 * x });
1018    ///
1019    /// pin!(s);
1020    /// assert_eq!(s.next().await, Some(2));
1021    /// assert_eq!(s.next().await, Some(4));
1022    /// assert_eq!(s.next().await, Some(6));
1023    /// assert_eq!(s.next().await, None);
1024    /// # });
1025    /// ```
1026    fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
1027    where
1028        Self: Sized,
1029        F: FnMut(Self::Item) -> Fut,
1030        Fut: Future,
1031    {
1032        Then {
1033            stream: self,
1034            future: None,
1035            f,
1036        }
1037    }
1038
1039    /// Keeps items of the stream for which `predicate` returns `true`.
1040    ///
1041    /// # Examples
1042    ///
1043    /// ```
1044    /// use futures_lite::stream::{self, StreamExt};
1045    ///
1046    /// # spin_on::spin_on(async {
1047    /// let s = stream::iter(vec![1, 2, 3, 4]);
1048    /// let mut s = s.filter(|i| i % 2 == 0);
1049    ///
1050    /// assert_eq!(s.next().await, Some(2));
1051    /// assert_eq!(s.next().await, Some(4));
1052    /// assert_eq!(s.next().await, None);
1053    /// # });
1054    /// ```
1055    fn filter<P>(self, predicate: P) -> Filter<Self, P>
1056    where
1057        Self: Sized,
1058        P: FnMut(&Self::Item) -> bool,
1059    {
1060        Filter {
1061            stream: self,
1062            predicate,
1063        }
1064    }
1065
1066    /// Filters and maps items of the stream using a closure.
1067    ///
1068    /// # Examples
1069    ///
1070    /// ```
1071    /// use futures_lite::stream::{self, StreamExt};
1072    ///
1073    /// # spin_on::spin_on(async {
1074    /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
1075    /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
1076    ///
1077    /// assert_eq!(s.next().await, Some(1));
1078    /// assert_eq!(s.next().await, Some(3));
1079    /// assert_eq!(s.next().await, Some(5));
1080    /// assert_eq!(s.next().await, None);
1081    /// # });
1082    /// ```
1083    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
1084    where
1085        Self: Sized,
1086        F: FnMut(Self::Item) -> Option<T>,
1087    {
1088        FilterMap { stream: self, f }
1089    }
1090
1091    /// Takes only the first `n` items of the stream.
1092    ///
1093    /// # Examples
1094    ///
1095    /// ```
1096    /// use futures_lite::stream::{self, StreamExt};
1097    ///
1098    /// # spin_on::spin_on(async {
1099    /// let mut s = stream::repeat(7).take(2);
1100    ///
1101    /// assert_eq!(s.next().await, Some(7));
1102    /// assert_eq!(s.next().await, Some(7));
1103    /// assert_eq!(s.next().await, None);
1104    /// # });
1105    /// ```
1106    fn take(self, n: usize) -> Take<Self>
1107    where
1108        Self: Sized,
1109    {
1110        Take { stream: self, n }
1111    }
1112
1113    /// Takes items while `predicate` returns `true`.
1114    ///
1115    /// # Examples
1116    ///
1117    /// ```
1118    /// use futures_lite::stream::{self, StreamExt};
1119    ///
1120    /// # spin_on::spin_on(async {
1121    /// let s = stream::iter(vec![1, 2, 3, 4]);
1122    /// let mut s = s.take_while(|x| *x < 3);
1123    ///
1124    /// assert_eq!(s.next().await, Some(1));
1125    /// assert_eq!(s.next().await, Some(2));
1126    /// assert_eq!(s.next().await, None);
1127    /// # });
1128    /// ```
1129    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
1130    where
1131        Self: Sized,
1132        P: FnMut(&Self::Item) -> bool,
1133    {
1134        TakeWhile {
1135            stream: self,
1136            predicate,
1137        }
1138    }
1139
1140    /// Skips the first `n` items of the stream.
1141    ///
1142    /// # Examples
1143    ///
1144    /// ```
1145    /// use futures_lite::stream::{self, StreamExt};
1146    ///
1147    /// # spin_on::spin_on(async {
1148    /// let s = stream::iter(vec![1, 2, 3]);
1149    /// let mut s = s.skip(2);
1150    ///
1151    /// assert_eq!(s.next().await, Some(3));
1152    /// assert_eq!(s.next().await, None);
1153    /// # });
1154    /// ```
1155    fn skip(self, n: usize) -> Skip<Self>
1156    where
1157        Self: Sized,
1158    {
1159        Skip { stream: self, n }
1160    }
1161
1162    /// Skips items while `predicate` returns `true`.
1163    ///
1164    /// # Examples
1165    ///
1166    /// ```
1167    /// use futures_lite::stream::{self, StreamExt};
1168    ///
1169    /// # spin_on::spin_on(async {
1170    /// let s = stream::iter(vec![-1i32, 0, 1]);
1171    /// let mut s = s.skip_while(|x| x.is_negative());
1172    ///
1173    /// assert_eq!(s.next().await, Some(0));
1174    /// assert_eq!(s.next().await, Some(1));
1175    /// assert_eq!(s.next().await, None);
1176    /// # });
1177    /// ```
1178    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1179    where
1180        Self: Sized,
1181        P: FnMut(&Self::Item) -> bool,
1182    {
1183        SkipWhile {
1184            stream: self,
1185            predicate: Some(predicate),
1186        }
1187    }
1188
1189    /// Yields every `step`th item.
1190    ///
1191    /// # Panics
1192    ///
1193    /// This method will panic if the `step` is 0.
1194    ///
1195    /// # Examples
1196    ///
1197    /// ```
1198    /// use futures_lite::stream::{self, StreamExt};
1199    ///
1200    /// # spin_on::spin_on(async {
1201    /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
1202    /// let mut s = s.step_by(2);
1203    ///
1204    /// assert_eq!(s.next().await, Some(0));
1205    /// assert_eq!(s.next().await, Some(2));
1206    /// assert_eq!(s.next().await, Some(4));
1207    /// assert_eq!(s.next().await, None);
1208    /// # });
1209    /// ```
1210    fn step_by(self, step: usize) -> StepBy<Self>
1211    where
1212        Self: Sized,
1213    {
1214        assert!(step > 0, "`step` must be greater than zero");
1215        StepBy {
1216            stream: self,
1217            step,
1218            i: 0,
1219        }
1220    }
1221
1222    /// Appends another stream to the end of this one.
1223    ///
1224    /// # Examples
1225    ///
1226    /// ```
1227    /// use futures_lite::stream::{self, StreamExt};
1228    ///
1229    /// # spin_on::spin_on(async {
1230    /// let s1 = stream::iter(vec![1, 2]);
1231    /// let s2 = stream::iter(vec![7, 8]);
1232    /// let mut s = s1.chain(s2);
1233    ///
1234    /// assert_eq!(s.next().await, Some(1));
1235    /// assert_eq!(s.next().await, Some(2));
1236    /// assert_eq!(s.next().await, Some(7));
1237    /// assert_eq!(s.next().await, Some(8));
1238    /// assert_eq!(s.next().await, None);
1239    /// # });
1240    /// ```
1241    fn chain<U>(self, other: U) -> Chain<Self, U>
1242    where
1243        Self: Sized,
1244        U: Stream<Item = Self::Item> + Sized,
1245    {
1246        Chain {
1247            first: self.fuse(),
1248            second: other.fuse(),
1249        }
1250    }
1251
1252    /// Clones all items.
1253    ///
1254    /// # Examples
1255    ///
1256    /// ```
1257    /// use futures_lite::stream::{self, StreamExt};
1258    ///
1259    /// # spin_on::spin_on(async {
1260    /// let s = stream::iter(vec![&1, &2]);
1261    /// let mut s = s.cloned();
1262    ///
1263    /// assert_eq!(s.next().await, Some(1));
1264    /// assert_eq!(s.next().await, Some(2));
1265    /// assert_eq!(s.next().await, None);
1266    /// # });
1267    /// ```
1268    fn cloned<'a, T>(self) -> Cloned<Self>
1269    where
1270        Self: Stream<Item = &'a T> + Sized,
1271        T: Clone + 'a,
1272    {
1273        Cloned { stream: self }
1274    }
1275
1276    /// Copies all items.
1277    ///
1278    /// # Examples
1279    ///
1280    /// ```
1281    /// use futures_lite::stream::{self, StreamExt};
1282    ///
1283    /// # spin_on::spin_on(async {
1284    /// let s = stream::iter(vec![&1, &2]);
1285    /// let mut s = s.copied();
1286    ///
1287    /// assert_eq!(s.next().await, Some(1));
1288    /// assert_eq!(s.next().await, Some(2));
1289    /// assert_eq!(s.next().await, None);
1290    /// # });
1291    /// ```
1292    fn copied<'a, T>(self) -> Copied<Self>
1293    where
1294        Self: Stream<Item = &'a T> + Sized,
1295        T: Copy + 'a,
1296    {
1297        Copied { stream: self }
1298    }
1299
1300    /// Collects all items in the stream into a collection.
1301    ///
1302    /// # Examples
1303    ///
1304    /// ```
1305    /// use futures_lite::stream::{self, StreamExt};
1306    ///
1307    /// # spin_on::spin_on(async {
1308    /// let mut s = stream::iter(1..=3);
1309    ///
1310    /// let items: Vec<_> = s.collect().await;
1311    /// assert_eq!(items, [1, 2, 3]);
1312    /// # });
1313    /// ```
1314    fn collect<C>(self) -> CollectFuture<Self, C>
1315    where
1316        Self: Sized,
1317        C: Default + Extend<Self::Item>,
1318    {
1319        CollectFuture {
1320            stream: self,
1321            collection: Default::default(),
1322        }
1323    }
1324
1325    /// Collects all items in the fallible stream into a collection.
1326    ///
1327    /// ```
1328    /// use futures_lite::stream::{self, StreamExt};
1329    ///
1330    /// # spin_on::spin_on(async {
1331    /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1332    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1333    /// assert_eq!(res, Err(2));
1334    ///
1335    /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1336    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1337    /// assert_eq!(res, Ok(vec![1, 2, 3]));
1338    /// # })
1339    /// ```
1340    fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1341    where
1342        Self: Stream<Item = Result<T, E>> + Sized,
1343        C: Default + Extend<T>,
1344    {
1345        TryCollectFuture {
1346            stream: self,
1347            items: Default::default(),
1348        }
1349    }
1350
1351    /// Partitions items into those for which `predicate` is `true` and those for which it is
1352    /// `false`, and then collects them into two collections.
1353    ///
1354    /// # Examples
1355    ///
1356    /// ```
1357    /// use futures_lite::stream::{self, StreamExt};
1358    ///
1359    /// # spin_on::spin_on(async {
1360    /// let s = stream::iter(vec![1, 2, 3]);
1361    /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1362    ///
1363    /// assert_eq!(even, &[2]);
1364    /// assert_eq!(odd, &[1, 3]);
1365    /// # })
1366    /// ```
1367    fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1368    where
1369        Self: Sized,
1370        B: Default + Extend<Self::Item>,
1371        P: FnMut(&Self::Item) -> bool,
1372    {
1373        PartitionFuture {
1374            stream: self,
1375            predicate,
1376            res: Some(Default::default()),
1377        }
1378    }
1379
1380    /// Accumulates a computation over the stream.
1381    ///
1382    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1383    /// the accumulator and each item in the stream. The final accumulator value is returned.
1384    ///
1385    /// # Examples
1386    ///
1387    /// ```
1388    /// use futures_lite::stream::{self, StreamExt};
1389    ///
1390    /// # spin_on::spin_on(async {
1391    /// let s = stream::iter(vec![1, 2, 3]);
1392    /// let sum = s.fold(0, |acc, x| acc + x).await;
1393    ///
1394    /// assert_eq!(sum, 6);
1395    /// # })
1396    /// ```
1397    fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1398    where
1399        Self: Sized,
1400        F: FnMut(T, Self::Item) -> T,
1401    {
1402        FoldFuture {
1403            stream: self,
1404            f,
1405            acc: Some(init),
1406        }
1407    }
1408
1409    /// Accumulates a fallible computation over the stream.
1410    ///
1411    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1412    /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1413    /// error if `f` failed the computation.
1414    ///
1415    /// # Examples
1416    ///
1417    /// ```
1418    /// use futures_lite::stream::{self, StreamExt};
1419    ///
1420    /// # spin_on::spin_on(async {
1421    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1422    ///
1423    /// let sum = s.try_fold(0, |acc, v| {
1424    ///     if (acc + v) % 2 == 1 {
1425    ///         Ok(acc + v)
1426    ///     } else {
1427    ///         Err("fail")
1428    ///     }
1429    /// })
1430    /// .await;
1431    ///
1432    /// assert_eq!(sum, Err("fail"));
1433    /// # })
1434    /// ```
1435    fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1436    where
1437        Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1438        F: FnMut(B, T) -> Result<B, E>,
1439    {
1440        TryFoldFuture {
1441            stream: self,
1442            f,
1443            acc: Some(init),
1444        }
1445    }
1446
1447    /// Maps items of the stream to new values using a state value and a closure.
1448    ///
1449    /// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the
1450    /// state and each item in the stream. The stream stops when `f` returns `None`.
1451    ///
1452    /// # Examples
1453    ///
1454    /// ```
1455    /// use futures_lite::stream::{self, StreamExt};
1456    ///
1457    /// # spin_on::spin_on(async {
1458    /// let s = stream::iter(vec![1, 2, 3]);
1459    /// let mut s = s.scan(1, |state, x| {
1460    ///     *state = *state * x;
1461    ///     Some(-*state)
1462    /// });
1463    ///
1464    /// assert_eq!(s.next().await, Some(-1));
1465    /// assert_eq!(s.next().await, Some(-2));
1466    /// assert_eq!(s.next().await, Some(-6));
1467    /// assert_eq!(s.next().await, None);
1468    /// # })
1469    /// ```
1470    fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1471    where
1472        Self: Sized,
1473        F: FnMut(&mut St, Self::Item) -> Option<B>,
1474    {
1475        Scan {
1476            stream: self,
1477            state_f: (initial_state, f),
1478        }
1479    }
1480
1481    /// Fuses the stream so that it stops yielding items after the first [`None`].
1482    ///
1483    /// # Examples
1484    ///
1485    /// ```
1486    /// use futures_lite::stream::{self, StreamExt};
1487    ///
1488    /// # spin_on::spin_on(async {
1489    /// let mut s = stream::once(1).fuse();
1490    ///
1491    /// assert_eq!(s.next().await, Some(1));
1492    /// assert_eq!(s.next().await, None);
1493    /// assert_eq!(s.next().await, None);
1494    /// # })
1495    /// ```
1496    fn fuse(self) -> Fuse<Self>
1497    where
1498        Self: Sized,
1499    {
1500        Fuse {
1501            stream: self,
1502            done: false,
1503        }
1504    }
1505
1506    /// Repeats the stream from beginning to end, forever.
1507    ///
1508    /// # Examples
1509    ///
1510    /// ```
1511    /// use futures_lite::stream::{self, StreamExt};
1512    ///
1513    /// # spin_on::spin_on(async {
1514    /// let mut s = stream::iter(vec![1, 2]).cycle();
1515    ///
1516    /// assert_eq!(s.next().await, Some(1));
1517    /// assert_eq!(s.next().await, Some(2));
1518    /// assert_eq!(s.next().await, Some(1));
1519    /// assert_eq!(s.next().await, Some(2));
1520    /// # });
1521    /// ```
1522    fn cycle(self) -> Cycle<Self>
1523    where
1524        Self: Clone + Sized,
1525    {
1526        Cycle {
1527            orig: self.clone(),
1528            stream: self,
1529        }
1530    }
1531
1532    /// Enumerates items, mapping them to `(index, item)`.
1533    ///
1534    /// # Examples
1535    ///
1536    /// ```
1537    /// use futures_lite::stream::{self, StreamExt};
1538    ///
1539    /// # spin_on::spin_on(async {
1540    /// let s = stream::iter(vec!['a', 'b', 'c']);
1541    /// let mut s = s.enumerate();
1542    ///
1543    /// assert_eq!(s.next().await, Some((0, 'a')));
1544    /// assert_eq!(s.next().await, Some((1, 'b')));
1545    /// assert_eq!(s.next().await, Some((2, 'c')));
1546    /// assert_eq!(s.next().await, None);
1547    /// # });
1548    /// ```
1549    fn enumerate(self) -> Enumerate<Self>
1550    where
1551        Self: Sized,
1552    {
1553        Enumerate { stream: self, i: 0 }
1554    }
1555
1556    /// Calls a closure on each item and passes it on.
1557    ///
1558    /// # Examples
1559    ///
1560    /// ```
1561    /// use futures_lite::stream::{self, StreamExt};
1562    ///
1563    /// # spin_on::spin_on(async {
1564    /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1565    ///
1566    /// let sum = s
1567    ///    .inspect(|x| println!("about to filter {}", x))
1568    ///    .filter(|x| x % 2 == 0)
1569    ///    .inspect(|x| println!("made it through filter: {}", x))
1570    ///    .fold(0, |sum, i| sum + i)
1571    ///    .await;
1572    /// # });
1573    /// ```
1574    fn inspect<F>(self, f: F) -> Inspect<Self, F>
1575    where
1576        Self: Sized,
1577        F: FnMut(&Self::Item),
1578    {
1579        Inspect { stream: self, f }
1580    }
1581
1582    /// Gets the `n`th item of the stream.
1583    ///
1584    /// In the end, `n+1` items of the stream will be consumed.
1585    ///
1586    /// # Examples
1587    ///
1588    /// ```
1589    /// use futures_lite::stream::{self, StreamExt};
1590    ///
1591    /// # spin_on::spin_on(async {
1592    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1593    ///
1594    /// assert_eq!(s.nth(2).await, Some(2));
1595    /// assert_eq!(s.nth(2).await, Some(5));
1596    /// assert_eq!(s.nth(2).await, None);
1597    /// # });
1598    /// ```
1599    fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1600    where
1601        Self: Unpin,
1602    {
1603        NthFuture { stream: self, n }
1604    }
1605
1606    /// Returns the last item in the stream.
1607    ///
1608    /// # Examples
1609    ///
1610    /// ```
1611    /// use futures_lite::stream::{self, StreamExt};
1612    ///
1613    /// # spin_on::spin_on(async {
1614    /// let s = stream::iter(vec![1, 2, 3, 4]);
1615    /// assert_eq!(s.last().await, Some(4));
1616    ///
1617    /// let s = stream::empty::<i32>();
1618    /// assert_eq!(s.last().await, None);
1619    /// # });
1620    /// ```
1621    fn last(self) -> LastFuture<Self>
1622    where
1623        Self: Sized,
1624    {
1625        LastFuture {
1626            stream: self,
1627            last: None,
1628        }
1629    }
1630
1631    /// Finds the first item of the stream for which `predicate` returns `true`.
1632    ///
1633    /// # Examples
1634    ///
1635    /// ```
1636    /// use futures_lite::stream::{self, StreamExt};
1637    ///
1638    /// # spin_on::spin_on(async {
1639    /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1640    ///
1641    /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1642    /// assert_eq!(s.next().await, Some(13));
1643    /// # });
1644    /// ```
1645    fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1646    where
1647        Self: Unpin,
1648        P: FnMut(&Self::Item) -> bool,
1649    {
1650        FindFuture {
1651            stream: self,
1652            predicate,
1653        }
1654    }
1655
1656    /// Applies a closure to items in the stream and returns the first [`Some`] result.
1657    ///
1658    /// # Examples
1659    ///
1660    /// ```
1661    /// use futures_lite::stream::{self, StreamExt};
1662    ///
1663    /// # spin_on::spin_on(async {
1664    /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1665    /// let number = s.find_map(|s| s.parse().ok()).await;
1666    ///
1667    /// assert_eq!(number, Some(2));
1668    /// # });
1669    /// ```
1670    fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1671    where
1672        Self: Unpin,
1673        F: FnMut(Self::Item) -> Option<B>,
1674    {
1675        FindMapFuture { stream: self, f }
1676    }
1677
1678    /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1679    ///
1680    /// # Examples
1681    ///
1682    /// ```
1683    /// use futures_lite::stream::{self, StreamExt};
1684    ///
1685    /// # spin_on::spin_on(async {
1686    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1687    ///
1688    /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1689    /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1690    /// assert_eq!(s.position(|x| x == 9).await, None);
1691    /// # });
1692    /// ```
1693    fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1694    where
1695        Self: Unpin,
1696        P: FnMut(Self::Item) -> bool,
1697    {
1698        PositionFuture {
1699            stream: self,
1700            predicate,
1701            index: 0,
1702        }
1703    }
1704
1705    /// Tests if `predicate` returns `true` for all items in the stream.
1706    ///
1707    /// The result is `true` for an empty stream.
1708    ///
1709    /// # Examples
1710    ///
1711    /// ```
1712    /// use futures_lite::stream::{self, StreamExt};
1713    ///
1714    /// # spin_on::spin_on(async {
1715    /// let mut s = stream::iter(vec![1, 2, 3]);
1716    /// assert!(!s.all(|x| x % 2 == 0).await);
1717    ///
1718    /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1719    /// assert!(s.all(|x| x % 2 == 0).await);
1720    ///
1721    /// let mut s = stream::empty::<i32>();
1722    /// assert!(s.all(|x| x % 2 == 0).await);
1723    /// # });
1724    /// ```
1725    fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1726    where
1727        Self: Unpin,
1728        P: FnMut(Self::Item) -> bool,
1729    {
1730        AllFuture {
1731            stream: self,
1732            predicate,
1733        }
1734    }
1735
1736    /// Tests if `predicate` returns `true` for any item in the stream.
1737    ///
1738    /// The result is `false` for an empty stream.
1739    ///
1740    /// # Examples
1741    ///
1742    /// ```
1743    /// use futures_lite::stream::{self, StreamExt};
1744    ///
1745    /// # spin_on::spin_on(async {
1746    /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1747    /// assert!(!s.any(|x| x % 2 == 0).await);
1748    ///
1749    /// let mut s = stream::iter(vec![1, 2, 3]);
1750    /// assert!(s.any(|x| x % 2 == 0).await);
1751    ///
1752    /// let mut s = stream::empty::<i32>();
1753    /// assert!(!s.any(|x| x % 2 == 0).await);
1754    /// # });
1755    /// ```
1756    fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1757    where
1758        Self: Unpin,
1759        P: FnMut(Self::Item) -> bool,
1760    {
1761        AnyFuture {
1762            stream: self,
1763            predicate,
1764        }
1765    }
1766
1767    /// Calls a closure on each item of the stream.
1768    ///
1769    /// # Examples
1770    ///
1771    /// ```
1772    /// use futures_lite::stream::{self, StreamExt};
1773    ///
1774    /// # spin_on::spin_on(async {
1775    /// let mut s = stream::iter(vec![1, 2, 3]);
1776    /// s.for_each(|s| println!("{}", s)).await;
1777    /// # });
1778    /// ```
1779    fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1780    where
1781        Self: Sized,
1782        F: FnMut(Self::Item),
1783    {
1784        ForEachFuture { stream: self, f }
1785    }
1786
1787    /// Calls a fallible closure on each item of the stream, stopping on first error.
1788    ///
1789    /// # Examples
1790    ///
1791    /// ```
1792    /// use futures_lite::stream::{self, StreamExt};
1793    ///
1794    /// # spin_on::spin_on(async {
1795    /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1796    ///
1797    /// let mut v = vec![];
1798    /// let res = s
1799    ///     .try_for_each(|n| {
1800    ///         if n < 2 {
1801    ///             v.push(n);
1802    ///             Ok(())
1803    ///         } else {
1804    ///             Err("too big")
1805    ///         }
1806    ///     })
1807    ///     .await;
1808    ///
1809    /// assert_eq!(v, &[0, 1]);
1810    /// assert_eq!(res, Err("too big"));
1811    /// # });
1812    /// ```
1813    fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1814    where
1815        Self: Unpin,
1816        F: FnMut(Self::Item) -> Result<(), E>,
1817    {
1818        TryForEachFuture { stream: self, f }
1819    }
1820
1821    /// Zips up two streams into a single stream of pairs.
1822    ///
1823    /// The stream of pairs stops when either of the original two streams is exhausted.
1824    ///
1825    /// # Examples
1826    ///
1827    /// ```
1828    /// use futures_lite::stream::{self, StreamExt};
1829    ///
1830    /// # spin_on::spin_on(async {
1831    /// let l = stream::iter(vec![1, 2, 3]);
1832    /// let r = stream::iter(vec![4, 5, 6, 7]);
1833    /// let mut s = l.zip(r);
1834    ///
1835    /// assert_eq!(s.next().await, Some((1, 4)));
1836    /// assert_eq!(s.next().await, Some((2, 5)));
1837    /// assert_eq!(s.next().await, Some((3, 6)));
1838    /// assert_eq!(s.next().await, None);
1839    /// # });
1840    /// ```
1841    fn zip<U>(self, other: U) -> Zip<Self, U>
1842    where
1843        Self: Sized,
1844        U: Stream,
1845    {
1846        Zip {
1847            item_slot: None,
1848            first: self,
1849            second: other,
1850        }
1851    }
1852
1853    /// Collects a stream of pairs into a pair of collections.
1854    ///
1855    /// # Examples
1856    ///
1857    /// ```
1858    /// use futures_lite::stream::{self, StreamExt};
1859    ///
1860    /// # spin_on::spin_on(async {
1861    /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1862    /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1863    ///
1864    /// assert_eq!(left, [1, 3]);
1865    /// assert_eq!(right, [2, 4]);
1866    /// # });
1867    /// ```
1868    fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1869    where
1870        FromA: Default + Extend<A>,
1871        FromB: Default + Extend<B>,
1872        Self: Stream<Item = (A, B)> + Sized,
1873    {
1874        UnzipFuture {
1875            stream: self,
1876            res: Some(Default::default()),
1877        }
1878    }
1879
1880    /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1881    ///
1882    /// # Examples
1883    ///
1884    /// ```
1885    /// use futures_lite::stream::{self, StreamExt};
1886    /// use futures_lite::stream::{once, pending};
1887    ///
1888    /// # spin_on::spin_on(async {
1889    /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1890    /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1891    ///
1892    /// // The first future wins.
1893    /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1894    /// # })
1895    /// ```
1896    fn or<S>(self, other: S) -> Or<Self, S>
1897    where
1898        Self: Sized,
1899        S: Stream<Item = Self::Item>,
1900    {
1901        Or {
1902            stream1: self,
1903            stream2: other,
1904        }
1905    }
1906
1907    /// Merges with `other` stream, with no preference for either stream when both are ready.
1908    ///
1909    /// # Examples
1910    ///
1911    /// ```
1912    /// use futures_lite::stream::{self, StreamExt};
1913    /// use futures_lite::stream::{once, pending};
1914    ///
1915    /// # spin_on::spin_on(async {
1916    /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1917    /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1918    ///
1919    /// // One of the two stream is randomly chosen as the winner.
1920    /// let res = once(1).race(once(2)).next().await;
1921    /// # })
1922    /// ```
1923    #[cfg(all(feature = "std", feature = "race"))]
1924    fn race<S>(self, other: S) -> Race<Self, S>
1925    where
1926        Self: Sized,
1927        S: Stream<Item = Self::Item>,
1928    {
1929        Race {
1930            stream1: self,
1931            stream2: other,
1932            rng: Rng::new(),
1933        }
1934    }
1935
1936    /// Yields all immediately available values from a stream.
1937    ///
1938    /// This is intended to be used as a way of polling a stream without waiting, similar to the
1939    /// [`try_iter`] function on [`std::sync::mpsc::Receiver`]. For instance, running this stream
1940    /// on an [`async_channel::Receiver`] will return all messages that are currently in the
1941    /// channel, but will not wait for new messages.
1942    ///
1943    /// This returns a [`Stream`] instead of an [`Iterator`] because it still needs access to the
1944    /// polling context in order to poll the underlying stream. Since this stream will never return
1945    /// `Poll::Pending`, wrapping it in [`block_on`] will allow it to be effectively used as an
1946    /// [`Iterator`].
1947    ///
1948    /// This stream is not necessarily fused. After it returns `None`, it can return `Some(x)` in
1949    /// the future if it is polled again.
1950    ///
1951    /// [`try_iter`]: std::sync::mpsc::Receiver::try_iter
1952    /// [`async_channel::Receiver`]: https://docs.rs/async-channel/latest/async_channel/struct.Receiver.html
1953    /// [`Stream`]: crate::stream::Stream
1954    /// [`Iterator`]: std::iter::Iterator
1955    ///
1956    /// # Examples
1957    ///
1958    /// ```
1959    /// use futures_lite::{future, pin};
1960    /// use futures_lite::stream::{self, StreamExt};
1961    ///
1962    /// # #[cfg(feature = "std")] {
1963    /// // A stream that yields two values, returns `Pending`, and then yields one more value.
1964    /// let pend_once = stream::once_future(async {
1965    ///     future::yield_now().await;
1966    ///     3
1967    /// });
1968    /// let s = stream::iter(vec![1, 2]).chain(pend_once);
1969    /// pin!(s);
1970    ///
1971    /// // This will return the first two values, and then `None` because the stream returns
1972    /// // `Pending` after that.
1973    /// let mut iter = stream::block_on(s.drain());
1974    /// assert_eq!(iter.next(), Some(1));
1975    /// assert_eq!(iter.next(), Some(2));
1976    /// assert_eq!(iter.next(), None);
1977    ///
1978    /// // This will return the last value, because the stream returns `Ready` when polled.
1979    /// assert_eq!(iter.next(), Some(3));
1980    /// assert_eq!(iter.next(), None);
1981    /// # }
1982    /// ```
1983    fn drain(&mut self) -> Drain<'_, Self> {
1984        Drain { stream: self }
1985    }
1986
1987    /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
1988    ///
1989    /// # Examples
1990    ///
1991    /// ```
1992    /// use futures_lite::stream::{self, StreamExt};
1993    ///
1994    /// # spin_on::spin_on(async {
1995    /// let a = stream::once(1);
1996    /// let b = stream::empty();
1997    ///
1998    /// // Streams of different types can be stored in
1999    /// // the same collection when they are boxed:
2000    /// let streams = vec![a.boxed(), b.boxed()];
2001    /// # })
2002    /// ```
2003    #[cfg(feature = "alloc")]
2004    fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
2005    where
2006        Self: Send + Sized + 'a,
2007    {
2008        Box::pin(self)
2009    }
2010
2011    /// Boxes the stream and changes its type to `dyn Stream + 'a`.
2012    ///
2013    /// # Examples
2014    ///
2015    /// ```
2016    /// use futures_lite::stream::{self, StreamExt};
2017    ///
2018    /// # spin_on::spin_on(async {
2019    /// let a = stream::once(1);
2020    /// let b = stream::empty();
2021    ///
2022    /// // Streams of different types can be stored in
2023    /// // the same collection when they are boxed:
2024    /// let streams = vec![a.boxed_local(), b.boxed_local()];
2025    /// # })
2026    /// ```
2027    #[cfg(feature = "alloc")]
2028    fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
2029    where
2030        Self: Sized + 'a,
2031    {
2032        Box::pin(self)
2033    }
2034}
2035
2036impl<S: Stream + ?Sized> StreamExt for S {}
2037
2038/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
2039///
2040/// # Examples
2041///
2042/// ```
2043/// use futures_lite::stream::{self, StreamExt};
2044///
2045/// // These two lines are equivalent:
2046/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
2047/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
2048/// ```
2049#[cfg(feature = "alloc")]
2050pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
2051
2052/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
2053///
2054/// # Examples
2055///
2056/// ```
2057/// use futures_lite::stream::{self, StreamExt};
2058///
2059/// // These two lines are equivalent:
2060/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
2061/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
2062/// ```
2063#[cfg(feature = "alloc")]
2064pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
2065
2066/// Future for the [`StreamExt::next()`] method.
2067#[derive(Debug)]
2068#[must_use = "futures do nothing unless you `.await` or poll them"]
2069pub struct NextFuture<'a, S: ?Sized> {
2070    stream: &'a mut S,
2071}
2072
2073impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
2074
2075impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
2076    type Output = Option<S::Item>;
2077
2078    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2079        self.stream.poll_next(cx)
2080    }
2081}
2082
2083/// Future for the [`StreamExt::try_next()`] method.
2084#[derive(Debug)]
2085#[must_use = "futures do nothing unless you `.await` or poll them"]
2086pub struct TryNextFuture<'a, S: ?Sized> {
2087    stream: &'a mut S,
2088}
2089
2090impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
2091
2092impl<T, E, S> Future for TryNextFuture<'_, S>
2093where
2094    S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
2095{
2096    type Output = Result<Option<T>, E>;
2097
2098    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2099        let res = ready!(self.stream.poll_next(cx));
2100        Poll::Ready(res.transpose())
2101    }
2102}
2103
2104pin_project! {
2105    /// Future for the [`StreamExt::count()`] method.
2106    #[derive(Debug)]
2107    #[must_use = "futures do nothing unless you `.await` or poll them"]
2108    pub struct CountFuture<S: ?Sized> {
2109        count: usize,
2110        #[pin]
2111        stream: S,
2112    }
2113}
2114
2115impl<S: Stream + ?Sized> Future for CountFuture<S> {
2116    type Output = usize;
2117
2118    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2119        loop {
2120            match ready!(self.as_mut().project().stream.poll_next(cx)) {
2121                None => return Poll::Ready(self.count),
2122                Some(_) => *self.as_mut().project().count += 1,
2123            }
2124        }
2125    }
2126}
2127
2128pin_project! {
2129    /// Future for the [`StreamExt::collect()`] method.
2130    #[derive(Debug)]
2131    #[must_use = "futures do nothing unless you `.await` or poll them"]
2132    pub struct CollectFuture<S, C> {
2133        #[pin]
2134        stream: S,
2135        collection: C,
2136    }
2137}
2138
2139impl<S, C> Future for CollectFuture<S, C>
2140where
2141    S: Stream,
2142    C: Default + Extend<S::Item>,
2143{
2144    type Output = C;
2145
2146    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
2147        let mut this = self.as_mut().project();
2148        loop {
2149            match ready!(this.stream.as_mut().poll_next(cx)) {
2150                Some(e) => this.collection.extend(Some(e)),
2151                None => return Poll::Ready(mem::take(self.project().collection)),
2152            }
2153        }
2154    }
2155}
2156
2157pin_project! {
2158    /// Future for the [`StreamExt::try_collect()`] method.
2159    #[derive(Debug)]
2160    #[must_use = "futures do nothing unless you `.await` or poll them"]
2161    pub struct TryCollectFuture<S, C> {
2162        #[pin]
2163        stream: S,
2164        items: C,
2165    }
2166}
2167
2168impl<T, E, S, C> Future for TryCollectFuture<S, C>
2169where
2170    S: Stream<Item = Result<T, E>>,
2171    C: Default + Extend<T>,
2172{
2173    type Output = Result<C, E>;
2174
2175    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2176        let mut this = self.project();
2177        Poll::Ready(Ok(loop {
2178            match ready!(this.stream.as_mut().poll_next(cx)?) {
2179                Some(x) => this.items.extend(Some(x)),
2180                None => break mem::take(this.items),
2181            }
2182        }))
2183    }
2184}
2185
2186pin_project! {
2187    /// Future for the [`StreamExt::partition()`] method.
2188    #[derive(Debug)]
2189    #[must_use = "futures do nothing unless you `.await` or poll them"]
2190    pub struct PartitionFuture<S, P, B> {
2191        #[pin]
2192        stream: S,
2193        predicate: P,
2194        res: Option<(B, B)>,
2195    }
2196}
2197
2198impl<S, P, B> Future for PartitionFuture<S, P, B>
2199where
2200    S: Stream + Sized,
2201    P: FnMut(&S::Item) -> bool,
2202    B: Default + Extend<S::Item>,
2203{
2204    type Output = (B, B);
2205
2206    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2207        let mut this = self.project();
2208        loop {
2209            match ready!(this.stream.as_mut().poll_next(cx)) {
2210                Some(v) => {
2211                    let res = this.res.as_mut().unwrap();
2212                    if (this.predicate)(&v) {
2213                        res.0.extend(Some(v))
2214                    } else {
2215                        res.1.extend(Some(v))
2216                    }
2217                }
2218                None => return Poll::Ready(this.res.take().unwrap()),
2219            }
2220        }
2221    }
2222}
2223
2224pin_project! {
2225    /// Future for the [`StreamExt::fold()`] method.
2226    #[derive(Debug)]
2227    #[must_use = "futures do nothing unless you `.await` or poll them"]
2228    pub struct FoldFuture<S, F, T> {
2229        #[pin]
2230        stream: S,
2231        f: F,
2232        acc: Option<T>,
2233    }
2234}
2235
2236impl<S, F, T> Future for FoldFuture<S, F, T>
2237where
2238    S: Stream,
2239    F: FnMut(T, S::Item) -> T,
2240{
2241    type Output = T;
2242
2243    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2244        let mut this = self.project();
2245        loop {
2246            match ready!(this.stream.as_mut().poll_next(cx)) {
2247                Some(v) => {
2248                    let old = this.acc.take().unwrap();
2249                    let new = (this.f)(old, v);
2250                    *this.acc = Some(new);
2251                }
2252                None => return Poll::Ready(this.acc.take().unwrap()),
2253            }
2254        }
2255    }
2256}
2257
2258/// Future for the [`StreamExt::try_fold()`] method.
2259#[derive(Debug)]
2260#[must_use = "futures do nothing unless you `.await` or poll them"]
2261pub struct TryFoldFuture<'a, S, F, B> {
2262    stream: &'a mut S,
2263    f: F,
2264    acc: Option<B>,
2265}
2266
2267impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
2268
2269impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
2270where
2271    S: Stream<Item = Result<T, E>> + Unpin,
2272    F: FnMut(B, T) -> Result<B, E>,
2273{
2274    type Output = Result<B, E>;
2275
2276    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2277        loop {
2278            match ready!(self.stream.poll_next(cx)) {
2279                Some(Err(e)) => return Poll::Ready(Err(e)),
2280                Some(Ok(t)) => {
2281                    let old = self.acc.take().unwrap();
2282                    let new = (&mut self.f)(old, t);
2283
2284                    match new {
2285                        Ok(t) => self.acc = Some(t),
2286                        Err(e) => return Poll::Ready(Err(e)),
2287                    }
2288                }
2289                None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2290            }
2291        }
2292    }
2293}
2294
2295pin_project! {
2296    /// Stream for the [`StreamExt::scan()`] method.
2297    #[derive(Clone, Debug)]
2298    #[must_use = "streams do nothing unless polled"]
2299    pub struct Scan<S, St, F> {
2300        #[pin]
2301        stream: S,
2302        state_f: (St, F),
2303    }
2304}
2305
2306impl<S, St, F, B> Stream for Scan<S, St, F>
2307where
2308    S: Stream,
2309    F: FnMut(&mut St, S::Item) -> Option<B>,
2310{
2311    type Item = B;
2312
2313    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2314        let mut this = self.project();
2315        this.stream.as_mut().poll_next(cx).map(|item| {
2316            item.and_then(|item| {
2317                let (state, f) = this.state_f;
2318                f(state, item)
2319            })
2320        })
2321    }
2322}
2323
2324pin_project! {
2325    /// Stream for the [`StreamExt::fuse()`] method.
2326    #[derive(Clone, Debug)]
2327    #[must_use = "streams do nothing unless polled"]
2328    pub struct Fuse<S> {
2329        #[pin]
2330        stream: S,
2331        done: bool,
2332    }
2333}
2334
2335impl<S: Stream> Stream for Fuse<S> {
2336    type Item = S::Item;
2337
2338    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2339        let this = self.project();
2340
2341        if *this.done {
2342            Poll::Ready(None)
2343        } else {
2344            let next = ready!(this.stream.poll_next(cx));
2345            if next.is_none() {
2346                *this.done = true;
2347            }
2348            Poll::Ready(next)
2349        }
2350    }
2351}
2352
2353pin_project! {
2354    /// Stream for the [`StreamExt::map()`] method.
2355    #[derive(Clone, Debug)]
2356    #[must_use = "streams do nothing unless polled"]
2357    pub struct Map<S, F> {
2358        #[pin]
2359        stream: S,
2360        f: F,
2361    }
2362}
2363
2364impl<S, F, T> Stream for Map<S, F>
2365where
2366    S: Stream,
2367    F: FnMut(S::Item) -> T,
2368{
2369    type Item = T;
2370
2371    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2372        let this = self.project();
2373        let next = ready!(this.stream.poll_next(cx));
2374        Poll::Ready(next.map(this.f))
2375    }
2376
2377    fn size_hint(&self) -> (usize, Option<usize>) {
2378        self.stream.size_hint()
2379    }
2380}
2381
2382pin_project! {
2383    /// Stream for the [`StreamExt::flat_map()`] method.
2384    #[derive(Clone, Debug)]
2385    #[must_use = "streams do nothing unless polled"]
2386    pub struct FlatMap<S, U, F> {
2387        #[pin]
2388        stream: Map<S, F>,
2389        #[pin]
2390        inner_stream: Option<U>,
2391    }
2392}
2393
2394impl<S, U, F> Stream for FlatMap<S, U, F>
2395where
2396    S: Stream,
2397    U: Stream,
2398    F: FnMut(S::Item) -> U,
2399{
2400    type Item = U::Item;
2401
2402    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2403        let mut this = self.project();
2404        loop {
2405            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2406                match ready!(inner.poll_next(cx)) {
2407                    Some(item) => return Poll::Ready(Some(item)),
2408                    None => this.inner_stream.set(None),
2409                }
2410            }
2411
2412            match ready!(this.stream.as_mut().poll_next(cx)) {
2413                Some(stream) => this.inner_stream.set(Some(stream)),
2414                None => return Poll::Ready(None),
2415            }
2416        }
2417    }
2418}
2419
2420pin_project! {
2421    /// Stream for the [`StreamExt::flatten()`] method.
2422    #[derive(Clone, Debug)]
2423    #[must_use = "streams do nothing unless polled"]
2424    pub struct Flatten<S: Stream> {
2425        #[pin]
2426        stream: S,
2427        #[pin]
2428        inner_stream: Option<S::Item>,
2429    }
2430}
2431
2432impl<S, U> Stream for Flatten<S>
2433where
2434    S: Stream<Item = U>,
2435    U: Stream,
2436{
2437    type Item = U::Item;
2438
2439    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2440        let mut this = self.project();
2441        loop {
2442            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2443                match ready!(inner.poll_next(cx)) {
2444                    Some(item) => return Poll::Ready(Some(item)),
2445                    None => this.inner_stream.set(None),
2446                }
2447            }
2448
2449            match ready!(this.stream.as_mut().poll_next(cx)) {
2450                Some(inner) => this.inner_stream.set(Some(inner)),
2451                None => return Poll::Ready(None),
2452            }
2453        }
2454    }
2455}
2456
2457pin_project! {
2458    /// Stream for the [`StreamExt::then()`] method.
2459    #[derive(Clone, Debug)]
2460    #[must_use = "streams do nothing unless polled"]
2461    pub struct Then<S, F, Fut> {
2462        #[pin]
2463        stream: S,
2464        #[pin]
2465        future: Option<Fut>,
2466        f: F,
2467    }
2468}
2469
2470impl<S, F, Fut> Stream for Then<S, F, Fut>
2471where
2472    S: Stream,
2473    F: FnMut(S::Item) -> Fut,
2474    Fut: Future,
2475{
2476    type Item = Fut::Output;
2477
2478    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2479        let mut this = self.project();
2480
2481        loop {
2482            if let Some(fut) = this.future.as_mut().as_pin_mut() {
2483                let item = ready!(fut.poll(cx));
2484                this.future.set(None);
2485                return Poll::Ready(Some(item));
2486            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2487                this.future.set(Some((this.f)(item)));
2488            } else {
2489                return Poll::Ready(None);
2490            }
2491        }
2492    }
2493
2494    fn size_hint(&self) -> (usize, Option<usize>) {
2495        let future_len = self.future.is_some() as usize;
2496        let (lower, upper) = self.stream.size_hint();
2497        let lower = lower.saturating_add(future_len);
2498        let upper = upper.and_then(|u| u.checked_add(future_len));
2499        (lower, upper)
2500    }
2501}
2502
2503pin_project! {
2504    /// Stream for the [`StreamExt::filter()`] method.
2505    #[derive(Clone, Debug)]
2506    #[must_use = "streams do nothing unless polled"]
2507    pub struct Filter<S, P> {
2508        #[pin]
2509        stream: S,
2510        predicate: P,
2511    }
2512}
2513
2514impl<S, P> Stream for Filter<S, P>
2515where
2516    S: Stream,
2517    P: FnMut(&S::Item) -> bool,
2518{
2519    type Item = S::Item;
2520
2521    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2522        let mut this = self.project();
2523        loop {
2524            match ready!(this.stream.as_mut().poll_next(cx)) {
2525                None => return Poll::Ready(None),
2526                Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2527                Some(_) => {}
2528            }
2529        }
2530    }
2531
2532    fn size_hint(&self) -> (usize, Option<usize>) {
2533        let (_, hi) = self.stream.size_hint();
2534
2535        // If the filter matches all of the elements, it will match the stream's upper bound.
2536        // If the filter matches none of the elements, there will be zero returned values.
2537        (0, hi)
2538    }
2539}
2540
2541/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
2542///
2543/// # Examples
2544///
2545/// ```
2546/// use futures_lite::stream::{self, once, pending, StreamExt};
2547///
2548/// # spin_on::spin_on(async {
2549/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
2550/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
2551///
2552/// // The first stream wins.
2553/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
2554/// # })
2555/// ```
2556pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2557where
2558    S1: Stream<Item = T>,
2559    S2: Stream<Item = T>,
2560{
2561    Or { stream1, stream2 }
2562}
2563
2564pin_project! {
2565    /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2566    #[derive(Clone, Debug)]
2567    #[must_use = "streams do nothing unless polled"]
2568    pub struct Or<S1, S2> {
2569        #[pin]
2570        stream1: S1,
2571        #[pin]
2572        stream2: S2,
2573    }
2574}
2575
2576impl<T, S1, S2> Stream for Or<S1, S2>
2577where
2578    S1: Stream<Item = T>,
2579    S2: Stream<Item = T>,
2580{
2581    type Item = T;
2582
2583    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2584        let mut this = self.project();
2585
2586        if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2587            return Poll::Ready(Some(t));
2588        }
2589        this.stream2.as_mut().poll_next(cx)
2590    }
2591}
2592
2593/// Merges two streams, with no preference for either stream when both are ready.
2594///
2595/// # Examples
2596///
2597/// ```
2598/// use futures_lite::stream::{self, once, pending, StreamExt};
2599///
2600/// # spin_on::spin_on(async {
2601/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
2602/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
2603///
2604/// // One of the two stream is randomly chosen as the winner.
2605/// let res = stream::race(once(1), once(2)).next().await;
2606/// # })
2607/// ```
2608#[cfg(all(feature = "std", feature = "race"))]
2609pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
2610where
2611    S1: Stream<Item = T>,
2612    S2: Stream<Item = T>,
2613{
2614    Race {
2615        stream1,
2616        stream2,
2617        rng: Rng::new(),
2618    }
2619}
2620
2621/// Races two streams, but with a user-provided seed for randomness.
2622///
2623/// # Examples
2624///
2625/// ```
2626/// use futures_lite::stream::{self, once, pending, StreamExt};
2627///
2628/// // A fixed seed is used for reproducibility.
2629/// const SEED: u64 = 123;
2630///
2631/// # spin_on::spin_on(async {
2632/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1));
2633/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2));
2634///
2635/// // One of the two stream is randomly chosen as the winner.
2636/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await;
2637/// # })
2638/// ```
2639#[cfg(feature = "race")]
2640pub fn race_with_seed<T, S1, S2>(stream1: S1, stream2: S2, seed: u64) -> Race<S1, S2>
2641where
2642    S1: Stream<Item = T>,
2643    S2: Stream<Item = T>,
2644{
2645    Race {
2646        stream1,
2647        stream2,
2648        rng: Rng::with_seed(seed),
2649    }
2650}
2651
2652#[cfg(feature = "race")]
2653pin_project! {
2654    /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
2655    #[derive(Clone, Debug)]
2656    #[must_use = "streams do nothing unless polled"]
2657    pub struct Race<S1, S2> {
2658        #[pin]
2659        stream1: S1,
2660        #[pin]
2661        stream2: S2,
2662        rng: Rng,
2663    }
2664}
2665
2666#[cfg(feature = "race")]
2667impl<T, S1, S2> Stream for Race<S1, S2>
2668where
2669    S1: Stream<Item = T>,
2670    S2: Stream<Item = T>,
2671{
2672    type Item = T;
2673
2674    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2675        let mut this = self.project();
2676
2677        if this.rng.bool() {
2678            if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2679                return Poll::Ready(Some(t));
2680            }
2681            if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2682                return Poll::Ready(Some(t));
2683            }
2684        } else {
2685            if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2686                return Poll::Ready(Some(t));
2687            }
2688            if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2689                return Poll::Ready(Some(t));
2690            }
2691        }
2692        Poll::Pending
2693    }
2694}
2695
2696pin_project! {
2697    /// Stream for the [`StreamExt::filter_map()`] method.
2698    #[derive(Clone, Debug)]
2699    #[must_use = "streams do nothing unless polled"]
2700    pub struct FilterMap<S, F> {
2701        #[pin]
2702        stream: S,
2703        f: F,
2704    }
2705}
2706
2707impl<S, F, T> Stream for FilterMap<S, F>
2708where
2709    S: Stream,
2710    F: FnMut(S::Item) -> Option<T>,
2711{
2712    type Item = T;
2713
2714    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2715        let mut this = self.project();
2716        loop {
2717            match ready!(this.stream.as_mut().poll_next(cx)) {
2718                None => return Poll::Ready(None),
2719                Some(v) => {
2720                    if let Some(t) = (this.f)(v) {
2721                        return Poll::Ready(Some(t));
2722                    }
2723                }
2724            }
2725        }
2726    }
2727}
2728
2729pin_project! {
2730    /// Stream for the [`StreamExt::take()`] method.
2731    #[derive(Clone, Debug)]
2732    #[must_use = "streams do nothing unless polled"]
2733    pub struct Take<S> {
2734        #[pin]
2735        stream: S,
2736        n: usize,
2737    }
2738}
2739
2740impl<S: Stream> Stream for Take<S> {
2741    type Item = S::Item;
2742
2743    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2744        let this = self.project();
2745
2746        if *this.n == 0 {
2747            Poll::Ready(None)
2748        } else {
2749            let next = ready!(this.stream.poll_next(cx));
2750            match next {
2751                Some(_) => *this.n -= 1,
2752                None => *this.n = 0,
2753            }
2754            Poll::Ready(next)
2755        }
2756    }
2757}
2758
2759pin_project! {
2760    /// Stream for the [`StreamExt::take_while()`] method.
2761    #[derive(Clone, Debug)]
2762    #[must_use = "streams do nothing unless polled"]
2763    pub struct TakeWhile<S, P> {
2764        #[pin]
2765        stream: S,
2766        predicate: P,
2767    }
2768}
2769
2770impl<S, P> Stream for TakeWhile<S, P>
2771where
2772    S: Stream,
2773    P: FnMut(&S::Item) -> bool,
2774{
2775    type Item = S::Item;
2776
2777    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2778        let this = self.project();
2779
2780        match ready!(this.stream.poll_next(cx)) {
2781            Some(v) => {
2782                if (this.predicate)(&v) {
2783                    Poll::Ready(Some(v))
2784                } else {
2785                    Poll::Ready(None)
2786                }
2787            }
2788            None => Poll::Ready(None),
2789        }
2790    }
2791}
2792
2793pin_project! {
2794    /// Stream for the [`StreamExt::skip()`] method.
2795    #[derive(Clone, Debug)]
2796    #[must_use = "streams do nothing unless polled"]
2797    pub struct Skip<S> {
2798        #[pin]
2799        stream: S,
2800        n: usize,
2801    }
2802}
2803
2804impl<S: Stream> Stream for Skip<S> {
2805    type Item = S::Item;
2806
2807    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2808        let mut this = self.project();
2809        loop {
2810            match ready!(this.stream.as_mut().poll_next(cx)) {
2811                Some(v) => match *this.n {
2812                    0 => return Poll::Ready(Some(v)),
2813                    _ => *this.n -= 1,
2814                },
2815                None => return Poll::Ready(None),
2816            }
2817        }
2818    }
2819}
2820
2821pin_project! {
2822    /// Stream for the [`StreamExt::skip_while()`] method.
2823    #[derive(Clone, Debug)]
2824    #[must_use = "streams do nothing unless polled"]
2825    pub struct SkipWhile<S, P> {
2826        #[pin]
2827        stream: S,
2828        predicate: Option<P>,
2829    }
2830}
2831
2832impl<S, P> Stream for SkipWhile<S, P>
2833where
2834    S: Stream,
2835    P: FnMut(&S::Item) -> bool,
2836{
2837    type Item = S::Item;
2838
2839    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2840        let mut this = self.project();
2841        loop {
2842            match ready!(this.stream.as_mut().poll_next(cx)) {
2843                Some(v) => match this.predicate {
2844                    Some(p) => {
2845                        if !p(&v) {
2846                            *this.predicate = None;
2847                            return Poll::Ready(Some(v));
2848                        }
2849                    }
2850                    None => return Poll::Ready(Some(v)),
2851                },
2852                None => return Poll::Ready(None),
2853            }
2854        }
2855    }
2856}
2857
2858pin_project! {
2859    /// Stream for the [`StreamExt::step_by()`] method.
2860    #[derive(Clone, Debug)]
2861    #[must_use = "streams do nothing unless polled"]
2862    pub struct StepBy<S> {
2863        #[pin]
2864        stream: S,
2865        step: usize,
2866        i: usize,
2867    }
2868}
2869
2870impl<S: Stream> Stream for StepBy<S> {
2871    type Item = S::Item;
2872
2873    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2874        let mut this = self.project();
2875        loop {
2876            match ready!(this.stream.as_mut().poll_next(cx)) {
2877                Some(v) => {
2878                    if *this.i == 0 {
2879                        *this.i = *this.step - 1;
2880                        return Poll::Ready(Some(v));
2881                    } else {
2882                        *this.i -= 1;
2883                    }
2884                }
2885                None => return Poll::Ready(None),
2886            }
2887        }
2888    }
2889}
2890
2891pin_project! {
2892    /// Stream for the [`StreamExt::chain()`] method.
2893    #[derive(Clone, Debug)]
2894    #[must_use = "streams do nothing unless polled"]
2895    pub struct Chain<S, U> {
2896        #[pin]
2897        first: Fuse<S>,
2898        #[pin]
2899        second: Fuse<U>,
2900    }
2901}
2902
2903impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2904    type Item = S::Item;
2905
2906    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2907        let mut this = self.project();
2908
2909        if !this.first.done {
2910            let next = ready!(this.first.as_mut().poll_next(cx));
2911            if let Some(next) = next {
2912                return Poll::Ready(Some(next));
2913            }
2914        }
2915
2916        if !this.second.done {
2917            let next = ready!(this.second.as_mut().poll_next(cx));
2918            if let Some(next) = next {
2919                return Poll::Ready(Some(next));
2920            }
2921        }
2922
2923        if this.first.done && this.second.done {
2924            Poll::Ready(None)
2925        } else {
2926            Poll::Pending
2927        }
2928    }
2929}
2930
2931pin_project! {
2932    /// Stream for the [`StreamExt::cloned()`] method.
2933    #[derive(Clone, Debug)]
2934    #[must_use = "streams do nothing unless polled"]
2935    pub struct Cloned<S> {
2936        #[pin]
2937        stream: S,
2938    }
2939}
2940
2941impl<'a, S, T: 'a> Stream for Cloned<S>
2942where
2943    S: Stream<Item = &'a T>,
2944    T: Clone,
2945{
2946    type Item = T;
2947
2948    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2949        let this = self.project();
2950        let next = ready!(this.stream.poll_next(cx));
2951        Poll::Ready(next.cloned())
2952    }
2953}
2954
2955pin_project! {
2956    /// Stream for the [`StreamExt::copied()`] method.
2957    #[derive(Clone, Debug)]
2958    #[must_use = "streams do nothing unless polled"]
2959    pub struct Copied<S> {
2960        #[pin]
2961        stream: S,
2962    }
2963}
2964
2965impl<'a, S, T: 'a> Stream for Copied<S>
2966where
2967    S: Stream<Item = &'a T>,
2968    T: Copy,
2969{
2970    type Item = T;
2971
2972    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2973        let this = self.project();
2974        let next = ready!(this.stream.poll_next(cx));
2975        Poll::Ready(next.copied())
2976    }
2977}
2978
2979pin_project! {
2980    /// Stream for the [`StreamExt::cycle()`] method.
2981    #[derive(Clone, Debug)]
2982    #[must_use = "streams do nothing unless polled"]
2983    pub struct Cycle<S> {
2984        orig: S,
2985        #[pin]
2986        stream: S,
2987    }
2988}
2989
2990impl<S> Stream for Cycle<S>
2991where
2992    S: Stream + Clone,
2993{
2994    type Item = S::Item;
2995
2996    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2997        match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2998            Some(item) => Poll::Ready(Some(item)),
2999            None => {
3000                let new = self.as_mut().orig.clone();
3001                self.as_mut().project().stream.set(new);
3002                self.project().stream.poll_next(cx)
3003            }
3004        }
3005    }
3006}
3007
3008pin_project! {
3009    /// Stream for the [`StreamExt::enumerate()`] method.
3010    #[derive(Clone, Debug)]
3011    #[must_use = "streams do nothing unless polled"]
3012    pub struct Enumerate<S> {
3013        #[pin]
3014        stream: S,
3015        i: usize,
3016    }
3017}
3018
3019impl<S> Stream for Enumerate<S>
3020where
3021    S: Stream,
3022{
3023    type Item = (usize, S::Item);
3024
3025    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3026        let this = self.project();
3027
3028        match ready!(this.stream.poll_next(cx)) {
3029            Some(v) => {
3030                let ret = (*this.i, v);
3031                *this.i += 1;
3032                Poll::Ready(Some(ret))
3033            }
3034            None => Poll::Ready(None),
3035        }
3036    }
3037}
3038
3039pin_project! {
3040    /// Stream for the [`StreamExt::inspect()`] method.
3041    #[derive(Clone, Debug)]
3042    #[must_use = "streams do nothing unless polled"]
3043    pub struct Inspect<S, F> {
3044        #[pin]
3045        stream: S,
3046        f: F,
3047    }
3048}
3049
3050impl<S, F> Stream for Inspect<S, F>
3051where
3052    S: Stream,
3053    F: FnMut(&S::Item),
3054{
3055    type Item = S::Item;
3056
3057    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3058        let mut this = self.project();
3059        let next = ready!(this.stream.as_mut().poll_next(cx));
3060        if let Some(x) = &next {
3061            (this.f)(x);
3062        }
3063        Poll::Ready(next)
3064    }
3065}
3066
3067/// Future for the [`StreamExt::nth()`] method.
3068#[derive(Debug)]
3069#[must_use = "futures do nothing unless you `.await` or poll them"]
3070pub struct NthFuture<'a, S: ?Sized> {
3071    stream: &'a mut S,
3072    n: usize,
3073}
3074
3075impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
3076
3077impl<'a, S> Future for NthFuture<'a, S>
3078where
3079    S: Stream + Unpin + ?Sized,
3080{
3081    type Output = Option<S::Item>;
3082
3083    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3084        loop {
3085            match ready!(self.stream.poll_next(cx)) {
3086                Some(v) => match self.n {
3087                    0 => return Poll::Ready(Some(v)),
3088                    _ => self.n -= 1,
3089                },
3090                None => return Poll::Ready(None),
3091            }
3092        }
3093    }
3094}
3095
3096pin_project! {
3097    /// Future for the [`StreamExt::last()`] method.
3098    #[derive(Debug)]
3099    #[must_use = "futures do nothing unless you `.await` or poll them"]
3100    pub struct LastFuture<S: Stream> {
3101        #[pin]
3102        stream: S,
3103        last: Option<S::Item>,
3104    }
3105}
3106
3107impl<S: Stream> Future for LastFuture<S> {
3108    type Output = Option<S::Item>;
3109
3110    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3111        let mut this = self.project();
3112        loop {
3113            match ready!(this.stream.as_mut().poll_next(cx)) {
3114                Some(new) => *this.last = Some(new),
3115                None => return Poll::Ready(this.last.take()),
3116            }
3117        }
3118    }
3119}
3120
3121/// Future for the [`StreamExt::find()`] method.
3122#[derive(Debug)]
3123#[must_use = "futures do nothing unless you `.await` or poll them"]
3124pub struct FindFuture<'a, S: ?Sized, P> {
3125    stream: &'a mut S,
3126    predicate: P,
3127}
3128
3129impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
3130
3131impl<'a, S, P> Future for FindFuture<'a, S, P>
3132where
3133    S: Stream + Unpin + ?Sized,
3134    P: FnMut(&S::Item) -> bool,
3135{
3136    type Output = Option<S::Item>;
3137
3138    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3139        loop {
3140            match ready!(self.stream.poll_next(cx)) {
3141                Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
3142                Some(_) => {}
3143                None => return Poll::Ready(None),
3144            }
3145        }
3146    }
3147}
3148
3149/// Future for the [`StreamExt::find_map()`] method.
3150#[derive(Debug)]
3151#[must_use = "futures do nothing unless you `.await` or poll them"]
3152pub struct FindMapFuture<'a, S: ?Sized, F> {
3153    stream: &'a mut S,
3154    f: F,
3155}
3156
3157impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
3158
3159impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
3160where
3161    S: Stream + Unpin + ?Sized,
3162    F: FnMut(S::Item) -> Option<B>,
3163{
3164    type Output = Option<B>;
3165
3166    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3167        loop {
3168            match ready!(self.stream.poll_next(cx)) {
3169                Some(v) => {
3170                    if let Some(v) = (&mut self.f)(v) {
3171                        return Poll::Ready(Some(v));
3172                    }
3173                }
3174                None => return Poll::Ready(None),
3175            }
3176        }
3177    }
3178}
3179
3180/// Future for the [`StreamExt::position()`] method.
3181#[derive(Debug)]
3182#[must_use = "futures do nothing unless you `.await` or poll them"]
3183pub struct PositionFuture<'a, S: ?Sized, P> {
3184    stream: &'a mut S,
3185    predicate: P,
3186    index: usize,
3187}
3188
3189impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
3190
3191impl<'a, S, P> Future for PositionFuture<'a, S, P>
3192where
3193    S: Stream + Unpin + ?Sized,
3194    P: FnMut(S::Item) -> bool,
3195{
3196    type Output = Option<usize>;
3197
3198    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3199        loop {
3200            match ready!(self.stream.poll_next(cx)) {
3201                Some(v) => {
3202                    if (&mut self.predicate)(v) {
3203                        return Poll::Ready(Some(self.index));
3204                    } else {
3205                        self.index += 1;
3206                    }
3207                }
3208                None => return Poll::Ready(None),
3209            }
3210        }
3211    }
3212}
3213
3214/// Future for the [`StreamExt::all()`] method.
3215#[derive(Debug)]
3216#[must_use = "futures do nothing unless you `.await` or poll them"]
3217pub struct AllFuture<'a, S: ?Sized, P> {
3218    stream: &'a mut S,
3219    predicate: P,
3220}
3221
3222impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
3223
3224impl<S, P> Future for AllFuture<'_, S, P>
3225where
3226    S: Stream + Unpin + ?Sized,
3227    P: FnMut(S::Item) -> bool,
3228{
3229    type Output = bool;
3230
3231    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3232        loop {
3233            match ready!(self.stream.poll_next(cx)) {
3234                Some(v) => {
3235                    if !(&mut self.predicate)(v) {
3236                        return Poll::Ready(false);
3237                    }
3238                }
3239                None => return Poll::Ready(true),
3240            }
3241        }
3242    }
3243}
3244
3245/// Future for the [`StreamExt::any()`] method.
3246#[derive(Debug)]
3247#[must_use = "futures do nothing unless you `.await` or poll them"]
3248pub struct AnyFuture<'a, S: ?Sized, P> {
3249    stream: &'a mut S,
3250    predicate: P,
3251}
3252
3253impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
3254
3255impl<S, P> Future for AnyFuture<'_, S, P>
3256where
3257    S: Stream + Unpin + ?Sized,
3258    P: FnMut(S::Item) -> bool,
3259{
3260    type Output = bool;
3261
3262    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3263        loop {
3264            match ready!(self.stream.poll_next(cx)) {
3265                Some(v) => {
3266                    if (&mut self.predicate)(v) {
3267                        return Poll::Ready(true);
3268                    }
3269                }
3270                None => return Poll::Ready(false),
3271            }
3272        }
3273    }
3274}
3275
3276pin_project! {
3277    /// Future for the [`StreamExt::for_each()`] method.
3278    #[derive(Debug)]
3279    #[must_use = "futures do nothing unless you `.await` or poll them"]
3280    pub struct ForEachFuture<S, F> {
3281        #[pin]
3282        stream: S,
3283        f: F,
3284    }
3285}
3286
3287impl<S, F> Future for ForEachFuture<S, F>
3288where
3289    S: Stream,
3290    F: FnMut(S::Item),
3291{
3292    type Output = ();
3293
3294    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3295        let mut this = self.project();
3296        loop {
3297            match ready!(this.stream.as_mut().poll_next(cx)) {
3298                Some(v) => (this.f)(v),
3299                None => return Poll::Ready(()),
3300            }
3301        }
3302    }
3303}
3304
3305/// Future for the [`StreamExt::try_for_each()`] method.
3306#[derive(Debug)]
3307#[must_use = "futures do nothing unless you `.await` or poll them"]
3308pub struct TryForEachFuture<'a, S: ?Sized, F> {
3309    stream: &'a mut S,
3310    f: F,
3311}
3312
3313impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
3314
3315impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
3316where
3317    S: Stream + Unpin + ?Sized,
3318    F: FnMut(S::Item) -> Result<(), E>,
3319{
3320    type Output = Result<(), E>;
3321
3322    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3323        loop {
3324            match ready!(self.stream.poll_next(cx)) {
3325                None => return Poll::Ready(Ok(())),
3326                Some(v) => (&mut self.f)(v)?,
3327            }
3328        }
3329    }
3330}
3331
3332pin_project! {
3333    /// Stream for the [`StreamExt::zip()`] method.
3334    #[derive(Clone, Debug)]
3335    #[must_use = "streams do nothing unless polled"]
3336    pub struct Zip<A: Stream, B> {
3337        item_slot: Option<A::Item>,
3338        #[pin]
3339        first: A,
3340        #[pin]
3341        second: B,
3342    }
3343}
3344
3345impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3346    type Item = (A::Item, B::Item);
3347
3348    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3349        let this = self.project();
3350
3351        if this.item_slot.is_none() {
3352            match this.first.poll_next(cx) {
3353                Poll::Pending => return Poll::Pending,
3354                Poll::Ready(None) => return Poll::Ready(None),
3355                Poll::Ready(Some(item)) => *this.item_slot = Some(item),
3356            }
3357        }
3358
3359        let second_item = ready!(this.second.poll_next(cx));
3360        let first_item = this.item_slot.take().unwrap();
3361        Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
3362    }
3363}
3364
3365pin_project! {
3366    /// Future for the [`StreamExt::unzip()`] method.
3367    #[derive(Debug)]
3368    #[must_use = "futures do nothing unless you `.await` or poll them"]
3369    pub struct UnzipFuture<S, FromA, FromB> {
3370        #[pin]
3371        stream: S,
3372        res: Option<(FromA, FromB)>,
3373    }
3374}
3375
3376impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3377where
3378    S: Stream<Item = (A, B)>,
3379    FromA: Default + Extend<A>,
3380    FromB: Default + Extend<B>,
3381{
3382    type Output = (FromA, FromB);
3383
3384    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3385        let mut this = self.project();
3386
3387        loop {
3388            match ready!(this.stream.as_mut().poll_next(cx)) {
3389                Some((a, b)) => {
3390                    let res = this.res.as_mut().unwrap();
3391                    res.0.extend(Some(a));
3392                    res.1.extend(Some(b));
3393                }
3394                None => return Poll::Ready(this.res.take().unwrap()),
3395            }
3396        }
3397    }
3398}
3399
3400/// Stream for the [`StreamExt::drain()`] method.
3401#[derive(Debug)]
3402#[must_use = "streams do nothing unless polled"]
3403pub struct Drain<'a, S: ?Sized> {
3404    stream: &'a mut S,
3405}
3406
3407impl<'a, S: Unpin + ?Sized> Unpin for Drain<'a, S> {}
3408
3409impl<'a, S: Unpin + ?Sized> Drain<'a, S> {
3410    /// Get a reference to the underlying stream.
3411    ///
3412    /// ## Examples
3413    ///
3414    /// ```
3415    /// use futures_lite::{prelude::*, stream};
3416    ///
3417    /// # spin_on::spin_on(async {
3418    /// let mut s = stream::iter(vec![1, 2, 3]);
3419    /// let s2 = s.drain();
3420    ///
3421    /// let inner = s2.get_ref();
3422    /// // s and inner are the same.
3423    /// # });
3424    /// ```
3425    pub fn get_ref(&self) -> &S {
3426        &self.stream
3427    }
3428
3429    /// Get a mutable reference to the underlying stream.
3430    ///
3431    /// ## Examples
3432    ///
3433    /// ```
3434    /// use futures_lite::{prelude::*, stream};
3435    ///
3436    /// # spin_on::spin_on(async {
3437    /// let mut s = stream::iter(vec![1, 2, 3]);
3438    /// let mut s2 = s.drain();
3439    ///
3440    /// let inner = s2.get_mut();
3441    /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3442    /// # });
3443    /// ```
3444    pub fn get_mut(&mut self) -> &mut S {
3445        &mut self.stream
3446    }
3447
3448    /// Consume this stream and get the underlying stream.
3449    ///
3450    /// ## Examples
3451    ///
3452    /// ```
3453    /// use futures_lite::{prelude::*, stream};
3454    ///
3455    /// # spin_on::spin_on(async {
3456    /// let mut s = stream::iter(vec![1, 2, 3]);
3457    /// let mut s2 = s.drain();
3458    ///
3459    /// let inner = s2.into_inner();
3460    /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3461    /// # });
3462    /// ```
3463    pub fn into_inner(self) -> &'a mut S {
3464        self.stream
3465    }
3466}
3467
3468impl<'a, S: Stream + Unpin + ?Sized> Stream for Drain<'a, S> {
3469    type Item = S::Item;
3470
3471    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3472        match self.stream.poll_next(cx) {
3473            Poll::Ready(x) => Poll::Ready(x),
3474            Poll::Pending => Poll::Ready(None),
3475        }
3476    }
3477
3478    fn size_hint(&self) -> (usize, Option<usize>) {
3479        let (_, hi) = self.stream.size_hint();
3480        (0, hi)
3481    }
3482}