blob: a7eade04f9df690a6ce93e3a25fd6d89d0df6a56 [file] [log] [blame]
<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><meta name="generator" content="rustdoc"><meta name="description" content="An extension trait for the `Stream` trait that provides a variety of convenient combinator functions."><meta name="keywords" content="rust, rustlang, rust-lang, StreamExt"><title>StreamExt in tokio_stream - Rust</title><link rel="preload" as="font" type="font/woff2" crossorigin href="../SourceSerif4-Regular.ttf.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../FiraSans-Regular.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../FiraSans-Medium.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../SourceCodePro-Regular.ttf.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../SourceSerif4-Bold.ttf.woff2"><link rel="preload" as="font" type="font/woff2" crossorigin href="../SourceCodePro-Semibold.ttf.woff2"><link rel="stylesheet" href="../normalize.css"><link rel="stylesheet" href="../rustdoc.css" id="mainThemeStyle"><link rel="stylesheet" href="../ayu.css" disabled><link rel="stylesheet" href="../dark.css" disabled><link rel="stylesheet" href="../light.css" id="themeStyle"><script id="default-settings" ></script><script src="../storage.js"></script><script defer src="sidebar-items.js"></script><script defer src="../main.js"></script><noscript><link rel="stylesheet" href="../noscript.css"></noscript><link rel="alternate icon" type="image/png" href="../favicon-16x16.png"><link rel="alternate icon" type="image/png" href="../favicon-32x32.png"><link rel="icon" type="image/svg+xml" href="../favicon.svg"></head><body class="rustdoc trait"><!--[if lte IE 11]><div class="warning">This old browser is unsupported and will most likely display funky things.</div><![endif]--><nav class="mobile-topbar"><button class="sidebar-menu-toggle">&#9776;</button><a class="sidebar-logo" 
href="../tokio_stream/index.html"><div class="logo-container"><img class="rust-logo" src="../rust-logo.svg" alt="logo"></div></a><h2></h2></nav><nav class="sidebar"><a class="sidebar-logo" href="../tokio_stream/index.html"><div class="logo-container"><img class="rust-logo" src="../rust-logo.svg" alt="logo"></div></a><h2 class="location"><a href="#">StreamExt</a></h2><div class="sidebar-elems"><section><h3><a href="#provided-methods">Provided Methods</a></h3><ul class="block"><li><a href="#method.all">all</a></li><li><a href="#method.any">any</a></li><li><a href="#method.chain">chain</a></li><li><a href="#method.chunks_timeout">chunks_timeout</a></li><li><a href="#method.collect">collect</a></li><li><a href="#method.filter">filter</a></li><li><a href="#method.filter_map">filter_map</a></li><li><a href="#method.fold">fold</a></li><li><a href="#method.fuse">fuse</a></li><li><a href="#method.map">map</a></li><li><a href="#method.map_while">map_while</a></li><li><a href="#method.merge">merge</a></li><li><a href="#method.next">next</a></li><li><a href="#method.skip">skip</a></li><li><a href="#method.skip_while">skip_while</a></li><li><a href="#method.take">take</a></li><li><a href="#method.take_while">take_while</a></li><li><a href="#method.then">then</a></li><li><a href="#method.throttle">throttle</a></li><li><a href="#method.timeout">timeout</a></li><li><a href="#method.timeout_repeating">timeout_repeating</a></li><li><a href="#method.try_next">try_next</a></li></ul><h3><a href="#implementors">Implementors</a></h3></section><h2><a href="index.html">In tokio_stream</a></h2></div></nav><main><div class="width-limiter"><nav class="sub"><form class="search-form"><div class="search-container"><span></span><input class="search-input" name="search" autocomplete="off" spellcheck="false" placeholder="Click or press ‘S’ to search, ‘?’ for more options…" type="search"><div id="help-button" title="help" tabindex="-1"><a href="../help.html">?</a></div><div id="settings-menu" 
tabindex="-1"><a href="../settings.html" title="settings"><img width="22" height="22" alt="Change settings" src="../wheel.svg"></a></div></div></form></nav><section id="main-content" class="content"><div class="main-heading"><h1 class="fqn">Trait <a href="index.html">tokio_stream</a>::<wbr><a class="trait" href="#">StreamExt</a><button id="copy-path" onclick="copy_path(this)" title="Copy item path to clipboard"><img src="../clipboard.svg" width="19" height="18" alt="Copy item path"></button></h1><span class="out-of-band"><a class="srclink" href="../src/tokio_stream/stream_ext.rs.html#103-1181">source</a> · <a id="toggle-all-docs" href="javascript:void(0)" title="collapse all docs">[<span class="inner">&#x2212;</span>]</a></span></div><div class="item-decl"><pre class="rust trait"><code>pub trait StreamExt: <a class="trait" href="../futures_core/stream/trait.Stream.html" title="trait futures_core::stream::Stream">Stream</a> {
<details class="rustdoc-toggle type-contents-toggle"><summary class="hideme"><span>Show 22 methods</span></summary> fn <a href="#method.next" class="fnname">next</a>(&amp;mut self) -&gt; Next&lt;'_, Self&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Unpin</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.try_next" class="fnname">try_next</a>&lt;T, E&gt;(&amp;mut self) -&gt; TryNext&lt;'_, Self&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: <a class="trait" href="../futures_core/stream/trait.Stream.html" title="trait futures_core::stream::Stream">Stream</a>&lt;Item = Result&lt;T, E&gt;&gt; + Unpin</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.map" class="fnname">map</a>&lt;T, F&gt;(self, f: F) -&gt; Map&lt;Self, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; T,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.map_while" class="fnname">map_while</a>&lt;T, F&gt;(self, f: F) -&gt; MapWhile&lt;Self, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; Option&lt;T&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.then" class="fnname">then</a>&lt;F, Fut&gt;(self, f: F) -&gt; Then&lt;Self, Fut, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; Fut,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Fut: Future,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.merge" class="fnname">merge</a>&lt;U&gt;(self, other: U) -&gt; Merge&lt;Self, U&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;U: <a class="trait" href="../futures_core/stream/trait.Stream.html" title="trait futures_core::stream::Stream">Stream</a>&lt;Item = Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.filter" class="fnname">filter</a>&lt;F&gt;(self, f: F) -&gt; Filter&lt;Self, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(&amp;Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.filter_map" class="fnname">filter_map</a>&lt;T, F&gt;(self, f: F) -&gt; FilterMap&lt;Self, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; Option&lt;T&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.fuse" class="fnname">fuse</a>(self) -&gt; Fuse&lt;Self&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.take" class="fnname">take</a>(self, n: usize) -&gt; Take&lt;Self&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.take_while" class="fnname">take_while</a>&lt;F&gt;(self, f: F) -&gt; TakeWhile&lt;Self, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(&amp;Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.skip" class="fnname">skip</a>(self, n: usize) -&gt; Skip&lt;Self&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.skip_while" class="fnname">skip_while</a>&lt;F&gt;(self, f: F) -&gt; SkipWhile&lt;Self, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(&amp;Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.all" class="fnname">all</a>&lt;F&gt;(&amp;mut self, f: F) -&gt; AllFuture&lt;'_, Self, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Unpin,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.any" class="fnname">any</a>&lt;F&gt;(&amp;mut self, f: F) -&gt; AnyFuture&lt;'_, Self, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Unpin,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.chain" class="fnname">chain</a>&lt;U&gt;(self, other: U) -&gt; Chain&lt;Self, U&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;U: <a class="trait" href="../futures_core/stream/trait.Stream.html" title="trait futures_core::stream::Stream">Stream</a>&lt;Item = Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.fold" class="fnname">fold</a>&lt;B, F&gt;(self, init: B, f: F) -&gt; FoldFuture&lt;Self, B, F&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(B, Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; B</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.collect" class="fnname">collect</a>&lt;T&gt;(self) -&gt; Collect&lt;Self, T&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;T: <a class="trait" href="trait.FromStream.html" title="trait tokio_stream::FromStream">FromStream</a>&lt;Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.timeout" class="fnname">timeout</a>(self, duration: Duration) -&gt; <a class="struct" href="struct.Timeout.html" title="struct tokio_stream::Timeout">Timeout</a>&lt;Self&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.timeout_repeating" class="fnname">timeout_repeating</a>(self, interval: <a class="struct" href="../tokio/time/interval/struct.Interval.html" title="struct tokio::time::interval::Interval">Interval</a>) -&gt; TimeoutRepeating&lt;Self&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.throttle" class="fnname">throttle</a>(self, duration: Duration) -&gt; Throttle&lt;Self&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
<span class="item-spacer"></span> fn <a href="#method.chunks_timeout" class="fnname">chunks_timeout</a>(<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;self,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;max_size: usize,<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;duration: Duration<br>&nbsp;&nbsp;&nbsp;&nbsp;) -&gt; ChunksTimeout&lt;Self&gt;<br>&nbsp;&nbsp;&nbsp;&nbsp;<span class="where">where<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized</span>,
{ ... }
</details>}</code></pre></div><details class="rustdoc-toggle top-doc" open><summary class="hideme"><span>Expand description</span></summary><div class="docblock"><p>An extension trait for the <a href="../futures_core/stream/trait.Stream.html"><code>Stream</code></a> trait that provides a variety of
convenient combinator functions.</p>
<p>Be aware that the <code>Stream</code> trait in Tokio is a re-export of the trait found
in the <a href="https://docs.rs/futures">futures</a> crate; however, both Tokio and futures provide separate
<code>StreamExt</code> utility traits, and some utilities are only available on one of
these traits. Click <a href="https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html">here</a> to see the other <code>StreamExt</code>
trait in the futures crate.</p>
<p>If you need utilities from both <code>StreamExt</code> traits, you should prefer to
import one of them, and use the other through the fully qualified call
syntax. For example:</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="comment">// import one of the traits:
</span><span class="kw">use </span>futures::stream::StreamExt;
<span class="kw">let </span>a = tokio_stream::iter(<span class="macro">vec!</span>[<span class="number">1</span>, <span class="number">3</span>, <span class="number">5</span>]);
<span class="kw">let </span>b = tokio_stream::iter(<span class="macro">vec!</span>[<span class="number">2</span>, <span class="number">4</span>, <span class="number">6</span>]);
<span class="comment">// use the fully qualified call syntax for the other trait:
</span><span class="kw">let </span>merged = tokio_stream::StreamExt::merge(a, b);
<span class="comment">// use normal call notation for futures::stream::StreamExt::collect
</span><span class="kw">let </span>output: Vec&lt;<span class="kw">_</span>&gt; = merged.collect().<span class="kw">await</span>;
<span class="macro">assert_eq!</span>(output, <span class="macro">vec!</span>[<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>, <span class="number">4</span>, <span class="number">5</span>, <span class="number">6</span>]);</code></pre></div>
</div></details><h2 id="provided-methods" class="small-section-header">Provided Methods<a href="#provided-methods" class="anchor"></a></h2><div class="methods"><details class="rustdoc-toggle method-toggle" open><summary><section id="method.next" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#141-146">source</a><h4 class="code-header">fn <a href="#method.next" class="fnname">next</a>(&amp;mut self) -&gt; Next&lt;'_, Self&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Unpin,</span></h4></section></summary><div class="docblock"><p>Consumes and returns the next value in the stream or <code>None</code> if the
stream is finished.</p>
<p>Equivalent to:</p>
<div class="example-wrap ignore"><div class='tooltip'></div><pre class="rust rust-example-rendered"><code><span class="kw">async fn </span>next(<span class="kw-2">&amp;mut </span><span class="self">self</span>) -&gt; <span class="prelude-ty">Option</span>&lt;<span class="self">Self</span>::Item&gt;;</code></pre></div>
<p>Note that because <code>next</code> doesn’t take ownership over the stream,
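the <a href="../futures_core/stream/trait.Stream.html" title="Stream"><code>Stream</code></a> type must be <a href="https://doc.rust-lang.org/std/marker/trait.Unpin.html"><code>Unpin</code></a>. If you want to use <code>next</code> with a
<a href="https://doc.rust-lang.org/std/marker/trait.Unpin.html"><code>!Unpin</code></a> stream, you’ll first have to pin the stream. This can
be done by boxing the stream using <a href="https://doc.rust-lang.org/std/boxed/struct.Box.html#method.pin"><code>Box::pin</code></a> or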
pinning it to the stack using the <code>pin_mut!</code> macro from the <code>pin_utils</code>
crate.</p>
<h5 id="cancel-safety"><a href="#cancel-safety">Cancel safety</a></h5>
<p>This method is cancel safe. The returned future only
holds onto a reference to the underlying stream,
so dropping it will never lose a value.</p>
<h5 id="examples"><a href="#examples">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span><span class="kw-2">mut </span>stream = stream::iter(<span class="number">1</span>..=<span class="number">3</span>);
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">1</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">2</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">3</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">None</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.try_next" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#182-187">source</a><h4 class="code-header">fn <a href="#method.try_next" class="fnname">try_next</a>&lt;T, E&gt;(&amp;mut self) -&gt; TryNext&lt;'_, Self&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: <a class="trait" href="../futures_core/stream/trait.Stream.html" title="trait futures_core::stream::Stream">Stream</a>&lt;Item = Result&lt;T, E&gt;&gt; + Unpin,</span></h4></section></summary><div class="docblock"><p>Consumes and returns the next item in the stream. If an error is
encountered before the next item, the error is returned instead.</p>
<p>Equivalent to:</p>
<div class="example-wrap ignore"><div class='tooltip'></div><pre class="rust rust-example-rendered"><code><span class="kw">async fn </span>try_next(<span class="kw-2">&amp;mut </span><span class="self">self</span>) -&gt; <span class="prelude-ty">Result</span>&lt;<span class="prelude-ty">Option</span>&lt;T&gt;, E&gt;;</code></pre></div>
<p>This is similar to the <a href="trait.StreamExt.html#method.next"><code>next</code></a> combinator,
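but returns a <a href="https://doc.rust-lang.org/std/result/enum.Result.html"><code>Result&lt;Option&lt;T&gt;, E&gt;</code></a> rather than
an <a href="https://doc.rust-lang.org/std/option/enum.Option.html"><code>Option&lt;Result&lt;T, E&gt;&gt;</code></a>, making for easy use
with the <a href="https://doc.rust-lang.org/std/ops/trait.Try.html"><code>?</code></a> operator.</p>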
<h5 id="cancel-safety-1"><a href="#cancel-safety-1">Cancel safety</a></h5>
<p>This method is cancel safe. The returned future only
holds onto a reference to the underlying stream,
so dropping it will never lose a value.</p>
<h5 id="examples-1"><a href="#examples-1">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span><span class="kw-2">mut </span>stream = stream::iter(<span class="macro">vec!</span>[<span class="prelude-val">Ok</span>(<span class="number">1</span>), <span class="prelude-val">Ok</span>(<span class="number">2</span>), <span class="prelude-val">Err</span>(<span class="string">&quot;nope&quot;</span>)]);
<span class="macro">assert_eq!</span>(stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>)));
<span class="macro">assert_eq!</span>(stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">2</span>)));
<span class="macro">assert_eq!</span>(stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Err</span>(<span class="string">&quot;nope&quot;</span>));</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.map" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#215-221">source</a><h4 class="code-header">fn <a href="#method.map" class="fnname">map</a>&lt;T, F&gt;(self, f: F) -&gt; Map&lt;Self, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; T,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Maps this stream’s items to a different type, returning a new stream of
the resulting type.</p>
<p>The provided closure is executed over all elements of this stream as
they are made available. It is executed inline with calls to
<a href="../futures_core/stream/trait.Stream.html#tymethod.poll_next"><code>poll_next</code></a>.</p>
<p>Note that this function consumes the stream passed into it and returns a
wrapped version of it, similar to the existing <code>map</code> methods in the
standard library.</p>
<h5 id="examples-2"><a href="#examples-2">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span>stream = stream::iter(<span class="number">1</span>..=<span class="number">3</span>);
<span class="kw">let </span><span class="kw-2">mut </span>stream = stream.map(|x| x + <span class="number">3</span>);
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">4</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">5</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">6</span>));</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.map_while" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#257-263">source</a><h4 class="code-header">fn <a href="#method.map_while" class="fnname">map_while</a>&lt;T, F&gt;(self, f: F) -&gt; MapWhile&lt;Self, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; Option&lt;T&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Map this stream’s items to a different type for as long as determined by
the provided closure. A stream of the target type will be returned,
which will yield elements until the closure returns <code>None</code>.</p>
<p>The provided closure is executed over all elements of this stream as
they are made available, until it returns <code>None</code>. It is executed inline
with calls to <a href="../futures_core/stream/trait.Stream.html#tymethod.poll_next"><code>poll_next</code></a>. Once <code>None</code> is returned,
the underlying stream will not be polled again.</p>
<p>Note that this function consumes the stream passed into it and returns a
wrapped version of it, similar to the <a href="https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.map_while"><code>Iterator::map_while</code></a> method in the
standard library.</p>
<h5 id="examples-3"><a href="#examples-3">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span>stream = stream::iter(<span class="number">1</span>..=<span class="number">10</span>);
<span class="kw">let </span><span class="kw-2">mut </span>stream = stream.map_while(|x| {
<span class="kw">if </span>x &lt; <span class="number">4 </span>{
<span class="prelude-val">Some</span>(x + <span class="number">3</span>)
} <span class="kw">else </span>{
<span class="prelude-val">None
</span>}
});
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">4</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">5</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">6</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">None</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.then" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#301-308">source</a><h4 class="code-header">fn <a href="#method.then" class="fnname">then</a>&lt;F, Fut&gt;(self, f: F) -&gt; Then&lt;Self, Fut, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; Fut,<br>&nbsp;&nbsp;&nbsp;&nbsp;Fut: Future,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Maps this stream’s items asynchronously to a different type, returning a
new stream of the resulting type.</p>
<p>The provided closure is executed over all elements of this stream as
they are made available, and the returned future is executed. Only one
future is executed at a time.</p>
<p>Note that this function consumes the stream passed into it and returns a
wrapped version of it, similar to the existing <code>then</code> methods in the
standard library.</p>
<p>Be aware that if the future is not <code>Unpin</code>, then neither is the <code>Stream</code>
returned by this method. To handle this, you can use <code>tokio::pin!</code> as in
the example below or put the stream in a <code>Box</code> with <code>Box::pin(stream)</code>.</p>
<h5 id="examples-4"><a href="#examples-4">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">async fn </span>do_async_work(value: i32) -&gt; i32 {
value + <span class="number">3
</span>}
<span class="kw">let </span>stream = stream::iter(<span class="number">1</span>..=<span class="number">3</span>);
<span class="kw">let </span>stream = stream.then(do_async_work);
<span class="macro">tokio::pin!</span>(stream);
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">4</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">5</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">6</span>));</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.merge" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#394-400">source</a><h4 class="code-header">fn <a href="#method.merge" class="fnname">merge</a>&lt;U&gt;(self, other: U) -&gt; Merge&lt;Self, U&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;U: <a class="trait" href="../futures_core/stream/trait.Stream.html" title="trait futures_core::stream::Stream">Stream</a>&lt;Item = Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Combine two streams into one by interleaving the output of both as it
is produced.</p>
<p>Values are produced from the merged stream in the order they arrive from
the two source streams. If both source streams provide values
simultaneously, the merge stream alternates between them. This provides
some level of fairness. You should not chain calls to <code>merge</code>, as this
will break the fairness of the merging.</p>
<p>The merged stream completes once <strong>both</strong> source streams complete. When
one source stream completes before the other, the merge stream
exclusively polls the remaining stream.</p>
<p>For merging multiple streams, consider using <a href="struct.StreamMap.html"><code>StreamMap</code></a> instead.</p>
<h5 id="examples-5"><a href="#examples-5">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{StreamExt, Stream};
<span class="kw">use </span>tokio::sync::mpsc;
<span class="kw">use </span>tokio::time;
<span class="kw">use </span>std::time::Duration;
<span class="kw">use </span>std::pin::Pin;
<span class="attribute">#[tokio::main]
</span><span class="kw">async fn </span>main() {
<span class="kw">let </span>(tx1, <span class="kw-2">mut </span>rx1) = mpsc::channel::&lt;usize&gt;(<span class="number">10</span>);
<span class="kw">let </span>(tx2, <span class="kw-2">mut </span>rx2) = mpsc::channel::&lt;usize&gt;(<span class="number">10</span>);
<span class="comment">// Convert the channels to a `Stream`.
</span><span class="kw">let </span>rx1 = Box::pin(<span class="macro">async_stream::stream! </span>{
<span class="kw">while let </span><span class="prelude-val">Some</span>(item) = rx1.recv().<span class="kw">await </span>{
<span class="kw">yield </span>item;
}
}) <span class="kw">as </span>Pin&lt;Box&lt;<span class="kw">dyn </span>Stream&lt;Item = usize&gt; + Send&gt;&gt;;
<span class="kw">let </span>rx2 = Box::pin(<span class="macro">async_stream::stream! </span>{
<span class="kw">while let </span><span class="prelude-val">Some</span>(item) = rx2.recv().<span class="kw">await </span>{
<span class="kw">yield </span>item;
}
}) <span class="kw">as </span>Pin&lt;Box&lt;<span class="kw">dyn </span>Stream&lt;Item = usize&gt; + Send&gt;&gt;;
<span class="kw">let </span><span class="kw-2">mut </span>rx = rx1.merge(rx2);
tokio::spawn(<span class="kw">async move </span>{
<span class="comment">// Send some values immediately
</span>tx1.send(<span class="number">1</span>).<span class="kw">await</span>.unwrap();
tx1.send(<span class="number">2</span>).<span class="kw">await</span>.unwrap();
<span class="comment">// Let the other task send values
</span>time::sleep(Duration::from_millis(<span class="number">20</span>)).<span class="kw">await</span>;
tx1.send(<span class="number">4</span>).<span class="kw">await</span>.unwrap();
});
tokio::spawn(<span class="kw">async move </span>{
<span class="comment">// Wait for the first task to send values
</span>time::sleep(Duration::from_millis(<span class="number">5</span>)).<span class="kw">await</span>;
tx2.send(<span class="number">3</span>).<span class="kw">await</span>.unwrap();
time::sleep(Duration::from_millis(<span class="number">25</span>)).<span class="kw">await</span>;
<span class="comment">// Send the final value
</span>tx2.send(<span class="number">5</span>).<span class="kw">await</span>.unwrap();
});
<span class="macro">assert_eq!</span>(<span class="number">1</span>, rx.next().<span class="kw">await</span>.unwrap());
<span class="macro">assert_eq!</span>(<span class="number">2</span>, rx.next().<span class="kw">await</span>.unwrap());
<span class="macro">assert_eq!</span>(<span class="number">3</span>, rx.next().<span class="kw">await</span>.unwrap());
<span class="macro">assert_eq!</span>(<span class="number">4</span>, rx.next().<span class="kw">await</span>.unwrap());
<span class="macro">assert_eq!</span>(<span class="number">5</span>, rx.next().<span class="kw">await</span>.unwrap());
<span class="comment">// The merged stream is consumed
</span><span class="macro">assert!</span>(rx.next().<span class="kw">await</span>.is_none());
}</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.filter" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#432-438">source</a><h4 class="code-header">fn <a href="#method.filter" class="fnname">filter</a>&lt;F&gt;(self, f: F) -&gt; Filter&lt;Self, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(&amp;Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Filters the values produced by this stream according to the provided
predicate.</p>
<p>As values of this stream are made available, the provided predicate <code>f</code>
will be run against them. If the predicate
resolves to <code>true</code>, then the stream will yield the value, but if the
predicate resolves to <code>false</code>, then the value
will be discarded and the next value will be produced.</p>
<p>Note that this function consumes the stream passed into it and returns a
wrapped version of it, similar to the <a href="https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.filter"><code>Iterator::filter</code></a> method in the
standard library.</p>
<h5 id="examples-6"><a href="#examples-6">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span>stream = stream::iter(<span class="number">1</span>..=<span class="number">8</span>);
<span class="kw">let </span><span class="kw-2">mut </span>evens = stream.filter(|x| x % <span class="number">2 </span>== <span class="number">0</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">2</span>), evens.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">4</span>), evens.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">6</span>), evens.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">8</span>), evens.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">None</span>, evens.next().<span class="kw">await</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.filter_map" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#470-476">source</a><h4 class="code-header">fn <a href="#method.filter_map" class="fnname">filter_map</a>&lt;T, F&gt;(self, f: F) -&gt; FilterMap&lt;Self, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; Option&lt;T&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Filters the values produced by this stream while simultaneously mapping
them to a different type according to the provided closure.</p>
<p>As values of this stream are made available, the provided function will
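be run on them. If the closure <code>f</code> resolves to
<a href="https://doc.rust-lang.org/std/option/enum.Option.html#variant.Some"><code>Some(item)</code></a> then the stream will yield the value <code>item</code>, but if
it resolves to <a href="https://doc.rust-lang.org/std/option/enum.Option.html#variant.None"><code>None</code></a>, then the value will be skipped.</p>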
<p>Note that this function consumes the stream passed into it and returns a
wrapped version of it, similar to the <a href="https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.filter_map"><code>Iterator::filter_map</code></a> method in the
standard library.</p>
<h5 id="examples-7"><a href="#examples-7">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span>stream = stream::iter(<span class="number">1</span>..=<span class="number">8</span>);
<span class="kw">let </span><span class="kw-2">mut </span>evens = stream.filter_map(|x| {
<span class="kw">if </span>x % <span class="number">2 </span>== <span class="number">0 </span>{ <span class="prelude-val">Some</span>(x + <span class="number">1</span>) } <span class="kw">else </span>{ <span class="prelude-val">None </span>}
});
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">3</span>), evens.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">5</span>), evens.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">7</span>), evens.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">9</span>), evens.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">None</span>, evens.next().<span class="kw">await</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.fuse" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#536-541">source</a><h4 class="code-header">fn <a href="#method.fuse" class="fnname">fuse</a>(self) -&gt; Fuse&lt;Self&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Creates a stream which ends after the first <code>None</code>.</p>
<p>After a stream returns <code>None</code>, behavior is undefined. Future calls to
<code>poll_next</code> may or may not return <code>Some(T)</code> again or they may panic.
<code>fuse()</code> adapts a stream, ensuring that after <code>None</code> is given, it will
return <code>None</code> forever.</p>
<h5 id="examples-8"><a href="#examples-8">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{Stream, StreamExt};
<span class="kw">use </span>std::pin::Pin;
<span class="kw">use </span>std::task::{Context, Poll};
<span class="comment">// a stream which alternates between Some and None
</span><span class="kw">struct </span>Alternate {
state: i32,
}
<span class="kw">impl </span>Stream <span class="kw">for </span>Alternate {
<span class="kw">type </span>Item = i32;
<span class="kw">fn </span>poll_next(<span class="kw-2">mut </span><span class="self">self</span>: Pin&lt;<span class="kw-2">&amp;mut </span><span class="self">Self</span>&gt;, _cx: <span class="kw-2">&amp;mut </span>Context&lt;<span class="lifetime">&#39;_</span>&gt;) -&gt; Poll&lt;<span class="prelude-ty">Option</span>&lt;i32&gt;&gt; {
<span class="kw">let </span>val = <span class="self">self</span>.state;
<span class="self">self</span>.state = <span class="self">self</span>.state + <span class="number">1</span>;
<span class="comment">// if it&#39;s even, Some(i32), else None
</span><span class="kw">if </span>val % <span class="number">2 </span>== <span class="number">0 </span>{
Poll::Ready(<span class="prelude-val">Some</span>(val))
} <span class="kw">else </span>{
Poll::Ready(<span class="prelude-val">None</span>)
}
}
}
<span class="attribute">#[tokio::main]
</span><span class="kw">async fn </span>main() {
<span class="kw">let </span><span class="kw-2">mut </span>stream = Alternate { state: <span class="number">0 </span>};
<span class="comment">// the stream goes back and forth
</span><span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">0</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">None</span>);
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">2</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">None</span>);
<span class="comment">// however, once it is fused
</span><span class="kw">let </span><span class="kw-2">mut </span>stream = stream.fuse();
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">4</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">None</span>);
<span class="comment">// it will always return `None` after the first time.
</span><span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">None</span>);
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">None</span>);
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">None</span>);
}</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.take" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#563-568">source</a><h4 class="code-header">fn <a href="#method.take" class="fnname">take</a>(self, n: usize) -&gt; Take&lt;Self&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Creates a new stream of at most <code>n</code> items of the underlying stream.</p>
<p>Once <code>n</code> items have been yielded from this stream then it will always
return that the stream is done.</p>
<h5 id="examples-9"><a href="#examples-9">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span><span class="kw-2">mut </span>stream = stream::iter(<span class="number">1</span>..=<span class="number">10</span>).take(<span class="number">3</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">2</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">3</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">None</span>, stream.next().<span class="kw">await</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.take_while" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#592-598">source</a><h4 class="code-header">fn <a href="#method.take_while" class="fnname">take_while</a>&lt;F&gt;(self, f: F) -&gt; TakeWhile&lt;Self, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(&amp;Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Take elements from this stream while the provided predicate
resolves to <code>true</code>.</p>
<p>This function, like <code>Iterator::take_while</code>, will take elements from the
stream until the predicate <code>f</code> resolves to <code>false</code>. Once the predicate
returns <code>false</code> for an element, the stream will always report that it is done.</p>
<h5 id="examples-10"><a href="#examples-10">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span><span class="kw-2">mut </span>stream = stream::iter(<span class="number">1</span>..=<span class="number">10</span>).take_while(|x| <span class="kw-2">*</span>x &lt;= <span class="number">3</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">2</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">3</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">None</span>, stream.next().<span class="kw">await</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.skip" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#618-623">source</a><h4 class="code-header">fn <a href="#method.skip" class="fnname">skip</a>(self, n: usize) -&gt; Skip&lt;Self&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Creates a new stream that will skip the <code>n</code> first items of the
underlying stream.</p>
<h5 id="examples-11"><a href="#examples-11">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span><span class="kw-2">mut </span>stream = stream::iter(<span class="number">1</span>..=<span class="number">10</span>).skip(<span class="number">7</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">8</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">9</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">10</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">None</span>, stream.next().<span class="kw">await</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.skip_while" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#648-654">source</a><h4 class="code-header">fn <a href="#method.skip_while" class="fnname">skip_while</a>&lt;F&gt;(self, f: F) -&gt; SkipWhile&lt;Self, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(&amp;Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Skip elements from the underlying stream while the provided predicate
resolves to <code>true</code>.</p>
<p>This function, like <a href="https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.skip_while"><code>Iterator::skip_while</code></a>, will ignore elements from the
stream until the predicate <code>f</code> resolves to <code>false</code>. Once the predicate
returns <code>false</code> for an element, that element and all remaining elements will be yielded.</p>
<h5 id="examples-12"><a href="#examples-12">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span><span class="kw-2">mut </span>stream = stream::iter(<span class="macro">vec!</span>[<span class="number">1</span>,<span class="number">2</span>,<span class="number">3</span>,<span class="number">4</span>,<span class="number">1</span>]).skip_while(|x| <span class="kw-2">*</span>x &lt; <span class="number">3</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">3</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">4</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>), stream.next().<span class="kw">await</span>);
<span class="macro">assert_eq!</span>(<span class="prelude-val">None</span>, stream.next().<span class="kw">await</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.all" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#709-715">source</a><h4 class="code-header">fn <a href="#method.all" class="fnname">all</a>&lt;F&gt;(&amp;mut self, f: F) -&gt; AllFuture&lt;'_, Self, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Unpin,<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool,</span></h4></section></summary><div class="docblock"><p>Tests if every element of the stream matches a predicate.</p>
<p>Equivalent to:</p>
<div class="example-wrap ignore"><div class='tooltip'></div><pre class="rust rust-example-rendered"><code><span class="kw">async fn </span>all&lt;F&gt;(<span class="kw-2">&amp;mut </span><span class="self">self</span>, f: F) -&gt; bool;</code></pre></div>
<p><code>all()</code> takes a closure that returns <code>true</code> or <code>false</code>. It applies
this closure to each element of the stream, and if they all return
<code>true</code>, then so does <code>all</code>. If any of them return <code>false</code>, it
returns <code>false</code>.</p>
<p><code>all()</code> is short-circuiting; in other words, it will stop processing
as soon as it finds a <code>false</code>, given that no matter what else happens,
the result will also be <code>false</code>.</p>
<p>An empty stream returns <code>true</code>.</p>
<h5 id="examples-13"><a href="#examples-13">Examples</a></h5>
<p>Basic usage:</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span>a = [<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>];
<span class="macro">assert!</span>(stream::iter(<span class="kw-2">&amp;</span>a).all(|<span class="kw-2">&amp;</span>x| x &gt; <span class="number">0</span>).<span class="kw">await</span>);
<span class="macro">assert!</span>(!stream::iter(<span class="kw-2">&amp;</span>a).all(|<span class="kw-2">&amp;</span>x| x &gt; <span class="number">2</span>).<span class="kw">await</span>);</code></pre></div>
<p>Stopping at the first <code>false</code>:</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span>a = [<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>];
<span class="kw">let </span><span class="kw-2">mut </span>iter = stream::iter(<span class="kw-2">&amp;</span>a);
<span class="macro">assert!</span>(!iter.all(|<span class="kw-2">&amp;</span>x| x != <span class="number">2</span>).<span class="kw">await</span>);
<span class="comment">// we can still use `iter`, as there are more elements.
</span><span class="macro">assert_eq!</span>(iter.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="kw-2">&amp;</span><span class="number">3</span>));</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.any" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#768-774">source</a><h4 class="code-header">fn <a href="#method.any" class="fnname">any</a>&lt;F&gt;(&amp;mut self, f: F) -&gt; AnyFuture&lt;'_, Self, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Unpin,<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; bool,</span></h4></section></summary><div class="docblock"><p>Tests if any element of the stream matches a predicate.</p>
<p>Equivalent to:</p>
<div class="example-wrap ignore"><div class='tooltip'></div><pre class="rust rust-example-rendered"><code><span class="kw">async fn </span>any&lt;F&gt;(<span class="kw-2">&amp;mut </span><span class="self">self</span>, f: F) -&gt; bool;</code></pre></div>
<p><code>any()</code> takes a closure that returns <code>true</code> or <code>false</code>. It applies
this closure to each element of the stream, and if any of them return
<code>true</code>, then so does <code>any()</code>. If they all return <code>false</code>, it
returns <code>false</code>.</p>
<p><code>any()</code> is short-circuiting; in other words, it will stop processing
as soon as it finds a <code>true</code>, given that no matter what else happens,
the result will also be <code>true</code>.</p>
<p>An empty stream returns <code>false</code>.</p>
<p>Basic usage:</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span>a = [<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>];
<span class="macro">assert!</span>(stream::iter(<span class="kw-2">&amp;</span>a).any(|<span class="kw-2">&amp;</span>x| x &gt; <span class="number">0</span>).<span class="kw">await</span>);
<span class="macro">assert!</span>(!stream::iter(<span class="kw-2">&amp;</span>a).any(|<span class="kw-2">&amp;</span>x| x &gt; <span class="number">5</span>).<span class="kw">await</span>);</code></pre></div>
<p>Stopping at the first <code>true</code>:</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">let </span>a = [<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>];
<span class="kw">let </span><span class="kw-2">mut </span>iter = stream::iter(<span class="kw-2">&amp;</span>a);
<span class="macro">assert!</span>(iter.any(|<span class="kw-2">&amp;</span>x| x != <span class="number">2</span>).<span class="kw">await</span>);
<span class="comment">// we can still use `iter`, as there are more elements.
</span><span class="macro">assert_eq!</span>(iter.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="kw-2">&amp;</span><span class="number">2</span>));</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.chain" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#803-809">source</a><h4 class="code-header">fn <a href="#method.chain" class="fnname">chain</a>&lt;U&gt;(self, other: U) -&gt; Chain&lt;Self, U&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;U: <a class="trait" href="../futures_core/stream/trait.Stream.html" title="trait futures_core::stream::Stream">Stream</a>&lt;Item = Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Combine two streams into one by first returning all values from the
first stream then all values from the second stream.</p>
<p>As long as <code>self</code> still has values to emit, no values from <code>other</code> are
emitted, even if some are ready.</p>
<h5 id="examples-14"><a href="#examples-14">Examples</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="attribute">#[tokio::main]
</span><span class="kw">async fn </span>main() {
<span class="kw">let </span>one = stream::iter(<span class="macro">vec!</span>[<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>]);
<span class="kw">let </span>two = stream::iter(<span class="macro">vec!</span>[<span class="number">4</span>, <span class="number">5</span>, <span class="number">6</span>]);
<span class="kw">let </span><span class="kw-2">mut </span>stream = one.chain(two);
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">1</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">2</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">3</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">4</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">5</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="number">6</span>));
<span class="macro">assert_eq!</span>(stream.next().<span class="kw">await</span>, <span class="prelude-val">None</span>);
}</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.fold" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#833-839">source</a><h4 class="code-header">fn <a href="#method.fold" class="fnname">fold</a>&lt;B, F&gt;(self, init: B, f: F) -&gt; FoldFuture&lt;Self, B, F&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,<br>&nbsp;&nbsp;&nbsp;&nbsp;F: FnMut(B, Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>) -&gt; B,</span></h4></section></summary><div class="docblock"><p>A combinator that applies a function to every element in a stream
producing a single, final value.</p>
<p>Equivalent to:</p>
<div class="example-wrap ignore"><div class='tooltip'></div><pre class="rust rust-example-rendered"><code><span class="kw">async fn </span>fold&lt;B, F&gt;(<span class="self">self</span>, init: B, f: F) -&gt; B;</code></pre></div>
<h5 id="examples-15"><a href="#examples-15">Examples</a></h5>
<p>Basic usage:</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, <span class="kw-2">*</span>};
<span class="kw">let </span>s = stream::iter(<span class="macro">vec!</span>[<span class="number">1u8</span>, <span class="number">2</span>, <span class="number">3</span>]);
<span class="kw">let </span>sum = s.fold(<span class="number">0</span>, |acc, x| acc + x).<span class="kw">await</span>;
<span class="macro">assert_eq!</span>(sum, <span class="number">6</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.collect" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#913-919">source</a><h4 class="code-header">fn <a href="#method.collect" class="fnname">collect</a>&lt;T&gt;(self) -&gt; Collect&lt;Self, T&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;T: <a class="trait" href="trait.FromStream.html" title="trait tokio_stream::FromStream">FromStream</a>&lt;Self::<a class="associatedtype" href="../futures_core/stream/trait.Stream.html#associatedtype.Item" title="type futures_core::stream::Stream::Item">Item</a>&gt;,<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Drain stream pushing all emitted values into a collection.</p>
<p>Equivalent to:</p>
<div class="example-wrap ignore"><div class='tooltip'></div><pre class="rust rust-example-rendered"><code><span class="kw">async fn </span>collect&lt;T&gt;(<span class="self">self</span>) -&gt; T;</code></pre></div>
<p><code>collect</code> streams all values, awaiting as needed. Values are pushed into
a collection. A number of different target collection types are
supported, including <a href="https://doc.rust-lang.org/std/vec/struct.Vec.html"><code>Vec</code></a>,
<a href="https://doc.rust-lang.org/std/string/struct.String.html"><code>String</code></a>, and <a href="https://docs.rs/bytes/0.6.0/bytes/struct.Bytes.html"><code>Bytes</code></a>.</p>
<h5 id="result"><a href="#result"><code>Result</code></a></h5>
<p><code>collect()</code> can also be used with streams of type <code>Result&lt;T, E&gt;</code> where
<code>T: FromStream&lt;_&gt;</code>. In this case, <code>collect()</code> will stream as long as
values yielded from the stream are <code>Ok(_)</code>. If <code>Err(_)</code> is encountered,
streaming is terminated and <code>collect()</code> returns the <code>Err</code>.</p>
<h5 id="notes"><a href="#notes">Notes</a></h5>
<p><code>FromStream</code> is currently a sealed trait. Stabilization is pending
enhancements to the Rust language.</p>
<h5 id="examples-16"><a href="#examples-16">Examples</a></h5>
<p>Basic usage:</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="attribute">#[tokio::main]
</span><span class="kw">async fn </span>main() {
<span class="kw">let </span>doubled: Vec&lt;i32&gt; =
stream::iter(<span class="macro">vec!</span>[<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>])
.map(|x| x * <span class="number">2</span>)
.collect()
.<span class="kw">await</span>;
<span class="macro">assert_eq!</span>(<span class="macro">vec!</span>[<span class="number">2</span>, <span class="number">4</span>, <span class="number">6</span>], doubled);
}</code></pre></div>
<p>Collecting a stream of <code>Result</code> values:</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="attribute">#[tokio::main]
</span><span class="kw">async fn </span>main() {
<span class="comment">// A stream containing only `Ok` values will be collected
</span><span class="kw">let </span>values: <span class="prelude-ty">Result</span>&lt;Vec&lt;i32&gt;, <span class="kw-2">&amp;</span>str&gt; =
stream::iter(<span class="macro">vec!</span>[<span class="prelude-val">Ok</span>(<span class="number">1</span>), <span class="prelude-val">Ok</span>(<span class="number">2</span>), <span class="prelude-val">Ok</span>(<span class="number">3</span>)])
.collect()
.<span class="kw">await</span>;
<span class="macro">assert_eq!</span>(<span class="prelude-val">Ok</span>(<span class="macro">vec!</span>[<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>]), values);
<span class="comment">// A stream containing `Err` values will return the first error.
</span><span class="kw">let </span>results = <span class="macro">vec!</span>[<span class="prelude-val">Ok</span>(<span class="number">1</span>), <span class="prelude-val">Err</span>(<span class="string">&quot;no&quot;</span>), <span class="prelude-val">Ok</span>(<span class="number">2</span>), <span class="prelude-val">Ok</span>(<span class="number">3</span>), <span class="prelude-val">Err</span>(<span class="string">&quot;nein&quot;</span>)];
<span class="kw">let </span>values: <span class="prelude-ty">Result</span>&lt;Vec&lt;i32&gt;, <span class="kw-2">&amp;</span>str&gt; =
stream::iter(results)
.collect()
.<span class="kw">await</span>;
<span class="macro">assert_eq!</span>(<span class="prelude-val">Err</span>(<span class="string">&quot;no&quot;</span>), values);
}</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.timeout" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#999-1004">source</a><h4 class="code-header">fn <a href="#method.timeout" class="fnname">timeout</a>(self, duration: Duration) -&gt; <a class="struct" href="struct.Timeout.html" title="struct tokio_stream::Timeout">Timeout</a>&lt;Self&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Applies a per-item timeout to the passed stream.</p>
<p><code>timeout()</code> takes a <code>Duration</code> that represents the maximum amount of
time each element of the stream has to complete before timing out.</p>
<p>If the wrapped stream yields a value before the deadline is reached, the
value is returned. Otherwise, an error is returned. The caller may decide
to continue consuming the stream and will eventually get the next source
stream value once it becomes available. See
<a href="trait.StreamExt.html#method.timeout_repeating"><code>timeout_repeating</code></a> for an alternative
where the timeouts will repeat.</p>
<h5 id="notes-1"><a href="#notes-1">Notes</a></h5>
<p>This function consumes the stream passed into it and returns a
wrapped version of it.</p>
<p>Polling the returned stream will continue to poll the inner stream even
if one or more items time out.</p>
<h5 id="examples-17"><a href="#examples-17">Examples</a></h5>
<p>Suppose we have a stream <code>int_stream</code> that yields 3 numbers (1, 2, 3):</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">use </span>std::time::Duration;
<span class="kw">let </span>int_stream = int_stream.timeout(Duration::from_secs(<span class="number">1</span>));
<span class="macro">tokio::pin!</span>(int_stream);
<span class="comment">// When no items time out, we get the 3 elements in succession:
</span><span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">2</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">3</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">None</span>));
<span class="comment">// If the second item times out, we get an error and continue polling the stream:
</span><span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>)));
<span class="macro">assert!</span>(int_stream.try_next().<span class="kw">await</span>.is_err());
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">2</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">3</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">None</span>));
<span class="comment">// If we want to stop consuming the source stream the first time an
// element times out, we can use the `take_while` operator:
</span><span class="kw">let </span><span class="kw-2">mut </span>int_stream = int_stream.take_while(Result::is_ok);
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">None</span>));</code></pre></div>
<p>Once a timeout error is received, no further events will be received
unless the wrapped stream yields a value (timeouts do not repeat).</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{StreamExt, wrappers::IntervalStream};
<span class="kw">use </span>std::time::Duration;
<span class="kw">let </span>interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(<span class="number">100</span>)));
<span class="kw">let </span>timeout_stream = interval_stream.timeout(Duration::from_millis(<span class="number">10</span>));
<span class="macro">tokio::pin!</span>(timeout_stream);
<span class="comment">// Only one timeout will be received between values in the source stream.
</span><span class="macro">assert!</span>(timeout_stream.try_next().<span class="kw">await</span>.is_ok());
<span class="macro">assert!</span>(timeout_stream.try_next().<span class="kw">await</span>.is_err(), <span class="string">&quot;expected one timeout&quot;</span>);
<span class="macro">assert!</span>(timeout_stream.try_next().<span class="kw">await</span>.is_ok(), <span class="string">&quot;expected no more timeouts&quot;</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.timeout_repeating" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#1088-1093">source</a><h4 class="code-header">fn <a href="#method.timeout_repeating" class="fnname">timeout_repeating</a>(self, interval: <a class="struct" href="../tokio/time/interval/struct.Interval.html" title="struct tokio::time::interval::Interval">Interval</a>) -&gt; TimeoutRepeating&lt;Self&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Applies a per-item timeout to the passed stream.</p>
<p><code>timeout_repeating()</code> takes an <a href="../tokio/time/interval/struct.Interval.html"><code>Interval</code></a> that
controls the time each element of the stream has to complete before
timing out.</p>
<p>If the wrapped stream yields a value before the deadline is reached, the
value is returned. Otherwise, an error is returned. The caller may decide
to continue consuming the stream and will eventually get the next source
stream value once it becomes available. Unlike <code>timeout()</code>, if no value
becomes available before the deadline is reached, additional errors are
returned at the specified interval. See <a href="trait.StreamExt.html#method.timeout"><code>timeout</code></a>
for an alternative where the timeouts do not repeat.</p>
<h5 id="notes-2"><a href="#notes-2">Notes</a></h5>
<p>This function consumes the stream passed into it and returns a
wrapped version of it.</p>
<p>Polling the returned stream will continue to poll the inner stream even
if one or more items time out.</p>
<h5 id="examples-18"><a href="#examples-18">Examples</a></h5>
<p>Suppose we have a stream <code>int_stream</code> that yields 3 numbers (1, 2, 3):</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">use </span>std::time::Duration;
<span class="kw">let </span>int_stream = int_stream.timeout_repeating(tokio::time::interval(Duration::from_secs(<span class="number">1</span>)));
<span class="macro">tokio::pin!</span>(int_stream);
<span class="comment">// When no items time out, we get the 3 elements in succession:
</span><span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">2</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">3</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">None</span>));
<span class="comment">// If the second item times out, we get an error and continue polling the stream:
</span><span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>)));
<span class="macro">assert!</span>(int_stream.try_next().<span class="kw">await</span>.is_err());
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">2</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">3</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">None</span>));
<span class="comment">// If we want to stop consuming the source stream the first time an
// element times out, we can use the `take_while` operator:
</span><span class="kw">let </span><span class="kw-2">mut </span>int_stream = int_stream.take_while(Result::is_ok);
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">Some</span>(<span class="number">1</span>)));
<span class="macro">assert_eq!</span>(int_stream.try_next().<span class="kw">await</span>, <span class="prelude-val">Ok</span>(<span class="prelude-val">None</span>));</code></pre></div>
<p>Timeout errors will be continuously produced at the specified interval
until the wrapped stream yields a value.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>tokio_stream::{StreamExt, wrappers::IntervalStream};
<span class="kw">use </span>std::time::Duration;
<span class="kw">let </span>interval_stream = IntervalStream::new(tokio::time::interval(Duration::from_millis(<span class="number">23</span>)));
<span class="kw">let </span>timeout_stream = interval_stream.timeout_repeating(tokio::time::interval(Duration::from_millis(<span class="number">9</span>)));
<span class="macro">tokio::pin!</span>(timeout_stream);
<span class="comment">// Multiple timeouts will be received between values in the source stream.
</span><span class="macro">assert!</span>(timeout_stream.try_next().<span class="kw">await</span>.is_ok());
<span class="macro">assert!</span>(timeout_stream.try_next().<span class="kw">await</span>.is_err(), <span class="string">&quot;expected one timeout&quot;</span>);
<span class="macro">assert!</span>(timeout_stream.try_next().<span class="kw">await</span>.is_err(), <span class="string">&quot;expected a second timeout&quot;</span>);
<span class="comment">// Will eventually receive another value from the source stream...
</span><span class="macro">assert!</span>(timeout_stream.try_next().<span class="kw">await</span>.is_ok(), <span class="string">&quot;expected non-timeout&quot;</span>);</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.throttle" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#1118-1123">source</a><h4 class="code-header">fn <a href="#method.throttle" class="fnname">throttle</a>(self, duration: Duration) -&gt; Throttle&lt;Self&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Slows down a stream by enforcing a delay between items.</p>
<p>The underlying timer behind this utility has a granularity of one millisecond.</p>
<h5 id="example"><a href="#example">Example</a></h5>
<p>Create a throttled stream.</p>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>std::time::Duration;
<span class="kw">use </span>tokio_stream::StreamExt;
<span class="kw">let </span>item_stream = futures::stream::repeat(<span class="string">&quot;one&quot;</span>).throttle(Duration::from_secs(<span class="number">2</span>));
<span class="macro">tokio::pin!</span>(item_stream);
<span class="kw">loop </span>{
<span class="comment">// The string will be produced at most every 2 seconds
</span><span class="macro">println!</span>(<span class="string">&quot;{:?}&quot;</span>, item_stream.next().<span class="kw">await</span>);
}</code></pre></div>
</div></details><details class="rustdoc-toggle method-toggle" open><summary><section id="method.chunks_timeout" class="method has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#1174-1180">source</a><h4 class="code-header">fn <a href="#method.chunks_timeout" class="fnname">chunks_timeout</a>(<br>&nbsp;&nbsp;&nbsp;&nbsp;self,<br>&nbsp;&nbsp;&nbsp;&nbsp;max_size: usize,<br>&nbsp;&nbsp;&nbsp;&nbsp;duration: Duration<br>) -&gt; ChunksTimeout&lt;Self&gt;<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;Self: Sized,</span></h4></section></summary><div class="docblock"><p>Batches the items in the given stream using a maximum duration and size for each batch.</p>
<p>This stream returns the next batch of items in the following situations:</p>
<ol>
<li>The inner stream has returned at least <code>max_size</code> items since the last batch.</li>
<li>The time since the first item of a batch is greater than the given duration.</li>
<li>The end of the stream is reached.</li>
</ol>
<p>The returned vector is never empty and never longer than the maximum size. Empty batches
will not be emitted if no items are received upstream.</p>
<h5 id="panics"><a href="#panics">Panics</a></h5>
<p>This function panics if <code>max_size</code> is zero.</p>
<h5 id="example-1"><a href="#example-1">Example</a></h5>
<div class="example-wrap"><pre class="rust rust-example-rendered"><code><span class="kw">use </span>std::time::Duration;
<span class="kw">use </span>tokio::time;
<span class="kw">use </span>tokio_stream::{<span class="self">self </span><span class="kw">as </span>stream, StreamExt};
<span class="kw">use </span>futures::FutureExt;
<span class="attribute">#[tokio::main]
</span><span class="kw">async fn </span>main() {
<span class="kw">let </span>iter = <span class="macro">vec!</span>[<span class="number">1</span>, <span class="number">2</span>, <span class="number">3</span>, <span class="number">4</span>].into_iter();
<span class="kw">let </span>stream0 = stream::iter(iter);
<span class="kw">let </span>iter = <span class="macro">vec!</span>[<span class="number">5</span>].into_iter();
<span class="kw">let </span>stream1 = stream::iter(iter)
.then(<span class="kw">move </span>|n| time::sleep(Duration::from_secs(<span class="number">5</span>)).map(<span class="kw">move </span>|<span class="kw">_</span>| n));
<span class="kw">let </span>chunk_stream = stream0
.chain(stream1)
.chunks_timeout(<span class="number">3</span>, Duration::from_secs(<span class="number">2</span>));
<span class="macro">tokio::pin!</span>(chunk_stream);
<span class="comment">// a full batch was received
</span><span class="macro">assert_eq!</span>(chunk_stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="macro">vec!</span>[<span class="number">1</span>,<span class="number">2</span>,<span class="number">3</span>]));
<span class="comment">// deadline was reached before max_size was reached
</span><span class="macro">assert_eq!</span>(chunk_stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="macro">vec!</span>[<span class="number">4</span>]));
<span class="comment">// last element in the stream
</span><span class="macro">assert_eq!</span>(chunk_stream.next().<span class="kw">await</span>, <span class="prelude-val">Some</span>(<span class="macro">vec!</span>[<span class="number">5</span>]));
}</code></pre></div>
</div></details></div><h2 id="implementors" class="small-section-header">Implementors<a href="#implementors" class="anchor"></a></h2><div id="implementors-list"><section id="impl-StreamExt-for-St" class="impl has-srclink"><a class="srclink rightside" href="../src/tokio_stream/stream_ext.rs.html#1183">source</a><a href="#impl-StreamExt-for-St" class="anchor"></a><h3 class="code-header">impl&lt;St:&nbsp;?Sized&gt; <a class="trait" href="trait.StreamExt.html" title="trait tokio_stream::StreamExt">StreamExt</a> for St<span class="where fmt-newline">where<br>&nbsp;&nbsp;&nbsp;&nbsp;St: <a class="trait" href="../futures_core/stream/trait.Stream.html" title="trait futures_core::stream::Stream">Stream</a>,</span></h3></section></div><script src="../implementors/tokio_stream/stream_ext/trait.StreamExt.js" async></script></section></div></main><div id="rustdoc-vars" data-root-path="../" data-current-crate="tokio_stream" data-themes="ayu,dark,light" data-resource-suffix="" data-rustdoc-version="1.66.0-nightly (5c8bff74b 2022-10-21)" ></div></body></html>