blob: 1d27360afb11a4b1474533b2d95381a70e0e48ac [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<!-- h1>Developer Guide for Kafka Streams</h1 -->
<div class="sub-nav-sticky">
<div class="sticky-top">
<!-- div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
</div -->
</div>
</div>
<div class="section" id="interactive-queries">
<span id="streams-developer-guide-interactive-queries"></span><h1 class="docs-title">Interactive Queries<a class="headerlink" href="#interactive-queries" title="Permalink to this headline"></a></h1>
<p>Interactive queries allow you to leverage the state of your application from outside your application. The Kafka Streams enables your applications to be queryable.</p>
<div class="contents local topic" id="table-of-contents">
<p class="topic-title first"><b>Table of Contents</b></p>
<ul class="simple">
<li><a class="reference internal" href="#querying-local-state-stores-for-an-app-instance" id="id3">Querying local state stores for an app instance</a><ul>
<li><a class="reference internal" href="#querying-local-key-value-stores" id="id4">Querying local key-value stores</a></li>
<li><a class="reference internal" href="#querying-local-window-stores" id="id5">Querying local window stores</a></li>
<li><a class="reference internal" href="#querying-local-custom-state-stores" id="id6">Querying local custom state stores</a></li>
</ul>
</li>
<li><a class="reference internal" href="#querying-remote-state-stores-for-the-entire-app" id="id7">Querying remote state stores for the entire app</a><ul>
<li><a class="reference internal" href="#adding-an-rpc-layer-to-your-application" id="id8">Adding an RPC layer to your application</a></li>
<li><a class="reference internal" href="#exposing-the-rpc-endpoints-of-your-application" id="id9">Exposing the RPC endpoints of your application</a></li>
<li><a class="reference internal" href="#discovering-and-accessing-application-instances-and-their-local-state-stores" id="id10">Discovering and accessing application instances and their local state stores</a></li>
</ul>
</li>
<li><a class="reference internal" href="#demo-applications" id="id11">Demo applications</a></li>
</ul>
</div>
<p>The full state of your application is typically <a class="reference internal" href="../architecture.html#streams_architecture_state"><span class="std std-ref">split across many distributed instances of your application</span></a>, and across many state stores that are managed locally by these application instances.</p>
<div class="figure align-center">
<img class="centered" src="/{{version}}/images/streams-interactive-queries-03.png">
</div>
<p>There are local and remote components to interactively querying the state of your application.</p>
<dl class="docutils">
<dt>Local state</dt>
<dd>An application instance can query the locally managed portion of the state and directly query its own local state stores. You can use the corresponding local data in other parts of your application code, as long as it doesn&#8217;t require calling the Kafka Streams API. Querying state stores is always read-only to guarantee that the underlying state stores will never be mutated out-of-band (e.g., you cannot add new entries). State stores should only be mutated by the corresponding processor topology and the input data it operates on. For more information, see <a class="reference internal" href="#streams-developer-guide-interactive-queries-local-stores"><span class="std std-ref">Querying local state stores for an app instance</span></a>.</dd>
<dt>Remote state</dt>
<dd><p class="first">To query the full state of your application, you must connect the various fragments of the state, including:</p>
<ul class="simple">
<li>query local state stores</li>
<li>discover all running instances of your application in the network and their state stores</li>
<li>communicate with these instances over the network (e.g., an RPC layer)</li>
</ul>
<p class="last">Connecting these fragments enables communication between instances of the same app and communication from other applications for interactive queries. For more information, see <a class="reference internal" href="#streams-developer-guide-interactive-queries-discovery"><span class="std std-ref">Querying remote state stores for the entire app</span></a>.</p>
</dd>
</dl>
<p>Kafka Streams natively provides all of the required functionality for interactively querying the state of your application, except if you want to expose the full state of your application via interactive queries. To allow application instances to communicate over the network, you must add a Remote Procedure Call (RPC) layer to your application (e.g., REST API).</p>
<p>This table shows the Kafka Streams native communication support for various procedures.</p>
<table border="1" class="docutils">
<colgroup>
<col width="42%" />
<col width="27%" />
<col width="31%" />
</colgroup>
<thead valign="bottom">
<tr class="row-odd"><th class="head">Procedure</th>
<th class="head">Application instance</th>
<th class="head">Entire application</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>Query local state stores of an app instance</td>
<td>Supported</td>
<td>Supported</td>
</tr>
<tr class="row-odd"><td>Make an app instance discoverable to others</td>
<td>Supported</td>
<td>Supported</td>
</tr>
<tr class="row-even"><td>Discover all running app instances and their state stores</td>
<td>Supported</td>
<td>Supported</td>
</tr>
<tr class="row-odd"><td>Communicate with app instances over the network (RPC)</td>
<td>Supported</td>
<td>Not supported (you must configure)</td>
</tr>
</tbody>
</table>
<div class="section" id="querying-local-state-stores-for-an-app-instance">
<span id="streams-developer-guide-interactive-queries-local-stores"></span><h2><a class="toc-backref" href="#id3">Querying local state stores for an app instance</a><a class="headerlink" href="#querying-local-state-stores-for-an-app-instance" title="Permalink to this headline"></a></h2>
<p>A Kafka Streams application typically runs on multiple instances. The state that is locally available on any given instance is only a subset of the <a class="reference internal" href="../architecture.html#streams-architecture-state"><span class="std std-ref">application&#8217;s entire state</span></a>. Querying the local stores on an instance will only return data locally available on that particular instance.</p>
<p>The method <code class="docutils literal"><span class="pre">KafkaStreams#store(...)</span></code> finds an application instance&#8217;s local state stores by name and type.</p>
<div class="figure align-center" id="id1">
<img class="centered" src="/{{version}}/images/streams-interactive-queries-api-01.png">
<p class="caption"><span class="caption-text">Every application instance can directly query any of its local state stores.</span></p>
</div>
<p>The <em>name</em> of a state store is defined when you create the store. You can create the store explicitly by using the Processor API or implicitly by using stateful operations in the DSL.</p>
<p>The <em>type</em> of a state store is defined by <code class="docutils literal"><span class="pre">QueryableStoreType</span></code>. You can access the built-in types via the class <code class="docutils literal"><span class="pre">QueryableStoreTypes</span></code>.
Kafka Streams currently has two built-in types:</p>
<ul class="simple">
<li>A key-value store <code class="docutils literal"><span class="pre">QueryableStoreTypes#keyValueStore()</span></code>, see <a class="reference internal" href="#streams-developer-guide-interactive-queries-local-key-value-stores"><span class="std std-ref">Querying local key-value stores</span></a>.</li>
<li>A window store <code class="docutils literal"><span class="pre">QueryableStoreTypes#windowStore()</span></code>, see <a class="reference internal" href="#streams-developer-guide-interactive-queries-local-window-stores"><span class="std std-ref">Querying local window stores</span></a>.</li>
</ul>
<p>You can also <a class="reference internal" href="#streams-developer-guide-interactive-queries-custom-stores"><span class="std std-ref">implement your own QueryableStoreType</span></a> as described in section <a class="reference internal" href="#streams-developer-guide-interactive-queries-custom-stores"><span class="std std-ref">Querying local custom state stores</span></a>.</p>
<div class="admonition note">
<p><b>Note</b></p>
<p class="last">Kafka Streams materializes one state store per stream partition. This means your application will potentially manage
many underlying state stores. The API enables you to query all of the underlying stores without having to know which
partition the data is in.</p>
</div>
<div class="section" id="querying-local-key-value-stores">
<span id="streams-developer-guide-interactive-queries-local-key-value-stores"></span><h3><a class="toc-backref" href="#id4">Querying local key-value stores</a><a class="headerlink" href="#querying-local-key-value-stores" title="Permalink to this headline"></a></h3>
<p>To query a local key-value store, you must first create a topology with a key-value store. This example creates a key-value
store named &#8220;CountsKeyValueStore&#8221;. This store will hold the latest count for any word that is found on the topic &#8220;word-count-input&#8221;.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties </span> <span class="n">props</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">textLines</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Define the processing topology (here: WordCount)</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedByWord</span> <span class="o">=</span> <span class="n">textLines</span>
<span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span>
<span class="c1">// Create a key-value store named &quot;CountsKeyValueStore&quot; for the all-time word counts</span>
<span class="n">groupedByWord</span><span class="o">.</span><span class="na">count</span><span class="o">(</span><span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;CountsKeyValueStore&quot;</span><span class="o">));</span>
<span class="c1">// Start an instance of the topology</span>
<span class="n">KafkaStreams</span> <span class="n">streams</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaStreams</span><span class="o">(</span><span class="n">builder</span><span class="o">,</span> <span class="n">props</span><span class="o">);</span>
<span class="n">streams</span><span class="o">.</span><span class="na">start</span><span class="o">();</span></pre></div>
</div>
<p>After the application has started, you can get access to &#8220;CountsKeyValueStore&#8221; and then query it via the <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.java">ReadOnlyKeyValueStore</a> API:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Get the key-value store CountsKeyValueStore</span>
<span class="n">ReadOnlyKeyValueStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">keyValueStore</span> <span class="o">=</span>
<span class="n">streams</span><span class="o">.</span><span class="na">store</span><span class="o">(</span><span class="s">&quot;CountsKeyValueStore&quot;</span><span class="o">,</span> <span class="n">QueryableStoreTypes</span><span class="o">.</span><span class="na">keyValueStore</span><span class="o">());</span>
<span class="c1">// Get value by key</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;count for hello:&quot;</span> <span class="o">+</span> <span class="n">keyValueStore</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;hello&quot;</span><span class="o">));</span>
<span class="c1">// Get the values for a range of keys available in this application instance</span>
<span class="n">KeyValueIterator</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">range</span> <span class="o">=</span> <span class="n">keyValueStore</span><span class="o">.</span><span class="na">range</span><span class="o">(</span><span class="s">&quot;all&quot;</span><span class="o">,</span> <span class="s">&quot;streams&quot;</span><span class="o">);</span>
<span class="k">while</span> <span class="o">(</span><span class="n">range</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
<span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">next</span> <span class="o">=</span> <span class="n">range</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;count for &quot;</span> <span class="o">+</span> <span class="n">next</span><span class="o">.</span><span class="na">key</span> <span class="o">+</span> <span class="s">&quot;: &quot;</span> <span class="o">+</span> <span class="n">next</span><span class="o">.</span><span class="na">value</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Get the values for all of the keys available in this application instance</span>
<span class="n">KeyValueIterator</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">range</span> <span class="o">=</span> <span class="n">keyValueStore</span><span class="o">.</span><span class="na">all</span><span class="o">();</span>
<span class="k">while</span> <span class="o">(</span><span class="n">range</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
<span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">next</span> <span class="o">=</span> <span class="n">range</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;count for &quot;</span> <span class="o">+</span> <span class="n">next</span><span class="o">.</span><span class="na">key</span> <span class="o">+</span> <span class="s">&quot;: &quot;</span> <span class="o">+</span> <span class="n">next</span><span class="o">.</span><span class="na">value</span><span class="o">);</span>
<span class="o">}</span></pre></div>
</div>
<p>You can also materialize the results of stateless operators by using the overloaded methods that take a <code class="docutils literal"><span class="pre">queryableStoreName</span></code>
as shown in the example below:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>
<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">regionCounts</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// materialize the result of filtering corresponding to odd numbers</span>
<span class="c1">// the &quot;queryableStoreName&quot; can be subsequently queried.</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">oddCounts</span> <span class="o">=</span> <span class="n">numberLines</span><span class="o">.</span><span class="na">filter</span><span class="o">((</span><span class="n">region</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">(</span><span class="n">count</span> <span class="o">%</span> <span class="mi">2</span> <span class="o">!=</span> <span class="mi">0</span><span class="o">),</span>
<span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">,</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;queryableStoreName&quot;</span><span class="o">));</span>
<span class="c1">// do not materialize the result of filtering corresponding to even numbers</span>
<span class="c1">// this means that these results will not be materialized and cannot be queried.</span>
<span class="n">KTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">oddCounts</span> <span class="o">=</span> <span class="n">numberLines</span><span class="o">.</span><span class="na">filter</span><span class="o">((</span><span class="n">region</span><span class="o">,</span> <span class="n">count</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">(</span><span class="n">count</span> <span class="o">%</span> <span class="mi">2</span> <span class="o">==</span> <span class="mi">0</span><span class="o">));</span></pre></div>
</div>
</div>
<div class="section" id="querying-local-window-stores">
<span id="streams-developer-guide-interactive-queries-local-window-stores"></span><h3><a class="toc-backref" href="#id5">Querying local window stores</a><a class="headerlink" href="#querying-local-window-stores" title="Permalink to this headline"></a></h3>
<p>A window store will potentially have many results for any given key because the key can be present in multiple windows.
However, there is only one result per window for a given key.</p>
<p>To query a local window store, you must first create a topology with a window store. This example creates a window store
named &#8220;CountsWindowStore&#8221; that contains the counts for words in 1-minute windows.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>
<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">textLines</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Define the processing topology (here: WordCount)</span>
<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedByWord</span> <span class="o">=</span> <span class="n">textLines</span>
<span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span>
<span class="c1">// Create a window state store named &quot;CountsWindowStore&quot; that contains the word counts for every minute</span>
<span class="n">groupedByWord</span><span class="o">.</span><span class="na">windowedBy</span><span class="o">(</span><span class="n">TimeWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(<span class="n">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">60</span><span class="o">)))</span>
<span class="o">.</span><span class="na">count</span><span class="o">(</span><span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">WindowStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;CountsWindowStore&quot;</span><span class="o">));</span></pre></div>
</div>
<p>After the application has started, you can get access to &#8220;CountsWindowStore&#8221; and then query it via the <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java">ReadOnlyWindowStore</a> API:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Get the window store named &quot;CountsWindowStore&quot;</span>
<span class="n">ReadOnlyWindowStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">windowStore</span> <span class="o">=</span>
<span class="n">streams</span><span class="o">.</span><span class="na">store</span><span class="o">(</span><span class="s">&quot;CountsWindowStore&quot;</span><span class="o">,</span> <span class="n">QueryableStoreTypes</span><span class="o">.</span><span class="na">windowStore</span><span class="o">());</span>
<span class="c1">// Fetch values for the key &quot;world&quot; for all of the windows available in this application instance.</span>
<span class="c1">// To get *all* available windows we fetch windows from the beginning of time until now.</span>
<span class="kt">Instant</span> <span class="n">timeFrom</span> <span class="o">=</span> <span class="na">Instant</span><span class="o">.</span><span class="na">ofEpochMilli<span class="o">(</span><span class="mi">0</span><span class="o">);</span> <span class="c1">// beginning of time = oldest available</span>
<span class="kt">Instant</span> <span class="n">timeTo</span> <span class="o">=</span> <span class="n">Instant</span><span class="o">.</span><span class="na">now</span><span class="o">();</span> <span class="c1">// now (in processing-time)</span>
<span class="n">WindowStoreIterator</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">iterator</span> <span class="o">=</span> <span class="n">windowStore</span><span class="o">.</span><span class="na">fetch</span><span class="o">(</span><span class="s">&quot;world&quot;</span><span class="o">,</span> <span class="n">timeFrom</span><span class="o">,</span> <span class="n">timeTo</span><span class="o">);</span>
<span class="k">while</span> <span class="o">(</span><span class="n">iterator</span><span class="o">.</span><span class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
<span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">next</span> <span class="o">=</span> <span class="n">iterator</span><span class="o">.</span><span class="na">next</span><span class="o">();</span>
<span class="kt">long</span> <span class="n">windowTimestamp</span> <span class="o">=</span> <span class="n">next</span><span class="o">.</span><span class="na">key</span><span class="o">;</span>
<span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;Count of &#39;world&#39; @ time &quot;</span> <span class="o">+</span> <span class="n">windowTimestamp</span> <span class="o">+</span> <span class="s">&quot; is &quot;</span> <span class="o">+</span> <span class="n">next</span><span class="o">.</span><span class="na">value</span><span class="o">);</span>
<span class="o">}</span></pre></div>
</div>
</div>
<div class="section" id="querying-local-custom-state-stores">
<span id="streams-developer-guide-interactive-queries-custom-stores"></span><h3><a class="toc-backref" href="#id6">Querying local custom state stores</a><a class="headerlink" href="#querying-local-custom-state-stores" title="Permalink to this headline"></a></h3>
<div class="admonition note">
<p><b>Note</b></p>
<p class="last">Only the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> supports custom state stores.</p>
</div>
<p>Before querying the custom state stores you must implement these interfaces:</p>
<ul class="simple">
<li>Your custom state store must implement <code class="docutils literal"><span class="pre">StateStore</span></code>.</li>
<li>You must have an interface to represent the operations available on the store.</li>
<li>You must provide an implementation of <code class="docutils literal"><span class="pre">StoreBuilder</span></code> for creating instances of your store.</li>
<li>It is recommended that you provide an interface that restricts access to read-only operations. This prevents users of this API from mutating the state of your running Kafka Streams application out-of-band.</li>
</ul>
<p>The class/interface hierarchy for your custom store might look something like:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="kd">implements</span> <span class="n">StateStore</span><span class="o">,</span> <span class="n">MyWriteableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="c1">// implementation of the actual store</span>
<span class="o">}</span>
<span class="c1">// Read-write interface for MyCustomStore</span>
<span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MyWriteableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="kd">extends</span> <span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kt">void</span> <span class="nf">write</span><span class="o">(</span><span class="n">K</span> <span class="n">Key</span><span class="o">,</span> <span class="n">V</span> <span class="n">value</span><span class="o">);</span>
<span class="o">}</span>
<span class="c1">// Read-only interface for MyCustomStore</span>
<span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="n">V</span> <span class="nf">read</span><span class="o">(</span><span class="n">K</span> <span class="n">key</span><span class="o">);</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomStoreBuilder</span> <span class="kd">implements</span> <span class="n">StoreBuilder</span> <span class="o">{</span>
<span class="c1">// implementation of the supplier for MyCustomStore</span>
<span class="o">}</span></pre></div>
</div>
<p>To make this store queryable you must:</p>
<ul class="simple">
<li>Provide an implementation of <a class="reference external" href="https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreType.java">QueryableStoreType</a>.</li>
<li>Provide a wrapper class that has access to all of the underlying instances of the store and is used for querying.</li>
</ul>
<p>Here is how to implement <code class="docutils literal"><span class="pre">QueryableStoreType</span></code>:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomStoreType</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="kd">implements</span> <span class="n">QueryableStoreType</span><span class="o">&lt;</span><span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="c1">// Only accept StateStores that are of type MyCustomStore</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">accepts</span><span class="o">(</span><span class="kd">final</span> <span class="n">StateStore</span> <span class="n">stateStore</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="n">stateStore</span> <span class="n">instanceOf</span> <span class="n">MyCustomStore</span><span class="o">;</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="nf">create</span><span class="o">(</span><span class="kd">final</span> <span class="n">StateStoreProvider</span> <span class="n">storeProvider</span><span class="o">,</span> <span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">)</span> <span class="o">{</span>
<span class="k">return</span> <span class="k">new</span> <span class="n">MyCustomStoreTypeWrapper</span><span class="o">(</span><span class="n">storeProvider</span><span class="o">,</span> <span class="n">storeName</span><span class="o">,</span> <span class="k">this</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></pre></div>
</div>
<p>A wrapper class is required because each instance of a Kafka Streams application may run multiple stream tasks and manage
multiple local instances of a particular state store. The wrapper class hides this complexity and lets you query a &#8220;logical&#8221;
state store by name without having to know about all of the underlying local instances of that state store.</p>
<p>When implementing your wrapper class you must use the
<a class="reference external" href="https://github.com/apache/kafka/blob/1.0/streams/src/main/java/org/apache/kafka/streams/state/internals/StateStoreProvider.java">StateStoreProvider</a>
interface to get access to the underlying instances of your store.
<code class="docutils literal"><span class="pre">StateStoreProvider#stores(String</span> <span class="pre">storeName,</span> <span class="pre">QueryableStoreType&lt;T&gt;</span> <span class="pre">queryableStoreType)</span></code> returns a <code class="docutils literal"><span class="pre">List</span></code> of state
stores with the given storeName and of the type as defined by <code class="docutils literal"><span class="pre">queryableStoreType</span></code>.</p>
<p>Here is an example implementation of the wrapper follows (Java 8+):</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// We strongly recommended implementing a read-only interface</span>
<span class="c1">// to restrict usage of the store to safe read operations!</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyCustomStoreTypeWrapper</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="kd">implements</span> <span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span><span class="n">V</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kd">final</span> <span class="n">QueryableStoreType</span><span class="o">&lt;</span><span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;&gt;</span> <span class="n">customStoreType</span><span class="o">;</span>
<span class="kd">private</span> <span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">;</span>
<span class="kd">private</span> <span class="kd">final</span> <span class="n">StateStoreProvider</span> <span class="n">provider</span><span class="o">;</span>
<span class="kd">public</span> <span class="nf">CustomStoreTypeWrapper</span><span class="o">(</span><span class="kd">final</span> <span class="n">StateStoreProvider</span> <span class="n">provider</span><span class="o">,</span>
<span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span>
<span class="kd">final</span> <span class="n">QueryableStoreType</span><span class="o">&lt;</span><span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;&gt;</span> <span class="n">customStoreType</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// ... assign fields ...</span>
<span class="o">}</span>
<span class="c1">// Implement a safe read method</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">V</span> <span class="nf">read</span><span class="o">(</span><span class="kd">final</span> <span class="n">K</span> <span class="n">key</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// Get all the stores with storeName and of customStoreType</span>
<span class="kd">final</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">K</span><span class="o">,</span> <span class="n">V</span><span class="o">&gt;&gt;</span> <span class="n">stores</span> <span class="o">=</span> <span class="n">provider</span><span class="o">.</span><span class="na">getStores</span><span class="o">(</span><span class="n">storeName</span><span class="o">,</span> <span class="n">customStoreType</span><span class="o">);</span>
<span class="c1">// Try and find the value for the given key</span>
<span class="kd">final</span> <span class="n">Optional</span><span class="o">&lt;</span><span class="n">V</span><span class="o">&gt;</span> <span class="n">value</span> <span class="o">=</span> <span class="n">stores</span><span class="o">.</span><span class="na">stream</span><span class="o">().</span><span class="na">filter</span><span class="o">(</span><span class="n">store</span> <span class="o">-&gt;</span> <span class="n">store</span><span class="o">.</span><span class="na">read</span><span class="o">(</span><span class="n">key</span><span class="o">)</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">).</span><span class="na">findFirst</span><span class="o">();</span>
<span class="c1">// Return the value if it exists</span>
<span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">orElse</span><span class="o">(</span><span class="kc">null</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></pre></div>
</div>
<p>You can now find and query your custom store:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span>
<span class="n">Topology</span> <span class="n">topology</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">ProcessorSupplier</span> <span class="n">processorSuppler</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Create CustomStoreSupplier for store name the-custom-store</span>
<span class="n">MyCustomStoreBuilder</span> <span class="n">customStoreBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyCustomStoreBuilder</span><span class="o">(</span><span class="s">&quot;the-custom-store&quot;</span><span class="o">)</span> <span class="c1">//...;</span>
<span class="c1">// Add the source topic</span>
<span class="n">topology</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="s">&quot;input&quot;</span><span class="o">,</span> <span class="s">&quot;inputTopic&quot;</span><span class="o">);</span>
<span class="c1">// Add a custom processor that reads from the source topic</span>
<span class="n">topology</span><span class="o">.</span><span class="na">addProcessor</span><span class="o">(</span><span class="s">&quot;the-processor&quot;</span><span class="o">,</span> <span class="n">processorSupplier</span><span class="o">,</span> <span class="s">&quot;input&quot;</span><span class="o">);</span>
<span class="c1">// Connect your custom state store to the custom processor above</span>
<span class="n">topology</span><span class="o">.</span><span class="na">addStateStore</span><span class="o">(</span><span class="n">customStoreBuilder</span><span class="o">,</span> <span class="s">&quot;the-processor&quot;</span><span class="o">);</span>
<span class="n">KafkaStreams</span> <span class="n">streams</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaStreams</span><span class="o">(</span><span class="n">topology</span><span class="o">,</span> <span class="n">config</span><span class="o">);</span>
<span class="n">streams</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>
<span class="c1">// Get access to the custom store</span>
<span class="n">MyReadableCustomStore</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">store</span> <span class="o">=</span> <span class="n">streams</span><span class="o">.</span><span class="na">store</span><span class="o">(</span><span class="s">&quot;the-custom-store&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="n">MyCustomStoreType</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">String</span><span class="o">&gt;());</span>
<span class="c1">// Query the store</span>
<span class="n">String</span> <span class="n">value</span> <span class="o">=</span> <span class="n">store</span><span class="o">.</span><span class="na">read</span><span class="o">(</span><span class="s">&quot;key&quot;</span><span class="o">);</span></pre></div>
</div>
</div>
</div>
<div class="section" id="querying-remote-state-stores-for-the-entire-app">
<span id="streams-developer-guide-interactive-queries-discovery"></span><h2><a class="toc-backref" href="#id7">Querying remote state stores for the entire app</a><a class="headerlink" href="#querying-remote-state-stores-for-the-entire-app" title="Permalink to this headline"></a></h2>
<p>To query remote states for the entire app, you must expose the application&#8217;s full state to other applications, including
applications that are running on different machines.</p>
<p>For example, you have a Kafka Streams application that processes user events in a multi-player video game, and you want to retrieve the latest status of each user directly and display it in a mobile app. Here are the required steps to make the full state of your application queryable:</p>
<ol class="arabic simple">
<li><a class="reference internal" href="#streams-developer-guide-interactive-queries-rpc-layer"><span class="std std-ref">Add an RPC layer to your application</span></a> so that
the instances of your application can be interacted with via the network (e.g., a REST API, Thrift, a custom protocol,
and so on). The instances must respond to interactive queries. You can follow the reference examples provided to get
started.</li>
<li><a class="reference internal" href="#streams-developer-guide-interactive-queries-expose-rpc"><span class="std std-ref">Expose the RPC endpoints</span></a> of
your application&#8217;s instances via the <code class="docutils literal"><span class="pre">application.server</span></code> configuration setting of Kafka Streams. Because RPC
endpoints must be unique within a network, each instance has its own value for this configuration setting.
This makes an application instance discoverable by other instances.</li>
<li>In the RPC layer, <a class="reference internal" href="#streams-developer-guide-interactive-queries-discover-app-instances-and-stores"><span class="std std-ref">discover remote application instances</span></a> and their state stores and <a class="reference internal" href="#streams-developer-guide-interactive-queries-local-stores"><span class="std std-ref">query locally available state stores</span></a> to make the full state of your application queryable. The remote application instances can forward queries to other app instances if a particular instance lacks the local data to respond to a query. The locally available state stores can directly respond to queries.</li>
</ol>
<div class="figure align-center" id="id2">
<img class="centered" src="/{{version}}/images/streams-interactive-queries-api-02.png">
<p class="caption"><span class="caption-text">Discover any running instances of the same application as well as the respective RPC endpoints they expose for
interactive queries</span></p>
</div>
<div class="section" id="adding-an-rpc-layer-to-your-application">
<span id="streams-developer-guide-interactive-queries-rpc-layer"></span><h3><a class="toc-backref" href="#id8">Adding an RPC layer to your application</a><a class="headerlink" href="#adding-an-rpc-layer-to-your-application" title="Permalink to this headline"></a></h3>
<p>There are many ways to add an RPC layer. The only requirements are that the RPC layer is embedded within the Kafka Streams
application and that it exposes an endpoint that other application instances and applications can connect to.</p>
</div>
<div class="section" id="exposing-the-rpc-endpoints-of-your-application">
<span id="streams-developer-guide-interactive-queries-expose-rpc"></span><h3><a class="toc-backref" href="#id9">Exposing the RPC endpoints of your application</a><a class="headerlink" href="#exposing-the-rpc-endpoints-of-your-application" title="Permalink to this headline"></a></h3>
<p>To enable remote state store discovery in a distributed Kafka Streams application, you must set the <a class="reference internal" href="config-streams.html#streams-developer-guide-required-configs"><span class="std std-ref">configuration property</span></a> in the config properties.
The <code class="docutils literal"><span class="pre">application.server</span></code> property defines a unique <code class="docutils literal"><span class="pre">host:port</span></code> pair that points to the RPC endpoint of the respective instance of a Kafka Streams application.
The value of this configuration property will vary across the instances of your application.
When this property is set, Kafka Streams will keep track of the RPC endpoint information for every instance of an application, its state stores, and assigned stream partitions through instances of <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html">StreamsMetadata</a>.</p>
<div class="admonition tip">
<p><b>Tip</b></p>
<p class="last">Consider leveraging the exposed RPC endpoints of your application for further functionality, such as
piggybacking additional inter-application communication that goes beyond interactive queries.</p>
</div>
<p>This example shows how to configure and run a Kafka Streams application that supports the discovery of its state stores.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="c1">// Set the unique RPC endpoint of this application instance through which it</span>
<span class="c1">// can be interactively queried. In a real application, the value would most</span>
<span class="c1">// probably not be hardcoded but derived dynamically.</span>
<span class="n">String</span> <span class="n">rpcEndpoint</span> <span class="o">=</span> <span class="s">&quot;host1:4460&quot;</span><span class="o">;</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">APPLICATION_SERVER_CONFIG</span><span class="o">,</span> <span class="n">rpcEndpoint</span><span class="o">);</span>
<span class="c1">// ... further settings may follow here ...</span>
<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamsBuilder</span><span class="o">();</span>
<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">textLines</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">stream</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">,</span> <span class="s">&quot;word-count-input&quot;</span><span class="o">);</span>
<span class="kd">final</span> <span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedByWord</span> <span class="o">=</span> <span class="n">textLines</span>
<span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span>
<span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">,</span> <span class="n">Grouped</span><span class="o">.</span><span class="na">with</span><span class="o">(</span><span class="n">stringSerde</span><span class="o">,</span> <span class="n">stringSerde</span><span class="o">));</span>
<span class="c1">// This call to `count()` creates a state store named &quot;word-count&quot;.</span>
<span class="c1">// The state store is discoverable and can be queried interactively.</span>
<span class="n">groupedByWord</span><span class="o">.</span><span class="na">count</span><span class="o">(</span><span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;</span><span class="n">as</span><span class="o">(</span><span class="s">&quot;word-count&quot;</span><span class="o">));</span>
<span class="c1">// Start an instance of the topology</span>
<span class="n">KafkaStreams</span> <span class="n">streams</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaStreams</span><span class="o">(</span><span class="n">builder</span><span class="o">,</span> <span class="n">props</span><span class="o">);</span>
<span class="n">streams</span><span class="o">.</span><span class="na">start</span><span class="o">();</span>
<span class="c1">// Then, create and start the actual RPC service for remote access to this</span>
<span class="c1">// application instance&#39;s local state stores.</span>
<span class="c1">//</span>
<span class="c1">// This service should be started on the same host and port as defined above by</span>
<span class="c1">// the property `StreamsConfig.APPLICATION_SERVER_CONFIG`. The example below is</span>
<span class="c1">// fictitious, but we provide end-to-end demo applications (such as KafkaMusicExample)</span>
<span class="c1">// that showcase how to implement such a service to get you started.</span>
<span class="n">MyRPCService</span> <span class="n">rpcService</span> <span class="o">=</span> <span class="o">...;</span>
<span class="n">rpcService</span><span class="o">.</span><span class="na">listenAt</span><span class="o">(</span><span class="n">rpcEndpoint</span><span class="o">);</span></pre></div>
</div>
</div>
<div class="section" id="discovering-and-accessing-application-instances-and-their-local-state-stores">
<span id="streams-developer-guide-interactive-queries-discover-app-instances-and-stores"></span><h3><a class="toc-backref" href="#id10">Discovering and accessing application instances and their local state stores</a><a class="headerlink" href="#discovering-and-accessing-application-instances-and-their-local-state-stores" title="Permalink to this headline"></a></h3>
<p>The following methods return <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html">StreamsMetadata</a> objects, which provide meta-information about application instances such as their RPC endpoint and locally available state stores.</p>
<ul class="simple">
<li><code class="docutils literal"><span class="pre">KafkaStreams#allMetadata()</span></code>: find all instances of this application</li>
<li><code class="docutils literal"><span class="pre">KafkaStreams#allMetadataForStore(String</span> <span class="pre">storeName)</span></code>: find those applications instances that manage local instances of the state store &#8220;storeName&#8221;</li>
<li><code class="docutils literal"><span class="pre">KafkaStreams#metadataForKey(String</span> <span class="pre">storeName,</span> <span class="pre">K</span> <span class="pre">key,</span> <span class="pre">Serializer&lt;K&gt;</span> <span class="pre">keySerializer)</span></code>: using the default stream partitioning strategy, find the one application instance that holds the data for the given key in the given state store</li>
<li><code class="docutils literal"><span class="pre">KafkaStreams#metadataForKey(String</span> <span class="pre">storeName,</span> <span class="pre">K</span> <span class="pre">key,</span> <span class="pre">StreamPartitioner&lt;K,</span> <span class="pre">?&gt;</span> <span class="pre">partitioner)</span></code>: using <code class="docutils literal"><span class="pre">partitioner</span></code>, find the one application instance that holds the data for the given key in the given state store</li>
</ul>
<div class="admonition attention">
<p class="first admonition-title">Attention</p>
<p class="last">If <code class="docutils literal"><span class="pre">application.server</span></code> is not configured for an application instance, then the above methods will not find any <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/state/StreamsMetadata.html">StreamsMetadata</a> for it.</p>
</div>
<p>For example, we can now find the <code class="docutils literal"><span class="pre">StreamsMetadata</span></code> for the state store named &#8220;word-count&#8221; that we defined in the
code example shown in the previous section:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">KafkaStreams</span> <span class="n">streams</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Find all the locations of local instances of the state store named &quot;word-count&quot;</span>
<span class="n">Collection</span><span class="o">&lt;</span><span class="n">StreamsMetadata</span><span class="o">&gt;</span> <span class="n">wordCountHosts</span> <span class="o">=</span> <span class="n">streams</span><span class="o">.</span><span class="na">allMetadataForStore</span><span class="o">(</span><span class="s">&quot;word-count&quot;</span><span class="o">);</span>
<span class="c1">// For illustrative purposes, we assume using an HTTP client to talk to remote app instances.</span>
<span class="n">HttpClient</span> <span class="n">http</span> <span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Get the word count for word (aka key) &#39;alice&#39;: Approach 1</span>
<span class="c1">//</span>
<span class="c1">// We first find the one app instance that manages the count for &#39;alice&#39; in its local state stores.</span>
<span class="n">StreamsMetadata</span> <span class="n">metadata</span> <span class="o">=</span> <span class="n">streams</span><span class="o">.</span><span class="na">metadataForKey</span><span class="o">(</span><span class="s">&quot;word-count&quot;</span><span class="o">,</span> <span class="s">&quot;alice&quot;</span><span class="o">,</span> <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">().</span><span class="na">serializer</span><span class="o">());</span>
<span class="c1">// Then, we query only that single app instance for the latest count of &#39;alice&#39;.</span>
<span class="c1">// Note: The RPC URL shown below is fictitious and only serves to illustrate the idea. Ultimately,</span>
<span class="c1">// the URL (or, in general, the method of communication) will depend on the RPC layer you opted to</span>
<span class="c1">// implement. Again, we provide end-to-end demo applications (such as KafkaMusicExample) that showcase</span>
<span class="c1">// how to implement such an RPC layer.</span>
<span class="n">Long</span> <span class="n">result</span> <span class="o">=</span> <span class="n">http</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="s">&quot;http://&quot;</span> <span class="o">+</span> <span class="n">metadata</span><span class="o">.</span><span class="na">host</span><span class="o">()</span> <span class="o">+</span> <span class="s">&quot;:&quot;</span> <span class="o">+</span> <span class="n">metadata</span><span class="o">.</span><span class="na">port</span><span class="o">()</span> <span class="o">+</span> <span class="s">&quot;/word-count/alice&quot;</span><span class="o">);</span>
<span class="c1">// Get the word count for word (aka key) &#39;alice&#39;: Approach 2</span>
<span class="c1">//</span>
<span class="c1">// Alternatively, we could also choose (say) a brute-force approach where we query every app instance</span>
<span class="c1">// until we find the one that happens to know about &#39;alice&#39;.</span>
<span class="n">Optional</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="n">streams</span><span class="o">.</span><span class="na">allMetadataForStore</span><span class="o">(</span><span class="s">&quot;word-count&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">stream</span><span class="o">()</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">streamsMetadata</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="c1">// Construct the (fictituous) full endpoint URL to query the current remote application instance</span>
<span class="n">String</span> <span class="n">url</span> <span class="o">=</span> <span class="s">&quot;http://&quot;</span> <span class="o">+</span> <span class="n">streamsMetadata</span><span class="o">.</span><span class="na">host</span><span class="o">()</span> <span class="o">+</span> <span class="s">&quot;:&quot;</span> <span class="o">+</span> <span class="n">streamsMetadata</span><span class="o">.</span><span class="na">port</span><span class="o">()</span> <span class="o">+</span> <span class="s">&quot;/word-count/alice&quot;</span><span class="o">;</span>
<span class="c1">// Read and return the count for &#39;alice&#39;, if any.</span>
<span class="k">return</span> <span class="n">http</span><span class="o">.</span><span class="na">getLong</span><span class="o">(</span><span class="n">url</span><span class="o">);</span>
<span class="o">})</span>
<span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="n">s</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span>
<span class="o">.</span><span class="na">findFirst</span><span class="o">();</span></pre></div>
</div>
<p>At this point the full state of the application is interactively queryable:</p>
<ul class="simple">
<li>You can discover the running instances of the application and the state stores they manage locally.</li>
<li>Through the RPC layer that was added to the application, you can communicate with these application instances over the
network and query them for locally available state.</li>
<li>The application instances are able to serve such queries because they can directly query their own local state stores
and respond via the RPC layer.</li>
<li>Collectively, this allows us to query the full state of the entire application.</li>
</ul>
<p>To see an end-to-end application with interactive queries, review the
<a class="reference internal" href="#streams-developer-guide-interactive-queries-demos"><span class="std std-ref">demo applications</span></a>.</p>
</div>
</div>
</div>
</div>
</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/testing" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/memory-mgmt" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation ">
<!--#include virtual="../../../includes/_nav.htm" -->
<div class="right">
<!--#include virtual="../../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>