blob: 66f5af303b8658c074d01257ace28deaa962799d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.stat;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedEnumProperty;
import org.apache.ignite.internal.processors.query.schema.management.SchemaManager;
import org.apache.ignite.internal.processors.query.schema.management.TableDescriptor;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import static org.apache.ignite.internal.processors.query.stat.StatisticsUsageState.NO_UPDATE;
import static org.apache.ignite.internal.processors.query.stat.StatisticsUsageState.OFF;
import static org.apache.ignite.internal.processors.query.stat.StatisticsUsageState.ON;
/**
* Statistics manager implementation.
*/
public class IgniteStatisticsManagerImpl implements IgniteStatisticsManager {
/** Size of statistics collection pool (passed to the "stat-gather" executor). */
private static final int STATS_POOL_SIZE = 4;
/** Default statistics usage state, reported by {@link #usageState()} while no last known state is recorded. */
private static final StatisticsUsageState DEFAULT_STATISTICS_USAGE_STATE = ON;
/** Interval to check statistics obsolescence in seconds (multiplied by 1000 when scheduled). */
private static final int OBSOLESCENCE_INTERVAL = 60;
/** Logger. */
private final IgniteLogger log;
/** Kernal context. */
private final GridKernalContext ctx;
/** Schema manager. */
private final SchemaManager schemaMgr;
/** Statistics repository. */
private final IgniteStatisticsRepository statsRepos;
/** Global statistics manager. */
private final IgniteGlobalStatisticsManager globalStatsMgr;
/** Ignite statistics helper. */
private final IgniteStatisticsHelper helper;
/** Statistics processor; {@code null} on client nodes. */
private final StatisticsProcessor statProc;
/** Statistics configuration manager. */
private final IgniteStatisticsConfigurationManager statCfgMgr;
/** Management pool (single "stat-mgmt" executor shared by configuration, global statistics and obsolescence). */
private final IgniteThreadPoolExecutor mgmtPool;
/** Executor to do obsolescence management; runs its tasks on the management pool. */
private final BusyExecutor obsolescenceBusyExecutor;
/** Gathering pool; {@code null} on client nodes. */
private final IgniteThreadPoolExecutor gatherPool;
/** Cluster wide statistics usage state. */
private final DistributedEnumProperty<StatisticsUsageState> usageState = new DistributedEnumProperty<>(
"statistics.usage.state",
"Statistics usage state. OFF - No statistics used, NO_UPDATE - Statistics used 'as is' without updates, " +
"ON - Statistics used and updated after each changes.",
StatisticsUsageState::fromOrdinal,
StatisticsUsageState::index,
StatisticsUsageState.class);
/** Last known statistics usage state; {@code null} until the distributed property listener reports a value. */
private volatile StatisticsUsageState lastUsageState = null;
/** Started flag to prevent double start on change statistics usage state and activation and vice versa. */
private volatile boolean started;
/** Schedule to process obsolescence statistics; only created on server nodes, otherwise {@code null}. */
private GridTimeoutProcessor.CancelableTask obsolescenceSchedule;
/**
 * Exchange listener. After a full ({@code ALL}) exchange it starts or stops the statistics subsystem according
 * to the current cluster state and, while the subsystem is started, forwards the exchange to the configuration
 * manager.
 */
private final PartitionsExchangeAware exchAwareLsnr = new PartitionsExchangeAware() {
    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
        if (fut.exchangeType() == GridDhtPartitionsExchangeFuture.ExchangeType.ALL) {
            boolean clusterActive = ctx.state().clusterState().state().active();

            if (clusterActive) {
                if (!started)
                    tryStart();
            }
            else if (started)
                tryStop();

            // Re-read the flag: tryStart()/tryStop() above may have changed it.
            if (started)
                statCfgMgr.afterTopologyUnlock(fut);
        }
    }
};
/**
* Constructor. Builds the pools, the statistics store/repository, the statistics processor (server nodes only),
* the configuration and global statistics managers, registers the usage state property listener, attempts to
* start the subsystem and finally registers the exchange listener.
*
* @param ctx Kernal context.
*/
public IgniteStatisticsManagerImpl(GridKernalContext ctx) {
this.ctx = ctx;
schemaMgr = ctx.query().schemaManager();
// Client nodes get no gathering pool, no statistics processor and a dummy store (see below).
boolean serverNode = !ctx.config().isClientMode();
helper = new IgniteStatisticsHelper(ctx.localNodeId(), schemaMgr, ctx::log);
log = ctx.log(IgniteStatisticsManagerImpl.class);
// Database manager is only taken when persistence is enabled; null selects a non-persistent store below.
IgniteCacheDatabaseSharedManager db = (GridCacheUtils.isPersistenceEnabled(ctx.config())) ?
ctx.cache().context().database() : null;
gatherPool = (serverNode) ? new IgniteThreadPoolExecutor("stat-gather",
ctx.igniteInstanceName(),
0,
STATS_POOL_SIZE,
IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.UNDEFINED,
ctx.uncaughtExceptionHandler()
) : null;
// Management pool is created on every node, including clients.
mgmtPool = new IgniteThreadPoolExecutor("stat-mgmt",
ctx.igniteInstanceName(),
0,
1,
IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.UNDEFINED,
ctx.uncaughtExceptionHandler()
);
obsolescenceBusyExecutor = new BusyExecutor("obsolescence", mgmtPool, ctx::isStopping, ctx::log);
// Store selection: dummy on clients, in-memory without persistence, persistent otherwise.
IgniteStatisticsStore store;
if (!serverNode)
store = new IgniteStatisticsDummyStoreImpl(ctx::log);
else if (db == null)
store = new IgniteStatisticsInMemoryStoreImpl(ctx::log);
else
store = new IgniteStatisticsPersistenceStoreImpl(ctx.internalSubscriptionProcessor(), db, ctx::log);
statsRepos = new IgniteStatisticsRepository(store, ctx.systemView(), helper, ctx::log);
statProc = serverNode ? new StatisticsProcessor(
statsRepos,
gatherPool,
ctx::isStopping,
ctx::log
) : null;
statCfgMgr = new IgniteStatisticsConfigurationManager(
schemaMgr,
ctx.internalSubscriptionProcessor(),
ctx.systemView(),
ctx.state(),
statProc,
db != null,
mgmtPool,
ctx::isStopping,
ctx::log,
serverNode
);
globalStatsMgr = new IgniteGlobalStatisticsManager(
this,
ctx.systemView(),
mgmtPool,
ctx.discovery(),
ctx.state(),
ctx.cache().context().exchange(),
helper,
ctx.io(),
ctx::log);
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
usageState.addListener((name, oldVal, newVal) -> {
if (log.isInfoEnabled())
log.info(String.format("Statistics usage state was changed from %s to %s", oldVal, newVal));
// The last known state is recorded even when the value did not change.
lastUsageState = newVal;
if (oldVal == newVal)
return;
if (newVal == ON || newVal == NO_UPDATE)
tryStart();
if (newVal == OFF)
tryStop();
});
dispatcher.registerProperty(usageState);
});
// Starts only if all preconditions checked by tryStart() already hold; otherwise this is a no-op.
tryStart();
if (serverNode) {
// Use mgmt pool to work with statistics repository in busy lock to schedule some tasks.
obsolescenceSchedule = ctx.timeout().schedule(() -> {
obsolescenceBusyExecutor.execute(() -> processObsolescence());
}, OBSOLESCENCE_INTERVAL * 1000, OBSOLESCENCE_INTERVAL * 1000);
}
ctx.cache().context().exchange().registerExchangeAwareComponent(exchAwareLsnr);
}
/**
 * Stop the statistics subsystem if it is started and at least one run precondition no longer holds: the cluster
 * is not {@link ClusterState#ACTIVE}, the node is stopping, or the usage state is neither ON nor NO_UPDATE.
 */
