blob: ec082d14b7fe7845c620ed311233c3b31796eb88 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.contrib.kafka;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.Pair;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
/**
* This is a base implementation of a Kafka output operator,
* which, in most cases, guarantees to send tuples to Kafka MQ only once. 
* Subclasses should implement the methods for converting tuples into a format appropriate for Kafka.
* <p>
* Assuming messages kept in kafka are ordered by either key or value or keyvalue pair
* (For example, use timestamps as key), this Kafka OutputOperator always retrieve the last message from MQ as initial offset.
* So that replayed message wouldn't be sent to kafka again.
*
* This is not "perfect exact once" in 2 cases:
* 1 Multiple producers produce messages to same kafka partition
* 2 You have same message sent out and before kafka synchronized this message among all the brokers, the operator is
* started again.
*
* <br>
* Ports:<br>
* <b>Input</b>: One input port<br>
* <b>Output</b>: No output port<br>
* <br>
* Properties:<br>
* configProperties<br>
* <br>
* Compile time checks:<br>
* Class derived from has to implement 2 methods:<br>
* tupleToKeyValue() to convert input tuples to kafka key value objects<br>
* compareToLastMsg() to compare incoming tuple with the last received msg in kafka so that the operator could skip the received ones<br>
* <br>
* Run time checks:<br>
* None<br>
* <br>
* Benchmarks:<br>
* TBD<br>
* </p>
*
* @displayName Abstract Exactly Once Kafka Output
* @category Messaging
* @tags output operator
*
* @since 1.0.2
*/
public abstract class AbstractExactlyOnceKafkaOutputOperator<T, K, V> extends AbstractKafkaOutputOperator<K, V>
{
private Map<Integer, Pair<byte[], byte[]>> lastMsgs;
private transient Partitioner partitioner;
private transient int partitionNum = 1;
@Override
public void setup(OperatorContext context)
{
super.setup(context);
try {
String className = (String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_PARTITIONER);
if (className != null) {
partitioner = (Partitioner)Class.forName(className).newInstance();
}
} catch (Exception e) {
throw new RuntimeException("Failed to initialize partitioner", e);
}
//read last message from kafka
initializeLastProcessingOffset();
}
/**
* This input port receives tuples that will be written out to Kafka.
*/
public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
{
@Override
public void process(T tuple)
{
Pair<K, V> keyValue = tupleToKeyValue(tuple);
int pid = 0;
if (partitioner != null) {
pid = partitioner.partition(keyValue.first, partitionNum);
}
Pair<byte[], byte[]> lastMsg = lastMsgs.get(pid);
if (lastMsg == null || compareToLastMsg(keyValue, lastMsg) > 0) {
getProducer().send(new KeyedMessage<K, V>(getTopic(), keyValue.first, keyValue.second));
sendCount++;
} else {
// ignore tuple because kafka has already had the tuple
logger.debug("Ingore tuple " + tuple);
return;
}
}
};
private void initializeLastProcessingOffset()
{
// read last received kafka message
TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST)), this.getTopic());
if (tm == null) {
throw new RuntimeException("Failed to retrieve topic metadata");
}
partitionNum = tm.partitionsMetadata().size();
lastMsgs = new HashMap<Integer, Pair<byte[],byte[]>>(partitionNum);
for (PartitionMetadata pm : tm.partitionsMetadata()) {
String leadBroker = pm.leader().host();
int port = pm.leader().port();
String clientName = this.getClass().getName().replace('$', '.') + "_Client_" + tm.topic() + "_" + pm.partitionId();
SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(), kafka.api.OffsetRequest.LatestTime(), clientName);
FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(tm.topic(), pm.partitionId(), readOffset - 1, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) {
Message m = messageAndOffset.message();
ByteBuffer payload = m.payload();
ByteBuffer key = m.key();
byte[] valueBytes = new byte[payload.limit()];
byte[] keyBytes = new byte[key.limit()];
payload.get(valueBytes);
key.get(keyBytes);
lastMsgs.put(pm.partitionId(), new Pair<byte[], byte[]>(keyBytes, valueBytes));
}
}
}
/**
* compare the incoming tuple with the last received message in kafka.
*
* @param tupleKeyValue
* @param lastReceivedKeyValue
* @return <=0 if tupleKeyValue is supposed to be before lastReceivedKeyValue
* >0 if tupleKeyValue is after the lastReceivedKeyValue
*/
protected abstract int compareToLastMsg(Pair<K, V> tupleKeyValue, Pair<byte[], byte[]> lastReceivedKeyValue);
/**
* Tell the operator how to convert a input tuple to a kafka key value pair
* @param tuple
* @return A kafka key value pair.
*/
protected abstract Pair<K, V> tupleToKeyValue(T tuple);
private static final Logger logger = LoggerFactory.getLogger(AbstractExactlyOnceKafkaOutputOperator.class);
}