This is a Hadoop job that pulls data from a Kafka server into HDFS.

It requires the following inputs from a configuration file
(test/test.properties is an example; a minimal excerpt is shown
after this list):

kafka.etl.topic : the topic to be fetched;

input : input directory containing topic offsets; it can be
        generated by DataGenerator. The number of files in this
        directory determines the number of mappers in the
        Hadoop job;

output : output directory containing Kafka data and updated
         topic offsets;

kafka.request.limit : limits the number of events fetched.

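For illustration, such a configuration might look like the following
(all values below are placeholders, not the ones shipped in
test/test.properties):

    kafka.etl.topic=SimpleTestEvent
    input=/tmp/kafka/data
    output=/tmp/kafka/output
    kafka.request.limit=-1
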
KafkaETLRecordReader is a record reader associated with KafkaETLInputFormat.
It fetches Kafka data from the server, starting from the provided offsets
(specified by "input") and stopping when it reaches the largest available
offset or the specified limit (specified by "kafka.request.limit").

KafkaETLJob contains some helper functions to initialize job configuration.

SimpleKafkaETLJob sets up job properties and submits the Hadoop job.

SimpleKafkaETLMapper dumps Kafka data into HDFS.

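As a rough sketch of how these classes fit together, the following wires
the input format and mapper into a classic org.apache.hadoop.mapred job.
This is not the SimpleKafkaETLJob source: the kafka.etl package names are
inferred from the run commands below, the topic and paths are placeholders,
and the output key/value classes are left at their defaults since they
depend on the mapper's actual signature.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import kafka.etl.KafkaETLInputFormat;       // package name assumed
    import kafka.etl.impl.SimpleKafkaETLMapper; // package name assumed

    public class HadoopConsumerSketch {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(HadoopConsumerSketch.class);
            conf.setJobName("kafka-hadoop-consumer");

            // These would normally be loaded from test/test.properties.
            conf.set("kafka.etl.topic", "SimpleTestEvent"); // placeholder
            conf.set("kafka.request.limit", "-1");          // -1 = no limit

            // One mapper runs per offset file found under the input path.
            conf.setInputFormat(KafkaETLInputFormat.class);
            FileInputFormat.setInputPaths(conf, new Path("/tmp/kafka/data"));
            FileOutputFormat.setOutputPath(conf, new Path("/tmp/kafka/output"));

            conf.setMapperClass(SimpleKafkaETLMapper.class);
            conf.setOutputFormat(TextOutputFormat.class);
            conf.setNumReduceTasks(0); // map-only: no reduce phase

            JobClient.runJob(conf);
        }
    }
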
HOW TO RUN:
In order to run this, make sure the HADOOP_HOME environment variable points
to your Hadoop installation directory.

1. Compile using "sbt" to create a package for the Hadoop consumer code:
   ./sbt package

2. Run the hadoop-setup.sh script, which enables write permission on the
   required HDFS directory.

3. Produce test events on the server and generate offset files
   1) Start the Kafka server [ follow the quick start:
      http://sna-projects.com/kafka/quickstart.php ]

   2) Update test/test.properties to change the following parameters
      (an example excerpt follows this list):
      kafka.etl.topic  : topic name
      event.count      : number of events to be generated
      kafka.server.uri : Kafka server URI
      input            : HDFS directory of offset files

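      For example (placeholder values only):

          kafka.etl.topic=SimpleTestEvent
          event.count=1000
          kafka.server.uri=tcp://localhost:9092
          input=/tmp/kafka/data
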
   3) Produce test events to the Kafka server and generate offset files:
      ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties

4. Fetch the generated topic into HDFS:
   1) Update test/test.properties to change the following parameters
      (an example excerpt follows this list):
      hadoop.job.ugi             : user id and group
      input                      : input location
      output                     : output location
      kafka.request.limit        : limits the number of events to be
                                   fetched; -1 means no limit
      hdfs.default.classpath.dir : HDFS location of jars

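      For example (placeholder values; hadoop.job.ugi takes a
      comma-separated user id and group):

          hadoop.job.ugi=hadoop,hadoop
          input=/tmp/kafka/data
          output=/tmp/kafka/output
          kafka.request.limit=-1
          hdfs.default.classpath.dir=/tmp/kafka/lib
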
   2) Copy jars into HDFS:
      ./copy-jars.sh ${hdfs.default.classpath.dir}

   3) Fetch data:
      ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties

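Once the job finishes, you can sanity-check the result with the standard
Hadoop shell (the path is whatever you set for "output"; the one below is
the placeholder used above):

    ${HADOOP_HOME}/bin/hadoop fs -ls /tmp/kafka/output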