blob: 11c89c9b0a124a86021c946876b32f49a88cca9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.yardstick.cache.failover;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.mxbean.IgniteMXBean;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.yardstick.cache.IgniteCacheAbstractBenchmark;
import org.yardstickframework.BenchmarkConfiguration;
import org.yardstickframework.BenchmarkUtils;
import org.yardstickframework.BenchmarkUtils.ProcessExecutionResult;
import static org.yardstickframework.BenchmarkUtils.println;
/**
* Ignite benchmark that performs long running failover tasks.
*/
public abstract class IgniteFailoverAbstractBenchmark<K, V> extends IgniteCacheAbstractBenchmark<K, V> {
/** */
private static final AtomicBoolean restarterStarted = new AtomicBoolean();
/** */
private final AtomicBoolean firtsExProcessed = new AtomicBoolean();
/** {@inheritDoc} */
@Override public void setUp(final BenchmarkConfiguration cfg) throws Exception {
super.setUp(cfg);
}
/** {@inheritDoc} */
@Override public void onWarmupFinished() {
if (cfg.memberId() == 0 && restarterStarted.compareAndSet(false, true)) {
Thread restarterThread = new Thread(new Runnable() {
@Override public void run() {
try {
println("Servers restarter started on driver: "
+ IgniteFailoverAbstractBenchmark.this.getClass().getSimpleName());
Ignite ignite = ignite();
// Read servers configs from cache to local map.
IgniteCache<Integer, BenchmarkConfiguration> srvsCfgsCache = ignite.
getOrCreateCache(new CacheConfiguration<Integer, BenchmarkConfiguration>().
setName("serversConfigs"));
final Map<Integer, BenchmarkConfiguration> srvsCfgs = new HashMap<>();
for (Cache.Entry<Integer, BenchmarkConfiguration> e : srvsCfgsCache) {
println("Read entry from 'serversConfigs' cache : " + e);
srvsCfgs.put(e.getKey(), e.getValue());
}
assert ignite.cluster().forServers().nodes().size() == srvsCfgs.size();
final int backupsCnt = args.backups();
assert backupsCnt >= 1 : "Backups: " + backupsCnt;
final boolean isDebug = ignite.log().isDebugEnabled();
// Main logic.
while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(args.restartDelay() * 1000);
int numNodesToRestart = nextRandom(1, backupsCnt + 1);
List<Integer> ids = new ArrayList<>();
ids.addAll(srvsCfgs.keySet());
Collections.shuffle(ids);
println("Waiting for partitioned map exchage of all nodes");
ignite.compute().broadcastAsync(new AwaitPartitionMapExchangeTask())
.get(args.cacheOperationTimeoutMillis());
println("Start servers restarting [numNodesToRestart=" + numNodesToRestart
+ ", shuffledIds=" + ids + "]");
for (int i = 0; i < numNodesToRestart; i++) {
Integer id = ids.get(i);
BenchmarkConfiguration bc = srvsCfgs.get(id);
ProcessExecutionResult res = BenchmarkUtils.kill9Server(bc, isDebug);
println("Server with id " + id + " has been killed."
+ (isDebug ? " Process execution result:\n" + res : ""));
}
Thread.sleep(args.restartSleep() * 1000);
for (int i = 0; i < numNodesToRestart; i++) {
Integer id = ids.get(i);
BenchmarkConfiguration bc = srvsCfgs.get(id);
ProcessExecutionResult res = BenchmarkUtils.startServer(bc, isDebug);
println("Server with id " + id + " has been started."
+ (isDebug ? " Process execution result:\n" + res : ""));
}
}
}
catch (Throwable e) {
println("Got exception: " + e);
e.printStackTrace();
U.dumpThreads(null);
if (e instanceof Error)
throw (Error)e;
}
}
}, "servers-restarter");
restarterThread.setDaemon(true);
restarterThread.start();
}
}
/**
* Awaits for partitiona map exchage.
*
* @param ignite Ignite.
* @throws Exception If failed.
*/
@SuppressWarnings("BusyWait")
protected static void awaitPartitionMapExchange(Ignite ignite) throws Exception {
IgniteLogger log = ignite.log();
log.info("Waiting for finishing of a partition exchange on node: " + ignite);
IgniteKernal kernal = (IgniteKernal)ignite;
while (true) {
boolean partitionsExchangeFinished = true;
for (IgniteInternalCache<?, ?> cache : kernal.cachesx(null)) {
log.info("Checking cache: " + cache.name());
GridCacheAdapter<?, ?> c = kernal.internalCache(cache.name());
if (!(c instanceof GridDhtCacheAdapter))
break;
GridDhtCacheAdapter<?, ?> dht = (GridDhtCacheAdapter<?, ?>)c;
GridDhtPartitionFullMap partMap = dht.topology().partitionMap(true);
for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
log.info("Checking node: " + e.getKey());
for (Map.Entry<Integer, GridDhtPartitionState> e1 : e.getValue().entrySet()) {
if (e1.getValue() != GridDhtPartitionState.OWNING) {
log.info("Undesired state [id=" + e1.getKey() + ", state=" + e1.getValue() + ']');
partitionsExchangeFinished = false;
break;
}
}
if (!partitionsExchangeFinished)
break;
}
if (!partitionsExchangeFinished)
break;
}
if (partitionsExchangeFinished)
return;
Thread.sleep(100);
}
}
/** {@inheritDoc} */
@Override public void onException(Throwable e) {
// Proceess only the first exception to prevent a multiple printing of a full thread dump.
if (firtsExProcessed.compareAndSet(false, true)) {
// Debug info on current client.
println("Full thread dump of the current node below.");
U.dumpThreads(null);
println("");
((IgniteMXBean)ignite()).dumpDebugInfo();
// Debug info on servers.
Ignite ignite = ignite();
ClusterGroup srvs = ignite.cluster().forServers();
ignite.compute(srvs).broadcastAsync(new ThreadDumpPrinterTask(ignite.cluster().localNode().id(), e))
.get(10_000);
}
}
/**
* @return Cache name.
*/
protected abstract String cacheName();
/** {@inheritDoc} */
@Override protected IgniteCache<K, V> cache() {
return ignite().cache(cacheName());
}
/**
*/
private static class ThreadDumpPrinterTask implements IgniteRunnable {
/** */
private static final long serialVersionUID = 0;
/** */
@IgniteInstanceResource
private Ignite ignite;
/** */
private final UUID id;
/** */
private final Throwable e;
/**
* @param id Benchmark node id.
* @param e Exception.
*/
ThreadDumpPrinterTask(UUID id, Throwable e) {
this.id = id;
this.e = e;
}
/** {@inheritDoc} */
@Override public void run() {
println("Driver finished with exception [driverNodeId=" + id + ", e=" + e + "]");
println("Full thread dump of the current server node below.");
U.dumpThreads(null);
println("");
((IgniteMXBean)ignite).dumpDebugInfo();
}
}
/**
*/
private static class AwaitPartitionMapExchangeTask implements IgniteRunnable {
/** */
private static final long serialVersionUID = 0;
/** */
@IgniteInstanceResource
private Ignite ignite;
/** {@inheritDoc} */
@Override public void run() {
try {
awaitPartitionMapExchange(ignite);
}
catch (Exception e) {
throw new IgniteException(e);
}
}
}
}