<h3><a id="apidesign">5.1 API Design</a></h3>

<h4>Producer APIs</h4>

<p>
The Producer API that wraps the 2 low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>. 
<pre>
class Producer<T> {
	
  /* Sends the data, partitioned by key to the topic using either the */
  /* synchronous or the asynchronous producer */
  public void send(kafka.javaapi.producer.ProducerData&lt;K,V&gt; producerData);

  /* Sends a list of data, partitioned by key to the topic using either */
  /* the synchronous or the asynchronous producer */
  public void send(java.util.List&lt;kafka.javaapi.producer.ProducerData&lt;K,V&gt;&gt; producerData);

  /* Closes the producer and cleans up */	
  public void close();

}
</pre>

The goal is to expose all the producer functionality through a single API to the client.  

The new producer -
<ul>
<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data - 	
<p><code>kafka.producer.Producer</code> provides the ability to batch multiple produce requests (<code>producer.type=async</code>), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either <code>queue.time</code> or <code>batch.size</code> is reached. A background thread (<code>kafka.producer.async.ProducerSendThread</code>) dequeues the batch of data and lets the <code>kafka.producer.EventHandler</code> serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the <code>event.handler</code> config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the <code>kafka.producer.async.CallbackHandler</code> interface and setting <code>callback.handler</code> config parameter to that class.
</p>
</li>
<li>handles the serialization of data through a user-specified <code>Encoder</code> - 
<pre>
interface Encoder&lt;T&gt; {
  public Message toMessage(T data);
}
</pre>
<p>The default is the no-op <code>kafka.serializer.DefaultEncoder</code></p>
</li>
<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> - 
<p>
The routing decision is influenced by the <code>kafka.producer.Partitioner</code>. 
<pre>
interface Partitioner&lt;T&gt; {
   int partition(T key, int numPartitions);
}
</pre>
The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter.	
</p>
</li>
</ul>
</p>

<h4>Consumer APIs</h4>
<p>
We have 2 levels of consumer APIs. The low-level "simple" API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is completely stateless, with the offset being passed in on every request, allowing the user to maintain this metadata however they choose.
</p>
<p>
The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed. The high-level API also provides the ability to subscribe to topics that match a filter expression (i.e., either a whitelist or a blacklist regular expression).
</p>

<h5>Low-level API</h5>
<pre>
class SimpleConsumer {
	
  /* Send fetch request to a broker and get back a set of messages. */ 
  public ByteBufferMessageSet fetch(FetchRequest request);

  /* Send a list of fetch requests to a broker and get back a response set. */ 
  public MultiFetchResponse multifetch(List&lt;FetchRequest&gt; fetches);

  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATIEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
</pre>

The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state.

<h5>High-level API</h5>
<pre>

/* create a connection to the cluster */ 
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {
	
  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   *  Input: a map of &lt;topic, #streams&gt;
   *  Output: a map of &lt;topic, list of message streams&gt;
   */
  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap); 

  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List&lt;KafkaStream&gt; createMessageStreamsByFilter(
      TopicFilter topicFilter, int numStreams);

  /* Commit the offsets of all messages consumed so far. */
  public commitOffsets()
  
  /* Shut down the connector */
  public shutdown()
}
</pre>
<p>
This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.
</p>
<p>
The createMessageStreams call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. The API encourages creating many topic streams in a single call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally) registers watchers to discover new topics that match its filter. Note that each stream that createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e., if multiple topics are allowed by the filter).
</p>

<h3><a id="networklayer">5.2 Network Layer</a></h3>
<p>
The network layer is a fairly straight-forward NIO server, and will not be described in great detail. The sendfile implementation is done by giving the <code>MessageSet</code> interface a <code>writeTo</code> method. This allows the file-backed message set to use the more efficient <code>transferTo</code> implementation instead of an in-process buffered write. The threading model is a single acceptor thread and <i>N</i> processor threads which handle a fixed number of connections each. This design has been pretty thoroughly tested <a href="http://sna-projects.com/blog/2009/08/introducing-the-nio-socketserver-implementation">elsewhere</a> and found to be simple to implement and fast. The protocol is kept quite simple to allow for future implementation of clients in other languages.
</p>
<h3><a id="messages">5.3 Messages</a></h3>
<p>
Messages consist of a fixed-size header and variable length opaque byte array payload. The header contains a format version and a CRC32 checksum to detect corruption or truncation. Leaving the payload opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The <code>MessageSet</code> interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO <code>Channel</code>.

<h3><a id="messageformat">5.4 Message Format</a></h3>

<pre>
	/** 
	 * A message. The format of an N byte message is the following: 
	 * 
	 * If magic byte is 0 
	 * 
	 * 1. 1 byte "magic" identifier to allow format changes 
	 * 
	 * 2. 4 byte CRC32 of the payload 
	 * 
	 * 3. N - 5 byte payload 
	 * 
	 * If magic byte is 1 
	 * 
	 * 1. 1 byte "magic" identifier to allow format changes 
	 * 
	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
	 * 
	 * 3. 4 byte CRC32 of the payload 
	 * 
	 * 4. N - 6 byte payload 
	 * 
	 */
