| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.etl.impl; |
| |
| import kafka.etl.KafkaETLInputFormat; |
| import kafka.etl.KafkaETLJob; |
| import kafka.etl.Props; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.JobClient; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.RunningJob; |
| import org.apache.hadoop.mapred.TextOutputFormat; |
| |
| /** |
| * This is a simple Kafka ETL job which pull text events generated by |
| * DataGenerator and store them in hdfs |
| */ |
| @SuppressWarnings("deprecation") |
| public class SimpleKafkaETLJob { |
| |
| protected String _name; |
| protected Props _props; |
| protected String _input; |
| protected String _output; |
| protected String _topic; |
| |
| public SimpleKafkaETLJob(String name, Props props) throws Exception { |
| _name = name; |
| _props = props; |
| |
| _input = _props.getProperty("input"); |
| _output = _props.getProperty("output"); |
| |
| _topic = props.getProperty("kafka.etl.topic"); |
| } |
| |
| |
| protected JobConf createJobConf() throws Exception { |
| JobConf jobConf = KafkaETLJob.createJobConf("SimpleKafakETL", _topic, _props, getClass()); |
| |
| jobConf.setMapperClass(SimpleKafkaETLMapper.class); |
| KafkaETLInputFormat.setInputPaths(jobConf, new Path(_input)); |
| |
| jobConf.setOutputKeyClass(LongWritable.class); |
| jobConf.setOutputValueClass(Text.class); |
| jobConf.setOutputFormat(TextOutputFormat.class); |
| TextOutputFormat.setCompressOutput(jobConf, false); |
| Path output = new Path(_output); |
| FileSystem fs = output.getFileSystem(jobConf); |
| if (fs.exists(output)) fs.delete(output); |
| TextOutputFormat.setOutputPath(jobConf, output); |
| |
| jobConf.setNumReduceTasks(0); |
| return jobConf; |
| } |
| |
| public void execute () throws Exception { |
| JobConf conf = createJobConf(); |
| RunningJob runningJob = new JobClient(conf).submitJob(conf); |
| String id = runningJob.getJobID(); |
| System.out.println("Hadoop job id=" + id); |
| runningJob.waitForCompletion(); |
| |
| if (!runningJob.isSuccessful()) |
| throw new Exception("Hadoop ETL job failed! Please check status on http://" |
| + conf.get("mapred.job.tracker") + "/jobdetails.jsp?jobid=" + id); |
| } |
| |
| /** |
| * for testing only |
| * |
| * @param args |
| * @throws Exception |
| */ |
| public static void main(String[] args) throws Exception { |
| |
| if (args.length < 1) |
| throw new Exception("Usage: - config_file"); |
| |
| Props props = new Props(args[0]); |
| SimpleKafkaETLJob job = new SimpleKafkaETLJob("SimpleKafkaETLJob", |
| props); |
| job.execute(); |
| } |
| |
| } |