private synchronized void tryStop() {
    StatisticsUsageState state = usageState();

    boolean usable = state == ON || state == NO_UPDATE;

    boolean mayRun = ClusterState.ACTIVE == ctx.state().clusterState().state()
        && !ctx.isStopping()
        && usable;

    if (!mayRun && started)
        stopX();
}
/**
* Stop all statistics related components and clear the {@code started} flag.
* Components are stopped in the reverse of the order in which {@link #startX()} starts them.
* This method takes no lock itself; it is called from the synchronized {@link #tryStop()} and from {@link #stop()}.
*/
private void stopX() {
if (log.isDebugEnabled())
log.debug("Stopping statistics subsystem");
globalStatsMgr.stop();
statCfgMgr.stop();
// Statistics processor exists only on server nodes.
if (statProc != null)
statProc.stop();
statsRepos.stop();
obsolescenceBusyExecutor.deactivate();
started = false;
}
/**
 * Start the statistics subsystem if it is currently stopped and every precondition holds: the cluster is
 * {@link ClusterState#ACTIVE}, the node is not stopping and the usage state is ON or NO_UPDATE.
 */
private synchronized void tryStart() {
    StatisticsUsageState state = usageState();

    boolean nodeReady = ClusterState.ACTIVE == ctx.state().clusterState().state() && !ctx.isStopping();

    boolean usable = state == ON || state == NO_UPDATE;

    if (nodeReady && usable && !started)
        startX();
}
/**
* Start all statistics related components and set the {@code started} flag.
* The obsolescence executor is activated first, then the repository, the processor (if present),
* the configuration manager and finally the global statistics manager are started.
*/
private void startX() {
if (log.isDebugEnabled())
log.debug("Starting statistics subsystem...");
obsolescenceBusyExecutor.activate();
statsRepos.start();
// Statistics processor exists only on server nodes.
if (statProc != null)
statProc.start();
statCfgMgr.start();
globalStatsMgr.start();
started = true;
}
/**
* Get the statistics repository created by this manager.
*
* @return Statistics repository.
*/
public IgniteStatisticsRepository statisticsRepository() {
return statsRepos;
}
/**
 * {@inheritDoc}
 * <p>
 * Returns {@code null} unless the usage state is ON or NO_UPDATE; no topology version is requested.
 */
