blob: b1a496c5ba94b5f2cb5179c5e7adca5ff930753f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kafka;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelException;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaProducer extends DefaultAsyncProducer {
private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
private final KafkaEndpoint endpoint;
private ExecutorService workerPool;
private boolean shutdownWorkerPool;
public KafkaProducer(KafkaEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
}
Properties getProps() {
Properties props = endpoint.getConfiguration().createProducerProperties();
endpoint.updateClassProperties(props);
if (endpoint.getBrokers() != null) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, endpoint.getBrokers());
}
return props;
}
public org.apache.kafka.clients.producer.KafkaProducer getKafkaProducer() {
return kafkaProducer;
}
/**
* To use a custom {@link org.apache.kafka.clients.producer.KafkaProducer} instance.
*/
public void setKafkaProducer(org.apache.kafka.clients.producer.KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
public ExecutorService getWorkerPool() {
return workerPool;
}
public void setWorkerPool(ExecutorService workerPool) {
this.workerPool = workerPool;
}
@Override
protected void doStart() throws Exception {
Properties props = getProps();
if (kafkaProducer == null) {
kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props);
}
// if we are in asynchronous mode we need a worker pool
if (!endpoint.isSynchronous() && workerPool == null) {
workerPool = endpoint.createProducerExecutor();
// we create a thread pool so we should also shut it down
shutdownWorkerPool = true;
}
}
@Override
protected void doStop() throws Exception {
if (kafkaProducer != null) {
kafkaProducer.close();
}
if (shutdownWorkerPool && workerPool != null) {
endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool);
workerPool = null;
}
}
@SuppressWarnings("unchecked")
protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws CamelException {
String topic = endpoint.getTopic();
if (!endpoint.isBridgeEndpoint()) {
topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
}
if (topic == null) {
throw new CamelExchangeException("No topic key set", exchange);
}
final Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
final boolean hasPartitionKey = partitionKey != null;
final Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
final boolean hasMessageKey = messageKey != null;
Object msg = exchange.getIn().getBody();
Iterator<Object> iterator = null;
if (msg instanceof Iterable) {
iterator = ((Iterable<Object>)msg).iterator();
} else if (msg instanceof Iterator) {
iterator = (Iterator<Object>)msg;
}
if (iterator != null) {
final Iterator<Object> msgList = iterator;
final String msgTopic = topic;
return new Iterator<ProducerRecord>() {
@Override
public boolean hasNext() {
return msgList.hasNext();
}
@Override
public ProducerRecord next() {
if (hasPartitionKey && hasMessageKey) {
return new ProducerRecord(msgTopic, new Integer(partitionKey.toString()), messageKey, msgList.next());
} else if (hasMessageKey) {
return new ProducerRecord(msgTopic, messageKey, msgList.next());
}
return new ProducerRecord(msgTopic, msgList.next());
}
@Override
public void remove() {
msgList.remove();
}
};
}
ProducerRecord record;
if (hasPartitionKey && hasMessageKey) {
record = new ProducerRecord(topic, new Integer(partitionKey.toString()), messageKey, msg);
} else if (hasMessageKey) {
record = new ProducerRecord(topic, messageKey, msg);
} else {
log.warn("No message key or partition key set");
record = new ProducerRecord(topic, msg);
}
return Collections.singletonList(record).iterator();
}
@Override
@SuppressWarnings("unchecked")
// Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it
public void process(Exchange exchange) throws Exception {
Iterator<ProducerRecord> c = createRecorder(exchange);
List<Future<ProducerRecord>> futures = new LinkedList<Future<ProducerRecord>>();
while (c.hasNext()) {
futures.add(kafkaProducer.send(c.next()));
}
for (Future<ProducerRecord> f : futures) {
//wait for them all to be sent
f.get();
}
}
@Override
@SuppressWarnings("unchecked")
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
Iterator<ProducerRecord> c = createRecorder(exchange);
KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback);
while (c.hasNext()) {
cb.increment();
kafkaProducer.send(c.next(), cb);
}
return cb.allSent();
} catch (Exception ex) {
exchange.setException(ex);
}
callback.done(true);
return true;
}
private final class KafkaProducerCallBack implements Callback {
private final Exchange exchange;
private final AsyncCallback callback;
private final AtomicInteger count = new AtomicInteger(1);
KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
this.exchange = exchange;
this.callback = callback;
}
void increment() {
count.incrementAndGet();
}
boolean allSent() {
if (count.decrementAndGet() == 0) {
//was able to get all the work done while queuing the requests
callback.done(true);
return true;
}
return false;
}
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
exchange.setException(e);
}
if (count.decrementAndGet() == 0) {
// use worker pool to continue routing the exchange
// as this thread is from Kafka Callback and should not be used by Camel routing
workerPool.submit(new Runnable() {
@Override
public void run() {
callback.done(false);
}
});
}
}
}
}