blob: 0a403e2c657d5b03ea54e8d5e09f8311137e13e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.startup;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
/**
* Basic warm-up closure which warm-ups cache operations.
*/
public class BasicWarmupClosure implements IgniteInClosure<IgniteConfiguration> {
/** */
private static final long serialVersionUID = 9175346848249957458L;
/** Default grid count to warm up. */
public static final int DFLT_GRID_CNT = 2;
/** Default iteration count per thread. */
public static final int DFLT_ITERATION_CNT = 30_000;
/** Default key range. */
public static final int DFLT_KEY_RANGE = 10_000;
/** Grid count. */
private int gridCnt = DFLT_GRID_CNT;
/** Warmup thread count. */
private int threadCnt = Runtime.getRuntime().availableProcessors() * 2;
/** Per thread iteration count. */
private int iterCnt = DFLT_ITERATION_CNT;
/** Key range. */
private int keyRange = DFLT_KEY_RANGE;
/** Warmup discovery port. */
private int discoveryPort = 27000;
/** Methods to warmup. */
@GridToStringInclude
private String[] warmupMethods = {"put", "putx", "get", "remove", "removex", "putIfAbsent", "replace"};
/**
* Gets number of grids to start and run warmup.
*
* @return Number of grids.
*/
public int getGridCount() {
return gridCnt;
}
/**
* Sets number of grids to start and run the warmup.
*
* @param gridCnt Number of grids.
*/
public void setGridCount(int gridCnt) {
this.gridCnt = gridCnt;
}
/**
* Gets warmup methods to use for cache warmup.
*
* @return Warmup methods.
*/
public String[] getWarmupMethods() {
return warmupMethods;
}
/**
* Sets warmup methods to use for cache warmup.
*
* @param warmupMethods Array of warmup methods.
*/
public void setWarmupMethods(String... warmupMethods) {
this.warmupMethods = warmupMethods;
}
/**
* Gets thread count for warmup.
*
* @return Thread count.
*/
public int getThreadCount() {
return threadCnt;
}
/**
* Sets thread count for warmup.
*
* @param threadCnt Thread count.
*/
public void setThreadCount(int threadCnt) {
this.threadCnt = threadCnt;
}
/**
* Gets iteration count for warmup.
*
* @return Iteration count.
*/
public int getIterationCount() {
return iterCnt;
}
/**
* Sets iteration count for warmup.
*
* @param iterCnt Iteration count for warmup.
*/
public void setIterationCount(int iterCnt) {
this.iterCnt = iterCnt;
}
/**
* Gets key range.
*
* @return Key range.
*/
public int getKeyRange() {
return keyRange;
}
/**
* Sets key range.
*
* @param keyRange Key range.
*/
public void setKeyRange(int keyRange) {
this.keyRange = keyRange;
}
/**
* Gets discovery port for warmup.
*
* @return Discovery port.
*/
public int getDiscoveryPort() {
return discoveryPort;
}
/**
* Sets discovery port for warmup.
*
* @param discoveryPort Discovery port.
*/
public void setDiscoveryPort(int discoveryPort) {
this.discoveryPort = discoveryPort;
}
/** {@inheritDoc} */
@Override public void apply(IgniteConfiguration gridCfg) {
// Remove cache duplicates, clean up the rest, etc.
IgniteConfiguration cfg = prepareConfiguration(gridCfg);
// Do nothing if no caches found.
if (cfg == null)
return;
out("Starting grids to warmup caches [gridCnt=" + gridCnt +
", caches=" + cfg.getCacheConfiguration().length + ']');
Collection<Ignite> ignites = new LinkedList<>();
String old = System.getProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER);
try {
System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false");
TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
for (int i = 0; i < gridCnt; i++) {
IgniteConfiguration cfg0 = new IgniteConfiguration(cfg);
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);
discoSpi.setLocalPort(discoveryPort);
cfg0.setDiscoverySpi(discoSpi);
cfg0.setGridLogger(new NullLogger());
cfg0.setIgniteInstanceName("ignite-warmup-grid-" + i);
ignites.add(Ignition.start(cfg0));
}
doWarmup(ignites);
}
catch (Exception e) {
throw new IgniteException(e);
}
finally {
for (Ignite ignite : ignites)
Ignition.stop(ignite.name(), false);
out("Stopped warmup grids.");
if (old == null)
old = "false";
System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, old);
}
}
/**
* @param grids Grids to warmup.
*/
private void doWarmup(Iterable<Ignite> grids) throws Exception {
Ignite first = F.first(grids);
ExecutorService svc = Executors.newFixedThreadPool(threadCnt);
try {
for (IgniteCacheProxy cache : ((IgniteKernal)first).caches()) {
if (!cache.context().userCache())
continue;
IgniteInternalCache<Object, Object> cache0 = cache.context().cache();
for (String warmupMethod : warmupMethods) {
Collection<Future> futs = new ArrayList<>(threadCnt);
for (int i = 0; i < threadCnt; i++) {
Callable call;
switch (warmupMethod) {
case "get": {
call = new GetCallable(cache0);
break;
}
case "put": {
call = new PutCallable(cache0);
break;
}
case "putx": {
call = new PutxCallable(cache0);
break;
}
case "remove": {
call = new RemoveCallable(cache0);
break;
}
case "removex": {
call = new RemovexCallable(cache0);
break;
}
case "putIfAbsent": {
call = new PutIfAbsentCallable(cache0);
break;
}
case "replace": {
call = new ReplaceCallable(cache0);
break;
}
default:
throw new IgniteCheckedException("Unsupported warmup method: " + warmupMethod);
}
futs.add(svc.submit(call));
}
out("Running warmup [cacheName=" + U.maskName(cache.getName()) + ", method=" + warmupMethod + ']');
for (Future fut : futs)
fut.get();
for (int key = 0; key < keyRange; key++)
cache0.getAndRemove(key);
}
}
}
finally {
svc.shutdownNow();
}
}
/**
* Output for warmup messages.
*
* @param msg Format message.
*/
private static void out(String msg) {
System.out.println('[' + IgniteUtils.SHORT_DATE_FMT.format(Instant.now()) + "][WARMUP][" +
Thread.currentThread().getName() + ']' + ' ' + msg);
}
/**
* Prepares configuration for warmup.
*
* @param gridCfg Original grid configuration.
* @return Prepared configuration or {@code null} if no caches found.
*/
private IgniteConfiguration prepareConfiguration(IgniteConfiguration gridCfg) {
if (F.isEmpty(gridCfg.getCacheConfiguration()))
return null;
IgniteConfiguration cp = new IgniteConfiguration();
cp.setConnectorConfiguration(null);
Collection<CacheConfiguration> reduced = new ArrayList<>();
for (CacheConfiguration ccfg : gridCfg.getCacheConfiguration()) {
if (CU.isSystemCache(ccfg.getName()))
continue;
if (!matches(reduced, ccfg)) {
CacheConfiguration ccfgCp = new CacheConfiguration(ccfg);
ccfgCp.setCacheStoreFactory(null);
ccfgCp.setWriteBehindEnabled(false);
reduced.add(ccfgCp);
}
}
if (F.isEmpty(reduced))
return null;
CacheConfiguration[] res = new CacheConfiguration[reduced.size()];
reduced.toArray(res);
cp.setCacheConfiguration(res);
return cp;
}
/**
* Checks if passed configuration matches one of the configurations in the list.
*
* @param reduced Reduced configurations.
* @param ccfg Cache configuration to match.
* @return {@code True} if matching configuration is found, {@code false} otherwise.
*/
private boolean matches(Iterable<CacheConfiguration> reduced, CacheConfiguration ccfg) {
for (CacheConfiguration ccfg0 : reduced) {
if (matches(ccfg0, ccfg))
return true;
}
return false;
}
/**
* Checks if cache configurations are alike for warmup.
*
* @param ccfg0 First configuration.
* @param ccfg1 Second configuration.
* @return {@code True} if configurations match.
*/
private boolean matches(CacheConfiguration ccfg0, CacheConfiguration ccfg1) {
return
F.eq(ccfg0.getCacheMode(), ccfg1.getCacheMode()) &&
F.eq(ccfg0.getBackups(), ccfg1.getBackups()) &&
F.eq(ccfg0.getAtomicityMode(), ccfg1.getAtomicityMode());
}
/**
* Base class for all warmup callables.
*/
private abstract class BaseWarmupCallable implements Callable<Object> {
/** Cache. */
protected final IgniteInternalCache<Object, Object> cache;
/**
* @param cache Cache.
*/
protected BaseWarmupCallable(IgniteInternalCache<Object, Object> cache) {
this.cache = cache;
}
/** {@inheritDoc} */
@Override public Object call() throws Exception {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0; i < iterCnt; i++)
operation(rnd.nextInt(keyRange));
return null;
}
/**
* Runs operation.
*
* @param key Key.
* @throws Exception If failed.
*/
protected abstract void operation(int key) throws Exception;
}
/**
*
*/
private class GetCallable extends BaseWarmupCallable {
/**
* @param cache Cache.
*/
private GetCallable(IgniteInternalCache<Object, Object> cache) {
super(cache);
}
/** {@inheritDoc} */
@Override protected void operation(int key) throws Exception {
cache.get(key);
}
}
/**
*
*/
private class PutCallable extends BaseWarmupCallable {
/**
* @param cache Cache.
*/
private PutCallable(IgniteInternalCache<Object, Object> cache) {
super(cache);
}
/** {@inheritDoc} */
@Override protected void operation(int key) throws Exception {
cache.getAndPut(key, key);
}
}
/**
*
*/
private class PutxCallable extends BaseWarmupCallable {
/**
* @param cache Cache.
*/
private PutxCallable(IgniteInternalCache<Object, Object> cache) {
super(cache);
}
/** {@inheritDoc} */
@Override protected void operation(int key) throws Exception {
cache.put(key, key);
}
}
/**
*
*/
private class RemoveCallable extends BaseWarmupCallable {
/**
* @param cache Cache.
*/
private RemoveCallable(IgniteInternalCache<Object, Object> cache) {
super(cache);
}
/** {@inheritDoc} */
@Override protected void operation(int key) throws Exception {
cache.getAndRemove(key);
}
}
/**
*
*/
private class RemovexCallable extends BaseWarmupCallable {
/**
* @param cache Cache.
*/
private RemovexCallable(IgniteInternalCache<Object, Object> cache) {
super(cache);
}
/** {@inheritDoc} */
@Override protected void operation(int key) throws Exception {
cache.remove(key);
}
}
/**
*
*/
private class PutIfAbsentCallable extends BaseWarmupCallable {
/**
* @param cache Cache.
*/
private PutIfAbsentCallable(IgniteInternalCache<Object, Object> cache) {
super(cache);
}
/** {@inheritDoc} */
@Override protected void operation(int key) throws Exception {
cache.getAndPutIfAbsent(key, key);
}
}
/**
*
*/
private class ReplaceCallable extends BaseWarmupCallable {
/**
* @param cache Cache.
*/
private ReplaceCallable(IgniteInternalCache<Object, Object> cache) {
super(cache);
}
/** {@inheritDoc} */
@Override protected void operation(int key) throws Exception {
cache.replace(key, key, key);
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(BasicWarmupClosure.class, this);
}
}