blob: 79ef482b0ed474f996666e3b1e82bfffca80c09a [file] [log] [blame]
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<script><!--#include virtual="../../js/templateData.js" --></script>
<script id="content-template" type="text/x-handlebars-template">
<!-- h1>Developer Guide for Kafka Streams</h1 -->
<div class="sub-nav-sticky">
<div class="sticky-top">
<!-- div style="height:35px">
<a href="/{{version}}/documentation/streams/">Introduction</a>
<a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
<a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
<a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
<a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
</div -->
</div>
</div>
<div class="section" id="memory-management">
<span id="streams-developer-guide-memory-management"></span><h1>Memory Management<a class="headerlink" href="#memory-management" title="Permalink to this headline"></a></h1>
<p>You can specify the total memory (RAM) size used for internal caching and compacting of records. This caching happens
before the records are written to state stores or forwarded downstream to other nodes.</p>
<p>The record caches are implemented slightly different in the DSL and Processor API.</p>
<div class="contents local topic" id="table-of-contents">
<p class="topic-title first"><b>Table of Contents</b></p>
<ul class="simple">
<li><a class="reference internal" href="#record-caches-in-the-dsl" id="id1">Record caches in the DSL</a></li>
<li><a class="reference internal" href="#record-caches-in-the-processor-api" id="id2">Record caches in the Processor API</a></li>
<li><a class="reference internal" href="#rocksdb" id="id3">RocksDB</a></li>
<li><a class="reference internal" href="#other-memory-usage" id="id4">Other memory usage</a></li>
</ul>
</div>
<div class="section" id="record-caches-in-the-dsl">
<span id="streams-developer-guide-memory-management-record-cache"></span><h2><a class="toc-backref" href="#id1">Record caches in the DSL</a><a class="headerlink" href="#record-caches-in-the-dsl" title="Permalink to this headline"></a></h2>
<p>You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is leveraged
by the following <code class="docutils literal"><span class="pre">KTable</span></code> instances:</p>
<ul class="simple">
<li>Source <code class="docutils literal"><span class="pre">KTable</span></code>: <code class="docutils literal"><span class="pre">KTable</span></code> instances that are created via <code class="docutils literal"><span class="pre">StreamsBuilder#table()</span></code> or <code class="docutils literal"><span class="pre">StreamsBuilder#globalTable()</span></code>.</li>
<li>Aggregation <code class="docutils literal"><span class="pre">KTable</span></code>: instances of <code class="docutils literal"><span class="pre">KTable</span></code> that are created as a result of <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregations</span></a>.</li>
</ul>
<p>For such <code class="docutils literal"><span class="pre">KTable</span></code> instances, the record cache is used for:</p>
<ul class="simple">
<li>Internal caching and compacting of output records before they are written by the underlying stateful
<a class="reference internal" href="../core-concepts#streams_processor_node"><span class="std std-ref">processor node</span></a> to its internal state stores.</li>
<li>Internal caching and compacting of output records before they are forwarded from the underlying stateful
<a class="reference internal" href="../core-concepts#streams_processor_node"><span class="std std-ref">processor node</span></a> to any of its downstream processor nodes.</li>
</ul>
<p>Use the following example to understand the behaviors with and without record caching. In this example, the input is a
<code class="docutils literal"><span class="pre">KStream&lt;String,</span> <span class="pre">Integer&gt;</span></code> with the records <code class="docutils literal"><span class="pre">&lt;K,V&gt;:</span> <span class="pre">&lt;A,</span> <span class="pre">1&gt;,</span> <span class="pre">&lt;D,</span> <span class="pre">5&gt;,</span> <span class="pre">&lt;A,</span> <span class="pre">20&gt;,</span> <span class="pre">&lt;A,</span> <span class="pre">300&gt;</span></code>. The focus in this example is
on the records with key == <code class="docutils literal"><span class="pre">A</span></code>.</p>
<ul>
<li><p class="first">An <a class="reference internal" href="dsl-api.html#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregation</span></a> computes the sum of record values, grouped by key, for
the input and returns a <code class="docutils literal"><span class="pre">KTable&lt;String,</span> <span class="pre">Integer&gt;</span></code>.</p>
<blockquote>
<div><ul class="simple">
<li><strong>Without caching</strong>: a sequence of output records is emitted for key <code class="docutils literal"><span class="pre">A</span></code> that represent changes in the
resulting aggregation table. The parentheses (<code class="docutils literal"><span class="pre">()</span></code>) denote changes, the left number is the new aggregate value
and the right number is the old aggregate value: <code class="docutils literal"><span class="pre">&lt;A,</span> <span class="pre">(1,</span> <span class="pre">null)&gt;,</span> <span class="pre">&lt;A,</span> <span class="pre">(21,</span> <span class="pre">1)&gt;,</span> <span class="pre">&lt;A,</span> <span class="pre">(321,</span> <span class="pre">21)&gt;</span></code>.</li>
<li><strong>With caching</strong>: a single output record is emitted for key <code class="docutils literal"><span class="pre">A</span></code> that would likely be compacted in the cache,
leading to a single output record of <code class="docutils literal"><span class="pre">&lt;A,</span> <span class="pre">(321,</span> <span class="pre">null)&gt;</span></code>. This record is written to the aggregation&#8217;s internal state
store and forwarded to any downstream operations.</li>
</ul>
</div></blockquote>
</li>
</ul>
<p>The cache size is specified through the <code class="docutils literal"><span class="pre">cache.max.bytes.buffering</span></code> parameter, which is a global setting per
processing topology:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Enable record cache of size 10 MB.</span>
<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> <span class="mi">10</span> <span class="o">*</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span></code></pre></div>
</div>
<p>This parameter controls the number of bytes allocated for caching. Specifically, for a processor topology instance with
<code class="docutils literal"><span class="pre">T</span></code> threads and <code class="docutils literal"><span class="pre">C</span></code> bytes allocated for caching, each thread will have an even <code class="docutils literal"><span class="pre">C/T</span></code> bytes to construct its own
cache and use as it sees fit among its tasks. This means that there are as many caches as there are threads, but no sharing of
caches across threads happens.</p>
<p>The basic API for the cache is made of <code class="docutils literal"><span class="pre">put()</span></code> and <code class="docutils literal"><span class="pre">get()</span></code> calls. Records are
evicted using a simple LRU scheme after the cache size is reached. The first time a keyed record <code class="docutils literal"><span class="pre">R1</span> <span class="pre">=</span> <span class="pre">&lt;K1,</span> <span class="pre">V1&gt;</span></code>
finishes processing at a node, it is marked as dirty in the cache. Any other keyed record <code class="docutils literal"><span class="pre">R2</span> <span class="pre">=</span> <span class="pre">&lt;K1,</span> <span class="pre">V2&gt;</span></code> with the
same key <code class="docutils literal"><span class="pre">K1</span></code> that is processed on that node during that time will overwrite <code class="docutils literal"><span class="pre">&lt;K1,</span> <span class="pre">V1&gt;</span></code>, this is referred to as
&#8220;being compacted&#8221;. This has the same effect as
<a class="reference external" href="https://kafka.apache.org/documentation.html#compaction">Kafka&#8217;s log compaction</a>, but happens earlier, while the
records are still in memory, and within your client-side application, rather than on the server-side (i.e. the Kafka
broker). After flushing, <code class="docutils literal"><span class="pre">R2</span></code> is forwarded to the next processing node and then written to the local state store.</p>
<p>The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node
whenever the earliest of <code class="docutils literal"><span class="pre">commit.interval.ms</span></code> or <code class="docutils literal"><span class="pre">cache.max.bytes.buffering</span></code> (cache pressure) hits. Both
<code class="docutils literal"><span class="pre">commit.interval.ms</span></code> and <code class="docutils literal"><span class="pre">cache.max.bytes.buffering</span></code> are global parameters. As such, it is not possible to specify
different parameters for individual nodes.</p>
<p>Here are example settings for both parameters based on desired scenarios.</p>
<ul>
<li><p class="first">To turn off caching the cache size can be set to zero:</p>
<blockquote>
<div><div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Disable record cache</span>
<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> <span class="mi">0</span><span class="o">);</span></code></pre></div>
</div>
<p>Turning off caching might result in high write traffic for the underlying RocksDB store.
With default settings caching is enabled within Kafka Streams but RocksDB caching is disabled.
Thus, to avoid high write traffic it is recommended to enable RocksDB caching if Kafka Streams caching is turned off.</p>
<p>For example, the RocksDB Block Cache could be set to 100MB and Write Buffer size to 32 MB. For more information, see
the <a class="reference internal" href="config-streams.html#streams-developer-guide-rocksdb-config"><span class="std std-ref">RocksDB config</span></a>.</p>
</div></blockquote>
</li>
<li><p class="first">To enable caching but still have an upper bound on how long records will be cached, you can set the commit interval. In this example, it is set to 1000 milliseconds:</p>
<blockquote>
<div><div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="c1">// Enable record cache of size 10 MB.</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">CACHE_MAX_BYTES_BUFFERING_CONFIG</span><span class="o">,</span> <span class="mi">10</span> <span class="o">*</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024L</span><span class="o">);</span>
<span class="c1">// Set commit interval to 1 second.</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="n">StreamsConfig</span><span class="o">.</span><span class="na">COMMIT_INTERVAL_MS_CONFIG</span><span class="o">,</span> <span class="mi">1000</span><span class="o">);</span></code></pre></div>
</div>
</div></blockquote>
</li>
</ul>
<p>The effect of these two configurations is described in the figure below. The records are shown using 4 keys: blue, red, yellow, and green. Assume the cache has space for only 3 keys.</p>
<ul>
<li><p class="first">When the cache is disabled (a), all of the input records will be output.</p>
</li>
<li><p class="first">When the cache is enabled (b):</p>
<blockquote>
<div><ul class="simple">
<li>Most records are output at the end of commit intervals (e.g., at <code class="docutils literal"><span class="pre">t1</span></code> a single blue record is output, which is the final over-write of the blue key up to that time).</li>
<li>Some records are output because of cache pressure (i.e. before the end of a commit interval). For example, see the red record before <code class="docutils literal"><span class="pre">t2</span></code>. With smaller cache sizes we expect cache pressure to be the primary factor that dictates when records are output. With large cache sizes, the commit interval will be the primary factor.</li>
<li>The total number of records output has been reduced from 15 to 8.</li>
</ul>
</div></blockquote>
</li>
</ul>
<div class="figure align-center">
<img class="centered" src="/{{version}}/images/streams-cache-and-commit-interval.png">
</div>
</div>
<div class="section" id="record-caches-in-the-processor-api">
<span id="streams-developer-guide-memory-management-state-store-cache"></span><h2><a class="toc-backref" href="#id2">Record caches in the Processor API</a><a class="headerlink" href="#record-caches-in-the-processor-api" title="Permalink to this headline"></a></h2>
<p>You can specify the total memory (RAM) size of the record cache for an instance of the processing topology. It is used
for internal caching and compacting of output records before they are written from a stateful processor node to its
state stores.</p>
<p>The record cache in the Processor API does not cache or compact any output records that are being forwarded downstream.
This means that all downstream processor nodes can see all records, whereas the state stores see a reduced number of records.
This does not impact correctness of the system, but is a performance optimization for the state stores. For example, with the
Processor API you can store a record in a state store while forwarding a different value downstream.</p>
<p>Following from the example first shown in section <a class="reference internal" href="processor-api.html#streams-developer-guide-state-store"><span class="std std-ref">State Stores</span></a>, to disable caching, you can
add the <code class="docutils literal"><span class="pre">withCachingDisabled</span></code> call (note that caches are enabled by default, however there is an explicit <code class="docutils literal"><span class="pre">withCachingEnabled</span></code>
call).</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">StoreBuilder</span> <span class="n">countStoreBuilder</span> <span class="o">=</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">keyValueStoreBuilder</span><span class="o">(</span>
<span class="n">Stores</span><span class="o">.</span><span class="na">persistentKeyValueStore</span><span class="o">(</span><span class="s">&quot;Counts&quot;</span><span class="o">),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span>
<span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span>
<span class="o">.</span><span class="na">withCachingEnabled</span><span class="o">()</span></code></pre></div>
</div>
</div>
<div class="section" id="rocksdb">
<h2><a class="toc-backref" href="#id3">RocksDB</a><a class="headerlink" href="#rocksdb" title="Permalink to this headline"></a></h2>
<p> Each instance of RocksDB allocates off-heap memory for a block cache, index and filter blocks, and memtable (write buffer). Critical configs (for RocksDB version 4.1.0) include
<code class="docutils literal"><span class="pre">block_cache_size</span></code>, <code class="docutils literal"><span class="pre">write_buffer_size</span></code> and <code class="docutils literal"><span class="pre">max_write_buffer_number</span></code>. These can be specified through the
<code class="docutils literal"><span class="pre">rocksdb.config.setter</span></code> configuration.</li>
<p> As of 2.3.0 the memory usage across all instances can be bounded, limiting the total off-heap memory of your Kafka Streams application. To do so you must configure RocksDB to cache the index and filter blocks in the block cache, limit the memtable memory through a shared <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager">WriteBufferManager</a> and count its memory against the block cache, and then pass the same Cache object to each instance. See <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB">RocksDB Memory Usage</a> for details. An example RocksDBConfigSetter implementing this is shown below:</p>
<div class="highlight-java"><div class="highlight"><pre><span></span> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">BoundedMemoryRocksDBConfig</span> <span class="kd">implements</span> <span class="n">RocksDBConfigSetter</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">static</span> <span class="n">org.rocksdb.Cache</span> <span class="n">cache</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">rocksdb</span><span class="o">.</span><span class="na">LRUCache</span><span class="o">(</span><span class="mi">TOTAL_OFF_HEAP_MEMORY</span><span class="o">,</span> <span class="n">-1</span><span class="o">,</span> <span class="n">false</span><span class="o">,</span> <span class="n">INDEX_FILTER_BLOCK_RATIO</span><span class="o">);</span><sup><a href="#fn1" id="ref1">1</a></sup>
<span class="kd">private</span> <span class="kt">static</span> <span class="n">org.rocksdb.WriteBufferManager</span> <span class="n">writeBufferManager</span> <span class="o">=</span> <span class="k">new</span> <span class="n">org</span><span class="o">.</span><span class="na">rocksdb</span><span class="o">.</span><span class="na">WriteBufferManager</span><span class="o">(</span><span class="mi">TOTAL_MEMTABLE_MEMORY</span><span class="o">,</span> cache<span class="o">);</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">setConfig</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">&gt;</span> <span class="n">configs</span><span class="o">)</span> <span class="o">{</span>
<span class="n">BlockBasedTableConfig</span> <span class="n">tableConfig</span> <span class="o">=</span> <span class="k">(BlockBasedTableConfig)</span> <span class="n">options</span><span><span class="o">.</span><span class="na">tableFormatConfig</span><span class="o">();</span>
<span class="c1"> // These three options in combination will limit the memory used by RocksDB to the size passed to the block cache (TOTAL_OFF_HEAP_MEMORY)</span>
<span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockCache</span><span class="o">(</span><span class="mi">cache</span><span class="o">);</span>
<span class="n">tableConfig</span><span class="o">.</span><span class="na">setCacheIndexAndFilterBlocks</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="n">options</span><span class="o">.</span><span class="na">setWriteBufferManager</span><span class="o">(</span><span class="mi">writeBufferManager</span><span class="o">);</span>
<span class="c1"> // These options are recommended to be set when bounding the total memory</span>
<span class="n">tableConfig</span><span class="o">.</span><span class="na">setCacheIndexAndFilterBlocksWithHighPriority</span><span class="o">(</span><span class="mi">true</span><span class="o">);</span><sup><a href="#fn2" id="ref2">2</a></sup>
<span class="n">tableConfig</span><span class="o">.</span><span class="na">setPinTopLevelIndexAndFilter</span><span class="o">(</span><span class="mi">true</span><span class="o">);</span>
<span class="n">tableConfig</span><span class="o">.</span><span class="na">setBlockSize</span><span class="o">(</span><span class="mi">BLOCK_SIZE</span><span class="o">);</span><sup><a href="#fn3" id="ref3">3</a></sup>
<span class="n">options</span><span class="o">.</span><span class="na">setMaxWriteBufferNumber</span><span class="o">(</span><span class="mi">N_MEMTABLES</span><span class="o">);</span>
<span class="n">options</span><span class="o">.</span><span class="na">setWriteBufferSize</span><span class="o">(</span><span class="mi">MEMTABLE_SIZE</span><span class="o">);</span>
<span class="n">options</span><span class="o">.</span><span class="na">setTableFormatConfig</span><span class="o">(</span><span class="n">tableConfig</span><span class="o">);</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">(</span><span class="kd">final</span> <span class="n">String</span> <span class="n">storeName</span><span class="o">,</span> <span class="kd">final</span> <span class="n">Options</span> <span class="n">options</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.</span>
<span class="o">}</span>
<span class="o">}</span>
</div>
<sup id="fn1">1. INDEX_FILTER_BLOCK_RATIO can be used to set a fraction of the block cache to set aside for "high priority" (aka index and filter) blocks, preventing them from being evicted by data blocks. See the full signature of the <a class="reference external" href="https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/LRUCache.java#L72">LRUCache constructor</a>.
NOTE: the boolean parameter in the cache constructor lets you control whether the cache should enforce a strict memory limit by failing the read or iteration in the rare cases where it might go larger than its capacity. Due to a
<a class="reference external" href="https://github.com/facebook/rocksdb/issues/6247">bug in RocksDB</a>, this option cannot be used
if the write buffer memory is also counted against the cache. If you set this to true, you should NOT pass the cache in to the <code>WriteBufferManager</code> and just control the write buffer and cache memory separately.</sup>
<br>
<sup id="fn2">2. This must be set in order for INDEX_FILTER_BLOCK_RATIO to take effect (see footnote 1) as described in the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-and-filter-blocks">RocksDB docs</a></sup>
<br>
<sup id="fn3">3. You may want to modify the default <a class="reference external" href="https://github.com/apache/kafka/blob/2.3/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L79">block size</a> per these instructions from the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#indexes-and-filter-blocks">RocksDB GitHub</a>. A larger block size means index blocks will be smaller, but the cached data blocks may contain more cold data that would otherwise be evicted.
<br>
<dl class="docutils">
<dt>Note:</dt>
While we recommend setting at least the above configs, the specific options that yield the best performance are workload dependent and you should consider experimenting with these to determine the best choices for your specific use case. Keep in mind that the optimal configs for one app may not apply to one with a different topology or input topic.
In addition to the recommended configs above, you may want to consider using partitioned index filters as described by the <a class="reference external" href="https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters">RocksDB docs</a>.
</dl>
</div>
<div class="section" id="other-memory-usage">
<h2><a class="toc-backref" href="#id4">Other memory usage</a><a class="headerlink" href="#other-memory-usage" title="Permalink to this headline"></a></h2>
<p>There are other modules inside Apache Kafka that allocate memory during runtime. They include the following:</p>
<ul class="simple">
<li>Producer buffering, managed by the producer config <code class="docutils literal"><span class="pre">buffer.memory</span></code>.</li>
<li>Consumer buffering, currently not strictly managed, but can be indirectly controlled by fetch size, i.e.,
<code class="docutils literal"><span class="pre">fetch.max.bytes</span></code> and <code class="docutils literal"><span class="pre">fetch.max.wait.ms</span></code>.</li>
<li>Both producer and consumer also have separate TCP send / receive buffers that are not counted as the buffering memory.
These are controlled by the <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> / <code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> configs.</li>
<li>Deserialized objects buffering: after <code class="docutils literal"><span class="pre">consumer.poll()</span></code> returns records, they will be deserialized to extract
timestamp and buffered in the streams space. Currently this is only indirectly controlled by
<code class="docutils literal"><span class="pre">buffered.records.per.partition</span></code>.</li>
</ul>
<div class="admonition tip">
<p><b>Tip</b></p>
<p><strong>Iterators should be closed explicitly to release resources:</strong> Store iterators (e.g., <code class="docutils literal"><span class="pre">KeyValueIterator</span></code> and <code class="docutils literal"><span class="pre">WindowStoreIterator</span></code>) must be closed explicitly upon completeness to release resources such as open file handlers and in-memory read buffers, or use try-with-resources statement (available since JDK7) for this Closeable class.</p>
<p class="last">Otherwise, stream application&#8217;s memory usage keeps increasing when running until it hits an OOM.</p>
</div>
</div>
</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/interactive-queries" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/running-app" class="pagination__btn pagination__btn__next">Next</a>
</div>
</script>
<!--#include virtual="../../../includes/_header.htm" -->
<!--#include virtual="../../../includes/_top.htm" -->
<div class="content documentation ">
<!--#include virtual="../../../includes/_nav.htm" -->
<div class="right">
<!--#include virtual="../../../includes/_docs_banner.htm" -->
<ul class="breadcrumbs">
<li><a href="/documentation">Documentation</a></li>
<li><a href="/documentation/streams">Kafka Streams</a></li>
<li><a href="/documentation/streams/developer-guide/">Developer Guide</a></li>
</ul>
<div class="p-content"></div>
</div>
</div>
<!--#include virtual="../../../includes/_footer.htm" -->
<script>
$(function() {
// Show selected style on nav item
$('.b-nav__streams').addClass('selected');
//sticky secondary nav
var $navbar = $(".sub-nav-sticky"),
y_pos = $navbar.offset().top,
height = $navbar.height();
$(window).scroll(function() {
var scrollTop = $(window).scrollTop();
if (scrollTop > y_pos - height) {
$navbar.addClass("navbar-fixed")
} else if (scrollTop <= y_pos) {
$navbar.removeClass("navbar-fixed")
}
});
// Display docs subnav items
$('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded');
});
</script>