@Override public ObjectStatistics getLocalStatistics(StatisticsKey key) {
    StatisticsUsageState state = usageState();

    if (state != ON && state != NO_UPDATE)
        return null;

    return statsRepos.getLocalStatistics(key, null);
}
/**
* Get local statistics with specified topology version if exists.
* Unlike {@link #getLocalStatistics(StatisticsKey)}, this overload does not check the statistics usage state.
*
* @param key Key to get statistics by.
* @param topVer Required topology version.
* @return Local object statistics or {@code null} if there are no statistics with requested topology version.
*/
public ObjectStatisticsImpl getLocalStatistics(StatisticsKey key, AffinityTopologyVersion topVer) {
return statsRepos.getLocalStatistics(key, topVer);
}
/**
 * {@inheritDoc}
 * <p>
 * Returns {@code null} unless the usage state is ON or NO_UPDATE.
 */
@Override public ObjectStatistics getGlobalStatistics(StatisticsKey key) {
    StatisticsUsageState state = usageState();

    if (state != ON && state != NO_UPDATE)
        return null;

    return globalStatsMgr.getGlobalStatistics(key);
}
/**
 * {@inheritDoc}
 * <p>
 * Fails with {@link IgniteException} if the cluster is not active or the statistics usage state is OFF;
 * otherwise passes the targets to the configuration manager.
 */
@Override public void collectStatistics(StatisticsObjectConfiguration... targets) throws IgniteCheckedException {
    ensureActive("collect statistics");

    boolean statisticsOff = OFF == usageState();

    if (statisticsOff) {
        throw new IgniteException("Can't gather statistics while statistics usage state is OFF.");
    }

    statCfgMgr.updateStatistics(targets);
}
/**
 * {@inheritDoc}
 * <p>
 * Fails with {@link IgniteException} if the cluster is not active or the statistics usage state is OFF;
 * otherwise passes the targets to the configuration manager.
 */
@Override public void dropStatistics(StatisticsTarget... targets) throws IgniteCheckedException {
    ensureActive("drop statistics");

    boolean statisticsOff = OFF == usageState();

    if (statisticsOff) {
        throw new IgniteException("Can't drop statistics while statistics usage state is OFF.");
    }

    List<StatisticsTarget> targetList = Arrays.asList(targets);

    statCfgMgr.dropStatistics(targetList, true);
}
/**
 * {@inheritDoc}
 * <p>
 * Fails with {@link IgniteException} if the cluster is not active or the statistics usage state is OFF;
 * otherwise passes the targets to the configuration manager.
 */
