diff --git a/catalog/camel-catalog-provider-springboot/src/main/resources/org/apache/camel/springboot/catalog/components/kafka.json b/catalog/camel-catalog-provider-springboot/src/main/resources/org/apache/camel/springboot/catalog/components/kafka.json
index 93d42e7..8ddb617 100644
--- a/catalog/camel-catalog-provider-springboot/src/main/resources/org/apache/camel/springboot/catalog/components/kafka.json
+++ b/catalog/camel-catalog-provider-springboot/src/main/resources/org/apache/camel/springboot/catalog/components/kafka.json
@@ -52,6 +52,7 @@
     "maxPollRecords": { "kind": "property", "displayName": "Max Poll Records", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "500", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum number of records returned in a single call to poll()" },
     "offsetRepository": { "kind": "property", "displayName": "Offset Repository", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.spi.StateRepository<java.lang.String, java.lang.String>", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit." },
     "partitionAssignor": { "kind": "property", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used" },
+    "pollOnError": { "kind": "property", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY will let the consumer retry polling the same message again STOP will stop the consumer (have to be manually started\/restarted if the consumer should be able to consume messages again)" },
     "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." },
     "seekTo": { "kind": "property", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning" },
     "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." },
@@ -59,6 +60,7 @@
     "topicIsPattern": { "kind": "property", "displayName": "Topic Is Pattern", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Whether the topic is a pattern (regular expression). This can be used to subscribe to dynamic number of topics matching the pattern." },
     "valueDeserializer": { "kind": "property", "displayName": "Value Deserializer", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.common.serialization.StringDeserializer", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Deserializer class for value that implements the Deserializer interface." },
     "kafkaManualCommitFactory": { "kind": "property", "displayName": "Kafka Manual Commit Factory", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaManualCommitFactory", "deprecated": false, "autowired": false, "secret": false, "description": "Factory to use for creating KafkaManualCommit instances. This allows to plugin a custom factory to create custom KafkaManualCommit instances in case special logic is needed when doing manual commits that deviates from the default implementation that comes out of the box." },
+    "pollExceptionStrategy": { "kind": "property", "displayName": "Poll Exception Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollExceptionStrategy", "deprecated": false, "autowired": true, "secret": false, "description": "To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages." },
     "bufferMemorySize": { "kind": "property", "displayName": "Buffer Memory Size", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "33554432", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests." },
     "compressionCodec": { "kind": "property", "displayName": "Compression Codec", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "none", "gzip", "snappy", "lz4" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "none", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none, gzip and snappy." },
     "connectionMaxIdleMs": { "kind": "property", "displayName": "Connection Max Idle Ms", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "540000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Close idle connections after the number of milliseconds specified by this config." },
@@ -153,6 +155,7 @@
     "maxPollRecords": { "kind": "parameter", "displayName": "Max Poll Records", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "500", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The maximum number of records returned in a single call to poll()" },
     "offsetRepository": { "kind": "parameter", "displayName": "Offset Repository", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.spi.StateRepository<java.lang.String, java.lang.String>", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The offset repository to use in order to locally store the offset of each partition of the topic. Defining one will disable the autocommit." },
     "partitionAssignor": { "kind": "parameter", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used" },
+    "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY will let the consumer retry polling the same message again STOP will stop the consumer (have to be manually started\/restarted if the consumer should be able to consume messages again)" },
     "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." },
     "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning end : read from end This is replacing the earlier property seekToBeginning" },
     "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." },
diff --git a/catalog/camel-catalog-provider-springboot/src/main/resources/org/apache/camel/springboot/catalog/components/scheduler.json b/catalog/camel-catalog-provider-springboot/src/main/resources/org/apache/camel/springboot/catalog/components/scheduler.json
index e442487..3e4c0c0 100644
--- a/catalog/camel-catalog-provider-springboot/src/main/resources/org/apache/camel/springboot/catalog/components/scheduler.json
+++ b/catalog/camel-catalog-provider-springboot/src/main/resources/org/apache/camel/springboot/catalog/components/scheduler.json
@@ -24,7 +24,7 @@
   "componentProperties": {
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored." },
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc." },
-    "concurrentTasks": { "kind": "property", "displayName": "Concurrent Tasks", "group": "scheduler", "label": "scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "Number of threads used by the scheduling thread pool. Is by default using a single thread" }
+    "poolSize": { "kind": "property", "displayName": "Pool Size", "group": "scheduler", "label": "scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "Number of core threads in the thread pool used by the scheduling thread pool. Is by default using a single thread" }
   "properties": {
     "name": { "kind": "path", "displayName": "Name", "group": "consumer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the scheduler" },
@@ -37,10 +37,10 @@
     "backoffErrorThreshold": { "kind": "parameter", "displayName": "Backoff Error Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent error polls (failed due some error) that should happen before the backoffMultipler should kick-in." },
     "backoffIdleThreshold": { "kind": "parameter", "displayName": "Backoff Idle Threshold", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "The number of subsequent idle polls that should happen before the backoffMultipler should kick-in." },
     "backoffMultiplier": { "kind": "parameter", "displayName": "Backoff Multiplier", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "description": "To let the scheduled polling consumer backoff if there has been a number of subsequent idles\/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and\/or backoffErrorThreshold must also be configured." },
-    "concurrentTasks": { "kind": "parameter", "displayName": "Concurrent Tasks", "group": "scheduler", "label": "scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "Number of threads used by the scheduling thread pool. Is by default using a single thread" },
     "delay": { "kind": "parameter", "displayName": "Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 500, "description": "Milliseconds before the next poll." },
     "greedy": { "kind": "parameter", "displayName": "Greedy", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages." },
     "initialDelay": { "kind": "parameter", "displayName": "Initial Delay", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1000, "description": "Milliseconds before the first poll starts." },
+    "poolSize": { "kind": "parameter", "displayName": "Pool Size", "group": "scheduler", "label": "scheduler", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "Number of core threads in the thread pool used by the scheduling thread pool. Is by default using a single thread" },
     "repeatCount": { "kind": "parameter", "displayName": "Repeat Count", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "description": "Specifies a maximum limit of number of fires. So if you set it to 1, the scheduler will only fire once. If you set it to 5, it will only fire five times. A value of zero or negative means fire forever." },
     "runLoggingLevel": { "kind": "parameter", "displayName": "Run Logging Level", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "org.apache.camel.LoggingLevel", "enum": [ "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "OFF" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRACE", "description": "The consumer logs a start\/complete log line when it polls. This option allows you to configure the logging level for that." },
     "scheduledExecutorService": { "kind": "parameter", "displayName": "Scheduled Executor Service", "group": "scheduler", "label": "consumer,scheduler", "required": false, "type": "object", "javaType": "java.util.concurrent.ScheduledExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "Allows for configuring a custom\/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool." },
diff --git a/components-starter/camel-kafka-starter/src/main/docs/kafka-starter.adoc b/components-starter/camel-kafka-starter/src/main/docs/kafka-starter.adoc
index 4ed532f..bcebb45 100644
--- a/components-starter/camel-kafka-starter/src/main/docs/kafka-starter.adoc
+++ b/components-starter/camel-kafka-starter/src/main/docs/kafka-starter.adoc
@@ -17,7 +17,7 @@
-The component supports 100 options, which are listed below.
+The component supports 102 options, which are listed below.
@@ -80,6 +80,8 @@
 | *camel.component.kafka.partition-assignor* | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
 | *camel.component.kafka.partition-key* | The partition to which the record will be sent (or null if no partition was specified). If this option has been configured then it take precedence over header KafkaConstants#PARTITION_KEY |  | Integer
 | *camel.component.kafka.partitioner* | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key. | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String
+| *camel.component.kafka.poll-exception-strategy* | To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages. The option is a org.apache.camel.component.kafka.PollExceptionStrategy type. |  | PollExceptionStrategy
+| *camel.component.kafka.poll-on-error* | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY will let the consumer retry polling the same message again STOP will stop the consumer (have to be manually started/restarted if the consumer should be able to consume messages again) |  | PollOnError
 | *camel.component.kafka.poll-timeout-ms* | The timeout used when polling the KafkaConsumer. The option is a java.lang.Long type. | 5000 | Long
 | *camel.component.kafka.producer-batch-size* | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size.Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records. | 16384 | Integer
 | *camel.component.kafka.queue-buffering-max-messages* | The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. | 10000 | Integer
diff --git a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/ b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/
index b0c8719..7f6a7f1 100644
--- a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/
+++ b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/
@@ -22,6 +22,8 @@
 import org.apache.camel.component.kafka.KafkaClientFactory;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaManualCommitFactory;
+import org.apache.camel.component.kafka.PollExceptionStrategy;
+import org.apache.camel.component.kafka.PollOnError;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
 import org.apache.camel.spi.HeaderFilterStrategy;
@@ -250,6 +252,19 @@
     private String partitionAssignor = "org.apache.kafka.clients.consumer.RangeAssignor";
+     * What to do if kafka threw an exception while polling for new messages.
+     * Will by default use the value from the component configuration unless an
+     * explicit value has been configured on the endpoint level. DISCARD will
+     * discard the message and continue to poll next message. ERROR_HANDLER will
+     * use Camel's error handler to process the exception, and afterwards
+     * continue to poll next message. RECONNECT will re-connect the consumer and
+     * try poll the message again RETRY will let the consumer retry polling the
+     * same message again STOP will stop the consumer (have to be manually
+     * started/restarted if the consumer should be able to consume messages
+     * again)
+     */
+    private PollOnError pollOnError;
+    /**
      * The timeout used when polling the KafkaConsumer. The option is a
      * java.lang.Long type.
@@ -290,6 +305,12 @@
     private KafkaManualCommitFactory kafkaManualCommitFactory;
+     * To use a custom strategy with the consumer to control how to handle
+     * exceptions thrown from the Kafka broker while pooling messages. The
+     * option is a org.apache.camel.component.kafka.PollExceptionStrategy type.
+     */
+    private PollExceptionStrategy pollExceptionStrategy;
+    /**
      * The total bytes of memory the producer can use to buffer records waiting
      * to be sent to the server. If records are sent faster than they can be
      * delivered to the server the producer will either block or throw an
@@ -949,6 +970,14 @@
         this.partitionAssignor = partitionAssignor;
+    public PollOnError getPollOnError() {
+        return pollOnError;
+    }
+    public void setPollOnError(PollOnError pollOnError) {
+        this.pollOnError = pollOnError;
+    }
     public Long getPollTimeoutMs() {
         return pollTimeoutMs;
@@ -1006,6 +1035,15 @@
         this.kafkaManualCommitFactory = kafkaManualCommitFactory;
+    public PollExceptionStrategy getPollExceptionStrategy() {
+        return pollExceptionStrategy;
+    }
+    public void setPollExceptionStrategy(
+            PollExceptionStrategy pollExceptionStrategy) {
+        this.pollExceptionStrategy = pollExceptionStrategy;
+    }
     public Integer getBufferMemorySize() {
         return bufferMemorySize;
diff --git a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/ b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/
index c26a1f5..9135a7f 100644
--- a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/
+++ b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/
@@ -46,6 +46,7 @@
         answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer.class));
         answer.add(new ConvertiblePair(String.class, org.apache.camel.spi.StateRepository.class));
         answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.KafkaManualCommitFactory.class));
+        answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.PollExceptionStrategy.class));
         answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.serde.KafkaHeaderSerializer.class));
         answer.add(new ConvertiblePair(String.class, java.util.concurrent.ExecutorService.class));
         answer.add(new ConvertiblePair(String.class, org.apache.camel.component.kafka.KafkaClientFactory.class));
@@ -71,6 +72,7 @@
             case "org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer": return applicationContext.getBean(ref, org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer.class);
             case "org.apache.camel.spi.StateRepository": return applicationContext.getBean(ref, org.apache.camel.spi.StateRepository.class);
             case "org.apache.camel.component.kafka.KafkaManualCommitFactory": return applicationContext.getBean(ref, org.apache.camel.component.kafka.KafkaManualCommitFactory.class);
+            case "org.apache.camel.component.kafka.PollExceptionStrategy": return applicationContext.getBean(ref, org.apache.camel.component.kafka.PollExceptionStrategy.class);
             case "org.apache.camel.component.kafka.serde.KafkaHeaderSerializer": return applicationContext.getBean(ref, org.apache.camel.component.kafka.serde.KafkaHeaderSerializer.class);
             case "java.util.concurrent.ExecutorService": return applicationContext.getBean(ref, java.util.concurrent.ExecutorService.class);
             case "org.apache.camel.component.kafka.KafkaClientFactory": return applicationContext.getBean(ref, org.apache.camel.component.kafka.KafkaClientFactory.class);
diff --git a/components-starter/camel-scheduler-starter/src/main/docs/scheduler-starter.adoc b/components-starter/camel-scheduler-starter/src/main/docs/scheduler-starter.adoc
index a3f5c09..9ff968c 100644
--- a/components-starter/camel-scheduler-starter/src/main/docs/scheduler-starter.adoc
+++ b/components-starter/camel-scheduler-starter/src/main/docs/scheduler-starter.adoc
@@ -26,7 +26,7 @@
 | Name | Description | Default | Type
 | *camel.component.scheduler.autowired-enabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | Boolean
 | *camel.component.scheduler.bridge-error-handler* | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | Boolean
-| *camel.component.scheduler.concurrent-tasks* | Number of threads used by the scheduling thread pool. Is by default using a single thread | 1 | Integer
 | *camel.component.scheduler.enabled* | Whether to enable auto configuration of the scheduler component. This is enabled by default. |  | Boolean
+| *camel.component.scheduler.pool-size* | Number of core threads in the thread pool used by the scheduling thread pool. Is by default using a single thread | 1 | Integer
 // spring-boot-auto-configure options: END
diff --git a/components-starter/camel-scheduler-starter/src/main/java/org/apache/camel/component/scheduler/springboot/ b/components-starter/camel-scheduler-starter/src/main/java/org/apache/camel/component/scheduler/springboot/
index ea629cc..ec36f72 100644
--- a/components-starter/camel-scheduler-starter/src/main/java/org/apache/camel/component/scheduler/springboot/
+++ b/components-starter/camel-scheduler-starter/src/main/java/org/apache/camel/component/scheduler/springboot/
@@ -56,10 +56,10 @@
     private Boolean autowiredEnabled = true;
-     * Number of threads used by the scheduling thread pool. Is by default using
-     * a single thread
+     * Number of core threads in the thread pool used by the scheduling thread
+     * pool. Is by default using a single thread
-    private Integer concurrentTasks = 1;
+    private Integer poolSize = 1;
     public Boolean getBridgeErrorHandler() {
         return bridgeErrorHandler;
@@ -77,11 +77,11 @@
         this.autowiredEnabled = autowiredEnabled;
-    public Integer getConcurrentTasks() {
-        return concurrentTasks;
+    public Integer getPoolSize() {
+        return poolSize;
-    public void setConcurrentTasks(Integer concurrentTasks) {
-        this.concurrentTasks = concurrentTasks;
+    public void setPoolSize(Integer poolSize) {
+        this.poolSize = poolSize;
\ No newline at end of file
diff --git a/docs/modules/ROOT/pages/kafka-starter.adoc b/docs/modules/ROOT/pages/kafka-starter.adoc
index 4ed532f..bcebb45 100644
--- a/docs/modules/ROOT/pages/kafka-starter.adoc
+++ b/docs/modules/ROOT/pages/kafka-starter.adoc
@@ -17,7 +17,7 @@
-The component supports 100 options, which are listed below.
+The component supports 102 options, which are listed below.
@@ -80,6 +80,8 @@
 | *camel.component.kafka.partition-assignor* | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used | org.apache.kafka.clients.consumer.RangeAssignor | String
 | *camel.component.kafka.partition-key* | The partition to which the record will be sent (or null if no partition was specified). If this option has been configured then it take precedence over header KafkaConstants#PARTITION_KEY |  | Integer
 | *camel.component.kafka.partitioner* | The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key. | org.apache.kafka.clients.producer.internals.DefaultPartitioner | String
+| *camel.component.kafka.poll-exception-strategy* | To use a custom strategy with the consumer to control how to handle exceptions thrown from the Kafka broker while pooling messages. The option is a org.apache.camel.component.kafka.PollExceptionStrategy type. |  | PollExceptionStrategy
+| *camel.component.kafka.poll-on-error* | What to do if kafka threw an exception while polling for new messages. Will by default use the value from the component configuration unless an explicit value has been configured on the endpoint level. DISCARD will discard the message and continue to poll next message. ERROR_HANDLER will use Camel's error handler to process the exception, and afterwards continue to poll next message. RECONNECT will re-connect the consumer and try poll the message again RETRY will let the consumer retry polling the same message again STOP will stop the consumer (have to be manually started/restarted if the consumer should be able to consume messages again) |  | PollOnError
 | *camel.component.kafka.poll-timeout-ms* | The timeout used when polling the KafkaConsumer. The option is a java.lang.Long type. | 5000 | Long
 | *camel.component.kafka.producer-batch-size* | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size.Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records. | 16384 | Integer
 | *camel.component.kafka.queue-buffering-max-messages* | The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. | 10000 | Integer
diff --git a/docs/modules/ROOT/pages/scheduler-starter.adoc b/docs/modules/ROOT/pages/scheduler-starter.adoc
index a3f5c09..9ff968c 100644
--- a/docs/modules/ROOT/pages/scheduler-starter.adoc
+++ b/docs/modules/ROOT/pages/scheduler-starter.adoc
@@ -26,7 +26,7 @@
 | Name | Description | Default | Type
 | *camel.component.scheduler.autowired-enabled* | Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which then gets configured on the component. This can be used for automatic configuring JDBC data sources, JMS connection factories, AWS Clients, etc. | true | Boolean
 | *camel.component.scheduler.bridge-error-handler* | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | Boolean
-| *camel.component.scheduler.concurrent-tasks* | Number of threads used by the scheduling thread pool. Is by default using a single thread | 1 | Integer
 | *camel.component.scheduler.enabled* | Whether to enable auto configuration of the scheduler component. This is enabled by default. |  | Boolean
+| *camel.component.scheduler.pool-size* | Number of core threads in the thread pool used by the scheduling thread pool. Is by default using a single thread | 1 | Integer
 // spring-boot-auto-configure options: END
diff --git a/tooling/camel-spring-boot-dependencies/pom.xml b/tooling/camel-spring-boot-dependencies/pom.xml
index 753d35a..c54db64 100644
--- a/tooling/camel-spring-boot-dependencies/pom.xml
+++ b/tooling/camel-spring-boot-dependencies/pom.xml
@@ -4475,12 +4475,12 @@
-        <version>2.7.0</version>
+        <version>2.7.1</version>
-        <version>2.7.0</version>
+        <version>2.7.1</version>
@@ -4581,167 +4581,167 @@
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>
-        <version>8.4.0.Final</version>
+        <version>8.4.1.Final</version>