| <h2 id="quickstart">Quick Start</h2> |
| |
<h3>Step 1: Download the code</h3>
| |
| <a href="../downloads.html" title="Kafka downloads">Download</a> a recent stable release. |
| |
| <pre> |
| <b>> tar xzf kafka-<VERSION>.tgz</b> |
| <b>> cd kafka-<VERSION></b> |
| <b>> ./sbt update</b> |
| <b>> ./sbt package</b> |
| </pre> |
| |
| <h3>Step 2: Start the server</h3> |
| |
Kafka uses zookeeper, so you first need to start a zookeeper server if you don't already have one; Kafka brokers and consumers use it for coordination.
<p>
You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node zookeeper instance.
| |
| <pre> |
| <b>> bin/zookeeper-server-start.sh config/zookeeper.properties</b> |
| [2010-11-21 23:45:02,335] INFO Reading configuration from: config/zookeeper.properties |
| ... |
| </pre> |
| |
| Now start the Kafka server: |
| <pre> |
| <b>> bin/kafka-server-start.sh config/server.properties</b> |
| [2010-11-21 23:51:39,608] INFO starting log cleaner every 60000 ms (kafka.log.LogManager) |
| [2010-11-21 23:51:39,628] INFO connecting to ZK: localhost:2181 (kafka.server.KafkaZooKeeper) |
| ... |
| </pre> |
| |
| <h3>Step 3: Send some messages</h3> |
| |
Kafka comes with a command line client that reads input from standard input and sends it out as messages to the Kafka cluster. By default each line is sent as a separate message. The topic <i>test</i> is created automatically when messages are first sent to it. Omitting the logging output, you should see something like this:
| |
| <pre> |
<b>> bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic test</b>
| This is a message |
| This is another message |
| </pre> |
| |
| <h3>Step 4: Start a consumer</h3> |
| |
Kafka also has a command line consumer that dumps received messages to standard output.
| |
| <pre> |
| <b>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning</b> |
| This is a message |
| This is another message |
| </pre> |
| <p> |
| If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal. |
| </p> |
| <p> |
| Both of these command line tools have additional options. Running the command with no arguments will display usage information documenting them in more detail. |
| </p> |
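
For example, invoking the console consumer with no arguments prints a description of each option (output omitted here):

<pre>
<b>> bin/kafka-console-consumer.sh</b>
</pre>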
| |
| <h3>Step 5: Write some code</h3> |
| |
Below are some very simple examples of using Kafka to send messages; more complete examples can be found in the Kafka source code in the <code>examples/</code> directory.
| |
| <h4>Producer Code</h4> |
| |
| <h5>Producer API </h5> |
| |
Here are examples of using the producer API, <code>kafka.producer.Producer<T></code>:
| |
| <ol> |
| <li>First, start a local instance of the zookeeper server |
| <pre>./bin/zookeeper-server-start.sh config/zookeeper.properties</pre> |
| </li> |
| <li>Next, start a kafka broker |
| <pre>./bin/kafka-server-start.sh config/server.properties</pre> |
| </li> |
<li>Now, create the producer with all configuration defaults, using zookeeper based broker discovery.
| <pre> |
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

...

Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
| </pre> |
| </li> |
| <li>Send a single message |
| <pre> |
| <small>// The message is sent to a randomly selected partition registered in ZK</small> |
| ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message"); |
| producer.send(data); |
| </pre> |
| </li> |
| <li>Send multiple messages to multiple topics in one request |
| <pre> |
List<String> messages = new ArrayList<String>();
| messages.add("test-message1"); |
| messages.add("test-message2"); |
| ProducerData<String, String> data1 = new ProducerData<String, String>("test-topic1", messages); |
| ProducerData<String, String> data2 = new ProducerData<String, String>("test-topic2", messages); |
| List<ProducerData<String, String>> dataForMultipleTopics = new ArrayList<ProducerData<String, String>>(); |
| dataForMultipleTopics.add(data1); |
| dataForMultipleTopics.add(data2); |
| producer.send(dataForMultipleTopics); |
| </pre> |
| </li> |
| <li>Send a message with a partition key. Messages with the same key are sent to the same partition |
| <pre> |
| ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-key", "test-message"); |
| producer.send(data); |
| </pre> |
| </li> |
| <li>Use your custom partitioner |
| <p>If you are using zookeeper based broker discovery, <code>kafka.producer.Producer<T></code> routes your data to a particular broker partition based on a <code>kafka.producer.Partitioner<T></code>, specified through the <code>partitioner.class</code> config parameter. It defaults to <code>kafka.producer.DefaultPartitioner</code>. If you don't supply a partition key, then it sends each request to a random broker partition.</p> |
| <pre> |
class MemberIdPartitioner extends Partitioner[MemberIdLocation] {
  def partition(data: MemberIdLocation, numPartitions: Int): Int = {
    <small>// mask the sign bit so a negative hashCode cannot yield a negative partition</small>
    (data.location.hashCode & 0x7fffffff) % numPartitions
  }
}
| <small>// create the producer config to plug in the above partitioner</small> |
| Properties props = new Properties(); |
props.put("zk.connect", "127.0.0.1:2181");
| props.put("serializer.class", "kafka.serializer.StringEncoder"); |
| props.put("partitioner.class", "xyz.MemberIdPartitioner"); |
| ProducerConfig config = new ProducerConfig(props); |
| Producer<String, String> producer = new Producer<String, String>(config); |
| </pre> |
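<p>The sketch above is Scala. For reference, here is a rough Java equivalent; <code>MemberIdPartitioner</code>, <code>MemberIdLocation</code>, and its accessor are hypothetical, mirroring the Scala example:</p>
<pre>
import kafka.producer.Partitioner;

public class MemberIdPartitioner implements Partitioner<MemberIdLocation> {
  public int partition(MemberIdLocation data, int numPartitions) {
    <small>// mask the sign bit so a negative hashCode cannot yield a negative partition</small>
    return (data.getLocation().hashCode() & 0x7fffffff) % numPartitions;
  }
}
</pre>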
| </li> |
<li>Use a custom Encoder
<p>The producer takes in a required config parameter <code>serializer.class</code> that specifies an <code>Encoder<T></code> to convert T to a Kafka Message. The default is the no-op <code>kafka.serializer.DefaultEncoder</code>.
Here is an example of a custom Encoder, written in Scala -</p>
| <pre> |
class TrackingDataSerializer extends Encoder[TrackingData] {
  <small>// Say you want to use your own custom Avro encoding</small>
  val avroEncoder = new CustomAvroEncoder()
  def toMessage(event: TrackingData): Message = {
    new Message(avroEncoder.getBytes(event))
  }
}
| </pre> |
If you want to use the above Encoder, pass it to the <code>serializer.class</code> config parameter:
| <pre> |
| Properties props = new Properties(); |
| props.put("serializer.class", "xyz.TrackingDataSerializer"); |
| </pre> |
| </li> |
<li>Use a static list of brokers, instead of zookeeper based broker discovery
<p>Some applications would rather not depend on zookeeper. In that case, the config parameter <code>broker.list</code>
can be used to specify the list of all brokers in the Kafka cluster, in the following format -
<code>broker_id1:host1:port1, broker_id2:host2:port2...</code></p>
| <pre> |
| <small>// you can stop the zookeeper instance as it is no longer required</small> |
| ./bin/zookeeper-server-stop.sh |
| <small>// create the producer config object </small> |
| Properties props = new Properties(); |
props.put("broker.list", "0:localhost:9092");
| props.put("serializer.class", "kafka.serializer.StringEncoder"); |
| ProducerConfig config = new ProducerConfig(props); |
| <small>// send a message using default partitioner </small> |
| Producer<String, String> producer = new Producer<String, String>(config); |
List<String> messages = new ArrayList<String>();
| messages.add("test-message"); |
| ProducerData<String, String> data = new ProducerData<String, String>("test-topic", messages); |
| producer.send(data); |
| </pre> |
| </li> |
<li>Use the asynchronous producer along with GZIP compression. This buffers writes in memory until either <code>batch.size</code> or <code>queue.time</code> is reached. After that, the data is compressed and sent to the Kafka brokers (a <code>compression.codec</code> value of 1 selects GZIP)
| <pre> |
| Properties props = new Properties(); |
| props.put("zk.connect"‚ "127.0.0.1:2181"); |
| props.put("serializer.class", "kafka.serializer.StringEncoder"); |
| props.put("producer.type", "async"); |
| props.put("compression.codec", "1"); |
| ProducerConfig config = new ProducerConfig(props); |
| Producer<String, String> producer = new Producer<String, String>(config); |
| ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message"); |
| producer.send(data); |
</pre>
| </li> |
<li>Finally, the producer should be closed to clean up its resources (a complete end-to-end sketch combining these snippets follows this list), through
| <pre>producer.close();</pre> |
| </li> |
| </ol> |
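
Putting the pieces together, here is a minimal end-to-end sketch combining the snippets above; the topic name and message text are placeholders, and it assumes a zookeeper instance and a broker are running locally as in steps 1 and 2:

<pre>
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class QuickStartProducer {
  public static void main(String[] args) {
    <small>// use zookeeper based broker discovery and serialize messages as strings</small>
    Properties props = new Properties();
    props.put("zk.connect", "127.0.0.1:2181");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
    <small>// send one message to the "test" topic, then release the producer's resources</small>
    producer.send(new ProducerData<String, String>("test", "hello from the quick start"));
    producer.close();
  }
}
</pre>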
| |
| <h5>Log4j appender </h5> |
| |
Data can also be produced to a Kafka server through a log4j appender. This way, minimal code needs to be written in order to send data to the Kafka server.
Here is an example of how to use the Kafka log4j appender -
| |
| Start by defining the Kafka appender in your log4j.properties file. |
| <pre> |
<small># define the kafka log4j appender config parameters</small>
log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
<small># <b>REQUIRED</b>: set the hostname of the kafka server</small>
log4j.appender.KAFKA.Host=localhost
<small># <b>REQUIRED</b>: set the port on which the Kafka server is listening for connections</small>
log4j.appender.KAFKA.Port=9092
<small># <b>REQUIRED</b>: the topic under which the logger messages are to be posted</small>
log4j.appender.KAFKA.Topic=test
<small># the serializer to be used to turn an object into a Kafka message. Defaults to kafka.producer.DefaultStringEncoder</small>
log4j.appender.KAFKA.Serializer=kafka.test.AppenderStringSerializer
<small># do not set the above KAFKA appender as the root appender</small>
log4j.rootLogger=INFO
<small># set the logger for your package to be the KAFKA appender</small>
log4j.logger.your.test.package=INFO, KAFKA
| </pre> |
| |
| Data can be sent using a log4j appender as follows - |
| |
| <pre> |
Logger logger = Logger.getLogger([your.test.class]);
| logger.info("message from log4j appender"); |
| </pre> |
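
For completeness, here is a minimal self-contained class using the configuration above; the class name is a placeholder, and the class must live in the package that your log4j.properties routes to the KAFKA appender (<code>your.test.package</code> above):

<pre>
import org.apache.log4j.Logger;

public class KafkaLog4jExample {
  <small>// log4j routes this logger's events to the KAFKA appender configured above</small>
  private static final Logger logger = Logger.getLogger(KafkaLog4jExample.class);

  public static void main(String[] args) {
    logger.info("message from log4j appender");
  }
}
</pre>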
| |
| If your log4j appender fails to send messages, please verify that the correct |
| log4j properties file is being used. You can add |
| <code>-Dlog4j.debug=true</code> to your VM parameters to verify this. |
| |
| <h4>Consumer Code</h4> |
| |
| The consumer code is slightly more complex as it enables multithreaded consumption: |
| |
| <pre> |
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.google.common.collect.ImmutableMap; // from Google Guava
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

...

// specify some consumer properties
| Properties props = new Properties(); |
| props.put("zk.connect", "localhost:2181"); |
| props.put("zk.connectiontimeout.ms", "1000000"); |
| props.put("groupid", "test_group"); |
| |
| // Create the connection to the cluster |
| ConsumerConfig consumerConfig = new ConsumerConfig(props); |
| ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); |
| |
// create 4 streams for topic "test", to allow 4 threads to consume
| Map<String, List<KafkaStream<Message>>> topicMessageStreams = |
| consumerConnector.createMessageStreams(ImmutableMap.of("test", 4)); |
| List<KafkaStream<Message>> streams = topicMessageStreams.get("test"); |
| |
// create a pool of 4 threads, one to consume from each of the streams
| ExecutorService executor = Executors.newFixedThreadPool(4); |
| |
| // consume the messages in the threads |
| for(final KafkaStream<Message> stream: streams) { |
| executor.submit(new Runnable() { |
| public void run() { |
| for(MessageAndMetadata msgAndMetadata: stream) { |
| // process message (msgAndMetadata.message()) |
| } |
| } |
| }); |
| } |
| </pre> |
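
When the application is done consuming, both the connector and the thread pool should be shut down so the zookeeper connection is released and the JVM can exit; a minimal sketch:

<pre>
// stop the fetcher threads and release the zookeeper connection, then stop the worker threads
consumerConnector.shutdown();
executor.shutdown();
</pre>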
| |
| <h4>Hadoop Consumer</h4> |
| |
| <p> |
| Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers). |
| </p> |
| |
| <p> |
| Usage information on the hadoop consumer can be found <a href="https://github.com/kafka-dev/kafka/tree/master/contrib/hadoop-consumer">here</a>. |
| </p> |
| |
| <h4>Simple Consumer</h4> |
| |
Kafka has a lower-level consumer API for reading message chunks directly from servers. Under most circumstances this should not be needed. But just in case, its usage is as follows:
| |
| <pre> |
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import kafka.utils.Utils;
| |
| ... |
| |
| <small>// create a consumer to connect to the kafka server running on localhost, port 9092, socket timeout of 10 secs, socket receive buffer of ~1MB</small> |
| SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000); |
| |
long offset = 0;
while (true) {
  <small>// create a fetch request for topic "test", partition 0, current offset, and fetch size of 1MB</small>
  FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000);

  <small>// get the message set from the consumer and print them out</small>
  ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
  for(MessageAndOffset msg : messages) {
    System.out.println("consumed: " + Utils.toString(msg.message().payload(), "UTF-8"));
    <small>// advance the offset after consuming each message</small>
    offset = msg.offset();
  }
}
| </pre> |