blob: 52c253ccd1017a09b1ac16e233dca55bd14e83fc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.kafka;
import java.lang.reflect.Field;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang3.ClassUtils;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.lib.util.PojoUtils;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* POJOKafkaOutputOperator extends from AbstractKafkaOutputOperator receives the POJO
* from upstream and converts to Kafka Messages and writes to kafka topic. 
* <p>
* <br>
* Ports:<br>
* <b>Input</b>: Have only one input port<br>
* <b>Output</b>: No Output Port <br>
* <br>
* Properties:<br>
* <b>brokerList</b>: List of brokers in the form of Host1:Port1,Host2:Port2,Host3:port3,...<br>
* <b>keyField</b>: Specifies the field creates distribution of tuples to kafka partition. <br>
* <b>isBatchProcessing</b>: Specifies whether to write messages in batch or not. By default,
* the value is true <br>
* <b>batchSize</b>: Specifies the batch size.<br>
* <br>
* <br>
* </p>
*
* @displayName POJO Kafka Output
* @category Messaging
* @tags Output operator
*
*
* @since 3.3.0
*/
public class POJOKafkaOutputOperator extends AbstractKafkaOutputOperator<Object,Object>
{
@AutoMetric
private long outputMessagesPerSec;
@AutoMetric
private long outputBytesPerSec;
protected final String BROKER_KEY = "metadata.broker.list";
protected final String BATCH_NUM_KEY = "batch.num.messages";
protected final String PRODUCER_KEY = "producer.type";
protected final String QUEUE_BUFFER_KEY = "queue.buffering.max.ms";
protected final String ASYNC_PRODUCER_TYPE = "async";
private long messageCount;
private long byteCount;
private String brokerList;
private double windowTimeSec;
private String keyField = "";
protected boolean isBatchProcessing = true;
@Min(2)
protected int batchSize;
protected transient PojoUtils.Getter keyMethod;
protected transient Class<?> pojoClass;
public final transient DefaultInputPort<Object> inputPort = new DefaultInputPort<Object>() {
@Override
public void setup(Context.PortContext context)
{
if (context.getAttributes().contains(Context.PortContext.TUPLE_CLASS)) {
pojoClass = context.getAttributes().get(Context.PortContext.TUPLE_CLASS);
}
}
@Override
public void process(Object tuple)
{
processTuple(tuple);
}
};
/**
* setup producer configuration.
* @return ProducerConfig
*/
@Override
protected ProducerConfig createKafkaProducerConfig()
{
if (brokerList != null) {
getConfigProperties().setProperty(BROKER_KEY, brokerList);
}
if (isBatchProcessing) {
if (batchSize != 0) {
getConfigProperties().setProperty(BATCH_NUM_KEY, String.valueOf(batchSize));
}
getConfigProperties().setProperty(PRODUCER_KEY, ASYNC_PRODUCER_TYPE);
}
return super.createKafkaProducerConfig();
}
@Override
public void setup(Context.OperatorContext context)
{
if (isBatchProcessing) {
getConfigProperties().setProperty(QUEUE_BUFFER_KEY, String.valueOf(context.getValue(
Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS)));
}
super.setup(context);
windowTimeSec = (context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT)
* context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS) * 1.0) / 1000.0;
if (pojoClass != null && keyField != "") {
try {
keyMethod = generateGetterForKeyField();
} catch (NoSuchFieldException e) {
throw new RuntimeException("Field " + keyField + " is invalid: " + e);
}
}
}
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
outputMessagesPerSec = 0;
outputBytesPerSec = 0;
messageCount = 0;
byteCount = 0;
}
/**
* Write the incoming tuple to Kafka
* @param tuple incoming tuple
*/
protected void processTuple(Object tuple)
{
// Get the getter method from the keyField
if (keyMethod == null && keyField != "") {
pojoClass = tuple.getClass();
try {
keyMethod = generateGetterForKeyField();
} catch (NoSuchFieldException e) {
throw new RuntimeException("Field " + keyField + " is invalid: " + e);
}
}
// Convert the given tuple to KeyedMessage
KeyedMessage msg;
if (keyMethod != null) {
msg = new KeyedMessage(getTopic(), keyMethod.get(tuple), tuple);
} else {
msg = new KeyedMessage(getTopic(), tuple, tuple);
}
getProducer().send(msg);
messageCount++;
if (tuple instanceof byte[]) {
byteCount += ((byte[])tuple).length;
}
}
@Override
public void endWindow()
{
super.endWindow();
outputBytesPerSec = (long)(byteCount / windowTimeSec);
outputMessagesPerSec = (long)(messageCount / windowTimeSec);
}
private PojoUtils.Getter generateGetterForKeyField() throws NoSuchFieldException, SecurityException
{
Field f = pojoClass.getDeclaredField(keyField);
Class c = ClassUtils.primitiveToWrapper(f.getType());
PojoUtils.Getter classGetter = PojoUtils.createGetter(pojoClass, keyField, c);
return classGetter;
}
/**
* Returns the broker list of kafka clusters
* @return the broker list
*/
public String getBrokerList()
{
return brokerList;
}
/**
* Sets the broker list with the given list
* @param brokerList
*/
public void setBrokerList(@NotNull String brokerList)
{
this.brokerList = brokerList;
}
/**
* Specifies whether want to write in batch or not.
* @return isBatchProcessing
*/
public boolean isBatchProcessing()
{
return isBatchProcessing;
}
/**
* Specifies whether want to write in batch or not.
* @param batchProcessing given batchProcessing
*/
public void setBatchProcessing(boolean batchProcessing)
{
isBatchProcessing = batchProcessing;
}
/**
* Returns the batch size
* @return batch size
*/
public int getBatchSize()
{
return batchSize;
}
/**
* Sets the batch size
* @param batchSize batch size
*/
public void setBatchSize(int batchSize)
{
this.batchSize = batchSize;
}
/**
* Returns the key field
* @return the key field
*/
public String getKeyField()
{
return keyField;
}
/**
* Sets the key field which specifies the messages writes to Kafka based on this key.
* @param keyField the key field
*/
public void setKeyField(String keyField)
{
this.keyField = keyField;
}
}