IGNITE-13406 adding support for camel 3.x

This commit breaks compatibility with 2.x - Fixes #21.

Signed-off-by: samaitra <saikat.maitra@gmail.com>
diff --git a/modules/camel-ext/pom.xml b/modules/camel-ext/pom.xml
index c2f89b1..9f47d19 100644
--- a/modules/camel-ext/pom.xml
+++ b/modules/camel-ext/pom.xml
@@ -95,9 +95,9 @@
         </dependency>
 
         <dependency>
-            <groupId>com.squareup.okhttp</groupId>
-            <artifactId>okhttp</artifactId>
-            <version>${okhttp.version}</version>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-http</artifactId>
+            <version>${camel.version}</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java b/modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
index 369d2a6..51f3ba4 100644
--- a/modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
+++ b/modules/camel-ext/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.stream.camel;
 
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
@@ -26,8 +24,9 @@
 import org.apache.camel.Processor;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.service.BaseService;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.A;
@@ -36,6 +35,8 @@
 import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.stream.StreamSingleTupleExtractor;
 
+import java.util.Map;
+
 /**
  * This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer.
  *
@@ -84,9 +85,12 @@
         if (camelCtx == null)
             camelCtx = new DefaultCamelContext();
 
-        // If the Camel Context is starting or started, reject this call to start.
-        if (camelCtx.getStatus() == ServiceStatus.Started || camelCtx.getStatus() == ServiceStatus.Starting)
-            throw new IgniteException("Failed to start Camel streamer (CamelContext already started or starting).");
+        // If the camel context is not started then simply start it up
+        if (!camelCtx.isStarted())
+            camelCtx.start();
+
+        if (!camelCtx.isRunAllowed())
+            throw new IgniteException("Failed to start Camel streamer (CamelContext not in a runnable state).");
 
         log = getIgnite().log();
 
@@ -97,6 +101,7 @@
         catch (Exception e) {
             U.error(log, e);
 
+            camelCtx.stop();
             throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
         }
 
@@ -107,16 +112,18 @@
         catch (Exception e) {
             U.error(log, e);
 
+            camelCtx.stop();
             throw new IgniteException("Failed to start Camel streamer [errMsg=" + e.getMessage() + ']');
         }
 
         // Start the Camel services.
         try {
-            ServiceHelper.startServices(camelCtx, endpoint, consumer);
+            ServiceHelper.startService(camelCtx, endpoint, consumer);
         }
         catch (Exception e) {
             U.error(log, e);
 
+            camelCtx.stop();
             try {
                 ServiceHelper.stopAndShutdownServices(camelCtx, endpoint, consumer);
 
diff --git a/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
index 9858357..329f3ed 100644
--- a/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
+++ b/modules/camel-ext/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
@@ -17,24 +17,8 @@
 
 package org.apache.ignite.stream.camel;
 
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
-import com.squareup.okhttp.MediaType;
-import com.squareup.okhttp.OkHttpClient;
-import com.squareup.okhttp.Request;
-import com.squareup.okhttp.RequestBody;
-import com.squareup.okhttp.Response;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -42,6 +26,7 @@
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.component.properties.PropertiesComponent;
 import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.engine.DefaultProducerTemplate;
 import org.apache.camel.support.LifecycleStrategySupport;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
@@ -60,18 +45,29 @@
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
 
 /**
  * Test class for {@link CamelStreamer}.
  */
 public class IgniteCamelStreamerTest extends GridCommonAbstractTest {
-    /** text/plain media type. */
-    private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8");
-
     /** The test data. */
     private static final Map<Integer, String> TEST_DATA = new HashMap<>();
 
+    /** The producer template used to publish data to the endpoint. */
+    private DefaultProducerTemplate producerTemplate;
+
     /** The Camel streamer currently under test. */
     private CamelStreamer<Integer, String> streamer;
 
@@ -84,9 +80,6 @@
     /** The UUID of the currently active remote listener. */
     private UUID remoteLsnr;
 
-    /** The OkHttpClient. */
-    private OkHttpClient httpClient = new OkHttpClient();
-
     // Initialize the test data.
     static {
         for (int i = 0; i < 100; i++)
@@ -114,9 +107,13 @@
             url = "http://localhost:" + port + "/ignite";
         }
 
+        DefaultCamelContext context = new DefaultCamelContext();
         // create Camel streamer
         dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME);
         streamer = createCamelStreamer(dataStreamer);
