blob: facc2bf67ef44000825673f6162e36f748c7e304 [file] [log] [blame]
<!DOCTYPE html>
<!--
| Generated by Apache Maven Doxia Site Renderer 1.8 from src/site/markdown/metron-platform/Performance-tuning-guide.md at 2019-05-14
| Rendered using Apache Maven Fluido Skin 1.7
-->
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta name="Date-Revision-yyyymmdd" content="20190514" />
<meta http-equiv="Content-Language" content="en" />
<title>Metron &#x2013; Metron Performance Tuning Guide</title>
<link rel="stylesheet" href="../css/apache-maven-fluido-1.7.min.css" />
<link rel="stylesheet" href="../css/site.css" />
<link rel="stylesheet" href="../css/print.css" media="print" />
<script type="text/javascript" src="../js/apache-maven-fluido-1.7.min.js"></script>
<script type="text/javascript">
$( document ).ready( function() { $( '.carousel' ).carousel( { interval: 3500 } ) } );
</script>
</head>
<body class="topBarDisabled">
<div class="container-fluid">
<div id="banner">
<div class="pull-left"><a href="http://metron.apache.org/" id="bannerLeft"><img src="../images/metron-logo.png" alt="Apache Metron" width="148" height="48"/></a></div>
<div class="pull-right"></div>
<div class="clear"><hr/></div>
</div>
<div id="breadcrumbs">
<ul class="breadcrumb">
<li class=""><a href="http://www.apache.org" class="externalLink" title="Apache">Apache</a><span class="divider">/</span></li>
<li class=""><a href="http://metron.apache.org/" class="externalLink" title="Metron">Metron</a><span class="divider">/</span></li>
<li class=""><a href="../index.html" title="Documentation">Documentation</a><span class="divider">/</span></li>
<li class="active ">Metron Performance Tuning Guide</li>
<li id="publishDate" class="pull-right"><span class="divider">|</span> Last Published: 2019-05-14</li>
<li id="projectVersion" class="pull-right">Version: 0.7.1</li>
</ul>
</div>
<div class="row-fluid">
<div id="leftColumn" class="span2">
<div class="well sidebar-nav">
<ul class="nav nav-list">
<li class="nav-header">User Documentation</li>
<li><a href="../index.html" title="Metron"><span class="icon-chevron-down"></span>Metron</a>
<ul class="nav nav-list">
<li><a href="../CONTRIBUTING.html" title="CONTRIBUTING"><span class="none"></span>CONTRIBUTING</a></li>
<li><a href="../Upgrading.html" title="Upgrading"><span class="none"></span>Upgrading</a></li>
<li><a href="../metron-analytics/index.html" title="Analytics"><span class="icon-chevron-right"></span>Analytics</a></li>
<li><a href="../metron-contrib/metron-docker/index.html" title="Docker"><span class="none"></span>Docker</a></li>
<li><a href="../metron-contrib/metron-performance/index.html" title="Performance"><span class="none"></span>Performance</a></li>
<li><a href="../metron-deployment/index.html" title="Deployment"><span class="icon-chevron-right"></span>Deployment</a></li>
<li><a href="../metron-interface/index.html" title="Interface"><span class="icon-chevron-right"></span>Interface</a></li>
<li><a href="../metron-platform/index.html" title="Platform"><span class="icon-chevron-down"></span>Platform</a>
<ul class="nav nav-list">
<li class="active"><a href="#"><span class="none"></span>Performance-tuning-guide</a></li>
<li><a href="../metron-platform/metron-common/index.html" title="Common"><span class="none"></span>Common</a></li>
<li><a href="../metron-platform/metron-data-management/index.html" title="Data-management"><span class="none"></span>Data-management</a></li>
<li><a href="../metron-platform/metron-elasticsearch/index.html" title="Elasticsearch"><span class="none"></span>Elasticsearch</a></li>
<li><a href="../metron-platform/metron-enrichment/index.html" title="Enrichment"><span class="icon-chevron-right"></span>Enrichment</a></li>
<li><a href="../metron-platform/metron-hbase-server/index.html" title="Hbase-server"><span class="none"></span>Hbase-server</a></li>
<li><a href="../metron-platform/metron-indexing/index.html" title="Indexing"><span class="none"></span>Indexing</a></li>
<li><a href="../metron-platform/metron-job/index.html" title="Job"><span class="none"></span>Job</a></li>
<li><a href="../metron-platform/metron-management/index.html" title="Management"><span class="none"></span>Management</a></li>
<li><a href="../metron-platform/metron-parsing/index.html" title="Parsing"><span class="icon-chevron-right"></span>Parsing</a></li>
<li><a href="../metron-platform/metron-pcap-backend/index.html" title="Pcap-backend"><span class="none"></span>Pcap-backend</a></li>
<li><a href="../metron-platform/metron-solr/index.html" title="Solr"><span class="none"></span>Solr</a></li>
<li><a href="../metron-platform/metron-writer/index.html" title="Writer"><span class="none"></span>Writer</a></li>
</ul>
</li>
<li><a href="../metron-sensors/index.html" title="Sensors"><span class="icon-chevron-right"></span>Sensors</a></li>
<li><a href="../metron-stellar/stellar-3rd-party-example/index.html" title="Stellar-3rd-party-example"><span class="none"></span>Stellar-3rd-party-example</a></li>
<li><a href="../metron-stellar/stellar-common/index.html" title="Stellar-common"><span class="icon-chevron-right"></span>Stellar-common</a></li>
<li><a href="../metron-stellar/stellar-zeppelin/index.html" title="Stellar-zeppelin"><span class="none"></span>Stellar-zeppelin</a></li>
<li><a href="../use-cases/index.html" title="Use-cases"><span class="icon-chevron-right"></span>Use-cases</a></li>
</ul>
</li>
</ul>
<hr />
<div id="poweredBy">
<div class="clear"></div>
<div class="clear"></div>
<div class="clear"></div>
<div class="clear"></div>
<a href="http://maven.apache.org/" title="Built by Maven" class="poweredBy"><img class="builtBy" alt="Built by Maven" src="../images/logos/maven-feather.png" /></a>
</div>
</div>
</div>
<div id="bodyColumn" class="span10" >
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<h1>Metron Performance Tuning Guide</h1>
<p><a name="Metron_Performance_Tuning_Guide"></a></p>
<ul>
<li><a href="#Overview">Overview</a></li>
<li><a href="#General_Tuning_Suggestions">General Tuning Suggestions</a></li>
<li><a href="#Component_Tuning_Levers">Component Tuning Levers</a></li>
<li><a href="#Use_Case_Specific_Tuning_Suggestions">Use Case Specific Tuning Suggestions</a></li>
<li><a href="#Debugging">Debugging</a></li>
<li><a href="#Issues">Issues</a></li>
<li><a href="#Reference">Reference</a></li>
</ul>
<div class="section">
<h2><a name="Overview"></a>Overview</h2>
<p>This document provides guidance from our experiences tuning the Apache Metron Storm topologies for maximum performance. You&#x2019;ll find suggestions for optimum configurations under a 1 Gbps load along with some guidance around the tooling we used to monitor and assess our throughput.</p>
<p>In the simplest terms, Metron is a streaming architecture created on top of Kafka and three main types of Storm topologies: parsers, enrichment, and indexing. Each parser has its own topology and there is also a highly performant, specialized spout-only topology for streaming PCAP data to HDFS. We found that the architecture can be tuned almost exclusively through using a few primary Storm and Kafka parameters along with a few Metron-specific options. You can think of the data flow as being similar to water flowing through a pipe, and the majority of these options assist in tweaking the various pipe widths in the system.</p></div>
<div class="section">
<h2><a name="General_Tuning_Suggestions"></a>General Tuning Suggestions</h2>
<div class="section">
<h3><a name="Storm_Executors_vs._Tasks"></a>Storm Executors vs. Tasks</h3>
<p>Note that there is currently no method for specifying the number of tasks independently of the number of executors in Flux topologies (enrichment, indexing). By default, the number of tasks will equal the number of executors. Logically, setting the number of tasks equal to the number of executors is sensible. Storm enforces num executors &lt;= num tasks. The reason you might set the number of tasks higher than the number of executors is for future performance tuning and rebalancing without the need to bring down your topologies. The number of tasks is fixed at topology startup time whereas the number of executors can be increased up to a maximum value equal to the number of tasks.</p></div>
<div class="section">
<h3><a name="Kafka_Spout_Configuration"></a>Kafka Spout Configuration</h3>
<p>When configuring Storm Kafka spouts, we found that the default values for</p>
<ul>
<li><tt>poll.timeout.ms</tt></li>
<li><tt>offset.commit.period.ms</tt></li>
<li><tt>max.uncommitted.offsets</tt></li>
</ul>
<p>worked well in nearly all cases. As a general rule, it was optimal to set spout parallelism equal to the number of partitions used in your Kafka topic. Any greater parallelism will leave you with idle consumers since Kafka limits the max number of consumers to the number of partitions. This is important because Kafka has certain ordering guarantees for message delivery per partition that would not be possible if more than one consumer in a given consumer group were able to read from that partition.</p></div></div>
<div class="section">
<h2><a name="Sensor_Topology_Tuning_Suggestions"></a>Sensor Topology Tuning Suggestions</h2>
<p>If you are using stellar field transformations in your sensors, by default, stellar expressions are not cached. Sensors that use stellar field transformations may see a performance boost by turning on caching via setting the <tt>cacheConfig</tt> <a href="metron-parsers-common/index.html#parser_configuration">property</a>. This is beneficial if your transformations:</p>
<ul>
<li>Are complex (e.g. <tt>ENRICHMENT_GET</tt> calls or other high latency calls)</li>
<li>All yield the same results for the same inputs (caching is either off or applied to all transformations)
<ul>
<li>If any of your transformations are non-deterministic, caching should not be used as it will result in the likelihood of incorrect results being returned.</li>
</ul>
</li>
</ul></div>
<div class="section">
<h2><a name="Component_Tuning_Levers"></a>Component Tuning Levers</h2>
<div class="section">
<h3><a name="High_Level_Overview"></a>High Level Overview</h3>
<p>There are a number of levers that can be set while tuning a Metron cluster. The main services to interact with for performance tuning are: Kafka, Storm, HDFS, and indexing (Elasticsearch or Solr). For each service, here is a high level breakdown of the major knobs and levers that can be modified while tuning your cluster.</p>
<ul>
<li>Kafka
<ul>
<li>Number partitions</li>
</ul>
</li>
<li>Storm
<ul>
<li>Kafka spout
<ul>
<li>Polling frequency</li>
<li>Polling timeouts</li>
<li>Offset commit period</li>
<li>Max uncommitted offsets</li>
</ul>
</li>
<li>Number workers (OS processes)</li>
<li>Number executors (threads in a process)</li>
<li>Number ackers</li>
<li>Max spout pending</li>
<li>Spout and bolt parallelism</li>
</ul>
</li>
<li>HDFS
<ul>
<li>Replication factor</li>
</ul>
</li>
</ul></div>
<div class="section">
<h3><a name="Kafka_Tuning"></a>Kafka Tuning</h3>
<p>The main lever you&#x2019;re going to work with when tuning Kafka throughput will be the number of partitions. A handy method for deciding how many partitions to use is to first calculate the throughput for a single producer (p) and a single consumer (c), and then use that with the desired throughput (t) to roughly estimate the number of partitions to use. You would want at least max(t/p, t/c) partitions to attain the desired throughput. See <a class="externalLink" href="https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/">https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/</a> for more details.</p></div>
<div class="section">
<h3><a name="Storm_Tuning"></a>Storm Tuning</h3>
<div class="section">
<h4><a name="Overview"></a>Overview</h4>
<p>There are quite a few options you will be confronted with when tuning your Storm topologies and this is largely trial and error. As a general rule of thumb, we recommend starting with the defaults and smaller numbers in terms of parallelism while iteratively working up until the desired performance is achieved. You will find the offset lag tool indispensable while verifying your settings.</p>
<p>We won&#x2019;t go into a full discussion about Storm&#x2019;s architecture - see references section for more info - but there are some general rules of thumb that should be followed. It&#x2019;s first important to understand the ways you can impact parallelism in a Storm topology.</p>
<ul>
<li>num tasks</li>
<li>num executors (parallelism hint)</li>
<li>num workers</li>
</ul>
<p>Tasks are instances of a given spout or bolt, executors are threads in a process, and workers are JVM processes. You&#x2019;ll want the number of tasks as a multiple of the number of executors, the number of executors as a multiple of the number of workers, and the number of workers as a multiple of the number of machines. The main reason for this approach is that it will give a uniform distribution of work to each machine and JVM process. More often than not, your number of tasks will be equal to the number of executors, which is the default in Storm. Flux does not actually provide a way to independently set number of tasks, so for enrichments and indexing, which use Flux, num tasks will always equal num executors.</p>
<p>You can change the number of workers via the Storm property <tt>topology.workers</tt></p>
<p><b>Other Storm Settings</b></p>
<div>
<div>
<pre class="source">topology.max.spout.pending
</pre></div></div>
<p>This is the maximum number of tuples that can be in flight (i.e., not yet acked) at any given time within your topology. You set this as a form of backpressure to ensure you don&#x2019;t flood your topology.</p>
<div>
<div>
<pre class="source">topology.acker.executors
</pre></div></div>
<p>This specifies how many threads should be dedicated to tuple acking. We found that setting this equal to the number of partitions in your inbound Kafka topic worked well.</p>
<p><b>spout-config.json</b></p>
<div>
<div>
<pre class="source">{
...
&quot;spout.pollTimeoutMs&quot; : 200,
&quot;spout.maxUncommittedOffsets&quot; : 10000000,
&quot;spout.offsetCommitPeriodMs&quot; : 30000
}
</pre></div></div>
<p>Above is a snippet for configuring parsers. These are the recommended spout defaults from Storm and are currently the defaults provided in the Kafka spout itself. In fact, if you find the recommended defaults work fine for you, then you can omit these settings altogether.</p></div>
<div class="section">
<h4><a name="Where_to_Find_Tuning_Properties"></a>Where to Find Tuning Properties</h4>
<p><b>Important:</b> The parser topologies are deployed via a builder pattern that takes parameters from the CLI as set via Ambari. The enrichment and indexing topologies are configured using a Storm Flux file, a configuration properties file, and Ambari. Here is a setting materialization summary for each of the topology types:</p>
<ul>
<li>Parsers
<ul>
<li>Management UI -&gt; parser json config and CLI -&gt; Storm</li>
</ul>
</li>
<li>Enrichment
<ul>
<li>Ambari UI -&gt; properties file -&gt; Flux -&gt; Storm</li>
</ul>
</li>
<li>Indexing
<ul>
<li>Ambari UI -&gt; properties file -&gt; Flux -&gt; Storm</li>
</ul>
</li>
</ul>
<p><b>Parsers</b></p>
<p>This is a mapping of the various performance tuning properties for parsers and how they are materialized.</p>
<p>See more detail in the parser documentation on <a class="externalLink" href="https://github.com/apache/metron/blob/master/metron-platform/metron-parsing/metron-parsers-common/README.md#starting-the-parser-topology">starting the parser topology</a>.</p>
<table border="0" class="table table-striped">
<thead>
<tr class="a">
<th> Category </th>
<th> Management UI Property Name </th>
<th> JSON Config File Property Name </th>
<th> CLI Option </th>
<th> Storm Property Name </th>
<th> Notes </th></tr>
</thead><tbody>
<tr class="b">
<td> Storm topology config </td>
<td> Num Workers </td>
<td> n/a </td>
<td> -nw,--num_workers &lt;NUM_WORKERS&gt; </td>
<td> topology.workers </td>
<td> </td></tr>
<tr class="a">
<td> </td>
<td> Num Ackers </td>
<td> n/a </td>
<td> -na,--num_ackers &lt;NUM_ACKERS&gt; </td>
<td> topology.acker.executors </td>
<td> </td></tr>
<tr class="b">
<td> </td>
<td> Storm Config </td>
<td> topology.max.spout.pending </td>
<td> -e,--extra_topology_options &lt;JSON_FILE&gt;, e.g. { &quot;topology.max.spout.pending&quot; : NUM } </td>
<td> topology.max.spout.pending </td>
<td> Put property in JSON format in a file named <tt>storm-&lt;MY_PARSER&gt;-config.json</tt> </td></tr>
<tr class="a">
<td> Kafka spout </td>
<td> Spout Parallelism </td>
<td> n/a </td>
<td> -sp,--spout_p &lt;SPOUT_PARALLELISM_HINT&gt; </td>
<td> n/a </td>
<td> </td></tr>
<tr class="b">
<td> </td>
<td> Spout Num Tasks </td>
<td> n/a </td>
<td> -snt,--spout_num_tasks &lt;NUM_TASKS&gt; </td>
<td> n/a </td>
<td> </td></tr>
<tr class="a">
<td> </td>
<td> Spout Config </td>
<td> spout.pollTimeoutMs </td>
<td> -esc,--extra_kafka_spout_config &lt;JSON_FILE&gt;, e.g. { &quot;spout.pollTimeoutMs&quot; : 200 } </td>
<td> n/a </td>
<td> Put property in JSON format in a file named <tt>spout-&lt;MY_PARSER&gt;-config.json</tt> </td></tr>
<tr class="b">
<td> </td>
<td> Spout Config </td>
<td> spout.maxUncommittedOffsets </td>
<td> -esc,--extra_kafka_spout_config &lt;JSON_FILE&gt;, e.g. { &quot;spout.maxUncommittedOffsets&quot; : 10000000 } </td>
<td> n/a </td>
<td> Put property in JSON format in a file named <tt>spout-&lt;MY_PARSER&gt;-config.json</tt> </td></tr>
<tr class="a">
<td> </td>
<td> Spout Config </td>
<td> spout.offsetCommitPeriodMs </td>
<td> -esc,--extra_kafka_spout_config &lt;JSON_FILE&gt;, e.g. { &quot;spout.offsetCommitPeriodMs&quot; : 30000 } </td>
<td> n/a </td>
<td> Put property in JSON format in a file named <tt>spout-&lt;MY_PARSER&gt;-config.json</tt> </td></tr>
<tr class="b">
<td> Parser bolt </td>
<td> Parser Num Tasks </td>
<td> n/a </td>
<td> -pnt,--parser_num_tasks &lt;NUM_TASKS&gt; </td>
<td> n/a </td>
<td> </td></tr>
<tr class="a">
<td> </td>
<td> Parser Parallelism </td>
<td> n/a </td>
<td> -pp,--parser_p &lt;PARALLELISM_HINT&gt; </td>
<td> n/a </td>
<td> </td></tr>
<tr class="b">
<td> </td>
<td> Parser Parallelism </td>
<td> n/a </td>
<td> -pp,--parser_p &lt;PARALLELISM_HINT&gt; </td>
<td> n/a </td>
<td> </td></tr>
</tbody>
</table>
<p><b>Enrichment</b></p>
<p><b>Note</b> These recommendations are based on the deprecated split-join enrichment topology. See <a href="metron-enrichment/Performance.html">Enrichment Performance</a> for tuning recommendations for the new default unified enrichment topology.</p>
<p>This is a mapping of the various performance tuning properties for enrichments and how they are materialized.</p>
<p>Flux file found here - $METRON_HOME/flux/enrichment/remote-splitjoin.yaml</p>
<p><i>Note 1:</i> Changes to Flux file properties that are managed by Ambari will render Ambari unable to further manage the property.</p>
<p><i>Note 2:</i> Many of these settings will be irrelevant in the alternate non-split-join topology</p>
<table border="0" class="table table-striped">
<thead>
<tr class="a">
<th> Category </th>
<th> Ambari Property Name </th>
<th> enrichment.properties property </th>
<th> Flux Property </th>
<th> Flux Section Location </th>
<th> Storm Property Name </th>
<th> Notes </th></tr>
</thead><tbody>
<tr class="b">
<td> Storm topology config </td>
<td> enrichment_workers </td>
<td> enrichment.workers </td>
<td> topology.workers </td>
<td> line 18, config </td>
<td> topology.workers </td>
<td> </td></tr>
<tr class="a">
<td> </td>
<td> enrichment_acker_executors </td>
<td> enrichment.acker.executors </td>
<td> topology.acker.executors </td>
<td> line 18, config </td>
<td> topology.acker.executors </td>
<td> </td></tr>
<tr class="b">
<td> </td>
<td> enrichment_topology_max_spout_pending </td>
<td> topology.max.spout.pending </td>
<td> topology.max.spout.pending </td>
<td> line 18, config </td>
<td> topology.max.spout.pending </td>
<td> </td></tr>
<tr class="a">
<td> Kafka spout </td>
<td> enrichment_kafka_spout_parallelism </td>
<td> kafka.spout.parallelism </td>
<td> parallelism </td>
<td> line 245, id: kafkaSpout </td>
<td> n/a </td>
<td> </td></tr>
<tr class="b">
<td> </td>
<td> n/a </td>
<td> session.timeout.ms </td>
<td> session.timeout.ms </td>
<td> line 201, id: kafkaProps </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="a">
<td> </td>
<td> n/a </td>
<td> enable.auto.commit </td>
<td> enable.auto.commit </td>
<td> line 201, id: kafkaProps </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="b">
<td> </td>
<td> n/a </td>
<td> n/a </td>
<td> setPollTimeoutMs </td>
<td> line 230, id: kafkaConfig </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="a">
<td> </td>
<td> n/a </td>
<td> n/a </td>
<td> setMaxUncommittedOffsets </td>
<td> line 230, id: kafkaConfig </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="b">
<td> </td>
<td> n/a </td>
<td> n/a </td>
<td> setOffsetCommitPeriodMs </td>
<td> line 230, id: kafkaConfig </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="a">
<td> Enrichment splitter </td>
<td> enrichment_split_parallelism </td>
<td> enrichment.split.parallelism </td>
<td> parallelism </td>
<td> line 253, id: enrichmentSplitBolt </td>
<td> n/a </td>
<td> </td></tr>
<tr class="b">
<td> Enrichment joiner </td>
<td> enrichment_join_parallelism </td>
<td> enrichment.join.parallelism </td>
<td> parallelism </td>
<td> line 316, id: enrichmentJoinBolt </td>
<td> n/a </td>
<td> </td></tr>
<tr class="a">
<td> Threat intel splitter </td>
<td> threat_intel_split_parallelism </td>
<td> threat.intel.split.parallelism </td>
<td> parallelism </td>
<td> line 338, id: threatIntelSplitBolt </td>
<td> n/a </td>
<td> </td></tr>
<tr class="b">
<td> Threat intel joiner </td>
<td> threat_intel_join_parallelism </td>
<td> threat.intel.join.parallelism </td>
<td> parallelism </td>
<td> line 376, id: threatIntelJoinBolt </td>
<td> n/a </td>
<td> </td></tr>
<tr class="a">
<td> Output bolt </td>
<td> kafka_writer_parallelism </td>
<td> kafka.writer.parallelism </td>
<td> parallelism </td>
<td> line 397, id: outputBolt </td>
<td> n/a </td>
<td> </td></tr>
</tbody>
</table>
<p>When adding Kafka spout properties, there are 3 ways you&#x2019;ll do this.</p>
<ol style="list-style-type: decimal">
<li>
<p>Ambari: If they are properties managed by Ambari (noted in the table under &#x2018;Ambari Property Name&#x2019;), look for the setting in Ambari.</p>
</li>
<li>
<p>Flux -&gt; kafkaProps: add a new key/value to the kafkaProps section HashMap on line 201. For example, if you want to set the Kafka Spout consumer&#x2019;s session.timeout.ms to 30 seconds, you would add the following:</p>
<div>
<div>
<pre class="source"> - name: &quot;put&quot;
args:
- &quot;session.timeout.ms&quot;
- 30000
</pre></div></div>
</li>
<li>
<p>Flux -&gt; kafkaConfig: add a new setter to the kafkaConfig section on line 230. For example, if you want to set the Kafka Spout consumer&#x2019;s poll timeout to 200 milliseconds, you would add the following under <tt>configMethods</tt>:</p>
<div>
<div>
<pre class="source"> - name: &quot;setPollTimeoutMs&quot;
args:
- 200
</pre></div></div>
</li>
</ol>
<p><b>Indexing (Batch)</b></p>
<p>This is a mapping of the various performance tuning properties for indexing and how they are materialized.</p>
<p>Flux file can be found here - $METRON_HOME/flux/indexing/batch/remote.yaml.</p>
<p>Note: Changes to Flux file properties that are managed by Ambari will render Ambari unable to further manage the property.</p>
<table border="0" class="table table-striped">
<thead>
<tr class="a">
<th> Category </th>
<th> Ambari Property Name </th>
<th> hdfs.properties property </th>
<th> Flux Property </th>
<th> Flux Section Location </th>
<th> Storm Property Name </th>
<th> Notes </th></tr>
</thead><tbody>
<tr class="b">
<td> Storm topology config </td>
<td> batch_indexing_workers </td>
<td> indexing.workers </td>
<td> topology.workers </td>
<td> line 19, config </td>
<td> topology.workers </td>
<td> </td></tr>
<tr class="a">
<td> </td>
<td> batch_indexing_acker_executors </td>
<td> indexing.acker.executors </td>
<td> topology.acker.executors </td>
<td> line 19, config </td>
<td> topology.acker.executors </td>
<td> </td></tr>
<tr class="b">
<td> </td>
<td> batch_indexing_topology_max_spout_pending </td>
<td> topology.max.spout.pending </td>
<td> topology.max.spout.pending </td>
<td> line 19, config </td>
<td> topology.max.spout.pending </td>
<td> </td></tr>
<tr class="a">
<td> Kafka spout </td>
<td> batch_indexing_kafka_spout_parallelism </td>
<td> kafka.spout.parallelism </td>
<td> parallelism </td>
<td> line 123, id: kafkaSpout </td>
<td> n/a </td>
<td> </td></tr>
<tr class="b">
<td> </td>
<td> n/a </td>
<td> session.timeout.ms </td>
<td> session.timeout.ms </td>
<td> line 80, id: kafkaProps </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="a">
<td> </td>
<td> n/a </td>
<td> enable.auto.commit </td>
<td> enable.auto.commit </td>
<td> line 80, id: kafkaProps </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="b">
<td> </td>
<td> n/a </td>
<td> n/a </td>
<td> setPollTimeoutMs </td>
<td> line 108, id: kafkaConfig </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="a">
<td> </td>
<td> n/a </td>
<td> n/a </td>
<td> setMaxUncommittedOffsets </td>
<td> line 108, id: kafkaConfig </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="b">
<td> </td>
<td> n/a </td>
<td> n/a </td>
<td> setOffsetCommitPeriodMs </td>
<td> line 108, id: kafkaConfig </td>
<td> n/a </td>
<td> Kafka consumer client property </td></tr>
<tr class="a">
<td> Output bolt </td>
<td> hdfs_writer_parallelism </td>
<td> hdfs.writer.parallelism </td>
<td> parallelism </td>
<td> line 133, id: hdfsIndexingBolt </td>
<td> n/a </td>
<td> </td></tr>
<tr class="b">
<td> </td>
<td> n/a </td>
<td> n/a </td>
<td> hdfsSyncPolicy (see notes below) </td>
<td> line 47, id: hdfsWriter </td>
<td> n/a </td>
<td> See notes below about adding this prop </td></tr>
<tr class="a">
<td> </td>
<td> bolt_hdfs_rotation_policy_units </td>
<td> bolt.hdfs.rotation.policy.units </td>
<td> constructorArgs </td>
<td> line 41, id: hdfsRotationPolicy </td>
<td> n/a </td>
<td> </td></tr>
<tr class="b">
<td> </td>
<td> bolt_hdfs_rotation_policy_count </td>
<td> bolt.hdfs.rotation.policy.count </td>
<td> constructorArgs </td>
<td> line 41, id: hdfsRotationPolicy </td>
<td> n/a </td>
<td> </td></tr>
</tbody>
</table>
<p><i>Note</i>: HDFS sync policy is not currently managed via Ambari. You will need to modify the Flux file directly to accommodate this setting. e.g.</p>
<p>Add a new setter to the hdfsWriter around line 56. Lines 53-55 provided for context.</p>
<div>
<div>
<pre class="source"> 53 - name: &quot;withRotationPolicy&quot;
54 args:
55 - ref: &quot;hdfsRotationPolicy&quot;
56 - name: &quot;withSyncPolicy&quot;
57 args:
58 - ref: &quot;hdfsSyncPolicy&quot;
</pre></div></div>
<p>Add an hdfsSyncPolicy after the hdfsRotationPolicy that appears on line 41. e.g.</p>
<div>
<div>
<pre class="source"> 41 - id: &quot;hdfsRotationPolicy&quot;
...
45 - &quot;${bolt.hdfs.rotation.policy.units}&quot;
46
47 - id: &quot;hdfsSyncPolicy&quot;
48 className: &quot;org.apache.storm.hdfs.bolt.sync.CountSyncPolicy&quot;
49 constructorArgs:
50 - 100000
</pre></div></div>
</div></div></div>
<div class="section">
<h2><a name="Use_Case_Specific_Tuning_Suggestions"></a>Use Case Specific Tuning Suggestions</h2>
<p>The below discussion outlines a specific tuning exercise we went through for driving 1 Gbps of traffic through a Metron cluster running with 4 Kafka brokers and 4 Storm Supervisors.</p>
<p>General machine specs</p>
<ul>
<li>10 Gb network cards</li>
<li>256 GB memory</li>
<li>12 disks</li>
<li>32 cores</li>
</ul>
<div class="section">
<h3><a name="Performance_Monitoring_Tools"></a>Performance Monitoring Tools</h3>
<p>Before we get to tuning our cluster, it helps to describe what we might actually want to monitor as well as any potential pain points. Prior to switching over to the new Storm Kafka client, which leverages the new Kafka consumer API under the hood, offsets were stored in Zookeeper. While the broker hosts are still stored in Zookeeper, this is no longer true for the offsets which are now stored in Kafka itself. This is a configurable option, and you may switch back to Zookeeper if you choose, but Metron is currently using the new defaults. With this in mind, there are some useful tools that come with Storm and Kafka that we can use to monitor our topologies.</p>
<div class="section">
<h4><a name="Tooling"></a>Tooling</h4>
<p>Kafka</p>
<ul>
<li>consumer group offset lag viewer</li>
<li>There is a GUI tool to make creating, modifying, and generally managing your Kafka topics a bit easier - see <a class="externalLink" href="https://github.com/yahoo/kafka-manager">https://github.com/yahoo/kafka-manager</a></li>
<li>console consumer - useful for quickly verifying topic contents</li>
</ul>
<p>Storm</p>
<ul>
<li>Storm UI - <a class="externalLink" href="http://www.malinga.me/reading-and-understanding-the-storm-ui-storm-ui-explained/">http://www.malinga.me/reading-and-understanding-the-storm-ui-storm-ui-explained/</a></li>
</ul></div>
<div class="section">
<h4><a name="Example_-_Viewing_Kafka_Offset_Lags"></a>Example - Viewing Kafka Offset Lags</h4>
<p>First we need to set up some environment variables</p>
<div>
<div>
<pre class="source">export BROKERLIST=&lt;your broker comma-delimited list of host:ports&gt;
export ZOOKEEPER=&lt;your zookeeper comma-delimited list of host:ports&gt;
export KAFKA_HOME=&lt;kafka home dir&gt;
export METRON_HOME=&lt;your metron home&gt;
export HDP_HOME=&lt;your HDP home&gt;
</pre></div></div>
<p>If you have Kerberos enabled, set up the security protocol</p>
<div>
<div>
<pre class="source">$ cat /tmp/consumergroup.config
security.protocol=SASL_PLAINTEXT
</pre></div></div>
<p>Now run the following command for a running topology&#x2019;s consumer group. In this example we are using enrichments.</p>
<div>
<div>
<pre class="source">${KAFKA_HOME}/bin/kafka-consumer-groups.sh \
--command-config=/tmp/consumergroup.config \
--describe \
--group enrichments \
--bootstrap-server $BROKERLIST \
--new-consumer
</pre></div></div>
<p>This will return a table with the following output depicting offsets for all partitions and consumers associated with the specified consumer group:</p>
<div>
<div>
<pre class="source">GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
enrichments enrichments 9 29746066 29746067 1 consumer-2_/xxx.xxx.xxx.xxx
enrichments enrichments 3 29754325 29754326 1 consumer-1_/xxx.xxx.xxx.xxx
enrichments enrichments 43 29754331 29754332 1 consumer-6_/xxx.xxx.xxx.xxx
...
</pre></div></div>
<p><i>Note</i>: You won&#x2019;t see any output until a topology is actually running because the consumer groups only exist while consumers in the spouts are up and running.</p>
<p>The primary column we&#x2019;re concerned with paying attention to is the LAG column, which is the current delta calculation between the current and end offset for the partition. This tells us how close we are to keeping up with incoming data and, as we found through multiple trials, whether there are any problems with specific consumers getting stuck.</p>
<p>Taking this one step further, it&#x2019;s probably more useful if we can watch the offsets and lags change over time. In order to do this we&#x2019;ll add a &#x201c;watch&#x201d; command and set the refresh rate to 10 seconds.</p>
<div>
<div>
<pre class="source">watch -n 10 -d ${KAFKA_HOME}/bin/kafka-consumer-groups.sh \
--command-config=/tmp/consumergroup.config \
--describe \
--group enrichments \
--bootstrap-server $BROKERLIST \
--new-consumer
</pre></div></div>
<p>Every 10 seconds the command will re-run and the screen will be refreshed with new information. The most useful bit is that the watch command will highlight the differences between the current output and the last output screens.</p></div></div>
<div class="section">
<h3><a name="Parser_Tuning"></a>Parser Tuning</h3>
<p>We&#x2019;ll be using the bro sensor in this example. Note that the parsers and PCAP use a builder utility, as opposed to enrichments and indexing, which use Flux.</p>
<p>We started with a single partition for the inbound Kafka topics and eventually worked our way up to 48. And we&#x2019;re using the following pending value, as shown below. The default is &#x2018;null&#x2019; which would result in no limit.</p>
<p><b>storm-bro.config</b></p>
<div>
<div>
<pre class="source">{
...
&quot;topology.max.spout.pending&quot; : 2000
...
}
</pre></div></div>
<p>And the following default spout settings. Again, this can be omitted entirely since we are using the defaults.</p>
<p><b>spout-bro.config</b></p>
<div>
<div>
<pre class="source">{
...
&quot;spout.pollTimeoutMs&quot; : 200,
&quot;spout.maxUncommittedOffsets&quot; : 10000000,
&quot;spout.offsetCommitPeriodMs&quot; : 30000
}
</pre></div></div>
<p>And we ran our bro parser topology with the following options. We did not need to fully match the number of Kafka partitions with our parallelism in this case, though you could certainly do so if necessary. Notice that we only needed 1 worker.</p>
<div>
<div>
<pre class="source">$METRON_HOME/bin/start_parser_topology.sh \
-e ~metron/.storm/storm-bro.config \
-esc ~/.storm/spout-bro.config \
-k $BROKERLIST \
-ksp SASL_PLAINTEXT \
-nw 1 \
-ot enrichments \
-pnt 24 \
-pp 24 \
-s bro \
-snt 24 \
-sp 24 \
-z $ZOOKEEPER
</pre></div></div>
<p>From the usage docs, here are the options we&#x2019;ve used. The full reference can be found <a href="../metron-platform/metron-parsing/metron-parsers-common/index.html#Starting_the_Parser_Topology">here</a>.</p>
<div>
<div>
<pre class="source">usage: start_parser_topology.sh
-e,--extra_topology_options &lt;JSON_FILE&gt; Extra options in the form
of a JSON file with a map
for content.
-esc,--extra_kafka_spout_config &lt;JSON_FILE&gt; Extra spout config options
in the form of a JSON file
with a map for content.
Possible keys are:
retryDelayMaxMs,retryDelay
Multiplier,retryInitialDel
ayMs,stateUpdateIntervalMs
,bufferSizeBytes,fetchMaxW
ait,fetchSizeBytes,maxOffs
etBehind,metricsTimeBucket
SizeInSecs,socketTimeoutMs
-k,--kafka &lt;BROKER_URL&gt; Kafka Broker URL
-ksp,--kafka_security_protocol &lt;SECURITY_PROTOCOL&gt; Kafka Security Protocol
-nw,--num_workers &lt;NUM_WORKERS&gt; Number of Workers
-ot,--output_topic &lt;KAFKA_TOPIC&gt; Output Kafka Topic
-pnt,--parser_num_tasks &lt;NUM_TASKS&gt; Parser Num Tasks
-pp,--parser_p &lt;PARALLELISM_HINT&gt; Parser Parallelism Hint
-s,--sensor &lt;SENSOR_TYPE&gt; Sensor Type
-snt,--spout_num_tasks &lt;NUM_TASKS&gt; Spout Num Tasks
-sp,--spout_p &lt;SPOUT_PARALLELISM_HINT&gt; Spout Parallelism Hint
-z,--zk &lt;ZK_QUORUM&gt; Zookeeper Quroum URL
(zk1:2181,zk2:2181,...
</pre></div></div>
</div>
<div class="section">
<h3><a name="Enrichment_Tuning"></a>Enrichment Tuning</h3>
<p><b>Note</b> These tuning suggestions are based on the deprecated split-join topology.</p>
<p>We landed on the same number of partitions for enrichment and indexing as we did for bro - 48.</p>
<p>For configuring Storm, there is a flux file and properties file that we modified. Here are the settings we changed for bro in Flux. Note that the main Metron-specific option we&#x2019;ve changed to accommodate the desired rate of data throughput is max cache size in the join bolts. More information on Flux can be found here - <a class="externalLink" href="http://storm.apache.org/releases/1.0.1/flux.html">http://storm.apache.org/releases/1.0.1/flux.html</a></p>
<p><b>General storm settings</b></p>
<div>
<div>
<pre class="source">topology.workers: 8
topology.acker.executors: 48
topology.max.spout.pending: 2000
</pre></div></div>
<p><b>Spout and Bolt Settings</b></p>
<div>
<div>
<pre class="source">kafkaSpout
parallelism=48
session.timeout.ms=29999
enable.auto.commit=false
setPollTimeoutMs=200
setMaxUncommittedOffsets=10000000
setOffsetCommitPeriodMs=30000
enrichmentSplitBolt
parallelism=4
enrichmentJoinBolt
parallelism=8
withMaxCacheSize=200000
withMaxTimeRetain=10
threatIntelSplitBolt
parallelism=4
threatIntelJoinBolt
parallelism=4
withMaxCacheSize=200000
withMaxTimeRetain=10
outputBolt
parallelism=48
</pre></div></div>
</div>
<div class="section">
<h3><a name="Indexing_.28HDFS.29_Tuning"></a>Indexing (HDFS) Tuning</h3>
<p>There are 48 partitions set for the indexing topic, per the enrichment exercise above.</p>
<p>These are the batch size settings for the bro index</p>
<div>
<div>
<pre class="source">cat ${METRON_HOME}/config/zookeeper/indexing/bro.json
{
&quot;hdfs&quot; : {
&quot;index&quot;: &quot;bro&quot;,
&quot;batchSize&quot;: 50,
&quot;enabled&quot; : true
}...
}
</pre></div></div>
<p>And here are the settings we used for the indexing topology</p>
<p><b>General storm settings</b></p>
<div>
<div>
<pre class="source">topology.workers: 4
topology.acker.executors: 24
topology.max.spout.pending: 2000
</pre></div></div>
<p><b>Spout and Bolt Settings</b></p>
<div>
<div>
<pre class="source">hdfsSyncPolicy
org.apache.storm.hdfs.bolt.sync.CountSyncPolicy
constructor arg=100000
hdfsRotationPolicy
bolt.hdfs.rotation.policy.units=DAYS
bolt.hdfs.rotation.policy.count=1
kafkaSpout
parallelism: 24
session.timeout.ms=29999
enable.auto.commit=false
setPollTimeoutMs=200
setMaxUncommittedOffsets=10000000
setOffsetCommitPeriodMs=30000
hdfsIndexingBolt
parallelism: 24
</pre></div></div>
</div>
<div class="section">
<h3><a name="PCAP_Tuning"></a>PCAP Tuning</h3>
<p>PCAP is a specialized topology that is a Spout-only topology. Both Kafka topic consumption and HDFS writing are done within a spout to avoid the additional network hop required if using an additional bolt.</p>
<p><b>General Storm topology properties</b></p>
<div>
<div>
<pre class="source">topology.workers=16
topology.acker.executors: 0
</pre></div></div>
<p><b>Spout and Bolt properties</b></p>
<div>
<div>
<pre class="source">kafkaSpout
parallelism: 128
poll.timeout.ms=100
offset.commit.period.ms=30000
session.timeout.ms=39000
max.uncommitted.offsets=200000000
max.poll.interval.ms=10
max.poll.records=200000
receive.buffer.bytes=431072
max.partition.fetch.bytes=10000000
enable.auto.commit=false
setMaxUncommittedOffsets=20000000
setOffsetCommitPeriodMs=30000
writerConfig
withNumPackets=1265625
withMaxTimeMS=0
withReplicationFactor=1
withSyncEvery=80000
withHDFSConfig
io.file.buffer.size=1000000
dfs.blocksize=1073741824
</pre></div></div>
</div></div>
<div class="section">
<h2><a name="Debugging"></a>Debugging</h2>
<p>Set the following env vars accordingly for your cluster. This is how we would configure it for the Metron full dev development environment.</p>
<div>
<div>
<pre class="source">export HDP_HOME=/usr/hdp/current
export KAFKA_HOME=$HDP_HOME/kafka-broker
export STORM_UI=http://node1:8744
export ELASTIC=http://node1:9200
export ZOOKEEPER=node1:2181
export METRON_VERSION=0.7.1
export METRON_HOME=/usr/metron/${METRON_VERSION}
</pre></div></div>
<p>Note that the output from Storm will be a flattened blob of JSON. In order to pretty print for readability, you can pipe it through a JSON formatter, e.g.</p>
<div>
<div>
<pre class="source">[some Storm curl command] | python -m json.tool
</pre></div></div>
<p><b>Getting Storm Configuration Details</b></p>
<p>Storm has a useful REST API you can use to get full details about your running topologies. This is generally more convenient and complete for troubleshooting performance problems than going to the Storm UI alone. See Storm&#x2019;s <a class="externalLink" href="http://storm.apache.org/releases/1.1.0/STORM-UI-REST-API.html">REST API docs</a> for more details.</p>
<div>
<div>
<pre class="source"># get Storm cluster summary info including version
curl -XGET ${STORM_UI}'/api/v1/cluster/summary'
</pre></div></div>
<div>
<div>
<pre class="source"># get overall Storm cluster configuration
curl -XGET ${STORM_UI}'/api/v1/cluster/configuration'
</pre></div></div>
<div>
<div>
<pre class="source"># get list of topologies and brief summary detail
curl -XGET ${STORM_UI}'/api/v1/topology/summary'
</pre></div></div>
<div>
<div>
<pre class="source"># get all topology runtime settings. Plug in the ID for your topology, which you can get from the topology summary command or from the Storm UI. Passing sys=1 will also return system stats.
curl -XGET ${STORM_UI}'/api/v1/topology/:id?sys=1'
</pre></div></div>
<p><b>Getting Kafka Configuration Details</b></p>
<div>
<div>
<pre class="source"># Get list of Kafka topics
${HDP_HOME}/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --list
</pre></div></div>
<div>
<div>
<pre class="source"># Get Kafka topic details - plug in the desired topic name in place of &quot;enrichments&quot;
${HDP_HOME}/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --topic enrichments --describe
</pre></div></div>
<p><b>Getting Metron Topology Zookeeper Configuration</b></p>
<div>
<div>
<pre class="source"># Provides a full listing of all Metron parser, enrichment, and indexing topology configuration
$METRON_HOME/bin/zk_load_configs.sh -m DUMP -z $ZOOKEEPER
</pre></div></div>
</div>
<div class="section">
<h2><a name="Issues"></a>Issues</h2>
<p><b>Error</b></p>
<div>
<div>
<pre class="source">org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned
the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms,
which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the
session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records
</pre></div></div>
<p><b>Suggestions</b></p>
<p>This implies that the spout hasn&#x2019;t been given enough time between polls before committing the offsets. In other words, the amount of time taken to process the messages is greater than the timeout window. In order to fix this, you can improve message throughput by modifying the options outlined above, increasing the poll timeout, or both.</p></div>
<div class="section">
<h2><a name="Reference"></a>Reference</h2>
<ul>
<li><a href="metron-enrichment/Performance.html">Enrichment Performance</a></li>
<li><a class="externalLink" href="http://storm.apache.org/releases/1.1.0/flux.html">http://storm.apache.org/releases/1.1.0/flux.html</a></li>
<li><a class="externalLink" href="https://stackoverflow.com/questions/17257448/what-is-the-task-in-storm-parallelism">https://stackoverflow.com/questions/17257448/what-is-the-task-in-storm-parallelism</a></li>
<li><a class="externalLink" href="http://storm.apache.org/releases/current/Understanding-the-parallelism-of-a-Storm-topology.html">http://storm.apache.org/releases/current/Understanding-the-parallelism-of-a-Storm-topology.html</a></li>
<li><a class="externalLink" href="http://www.malinga.me/reading-and-understanding-the-storm-ui-storm-ui-explained/">http://www.malinga.me/reading-and-understanding-the-storm-ui-storm-ui-explained/</a></li>
<li><a class="externalLink" href="https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/">https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/</a></li>
<li><a class="externalLink" href="https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_storm-component-guide/content/storm-kafkaspout-perf.html">https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_storm-component-guide/content/storm-kafkaspout-perf.html</a></li>
<li><a class="externalLink" href="http://storm.apache.org/releases/1.1.0/STORM-UI-REST-API.html">http://storm.apache.org/releases/1.1.0/STORM-UI-REST-API.html</a></li>
</ul></div>
</div>
</div>
</div>
<hr/>
<footer>
<div class="container-fluid">
<div class="row-fluid">
© 2015-2016 The Apache Software Foundation. Apache Metron, Metron, Apache, the Apache feather logo,
and the Apache Metron project logo are trademarks of The Apache Software Foundation.
</div>
</div>
</footer>
</body>
</html>