blob: 8f409414d956b921abfaba1894521211e7f37519 [file] [log] [blame]
<!DOCTYPE HTML>
<!-- NewPage -->
<html lang="en">
<head>
<!-- Generated by javadoc -->
<title>KGroupedTable (kafka 2.8.1 API)</title>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<link rel="stylesheet" type="text/css" href="../../../../../stylesheet.css" title="Style">
<link rel="stylesheet" type="text/css" href="../../../../../jquery/jquery-ui.css" title="Style">
<script type="text/javascript" src="../../../../../script.js"></script>
<script type="text/javascript" src="../../../../../jquery/jszip/dist/jszip.min.js"></script>
<script type="text/javascript" src="../../../../../jquery/jszip-utils/dist/jszip-utils.min.js"></script>
<!--[if IE]>
<script type="text/javascript" src="../../../../../jquery/jszip-utils/dist/jszip-utils-ie.min.js"></script>
<![endif]-->
<script type="text/javascript" src="../../../../../jquery/jquery-3.3.1.js"></script>
<script type="text/javascript" src="../../../../../jquery/jquery-migrate-3.0.1.js"></script>
<script type="text/javascript" src="../../../../../jquery/jquery-ui.js"></script>
</head>
<body>
<script type="text/javascript"><!--
try {
if (location.href.indexOf('is-external=true') == -1) {
parent.document.title="KGroupedTable (kafka 2.8.1 API)";
}
}
catch(err) {
}
//-->
var data = {"i0":6,"i1":6,"i2":6,"i3":6,"i4":6,"i5":6,"i6":6,"i7":6,"i8":6,"i9":6,"i10":6};
var tabs = {65535:["t0","All Methods"],2:["t2","Instance Methods"],4:["t3","Abstract Methods"]};
var altColor = "altColor";
var rowColor = "rowColor";
var tableTab = "tableTab";
var activeTableTab = "activeTableTab";
var pathtoroot = "../../../../../";
var useModuleDirectories = true;
loadScripts(document, 'script');</script>
<noscript>
<div>JavaScript is disabled on your browser.</div>
</noscript>
<header role="banner">
<nav role="navigation">
<div class="fixedNav">
<!-- ========= START OF TOP NAVBAR ======= -->
<div class="topNav"><a id="navbar.top">
<!-- -->
</a>
<div class="skipNav"><a href="#skip.navbar.top" title="Skip navigation links">Skip navigation links</a></div>
<a id="navbar.top.firstrow">
<!-- -->
</a>
<ul class="navList" title="Navigation">
<li><a href="../../../../../index.html">Overview</a></li>
<li><a href="package-summary.html">Package</a></li>
<li class="navBarCell1Rev">Class</li>
<li><a href="package-tree.html">Tree</a></li>
<li><a href="../../../../../deprecated-list.html">Deprecated</a></li>
<li><a href="../../../../../index-all.html">Index</a></li>
<li><a href="../../../../../help-doc.html">Help</a></li>
</ul>
</div>
<div class="subNav">
<ul class="navList" id="allclasses_navbar_top">
<li><a href="../../../../../allclasses.html">All&nbsp;Classes</a></li>
</ul>
<ul class="navListSearch">
<li><label for="search">SEARCH:</label>
<input type="text" id="search" value="search" disabled="disabled">
<input type="reset" id="reset" value="reset" disabled="disabled">
</li>
</ul>
<div>
<script type="text/javascript"><!--
allClassesLink = document.getElementById("allclasses_navbar_top");
if(window==top) {
allClassesLink.style.display = "block";
}
else {
allClassesLink.style.display = "none";
}
//-->
</script>
<noscript>
<div>JavaScript is disabled on your browser.</div>
</noscript>
</div>
<div>
<ul class="subNavList">
<li>Summary:&nbsp;</li>
<li>Nested&nbsp;|&nbsp;</li>
<li>Field&nbsp;|&nbsp;</li>
<li>Constr&nbsp;|&nbsp;</li>
<li><a href="#method.summary">Method</a></li>
</ul>
<ul class="subNavList">
<li>Detail:&nbsp;</li>
<li>Field&nbsp;|&nbsp;</li>
<li>Constr&nbsp;|&nbsp;</li>
<li><a href="#method.detail">Method</a></li>
</ul>
</div>
<a id="skip.navbar.top">
<!-- -->
</a></div>
<!-- ========= END OF TOP NAVBAR ========= -->
</div>
<div class="navPadding">&nbsp;</div>
<script type="text/javascript"><!--
$('.navPadding').css('padding-top', $('.fixedNav').css("height"));
//-->
</script>
</nav>
</header>
<!-- ======== START OF CLASS DATA ======== -->
<main role="main">
<div class="header">
<div class="subTitle"><span class="packageLabelInType">Package</span>&nbsp;<a href="package-summary.html">org.apache.kafka.streams.kstream</a></div>
<h2 title="Interface KGroupedTable" class="title">Interface KGroupedTable&lt;K,&#8203;V&gt;</h2>
</div>
<div class="contentContainer">
<div class="description">
<ul class="blockList">
<li class="blockList">
<dl>
<dt><span class="paramLabel">Type Parameters:</span></dt>
<dd><code>K</code> - Type of keys</dd>
<dd><code>V</code> - Type of values</dd>
</dl>
<hr>
<pre>public interface <span class="typeNameLabel">KGroupedTable&lt;K,&#8203;V&gt;</span></pre>
<div class="block"><code>KGroupedTable</code> is an abstraction of a <i>re-grouped changelog stream</i> from a primary-keyed table,
usually on a different grouping key than the original primary key.
<p>
It is an intermediate representation after a re-grouping of a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> before an aggregation is applied to the
new partitions resulting in a new <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
<p>
A <code>KGroupedTable</code> must be obtained from a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> via <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>groupBy(...)</code></a>.</div>
<dl>
<dt><span class="seeLabel">See Also:</span></dt>
<dd><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></dd>
</dl>
</li>
</ul>
</div>
<div class="summary">
<ul class="blockList">
<li class="blockList">
<!-- ========== METHOD SUMMARY =========== -->
<section role="region">
<ul class="blockList">
<li class="blockList"><a id="method.summary">
<!-- -->
</a>
<h3>Method Summary</h3>
<table class="memberSummary">
<caption><span id="t0" class="activeTableTab"><span>All Methods</span><span class="tabEnd">&nbsp;</span></span><span id="t2" class="tableTab"><span><a href="javascript:show(2);">Instance Methods</a></span><span class="tabEnd">&nbsp;</span></span><span id="t3" class="tableTab"><span><a href="javascript:show(4);">Abstract Methods</a></span><span class="tabEnd">&nbsp;</span></span></caption>
<tr>
<th class="colFirst" scope="col">Modifier and Type</th>
<th class="colSecond" scope="col">Method</th>
<th class="colLast" scope="col">Description</th>
</tr>
<tr id="i0" class="altColor">
<td class="colFirst"><code>&lt;VR&gt;&nbsp;<a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator)">aggregate</a></span>&#8203;(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a>&lt;VR&gt;&nbsp;initializer,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;adder,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;subtractor)</code></th>
<td class="colLast">
<div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> using default serializers and deserializers.</div>
</td>
</tr>
<tr id="i1" class="rowColor">
<td class="colFirst"><code>&lt;VR&gt;&nbsp;<a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)">aggregate</a></span>&#8203;(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a>&lt;VR&gt;&nbsp;initializer,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;adder,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;subtractor,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</code></th>
<td class="colLast">
<div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div>
</td>
</tr>
<tr id="i2" class="altColor">
<td class="colFirst"><code>&lt;VR&gt;&nbsp;<a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Named)">aggregate</a></span>&#8203;(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a>&lt;VR&gt;&nbsp;initializer,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;adder,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;subtractor,
<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named)</code></th>
<td class="colLast">
<div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> using default serializers and deserializers.</div>
</td>
</tr>
<tr id="i3" class="rowColor">
<td class="colFirst"><code>&lt;VR&gt;&nbsp;<a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)">aggregate</a></span>&#8203;(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a>&lt;VR&gt;&nbsp;initializer,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;adder,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;subtractor,
<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</code></th>
<td class="colLast">
<div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div>
</td>
</tr>
<tr id="i4" class="altColor">
<td class="colFirst"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#count()">count</a></span>()</code></th>
<td class="colLast">
<div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to
the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div>
</td>
</tr>
<tr id="i5" class="rowColor">
<td class="colFirst"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#count(org.apache.kafka.streams.kstream.Materialized)">count</a></span>&#8203;(<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</code></th>
<td class="colLast">
<div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to
the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div>
</td>
</tr>
<tr id="i6" class="altColor">
<td class="colFirst"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#count(org.apache.kafka.streams.kstream.Named)">count</a></span>&#8203;(<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named)</code></th>
<td class="colLast">
<div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to
the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div>
</td>
</tr>
<tr id="i7" class="rowColor">
<td class="colFirst"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#count(org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)">count</a></span>&#8203;(<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</code></th>
<td class="colLast">
<div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to
the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div>
</td>
</tr>
<tr id="i8" class="altColor">
<td class="colFirst"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer)">reduce</a></span>&#8203;(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;adder,
<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;subtractor)</code></th>
<td class="colLast">
<div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div>
</td>
</tr>
<tr id="i9" class="rowColor">
<td class="colFirst"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Materialized)">reduce</a></span>&#8203;(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;adder,
<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;subtractor,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</code></th>
<td class="colLast">
<div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div>
</td>
</tr>
<tr id="i10" class="altColor">
<td class="colFirst"><code><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;</code></td>
<th class="colSecond" scope="row"><code><span class="memberNameLink"><a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)">reduce</a></span>&#8203;(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;adder,
<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;subtractor,
<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</code></th>
<td class="colLast">
<div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.</div>
</td>
</tr>
</table>
</li>
</ul>
</section>
</li>
</ul>
</div>
<div class="details">
<ul class="blockList">
<li class="blockList">
<!-- ============ METHOD DETAIL ========== -->
<section role="region">
<ul class="blockList">
<li class="blockList"><a id="method.detail">
<!-- -->
</a>
<h3>Method Detail</h3>
<a id="count(org.apache.kafka.streams.kstream.Materialized)">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>count</h4>
<pre class="methodSignature"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>&gt;&nbsp;count&#8203;(<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</pre>
<div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to
the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
Records with <code>null</code> key are ignored.
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
that can be queried using the provided <code>queryableStoreName</code>.
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
<p>
To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via
<a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>:
<pre><code>
KafkaStreams streams = ... // counting words
ReadOnlyKeyValueStore&lt;K, ValueAndTimestamp&lt;Long&gt;&gt; localStore = streams.store(queryableStoreName, QueryableStoreTypes.&lt;K, ValueAndTimestamp&lt;Long&gt;&gt; timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp&lt;Long&gt; countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
</code></pre>
For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#allMetadata()"><code>KafkaStreams.allMetadata()</code></a> to
query the value of the key on a parallel running instance of your Kafka Streams application.
<p>
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII
alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the
provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys and <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink"><code>Long</code></a> values that
represent the latest (rolling) count (i.e., number of records) for each key</dd>
</dl>
</li>
</ul>
<a id="count(org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>count</h4>
<pre class="methodSignature"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>&gt;&nbsp;count&#8203;(<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</pre>
<div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to
the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
Records with <code>null</code> key are ignored.
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
that can be queried using the provided <code>queryableStoreName</code>.
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
<p>
To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via
<a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>:
<pre><code>
KafkaStreams streams = ... // counting words
ReadOnlyKeyValueStore&lt;K, ValueAndTimestamp&lt;Long&gt;&gt; localStore = streams.store(queryableStoreName, QueryableStoreTypes.&lt;K, ValueAndTimestamp&lt;Long&gt;&gt; timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp&lt;Long&gt; countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
</code></pre>
For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#allMetadata()"><code>KafkaStreams.allMetadata()</code></a> to
query the value of the key on a parallel running instance of your Kafka Streams application.
<p>
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII
alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the
provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>named</code> - the <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd>
<dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys and <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink"><code>Long</code></a> values that
represent the latest (rolling) count (i.e., number of records) for each key</dd>
</dl>
</li>
</ul>
<a id="count()">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>count</h4>
<pre class="methodSignature"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>&gt;&nbsp;count()</pre>
<div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to
the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
Records with <code>null</code> key are ignored.
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
<p>
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys and <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink"><code>Long</code></a> values that
represent the latest (rolling) count (i.e., number of records) for each key</dd>
</dl>
</li>
</ul>
<a id="count(org.apache.kafka.streams.kstream.Named)">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>count</h4>
<pre class="methodSignature"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink">Long</a>&gt;&nbsp;count&#8203;(<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named)</pre>
<div class="block">Count number of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to
the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
Records with <code>null</code> key are ignored.
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
<p>
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>named</code> - the <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys and <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Long.html?is-external=true" title="class or interface in java.lang" class="externalLink"><code>Long</code></a> values that
represent the latest (rolling) count (i.e., number of records) for each key</dd>
</dl>
</li>
</ul>
<a id="reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Materialized)">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>reduce</h4>
<pre class="methodSignature"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;reduce&#8203;(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;adder,
<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;subtractor,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</pre>
<div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
Records with <code>null</code> key are ignored.
Combining implies that the type of the aggregate result is the same as the type of the input value
(c.f. <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)"><code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code></a>).
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
that can be queried using the provided <code>queryableStoreName</code>.
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the
current aggregate (first argument) and the record's value (second argument) by adding the new record to the
aggregate.
The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>
and computes a new aggregate using the current aggregate (first argument) and the record's value (second
argument) by "removing" the "replaced" record from the aggregate.
If there is no current aggregate the <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> is not applied and the new aggregate will be the record's
value as-is.
Thus, <code>reduce(Reducer, Reducer, String)</code> can be used to compute aggregate functions like sum.
For sum, the adder and subtractor would work as follows:
<pre><code>
public class SumAdder implements Reducer&lt;Integer&gt; {
public Integer apply(Integer currentAgg, Integer newValue) {
return currentAgg + newValue;
}
}
public class SumSubtractor implements Reducer&lt;Integer&gt; {
public Integer apply(Integer currentAgg, Integer oldValue) {
return currentAgg - oldValue;
}
}
</code></pre>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
<p>
To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via
<a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>:
<pre><code>
KafkaStreams streams = ... // counting words
ReadOnlyKeyValueStore&lt;K, ValueAndTimestamp&lt;V&gt;&gt; localStore = streams.store(queryableStoreName, QueryableStoreTypes.&lt;K, ValueAndTimestamp&lt;V&gt;&gt; timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp&lt;V&gt; reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
</code></pre>
For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#allMetadata()"><code>KafkaStreams.allMetadata()</code></a> to
query the value of the key on a parallel running instance of your Kafka Streams application.
<p>
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII
alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the
provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>adder</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that adds a new value to the aggregate result</dd>
<dd><code>subtractor</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that removed an old value from the aggregate result</dd>
<dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key</dd>
</dl>
</li>
</ul>
<a id="reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>reduce</h4>
<pre class="methodSignature"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;reduce&#8203;(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;adder,
<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;subtractor,
<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</pre>
<div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
Records with <code>null</code> key are ignored.
Combining implies that the type of the aggregate result is the same as the type of the input value
(c.f. <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)"><code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code></a>).
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
that can be queried using the provided <code>queryableStoreName</code>.
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the
current aggregate (first argument) and the record's value (second argument) by adding the new record to the
aggregate.
The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>
and computes a new aggregate using the current aggregate (first argument) and the record's value (second
argument) by "removing" the "replaced" record from the aggregate.
If there is no current aggregate the <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> is not applied and the new aggregate will be the record's
value as-is.
Thus, <code>reduce(Reducer, Reducer, String)</code> can be used to compute aggregate functions like sum.
For sum, the adder and subtractor would work as follows:
<pre><code>
public class SumAdder implements Reducer&lt;Integer&gt; {
public Integer apply(Integer currentAgg, Integer newValue) {
return currentAgg + newValue;
}
}
public class SumSubtractor implements Reducer&lt;Integer&gt; {
public Integer apply(Integer currentAgg, Integer oldValue) {
return currentAgg - oldValue;
}
}
</code></pre>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
<p>
To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via
<a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>:
<pre><code>
KafkaStreams streams = ... // counting words
ReadOnlyKeyValueStore&lt;K, ValueAndTimestamp&lt;V&gt;&gt; localStore = streams.store(queryableStoreName, QueryableStoreTypes.&lt;K, ValueAndTimestamp&lt;V&gt;&gt; timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp&lt;V&gt; reduceForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
</code></pre>
For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#allMetadata()"><code>KafkaStreams.allMetadata()</code></a> to
query the value of the key on a parallel running instance of your Kafka Streams application.
<p>
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII
alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the
provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>adder</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that adds a new value to the aggregate result</dd>
<dd><code>subtractor</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that removed an old value from the aggregate result</dd>
<dd><code>named</code> - a <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd>
<dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key</dd>
</dl>
</li>
</ul>
<a id="reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer)">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>reduce</h4>
<pre class="methodSignature"><a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;reduce&#8203;(<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;adder,
<a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream">Reducer</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>&gt;&nbsp;subtractor)</pre>
<div class="block">Combine the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
Records with <code>null</code> key are ignored.
Combining implies that the type of the aggregate result is the same as the type of the input value
(c.f. <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator)"><code>aggregate(Initializer, Aggregator, Aggregator)</code></a>).
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the
current aggregate and the record's value by adding the new record to the aggregate.
The specified <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
If there is no current aggregate the <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> is not applied and the new aggregate will be the record's
value as-is.
Thus, <code>reduce(Reducer, Reducer)</code> can be used to compute aggregate functions like sum.
For sum, the adder and subtractor would work as follows:
<pre><code>
public class SumAdder implements Reducer&lt;Integer&gt; {
public Integer apply(Integer currentAgg, Integer newValue) {
return currentAgg + newValue;
}
}
public class SumSubtractor implements Reducer&lt;Integer&gt; {
public Integer apply(Integer currentAgg, Integer oldValue) {
return currentAgg - oldValue;
}
}
</code></pre>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
<p>
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>adder</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that adds a new value to the aggregate result</dd>
<dd><code>subtractor</code> - a <a href="Reducer.html" title="interface in org.apache.kafka.streams.kstream"><code>Reducer</code></a> that removed an old value from the aggregate result</dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key</dd>
</dl>
</li>
</ul>
<a id="aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>aggregate</h4>
<pre class="methodSignature">&lt;VR&gt;&nbsp;<a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR&gt;&nbsp;aggregate&#8203;(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a>&lt;VR&gt;&nbsp;initializer,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;adder,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;subtractor,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</pre>
<div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
Records with <code>null</code> key are ignored.
Aggregating is a generalization of <a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Materialized)"><code>combining via reduce(...)</code></a> as it,
for example, allows the result to have a different type than the input values.
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
that can be queried using the provided <code>queryableStoreName</code>.
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
The specified <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> is applied once directly before the first input record is processed to
provide an initial intermediate aggregation result that is used to process the first record.
Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the
current aggregate (or for the very first record using the intermediate aggregation result provided via the
<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a>) and the record's value by adding the new record to the aggregate.
The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
Thus, <code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code> can be used to compute aggregate functions
like sum.
For sum, the initializer, adder, and subtractor would work as follows:
<pre><code>
// in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
public class SumInitializer implements Initializer&lt;Long&gt; {
public Long apply() {
return 0L;
}
}
public class SumAdder implements Aggregator&lt;String, Integer, Long&gt; {
public Long apply(String key, Integer newValue, Long aggregate) {
return aggregate + newValue;
}
}
public class SumSubtractor implements Aggregator&lt;String, Integer, Long&gt; {
public Long apply(String key, Integer oldValue, Long aggregate) {
return aggregate - oldValue;
}
}
</code></pre>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
<p>
To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via
<a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>:
<pre><code>
KafkaStreams streams = ... // counting words
ReadOnlyKeyValueStore&lt;K, ValueAndTimestamp&lt;VR&gt;&gt; localStore = streams.store(queryableStoreName, QueryableStoreTypes.&lt;K, ValueAndTimestamp&lt;VR&gt;&gt; timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp&lt;VR&gt; aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
</code></pre>
For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#allMetadata()"><code>KafkaStreams.allMetadata()</code></a> to
query the value of the key on a parallel running instance of your Kafka Streams application.
<p>
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII
alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the
provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Type Parameters:</span></dt>
<dd><code>VR</code> - the value type of the aggregated <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></dd>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>initializer</code> - an <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> that provides an initial aggregate result value</dd>
<dd><code>adder</code> - an <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that adds a new record to the aggregate result</dd>
<dd><code>subtractor</code> - an <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that removed an old record from the aggregate result</dd>
<dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key</dd>
</dl>
</li>
</ul>
<a id="aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Materialized)">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>aggregate</h4>
<pre class="methodSignature">&lt;VR&gt;&nbsp;<a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR&gt;&nbsp;aggregate&#8203;(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a>&lt;VR&gt;&nbsp;initializer,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;adder,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;subtractor,
<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named,
<a href="Materialized.html" title="class in org.apache.kafka.streams.kstream">Materialized</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR,&#8203;<a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state">KeyValueStore</a>&lt;org.apache.kafka.common.utils.Bytes,&#8203;byte[]&gt;&gt;&nbsp;materialized)</pre>
<div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
Records with <code>null</code> key are ignored.
Aggregating is a generalization of <a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Materialized)"><code>combining via reduce(...)</code></a> as it,
for example, allows the result to have a different type than the input values.
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
that can be queried using the provided <code>queryableStoreName</code>.
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
The specified <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> is applied once directly before the first input record is processed to
provide an initial intermediate aggregation result that is used to process the first record.
Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the
current aggregate (or for the very first record using the intermediate aggregation result provided via the
<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a>) and the record's value by adding the new record to the aggregate.
The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
Thus, <code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code> can be used to compute aggregate functions
like sum.
For sum, the initializer, adder, and subtractor would work as follows:
<pre><code>
// in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
public class SumInitializer implements Initializer&lt;Long&gt; {
public Long apply() {
return 0L;
}
}
public class SumAdder implements Aggregator&lt;String, Integer, Long&gt; {
public Long apply(String key, Integer newValue, Long aggregate) {
return aggregate + newValue;
}
}
public class SumSubtractor implements Aggregator&lt;String, Integer, Long&gt; {
public Long apply(String key, Integer oldValue, Long aggregate) {
return aggregate - oldValue;
}
}
</code></pre>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
<p>
To query the local <a href="../state/ReadOnlyKeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>ReadOnlyKeyValueStore</code></a> it must be obtained via
<a href="../KafkaStreams.html#store(org.apache.kafka.streams.StoreQueryParameters)"><code>KafkaStreams#store(...)</code></a>:
<pre><code>
KafkaStreams streams = ... // counting words
ReadOnlyKeyValueStore&lt;K, ValueAndTimestamp&lt;VR&gt;&gt; localStore = streams.store(queryableStoreName, QueryableStoreTypes.&lt;K, ValueAndTimestamp&lt;VR&gt;&gt; timestampedKeyValueStore());
K key = "some-word";
ValueAndTimestamp&lt;VR&gt; aggregateForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
</code></pre>
For non-local keys, a custom RPC mechanism must be implemented using <a href="../KafkaStreams.html#allMetadata()"><code>KafkaStreams.allMetadata()</code></a> to
query the value of the key on a parallel running instance of your Kafka Streams application.
<p>
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII
alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "storeName" is the
provide store name defined in <code>Materialized</code>, and "-changelog" is a fixed suffix.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Type Parameters:</span></dt>
<dd><code>VR</code> - the value type of the aggregated <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></dd>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>initializer</code> - an <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> that provides an initial aggregate result value</dd>
<dd><code>adder</code> - an <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that adds a new record to the aggregate result</dd>
<dd><code>subtractor</code> - an <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that removed an old record from the aggregate result</dd>
<dd><code>named</code> - a <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd>
<dd><code>materialized</code> - the instance of <a href="Materialized.html" title="class in org.apache.kafka.streams.kstream"><code>Materialized</code></a> used to materialize the state store. Cannot be <code>null</code></dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key</dd>
</dl>
</li>
</ul>
<a id="aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator)">
<!-- -->
</a>
<ul class="blockList">
<li class="blockList">
<h4>aggregate</h4>
<pre class="methodSignature">&lt;VR&gt;&nbsp;<a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR&gt;&nbsp;aggregate&#8203;(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a>&lt;VR&gt;&nbsp;initializer,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;adder,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;subtractor)</pre>
<div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> using default serializers and deserializers.
Records with <code>null</code> key are ignored.
Aggregating is a generalization of <a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer)"><code>combining via reduce(...)</code></a> as it,
for example, allows the result to have a different type than the input values.
If the result value type does not match the <a href="../StreamsConfig.html#DEFAULT_VALUE_SERDE_CLASS_CONFIG"><code>default value
serde</code></a> you should use <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)"><code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code></a>.
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
The specified <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> is applied once directly before the first input record is processed to
provide an initial intermediate aggregation result that is used to process the first record.
Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the
current aggregate (or for the very first record using the intermediate aggregation result provided via the
<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a>) and the record's value by adding the new record to the aggregate.
The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
Thus, <code>aggregate(Initializer, Aggregator, Aggregator, String)</code> can be used to compute aggregate functions
like sum.
For sum, the initializer, adder, and subtractor would work as follows:
<pre><code>
// in this example, LongSerde.class must be set as default value serde in StreamsConfig
public class SumInitializer implements Initializer&lt;Long&gt; {
public Long apply() {
return 0L;
}
}
public class SumAdder implements Aggregator&lt;String, Integer, Long&gt; {
public Long apply(String key, Integer newValue, Long aggregate) {
return aggregate + newValue;
}
}
public class SumSubtractor implements Aggregator&lt;String, Integer, Long&gt; {
public Long apply(String key, Integer oldValue, Long aggregate) {
return aggregate - oldValue;
}
}
</code></pre>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Type Parameters:</span></dt>
<dd><code>VR</code> - the value type of the aggregated <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></dd>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>initializer</code> - a <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> that provides an initial aggregate result value</dd>
<dd><code>adder</code> - a <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that adds a new record to the aggregate result</dd>
<dd><code>subtractor</code> - a <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that removed an old record from the aggregate result</dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key</dd>
</dl>
</li>
</ul>
<a id="aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Named)">
<!-- -->
</a>
<ul class="blockListLast">
<li class="blockList">
<h4>aggregate</h4>
<pre class="methodSignature">&lt;VR&gt;&nbsp;<a href="KTable.html" title="interface in org.apache.kafka.streams.kstream">KTable</a>&lt;<a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;VR&gt;&nbsp;aggregate&#8203;(<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream">Initializer</a>&lt;VR&gt;&nbsp;initializer,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;adder,
<a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream">Aggregator</a>&lt;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">K</a>,&#8203;? super <a href="KGroupedTable.html" title="type parameter in KGroupedTable">V</a>,&#8203;VR&gt;&nbsp;subtractor,
<a href="Named.html" title="class in org.apache.kafka.streams.kstream">Named</a>&nbsp;named)</pre>
<div class="block">Aggregate the value of records of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that got <a href="KTable.html#groupBy(org.apache.kafka.streams.kstream.KeyValueMapper)"><code>mapped</code></a> to the same key into a new instance of <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> using default serializers and deserializers.
Records with <code>null</code> key are ignored.
Aggregating is a generalization of <a href="#reduce(org.apache.kafka.streams.kstream.Reducer,org.apache.kafka.streams.kstream.Reducer)"><code>combining via reduce(...)</code></a> as it,
for example, allows the result to have a different type than the input values.
If the result value type does not match the <a href="../StreamsConfig.html#DEFAULT_VALUE_SERDE_CLASS_CONFIG"><code>default value
serde</code></a> you should use <a href="#aggregate(org.apache.kafka.streams.kstream.Initializer,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Aggregator,org.apache.kafka.streams.kstream.Materialized)"><code>aggregate(Initializer, Aggregator, Aggregator, Materialized)</code></a>.
The result is written into a local <a href="../state/KeyValueStore.html" title="interface in org.apache.kafka.streams.state"><code>KeyValueStore</code></a> (which is basically an ever-updating materialized view)
Furthermore, updates to the store are sent downstream into a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> changelog stream.
<p>
The specified <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> is applied once directly before the first input record is processed to
provide an initial intermediate aggregation result that is used to process the first record.
Each update to the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> results in a two step update of the result <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>.
The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>adder</code></a> is applied for each update record and computes a new aggregate using the
current aggregate (or for the very first record using the intermediate aggregation result provided via the
<a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a>) and the record's value by adding the new record to the aggregate.
The specified <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>subtractor</code></a> is applied for each "replaced" record of the original <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a>
and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
record from the aggregate.
Thus, <code>aggregate(Initializer, Aggregator, Aggregator, String)</code> can be used to compute aggregate functions
like sum.
For sum, the initializer, adder, and subtractor would work as follows:
<pre><code>
// in this example, LongSerde.class must be set as default value serde in StreamsConfig
public class SumInitializer implements Initializer&lt;Long&gt; {
public Long apply() {
return 0L;
}
}
public class SumAdder implements Aggregator&lt;String, Integer, Long&gt; {
public Long apply(String key, Integer newValue, Long aggregate) {
return aggregate + newValue;
}
}
public class SumSubtractor implements Aggregator&lt;String, Integer, Long&gt; {
public Long apply(String key, Integer oldValue, Long aggregate) {
return aggregate - oldValue;
}
}
</code></pre>
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
the same key.
The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
parallel running Kafka Streams instances, and the <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>configuration</code></a> parameters for
<a href="../StreamsConfig.html#CACHE_MAX_BYTES_BUFFERING_CONFIG"><code>cache size</code></a>, and
<a href="../StreamsConfig.html#COMMIT_INTERVAL_MS_CONFIG"><code>commit interval</code></a>.
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
user-specified in <a href="../StreamsConfig.html" title="class in org.apache.kafka.streams"><code>StreamsConfig</code></a> via parameter
<a href="../StreamsConfig.html#APPLICATION_ID_CONFIG"><code>APPLICATION_ID_CONFIG</code></a>, "internalStoreName" is an internal name
and "-changelog" is a fixed suffix.
Note that the internal store name may not be queryable through Interactive Queries.
You can retrieve all generated internal topic names via <a href="../Topology.html#describe()"><code>Topology.describe()</code></a>.</div>
<dl>
<dt><span class="paramLabel">Type Parameters:</span></dt>
<dd><code>VR</code> - the value type of the aggregated <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a></dd>
<dt><span class="paramLabel">Parameters:</span></dt>
<dd><code>initializer</code> - a <a href="Initializer.html" title="interface in org.apache.kafka.streams.kstream"><code>Initializer</code></a> that provides an initial aggregate result value</dd>
<dd><code>adder</code> - a <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that adds a new record to the aggregate result</dd>
<dd><code>subtractor</code> - a <a href="Aggregator.html" title="interface in org.apache.kafka.streams.kstream"><code>Aggregator</code></a> that removed an old record from the aggregate result</dd>
<dd><code>named</code> - a <a href="Named.html" title="class in org.apache.kafka.streams.kstream"><code>Named</code></a> config used to name the processor in the topology</dd>
<dt><span class="returnLabel">Returns:</span></dt>
<dd>a <a href="KTable.html" title="interface in org.apache.kafka.streams.kstream"><code>KTable</code></a> that contains "update" records with unmodified keys, and values that represent the
latest (rolling) aggregate for each key</dd>
</dl>
</li>
</ul>
</li>
</ul>
</section>
</li>
</ul>
</div>
</div>
</main>
<!-- ========= END OF CLASS DATA ========= -->
<footer role="contentinfo">
<nav role="navigation">
<!-- ======= START OF BOTTOM NAVBAR ====== -->
<div class="bottomNav"><a id="navbar.bottom">
<!-- -->
</a>
<div class="skipNav"><a href="#skip.navbar.bottom" title="Skip navigation links">Skip navigation links</a></div>
<a id="navbar.bottom.firstrow">
<!-- -->
</a>
<ul class="navList" title="Navigation">
<li><a href="../../../../../index.html">Overview</a></li>
<li><a href="package-summary.html">Package</a></li>
<li class="navBarCell1Rev">Class</li>
<li><a href="package-tree.html">Tree</a></li>
<li><a href="../../../../../deprecated-list.html">Deprecated</a></li>
<li><a href="../../../../../index-all.html">Index</a></li>
<li><a href="../../../../../help-doc.html">Help</a></li>
</ul>
</div>
<div class="subNav">
<ul class="navList" id="allclasses_navbar_bottom">
<li><a href="../../../../../allclasses.html">All&nbsp;Classes</a></li>
</ul>
<div>
<script type="text/javascript"><!--
allClassesLink = document.getElementById("allclasses_navbar_bottom");
if(window==top) {
allClassesLink.style.display = "block";
}
else {
allClassesLink.style.display = "none";
}
//-->
</script>
<noscript>
<div>JavaScript is disabled on your browser.</div>
</noscript>
</div>
<div>
<ul class="subNavList">
<li>Summary:&nbsp;</li>
<li>Nested&nbsp;|&nbsp;</li>
<li>Field&nbsp;|&nbsp;</li>
<li>Constr&nbsp;|&nbsp;</li>
<li><a href="#method.summary">Method</a></li>
</ul>
<ul class="subNavList">
<li>Detail:&nbsp;</li>
<li>Field&nbsp;|&nbsp;</li>
<li>Constr&nbsp;|&nbsp;</li>
<li><a href="#method.detail">Method</a></li>
</ul>
</div>
<a id="skip.navbar.bottom">
<!-- -->
</a></div>
<!-- ======== END OF BOTTOM NAVBAR ======= -->
</nav>
</footer>
</body>
</html>