</pre>
</p>
<h3><a id="log">5.5 Log</a></h3>
<p>
A log for a topic named "my_topic" with two partitions consists of two directories (namely <code>my_topic_0</code> and <code>my_topic_1</code>) populated with data files containing the messages for that topic. The format of the log files is a sequence of "log entries""; each log entry is a 4 byte integer <i>N</i> storing the message length which is followed by the <i>N</i> message bytes. Each message is uniquely identified by a 64-bit integer <i>offset</i> giving the byte position of the start of this message in the stream of all messages ever sent to that topic on that partition. The on-disk format of each message is given below. Each log file is named with the offset of the first message it contains. So the first file created will be 00000000000.kafka, and each additional file will have an integer name roughly <i>S</i> bytes from the previous file where <i>S</i> is the max log file size given in the configuration.
</p>
<p>
The exact binary format for messages is versioned and maintained as a standard interface so message sets can be transfered between producer, broker, and client without recopying or conversion when desirable. This format is as follows:
</p>
<pre>
On-disk format of a message

message length : 4 bytes (value: 1+4+n) 
"magic" value  : 1 byte
crc            : 4 bytes
payload        : n bytes
</pre>
<p>
The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural&mdash;both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach.
</p>
<img src="../images/kafka_log.png">
<h4>Writes</h4>
<p>
The log allows serial appends which always go to the last file. This file is rolled over to a fresh file when it reaches a configurable size (say 1GB). The log takes two configuration parameter <i>M</i> which gives the number of messages to write before forcing the OS to flush the file to disk, and <i>S</i> which gives a number of seconds after which a flush is forced. This gives a durability guarantee of losing at most <i>M</i> messages or <i>S</i> seconds of data in the event of a system crash.
</p>
<h4>Reads</h4>
<p>
Reads are done by giving the 64-bit logical offset of a message and an <i>S</i>-byte max chunk size. This will return an iterator over the messages contained in the <i>S</i>-byte buffer. <i>S</i> is intended to be larger than any single message, but in the event of an abnormally large message, the read can be retried multiple times, each time doubling the buffer size, until the message is read successfully. A maximum message and buffer size can be specified to make the server reject messages larger than some size, and to give a bound to the client on the maximum it need ever read to get a complete message. It is likely that the read buffer ends with a partial message, this is easily detected by the size delimiting.
</p>
<p>
The actual process of reading from an offset requires first locating the log segment file in which the data is stored, calculating the file-specific offset from the global offset value, and then reading from that file offset. The search is done as a simple binary search variation against an in-memory range maintained for each file.
</p>
<p>
The log provides the capability of getting the most recently written message to allow clients to start subscribing as of "right now". This is also useful in the case the consumer fails to consume its data within its SLA-specified number of days. In this case when the client attempts to consume a non-existant offset it is given an OutOfRangeException and can either reset itself or fail as appropriate to the use case.
</p>

<p> The following is the format of the results sent to the consumer.

<pre>
MessageSetSend (fetch result)

total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes
</pre>

<pre>
MultiMessageSetSend (multiFetch result)

total length       : 4 bytes
error code         : 2 bytes
messageSetSend 1
...
messageSetSend n
</pre>

<h4>Deletes</h4>
<p>
Data is deleted one log segment at a time. The log manager allows pluggable delete policies to choose which files are eligible for deletion. The current policy deletes any log with a modification time of more than <i>N</i> days ago, though a policy which retained the last <i>N</i> GB could also be useful. To avoid locking reads while still allowing deletes that modify the segment list we use a copy-on-write style segment list implementation that provides consistent views to allow a binary search to proceed on an immutable static snapshot view of the log segments while deletes are progressing.
</p>
<h4>Guarantees</h4>
<p>
The log provides a configuration parameter <i>M</i> which controls the maximum number of messages that are written before forcing a flush to disk. On startup a log recovery process is run that iterates over all messages in the newest log segment and verifies that each message entry is valid. A message entry is valid if the sum of its size and offset are less than the length of the file AND the CRC32 of the message payload matches the CRC stored with the message. In the event corruption is detected the log is truncated to the last valid offset.
</p>
<p>
Note that two kinds of corruption must be handled: truncation in which an unwritten block is lost due to a crash, and corruption in which a nonsense block is ADDED to the file. The reason for this is that in general the OS makes no guarantee of the write order between the file inode and the actual block data so in addition to losing written data the file can gain nonsense data if the inode is updated with a new size but a crash occurs before the block containing that data is not written. The CRC detects this corner case, and prevents it from corrupting the log (though the unwritten messages are, of course, lost).
</p>

