Code cleanup
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
index 7cc47f0..092e513 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnector.java
@@ -27,7 +27,7 @@
import org.slf4j.LoggerFactory;
public class CamelSinkConnector extends SinkConnector {
- private static Logger log = LoggerFactory.getLogger(CamelSinkConnector.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkConnector.class);
private Map<String, String> configProps;
@@ -38,7 +38,7 @@
@Override
public void start(Map<String, String> configProps) {
- log.info("Connector config keys: {}", String.join(", ", configProps.keySet()));
+ LOG.info("Connector config keys: {}", String.join(", ", configProps.keySet()));
this.configProps = configProps;
}
@@ -49,7 +49,7 @@
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
- log.info("Setting task configurations for {} workers.", maxTasks);
+ LOG.info("Setting task configurations for {} workers.", maxTasks);
final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; ++i) {
configs.add(configProps);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index d59d342..e11e84d 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -38,18 +38,21 @@
public static final String CAMEL_SINK_URL_DOC = "The camel url to configure the destination. If this is set " + CAMEL_SINK_COMPONENT_CONF
+ " and all the properties starting with " + CamelSinkTask.getCamelSinkEndpointConfigPrefix() + ".<" + CAMEL_SINK_COMPONENT_CONF + " value> are ignored.";
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
+ .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
+ .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+
public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}
public CamelSinkConnectorConfig(Map<String, String> parsedConfig) {
- this(conf(), parsedConfig);
+ this(CONFIG_DEF, parsedConfig);
}
public static ConfigDef conf() {
- return new ConfigDef()
- .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC)
- .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
- .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC);
+ return CONFIG_DEF;
}
+
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 9785215..baa6665 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -19,11 +19,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
@@ -44,7 +42,7 @@
private static final String CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX = "camel.sink.endpoint.";
private static final String CAMEL_SINK_PATH_PROPERTIES_PREFIX = "camel.sink.path.";
- private static Logger log = LoggerFactory.getLogger(CamelSinkTask.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkTask.class);
private static final String LOCAL_URL = "direct:start";
private static final String HEADER_CAMEL_PREFIX = "CamelHeader";
@@ -62,7 +60,7 @@
@Override
public void start(Map<String, String> props) {
try {
- log.info("Starting CamelSinkTask connector task");
+ LOG.info("Starting CamelSinkTask connector task");
Map<String, String> actualProps = TaskHelper.mergeProperties(getDefaultConfig(), props);
config = getCamelSinkConnectorConfig(actualProps);
@@ -78,7 +76,7 @@
producer = cms.createProducerTemplate();
cms.start();
- log.info("CamelSinkTask connector task started");
+ LOG.info("CamelSinkTask connector task started");
} catch (Exception e) {
throw new ConnectException("Failed to create and start Camel context", e);
}
@@ -116,20 +114,20 @@
}
exchange.getMessage().setHeaders(headers);
exchange.getMessage().setBody(record.value());
- log.debug("Sending {} to {}", exchange, LOCAL_URL);
+ LOG.debug("Sending {} to {}", exchange, LOCAL_URL);
producer.send(LOCAL_URL, exchange);
}
}
@Override
public void stop() {
+ LOG.info("Stopping CamelSinkTask connector task");
try {
- log.info("Stopping CamelSinkTask connector task");
cms.stop();
} catch (Exception e) {
throw new ConnectException("Failed to stop Camel context", e);
} finally {
- log.info("CamelSinkTask connector task stopped");
+ LOG.info("CamelSinkTask connector task stopped");
}
}
@@ -157,7 +155,7 @@
map.put(singleHeader.key(), (Map<?, ?>)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
map.put(singleHeader.key(), (List<?>)singleHeader.value());
- }
+ }
}
private void addProperty(Exchange exchange, Header singleHeader) {
@@ -184,7 +182,7 @@
exchange.getProperties().put(singleHeader.key(), (Map<?, ?>)singleHeader.value());
} else if (schema.type().getName().equalsIgnoreCase(SchemaBuilder.array(Schema.STRING_SCHEMA).type().getName())) {
exchange.getProperties().put(singleHeader.key(), (List<?>)singleHeader.value());
- }
+ }
}
public CamelMainSupport getCms() {
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
index 930ae8b..3de49dd 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceConnectorConfig.java
@@ -67,25 +67,27 @@
public static final String CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC = "The name of a camel message header containing an unique key that can be used as a Kafka message key."
+ " If this is not specified, then the Kafka message will not have a key.";
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC)
+ .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
+ .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
+ .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
+ .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
+ .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
+ .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
+ .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
+ .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
+ .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+
public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}
public CamelSourceConnectorConfig(Map<String, String> parsedConfig) {
- this(conf(), parsedConfig);
+ this(CONFIG_DEF, parsedConfig);
}
public static ConfigDef conf() {
- return new ConfigDef()
- .define(CAMEL_SOURCE_URL_CONF, Type.STRING, CAMEL_SOURCE_URL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_URL_DOC)
- .define(CAMEL_SOURCE_UNMARSHAL_CONF, Type.STRING, CAMEL_SOURCE_UNMARSHAL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_UNMARSHAL_DOC)
- .define(TOPIC_CONF, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
- .define(CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, Type.LONG, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_DOC)
- .define(CAMEL_SOURCE_MAX_POLL_DURATION_CONF, Type.LONG, CAMEL_SOURCE_MAX_POLL_DURATION_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MAX_POLL_DURATION_DOC)
- .define(CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_DOC)
- .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF, Type.LONG, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_DOC)
- .define(CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF, Type.BOOLEAN, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_DOC)
- .define(CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, Type.STRING, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_MESSAGE_HEADER_KEY_DOC)
- .define(CAMEL_SOURCE_COMPONENT_CONF, Type.STRING, CAMEL_SOURCE_COMPONENT_DEFAULT, Importance.MEDIUM, CAMEL_SOURCE_COMPONENT_DOC);
+ return CONFIG_DEF;
}
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 5025296..83ea1b1 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -91,7 +90,7 @@
Endpoint endpoint = cms.getEndpoint(localUrl);
consumer = endpoint.createPollingConsumer();
consumer.start();
-
+
cms.start();
LOG.info("CamelSourceTask connector task started");
} catch (Exception e) {
@@ -143,7 +142,7 @@
}
if (records.isEmpty()) {
- return null;
+ return Collections.EMPTY_LIST;
} else {
return records;
}
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index ddfd1e2..91735f2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -43,7 +43,7 @@
public class CamelMainSupport {
public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";
- private static Logger log = LoggerFactory.getLogger(CamelMainSupport.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class);
private Main camelMain;
private CamelContext camel;
@@ -83,7 +83,7 @@
Properties camelProperties = new OrderedProperties();
camelProperties.putAll(orderedProps);
- log.info("Setting initial properties in Camel context: [{}]", camelProperties);
+ LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
this.camel.getPropertiesComponent().setInitialProperties(camelProperties);
//creating the actual route
@@ -93,15 +93,15 @@
if (marshal != null && unmarshal != null) {
throw new UnsupportedOperationException("Uses of both marshal (i.e. " + marshal + ") and unmarshal (i.e. " + unmarshal + ") is not supported");
} else if (marshal != null) {
- log.info("Creating Camel route from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl);
+ LOG.info("Creating Camel route from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl);
camel.getRegistry().bind(marshal, lookupAndInstantiateDataformat(marshal));
rd.marshal().custom(marshal);
} else if (unmarshal != null) {
- log.info("Creating Camel route from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl);
+ LOG.info("Creating Camel route from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl);
camel.getRegistry().bind(unmarshal, lookupAndInstantiateDataformat(unmarshal));
rd.unmarshal().custom(unmarshal);
} else {
- log.info("Creating Camel route from({}).to({})", fromUrl, toUrl);
+ LOG.info("Creating Camel route from({}).to({})", fromUrl, toUrl);
}
rd.to(toUrl);
}
@@ -109,27 +109,27 @@
}
public void start() throws Exception {
- log.info("Starting CamelContext");
+ LOG.info("Starting CamelContext");
CamelContextStarter starter = new CamelContextStarter();
exService.execute(starter);
startFinishedSignal.await();
if (starter.hasException()) {
- log.info("CamelContext failed to start", starter.getException());
+ LOG.info("CamelContext failed to start", starter.getException());
throw starter.getException();
}
- log.info("CamelContext started");
+ LOG.info("CamelContext started");
}
public void stop() {
- log.info("Stopping CamelContext");
+ LOG.info("Stopping CamelContext");
camelMain.stop();
exService.shutdown();
- log.info("CamelContext stopped");
+ LOG.info("CamelContext stopped");
}
public ProducerTemplate createProducerTemplate() {
@@ -190,7 +190,7 @@
@Override
public void afterStart(BaseMainSupport main) {
- log.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener been called");
+ LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener been called");
startFinishedSignal.countDown();
}
@@ -217,10 +217,10 @@
try {
camelMain.run();
} catch (Exception e) {
- log.error("An exception has occurred before CamelContext startup has finished", e);
+ LOG.error("An exception has occurred before CamelContext startup has finished", e);
startException = e;
if (startFinishedSignal.getCount() > 0) {
- log.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception");
+ LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception");
startFinishedSignal.countDown();
}
}
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index 934b78d..89205ae 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -16,20 +16,15 @@
*/
package org.apache.camel.kafkaconnector;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.camel.ConsumerTemplate;
-import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
-import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
@@ -40,16 +35,18 @@
public class CamelSourceTaskTest {
+ private static final String TIMER_URI = "timer:kafkaconnector?period=10&fixedRate=true&delay=0";
+
@Test
public void testSourcePolling() throws InterruptedException {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "timer:kafkaconnector");
+ props.put("camel.source.url", TIMER_URI);
props.put("camel.source.kafka.topic", "mytopic");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(2100L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
assertEquals(2, poll.size());
assertEquals("mytopic", poll.get(0).topic());
@@ -62,9 +59,9 @@
break;
}
}
- assertTrue(containsHeader);
camelSourceTask.stop();
+ assertTrue(containsHeader);
}
@Test
@@ -82,7 +79,7 @@
// first we test if we have a key in the message with body
template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", 1234);
- Thread.sleep(100L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -92,7 +89,7 @@
// second we test if we have no key under the header
template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 1234);
- Thread.sleep(100L);
+ Thread.sleep(11L);
poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -102,7 +99,7 @@
// third we test if we have the header but with null value
template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", null);
- Thread.sleep(100L);
+ Thread.sleep(10L);
camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -126,7 +123,7 @@
// send first data
template.sendBody("direct:start", "testing kafka connect");
- Thread.sleep(100L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -138,7 +135,7 @@
// send second data
template.sendBody("direct:start", true);
- Thread.sleep(100L);
+ Thread.sleep(11L);
poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -150,7 +147,7 @@
// second third data
template.sendBody("direct:start", 1234L);
- Thread.sleep(100L);
+ Thread.sleep(10L);
poll = camelSourceTask.poll();
assertEquals(1, poll.size());
@@ -162,7 +159,7 @@
// third with null data
template.sendBody("direct:start", null);
- Thread.sleep(100L);
+ Thread.sleep(10L);
poll = camelSourceTask.poll();
assertNull(poll.get(0).key());
assertNull(poll.get(0).keySchema());
@@ -175,50 +172,48 @@
@Test
public void testSourcePollingTimeout() throws InterruptedException {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "timer:kafkaconnector");
+ props.put("camel.source.url", TIMER_URI);
props.put("camel.source.kafka.topic", "mytopic");
props.put("camel.source.maxPollDuration", "1");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(3000L);
+ long sleepTime = 30L;
+ Thread.sleep(sleepTime);
List<SourceRecord> poll;
int retries = 3;
do {
poll = camelSourceTask.poll();
if (poll == null) {
retries--;
-
if (retries == 0) {
fail("Exhausted the maximum retries and no record was returned");
}
-
- Thread.sleep(3000L);
+ Thread.sleep(sleepTime);
}
} while (poll == null && retries > 0);
assertTrue(poll.size() >= 1, "Received messages are: " + poll.size() + ", expected between 1 and 2.");
assertTrue(poll.size() <= 2, "Received messages are: " + poll.size() + ", expected between 1 and 2.");
-
camelSourceTask.stop();
}
@Test
public void testSourcePollingMaxRecordNumber() throws InterruptedException {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "timer:kafkaconnector");
+ props.put("camel.source.url", TIMER_URI);
props.put("camel.source.kafka.topic", "mytopic");
props.put("camel.source.maxBatchPollSize", "1");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(2000L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
- assertEquals(1, poll.size());
-
camelSourceTask.stop();
+
+ assertEquals(1, poll.size());
}
@Test
@@ -243,9 +238,9 @@
}
@Test
- public void testUrlPrecedenceOnComponentProperty() throws JsonProcessingException, InterruptedException {
+ public void testUrlPrecedenceOnComponentProperty() throws InterruptedException {
Map<String, String> props = new HashMap<>();
- props.put("camel.source.url", "timer:kafkaconnector");
+ props.put("camel.source.url", TIMER_URI);
props.put("camel.source.kafka.topic", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "shouldNotBeUsed");
props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "endpointProperty", "shouldNotBeUsed");
@@ -254,7 +249,7 @@
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
- Thread.sleep(2100L);
+ Thread.sleep(11L);
List<SourceRecord> poll = camelSourceTask.poll();
assertEquals(2, poll.size());
assertEquals("mytopic", poll.get(0).topic());
@@ -267,13 +262,13 @@
break;
}
}
- assertTrue(containsHeader);
-
camelSourceTask.stop();
+
+ assertTrue(containsHeader);
}
@Test
- public void testSourcePollingUsingComponentProperty() throws JsonProcessingException, InterruptedException {
+ public void testSourcePollingUsingComponentProperty() throws InterruptedException {
Map<String, String> props = new HashMap<>();
props.put("camel.source.kafka.topic", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
@@ -298,13 +293,14 @@
}
assertTrue(containsHeader);
- assertEquals(1, camelSourceTask.getCms().getEndpoints().stream().filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
+ assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
+ .filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000")).count());
camelSourceTask.stop();
}
@Test
- public void testSourcePollingUsingMultipleComponentProperties() throws JsonProcessingException, InterruptedException {
+ public void testSourcePollingUsingMultipleComponentProperties() throws InterruptedException {
Map<String, String> props = new HashMap<>();
props.put("camel.source.kafka.topic", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
@@ -330,7 +326,8 @@
}
assertTrue(containsHeader);
- assertEquals(1, camelSourceTask.getCms().getEndpoints().stream().filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
+ assertEquals(1, camelSourceTask.getCms().getEndpoints().stream()
+ .filter(e -> e.getEndpointUri().equals("timer://kafkaconnector?period=1000&repeatCount=0")).count());
camelSourceTask.stop();
}
diff --git a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
index aeb2da1..d1eef84 100644
--- a/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
+++ b/tooling/camel-kafka-connector-generator-maven-plugin/src/main/java/org/apache/camel/kafkaconnector/maven/AbstractCamelKafkaConnectorMojo.java
@@ -112,7 +112,7 @@
public void execute() throws MojoExecutionException, MojoFailureException {
configureResourceManager();
if (!project.getArtifactId().equals(connectorsProjectName)) {
- getLog().debug("Skipping porject " + project.getArtifactId() + " since it is not " + connectorsProjectName + " can be configured with <connectors-project-name> option.");
+ getLog().debug("Skipping project " + project.getArtifactId() + " since it is not " + connectorsProjectName + " can be configured with <connectors-project-name> option.");
return;
}
try {