| <!DOCTYPE HTML> |
| <html lang="en"> |
| <head> |
| <!-- Generated by javadoc (17) --> |
| <title>KGroupedTable (kafka 3.6.1 API)</title> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> |
| <meta name="description" content="declaration: package: org.apache.kafka.streams.kstream, interface: KGroupedTable"> |
| <meta name="generator" content="javadoc/ClassWriterImpl"> |
| <link rel="stylesheet" type="text/css" href="../../../../../stylesheet.css" title="Style"> |
| <link rel="stylesheet" type="text/css" href="../../../../../script-dir/jquery-ui.min.css" title="Style"> |
| <link rel="stylesheet" type="text/css" href="../../../../../jquery-ui.overrides.css" title="Style"> |
| <script type="text/javascript" src="../../../../../script.js"></script> |
| <script type="text/javascript" src="../../../../../script-dir/jquery-3.5.1.min.js"></script> |
| <script type="text/javascript" src="../../../../../script-dir/jquery-ui.min.js"></script> |
| </head> |
| <body class="class-declaration-page"> |
| <script type="text/javascript">var evenRowColor = "even-row-color"; |
| var oddRowColor = "odd-row-color"; |
| var tableTab = "table-tab"; |
| var activeTableTab = "active-table-tab"; |
| var pathtoroot = "../../../../../"; |
| loadScripts(document, 'script');</script> |
| <noscript> |
| <div>JavaScript is disabled on your browser.</div> |
| </noscript> |
| <div class="flex-box"> |
| <header role="banner" class="flex-header"> |
| <nav role="navigation"> |
| <!-- ========= START OF TOP NAVBAR ======= --> |
| <div class="top-nav" id="navbar-top"> |
| <div class="skip-nav"><a href="#skip-navbar-top" title="Skip navigation links">Skip navigation links</a></div> |
| <ul id="navbar-top-firstrow" class="nav-list" title="Navigation"> |
| <li><a href="../../../../../index.html">Overview</a></li> |
| <li><a href="package-summary.html">Package</a></li> |
| <li class="nav-bar-cell1-rev">Class</li> |
| <li><a href="package-tree.html">Tree</a></li> |
| <li><a href="../../../../../deprecated-list.html">Deprecated</a></li> |
| <li><a href="../../../../../index-all.html">Index</a></li> |
| <li><a href="../../../../../help-doc.html#class">Help</a></li> |
| </ul> |
| </div> |
| <div class="sub-nav"> |
| <div> |
| <ul class="sub-nav-list"> |
| <li>Summary: </li> |
| <li>Nested | </li> |
| <li>Field | </li> |
| <li>Constr | </li> |
| <li><a href="#method-summary">Method</a></li> |
| </ul> |
| <ul class="sub-nav-list"> |
| <li>Detail: </li> |
| <li>Field | </li> |
| <li>Constr | </li> |
| <li><a href="#method-detail">Method</a></li> |
| </ul> |
| </div> |
| <div class="nav-list-search"><label for="search-input">SEARCH:</label> |
| <input type="text" id="search-input" value="search" disabled="disabled"> |
| <input type="reset" id="reset-button" value="reset" disabled="disabled"> |
| </div> |
| </div> |
| <!-- ========= END OF TOP NAVBAR ========= --> |
| <span class="skip-nav" id="skip-navbar-top"></span></nav> |
| </header> |
| <div class="flex-content"> |
| <main role="main"> |
| <!-- ======== START OF CLASS DATA ======== --> |
| <div class="header"> |
| <div class="sub-title"><span class="package-label-in-type">Package</span> <a href="package-summary.html">org.apache.kafka.streams.kstream</a></div> |
| <h1 title="Interface KGroupedTable" class="title">Interface KGroupedTable<K,<wbr>V></h1> |
| </div> |
| <section class="class-description" id="class-description"> |
| <dl class="notes"> |
| <dt>Type Parameters:</dt> |
| <dd><code>K</code> - Type of keys</dd> |
| <dd><code>V</code> - Type of values</dd> |
| </dl> |
| <hr> |
| <div class="type-signature"><span class="modifiers">public interface </span><span class="element-name type-name-label">KGroupedTable<K,<wbr>V></span></div> |
| <div class="block"><code>KGroupedTable</code> is an abstraction of a <i>re-grouped changelog stream</i> from a primary-keyed table, |
| usually on a different grouping key than the original primary key. |
| <p> |
| It is an intermediate representation after a re-grouping of a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> before an aggregation is applied to the |
| new partitions resulting in a new <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| <p> |
| A <code>KGroupedTable</code> must be obtained from a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> via <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>groupBy(...)</code></a>.</div> |
| <dl class="notes"> |
| <dt>See Also:</dt> |
| <dd> |
| <ul class="see-list"> |
| <li><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></li> |
| </ul> |
| </dd> |
| </dl> |
| </section> |
| <section class="summary"> |
| <ul class="summary-list"> |
| <!-- ========== METHOD SUMMARY =========== --> |
| <li> |
| <section class="method-summary" id="method-summary"> |
| <h2>Method Summary</h2> |
| <div id="method-summary-table"> |
| <div class="table-tabs" role="tablist" aria-orientation="horizontal"><button id="method-summary-table-tab0" role="tab" aria-selected="true" aria-controls="method-summary-table.tabpanel" tabindex="0" onkeydown="switchTab(event)" onclick="show('method-summary-table', 'method-summary-table', 3)" class="active-table-tab">All Methods</button><button id="method-summary-table-tab2" role="tab" aria-selected="false" aria-controls="method-summary-table.tabpanel" tabindex="-1" onkeydown="switchTab(event)" onclick="show('method-summary-table', 'method-summary-table-tab2', 3)" class="table-tab">Instance Methods</button><button id="method-summary-table-tab3" role="tab" aria-selected="false" aria-controls="method-summary-table.tabpanel" tabindex="-1" onkeydown="switchTab(event)" onclick="show('method-summary-table', 'method-summary-table-tab3', 3)" class="table-tab">Abstract Methods</button></div> |
| <div id="method-summary-table.tabpanel" role="tabpanel"> |
| <div class="summary-table three-column-summary" aria-labelledby="method-summary-table-tab0"> |
| <div class="table-header col-first">Modifier and Type</div> |
| <div class="table-header col-second">Method</div> |
| <div class="table-header col-last">Description</div> |
| <div class="col-first even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><VR> <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR></code></div> |
| <div class="col-second even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator)" class="member-name-link">aggregate</a><wbr>(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a><VR> initializer, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> adder, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> subtractor)</code></div> |
| <div class="col-last even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> using default serializers and deserializers.</div> |
| </div> |
| <div class="col-first odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><VR> <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR></code></div> |
| <div class="col-second odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)" class="member-name-link">aggregate</a><wbr>(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a><VR> initializer, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> adder, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> subtractor, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</code></div> |
| <div class="col-last odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div> |
| </div> |
| <div class="col-first even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><VR> <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR></code></div> |
| <div class="col-second even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Named)" class="member-name-link">aggregate</a><wbr>(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a><VR> initializer, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> adder, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> subtractor, |
| <a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named)</code></div> |
| <div class="col-last even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> using default serializers and deserializers.</div> |
| </div> |
| <div class="col-first odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><VR> <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR></code></div> |
| <div class="col-second odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)" class="member-name-link">aggregate</a><wbr>(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a><VR> initializer, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> adder, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> subtractor, |
| <a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</code></div> |
| <div class="col-last odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div> |
| </div> |
| <div class="col-first even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>></code></div> |
| <div class="col-second even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#count()" class="member-name-link">count</a>()</code></div> |
| <div class="col-last even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to |
| the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div> |
| </div> |
| <div class="col-first odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>></code></div> |
| <div class="col-second odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#count(org.apache.kafka.streams.kstream.Materialized)" class="member-name-link">count</a><wbr>(<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</code></div> |
| <div class="col-last odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to |
| the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div> |
| </div> |
| <div class="col-first even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>></code></div> |
| <div class="col-second even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#count(org.apache.kafka.streams.kstream.Named)" class="member-name-link">count</a><wbr>(<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named)</code></div> |
| <div class="col-last even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to |
| the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div> |
| </div> |
| <div class="col-first odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>></code></div> |
| <div class="col-second odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#count(org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)" class="member-name-link">count</a><wbr>(<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</code></div> |
| <div class="col-last odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to |
| the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div> |
| </div> |
| <div class="col-first even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>></code></div> |
| <div class="col-second even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer)" class="member-name-link">reduce</a><wbr>(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> adder, |
| <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> subtractor)</code></div> |
| <div class="col-last even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div> |
| </div> |
| <div class="col-first odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>></code></div> |
| <div class="col-second odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Materialized)" class="member-name-link">reduce</a><wbr>(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> adder, |
| <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> subtractor, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</code></div> |
| <div class="col-last odd-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div> |
| </div> |
| <div class="col-first even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>></code></div> |
| <div class="col-second even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"><code><a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)" class="member-name-link">reduce</a><wbr>(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> adder, |
| <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> subtractor, |
| <a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</code></div> |
| <div class="col-last even-row-color method-summary-table method-summary-table-tab2 method-summary-table-tab3"> |
| <div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div> |
| </div> |
| </div> |
| </div> |
| </div> |
| </section> |
| </li> |
| </ul> |
| </section> |
| <section class="details"> |
| <ul class="details-list"> |
| <!-- ============ METHOD DETAIL ========== --> |
| <li> |
| <section class="method-details" id="method-detail"> |
| <h2>Method Details</h2> |
| <ul class="member-list"> |
| <li> |
| <section class="detail" id="count(org.apache.kafka.streams.kstream.Materialized)"> |
| <h3>count</h3> |
| <div class="member-signature"><span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>></span> <span class="element-name">count</span><wbr><span class="parameters">(<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</span></div> |
| <div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to |
| the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| Records with <code>null</code> key are ignored. |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| that can be queried using the provided <code>queryableStoreName</code>. |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| <p> |
| To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via |
| <a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>: |
| <pre><code> |
| KafkaStreams streams = ... // counting words |
| StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()); |
| ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(storeQueryParams); |
| K key = "some-word"; |
| ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) |
| </code></pre> |
| For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#metadataForAllStreamsClients()"><code>KafkaStreams.metadataForAllStreamsClients()</code></a> to |
| query the value of the key on a parallel running instance of your Kafka Streams application. |
| |
| <p> |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII |
| alphanumerics, '.', '_' and '-'. |
| The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the |
| provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Parameters:</dt> |
| <dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys and <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link"><code>Long</code></a> values that |
| represent the latest (rolling) count (i.e., number of records) for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="count(org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)"> |
| <h3>count</h3> |
| <div class="member-signature"><span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>></span> <span class="element-name">count</span><wbr><span class="parameters">(<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</span></div> |
| <div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to |
| the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| Records with <code>null</code> key are ignored. |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| that can be queried using the provided <code>queryableStoreName</code>. |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| <p> |
| To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via |
| <a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>: |
| <pre><code> |
| KafkaStreams streams = ... // counting words |
| StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()); |
| ReadOnlyKeyValueStore<K, ValueAndTimestamp<Long>> localStore = streams.store(storeQueryParams); |
| K key = "some-word"; |
| ValueAndTimestamp<Long> countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) |
| </code></pre> |
| For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#metadataForAllStreamsClients()"><code>KafkaStreams.metadataForAllStreamsClients()</code></a> to |
| query the value of the key on a parallel running instance of your Kafka Streams application. |
| |
| <p> |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII |
| alphanumerics, '.', '_' and '-'. |
| The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the |
| provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Parameters:</dt> |
| <dd><code>named</code> - the <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd> |
| <dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys and <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link"><code>Long</code></a> values that |
| represent the latest (rolling) count (i.e., number of records) for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="count()"> |
| <h3>count</h3> |
| <div class="member-signature"><span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>></span> <span class="element-name">count</span>()</div> |
| <div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to |
| the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| Records with <code>null</code> key are ignored. |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| <p> |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name |
| and "-changelog" is a fixed suffix. |
| Note that the internal store name may not be queryable through Interactive Queries. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys and <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link"><code>Long</code></a> values that |
| represent the latest (rolling) count (i.e., number of records) for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="count(org.apache.kafka.streams.kstream.Named)"> |
| <h3>count</h3> |
| <div class="member-signature"><span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link">Long</a>></span> <span class="element-name">count</span><wbr><span class="parameters">(<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named)</span></div> |
| <div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to |
| the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| Records with <code>null</code> key are ignored. |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| <p> |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name |
| and "-changelog" is a fixed suffix. |
| Note that the internal store name may not be queryable through Interactive Queries. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Parameters:</dt> |
| <dd><code>named</code> - the <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys and <a href="https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Long.html" title="class or interface in java.lang" class="external-link"><code>Long</code></a> values that |
| represent the latest (rolling) count (i.e., number of records) for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Materialized)"> |
| <h3>reduce</h3> |
| <div class="member-signature"><span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>></span> <span class="element-name">reduce</span><wbr><span class="parameters">(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> adder, |
| <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> subtractor, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</span></div> |
| <div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| Records with <code>null</code> key are ignored. |
| Combining implies that the type of the aggregate result is the same as the type of the input value |
| (c.f. <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)"><code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code></a>). |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| that can be queried using the provided <code>queryableStoreName</code>. |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the |
| current aggregate (first argument) and the record's value (second argument) by adding the new record to the |
| aggregate. |
| The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> |
| and computes a new aggregate using the current aggregate (first argument) and the record's value (second |
| argument) by "removing" the "replaced" record from the aggregate. |
| If there is no current aggregate the <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> is not applied and the new aggregate will be the record's |
| value as-is. |
| Thus, <code>reduce(Reducer, Reducer, String)</code> can be used to compute aggregate functions like sum. |
| For sum, the adder and subtractor would work as follows: |
| <pre><code> |
| public class SumAdder implements Reducer<Integer> { |
| public Integer apply(Integer currentAgg, Integer newValue) { |
| return currentAgg + newValue; |
| } |
| } |
| |
| public class SumSubtractor implements Reducer<Integer> { |
| public Integer apply(Integer currentAgg, Integer oldValue) { |
| return currentAgg - oldValue; |
| } |
| } |
| </code></pre> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| <p> |
| To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via |
| <a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>: |
| <pre><code> |
| KafkaStreams streams = ... // counting words |
| StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()); |
| ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams); |
| K key = "some-word"; |
| ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) |
| </code></pre> |
| For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#metadataForAllStreamsClients()"><code>KafkaStreams.metadataForAllStreamsClients()</code></a> to |
| query the value of the key on a parallel running instance of your Kafka Streams application. |
| <p> |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII |
| alphanumerics, '.', '_' and '-'. |
| The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the |
| provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Parameters:</dt> |
| <dd><code>adder</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that adds a new value to the aggregate result</dd> |
| <dd><code>subtractor</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that removed an old value from the aggregate result</dd> |
| <dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the |
| latest (rolling) aggregate for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)"> |
| <h3>reduce</h3> |
| <div class="member-signature"><span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>></span> <span class="element-name">reduce</span><wbr><span class="parameters">(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> adder, |
| <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> subtractor, |
| <a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</span></div> |
| <div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| Records with <code>null</code> key are ignored. |
| Combining implies that the type of the aggregate result is the same as the type of the input value |
| (c.f. <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)"><code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code></a>). |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| that can be queried using the provided <code>queryableStoreName</code>. |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the |
| current aggregate (first argument) and the record's value (second argument) by adding the new record to the |
| aggregate. |
| The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> |
| and computes a new aggregate using the current aggregate (first argument) and the record's value (second |
| argument) by "removing" the "replaced" record from the aggregate. |
| If there is no current aggregate the <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> is not applied and the new aggregate will be the record's |
| value as-is. |
| Thus, <code>reduce(Reducer, Reducer, String)</code> can be used to compute aggregate functions like sum. |
| For sum, the adder and subtractor would work as follows: |
| <pre><code> |
| public class SumAdder implements Reducer<Integer> { |
| public Integer apply(Integer currentAgg, Integer newValue) { |
| return currentAgg + newValue; |
| } |
| } |
| |
| public class SumSubtractor implements Reducer<Integer> { |
| public Integer apply(Integer currentAgg, Integer oldValue) { |
| return currentAgg - oldValue; |
| } |
| } |
| </code></pre> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| <p> |
| To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via |
| <a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>: |
| <pre><code> |
| KafkaStreams streams = ... // counting words |
| StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()); |
| ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> localStore = streams.store(storeQueryParams); |
| K key = "some-word"; |
| ValueAndTimestamp<V> reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) |
| </code></pre> |
| For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#metadataForAllStreamsClients()"><code>KafkaStreams.metadataForAllStreamsClients()</code></a> to |
| query the value of the key on a parallel running instance of your Kafka Streams application. |
| <p> |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII |
| alphanumerics, '.', '_' and '-'. |
| The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the |
| provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Parameters:</dt> |
| <dd><code>adder</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that adds a new value to the aggregate result</dd> |
| <dd><code>subtractor</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that removed an old value from the aggregate result</dd> |
| <dd><code>named</code> - a <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd> |
| <dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the |
| latest (rolling) aggregate for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer)"> |
| <h3>reduce</h3> |
| <div class="member-signature"><span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr><a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>></span> <span class="element-name">reduce</span><wbr><span class="parameters">(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> adder, |
| <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>> subtractor)</span></div> |
| <div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| Records with <code>null</code> key are ignored. |
| Combining implies that the type of the aggregate result is the same as the type of the input value |
| (c.f. <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator)"><code>aggregate(Initializer, Aggregator, Aggregator)</code></a>). |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the |
| current aggregate and the record's value by adding the new record to the aggregate. |
| The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> |
| and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" |
| record from the aggregate. |
| If there is no current aggregate the <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> is not applied and the new aggregate will be the record's |
| value as-is. |
| Thus, <code>reduce(Reducer, Reducer)</code> can be used to compute aggregate functions like sum. |
| For sum, the adder and subtractor would work as follows: |
| <pre><code> |
| public class SumAdder implements Reducer<Integer> { |
| public Integer apply(Integer currentAgg, Integer newValue) { |
| return currentAgg + newValue; |
| } |
| } |
| |
| public class SumSubtractor implements Reducer<Integer> { |
| public Integer apply(Integer currentAgg, Integer oldValue) { |
| return currentAgg - oldValue; |
| } |
| } |
| </code></pre> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| <p> |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name |
| and "-changelog" is a fixed suffix. |
| Note that the internal store name may not be queryable through Interactive Queries. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Parameters:</dt> |
| <dd><code>adder</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that adds a new value to the aggregate result</dd> |
| <dd><code>subtractor</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that removed an old value from the aggregate result</dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the |
| latest (rolling) aggregate for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)"> |
| <h3>aggregate</h3> |
| <div class="member-signature"><span class="type-parameters"><VR></span> <span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR></span> <span class="element-name">aggregate</span><wbr><span class="parameters">(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a><VR> initializer, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> adder, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> subtractor, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</span></div> |
| <div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| Records with <code>null</code> key are ignored. |
| Aggregating is a generalization of <a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Materialized)"><code>combining via reduce(...)</code></a> as it, |
| for example, allows the result to have a different type than the input values. |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| that can be queried using the provided <code>queryableStoreName</code>. |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| The specified <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> is applied once directly before the first input record is processed to |
| provide an initial intermediate aggregation result that is used to process the first record. |
| Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the |
| current aggregate (or for the very first record using the intermediate aggregation result provided via the |
| <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a>) and the record's value by adding the new record to the aggregate. |
| The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> |
| and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" |
| record from the aggregate. |
| Thus, <code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code> can be used to compute aggregate functions |
| like sum. |
| For sum, the initializer, adder, and subtractor would work as follows: |
| <pre><code> |
| // in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde |
| public class SumInitializer implements Initializer<Long> { |
| public Long apply() { |
| return 0L; |
| } |
| } |
| |
| public class SumAdder implements Aggregator<String, Integer, Long> { |
| public Long apply(String key, Integer newValue, Long aggregate) { |
| return aggregate + newValue; |
| } |
| } |
| |
| public class SumSubtractor implements Aggregator<String, Integer, Long> { |
| public Long apply(String key, Integer oldValue, Long aggregate) { |
| return aggregate - oldValue; |
| } |
| } |
| </code></pre> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| <p> |
| To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via |
| <a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>: |
| <pre><code> |
| KafkaStreams streams = ... // counting words |
| StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()); |
| ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(storeQueryParams); |
| K key = "some-word"; |
| ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) |
| </code></pre> |
| For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#metadataForAllStreamsClients()"><code>KafkaStreams.metadataForAllStreamsClients()</code></a> to |
| query the value of the key on a parallel running instance of your Kafka Streams application. |
| <p> |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII |
| alphanumerics, '.', '_' and '-'. |
| The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the |
| provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Type Parameters:</dt> |
| <dd><code>VR</code> - the value type of the aggregated <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></dd> |
| <dt>Parameters:</dt> |
| <dd><code>initializer</code> - an <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> that provides an initial aggregate result value</dd> |
| <dd><code>adder</code> - an <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that adds a new record to the aggregate result</dd> |
| <dd><code>subtractor</code> - an <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that removed an old record from the aggregate result</dd> |
| <dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the |
| latest (rolling) aggregate for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)"> |
| <h3>aggregate</h3> |
| <div class="member-signature"><span class="type-parameters"><VR></span> <span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR></span> <span class="element-name">aggregate</span><wbr><span class="parameters">(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a><VR> initializer, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> adder, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> subtractor, |
| <a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named, |
| <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR,<wbr><a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a><org.apache.kafka.common.utils.Bytes,<wbr>byte[]>> materialized)</span></div> |
| <div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| Records with <code>null</code> key are ignored. |
| Aggregating is a generalization of <a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Materialized)"><code>combining via reduce(...)</code></a> as it, |
| for example, allows the result to have a different type than the input values. |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| that can be queried using the provided <code>queryableStoreName</code>. |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| The specified <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> is applied once directly before the first input record is processed to |
| provide an initial intermediate aggregation result that is used to process the first record. |
| Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the |
| current aggregate (or for the very first record using the intermediate aggregation result provided via the |
| <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a>) and the record's value by adding the new record to the aggregate. |
| The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> |
| and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" |
| record from the aggregate. |
| Thus, <code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code> can be used to compute aggregate functions |
| like sum. |
| For sum, the initializer, adder, and subtractor would work as follows: |
| <pre><code> |
| // in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde |
| public class SumInitializer implements Initializer<Long> { |
| public Long apply() { |
| return 0L; |
| } |
| } |
| |
| public class SumAdder implements Aggregator<String, Integer, Long> { |
| public Long apply(String key, Integer newValue, Long aggregate) { |
| return aggregate + newValue; |
| } |
| } |
| |
| public class SumSubtractor implements Aggregator<String, Integer, Long> { |
| public Long apply(String key, Integer oldValue, Long aggregate) { |
| return aggregate - oldValue; |
| } |
| } |
| </code></pre> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| <p> |
| To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via |
| <a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>: |
| <pre><code> |
| KafkaStreams streams = ... // counting words |
| StoreQueryParameters<ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>>> storeQueryParams = StoreQueryParameters.fromNameAndType(queryableStoreName, QueryableStoreTypes.timestampedKeyValueStore()); |
| ReadOnlyKeyValueStore<K, ValueAndTimestamp<VR>> localStore = streams.store(storeQueryParams); |
| K key = "some-word"; |
| ValueAndTimestamp<VR> aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) |
| </code></pre> |
| For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#metadataForAllStreamsClients()"><code>KafkaStreams.metadataForAllStreamsClients()</code></a> to |
| query the value of the key on a parallel running instance of your Kafka Streams application. |
| <p> |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII |
| alphanumerics, '.', '_' and '-'. |
| The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the |
| provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Type Parameters:</dt> |
| <dd><code>VR</code> - the value type of the aggregated <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></dd> |
| <dt>Parameters:</dt> |
| <dd><code>initializer</code> - an <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> that provides an initial aggregate result value</dd> |
| <dd><code>adder</code> - an <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that adds a new record to the aggregate result</dd> |
| <dd><code>subtractor</code> - an <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that removed an old record from the aggregate result</dd> |
| <dd><code>named</code> - a <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd> |
| <dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the |
| latest (rolling) aggregate for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator)"> |
| <h3>aggregate</h3> |
| <div class="member-signature"><span class="type-parameters"><VR></span> <span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR></span> <span class="element-name">aggregate</span><wbr><span class="parameters">(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a><VR> initializer, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> adder, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> subtractor)</span></div> |
| <div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> using default serializers and deserializers. |
| Records with <code>null</code> key are ignored. |
| Aggregating is a generalization of <a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer)"><code>combining via reduce(...)</code></a> as it, |
| for example, allows the result to have a different type than the input values. |
| If the result value type does not match the <a href="../StreamsConfig.html#DEFAULT_VALUE_SERDE_CLASS_CONFIG"><code>default value |
| serde</code></a> you should use <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)"><code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code></a>. |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| The specified <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> is applied once directly before the first input record is processed to |
| provide an initial intermediate aggregation result that is used to process the first record. |
| Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the |
| current aggregate (or for the very first record using the intermediate aggregation result provided via the |
| <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a>) and the record's value by adding the new record to the aggregate. |
| The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> |
| and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" |
| record from the aggregate. |
| Thus, <code>aggregate(Initializer, Aggregator, Aggregator, String)</code> can be used to compute aggregate functions |
| like sum. |
| For sum, the initializer, adder, and subtractor would work as follows: |
| <pre><code> |
| // in this example, LongSerde.class must be set as default value serde in StreamsConfig |
| public class SumInitializer implements Initializer<Long> { |
| public Long apply() { |
| return 0L; |
| } |
| } |
| |
| public class SumAdder implements Aggregator<String, Integer, Long> { |
| public Long apply(String key, Integer newValue, Long aggregate) { |
| return aggregate + newValue; |
| } |
| } |
| |
| public class SumSubtractor implements Aggregator<String, Integer, Long> { |
| public Long apply(String key, Integer oldValue, Long aggregate) { |
| return aggregate - oldValue; |
| } |
| } |
| </code></pre> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name |
| and "-changelog" is a fixed suffix. |
| Note that the internal store name may not be queryable through Interactive Queries. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Type Parameters:</dt> |
| <dd><code>VR</code> - the value type of the aggregated <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></dd> |
| <dt>Parameters:</dt> |
| <dd><code>initializer</code> - a <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> that provides an initial aggregate result value</dd> |
| <dd><code>adder</code> - a <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that adds a new record to the aggregate result</dd> |
| <dd><code>subtractor</code> - a <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that removed an old record from the aggregate result</dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the |
| latest (rolling) aggregate for each key</dd> |
| </dl> |
| </section> |
| </li> |
| <li> |
| <section class="detail" id="aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Named)"> |
| <h3>aggregate</h3> |
| <div class="member-signature"><span class="type-parameters"><VR></span> <span class="return-type"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a><<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>VR></span> <span class="element-name">aggregate</span><wbr><span class="parameters">(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a><VR> initializer, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> adder, |
| <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a><? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,<wbr>? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,<wbr>VR> subtractor, |
| <a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a> named)</span></div> |
| <div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> using default serializers and deserializers. |
| Records with <code>null</code> key are ignored. |
| Aggregating is a generalization of <a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer)"><code>combining via reduce(...)</code></a> as it, |
| for example, allows the result to have a different type than the input values. |
| If the result value type does not match the <a href="../StreamsConfig.html#DEFAULT_VALUE_SERDE_CLASS_CONFIG"><code>default value |
| serde</code></a> you should use <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)"><code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code></a>. |
| The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view) |
| Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream. |
| <p> |
| The specified <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> is applied once directly before the first input record is processed to |
| provide an initial intermediate aggregation result that is used to process the first record. |
| Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>. |
| The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the |
| current aggregate (or for the very first record using the intermediate aggregation result provided via the |
| <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a>) and the record's value by adding the new record to the aggregate. |
| The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> |
| and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" |
| record from the aggregate. |
| Thus, <code>aggregate(Initializer, Aggregator, Aggregator, String)</code> can be used to compute aggregate functions |
| like sum. |
| For sum, the initializer, adder, and subtractor would work as follows: |
| <pre><code> |
| // in this example, LongSerde.class must be set as default value serde in StreamsConfig |
| public class SumInitializer implements Initializer<Long> { |
| public Long apply() { |
| return 0L; |
| } |
| } |
| |
| public class SumAdder implements Aggregator<String, Integer, Long> { |
| public Long apply(String key, Integer newValue, Long aggregate) { |
| return aggregate + newValue; |
| } |
| } |
| |
| public class SumSubtractor implements Aggregator<String, Integer, Long> { |
| public Long apply(String key, Integer oldValue, Long aggregate) { |
| return aggregate - oldValue; |
| } |
| } |
| </code></pre> |
| Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to |
| the same key. |
| The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of |
| parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for |
| <a href="../StreamsConfig.html#STATESTORE_CACHE_MAX_BYTES_CONFIG"><code>cache size</code></a>, and |
| <a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>. |
| For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. |
| The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is |
| user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter |
| <a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name |
| and "-changelog" is a fixed suffix. |
| Note that the internal store name may not be queryable through Interactive Queries. |
| |
| You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div> |
| <dl class="notes"> |
| <dt>Type Parameters:</dt> |
| <dd><code>VR</code> - the value type of the aggregated <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></dd> |
| <dt>Parameters:</dt> |
| <dd><code>initializer</code> - a <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> that provides an initial aggregate result value</dd> |
| <dd><code>adder</code> - a <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that adds a new record to the aggregate result</dd> |
| <dd><code>subtractor</code> - a <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that removed an old record from the aggregate result</dd> |
| <dd><code>named</code> - a <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd> |
| <dt>Returns:</dt> |
| <dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the |
| latest (rolling) aggregate for each key</dd> |
| </dl> |
| </section> |
| </li> |
| </ul> |
| </section> |
| </li> |
| </ul> |
| </section> |
| <!-- ========= END OF CLASS DATA ========= --> |
| </main> |
| </div> |
| </div> |
| </body> |
| </html> |