IGNITE-13406 adding support for camel 3.x
This commit breaks compatibility with 2.x - Fixes #21.
Signed-off-by: samaitra <saikat.maitra@gmail.com>
diff --git a/modules/camel-ext/pom.xml b/modules/camel-ext/pom.xml
index c2f89b1..9f47d19 100644
--- a/modules/camel-ext/pom.xml
+++ b/modules/camel-ext/pom.xml
@@ -95,9 +95,9 @@
</dependency>
<dependency>
- <groupId>com.squareup.okhttp</groupId>
- <artifactId>okhttp</artifactId>
- <version>${okhttp.version}</version>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-http</artifactId>
+ <version>${camel.version}</version>
<scope>test</scope>
</dependency>
diff --git a/modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java b/modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
index 369d2a6..51f3ba4 100644
--- a/modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
+++ b/modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
@@ -17,8 +17,6 @@
package org.apache.ignite.stream.camel;
-import java.util.Map;
-
import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
@@ -26,8 +24,9 @@
import org.apache.camel.Processor;
import org.apache.camel.ServiceStatus;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.service.BaseService;
+import org.apache.camel.support.service.ServiceHelper;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.util.typedef.internal.A;
@@ -36,6 +35,8 @@
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import java.util.Map;
+
/**
* This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer.
*
@@ -84,9 +85,12 @@
if (camelCtx == null)
camelCtx = new DefaultCamelContext();
- // If the Camel Context is starting or started, reject this call to start.
- if (camelCtx.getStatus() == ServiceStatus.Started || camelCtx.getStatus() == ServiceStatus.Starting)
- throw new IgniteException("Failed to start Camel streamer (CamelContext already started or starting).");
+ // If the camel context is not started then simply start it up
+ if (!camelCtx.isStarted())
+ camelCtx.start();
+
+ if (!camelCtx.isRunAllowed())
+ throw new IgniteException("Failed to start Camel streamer (CamelContext not in a runnable state).");
log = getIgnite().log();
@@ -97,6 +101,7 @@
catch (Exception e) {
U.error(log, e);
+ camelCtx.stop();
throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
}
@@ -107,16 +112,18 @@
catch (Exception e) {
U.error(log, e);
+ camelCtx.stop();
throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
}
// Start the Camel services.
try {
- ServiceHelper.startServices(camelCtx, endpoint, consumer);
+ ServiceHelper.startService(camelCtx, endpoint, consumer);
}
catch (Exception e) {
U.error(log, e);
+ camelCtx.stop();
try {
ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
diff --git a/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
index 9858357..329f3ed 100644
--- a/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
+++ b/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
@@ -17,24 +17,8 @@
package org.apache.ignite.stream.camel;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
-import com.squareup.okhttp.MediaType;
-import com.squareup.okhttp.OkHttpClient;
-import com.squareup.okhttp.Request;
-import com.squareup.okhttp.RequestBody;
-import com.squareup.okhttp.Response;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -42,6 +26,7 @@
import org.apache.camel.ServiceStatus;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.engine.DefaultProducerTemplate;
import org.apache.camel.support.LifecycleStrategySupport;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -60,18 +45,29 @@
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
/**
* Test class for {@link CamelStreamer}.
*/
public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
- /** text/plain media type. */
- private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8");
-
/** The test data. */
private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+ /** The producer template used to publish data to the endpoint. */
+ private DefaultProducerTemplate producerTemplate;
+
/** The Camel streamer currently under test. */
private CamelStreamer<Integer, String> streamer;
@@ -84,9 +80,6 @@
/** The UUID of the currently active remote listener. */
private UUID remoteLsnr;
- /** The OkHttpClient. */
- private OkHttpClient httpClient = new OkHttpClient();
-
// Initialize the test data.
static {
for (int i = 0; i < 100; i++)
@@ -114,9 +107,13 @@
url = "http://localhost:" + port + "/ignite";
}
+ DefaultCamelContext context = new DefaultCamelContext();
// create Camel streamer
dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME);
streamer = createCamelStreamer(dataStreamer);
+ streamer.setCamelContext(context);
+ producerTemplate = new DefaultProducerTemplate(context);
+ producerTemplate.start();
}
@Override public void afterTest() throws Exception {
@@ -208,10 +205,10 @@
*/
@Test
public void testUserSpecifiedCamelContext() throws Exception {
- final AtomicInteger cnt = new AtomicInteger();
- // Create a CamelContext with a probe that'll help us know if it has been used.
- CamelContext context = new DefaultCamelContext();
+ CamelContext context = streamer.getCamelContext();
+
+ final AtomicInteger cnt = new AtomicInteger();
context.setTracing(true);
context.addLifecycleStrategy(new LifecycleStrategySupport() {
@Override public void onEndpointAdd(Endpoint endpoint) {
@@ -220,7 +217,6 @@
});
streamer.setSingleTupleExtractor(singleTupleExtractor());
- streamer.setCamelContext(context);
// Subscribe to cache PUT events.
CountDownLatch latch = subscribeToPutEvents(50);
@@ -232,7 +228,7 @@
sendMessages(0, 50, false);
// Assertions.
- assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertTrue(latch.await(100, TimeUnit.SECONDS));
assertCacheEntriesLoaded(50);
assertTrue(cnt.get() > 0);
}
@@ -247,7 +243,7 @@
PropertiesComponent pc = new PropertiesComponent("camel.test.properties");
- context.addComponent("properties", pc);
+ context.setPropertiesComponent(pc);
// Replace the context path in the test URL with the property placeholder.
url = url.replaceAll("/ignite", "{{test.contextPath}}");
@@ -258,6 +254,9 @@
streamer.setSingleTupleExtractor(singleTupleExtractor());
streamer.setCamelContext(context);
+ producerTemplate = new DefaultProducerTemplate(context);
+ producerTemplate.start();
+
// Subscribe to cache PUT events.
CountDownLatch latch = subscribeToPutEvents(50);
@@ -290,8 +289,11 @@
fail("Streamer started; should have failed.");
}
catch (IgniteException ignored) {
- assertTrue(streamer.getCamelContext().getStatus() == ServiceStatus.Stopped);
- assertTrue(streamer.getCamelContext().getEndpointRegistry().size() == 0);
+ CamelContext context = streamer.getCamelContext();
+ ServiceStatus status = context.getStatus();
+ assertEquals("streamer should be in a stopped state, instead is " + status, status, ServiceStatus.Stopped);
+ assertEquals("endpoint registry has > 0 elements, size: " + context.getEndpointRegistry().size(),
+ context.getEndpointRegistry().size(), 0);
}
}
@@ -312,10 +314,10 @@
}
/**
- * @throws IOException
+ * @throws Exception
* @return HTTP response payloads.
*/
- private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws IOException {
+ private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws Exception {
List<String> responses = Lists.newArrayList();
if (singleMessage) {
@@ -324,27 +326,23 @@
for (int i = fromIdx; i < fromIdx + cnt; i++)
sb.append(i).append(",").append(TEST_DATA.get(i)).append("\n");
- Request request = new Request.Builder()
- .url(url)
- .post(RequestBody.create(TEXT_PLAIN, sb.toString()))
- .build();
+ Exchange out = producerTemplate.send(url, exchange -> exchange.getIn().setBody(sb));
- Response response = httpClient.newCall(request).execute();
+ if (out.getException() != null)
+ throw out.getException();
- responses.add(response.body().string());
+ responses.add(out.getMessage().toString());
}
else {
for (int i = fromIdx; i < fromIdx + cnt; i++) {
String payload = i + "," + TEST_DATA.get(i);
- Request request = new Request.Builder()
- .url(url)
- .post(RequestBody.create(TEXT_PLAIN, payload))
- .build();
+ Exchange out = producerTemplate.send(url, exchange -> exchange.getIn().setBody(payload));
- Response response = httpClient.newCall(request).execute();
+ if (out.getException() != null)
+ throw out.getException();
- responses.add(response.body().string());
+ responses.add(out.getMessage().getBody(String.class));
}
}
diff --git a/parent/pom.xml b/parent/pom.xml
index 9610e2a..6444b27 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -55,7 +55,7 @@
<aspectj.version>1.8.13</aspectj.version>
<aws.sdk.bundle.version>1.10.12_1</aws.sdk.bundle.version>
<aws.sdk.version>1.11.75</aws.sdk.version>
- <camel.version>2.22.0</camel.version>
+ <camel.version>3.4.3</camel.version>
<aws.encryption.sdk.version>1.3.2</aws.encryption.sdk.version>
<commons.beanutils.bundle.version>1.9.2_1</commons.beanutils.bundle.version>
<commons.beanutils.version>1.9.3</commons.beanutils.version>