Code cleanup

Make the SLF4J loggers static final constants named LOG, build each connector's
ConfigDef once in a static constant instead of on every conf() call, drop unused
imports, return an empty list instead of null from CamelSourceTask.poll() when no
records are available, speed up and deduplicate the timer-based source task tests
(shared timer URI, shorter sleeps, assertions moved after task shutdown), and fix
the "porject" typo in the generator plugin's debug message.
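
For reviewers, a minimal sketch of the ConfigDef caching pattern applied to
CamelSinkConnectorConfig and CamelSourceConnectorConfig below; the class and key
names (ExampleConnectorConfig, EXAMPLE_URL_CONF) are illustrative placeholders,
not code from this repository:

    import java.util.Map;

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigDef.Importance;
    import org.apache.kafka.common.config.ConfigDef.Type;

    public class ExampleConnectorConfig extends AbstractConfig {

        public static final String EXAMPLE_URL_CONF = "camel.example.url";
        public static final String EXAMPLE_URL_DOC = "Example endpoint URL.";

        // Built once at class-load time instead of on every conf() call.
        private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(EXAMPLE_URL_CONF, Type.STRING, null, Importance.HIGH, EXAMPLE_URL_DOC);

        public ExampleConnectorConfig(Map<String, String> parsedConfig) {
            super(CONFIG_DEF, parsedConfig);
        }

        public static ConfigDef conf() {
            // Returns the shared instance; callers should treat it as read-only.
            return CONFIG_DEF;
        }
    }

The tradeoff is that conf() now hands out a shared ConfigDef rather than a fresh
one, so callers should not mutate the returned definition.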
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
index 7cc47f0..092e513 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
@@ -27,7 +27,7 @@
 import org.slf4j.LoggerFactory;
 
 public class CamelSinkConnector extends SinkConnector {
-    private static Logger log = LoggerFactory.getLogger(CamelSinkConnector.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkConnector.class);
 
     private Map<String, String> configProps;
 
@@ -38,7 +38,7 @@
 
     @Override
     public void start(Map<String, String> configProps) {
-        log.info("Connector config keys: {}", String.join(", ", configProps.keySet()));
+        LOG.info("Connector config keys: {}", String.join(", ", configProps.keySet()));
         this.configProps = configProps;
     }
 
@@ -49,7 +49,7 @@
 
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
-        log.info("Setting task configurations for {} workers.", maxTasks);
+        LOG.info("Setting task configurations for {} workers.", maxTasks);
         final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
         for (int i = 0; i < maxTasks; ++i) {
             configs.add(configProps);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index d59d342..e11e84d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -38,18 +38,21 @@
     public static final String CAMEL_SINK_URL_DOC = "The camel url to configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF
             + " and all the properties starting with " + CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" + CAMEL_SINK_COMPONENT_CONF + " value> are ignored.";
 
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
+        .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
+        .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
     }
 
     public CamelSinkConnectorConfig(Map<String, String> parsedConfig) {
-        this(conf(), parsedConfig);
+        this(CONFIG_DEF, parsedConfig);
     }
 
     public static ConfigDef conf() {
-        return new ConfigDef()
-                .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
-                .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
-                .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+        return CONFIG_DEF;
     }
+
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 9785215..baa6665 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -19,11 +19,9 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
@@ -44,7 +42,7 @@
     private static final String CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX = "camel.sink.endpoint.";
     private static final String CAMEL_SINK_PATH_PROPERTIES_PREFIX = "camel.sink.path.";
 
-    private static Logger log = LoggerFactory.getLogger(CamelSinkTask.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class);
 
     private static final String LOCAL_URL = "direct:start";
     private static final String HEADER_CAMEL_PREFIX = "CamelHeader";
@@ -62,7 +60,7 @@
     @Override
     public void start(Map<String, String> props) {
         try {
-            log.info("Starting CamelSinkTask connector task");
+            LOG.info("Starting CamelSinkTask connector task");
             Map<String, String> actualProps = TaskHelper.mergeProperties(getDefaultConfig(), props);
             config = getCamelSinkConnectorConfig(actualProps);
 
@@ -78,7 +76,7 @@
             producer = cms.createProducerTemplate();
 
             cms.start();
-            log.info("CamelSinkTask connector task started");
+            LOG.info("CamelSinkTask connector task started");
         } catch (Exception e) {
             throw new ConnectException("Failed to create and start Camel context", e);
         }
@@ -116,20 +114,20 @@
             }
             exchange.getMessage().setHeaders(headers);
             exchange.getMessage().setBody(record.value());
-            log.debug("Sending {} to {}", exchange, LOCAL_URL);
+            LOG.debug("Sending {} to {}", exchange, LOCAL_URL);
             producer.send(LOCAL_URL, exchange);
         }
     }
 
     @Override
     public void stop() {
+        LOG.info("Stopping CamelSinkTask connector task");
         try {
-            log.info("Stopping CamelSinkTask connector task");
             cms.stop();
         } catch (Exception e) {
             throw new ConnectException("Failed to stop Camel context", e);
         } finally {
-            log.info("CamelSinkTask connector task stopped");
+            LOG.info("CamelSinkTask connector task stopped");
         }
     }
 
@@ -157,7 +155,7 @@
             map.put(singleHeader.key(), (Map<?, ?>)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
             map.put(singleHeader.key(), (List<?>)singleHeader.value());
-        } 
+        }
     }
 
     private void addProperty(Exchange exchange, Header singleHeader) {
@@ -184,7 +182,7 @@
             exchange.getProperties().put(singleHeader.key(), (Map<?, ?>)singleHeader.value());
         } else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
             exchange.getProperties().put(singleHeader.key(), (List<?>)singleHeader.value());
-        } 
+        }
     }
 
     public CamelMainSupport getCms() {
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 930ae8b..3de49dd 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -67,25 +67,27 @@
     public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The name of a camel message header containing an unique key that can be used as a Kafka message key."
           +  " If this is not specified, then the Kafka message will not have a key.";
 
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC)
+        .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
+        .define(TOPIC_CONF, Type.STRING, TOPIC_DEFAULT, Importance.HIGH, TOPIC_DOC)
+        .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
+        .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
+        .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
+        .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
+        .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
+        .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
+        .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+
     public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
         super(config, parsedConfig);
     }
 
     public CamelSourceConnectorConfig(Map<String, String> parsedConfig) {
-        this(conf(), parsedConfig);
+        this(CONFIG_DEF, parsedConfig);
     }
 
     public static ConfigDef conf() {
-        return new ConfigDef()
-                .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC)
-                .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
-                .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
-                .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
-                .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
-                .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
-                .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
-                .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
-                .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
-                .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+        return CONFIG_DEF;
     }
 }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 5025296..83ea1b1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -24,7 +24,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -91,7 +90,7 @@
             Endpoint endpoint = cms.getEndpoint(localUrl);
             consumer = endpoint.createPollingConsumer();
             consumer.start();
-            
+
             cms.start();
             LOG.info("CamelSourceTask connector task started");
         } catch (Exception e) {
@@ -143,7 +142,7 @@
         }
 
         if (records.isEmpty()) {
-            return null;
+            return Collections.emptyList();
         } else {
             return records;
         }
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index ddfd1e2..91735f2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -43,7 +43,7 @@
 
 public class CamelMainSupport {
     public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
-    private static Logger log = LoggerFactory.getLogger(CamelMainSupport.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class);
 
     private Main camelMain;
     private CamelContext camel;
@@ -83,7 +83,7 @@
         Properties camelProperties = new OrderedProperties();
         camelProperties.putAll(orderedProps);
 
-        log.info("Setting initial properties in Camel context: [{}]", camelProperties);
+        LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
         this.camel.getPropertiesComponent().setInitialProperties(camelProperties);
 
         //creating the actual route
@@ -93,15 +93,15 @@
                 if (marshal != null && unmarshal != null) {
                     throw new UnsupportedOperationException("Uses of both marshal (i.e. " + marshal + ") and unmarshal (i.e. " + unmarshal + ") is not supported");
                 } else if (marshal != null) {
-                    log.info("Creating Camel route from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl);
+                    LOG.info("Creating Camel route from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl);
                     camel.getRegistry().bind(marshal, lookupAndInstantiateDataformat(marshal));
                     rd.marshal().custom(marshal);
                 } else if (unmarshal != null) {
-                    log.info("Creating Camel route from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl);
+                    LOG.info("Creating Camel route from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl);
                     camel.getRegistry().bind(unmarshal, lookupAndInstantiateDataformat(unmarshal));
                     rd.unmarshal().custom(unmarshal);
                 } else {
-                    log.info("Creating Camel route from({}).to({})", fromUrl, toUrl);
+                    LOG.info("Creating Camel route from({}).to({})", fromUrl, toUrl);
                 }
                 rd.to(toUrl);
             }
@@ -109,27 +109,27 @@
     }
 
     public void start() throws Exception {
-        log.info("Starting CamelContext");
+        LOG.info("Starting CamelContext");
 
         CamelContextStarter starter = new CamelContextStarter();
         exService.execute(starter);
         startFinishedSignal.await();
 
         if (starter.hasException()) {
-            log.info("CamelContext failed to start", starter.getException());
+            LOG.info("CamelContext failed to start", starter.getException());
             throw starter.getException();
         }
 
-        log.info("CamelContext started");
+        LOG.info("CamelContext started");
     }
 
     public void stop() {
-        log.info("Stopping CamelContext");
+        LOG.info("Stopping CamelContext");
 
         camelMain.stop();
         exService.shutdown();
 
-        log.info("CamelContext stopped");
+        LOG.info("CamelContext stopped");
     }
 
     public ProducerTemplate createProducerTemplate() {
@@ -190,7 +190,7 @@
 
         @Override
         public void afterStart(BaseMainSupport main) {
-            log.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener been called");
+            LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener being called");
             startFinishedSignal.countDown();
         }
 
@@ -217,10 +217,10 @@
             try {
                 camelMain.run();
             } catch (Exception e) {
-                log.error("An exception has occurred before CamelContext startup has finished", e);
+                LOG.error("An exception has occurred before CamelContext startup has finished", e);
                 startException = e;
                 if (startFinishedSignal.getCount() > 0) {
-                    log.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception");
+                    LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception");
                     startFinishedSignal.countDown();
                 }
             }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 934b78d..89205ae 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,20 +16,15 @@
  */
 package org.apache.camel.kafkaconnector;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.camel.ConsumerTemplate;
-import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
-import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Test;
 
@@ -40,16 +35,18 @@
 
 public class CamelSourceTaskTest {
 
+    private static final String TIMER_URI = "timer:kafkaconnector?period=10&fixedRate=true&delay=0";
+
     @Test
     public void testSourcePolling() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "timer:kafkaconnector");
+        props.put("camel.source.url", TIMER_URI);
         props.put("camel.source.kafka.topic", "mytopic");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(2100L);
+        Thread.sleep(11L);
         List<SourceRecord> poll = camelSourceTask.poll();
         assertEquals(2, poll.size());
         assertEquals("mytopic", poll.get(0).topic());
@@ -62,9 +59,9 @@
                 break;
             }
         }
-        assertTrue(containsHeader);
 
         camelSourceTask.stop();
+        assertTrue(containsHeader);
     }
 
     @Test
@@ -82,7 +79,7 @@
         // first we test if we have a key in the message with body
         template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", 1234);
 
-        Thread.sleep(100L);
+        Thread.sleep(11L);
 
         List<SourceRecord> poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -92,7 +89,7 @@
         // second we test if we have no key under the header
         template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 1234);
 
-        Thread.sleep(100L);
+        Thread.sleep(11L);
 
         poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -102,7 +99,7 @@
         // third we test if we have the header but with null value
         template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", null);
 
-        Thread.sleep(100L);
+        Thread.sleep(10L);
 
         camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -126,7 +123,7 @@
         // send first data
         template.sendBody("direct:start", "testing kafka connect");
 
-        Thread.sleep(100L);
+        Thread.sleep(11L);
 
         List<SourceRecord> poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -138,7 +135,7 @@
         // send second data
         template.sendBody("direct:start", true);
 
-        Thread.sleep(100L);
+        Thread.sleep(11L);
 
         poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -150,7 +147,7 @@
         // second third data
         template.sendBody("direct:start", 1234L);
 
-        Thread.sleep(100L);
+        Thread.sleep(10L);
 
         poll = camelSourceTask.poll();
         assertEquals(1, poll.size());
@@ -162,7 +159,7 @@
         // third with null data
         template.sendBody("direct:start", null);
 
-        Thread.sleep(100L);
+        Thread.sleep(10L);
         poll = camelSourceTask.poll();
         assertNull(poll.get(0).key());
         assertNull(poll.get(0).keySchema());
@@ -175,50 +172,48 @@
     @Test
     public void testSourcePollingTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "timer:kafkaconnector");
+        props.put("camel.source.url", TIMER_URI);
         props.put("camel.source.kafka.topic", "mytopic");
         props.put("camel.source.maxPollDuration", "1");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(3000L);
+        long sleepTime = 30L;
+        Thread.sleep(sleepTime);
         List<SourceRecord> poll;
         int retries = 3;
         do {
             poll = camelSourceTask.poll();
             if (poll == null) {
                 retries--;
-
                 if (retries == 0) {
                     fail("Exhausted the maximum retries and no record was returned");
                 }
-
-                Thread.sleep(3000L);
+                Thread.sleep(sleepTime);
             }
         } while (poll == null && retries > 0);
 
         assertTrue(poll.size() >= 1, "Received messages are: " + poll.size() + ", expected between 1 and 2.");
         assertTrue(poll.size() <= 2, "Received messages are: " + poll.size() + ", expected between 1 and 2.");
-
         camelSourceTask.stop();
     }
 
     @Test
     public void testSourcePollingMaxRecordNumber() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "timer:kafkaconnector");
+        props.put("camel.source.url", TIMER_URI);
         props.put("camel.source.kafka.topic", "mytopic");
         props.put("camel.source.maxBatchPollSize", "1");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(2000L);
+        Thread.sleep(11L);
         List<SourceRecord> poll = camelSourceTask.poll();
-        assertEquals(1, poll.size());
-
         camelSourceTask.stop();
+
+        assertEquals(1, poll.size());
     }
 
     @Test
@@ -243,9 +238,9 @@
     }
 
     @Test
-    public void testUrlPrecedenceOnComponentProperty() throws JsonProcessingException, InterruptedException {
+    public void testUrlPrecedenceOnComponentProperty() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
-        props.put("camel.source.url", "timer:kafkaconnector");
+        props.put("camel.source.url", TIMER_URI);
         props.put("camel.source.kafka.topic", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "shouldNotBeUsed");
         props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed");
@@ -254,7 +249,7 @@
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
 
-        Thread.sleep(2100L);
+        Thread.sleep(11L);
         List<SourceRecord> poll = camelSourceTask.poll();
         assertEquals(2, poll.size());
         assertEquals("mytopic", poll.get(0).topic());
@@ -267,13 +262,13 @@
                 break;
             }
         }
-        assertTrue(containsHeader);
-
         camelSourceTask.stop();
+
+        assertTrue(containsHeader);
     }
 
     @Test
-    public void testSourcePollingUsingComponentProperty() throws JsonProcessingException, InterruptedException {
+    public void testSourcePollingUsingComponentProperty() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.kafka.topic", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
@@ -298,13 +293,14 @@
         }
         assertTrue(containsHeader);
 
-        assertEquals(1, camelSourceTask.getCms().getEndpoints().stream().filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
+        assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
+            .filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
 
         camelSourceTask.stop();
     }
 
     @Test
-    public void testSourcePollingUsingMultipleComponentProperties() throws JsonProcessingException, InterruptedException {
+    public void testSourcePollingUsingMultipleComponentProperties() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
         props.put("camel.source.kafka.topic", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
@@ -330,7 +326,8 @@
         }
         assertTrue(containsHeader);
 
-        assertEquals(1, camelSourceTask.getCms().getEndpoints().stream().filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
+        assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
+            .filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
 
         camelSourceTask.stop();
     }
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
index aeb2da1..d1eef84 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
@@ -112,7 +112,7 @@
     public void execute() throws MojoExecutionException, MojoFailureException {
         configureResourceManager();
         if (!project.getArtifactId().equals(connectorsProjectName)) {
-            getLog().debug("Skipping porject " + project.getArtifactId() + " since it is not " + connectorsProjectName + " can be configured with <connectors-project-name> option.");
+            getLog().debug("Skipping project " + project.getArtifactId() + " since it is not " + connectorsProjectName + "; this can be configured with the <connectors-project-name> option.");
             return;
         }
         try {