blob: 5395d38ff9accb1843c05354ef1b609352afc1f7 [file] [log] [blame]
This is a Hadoop job that pulls data from kafka server into HDFS.
It requires the following inputs from a configuration file
(test/test.properties is an example)
kafka.etl.topic : the topic to be fetched;
input : input directory containing topic offsets and
it can be generated by DataGenerator;
the number of files in this directory determines the
number of mappers in the hadoop job;
output : output directory containing kafka data and updated
topic offsets;
kafka.request.limit : it is used to limit the number events fetched.
KafkaETLRecordReader is a record reader associated with KafkaETLInputFormat.
It fetches kafka data from the server. It starts from provided offsets
(specified by "input") and stops when it reaches the largest available offsets
or the specified limit (specified by "kafka.request.limit").
KafkaETLJob contains some helper functions to initialize job configuration.
SimpleKafkaETLJob sets up job properties and files Hadoop job.
SimpleKafkaETLMapper dumps kafka data into hdfs.
HOW TO RUN:
In order to run this, make sure the HADOOP_HOME environment variable points to
your hadoop installation directory.
1. Complile using "sbt" to create a package for hadoop consumer code.
./sbt package
2. Run the hadoop-setup.sh script that enables write permission on the
required HDFS directory
3. Produce test events in server and generate offset files
1) Start kafka server [ Follow the quick start -
http://sna-projects.com/kafka/quickstart.php ]
2) Update test/test.properties to change the following parameters:
kafka.etl.topic : topic name
event.count : number of events to be generated
kafka.server.uri : kafka server uri;
input : hdfs directory of offset files
3) Produce test events to Kafka server and generate offset files
./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
4. Fetch generated topic into HDFS:
1) Update test/test.properties to change the following parameters:
hadoop.job.ugi : id and group
input : input location
output : output location
kafka.request.limit: limit the number of events to be fetched;
-1 means no limitation.
hdfs.default.classpath.dir : hdfs location of jars
2) copy jars into hdfs
./copy-jars.sh ${hdfs.default.classpath.dir}
2) Fetch data
./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties