blob: 311b533ad98032cb15da2d856cfa2516e860526b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lifecycle.LifecycleBean;
import org.apache.ignite.lifecycle.LifecycleEventType;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
/**
* Test exchange manager warnings.
*/
public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbstractTest {
/** Long running operations timeout. */
private static final String LONG_OPERATIONS_DUMP_TIMEOUT = "1000";
/** Atomic cache name. */
private static final String CACHE_NAME = "TEST_CACHE";
/** Transactional cache name. */
private static final String TX_CACHE_NAME = "TX_TEST_CACHE";
/** Lifecycle bean. */
private LifecycleBean lifecycleBean;
/** */
private String oldLongOpsDumpTimeout;
/** */
private CustomTestLogger testLog;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
oldLongOpsDumpTimeout = System.getProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
if (oldLongOpsDumpTimeout != null)
System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, oldLongOpsDumpTimeout);
else
System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
if (testLog != null)
testLog.clearListeners();
testLog = null;
lifecycleBean = null;
stopAllGrids();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setCommunicationSpi(new CustomTcpCommunicationSpi());
if (testLog != null)
cfg.setGridLogger(testLog);
cfg.setLifecycleBeans(lifecycleBean);
CacheConfiguration atomicCfg = new CacheConfiguration(CACHE_NAME)
.setBackups(1)
.setAtomicityMode(ATOMIC);
CacheConfiguration txCfg = new CacheConfiguration(TX_CACHE_NAME)
.setBackups(1)
.setAtomicityMode(TRANSACTIONAL);
cfg.setCacheConfiguration(atomicCfg, txCfg);
return cfg;
}
/**
* @throws Exception If failed.
*/
@Test
@WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = LONG_OPERATIONS_DUMP_TIMEOUT)
public void testLongRunningCacheFutures() throws Exception {
long timeout = Long.parseLong(LONG_OPERATIONS_DUMP_TIMEOUT);
testLog = new CustomTestLogger(false, log, "future");
int longRunFuturesCnt = 1000;
startGrids(2);
Ignite client = startClientGrid(3);
try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME)) {
streamer.allowOverwrite(true);
for (int i = 0; i < longRunFuturesCnt; i++)
streamer.addData(i, i);
}
doSleep(timeout * 2);
stopAllGrids();
assertTrue("Warnings were not found", testLog.warningsTotal() > 0);
assertTrue("Too much warnings in the logs: " + testLog.warningsTotal(),
testLog.warningsTotal() < longRunFuturesCnt);
}
/**
* @throws Exception If failed.
*/
@Test
@WithSystemProperty(key = IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, value = LONG_OPERATIONS_DUMP_TIMEOUT)
public void testLongRunningTransactions() throws Exception {
long timeout = Long.parseLong(LONG_OPERATIONS_DUMP_TIMEOUT);
testLog = new CustomTestLogger(false, log, "transaction");
int transactions = 100;
ExecutorService excSvc = Executors.newFixedThreadPool(transactions);
try (Ignite srv1 = startGrid(0)) {
CountDownLatch txStarted = new CountDownLatch(transactions);
CountDownLatch stopTx = new CountDownLatch(1);
for (int i = 0; i < transactions; i++)
excSvc.submit(new AsyncTransaction(srv1, TX_CACHE_NAME, i, txStarted, stopTx));
if (!txStarted.await(10_000, TimeUnit.MILLISECONDS))
fail("Unable to start transactions");
doSleep(timeout * 2);
stopTx.countDown();
}
finally {
excSvc.shutdown();
if (!excSvc.awaitTermination(10_000, TimeUnit.MILLISECONDS))
fail("Unable to wait for thread pool termination.");
}
assertTrue("Warnings were not found", testLog.warningsTotal() > 0);
assertTrue("Too much warnings in the logs: " + testLog.warningsTotal(),
testLog.warningsTotal() < transactions);
}
/** */
@Test
public void testDumpLongRunningOperationsWaitForFullyInitializedExchangeManager() throws Exception {
long waitingTimeout = 5_000;
PrintStream errStream = System.err;
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch waitLatch = new CountDownLatch(1);
try {
// GridCachePartitionExchangeManager#dumpLongRunningOperations() uses diagnostic log,
// which can be non-initialized, and so, error messgaes are logged into standard error output stream.
ByteArrayOutputStream testOut = new ByteArrayOutputStream(16 * 1024);
System.setErr(new PrintStream(testOut));
AtomicReference<IgniteInternalFuture<?>> dumpOpsFut = new AtomicReference<>();
IgniteInternalFuture<Ignite> startFut = null;
// Lyficycle bean allows to register DatabaseLifecycleListener and trigger dumpLongRunningOperations
// before GridCachePartitionExchangeManager is started.
lifecycleBean = new LifecycleBean() {
/** Ignite instance. */
@IgniteInstanceResource
IgniteEx ignite;
/** {@inheritDoc} */
@Override public void onLifecycleEvent(LifecycleEventType evt) throws IgniteException {
if (evt == LifecycleEventType.BEFORE_NODE_START) {
ignite.context().internalSubscriptionProcessor()
.registerDatabaseListener(new DatabaseLifecycleListener() {
@Override public void onInitDataRegions(
IgniteCacheDatabaseSharedManager mgr
) throws IgniteCheckedException {
dumpOpsFut.set(
GridTestUtils.runAsync(
() -> ignite.context().cache().context().exchange().dumpLongRunningOperations(1_000)));
// Let's allow to check that dumpLongRunningOperations is triggered.
startLatch.countDown();
// Wait for the check
try {
if (!waitLatch.await(waitingTimeout * 3, TimeUnit.MILLISECONDS))
throw new IgniteCheckedException("Failed to wait for a check of dumpLongRunningOperations");
}
catch (InterruptedException e) {
throw new IgniteCheckedException(e);
}
}
});
}
}
};
startFut = GridTestUtils.runAsync(new Callable<Ignite>() {
@Override public Ignite call() throws Exception {
return startGrid(0);
}
});
assertTrue("Server node did not start in " + waitingTimeout + " ms.",
startLatch.await(waitingTimeout, TimeUnit.MILLISECONDS));
// Check that dumpLongRunningOperations did not produce any error.
if (GridTestUtils.waitForCondition(() -> dumpOpsFut.get().isDone(), waitingTimeout)) {
// Check that error output stream does not contain NullPointerException.
String output = testOut.toString();
assertTrue("Unexpected error [err=" + output + ']', output.isEmpty());
}
// Unblock starting the node.
waitLatch.countDown();
assertTrue(
"Dumping log running operations is not completed yet.",
GridTestUtils.waitForCondition(() -> dumpOpsFut.get().isDone(), waitingTimeout));
// Check that error output stream does not contain any error.
String output = testOut.toString();
assertTrue("Unexpected error [err=" + output + ']', output.isEmpty());
startFut.get(waitingTimeout, TimeUnit.MILLISECONDS);
}
finally {
startLatch.countDown();
waitLatch.countDown();
System.setErr(errStream);
}
}
/**
* Async tx runnable.
*/
private static class AsyncTransaction implements Runnable {
/** */
private final Ignite ignite;
/** */
private final String cacheName;
/** */
private final Integer key;
/** */
private final CountDownLatch startLatch;
/** */
private final CountDownLatch canStopLatch;
/**
* @param ignite Ignite instance.
* @param cacheName Cache name.
* @param key Cache key.
* @param startLatch Latch to synchronize all transactions.
* @param canStopLatch Latch to finish transaction.
*/
public AsyncTransaction(Ignite ignite, String cacheName, Integer key, CountDownLatch startLatch, CountDownLatch canStopLatch) {
this.ignite = ignite;
this.cacheName = cacheName;
this.key = key;
this.startLatch = startLatch;
this.canStopLatch = canStopLatch;
}
/**
* Start transaction.
*/
@Override public void run() {
IgniteTransactions transactions = ignite.transactions();
try (Transaction tx = transactions.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE)) {
IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
Integer val = cache.get(key);
startLatch.countDown();
cache.put(key, val == null ? 1 : val + 1);
try {
canStopLatch.await();
}
catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
tx.rollback();
}
}
}
/**
* Custom logger for counting warning messages.
*/
private static class CustomTestLogger extends ListeningTestLogger {
/** */
private final AtomicInteger warningsTotal = new AtomicInteger();
/** */
private final String substr;
/**
* @param dbg If set to {@code true}, enables debug and trace log messages processing.
* @param echo Logger to echo all messages, limited by {@code dbg} flag.
* @param substr Substring to filter warning messages.
*/
public CustomTestLogger(boolean dbg, @Nullable IgniteLogger echo, String substr) {
super(dbg, echo);
this.substr = substr;
}
/** {@inheritDoc} */
@Override public void warning(String msg, @Nullable Throwable t) {
super.warning(msg, t);
if (substr == null || msg.toLowerCase().contains(substr.toLowerCase()))
warningsTotal.incrementAndGet();
}
/**
* @return Total number of warnings.
*/
public int warningsTotal() {
return warningsTotal.get();
}
}
/**
* Custom communication SPI for simulating long running cache futures.
*/
private static class CustomTcpCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
public CustomTcpCommunicationSpi() {
// No-op.
}
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
// Skip response from backup node.
if (((GridIoMessage)msg).message() instanceof GridDhtAtomicDeferredUpdateResponse)
return;
}
super.sendMessage(node, msg, ackC);
}
}
}