/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.kafka.springboot;
import java.util.concurrent.ExecutorService;
import javax.annotation.Generated;
import org.apache.camel.component.kafka.KafkaManualCommitFactory;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
import org.apache.camel.util.jsse.SSLContextParameters;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
/**
 * The Kafka component allows messages to be sent to (or consumed from) Apache
* Kafka brokers.
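 * <p/>
 * These options are bound from Spring Boot configuration under the
 * <tt>camel.component.kafka</tt> prefix. A minimal sketch (broker host
 * and group id are illustrative placeholders):
 *
 * <pre>
 * camel.component.kafka.brokers = localhost:9092
 * camel.component.kafka.configuration.group-id = my-group
 * </pre>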
*
* Generated by camel-package-maven-plugin - do not edit this file!
*/
@Generated("org.apache.camel.maven.packaging.SpringBootAutoConfigurationMojo")
@ConfigurationProperties(prefix = "camel.component.kafka")
public class KafkaComponentConfiguration
extends
ComponentConfigurationPropertiesCommon {
/**
     * Allows pre-configuring the Kafka component with common options that
     * the endpoints will reuse.
*/
private KafkaConfigurationNestedConfiguration configuration;
/**
* URL of the Kafka brokers to use. The format is host1:port1,host2:port2,
* and the list can be a subset of brokers or a VIP pointing to a subset of
* brokers. This option is known as bootstrap.servers in the Kafka
* documentation.
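     * <p/>
     * For example, in application.properties (hypothetical hosts):
     *
     * <pre>
     * camel.component.kafka.brokers = kafka1:9092,kafka2:9092
     * </pre>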
*/
private String brokers;
/**
     * To use a shared custom worker pool for continuing to route the
     * Exchange after the Kafka server has acknowledged a message sent to it
     * from the KafkaProducer, using asynchronous non-blocking processing.
     * If you use this option, you must handle the lifecycle of the thread
     * pool and shut the pool down when it is no longer needed.
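     * <p/>
     * A minimal sketch of such a pool as a Spring bean (the bean name is an
     * assumption; the destroy method shuts the pool down with the
     * application context):
     *
     * <pre>{@code
     * @Bean(destroyMethod = "shutdown")
     * public ExecutorService kafkaWorkerPool() {
     *     return Executors.newFixedThreadPool(10);
     * }
     * }</pre>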
*/
private ExecutorService workerPool;
/**
* Enable usage of global SSL context parameters.
*/
private Boolean useGlobalSslContextParameters = false;
/**
     * This option controls what happens when a consumer is processing an
     * exchange and it fails. If the option is false then the consumer
     * continues to the next message and processes it. If the option is true
     * then the consumer breaks out, seeks back to the offset of the message
     * that caused the failure, and re-attempts to process this message.
     * However this can lead to endless processing of the same message if it
     * is bound to fail every time, e.g. a poison message. Therefore it is
     * recommended to deal with that, for example, by using Camel's error
     * handler.
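     * <p/>
     * A sketch of such an error handler in a RouteBuilder (the dead letter
     * endpoint is hypothetical); after three failed deliveries the exchange
     * is parked instead of being retried forever:
     *
     * <pre>{@code
     * errorHandler(deadLetterChannel("kafka:poison.topic")
     *         .maximumRedeliveries(3));
     * }</pre>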
*/
private Boolean breakOnFirstError = false;
/**
* Whether to allow doing manual commits via KafkaManualCommit. If this
* option is enabled then an instance of KafkaManualCommit is stored on the
* Exchange message header, which allows end users to access this API and
* perform manual offset commits via the Kafka consumer.
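     * <p/>
     * A minimal consumer sketch, assuming the KafkaConstants.MANUAL_COMMIT
     * header and the commitSync() method of this release:
     *
     * <pre>{@code
     * from("kafka:mytopic?allowManualCommit=true&autoCommitEnable=false")
     *     .process(exchange -> {
     *         KafkaManualCommit manual = exchange.getIn().getHeader(
     *                 KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
     *         manual.commitSync();
     *     });
     * }</pre>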
*/
private Boolean allowManualCommit = false;
/**
     * Factory to use for creating KafkaManualCommit instances. This allows
     * plugging in a custom factory to create custom KafkaManualCommit
     * instances in case special logic is needed when doing manual commits
     * that deviates from the default implementation that comes out of the
     * box.
*/
@NestedConfigurationProperty
private KafkaManualCommitFactory kafkaManualCommitFactory;
/**
* Whether the component should resolve property placeholders on itself when
* starting. Only properties which are of String type can use property
* placeholders.
*/
private Boolean resolvePropertyPlaceholders = true;
public KafkaConfigurationNestedConfiguration getConfiguration() {
return configuration;
}
public void setConfiguration(
KafkaConfigurationNestedConfiguration configuration) {
this.configuration = configuration;
}
public String getBrokers() {
return brokers;
}
public void setBrokers(String brokers) {
this.brokers = brokers;
}
public ExecutorService getWorkerPool() {
return workerPool;
}
public void setWorkerPool(ExecutorService workerPool) {
this.workerPool = workerPool;
}
public Boolean getUseGlobalSslContextParameters() {
return useGlobalSslContextParameters;
}
public void setUseGlobalSslContextParameters(
Boolean useGlobalSslContextParameters) {
this.useGlobalSslContextParameters = useGlobalSslContextParameters;
}
public Boolean getBreakOnFirstError() {
return breakOnFirstError;
}
public void setBreakOnFirstError(Boolean breakOnFirstError) {
this.breakOnFirstError = breakOnFirstError;
}
public Boolean getAllowManualCommit() {
return allowManualCommit;
}
public void setAllowManualCommit(Boolean allowManualCommit) {
this.allowManualCommit = allowManualCommit;
}
public KafkaManualCommitFactory getKafkaManualCommitFactory() {
return kafkaManualCommitFactory;
}
public void setKafkaManualCommitFactory(
KafkaManualCommitFactory kafkaManualCommitFactory) {
this.kafkaManualCommitFactory = kafkaManualCommitFactory;
}
public Boolean getResolvePropertyPlaceholders() {
return resolvePropertyPlaceholders;
}
public void setResolvePropertyPlaceholders(
Boolean resolvePropertyPlaceholders) {
this.resolvePropertyPlaceholders = resolvePropertyPlaceholders;
}
public static class KafkaConfigurationNestedConfiguration {
public static final Class CAMEL_NESTED_CLASS = org.apache.camel.component.kafka.KafkaConfiguration.class;
/**
         * Whether the topic is a pattern (regular expression). This can be
         * used to subscribe to a dynamic number of topics matching the
         * pattern.
*/
private Boolean topicIsPattern = false;
/**
         * A string that uniquely identifies the group of consumer processes
         * to which this consumer belongs. By setting the same group id,
         * multiple processes indicate that they are all part of the same
         * consumer group.
* This option is required for consumers.
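         * <p/>
         * For example (the group id is a placeholder):
         *
         * <pre>
         * camel.component.kafka.configuration.group-id = my-application
         * </pre>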
*/
private String groupId;
/**
* If the option is true, then KafkaProducer will ignore the
* KafkaConstants.TOPIC header setting of the inbound message.
*/
private Boolean bridgeEndpoint = false;
/**
         * If the option is true, then the KafkaProducer will detect whether
         * a message is about to be sent back to the same topic it came
         * from, if the message originally came from a Kafka consumer. If
         * the KafkaConstants.TOPIC header is the same as the original Kafka
         * consumer topic, then the header setting is ignored and the topic
         * of the producer endpoint is used. In other words, this avoids
         * sending the same message back to where it came from. This option
         * is not in use if the option bridgeEndpoint is set to true.
*/
private Boolean circularTopicDetection = true;
/**
         * The partitioner class for partitioning messages amongst
         * partitions of the topic. The default partitioner is based on the
         * hash of the key.
*/
private String partitioner = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";
/**
* Name of the topic to use. On the consumer you can use comma to
* separate multiple topics. A producer can only send a message to a
* single topic.
*/
private String topic;
/**
         * Number of concurrent consumers on the consumer.
*/
private Integer consumerStreams = 10;
/**
         * The number of consumers that connect to the Kafka server.
*/
private Integer consumersCount = 1;
/**
* The client id is a user-specified string sent in each request to help
* trace calls. It should logically identify the application making the
* request.
*/
private String clientId;
/**
         * If true, periodically commit the offset of messages already
         * fetched by the consumer. This committed offset will be used when
         * the process fails as the position from which the new consumer
         * will begin.
*/
private Boolean autoCommitEnable = true;
/**
* Whether to allow doing manual commits via {@link KafkaManualCommit} .
* <p/>
* If this option is enabled then an instance of
* {@link KafkaManualCommit} is stored on the {@link Exchange} message
* header, which allows end users to access this API and perform manual
* offset commits via the Kafka consumer.
*/
private Boolean allowManualCommit = false;
/**
* The offset repository to use in order to locally store the offset of
         * each partition of the topic. Defining one will disable
         * autocommit.
*/
private StateRepository offsetRepository;
/**
         * The frequency in ms that the consumer offsets are committed to
         * Kafka.
*/
private Integer autoCommitIntervalMs = 5000;
/**
* The minimum amount of data the server should return for a fetch
* request. If insufficient data is available the request will wait for
* that much data to accumulate before answering the request.
*/
private Integer fetchMinBytes = 1;
        /**
         * The maximum amount of data the server should return for a fetch
         * request. This is not an absolute maximum; if the first message in
         * the first non-empty partition of the fetch is larger than this
         * value, the message will still be returned to ensure that the
         * consumer can make progress.
         */
        private Integer fetchMaxBytes = 52428800;
/**
* The maximum amount of time the server will block before answering the
* fetch request if there isn't sufficient data to immediately satisfy
         * fetch.min.bytes.
*/
private Integer fetchWaitMaxMs = 500;
/**
         * What to do when there is no initial offset or if an offset is out
         * of range: earliest : automatically reset the offset to the
         * earliest offset; latest : automatically reset the offset to the
         * latest offset; fail : throw an exception to the consumer.
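         * <p/>
         * For example, to consume a topic from the start on first run:
         *
         * <pre>
         * camel.component.kafka.configuration.auto-offset-reset = earliest
         * </pre>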
*/
private String autoOffsetReset = "latest";
/**
* Whether to perform an explicit auto commit when the consumer stops to
* ensure the broker has a commit from the last consumed message. This
* requires the option autoCommitEnable is turned on. The possible
* values are: sync, async, or none. And sync is the default value.
*/
private String autoCommitOnStop = "sync";
/**
         * This option controls what happens when a consumer is processing
         * an exchange and it fails. If the option is <tt>false</tt> then
         * the consumer continues to the next message and processes it. If
         * the option is <tt>true</tt> then the consumer breaks out, seeks
         * back to the offset of the message that caused the failure, and
         * re-attempts to process this message. However this can lead to
         * endless processing of the same message if it is bound to fail
         * every time, e.g. a poison message. Therefore it is recommended to
         * deal with that, for example, by using Camel's error handler.
*/
private Boolean breakOnFirstError = false;
/**
* URL of the Kafka brokers to use. The format is
* host1:port1,host2:port2, and the list can be a subset of brokers or a
* VIP pointing to a subset of brokers.
* <p/>
* This option is known as <tt>bootstrap.servers</tt> in the Kafka
* documentation.
*/
private String brokers;
/**
* This parameter allows you to specify the compression codec for all
* data generated by this producer. Valid values are "none", "gzip" and
* "snappy".
*/
private String compressionCodec = "none";
/**
* Before each retry, the producer refreshes the metadata of relevant
* topics to see if a new leader has been elected. Since leader election
* takes a bit of time, this property specifies the amount of time that
* the producer waits before refreshing the metadata.
*/
private Integer retryBackoffMs = 100;
/**
         * Socket write buffer size.
*/
private Integer sendBufferBytes = 131072;
/**
* The amount of time the broker will wait trying to meet the
* request.required.acks requirement before sending back an error to the
* client.
*/
private Integer requestTimeoutMs = 305000;
/**
         * The maximum number of unsent messages that can be queued up by
         * the producer when using async mode before either the producer
         * must be blocked or data must be dropped.
*/
private Integer queueBufferingMaxMessages = 10000;
/**
* The serializer class for messages.
*/
private String serializerClass = "org.apache.kafka.common.serialization.StringSerializer";
/**
* The serializer class for keys (defaults to the same as for messages
* if nothing is given).
*/
private String keySerializerClass = "org.apache.kafka.common.serialization.StringSerializer";
/**
         * Kerberos kinit command path. Default is /usr/bin/kinit.
*/
private String kerberosInitCmd = "/usr/bin/kinit";
/**
* Login thread sleep time between refresh attempts.
*/
private Integer kerberosBeforeReloginMinTime = 60000;
/**
* Percentage of random jitter added to the renewal time.
*/
private Double kerberosRenewJitter;
/**
* Login thread will sleep until the specified window factor of time
* from last refresh to ticket's expiry has been reached, at which time
* it will try to renew the ticket.
*/
private Double kerberosRenewWindowFactor;
/**
* A list of rules for mapping from principal names to short names
* (typically operating system usernames). The rules are evaluated in
* order and the first rule that matches a principal name is used to map
* it to a short name. Any later rules in the list are ignored. By
* default, principal names of the form {username}/{hostname}@{REALM}
* are mapped to {username}. For more details on the format please see
         * <a href="#security_authz">security authorization and acls</a>.
* <p/>
* Multiple values can be separated by comma
*/
private String kerberosPrincipalToLocalRules = "DEFAULT";
/**
* A list of cipher suites. This is a named combination of
* authentication, encryption, MAC and key exchange algorithm used to
* negotiate the security settings for a network connection using TLS or
         * SSL network protocol. By default all the available cipher suites
         * are supported.
*/
private String sslCipherSuites;
/**
         * The endpoint identification algorithm to validate the server
         * hostname using the server certificate.
*/
private String sslEndpointAlgorithm;
/**
* The algorithm used by key manager factory for SSL connections.
* Default value is the key manager factory algorithm configured for the
* Java Virtual Machine.
*/
private String sslKeymanagerAlgorithm = "SunX509";
/**
* The algorithm used by trust manager factory for SSL connections.
* Default value is the trust manager factory algorithm configured for
* the Java Virtual Machine.
*/
private String sslTrustmanagerAlgorithm = "PKIX";
/**
* The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1
* and TLSv1 are enabled by default.
*/
private String sslEnabledProtocols = "TLSv1.2,TLSv1.1,TLSv1";
/**
* The file format of the key store file. This is optional for client.
         * Default value is JKS.
*/
private String sslKeystoreType = "JKS";
/**
* The SSL protocol used to generate the SSLContext. Default setting is
* TLS, which is fine for most cases. Allowed values in recent JVMs are
* TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in
* older JVMs, but their usage is discouraged due to known security
* vulnerabilities.
*/
private String sslProtocol = "TLS";
/**
* The name of the security provider used for SSL connections. Default
* value is the default security provider of the JVM.
*/
private String sslProvider;
/**
* The file format of the trust store file. Default value is JKS.
*/
private String sslTruststoreType = "JKS";
/**
* The Kerberos principal name that Kafka runs as. This can be defined
* either in Kafka's JAAS config or in Kafka's config.
*/
private String saslKerberosServiceName;
/**
* The Simple Authentication and Security Layer (SASL) Mechanism used.
         * For the valid values see
         * <a href="http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml">
         * http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml</a>.
*/
private String saslMechanism = "GSSAPI";
/**
         * Expose the Kafka sasl.jaas.config parameter. Example:
* org.apache.kafka.common.security.plain.PlainLoginModule required
* username="USERNAME" password="PASSWORD";
*/
private String saslJaasConfig;
/**
* Protocol used to communicate with brokers. Currently only PLAINTEXT
* and SSL are supported.
*/
private String securityProtocol = "PLAINTEXT";
/**
* SSL configuration using a Camel {@link SSLContextParameters} object.
* If configured it's applied before the other SSL endpoint parameters.
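         * <p/>
         * A minimal sketch of building one programmatically (paths and
         * passwords are placeholders):
         *
         * <pre>{@code
         * KeyStoreParameters store = new KeyStoreParameters();
         * store.setResource("/path/to/truststore.jks");
         * store.setPassword("changeit");
         * TrustManagersParameters trust = new TrustManagersParameters();
         * trust.setKeyStore(store);
         * SSLContextParameters ssl = new SSLContextParameters();
         * ssl.setTrustManagers(trust);
         * }</pre>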
*/
@NestedConfigurationProperty
private SSLContextParameters sslContextParameters;
/**
* The password of the private key in the key store file. This is
* optional for client.
*/
private String sslKeyPassword;
/**
         * The location of the key store file. This is optional for the
         * client and can be used for two-way authentication of the client.
*/
private String sslKeystoreLocation;
/**
         * The store password for the key store file. This is optional for
         * the client and only needed if ssl.keystore.location is
         * configured.
*/
private String sslKeystorePassword;
/**
* The location of the trust store file.
*/
private String sslTruststoreLocation;
/**
* The password for the trust store file.
*/
private String sslTruststorePassword;
/**
* The total bytes of memory the producer can use to buffer records
* waiting to be sent to the server. If records are sent faster than
* they can be delivered to the server the producer will either block or
* throw an exception based on the preference specified by
         * block.on.buffer.full. This setting should correspond roughly to the
* total memory the producer will use, but is not a hard bound since not
* all memory the producer uses is used for buffering. Some additional
* memory will be used for compression (if compression is enabled) as
* well as for maintaining in-flight requests.
*/
private Integer bufferMemorySize = 33554432;
/**
* The record key (or null if no key is specified). If this option has
         * been configured then it takes precedence over the header
         * {@link KafkaConstants#KEY}.
*/
private String key;
/**
* The partition to which the record will be sent (or null if no
         * partition was specified). If this option has been configured then
         * it takes precedence over the header
         * {@link KafkaConstants#PARTITION_KEY}.
*/
private Integer partitionKey;
/**
* The number of acknowledgments the producer requires the leader to
* have received before considering a request complete. This controls
* the durability of records that are sent. The following settings are
         * common:
         * <p/>
         * acks=0: the producer will not wait for any acknowledgment from
         * the server at all. The record will be immediately added to the
         * socket buffer and considered sent. No guarantee can be made that
         * the server has received the record in this case, and the retries
         * configuration will not take effect (as the client won't generally
         * know of any failures). The offset given back for each record will
         * always be set to -1.
         * <p/>
         * acks=1: the leader will write the record to its local log but
         * will respond without awaiting full acknowledgement from all
         * followers. In this case should the leader fail immediately after
         * acknowledging the record but before the followers have replicated
         * it then the record will be lost.
         * <p/>
         * acks=all: the leader will wait for the full set of in-sync
         * replicas to acknowledge the record. This guarantees that the
         * record will not be lost as long as at least one in-sync replica
         * remains alive. This is the strongest available guarantee.
*/
private String requestRequiredAcks = "1";
/**
* Setting a value greater than zero will cause the client to resend any
* record whose send fails with a potentially transient error. Note that
* this retry is no different than if the client resent the record upon
* receiving the error. Allowing retries will potentially change the
* ordering of records because if two records are sent to a single
* partition, and the first fails and is retried but the second
* succeeds, then the second record may appear first.
*/
private Integer retries = 0;
/**
* The producer will attempt to batch records together into fewer
* requests whenever multiple records are being sent to the same
* partition. This helps performance on both the client and the server.
* This configuration controls the default batch size in bytes. No
         * attempt will be made to batch records larger than this size.
         * Requests sent to brokers will contain multiple batches, one for
         * each partition with data available to be sent. A small batch size
         * will make batching
* less common and may reduce throughput (a batch size of zero will
* disable batching entirely). A very large batch size may use memory a
* bit more wastefully as we will always allocate a buffer of the
* specified batch size in anticipation of additional records.
*/
private Integer producerBatchSize = 16384;
/**
* Close idle connections after the number of milliseconds specified by
* this config.
*/
private Integer connectionMaxIdleMs = 540000;
/**
* The producer groups together any records that arrive in between
* request transmissions into a single batched request. Normally this
* occurs only under load when records arrive faster than they can be
* sent out. However in some circumstances the client may want to reduce
* the number of requests even under moderate load. This setting
* accomplishes this by adding a small amount of artificial delay—that
* is, rather than immediately sending out a record the producer will
* wait for up to the given delay to allow other records to be sent so
* that the sends can be batched together. This can be thought of as
* analogous to Nagle's algorithm in TCP. This setting gives the upper
* bound on the delay for batching: once we get batch.size worth of
* records for a partition it will be sent immediately regardless of
* this setting, however if we have fewer than this many bytes
* accumulated for this partition we will 'linger' for the specified
* time waiting for more records to show up. This setting defaults to 0
* (i.e. no delay). Setting linger.ms=5, for example, would have the
* effect of reducing the number of requests sent but would add up to
         * 5ms of latency to records sent in the absence of load.
*/
private Integer lingerMs = 0;
/**
         * The configuration controls how long sending to Kafka will block.
         * The send can block for multiple reasons, for example: buffer
         * full, metadata unavailable. This configuration imposes a maximum
         * limit on the total time spent in fetching metadata, serialization
         * of key and value, partitioning and allocation of buffer memory
         * when doing a send(). In case of partitionsFor(), this
         * configuration imposes a maximum time threshold on waiting for
         * metadata.
*/
private Integer maxBlockMs = 60000;
/**
* The maximum size of a request. This is also effectively a cap on the
* maximum record size. Note that the server has its own cap on record
* size which may be different from this. This setting will limit the
* number of record batches the producer will send in a single request
* to avoid sending huge requests.
*/
private Integer maxRequestSize = 1048576;
/**
* The size of the TCP receive buffer (SO_RCVBUF) to use when reading
* data.
*/
private Integer receiveBufferBytes = 65536;
/**
* The maximum number of unacknowledged requests the client will send on
* a single connection before blocking. Note that if this setting is set
* to be greater than 1 and there are failed sends, there is a risk of
* message re-ordering due to retries (i.e., if retries are enabled).
*/
private Integer maxInFlightRequest = 5;
/**
* The period of time in milliseconds after which we force a refresh of
* metadata even if we haven't seen any partition leadership changes to
* proactively discover any new brokers or partitions.
*/
private Integer metadataMaxAgeMs = 300000;
/**
* A list of classes to use as metrics reporters. Implementing the
* MetricReporter interface allows plugging in classes that will be
* notified of new metric creation. The JmxReporter is always included
* to register JMX statistics.
*/
private String metricReporters;
/**
* The number of samples maintained to compute metrics.
*/
private Integer noOfMetricsSample = 2;
/**
         * The window of time over which a metrics sample is computed.
*/
private Integer metricsSampleWindowMs = 30000;
/**
* The amount of time to wait before attempting to reconnect to a given
* host. This avoids repeatedly connecting to a host in a tight loop.
* This backoff applies to all requests sent by the consumer to the
* broker.
*/
private Integer reconnectBackoffMs = 50;
/**
* The expected time between heartbeats to the consumer coordinator when
* using Kafka's group management facilities. Heartbeats are used to
* ensure that the consumer's session stays active and to facilitate
* rebalancing when new consumers join or leave the group. The value
* must be set lower than session.timeout.ms, but typically should be
* set no higher than 1/3 of that value. It can be adjusted even lower
* to control the expected time for normal rebalances.
*/
private Integer heartbeatIntervalMs = 3000;
/**
* The maximum amount of data per-partition the server will return. The
* maximum total memory used for a request will be #partitions *
* max.partition.fetch.bytes. This size must be at least as large as the
* maximum message size the server allows or else it is possible for the
* producer to send messages larger than the consumer can fetch. If that
* happens, the consumer can get stuck trying to fetch a large message
* on a certain partition.
*/
private Integer maxPartitionFetchBytes = 1048576;
/**
* The timeout used to detect failures when using Kafka's group
* management facilities.
*/
private Integer sessionTimeoutMs = 10000;
/**
* The maximum number of records returned in a single call to poll()
*/
private Integer maxPollRecords = 500;
/**
* The timeout used when polling the KafkaConsumer.
*/
private Long pollTimeoutMs = 5000L;
/**
* The maximum delay between invocations of poll() when using consumer
* group management. This places an upper bound on the amount of time
* that the consumer can be idle before fetching more records. If poll()
* is not called before expiration of this timeout, then the consumer is
* considered failed and the group will rebalance in order to reassign
* the partitions to another member.
*/
private Long maxPollIntervalMs;
/**
* The class name of the partition assignment strategy that the client
* will use to distribute partition ownership amongst consumer instances
         * when group management is used.
*/
private String partitionAssignor = "org.apache.kafka.clients.consumer.RangeAssignor";
/**
* The configuration controls the maximum amount of time the client will
* wait for the response of a request. If the response is not received
* before the timeout elapses the client will resend the request if
* necessary or fail the request if retries are exhausted.
*/
private Integer consumerRequestTimeoutMs = 40000;
/**
* Automatically check the CRC32 of the records consumed. This ensures
* no on-the-wire or on-disk corruption to the messages occurred. This
* check adds some overhead, so it may be disabled in cases seeking
* extreme performance.
*/
private Boolean checkCrcs = true;
/**
* Deserializer class for key that implements the Deserializer
* interface.
*/
private String keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
/**
* Deserializer class for value that implements the Deserializer
* interface.
*/
private String valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
/**
         * Set if the KafkaConsumer will read from the beginning or the end
         * on startup: beginning : read from the beginning; end : read from
         * the end. This replaces the earlier property seekToBeginning.
*/
private String seekTo;
/**
         * To use a custom worker pool for continuing to route the
         * {@link Exchange} after the Kafka server has acknowledged a
         * message that was sent to it from the {@link KafkaProducer}, using
         * asynchronous non-blocking processing.
*/
private ExecutorService workerPool;
/**
         * Number of core threads for the worker pool used for continuing
         * to route the {@link Exchange} after the Kafka server has
         * acknowledged a message that was sent to it from the
         * {@link KafkaProducer}, using asynchronous non-blocking
         * processing.
*/
private Integer workerPoolCoreSize = 10;
/**
         * Maximum number of threads for the worker pool used for
         * continuing to route the {@link Exchange} after the Kafka server
         * has acknowledged a message that was sent to it from the
         * {@link KafkaProducer}, using asynchronous non-blocking
         * processing.
*/
private Integer workerPoolMaxSize = 20;
/**
* Whether the producer should store the {@link RecordMetadata} results
* from sending to Kafka. The results are stored in a {@link List}
         * containing the {@link RecordMetadata} metadata. The list is
         * stored on a header with the key
         * {@link KafkaConstants#KAFKA_RECORDMETA}.
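         * <p/>
         * A sketch of reading the stored results after a send:
         *
         * <pre>{@code
         * List<RecordMetadata> metadata = exchange.getIn().getHeader(
         *         KafkaConstants.KAFKA_RECORDMETA, List.class);
         * }</pre>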
*/
private Boolean recordMetadata = true;
/**
         * Sets interceptors for producers or consumers. Producer
         * interceptors have to be classes implementing
         * {@link org.apache.kafka.clients.producer.ProducerInterceptor},
         * consumer interceptors have to be classes implementing
         * {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}.
         * Note that if you use a producer interceptor on a consumer it will
         * throw a ClassCastException at runtime.
*/
private String interceptorClasses;
/**
         * If set to 'true' the producer will ensure that exactly one copy
         * of each message is written in the stream. If 'false', producer
         * retries may write duplicates of the retried message in the
         * stream. If set to true, this option requires
         * max.in.flight.requests.per.connection to be set to 1, retries
         * cannot be zero, and additionally acks must be set to 'all'.
*/
private Boolean enableIdempotence = false;
/**
* The maximum amount of time in milliseconds to wait when reconnecting
* to a broker that has repeatedly failed to connect. If provided, the
* backoff per host will increase exponentially for each consecutive
* connection failure, up to this maximum. After calculating the backoff
* increase, 20% random jitter is added to avoid connection storms.
*/
private Integer reconnectBackoffMaxMs = 1000;
/**
         * To use a custom HeaderFilterStrategy to filter headers to and
         * from the Camel message.
*/
private HeaderFilterStrategy headerFilterStrategy;
/**
         * Sets a custom KafkaHeaderDeserializer used to deserialize Kafka
         * header values into Camel header values.
*/
private KafkaHeaderDeserializer kafkaHeaderDeserializer;
/**
         * Sets a custom KafkaHeaderSerializer used to serialize Camel
         * header values into Kafka header values.
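         * <p/>
         * A minimal sketch of a custom serializer, assuming the
         * single-method serde interface of this release:
         *
         * <pre>{@code
         * public class MyHeaderSerializer implements KafkaHeaderSerializer {
         *     @Override
         *     public byte[] serialize(String key, Object value) {
         *         return value != null ? String.valueOf(value).getBytes() : null;
         *     }
         * }
         * }</pre>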
*/
private KafkaHeaderSerializer kafkaHeaderSerializer;
public Boolean getTopicIsPattern() {
return topicIsPattern;
}
public void setTopicIsPattern(Boolean topicIsPattern) {
this.topicIsPattern = topicIsPattern;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public Boolean getBridgeEndpoint() {
return bridgeEndpoint;
}
public void setBridgeEndpoint(Boolean bridgeEndpoint) {
this.bridgeEndpoint = bridgeEndpoint;
}
public Boolean getCircularTopicDetection() {
return circularTopicDetection;
}
public void setCircularTopicDetection(Boolean circularTopicDetection) {
this.circularTopicDetection = circularTopicDetection;
}
public String getPartitioner() {
return partitioner;
}
public void setPartitioner(String partitioner) {
this.partitioner = partitioner;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Integer getConsumerStreams() {
return consumerStreams;
}
public void setConsumerStreams(Integer consumerStreams) {
this.consumerStreams = consumerStreams;
}
public Integer getConsumersCount() {
return consumersCount;
}
public void setConsumersCount(Integer consumersCount) {
this.consumersCount = consumersCount;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public Boolean getAutoCommitEnable() {
return autoCommitEnable;
}
public void setAutoCommitEnable(Boolean autoCommitEnable) {
this.autoCommitEnable = autoCommitEnable;
}
public Boolean getAllowManualCommit() {
return allowManualCommit;
}
public void setAllowManualCommit(Boolean allowManualCommit) {
this.allowManualCommit = allowManualCommit;
}
public StateRepository getOffsetRepository() {
return offsetRepository;
}
public void setOffsetRepository(StateRepository offsetRepository) {
this.offsetRepository = offsetRepository;
}
public Integer getAutoCommitIntervalMs() {
return autoCommitIntervalMs;
}
public void setAutoCommitIntervalMs(Integer autoCommitIntervalMs) {
this.autoCommitIntervalMs = autoCommitIntervalMs;
}
public Integer getFetchMinBytes() {
return fetchMinBytes;
}
public void setFetchMinBytes(Integer fetchMinBytes) {
this.fetchMinBytes = fetchMinBytes;
}
public Integer getFetchMaxBytes() {
return fetchMaxBytes;
}
public void setFetchMaxBytes(Integer fetchMaxBytes) {
this.fetchMaxBytes = fetchMaxBytes;
}
public Integer getFetchWaitMaxMs() {
return fetchWaitMaxMs;
}
public void setFetchWaitMaxMs(Integer fetchWaitMaxMs) {
this.fetchWaitMaxMs = fetchWaitMaxMs;
}
public String getAutoOffsetReset() {
return autoOffsetReset;
}
public void setAutoOffsetReset(String autoOffsetReset) {
this.autoOffsetReset = autoOffsetReset;
}
public String getAutoCommitOnStop() {
return autoCommitOnStop;
}
public void setAutoCommitOnStop(String autoCommitOnStop) {
this.autoCommitOnStop = autoCommitOnStop;
}
public Boolean getBreakOnFirstError() {
return breakOnFirstError;
}
public void setBreakOnFirstError(Boolean breakOnFirstError) {
this.breakOnFirstError = breakOnFirstError;
}
public String getBrokers() {
return brokers;
}
public void setBrokers(String brokers) {
this.brokers = brokers;
}
public String getCompressionCodec() {
return compressionCodec;
}
public void setCompressionCodec(String compressionCodec) {
this.compressionCodec = compressionCodec;
}
public Integer getRetryBackoffMs() {
return retryBackoffMs;
}
public void setRetryBackoffMs(Integer retryBackoffMs) {
this.retryBackoffMs = retryBackoffMs;
}
public Integer getSendBufferBytes() {
return sendBufferBytes;
}
public void setSendBufferBytes(Integer sendBufferBytes) {
this.sendBufferBytes = sendBufferBytes;
}
public Integer getRequestTimeoutMs() {
return requestTimeoutMs;
}
public void setRequestTimeoutMs(Integer requestTimeoutMs) {
this.requestTimeoutMs = requestTimeoutMs;
}
public Integer getQueueBufferingMaxMessages() {
return queueBufferingMaxMessages;
}
public void setQueueBufferingMaxMessages(
Integer queueBufferingMaxMessages) {
this.queueBufferingMaxMessages = queueBufferingMaxMessages;
}
public String getSerializerClass() {
return serializerClass;
}
public void setSerializerClass(String serializerClass) {
this.serializerClass = serializerClass;
}
public String getKeySerializerClass() {
return keySerializerClass;
}
public void setKeySerializerClass(String keySerializerClass) {
this.keySerializerClass = keySerializerClass;
}
public String getKerberosInitCmd() {
return kerberosInitCmd;
}
public void setKerberosInitCmd(String kerberosInitCmd) {
this.kerberosInitCmd = kerberosInitCmd;
}
public Integer getKerberosBeforeReloginMinTime() {
return kerberosBeforeReloginMinTime;
}
public void setKerberosBeforeReloginMinTime(
Integer kerberosBeforeReloginMinTime) {
this.kerberosBeforeReloginMinTime = kerberosBeforeReloginMinTime;
}
public Double getKerberosRenewJitter() {
return kerberosRenewJitter;
}
public void setKerberosRenewJitter(Double kerberosRenewJitter) {
this.kerberosRenewJitter = kerberosRenewJitter;
}
public Double getKerberosRenewWindowFactor() {
return kerberosRenewWindowFactor;
}
public void setKerberosRenewWindowFactor(
Double kerberosRenewWindowFactor) {
this.kerberosRenewWindowFactor = kerberosRenewWindowFactor;
}
public String getKerberosPrincipalToLocalRules() {
return kerberosPrincipalToLocalRules;
}
public void setKerberosPrincipalToLocalRules(
String kerberosPrincipalToLocalRules) {
this.kerberosPrincipalToLocalRules = kerberosPrincipalToLocalRules;
}
public String getSslCipherSuites() {
return sslCipherSuites;
}
public void setSslCipherSuites(String sslCipherSuites) {
this.sslCipherSuites = sslCipherSuites;
}
public String getSslEndpointAlgorithm() {
return sslEndpointAlgorithm;
}
public void setSslEndpointAlgorithm(String sslEndpointAlgorithm) {
this.sslEndpointAlgorithm = sslEndpointAlgorithm;
}
public String getSslKeymanagerAlgorithm() {
return sslKeymanagerAlgorithm;
}
public void setSslKeymanagerAlgorithm(String sslKeymanagerAlgorithm) {
this.sslKeymanagerAlgorithm = sslKeymanagerAlgorithm;
}
public String getSslTrustmanagerAlgorithm() {
return sslTrustmanagerAlgorithm;
}
public void setSslTrustmanagerAlgorithm(String sslTrustmanagerAlgorithm) {
this.sslTrustmanagerAlgorithm = sslTrustmanagerAlgorithm;
}
public String getSslEnabledProtocols() {
return sslEnabledProtocols;
}
public void setSslEnabledProtocols(String sslEnabledProtocols) {
this.sslEnabledProtocols = sslEnabledProtocols;
}
public String getSslKeystoreType() {
return sslKeystoreType;
}
public void setSslKeystoreType(String sslKeystoreType) {
this.sslKeystoreType = sslKeystoreType;
}
public String getSslProtocol() {
return sslProtocol;
}
public void setSslProtocol(String sslProtocol) {
this.sslProtocol = sslProtocol;
}
public String getSslProvider() {
return sslProvider;
}
public void setSslProvider(String sslProvider) {
this.sslProvider = sslProvider;
}
public String getSslTruststoreType() {
return sslTruststoreType;
}
public void setSslTruststoreType(String sslTruststoreType) {
this.sslTruststoreType = sslTruststoreType;
}
public String getSaslKerberosServiceName() {
return saslKerberosServiceName;
}
public void setSaslKerberosServiceName(String saslKerberosServiceName) {
this.saslKerberosServiceName = saslKerberosServiceName;
}
public String getSaslMechanism() {
return saslMechanism;
}
public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}
public String getSaslJaasConfig() {
return saslJaasConfig;
}
public void setSaslJaasConfig(String saslJaasConfig) {
this.saslJaasConfig = saslJaasConfig;
}
public String getSecurityProtocol() {
return securityProtocol;
}
public void setSecurityProtocol(String securityProtocol) {
this.securityProtocol = securityProtocol;
}
public SSLContextParameters getSslContextParameters() {
return sslContextParameters;
}
public void setSslContextParameters(
SSLContextParameters sslContextParameters) {
this.sslContextParameters = sslContextParameters;
}
public String getSslKeyPassword() {
return sslKeyPassword;
}
public void setSslKeyPassword(String sslKeyPassword) {
this.sslKeyPassword = sslKeyPassword;
}
public String getSslKeystoreLocation() {
return sslKeystoreLocation;
}
public void setSslKeystoreLocation(String sslKeystoreLocation) {
this.sslKeystoreLocation = sslKeystoreLocation;
}
public String getSslKeystorePassword() {
return sslKeystorePassword;
}
public void setSslKeystorePassword(String sslKeystorePassword) {
this.sslKeystorePassword = sslKeystorePassword;
}
public String getSslTruststoreLocation() {
return sslTruststoreLocation;
}
public void setSslTruststoreLocation(String sslTruststoreLocation) {
this.sslTruststoreLocation = sslTruststoreLocation;
}
public String getSslTruststorePassword() {
return sslTruststorePassword;
}
public void setSslTruststorePassword(String sslTruststorePassword) {
this.sslTruststorePassword = sslTruststorePassword;
}
public Integer getBufferMemorySize() {
return bufferMemorySize;
}
public void setBufferMemorySize(Integer bufferMemorySize) {
this.bufferMemorySize = bufferMemorySize;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public Integer getPartitionKey() {
return partitionKey;
}
public void setPartitionKey(Integer partitionKey) {
this.partitionKey = partitionKey;
}
public String getRequestRequiredAcks() {
return requestRequiredAcks;
}
public void setRequestRequiredAcks(String requestRequiredAcks) {
this.requestRequiredAcks = requestRequiredAcks;
}
public Integer getRetries() {
return retries;
}
public void setRetries(Integer retries) {
this.retries = retries;
}
public Integer getProducerBatchSize() {
return producerBatchSize;
}
public void setProducerBatchSize(Integer producerBatchSize) {
this.producerBatchSize = producerBatchSize;
}
public Integer getConnectionMaxIdleMs() {
return connectionMaxIdleMs;
}
public void setConnectionMaxIdleMs(Integer connectionMaxIdleMs) {
this.connectionMaxIdleMs = connectionMaxIdleMs;
}
public Integer getLingerMs() {
return lingerMs;
}
public void setLingerMs(Integer lingerMs) {
this.lingerMs = lingerMs;
}
public Integer getMaxBlockMs() {
return maxBlockMs;
}
public void setMaxBlockMs(Integer maxBlockMs) {
this.maxBlockMs = maxBlockMs;
}
public Integer getMaxRequestSize() {
return maxRequestSize;
}
public void setMaxRequestSize(Integer maxRequestSize) {
this.maxRequestSize = maxRequestSize;
}
public Integer getReceiveBufferBytes() {
return receiveBufferBytes;
}
public void setReceiveBufferBytes(Integer receiveBufferBytes) {
this.receiveBufferBytes = receiveBufferBytes;
}
public Integer getMaxInFlightRequest() {
return maxInFlightRequest;
}
public void setMaxInFlightRequest(Integer maxInFlightRequest) {
this.maxInFlightRequest = maxInFlightRequest;
}
public Integer getMetadataMaxAgeMs() {
return metadataMaxAgeMs;
}
public void setMetadataMaxAgeMs(Integer metadataMaxAgeMs) {
this.metadataMaxAgeMs = metadataMaxAgeMs;
}
public String getMetricReporters() {
return metricReporters;
}
public void setMetricReporters(String metricReporters) {
this.metricReporters = metricReporters;
}
public Integer getNoOfMetricsSample() {
return noOfMetricsSample;
}
public void setNoOfMetricsSample(Integer noOfMetricsSample) {
this.noOfMetricsSample = noOfMetricsSample;
}
public Integer getMetricsSampleWindowMs() {
return metricsSampleWindowMs;
}
public void setMetricsSampleWindowMs(Integer metricsSampleWindowMs) {
this.metricsSampleWindowMs = metricsSampleWindowMs;
}
public Integer getReconnectBackoffMs() {
return reconnectBackoffMs;
}
public void setReconnectBackoffMs(Integer reconnectBackoffMs) {
this.reconnectBackoffMs = reconnectBackoffMs;
}
public Integer getHeartbeatIntervalMs() {
return heartbeatIntervalMs;
}
public void setHeartbeatIntervalMs(Integer heartbeatIntervalMs) {
this.heartbeatIntervalMs = heartbeatIntervalMs;
}
public Integer getMaxPartitionFetchBytes() {
return maxPartitionFetchBytes;
}
public void setMaxPartitionFetchBytes(Integer maxPartitionFetchBytes) {
this.maxPartitionFetchBytes = maxPartitionFetchBytes;
}
public Integer getSessionTimeoutMs() {
return sessionTimeoutMs;
}
public void setSessionTimeoutMs(Integer sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}
public Integer getMaxPollRecords() {
return maxPollRecords;
}
public void setMaxPollRecords(Integer maxPollRecords) {
this.maxPollRecords = maxPollRecords;
}
public Long getPollTimeoutMs() {
return pollTimeoutMs;
}
public void setPollTimeoutMs(Long pollTimeoutMs) {
this.pollTimeoutMs = pollTimeoutMs;
}
public Long getMaxPollIntervalMs() {
return maxPollIntervalMs;
}
public void setMaxPollIntervalMs(Long maxPollIntervalMs) {
this.maxPollIntervalMs = maxPollIntervalMs;
}
public String getPartitionAssignor() {
return partitionAssignor;
}
public void setPartitionAssignor(String partitionAssignor) {
this.partitionAssignor = partitionAssignor;
}
public Integer getConsumerRequestTimeoutMs() {
return consumerRequestTimeoutMs;
}
public void setConsumerRequestTimeoutMs(Integer consumerRequestTimeoutMs) {
this.consumerRequestTimeoutMs = consumerRequestTimeoutMs;
}
public Boolean getCheckCrcs() {
return checkCrcs;
}
public void setCheckCrcs(Boolean checkCrcs) {
this.checkCrcs = checkCrcs;
}
public String getKeyDeserializer() {
return keyDeserializer;
}
public void setKeyDeserializer(String keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}
public String getValueDeserializer() {
return valueDeserializer;
}
public void setValueDeserializer(String valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}
public String getSeekTo() {
return seekTo;
}
public void setSeekTo(String seekTo) {
this.seekTo = seekTo;
}
public ExecutorService getWorkerPool() {
return workerPool;
}
public void setWorkerPool(ExecutorService workerPool) {
this.workerPool = workerPool;
}
public Integer getWorkerPoolCoreSize() {
return workerPoolCoreSize;
}
public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
this.workerPoolCoreSize = workerPoolCoreSize;
}
public Integer getWorkerPoolMaxSize() {
return workerPoolMaxSize;
}
public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
this.workerPoolMaxSize = workerPoolMaxSize;
}
public Boolean getRecordMetadata() {
return recordMetadata;
}
public void setRecordMetadata(Boolean recordMetadata) {
this.recordMetadata = recordMetadata;
}
public String getInterceptorClasses() {
return interceptorClasses;
}
public void setInterceptorClasses(String interceptorClasses) {
this.interceptorClasses = interceptorClasses;
}
public Boolean getEnableIdempotence() {
return enableIdempotence;
}
public void setEnableIdempotence(Boolean enableIdempotence) {
this.enableIdempotence = enableIdempotence;
}
public Integer getReconnectBackoffMaxMs() {
return reconnectBackoffMaxMs;
}
public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) {
this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
}
public HeaderFilterStrategy getHeaderFilterStrategy() {
return headerFilterStrategy;
}
public void setHeaderFilterStrategy(
HeaderFilterStrategy headerFilterStrategy) {
this.headerFilterStrategy = headerFilterStrategy;
}
public KafkaHeaderDeserializer getKafkaHeaderDeserializer() {
return kafkaHeaderDeserializer;
}
public void setKafkaHeaderDeserializer(
KafkaHeaderDeserializer kafkaHeaderDeserializer) {
this.kafkaHeaderDeserializer = kafkaHeaderDeserializer;
}
public KafkaHeaderSerializer getKafkaHeaderSerializer() {
return kafkaHeaderSerializer;
}
public void setKafkaHeaderSerializer(
KafkaHeaderSerializer kafkaHeaderSerializer) {
this.kafkaHeaderSerializer = kafkaHeaderSerializer;
}
}
}