| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.stream.zeromq; |
| |
| import java.io.UnsupportedEncodingException; |
| import java.util.concurrent.CountDownLatch; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.CacheEvent; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.jetbrains.annotations.NotNull; |
| import org.junit.Test; |
| import org.zeromq.ZMQ; |
| |
| import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; |
| |
| /** |
| * Tests {@link IgniteZeroMqStreamer}. |
| */ |
| public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest { |
| /** Cache entries count. */ |
| private static final int CACHE_ENTRY_COUNT = 1000; |
| |
| /** Local address for 0mq. */ |
| private final String ADDR = "tcp://localhost:5671"; |
| |
| /** Topic name for PUB-SUB. */ |
| private final byte[] TOPIC = "0mq".getBytes(); |
| |
| /** If pub-sub envelopes are used. */ |
| private static boolean multipart_pubsub; |
| |
| /** Constructor. */ |
| public IgniteZeroMqStreamerTest() { |
| super(true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| return super.getConfiguration(igniteInstanceName).setIncludeEventTypes(EventType.EVTS_ALL); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return 10_000; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void beforeTestsStarted() throws Exception { |
| grid().getOrCreateCache(defaultCacheConfiguration()); |
| } |
| |
| /** |
| * @throws Exception Test exception. |
| */ |
| @Test |
| public void testZeroMqPairSocket() throws Exception { |
| try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { |
| try (IgniteZeroMqStreamer streamer = newStreamerInstance( |
| dataStreamer, 1, ZeroMqTypeSocket.PAIR, ADDR, null);) { |
| executeStreamer(streamer, ZMQ.PAIR, null); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception Test exception. |
| */ |
| @Test |
| public void testZeroMqSubSocketMultipart() throws Exception { |
| try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { |
| try (IgniteZeroMqStreamer streamer = newStreamerInstance( |
| dataStreamer, 3, ZeroMqTypeSocket.SUB, ADDR, TOPIC);) { |
| multipart_pubsub = true; |
| executeStreamer(streamer, ZMQ.PUB, TOPIC); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception Test exception. |
| */ |
| @Test |
| public void testZeroMqSubSocket() throws Exception { |
| try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { |
| try (IgniteZeroMqStreamer streamer = newStreamerInstance( |
| dataStreamer, 3, ZeroMqTypeSocket.SUB, ADDR, TOPIC);) { |
| executeStreamer(streamer, ZMQ.PUB, TOPIC); |
| } |
| } |
| } |
| |
| /** |
| * @throws Exception Test exception. |
| */ |
| @Test |
| public void testZeroMqPullSocket() throws Exception { |
| try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) { |
| try (IgniteZeroMqStreamer streamer = newStreamerInstance( |
| dataStreamer, 4, ZeroMqTypeSocket.PULL, ADDR, null);) { |
| executeStreamer(streamer, ZMQ.PUSH, null); |
| } |
| } |
| } |
| |
| /** |
| * Execute ZeroMQ streamer and checking cache content after streaming finished. |
| * Set singleTupleExtractor via {@link ZeroMqStringSingleTupleExtractor} in streamer. |
| * |
| * @param streamer ZeroMQ streamer. |
| * @param clientSocket ZeroMQ socket type. |
| * @param topic Topic name for PUB-SUB. |
| * @throws Exception Test exception. |
| */ |
| private void executeStreamer(IgniteZeroMqStreamer streamer, int clientSocket, |
| byte[] topic) throws Exception { |
| streamer.setSingleTupleExtractor(new ZeroMqStringSingleTupleExtractor()); |
| |
| IgniteCache<Integer, String> cache = grid().cache(DEFAULT_CACHE_NAME); |
| |
| CacheListener listener = subscribeToPutEvents(); |
| |
| streamer.start(); |
| |
| assertEquals(0, cache.size(CachePeekMode.PRIMARY)); |
| |
| startZeroMqClient(clientSocket, topic); |
| |
| CountDownLatch latch = listener.getLatch(); |
| |
| latch.await(); |
| |
| unsubscribeToPutEvents(listener); |
| |
| // Last element. |
| int testId = CACHE_ENTRY_COUNT - 1; |
| |
| String cachedValue = cache.get(testId); |
| |
| // ZeroMQ message successfully put to cache. |
| assertTrue(cachedValue != null && cachedValue.endsWith(String.valueOf(testId))); |
| |
| assertTrue(cache.size() == CACHE_ENTRY_COUNT); |
| |
| cache.clear(); |
| } |
| |
| /** |
| * @param dataStreamer Ignite Data Streamer. |
| * @return ZeroMQ Streamer. |
| */ |
| private IgniteZeroMqStreamer newStreamerInstance(IgniteDataStreamer<Integer, String> dataStreamer, |
| int ioThreads, ZeroMqTypeSocket socketType, @NotNull String addr, byte[] topic) { |
| IgniteZeroMqStreamer streamer = new IgniteZeroMqStreamer(ioThreads, socketType, addr, topic); |
| |
| streamer.setIgnite(grid()); |
| streamer.setStreamer(dataStreamer); |
| |
| dataStreamer.allowOverwrite(true); |
| dataStreamer.autoFlushFrequency(1); |
| |
| return streamer; |
| } |
| |
| /** |
| * Starts ZeroMQ client for testing. |
| * |
| * @param clientSocket ZeroMQ socket type. |
| * @param topic Topic name for PUB-SUB. |
| */ |
| private void startZeroMqClient(int clientSocket, |
| byte[] topic) throws InterruptedException, UnsupportedEncodingException { |
| try (ZMQ.Context context = ZMQ.context(1); |
| ZMQ.Socket socket = context.socket(clientSocket)) { |
| |
| socket.bind(ADDR); |
| |
| if (ZMQ.PUB == clientSocket) |
| Thread.sleep(500); |
| |
| for (int i = 0; i < CACHE_ENTRY_COUNT; i++) { |
| if (ZMQ.PUB == clientSocket) |
| socket.sendMore(topic); |
| |
| if (ZMQ.PUB == clientSocket && multipart_pubsub) |
| socket.send((topic + " " + String.valueOf(i)).getBytes("UTF-8")); |
| else |
| socket.send(String.valueOf(i).getBytes("UTF-8")); |
| } |
| } |
| } |
| |
| /** |
| * @return Cache listener. |
| */ |
| private CacheListener subscribeToPutEvents() { |
| Ignite ignite = grid(); |
| |
| // Listen to cache PUT events and expect as many as messages as test data items. |
| CacheListener listener = new CacheListener(); |
| |
| ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).localListen(listener, EVT_CACHE_OBJECT_PUT); |
| |
| return listener; |
| } |
| |
| /** |
| * @param listener Cache listener. |
| */ |
| private void unsubscribeToPutEvents(CacheListener listener) { |
| Ignite ignite = grid(); |
| |
| ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).stopLocalListen(listener, EVT_CACHE_OBJECT_PUT); |
| } |
| |
| /** |
| * Listener. |
| */ |
| private class CacheListener implements IgnitePredicate<CacheEvent> { |
| |
| /** */ |
| private final CountDownLatch latch = new CountDownLatch(CACHE_ENTRY_COUNT); |
| |
| /** |
| * @return Latch. |
| */ |
| public CountDownLatch getLatch() { |
| return latch; |
| } |
| |
| /** |
| * @param evt Cache Event. |
| * @return {@code true}. |
| */ |
| @Override |
| public boolean apply(CacheEvent evt) { |
| latch.countDown(); |
| |
| return true; |
| } |
| } |
| } |