blob: ec3e4ff0252e6f39f723dbc72db29bf2aef01dd5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.datastreamer;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* Tests for {@code IgniteDataStreamerImpl}.
*/
public class DataStreamerImplSelfTest extends GridCommonAbstractTest {
/** Number of keys to load via data streamer. */
private static final int KEYS_COUNT = 1000;
/** Next nodes after MAX_CACHE_COUNT start without cache */
private static final int MAX_CACHE_COUNT = 4;
/** Started grid counter. */
private static int cnt;
/** No nodes filter. */
private static volatile boolean noNodesFilter;
/** Indicates whether we need to make the topology stale */
private static boolean needStaleTop = false;
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setCommunicationSpi(new StaleTopologyCommunicationSpi());
if (cnt < MAX_CACHE_COUNT)
cfg.setCacheConfiguration(cacheConfiguration());
cnt++;
return cfg;
}
/**
* @throws Exception If failed.
*/
@Test
public void testCloseWithCancellation() throws Exception {
cnt = 0;
startGrids(2);
Ignite g1 = grid(1);
List<IgniteFuture> futures = new ArrayList<>();
IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(DEFAULT_CACHE_NAME);
for (int i = 0; i < 100; i++)
futures.add(dataLdr.addData(i, i));
try {
dataLdr.close(true);
}
catch (CacheException e) {
// No-op.
}
for (IgniteFuture fut : futures)
assertTrue(fut.isDone());
}
/**
* @throws Exception If failed.
*/
@Test
public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
cnt = 0;
startGrids(5);
final CyclicBarrier barrier = new CyclicBarrier(2);
multithreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
U.awaitQuiet(barrier);
G.stopAll(true);
return null;
}
}, 1);
Ignite g4 = grid(4);
IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(DEFAULT_CACHE_NAME);
dataLdr.perNodeBufferSize(32);
for (int i = 0; i < 100000; i += 2) {
dataLdr.addData(i, i);
dataLdr.removeData(i + 1);
}
U.awaitQuiet(barrier);
info("Closing data streamer.");
try {
dataLdr.close(true);
}
catch (CacheException | IllegalStateException ignore) {
// This is ok to ignore this exception as test is racy by it's nature -
// grid is stopping in different thread.
}
}
/**
* Data streamer should correctly load entries from HashMap in case of grids with more than one node
* and with GridOptimizedMarshaller that requires serializable.
*
* @throws Exception If failed.
*/
@Test
public void testAddDataFromMap() throws Exception {
cnt = 0;
startGrids(2);
Ignite g0 = grid(0);
IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(DEFAULT_CACHE_NAME);
Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
for (int i = 0; i < KEYS_COUNT; i++)
map.put(i, String.valueOf(i));
dataLdr.addData(map);
dataLdr.close();
Random rnd = new Random();
IgniteCache<Integer, String> c = g0.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < KEYS_COUNT; i++) {
Integer k = rnd.nextInt(KEYS_COUNT);
String v = c.get(k);
assertEquals(k.toString(), v);
}
}
/**
* Test logging on {@code DataStreamer.addData()} method when cache have no data nodes
*
* @throws Exception If fail.
*/
@Test
public void testNoDataNodesOnClose() throws Exception {
boolean failed = false;
cnt = 0;
noNodesFilter = true;
try {
Ignite ignite = startGrid(1);
try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
streamer.addData(1, "1");
}
catch (CacheException ignored) {
failed = true;
}
}
finally {
noNodesFilter = false;
assertTrue(failed);
}
}
/**
* Test logging on {@code DataStreamer.addData()} method when cache have no data nodes
*
* @throws Exception If fail.
*/
@Test
public void testNoDataNodesOnFlush() throws Exception {
boolean failed = false;
cnt = 0;
noNodesFilter = true;
try {
Ignite ignite = startGrid(1);
IgniteFuture fut = null;
try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
streamer.perThreadBufferSize(1);
fut = streamer.addData(1, "1");
streamer.flush();
}
catch (IllegalStateException ignored) {
try {
fut.get();
fail("DataStreamer ignores failed streaming.");
}
catch (CacheServerNotFoundException ignored2) {
// No-op.
}
failed = true;
}
}
finally {
noNodesFilter = false;
assertTrue(failed);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testAllOperationFinishedBeforeFutureCompletion() throws Exception {
cnt = 0;
Ignite ignite = startGrids(MAX_CACHE_COUNT);
final IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> ex = new AtomicReference<>();
Collection<Map.Entry> entries = new ArrayList<>(100);
for (int i = 0; i < 100; i++)
entries.add(new IgniteBiTuple<>(i, "" + i));
IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
ldr.addData(entries).listen(new IgniteInClosure<IgniteFuture<?>>() {
@Override public void apply(IgniteFuture<?> future) {
try {
future.get();
for (int i = 0; i < 100; i++)
assertEquals("" + i, cache.get(i));
}
catch (Throwable e) {
ex.set(e);
}
latch.countDown();
}
});
ldr.tryFlush();
assertTrue(latch.await(5, TimeUnit.SECONDS));
Throwable e = ex.get();
if(e != null) {
if(e instanceof Error)
throw (Error) e;
if(e instanceof RuntimeException)
throw (RuntimeException) e;
throw new RuntimeException(e);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testRemapOnTopologyChangeDuringUpdatePreparation() throws Exception {
cnt = 0;
Ignite ignite = startGrids(MAX_CACHE_COUNT);
final int threads = 8;
final int entries = threads * 10000;
final long timeout = 10000;
final CountDownLatch l1 = new CountDownLatch(threads);
final CountDownLatch l2 = new CountDownLatch(1);
final AtomicInteger cntr = new AtomicInteger();
final AtomicReference<Throwable> ex = new AtomicReference<>();
final IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME);
ldr.perThreadBufferSize(1);
final IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
try {
int i = cntr.getAndIncrement();
for (int j = 0; i < (entries >> 1); i += threads) {
ldr.addData(i, i);
if(j++ % 1000 == 0)
ldr.tryFlush();
}
l1.countDown();
assertTrue(l2.await(timeout, TimeUnit.MILLISECONDS));
for (int j = 0; i < entries; i += threads) {
ldr.addData(i, i);
if(j++ % 1000 == 0)
ldr.tryFlush();
}
}
catch (Throwable e) {
ex.compareAndSet(null, e);
}
}
}, threads, "loader");
assertTrue(l1.await(timeout, TimeUnit.MILLISECONDS));
stopGrid(MAX_CACHE_COUNT - 1);
l2.countDown();
fut.get(timeout);
ldr.close();
Throwable e = ex.get();
if(e != null) {
if(e instanceof Error)
throw (Error) e;
if(e instanceof RuntimeException)
throw (RuntimeException) e;
throw new RuntimeException(e);
}
IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME);
for(int i = 0; i < entries; i++)
assertEquals(i, cache.get(i));
}
/**
* Cluster topology mismatch shall result in DataStreamer retrying cache update with the latest topology and
* no error logged to the console.
*
* @throws Exception if failed
*/
@Test
public void testRetryWhenTopologyMismatch() throws Exception {
final int KEY = 1;
final String VAL = "1";
cnt = 0;
StringWriter logWriter = new StringWriter();
Appender logAppender = new WriterAppender(new SimpleLayout(), logWriter);
Logger.getRootLogger().addAppender(logAppender);
startGrids(MAX_CACHE_COUNT - 1); // cache-enabled nodes
try (Ignite ignite = startGrid(MAX_CACHE_COUNT);
IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) {
needStaleTop = true; // simulate stale topology for the next action
streamer.addData(KEY, VAL);
} finally {
needStaleTop = false;
logWriter.flush();
Logger.getRootLogger().removeAppender(logAppender);
logAppender.close();
}
assertFalse(logWriter.toString().contains("DataStreamer will retry data transfer at stable topology"));
}
/**
* @throws Exception If failed.
*/
@Test
public void testClientEventsNotCausingRemaps() throws Exception {
Ignite ignite = startGrids(2);
ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME);
((DataStreamerImpl)streamer).maxRemapCount(3);
streamer.addData(1, 1);
for (int topChanges = 0; topChanges < 30; topChanges++) {
IgniteEx node = startClientGrid(getConfiguration("flapping-client"));
streamer.addData(1, 1);
node.close();
streamer.addData(1, 1);
}
streamer.flush();
streamer.close();
}
/**
* @throws Exception If failed.
*/
@Test
public void testServerEventsCauseRemaps() throws Exception {
Ignite ignite = startGrids(2);
ignite.getOrCreateCache(DEFAULT_CACHE_NAME);
IgniteDataStreamer<Object, Object> streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME);
streamer.perThreadBufferSize(1);
((DataStreamerImpl)streamer).maxRemapCount(0);
streamer.addData(1, 1);
startGrid(2);
try {
streamer.addData(1, 1);
streamer.flush();
}
catch (IllegalStateException ex) {
assert ex.getMessage().contains("Data streamer has been closed");
return;
}
fail("Expected exception wasn't thrown");
}
/**
* @throws Exception If failed.
*/
@Test
public void testDataStreamerWaitsUntilDynamicCacheStartIsFinished() throws Exception {
final Ignite ignite0 = startGrids(2);
final Ignite ignite1 = grid(1);
final String cacheName = "testCache";
IgniteCache<Integer, Integer> cache = ignite0.getOrCreateCache(
new CacheConfiguration<Integer, Integer>().setName(cacheName));
try (IgniteDataStreamer<Integer, Integer> ldr = ignite1.dataStreamer(cacheName)) {
ldr.addData(0, 0);
}
assertEquals(Integer.valueOf(0), cache.get(0));
}
/**
* Gets cache configuration.
*
* @return Cache configuration.
*/
private CacheConfiguration cacheConfiguration() {
CacheConfiguration cacheCfg = defaultCacheConfiguration();
cacheCfg.setCacheMode(PARTITIONED);
cacheCfg.setBackups(1);
cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
if (noNodesFilter)
cacheCfg.setNodeFilter(F.alwaysFalse());
return cacheCfg;
}
/**
* Simulate stale (not up-to-date) topology
*/
private static class StaleTopologyCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
// Send stale topology only in the first request to avoid indefinitely getting failures.
if (needStaleTop) {
if (msg instanceof GridIoMessage) {
GridIoMessage ioMsg = (GridIoMessage)msg;
Message appMsg = ioMsg.message();
if (appMsg != null && appMsg instanceof DataStreamerRequest) {
DataStreamerRequest req = (DataStreamerRequest)appMsg;
AffinityTopologyVersion validTop = req.topologyVersion();
// Simulate situation when a node did not receive the latest "node joined" topology update causing
// topology mismatch
AffinityTopologyVersion staleTop = new AffinityTopologyVersion(
validTop.topologyVersion() - 1,
validTop.minorTopologyVersion());
appMsg = new DataStreamerRequest(
req.requestId(),
req.responseTopicBytes(),
req.cacheName(),
req.updaterBytes(),
req.entries(),
req.ignoreDeploymentOwnership(),
req.skipStore(),
req.keepBinary(),
req.deploymentMode(),
req.sampleClassName(),
req.userVersion(),
req.participants(),
req.classLoaderId(),
req.forceLocalDeployment(),
staleTop,
-1);
msg = new GridIoMessage(
GridTestUtils.<Byte>getFieldValue(ioMsg, "plc"),
GridTestUtils.getFieldValue(ioMsg, "topic"),
GridTestUtils.<Integer>getFieldValue(ioMsg, "topicOrd"),
appMsg,
GridTestUtils.<Boolean>getFieldValue(ioMsg, "ordered"),
ioMsg.timeout(),
ioMsg.skipOnTimeout());
needStaleTop = false;
}
}
}
super.sendMessage(node, msg, ackC);
}
}
}