| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.stream.camel; |
| |
| import java.io.IOException; |
| import java.net.ServerSocket; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeSet; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import com.google.common.base.Splitter; |
| import com.google.common.collect.Lists; |
| import com.squareup.okhttp.MediaType; |
| import com.squareup.okhttp.OkHttpClient; |
| import com.squareup.okhttp.Request; |
| import com.squareup.okhttp.RequestBody; |
| import com.squareup.okhttp.Response; |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.Endpoint; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.Processor; |
| import org.apache.camel.ServiceStatus; |
| import org.apache.camel.component.properties.PropertiesComponent; |
| import org.apache.camel.impl.DefaultCamelContext; |
| import org.apache.camel.support.LifecycleStrategySupport; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.CacheEvent; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.internal.util.lang.GridMapEntry; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.lang.IgniteBiPredicate; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.stream.StreamMultipleTupleExtractor; |
| import org.apache.ignite.stream.StreamSingleTupleExtractor; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; |
| |
| /** |
| * Test class for {@link CamelStreamer}. |
| */ |
| public class IgniteCamelStreamerTest extends GridCommonAbstractTest { |
| /** text/plain media type. */ |
| private static final MediaType TEXT_PLAIN = MediaType.parse("text/plain;charset=utf-8"); |
| |
| /** The test data. */ |
| private static final Map<Integer, String> TEST_DATA = new HashMap<>(); |
| |
| /** The Camel streamer currently under test. */ |
| private CamelStreamer<Integer, String> streamer; |
| |
| /** The Ignite data streamer. */ |
| private IgniteDataStreamer<Integer, String> dataStreamer; |
| |
| /** URL where the REST service will be exposed. */ |
| private String url; |
| |
| /** The UUID of the currently active remote listener. */ |
| private UUID remoteLsnr; |
| |
| /** The OkHttpClient. */ |
| private OkHttpClient httpClient = new OkHttpClient(); |
| |
| // Initialize the test data. |
| static { |
| for (int i = 0; i < 100; i++) |
| TEST_DATA.put(i, "v" + i); |
| } |
| |
| /** Constructor. */ |
| public IgniteCamelStreamerTest() { |
| super(true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| return super.getConfiguration(igniteInstanceName).setIncludeEventTypes(EventType.EVTS_ALL); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override public void beforeTest() throws Exception { |
| grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration()); |
| |
| // find an available local port |
| try (ServerSocket ss = new ServerSocket(0)) { |
| int port = ss.getLocalPort(); |
| |
| url = "http://localhost:" + port + "/ignite"; |
| } |
| |
| // create Camel streamer |
| dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME); |
| streamer = createCamelStreamer(dataStreamer); |
| } |
| |
| @Override public void afterTest() throws Exception { |
| try { |
| streamer.stop(); |
| } |
| catch (Exception ignored) { |
| // ignore if already stopped |
| } |
| |
| dataStreamer.close(); |
| |
| grid().cache(DEFAULT_CACHE_NAME).clear(); |
| } |
| |
| /** |
| * @throws Exception |
| */ |
| @Test |
| public void testSendOneEntryPerMessage() throws Exception { |
| streamer.setSingleTupleExtractor(singleTupleExtractor()); |
| |
| // Subscribe to cache PUT events. |
| CountDownLatch latch = subscribeToPutEvents(50); |
| |
| // Action time. |
| streamer.start(); |
| |
| // Send messages. |
| sendMessages(0, 50, false); |
| |
| // Assertions. |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| assertCacheEntriesLoaded(50); |
| } |
| |
| /** |
| * @throws Exception |
| */ |
| @Test |
| public void testMultipleEntriesInOneMessage() throws Exception { |
| streamer.setMultipleTupleExtractor(multipleTupleExtractor()); |
| |
| // Subscribe to cache PUT events. |
| CountDownLatch latch = subscribeToPutEvents(50); |
| |
| // Action time. |
| streamer.start(); |
| |
| // Send messages. |
| sendMessages(0, 50, true); |
| |
| // Assertions. |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| assertCacheEntriesLoaded(50); |
| } |
| |
| /** |
| * @throws Exception |
| */ |
| @Test |
| public void testResponseProcessorIsCalled() throws Exception { |
| streamer.setSingleTupleExtractor(singleTupleExtractor()); |
| streamer.setResponseProcessor(new Processor() { |
| @Override public void process(Exchange exchange) throws Exception { |
| exchange.getOut().setBody("Foo bar"); |
| } |
| }); |
| |
| // Subscribe to cache PUT events. |
| CountDownLatch latch = subscribeToPutEvents(50); |
| |
| // Action time. |
| streamer.start(); |
| |
| // Send messages. |
| List<String> responses = sendMessages(0, 50, false); |
| |
| for (String r : responses) |
| assertEquals("Foo bar", r); |
| |
| // Assertions. |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| assertCacheEntriesLoaded(50); |
| } |
| |
| /** |
| * @throws Exception |
| */ |
| @Test |
| public void testUserSpecifiedCamelContext() throws Exception { |
| final AtomicInteger cnt = new AtomicInteger(); |
| |
| // Create a CamelContext with a probe that'll help us know if it has been used. |
| CamelContext context = new DefaultCamelContext(); |
| context.setTracing(true); |
| context.addLifecycleStrategy(new LifecycleStrategySupport() { |
| @Override public void onEndpointAdd(Endpoint endpoint) { |
| cnt.incrementAndGet(); |
| } |
| }); |
| |
| streamer.setSingleTupleExtractor(singleTupleExtractor()); |
| streamer.setCamelContext(context); |
| |
| // Subscribe to cache PUT events. |
| CountDownLatch latch = subscribeToPutEvents(50); |
| |
| // Action time. |
| streamer.start(); |
| |
| // Send messages. |
| sendMessages(0, 50, false); |
| |
| // Assertions. |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| assertCacheEntriesLoaded(50); |
| assertTrue(cnt.get() > 0); |
| } |
| |
| /** |
| * @throws Exception |
| */ |
| @Test |
| public void testUserSpecifiedCamelContextWithPropertyPlaceholders() throws Exception { |
| // Create a CamelContext with a custom property placeholder. |
| CamelContext context = new DefaultCamelContext(); |
| |
| PropertiesComponent pc = new PropertiesComponent("camel.test.properties"); |
| |
| context.addComponent("properties", pc); |
| |
| // Replace the context path in the test URL with the property placeholder. |
| url = url.replaceAll("/ignite", "{{test.contextPath}}"); |
| |
| // Recreate the Camel streamer with the new URL. |
| streamer = createCamelStreamer(dataStreamer); |
| |
| streamer.setSingleTupleExtractor(singleTupleExtractor()); |
| streamer.setCamelContext(context); |
| |
| // Subscribe to cache PUT events. |
| CountDownLatch latch = subscribeToPutEvents(50); |
| |
| // Action time. |
| streamer.start(); |
| |
| // Before sending the messages, get the actual URL after the property placeholder was resolved, |
| // stripping the jetty: prefix from it. |
| url = streamer.getCamelContext().getEndpoints().iterator().next().getEndpointUri().replaceAll("jetty:", ""); |
| |
| // Send messages. |
| sendMessages(0, 50, false); |
| |
| // Assertions. |
| assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| assertCacheEntriesLoaded(50); |
| } |
| |
| /** |
| * @throws Exception |
| */ |
| @Test |
| public void testInvalidEndpointUri() throws Exception { |
| streamer.setSingleTupleExtractor(singleTupleExtractor()); |
| streamer.setEndpointUri("abc"); |
| |
| // Action time. |
| try { |
| streamer.start(); |
| fail("Streamer started; should have failed."); |
| } |
| catch (IgniteException ignored) { |
| assertTrue(streamer.getCamelContext().getStatus() == ServiceStatus.Stopped); |
| assertTrue(streamer.getCamelContext().getEndpointRegistry().size() == 0); |
| } |
| } |
| |
| /** |
| * Creates a Camel streamer. |
| */ |
| private CamelStreamer<Integer, String> createCamelStreamer(IgniteDataStreamer<Integer, String> dataStreamer) { |
| CamelStreamer<Integer, String> streamer = new CamelStreamer<>(); |
| |
| streamer.setIgnite(grid()); |
| streamer.setStreamer(dataStreamer); |
| streamer.setEndpointUri("jetty:" + url); |
| |
| dataStreamer.allowOverwrite(true); |
| dataStreamer.autoFlushFrequency(1); |
| |
| return streamer; |
| } |
| |
| /** |
| * @throws IOException |
| * @return HTTP response payloads. |
| */ |
| private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws IOException { |
| List<String> responses = Lists.newArrayList(); |
| |
| if (singleMessage) { |
| StringBuilder sb = new StringBuilder(); |
| |
| for (int i = fromIdx; i < fromIdx + cnt; i++) |
| sb.append(i).append(",").append(TEST_DATA.get(i)).append("\n"); |
| |
| Request request = new Request.Builder() |
| .url(url) |
| .post(RequestBody.create(TEXT_PLAIN, sb.toString())) |
| .build(); |
| |
| Response response = httpClient.newCall(request).execute(); |
| |
| responses.add(response.body().string()); |
| } |
| else { |
| for (int i = fromIdx; i < fromIdx + cnt; i++) { |
| String payload = i + "," + TEST_DATA.get(i); |
| |
| Request request = new Request.Builder() |
| .url(url) |
| .post(RequestBody.create(TEXT_PLAIN, payload)) |
| .build(); |
| |
| Response response = httpClient.newCall(request).execute(); |
| |
| responses.add(response.body().string()); |
| } |
| } |
| |
| return responses; |
| } |
| |
| /** |
| * Returns a {@link StreamSingleTupleExtractor} for testing. |
| */ |
| private static StreamSingleTupleExtractor<Exchange, Integer, String> singleTupleExtractor() { |
| return new StreamSingleTupleExtractor<Exchange, Integer, String>() { |
| @Override public Map.Entry<Integer, String> extract(Exchange exchange) { |
| List<String> s = Splitter.on(",").splitToList(exchange.getIn().getBody(String.class)); |
| |
| return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1)); |
| } |
| }; |
| } |
| |
| /** |
| * Returns a {@link StreamMultipleTupleExtractor} for testing. |
| */ |
| private static StreamMultipleTupleExtractor<Exchange, Integer, String> multipleTupleExtractor() { |
| return new StreamMultipleTupleExtractor<Exchange, Integer, String>() { |
| @Override public Map<Integer, String> extract(Exchange exchange) { |
| final Map<String, String> map = Splitter.on("\n") |
| .omitEmptyStrings() |
| .withKeyValueSeparator(",") |
| .split(exchange.getIn().getBody(String.class)); |
| |
| final Map<Integer, String> answer = new HashMap<>(); |
| |
| F.forEach(map.keySet(), new IgniteInClosure<String>() { |
| @Override public void apply(String s) { |
| answer.put(Integer.parseInt(s), map.get(s)); |
| } |
| }); |
| |
| return answer; |
| } |
| }; |
| } |
| |
| /** |
| * Subscribe to cache put events. |
| */ |
| private CountDownLatch subscribeToPutEvents(int expect) { |
| Ignite ignite = grid(); |
| |
| // Listen to cache PUT events and expect as many as messages as test data items |
| final CountDownLatch latch = new CountDownLatch(expect); |
| @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = |
| new IgniteBiPredicate<UUID, CacheEvent>() { |
| @Override public boolean apply(UUID uuid, CacheEvent evt) { |
| latch.countDown(); |
| |
| return true; |
| } |
| }; |
| |
| remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)) |
| .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); |
| |
| return latch; |
| } |
| |
| /** |
| * Assert a given number of cache entries have been loaded. |
| */ |
| private void assertCacheEntriesLoaded(int cnt) { |
| // get the cache and check that the entries are present |
| IgniteCache<Integer, String> cache = grid().cache(DEFAULT_CACHE_NAME); |
| |
| // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache |
| for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, cnt)) |
| assertEquals(TEST_DATA.get(key), cache.get(key)); |
| |
| // assert that the cache exactly the specified amount of elements |
| assertEquals(cnt, cache.size(CachePeekMode.ALL)); |
| |
| // remove the event listener |
| grid().events(grid().cluster().forCacheNodes(DEFAULT_CACHE_NAME)).stopRemoteListen(remoteLsnr); |
| } |
| |
| } |