blob: 3e9700ba67eb0ce44cba629cd88e1ef1267a61c0 [file] [log] [blame]
<!DOCTYPE html>
<!--
| Generated by Apache Maven Doxia Site Renderer 1.8 from src/site/markdown/metron-platform/metron-writer/index.md at 2019-05-14
| Rendered using Apache Maven Fluido Skin 1.7
-->
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta name="Date-Revision-yyyymmdd" content="20190514" />
<meta http-equiv="Content-Language" content="en" />
<title>Metron &#x2013; Writer</title>
<link rel="stylesheet" href="../../css/apache-maven-fluido-1.7.min.css" />
<link rel="stylesheet" href="../../css/site.css" />
<link rel="stylesheet" href="../../css/print.css" media="print" />
<script type="text/javascript" src="../../js/apache-maven-fluido-1.7.min.js"></script>
<script type="text/javascript">
$( document ).ready( function() { $( '.carousel' ).carousel( { interval: 3500 } ) } );
</script>
</head>
<body class="topBarDisabled">
<div class="container-fluid">
<div id="banner">
<div class="pull-left"><a href="http://metron.apache.org/" id="bannerLeft"><img src="../../images/metron-logo.png" alt="Apache Metron" width="148px" height="48px"/></a></div>
<div class="pull-right"></div>
<div class="clear"><hr/></div>
</div>
<div id="breadcrumbs">
<ul class="breadcrumb">
<li class=""><a href="http://www.apache.org" class="externalLink" title="Apache">Apache</a><span class="divider">/</span></li>
<li class=""><a href="http://metron.apache.org/" class="externalLink" title="Metron">Metron</a><span class="divider">/</span></li>
<li class=""><a href="../../index.html" title="Documentation">Documentation</a><span class="divider">/</span></li>
<li class="active ">Writer</li>
<li id="publishDate" class="pull-right"><span class="divider">|</span> Last Published: 2019-05-14</li>
<li id="projectVersion" class="pull-right">Version: 0.7.1</li>
</ul>
</div>
<div class="row-fluid">
<div id="leftColumn" class="span2">
<div class="well sidebar-nav">
<ul class="nav nav-list">
<li class="nav-header">User Documentation</li>
<li><a href="../../index.html" title="Metron"><span class="icon-chevron-down"></span>Metron</a>
<ul class="nav nav-list">
<li><a href="../../CONTRIBUTING.html" title="CONTRIBUTING"><span class="none"></span>CONTRIBUTING</a></li>
<li><a href="../../Upgrading.html" title="Upgrading"><span class="none"></span>Upgrading</a></li>
<li><a href="../../metron-analytics/index.html" title="Analytics"><span class="icon-chevron-right"></span>Analytics</a></li>
<li><a href="../../metron-contrib/metron-docker/index.html" title="Docker"><span class="none"></span>Docker</a></li>
<li><a href="../../metron-contrib/metron-performance/index.html" title="Performance"><span class="none"></span>Performance</a></li>
<li><a href="../../metron-deployment/index.html" title="Deployment"><span class="icon-chevron-right"></span>Deployment</a></li>
<li><a href="../../metron-interface/index.html" title="Interface"><span class="icon-chevron-right"></span>Interface</a></li>
<li><a href="../../metron-platform/index.html" title="Platform"><span class="icon-chevron-down"></span>Platform</a>
<ul class="nav nav-list">
<li><a href="../../metron-platform/Performance-tuning-guide.html" title="Performance-tuning-guide"><span class="none"></span>Performance-tuning-guide</a></li>
<li><a href="../../metron-platform/metron-common/index.html" title="Common"><span class="none"></span>Common</a></li>
<li><a href="../../metron-platform/metron-data-management/index.html" title="Data-management"><span class="none"></span>Data-management</a></li>
<li><a href="../../metron-platform/metron-elasticsearch/index.html" title="Elasticsearch"><span class="none"></span>Elasticsearch</a></li>
<li><a href="../../metron-platform/metron-enrichment/index.html" title="Enrichment"><span class="icon-chevron-right"></span>Enrichment</a></li>
<li><a href="../../metron-platform/metron-hbase-server/index.html" title="Hbase-server"><span class="none"></span>Hbase-server</a></li>
<li><a href="../../metron-platform/metron-indexing/index.html" title="Indexing"><span class="none"></span>Indexing</a></li>
<li><a href="../../metron-platform/metron-job/index.html" title="Job"><span class="none"></span>Job</a></li>
<li><a href="../../metron-platform/metron-management/index.html" title="Management"><span class="none"></span>Management</a></li>
<li><a href="../../metron-platform/metron-parsing/index.html" title="Parsing"><span class="icon-chevron-right"></span>Parsing</a></li>
<li><a href="../../metron-platform/metron-pcap-backend/index.html" title="Pcap-backend"><span class="none"></span>Pcap-backend</a></li>
<li><a href="../../metron-platform/metron-solr/index.html" title="Solr"><span class="none"></span>Solr</a></li>
<li class="active"><a href="#"><span class="none"></span>Writer</a></li>
</ul>
</li>
<li><a href="../../metron-sensors/index.html" title="Sensors"><span class="icon-chevron-right"></span>Sensors</a></li>
<li><a href="../../metron-stellar/stellar-3rd-party-example/index.html" title="Stellar-3rd-party-example"><span class="none"></span>Stellar-3rd-party-example</a></li>
<li><a href="../../metron-stellar/stellar-common/index.html" title="Stellar-common"><span class="icon-chevron-right"></span>Stellar-common</a></li>
<li><a href="../../metron-stellar/stellar-zeppelin/index.html" title="Stellar-zeppelin"><span class="none"></span>Stellar-zeppelin</a></li>
<li><a href="../../use-cases/index.html" title="Use-cases"><span class="icon-chevron-right"></span>Use-cases</a></li>
</ul>
</li>
</ul>
<hr />
<div id="poweredBy">
<div class="clear"></div>
<div class="clear"></div>
<div class="clear"></div>
<div class="clear"></div>
<a href="http://maven.apache.org/" title="Built by Maven" class="poweredBy"><img class="builtBy" alt="Built by Maven" src="../../images/logos/maven-feather.png" /></a>
</div>
</div>
</div>
<div id="bodyColumn" class="span10" >
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<h1>Writer</h1>
<p><a name="Writer"></a></p>
<div class="section">
<h2><a name="Introduction"></a>Introduction</h2>
<p>The writer module provides some utilties for writing to outside components from within Storm. This includes managing bulk writing. An implemention is included for writing to HDFS in this module. Other writers can be found in their own modules.</p></div>
<div class="section">
<h2><a name="Bulk_Message_Writing"></a>Bulk Message Writing</h2>
<p>Most external components encourage messages to be written in batches for performance reasons. The writer module includes an abstraction for doing this in an efficient manner. This abstraction provides the following features:</p>
<ul>
<li>A high-level <tt>BulkWriterComponent</tt> class that manages a per-sensor cache of batched messages and flushes when appropriate</li>
<li>An extension point for determining when a batch should be flushed</li>
<li>An extention point for handling a bulk message write response after a batch has been flushed</li>
</ul>
<div class="section">
<h3><a name="Flush_Policies"></a>Flush Policies</h3>
<p>Flushing behavior is controlled by a collection of <tt>FlushPolicy</tt> objects. They are responsible for 2 things:</p>
<ol style="list-style-type: decimal">
<li>Determining when a batch should be flushed</li>
<li>Handling a <tt>BulkWriterResponse</tt> after a batch is flushed and messages are written with a <tt>BulkMessageWriter</tt></li>
</ol>
<p>The <tt>FlushPolicy</tt> interface defines methods for handling these responsiblities:</p>
<ul>
<li><tt>boolean shouldFlush(String sensorType, WriterConfiguration configurations, List&lt;BulkMessage&lt;MESSAGE_T&gt;&gt; messages)</tt></li>
<li><tt>void onFlush(String sensorType, BulkWriterResponse response)</tt></li>
</ul>
<p>There are 2 <tt>FlushPolicy</tt> implementations included by default:</p>
<ul>
<li>The <tt>BatchSizePolicy</tt> will flush a batch whenever the batch size reaches a configured value. This configuration value is represented by the <tt>batchSize</tt> property in either the parser, enrichment or indexing configuration (whichever is appropriate in the current context).</li>
<li>The <tt>BatchTimeoutPolicy</tt> will flush a batch whenever the batch timeout has elapsed. This configuration value is represented by the <tt>batchTimeout</tt> property in either the parser, enrichment or indexing configuration (whichever is appropriate in the current context). A <tt>maxBatchTimeout</tt> is set at creation time and serves as the ceiling for a batch timeout. In Storm topologies, this value is set to 1/2 the tuple timeout setting to ensure messages are always flushed before their tuples timeout. After a batch is flushed, the batch timer is reset for that sensor type.</li>
</ul>
<p>For example, a configuration that sets the <tt>batchSize</tt> and <tt>batchTimeout</tt> in a parser topology will look like:</p>
<div>
<div>
<pre class="source">{
&quot;parserClassName&quot;: &quot;org.apache.metron.parsers.bro.BasicBroParser&quot;,
&quot;sensorTopic&quot;: &quot;bro&quot;,
&quot;parserConfig&quot;: {
&quot;batchSize&quot;: 5
&quot;batchTimeout&quot;: 2
}
}
</pre></div></div>
<p>Similarly for the enrichment topology (configured in the <a href="../metron-common/index.html#Global_Configuration">Global Configuration</a>):</p>
<div>
<div>
<pre class="source">{
&quot;enrichment.writer.batchSize&quot;: &quot;5&quot;,
&quot;enrichment.writer.batchTimeout&quot;: &quot;2&quot;,
...
}
</pre></div></div>
<p>And finally for the indexing topology:</p>
<div>
<div>
<pre class="source">{
&quot;elasticsearch&quot;: {
&quot;index&quot;: &quot;bro&quot;,
&quot;batchSize&quot;: 5,
&quot;batchTimeout&quot;: 2,
&quot;enabled&quot;: true
},
...
}
</pre></div></div>
<p>Additional policies can be added as needed. For example, an <tt>AckTuplesPolicy</tt> is added in the Storm bolts to handle acking tuples after a batch is flushed.</p></div>
<div class="section">
<h3><a name="Bulk_Writing_Workflow"></a>Bulk Writing Workflow</h3>
<p>The <tt>BulkWriterComponent</tt> class collects messages in separate sensor-specific caches. This class is instantiated and supplied to classes that need to write messages to external components. A collection of default <tt>FlushPolicy</tt> implementations are created by default with the option of passing in additional <tt>FlushPolicy</tt> objects as needed.</p>
<p>Batching and writing messages follows this process:</p>
<ol style="list-style-type: decimal">
<li>A single message is passed to the <tt>BulkWriterComponent.write</tt> method and stored in the appropriate cache based on the sensor type. A <tt>BulkMessageWriter</tt> is also supplied to do the actual writing when messages are flushed.</li>
<li>The collection of <tt>FlushPolicy</tt> implementations are checked and a batch is flushed whenever the first <tt>FlushPolicy.shouldFlush</tt> returns true.</li>
<li>If no policies signal a flush, then nothing happens. If a policy does signal a flush, the batch of messages for that sensor are written with the supplied <tt>BulkMessageWriter</tt>. Each <tt>FlushPolicy.onFlush</tt> method is then called with the <tt>BulkWriterResponse</tt>.</li>
<li>If a sensor type has been disabled, it&#x2019;s batch is flushed immediately (<tt>FlushPolicy.shouldFlush</tt> is not checked).</li>
<li>A <tt>BulkWriterComponent.flushAll</tt> method is available that immediately calls the <tt>FlushPolicy.shouldFlush</tt> methods for each sensor type in the cache. This should be called periodically by the class containing <tt>BulkWriterComponent</tt> to ensure messages are not left sitting in the cache. For example, the Storm bolts call this whenever a tick tuple is received.</li>
</ol></div>
<div class="section">
<h3><a name="Logging"></a>Logging</h3>
<p>Logging can be enabled for the classes described in this section to provide insight into how messages are being batched and flushed. This can be an important tool when performance tuning. Setting the log level to <tt>DEBUG</tt> on the <tt>org.apache.metron.writer</tt> package will produce detailed information about when batches are flushed, which sensor a flushed batch corresponds to, which policy caused the flush, and how long it took to write the batch.</p></div>
<div class="section">
<h3><a name="Performance_Tuning"></a>Performance Tuning</h3>
<p>The primary purpose of the Bulk Message Writing abstraction is to enable efficient writing to external components. This section provides recommendations and strategies for doing that. It is assumed that most Metron installations will include a variety of sensors with different volumes and velocities. Different sensors will likely need to be tuned differently.</p>
<div class="section">
<h4><a name="Set_batches_sizes_higher_for_high_volume_sensors"></a>Set batches sizes higher for high volume sensors</h4>
<p>High volume sensors should be identified and configured with higher batch sizes (1000+ is recommended). Use logging to verify these batches are in fact filling up. The number of actual message written should match the batch size. Keep in mind that large batch sizes also require more memory to hold messages. Streaming engines like Storm limit how many messages can be processed at a time (the <tt>topology.max.spout.pending</tt> setting).</p></div>
<div class="section">
<h4><a name="Set_batch_timeouts_lower_for_low_volume_sensors"></a>Set batch timeouts lower for low volume sensors</h4>
<p>Low volume sensors make take longer to fill up a batch, especially if the batch size is set higher. This can be undesirable because messages may stay cached for longer than necessary, consuming memory and increasing latency for that sensor type.</p></div>
<div class="section">
<h4><a name="Be_careful_with_threads"></a>Be careful with threads</h4>
<p>Each thread (executor in Storm) maintains it&#x2019;s own message cache. Allocating too many threads will cause messages to be spread too thin across separate caches and batches won&#x2019;t fill up completely. This should be balanced with having enough threads to take advantage of any parallel write capability offered by the endpoint that&#x2019;s being written to.</p></div>
<div class="section">
<h4><a name="Watch_for_high_write_times"></a>Watch for high write times</h4>
<p>Use logging to evaluate write timing. Unusually high write times can indicate that an endpoint is not configured correctly or undersized.</p></div></div></div>
<div class="section">
<h2><a name="Kafka_Writer"></a>Kafka Writer</h2>
<p>We have an implementation of a writer which will write batches of messages to Kafka. An interesting aspect of this writer is that it can be configured to allow users to specify a message field which contains the topic for the message.</p>
<p>The configuration for this writer is held in the individual Sensor Configurations:</p>
<ul>
<li><a href="../metron-enrichment/index.html#sensor-enrichment-configuration">Enrichment</a> under the <tt>config</tt> element</li>
<li><a href="../metron-parsers-common/index.html#parser-configuration">Parsers</a> in the <tt>parserConfig</tt> element</li>
<li>Profiler - Unsupported currently</li>
</ul>
<p>In each of these, the kafka writer can be configured via a map which has the following elements:</p>
<ul>
<li><tt>kafka.brokerUrl</tt> : The broker URL</li>
<li><tt>kafka.keySerializer</tt> : The key serializer (defaults to <tt>StringSerializer</tt>)</li>
<li><tt>kafka.valueSerializer</tt> : The key serializer (defaults to <tt>StringSerializer</tt>)</li>
<li><tt>kafka.zkQuorum</tt> : The zookeeper quorum</li>
<li><tt>kafka.requiredAcks</tt> : Whether to require acks.</li>
<li><tt>kafka.topic</tt> : The topic to write to</li>
<li><tt>kafka.topicField</tt> : The field to pull the topic from. If this is specified, then the producer will use this. If it is unspecified, then it will default to the <tt>kafka.topic</tt> property. If neither are specified, then an error will occur.</li>
<li><tt>kafka.producerConfigs</tt> : A map of kafka producer configs for advanced customization.</li>
</ul></div>
<div class="section">
<h2><a name="HDFS_Writer"></a>HDFS Writer</h2>
<p>The HDFS writer included here expands on what Storm has in several ways. There&#x2019;s customization in syncing to HDFS, rotation policy, etc. In addition, the writer allows for users to define output paths based on the fields in the provided JSON message. This can be defined using Stellar.</p>
<p>To manage the output path, a base path argument is provided by the Flux file, with the FileNameFormat as follows</p>
<div>
<div>
<pre class="source"> - id: &quot;fileNameFormat&quot;
className: &quot;org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat&quot;
configMethods:
- name: &quot;withPrefix&quot;
args:
- &quot;enrichment-&quot;
- name: &quot;withExtension&quot;
args:
- &quot;.json&quot;
- name: &quot;withPath&quot;
args:
- &quot;/apps/metron/&quot;
</pre></div></div>
<p>This means that all output will land in <tt>/apps/metron/</tt>. With no further adjustment, it will be <tt>/apps/metron/&lt;sensor&gt;/</tt>. However, by modifying the sensor&#x2019;s JSON config, it is possible to provide additional pathing based on the the message itself.</p>
<p>The output format of a file will be <tt>{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}</tt>. Notably, because of the way file rotations are handled by the HdfsWriter, <tt>rotationNum</tt> will always be 0, but RotationActions still get executed normally.</p>
<p>E.g.</p>
<div>
<div>
<pre class="source">{
&quot;index&quot;: &quot;bro&quot;,
&quot;batchSize&quot;: 5,
&quot;outputPathFunction&quot;: &quot;FORMAT('uid-%s', uid)&quot;
}
</pre></div></div>
<p>will land data in <tt>/apps/metron/uid-&lt;uid&gt;/</tt>.</p>
<p>For example, if the data contains uid&#x2019;s 1, 3, and 5, there will be 3 output folders in HDFS:</p>
<div>
<div>
<pre class="source">/apps/metron/uid-1/
/apps/metron/uid-3/
/apps/metron/uid-5/
</pre></div></div>
<p>The Stellar function must return a String, but is not limited to FORMAT functions. Other functions, such as <tt>TO_LOWER</tt>, <tt>TO_UPPER</tt>, etc. are all available for use. Typically, it&#x2019;s preferable to do nontrivial transformations as part of enrichment and simply reference the output here.</p>
<p>If no Stellar function is provided, it will default to putting the sensor in a folder, as above.</p>
<p>A caveat is that the writer will only allow a certain number of files to be created at once. HdfsWriter has a function <tt>withMaxOpenFiles</tt> allowing this to be set. The default is 500. This can be set in Flux:</p>
<div>
<div>
<pre class="source"> - id: &quot;hdfsWriter&quot;
className: &quot;org.apache.metron.writer.hdfs.HdfsWriter&quot;
configMethods:
- name: &quot;withFileNameFormat&quot;
args:
- ref: &quot;fileNameFormat&quot;
- name: &quot;withRotationPolicy&quot;
args:
- ref: &quot;hdfsRotationPolicy&quot;
- name: &quot;withMaxOpenFiles&quot;
args: 500
</pre></div></div></div>
</div>
</div>
</div>
<hr/>
<footer>
<div class="container-fluid">
<div class="row-fluid">
© 2015-2016 The Apache Software Foundation. Apache Metron, Metron, Apache, the Apache feather logo,
and the Apache Metron project logo are trademarks of The Apache Software Foundation.
</div>
</div>
</footer>
</body>
</html>