@Override public void refreshStatistics(StatisticsTarget... targets) throws IgniteCheckedException {
    ensureActive("refresh statistics");

    boolean statisticsOff = OFF == usageState();

    if (statisticsOff) {
        throw new IgniteException("Can't refresh statistics while statistics usage state is OFF.");
    }

    List<StatisticsTarget> targetList = Arrays.asList(targets);

    statCfgMgr.refreshStatistics(targetList);
}
/**
* {@inheritDoc}
* <p>
* Requires an active cluster. Unlike the other modification methods of this class, the statistics usage state
* is not checked here.
*/
@Override public void dropAll() throws IgniteCheckedException {
ensureActive("drop all statistics");
statCfgMgr.dropAll();
}
/**
 * {@inheritDoc}
 * <p>
 * Stops all statistics components, cancels the obsolescence schedule (if any) and shuts down both pools,
 * logging a warning with the number of tasks that never started.
 */
@Override public void stop() {
    stopX();

    if (obsolescenceSchedule != null)
        obsolescenceSchedule.close();

    if (gatherPool != null) {
        int cancelledGatherings = gatherPool.shutdownNow().size();

        if (cancelledGatherings > 0)
            log.warning(String.format("%d statistics collection cancelled.", cancelledGatherings));
    }

    if (mgmtPool != null) {
        int cancelledHandlers = mgmtPool.shutdownNow().size();

        if (cancelledHandlers > 0) {
            log.warning(String.format("%d statistics configuration change handler cancelled.",
                cancelledHandlers));
        }
    }
}
/**
* Get the statistics configuration manager created by this manager.
*
* @return Statistics configuration manager.
*/
public IgniteStatisticsConfigurationManager statisticConfiguration() {
return statCfgMgr;
}
/**
* {@inheritDoc}
* <p>
* Requires an active cluster. The new state is propagated through the distributed property.
* NOTE(review): a propagation failure is only logged and not rethrown, so the caller is not notified of it
* even though the method declares {@link IgniteCheckedException} - confirm this is intended.
*/
@Override public void usageState(StatisticsUsageState state) throws IgniteCheckedException {
ensureActive("change usage state of statistics");
try {
usageState.propagate(state);
}
catch (IgniteCheckedException e) {
// Failure is logged only; it is not propagated to the caller.
log.error("Unable to set usage state value due to " + e.getMessage(), e);
}
}
/**
 * {@inheritDoc}
 * <p>
 * Returns the last state reported by the distributed property listener, or the default state while no value
 * has been recorded.
 */
@Override public StatisticsUsageState usageState() {
    // Read the volatile field exactly once: with two reads a concurrent write of null between the null check
    // and the return could leak null to the caller instead of the default.
    StatisticsUsageState state = lastUsageState;

    return state == null ? DEFAULT_STATISTICS_USAGE_STATE : state;
}
/**
 * {@inheritDoc}
 * <p>
 * Looks up the obsolescence tracker of the given partition and, if one exists, reports the modified key to it.
 */
@Override public void onRowUpdated(String schemaName, String objName, int partId, byte[] keyBytes) {
    StatisticsKey key = new StatisticsKey(schemaName, objName);

    ObjectPartitionStatisticsObsolescence partObsolescence = statsRepos.getObsolescence(key, partId);

    if (partObsolescence == null)
        return;

    partObsolescence.onModified(keyBytes);
}
/**
 * Check which partitions of every object with obsolescence info need their statistics refreshed and hand the
 * work to the statistics processor.
 *
 * 1) Skip processing unless the usage state is ON, the node is not stopping and a statistics processor exists.
 * 2) Get all keys that have obsolescence info.
 * 3) For each key resolve its statistics configuration and table; keys without a configuration or without a
 *    table (e.g. the table was removed but its obsolescence info is not processed yet) are skipped.
 * 4) Calculate the partitions to recollect and submit a gathering context for the key. The context is
 *    submitted even with an empty partition set.
 */
