Hadoop to Kafka Bridge

What is it?

The Hadoop to Kafka bridge is a way to publish data from Hadoop to Kafka. There are two possible mechanisms, varying from easy to difficult: writing a Pig script and writing messages in Avro format, or rolling your own job using the Kafka OutputFormat.

Note that there are no write-once semantics: any client of the data must handle messages in an idempotent manner. That is, because of node failures and Hadoop‘s failure recovery, it’s possible that the same message is published multiple times in the same push.

How do I use it?

With this bridge, Kafka topics are URIs and are specified as kafka://<kafka-server>/<kafka-topic>.

Pig

Pig bridge writes data in binary Avro format with one message created per input row. To push data via Kafka, store to the Kafka URI using AvroKafkaStorage with the Avro schema as its first argument. You'll need to register the appropriate Kafka JARs. Here is what an example Pig script looks like:

REGISTER hadoop-kafka-bridge-0.5.2.jar;
REGISTER avro-1.4.0.jar;
REGISTER piggybank.jar;
REGISTER kafka-0.5.2.jar;
REGISTER jackson-core-asl-1.5.5.jar;
REGISTER jackson-mapper-asl-1.5.5.jar;
REGISTER scala-library.jar;

member_info = LOAD 'member_info.tsv' as (member_id : int, name : chararray);
names = FOREACH member_info GENERATE name;
STORE member_info INTO 'kafka://my-broker:9092/member_info' USING kafka.bridge.AvroKafkaStorage('"string"');

That‘s it! The Pig StoreFunc makes use of AvroStorage in Piggybank to convert from Pig’s data model to the specified Avro schema.

Further, multi-store is possible with KafkaStorage, so you can easily write to multiple topics and brokers in the same job:

SPLIT member_info INTO early_adopters IF member_id < 1000, others IF member_id >= 1000;
STORE early_adopters INTO 'kafka://my-broker:9092/early_adopters' USING AvroKafkaStorage('$schema');
STORE others INTO 'kafka://my-broker:9092/others' USING AvroKafkaStorage('$schema');

KafkaOutputFormat

KafkaOutputFormat is a Hadoop OutputFormat for publishing data via Kafka. It uses the newer 0.20 mapreduce APIs and simply pushes bytes (i.e., BytesWritable). This is a lower-level method of publishing data, as it allows you to precisely control output.

Here is an example that publishes some input text. With KafkaOutputFormat, the key is a NullWritable and is ignored; only values are published. Speculative execution is turned off by the OutputFormat.

import kafka.bridge.hadoop.KafkaOutputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class TextPublisher
{
  public static void main(String[] args) throws Exception
  {
    if (args.length != 2) {
      System.err.println("usage: <input path> <kafka output url>");
      return;
    }

    Job job = new Job();

    job.setJarByClass(TextPublisher.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(BytesWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(KafkaOutputFormat.class);

    job.setMapperClass(TheMapper.class);
    job.setNumReduceTasks(0);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    KafkaOutputFormat.setOutputPath(job, new Path(args[1]));

    if (!job.waitForCompletion(true)) {
      throw new RuntimeException("Job failed!");
    }
  }

  public static class TheMapper extends Mapper<Object, Object, NullWritable, BytesWritable>
  {
    @Override
    protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
    {
      context.write(NullWritable.get(), new BytesWritable(((Text) value).getBytes()));
    }
  }
}

What can I tune?

Normally, you needn't change any of these parameters:

  • kafka.output.queue_size: Bytes to queue in memory before pushing to the Kafka producer (i.e., the batch size). Default is 1010241024 (10MB).
  • kafka.output.connect_timeout: Connection timeout in milliseconds (see Kafka producer docs). Default is 30*1000 (30s).
  • kafka.output.reconnect_timeout: Milliseconds to wait until attempting reconnection (see Kafka producer docs). Default is 1000 (1s).
  • kafka.output.bufsize: Producer buffer size in bytes (see Kafka producer docs). Default is 64*1024 (64KB).
  • kafka.output.max_msgsize: Maximum message size in bytes (see Kafka producer docs). Default is 1024*1024 (1MB).

For easier debugging, the above values as well as the Kafka URI (kafka.output.url), the output server (kafka.output.server), the topic (kafka.output.topic), and the schema (kafka.output.schema) are injected into the job's configuration.