<h3><a id="distributionimpl">5.6 Distribution</a></h3>
<h4>ZooKeeper Directories</h4>
<p>
The following gives the ZooKeeper structures and algorithms used for co-ordination between consumers and brokers.
</p>

<h4>Notation</h4>
<p>
When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a ZooKeeper znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a sub-directory for each topic name. Numerical ranges are also given such as [0...5] to indicate the subdirectories 0, 1, 2, 3, 4. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world".
</p>

<h4>Broker Node Registry</h4>
<pre>
/brokers/ids/[0...N] --> host:port (ephemeral node)
</pre>
<p>
This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error.
</p>
<p>
Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).	
</p>
<h4>Broker Topic Registry</h4>
<pre>
/brokers/topics/[topic]/[0...N] --> nPartions (ephemeral node)
</pre>

<p>
Each broker registers itself under the topics it maintains and stores the number of partitions for that topic.
</p>

<h4>Consumers and Consumer Groups</h4>
<p>
Consumers of topics also register themselves in ZooKeeper, in order to balance the consumption of data and track their offsets in each partition for each broker they consume from.
</p>

<p>
Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id. 
For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id "foobar". This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to.
</p>

<p>
The consumers in a group divide up the partitions as fairly as possible, each partition is consumed by exactly one consumer in a consumer group.
</p>

<h4>Consumer Id Registry</h4>
<p>
In addition to the group_id which is shared by all consumers in a group, each consumer is given a transient, unique consumer_id (of the form hostname:uuid) for identification purposes. Consumer ids are registered in the following directory.
<pre>
/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)
</pre>
Each of the consumers in the group registers under its group and creates a znode with its consumer_id. The value of the znode contains a map of &lt;topic, #streams&gt;. This id is simply used to identify each of the consumers which is currently active within a group. This is an ephemeral node so it will disappear if the consumer process dies.
</p>

<h4>Consumer Offset Tracking</h4>
<p>
Consumers track the maximum offset they have consumed in each partition. This value is stored in a ZooKeeper directory
</p>
<pre>
/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value ((persistent node)
</pre>

<h4>Partition Owner registry</h4>

<p>
Each broker partition is consumed by a single consumer within a given consumer group. The consumer must establish its ownership of a given partition before any consumption can begin. To establish its ownership, a consumer writes its own id in an ephemeral node under the particular broker partition it is claiming.
</p>

<pre>
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
</pre>

<h4>Broker node registration</h4>

<p>
The broker nodes are basically independent, so they only publish information about what they have. When a broker joins, it registers itself under the broker node registry directory and writes information about its host name and port. The broker also register the list of existing topics and their logical partitions in the broker topic registry. New topics are registered dynamically when they are created on the broker.
</p>

<h4>Consumer registration algorithm</h4>

<p>
When a consumer starts, it does the following:
<ol>
   <li> Register itself in the consumer id registry under its group.
   </li>
   <li> Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. (Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.)
   </li>
   <li> Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry. (Each change triggers rebalancing among all consumers in all consumer groups.) </li>
   <li> If the consumer creates a message stream using a topic filter, it also registers a watch on changes (new topics being added) under the broker topic registry. (Each change will trigger re-evaluation of the available topics to determine which topics are allowed by the topic filter. A new allowed topic will trigger rebalancing among all consumers within the consumer group.)</li>
   <li> Force itself to rebalance within in its consumer group.
   </li>
</ol>
</p>

<h4>Consumer rebalancing algorithm</h4>
<p>
The consumer rebalancing algorithms allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Consumer rebalancing is triggered on each addition or removal of both broker nodes and other consumers within the same group. For a given topic and a given consumer group, broker partitions are divided evenly among consumers within the group. A partition is always consumed by a single consumer. This design simplifies the implementation. Had we allowed a partition to be concurrently consumed by multiple consumers, there would be contention on the partition and some kind of locking would be required. If there are more consumers than partitions, some consumers won't get any data at all. During rebalancing, we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.
</p>
<p>
Each consumer does the following during rebalancing:
</p>
<pre>
   1. For each topic T that C<sub>i</sub> subscribes to 
   2.   let P<sub>T</sub> be all partitions producing topic T
   3.   let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub> that consume topic T
   4.   sort P<sub>T</sub> (so partitions on the same broker are clustered together)
   5.   sort C<sub>G</sub>
   6.   let i be the index position of C<sub>i</sub> in C<sub>G</sub> and let N = size(P<sub>T</sub>)/size(C<sub>G</sub>)
   7.   assign partitions from i*N to (i+1)*N - 1 to consumer C<sub>i</sub>
   8.   remove current entries owned by C<sub>i</sub> from the partition owner registry
   9.   add newly assigned partitions to the partition owner registry
        (we may need to re-try this until the original partition owner releases its ownership)
</pre>
<p>
When rebalancing is triggered at one consumer, rebalancing should be triggered in other consumers within the same group about the same time.
</p>
