blob: 41168004eeb9e859eceec6377efcb1ac6317f25d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kafka;
import java.lang.reflect.Field;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.SynchronousDelegateProducer;
import org.apache.camel.spi.ClassResolver;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.util.CastUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The kafka component allows messages to be sent to (or consumed from) Apache Kafka brokers.
*/
@UriEndpoint(scheme = "kafka", title = "Kafka", syntax = "kafka:brokers", consumerClass = KafkaConsumer.class, label = "messaging")
public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
private static final Logger LOG = LoggerFactory.getLogger(KafkaEndpoint.class);
@UriParam
private KafkaConfiguration configuration = new KafkaConfiguration();
@UriParam
private boolean bridgeEndpoint;
public KafkaEndpoint() {
}
public KafkaEndpoint(String endpointUri, KafkaComponent component) {
super(endpointUri, component);
}
public KafkaConfiguration getConfiguration() {
if (configuration == null) {
configuration = createConfiguration();
}
return configuration;
}
public void setConfiguration(KafkaConfiguration configuration) {
this.configuration = configuration;
}
protected KafkaConfiguration createConfiguration() {
return new KafkaConfiguration();
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
KafkaConsumer consumer = new KafkaConsumer(this, processor);
configureConsumer(consumer);
return consumer;
}
@Override
public Producer createProducer() throws Exception {
KafkaProducer producer = createProducer(this);
if (isSynchronous()) {
return new SynchronousDelegateProducer(producer);
} else {
return producer;
}
}
@Override
public boolean isSingleton() {
return true;
}
@Override
public boolean isMultipleConsumersSupported() {
return true;
}
private void loadParitionerClass(ClassResolver resolver, Properties props) {
replaceWithClass(props, "partitioner.class", resolver, Partitioner.class);
}
<T> Class<T> loadClass(Object o, ClassResolver resolver, Class<T> type) {
if (o == null || o instanceof Class) {
return CastUtils.cast((Class<?>)o);
}
String name = o.toString();
Class<T> c = resolver.resolveClass(name, type);
if (c == null) {
c = resolver.resolveClass(name, type, getClass().getClassLoader());
}
if (c == null) {
c = resolver.resolveClass(name, type, org.apache.kafka.clients.producer.KafkaProducer.class.getClassLoader());
}
return c;
}
void replaceWithClass(Properties props, String key, ClassResolver resolver, Class<?> type) {
Class<?> c = loadClass(props.get(key), resolver, type);
if (c != null) {
props.put(key, c);
}
}
public void updateClassProperties(Properties props) {
try {
if (getCamelContext() != null) {
ClassResolver resolver = getCamelContext().getClassResolver();
replaceWithClass(props, "key.serializer", resolver, Serializer.class);
replaceWithClass(props, "value.serializer", resolver, Serializer.class);
try {
//doesn't exist in old version of Kafka client so detect and only call the method if
//the field/config actually exists
Field f = ProducerConfig.class.getDeclaredField("PARTITIONER_CLASS_CONFIG");
if (f != null) {
loadParitionerClass(resolver, props);
}
} catch (NoSuchFieldException e) {
//ignore
} catch (SecurityException e) {
//ignore
}
//doesn't work as it needs to be List<String> :(
//replaceWithClass(props, "partition.assignment.strategy", resolver, PartitionAssignor.class);
}
} catch (Throwable t) {
//can ignore and Kafka itself might be able to handle it, if not, it will throw an exception
LOG.debug("Problem loading classes for Serializers", t);
}
}
public ExecutorService createExecutor() {
return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaConsumer[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
}
public ExecutorService createProducerExecutor() {
int core = getConfiguration().getWorkerPoolCoreSize();
int max = getConfiguration().getWorkerPoolMaxSize();
return getCamelContext().getExecutorServiceManager().newThreadPool(this, "KafkaProducer[" + configuration.getTopic() + "]", core, max);
}
public Exchange createKafkaExchange(ConsumerRecord record) {
Exchange exchange = super.createExchange();
Message message = exchange.getIn();
message.setHeader(KafkaConstants.PARTITION, record.partition());
message.setHeader(KafkaConstants.TOPIC, record.topic());
message.setHeader(KafkaConstants.OFFSET, record.offset());
if (record.key() != null) {
message.setHeader(KafkaConstants.KEY, record.key());
}
message.setBody(record.value());
return exchange;
}
protected KafkaProducer createProducer(KafkaEndpoint endpoint) {
return new KafkaProducer(endpoint);
}
// Delegated properties from the configuration
//-------------------------------------------------------------------------
public Properties createProducerProperties() {
return configuration.createProducerProperties();
}
public void setValueDeserializer(String valueDeserializer) {
configuration.setValueDeserializer(valueDeserializer);
}
public void setRequestTimeoutMs(Integer requestTimeoutMs) {
configuration.setRequestTimeoutMs(requestTimeoutMs);
}
public void setProducerBatchSize(Integer producerBatchSize) {
configuration.setProducerBatchSize(producerBatchSize);
}
public void setRetryBackoffMs(Integer retryBackoffMs) {
configuration.setRetryBackoffMs(retryBackoffMs);
}
public void setNoOfMetricsSample(Integer noOfMetricsSample) {
configuration.setNoOfMetricsSample(noOfMetricsSample);
}
public String getMetricReporters() {
return configuration.getMetricReporters();
}
public void setSslKeystoreType(String sslKeystoreType) {
configuration.setSslKeystoreType(sslKeystoreType);
}
public void setSslCipherSuites(String sslCipherSuites) {
configuration.setSslCipherSuites(sslCipherSuites);
}
public void setClientId(String clientId) {
configuration.setClientId(clientId);
}
public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
configuration.setMetricsSampleWindowMs(metricsSampleWindowMs);
}
public String getKeyDeserializer() {
return configuration.getKeyDeserializer();
}
public int getConsumersCount() {
return configuration.getConsumersCount();
}
public String getSslKeyPassword() {
return configuration.getSslKeyPassword();
}
public void setSendBufferBytes(Integer sendBufferBytes) {
configuration.setSendBufferBytes(sendBufferBytes);
}
public Boolean isAutoCommitEnable() {
return configuration.isAutoCommitEnable();
}
public Integer getMaxBlockMs() {
return configuration.getMaxBlockMs();
}
public String getConsumerId() {
return configuration.getConsumerId();
}
public void setSslProtocol(String sslProtocol) {
configuration.setSslProtocol(sslProtocol);
}
public void setReceiveBufferBytes(Integer receiveBufferBytes) {
configuration.setReceiveBufferBytes(receiveBufferBytes);
}
public Boolean getCheckCrcs() {
return configuration.getCheckCrcs();
}
public void setGroupId(String groupId) {
configuration.setGroupId(groupId);
}
public String getCompressionCodec() {
return configuration.getCompressionCodec();
}
public String getGroupId() {
return configuration.getGroupId();
}
public void setSslTruststoreLocation(String sslTruststoreLocation) {
configuration.setSslTruststoreLocation(sslTruststoreLocation);
}
public String getKerberosInitCmd() {
return configuration.getKerberosInitCmd();
}
public String getAutoOffsetReset() {
return configuration.getAutoOffsetReset();
}
public void setAutoCommitEnable(Boolean autoCommitEnable) {
configuration.setAutoCommitEnable(autoCommitEnable);
}
public void setSerializerClass(String serializerClass) {
configuration.setSerializerClass(serializerClass);
}
public Integer getQueueBufferingMaxMessages() {
return configuration.getQueueBufferingMaxMessages();
}
public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
configuration.setSslEndpointAlgorithm(sslEndpointAlgorithm);
}
public void setRetries(Integer retries) {
configuration.setRetries(retries);
}
public void setAutoOffsetReset(String autoOffsetReset) {
configuration.setAutoOffsetReset(autoOffsetReset);
}
public Integer getSessionTimeoutMs() {
return configuration.getSessionTimeoutMs();
}
public Integer getBufferMemorySize() {
return configuration.getBufferMemorySize();
}
public String getKeySerializerClass() {
return configuration.getKeySerializerClass();
}
public void setSslProvider(String sslProvider) {
configuration.setSslProvider(sslProvider);
}
public void setFetchMinBytes(Integer fetchMinBytes) {
configuration.setFetchMinBytes(fetchMinBytes);
}
public Integer getAutoCommitIntervalMs() {
return configuration.getAutoCommitIntervalMs();
}
public void setKeySerializerClass(String keySerializerClass) {
configuration.setKeySerializerClass(keySerializerClass);
}
public Integer getConnectionMaxIdleMs() {
return configuration.getConnectionMaxIdleMs();
}
public Integer getReceiveBufferBytes() {
return configuration.getReceiveBufferBytes();
}
public void setBrokers(String brokers) {
configuration.setBrokers(brokers);
}
public String getValueDeserializer() {
return configuration.getValueDeserializer();
}
public String getPartitioner() {
return configuration.getPartitioner();
}
public String getSslTruststoreLocation() {
return configuration.getSslTruststoreLocation();
}
public void setBarrierAwaitTimeoutMs(int barrierAwaitTimeoutMs) {
configuration.setBarrierAwaitTimeoutMs(barrierAwaitTimeoutMs);
}
public String getSslProvider() {
return configuration.getSslProvider();
}
public void setMetricReporters(String metricReporters) {
configuration.setMetricReporters(metricReporters);
}
public void setSslTruststorePassword(String sslTruststorePassword) {
configuration.setSslTruststorePassword(sslTruststorePassword);
}
public void setMaxInFlightRequest(Integer maxInFlightRequest) {
configuration.setMaxInFlightRequest(maxInFlightRequest);
}
public String getTopic() {
return configuration.getTopic();
}
public int getBarrierAwaitTimeoutMs() {
return configuration.getBarrierAwaitTimeoutMs();
}
public Integer getFetchMinBytes() {
return configuration.getFetchMinBytes();
}
public Integer getHeartbeatIntervalMs() {
return configuration.getHeartbeatIntervalMs();
}
public void setKeyDeserializer(String keyDeserializer) {
configuration.setKeyDeserializer(keyDeserializer);
}
public Integer getMaxRequestSize() {
return configuration.getMaxRequestSize();
}
public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
configuration.setMetadataMaxAgeMs(metadataMaxAgeMs);
}
public String getSslKeystoreType() {
return configuration.getSslKeystoreType();
}
public void setKerberosRenewWindowFactor(Double kerberosRenewWindowFactor) {
configuration.setKerberosRenewWindowFactor(kerberosRenewWindowFactor);
}
public Integer getKerberosBeforeReloginMinTime() {
return configuration.getKerberosBeforeReloginMinTime();
}
public String getSslEnabledProtocols() {
return configuration.getSslEnabledProtocols();
}
public Integer getMaxInFlightRequest() {
return configuration.getMaxInFlightRequest();
}
public Integer getProducerBatchSize() {
return configuration.getProducerBatchSize();
}
public void setSslKeystorePassword(String sslKeystorePassword) {
configuration.setSslKeystorePassword(sslKeystorePassword);
}
public void setCheckCrcs(Boolean checkCrcs) {
configuration.setCheckCrcs(checkCrcs);
}
public int getConsumerStreams() {
return configuration.getConsumerStreams();
}
public void setConsumersCount(int consumersCount) {
configuration.setConsumersCount(consumersCount);
}
public int getBatchSize() {
return configuration.getBatchSize();
}
public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
configuration.setAutoCommitIntervalMs(autoCommitIntervalMs);
}
public void setSslTruststoreType(String sslTruststoreType) {
configuration.setSslTruststoreType(sslTruststoreType);
}
public Integer getConsumerRequestTimeoutMs() {
return configuration.getConsumerRequestTimeoutMs();
}
public String getSslKeystorePassword() {
return configuration.getSslKeystorePassword();
}
public void setSslKeyPassword(String sslKeyPassword) {
configuration.setSslKeyPassword(sslKeyPassword);
}
public String getRequestRequiredAcks() {
return configuration.getRequestRequiredAcks();
}
public Double getKerberosRenewWindowFactor() {
return configuration.getKerberosRenewWindowFactor();
}
public void setKerberosInitCmd(String kerberosInitCmd) {
configuration.setKerberosInitCmd(kerberosInitCmd);
}
public Integer getRetryBackoffMs() {
return configuration.getRetryBackoffMs();
}
public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
configuration.setSslTrustmanagerAlgorithm(sslTrustmanagerAlgorithm);
}
public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
configuration.setConsumerRequestTimeoutMs(consumerRequestTimeoutMs);
}
public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
configuration.setReconnectBackoffMs(reconnectBackoffMs);
}
public void setKerberosRenewJitter(Double kerberosRenewJitter) {
configuration.setKerberosRenewJitter(kerberosRenewJitter);
}
public String getSslKeystoreLocation() {
return configuration.getSslKeystoreLocation();
}
public Integer getNoOfMetricsSample() {
return configuration.getNoOfMetricsSample();
}
public String getSslKeymanagerAlgorithm() {
return configuration.getSslKeymanagerAlgorithm();
}
public void setConsumerId(String consumerId) {
configuration.setConsumerId(consumerId);
}
public String getClientId() {
return configuration.getClientId();
}
public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
configuration.setFetchWaitMaxMs(fetchWaitMaxMs);
}
public String getSslCipherSuites() {
return configuration.getSslCipherSuites();
}
public void setRequestRequiredAcks(String requestRequiredAcks) {
configuration.setRequestRequiredAcks(requestRequiredAcks);
}
public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
configuration.setConnectionMaxIdleMs(connectionMaxIdleMs);
}
public String getSslTrustmanagerAlgorithm() {
return configuration.getSslTrustmanagerAlgorithm();
}
public String getSslTruststorePassword() {
return configuration.getSslTruststorePassword();
}
public void setConsumerStreams(int consumerStreams) {
configuration.setConsumerStreams(consumerStreams);
}
public String getSslTruststoreType() {
return configuration.getSslTruststoreType();
}
public String getSecurityProtocol() {
return configuration.getSecurityProtocol();
}
public void setBufferMemorySize(Integer bufferMemorySize) {
configuration.setBufferMemorySize(bufferMemorySize);
}
public void setSaslKerberosServiceName(String saslKerberosServiceName) {
configuration.setSaslKerberosServiceName(saslKerberosServiceName);
}
public void setCompressionCodec(String compressionCodec) {
configuration.setCompressionCodec(compressionCodec);
}
public void setKerberosBeforeReloginMinTime(Integer kerberosBeforeReloginMinTime) {
configuration.setKerberosBeforeReloginMinTime(kerberosBeforeReloginMinTime);
}
public Integer getMetadataMaxAgeMs() {
return configuration.getMetadataMaxAgeMs();
}
public String getSerializerClass() {
return configuration.getSerializerClass();
}
public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
configuration.setSslKeymanagerAlgorithm(sslKeymanagerAlgorithm);
}
public void setMaxRequestSize(Integer maxRequestSize) {
configuration.setMaxRequestSize(maxRequestSize);
}
public Double getKerberosRenewJitter() {
return configuration.getKerberosRenewJitter();
}
public String getPartitionAssignor() {
return configuration.getPartitionAssignor();
}
public void setSecurityProtocol(String securityProtocol) {
configuration.setSecurityProtocol(securityProtocol);
}
public void setQueueBufferingMaxMessages(Integer queueBufferingMaxMessages) {
configuration.setQueueBufferingMaxMessages(queueBufferingMaxMessages);
}
public String getSaslKerberosServiceName() {
return configuration.getSaslKerberosServiceName();
}
public void setBatchSize(int batchSize) {
configuration.setBatchSize(batchSize);
}
public Integer getLingerMs() {
return configuration.getLingerMs();
}
public Integer getRetries() {
return configuration.getRetries();
}
public Integer getMaxPartitionFetchBytes() {
return configuration.getMaxPartitionFetchBytes();
}
public String getSslEndpointAlgorithm() {
return configuration.getSslEndpointAlgorithm();
}
public Integer getReconnectBackoffMs() {
return configuration.getReconnectBackoffMs();
}
public void setLingerMs(Integer lingerMs) {
configuration.setLingerMs(lingerMs);
}
public void setPartitionAssignor(String partitionAssignor) {
configuration.setPartitionAssignor(partitionAssignor);
}
public Integer getRequestTimeoutMs() {
return configuration.getRequestTimeoutMs();
}
public Properties createConsumerProperties() {
return configuration.createConsumerProperties();
}
public void setTopic(String topic) {
configuration.setTopic(topic);
}
public Integer getFetchWaitMaxMs() {
return configuration.getFetchWaitMaxMs();
}
public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
configuration.setSessionTimeoutMs(sessionTimeoutMs);
}
public void setSslEnabledProtocols(String sslEnabledProtocols) {
configuration.setSslEnabledProtocols(sslEnabledProtocols);
}
public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
configuration.setHeartbeatIntervalMs(heartbeatIntervalMs);
}
public void setMaxBlockMs(Integer maxBlockMs) {
configuration.setMaxBlockMs(maxBlockMs);
}
public void setSslKeystoreLocation(String sslKeystoreLocation) {
configuration.setSslKeystoreLocation(sslKeystoreLocation);
}
public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
configuration.setMaxPartitionFetchBytes(maxPartitionFetchBytes);
}
public void setPartitioner(String partitioner) {
configuration.setPartitioner(partitioner);
}
public String getBrokers() {
return configuration.getBrokers();
}
public Integer getMetricsSampleWindowMs() {
return configuration.getMetricsSampleWindowMs();
}
public Integer getSendBufferBytes() {
return configuration.getSendBufferBytes();
}
public String getSslProtocol() {
return configuration.getSslProtocol();
}
public boolean isBridgeEndpoint() {
return bridgeEndpoint;
}
/**
* If the option is true, then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message.
*/
public void setBridgeEndpoint(boolean bridgeEndpoint) {
this.bridgeEndpoint = bridgeEndpoint;
}
public void setWorkerPool(ExecutorService workerPool) {
configuration.setWorkerPool(workerPool);
}
public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
configuration.setWorkerPoolMaxSize(workerPoolMaxSize);
}
public Integer getWorkerPoolMaxSize() {
return configuration.getWorkerPoolMaxSize();
}
public Integer getWorkerPoolCoreSize() {
return configuration.getWorkerPoolCoreSize();
}
public ExecutorService getWorkerPool() {
return configuration.getWorkerPool();
}
public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
configuration.setWorkerPoolCoreSize(workerPoolCoreSize);
}
}