+        streamer.setCamelContext(context);
+        producerTemplate = new DefaultProducerTemplate(context);
+        producerTemplate.start();
     }
 
     @Override public void afterTest() throws Exception {
@@ -208,10 +205,10 @@
      */
     @Test
     public void testUserSpecifiedCamelContext() throws Exception {
-        final AtomicInteger cnt = new AtomicInteger();
 
-        // Create a CamelContext with a probe that'll help us know if it has been used.
-        CamelContext context = new DefaultCamelContext();
+        CamelContext context = streamer.getCamelContext();
+
+        final AtomicInteger cnt = new AtomicInteger();
         context.setTracing(true);
         context.addLifecycleStrategy(new LifecycleStrategySupport() {
             @Override public void onEndpointAdd(Endpoint endpoint) {
@@ -220,7 +217,6 @@
         });
 
         streamer.setSingleTupleExtractor(singleTupleExtractor());
-        streamer.setCamelContext(context);
 
         // Subscribe to cache PUT events.
         CountDownLatch latch = subscribeToPutEvents(50);
@@ -232,7 +228,7 @@
         sendMessages(0, 50, false);
 
         // Assertions.
-        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertTrue(latch.await(100, TimeUnit.SECONDS));
         assertCacheEntriesLoaded(50);
         assertTrue(cnt.get() > 0);
     }
@@ -247,7 +243,7 @@
 
         PropertiesComponent pc = new PropertiesComponent("camel.test.properties");
 
-        context.addComponent("properties", pc);
+        context.setPropertiesComponent(pc);
 
         // Replace the context path in the test URL with the property placeholder.
         url = url.replaceAll("/ignite", "{{test.contextPath}}");
@@ -258,6 +254,9 @@
         streamer.setSingleTupleExtractor(singleTupleExtractor());
         streamer.setCamelContext(context);
 
+        producerTemplate = new DefaultProducerTemplate(context);
+        producerTemplate.start();
+
         // Subscribe to cache PUT events.
         CountDownLatch latch = subscribeToPutEvents(50);
 
@@ -290,8 +289,11 @@
             fail("Streamer started; should have failed.");
         }
         catch (IgniteException ignored) {
-            assertTrue(streamer.getCamelContext().getStatus() == ServiceStatus.Stopped);
-            assertTrue(streamer.getCamelContext().getEndpointRegistry().size() == 0);
+            CamelContext context = streamer.getCamelContext();
+            ServiceStatus status = context.getStatus();
+            assertEquals("streamer should be in a stopped state, instead is " + status, status, ServiceStatus.Stopped);
+            assertEquals("endpoint registry has > 0 elements, size: " + context.getEndpointRegistry().size(),
+                context.getEndpointRegistry().size(), 0);
         }
     }
 
@@ -312,10 +314,10 @@
     }
 
     /**
-     * @throws IOException
+     * @throws Exception
      * @return HTTP response payloads.
      */
-    private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws IOException {
+    private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws Exception {
         List<String> responses = Lists.newArrayList();
 
         if (singleMessage) {
@@ -324,27 +326,23 @@
             for (int i = fromIdx; i < fromIdx + cnt; i++)
                 sb.append(i).append(",").append(TEST_DATA.get(i)).append("\n");
 
-            Request request = new Request.Builder()
-                .url(url)
-                .post(RequestBody.create(TEXT_PLAIN, sb.toString()))
-                .build();
+            Exchange out = producerTemplate.send(url, exchange -> exchange.getIn().setBody(sb));
 
-            Response response = httpClient.newCall(request).execute();
+            if (out.getException() != null)
+                throw out.getException();
 
-            responses.add(response.body().string());
+            responses.add(out.getMessage().toString());
         }
         else {
             for (int i = fromIdx; i < fromIdx + cnt; i++) {
                 String payload = i + "," + TEST_DATA.get(i);
 
-                Request request = new Request.Builder()
-                    .url(url)
-                    .post(RequestBody.create(TEXT_PLAIN, payload))
-                    .build();
+                Exchange out = producerTemplate.send(url, exchange -> exchange.getIn().setBody(payload));
 
-                Response response = httpClient.newCall(request).execute();
+                if (out.getException() != null)
+                    throw out.getException();
 
-                responses.add(response.body().string());
+                responses.add(out.getMessage().getBody(String.class));
             }
         }
 
diff --git a/parent/pom.xml b/parent/pom.xml
index 9610e2a..6444b27 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -55,7 +55,7 @@
         <aspectj.version>1.8.13</aspectj.version>
         <aws.sdk.bundle.version>1.10.12_1</aws.sdk.bundle.version>
         <aws.sdk.version>1.11.75</aws.sdk.version>
-        <camel.version>2.22.0</camel.version>
+        <camel.version>3.4.3</camel.version>
         <aws.encryption.sdk.version>1.3.2</aws.encryption.sdk.version>
         <commons.beanutils.bundle.version>1.9.2_1</commons.beanutils.bundle.version>
         <commons.beanutils.version>1.9.3</commons.beanutils.version>