futures_lite/stream.rs
1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[cfg(all(not(feature = "std"), feature = "alloc"))]
19extern crate alloc;
20
21#[doc(no_inline)]
22pub use futures_core::stream::Stream;
23
24#[cfg(all(not(feature = "std"), feature = "alloc"))]
25use alloc::boxed::Box;
26
27use core::fmt;
28use core::future::Future;
29use core::marker::PhantomData;
30use core::mem;
31use core::pin::Pin;
32use core::task::{Context, Poll};
33
34#[cfg(feature = "race")]
35use fastrand::Rng;
36
37use pin_project_lite::pin_project;
38
39use crate::ready;
40
41/// Converts a stream into a blocking iterator.
42///
43/// # Examples
44///
45/// ```
46/// use futures_lite::{pin, stream};
47///
48/// let stream = stream::once(7);
49/// pin!(stream);
50///
51/// let mut iter = stream::block_on(stream);
52/// assert_eq!(iter.next(), Some(7));
53/// assert_eq!(iter.next(), None);
54/// ```
55#[cfg(feature = "std")]
56pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
57 BlockOn(stream)
58}
59
60/// Iterator for the [`block_on()`] function.
61#[derive(Debug)]
62pub struct BlockOn<S>(S);
63
64#[cfg(feature = "std")]
65impl<S: Stream + Unpin> Iterator for BlockOn<S> {
66 type Item = S::Item;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 crate::future::block_on(self.0.next())
70 }
71
72 fn size_hint(&self) -> (usize, Option<usize>) {
73 self.0.size_hint()
74 }
75
76 fn count(self) -> usize {
77 crate::future::block_on(self.0.count())
78 }
79
80 fn last(self) -> Option<Self::Item> {
81 crate::future::block_on(self.0.last())
82 }
83
84 fn nth(&mut self, n: usize) -> Option<Self::Item> {
85 crate::future::block_on(self.0.nth(n))
86 }
87
88 fn fold<B, F>(self, init: B, f: F) -> B
89 where
90 F: FnMut(B, Self::Item) -> B,
91 {
92 crate::future::block_on(self.0.fold(init, f))
93 }
94
95 fn for_each<F>(self, f: F) -> F::Output
96 where
97 F: FnMut(Self::Item),
98 {
99 crate::future::block_on(self.0.for_each(f))
100 }
101
102 fn all<F>(&mut self, f: F) -> bool
103 where
104 F: FnMut(Self::Item) -> bool,
105 {
106 crate::future::block_on(self.0.all(f))
107 }
108
109 fn any<F>(&mut self, f: F) -> bool
110 where
111 F: FnMut(Self::Item) -> bool,
112 {
113 crate::future::block_on(self.0.any(f))
114 }
115
116 fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
117 where
118 P: FnMut(&Self::Item) -> bool,
119 {
120 crate::future::block_on(self.0.find(predicate))
121 }
122
123 fn find_map<B, F>(&mut self, f: F) -> Option<B>
124 where
125 F: FnMut(Self::Item) -> Option<B>,
126 {
127 crate::future::block_on(self.0.find_map(f))
128 }
129
130 fn position<P>(&mut self, predicate: P) -> Option<usize>
131 where
132 P: FnMut(Self::Item) -> bool,
133 {
134 crate::future::block_on(self.0.position(predicate))
135 }
136}
137
138/// Creates an empty stream.
139///
140/// # Examples
141///
142/// ```
143/// use futures_lite::stream::{self, StreamExt};
144///
145/// # spin_on::spin_on(async {
146/// let mut s = stream::empty::<i32>();
147/// assert_eq!(s.next().await, None);
148/// # })
149/// ```
150pub fn empty<T>() -> Empty<T> {
151 Empty {
152 _marker: PhantomData,
153 }
154}
155
156/// Stream for the [`empty()`] function.
157#[derive(Clone, Debug)]
158#[must_use = "streams do nothing unless polled"]
159pub struct Empty<T> {
160 _marker: PhantomData<T>,
161}
162
163impl<T> Unpin for Empty<T> {}
164
165impl<T> Stream for Empty<T> {
166 type Item = T;
167
168 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
169 Poll::Ready(None)
170 }
171
172 fn size_hint(&self) -> (usize, Option<usize>) {
173 (0, Some(0))
174 }
175}
176
177/// Creates a stream from an iterator.
178///
179/// # Examples
180///
181/// ```
182/// use futures_lite::stream::{self, StreamExt};
183///
184/// # spin_on::spin_on(async {
185/// let mut s = stream::iter(vec![1, 2]);
186///
187/// assert_eq!(s.next().await, Some(1));
188/// assert_eq!(s.next().await, Some(2));
189/// assert_eq!(s.next().await, None);
190/// # })
191/// ```
192pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
193 Iter {
194 iter: iter.into_iter(),
195 }
196}
197
198/// Stream for the [`iter()`] function.
199#[derive(Clone, Debug)]
200#[must_use = "streams do nothing unless polled"]
201pub struct Iter<I> {
202 iter: I,
203}
204
205impl<I> Unpin for Iter<I> {}
206
207impl<I: Iterator> Stream for Iter<I> {
208 type Item = I::Item;
209
210 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211 Poll::Ready(self.iter.next())
212 }
213
214 fn size_hint(&self) -> (usize, Option<usize>) {
215 self.iter.size_hint()
216 }
217}
218
219/// Creates a stream that yields a single item.
220///
221/// # Examples
222///
223/// ```
224/// use futures_lite::stream::{self, StreamExt};
225///
226/// # spin_on::spin_on(async {
227/// let mut s = stream::once(7);
228///
229/// assert_eq!(s.next().await, Some(7));
230/// assert_eq!(s.next().await, None);
231/// # })
232/// ```
233pub fn once<T>(t: T) -> Once<T> {
234 Once { value: Some(t) }
235}
236
237pin_project! {
238 /// Stream for the [`once()`] function.
239 #[derive(Clone, Debug)]
240 #[must_use = "streams do nothing unless polled"]
241 pub struct Once<T> {
242 value: Option<T>,
243 }
244}
245
246impl<T> Stream for Once<T> {
247 type Item = T;
248
249 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
250 Poll::Ready(self.project().value.take())
251 }
252
253 fn size_hint(&self) -> (usize, Option<usize>) {
254 if self.value.is_some() {
255 (1, Some(1))
256 } else {
257 (0, Some(0))
258 }
259 }
260}
261
262/// Creates a stream that is always pending.
263///
264/// # Examples
265///
266/// ```no_run
267/// use futures_lite::stream::{self, StreamExt};
268///
269/// # spin_on::spin_on(async {
270/// let mut s = stream::pending::<i32>();
271/// s.next().await;
272/// unreachable!();
273/// # })
274/// ```
275pub fn pending<T>() -> Pending<T> {
276 Pending {
277 _marker: PhantomData,
278 }
279}
280
281/// Stream for the [`pending()`] function.
282#[derive(Clone, Debug)]
283#[must_use = "streams do nothing unless polled"]
284pub struct Pending<T> {
285 _marker: PhantomData<T>,
286}
287
288impl<T> Unpin for Pending<T> {}
289
290impl<T> Stream for Pending<T> {
291 type Item = T;
292
293 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
294 Poll::Pending
295 }
296
297 fn size_hint(&self) -> (usize, Option<usize>) {
298 (0, Some(0))
299 }
300}
301
302/// Creates a stream from a function returning [`Poll`].
303///
304/// # Examples
305///
306/// ```
307/// use futures_lite::stream::{self, StreamExt};
308/// use std::task::{Context, Poll};
309///
310/// # spin_on::spin_on(async {
311/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
312/// Poll::Ready(Some(7))
313/// }
314///
315/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
316/// # })
317/// ```
318pub fn poll_fn<T, F>(f: F) -> PollFn<F>
319where
320 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
321{
322 PollFn { f }
323}
324
325/// Stream for the [`poll_fn()`] function.
326#[derive(Clone)]
327#[must_use = "streams do nothing unless polled"]
328pub struct PollFn<F> {
329 f: F,
330}
331
332impl<F> Unpin for PollFn<F> {}
333
334impl<F> fmt::Debug for PollFn<F> {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 f.debug_struct("PollFn").finish()
337 }
338}
339
340impl<T, F> Stream for PollFn<F>
341where
342 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
343{
344 type Item = T;
345
346 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
347 (&mut self.f)(cx)
348 }
349}
350
351/// Creates an infinite stream that yields the same item repeatedly.
352///
353/// # Examples
354///
355/// ```
356/// use futures_lite::stream::{self, StreamExt};
357///
358/// # spin_on::spin_on(async {
359/// let mut s = stream::repeat(7);
360///
361/// assert_eq!(s.next().await, Some(7));
362/// assert_eq!(s.next().await, Some(7));
363/// # })
364/// ```
365pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
366 Repeat { item }
367}
368
369/// Stream for the [`repeat()`] function.
370#[derive(Clone, Debug)]
371#[must_use = "streams do nothing unless polled"]
372pub struct Repeat<T> {
373 item: T,
374}
375
376impl<T> Unpin for Repeat<T> {}
377
378impl<T: Clone> Stream for Repeat<T> {
379 type Item = T;
380
381 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
382 Poll::Ready(Some(self.item.clone()))
383 }
384
385 fn size_hint(&self) -> (usize, Option<usize>) {
386 (usize::MAX, None)
387 }
388}
389
390/// Creates an infinite stream from a closure that generates items.
391///
392/// # Examples
393///
394/// ```
395/// use futures_lite::stream::{self, StreamExt};
396///
397/// # spin_on::spin_on(async {
398/// let mut s = stream::repeat_with(|| 7);
399///
400/// assert_eq!(s.next().await, Some(7));
401/// assert_eq!(s.next().await, Some(7));
402/// # })
403/// ```
404pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
405where
406 F: FnMut() -> T,
407{
408 RepeatWith { f: repeater }
409}
410
411/// Stream for the [`repeat_with()`] function.
412#[derive(Clone, Debug)]
413#[must_use = "streams do nothing unless polled"]
414pub struct RepeatWith<F> {
415 f: F,
416}
417
418impl<F> Unpin for RepeatWith<F> {}
419
420impl<T, F> Stream for RepeatWith<F>
421where
422 F: FnMut() -> T,
423{
424 type Item = T;
425
426 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
427 let item = (&mut self.f)();
428 Poll::Ready(Some(item))
429 }
430
431 fn size_hint(&self) -> (usize, Option<usize>) {
432 (usize::MAX, None)
433 }
434}
435
436/// Creates a stream from a seed value and an async closure operating on it.
437///
438/// # Examples
439///
440/// ```
441/// use futures_lite::stream::{self, StreamExt};
442///
443/// # spin_on::spin_on(async {
444/// let s = stream::unfold(0, |mut n| async move {
445/// if n < 2 {
446/// let m = n + 1;
447/// Some((n, m))
448/// } else {
449/// None
450/// }
451/// });
452///
453/// let v: Vec<i32> = s.collect().await;
454/// assert_eq!(v, [0, 1]);
455/// # })
456/// ```
457pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
458where
459 F: FnMut(T) -> Fut,
460 Fut: Future<Output = Option<(Item, T)>>,
461{
462 Unfold {
463 f,
464 state: Some(seed),
465 fut: None,
466 }
467}
468
469pin_project! {
470 /// Stream for the [`unfold()`] function.
471 #[derive(Clone)]
472 #[must_use = "streams do nothing unless polled"]
473 pub struct Unfold<T, F, Fut> {
474 f: F,
475 state: Option<T>,
476 #[pin]
477 fut: Option<Fut>,
478 }
479}
480
481impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
482where
483 T: fmt::Debug,
484 Fut: fmt::Debug,
485{
486 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487 f.debug_struct("Unfold")
488 .field("state", &self.state)
489 .field("fut", &self.fut)
490 .finish()
491 }
492}
493
494impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
495where
496 F: FnMut(T) -> Fut,
497 Fut: Future<Output = Option<(Item, T)>>,
498{
499 type Item = Item;
500
501 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
502 let mut this = self.project();
503
504 if let Some(state) = this.state.take() {
505 this.fut.set(Some((this.f)(state)));
506 }
507
508 let step = ready!(this
509 .fut
510 .as_mut()
511 .as_pin_mut()
512 .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
513 .poll(cx));
514 this.fut.set(None);
515
516 if let Some((item, next_state)) = step {
517 *this.state = Some(next_state);
518 Poll::Ready(Some(item))
519 } else {
520 Poll::Ready(None)
521 }
522 }
523}
524
525/// Creates a stream from a seed value and a fallible async closure operating on it.
526///
527/// # Examples
528///
529/// ```
530/// use futures_lite::stream::{self, StreamExt};
531///
532/// # spin_on::spin_on(async {
533/// let s = stream::try_unfold(0, |mut n| async move {
534/// if n < 2 {
535/// let m = n + 1;
536/// Ok(Some((n, m)))
537/// } else {
538/// std::io::Result::Ok(None)
539/// }
540/// });
541///
542/// let v: Vec<i32> = s.try_collect().await?;
543/// assert_eq!(v, [0, 1]);
544/// # std::io::Result::Ok(()) });
545/// ```
546pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
547where
548 F: FnMut(T) -> Fut,
549 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
550{
551 TryUnfold {
552 f,
553 state: Some(init),
554 fut: None,
555 }
556}
557
558pin_project! {
559 /// Stream for the [`try_unfold()`] function.
560 #[derive(Clone)]
561 #[must_use = "streams do nothing unless polled"]
562 pub struct TryUnfold<T, F, Fut> {
563 f: F,
564 state: Option<T>,
565 #[pin]
566 fut: Option<Fut>,
567 }
568}
569
570impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
571where
572 T: fmt::Debug,
573 Fut: fmt::Debug,
574{
575 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576 f.debug_struct("TryUnfold")
577 .field("state", &self.state)
578 .field("fut", &self.fut)
579 .finish()
580 }
581}
582
583impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
584where
585 F: FnMut(T) -> Fut,
586 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
587{
588 type Item = Result<Item, E>;
589
590 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
591 let mut this = self.project();
592
593 if let Some(state) = this.state.take() {
594 this.fut.set(Some((this.f)(state)));
595 }
596
597 match this.fut.as_mut().as_pin_mut() {
598 None => {
599 // The future previously errored
600 Poll::Ready(None)
601 }
602 Some(future) => {
603 let step = ready!(future.poll(cx));
604 this.fut.set(None);
605
606 match step {
607 Ok(Some((item, next_state))) => {
608 *this.state = Some(next_state);
609 Poll::Ready(Some(Ok(item)))
610 }
611 Ok(None) => Poll::Ready(None),
612 Err(e) => Poll::Ready(Some(Err(e))),
613 }
614 }
615 }
616 }
617}
618
619/// Creates a stream that invokes the given future as its first item, and then
620/// produces no more items.
621///
622/// # Example
623///
624/// ```
625/// use futures_lite::{stream, prelude::*};
626///
627/// # spin_on::spin_on(async {
628/// let mut stream = Box::pin(stream::once_future(async { 1 }));
629/// assert_eq!(stream.next().await, Some(1));
630/// assert_eq!(stream.next().await, None);
631/// # });
632/// ```
633pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
634 OnceFuture {
635 future: Some(future),
636 }
637}
638
639pin_project! {
640 /// Stream for the [`once_future()`] method.
641 #[derive(Debug)]
642 #[must_use = "futures do nothing unless you `.await` or poll them"]
643 pub struct OnceFuture<F> {
644 #[pin]
645 future: Option<F>,
646 }
647}
648
649impl<F: Future> Stream for OnceFuture<F> {
650 type Item = F::Output;
651
652 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
653 let mut this = self.project();
654
655 match this.future.as_mut().as_pin_mut().map(|f| f.poll(cx)) {
656 Some(Poll::Ready(t)) => {
657 this.future.set(None);
658 Poll::Ready(Some(t))
659 }
660 Some(Poll::Pending) => Poll::Pending,
661 None => Poll::Ready(None),
662 }
663 }
664}
665
666/// Take elements from this stream until the provided future resolves.
667///
668/// This function will take elements from the stream until the provided
669/// stopping future `fut` resolves. Once the `fut` future becomes ready,
670/// this stream combinator will always return that the stream is done.
671///
672/// The stopping future may return any type. Once the stream is stopped
673/// the result of the stopping future may be accessed with `StopAfterFuture::take_result()`.
674/// The stream may also be resumed with `StopAfterFuture::take_future()`.
675/// See the documentation of [`StopAfterFuture`] for more information.
676///
677/// ```
678/// use futures_lite::stream::{self, StreamExt, stop_after_future};
679/// use futures_lite::future;
680/// use std::task::Poll;
681///
682/// let stream = stream::iter(1..=10);
683///
684/// # spin_on::spin_on(async {
685/// let mut i = 0;
686/// let stop_fut = future::poll_fn(|_cx| {
687/// i += 1;
688/// if i <= 5 {
689/// Poll::Pending
690/// } else {
691/// Poll::Ready(())
692/// }
693/// });
694///
695/// let stream = stop_after_future(stream, stop_fut);
696///
697/// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
698/// # });
699pub fn stop_after_future<S, F>(stream: S, future: F) -> StopAfterFuture<S, F>
700where
701 S: Sized + Stream,
702 F: Future,
703{
704 StopAfterFuture {
705 stream,
706 fut: Some(future),
707 fut_result: None,
708 free: false,
709 }
710}
711
712pin_project! {
713 /// Stream for the [`StreamExt::stop_after_future()`] method.
714 #[derive(Clone, Debug)]
715 #[must_use = "streams do nothing unless polled"]
716 pub struct StopAfterFuture<S: Stream, Fut: Future> {
717 #[pin]
718 stream: S,
719 // Contains the inner Future on start and None once the inner Future is resolved
720 // or taken out by the user.
721 #[pin]
722 fut: Option<Fut>,
723 // Contains fut's return value once fut is resolved
724 fut_result: Option<Fut::Output>,
725 // Whether the future was taken out by the user.
726 free: bool,
727 }
728}
729
730impl<St, Fut> StopAfterFuture<St, Fut>
731where
732 St: Stream,
733 Fut: Future,
734{
735 /// Extract the stopping future out of the combinator.
736 ///
737 /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
738 /// Taking out the future means the combinator will be yielding
739 /// elements from the wrapped stream without ever stopping it.
740 pub fn take_future(&mut self) -> Option<Fut> {
741 if self.fut.is_some() {
742 self.free = true;
743 }
744
745 self.fut.take()
746 }
747
748 /// Once the stopping future is resolved, this method can be used
749 /// to extract the value returned by the stopping future.
750 ///
751 /// This may be used to retrieve arbitrary data from the stopping
752 /// future, for example a reason why the stream was stopped.
753 ///
754 /// This method will return `None` if the future isn't resolved yet,
755 /// or if the result was already taken out.
756 ///
757 /// # Examples
758 ///
759 /// ```
760 /// # spin_on::spin_on(async {
761 /// use futures_lite::stream::{self, StreamExt, stop_after_future};
762 /// use futures_lite::future;
763 /// use std::task::Poll;
764 ///
765 /// let stream = stream::iter(1..=10);
766 ///
767 /// let mut i = 0;
768 /// let stop_fut = future::poll_fn(|_cx| {
769 /// i += 1;
770 /// if i <= 5 {
771 /// Poll::Pending
772 /// } else {
773 /// Poll::Ready("reason")
774 /// }
775 /// });
776 ///
777 /// let mut stream = stop_after_future(stream, stop_fut);
778 /// let _ = (&mut stream).collect::<Vec<_>>().await;
779 ///
780 /// let result = stream.take_result().unwrap();
781 /// assert_eq!(result, "reason");
782 /// # });
783 /// ```
784 pub fn take_result(&mut self) -> Option<Fut::Output> {
785 self.fut_result.take()
786 }
787
788 /// Whether the stream was stopped yet by the stopping future
789 /// being resolved.
790 pub fn is_stopped(&self) -> bool {
791 !self.free && self.fut.is_none()
792 }
793}
794
795impl<St, Fut> Stream for StopAfterFuture<St, Fut>
796where
797 St: Stream,
798 Fut: Future,
799{
800 type Item = St::Item;
801
802 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
803 let mut this = self.project();
804
805 if let Some(f) = this.fut.as_mut().as_pin_mut() {
806 if let Poll::Ready(result) = f.poll(cx) {
807 this.fut.set(None);
808 *this.fut_result = Some(result);
809 }
810 }
811
812 if !*this.free && this.fut.is_none() {
813 // Future resolved, inner stream stopped
814 Poll::Ready(None)
815 } else {
816 // Future either not resolved yet or taken out by the user
817 let item = ready!(this.stream.poll_next(cx));
818 if item.is_none() {
819 this.fut.set(None);
820 }
821 Poll::Ready(item)
822 }
823 }
824
825 fn size_hint(&self) -> (usize, Option<usize>) {
826 if self.is_stopped() {
827 return (0, Some(0));
828 }
829
830 // Original stream can be truncated at any moment, so the lower bound isn't reliable.
831 let (_, upper_bound) = self.stream.size_hint();
832 (0, upper_bound)
833 }
834}
835
836/// Extension trait for [`Stream`].
837pub trait StreamExt: Stream {
838 /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
839 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
840 where
841 Self: Unpin,
842 {
843 Stream::poll_next(Pin::new(self), cx)
844 }
845
846 /// Retrieves the next item in the stream.
847 ///
848 /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
849 /// resume iteration after that.
850 ///
851 /// # Examples
852 ///
853 /// ```
854 /// use futures_lite::stream::{self, StreamExt};
855 ///
856 /// # spin_on::spin_on(async {
857 /// let mut s = stream::iter(1..=3);
858 ///
859 /// assert_eq!(s.next().await, Some(1));
860 /// assert_eq!(s.next().await, Some(2));
861 /// assert_eq!(s.next().await, Some(3));
862 /// assert_eq!(s.next().await, None);
863 /// # });
864 /// ```
865 fn next(&mut self) -> NextFuture<'_, Self>
866 where
867 Self: Unpin,
868 {
869 NextFuture { stream: self }
870 }
871
872 /// Retrieves the next item in the stream.
873 ///
874 /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
875 /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
876 ///
877 /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
878 ///
879 /// # Examples
880 ///
881 /// ```
882 /// use futures_lite::stream::{self, StreamExt};
883 ///
884 /// # spin_on::spin_on(async {
885 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
886 ///
887 /// assert_eq!(s.try_next().await, Ok(Some(1)));
888 /// assert_eq!(s.try_next().await, Ok(Some(2)));
889 /// assert_eq!(s.try_next().await, Err("error"));
890 /// assert_eq!(s.try_next().await, Ok(None));
891 /// # });
892 /// ```
893 fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
894 where
895 Self: Stream<Item = Result<T, E>> + Unpin,
896 {
897 TryNextFuture { stream: self }
898 }
899
900 /// Counts the number of items in the stream.
901 ///
902 /// # Examples
903 ///
904 /// ```
905 /// use futures_lite::stream::{self, StreamExt};
906 ///
907 /// # spin_on::spin_on(async {
908 /// let s1 = stream::iter(vec![0]);
909 /// let s2 = stream::iter(vec![1, 2, 3]);
910 ///
911 /// assert_eq!(s1.count().await, 1);
912 /// assert_eq!(s2.count().await, 3);
913 /// # });
914 /// ```
915 fn count(self) -> CountFuture<Self>
916 where
917 Self: Sized,
918 {
919 CountFuture {
920 stream: self,
921 count: 0,
922 }
923 }
924
925 /// Maps items of the stream to new values using a closure.
926 ///
927 /// # Examples
928 ///
929 /// ```
930 /// use futures_lite::stream::{self, StreamExt};
931 ///
932 /// # spin_on::spin_on(async {
933 /// let s = stream::iter(vec![1, 2, 3]);
934 /// let mut s = s.map(|x| 2 * x);
935 ///
936 /// assert_eq!(s.next().await, Some(2));
937 /// assert_eq!(s.next().await, Some(4));
938 /// assert_eq!(s.next().await, Some(6));
939 /// assert_eq!(s.next().await, None);
940 /// # });
941 /// ```
942 fn map<T, F>(self, f: F) -> Map<Self, F>
943 where
944 Self: Sized,
945 F: FnMut(Self::Item) -> T,
946 {
947 Map { stream: self, f }
948 }
949
950 /// Maps items to streams and then concatenates them.
951 ///
952 /// # Examples
953 ///
954 /// ```
955 /// use futures_lite::stream::{self, StreamExt};
956 ///
957 /// # spin_on::spin_on(async {
958 /// let words = stream::iter(vec!["one", "two"]);
959 ///
960 /// let s: String = words
961 /// .flat_map(|s| stream::iter(s.chars()))
962 /// .collect()
963 /// .await;
964 ///
965 /// assert_eq!(s, "onetwo");
966 /// # });
967 /// ```
968 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
969 where
970 Self: Sized,
971 U: Stream,
972 F: FnMut(Self::Item) -> U,
973 {
974 FlatMap {
975 stream: self.map(f),
976 inner_stream: None,
977 }
978 }
979
980 /// Concatenates inner streams.
981 ///
982 /// # Examples
983 ///
984 /// ```
985 /// use futures_lite::stream::{self, StreamExt};
986 ///
987 /// # spin_on::spin_on(async {
988 /// let s1 = stream::iter(vec![1, 2, 3]);
989 /// let s2 = stream::iter(vec![4, 5]);
990 ///
991 /// let s = stream::iter(vec![s1, s2]);
992 /// let v: Vec<_> = s.flatten().collect().await;
993 /// assert_eq!(v, [1, 2, 3, 4, 5]);
994 /// # });
995 /// ```
996 fn flatten(self) -> Flatten<Self>
997 where
998 Self: Sized,
999 Self::Item: Stream,
1000 {
1001 Flatten {
1002 stream: self,
1003 inner_stream: None,
1004 }
1005 }
1006
1007 /// Maps items of the stream to new values using an async closure.
1008 ///
1009 /// # Examples
1010 ///
1011 /// ```
1012 /// use futures_lite::pin;
1013 /// use futures_lite::stream::{self, StreamExt};
1014 ///
1015 /// # spin_on::spin_on(async {
1016 /// let s = stream::iter(vec![1, 2, 3]);
1017 /// let mut s = s.then(|x| async move { 2 * x });
1018 ///
1019 /// pin!(s);
1020 /// assert_eq!(s.next().await, Some(2));
1021 /// assert_eq!(s.next().await, Some(4));
1022 /// assert_eq!(s.next().await, Some(6));
1023 /// assert_eq!(s.next().await, None);
1024 /// # });
1025 /// ```
1026 fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
1027 where
1028 Self: Sized,
1029 F: FnMut(Self::Item) -> Fut,
1030 Fut: Future,
1031 {
1032 Then {
1033 stream: self,
1034 future: None,
1035 f,
1036 }
1037 }
1038
1039 /// Keeps items of the stream for which `predicate` returns `true`.
1040 ///
1041 /// # Examples
1042 ///
1043 /// ```
1044 /// use futures_lite::stream::{self, StreamExt};
1045 ///
1046 /// # spin_on::spin_on(async {
1047 /// let s = stream::iter(vec![1, 2, 3, 4]);
1048 /// let mut s = s.filter(|i| i % 2 == 0);
1049 ///
1050 /// assert_eq!(s.next().await, Some(2));
1051 /// assert_eq!(s.next().await, Some(4));
1052 /// assert_eq!(s.next().await, None);
1053 /// # });
1054 /// ```
1055 fn filter<P>(self, predicate: P) -> Filter<Self, P>
1056 where
1057 Self: Sized,
1058 P: FnMut(&Self::Item) -> bool,
1059 {
1060 Filter {
1061 stream: self,
1062 predicate,
1063 }
1064 }
1065
1066 /// Filters and maps items of the stream using a closure.
1067 ///
1068 /// # Examples
1069 ///
1070 /// ```
1071 /// use futures_lite::stream::{self, StreamExt};
1072 ///
1073 /// # spin_on::spin_on(async {
1074 /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
1075 /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
1076 ///
1077 /// assert_eq!(s.next().await, Some(1));
1078 /// assert_eq!(s.next().await, Some(3));
1079 /// assert_eq!(s.next().await, Some(5));
1080 /// assert_eq!(s.next().await, None);
1081 /// # });
1082 /// ```
1083 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
1084 where
1085 Self: Sized,
1086 F: FnMut(Self::Item) -> Option<T>,
1087 {
1088 FilterMap { stream: self, f }
1089 }
1090
1091 /// Takes only the first `n` items of the stream.
1092 ///
1093 /// # Examples
1094 ///
1095 /// ```
1096 /// use futures_lite::stream::{self, StreamExt};
1097 ///
1098 /// # spin_on::spin_on(async {
1099 /// let mut s = stream::repeat(7).take(2);
1100 ///
1101 /// assert_eq!(s.next().await, Some(7));
1102 /// assert_eq!(s.next().await, Some(7));
1103 /// assert_eq!(s.next().await, None);
1104 /// # });
1105 /// ```
1106 fn take(self, n: usize) -> Take<Self>
1107 where
1108 Self: Sized,
1109 {
1110 Take { stream: self, n }
1111 }
1112
1113 /// Takes items while `predicate` returns `true`.
1114 ///
1115 /// # Examples
1116 ///
1117 /// ```
1118 /// use futures_lite::stream::{self, StreamExt};
1119 ///
1120 /// # spin_on::spin_on(async {
1121 /// let s = stream::iter(vec![1, 2, 3, 4]);
1122 /// let mut s = s.take_while(|x| *x < 3);
1123 ///
1124 /// assert_eq!(s.next().await, Some(1));
1125 /// assert_eq!(s.next().await, Some(2));
1126 /// assert_eq!(s.next().await, None);
1127 /// # });
1128 /// ```
1129 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
1130 where
1131 Self: Sized,
1132 P: FnMut(&Self::Item) -> bool,
1133 {
1134 TakeWhile {
1135 stream: self,
1136 predicate,
1137 }
1138 }
1139
1140 /// Skips the first `n` items of the stream.
1141 ///
1142 /// # Examples
1143 ///
1144 /// ```
1145 /// use futures_lite::stream::{self, StreamExt};
1146 ///
1147 /// # spin_on::spin_on(async {
1148 /// let s = stream::iter(vec![1, 2, 3]);
1149 /// let mut s = s.skip(2);
1150 ///
1151 /// assert_eq!(s.next().await, Some(3));
1152 /// assert_eq!(s.next().await, None);
1153 /// # });
1154 /// ```
1155 fn skip(self, n: usize) -> Skip<Self>
1156 where
1157 Self: Sized,
1158 {
1159 Skip { stream: self, n }
1160 }
1161
1162 /// Skips items while `predicate` returns `true`.
1163 ///
1164 /// # Examples
1165 ///
1166 /// ```
1167 /// use futures_lite::stream::{self, StreamExt};
1168 ///
1169 /// # spin_on::spin_on(async {
1170 /// let s = stream::iter(vec![-1i32, 0, 1]);
1171 /// let mut s = s.skip_while(|x| x.is_negative());
1172 ///
1173 /// assert_eq!(s.next().await, Some(0));
1174 /// assert_eq!(s.next().await, Some(1));
1175 /// assert_eq!(s.next().await, None);
1176 /// # });
1177 /// ```
1178 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1179 where
1180 Self: Sized,
1181 P: FnMut(&Self::Item) -> bool,
1182 {
1183 SkipWhile {
1184 stream: self,
1185 predicate: Some(predicate),
1186 }
1187 }
1188
1189 /// Yields every `step`th item.
1190 ///
1191 /// # Panics
1192 ///
1193 /// This method will panic if the `step` is 0.
1194 ///
1195 /// # Examples
1196 ///
1197 /// ```
1198 /// use futures_lite::stream::{self, StreamExt};
1199 ///
1200 /// # spin_on::spin_on(async {
1201 /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
1202 /// let mut s = s.step_by(2);
1203 ///
1204 /// assert_eq!(s.next().await, Some(0));
1205 /// assert_eq!(s.next().await, Some(2));
1206 /// assert_eq!(s.next().await, Some(4));
1207 /// assert_eq!(s.next().await, None);
1208 /// # });
1209 /// ```
1210 fn step_by(self, step: usize) -> StepBy<Self>
1211 where
1212 Self: Sized,
1213 {
1214 assert!(step > 0, "`step` must be greater than zero");
1215 StepBy {
1216 stream: self,
1217 step,
1218 i: 0,
1219 }
1220 }
1221
1222 /// Appends another stream to the end of this one.
1223 ///
1224 /// # Examples
1225 ///
1226 /// ```
1227 /// use futures_lite::stream::{self, StreamExt};
1228 ///
1229 /// # spin_on::spin_on(async {
1230 /// let s1 = stream::iter(vec![1, 2]);
1231 /// let s2 = stream::iter(vec![7, 8]);
1232 /// let mut s = s1.chain(s2);
1233 ///
1234 /// assert_eq!(s.next().await, Some(1));
1235 /// assert_eq!(s.next().await, Some(2));
1236 /// assert_eq!(s.next().await, Some(7));
1237 /// assert_eq!(s.next().await, Some(8));
1238 /// assert_eq!(s.next().await, None);
1239 /// # });
1240 /// ```
1241 fn chain<U>(self, other: U) -> Chain<Self, U>
1242 where
1243 Self: Sized,
1244 U: Stream<Item = Self::Item> + Sized,
1245 {
1246 Chain {
1247 first: self.fuse(),
1248 second: other.fuse(),
1249 }
1250 }
1251
1252 /// Clones all items.
1253 ///
1254 /// # Examples
1255 ///
1256 /// ```
1257 /// use futures_lite::stream::{self, StreamExt};
1258 ///
1259 /// # spin_on::spin_on(async {
1260 /// let s = stream::iter(vec![&1, &2]);
1261 /// let mut s = s.cloned();
1262 ///
1263 /// assert_eq!(s.next().await, Some(1));
1264 /// assert_eq!(s.next().await, Some(2));
1265 /// assert_eq!(s.next().await, None);
1266 /// # });
1267 /// ```
1268 fn cloned<'a, T>(self) -> Cloned<Self>
1269 where
1270 Self: Stream<Item = &'a T> + Sized,
1271 T: Clone + 'a,
1272 {
1273 Cloned { stream: self }
1274 }
1275
1276 /// Copies all items.
1277 ///
1278 /// # Examples
1279 ///
1280 /// ```
1281 /// use futures_lite::stream::{self, StreamExt};
1282 ///
1283 /// # spin_on::spin_on(async {
1284 /// let s = stream::iter(vec![&1, &2]);
1285 /// let mut s = s.copied();
1286 ///
1287 /// assert_eq!(s.next().await, Some(1));
1288 /// assert_eq!(s.next().await, Some(2));
1289 /// assert_eq!(s.next().await, None);
1290 /// # });
1291 /// ```
1292 fn copied<'a, T>(self) -> Copied<Self>
1293 where
1294 Self: Stream<Item = &'a T> + Sized,
1295 T: Copy + 'a,
1296 {
1297 Copied { stream: self }
1298 }
1299
1300 /// Collects all items in the stream into a collection.
1301 ///
1302 /// # Examples
1303 ///
1304 /// ```
1305 /// use futures_lite::stream::{self, StreamExt};
1306 ///
1307 /// # spin_on::spin_on(async {
1308 /// let mut s = stream::iter(1..=3);
1309 ///
1310 /// let items: Vec<_> = s.collect().await;
1311 /// assert_eq!(items, [1, 2, 3]);
1312 /// # });
1313 /// ```
1314 fn collect<C>(self) -> CollectFuture<Self, C>
1315 where
1316 Self: Sized,
1317 C: Default + Extend<Self::Item>,
1318 {
1319 CollectFuture {
1320 stream: self,
1321 collection: Default::default(),
1322 }
1323 }
1324
1325 /// Collects all items in the fallible stream into a collection.
1326 ///
1327 /// ```
1328 /// use futures_lite::stream::{self, StreamExt};
1329 ///
1330 /// # spin_on::spin_on(async {
1331 /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1332 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1333 /// assert_eq!(res, Err(2));
1334 ///
1335 /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1336 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1337 /// assert_eq!(res, Ok(vec![1, 2, 3]));
1338 /// # })
1339 /// ```
1340 fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1341 where
1342 Self: Stream<Item = Result<T, E>> + Sized,
1343 C: Default + Extend<T>,
1344 {
1345 TryCollectFuture {
1346 stream: self,
1347 items: Default::default(),
1348 }
1349 }
1350
1351 /// Partitions items into those for which `predicate` is `true` and those for which it is
1352 /// `false`, and then collects them into two collections.
1353 ///
1354 /// # Examples
1355 ///
1356 /// ```
1357 /// use futures_lite::stream::{self, StreamExt};
1358 ///
1359 /// # spin_on::spin_on(async {
1360 /// let s = stream::iter(vec![1, 2, 3]);
1361 /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1362 ///
1363 /// assert_eq!(even, &[2]);
1364 /// assert_eq!(odd, &[1, 3]);
1365 /// # })
1366 /// ```
1367 fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1368 where
1369 Self: Sized,
1370 B: Default + Extend<Self::Item>,
1371 P: FnMut(&Self::Item) -> bool,
1372 {
1373 PartitionFuture {
1374 stream: self,
1375 predicate,
1376 res: Some(Default::default()),
1377 }
1378 }
1379
1380 /// Accumulates a computation over the stream.
1381 ///
1382 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1383 /// the accumulator and each item in the stream. The final accumulator value is returned.
1384 ///
1385 /// # Examples
1386 ///
1387 /// ```
1388 /// use futures_lite::stream::{self, StreamExt};
1389 ///
1390 /// # spin_on::spin_on(async {
1391 /// let s = stream::iter(vec![1, 2, 3]);
1392 /// let sum = s.fold(0, |acc, x| acc + x).await;
1393 ///
1394 /// assert_eq!(sum, 6);
1395 /// # })
1396 /// ```
1397 fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1398 where
1399 Self: Sized,
1400 F: FnMut(T, Self::Item) -> T,
1401 {
1402 FoldFuture {
1403 stream: self,
1404 f,
1405 acc: Some(init),
1406 }
1407 }
1408
1409 /// Accumulates a fallible computation over the stream.
1410 ///
1411 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1412 /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1413 /// error if `f` failed the computation.
1414 ///
1415 /// # Examples
1416 ///
1417 /// ```
1418 /// use futures_lite::stream::{self, StreamExt};
1419 ///
1420 /// # spin_on::spin_on(async {
1421 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1422 ///
1423 /// let sum = s.try_fold(0, |acc, v| {
1424 /// if (acc + v) % 2 == 1 {
1425 /// Ok(acc + v)
1426 /// } else {
1427 /// Err("fail")
1428 /// }
1429 /// })
1430 /// .await;
1431 ///
1432 /// assert_eq!(sum, Err("fail"));
1433 /// # })
1434 /// ```
1435 fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1436 where
1437 Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1438 F: FnMut(B, T) -> Result<B, E>,
1439 {
1440 TryFoldFuture {
1441 stream: self,
1442 f,
1443 acc: Some(init),
1444 }
1445 }
1446
1447 /// Maps items of the stream to new values using a state value and a closure.
1448 ///
1449 /// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the
1450 /// state and each item in the stream. The stream stops when `f` returns `None`.
1451 ///
1452 /// # Examples
1453 ///
1454 /// ```
1455 /// use futures_lite::stream::{self, StreamExt};
1456 ///
1457 /// # spin_on::spin_on(async {
1458 /// let s = stream::iter(vec![1, 2, 3]);
1459 /// let mut s = s.scan(1, |state, x| {
1460 /// *state = *state * x;
1461 /// Some(-*state)
1462 /// });
1463 ///
1464 /// assert_eq!(s.next().await, Some(-1));
1465 /// assert_eq!(s.next().await, Some(-2));
1466 /// assert_eq!(s.next().await, Some(-6));
1467 /// assert_eq!(s.next().await, None);
1468 /// # })
1469 /// ```
1470 fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1471 where
1472 Self: Sized,
1473 F: FnMut(&mut St, Self::Item) -> Option<B>,
1474 {
1475 Scan {
1476 stream: self,
1477 state_f: (initial_state, f),
1478 }
1479 }
1480
1481 /// Fuses the stream so that it stops yielding items after the first [`None`].
1482 ///
1483 /// # Examples
1484 ///
1485 /// ```
1486 /// use futures_lite::stream::{self, StreamExt};
1487 ///
1488 /// # spin_on::spin_on(async {
1489 /// let mut s = stream::once(1).fuse();
1490 ///
1491 /// assert_eq!(s.next().await, Some(1));
1492 /// assert_eq!(s.next().await, None);
1493 /// assert_eq!(s.next().await, None);
1494 /// # })
1495 /// ```
1496 fn fuse(self) -> Fuse<Self>
1497 where
1498 Self: Sized,
1499 {
1500 Fuse {
1501 stream: self,
1502 done: false,
1503 }
1504 }
1505
1506 /// Repeats the stream from beginning to end, forever.
1507 ///
1508 /// # Examples
1509 ///
1510 /// ```
1511 /// use futures_lite::stream::{self, StreamExt};
1512 ///
1513 /// # spin_on::spin_on(async {
1514 /// let mut s = stream::iter(vec![1, 2]).cycle();
1515 ///
1516 /// assert_eq!(s.next().await, Some(1));
1517 /// assert_eq!(s.next().await, Some(2));
1518 /// assert_eq!(s.next().await, Some(1));
1519 /// assert_eq!(s.next().await, Some(2));
1520 /// # });
1521 /// ```
1522 fn cycle(self) -> Cycle<Self>
1523 where
1524 Self: Clone + Sized,
1525 {
1526 Cycle {
1527 orig: self.clone(),
1528 stream: self,
1529 }
1530 }
1531
1532 /// Enumerates items, mapping them to `(index, item)`.
1533 ///
1534 /// # Examples
1535 ///
1536 /// ```
1537 /// use futures_lite::stream::{self, StreamExt};
1538 ///
1539 /// # spin_on::spin_on(async {
1540 /// let s = stream::iter(vec!['a', 'b', 'c']);
1541 /// let mut s = s.enumerate();
1542 ///
1543 /// assert_eq!(s.next().await, Some((0, 'a')));
1544 /// assert_eq!(s.next().await, Some((1, 'b')));
1545 /// assert_eq!(s.next().await, Some((2, 'c')));
1546 /// assert_eq!(s.next().await, None);
1547 /// # });
1548 /// ```
1549 fn enumerate(self) -> Enumerate<Self>
1550 where
1551 Self: Sized,
1552 {
1553 Enumerate { stream: self, i: 0 }
1554 }
1555
1556 /// Calls a closure on each item and passes it on.
1557 ///
1558 /// # Examples
1559 ///
1560 /// ```
1561 /// use futures_lite::stream::{self, StreamExt};
1562 ///
1563 /// # spin_on::spin_on(async {
1564 /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1565 ///
1566 /// let sum = s
1567 /// .inspect(|x| println!("about to filter {}", x))
1568 /// .filter(|x| x % 2 == 0)
1569 /// .inspect(|x| println!("made it through filter: {}", x))
1570 /// .fold(0, |sum, i| sum + i)
1571 /// .await;
1572 /// # });
1573 /// ```
1574 fn inspect<F>(self, f: F) -> Inspect<Self, F>
1575 where
1576 Self: Sized,
1577 F: FnMut(&Self::Item),
1578 {
1579 Inspect { stream: self, f }
1580 }
1581
1582 /// Gets the `n`th item of the stream.
1583 ///
1584 /// In the end, `n+1` items of the stream will be consumed.
1585 ///
1586 /// # Examples
1587 ///
1588 /// ```
1589 /// use futures_lite::stream::{self, StreamExt};
1590 ///
1591 /// # spin_on::spin_on(async {
1592 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1593 ///
1594 /// assert_eq!(s.nth(2).await, Some(2));
1595 /// assert_eq!(s.nth(2).await, Some(5));
1596 /// assert_eq!(s.nth(2).await, None);
1597 /// # });
1598 /// ```
1599 fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1600 where
1601 Self: Unpin,
1602 {
1603 NthFuture { stream: self, n }
1604 }
1605
1606 /// Returns the last item in the stream.
1607 ///
1608 /// # Examples
1609 ///
1610 /// ```
1611 /// use futures_lite::stream::{self, StreamExt};
1612 ///
1613 /// # spin_on::spin_on(async {
1614 /// let s = stream::iter(vec![1, 2, 3, 4]);
1615 /// assert_eq!(s.last().await, Some(4));
1616 ///
1617 /// let s = stream::empty::<i32>();
1618 /// assert_eq!(s.last().await, None);
1619 /// # });
1620 /// ```
1621 fn last(self) -> LastFuture<Self>
1622 where
1623 Self: Sized,
1624 {
1625 LastFuture {
1626 stream: self,
1627 last: None,
1628 }
1629 }
1630
1631 /// Finds the first item of the stream for which `predicate` returns `true`.
1632 ///
1633 /// # Examples
1634 ///
1635 /// ```
1636 /// use futures_lite::stream::{self, StreamExt};
1637 ///
1638 /// # spin_on::spin_on(async {
1639 /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1640 ///
1641 /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1642 /// assert_eq!(s.next().await, Some(13));
1643 /// # });
1644 /// ```
1645 fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1646 where
1647 Self: Unpin,
1648 P: FnMut(&Self::Item) -> bool,
1649 {
1650 FindFuture {
1651 stream: self,
1652 predicate,
1653 }
1654 }
1655
1656 /// Applies a closure to items in the stream and returns the first [`Some`] result.
1657 ///
1658 /// # Examples
1659 ///
1660 /// ```
1661 /// use futures_lite::stream::{self, StreamExt};
1662 ///
1663 /// # spin_on::spin_on(async {
1664 /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1665 /// let number = s.find_map(|s| s.parse().ok()).await;
1666 ///
1667 /// assert_eq!(number, Some(2));
1668 /// # });
1669 /// ```
1670 fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1671 where
1672 Self: Unpin,
1673 F: FnMut(Self::Item) -> Option<B>,
1674 {
1675 FindMapFuture { stream: self, f }
1676 }
1677
1678 /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1679 ///
1680 /// # Examples
1681 ///
1682 /// ```
1683 /// use futures_lite::stream::{self, StreamExt};
1684 ///
1685 /// # spin_on::spin_on(async {
1686 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1687 ///
1688 /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1689 /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1690 /// assert_eq!(s.position(|x| x == 9).await, None);
1691 /// # });
1692 /// ```
1693 fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1694 where
1695 Self: Unpin,
1696 P: FnMut(Self::Item) -> bool,
1697 {
1698 PositionFuture {
1699 stream: self,
1700 predicate,
1701 index: 0,
1702 }
1703 }
1704
1705 /// Tests if `predicate` returns `true` for all items in the stream.
1706 ///
1707 /// The result is `true` for an empty stream.
1708 ///
1709 /// # Examples
1710 ///
1711 /// ```
1712 /// use futures_lite::stream::{self, StreamExt};
1713 ///
1714 /// # spin_on::spin_on(async {
1715 /// let mut s = stream::iter(vec![1, 2, 3]);
1716 /// assert!(!s.all(|x| x % 2 == 0).await);
1717 ///
1718 /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1719 /// assert!(s.all(|x| x % 2 == 0).await);
1720 ///
1721 /// let mut s = stream::empty::<i32>();
1722 /// assert!(s.all(|x| x % 2 == 0).await);
1723 /// # });
1724 /// ```
1725 fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1726 where
1727 Self: Unpin,
1728 P: FnMut(Self::Item) -> bool,
1729 {
1730 AllFuture {
1731 stream: self,
1732 predicate,
1733 }
1734 }
1735
1736 /// Tests if `predicate` returns `true` for any item in the stream.
1737 ///
1738 /// The result is `false` for an empty stream.
1739 ///
1740 /// # Examples
1741 ///
1742 /// ```
1743 /// use futures_lite::stream::{self, StreamExt};
1744 ///
1745 /// # spin_on::spin_on(async {
1746 /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1747 /// assert!(!s.any(|x| x % 2 == 0).await);
1748 ///
1749 /// let mut s = stream::iter(vec![1, 2, 3]);
1750 /// assert!(s.any(|x| x % 2 == 0).await);
1751 ///
1752 /// let mut s = stream::empty::<i32>();
1753 /// assert!(!s.any(|x| x % 2 == 0).await);
1754 /// # });
1755 /// ```
1756 fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1757 where
1758 Self: Unpin,
1759 P: FnMut(Self::Item) -> bool,
1760 {
1761 AnyFuture {
1762 stream: self,
1763 predicate,
1764 }
1765 }
1766
1767 /// Calls a closure on each item of the stream.
1768 ///
1769 /// # Examples
1770 ///
1771 /// ```
1772 /// use futures_lite::stream::{self, StreamExt};
1773 ///
1774 /// # spin_on::spin_on(async {
1775 /// let mut s = stream::iter(vec![1, 2, 3]);
1776 /// s.for_each(|s| println!("{}", s)).await;
1777 /// # });
1778 /// ```
1779 fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1780 where
1781 Self: Sized,
1782 F: FnMut(Self::Item),
1783 {
1784 ForEachFuture { stream: self, f }
1785 }
1786
1787 /// Calls a fallible closure on each item of the stream, stopping on first error.
1788 ///
1789 /// # Examples
1790 ///
1791 /// ```
1792 /// use futures_lite::stream::{self, StreamExt};
1793 ///
1794 /// # spin_on::spin_on(async {
1795 /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1796 ///
1797 /// let mut v = vec![];
1798 /// let res = s
1799 /// .try_for_each(|n| {
1800 /// if n < 2 {
1801 /// v.push(n);
1802 /// Ok(())
1803 /// } else {
1804 /// Err("too big")
1805 /// }
1806 /// })
1807 /// .await;
1808 ///
1809 /// assert_eq!(v, &[0, 1]);
1810 /// assert_eq!(res, Err("too big"));
1811 /// # });
1812 /// ```
1813 fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1814 where
1815 Self: Unpin,
1816 F: FnMut(Self::Item) -> Result<(), E>,
1817 {
1818 TryForEachFuture { stream: self, f }
1819 }
1820
1821 /// Zips up two streams into a single stream of pairs.
1822 ///
1823 /// The stream of pairs stops when either of the original two streams is exhausted.
1824 ///
1825 /// # Examples
1826 ///
1827 /// ```
1828 /// use futures_lite::stream::{self, StreamExt};
1829 ///
1830 /// # spin_on::spin_on(async {
1831 /// let l = stream::iter(vec![1, 2, 3]);
1832 /// let r = stream::iter(vec![4, 5, 6, 7]);
1833 /// let mut s = l.zip(r);
1834 ///
1835 /// assert_eq!(s.next().await, Some((1, 4)));
1836 /// assert_eq!(s.next().await, Some((2, 5)));
1837 /// assert_eq!(s.next().await, Some((3, 6)));
1838 /// assert_eq!(s.next().await, None);
1839 /// # });
1840 /// ```
1841 fn zip<U>(self, other: U) -> Zip<Self, U>
1842 where
1843 Self: Sized,
1844 U: Stream,
1845 {
1846 Zip {
1847 item_slot: None,
1848 first: self,
1849 second: other,
1850 }
1851 }
1852
1853 /// Collects a stream of pairs into a pair of collections.
1854 ///
1855 /// # Examples
1856 ///
1857 /// ```
1858 /// use futures_lite::stream::{self, StreamExt};
1859 ///
1860 /// # spin_on::spin_on(async {
1861 /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1862 /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1863 ///
1864 /// assert_eq!(left, [1, 3]);
1865 /// assert_eq!(right, [2, 4]);
1866 /// # });
1867 /// ```
1868 fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1869 where
1870 FromA: Default + Extend<A>,
1871 FromB: Default + Extend<B>,
1872 Self: Stream<Item = (A, B)> + Sized,
1873 {
1874 UnzipFuture {
1875 stream: self,
1876 res: Some(Default::default()),
1877 }
1878 }
1879
1880 /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1881 ///
1882 /// # Examples
1883 ///
1884 /// ```
1885 /// use futures_lite::stream::{self, StreamExt};
1886 /// use futures_lite::stream::{once, pending};
1887 ///
1888 /// # spin_on::spin_on(async {
1889 /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1890 /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1891 ///
1892 /// // The first future wins.
1893 /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1894 /// # })
1895 /// ```
1896 fn or<S>(self, other: S) -> Or<Self, S>
1897 where
1898 Self: Sized,
1899 S: Stream<Item = Self::Item>,
1900 {
1901 Or {
1902 stream1: self,
1903 stream2: other,
1904 }
1905 }
1906
1907 /// Merges with `other` stream, with no preference for either stream when both are ready.
1908 ///
1909 /// # Examples
1910 ///
1911 /// ```
1912 /// use futures_lite::stream::{self, StreamExt};
1913 /// use futures_lite::stream::{once, pending};
1914 ///
1915 /// # spin_on::spin_on(async {
1916 /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1917 /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1918 ///
1919 /// // One of the two stream is randomly chosen as the winner.
1920 /// let res = once(1).race(once(2)).next().await;
1921 /// # })
1922 /// ```
1923 #[cfg(all(feature = "std", feature = "race"))]
1924 fn race<S>(self, other: S) -> Race<Self, S>
1925 where
1926 Self: Sized,
1927 S: Stream<Item = Self::Item>,
1928 {
1929 Race {
1930 stream1: self,
1931 stream2: other,
1932 rng: Rng::new(),
1933 }
1934 }
1935
1936 /// Yields all immediately available values from a stream.
1937 ///
1938 /// This is intended to be used as a way of polling a stream without waiting, similar to the
1939 /// [`try_iter`] function on [`std::sync::mpsc::Receiver`]. For instance, running this stream
1940 /// on an [`async_channel::Receiver`] will return all messages that are currently in the
1941 /// channel, but will not wait for new messages.
1942 ///
1943 /// This returns a [`Stream`] instead of an [`Iterator`] because it still needs access to the
1944 /// polling context in order to poll the underlying stream. Since this stream will never return
1945 /// `Poll::Pending`, wrapping it in [`block_on`] will allow it to be effectively used as an
1946 /// [`Iterator`].
1947 ///
1948 /// This stream is not necessarily fused. After it returns `None`, it can return `Some(x)` in
1949 /// the future if it is polled again.
1950 ///
1951 /// [`try_iter`]: std::sync::mpsc::Receiver::try_iter
1952 /// [`async_channel::Receiver`]: https://docs.rs/async-channel/latest/async_channel/struct.Receiver.html
1953 /// [`Stream`]: crate::stream::Stream
1954 /// [`Iterator`]: std::iter::Iterator
1955 ///
1956 /// # Examples
1957 ///
1958 /// ```
1959 /// use futures_lite::{future, pin};
1960 /// use futures_lite::stream::{self, StreamExt};
1961 ///
1962 /// # #[cfg(feature = "std")] {
1963 /// // A stream that yields two values, returns `Pending`, and then yields one more value.
1964 /// let pend_once = stream::once_future(async {
1965 /// future::yield_now().await;
1966 /// 3
1967 /// });
1968 /// let s = stream::iter(vec![1, 2]).chain(pend_once);
1969 /// pin!(s);
1970 ///
1971 /// // This will return the first two values, and then `None` because the stream returns
1972 /// // `Pending` after that.
1973 /// let mut iter = stream::block_on(s.drain());
1974 /// assert_eq!(iter.next(), Some(1));
1975 /// assert_eq!(iter.next(), Some(2));
1976 /// assert_eq!(iter.next(), None);
1977 ///
1978 /// // This will return the last value, because the stream returns `Ready` when polled.
1979 /// assert_eq!(iter.next(), Some(3));
1980 /// assert_eq!(iter.next(), None);
1981 /// # }
1982 /// ```
1983 fn drain(&mut self) -> Drain<'_, Self> {
1984 Drain { stream: self }
1985 }
1986
1987 /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
1988 ///
1989 /// # Examples
1990 ///
1991 /// ```
1992 /// use futures_lite::stream::{self, StreamExt};
1993 ///
1994 /// # spin_on::spin_on(async {
1995 /// let a = stream::once(1);
1996 /// let b = stream::empty();
1997 ///
1998 /// // Streams of different types can be stored in
1999 /// // the same collection when they are boxed:
2000 /// let streams = vec![a.boxed(), b.boxed()];
2001 /// # })
2002 /// ```
2003 #[cfg(feature = "alloc")]
2004 fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
2005 where
2006 Self: Send + Sized + 'a,
2007 {
2008 Box::pin(self)
2009 }
2010
2011 /// Boxes the stream and changes its type to `dyn Stream + 'a`.
2012 ///
2013 /// # Examples
2014 ///
2015 /// ```
2016 /// use futures_lite::stream::{self, StreamExt};
2017 ///
2018 /// # spin_on::spin_on(async {
2019 /// let a = stream::once(1);
2020 /// let b = stream::empty();
2021 ///
2022 /// // Streams of different types can be stored in
2023 /// // the same collection when they are boxed:
2024 /// let streams = vec![a.boxed_local(), b.boxed_local()];
2025 /// # })
2026 /// ```
2027 #[cfg(feature = "alloc")]
2028 fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
2029 where
2030 Self: Sized + 'a,
2031 {
2032 Box::pin(self)
2033 }
2034}
2035
2036impl<S: Stream + ?Sized> StreamExt for S {}
2037
2038/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
2039///
2040/// # Examples
2041///
2042/// ```
2043/// use futures_lite::stream::{self, StreamExt};
2044///
2045/// // These two lines are equivalent:
2046/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
2047/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
2048/// ```
2049#[cfg(feature = "alloc")]
2050pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
2051
2052/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
2053///
2054/// # Examples
2055///
2056/// ```
2057/// use futures_lite::stream::{self, StreamExt};
2058///
2059/// // These two lines are equivalent:
2060/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
2061/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
2062/// ```
2063#[cfg(feature = "alloc")]
2064pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
2065
2066/// Future for the [`StreamExt::next()`] method.
2067#[derive(Debug)]
2068#[must_use = "futures do nothing unless you `.await` or poll them"]
2069pub struct NextFuture<'a, S: ?Sized> {
2070 stream: &'a mut S,
2071}
2072
2073impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
2074
2075impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
2076 type Output = Option<S::Item>;
2077
2078 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2079 self.stream.poll_next(cx)
2080 }
2081}
2082
2083/// Future for the [`StreamExt::try_next()`] method.
2084#[derive(Debug)]
2085#[must_use = "futures do nothing unless you `.await` or poll them"]
2086pub struct TryNextFuture<'a, S: ?Sized> {
2087 stream: &'a mut S,
2088}
2089
2090impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
2091
2092impl<T, E, S> Future for TryNextFuture<'_, S>
2093where
2094 S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
2095{
2096 type Output = Result<Option<T>, E>;
2097
2098 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2099 let res = ready!(self.stream.poll_next(cx));
2100 Poll::Ready(res.transpose())
2101 }
2102}
2103
2104pin_project! {
2105 /// Future for the [`StreamExt::count()`] method.
2106 #[derive(Debug)]
2107 #[must_use = "futures do nothing unless you `.await` or poll them"]
2108 pub struct CountFuture<S: ?Sized> {
2109 count: usize,
2110 #[pin]
2111 stream: S,
2112 }
2113}
2114
2115impl<S: Stream + ?Sized> Future for CountFuture<S> {
2116 type Output = usize;
2117
2118 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2119 loop {
2120 match ready!(self.as_mut().project().stream.poll_next(cx)) {
2121 None => return Poll::Ready(self.count),
2122 Some(_) => *self.as_mut().project().count += 1,
2123 }
2124 }
2125 }
2126}
2127
2128pin_project! {
2129 /// Future for the [`StreamExt::collect()`] method.
2130 #[derive(Debug)]
2131 #[must_use = "futures do nothing unless you `.await` or poll them"]
2132 pub struct CollectFuture<S, C> {
2133 #[pin]
2134 stream: S,
2135 collection: C,
2136 }
2137}
2138
2139impl<S, C> Future for CollectFuture<S, C>
2140where
2141 S: Stream,
2142 C: Default + Extend<S::Item>,
2143{
2144 type Output = C;
2145
2146 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
2147 let mut this = self.as_mut().project();
2148 loop {
2149 match ready!(this.stream.as_mut().poll_next(cx)) {
2150 Some(e) => this.collection.extend(Some(e)),
2151 None => return Poll::Ready(mem::take(self.project().collection)),
2152 }
2153 }
2154 }
2155}
2156
2157pin_project! {
2158 /// Future for the [`StreamExt::try_collect()`] method.
2159 #[derive(Debug)]
2160 #[must_use = "futures do nothing unless you `.await` or poll them"]
2161 pub struct TryCollectFuture<S, C> {
2162 #[pin]
2163 stream: S,
2164 items: C,
2165 }
2166}
2167
2168impl<T, E, S, C> Future for TryCollectFuture<S, C>
2169where
2170 S: Stream<Item = Result<T, E>>,
2171 C: Default + Extend<T>,
2172{
2173 type Output = Result<C, E>;
2174
2175 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2176 let mut this = self.project();
2177 Poll::Ready(Ok(loop {
2178 match ready!(this.stream.as_mut().poll_next(cx)?) {
2179 Some(x) => this.items.extend(Some(x)),
2180 None => break mem::take(this.items),
2181 }
2182 }))
2183 }
2184}
2185
2186pin_project! {
2187 /// Future for the [`StreamExt::partition()`] method.
2188 #[derive(Debug)]
2189 #[must_use = "futures do nothing unless you `.await` or poll them"]
2190 pub struct PartitionFuture<S, P, B> {
2191 #[pin]
2192 stream: S,
2193 predicate: P,
2194 res: Option<(B, B)>,
2195 }
2196}
2197
2198impl<S, P, B> Future for PartitionFuture<S, P, B>
2199where
2200 S: Stream + Sized,
2201 P: FnMut(&S::Item) -> bool,
2202 B: Default + Extend<S::Item>,
2203{
2204 type Output = (B, B);
2205
2206 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2207 let mut this = self.project();
2208 loop {
2209 match ready!(this.stream.as_mut().poll_next(cx)) {
2210 Some(v) => {
2211 let res = this.res.as_mut().unwrap();
2212 if (this.predicate)(&v) {
2213 res.0.extend(Some(v))
2214 } else {
2215 res.1.extend(Some(v))
2216 }
2217 }
2218 None => return Poll::Ready(this.res.take().unwrap()),
2219 }
2220 }
2221 }
2222}
2223
2224pin_project! {
2225 /// Future for the [`StreamExt::fold()`] method.
2226 #[derive(Debug)]
2227 #[must_use = "futures do nothing unless you `.await` or poll them"]
2228 pub struct FoldFuture<S, F, T> {
2229 #[pin]
2230 stream: S,
2231 f: F,
2232 acc: Option<T>,
2233 }
2234}
2235
2236impl<S, F, T> Future for FoldFuture<S, F, T>
2237where
2238 S: Stream,
2239 F: FnMut(T, S::Item) -> T,
2240{
2241 type Output = T;
2242
2243 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2244 let mut this = self.project();
2245 loop {
2246 match ready!(this.stream.as_mut().poll_next(cx)) {
2247 Some(v) => {
2248 let old = this.acc.take().unwrap();
2249 let new = (this.f)(old, v);
2250 *this.acc = Some(new);
2251 }
2252 None => return Poll::Ready(this.acc.take().unwrap()),
2253 }
2254 }
2255 }
2256}
2257
2258/// Future for the [`StreamExt::try_fold()`] method.
2259#[derive(Debug)]
2260#[must_use = "futures do nothing unless you `.await` or poll them"]
2261pub struct TryFoldFuture<'a, S, F, B> {
2262 stream: &'a mut S,
2263 f: F,
2264 acc: Option<B>,
2265}
2266
2267impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
2268
2269impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
2270where
2271 S: Stream<Item = Result<T, E>> + Unpin,
2272 F: FnMut(B, T) -> Result<B, E>,
2273{
2274 type Output = Result<B, E>;
2275
2276 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2277 loop {
2278 match ready!(self.stream.poll_next(cx)) {
2279 Some(Err(e)) => return Poll::Ready(Err(e)),
2280 Some(Ok(t)) => {
2281 let old = self.acc.take().unwrap();
2282 let new = (&mut self.f)(old, t);
2283
2284 match new {
2285 Ok(t) => self.acc = Some(t),
2286 Err(e) => return Poll::Ready(Err(e)),
2287 }
2288 }
2289 None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2290 }
2291 }
2292 }
2293}
2294
2295pin_project! {
2296 /// Stream for the [`StreamExt::scan()`] method.
2297 #[derive(Clone, Debug)]
2298 #[must_use = "streams do nothing unless polled"]
2299 pub struct Scan<S, St, F> {
2300 #[pin]
2301 stream: S,
2302 state_f: (St, F),
2303 }
2304}
2305
2306impl<S, St, F, B> Stream for Scan<S, St, F>
2307where
2308 S: Stream,
2309 F: FnMut(&mut St, S::Item) -> Option<B>,
2310{
2311 type Item = B;
2312
2313 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2314 let mut this = self.project();
2315 this.stream.as_mut().poll_next(cx).map(|item| {
2316 item.and_then(|item| {
2317 let (state, f) = this.state_f;
2318 f(state, item)
2319 })
2320 })
2321 }
2322}
2323
2324pin_project! {
2325 /// Stream for the [`StreamExt::fuse()`] method.
2326 #[derive(Clone, Debug)]
2327 #[must_use = "streams do nothing unless polled"]
2328 pub struct Fuse<S> {
2329 #[pin]
2330 stream: S,
2331 done: bool,
2332 }
2333}
2334
2335impl<S: Stream> Stream for Fuse<S> {
2336 type Item = S::Item;
2337
2338 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2339 let this = self.project();
2340
2341 if *this.done {
2342 Poll::Ready(None)
2343 } else {
2344 let next = ready!(this.stream.poll_next(cx));
2345 if next.is_none() {
2346 *this.done = true;
2347 }
2348 Poll::Ready(next)
2349 }
2350 }
2351}
2352
2353pin_project! {
2354 /// Stream for the [`StreamExt::map()`] method.
2355 #[derive(Clone, Debug)]
2356 #[must_use = "streams do nothing unless polled"]
2357 pub struct Map<S, F> {
2358 #[pin]
2359 stream: S,
2360 f: F,
2361 }
2362}
2363
2364impl<S, F, T> Stream for Map<S, F>
2365where
2366 S: Stream,
2367 F: FnMut(S::Item) -> T,
2368{
2369 type Item = T;
2370
2371 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2372 let this = self.project();
2373 let next = ready!(this.stream.poll_next(cx));
2374 Poll::Ready(next.map(this.f))
2375 }
2376
2377 fn size_hint(&self) -> (usize, Option<usize>) {
2378 self.stream.size_hint()
2379 }
2380}
2381
2382pin_project! {
2383 /// Stream for the [`StreamExt::flat_map()`] method.
2384 #[derive(Clone, Debug)]
2385 #[must_use = "streams do nothing unless polled"]
2386 pub struct FlatMap<S, U, F> {
2387 #[pin]
2388 stream: Map<S, F>,
2389 #[pin]
2390 inner_stream: Option<U>,
2391 }
2392}
2393
2394impl<S, U, F> Stream for FlatMap<S, U, F>
2395where
2396 S: Stream,
2397 U: Stream,
2398 F: FnMut(S::Item) -> U,
2399{
2400 type Item = U::Item;
2401
2402 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2403 let mut this = self.project();
2404 loop {
2405 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2406 match ready!(inner.poll_next(cx)) {
2407 Some(item) => return Poll::Ready(Some(item)),
2408 None => this.inner_stream.set(None),
2409 }
2410 }
2411
2412 match ready!(this.stream.as_mut().poll_next(cx)) {
2413 Some(stream) => this.inner_stream.set(Some(stream)),
2414 None => return Poll::Ready(None),
2415 }
2416 }
2417 }
2418}
2419
2420pin_project! {
2421 /// Stream for the [`StreamExt::flatten()`] method.
2422 #[derive(Clone, Debug)]
2423 #[must_use = "streams do nothing unless polled"]
2424 pub struct Flatten<S: Stream> {
2425 #[pin]
2426 stream: S,
2427 #[pin]
2428 inner_stream: Option<S::Item>,
2429 }
2430}
2431
2432impl<S, U> Stream for Flatten<S>
2433where
2434 S: Stream<Item = U>,
2435 U: Stream,
2436{
2437 type Item = U::Item;
2438
2439 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2440 let mut this = self.project();
2441 loop {
2442 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2443 match ready!(inner.poll_next(cx)) {
2444 Some(item) => return Poll::Ready(Some(item)),
2445 None => this.inner_stream.set(None),
2446 }
2447 }
2448
2449 match ready!(this.stream.as_mut().poll_next(cx)) {
2450 Some(inner) => this.inner_stream.set(Some(inner)),
2451 None => return Poll::Ready(None),
2452 }
2453 }
2454 }
2455}
2456
2457pin_project! {
2458 /// Stream for the [`StreamExt::then()`] method.
2459 #[derive(Clone, Debug)]
2460 #[must_use = "streams do nothing unless polled"]
2461 pub struct Then<S, F, Fut> {
2462 #[pin]
2463 stream: S,
2464 #[pin]
2465 future: Option<Fut>,
2466 f: F,
2467 }
2468}
2469
2470impl<S, F, Fut> Stream for Then<S, F, Fut>
2471where
2472 S: Stream,
2473 F: FnMut(S::Item) -> Fut,
2474 Fut: Future,
2475{
2476 type Item = Fut::Output;
2477
2478 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2479 let mut this = self.project();
2480
2481 loop {
2482 if let Some(fut) = this.future.as_mut().as_pin_mut() {
2483 let item = ready!(fut.poll(cx));
2484 this.future.set(None);
2485 return Poll::Ready(Some(item));
2486 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2487 this.future.set(Some((this.f)(item)));
2488 } else {
2489 return Poll::Ready(None);
2490 }
2491 }
2492 }
2493
2494 fn size_hint(&self) -> (usize, Option<usize>) {
2495 let future_len = self.future.is_some() as usize;
2496 let (lower, upper) = self.stream.size_hint();
2497 let lower = lower.saturating_add(future_len);
2498 let upper = upper.and_then(|u| u.checked_add(future_len));
2499 (lower, upper)
2500 }
2501}
2502
2503pin_project! {
2504 /// Stream for the [`StreamExt::filter()`] method.
2505 #[derive(Clone, Debug)]
2506 #[must_use = "streams do nothing unless polled"]
2507 pub struct Filter<S, P> {
2508 #[pin]
2509 stream: S,
2510 predicate: P,
2511 }
2512}
2513
2514impl<S, P> Stream for Filter<S, P>
2515where
2516 S: Stream,
2517 P: FnMut(&S::Item) -> bool,
2518{
2519 type Item = S::Item;
2520
2521 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2522 let mut this = self.project();
2523 loop {
2524 match ready!(this.stream.as_mut().poll_next(cx)) {
2525 None => return Poll::Ready(None),
2526 Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2527 Some(_) => {}
2528 }
2529 }
2530 }
2531
2532 fn size_hint(&self) -> (usize, Option<usize>) {
2533 let (_, hi) = self.stream.size_hint();
2534
2535 // If the filter matches all of the elements, it will match the stream's upper bound.
2536 // If the filter matches none of the elements, there will be zero returned values.
2537 (0, hi)
2538 }
2539}
2540
2541/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
2542///
2543/// # Examples
2544///
2545/// ```
2546/// use futures_lite::stream::{self, once, pending, StreamExt};
2547///
2548/// # spin_on::spin_on(async {
2549/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
2550/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
2551///
2552/// // The first stream wins.
2553/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
2554/// # })
2555/// ```
2556pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2557where
2558 S1: Stream<Item = T>,
2559 S2: Stream<Item = T>,
2560{
2561 Or { stream1, stream2 }
2562}
2563
2564pin_project! {
2565 /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2566 #[derive(Clone, Debug)]
2567 #[must_use = "streams do nothing unless polled"]
2568 pub struct Or<S1, S2> {
2569 #[pin]
2570 stream1: S1,
2571 #[pin]
2572 stream2: S2,
2573 }
2574}
2575
2576impl<T, S1, S2> Stream for Or<S1, S2>
2577where
2578 S1: Stream<Item = T>,
2579 S2: Stream<Item = T>,
2580{
2581 type Item = T;
2582
2583 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2584 let mut this = self.project();
2585
2586 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2587 return Poll::Ready(Some(t));
2588 }
2589 this.stream2.as_mut().poll_next(cx)
2590 }
2591}
2592
2593/// Merges two streams, with no preference for either stream when both are ready.
2594///
2595/// # Examples
2596///
2597/// ```
2598/// use futures_lite::stream::{self, once, pending, StreamExt};
2599///
2600/// # spin_on::spin_on(async {
2601/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
2602/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
2603///
2604/// // One of the two stream is randomly chosen as the winner.
2605/// let res = stream::race(once(1), once(2)).next().await;
2606/// # })
2607/// ```
2608#[cfg(all(feature = "std", feature = "race"))]
2609pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
2610where
2611 S1: Stream<Item = T>,
2612 S2: Stream<Item = T>,
2613{
2614 Race {
2615 stream1,
2616 stream2,
2617 rng: Rng::new(),
2618 }
2619}
2620
2621/// Races two streams, but with a user-provided seed for randomness.
2622///
2623/// # Examples
2624///
2625/// ```
2626/// use futures_lite::stream::{self, once, pending, StreamExt};
2627///
2628/// // A fixed seed is used for reproducibility.
2629/// const SEED: u64 = 123;
2630///
2631/// # spin_on::spin_on(async {
2632/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1));
2633/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2));
2634///
2635/// // One of the two stream is randomly chosen as the winner.
2636/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await;
2637/// # })
2638/// ```
2639#[cfg(feature = "race")]
2640pub fn race_with_seed<T, S1, S2>(stream1: S1, stream2: S2, seed: u64) -> Race<S1, S2>
2641where
2642 S1: Stream<Item = T>,
2643 S2: Stream<Item = T>,
2644{
2645 Race {
2646 stream1,
2647 stream2,
2648 rng: Rng::with_seed(seed),
2649 }
2650}
2651
2652#[cfg(feature = "race")]
2653pin_project! {
2654 /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
2655 #[derive(Clone, Debug)]
2656 #[must_use = "streams do nothing unless polled"]
2657 pub struct Race<S1, S2> {
2658 #[pin]
2659 stream1: S1,
2660 #[pin]
2661 stream2: S2,
2662 rng: Rng,
2663 }
2664}
2665
2666#[cfg(feature = "race")]
2667impl<T, S1, S2> Stream for Race<S1, S2>
2668where
2669 S1: Stream<Item = T>,
2670 S2: Stream<Item = T>,
2671{
2672 type Item = T;
2673
2674 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2675 let mut this = self.project();
2676
2677 if this.rng.bool() {
2678 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2679 return Poll::Ready(Some(t));
2680 }
2681 if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2682 return Poll::Ready(Some(t));
2683 }
2684 } else {
2685 if let Poll::Ready(Some(t)) = this.stream2.as_mut().poll_next(cx) {
2686 return Poll::Ready(Some(t));
2687 }
2688 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2689 return Poll::Ready(Some(t));
2690 }
2691 }
2692 Poll::Pending
2693 }
2694}
2695
2696pin_project! {
2697 /// Stream for the [`StreamExt::filter_map()`] method.
2698 #[derive(Clone, Debug)]
2699 #[must_use = "streams do nothing unless polled"]
2700 pub struct FilterMap<S, F> {
2701 #[pin]
2702 stream: S,
2703 f: F,
2704 }
2705}
2706
2707impl<S, F, T> Stream for FilterMap<S, F>
2708where
2709 S: Stream,
2710 F: FnMut(S::Item) -> Option<T>,
2711{
2712 type Item = T;
2713
2714 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2715 let mut this = self.project();
2716 loop {
2717 match ready!(this.stream.as_mut().poll_next(cx)) {
2718 None => return Poll::Ready(None),
2719 Some(v) => {
2720 if let Some(t) = (this.f)(v) {
2721 return Poll::Ready(Some(t));
2722 }
2723 }
2724 }
2725 }
2726 }
2727}
2728
2729pin_project! {
2730 /// Stream for the [`StreamExt::take()`] method.
2731 #[derive(Clone, Debug)]
2732 #[must_use = "streams do nothing unless polled"]
2733 pub struct Take<S> {
2734 #[pin]
2735 stream: S,
2736 n: usize,
2737 }
2738}
2739
2740impl<S: Stream> Stream for Take<S> {
2741 type Item = S::Item;
2742
2743 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2744 let this = self.project();
2745
2746 if *this.n == 0 {
2747 Poll::Ready(None)
2748 } else {
2749 let next = ready!(this.stream.poll_next(cx));
2750 match next {
2751 Some(_) => *this.n -= 1,
2752 None => *this.n = 0,
2753 }
2754 Poll::Ready(next)
2755 }
2756 }
2757}
2758
2759pin_project! {
2760 /// Stream for the [`StreamExt::take_while()`] method.
2761 #[derive(Clone, Debug)]
2762 #[must_use = "streams do nothing unless polled"]
2763 pub struct TakeWhile<S, P> {
2764 #[pin]
2765 stream: S,
2766 predicate: P,
2767 }
2768}
2769
2770impl<S, P> Stream for TakeWhile<S, P>
2771where
2772 S: Stream,
2773 P: FnMut(&S::Item) -> bool,
2774{
2775 type Item = S::Item;
2776
2777 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2778 let this = self.project();
2779
2780 match ready!(this.stream.poll_next(cx)) {
2781 Some(v) => {
2782 if (this.predicate)(&v) {
2783 Poll::Ready(Some(v))
2784 } else {
2785 Poll::Ready(None)
2786 }
2787 }
2788 None => Poll::Ready(None),
2789 }
2790 }
2791}
2792
2793pin_project! {
2794 /// Stream for the [`StreamExt::skip()`] method.
2795 #[derive(Clone, Debug)]
2796 #[must_use = "streams do nothing unless polled"]
2797 pub struct Skip<S> {
2798 #[pin]
2799 stream: S,
2800 n: usize,
2801 }
2802}
2803
2804impl<S: Stream> Stream for Skip<S> {
2805 type Item = S::Item;
2806
2807 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2808 let mut this = self.project();
2809 loop {
2810 match ready!(this.stream.as_mut().poll_next(cx)) {
2811 Some(v) => match *this.n {
2812 0 => return Poll::Ready(Some(v)),
2813 _ => *this.n -= 1,
2814 },
2815 None => return Poll::Ready(None),
2816 }
2817 }
2818 }
2819}
2820
2821pin_project! {
2822 /// Stream for the [`StreamExt::skip_while()`] method.
2823 #[derive(Clone, Debug)]
2824 #[must_use = "streams do nothing unless polled"]
2825 pub struct SkipWhile<S, P> {
2826 #[pin]
2827 stream: S,
2828 predicate: Option<P>,
2829 }
2830}
2831
2832impl<S, P> Stream for SkipWhile<S, P>
2833where
2834 S: Stream,
2835 P: FnMut(&S::Item) -> bool,
2836{
2837 type Item = S::Item;
2838
2839 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2840 let mut this = self.project();
2841 loop {
2842 match ready!(this.stream.as_mut().poll_next(cx)) {
2843 Some(v) => match this.predicate {
2844 Some(p) => {
2845 if !p(&v) {
2846 *this.predicate = None;
2847 return Poll::Ready(Some(v));
2848 }
2849 }
2850 None => return Poll::Ready(Some(v)),
2851 },
2852 None => return Poll::Ready(None),
2853 }
2854 }
2855 }
2856}
2857
2858pin_project! {
2859 /// Stream for the [`StreamExt::step_by()`] method.
2860 #[derive(Clone, Debug)]
2861 #[must_use = "streams do nothing unless polled"]
2862 pub struct StepBy<S> {
2863 #[pin]
2864 stream: S,
2865 step: usize,
2866 i: usize,
2867 }
2868}
2869
2870impl<S: Stream> Stream for StepBy<S> {
2871 type Item = S::Item;
2872
2873 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2874 let mut this = self.project();
2875 loop {
2876 match ready!(this.stream.as_mut().poll_next(cx)) {
2877 Some(v) => {
2878 if *this.i == 0 {
2879 *this.i = *this.step - 1;
2880 return Poll::Ready(Some(v));
2881 } else {
2882 *this.i -= 1;
2883 }
2884 }
2885 None => return Poll::Ready(None),
2886 }
2887 }
2888 }
2889}
2890
2891pin_project! {
2892 /// Stream for the [`StreamExt::chain()`] method.
2893 #[derive(Clone, Debug)]
2894 #[must_use = "streams do nothing unless polled"]
2895 pub struct Chain<S, U> {
2896 #[pin]
2897 first: Fuse<S>,
2898 #[pin]
2899 second: Fuse<U>,
2900 }
2901}
2902
2903impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2904 type Item = S::Item;
2905
2906 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2907 let mut this = self.project();
2908
2909 if !this.first.done {
2910 let next = ready!(this.first.as_mut().poll_next(cx));
2911 if let Some(next) = next {
2912 return Poll::Ready(Some(next));
2913 }
2914 }
2915
2916 if !this.second.done {
2917 let next = ready!(this.second.as_mut().poll_next(cx));
2918 if let Some(next) = next {
2919 return Poll::Ready(Some(next));
2920 }
2921 }
2922
2923 if this.first.done && this.second.done {
2924 Poll::Ready(None)
2925 } else {
2926 Poll::Pending
2927 }
2928 }
2929}
2930
2931pin_project! {
2932 /// Stream for the [`StreamExt::cloned()`] method.
2933 #[derive(Clone, Debug)]
2934 #[must_use = "streams do nothing unless polled"]
2935 pub struct Cloned<S> {
2936 #[pin]
2937 stream: S,
2938 }
2939}
2940
2941impl<'a, S, T: 'a> Stream for Cloned<S>
2942where
2943 S: Stream<Item = &'a T>,
2944 T: Clone,
2945{
2946 type Item = T;
2947
2948 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2949 let this = self.project();
2950 let next = ready!(this.stream.poll_next(cx));
2951 Poll::Ready(next.cloned())
2952 }
2953}
2954
2955pin_project! {
2956 /// Stream for the [`StreamExt::copied()`] method.
2957 #[derive(Clone, Debug)]
2958 #[must_use = "streams do nothing unless polled"]
2959 pub struct Copied<S> {
2960 #[pin]
2961 stream: S,
2962 }
2963}
2964
2965impl<'a, S, T: 'a> Stream for Copied<S>
2966where
2967 S: Stream<Item = &'a T>,
2968 T: Copy,
2969{
2970 type Item = T;
2971
2972 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2973 let this = self.project();
2974 let next = ready!(this.stream.poll_next(cx));
2975 Poll::Ready(next.copied())
2976 }
2977}
2978
2979pin_project! {
2980 /// Stream for the [`StreamExt::cycle()`] method.
2981 #[derive(Clone, Debug)]
2982 #[must_use = "streams do nothing unless polled"]
2983 pub struct Cycle<S> {
2984 orig: S,
2985 #[pin]
2986 stream: S,
2987 }
2988}
2989
2990impl<S> Stream for Cycle<S>
2991where
2992 S: Stream + Clone,
2993{
2994 type Item = S::Item;
2995
2996 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2997 match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
2998 Some(item) => Poll::Ready(Some(item)),
2999 None => {
3000 let new = self.as_mut().orig.clone();
3001 self.as_mut().project().stream.set(new);
3002 self.project().stream.poll_next(cx)
3003 }
3004 }
3005 }
3006}
3007
3008pin_project! {
3009 /// Stream for the [`StreamExt::enumerate()`] method.
3010 #[derive(Clone, Debug)]
3011 #[must_use = "streams do nothing unless polled"]
3012 pub struct Enumerate<S> {
3013 #[pin]
3014 stream: S,
3015 i: usize,
3016 }
3017}
3018
3019impl<S> Stream for Enumerate<S>
3020where
3021 S: Stream,
3022{
3023 type Item = (usize, S::Item);
3024
3025 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3026 let this = self.project();
3027
3028 match ready!(this.stream.poll_next(cx)) {
3029 Some(v) => {
3030 let ret = (*this.i, v);
3031 *this.i += 1;
3032 Poll::Ready(Some(ret))
3033 }
3034 None => Poll::Ready(None),
3035 }
3036 }
3037}
3038
3039pin_project! {
3040 /// Stream for the [`StreamExt::inspect()`] method.
3041 #[derive(Clone, Debug)]
3042 #[must_use = "streams do nothing unless polled"]
3043 pub struct Inspect<S, F> {
3044 #[pin]
3045 stream: S,
3046 f: F,
3047 }
3048}
3049
3050impl<S, F> Stream for Inspect<S, F>
3051where
3052 S: Stream,
3053 F: FnMut(&S::Item),
3054{
3055 type Item = S::Item;
3056
3057 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3058 let mut this = self.project();
3059 let next = ready!(this.stream.as_mut().poll_next(cx));
3060 if let Some(x) = &next {
3061 (this.f)(x);
3062 }
3063 Poll::Ready(next)
3064 }
3065}
3066
3067/// Future for the [`StreamExt::nth()`] method.
3068#[derive(Debug)]
3069#[must_use = "futures do nothing unless you `.await` or poll them"]
3070pub struct NthFuture<'a, S: ?Sized> {
3071 stream: &'a mut S,
3072 n: usize,
3073}
3074
3075impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
3076
3077impl<'a, S> Future for NthFuture<'a, S>
3078where
3079 S: Stream + Unpin + ?Sized,
3080{
3081 type Output = Option<S::Item>;
3082
3083 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3084 loop {
3085 match ready!(self.stream.poll_next(cx)) {
3086 Some(v) => match self.n {
3087 0 => return Poll::Ready(Some(v)),
3088 _ => self.n -= 1,
3089 },
3090 None => return Poll::Ready(None),
3091 }
3092 }
3093 }
3094}
3095
3096pin_project! {
3097 /// Future for the [`StreamExt::last()`] method.
3098 #[derive(Debug)]
3099 #[must_use = "futures do nothing unless you `.await` or poll them"]
3100 pub struct LastFuture<S: Stream> {
3101 #[pin]
3102 stream: S,
3103 last: Option<S::Item>,
3104 }
3105}
3106
3107impl<S: Stream> Future for LastFuture<S> {
3108 type Output = Option<S::Item>;
3109
3110 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3111 let mut this = self.project();
3112 loop {
3113 match ready!(this.stream.as_mut().poll_next(cx)) {
3114 Some(new) => *this.last = Some(new),
3115 None => return Poll::Ready(this.last.take()),
3116 }
3117 }
3118 }
3119}
3120
3121/// Future for the [`StreamExt::find()`] method.
3122#[derive(Debug)]
3123#[must_use = "futures do nothing unless you `.await` or poll them"]
3124pub struct FindFuture<'a, S: ?Sized, P> {
3125 stream: &'a mut S,
3126 predicate: P,
3127}
3128
3129impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
3130
3131impl<'a, S, P> Future for FindFuture<'a, S, P>
3132where
3133 S: Stream + Unpin + ?Sized,
3134 P: FnMut(&S::Item) -> bool,
3135{
3136 type Output = Option<S::Item>;
3137
3138 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3139 loop {
3140 match ready!(self.stream.poll_next(cx)) {
3141 Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
3142 Some(_) => {}
3143 None => return Poll::Ready(None),
3144 }
3145 }
3146 }
3147}
3148
3149/// Future for the [`StreamExt::find_map()`] method.
3150#[derive(Debug)]
3151#[must_use = "futures do nothing unless you `.await` or poll them"]
3152pub struct FindMapFuture<'a, S: ?Sized, F> {
3153 stream: &'a mut S,
3154 f: F,
3155}
3156
3157impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
3158
3159impl<'a, S, B, F> Future for FindMapFuture<'a, S, F>
3160where
3161 S: Stream + Unpin + ?Sized,
3162 F: FnMut(S::Item) -> Option<B>,
3163{
3164 type Output = Option<B>;
3165
3166 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3167 loop {
3168 match ready!(self.stream.poll_next(cx)) {
3169 Some(v) => {
3170 if let Some(v) = (&mut self.f)(v) {
3171 return Poll::Ready(Some(v));
3172 }
3173 }
3174 None => return Poll::Ready(None),
3175 }
3176 }
3177 }
3178}
3179
3180/// Future for the [`StreamExt::position()`] method.
3181#[derive(Debug)]
3182#[must_use = "futures do nothing unless you `.await` or poll them"]
3183pub struct PositionFuture<'a, S: ?Sized, P> {
3184 stream: &'a mut S,
3185 predicate: P,
3186 index: usize,
3187}
3188
3189impl<'a, S: Unpin + ?Sized, P> Unpin for PositionFuture<'a, S, P> {}
3190
3191impl<'a, S, P> Future for PositionFuture<'a, S, P>
3192where
3193 S: Stream + Unpin + ?Sized,
3194 P: FnMut(S::Item) -> bool,
3195{
3196 type Output = Option<usize>;
3197
3198 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3199 loop {
3200 match ready!(self.stream.poll_next(cx)) {
3201 Some(v) => {
3202 if (&mut self.predicate)(v) {
3203 return Poll::Ready(Some(self.index));
3204 } else {
3205 self.index += 1;
3206 }
3207 }
3208 None => return Poll::Ready(None),
3209 }
3210 }
3211 }
3212}
3213
3214/// Future for the [`StreamExt::all()`] method.
3215#[derive(Debug)]
3216#[must_use = "futures do nothing unless you `.await` or poll them"]
3217pub struct AllFuture<'a, S: ?Sized, P> {
3218 stream: &'a mut S,
3219 predicate: P,
3220}
3221
3222impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
3223
3224impl<S, P> Future for AllFuture<'_, S, P>
3225where
3226 S: Stream + Unpin + ?Sized,
3227 P: FnMut(S::Item) -> bool,
3228{
3229 type Output = bool;
3230
3231 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3232 loop {
3233 match ready!(self.stream.poll_next(cx)) {
3234 Some(v) => {
3235 if !(&mut self.predicate)(v) {
3236 return Poll::Ready(false);
3237 }
3238 }
3239 None => return Poll::Ready(true),
3240 }
3241 }
3242 }
3243}
3244
3245/// Future for the [`StreamExt::any()`] method.
3246#[derive(Debug)]
3247#[must_use = "futures do nothing unless you `.await` or poll them"]
3248pub struct AnyFuture<'a, S: ?Sized, P> {
3249 stream: &'a mut S,
3250 predicate: P,
3251}
3252
3253impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
3254
3255impl<S, P> Future for AnyFuture<'_, S, P>
3256where
3257 S: Stream + Unpin + ?Sized,
3258 P: FnMut(S::Item) -> bool,
3259{
3260 type Output = bool;
3261
3262 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3263 loop {
3264 match ready!(self.stream.poll_next(cx)) {
3265 Some(v) => {
3266 if (&mut self.predicate)(v) {
3267 return Poll::Ready(true);
3268 }
3269 }
3270 None => return Poll::Ready(false),
3271 }
3272 }
3273 }
3274}
3275
3276pin_project! {
3277 /// Future for the [`StreamExt::for_each()`] method.
3278 #[derive(Debug)]
3279 #[must_use = "futures do nothing unless you `.await` or poll them"]
3280 pub struct ForEachFuture<S, F> {
3281 #[pin]
3282 stream: S,
3283 f: F,
3284 }
3285}
3286
3287impl<S, F> Future for ForEachFuture<S, F>
3288where
3289 S: Stream,
3290 F: FnMut(S::Item),
3291{
3292 type Output = ();
3293
3294 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3295 let mut this = self.project();
3296 loop {
3297 match ready!(this.stream.as_mut().poll_next(cx)) {
3298 Some(v) => (this.f)(v),
3299 None => return Poll::Ready(()),
3300 }
3301 }
3302 }
3303}
3304
3305/// Future for the [`StreamExt::try_for_each()`] method.
3306#[derive(Debug)]
3307#[must_use = "futures do nothing unless you `.await` or poll them"]
3308pub struct TryForEachFuture<'a, S: ?Sized, F> {
3309 stream: &'a mut S,
3310 f: F,
3311}
3312
3313impl<'a, S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'a, S, F> {}
3314
3315impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F>
3316where
3317 S: Stream + Unpin + ?Sized,
3318 F: FnMut(S::Item) -> Result<(), E>,
3319{
3320 type Output = Result<(), E>;
3321
3322 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3323 loop {
3324 match ready!(self.stream.poll_next(cx)) {
3325 None => return Poll::Ready(Ok(())),
3326 Some(v) => (&mut self.f)(v)?,
3327 }
3328 }
3329 }
3330}
3331
3332pin_project! {
3333 /// Stream for the [`StreamExt::zip()`] method.
3334 #[derive(Clone, Debug)]
3335 #[must_use = "streams do nothing unless polled"]
3336 pub struct Zip<A: Stream, B> {
3337 item_slot: Option<A::Item>,
3338 #[pin]
3339 first: A,
3340 #[pin]
3341 second: B,
3342 }
3343}
3344
3345impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3346 type Item = (A::Item, B::Item);
3347
3348 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3349 let this = self.project();
3350
3351 if this.item_slot.is_none() {
3352 match this.first.poll_next(cx) {
3353 Poll::Pending => return Poll::Pending,
3354 Poll::Ready(None) => return Poll::Ready(None),
3355 Poll::Ready(Some(item)) => *this.item_slot = Some(item),
3356 }
3357 }
3358
3359 let second_item = ready!(this.second.poll_next(cx));
3360 let first_item = this.item_slot.take().unwrap();
3361 Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
3362 }
3363}
3364
3365pin_project! {
3366 /// Future for the [`StreamExt::unzip()`] method.
3367 #[derive(Debug)]
3368 #[must_use = "futures do nothing unless you `.await` or poll them"]
3369 pub struct UnzipFuture<S, FromA, FromB> {
3370 #[pin]
3371 stream: S,
3372 res: Option<(FromA, FromB)>,
3373 }
3374}
3375
3376impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3377where
3378 S: Stream<Item = (A, B)>,
3379 FromA: Default + Extend<A>,
3380 FromB: Default + Extend<B>,
3381{
3382 type Output = (FromA, FromB);
3383
3384 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3385 let mut this = self.project();
3386
3387 loop {
3388 match ready!(this.stream.as_mut().poll_next(cx)) {
3389 Some((a, b)) => {
3390 let res = this.res.as_mut().unwrap();
3391 res.0.extend(Some(a));
3392 res.1.extend(Some(b));
3393 }
3394 None => return Poll::Ready(this.res.take().unwrap()),
3395 }
3396 }
3397 }
3398}
3399
3400/// Stream for the [`StreamExt::drain()`] method.
3401#[derive(Debug)]
3402#[must_use = "streams do nothing unless polled"]
3403pub struct Drain<'a, S: ?Sized> {
3404 stream: &'a mut S,
3405}
3406
3407impl<'a, S: Unpin + ?Sized> Unpin for Drain<'a, S> {}
3408
3409impl<'a, S: Unpin + ?Sized> Drain<'a, S> {
3410 /// Get a reference to the underlying stream.
3411 ///
3412 /// ## Examples
3413 ///
3414 /// ```
3415 /// use futures_lite::{prelude::*, stream};
3416 ///
3417 /// # spin_on::spin_on(async {
3418 /// let mut s = stream::iter(vec![1, 2, 3]);
3419 /// let s2 = s.drain();
3420 ///
3421 /// let inner = s2.get_ref();
3422 /// // s and inner are the same.
3423 /// # });
3424 /// ```
3425 pub fn get_ref(&self) -> &S {
3426 &self.stream
3427 }
3428
3429 /// Get a mutable reference to the underlying stream.
3430 ///
3431 /// ## Examples
3432 ///
3433 /// ```
3434 /// use futures_lite::{prelude::*, stream};
3435 ///
3436 /// # spin_on::spin_on(async {
3437 /// let mut s = stream::iter(vec![1, 2, 3]);
3438 /// let mut s2 = s.drain();
3439 ///
3440 /// let inner = s2.get_mut();
3441 /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3442 /// # });
3443 /// ```
3444 pub fn get_mut(&mut self) -> &mut S {
3445 &mut self.stream
3446 }
3447
3448 /// Consume this stream and get the underlying stream.
3449 ///
3450 /// ## Examples
3451 ///
3452 /// ```
3453 /// use futures_lite::{prelude::*, stream};
3454 ///
3455 /// # spin_on::spin_on(async {
3456 /// let mut s = stream::iter(vec![1, 2, 3]);
3457 /// let mut s2 = s.drain();
3458 ///
3459 /// let inner = s2.into_inner();
3460 /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3461 /// # });
3462 /// ```
3463 pub fn into_inner(self) -> &'a mut S {
3464 self.stream
3465 }
3466}
3467
3468impl<'a, S: Stream + Unpin + ?Sized> Stream for Drain<'a, S> {
3469 type Item = S::Item;
3470
3471 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3472 match self.stream.poll_next(cx) {
3473 Poll::Ready(x) => Poll::Ready(x),
3474 Poll::Pending => Poll::Ready(None),
3475 }
3476 }
3477
3478 fn size_hint(&self) -> (usize, Option<usize>) {
3479 let (_, hi) = self.stream.size_hint();
3480 (0, hi)
3481 }
3482}