blob: 8f2560d762d221c303b95d005b66c1b963f6005f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT;
/**
* Test exchange manager warnings.
*/
public class GridCachePartitionExchangeManagerWarningsTest extends GridCommonAbstractTest {
/** */
private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
private static final String CACHE_NAME = "TEST_CACHE";
/** */
private String oldLongOpsDumpTimeout;
/** */
private CustomTestLogger testLog;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
oldLongOpsDumpTimeout = System.getProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
if (oldLongOpsDumpTimeout != null)
System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, oldLongOpsDumpTimeout);
else
System.clearProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
if (testLog != null)
testLog.clearListeners();
testLog = null;
}
/**
* @throws Exception If failed.
*/
@Test
public void testLongRunningCacheFutures() throws Exception {
long timeout = 1000;
System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, Long.toString(timeout));
testLog = new CustomTestLogger(false, log, "future");
int longRunFuturesCnt = 1000;
try (Ignite srv1 = start("srv-1", false, false)) {
try (Ignite srv2 = start("srv-2", false, false)) {
try (Ignite client = start("client", true, false)) {
try (IgniteDataStreamer<Integer, Integer> streamer = client.dataStreamer(CACHE_NAME)) {
streamer.allowOverwrite(true);
for (int i = 0; i < longRunFuturesCnt; i++)
streamer.addData(i, i);
}
Thread.sleep(timeout * 2);
}
}
}
assertTrue("Warnings were not found", testLog.warningsTotal() > 0);
assertTrue("Too much warnings in the logs: " + testLog.warningsTotal(),
testLog.warningsTotal() < longRunFuturesCnt);
}
/**
* @throws Exception If failed.
*/
@Test
public void testLongRunningTransactions() throws Exception {
long timeout = 1000;
System.setProperty(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, Long.toString(timeout));
testLog = new CustomTestLogger(false, log, "transaction");
int transactions = 100;
ExecutorService excSvc = Executors.newFixedThreadPool(transactions);
try (Ignite srv1 = start("srv", false, true)) {
CountDownLatch txStarted = new CountDownLatch(transactions);
CountDownLatch stopTx = new CountDownLatch(1);
for (int i = 0; i < transactions; i++)
excSvc.submit(new AsyncTransaction(srv1, CACHE_NAME, i, txStarted, stopTx));
if (!txStarted.await(10000, TimeUnit.MILLISECONDS))
fail("Unable to start transactions");
Thread.sleep(timeout * 2);
stopTx.countDown();
}
finally {
excSvc.shutdown();
if (!excSvc.awaitTermination(10000, TimeUnit.MILLISECONDS))
fail("Unable to wait for thread pool termination.");
}
assertTrue("Warnings were not found", testLog.warningsTotal() > 0);
assertTrue("Too much warnings in the logs: " + testLog.warningsTotal(),
testLog.warningsTotal() < transactions);
}
/**
* Start Ignite node.
*
* @param instanceName Ignite instance name.
* @param clientMode Client mode flag.
* @param transactional Transactional cache flag.
* @throws Exception If failed.
*/
private Ignite start(String instanceName, boolean clientMode, boolean transactional) throws Exception {
return Ignition.start(getConfiguration(instanceName, clientMode, transactional));
}
/**
* Create Ignite configuration.
*
* @param instanceName Ignite instance name.
* @param clientMode Client mode flag.
* @param transactional Transactional cache flag.
* @throws Exception If failed.
*/
private IgniteConfiguration getConfiguration(String instanceName, boolean clientMode, boolean transactional) throws Exception {
CacheConfiguration cacheCfg = new CacheConfiguration(CACHE_NAME)
.setBackups(1)
.setAtomicityMode(transactional ? CacheAtomicityMode.TRANSACTIONAL : CacheAtomicityMode.ATOMIC);
IgniteConfiguration cfg = getConfiguration()
.setCacheConfiguration(cacheCfg)
.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER))
.setCommunicationSpi(new CustomTcpCommunicationSpi())
.setIgniteInstanceName(instanceName)
.setClientMode(clientMode);
if (testLog != null)
cfg.setGridLogger(testLog);
return cfg;
}
/**
* Async tx runnable.
*/
private static class AsyncTransaction implements Runnable {
/** */
private final Ignite ignite;
/** */
private final String cacheName;
/** */
private final Integer key;
/** */
private final CountDownLatch startLatch;
/** */
private final CountDownLatch canStopLatch;
/**
* @param ignite Ignite instance.
* @param cacheName Cache name.
* @param key Cache key.
* @param startLatch Latch to synchronize all transactions.
* @param canStopLatch Latch to finish transaction.
*/
public AsyncTransaction(Ignite ignite, String cacheName, Integer key, CountDownLatch startLatch, CountDownLatch canStopLatch) {
this.ignite = ignite;
this.cacheName = cacheName;
this.key = key;
this.startLatch = startLatch;
this.canStopLatch = canStopLatch;
}
/**
* Start transaction.
*/
@Override public void run() {
IgniteTransactions transactions = ignite.transactions();
try (Transaction tx = transactions.txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE)) {
IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
Integer val = cache.get(key);
startLatch.countDown();
cache.put(key, val == null ? 1 : val + 1);
try {
canStopLatch.await();
}
catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
tx.rollback();
}
}
}
/**
* Custom logger for counting warning messages.
*/
private static class CustomTestLogger extends ListeningTestLogger {
/** */
private final AtomicInteger warningsTotal = new AtomicInteger();
/** */
private final String substr;
/**
* @param dbg If set to {@code true}, enables debug and trace log messages processing.
* @param echo Logger to echo all messages, limited by {@code dbg} flag.
* @param substr Substring to filter warning messages.
*/
public CustomTestLogger(boolean dbg, @Nullable IgniteLogger echo, String substr) {
super(dbg, echo);
this.substr = substr;
}
/** {@inheritDoc} */
@Override public void warning(String msg, @Nullable Throwable t) {
super.warning(msg, t);
if (substr == null || msg.toLowerCase().contains(substr.toLowerCase()))
warningsTotal.incrementAndGet();
}
/**
* @return Total number of warnings.
*/
public int warningsTotal() {
return warningsTotal.get();
}
}
/**
* Custom communication SPI for simulating long running cache futures.
*/
private static class CustomTcpCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
public CustomTcpCommunicationSpi() {
// No-op.
}
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
// Skip response from backup node.
if (((GridIoMessage)msg).message() instanceof GridDhtAtomicDeferredUpdateResponse)
return;
}
super.sendMessage(node, msg, ackC);
}
}
}