</pre><pre class="rust"><code><span class="kw">use </span>core::future::Future;
<span class="kw">use </span>futures_core::Stream;
<span class="kw">mod </span>all;
<span class="kw">use </span>all::AllFuture;
<span class="kw">mod </span>any;
<span class="kw">use </span>any::AnyFuture;
<span class="kw">mod </span>chain;
<span class="kw">use </span>chain::Chain;
<span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">mod </span>collect;
<span class="kw">use </span>collect::{Collect, FromStream};
<span class="kw">mod </span>filter;
<span class="kw">use </span>filter::Filter;
<span class="kw">mod </span>filter_map;
<span class="kw">use </span>filter_map::FilterMap;
<span class="kw">mod </span>fold;
<span class="kw">use </span>fold::FoldFuture;
<span class="kw">mod </span>fuse;
<span class="kw">use </span>fuse::Fuse;
<span class="kw">mod </span>map;
<span class="kw">use </span>map::Map;
<span class="kw">mod </span>map_while;
<span class="kw">use </span>map_while::MapWhile;
<span class="kw">mod </span>merge;
<span class="kw">use </span>merge::Merge;
<span class="kw">mod </span>next;
<span class="kw">use </span>next::Next;
<span class="kw">mod </span>skip;
<span class="kw">use </span>skip::Skip;
<span class="kw">mod </span>skip_while;
<span class="kw">use </span>skip_while::SkipWhile;
<span class="kw">mod </span>take;
<span class="kw">use </span>take::Take;
<span class="kw">mod </span>take_while;
<span class="kw">use </span>take_while::TakeWhile;
<span class="kw">mod </span>then;
<span class="kw">use </span>then::Then;
<span class="kw">mod </span>try_next;
<span class="kw">use </span>try_next::TryNext;
<span class="macro">cfg_time! </span>{
<span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">mod </span>timeout;
<span class="kw">pub</span>(<span class="kw">crate</span>) <span class="kw">mod </span>timeout_repeating;
<span class="kw">use </span>timeout::Timeout;
<span class="kw">use </span>timeout_repeating::TimeoutRepeating;
<span class="kw">use </span>tokio::time::{Duration, Interval};
<span class="kw">mod </span>throttle;
<span class="kw">use </span>throttle::{throttle, Throttle};
<span class="kw">mod </span>chunks_timeout;
<span class="kw">use </span>chunks_timeout::ChunksTimeout;
<span class="doccomment">/// An extension trait for the [`Stream`] trait that provides a variety of
/// convenient combinator functions.
/// Be aware that the `Stream` trait in Tokio is a re-export of the trait found
/// in the [futures] crate, however both Tokio and futures provide separate
/// `StreamExt` utility traits, and some utilities are only available on one of
/// these traits. Click [here][futures-StreamExt] to see the other `StreamExt`
/// trait in the futures crate.
/// If you need utilities from both `StreamExt` traits, you should prefer to
/// import one of them, and use the other through the fully qualified call
/// syntax. For example:
/// ```
/// // import one of the traits:
/// use futures::stream::StreamExt;
/// # #[tokio::main(flavor = &quot;current_thread&quot;)]
/// # async fn main() {
/// let a = tokio_stream::iter(vec![1, 3, 5]);
/// let b = tokio_stream::iter(vec![2, 4, 6]);
/// // use the fully qualified call syntax for the other trait:
/// let merged = tokio_stream::StreamExt::merge(a, b);
/// // use normal call notation for futures::stream::StreamExt::collect
/// let output: Vec&lt;_&gt; = merged.collect().await;
/// assert_eq!(output, vec![1, 2, 3, 4, 5, 6]);
/// # }
/// ```
/// [`Stream`]: crate::Stream
/// [futures]: https://docs.rs/futures
/// [futures-StreamExt]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html
</span><span class="kw">pub trait </span>StreamExt: Stream {
<span class="doccomment">/// Consumes and returns the next value in the stream or `None` if the
/// stream is finished.
/// Equivalent to:
/// ```ignore
/// async fn next(&amp;mut self) -&gt; Option&lt;Self::Item&gt;;
/// ```
/// Note that because `next` doesn&#39;t take ownership over the stream,
/// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
/// [`!Unpin`](Unpin) stream, you&#39;ll first have to pin the stream. This can
/// be done by boxing the stream using [`Box::pin`] or
/// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
/// crate.
/// # Cancel safety
/// This method is cancel safe. The returned future only
/// holds onto a reference to the underlying stream,
/// so dropping it will never lose a value.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let mut stream = stream::iter(1..=3);
/// assert_eq!(stream.next().await, Some(1));
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, Some(3));
/// assert_eq!(stream.next().await, None);
/// # }
/// ```
</span><span class="kw">fn </span>next(<span class="kw-2">&amp;mut </span><span class="self">self</span>) -&gt; Next&lt;<span class="lifetime">&#39;_</span>, <span class="self">Self</span>&gt;
<span class="kw">where
</span><span class="self">Self</span>: Unpin,
Next::new(<span class="self">self</span>)
<span class="doccomment">/// Consumes and returns the next item in the stream. If an error is
/// encountered before the next item, the error is returned instead.
/// Equivalent to:
/// ```ignore
/// async fn try_next(&amp;mut self) -&gt; Result&lt;Option&lt;T&gt;, E&gt;;
/// ```
/// This is similar to the [`next`](StreamExt::next) combinator,
/// but returns a [`Result&lt;Option&lt;T&gt;, E&gt;`](Result) rather than
/// an [`Option&lt;Result&lt;T, E&gt;&gt;`](Option), making for easy use
/// with the [`?`](std::ops::Try) operator.
/// # Cancel safety
/// This method is cancel safe. The returned future only
/// holds onto a reference to the underlying stream,
/// so dropping it will never lose a value.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err(&quot;nope&quot;)]);
/// assert_eq!(stream.try_next().await, Ok(Some(1)));
/// assert_eq!(stream.try_next().await, Ok(Some(2)));
/// assert_eq!(stream.try_next().await, Err(&quot;nope&quot;));
/// # }
/// ```
</span><span class="kw">fn </span>try_next&lt;T, E&gt;(<span class="kw-2">&amp;mut </span><span class="self">self</span>) -&gt; TryNext&lt;<span class="lifetime">&#39;_</span>, <span class="self">Self</span>&gt;
<span class="kw">where
</span><span class="self">Self</span>: Stream&lt;Item = <span class="prelude-ty">Result</span>&lt;T, E&gt;&gt; + Unpin,
TryNext::new(<span class="self">self</span>)
<span class="doccomment">/// Maps this stream&#39;s items to a different type, returning a new stream of
/// the resulting type.
/// The provided closure is executed over all elements of this stream as
/// they are made available. It is executed inline with calls to
/// [`poll_next`](Stream::poll_next).
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the existing `map` methods in the
/// standard library.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let stream = stream::iter(1..=3);
/// let mut stream = stream.map(|x| x + 3);
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// # }
/// ```
</span><span class="kw">fn </span>map&lt;T, F&gt;(<span class="self">self</span>, f: F) -&gt; Map&lt;<span class="self">Self</span>, F&gt;
<span class="kw">where
</span>F: FnMut(<span class="self">Self</span>::Item) -&gt; T,
<span class="self">Self</span>: Sized,
Map::new(<span class="self">self</span>, f)
<span class="doccomment">/// Map this stream&#39;s items to a different type for as long as determined by
/// the provided closure. A stream of the target type will be returned,
/// which will yield elements until the closure returns `None`.
/// The provided closure is executed over all elements of this stream as
/// they are made available, until it returns `None`. It is executed inline
/// with calls to [`poll_next`](Stream::poll_next). Once `None` is returned,
/// the underlying stream will not be polled again.
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::map_while`] method in the
/// standard library.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let stream = stream::iter(1..=10);
/// let mut stream = stream.map_while(|x| {
/// if x &lt; 4 {
/// Some(x + 3)
/// } else {
/// None
/// }
/// });
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// assert_eq!(stream.next().await, None);
/// # }
/// ```
</span><span class="kw">fn </span>map_while&lt;T, F&gt;(<span class="self">self</span>, f: F) -&gt; MapWhile&lt;<span class="self">Self</span>, F&gt;
<span class="kw">where
</span>F: FnMut(<span class="self">Self</span>::Item) -&gt; <span class="prelude-ty">Option</span>&lt;T&gt;,
<span class="self">Self</span>: Sized,
MapWhile::new(<span class="self">self</span>, f)
<span class="doccomment">/// Maps this stream&#39;s items asynchronously to a different type, returning a
/// new stream of the resulting type.
/// The provided closure is executed over all elements of this stream as
/// they are made available, and the returned future is executed. Only one
/// future is executed at a time.
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the existing `then` methods in the
/// standard library.
/// Be aware that if the future is not `Unpin`, then neither is the `Stream`
/// returned by this method. To handle this, you can use `tokio::pin!` as in
/// the example below or put the stream in a `Box` with `Box::pin(stream)`.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// async fn do_async_work(value: i32) -&gt; i32 {
/// value + 3
/// }
/// let stream = stream::iter(1..=3);
/// let stream = stream.then(do_async_work);
/// tokio::pin!(stream);
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// # }
/// ```
</span><span class="kw">fn </span>then&lt;F, Fut&gt;(<span class="self">self</span>, f: F) -&gt; Then&lt;<span class="self">Self</span>, Fut, F&gt;
<span class="kw">where
</span>F: FnMut(<span class="self">Self</span>::Item) -&gt; Fut,
Fut: Future,
<span class="self">Self</span>: Sized,
Then::new(<span class="self">self</span>, f)
<span class="doccomment">/// Combine two streams into one by interleaving the output of both as it
/// is produced.
/// Values are produced from the merged stream in the order they arrive from
/// the two source streams. If both source streams provide values
/// simultaneously, the merge stream alternates between them. This provides
/// some level of fairness. You should not chain calls to `merge`, as this
/// will break the fairness of the merging.
/// The merged stream completes once **both** source streams complete. When
/// one source stream completes before the other, the merge stream
/// exclusively polls the remaining stream.
/// For merging multiple streams, consider using [`StreamMap`] instead.
/// [`StreamMap`]: crate::StreamMap
/// # Examples
/// ```
/// use tokio_stream::{StreamExt, Stream};
/// use tokio::sync::mpsc;
/// use tokio::time;
/// use std::time::Duration;
/// use std::pin::Pin;
/// # /*
/// #[tokio::main]
/// # */
/// # #[tokio::main(flavor = &quot;current_thread&quot;)]
/// async fn main() {
/// # time::pause();
/// let (tx1, mut rx1) = mpsc::channel::&lt;usize&gt;(10);
/// let (tx2, mut rx2) = mpsc::channel::&lt;usize&gt;(10);
/// // Convert the channels to a `Stream`.
/// let rx1 = Box::pin(async_stream::stream! {
/// while let Some(item) = rx1.recv().await {
/// yield item;
/// }
/// }) as Pin&lt;Box&lt;dyn Stream&lt;Item = usize&gt; + Send&gt;&gt;;
/// let rx2 = Box::pin(async_stream::stream! {
/// while let Some(item) = rx2.recv().await {
/// yield item;
/// }
/// }) as Pin&lt;Box&lt;dyn Stream&lt;Item = usize&gt; + Send&gt;&gt;;
/// let mut rx = rx1.merge(rx2);
/// tokio::spawn(async move {
/// // Send some values immediately
/// tx1.send(1).await.unwrap();
/// tx1.send(2).await.unwrap();
/// // Let the other task send values
/// time::sleep(Duration::from_millis(20)).await;
/// tx1.send(4).await.unwrap();
/// });
/// tokio::spawn(async move {
/// // Wait for the first task to send values
/// time::sleep(Duration::from_millis(5)).await;
/// tx2.send(3).await.unwrap();
/// time::sleep(Duration::from_millis(25)).await;
/// // Send the final value
/// tx2.send(5).await.unwrap();
/// });
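/// assert_eq!(1, rx.next().await.unwrap());
/// assert_eq!(2, rx.next().await.unwrap());
/// assert_eq!(3, rx.next().await.unwrap());
/// assert_eq!(4, rx.next().await.unwrap());
/// assert_eq!(5, rx.next().await.unwrap());
/// // The merged stream is consumed
/// assert!(rx.next().await.is_none());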
/// }
/// ```
</span><span class="kw">fn </span>merge&lt;U&gt;(<span class="self">self</span>, other: U) -&gt; Merge&lt;<span class="self">Self</span>, U&gt;
<span class="kw">where
</span>U: Stream&lt;Item = <span class="self">Self</span>::Item&gt;,
<span class="self">Self</span>: Sized,
Merge::new(<span class="self">self</span>, other)
<span class="doccomment">/// Filters the values produced by this stream according to the provided
/// predicate.
/// As values of this stream are made available, the provided predicate `f`
/// will be run against them. If the predicate
/// resolves to `true`, then the stream will yield the value, but if the
/// predicate resolves to `false`, then the value
/// will be discarded and the next value will be produced.
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter`] method in the
/// standard library.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let stream = stream::iter(1..=8);
/// let mut evens = stream.filter(|x| x % 2 == 0);
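/// assert_eq!(Some(2), evens.next().await);
/// assert_eq!(Some(4), evens.next().await);
/// assert_eq!(Some(6), evens.next().await);
/// assert_eq!(Some(8), evens.next().await);
/// assert_eq!(None, evens.next().await);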
/// # }
/// ```
</span><span class="kw">fn </span>filter&lt;F&gt;(<span class="self">self</span>, f: F) -&gt; Filter&lt;<span class="self">Self</span>, F&gt;
<span class="kw">where
</span>F: FnMut(<span class="kw-2">&amp;</span><span class="self">Self</span>::Item) -&gt; bool,
<span class="self">Self</span>: Sized,
Filter::new(<span class="self">self</span>, f)
<span class="doccomment">/// Filters the values produced by this stream while simultaneously mapping
/// them to a different type according to the provided closure.
/// As values of this stream are made available, the provided function will
/// be run on them. If the predicate `f` resolves to
/// [`Some(item)`](Some) then the stream will yield the value `item`, but if
/// it resolves to [`None`], then the value will be skipped.
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to the [`Iterator::filter_map`] method in the
/// standard library.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let stream = stream::iter(1..=8);
/// let mut evens = stream.filter_map(|x| {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
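/// assert_eq!(Some(3), evens.next().await);
/// assert_eq!(Some(5), evens.next().await);
/// assert_eq!(Some(7), evens.next().await);
/// assert_eq!(Some(9), evens.next().await);
/// assert_eq!(None, evens.next().await);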
/// # }
/// ```
</span><span class="kw">fn </span>filter_map&lt;T, F&gt;(<span class="self">self</span>, f: F) -&gt; FilterMap&lt;<span class="self">Self</span>, F&gt;
<span class="kw">where
</span>F: FnMut(<span class="self">Self</span>::Item) -&gt; <span class="prelude-ty">Option</span>&lt;T&gt;,
<span class="self">Self</span>: Sized,
FilterMap::new(<span class="self">self</span>, f)
<span class="doccomment">/// Creates a stream which ends after the first `None`.
/// After a stream returns `None`, behavior is undefined. Future calls to
/// `poll_next` may or may not return `Some(T)` again or they may panic.
/// `fuse()` adapts a stream, ensuring that after `None` is given, it will
/// return `None` forever.
/// # Examples
/// ```
/// use tokio_stream::{Stream, StreamExt};
/// use std::pin::Pin;
/// use std::task::{Context, Poll};
/// // a stream which alternates between Some and None
/// struct Alternate {
/// state: i32,
/// }
/// impl Stream for Alternate {
/// type Item = i32;
/// fn poll_next(mut self: Pin&lt;&amp;mut Self&gt;, _cx: &amp;mut Context&lt;&#39;_&gt;) -&gt; Poll&lt;Option&lt;i32&gt;&gt; {
/// let val = self.state;
/// self.state = self.state + 1;
/// // if it&#39;s even, Some(i32), else None
/// if val % 2 == 0 {
/// Poll::Ready(Some(val))
/// } else {
/// Poll::Ready(None)
/// }
/// }
/// }
/// #[tokio::main]
/// async fn main() {
/// let mut stream = Alternate { state: 0 };
/// // the stream goes back and forth
/// assert_eq!(stream.next().await, Some(0));
/// assert_eq!(stream.next().await, None);
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, None);
/// // however, once it is fused
/// let mut stream = stream.fuse();
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, None);
/// // it will always return `None` after the first time.
/// assert_eq!(stream.next().await, None);
/// assert_eq!(stream.next().await, None);
/// assert_eq!(stream.next().await, None);
/// }
/// ```
</span><span class="kw">fn </span>fuse(<span class="self">self</span>) -&gt; Fuse&lt;<span class="self">Self</span>&gt;
<span class="kw">where
</span><span class="self">Self</span>: Sized,
Fuse::new(<span class="self">self</span>)
<span class="doccomment">/// Creates a new stream of at most `n` items of the underlying stream.
/// Once `n` items have been yielded from this stream then it will always
/// return that the stream is done.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let mut stream = stream::iter(1..=10).take(3);
/// assert_eq!(Some(1), stream.next().await);
/// assert_eq!(Some(2), stream.next().await);
/// assert_eq!(Some(3), stream.next().await);
/// assert_eq!(None, stream.next().await);
/// # }
/// ```
</span><span class="kw">fn </span>take(<span class="self">self</span>, n: usize) -&gt; Take&lt;<span class="self">Self</span>&gt;
<span class="kw">where
</span><span class="self">Self</span>: Sized,
Take::new(<span class="self">self</span>, n)
<span class="doccomment">/// Take elements from this stream while the provided predicate
/// resolves to `true`.
/// This function, like `Iterator::take_while`, will take elements from the
/// stream until the predicate `f` resolves to `false`. Once one element
/// returns false it will always return that the stream is done.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let mut stream = stream::iter(1..=10).take_while(|x| *x &lt;= 3);
/// assert_eq!(Some(1), stream.next().await);
/// assert_eq!(Some(2), stream.next().await);
/// assert_eq!(Some(3), stream.next().await);
/// assert_eq!(None, stream.next().await);
/// # }
/// ```
</span><span class="kw">fn </span>take_while&lt;F&gt;(<span class="self">self</span>, f: F) -&gt; TakeWhile&lt;<span class="self">Self</span>, F&gt;
<span class="kw">where
</span>F: FnMut(<span class="kw-2">&amp;</span><span class="self">Self</span>::Item) -&gt; bool,
<span class="self">Self</span>: Sized,
TakeWhile::new(<span class="self">self</span>, f)
<span class="doccomment">/// Creates a new stream that will skip the `n` first items of the
/// underlying stream.
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let mut stream = stream::iter(1..=10).skip(7);
/// assert_eq!(Some(8), stream.next().await);
/// assert_eq!(Some(9), stream.next().await);
/// assert_eq!(Some(10), stream.next().await);
/// assert_eq!(None, stream.next().await);
/// # }
/// ```
</span><span class="kw">fn </span>skip(<span class="self">self</span>, n: usize) -&gt; Skip&lt;<span class="self">Self</span>&gt;
<span class="kw">where
</span><span class="self">Self</span>: Sized,
Skip::new(<span class="self">self</span>, n)
<span class="doccomment">/// Skip elements from the underlying stream while the provided predicate
/// resolves to `true`.
/// This function, like [`Iterator::skip_while`], will ignore elements from the
/// stream until the predicate `f` resolves to `false`. Once one element
/// returns false, the rest of the elements will be yielded.
/// [`Iterator::skip_while`]: std::iter::Iterator::skip_while()
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x &lt; 3);
/// assert_eq!(Some(3), stream.next().await);
/// assert_eq!(Some(4), stream.next().await);
/// assert_eq!(Some(1), stream.next().await);
/// assert_eq!(None, stream.next().await);
/// # }
/// ```
</span><span class="kw">fn </span>skip_while&lt;F&gt;(<span class="self">self</span>, f: F) -&gt; SkipWhile&lt;<span class="self">Self</span>, F&gt;
<span class="kw">where
</span>F: FnMut(<span class="kw-2">&amp;</span><span class="self">Self</span>::Item) -&gt; bool,
<span class="self">Self</span>: Sized,
SkipWhile::new(<span class="self">self</span>, f)
<span class="doccomment">/// Tests if every element of the stream matches a predicate.
/// Equivalent to:
/// ```ignore
/// async fn all&lt;F&gt;(&amp;mut self, f: F) -&gt; bool;
/// ```
/// `all()` takes a closure that returns `true` or `false`. It applies
/// this closure to each element of the stream, and if they all return
/// `true`, then so does `all`. If any of them return `false`, it
/// returns `false`. An empty stream returns `true`.
/// `all()` is short-circuiting; in other words, it will stop processing
/// as soon as it finds a `false`, given that no matter what else happens,
/// the result will also be `false`.
/// An empty stream returns `true`.
/// # Examples
/// Basic usage:
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let a = [1, 2, 3];
/// assert!(stream::iter(&amp;a).all(|&amp;x| x &gt; 0).await);
/// assert!(!stream::iter(&amp;a).all(|&amp;x| x &gt; 2).await);
/// # }
/// ```
/// Stopping at the first `false`:
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let a = [1, 2, 3];
/// let mut iter = stream::iter(&amp;a);
/// assert!(!iter.all(|&amp;x| x != 2).await);
/// // we can still use `iter`, as there are more elements.
/// assert_eq!(iter.next().await, Some(&amp;3));
/// # }
/// ```
</span><span class="kw">fn </span>all&lt;F&gt;(<span class="kw-2">&amp;mut </span><span class="self">self</span>, f: F) -&gt; AllFuture&lt;<span class="lifetime">&#39;_</span>, <span class="self">Self</span>, F&gt;
<span class="kw">where
</span><span class="self">Self</span>: Unpin,
F: FnMut(<span class="self">Self</span>::Item) -&gt; bool,
AllFuture::new(<span class="self">self</span>, f)
<span class="doccomment">/// Tests if any element of the stream matches a predicate.
/// Equivalent to:
/// ```ignore
/// async fn any&lt;F&gt;(&amp;mut self, f: F) -&gt; bool;
/// ```
/// `any()` takes a closure that returns `true` or `false`. It applies
/// this closure to each element of the stream, and if any of them return
/// `true`, then so does `any()`. If they all return `false`, it
/// returns `false`.
/// `any()` is short-circuiting; in other words, it will stop processing
/// as soon as it finds a `true`, given that no matter what else happens,
/// the result will also be `true`.
/// An empty stream returns `false`.
/// # Examples
/// Basic usage:
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let a = [1, 2, 3];
/// assert!(stream::iter(&amp;a).any(|&amp;x| x &gt; 0).await);
/// assert!(!stream::iter(&amp;a).any(|&amp;x| x &gt; 5).await);
/// # }
/// ```
/// Stopping at the first `true`:
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// let a = [1, 2, 3];
/// let mut iter = stream::iter(&amp;a);
/// assert!(iter.any(|&amp;x| x != 2).await);
/// // we can still use `iter`, as there are more elements.
/// assert_eq!(iter.next().await, Some(&amp;2));
/// # }
/// ```
</span><span class="kw">fn </span>any&lt;F&gt;(<span class="kw-2">&amp;mut </span><span class="self">self</span>, f: F) -&gt; AnyFuture&lt;<span class="lifetime">&#39;_</span>, <span class="self">Self</span>, F&gt;
<span class="kw">where
</span><span class="self">Self</span>: Unpin,
F: FnMut(<span class="self">Self</span>::Item) -&gt; bool,
AnyFuture::new(<span class="self">self</span>, f)
<span class="doccomment">/// Combine two streams into one by first returning all values from the
/// first stream then all values from the second stream.
/// As long as `self` still has values to emit, no values from `other` are
/// emitted, even if some are ready.
/// # Examples
/// ```
/// use tokio_stream::{self as stream, StreamExt};
/// #[tokio::main]
/// async fn main() {
/// let one = stream::iter(vec![1, 2, 3]);
/// let two = stream::iter(vec![4, 5, 6]);
/// let mut stream = one.chain(two);
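/// assert_eq!(stream.next().await, Some(1));
/// assert_eq!(stream.next().await, Some(2));
/// assert_eq!(stream.next().await, Some(3));
/// assert_eq!(stream.next().await, Some(4));
/// assert_eq!(stream.next().await, Some(5));
/// assert_eq!(stream.next().await, Some(6));
/// assert_eq!(stream.next().await, None);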
/// }
/// ```
</span><span class="kw">fn </span>chain&lt;U&gt;(<span class="self">self</span>, other: U) -&gt; Chain&lt;<span class="self">Self</span>, U&gt;
<span class="kw">where
</span>U: Stream&lt;Item = <span class="self">Self</span>::Item&gt;,
<span class="self">Self</span>: Sized,
Chain::new(<span class="self">self</span>, other)
<span class="doccomment">/// A combinator that applies a function to every element in a stream
/// producing a single, final value.
/// Equivalent to:
/// ```ignore
/// async fn fold&lt;B, F&gt;(self, init: B, f: F) -&gt; B;
/// ```
/// # Examples
/// Basic usage:
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, *};
/// let s = stream::iter(vec![1u8, 2, 3]);
/// let sum = s.fold(0, |acc, x| acc + x).await;
/// assert_eq!(sum, 6);
/// # }
/// ```
</span><span class="kw">fn </span>fold&lt;B, F&gt;(<span class="self">self</span>, init: B, f: F) -&gt; FoldFuture&lt;<span class="self">Self</span>, B, F&gt;
<span class="kw">where
</span><span class="self">Self</span>: Sized,
F: FnMut(B, <span class="self">Self</span>::Item) -&gt; B,
FoldFuture::new(<span class="self">self</span>, init, f)
<span class="doccomment">/// Drain stream pushing all emitted values into a collection.
/// Equivalent to:
/// ```ignore
/// async fn collect&lt;T&gt;(self) -&gt; T;
/// ```
/// `collect` streams all values, awaiting as needed. Values are pushed into
/// a collection. A number of different target collection types are
/// supported, including [`Vec`](std::vec::Vec),
/// [`String`](std::string::String), and [`Bytes`].
/// [`Bytes`]: https://docs.rs/bytes/latest/bytes/struct.Bytes.html
/// # `Result`
/// `collect()` can also be used with streams of type `Result&lt;T, E&gt;` where
/// `T: FromStream&lt;_&gt;`. In this case, `collect()` will stream as long as
/// values yielded from the stream are `Ok(_)`. If `Err(_)` is encountered,
/// streaming is terminated and `collect()` returns the `Err`.
/// # Notes
/// `FromStream` is currently a sealed trait. Stabilization is pending
/// enhancements to the Rust language.
/// # Examples
/// Basic usage:
/// ```
/// use tokio_stream::{self as stream, StreamExt};
/// #[tokio::main]
/// async fn main() {
/// let doubled: Vec&lt;i32&gt; =
/// stream::iter(vec![1, 2, 3])
/// .map(|x| x * 2)
/// .collect()
/// .await;
/// assert_eq!(vec![2, 4, 6], doubled);
/// }
/// ```
/// Collecting a stream of `Result` values
/// ```
/// use tokio_stream::{self as stream, StreamExt};
/// #[tokio::main]
/// async fn main() {
/// // A stream containing only `Ok` values will be collected
/// let values: Result&lt;Vec&lt;i32&gt;, &amp;str&gt; =
/// stream::iter(vec![Ok(1), Ok(2), Ok(3)])
/// .collect()
/// .await;
/// assert_eq!(Ok(vec![1, 2, 3]), values);
/// // A stream containing `Err` values will return the first error.
/// let results = vec![Ok(1), Err(&quot;no&quot;), Ok(2), Ok(3), Err(&quot;nein&quot;)];
/// let values: Result&lt;Vec&lt;i32&gt;, &amp;str&gt; =
/// stream::iter(results)
/// .collect()
/// .await;
/// assert_eq!(Err(&quot;no&quot;), values);
/// }
/// ```
</span><span class="kw">fn </span>collect&lt;T&gt;(<span class="self">self</span>) -&gt; Collect&lt;<span class="self">Self</span>, T&gt;
<span class="kw">where
</span>T: FromStream&lt;<span class="self">Self</span>::Item&gt;,
<span class="self">Self</span>: Sized,
Collect::new(<span class="self">self</span>)
<span class="doccomment">/// Applies a per-item timeout to the passed stream.
/// `timeout()` takes a `Duration` that represents the maximum amount of
/// time each element of the stream has to complete before timing out.
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available. See
/// [`timeout_repeating`](StreamExt::timeout_repeating) for an alternative
/// where the timeouts will repeat.
/// # Notes
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
/// Polling the returned stream will continue to poll the inner stream even
/// if one or more items time out.
/// # Examples
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
/// let int_stream = int_stream.timeout(Duration::from_secs(1));
/// tokio::pin!(int_stream);
/// // When no items time out, we get the 3 elements in succession:
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// // If the second item times out, we get an error and continue polling the stream:
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert!(int_stream.try_next().await.is_err());
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// // If we want to stop consuming the source stream the first time an
/// // element times out, we can use the `take_while` operator:
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// let mut int_stream = int_stream.take_while(Result::is_ok);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
/// Once a timeout error is received, no further events will be received
/// unless the wrapped stream yields a value (timeouts do not repeat).
/// ```
/// # #[tokio::main(flavor = &quot;current_thread&quot;, start_paused = true)]
/// # async fn main() {
/// use tokio_stream::{StreamExt, wrappers::IntervalStream};
/// use std::time::Duration;
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(100)));
/// let timeout_stream = interval_stream.timeout(Duration::from_millis(10));
/// tokio::pin!(timeout_stream);
/// // Only one timeout will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err(), &quot;expected one timeout&quot;);
/// assert!(timeout_stream.try_next().await.is_ok(), &quot;expected no more timeouts&quot;);
/// # }
/// ```
</span><span class="attribute">#[cfg(all(feature = <span class="string">&quot;time&quot;</span>))]
#[cfg_attr(docsrs, doc(cfg(feature = <span class="string">&quot;time&quot;</span>)))]
</span><span class="kw">fn </span>timeout(<span class="self">self</span>, duration: Duration) -&gt; Timeout&lt;<span class="self">Self</span>&gt;
<span class="kw">where
</span><span class="self">Self</span>: Sized,
Timeout::new(<span class="self">self</span>, duration)
<span class="doccomment">/// Applies a per-item timeout to the passed stream.
///
/// `timeout_repeating()` takes an [`Interval`](tokio::time::Interval) that
/// controls the time each element of the stream has to complete before
/// timing out.
///
/// If the wrapped stream yields a value before the deadline is reached, the
/// value is returned. Otherwise, an error is returned. The caller may decide
/// to continue consuming the stream and will eventually get the next source
/// stream value once it becomes available. Unlike `timeout()`, if no value
/// becomes available before the deadline is reached, additional errors are
/// returned at the specified interval. See [`timeout`](StreamExt::timeout)
/// for an alternative where the timeouts do not repeat.
///
/// # Notes
///
/// This function consumes the stream passed into it and returns a
/// wrapped version of it.
///
/// Polling the returned stream will continue to poll the inner stream even
/// if one or more items time out.
///
/// # Examples
///
/// Suppose we have a stream `int_stream` that yields 3 numbers (1, 2, 3):
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
/// use std::time::Duration;
/// # let int_stream = stream::iter(1..=3);
///
/// let int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(1)));
/// tokio::pin!(int_stream);
///
/// // When no items time out, we get the 3 elements in succession:
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If the second item times out, we get an error and continue polling the stream:
/// # let mut int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert!(int_stream.try_next().await.is_err());
/// assert_eq!(int_stream.try_next().await, Ok(Some(2)));
/// assert_eq!(int_stream.try_next().await, Ok(Some(3)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
///
/// // If we want to stop consuming the source stream the first time an
/// // element times out, we can use the `take_while` operator:
/// # let int_stream = stream::iter(vec![Ok(1), Err(()), Ok(2), Ok(3)]);
/// let mut int_stream = int_stream.take_while(Result::is_ok);
///
/// assert_eq!(int_stream.try_next().await, Ok(Some(1)));
/// assert_eq!(int_stream.try_next().await, Ok(None));
/// # }
/// ```
///
/// Timeout errors will be continuously produced at the specified interval
/// until the wrapped stream yields a value.
///
/// ```
/// # #[tokio::main(flavor = &quot;current_thread&quot;, start_paused = true)]
/// # async fn main() {
/// use tokio_stream::{StreamExt, wrappers::IntervalStream};
/// use std::time::Duration;
///
/// let interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(23)));
/// let timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(9)));
/// tokio::pin!(timeout_stream);
///
/// // Multiple timeouts will be received between values in the source stream.
/// assert!(timeout_stream.try_next().await.is_ok());
/// assert!(timeout_stream.try_next().await.is_err(), &quot;expected one timeout&quot;);
/// assert!(timeout_stream.try_next().await.is_err(), &quot;expected a second timeout&quot;);
/// // Will eventually receive another value from the source stream...
/// assert!(timeout_stream.try_next().await.is_ok(), &quot;expected non-timeout&quot;);
/// # }
/// ```
</span><span class="attribute">#[cfg(all(feature = <span class="string">&quot;time&quot;</span>))]
#[cfg_attr(docsrs, doc(cfg(feature = <span class="string">&quot;time&quot;</span>)))]
</span><span class="kw">fn </span>timeout_repeating(<span class="self">self</span>, interval: Interval) -&gt; TimeoutRepeating&lt;<span class="self">Self</span>&gt;
<span class="kw">where
</span><span class="self">Self</span>: Sized,
TimeoutRepeating::new(<span class="self">self</span>, interval)
<span class="doccomment">/// Slows down a stream by enforcing a delay between items.
///
/// `duration` is the delay to enforce; it is forwarded, together with the
/// stream itself, to the `throttle` helper that builds the returned wrapper.
///
/// The underlying timer behind this utility has a granularity of one millisecond.
///
/// # Example
///
/// Create a throttled stream.
///
/// ```rust,no_run
/// use std::time::Duration;
/// use tokio_stream::StreamExt;
///
/// # async fn dox() {
/// let item_stream = futures::stream::repeat(&quot;one&quot;).throttle(Duration::from_secs(2));
/// tokio::pin!(item_stream);
///
/// loop {
///     // The string will be produced at most every 2 seconds
///     println!(&quot;{:?}&quot;, item_stream.next().await);
/// }
/// # }
/// ```
</span><span class="attribute">#[cfg(all(feature = <span class="string">&quot;time&quot;</span>))]
#[cfg_attr(docsrs, doc(cfg(feature = <span class="string">&quot;time&quot;</span>)))]
</span><span class="kw">fn </span>throttle(<span class="self">self</span>, duration: Duration) -&gt; Throttle&lt;<span class="self">Self</span>&gt;
<span class="kw">where
</span><span class="self">Self</span>: Sized,
throttle(duration, <span class="self">self</span>)
<span class="doccomment">/// Batches the items in the given stream using a maximum duration and size for each batch.
///
/// This stream returns the next batch of items in the following situations:
///
/// 1. The inner stream has returned at least `max_size` many items since the last batch.
/// 2. The time since the first item of a batch is greater than the given duration.
/// 3. The end of the stream is reached.
///
/// The returned vector is never empty and never holds more items than the maximum size. Empty
/// batches will not be emitted if no items are received upstream.
///
/// # Panics
///
/// This function panics if `max_size` is zero.
///
/// # Example
///
/// ```rust
/// use std::time::Duration;
/// use tokio::time;
/// use tokio_stream::{self as stream, StreamExt};
/// use futures::FutureExt;
///
/// #[tokio::main]
/// # async fn _unused() {}
/// # #[tokio::main(flavor = &quot;current_thread&quot;, start_paused = true)]
/// async fn main() {
///     let iter = vec![1, 2, 3, 4].into_iter();
///     let stream0 = stream::iter(iter);
///
///     let iter = vec![5].into_iter();
///     let stream1 = stream::iter(iter)
///         .then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
///
///     let chunk_stream = stream0
///         .chain(stream1)
///         .chunks_timeout(3, Duration::from_secs(2));
///     tokio::pin!(chunk_stream);
///
///     // a full batch was received
///     assert_eq!(chunk_stream.next().await, Some(vec![1,2,3]));
///     // deadline was reached before max_size was reached
///     assert_eq!(chunk_stream.next().await, Some(vec![4]));
///     // last element in the stream
///     assert_eq!(chunk_stream.next().await, Some(vec![5]));
/// }
/// ```
</span><span class="attribute">#[cfg(feature = <span class="string">&quot;time&quot;</span>)]
#[cfg_attr(docsrs, doc(cfg(feature = <span class="string">&quot;time&quot;</span>)))]
</span><span class="kw">fn </span>chunks_timeout(<span class="self">self</span>, max_size: usize, duration: Duration) -&gt; ChunksTimeout&lt;<span class="self">Self</span>&gt;
<span class="kw">where
</span><span class="self">Self</span>: Sized,
<span class="macro">assert!</span>(max_size &gt; <span class="number">0</span>, <span class="string">&quot;`max_size` must be non-zero.&quot;</span>);
ChunksTimeout::new(<span class="self">self</span>, max_size, duration)
// Blanket implementation of `StreamExt` for every type `St` that implements
// `Stream`, including unsized types (`?Sized`); the impl body is empty.
<span class="kw">impl</span>&lt;St: <span class="question-mark">?</span>Sized&gt; StreamExt <span class="kw">for </span>St <span class="kw">where </span>St: Stream {}
<span class="doccomment">/// Merge the size hints from two streams.
///
/// Each argument is a `(lower, upper)` size-hint pair. Returns `(low, high)`
/// where `low` is the saturating sum of the two lower bounds and `high` is
/// the checked sum of the two upper bounds when both are `Some`; `high` is
/// `None` if either upper bound is `None` or if the sum overflows `usize`.
</span><span class="kw">fn </span>merge_size_hints(
(left_low, left_high): (usize, <span class="prelude-ty">Option</span>&lt;usize&gt;),
(right_low, right_high): (usize, <span class="prelude-ty">Option</span>&lt;usize&gt;),
) -&gt; (usize, <span class="prelude-ty">Option</span>&lt;usize&gt;) {
// Saturate instead of overflowing: a lower bound clamped at `usize::MAX` is still valid.
<span class="kw">let </span>low = left_low.saturating_add(right_low);
// An upper bound is only known when both sides have one and their sum fits in `usize`.
<span class="kw">let </span>high = <span class="kw">match </span>(left_high, right_high) {
(<span class="prelude-val">Some</span>(h1), <span class="prelude-val">Some</span>(h2)) =&gt; h1.checked_add(h2),
<span class="kw">_ </span>=&gt; <span class="prelude-val">None</span>,
(low, high)
</section></div></main><div id="rustdoc-vars" data-root-path="../../" data-current-crate="tokio_stream" data-themes="ayu,dark,light" data-resource-suffix="" data-rustdoc-version="1.66.0-nightly (5c8bff74b 2022-10-21)" ></div></body></html>