blob: 223cf6f9fc1d569a4dd30cbf2de40ad344e2a84e [file] [log] [blame]
/*
* Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.contrib.kafka;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* A kafka producer for testing
*/
public class KafkaTestProducer implements Runnable
{
// private static final Logger logger = LoggerFactory.getLogger(KafkaTestProducer.class);
private final kafka.javaapi.producer.Producer<String, String> producer;
private final String topic;
private int sendCount = 20;
// to generate a random int as a key for partition
private Random rand = new Random();
private boolean hasPartition = false;
public int getSendCount()
{
return sendCount;
}
public void setSendCount(int sendCount)
{
this.sendCount = sendCount;
}
private ProducerConfig createProducerConfig()
{
Properties props = new Properties();
props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
// props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER1_PORT );
if(hasPartition){
props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER1_PORT + ",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER2_PORT);
props.setProperty("partitioner.class", KafkaTestPartitioner.class.getCanonicalName());
} else {
props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER1_PORT );
}
props.setProperty("topic.metadata.refresh.interval.ms", "20000");
props.setProperty("producer.type", "async");
return new ProducerConfig(props);
}
public KafkaTestProducer(String topic)
{
this(topic, false);
}
public KafkaTestProducer(String topic, boolean hasPartition)
{
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
this.topic = topic;
this.hasPartition = hasPartition;
producer = new Producer<String, String>(createProducerConfig());
}
@Override
public void run()
{
// Create dummy message
int messageNo = 1;
while (messageNo <= sendCount) {
String messageStr = "Message_" + messageNo;
int k = rand.nextInt(100);
producer.send(new KeyedMessage<String, String>(topic, "" + k, messageStr));
messageNo++;
// logger.debug(String.format("Producing %s", messageStr));
}
// produce the end tuple to let the test input operator know it's done produce messages
producer.send(new KeyedMessage<String, String>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
if(hasPartition){
// Send end_tuple to other partition if it exist
producer.send(new KeyedMessage<String, String>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
}
}
public void close()
{
producer.close();
}
} // End of KafkaTestProducer