<h2 id="configuration"> Configuration </h2>
<h3> Important configuration properties for the Kafka broker: </h3>
<p>More details about server configuration can be found in the Scala class <code>kafka.server.KafkaConfig</code>.</p>
<table class="data-table">
<tr>
<th>name</th>
<th>default</th>
<th>description</th>
</tr>
<tr>
<td><code>brokerid</code></td>
<td>none</td>
<td>Each broker is uniquely identified by an id. This id serves as the broker's "name", and allows the broker to be moved to a different host/port without confusing consumers.</td>
</tr>
<tr>
<td><code>enable.zookeeper</code></td>
<td>true</td>
<td>enable zookeeper registration in the server</td>
</tr>
<tr>
<td><code>log.flush.interval</code></td>
<td>500</td>
<td>Controls the number of messages accumulated in each topic (partition) before the data is flushed to disk and made available to consumers.</td>
</tr>
<tr>
<td><code>log.default.flush.scheduler.interval.ms</code></td>
<td>3000</td>
<td>Controls the interval at which logs are checked to see if they need to be flushed to disk. A background thread will run at a frequency specified by this parameter and will check each log to see if it has exceeded its flush.interval time, and if so it will flush it.</td>
</tr>
<tr>
<td><code>log.default.flush.interval.ms</code> </td>
<td>log.default.flush.scheduler.interval.ms</td>
<td>Controls the maximum time that a message in any topic is kept in memory before being flushed to disk. The value only makes sense if it's a multiple of <code>log.default.flush.scheduler.interval.ms</code></td>
</tr>
<tr>
<td><code>topic.flush.intervals.ms</code></td>
<td>none</td>
<td>Per-topic overrides for <code>log.default.flush.interval.ms</code>. Controls the maximum time that a message in selected topics is kept in memory before being flushed to disk. The per-topic value only makes sense if it's a multiple of <code>log.default.flush.scheduler.interval.ms</code>. E.g., topic1:1000,topic2:2000</td>
</tr>
<tr>
<td><code>log.retention.hours</code></td>
<td>168</td>
<td>Controls how long a log file is retained.</td>
</tr>
<tr>
<td><code>topic.log.retention.hours</code></td>
<td>none</td>
<td>Topic-specific retention time that overrides <code>log.retention.hours</code>, e.g., topic1:10,topic2:20</td>
</tr>
<tr>
<td><code>log.retention.size</code></td>
<td>-1</td>
<td>the maximum size the log is allowed to grow to before it is deleted</td>
</tr>
<tr>
<td><code>log.cleanup.interval.mins</code></td>
<td>10</td>
<td>Controls how often the log cleaner checks logs eligible for deletion. A log file is eligible for deletion if it hasn't been modified for <code>log.retention.hours</code> hours.</td>
</tr>
<tr>
<td><code>log.dir</code></td>
<td>none</td>
<td>Specifies the root directory in which all log data is kept.</td>
</tr>
<tr>
<td><code>log.file.size</code></td>
<td>1*1024*1024*1024</td>
<td>Controls the maximum size of a single log file.</td>
</tr>
<tr>
<td><code>log.roll.hours</code></td>
<td>24 * 7</td>
<td>The maximum time before a new log segment is rolled out</td>
</tr>
<tr>
<td><code>max.socket.request.bytes</code></td>
<td>104857600</td>
<td>the maximum number of bytes in a socket request</td>
</tr>
<tr>
<td><code>monitoring.period.secs</code></td>
<td>600</td>
<td>the interval in which to measure performance statistics</td>
</tr>
<tr>
<td><code>num.threads</code></td>
<td>Runtime.getRuntime().availableProcessors()</td>
<td>Controls the number of worker threads in the broker to serve requests.</td>
</tr>
<tr>
<td><code>num.partitions</code></td>
<td>1</td>
<td>Specifies the default number of partitions per topic.</td>
</tr>
<tr>
<td><code>socket.send.buffer</code></td>
<td>102400</td>
<td>the SO_SNDBUF buffer of the socket server sockets</td>
</tr>
<tr>
<td><code>socket.receive.buffer</code></td>
<td>102400</td>
<td>the SO_RCVBUF buffer of the socket server sockets</td>
</tr>
<tr>
<td><code>topic.partition.count.map</code></td>
<td>none</td>
<td>Override parameter to control the number of partitions for selected topics. E.g., topic1:10,topic2:20</td>
</tr>
<tr>
<td><code>zk.connect</code></td>
<td>localhost:2182/kafka</td>
<td>Specifies the zookeeper connection string in the form hostname:port/chroot. Here the chroot is a base directory which is prepended to all path operations (this effectively namespaces all kafka znodes to allow sharing with other applications on the same zookeeper cluster)</td>
</tr>
<tr>
<td><code>zk.connectiontimeout.ms</code> </td>
<td>6000</td>
<td>Specifies the max time that the client waits to establish a connection to zookeeper.</td>
</tr>
<tr>
<td><code>zk.sessiontimeout.ms</code> </td>
<td>6000</td>
<td>The zookeeper session timeout.</td>
</tr>
<tr>
<td><code>zk.synctime.ms</code></td>
<td>2000</td>
<td>How far a ZK follower can be behind a ZK leader</td>
</tr>
</table>
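<p>For orientation, the broker reads these keys from a plain properties file (the <code>server.properties</code> file passed to the broker start script). The Java sketch below does nothing more than write such a file with <code>java.util.Properties</code>; the chosen values (broker id 0, <code>/tmp/kafka-logs</code>, a local zookeeper chroot) are illustrative assumptions, not recommended settings.</p>
<pre>
import java.io.FileOutputStream;
import java.util.Properties;

// Minimal sketch: write a server.properties using keys from the table above.
// All values below are illustrative placeholders, not recommendations.
public class BrokerConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties broker = new Properties();
        broker.setProperty("brokerid", "0");                       // unique id for this broker
        broker.setProperty("log.dir", "/tmp/kafka-logs");          // root directory for log data
        broker.setProperty("zk.connect", "localhost:2181/kafka");  // zookeeper host:port/chroot (assumed local)
        broker.setProperty("num.partitions", "2");                 // default partitions per topic
        broker.setProperty("log.flush.interval", "500");           // messages accumulated before a flush
        broker.setProperty("log.retention.hours", "168");          // retain log segments for one week
        try (FileOutputStream out = new FileOutputStream("server.properties")) {
            broker.store(out, "generated broker config (illustrative values)");
        }
    }
}
</pre>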
<h3> Important configuration properties for the high-level consumer: </h3>
<p>More details about consumer configuration can be found in the Scala class <code>kafka.consumer.ConsumerConfig</code>.</p>
<table class="data-table">
<tr>
<th>property</th>
<th>default</th>
<th>description</th>
</tr>
<tr>
<td><code>groupid</code></td>
<td>groupid</td>
<td>a string that uniquely identifies a set of consumers within the same consumer group</td>
</tr>
<tr>
<td><code>socket.timeout.ms</code></td>
<td>30000</td>
<td>controls the socket timeout for network requests </td>
</tr>
<tr>
<td><code>socket.buffersize</code></td>
<td>64*1024</td>
<td>controls the socket receive buffer for network requests</td>
</tr>
<tr>
<td><code>fetch.size</code></td>
<td>300 * 1024</td>
<td>controls the number of bytes of messages to attempt to fetch in one request to the Kafka server</td>
</tr>
<tr>
<td><code>backoff.increment.ms</code></td>
<td>1000</td>
<td>This parameter avoids repeatedly polling a broker node which has no new data. Every time the consumer gets an empty set from the broker, it backs off for this time period.</td>
</tr>
<tr>
<td><code>queuedchunks.max</code></td>
<td>100</td>
<td>the high-level consumer buffers the messages fetched from the server internally in blocking queues. This parameter controls the size of those queues</td>
</tr>
<tr>
<td><code>autocommit.enable</code></td>
<td>true</td>
<td>if set to true, the consumer periodically commits to zookeeper the latest consumed offset of each partition. </td>
</tr>
<tr>
<td><code>autocommit.interval.ms</code> </td>
<td>10000</td>
<td>the frequency, in ms, at which the consumed offsets are committed to zookeeper</td>
</tr>
<tr>
<td><code>autooffset.reset</code></td>
<td>smallest</td>
<td><ul>
<li> <code>smallest</code>: automatically reset the offset to the smallest offset available on the broker.</li>
<li> <code>largest</code> : automatically reset the offset to the largest offset available on the broker.</li>
<li> <code>anything else</code>: throw an exception to the consumer.</li>
</ul>
</td>
</tr>
<tr>
<td><code>consumer.timeout.ms</code></td>
<td>-1</td>
<td>By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption. By setting the value to a positive integer, a timeout exception is thrown to the consumer if no message is available for consumption after the specified timeout value.</td>
</tr>
<tr>
<td><code>rebalance.retries.max</code> </td>
<td>4</td>
<td>max number of retries during rebalance</td>
</tr>
<tr>
<td><code>mirror.topics.whitelist</code></td>
<td>""</td>
<td>Whitelist of topics for this mirror's embedded consumer to consume. At most one of whitelist/blacklist may be specified.</td>
</tr>
<tr>
<td><code>mirror.topics.blacklist</code></td>
<td>""</td>
<td>Topics to skip mirroring. At most one of whitelist/blacklist may be specified</td>
</tr>
<tr>
<td><code>mirror.consumer.numthreads</code></td>
<td>4</td>
<td>The default number of threads to use per topic for the mirroring consumer</td>
</tr>
</table>
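<p>As a sketch of how these settings are usually supplied, the snippet below assembles a minimal set of consumer properties with <code>java.util.Properties</code>; such a properties object is what <code>kafka.consumer.ConsumerConfig</code> is typically constructed from. The group name, and the <code>zk.connect</code> entry (which the high-level consumer also needs even though it appears only in the broker table above), are illustrative assumptions.</p>
<pre>
import java.util.Properties;

// Minimal sketch of high-level consumer settings from the table above.
// The group name and zookeeper address are illustrative assumptions.
public class ConsumerConfigSketch {
    public static void main(String[] args) {
        Properties consumer = new Properties();
        consumer.setProperty("groupid", "example-group");            // shared by all consumers in this group
        consumer.setProperty("zk.connect", "localhost:2181/kafka");  // where brokers and offsets are tracked (assumed)
        consumer.setProperty("autocommit.enable", "true");           // periodically commit offsets to zookeeper
        consumer.setProperty("autocommit.interval.ms", "10000");     // commit every 10 seconds
        consumer.setProperty("autooffset.reset", "smallest");        // start from the earliest available offset
        consumer.setProperty("fetch.size", "307200");                // 300 * 1024 bytes per fetch request
        consumer.list(System.out);                                   // print the resulting configuration
    }
}
</pre>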
<h3> Important configuration properties for the producer: </h3>
<p>More details about producer configuration can be found in the Scala class <code>kafka.producer.ProducerConfig</code>.</p>
<table class="data-table">
<tr>
<th>property</th>
<th>default</th>
<th>description</th>
</tr>
<tr>
<td><code>serializer.class</code></td>
<td><code>kafka.serializer.DefaultEncoder</code>. This is a no-op encoder. The serialization of data to a <code>Message</code> should be handled outside the Producer.</td>
<td>class that implements the <code>kafka.serializer.Encoder&lt;T&gt;</code> interface, used to encode data of type T into a Kafka message </td>
</tr>
<tr>
<td><code>partitioner.class</code></td>
<td><code>kafka.producer.DefaultPartitioner&lt;T&gt;</code> - uses the partitioning strategy <code>hash(key)%num_partitions</code>. If key is null, then it picks a random partition. </td>
<td>class that implements the <code>kafka.producer.Partitioner&lt;K&gt;</code>, used to supply a custom partitioning strategy on the message key (of type K) that is specified through the <code>ProducerData&lt;K, T&gt;</code> object in the <code>kafka.producer.Producer&lt;T&gt;</code> send API</td>
</tr>
<tr>
<td><code>producer.type</code></td>
<td>sync</td>
<td>this parameter specifies whether the messages are sent asynchronously or not. Valid values are: <ul><li><code>async</code> for asynchronous batching send through <code>kafka.producer.AsyncProducer</code></li><li><code>sync</code> for synchronous send through <code>kafka.producer.SyncProducer</code></li></ul></td>
</tr>
<tr>
<td><code>broker.list</code></td>
<td>null. Either this parameter or zk.connect needs to be specified by the user.</td>
<td>For bypassing zookeeper-based auto partition discovery, use this config to pass in static broker and per-broker partition information. Format: <code>brokerid1:host1:port1, brokerid2:host2:port2</code>. If you use this option, the <code>partitioner.class</code> will be ignored and each producer request will be routed to a random broker partition.</td>
</tr>
<tr>
<td><code>zk.connect</code></td>
<td>null. Either this parameter or broker.list needs to be specified by the user.</td>
<td>For using the zookeeper based automatic broker discovery, use this config to pass in the zookeeper connection url to the zookeeper cluster where the Kafka brokers are registered.</td>
</tr>
<tr>
<td><code>buffer.size</code></td>
<td>102400</td>
<td>the socket buffer size, in bytes</td>
</tr>
<tr>
<td><code>connect.timeout.ms</code></td>
<td>5000</td>
<td>the maximum time spent by <code>kafka.producer.SyncProducer</code> trying to connect to the kafka broker. Once it elapses, the producer throws an ERROR and stops.</td>
</tr>
<tr>
<td><code>socket.timeout.ms</code></td>
<td>30000</td>
<td>The socket timeout in milliseconds</td>
</tr>
<tr>
<td><code>reconnect.interval</code> </td>
<td>30000</td>
<td>the number of produce requests after which <code>kafka.producer.SyncProducer</code> tears down the socket connection to the broker and establishes it again; this and the following property are mainly used when the producer connects to the brokers through a VIP in a load balancer; they give the producer a chance to pick up the new broker periodically</td>
</tr>
<tr>
<td><code>reconnect.time.interval.ms</code> </td>
<td>10 * 1000 * 1000</td>
<td>the amount of time after which <code>kafka.producer.SyncProducer</code> tears down the socket connection to the broker and establishes it again; negative reconnect time interval means disabling this time-based reconnect feature</td>
</tr>
<tr>
<td><code>max.message.size</code> </td>
<td>1000000</td>
<td>the maximum number of bytes that the <code>kafka.producer.SyncProducer</code> can send as a single message payload</td>
</tr>
<tr>
<td><code>compression.codec</code></td>
<td>0 (No compression)</td>
<td>This parameter allows you to specify the compression codec for all data generated by this producer.</td>
</tr>
<tr>
<td><code>compressed.topics</code></td>
<td>null</td>
<td>This parameter allows you to set whether compression should be turned on for particular topics. If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics. If the compression codec is NoCompressionCodec, compression is disabled for all topics. </td>
</tr>
<tr>
<td><code>zk.read.num.retries</code></td>
<td>3</td>
<td>The producer using the zookeeper software load balancer maintains a ZK cache that gets updated by the zookeeper watcher listeners. During some events like a broker bounce, the producer ZK cache can get into an inconsistent state, for a small time period. In this time period, it could end up picking a broker partition that is unavailable. When this happens, the ZK cache needs to be updated. This parameter specifies the number of times the producer attempts to refresh this ZK cache.</td>
</tr>
<tr>
<td colspan="3" style="text-align: center">
Options for Asynchronous Producers (<code>producer.type=async</code>)
</td>
</tr>
<tr>
<td><code>queue.time</code></td>
<td>5000</td>
<td>maximum time, in milliseconds, for buffering data on the producer queue. After it elapses, the buffered data in the producer queue is dispatched to the <code>event.handler</code>.</td>
</tr>
<tr>
<td><code>queue.size</code></td>
<td>10000</td>
<td>the maximum size of the blocking queue for buffering on the <code>kafka.producer.AsyncProducer</code></td>
</tr>
<tr>
<td><code>batch.size</code> </td>
<td>200</td>
<td>the number of messages batched at the producer, before being dispatched to the <code>event.handler</code></td>
</tr>
<tr>
<td><code>event.handler</code></td>
<td><code>kafka.producer.async.EventHandler&lt;T&gt;</code></td>
<td>the class that implements <code>kafka.producer.async.IEventHandler&lt;T&gt;</code> used to dispatch a batch of produce requests, using an instance of <code>kafka.producer.SyncProducer</code>.
</td>
</tr>
<tr>
<td><code>event.handler.props</code></td>
<td>null</td>
<td>the <code>java.util.Properties()</code> object used to initialize the custom <code>event.handler</code> through its <code>init()</code> API</td>
</tr>
<tr>
<td><code>callback.handler</code></td>
<td><code>null</code></td>
<td>the class that implements <code>kafka.producer.async.CallbackHandler&lt;T&gt;</code> used to inject callbacks at various stages of the <code>kafka.producer.AsyncProducer</code> pipeline.
</td>
</tr>
<tr>
<td><code>callback.handler.props</code></td>
<td>null</td>
<td>the <code>java.util.Properties()</code> object used to initialize the custom <code>callback.handler</code> through its <code>init()</code> API</td>
</tr>
</table>
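<p>The same pattern applies on the producer side. The sketch below assembles properties for a zookeeper-based asynchronous producer using only keys from the table above; every concrete value is an illustrative assumption, and the resulting properties object is what <code>kafka.producer.ProducerConfig</code> is typically constructed from.</p>
<pre>
import java.util.Properties;

// Minimal sketch of an async producer configuration from the table above.
// All concrete values are illustrative assumptions, not recommendations.
public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties producer = new Properties();
        producer.setProperty("zk.connect", "localhost:2181/kafka");                  // discover brokers via zookeeper (assumed address)
        producer.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); // the default no-op encoder
        producer.setProperty("producer.type", "async");                              // batch and send asynchronously
        producer.setProperty("batch.size", "200");                                   // messages per dispatched batch
        producer.setProperty("queue.time", "5000");                                  // max ms to buffer before dispatch
        producer.setProperty("compression.codec", "0");                              // 0 = no compression
        producer.list(System.out);                                                   // print the resulting configuration
    }
}
</pre>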