public synchronized void processObsolescence() {
    StatisticsUsageState usageState = usageState();

    // statProc is null on client nodes; there is nothing to gather with in that case.
    if (usageState != ON || ctx.isStopping() || statProc == null) {
        if (log.isDebugEnabled())
            log.debug("Skipping obsolescence processing.");

        return;
    }

    if (log.isTraceEnabled())
        log.trace("Process statistics obsolescence started.");

    List<StatisticsKey> keys = statsRepos.getObsolescenceKeys();

    if (F.isEmpty(keys)) {
        if (log.isTraceEnabled())
            log.trace("No obsolescence info found. Finish obsolescence processing.");

        return;
    }

    if (log.isTraceEnabled())
        log.trace(String.format("Scheduling obsolescence savings for %d targets", keys.size()));

    for (StatisticsKey key : keys) {
        StatisticsObjectConfiguration cfg = null;

        try {
            cfg = statCfgMgr.config(key);
        }
        catch (IgniteCheckedException e) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Unable to get statistics configuration for %s due to %s",
                    key, e.getMessage()));
            }
        }

        if (cfg == null) {
            // Without a configuration neither the obsolescence threshold nor the gathering task can be built.
            if (log.isDebugEnabled())
                log.debug(String.format("Got obsolescence statistics without configuration for %s", key));

            continue;
        }

        TableDescriptor tbl = schemaMgr.table(key.schema(), key.obj());

        if (tbl == null) {
            // Table can be removed earlier, but not already processed. Or something goes wrong.
            // Skip the key instead of dereferencing the missing table descriptor.
            if (log.isDebugEnabled())
                log.debug(String.format("Got obsolescence statistics for unknown table %s", key));

            continue;
        }

        Set<Integer> tasksParts = calculateObsolescencedPartitions(cfg, statsRepos.getObsolescence(key));

        GridCacheContextInfo<?, ?> cacheInfo = tbl.cacheInfo();

        LocalStatisticsGatheringContext gatheringCtx = new LocalStatisticsGatheringContext(true,
            tbl.type(), cacheInfo, cfg, tasksParts, null);

        statProc.updateLocalStatistics(gatheringCtx);
    }
}
/**
 * Select the partitions whose local statistics must be recollected. A partition is selected when it has no
 * local partition statistics, when its statistics report zero rows, or when the percentage of modified rows
 * exceeds the maximum obsolescence percent from the configuration.
 *
 * @param cfg Statistics configuration.
 * @param parts Obsolescence info by partition id.
 * @return Set of partition ids to refresh statistics for, possibly empty.
 */
private Set<Integer> calculateObsolescencedPartitions(
    StatisticsObjectConfiguration cfg,
    IntMap<ObjectPartitionStatisticsObsolescence> parts
) {
    Set<Integer> staleParts = new HashSet<>();

    parts.forEach((partId, obsolescence) -> {
        ObjectPartitionStatisticsImpl partStat = statsRepos.getLocalPartitionStatistics(cfg.key(), partId);

        boolean recollect = partStat == null || partStat.rowCount() == 0;

        if (!recollect) {
            double modifiedPercent = (double)obsolescence.modified() * 100 / partStat.rowCount();

            recollect = modifiedPercent > cfg.maxPartitionObsolescencePercent();
        }

        if (recollect)
            staleParts.add(partId);
    });

    // Will add even empty list of partitions to recollect just to force obsolescence info to be stored.
    return staleParts;
}
/**
* Subscribe to all local statistics changes. The subscriber is registered in the statistics repository.
*
* @param subscriber Local statistics subscriber.
*/
public void subscribeToLocalStatistics(Consumer<ObjectStatisticsEvent> subscriber) {
statsRepos.subscribeToLocalStatistics(subscriber);
}
/**
* Subscribe to all statistics configuration changes. The subscriber is registered in the statistics
* configuration manager.
*
* @param subscriber Statistics configuration subscriber.
*/
public void subscribeToStatisticsConfig(Consumer<StatisticsObjectConfiguration> subscriber) {
statCfgMgr.subscribe(subscriber);
}
/**
 * Check that the cluster state is {@link ClusterState#ACTIVE}.
 *
 * @param op Operation name, used in the error message.
 * @throws IgniteException If the cluster is in any other state.
 */
public void ensureActive(String op) {
    if (ctx.state().clusterState().state() == ClusterState.ACTIVE)
        return;

    throw new IgniteException(String.format(
        "Unable to perform %s due to cluster state [state=%s]", op, ctx.state().clusterState().state()));
}
}