<!DOCTYPE html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
<meta charset="utf-8">
<title>Samza Configuration Reference</title>
<style type="text/css">
body {
font-family: "Helvetica Neue",Helvetica,Arial,sans-serif;
font-size: 14px;
line-height: 22px;
color: #333;
background-color: #fff;
}
table {
border-collapse: collapse;
margin: 1em 0;
}
table th, table td {
text-align: left;
vertical-align: top;
padding: 12px;
border-bottom: 1px solid #ccc;
border-top: 1px solid #ccc;
border-left: 0;
border-right: 0;
}
table td.property, table td.default {
white-space: nowrap;
}
table th.section {
background-color: #eee;
}
table th.section .subtitle {
font-weight: normal;
}
code, a.property {
font-family: monospace;
}
span.system, span.stream, span.store, span.serde, span.rewriter, span.listener, span.reporter, span.resource {
padding: 1px;
margin: 1px;
border-width: 1px;
border-style: solid;
border-radius: 4px;
}
span.system {
background-color: #ddf;
border-color: #bbd;
}
span.stream {
background-color: #dfd;
border-color: #bdb;
}
span.store {
background-color: #fdf;
border-color: #dbd;
}
span.serde {
background-color: #fdd;
border-color: #dbb;
}
span.rewriter {
background-color: #eee;
border-color: #ccc;
}
span.listener {
background-color: #ffd;
border-color: #ddb;
}
span.reporter {
background-color: #dff;
border-color: #bdd;
}
span.resource {
background-color: #ded;
border-color: #bcb;
}
</style>
</head>
<body>
<h1>Samza Configuration Reference</h1>
<p>The following table lists all the standard properties that can be included in a Samza job configuration file.</p>
<p>Words highlighted like <span class="system">this</span> are placeholders for your own variable names.</p>
<table>
<tbody>
<tr><th>Name</th><th>Default</th><th>Description</th></tr>
<tr>
<th colspan="3" class="section" id="application">Samza application configuration</th>
</tr>
<tr>
<td class="property" id="app-name">app.name</td>
<td class="default"></td>
<td class="description">
<strong>Required:</strong> The name of your application.
</td>
</tr>
<tr>
<td class="property" id="app-id">app.id</td>
<td class="default">1</td>
<td class="description">
If you run several instances of your application at the same time, you need to give each instance a
different <code>app.id</code>. This is important, since otherwise the applications will overwrite each
other's checkpoints, and perhaps interfere with each other in other ways.
</td>
</tr>
<tr>
<td class="property" id="app-class">app.class</td>
<td class="default"></td>
<td class="description">
<strong>Required:</strong> The application to run. The value is a fully-qualified Java classname,
which must implement <a href="../api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a>.
A StreamApplication describes a series of transformations on the streams.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="job"><a href="configuration.html">Samza job configuration</a></th>
</tr>
<tr>
<td class="property" id="job-factory-class">job.factory.class</td>
<td class="default"></td>
<td class="description">
<strong>Required:</strong> The <a href="job-runner.html">job factory</a> to use for running this job.
The value is a fully-qualified Java classname, which must implement
<a href="../api/javadocs/org/apache/samza/job/StreamJobFactory.html">StreamJobFactory</a>.
Samza ships with three implementations:
<dl>
<dt><code>org.apache.samza.job.local.ThreadJobFactory</code></dt>
<dd>Runs your job on your local machine using threads. This is intended only for
development, not for production deployments.</dd>
<dt><code>org.apache.samza.job.local.ProcessJobFactory</code></dt>
<dd>Runs your job on your local machine as a subprocess. An optional command builder
property can also be specified (see <a href="#task-command-class" class="property">
task.command.class</a> for details). This is intended only for development,
not for production deployments.</dd>
<dt><code>org.apache.samza.job.yarn.YarnJobFactory</code></dt>
<dd>Runs your job on a YARN grid. See <a href="#yarn">below</a> for YARN-specific configuration.</dd>
</dl>
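<p>As an illustration, a job intended for a YARN grid might begin its configuration file as in the sketch below. The job name <code>my-job</code> is a hypothetical placeholder, not a value taken from this reference.</p>
<pre><code># Run on YARN; for local development, ThreadJobFactory or ProcessJobFactory could be used instead
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
# Hypothetical job name
job.name=my-job</code></pre>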
</td>
</tr>
<tr>
<td class="property" id="job-name">job.name</td>
<td class="default"></td>
<td class="description">
<strong>Required:</strong> The name of your job. This name appears on the Samza dashboard, and it
is used to distinguish this job's checkpoints from those of other jobs.
</td>
</tr>
<tr>
<td class="property" id="job-id">job.id</td>
<td class="default">1</td>
<td class="description">
If you run several instances of your job at the same time, you need to give each execution a
different <code>job.id</code>. This is important, since otherwise the jobs will overwrite each
other's checkpoints, and perhaps interfere with each other in other ways.
</td>
</tr>
<tr>
<td class="property" id="job-coordinator-system">job.coordinator.system</td>
<td class="default"></td>
<td class="description">
<strong>Required:</strong> The <span class="system">system-name</span> to use for creating and maintaining the <a href="../container/coordinator-stream.html">Coordinator Stream</a>.
</td>
</tr>
<tr>
<td class="property" id="job-default-system">job.default.system</td>
<td class="default"></td>
<td class="description">
The <span class="system">system-name</span> used to access any input or output streams for which the system is not explicitly configured.
This property applies to input and output streams, whereas <code>job.coordinator.system</code> applies to Samza metadata streams.
</td>
</tr>
<tr>
<td class="property" id="job-coordinator-replication-factor">job.coordinator.<br />replication.factor</td>
<td class="default">3</td>
<td class="description">
If you are using Kafka for the coordinator stream, this is the number of Kafka nodes to which you want the
coordinator topic replicated for durability.
</td>
</tr>
<tr>
<td class="property" id="job-coordinator-segment-bytes">job.coordinator.<br />segment.bytes</td>
<td class="default">26214400</td>
<td class="description">
If you are using a Kafka system for the coordinator stream, this is the segment size to be used for the coordinator
topic's log segments. Keeping this number small is useful because it increases the frequency
with which Kafka garbage collects old messages.
</td>
</tr>
<tr>
<td class="property" id="job-coordinator-max-message-bytes">job.coordinator.<br>max.message.bytes</td>
<td class="default">1000012</td>
<td class="description">
If you are using Kafka for the coordinator stream, this sets the largest record size for the coordinator
topic.
</td>
</tr>
<tr>
<td class="property" id="job-coordinator-monitor-partition-change">job.coordinator.<br />monitor-partition-change</td>
<td class="default">false</td>
<td class="description">
This is deprecated in favor of <a href="#job-coordinator-monitor-partition-change-frequency-ms">job.coordinator.monitor-partition-change.frequency.ms</a>.
</td>
</tr>
<tr>
<td class="property" id="job-coordinator-monitor-partition-change-frequency-ms">job.coordinator.<br />monitor-partition-change.frequency.ms</td>
<td class="default">300000</td>
<td class="description">
The frequency at which the input streams' partition count change should be detected. When the input
partition count change is detected, Samza will automatically restart a stateless job or fail a
stateful job. A longer time interval is recommended for jobs with a large number of input system stream
partitions, since gathering partition count may incur measurable overhead to the job. You can
completely disable partition count monitoring by setting this value to 0 or a negative integer,
which will also disable auto-restart/failing behavior of a Samza job on partition count changes.
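<p>For example, assuming you want to turn partition count monitoring off entirely, a setting along these lines should do so (any value of 0 or below is described above as disabling it):</p>
<pre><code>job.coordinator.monitor-partition-change.frequency.ms=0</code></pre>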
</td>
</tr>
<tr>
<td class="property" id="job-config-rewriter-class">job.config.rewriter.<br><span class="rewriter">rewriter-name</span>.class</td>
<td class="default"></td>
<td class="description">
You can optionally define configuration rewriters, which have the opportunity to dynamically
modify the job configuration before the job is started. For example, this can be useful for
pulling configuration from an external configuration management system, or for determining
the set of input streams dynamically at runtime. The value of this property is a
fully-qualified Java classname which must implement
<a href="../api/javadocs/org/apache/samza/config/ConfigRewriter.html">ConfigRewriter</a>.
Samza ships with these rewriters by default:
<dl>
<dt><code>org.apache.samza.config.RegExTopicGenerator</code></dt>
<dd>When consuming from Kafka, this allows you to consume all Kafka topics that match
some regular expression (rather than having to list each topic explicitly).
This rewriter has <a href="#regex-rewriter">additional configuration</a>.</dd>
<dt><code>org.apache.samza.config.EnvironmentConfigRewriter</code></dt>
<dd>This rewriter takes environment variables that are prefixed with <i>SAMZA_</i>
and adds them to the configuration, overriding previous values where they
exist. The keys are lowercased and underscores are converted to dots.</dd>
</dl>
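<p>A minimal sketch of wiring up a rewriter, assuming you choose the name <code>env</code> for it (the name is an arbitrary placeholder; any <span class="rewriter">rewriter-name</span> token should work as long as the same token is listed in <code>job.config.rewriters</code>):</p>
<pre><code># List the rewriters to apply, in order ("env" is a hypothetical name)
job.config.rewriters=env
# Bind that name to one of the rewriter classes that ship with Samza
job.config.rewriter.env.class=org.apache.samza.config.EnvironmentConfigRewriter</code></pre>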
</td>
</tr>
<tr>
<td class="property" id="job-config-rewriters">job.config.rewriters</td>
<td class="default"></td>
<td class="description">
If you have defined configuration rewriters, you need to list them here, in the order in
which they should be applied. The value of this property is a comma-separated list of
<span class="rewriter">rewriter-name</span> tokens.
</td>
</tr>
<tr>
<td class="property" id="job-systemstreampartition-grouper-factory">job.systemstreampartition.<br>grouper.factory</td>
<td class="default">org.apache.samza.<br>container.grouper.stream.<br>GroupByPartitionFactory</td>
<td class="description">
A factory class that is used to determine how input SystemStreamPartitions are grouped together for processing in individual StreamTask instances. The factory must implement the SystemStreamPartitionGrouperFactory interface. Once this configuration is set, it can't be changed, since doing so could violate state semantics, and lead to a loss of data.
<dl>
<dt><code>org.apache.samza.container.grouper.stream.GroupByPartitionFactory</code></dt>
<dd>Groups input stream partitions according to their partition number. This grouping leads to a single StreamTask processing all messages for a single partition (e.g. partition 0) across all input streams that have a partition 0. Therefore, the default is that you get one StreamTask for all input partitions with the same partition number. Using this strategy, if two input streams have a partition 0, then messages from both partitions will be routed to a single StreamTask. This partitioning strategy is useful for joining and aggregating streams.</dd>
<dt><code>org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory</code></dt>
<dd>Assigns each SystemStreamPartition to its own unique StreamTask. The GroupBySystemStreamPartitionFactory is useful in cases where you want increased parallelism (more containers), and don't care about co-locating partitions for grouping or joins, since it allows for a greater number of StreamTasks to be divided up amongst Samza containers.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="job-systemstreampartition-matcher-class">job.systemstreampartition.<br>matcher.class</td>
<td class="default"></td>
<td class="description">
If you want to enable static partition assignment, then this is a <strong>required</strong> configuration.
The value of this property is a fully-qualified Java class name that implements the interface
<code>org.apache.samza.system.SystemStreamPartitionMatcher</code>.
Samza ships with two matcher classes:
<dl>
<dt><code>org.apache.samza.system.RangeSystemStreamPartitionMatcher</code></dt>
<dd>This class uses a comma-separated list of ranges to determine which partitions match,
and are thus statically assigned to the job. For example, "2,3,1-2" statically assigns partitions
1, 2, and 3 for all the specified systems and streams (topics in the case of Kafka) to the job.
For config validation, each element in the comma-separated list must conform to one of the
following regular expressions:
<ul>
<li><code>"(\\d+)"</code> or </li>
<li><code>"(\\d+-\\d+)"</code> </li>
</ul>
The <code>JobConfig.SSP_MATCHER_CLASS_RANGE</code> constant holds the canonical name of this class.
</dd>
</dl>
<dl>
<dt><code>org.apache.samza.system.RegexSystemStreamPartitionMatcher</code></dt>
<dd>This class uses a standard Java regex to determine which partitions match,
and are thus statically assigned to the job. For example, "[1-2]" statically assigns partitions 1 and 2
for all the specified systems and streams (topics in the case of Kafka) to the job.
The <code>JobConfig.SSP_MATCHER_CLASS_REGEX</code> constant holds the canonical name of this class.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="job_systemstreampartition_matcher_config_range">job.systemstreampartition.<br>matcher.config.range</td>
<td class="default"></td>
<td class="description">
If <code>job.systemstreampartition.matcher.class</code> is set to
<code>org.apache.samza.system.RangeSystemStreamPartitionMatcher</code>, then this property is a
<strong>required</strong> configuration. Specify a comma-separated list of ranges to determine which
partitions match, and are thus statically assigned to the job. For example, "2,3,11-20" statically assigns
partitions 2, 3, and 11 through 20 for all the specified systems and streams (topics in the case of Kafka) to the job.
A single value such as "19" is valid as well; it statically assigns partition 19.
For config validation, each element in the comma-separated list must conform to one of the
following regular expressions:
<ul>
<li><code>"(\\d+)"</code> or </li>
<li><code>"(\\d+-\\d+)"</code> </li>
</ul>
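<p>Putting the pieces together, a static assignment of partitions 2, 3, and 11 through 20 might be configured roughly as follows. This is a sketch only; it assumes a job run with <code>ProcessJobFactory</code>, one of the factories for which static assignment is enabled by default.</p>
<pre><code># Assumed job factory; static assignment is enabled for the local factories by default
job.factory.class=org.apache.samza.job.local.ProcessJobFactory
# Select the range-based matcher
job.systemstreampartition.matcher.class=org.apache.samza.system.RangeSystemStreamPartitionMatcher
# Each comma-separated element is either a single partition or a start-end range
job.systemstreampartition.matcher.config.range=2,3,11-20</code></pre>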
</td>
</tr>
<tr>
<td class="property" id="job_systemstreampartition_matcher_config_regex">job.systemstreampartition.<br>matcher.config.regex</td>
<td class="default"></td>
<td class="description">
If <code>job.systemstreampartition.matcher.class</code> is set to
<code>org.apache.samza.system.RegexSystemStreamPartitionMatcher</code>, then this property is a
<strong>required</strong> configuration. The value should be a valid Java regex. For example, "[1-2]"
statically assigns partitions 1 and 2 for all the specified systems and streams (topics in the case of Kafka) to the job.
</td>
</tr>
<tr>
<td class="property" id="job_systemstreampartition_matcher_config_job_factory_regex">job.systemstreampartition.<br>matcher.config.job.factory.regex</td>
<td class="default"></td>
<td class="description">
This configuration specifies a Java regex that matches the <code>StreamJobFactory</code> implementations
for which static partition assignment should be enabled. It allows the partition
assignment feature to be used with custom <code>StreamJobFactory</code> implementations as well.
<p>
This config defaults to the following value:
<code>"org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)"</code>,
which enables static partition assignment when <code>job.factory.class</code> is set to
<code>org.apache.samza.job.local.ProcessJobFactory</code> or <code>org.apache.samza.job.local.ThreadJobFactory</code>.
</p>
</td>
</tr>
<tr>
<td class="property" id="job-checkpoint-validation-enabled">job.checkpoint.<br>validation.enabled</td>
<td class="default">true</td>
<td class="description">
This setting controls whether the job should fail (true) or just warn (false) when validation of the checkpoint partition number fails. <br/> <b>CAUTION</b>: this configuration needs to be used with care. It should only be used as a workaround after the checkpoint has been auto-created with the wrong number of partitions by mistake.
</td>
</tr>
<tr>
<td class="property" id="job-security-manager-factory">job.security.manager.factory</td>
<td class="default"></td>
<td class="description">
This is the factory class used to create the proper <a href="../api/javadocs/org/apache/samza/container/SecurityManager.html">SecurityManager</a> to handle security for Samza containers when running in a secure environment, such as Yarn with Kerberos enabled.
Samza ships with one security manager by default:
<dl>
<dt><code>org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory</code></dt>
<dd>Allows Samza containers to run properly in a Kerberos-enabled Yarn cluster. Each Samza container, once started, will create a <a href="../api/javadocs/org/apache/samza/job/yarn/SamzaContainerSecurityManager.html">SamzaContainerSecurityManager</a>. SamzaContainerSecurityManager runs on its own thread and updates the user's delegation tokens at the interval specified by <a href="#yarn-token-renewal-interval-seconds" class="property">yarn.token.renewal.interval.seconds</a>. See <a href="../yarn/yarn-security.html">Yarn Security</a> for details.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="job-container-count">job.container.count</td>
<td class="default">1</td>
<td class="description">
The number of YARN containers to request for running your job. This is the main parameter
for controlling the scale (allocated computing resources) of your job: to increase the
parallelism of processing, you need to increase the number of containers. The minimum is one
container, and the maximum number of containers is the number of task instances (usually the
<a href="../container/samza-container.html#tasks-and-partitions">number of input stream partitions</a>).
Task instances are evenly distributed across the number of containers that you specify.
</td>
</tr>
<tr>
<td class="property" id="job-container-thread-pool-size">job.container.thread.pool.size</td>
<td class="default"></td>
<td class="description">
If configured, the container thread pool will be used to run synchronous operations of each task in parallel. The operations include StreamTask.process(), WindowableTask.window(), and internally Task.commit(). Note that the thread pool is not applicable to AsyncStreamTask.processAsync(). The size should always be greater than zero. If not configured, all task operations will run in a single thread.
</td>
</tr>
<tr>
<td class="property" id="job-host_affinity-enabled">job.host-affinity.enabled</td>
<td class="default">false</td>
<td class="description">
This property indicates whether host-affinity is enabled or not. Host-affinity refers to the ability of Samza to request and allocate a container on the same host every time the job is deployed.
When host-affinity is enabled, Samza makes a best-effort attempt to honor the host-affinity constraint.
The property <a href="#cluster-manager-container-request-timeout-ms" class="property">cluster-manager.container.request.timeout.ms</a> determines how long to wait before de-prioritizing the host-affinity constraint and assigning the container to any available resource.
<b>Please Note</b>: This feature is tested to work with the FairScheduler in Yarn when continuous-scheduling is enabled.
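<p>As a rough sketch, enabling host-affinity together with a request timeout could look like this. The timeout value shown is an arbitrary illustration, not a recommended or default setting.</p>
<pre><code>job.host-affinity.enabled=true
# Hypothetical value: how long to wait for the preferred host before accepting any available resource
cluster-manager.container.request.timeout.ms=5000</code></pre>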
</td>
</tr>
<tr>
<td class="property" id="job-changelog-system">job.changelog.system</td>
<td class="default"></td>
<td class="description">
This property specifies a default system for changelog, which will be used with the stream specified in
<a href="#stores-changelog" class="property">stores.store-name.changelog</a> config.
You can override this system by specifying both the system and the stream in
<a href="#stores-changelog" class="property">stores.store-name.changelog</a>.
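<p>The sketch below illustrates the default and the override side by side. The system names (<code>my-kafka</code>, <code>other-kafka</code>), the store names, and the stream names are hypothetical, and the <code>system-name.stream-name</code> form used for the override is assumed to follow the same convention as <code>task.inputs</code>.</p>
<pre><code># Default changelog system for all stores (hypothetical system name)
job.changelog.system=my-kafka
# Stream only: the changelog goes to the default system above
stores.my-store.changelog=my-store-changelog
# System and stream: overrides the default for this store (assumed format)
stores.other-store.changelog=other-kafka.other-store-changelog</code></pre>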
</td>
</tr>
<tr>
<td class="property" id="job.coordinator.factory">job.coordinator.factory</td>
<td class="default">org.apache.samza.zk.ZkJobCoordinatorFactory</td>
<td class="description">
The fully-qualified name of the factory class that builds the JobCoordinator.
You can specify a custom implementation of JobCoordinatorFactory that implements custom logic for distributed coordination of stream processors. <br>
Samza supports the following coordination modes out of the box.
<dl>
<dt><code>org.apache.samza.standalone.PassthroughJobCoordinatorFactory</code></dt>
<dd>Fixed partition mapping. No Zookeeper. </dd>
<dt><code>org.apache.samza.zk.ZkJobCoordinatorFactory</code></dt>
<dd>Zookeeper-based coordination. </dd>
<dt><code>org.apache.samza.AzureJobCoordinatorFactory</code></dt>
<dd>Azure-based coordination.</dd>
</dl>
Required only for non-cluster-managed applications. See also the required value for <a href="#task-name-grouper-factory" class="property">task.name.grouper.factory</a>.
</td>
</tr>
<tr>
<td class="property" id="job.logged.store.base.dir">job.logged.store.base.dir</td>
<td class="default">
<i>user.dir</i> environment property if set, else current working directory of the process
</td>
<td class="description">
The base directory for changelog stores used by the Samza application. Another way to configure the base directory is by setting the environment variable <i>LOGGED_STORE_BASE_DIR</i>.
<b>Note:</b> The environment variable takes precedence over <i>job.logged.store.base.dir</i>.
<br>By opting in, users are responsible for cleaning up the store directories if necessary. Jobs using host affinity should ensure that the stores are persisted across application/container restarts.
This means that the location and cleanup of this directory should be separate from the container lifecycle and resource cleanup.
</td>
</tr>
<tr>
<td class="property" id="job.non-logged.store.base.dir">job.non-logged.store.base.dir</td>
<td class="default">
<i>user.dir</i> environment property if set, else current working directory of the process
</td>
<td class="description">
The base directory for non-changelog stores used by the Samza application.
<br>In YARN, the default behaviour without the configuration is to create non-changelog store directories in CWD which happens to be the YARN container directory.
This gets cleaned up periodically as part of NodeManager's deletion service, which is controlled by the YARN config <i>yarn.nodemanager.delete.debug-delay-sec</i>.
<br>In non-YARN deployment models or when using a different directory other than YARN container directory, stores need to be cleaned up periodically.
</td>
</tr>
<tr>
<!-- change link to StandAlone design/tutorial doc. SAMZA-1299 -->
<th colspan="3" class="section" id="ZkBasedJobCoordination"><a href="../index.html">Zookeeper-based job configuration</a></th>
</tr>
<tr>
<td class="property" id="job.coordinator.zk.connect">job.coordinator.zk.connect</td>
<td class="default"></td>
<td class="description">
<strong>Required</strong> for applications with Zookeeper-based coordination. Zookeeper coordinates (in "host:port[/znode]" format) to be used for coordination.
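<p>For instance, a Zookeeper-coordinated application might carry settings like the following. The address <code>localhost:2181</code> is an assumed local Zookeeper endpoint used purely for illustration.</p>
<pre><code>job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
# host:port[/znode]; hypothetical local endpoint
job.coordinator.zk.connect=localhost:2181</code></pre>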
</td>
</tr>
<tr>
<td class="property" id="job.coordinator.zk.session.timeout.ms">job.coordinator.zk.session.timeout.ms</td>
<td class="default"> 30000 </td>
<td class="description">
Zookeeper session timeout for all the ZK connections, in milliseconds. The session timeout controls how long the ZK client will wait before throwing an exception when it cannot talk to one of the ZK servers.
</td>
</tr>
<tr>
<td class="property" id="job.coordinator.zk.connection.timeout.ms">job.coordinator.zk.connection.timeout.ms</td>
<td class="default"> 60000 </td>
<td class="description">
Zookeeper connection timeout in milliseconds. The connection timeout controls how long the client tries to connect to a ZK server before giving up.
</td>
</tr>
<tr>
<td class="property" id="job.coordinator.zk.new.consensus.timeout.ms">job.coordinator.zk.consensus.timeout.ms</td>
<td class="default"> 40000 </td>
<td class="description">
How long each processor will wait for all the processors to report acceptance of the new job model before rolling back.
</td>
</tr>
<tr>
<td class="property" id="job.debounce.time.ms">job.debounce.time.ms</td>
<td class="default"> 20000 </td>
<td class="description">
How long the Leader processor will wait before recalculating the JobModel on change of registered processors.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="AzureBasedJobCoordination"><a href="../index.html">Azure-based job configuration</a></th>
</tr>
<tr>
<td class="property" id="azure.storage.connect">azure.storage.connect</td>
<td class="default"></td>
<td class="description">
<strong>Required</strong> for applications with Azure-based coordination. This is the storage connection string associated with an Azure account. It has the format: "DefaultEndpointsProtocol=https;AccountName=&lt;Insert your account name&gt;;AccountKey=&lt;Insert your account key&gt;"
</td>
</tr>
<tr>
<td class="property" id="job.coordinator.azure.blob.length">job.coordinator.azure.blob.length</td>
<td class="default"> 5120000 </td>
<td class="description">
Length, in bytes, of the page blob on which the leader stores the shared data. Different types of data are stored on different pages with predefined lengths. The offsets of these pages depend on the total page blob length.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="task"><a href="../api/overview.html">Task configuration</a></th>
</tr>
<tr>
<td class="property" id="task-class">task.class</td>
<td class="default"></td>
<td class="description">
<strong>Required:</strong> The fully-qualified name of the Java class which processes
incoming messages from input streams. The class must implement
<a href="../api/javadocs/org/apache/samza/task/StreamTask.html">StreamTask</a> or
<a href="../api/javadocs/org/apache/samza/task/AsyncStreamTask.html">AsyncStreamTask</a>,
and may optionally implement
<a href="../api/javadocs/org/apache/samza/task/InitableTask.html">InitableTask</a>,
<a href="../api/javadocs/org/apache/samza/task/ClosableTask.html">ClosableTask</a> and/or
<a href="../api/javadocs/org/apache/samza/task/WindowableTask.html">WindowableTask</a>.
The class will be instantiated several times, once for every
<a href="../container/samza-container.html#tasks-and-partitions">input stream partition</a>.
</td>
</tr>
<tr>
<td class="property" id="task-inputs">task.inputs</td>
<td class="default"></td>
<td class="description">
<strong>Required:</strong> A comma-separated list of streams that are consumed by this job.
Each stream is given in the format
<span class="system">system-name</span>.<span class="stream">stream-name</span>.
For example, if you have one input system called <code>my-kafka</code>, and want to consume two
Kafka topics called <code>PageViewEvent</code> and <code>UserActivityEvent</code>, then you would set
<code>task.inputs=my-kafka.PageViewEvent, my-kafka.UserActivityEvent</code>.
</td>
</tr>
<tr>
<td class="property" id="task-window-ms">task.window.ms</td>
<td class="default">-1</td>
<td class="description">
If <a href="#task-class" class="property">task.class</a> implements
<a href="../api/javadocs/org/apache/samza/task/WindowableTask.html">WindowableTask</a>, it can
receive a <a href="../container/windowing.html">windowing callback</a> in regular intervals.
This property specifies the time between window() calls, in milliseconds. If the number is
negative (the default), window() is never called. Note that Samza is
<a href="../container/event-loop.html">single-threaded</a>, so a window() call will never
occur concurrently with the processing of a message. If a message is being processed at the
time when a window() call is due, the window() call occurs after the processing of the current
message has completed.
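<p>As an example, assuming the configured task implements WindowableTask, the following setting would request a window() call roughly once every 60 seconds (the interval is an arbitrary illustration):</p>
<pre><code>task.window.ms=60000</code></pre>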
</td>
</tr>
<tr>
<td class="property" id="task-checkpoint-factory">task.checkpoint.factory</td>
<td class="default"></td>
<td class="description">
To enable <a href="../container/checkpointing.html">checkpointing</a>, you must set
this property to the fully-qualified name of a Java class that implements
<a href="../api/javadocs/org/apache/samza/checkpoint/CheckpointManagerFactory.html">CheckpointManagerFactory</a>.
This is not required, but recommended for most jobs. If you don't configure checkpointing,
and a job or container restarts, it does not remember which messages it has already processed.
Without checkpointing, consumer behavior is determined by the
<a href="#systems-samza-offset-default" class="property">...samza.offset.default</a>
setting, which by default skips any messages that were published while the container was
restarting. Checkpointing allows a job to start up where it previously left off.
Samza ships with two checkpoint managers by default:
<dl>
<dt><code>org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory</code></dt>
<dd>Writes checkpoints to files on the local filesystem. You can configure the file path
with the <a href="#task-checkpoint-path" class="property">task.checkpoint.path</a>
property. This is a simple option if your job always runs on the same machine.
On a multi-machine cluster, this would require a network filesystem mount.</dd>
<dt><code>org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory</code></dt>
<dd>Writes checkpoints to a dedicated topic on a Kafka cluster. This is the recommended
option if you are already using Kafka for input or output streams. Use the
<a href="#task-checkpoint-system" class="property">task.checkpoint.system</a>
property to configure which Kafka cluster to use for checkpoints.</dd>
</dl>
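<p>A sketch of Kafka-backed checkpointing, assuming a Kafka system named <code>my-kafka</code> has been defined elsewhere in the configuration (the name is a placeholder borrowed from the <code>task.inputs</code> example):</p>
<pre><code>task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
# Hypothetical system name; it must match a system configured for this job
task.checkpoint.system=my-kafka
# Time between checkpoints in milliseconds (60000 is the documented default)
task.commit.ms=60000</code></pre>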
</td>
</tr>
<tr>
<td class="property" id="task-commit-ms">task.commit.ms</td>
<td class="default">60000</td>
<td class="description">
If <a href="#task-checkpoint-factory" class="property">task.checkpoint.factory</a> is
configured, this property determines how often a checkpoint is written. The value is
the time between checkpoints, in milliseconds. The frequency of checkpointing affects
failure recovery: if a container fails unexpectedly (e.g. due to crash or machine failure)
and is restarted, it resumes processing at the last checkpoint. Any messages processed
since the last checkpoint on the failed container are processed again. Checkpointing
more frequently reduces the number of messages that may be processed twice, but also
uses more resources.
</td>
</tr>
<tr>
<td class="property" id="task-command-class">task.command.class</td>
<td class="default">org.apache.samza.job.<br>ShellCommandBuilder</td>
<td class="description">
The fully-qualified name of the Java class which determines the command line and environment
variables for a <a href="../container/samza-container.html">container</a>. It must be a subclass of
<a href="../api/javadocs/org/apache/samza/job/CommandBuilder.html">CommandBuilder</a>.
This defaults to <code>task.command.class=org.apache.samza.job.ShellCommandBuilder</code>.
</td>
</tr>
<tr>
<td class="property" id="task-opts">task.opts</td>
<td class="default"></td>
<td class="description">
Any JVM options to include in the command line when executing Samza containers. For example,
this can be used to set the JVM heap size, to tune the garbage collector, or to enable
<a href="/learn/tutorials/{{site.version}}/remote-debugging-samza.html">remote debugging</a>.
This cannot be used when running with <code>ThreadJobFactory</code>. Anything you put in
<code>task.opts</code> gets forwarded directly to the commandline as part of the JVM invocation.
<dl>
<dt>Example: <code>task.opts=-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC</code></dt>
</dl>
</td>
</tr>
<tr>
<td class="property" id="task-java-home">task.java.home</td>
<td class="default"></td>
<td class="description">
The JAVA_HOME path for Samza containers. By setting this property, you can use a Java version that is
different from your cluster's Java version. Remember to set <code>yarn.am.java.home</code> as well.
<dl>
<dt>Example: <code>task.java.home=/usr/java/jdk1.8.0_05</code></dt>
</dl>
</td>
</tr>
<tr>
<td class="property" id="task-execute">task.execute</td>
<td class="default">bin/run-container.sh</td>
<td class="description">
The command that starts a Samza container. The script must be included in the
<a href="packaging.html">job package</a>. There is usually no need to customize this.
</td>
</tr>
<tr>
<td class="property" id="task-chooser-class">task.chooser.class</td>
<td class="default">org.apache.samza.<br>system.chooser.<br>RoundRobinChooserFactory</td>
<td class="description">
This property can be optionally set to override the default
<a href="../container/streams.html#messagechooser">message chooser</a>, which determines the
order in which messages from multiple input streams are processed. The value of this
property is the fully-qualified name of a Java class that implements
<a href="../api/javadocs/org/apache/samza/system/chooser/MessageChooserFactory.html">MessageChooserFactory</a>.
</td>
</tr>
<tr>
<td class="property" id="task-drop-deserialization-errors">task.drop.deserialization.errors</td>
<td class="default">false</td>
<td class="description">
Defines how the system handles deserialization failures. If set to true, the system
skips the messages that fail to deserialize and keeps running. If set to false, the system throws an exception and
fails the container.
</td>
</tr>
<tr>
<td class="property" id="task-drop-serialization-errors">task.drop.serialization.errors</td>
<td class="default">false</td>
<td class="description">
Defines how the system handles serialization failures. If set to true, the system
drops the messages that fail to serialize and keeps running. If set to false, the system throws an exception and
fails the container.
</td>
</tr>
<tr>
<td class="property" id="task-drop-producer-errors">task.drop.producer.errors</td>
<td class="default">false</td>
<td class="description">
If true, producer errors will be logged and ignored. The only exceptions that will be thrown are
those which are likely caused by the application itself (e.g. serialization errors). If false,
the producer will be closed and producer errors will be propagated upward until the container
ultimately fails. Failing the container is a safety precaution to ensure the latest checkpoints
only reflect the events that have been completely and successfully processed. However, some
applications prefer to remain running at all costs, even if that means lost messages. Setting
this property to true will enable applications to recover from producer errors at the expense of
one or many (in the case of batching producers) dropped messages. If you enable this, it is highly
recommended that you also configure alerting on the 'producer-send-failed' metric, since the
producer might drop messages indefinitely. The logic for this property is specific to each
SystemProducer implementation. It will have no effect for SystemProducers that ignore the property.
</td>
</tr>
<tr>
<td class="property" id="task-log4j-system">task.log4j.system</td>
<td class="default"></td>
<td class="description">
Specifies the system name for the StreamAppender. If this property is not specified in the config,
Samza throws an exception. (See
<a href="logging.html#stream-log4j-appender">Stream Log4j Appender</a>)
<dl>
<dt>Example: <code>task.log4j.system=kafka</code></dt>
</dl>
</td>
</tr>
<tr>
<td class="property" id="task-log4j-location-info-enabled">task.log4j.location.info.enabled</td>
<td class="default">false</td>
<td class="description">
Defines whether or not to include log4j's LocationInfo data in Log4j StreamAppender messages. LocationInfo includes
information such as the file, class, and line that wrote a log message. This setting is only active if the Log4j
stream appender is being used. (See <a href="logging.html#stream-log4j-appender">Stream Log4j Appender</a>)
<dl>
<dt>Example: <code>task.log4j.location.info.enabled=true</code></dt>
</dl>
</td>
</tr>
<tr>
<td class="property" id="task-poll-interval-ms">task.poll.interval.ms</td>
<td class="default">50</td>
<td class="description">
Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining
buffered messages to process for any input SystemStreamPartition. The second condition arises when some input
SystemStreamPartitions have empty buffers, but some do not. In the latter case, a polling interval is defined to determine how
often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which means that any empty
SystemStreamPartition buffer will be refreshed at least every 50ms. A higher value here means that empty SystemStreamPartitions
will be refreshed less often, which means more latency is introduced, but less CPU and network will be used. Decreasing this
value means that empty SystemStreamPartitions are refreshed more frequently, thereby introducing less latency, but increasing
CPU and network utilization.
</td>
</tr>
<tr>
<td class="property" id="task-max-idle-ms">task.max.idle.ms</td>
<td class="default">10</td>
<td class="description">
The maximum time to wait for a task worker to complete when there are no new messages to handle before resuming the main
loop and potentially polling for more messages (see <a href="#task-poll-interval-ms" class="property">task.poll.interval.ms</a>).
This timeout value prevents the main loop from spinning when there is nothing for it to do. Increasing this value reduces
the background load of the thread, but also potentially increases message latency. It should not be set greater than
<a href="#task-poll-interval-ms" class="property">task.poll.interval.ms</a>.
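<p>As a rough illustration, a latency-sensitive job might lower both settings while keeping
<code>task.max.idle.ms</code> no greater than <code>task.poll.interval.ms</code>. The values below are
arbitrary and only show how the two properties relate; they are not tuning recommendations.</p>
<pre><code>task.poll.interval.ms=20
task.max.idle.ms=5</code></pre>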
</td>
</tr>
<tr>
<td class="property" id="task-ignored-exceptions">task.ignored.exceptions</td>
<td class="default"></td>
<td class="description">
This property specifies which exceptions should be ignored if thrown in a task's <code>process</code> or <code>window</code>
methods. The exceptions to be ignored should be a comma-separated list of fully-qualified class names of the exceptions or
<code>*</code> to ignore all exceptions.
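<p>For instance, a job that should keep running when its tasks throw certain exceptions might list them
as follows. The exception classes are chosen only for illustration:</p>
<pre><code>task.ignored.exceptions=java.lang.IllegalArgumentException,java.lang.NumberFormatException</code></pre>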
</td>
</tr>
<tr>
<td class="property" id="task-shutdown-ms">task.shutdown.ms</td>
<td class="default">30000</td>
<td class="description">
This property controls how long the Samza container will wait for an orderly shutdown of task instances.
</td>
</tr>
<tr>
<td class="property" id="task-name-grouper-factory">task.name.grouper.factory</td>
<td class="default">org.apache.samza.<br>container.grouper.task.<br>GroupByContainerCountFactory</td>
<td class="description">
The fully-qualified name of the Java factory class that builds the TaskNameGrouper.
If the property is not set, it defaults to <code>task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory</code>.<br>
You can specify a custom TaskNameGrouperFactory implementation to apply custom logic for grouping tasks.
<p><strong>Note:</strong> Non-cluster applications (those using a coordination service) must use <code>org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</code>.</p>
</td>
</tr>
<tr>
<td class="property" id="task-broadcast-inputs">task.broadcast.inputs</td>
<td class="default"></td>
<td class="description">
This property specifies the partitions that all tasks should consume. The SystemStreamPartitions you list
here will be sent to all the tasks.
<dl>
<dt>Format: <span class="system">system-name</span>.<span class="stream">stream-name</span>#<i>partitionId</i>
or <span class="system">system-name</span>.<span class="stream">stream-name</span>#[<i>startingPartitionId</i>-<i>endingPartitionId</i>]</dt>
</dl>
<dl>
<dt>Example: <code>task.broadcast.inputs=mySystem.broadcastStream#[0-2], mySystem.broadcastStream#0</code></dt>
</dl>
</td>
</tr>
<tr>
<td class="property" id="task-max-concurrency">task.max.concurrency</td>
<td class="default">1</td>
<td class="description">
The maximum number of outstanding messages processed per task at a time. It applies to both StreamTask and AsyncStreamTask. The values can be:
<dl>
<dt><code>1</code></dt>
<dd>Each task processes one message at a time. The next message waits until processing of the current message completes. This ensures strict in-order processing.</dd>
<dt><code>&gt;1</code></dt>
<dd>Multiple outstanding messages may be processed per task at a time, and they can complete out of order. This option increases the parallelism within a task, but may result in out-of-order processing.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="task-callback-timeout-ms">task.callback.timeout.ms</td>
<td class="default"></td>
<td class="description">
This property applies to AsyncStreamTask only. It defines the maximum time allowed between the call to processAsync() and the firing of the callback. When the timeout expires, the container throws a TaskCallbackTimeoutException and shuts down. By default there is no timeout.
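<p>A minimal sketch of how this timeout might be combined with
<a href="#task-max-concurrency" class="property">task.max.concurrency</a> for an AsyncStreamTask. The numbers are illustrative assumptions:</p>
<pre><code># Allow up to 4 messages in flight per task (completion may be out of order)
task.max.concurrency=4
# Fail the container if a callback has not fired within 10 seconds
task.callback.timeout.ms=10000</code></pre>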
</td>
</tr>
<tr>
<td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
<td class="default">1</td>
<td class="description">
If set to a positive integer, the task will try to consume
<a href="../container/streams.html#batching">batches</a> with the given number of messages
from each input stream, rather than consuming round-robin from all the input streams on
each individual message. Setting this property can improve performance in some cases.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="systems">Systems</th>
</tr>
<tr>
<td class="property" id="systems-samza-factory">systems.<span class="system">system-name</span>.<br>samza.factory</td>
<td class="default"></td>
<td class="description">
<strong>Required:</strong> The fully-qualified name of a Java class which provides a
<em>system</em>. A system can provide input streams which you can consume in your Samza job,
or output streams to which you can write, or both. The requirements on a system are very
flexible &mdash; it may connect to a message broker, or read and write files, or use a database,
or anything else. The class must implement
<a href="../api/javadocs/org/apache/samza/system/SystemFactory.html">SystemFactory</a>.
Samza ships with the following implementations:
<dl>
<dt><code>org.apache.samza.system.kafka.KafkaSystemFactory</code></dt>
<dd>Connects to a cluster of <a href="http://kafka.apache.org/">Kafka</a> brokers, allows
Kafka topics to be consumed as streams in Samza, allows messages to be published to
Kafka topics, and allows Kafka to be used for checkpointing (see
<a href="#task-checkpoint-factory" class="property">task.checkpoint.factory</a>).
See also <a href="#kafka">configuration of a Kafka system</a>.</dd>
<dt><code>org.apache.samza.system.filereader.FileReaderSystemFactory</code></dt>
<dd>Reads data from a file on the local filesystem (the stream name is the path of the
file to read). The file is read as ASCII, and treated as a stream of messages separated
by newline (<code>\n</code>) characters. A task can consume each line of the file as
a <code>java.lang.String</code> object. This system does not provide output streams.</dd>
</dl>
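<p>As a sketch, a system is declared by choosing a <span class="system">system-name</span> and pointing
it at a factory. The name <code>kafka</code> is an arbitrary choice here, and a real Kafka system
also needs the connection settings described under <a href="#kafka">configuration of a Kafka system</a>:</p>
<pre><code>systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory</code></pre>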
</td>
</tr>
<tr>
<td class="property" id="systems-default-stream">systems.<span class="system">system-name</span>.<br>default.stream.*</td>
<td class="default"></td>
<td class="description">
A set of default properties for any stream associated with the system. For example, if
<code>systems.kafka-system.default.stream.replication.factor=2</code> is configured, then every Kafka stream
created on the kafka-system will have a replication factor of 2 unless the property is explicitly
overridden at the stream scope using <a href="#streams-properties">streams properties</a>.
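<p>The override described above could look like the following, assuming a system named
<code>kafka-system</code> and a hypothetical stream-id <code>page-views</code>:</p>
<pre><code># Default for every stream created on kafka-system
systems.kafka-system.default.stream.replication.factor=2
# Stream-scoped override for one stream
streams.page-views.samza.system=kafka-system
streams.page-views.replication.factor=3</code></pre>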
</td>
</tr>
<tr>
<td class="property" id="systems-samza-key-serde">systems.<span class="system">system-name</span>.<br>default.stream.samza.key.serde</td>
<td class="default"></td>
<td class="description">
The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
<em>key</em> of messages on input streams, and to serialize the <em>key</em> of messages on
output streams. This property defines the serde for all streams in the system. See the
<a href="#streams-samza-key-serde">stream-scoped property</a> to define the serde for an
individual stream. If both are defined, the stream-level definition takes precedence.
The value of this property must be a <span class="serde">serde-name</span> that is registered
with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
If this property is not set, messages are passed unmodified between the input stream consumer,
the task and the output stream producer.
</td>
</tr>
<tr>
<td class="property" id="systems-samza-msg-serde">systems.<span class="system">system-name</span>.<br>default.stream.samza.msg.serde</td>
<td class="default"></td>
<td class="description">
The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
<em>value</em> of messages on input streams, and to serialize the <em>value</em> of messages on
output streams. This property defines the serde for all streams in the system. See the
<a href="#streams-samza-msg-serde">stream-scoped property</a> to define the serde for an
individual stream. If both are defined, the stream-level definition takes precedence.
The value of this property must be a <span class="serde">serde-name</span> that is registered
with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
If this property is not set, messages are passed unmodified between the input stream consumer,
the task and the output stream producer.
</td>
</tr>
<tr>
<td class="property" id="systems-samza-offset-default">systems.<span class="system">system-name</span>.<br>default.stream.samza.offset.default</td>
<td class="default">upcoming</td>
<td class="description">
If a container starts up without a <a href="../container/checkpointing.html">checkpoint</a>,
this property determines where in the input stream we should start consuming. The value must be an
<a href="../api/javadocs/org/apache/samza/system/SystemStreamMetadata.OffsetType.html">OffsetType</a>,
one of the following:
<dl>
<dt><code>upcoming</code></dt>
<dd>Start processing messages that are published after the job starts. Any messages published while
the job was not running are not processed.</dd>
<dt><code>oldest</code></dt>
<dd>Start processing at the oldest available message in the system, and
<a href="reprocessing.html">reprocess</a> the entire available message history.</dd>
</dl>
This property is for all streams within a system. To set it for an individual stream, see
<a href="#streams-samza-offset-default">streams.<span class="stream">stream-id</span>.samza.offset.default</a>.
If both are defined, the stream-level definition takes precedence.
</td>
</tr>
<tr>
<td class="property" id="systems-samza-delete-committed-messages">systems.<span class="system">system-name</span>.<br>samza.delete.committed.messages</td>
<td class="default">false</td>
<td class="description">
If set to true, automatically delete committed messages from streams whose committed messages can be deleted.
A stream's committed messages can be deleted if it is an intermediate stream, or if the user has manually
set <a href="#streams-samza-delete-committed-messages">streams.<span class="stream">stream-id</span>.<br>samza.delete.committed.messages</a> to true in the configuration.
</td>
</tr>
<tr>
<td class="property" id="systems-samza-key-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.key.serde</td>
<td class="default" rowspan="2"></td>
<td class="description">
This is deprecated in favor of <a href="#systems-samza-key-serde" class="property">
systems.<span class="system">system-name</span>.default.stream.samza.key.serde</a>.
</td>
</tr>
<tr>
<td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.key.serde</td>
<td class="description">
This is deprecated in favor of <a href="#streams-samza-key-serde" class="property">
streams.<span class="stream">stream-id</span>.samza.key.serde</a>.
</td>
</tr>
<tr>
<td class="property" id="systems-samza-msg-serde-legacy">systems.<span class="system">system-name</span>.<br>samza.msg.serde</td>
<td class="default" rowspan="2"></td>
<td class="description">
This is deprecated in favor of <a href="#systems-samza-msg-serde" class="property">
systems.<span class="system">system-name</span>.default.stream.samza.msg.serde</a>.
</td>
</tr>
<tr>
<td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.msg.serde</td>
<td class="description">
This is deprecated in favor of <a href="#streams-samza-msg-serde" class="property">
streams.<span class="stream">stream-id</span>.samza.msg.serde</a>.
</td>
</tr>
<tr>
<td class="property" id="systems-samza-offset-default-legacy">systems.<span class="system">system-name</span>.<br>samza.offset.default</td>
<td class="default" rowspan="2">upcoming</td>
<td class="description">
This is deprecated in favor of <a href="#systems-samza-offset-default" class="property">
systems.<span class="system">system-name</span>.default.stream.samza.offset.default</a>.
</td>
</tr>
<tr>
<td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.offset.default</td>
<td class="description">
This is deprecated in favor of <a href="#streams-samza-offset-default" class="property">
streams.<span class="stream">stream-id</span>.samza.offset.default</a>.
</td>
</tr>
<tr>
<td class="property" id="systems-streams-samza-reset-offset-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.reset.offset</td>
<td>false</td>
<td>
This is deprecated in favor of <a href="#streams-samza-reset-offset" class="property">
streams.<span class="stream">stream-id</span>.samza.reset.offset</a>.
</td>
</tr>
<tr>
<td class="property" id="systems-streams-samza-priority-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.priority</td>
<td>-1</td>
<td>
This is deprecated in favor of <a href="#streams-samza-priority" class="property">
streams.<span class="stream">stream-id</span>.samza.priority</a>.
</td>
</tr>
<tr>
<td class="property" id="systems-streams-samza-bootstrap-legacy">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.bootstrap</td>
<td>false</td>
<td>
This is deprecated in favor of <a href="#streams-samza-bootstrap" class="property">
streams.<span class="stream">stream-id</span>.samza.bootstrap</a>.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="streams"><a href="../container/streams.html">Streams</a></th>
</tr>
<tr>
<td class="property" id="streams-system">streams.<span class="stream">stream-id</span>.<br>samza.system</td>
<td class="default"></td>
<td class="description">
The <span class="system">system-name</span> of the system on which this stream will be accessed.
This property binds the stream to one of the systems defined with the property
systems.<span class="system">system-name</span>.samza.factory. <br>
If this property isn't specified, it is inherited from job.default.system.
</td>
</tr>
<tr>
<td class="property" id="streams-physical-name">streams.<span class="stream">stream-id</span>.<br>samza.physical.name</td>
<td class="default"></td>
<td class="description">
The physical name of the stream on the system on which this stream will be accessed.
This is distinct from the stream-id, which is the logical name that Samza uses to identify the stream.
A physical name could be a Kafka topic name, an HDFS file URN or any other system-specific identifier.
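<p>For example, a stream with the hypothetical stream-id <code>page-views</code> could be bound to a
system named <code>kafka</code> and mapped to a differently named topic. All names are placeholders:</p>
<pre><code>streams.page-views.samza.system=kafka
streams.page-views.samza.physical.name=PageViewEvent</code></pre>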
</td>
</tr>
<tr>
<td class="property" id="streams-samza-key-serde">streams.<span class="stream">stream-id</span>.<br>samza.key.serde</td>
<td class="default"></td>
<td class="description">
The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
<em>key</em> of messages on input streams, and to serialize the <em>key</em> of messages on
output streams. This property defines the serde for an individual stream. See the
<a href="#systems-samza-key-serde">system-scoped property</a> to define the serde for all
streams within a system. If both are defined, the stream-level definition takes precedence.
The value of this property must be a <span class="serde">serde-name</span> that is registered
with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
If this property is not set, messages are passed unmodified between the input stream consumer,
the task and the output stream producer.
</td>
</tr>
<tr>
<td class="property" id="streams-samza-msg-serde">streams.<span class="stream">stream-id</span>.<br>samza.msg.serde</td>
<td class="default"></td>
<td class="description">
The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
<em>value</em> of messages on input streams, and to serialize the <em>value</em> of messages on
output streams. This property defines the serde for an individual stream. See the
<a href="#systems-samza-msg-serde">system-scoped property</a> to define the serde for all
streams within a system. If both are defined, the stream-level definition takes precedence.
The value of this property must be a <span class="serde">serde-name</span> that is registered
with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
If this property is not set, messages are passed unmodified between the input stream consumer,
the task and the output stream producer.
</td>
</tr>
<tr>
<td class="property" id="streams-samza-offset-default">streams.<span class="stream">stream-id</span>.<br>samza.offset.default</td>
<td class="default">upcoming</td>
<td class="description">
If a container starts up without a <a href="../container/checkpointing.html">checkpoint</a>,
this property determines where in the input stream we should start consuming. The value must be an
<a href="../api/javadocs/org/apache/samza/system/SystemStreamMetadata.OffsetType.html">OffsetType</a>,
one of the following:
<dl>
<dt><code>upcoming</code></dt>
<dd>Start processing messages that are published after the job starts. Any messages published while
the job was not running are not processed.</dd>
<dt><code>oldest</code></dt>
<dd>Start processing at the oldest available message in the system, and
<a href="reprocessing.html">reprocess</a> the entire available message history.</dd>
</dl>
This property is for an individual stream. To set it for all streams within a system, see
<a href="#systems-samza-offset-default">systems.<span class="system">system-name</span>.default.stream.samza.offset.default</a>.
If both are defined, the stream-level definition takes precedence.
</td>
</tr>
<tr>
<td class="property" id="streams-samza-delete-committed-messages">streams.<span class="stream">stream-id</span>.<br>samza.delete.committed.messages</td>
<td class="default">false</td>
<td class="description">
If set to true, committed messages of this stream can be deleted. Committed messages of this stream will be deleted
if <a href="#systems-samza-delete-committed-messages">systems.<span class="system">system-name</span>.samza.delete.committed.messages</a> is also
set to true.
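<p>Since both flags must be true, a configuration that enables deletion for one stream might look like this
(the system name <code>kafka</code> and stream-id <code>page-views</code> are placeholders):</p>
<pre><code>systems.kafka.samza.delete.committed.messages=true
streams.page-views.samza.system=kafka
streams.page-views.samza.delete.committed.messages=true</code></pre>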
</td>
</tr>
<tr>
<td class="property" id="streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td>
<td class="default">false</td>
<td class="description">
If set to <code>true</code>, when a Samza container starts up, it ignores any
<a href="../container/checkpointing.html">checkpointed offset</a> for this particular input
stream. Its behavior is thus determined by the <code>samza.offset.default</code> setting.
Note that the reset takes effect <em>every time a container is started</em>, which may be
every time you restart your job, or more frequently if a container fails and is restarted
by the framework.
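<p>A sketch of forcing a hypothetical stream <code>page-views</code> to be reprocessed from the oldest
available message on every container start:</p>
<pre><code>streams.page-views.samza.reset.offset=true
streams.page-views.samza.offset.default=oldest</code></pre>
<p>Because the reset applies on every container start, you may want to remove this setting again once the reprocessing is done.</p>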
</td>
</tr>
<tr>
<td class="property" id="streams-samza-priority">streams.<span class="stream">stream-id</span>.<br>samza.priority</td>
<td class="default">-1</td>
<td class="description">
If one or more streams have a priority set (any positive integer), they will be processed
with <a href="../container/streams.html#prioritizing-input-streams">higher priority</a> than the other streams.
You can set several streams to the same priority, or define multiple priority levels by
assigning a higher number to the higher-priority streams. If a higher-priority stream has
any messages available, they will always be processed first; messages from lower-priority
streams are only processed when there are no new messages on higher-priority inputs.
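<p>As an illustration with hypothetical stream-ids, the following gives <code>alerts</code> precedence
over <code>page-views</code>, while any stream without a priority keeps the default of -1:</p>
<pre><code>streams.alerts.samza.priority=2
streams.page-views.samza.priority=1</code></pre>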
</td>
</tr>
<tr>
<td class="property" id="streams-samza-bootstrap">streams.<span class="stream">stream-id</span>.<br>samza.bootstrap</td>
<td class="default">false</td>
<td class="description">
If set to <code>true</code>, this stream will be processed as a
<a href="../container/streams.html#bootstrapping">bootstrap stream</a>. This means that every time
a Samza container starts up, this stream will be fully consumed before messages from any
other stream are processed.
</td>
</tr>
<tr>
<td class="property" id="streams-samza-broadcast">streams.<span class="stream">stream-id</span>.<br>samza.broadcast</td>
<td class="default">false</td>
<td class="description">
If set to <code>true</code>, this stream will be broadcast to all the tasks.
</td>
</tr>
<tr>
<td class="property" id="streams-properties">streams.<span class="stream">stream-id</span>.*</td>
<td class="default"></td>
<td class="description">
Any properties of the stream. These are typically system-specific and can be used by the system
for stream creation or validation. Note that the other properties are prefixed with <em>samza.</em>
which distinguishes them as Samza properties that are not system-specific.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="serdes"><a href="../container/serialization.html">Serializers/Deserializers (Serdes)</a></th>
</tr>
<tr>
<td class="property" id="serializers-registry-class">serializers.registry.<br><span class="serde">serde-name</span>.class</td>
<td class="default"></td>
<td class="description">
Use this property to register a <a href="../container/serialization.html">serializer/deserializer</a>,
which defines a way of encoding application objects as an array of bytes (used for messages
in streams, and for data in persistent storage). You can give a serde any
<span class="serde">serde-name</span> you want, and reference that name in properties like
<a href="#systems-samza-key-serde" class="property">systems.*.samza.key.serde</a>,
<a href="#systems-samza-msg-serde" class="property">systems.*.samza.msg.serde</a>,
<a href="#streams-samza-key-serde" class="property">streams.*.samza.key.serde</a>,
<a href="#streams-samza-msg-serde" class="property">streams.*.samza.msg.serde</a>,
<a href="#stores-key-serde" class="property">stores.*.key.serde</a> and
<a href="#stores-msg-serde" class="property">stores.*.msg.serde</a>.
The value of this property is the fully-qualified name of a Java class that implements
<a href="../api/javadocs/org/apache/samza/serializers/SerdeFactory.html">SerdeFactory</a>.
Samza ships with several serdes:
<dl>
<dt><code>org.apache.samza.serializers.ByteSerdeFactory</code></dt>
<dd>A no-op serde which passes through the undecoded byte array.</dd>
<dt><code>org.apache.samza.serializers.ByteBufferSerdeFactory</code></dt>
<dd>Encodes <code>java.nio.ByteBuffer</code> objects.</dd>
<dt><code>org.apache.samza.serializers.IntegerSerdeFactory</code></dt>
<dd>Encodes <code>java.lang.Integer</code> objects as binary (4 bytes fixed-length big-endian encoding).</dd>
<dt><code>org.apache.samza.serializers.StringSerdeFactory</code></dt>
<dd>Encodes <code>java.lang.String</code> objects as UTF-8.</dd>
<dt><code>org.apache.samza.serializers.JsonSerdeFactory</code></dt>
<dd>Encodes nested structures of <code>java.util.Map</code>, <code>java.util.List</code> etc. as JSON.<br/>
Note: This Serde enforces a dash-separated property naming convention, while JsonSerdeV2 doesn't.
This serde is primarily meant for Samza's internal usage, and is publicly available for backwards compatibility.</dd>
<dt><code>org.apache.samza.serializers.JsonSerdeV2Factory</code></dt>
<dd>Encodes nested structures of <code>java.util.Map</code>, <code>java.util.List</code> etc. as JSON.<br/>
Note: This Serde uses Jackson's default (camelCase) property naming convention. This serde should be
preferred over JsonSerde, especially in the High Level API, unless the dasherized naming convention is required
(e.g., for backwards compatibility).</dd>
<dt><code>org.apache.samza.serializers.LongSerdeFactory</code></dt>
<dd>Encodes <code>java.lang.Long</code> as binary (8 bytes fixed-length big-endian encoding).</dd>
<dt><code>org.apache.samza.serializers.DoubleSerdeFactory</code></dt>
<dd>Encodes <code>java.lang.Double</code> as binary (8 bytes double-precision floating point).</dd>
<dt><code>org.apache.samza.serializers.UUIDSerdeFactory</code></dt>
<dd>Encodes <code>java.util.UUID</code> objects.</dd>
<dt><code>org.apache.samza.serializers.SerializableSerdeFactory</code></dt>
<dd>Encodes <code>java.io.Serializable</code> objects.</dd>
<dt><code>org.apache.samza.serializers.MetricsSnapshotSerdeFactory</code></dt>
<dd>Encodes <code>org.apache.samza.metrics.reporter.MetricsSnapshot</code> objects (which are
used for <a href="../container/metrics.html">reporting metrics</a>) as JSON.</dd>
<dt><code>org.apache.samza.serializers.KafkaSerdeFactory</code></dt>
<dd>Adapter which allows existing <code>kafka.serializer.Encoder</code> and
<code>kafka.serializer.Decoder</code> implementations to be used as Samza serdes.
Set serializers.registry.<span class="serde">serde-name</span>.encoder and
serializers.registry.<span class="serde">serde-name</span>.decoder to the appropriate
class names.</dd>
</dl>
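<p>Putting this together, a sketch that registers two of the serdes above under the arbitrary names
<code>string</code> and <code>json</code>, then references them from a hypothetical stream <code>page-views</code>:</p>
<pre><code>serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeV2Factory
streams.page-views.samza.key.serde=string
streams.page-views.samza.msg.serde=json</code></pre>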
</td>
</tr>
<tr>
<th colspan="3" class="section" id="filesystem-checkpoints">
Using the filesystem for checkpoints<br>
<span class="subtitle">
(This section applies if you have set
<a href="#task-checkpoint-factory" class="property">task.checkpoint.factory</a>
<code>= org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory</code>)
</span>
</th>
</tr>
<tr>
<td class="property" id="task-checkpoint-path">task.checkpoint.path</td>
<td class="default"></td>
<td class="description">
Required if you are using the filesystem for checkpoints. Set this to the path on your local filesystem
where checkpoint files should be stored.
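<p>A minimal sketch of a filesystem-based checkpoint configuration. The directory is a placeholder:</p>
<pre><code>task.checkpoint.factory=org.apache.samza.checkpoint.file.FileSystemCheckpointManagerFactory
task.checkpoint.path=/tmp/samza-checkpoints</code></pre>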
</td>
</tr>
<tr>
<th colspan="3" class="section" id="elasticsearch">
Using <a href="https://github.com/elastic/elasticsearch">Elasticsearch</a> for output streams<br>
<span class="subtitle">
(This section applies if you have set
<a href="#systems-samza-factory" class="property">systems.*.samza.factory</a>
<code>= org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory</code>)
</span>
</th>
</tr>
<tr>
<td class="property" id="systems-samza-client-factory-class">systems.<span class="system">system-name</span>.<br>client.factory</td>
<td class="default"></td>
<td class="description">
<strong>Required:</strong> The Elasticsearch client factory used for connecting
to the Elasticsearch cluster. Samza ships with the following implementations:
<dl>
<dt><code>org.apache.samza.system.elasticsearch.client.TransportClientFactory</code></dt>
<dd>Creates a TransportClient that connects to the cluster remotely without
joining it. This requires the transport host and port properties to be set.</dd>
<dt><code>org.apache.samza.system.elasticsearch.client.NodeClientFactory</code></dt>
<dd>Creates a Node client that connects to the cluster by joining it. By default
this uses <a href="http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-zen.html">zen discovery</a> to find the cluster but other methods can be configured.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="systems-samza-index-request-factory-class">systems.<span class="system">system-name</span>.<br>index.request.factory</td>
<td class="default">org.apache.samza.system<br>.elasticsearch.indexrequest.<br>DefaultIndexRequestFactory</td>
<td class="description">
The index request factory that converts the Samza OutgoingMessageEnvelope into the IndexRequest
to be sent to Elasticsearch. The default IndexRequestFactory behaves as follows:
<dl>
<dt><code>Stream name</code></dt>
<dd>The stream name has the format {index-name}/{type-name}, which
maps to the Elasticsearch index and type.</dd>
<dt><code>Message id</code></dt>
<dd>If the message has a key this is set as the document id, otherwise Elasticsearch will generate one for each document.</dd>
<dt><code>Partition id</code></dt>
<dd>If the partition key is set then this is used as the Elasticsearch routing key.</dd>
<dt><code>Message</code></dt>
<dd>The message must be either a byte[] which is passed directly on to Elasticsearch, or a Map which is passed on to the
Elasticsearch client which serialises it into a JSON String. Samza serdes are not currently supported.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="systems-samza-client-host">systems.<span class="system">system-name</span>.<br>client.transport.host</td>
<td class="default"></td>
<td class="description">
<strong>Required</strong> for <code>TransportClientFactory</code>
<p>The hostname that the TransportClientFactory connects to.</p>
</td>
</tr>
<tr>
<td class="property" id="systems-samza-client-port">systems.<span class="system">system-name</span>.<br>client.transport.port</td>
<td class="default"></td>
<td class="description">
<strong>Required</strong> for <code>TransportClientFactory</code>
<p>The port that the TransportClientFactory connects to.</p>
</td>
</tr>
<tr>
<td class="property" id="systems-samza-client-settings">systems.<span class="system">system-name</span>.<br>client.elasticsearch.*</td>
<td class="default"></td>
<td class="description">
Any <a href="http://www.elastic.co/guide/en/elasticsearch/client/java-api/1.x/client.html">Elasticsearch client settings</a> can be used here. They will all be passed to both the transport and node clients.
Some of the common settings you will want to provide are:
<dl>
<dt><code>systems.<span class="system">system-name</span>.client.elasticsearch.cluster.name</code></dt>
<dd>The name of the Elasticsearch cluster the client is connecting to.</dd>
<dt><code>systems.<span class="system">system-name</span>.client.elasticsearch.client.transport.sniff</code></dt>
<dd>If set to <code>true</code>, the transport client discovers all cluster nodes and keeps
its list of them up to date. This is used for load balancing and fail-over on retries.</dd>
</dl>
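<p>As a rough sketch, a transport-client setup combining the properties in this section might look like
the following. The system name <code>es</code>, the host, the port and the cluster name are placeholder
assumptions, not defaults:</p>
<pre><code># Hypothetical system named "es"; all values are illustrative
systems.es.samza.factory=org.apache.samza.system.elasticsearch.ElasticsearchSystemFactory
systems.es.client.factory=org.apache.samza.system.elasticsearch.client.TransportClientFactory
systems.es.client.transport.host=es1.example.com
systems.es.client.transport.port=9300
systems.es.client.elasticsearch.cluster.name=my-cluster
systems.es.client.elasticsearch.client.transport.sniff=true</code></pre>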
</td>
</tr>
<tr>
<td class="property" id="systems-samza-bulk-flush-max-actions">systems.<span class="system">system-name</span>.<br>bulk.flush.max.actions</td>
<td class="default">1000</td>
<td class="description">
The maximum number of messages to be buffered before flushing.
</td>
</tr>
<tr>
<td class="property" id="systems-samza-bulk-flush-max-size-mb">systems.<span class="system">system-name</span>.<br>bulk.flush.max.size.mb</td>
<td class="default">5</td>
<td class="description">
The maximum aggregate size, in megabytes, of buffered messages before flushing.
</td>
</tr>
<tr>
<td class="property" id="systems-samza-bulk-flush-interval-ms">systems.<span class="system">system-name</span>.<br>bulk.flush.interval.ms</td>
<td class="default">never</td>
<td class="description">
How often, in milliseconds, buffered messages should be flushed.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="kafka">
Using <a href="http://kafka.apache.org/">Kafka</a> for input streams, output streams and checkpoints<br>
<span class="subtitle">
(This section applies if you have set
<a href="#systems-samza-factory" class="property">systems.*.samza.factory</a>
<code>= org.apache.samza.system.kafka.KafkaSystemFactory</code>)
</span>
</th>
</tr>
<tr>
<td class="property" id="systems-samza-consumer-zookeeper-connect">systems.<span class="system">system-name</span>.<br>consumer.zookeeper.connect</td>
<td class="default"></td>
<td class="description">
The hostname and port of one or more Zookeeper nodes where information about the
Kafka cluster can be found. This is given as a comma-separated list of
<code>hostname:port</code> pairs, such as
<code>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</code>.
If the cluster information is at some sub-path of the Zookeeper namespace, you need to
include the path at the end of the list of hostnames, for example:
<code>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/clusters/my-kafka</code>
</td>
</tr>
<tr>
<td class="property" id="systems-samza-consumer-auto-offset-reset">systems.<span class="system">system-name</span>.<br>consumer.auto.offset.reset</td>
<td class="default">largest</td>
<td class="description">
This setting determines what happens if a consumer attempts to read an offset that is
outside of the current valid range. This could happen if the topic does not exist, or
if a checkpoint is older than the maximum message history retained by the brokers.
This property is not to be confused with
<a href="#systems-samza-offset-default">systems.*.samza.offset.default</a>,
which determines what happens if there is no checkpoint. The following are valid
values for <code>auto.offset.reset</code>:
<dl>
<dt><code>smallest</code></dt>
<dd>Start consuming at the smallest (oldest) offset available on the broker
(process as much message history as available).</dd>
<dt><code>largest</code></dt>
<dd>Start consuming at the largest (newest) offset available on the broker
(skip any messages published while the job was not running).</dd>
<dt>anything else</dt>
<dd>Throw an exception and refuse to start up the job.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="systems-samza-consumer">systems.<span class="system">system-name</span>.<br>consumer.*</td>
<td class="default"></td>
<td class="description">
Any <a href="http://kafka.apache.org/documentation.html#consumerconfigs">Kafka consumer configuration</a>
can be included here. For example, to change the socket timeout, you can set
systems.<span class="system">system-name</span>.consumer.socket.timeout.ms.
(There is no need to configure <code>group.id</code> or <code>client.id</code>,
as they are automatically configured by Samza. Also, there is no need to set
<code>auto.commit.enable</code> because Samza has its own checkpointing mechanism.)
</td>
</tr>
<tr>
<td class="property" id="systems-samza-producer-bootstrap-servers">systems.<span class="system">system-name</span>.<br>producer.bootstrap.servers</td>
<td class="default"></td>
<td class="description">
<b>Note</b>:
<i>This variable was previously defined as "producer.metadata.broker.list", which has been deprecated with this version.</i>
<br />
A list of network endpoints where the Kafka brokers are running. This is given as
a comma-separated list of <code>hostname:port</code> pairs, for example
<code>kafka1.example.com:9092,kafka2.example.com:9092,kafka3.example.com:9092</code>.
It's not necessary to list every single Kafka node in the cluster: Samza uses this
property in order to discover which topics and partitions are hosted on which broker.
This property is needed even if you are only consuming from Kafka, and not writing
to it, because Samza uses it to discover metadata about streams being consumed.
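<p>For illustration, a minimal Kafka system definition using the properties described so far might look
like this. The system name <code>kafka</code> and all hostnames are placeholders chosen for the example:</p>
<pre><code># Hypothetical system named "kafka"; hostnames are illustrative
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181/clusters/my-kafka
systems.kafka.producer.bootstrap.servers=kafka1.example.com:9092,kafka2.example.com:9092
# Process as much history as available if a checkpointed offset is out of range
systems.kafka.consumer.auto.offset.reset=smallest</code></pre>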
</td>
</tr>
<tr>
<td class="property" id="systems-samza-producer">systems.<span class="system">system-name</span>.<br>producer.*</td>
<td class="default"></td>
<td class="description">
Any <a href="http://kafka.apache.org/documentation.html#newproducerconfigs">Kafka producer configuration</a>
can be included here. For example, to change the request timeout, you can set
systems.<span class="system">system-name</span>.producer.timeout.ms.
(There is no need to configure <code>client.id</code> as it is automatically
configured by Samza.)
</td>
</tr>
<tr>
<td class="property" id="systems-samza-fetch-threshold">systems.<span class="system">system-name</span>.<br>samza.fetch.threshold</td>
<td class="default">50000</td>
<td class="description">
When consuming streams from Kafka, a Samza container maintains an in-memory buffer
for incoming messages in order to increase throughput (the stream task can continue
processing buffered messages while new messages are fetched from Kafka). This
parameter determines the number of messages we aim to buffer across all stream
partitions consumed by a container. For example, if a container consumes 50 partitions,
it will try to buffer 1000 messages per partition by default. When the number of
buffered messages falls below that threshold, Samza fetches more messages from the
Kafka broker to replenish the buffer. Increasing this parameter can increase a job's
processing throughput, but also increases the amount of memory used.
</td>
</tr>
<tr>
<td class="property" id="systems-samza-fetch-threshold-bytes">systems.<span class="system">system-name</span>.<br>samza.fetch.threshold.bytes</td>
<td class="default">-1</td>
<td class="description">
When consuming streams from Kafka, a Samza container maintains an in-memory buffer
for incoming messages in order to increase throughput (the stream task can continue
processing buffered messages while new messages are fetched from Kafka). This
parameter determines the total size, in bytes, of the messages we aim to buffer across all stream
partitions consumed by a container. The byte limit for a single system/stream/partition is derived from it.
Because whole messages are fetched, the limit is a soft one: actual usage can reach the byte
limit plus the size of the largest message in the partition for a given stream. If the value of this property is &gt; 0,
it takes precedence over systems.<span class="system">system-name</span>.samza.fetch.threshold.<br>
For example, if fetchThresholdBytes is set to 100000 bytes and 50 SystemStreamPartitions are registered,
the per-partition threshold is (100000 / 2) / 50 = 1000 bytes. As this is a soft limit, actual usage
can be 1000 bytes plus the size of the largest message. As soon as the buffered bytes for a SystemStreamPartition drop
below 1000, a fetch request is issued to get more data for it.
Increasing this parameter decreases the latency between a queue being drained of messages and new
messages being enqueued, but also increases memory usage because more messages are held in memory.
The default value is -1, which means this threshold is not used.
</td>
</tr>
<tr>
<td class="property" id="task-checkpoint-system">task.checkpoint.system</td>
<td class="default"></td>
<td class="description">
This property is required if you are using Kafka for checkpoints
(<a href="#task-checkpoint-factory" class="property">task.checkpoint.factory</a>
<code>= org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory</code>).
You must set it to the <span class="system">system-name</span> of a Kafka system. The stream
name (topic name) within that system is automatically determined from the job name and ID:
<code>__samza_checkpoint_${<a href="#job-name" class="property">job.name</a>}_${<a href="#job-id" class="property">job.id</a>}</code>
(with underscores in the job name and ID replaced by hyphens).
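<p>A possible checkpoint configuration, assuming a Kafka system has been defined under the
hypothetical name <code>kafka</code>, could be sketched as:</p>
<pre><code># "kafka" is an assumed system-name; substitute the name of your own Kafka system
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=3</code></pre>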
</td>
</tr>
<tr>
<td class="property" id="task-checkpoint-replication-factor">task.checkpoint.<br>replication.factor</td>
<td class="default">3</td>
<td class="description">
If you are using Kafka for checkpoints, this is the number of Kafka nodes to which you want the
checkpoint topic replicated for durability.
</td>
</tr>
<tr>
<td class="property" id="task-checkpoint-segment-bytes">task.checkpoint.<br>segment.bytes</td>
<td class="default">26214400</td>
<td class="description">
If you are using Kafka for checkpoints, this is the segment size to be used for the checkpoint
topic's log segments. Keeping this number small is useful because it increases the frequency
with which Kafka garbage-collects old checkpoints.
</td>
</tr>
<tr>
<td class="property" id="task-checkpoint-max-message-bytes">task.checkpoint.<br>max.message.bytes</td>
<td class="default">1000012</td>
<td class="description">
If you are using Kafka for checkpoints, this sets the largest record size for the checkpoint
topic.
</td>
</tr>
<tr>
<td class="property" id="store-changelog-replication-factor">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
<td class="default">stores.default.changelog.replication.factor</td>
<td class="description">
This property defines the number of replicas to use for the store's changelog stream.
</td>
</tr>
<tr>
<td class="property" id="store-default-changelog-replication-factor">stores.default.changelog.replication.factor</td>
<td class="default">2</td>
<td class="description">
This property defines the default number of replicas to use for the change log stream.
</td>
</tr>
<tr>
<td class="property" id="store-changelog-partitions">stores.<span class="store">store-name</span>.changelog.<br>kafka.topic-level-property</td>
<td class="default"></td>
<td class="description">
This property allows you to specify topic-level settings for the changelog topic to be created.
For example, you can specify the cleanup policy as "stores.mystore.changelog.kafka.cleanup.policy=delete".
Please refer to the <a href="http://kafka.apache.org/documentation.html#configuration">Kafka documentation</a> for more topic-level configurations.
</td>
</tr>
<tr>
<td class="property" id="store-default-changelog-min-compaction-lag-ms">stores.default.changelog.min.compaction.lag.ms</td>
<td class="default">14400000</td>
<td class="description">
This property defines the default minimum period that must pass before a changelog message can be compacted.
Be mindful that the larger this value, the larger changelog topics will be in Kafka due to un-compacted data, and the longer your application will take to restore from the changelog.
This may impact your application's startup time if host affinity is not enabled. For changelog topics which have a "compact,delete" cleanup policy, this value should be less than <code>retention.ms</code>.
</td>
</tr>
<tr>
<td class="property" id="store-changelog-min-compaction-lag-ms">stores.<span class="store">store-name</span>.changelog.min.compaction.lag.ms</td>
<td class="default">stores.default.changelog.min.compaction.lag.ms</td>
<td class="description">
This property defines the minimum period that must pass before a message in the store's changelog can be compacted.
Be mindful that the larger this value, the larger the changelog topic will be in Kafka due to un-compacted data, and the longer your application will take to restore from the changelog.
This may impact your application's startup time if host affinity is not enabled. For changelog topics which have a "compact,delete" cleanup policy, this value should be less than <code>retention.ms</code>.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="regex-rewriter">
Consuming all Kafka topics matching a regular expression<br>
<span class="subtitle">
(This section applies if you have set
<a href="#job-config-rewriter-class" class="property">job.config.rewriter.*.class</a>
<code>= org.apache.samza.config.RegExTopicGenerator</code>)
</span>
</th>
</tr>
<tr>
<td class="property" id="job-config-rewriter-system">job.config.rewriter.<br><span class="rewriter">rewriter-name</span>.system</td>
<td class="default"></td>
<td class="description">
Set this property to the <span class="system">system-name</span> of the Kafka system
from which you want to consume all matching topics.
</td>
</tr>
<tr>
<td class="property" id="job-config-rewriter-regex">job.config.rewriter.<br><span class="rewriter">rewriter-name</span>.regex</td>
<td class="default"></td>
<td class="description">
A regular expression specifying which topics you want to consume within the Kafka system
<a href="#job-config-rewriter-system" class="property">job.config.rewriter.*.system</a>.
Any topics matched by this regular expression will be consumed <em>in addition to</em> any
topics you specify with <a href="#task-inputs" class="property">task.inputs</a>.
</td>
</tr>
<tr>
<td class="property" id="job-config-rewriter-config">job.config.rewriter.<br><span class="rewriter">rewriter-name</span>.config.*</td>
<td class="default"></td>
<td class="description">
Any properties specified within this namespace are applied to the configuration of streams
that match the regex in
<a href="#job-config-rewriter-regex" class="property">job.config.rewriter.*.regex</a>.
For example, you can set <code>job.config.rewriter.*.config.samza.msg.serde</code> to configure
the deserializer for messages in the matching streams, which is equivalent to setting
<a href="#systems-samza-msg-serde" class="property">systems.*.streams.*.samza.msg.serde</a>
for each topic that matches the regex.
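<p>Put together, a rewriter setup might look like the sketch below. The rewriter name
<code>regex-input</code>, the system name <code>kafka</code>, the topic pattern and the serde name
<code>json</code> are all assumptions made for the example:</p>
<pre><code># Hypothetical rewriter named "regex-input"
job.config.rewriters=regex-input
job.config.rewriter.regex-input.class=org.apache.samza.config.RegExTopicGenerator
job.config.rewriter.regex-input.system=kafka
# Consume every topic whose name starts with "page-views-"
job.config.rewriter.regex-input.regex=page-views-.*
# Applied to each matching stream; "json" must be a registered serde-name
job.config.rewriter.regex-input.config.samza.msg.serde=json</code></pre>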
</td>
</tr>
<tr>
<th colspan="3" class="section" id="state"><a href="../container/state-management.html">Storage and State Management</a></th>
</tr>
<tr>
<td class="property" id="stores-factory">stores.<span class="store">store-name</span>.factory</td>
<td class="default"></td>
<td class="description">
This property defines a store, Samza's mechanism for efficient
<a href="../container/state-management.html">stateful stream processing</a>. You can give a
store any <span class="store">store-name</span> except <em>default</em> (the <span class="store">store-name</span>
<em>default</em> is reserved for defining default store parameters), and use that name to get a
reference to the store in your stream task (call
<a href="../api/javadocs/org/apache/samza/context/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a>
in your task's
<a href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.context.Context)">init()</a>
method). The value of this property is the fully-qualified name of a Java class that implements
<a href="../api/javadocs/org/apache/samza/storage/StorageEngineFactory.html">StorageEngineFactory</a>.
Samza currently ships with one storage engine implementation:
<dl>
<dt><code>org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory</code></dt>
<dd>An on-disk storage engine with a key-value interface, implemented using
<a href="http://rocksdb.org/">RocksDB</a>. It supports fast random-access
reads and writes, as well as range queries on keys. RocksDB can be configured with
various <a href="#keyvalue-rocksdb">additional tuning parameters</a>.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="stores-key-serde">stores.<span class="store">store-name</span>.key.serde</td>
<td class="default"></td>
<td class="description">
If the storage engine expects keys in the store to be simple byte arrays, this
<a href="../container/serialization.html">serde</a> allows the stream task to access the
store using another object type as key. The value of this property must be a
<span class="serde">serde-name</span> that is registered with
<a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
If this property is not set, keys are passed unmodified to the storage engine
(and the <a href="#stores-changelog">changelog stream</a>, if appropriate).
</td>
</tr>
<tr>
<td class="property" id="stores-msg-serde">stores.<span class="store">store-name</span>.msg.serde</td>
<td class="default"></td>
<td class="description">
If the storage engine expects values in the store to be simple byte arrays, this
<a href="../container/serialization.html">serde</a> allows the stream task to access the
store using another object type as value. The value of this property must be a
<span class="serde">serde-name</span> that is registered with
<a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
If this property is not set, values are passed unmodified to the storage engine
(and the <a href="#stores-changelog">changelog stream</a>, if appropriate).
</td>
</tr>
<tr>
<td class="property" id="stores-changelog">stores.<span class="store">store-name</span>.changelog</td>
<td class="default"></td>
<td class="description">
Samza stores are local to a container. If the container fails, the contents of the
store are lost. To prevent loss of data, you need to set this property to configure
a changelog stream: Samza then ensures that writes to the store are replicated to
this stream, and the store is restored from this stream after a failure. The value
of this property is given in the form
<span class="system">system-name</span>.<span class="stream">stream-name</span>.
The "system-name" part is optional. If it is omitted you must specify the system in
<a href="#job-changelog-system" class="property">job.changelog.system</a> config.
Any output stream can be used as changelog, but you must ensure that only one job
ever writes to a given changelog stream (each instance of a job and each store
needs its own changelog stream).
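<p>As an illustration, a changelog-backed store might be declared as follows. The store name
<code>my-store</code>, the system name <code>kafka</code>, the stream name and the serde name
<code>string</code> are placeholders, and the serde is assumed to be registered as shown:</p>
<pre><code># Assumed serde registration used by the store below
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Hypothetical store named "my-store"
stores.my-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.my-store.key.serde=string
stores.my-store.msg.serde=string
# system-name.stream-name; this stream must be written by this job and store only
stores.my-store.changelog=kafka.my-store-changelog</code></pre>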
</td>
</tr>
<tr>
<td class="property" id="stores-changelog-max-message-size-bytes">stores.<span class="store">store-name</span>.changelog.max.message.size.bytes</td>
<td class="default">1048576</td>
<td class="description">
This property sets the maximum size of the messages allowed in the changelog.
The default value is 1 MB.
</td>
</tr>
<tr>
<td class="property" id="stores-disallow-large-messages">stores.<span class="store">store-name</span>.disallow.large.messages</td>
<td class="default">false</td>
<td class="description">
When turned on, this property causes the store to reject large messages. Any message larger than
<a href="#stores-changelog-max-message-size-bytes" class="property">stores.*.changelog.max.message.size.bytes</a>
causes a SamzaException to be thrown, stating that the record is too large.
When a CachedStore is used, the message is serialized first, its size is validated,
and it is cached only if the size is within the permissible limit.
Note that if enabled retroactively, this may cause a performance regression due to the pre-caching serialization.
When this property is turned on, the <a href="#stores-drop-large-messages" class="property">stores.*.drop.large.messages</a>
configuration is ignored. The default value for this config is false. When this property is not set,
<a href="#stores-drop-large-messages" class="property">stores.*.drop.large.messages</a>
determines the large message handling behavior.
</td>
</tr>
<tr>
<td class="property" id="stores-drop-large-messages">stores.<span class="store">store-name</span>.drop.large.messages</td>
<td class="default">false</td>
<td class="description">
This property, when turned on, causes messages larger than
<a href="#stores-changelog-max-message-size-bytes" class="property">stores.*.changelog.max.message.size.bytes</a>
to be dropped from the underlying store and changelog. No exception is thrown when a large message is encountered.
If caching is enabled together with this config
(see the <a href="#stores-rocksdb-object-cache-size" class="property">stores.*.object.cache.size</a>
config), the large message is stored in the cache but is not written to the
changelog or the underlying store, which leaves the state temporarily inconsistent.
When this property is turned off, large messages will be sent to the changelog topic as is,
and may cause the container to fail during commit.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="keyvalue-rocksdb">
Using RocksDB for key-value storage<br>
<span class="subtitle">
(This section applies if you have set
<a href="#stores-factory" class="property">stores.*.factory</a>
<code>= org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory</code>)
</span>
</th>
</tr>
<tr>
<td class="property" id="stores-rocksdb-write-batch-size">stores.<span class="store">store-name</span>.<br>write.batch.size</td>
<td class="default">500</td>
<td class="description">
For better write performance, the storage engine buffers writes and applies them
to the underlying store in a batch. If the same key is written multiple times
in quick succession, this buffer also deduplicates writes to the same key. This
property is set to the number of key/value pairs that should be kept in this
in-memory buffer, per task instance. The number cannot be greater than
<a href="#stores-rocksdb-object-cache-size" class="property">stores.*.object.cache.size</a>.
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-object-cache-size">stores.<span class="store">store-name</span>.<br>object.cache.size</td>
<td class="default">1000</td>
<td class="description">
Samza maintains an additional cache in front of RocksDB for frequently-accessed
objects. This cache contains deserialized objects (avoiding the deserialization
overhead on cache hits), in contrast to the RocksDB block cache
(<a href="#stores-rocksdb-container-cache-size-bytes" class="property">stores.*.container.cache.size.bytes</a>),
which caches serialized objects. This property determines the number of objects
to keep in Samza's cache, per task instance. This same cache is also used for
write buffering (see <a href="#stores-rocksdb-write-batch-size" class="property">stores.*.write.batch.size</a>).
A value of 0 disables all caching and batching.
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-container-cache-size-bytes">stores.<span class="store">store-name</span>.container.<br>cache.size.bytes</td>
<td class="default">104857600</td>
<td class="description">
The size of RocksDB's block cache in bytes, per container. If there are several
task instances within one container, each is given a proportional share of this cache.
Note that this is an off-heap memory allocation, so the container's total memory use
is the maximum JVM heap size <em>plus</em> the size of this cache.
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-container-write-buffer-size-bytes">stores.<span class="store">store-name</span>.container.<br>write.buffer.size.bytes</td>
<td class="default">33554432</td>
<td class="description">
The amount of memory (in bytes) that RocksDB uses for buffering writes before they are
written to disk, per container. If there are several task instances within one
container, each is given a proportional share of this buffer. This setting also
determines the size of RocksDB's segment files.
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-compression">stores.<span class="store">store-name</span>.<br>rocksdb.compression</td>
<td class="default">snappy</td>
<td class="description">
This property controls whether RocksDB should compress data on disk and in the
block cache. The following values are valid:
<dl>
<dt><code>snappy</code></dt>
<dd>Compress data using the <a href="https://code.google.com/p/snappy/">Snappy</a> codec.</dd>
<dt><code>bzip2</code></dt>
<dd>Compress data using the <a href="http://en.wikipedia.org/wiki/Bzip2">bzip2</a> codec.</dd>
<dt><code>zlib</code></dt>
<dd>Compress data using the <a href="http://en.wikipedia.org/wiki/Zlib">zlib</a> codec.</dd>
<dt><code>lz4</code></dt>
<dd>Compress data using the <a href="https://code.google.com/p/lz4/">lz4</a> codec.</dd>
<dt><code>lz4hc</code></dt>
<dd>Compress data using the <a href="https://code.google.com/p/lz4/">lz4hc</a> (high compression) codec.</dd>
<dt><code>none</code></dt>
<dd>Do not compress data.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-block-size-bytes">stores.<span class="store">store-name</span>.<br>rocksdb.block.size.bytes</td>
<td class="default">4096</td>
<td class="description">
If compression is enabled, RocksDB groups approximately this many uncompressed bytes
into one compressed block. You probably don't need to change this property.
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-ttl">stores.<span class="store">store-name</span>.<br>rocksdb.ttl.ms</td>
<td class="default"></td>
<td class="description">
The time-to-live of the store, in milliseconds. Note that this is not a strict TTL limit: expired entries are removed only
after compaction. Use caution when opening the same database both with and without TTL, as it might corrupt the
database. Make sure to read the <a href="https://github.com/facebook/rocksdb/wiki/Time-to-Live">constraints</a> before using this property.
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-compaction-style">stores.<span class="store">store-name</span>.<br>rocksdb.compaction.style</td>
<td class="default">universal</td>
<td class="description">
This property controls the compaction style that RocksDB will employ when compacting its levels. The following values are valid:
<dl>
<dt><code>universal</code></dt>
<dd>Use <a href="https://github.com/facebook/rocksdb/wiki/Universal-Compaction">universal</a> compaction.</dd>
<dt><code>fifo</code></dt>
<dd>Use <a href="https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style">FIFO</a> compaction.</dd>
<dt><code>level</code></dt>
<dd>Use RocksDB's standard leveled compaction.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-num-write-buffers">stores.<span class="store">store-name</span>.<br>rocksdb.num.write.buffers</td>
<td class="default">3</td>
<td class="description">
Configures the <a href="https://github.com/facebook/rocksdb/wiki/Basic-Operations#write-buffer">number of write buffers</a> that a RocksDB store uses. This allows RocksDB to continue taking writes to other buffers even while a given write buffer is being flushed to disk.
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-log-file-size">stores.<span class="store">store-name</span>.<br>rocksdb.max.log.file.size.bytes</td>
<td class="default">67108864</td>
<td class="description">
The maximum size in bytes of the RocksDB LOG file before it is rotated.
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-num-log-files">stores.<span class="store">store-name</span>.<br>rocksdb.keep.log.file.num</td>
<td class="default">2</td>
<td class="description">
The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.
</td>
</tr>
<tr>
<td class="property" id="stores-rocksdb-metrics">stores.<span class="store">store-name</span>.<br>rocksdb.metrics.list</td>
<td class="default"></td>
<td class="description">
A list of RocksDB <a href="https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409">properties</a> to expose as metrics (gauges).
</td>
</tr>
<tr>
<th colspan="3" class="section" id="cluster-manager">
Running Samza with a cluster manager<br>
</th>
</tr>
<tr>
<td class="property" id="cluster-manager-container-memory-mb">cluster-manager.container.<br>memory.mb</td>
<td class="default">1024</td>
<td class="description">
How much memory, in megabytes, to request from the cluster manager per container of your job. Along with
<a href="#cluster-manager-container-cpu-cores" class="property">cluster-manager.container.cpu.cores</a>, this
property determines how many containers the cluster manager will run on one machine. If the container
exceeds this limit, it will be killed, so it is important that the container's actual
memory use remains below the limit. The amount of memory used is normally the JVM heap
size (configured with <a href="#task-opts" class="property">task.opts</a>), plus the
size of any off-heap memory allocation (for example
<a href="#stores-rocksdb-container-cache-size-bytes" class="property">stores.*.container.cache.size.bytes</a>),
plus a safety margin to allow for JVM overheads.
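<p>As a worked example of this budget (the values and the store name <code>my-store</code> are
illustrative assumptions, not recommendations): a 1536 MB heap plus a 256 MB RocksDB block cache
comes to 1792 MB, which leaves 256 MB of a 2048 MB request as safety margin.</p>
<pre><code># Illustrative sizing only
cluster-manager.container.memory.mb=2048
# JVM heap: 1536 MB
task.opts=-Xmx1536M
# Off-heap RocksDB block cache: 256 MB = 268435456 bytes
stores.my-store.container.cache.size.bytes=268435456</code></pre>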
</td>
</tr>
<tr>
<td class="property" id="cluster-manager-container-cpu-cores">cluster-manager.container.<br>cpu.cores</td>
<td class="default">1</td>
<td class="description">
The number of CPU cores to request per container of your job. Each node in the
cluster has a certain number of CPU cores available, so this number (along with
<a href="#cluster-manager-container-memory-mb" class="property">cluster-manager.container.memory.mb</a>)
determines how many containers can be run on one machine.
</td>
</tr>
<tr>
<td class="property" id="cluster-manager-container-retry-count">cluster-manager.container.<br>retry.count</td>
<td class="default">8</td>
<td class="description">
If a container fails, it is automatically restarted by Samza. However, if a container keeps
failing shortly after startup, that indicates a deeper problem, so we should kill the job
rather than retrying indefinitely. This property determines the maximum number of times we are
willing to restart a failed container in quick succession (the time period is configured with
<a href="#cluster-manager-container-retry-window-ms" class="property">cluster-manager.container.retry.window.ms</a>).
Each container in the job is counted separately. If this property is set to 0, any failed
container immediately causes the whole job to fail. If it is set to a negative number, there
is no limit on the number of retries.
</td>
</tr>
<tr>
<td class="property" id="cluster-manager-container-retry-window-ms">cluster-manager.container.<br>retry.window.ms</td>
<td class="default">300000</td>
<td class="description">
This property determines how frequently a container is allowed to fail before we give up and
fail the job. If the same container has failed more than
<a href="#cluster-manager-container-retry-count" class="property">cluster-manager.container.retry.count</a>
times, and the time between failures was less than this property
<code>cluster-manager.container.retry.window.ms</code> (in milliseconds), then we fail the job.
There is no limit to the number of times we will restart a container if the time between
failures is greater than <code>cluster-manager.container.retry.window.ms</code>.
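<dl>
<dt>Example (illustrative values only): going by the description above, with these settings the job fails once the same container
has failed more than 3 times with less than 60 seconds between failures.</dt>
<dd><pre>
cluster-manager.container.retry.count=3
cluster-manager.container.retry.window.ms=60000
</pre></dd>
</dl>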
</td>
</tr>
<tr>
<td class="property" id="cluster-manager-jmx-enabled">cluster-manager.jobcoordinator.<br>jmx.enabled</td>
<td class="default">true</td>
<td class="description">
Determines whether a JMX server should be started on the job's JobCoordinator
(<code>true</code> or <code>false</code>).
</td>
</tr>
<tr>
<td class="property" id="cluster-manager-allocator-sleep-ms">cluster-manager.allocator.<br>sleep.ms</td>
<td class="default">3600</td>
<td class="description">
The container allocator thread is responsible for matching requests to allocated containers.
The sleep interval for this thread, in milliseconds, is configured using this property.
</td>
</tr>
<tr>
<td class="property" id="cluster-manager-container-request-timeout-ms">cluster-manager.container.<br>request.timeout.ms</td>
<td class="default">5000</td>
<td class="description">
The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource.
This property determines the number of milliseconds before a container request is considered to have expired / timed-out.
When a request expires, it gets allocated to any available container that was returned by the cluster manager.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="yarn">
Running your job on a <a href="../jobs/yarn-jobs.html">YARN</a> cluster<br>
<span class="subtitle">
(This section applies if you have set
<a href="#job-factory-class" class="property">job.factory.class</a>
<code>= org.apache.samza.job.yarn.YarnJobFactory</code>)
</span>
</th>
</tr>
<tr>
<td class="property" id="yarn-package-path">yarn.package.path</td>
<td class="default"></td>
<td class="description">
<strong>Required for YARN jobs:</strong> The URL from which the job package can
be downloaded, for example a <code>http://</code> or <code>hdfs://</code> URL.
The job package is a .tar.gz file with a
<a href="../jobs/packaging.html">specific directory structure</a>.
</td>
</tr>
<tr>
<td class="property" id="yarn-container-memory-mb">yarn.container.memory.mb</td>
<td class="default">1024</td>
<td class="description">
This is deprecated in favor of
<a href="#cluster-manager-container-memory-mb" class="property">cluster-manager.container.memory.mb</a>
</td>
</tr>
<tr>
<td class="property" id="yarn-container-cpu-cores">yarn.container.cpu.cores</td>
<td class="default">1</td>
<td class="description">
This is deprecated in favor of
<a href="#cluster-manager-container-cpu-cores" class="property">cluster-manager.container.cpu.cores</a>
</td>
</tr>
<tr>
<td class="property" id="yarn-container-retry-count">yarn.container.<br>retry.count</td>
<td class="default">8</td>
<td class="description">
This is deprecated in favor of
<a href="#cluster-manager-container-retry-count" class="property">cluster-manager.container.retry.count</a>
</td>
</tr>
<tr>
<td class="property" id="yarn-container-retry-window-ms">yarn.container.<br>retry.window.ms</td>
<td class="default">300000</td>
<td class="description">
This is deprecated in favor of
<a href="#cluster-manager-container-retry-window-ms" class="property">cluster-manager.container.retry.window.ms</a>
</td>
</tr>
<tr>
<td class="property" id="yarn-am-container-memory-mb">yarn.am.container.<br>memory.mb</td>
<td class="default">1024</td>
<td class="description">
When running on YARN, each Samza job has one special container, the
<a href="../yarn/application-master.html">ApplicationMaster</a> (AM), which manages the
execution of the job. This property determines how much memory, in megabytes, to request
from YARN for running the ApplicationMaster.
</td>
</tr>
<tr>
<td class="property" id="yarn-am-opts">yarn.am.opts</td>
<td class="default"></td>
<td class="description">
Any JVM options to include in the command line when executing the Samza
<a href="../yarn/application-master.html">ApplicationMaster</a>. For example, this can be
used to set the JVM heap size, to tune the garbage collector, or to enable remote debugging.
</td>
</tr>
<tr>
<td class="property" id="yarn-am-java-home">yarn.am.java.home</td>
<td class="default"></td>
<td class="description">
The JAVA_HOME path for the Samza AM. By setting this property, you can use a Java version that is
different from your cluster's Java version. Remember to set <code>task.java.home</code> as well.
<dl>
<dt>Example: <code>yarn.am.java.home=/usr/java/jdk1.8.0_05</code></dt>
</dl>
</td>
</tr>
<tr>
<td class="property" id="yarn-am-poll-interval-ms">yarn.am.poll.interval.ms</td>
<td class="default">1000</td>
<td class="description">
The Samza ApplicationMaster sends regular heartbeats to the YARN ResourceManager
to confirm that it is alive. This property determines the time (in milliseconds)
between heartbeats.
</td>
</tr>
<tr>
<td class="property" id="yarn-am-jmx-enabled">yarn.am.jmx.enabled</td>
<td class="default">true</td>
<td class="description">
This is deprecated in favor of
<a href="#cluster-manager-jmx-enabled" class="property">cluster-manager.jobcoordinator.jmx.enabled</a>
</td>
</tr>
<tr>
<td class="property" id="yarn-allocator-sleep-ms">yarn.allocator.sleep.ms</td>
<td class="default">3600</td>
<td class="description">
This is deprecated in favor of
<a href="#cluster-manager-allocator-sleep-ms" class="property">cluster-manager.allocator.sleep.ms</a>
</td>
</tr>
<tr>
<td class="property" id="yarn-samza-host_affinity-enabled">yarn.samza.host-affinity.enabled</td>
<td class="default">false</td>
<td class="description">
This is deprecated in favor of
<a href="#job-host_affinity-enabled" class="property">job.host-affinity.enabled</a>
</td>
</tr>
<tr>
<td class="property" id="yarn-container-request-timeout-ms">yarn.container.request.timeout.ms</td>
<td class="default">5000</td>
<td class="description">
This is deprecated in favor of
<a href="#cluster-manager-container-request-timeout-ms" class="property">cluster-manager.container.request.timeout.ms</a>
</td>
</tr>
<tr>
<td class="property" id="yarn-queue">yarn.queue</td>
<td class="default"></td>
<td class="description">
Determines which YARN queue will be used for the Samza job.
</td>
</tr>
<tr>
<td class="property" id="yarn-kerberos-principal">yarn.kerberos.principal</td>
<td class="default"></td>
<td class="description">
The principal that the Samza job uses to authenticate itself to the KDC when running on a Kerberos-enabled YARN cluster.
</td>
</tr>
<tr>
<td class="property" id="yarn-kerberos-keytab">yarn.kerberos.keytab</td>
<td class="default"></td>
<td class="description">
The full path to the file containing the keytab for the principal specified by <a href="#yarn-kerberos-principal" class="property">yarn.kerberos.principal</a>.
The keytab file is uploaded to the staging directory unique to each application on HDFS, and the Application Master then uses the keytab and principal to
periodically log in and recreate the delegation tokens.
</td>
</tr>
<tr>
<td class="property" id="yarn-job-view-acl">yarn.job.view.acl</td>
<td class="default"></td>
<td class="description">
This applies to secured YARN clusters only.
The 'viewing' ACL of the YARN application, which controls who can view the application (e.g. application status, logs).
See <a href="https://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/yarn/api/records/ApplicationAccessType.html">ApplicationAccessType</a> for more details.
</td>
</tr>
<tr>
<td class="property" id="yarn-job-modify-acl">yarn.job.modify.acl</td>
<td class="default"></td>
<td class="description">
This applies to secured YARN clusters only.
The 'modify' ACL of the YARN application, which controls who can modify the application (e.g. killing the application).
See <a href="https://hadoop.apache.org/docs/r2.6.0/api/org/apache/hadoop/yarn/api/records/ApplicationAccessType.html">ApplicationAccessType</a> for more details.
</td>
</tr>
<tr>
<td class="property" id="yarn-token-renewal-interval-seconds">yarn.token.renewal.interval.seconds</td>
<td class="default"></td>
<td class="description">
The time interval, in seconds, at which the Application Master re-authenticates and renews the delegation tokens. This value should be smaller than the length of time a delegation token is valid on the Hadoop NameNodes before expiration.
</td>
</tr>
<tr>
<td class="property" id="yarn-resources-resource-name-path">yarn.resources.<span class="resource">resource-name</span>.path</td>
<td class="default"></td>
<td class="description">
The path for localizing the resource for <span class="resource">resource-name</span>. The scheme (e.g. http, ftp, hdfs, file, etc.) in the path should be configured in YARN core-site.xml as fs.&lt;scheme&gt;.impl and is associated with a <a href="https://hadoop.apache.org/docs/stable/api/index.html?org/apache/hadoop/fs/FileSystem.html">FileSystem</a>.
If defined, the resource will be localized in the Samza application directory before the Samza job runs. More details can be found <a href="../yarn/yarn-resource-localization.html">here</a>.
</td>
</tr>
<tr>
<td class="property" id="yarn-resources-resource-name-local-name">yarn.resources.<span class="resource">resource-name</span>.local.name</td>
<td class="default"><span class="resource">resource-name</span></td>
<td class="description">
The new local name for the resource after localization.
This configuration only applies when yarn.resources.<span class="resource">resource-name</span>.path is configured.
</td>
</tr>
<tr>
<td class="property" id="yarn-resources-resource-name-local-type">yarn.resources.<span class="resource">resource-name</span>.local.type</td>
<td class="default">FILE</td>
<td class="description">
The type for the resource after localization. It can be ARCHIVE (archived directory), FILE, or PATTERN (the entries extracted from the archive with the pattern).
This configuration only applies when yarn.resources.<span class="resource">resource-name</span>.path is configured.
</td>
</tr>
<tr>
<td class="property" id="yarn-resources-resource-name-local-visibility">yarn.resources.<span class="resource">resource-name</span>.local.visibility</td>
<td class="default">APPLICATION</td>
<td class="description">
The visibility for the resource after localization. It can be PUBLIC (visible to everyone), PRIVATE (visible to all Samza applications of the same account user as this application), or APPLICATION (visible to only this Samza application).
This configuration only applies when yarn.resources.<span class="resource">resource-name</span>.path is configured.
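<dl>
<dt>Example (a sketch only; the resource name <code>lookup-data</code>, the host, and the path are hypothetical): taken together,
the four <code>yarn.resources.*</code> properties for one resource might be set as follows.</dt>
<dd><pre>
yarn.resources.lookup-data.path=hdfs://namenode.example.com/path/to/lookup-data.tar.gz
yarn.resources.lookup-data.local.name=lookup
yarn.resources.lookup-data.local.type=ARCHIVE
yarn.resources.lookup-data.local.visibility=APPLICATION
</pre></dd>
</dl>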
</td>
</tr>
<tr>
<th colspan="3" class="section" id="metrics"><a href="../container/metrics.html">Metrics</a></th>
</tr>
<tr>
<td class="property" id="metrics-reporter-class">metrics.reporter.<br><span class="reporter">reporter-name</span>.class</td>
<td class="default"></td>
<td class="description">
Samza automatically tracks various metrics which are useful for monitoring the health
of a job, and you can also track <a href="../container/metrics.html">your own metrics</a>.
With this property, you can define any number of <em>metrics reporters</em> which send
the metrics to a system of your choice (for graphing, alerting etc). You give each reporter
an arbitrary <span class="reporter">reporter-name</span>. To enable the reporter, you need
to reference the <span class="reporter">reporter-name</span> in
<a href="#metrics-reporters" class="property">metrics.reporters</a>.
The value of this property is the fully-qualified name of a Java class that implements
<a href="../api/javadocs/org/apache/samza/metrics/MetricsReporterFactory.html">MetricsReporterFactory</a>.
Samza ships with these implementations by default:
<dl>
<dt><code>org.apache.samza.metrics.reporter.JmxReporterFactory</code></dt>
<dd>With this reporter, every container exposes its own metrics as JMX MBeans. The JMX
server is started on a <a href="../container/jmx.html">random port</a> to avoid
collisions between containers running on the same machine.</dd>
<dt><code>org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory</code></dt>
<dd>This reporter sends the latest values of all metrics as messages to an output
stream once per minute. The output stream is configured with
<a href="#metrics-reporter-stream" class="property">metrics.reporter.*.stream</a>
and it can use any system supported by Samza.</dd>
</dl>
</td>
</tr>
<tr>
<td class="property" id="metrics-reporters">metrics.reporters</td>
<td class="default"></td>
<td class="description">
If you have defined any metrics reporters with
<a href="#metrics-reporter-class" class="property">metrics.reporter.*.class</a>, you
need to list them here in order to enable them. The value of this property is a
comma-separated list of <span class="reporter">reporter-name</span> tokens.
</td>
</tr>
<tr>
<td class="property" id="metrics-reporter-stream">metrics.reporter.<br><span class="reporter">reporter-name</span>.stream</td>
<td class="default"></td>
<td class="description">
If you have registered the metrics reporter
<a href="#metrics-reporter-class" class="property">metrics.reporter.*.class</a>
<code>= org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory</code>,
you need to set this property to configure the output stream to which the metrics data
should be sent. The stream is given in the form
<span class="system">system-name</span>.<span class="stream">stream-name</span>,
and the system must be defined in the job configuration. It's fine for many different jobs
to publish their metrics to the same metrics stream. Samza defines a simple
<a href="../container/metrics.html">JSON encoding</a> for metrics; in order to use this
encoding, you also need to configure a serde for the metrics stream:
<ul>
<li><a href="#systems-samza-msg-serde" class="property">streams.*.samza.msg.serde</a>
<code>= metrics-serde</code> (replacing the asterisk with the
<span class="stream">stream-name</span> of the metrics stream)</li>
<li><a href="#serializers-registry-class" class="property">serializers.registry.metrics-serde.class</a>
<code>= org.apache.samza.serializers.MetricsSnapshotSerdeFactory</code>
(registering the serde under a <span class="serde">serde-name</span> of
<code>metrics-serde</code>)</li>
</ul>
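<dl>
<dt>Example (a sketch combining the properties described above; the reporter name <code>snapshot</code>, the system name
<code>kafka</code> and the stream name <code>metrics</code> are assumptions, and the system must be defined elsewhere in the job configuration):</dt>
<dd><pre>
metrics.reporters=snapshot
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
streams.metrics.samza.msg.serde=metrics-serde
serializers.registry.metrics-serde.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory
</pre></dd>
</dl>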
</td>
</tr>
<tr>
<td class="property" id="metrics-reporter-polling-interval">metrics.reporter.<br><span class="reporter">reporter-name</span>.interval</td>
<td class="default"></td>
<td class="description">
If you have registered the metrics reporter
<a href="#metrics-reporter-class" class="property">metrics.reporter.*.class</a>
<code>= org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory</code>,
you can use this property to configure how frequently the reporter will report the metrics
registered with it. The value of this property is the length of the interval between
consecutive metric reports, in seconds, and should be a positive integer.
This property is optional and set to 60 by default, which means metrics will be reported every
60 seconds.
</td>
</tr>
<tr>
<td class="property" id="metrics-timer-enabled">metrics.timer.enabled</td>
<td class="default">true</td>
<td class="description">
This setting enables the common timer metrics for your job, which include container metrics
such as process-ns, window-ns, commit-ns, and block-ns, as well as key-value storage engine
metrics such as get-ns, put-ns and range-ns.
</td>
</tr>
<tr>
<td class="property" id="metrics-timer-debug-eanbled">metrics.timer.debug.enabled</td>
<td class="default">false</td>
<td class="description">
This setting enables the additional timer metrics for debugging of your job. These metrics
include operator metrics such as handle-message-ns and handle-timer-ns.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="hdfs-system-producer"><a href="../hdfs/producer.html">Writing to HDFS</a></th>
</tr>
<tr>
<td class="property" id="hdfs-writer-class">systems.<span class="system">system-name</span>.<br>producer.hdfs.writer.class</td>
<td class="default">org.apache.samza.system.hdfs.<br>writer.BinarySequenceFileHdfsWriter</td>
<td class="description">Fully-qualified class name of the HdfsWriter implementation this HDFS Producer system should use.</td>
</tr>
<tr>
<td class="property" id="hdfs-compression-type">systems.<span class="system">system-name</span>.<br>producer.hdfs.compression.type</td>
<td class="default">none</td>
<td class="description">A human-readable label for the compression type to use, such as "gzip", "snappy", etc. This label will be interpreted differently (or ignored) depending on the nature of the HdfsWriter implementation.</td>
</tr>
<tr>
<td class="property" id="hdfs-base-output-dir">systems.<span class="system">system-name</span>.<br>producer.hdfs.base.output.dir</td>
<td class="default">/user/USERNAME/SYSTEMNAME</td>
<td class="description">The base output directory for HDFS writes. Defaults to the home directory of the user who ran the job, followed by the systemName for this HdfsSystemProducer as defined in the job.properties file.</td>
</tr>
<tr>
<td class="property" id="hdfs-bucketer-class">systems.<span class="system">system-name</span>.<br>producer.hdfs.bucketer.class</td>
<td class="default">org.apache.samza.system.hdfs.<br>writer.JobNameDateTimeBucketer</td>
<td class="description">Fully-qualified class name of the Bucketer implementation that will manage HDFS paths and file names. Used to batch writes by time, or other similar partitioning methods.</td>
</tr>
<tr>
<td class="property" id="hdfs-bucketer-date-path-format">systems.<span class="system">system-name</span>.<br>producer.hdfs.bucketer.date.path.format</td>
<td class="default">yyyy_MM_dd</td>
<td class="description">A date format (using Java's SimpleDateFormat syntax) appropriate for use in an HDFS Path, which can configure time-based bucketing of output files.</td>
</tr>
<tr>
<td class="property" id="hdfs-write-batch-size-bytes">systems.<span class="system">system-name</span>.<br>producer.hdfs.write.batch.size.bytes</td>
<td class="default">268435456</td>
<td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.</td>
</tr>
<tr>
<td class="property" id="hdfs-write-batch-size-records">systems.<span class="system">system-name</span>.<br>producer.hdfs.write.batch.size.records</td>
<td class="default">262144</td>
<td class="description">The number of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 262144 if not set.</td>
</tr>
<tr>
<th colspan="3" class="section" id="hdfs-system-consumer"><a href="../hdfs/consumer.html">Reading from HDFS</a></th>
</tr>
<tr>
<td class="property" id="hdfs-consumer-buffer-capacity">systems.<span class="system">system-name</span>.<br>consumer.bufferCapacity</td>
<td class="default">10</td>
<td class="description">Capacity of the HDFS consumer buffer, the blocking queue used for storing messages. Larger buffer capacity typically leads to better throughput but consumes more memory.</td>
</tr>
<tr>
<td class="property" id="hdfs-consumer-numMaxRetries">systems.<span class="system">system-name</span>.<br>consumer.numMaxRetries</td>
<td class="default">10</td>
<td class="description">The number of retry attempts when there is a failure to fetch messages from HDFS, before the container fails.</td>
</tr>
<tr>
<td class="property" id="hdfs-partitioner-whitelist">systems.<span class="system">system-name</span>.<br>partitioner.defaultPartitioner.whitelist</td>
<td class="default">.*</td>
<td class="description">Whitelist used by the directory partitioner to select files in an HDFS directory, in Java Pattern style.</td>
</tr>
<tr>
<td class="property" id="hdfs-partitioner-blacklist">systems.<span class="system">system-name</span>.<br>partitioner.defaultPartitioner.blacklist</td>
<td class="default"></td>
<td class="description">Blacklist used by the directory partitioner to filter out unwanted files in an HDFS directory, in Java Pattern style.</td>
</tr>
<tr>
<td class="property" id="hdfs-partitioner-group-pattern">systems.<span class="system">system-name</span>.<br>partitioner.defaultPartitioner.groupPattern</td>
<td class="default"></td>
<td class="description">Group pattern used by directory partitioner for advanced partitioning. The advanced partitioning goes beyond the basic assumption that each file is a partition. With advanced partitioning you can group files into partitions arbitrarily. For example, if you have a set of files as [part-01-a.avro, part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro], and you want to organize the partitions as (part-01-a.avro, part-01-b.avro), (part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the middle act as a "group identifier", you can then set this property to be "part-[id]-.*" (note that "[id]" is a reserved term here, i.e. you have to literally put it as "[id]"). The partitioner will apply this pattern to all file names and extract the "group identifier" ("[id]" in the pattern), then use the "group identifier" to group files into partitions. See more details in <a href="https://issues.apache.org/jira/secure/attachment/12827670/HDFSSystemConsumer.pdf">HdfsSystemConsumer design doc</a> </td>
</tr>
<tr>
<td class="property" id="hdfs-consumer-reader-type">systems.<span class="system">system-name</span>.<br>consumer.reader</td>
<td class="default">avro</td>
<td class="description">Type of the file reader for different event formats (avro, plain, json, etc.). "avro" is the only type supported for now.</td>
</tr>
<tr>
<td class="property" id="hdfs-staging-directory">systems.<span class="system">system-name</span>.<br>stagingDirectory</td>
<td class="default"></td>
<td class="description">Staging directory for storing the partition description. By default (if not set by users) the value is inherited from "yarn.job.staging.directory" internally. The default value is typically good enough unless you explicitly want to use a separate location.</td>
</tr>
<tr>
<th colspan="3" class="section" id="eventhub">
Using <a href="https://azure.microsoft.com/en-us/services/event-hubs/">Event Hubs</a> for input and output streams<br>
<span class="subtitle">
(This section applies if you have set
<a href="#systems-samza-factory" class="property">systems.*.samza.factory</a>
<code>= org.apache.samza.system.eventhub.EventHubSystemFactory</code>)
</span>
</th>
</tr>
<tr>
<td class="property" id="eventhub-stream-list">systems.<span class="system">system-name</span>.<br>stream.list</td>
<td class="default"></td>
<td class="description">List of Samza <span class="stream">stream-ids</span> used for the Event Hubs system. Required if not using input/output system descriptors.</td>
</tr>
<tr>
<td class="property" id="eventhub-stream-namespace">streams.<span class="stream">stream-id</span>.<br>eventhubs.namespace</td>
<td class="default"></td>
<td class="description">Namespace of the associated <span class="stream">stream-ids</span>. Required to access the Event Hubs entity per stream.</td>
</tr>
<tr>
<td class="property" id="eventhub-stream-entity">streams.<span class="stream">stream-id</span>.<br>eventhubs.entitypath</td>
<td class="default"></td>
<td class="description">Entity of the associated <span class="stream">stream-ids</span>. Required to access the Event Hubs entity per stream.</td>
</tr>
<tr>
<td class="property" id="eventhub-stream-sas-keyname">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.keyname</td>
<td class="default"></td>
<td class="description">SAS Keyname of the associated <span class="stream">stream-ids</span>. Required to access the Event Hubs entity per stream.</td>
</tr>
<tr>
<td class="property" id="eventhub-stream-sas-token">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.token</td>
<td class="default"></td>
<td class="description">SAS Token of the associated <span class="stream">stream-ids</span>. Required to access the Event Hubs entity per stream.</td>
</tr>
<tr>
<td class="property" id="eventhub-client-threads">systems.<span class="system">system-name</span>.<br>eventhubs.numClientThreads</td>
<td class="default">10</td>
<td class="description">Number of threads in the thread pool that will be used by the EventHubClient. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_hub_client.create">here</a> for more details.</td>
</tr>
<tr>
<td class="property" id="eventhub-prefetch-count">systems.<span class="system">system-name</span>.<br>eventhubs.prefetchCount</td>
<td class="default">999</td>
<td class="description">Number of events that the Event Hubs client should prefetch from the server. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receiver.setprefetchcount">here</a> for more details.</td>
</tr>
<tr>
<td class="property" id="eventhub-max-event-count">systems.<span class="system">system-name</span>.<br>eventhubs.maxEventCountPerPoll</td>
<td class="default">50</td>
<td class="description">Maximum number of events that the Event Hubs client can return in a receive call. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receive_handler.getmaxeventcount#com_microsoft_azure_eventhubs__partition_receive_handler_getMaxEventCount__">here</a> for more details.</td>
</tr>
<tr>
<td class="property" id="eventhub-runtime-timeout">systems.<span class="system">system-name</span>.<br>eventhubs.runtime.info.timeout</td>
<td class="default">60000</td>
<td class="description">Timeout, in milliseconds, for fetching the runtime metadata from an Event Hubs entity on startup.</td>
</tr>
<tr>
<td class="property" id="eventhub-send-partition-method">systems.<span class="system">system-name</span>.<br>eventhubs.partition.method</td>
<td class="default"><code>EVENT_HUB_HASHING</code></td>
<td class="description">
Producer only config. Configures how messages are partitioned for the downstream Event Hubs entity, in one of the following ways:
<dl>
<dt><code>ROUND_ROBIN</code></dt>
<dd>The message key and partition key are ignored and the message will be distributed in a round-robin fashion amongst all the partitions in the downstream Event Hubs entity.</dd>
<dt><code>EVENT_HUB_HASHING</code></dt>
<dd>Employs the hashing mechanism in Event Hubs to determine, based on the key of the message, which partition the message should go to. Using this method still ensures that all the events with the same key are sent to the same partition in the event hub. If this option is chosen, the partition key used for the hash should be a string. If the partition key is not set, the message key is used instead.</dd>
<dt><code>PARTITION_KEY_AS_PARTITION</code></dt>
<dd>Uses the integer key specified by the partition key (or the message key) to select a specific partition on Event Hubs. If the integer key is greater than the number of partitions in the destination Event Hubs entity, a modulo operation is performed to determine the resulting partition. For example, if there are 6 partitions and the key is 9, the message ends up in partition 3. As with <code>EVENT_HUB_HASHING</code>, if the partition key is not set, the message key is used instead.</dd>
</dl>
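<dl>
<dt>Example (a sketch; the system name <code>eventhub-system</code> is hypothetical): selecting partitions directly from an integer key.</dt>
<dd><pre>
# With 6 partitions in the destination entity, an integer key of 9
# maps to partition 9 mod 6 = 3, as described above.
systems.eventhub-system.eventhubs.partition.method=PARTITION_KEY_AS_PARTITION
</pre></dd>
</dl>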
</td>
</tr>
<tr>
<td class="property" id="eventhub-send-key">systems.<span class="system">system-name</span>.<br>eventhubs.send.key</td>
<td class="default">true</td>
<td class="description">
Producer only config. If set to true, the key of the Samza message will be included as the 'key' property in the outgoing EventData message for Event Hubs. The Samza message key will not be sent otherwise. Note: If the Samza Event Hubs consumer is used, the Samza key is the partition key of the received EventData, or the message key if the partition key is not present.
</td>
</tr>
<tr>
<td class="property" id="eventhub-consumer-group">streams.<span class="stream">stream-id</span>.<br>eventhubs.consumer.group</td>
<td class="default"><code>$Default</code></td>
<td class="description">
Consumer only config. Sets the consumer group from the upstream Event Hubs entity that the consumer is part of. Defaults to the <code>$Default</code> group that is initially present in all Event Hubs entities (unless removed).
</td>
</tr>
<tr>
<td class="property" id="eventhub-consumer-buffer-capacity">systems.<span class="system">system-name</span>.<br>eventhubs.receive.queue.size</td>
<td class="default">100</td>
<td class="description">
Consumer only config. Per partition capacity of the Event Hubs consumer buffer - the blocking queue used for storing messages. Larger buffer capacity typically leads to better throughput but consumes more memory.
</td>
</tr>
<tr>
<th colspan="3" class="section" id="kinesis">
Using <a href="https://aws.amazon.com/kinesis/">Kinesis</a> for input streams<br>
<span class="subtitle">
(This section applies if you have set
<a href="#systems-samza-factory" class="property">systems.*.samza.factory</a>
<code>= org.apache.samza.system.kinesis.KinesisSystemFactory</code>)
</span>
</th>
</tr>
<tr>
<td class="property" id="kinesis-stream-region">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>aws.region</td>
<td class="default"></td>
<td class="description">Region of the associated <span class="stream">stream-name</span>. Required to access the Kinesis data stream.</td>
</tr>
<tr>
<td class="property" id="kinesis-stream-access-key">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>aws.accessKey</td>
<td class="default"></td>
<td class="description">AccessKey of the associated <span class="stream">stream-name</span>. Required to access the Kinesis data stream.</td>
</tr>
<tr>
<td class="property" id="kinesis-stream-secret-key">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>aws.secretKey</td>
<td class="default"></td>
<td class="description">SecretKey of the associated <span class="stream">stream-name</span>. Required to access the Kinesis data stream.</td>
</tr>
<tr>
<td class="property" id="kinesis-stream-aws-kcl-configs">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>aws.kcl.*</td>
<td class="default"></td>
<td class="description"><a href="https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java">AWS Kinesis Client Library configuration</a> associated with the <span class="stream">stream-name</span>.</td>
</tr>
<tr>
<td class="property" id="kinesis-system-aws-client-configs">systems.<span class="system">system-name</span>.<br>aws.clientConfig.*</td>
<td class="default"></td>
<td class="description"><a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html">AWS ClientConfiguration</a> associated with the <span class="system">system-name</span>.</td>
</tr>
</tbody>
</table>
</body>
</html>