blob: 51663584816807b3b414e89d38145eb24046c4c3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.etl.impl;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLRequest;
import kafka.etl.Props;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.javaapi.producer.SyncProducer;
import kafka.message.Message;
import kafka.producer.SyncProducerConfig;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
/**
* Use this class to produce test events to Kafka server. Each event contains a
* random timestamp in text format.
*/
@SuppressWarnings("deprecation")
public class DataGenerator {
protected final static Random RANDOM = new Random(
System.currentTimeMillis());
protected Props _props;
protected SyncProducer _producer = null;
protected URI _uri = null;
protected String _topic;
protected int _count;
protected String _offsetsDir;
protected final int TCP_BUFFER_SIZE = 300 * 1000;
protected final int CONNECT_TIMEOUT = 20000; // ms
protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE; // ms
public DataGenerator(String id, Props props) throws Exception {
_props = props;
_topic = props.getProperty("kafka.etl.topic");
System.out.println("topics=" + _topic);
_count = props.getInt("event.count");
_offsetsDir = _props.getProperty("input");
// initialize kafka producer to generate count events
String serverUri = _props.getProperty("kafka.server.uri");
_uri = new URI (serverUri);
System.out.println("server uri:" + _uri.toString());
Properties producerProps = new Properties();
producerProps.put("host", _uri.getHost());
producerProps.put("port", String.valueOf(_uri.getPort()));
producerProps.put("buffer.size", String.valueOf(TCP_BUFFER_SIZE));
producerProps.put("connect.timeout.ms", String.valueOf(CONNECT_TIMEOUT));
producerProps.put("reconnect.interval", String.valueOf(RECONNECT_INTERVAL));
_producer = new SyncProducer(new SyncProducerConfig(producerProps));
}
public void run() throws Exception {
List<Message> list = new ArrayList<Message>();
for (int i = 0; i < _count; i++) {
Long timestamp = RANDOM.nextLong();
if (timestamp < 0) timestamp = -timestamp;
byte[] bytes = timestamp.toString().getBytes("UTF8");
Message message = new Message(bytes);
list.add(message);
}
// send events
System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
_producer.send(_topic, new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, list));
// close the producer
_producer.close();
// generate offset files
generateOffsets();
}
protected void generateOffsets() throws Exception {
JobConf conf = new JobConf();
conf.set("hadoop.job.ugi", _props.getProperty("hadoop.job.ugi"));
conf.setCompressMapOutput(false);
Path outPath = new Path(_offsetsDir + Path.SEPARATOR + "1.dat");
FileSystem fs = outPath.getFileSystem(conf);
if (fs.exists(outPath)) fs.delete(outPath);
KafkaETLRequest request =
new KafkaETLRequest(_topic, "tcp://" + _uri.getHost() + ":" + _uri.getPort(), 0);
System.out.println("Dump " + request.toString() + " to " + outPath.toUri().toString());
byte[] bytes = request.toString().getBytes("UTF-8");
KafkaETLKey dummyKey = new KafkaETLKey();
SequenceFile.setCompressionType(conf, SequenceFile.CompressionType.NONE);
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, outPath,
KafkaETLKey.class, BytesWritable.class);
writer.append(dummyKey, new BytesWritable(bytes));
writer.close();
}
public static void main(String[] args) throws Exception {
if (args.length < 1)
throw new Exception("Usage: - config_file");
Props props = new Props(args[0]);
DataGenerator job = new DataGenerator("DataGenerator", props);
job.run();
}
}