blob: 1ab67c496949652a66fe6dacb17dda8952b5c011 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.stream.pubsub;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
import static org.apache.ignite.stream.pubsub.MockPubSubServer.PROJECT;
import static org.apache.ignite.stream.pubsub.MockPubSubServer.TOPIC_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests for {@link PubSubStreamer}.
*/
public class PubSubStreamerSelfTest {
private static final Logger LOGGER = Logger.getLogger(PubSubStreamerSelfTest.class.getName());
/** Cache name. */
private static final String DEFAULT_CACHE_NAME = "testCache";
/** Ignite test configuration file. */
private static final String GRID_CONF_FILE = "config/example-ignite.xml";
/** Subscription Name. */
private static final String SUBSCRIPTION = "ignite_subscription";
/** Count. */
private static final int CNT = 100;
/** Messages per request. */
private static final int MESSAGES_PER_REQUEST = 10;
/** Topic message key prefix. */
private static final String KEY_PREFIX = "192.168.2.";
/** Topic message value URL. */
private static final String VALUE_URL = ",www.example.com,";
private static final String JSON_KEY = "key";
private static final String JSON_VALUE = "value";
private Ignite ignite;
private static MockPubSubServer mockPubSubServer = new MockPubSubServer();
@Before
public void beforeTest() throws InterruptedException {
this.ignite = Ignition.start(GRID_CONF_FILE);
IgniteCache<Integer,String> igniteCache = ignite.getOrCreateCache(defaultCacheConfiguration());
}
@After
public void afterTest() {
ignite.cache(DEFAULT_CACHE_NAME).clear();
Ignition.stop(true);
}
/**
* @return New cache configuration with modified defaults.
*/
public static CacheConfiguration<Integer,String> defaultCacheConfiguration() {
CacheConfiguration cfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
cfg.setAtomicityMode(ATOMIC);
cfg.setWriteSynchronizationMode(FULL_SYNC);
return cfg;
}
/**
* Tests Pub/Sub streamer.
*
* @throws TimeoutException If timed out.
* @throws InterruptedException If interrupted.
*/
@Test
public void testPubSubStreamer() throws Exception {
Map<String, String> keyValMap = produceStream();
consumerStream(ProjectTopicName.of(MockPubSubServer.PROJECT, MockPubSubServer.TOPIC_NAME), keyValMap);
}
/**
* Consumes Pub/Sub stream via Ignite.
*
* @param topic Topic name.
* @param keyValMap Expected key value map.
* @throws TimeoutException If timed out.
* @throws InterruptedException If interrupted.
*/
private void consumerStream(ProjectTopicName topic, Map<String, String> keyValMap)
throws InterruptedException, IOException {
PubSubStreamer<String, String> pubSubStmr = null;
try (IgniteDataStreamer<String, String> stmr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
stmr.allowOverwrite(true);
stmr.autoFlushFrequency(MESSAGES_PER_REQUEST);
// Configure Pub/Sub streamer.
pubSubStmr = new PubSubStreamer<>();
// Get the cache.
IgniteCache<String, String> cache = ignite.cache(DEFAULT_CACHE_NAME);
// Set Ignite instance.
pubSubStmr.setIgnite(ignite);
// Set data streamer instance.
pubSubStmr.setStreamer(stmr);
// Set the topic.
pubSubStmr.setTopic(Arrays.asList(topic));
// Set subscription name
pubSubStmr.setSubscriptionName(ProjectSubscriptionName.format(PROJECT, SUBSCRIPTION));
// Set the number of threads.
pubSubStmr.setThreads(4);
// Set the SubScriber Stub settings.
pubSubStmr.setSubscriberStubSettings(mockPubSubServer.createSubscriberStub());
pubSubStmr.setSingleTupleExtractor(singleTupleExtractor());
// Start Pub/Sub streamer.
pubSubStmr.start();
final CountDownLatch latch = new CountDownLatch(CNT);
IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
@IgniteInstanceResource
private Ignite ig;
@LoggerResource
private IgniteLogger log;
/** {@inheritDoc} */
@Override public boolean apply(UUID uuid, CacheEvent evt) {
latch.countDown();
if (log.isInfoEnabled()) {
IgniteEx igEx = (IgniteEx)ig;
UUID nodeId = igEx.localNode().id();
log.info("Receive event=" + evt + ", nodeId=" + nodeId);
}
return true;
}
};
ignite.events(ignite.cluster().forCacheNodes(DEFAULT_CACHE_NAME)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
// Checks all events successfully processed in 10 seconds.
assertTrue("Failed to wait latch completion, still wait " + latch.getCount() + " events",
latch.await(10, TimeUnit.SECONDS));
for (Map.Entry<String, String> entry : keyValMap.entrySet())
assertEquals(entry.getValue(), cache.get(entry.getKey()));
}
finally {
if (pubSubStmr!= null)
pubSubStmr.stop();
}
}
/**
* Sends messages to Pub/Sub.
*
* @return Map of key value messages.
*/
private Map<String, String> produceStream() throws Exception {
List<Integer> subnet = new ArrayList<>();
for (int i = 1; i <= CNT; i++)
subnet.add(i);
Collections.shuffle(subnet);
List<PubsubMessage> messages = new ArrayList<>();
Map<String, String> keyValMap = new HashMap<>();
for (int evt = 0; evt < CNT; evt++) {
long runtime = System.currentTimeMillis();
String ip = KEY_PREFIX + subnet.get(evt);
String msg = runtime + VALUE_URL + ip;
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty(JSON_KEY, ip);
jsonObject.addProperty(JSON_VALUE, msg);
ByteString byteString = ByteString.copyFrom(jsonObject.toString(), Charset.defaultCharset());
PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
.setData(byteString)
.build();
messages.add(pubsubMessage);
String messageId = mockPubSubServer.getPublisher(TOPIC_NAME).publish(pubsubMessage).get();
keyValMap.put(ip, msg);
}
return keyValMap;
}
/**
* @return {@link StreamSingleTupleExtractor} for testing.
*/
private static StreamSingleTupleExtractor<PubsubMessage, String, String> singleTupleExtractor() {
return new StreamSingleTupleExtractor<PubsubMessage, String, String>() {
@Override public Map.Entry<String, String> extract(PubsubMessage msg) {
String dataStr = msg.getData().toStringUtf8();
JsonElement jsonElement = new JsonParser().parse(dataStr).getAsJsonObject();
JsonObject jsonObject = jsonElement.getAsJsonObject();
return new GridMapEntry<>(jsonObject.get(JSON_KEY).getAsString(), jsonObject.get(JSON_VALUE).getAsString());
}
};
}
/**
* @return {@link StreamMultipleTupleExtractor} for testing.
*/
private static StreamMultipleTupleExtractor<PubsubMessage, String, String> multipleTupleExtractor() {
return new StreamMultipleTupleExtractor<PubsubMessage, String, String>() {
@Override public Map<String, String> extract(PubsubMessage msg) {
String dataStr = msg.getData().toStringUtf8();
JsonElement jsonElement = new JsonParser().parse(dataStr).getAsJsonObject();
JsonArray jsonArray = jsonElement.getAsJsonArray();
final Map<String, String> answer = new HashMap<>();
for(int i=0;i<jsonArray.size();i++) {
JsonObject jsonObject = jsonArray.get(i) .getAsJsonObject();
String key = jsonObject.get(JSON_KEY).getAsString();
String value = jsonObject.get(JSON_VALUE).getAsString();
answer.put(key,value);
}
return answer;
}
};
}
}