blob: feaf82ee49512ba7451efbc9ea52fe198130d5c5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.stat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.systemview.GridSystemViewManager;
import org.apache.ignite.internal.managers.systemview.walker.StatisticsColumnConfigurationViewWalker;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.AbstractSchemaChangeListener;
import org.apache.ignite.internal.processors.query.schema.SchemaChangeListener;
import org.apache.ignite.internal.processors.query.schema.management.SchemaManager;
import org.apache.ignite.internal.processors.query.schema.management.TableDescriptor;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
import org.apache.ignite.internal.processors.query.stat.view.ColumnConfigurationViewSupplier;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
/**
* Holds statistic configuration objects at the distributed metastore
* and match local statistics with target statistic configuration.
*/
public class IgniteStatisticsConfigurationManager {
/** */
private static final String STAT_OBJ_PREFIX = "sql.statobj.";
/** */
private static final String STAT_CFG_VIEW_NAME = "statistics.configuration";
/** */
private static final String STAT_CFG_VIEW_DESCRIPTION = "Statistics configuration";
/** Empty strings array. */
public static final String[] EMPTY_STRINGS = new String[0];
/** Schema manager. */
private final SchemaManager schemaMgr;
/** Distributed metastore. */
private volatile DistributedMetaStorage distrMetaStorage;
/** Statistic processor. */
private final StatisticsProcessor statProc;
/** */
private final BusyExecutor mgmtBusyExecutor;
/** Persistence enabled flag. */
private final boolean persistence;
/** Logger. */
private final IgniteLogger log;
/** Last ready topology version if {@code null} - used to skip updates of the distributed metastorage on start. */
private volatile AffinityTopologyVersion topVer;
/** Cluster state processor. */
private final GridClusterStateProcessor cluster;
/** Is server node flag. */
private final boolean isServerNode;
/** Active flag. */
private volatile boolean active;
/** Configuration change subscribers. */
private final List<Consumer<StatisticsObjectConfiguration>> subscribers = new CopyOnWriteArrayList<>();
/** Change statistics configuration listener to update particular object statistics. */
private final DistributedMetastorageLifecycleListener distrMetaStoreLsnr =
new DistributedMetastorageLifecycleListener() {
@Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
distrMetaStorage = metastorage;
distrMetaStorage.listen(
(metaKey) -> metaKey.startsWith(STAT_OBJ_PREFIX),
(k, oldV, newV) -> {
// Skip invoke on start node (see 'ReadableDistributedMetaStorage#listen' the second case)
// The update statistics on start node is handled by 'scanAndCheckLocalStatistic' method
// called on exchange done.
if (topVer == null)
return;
StatisticsObjectConfiguration newStatCfg = (StatisticsObjectConfiguration)newV;
for (Consumer<StatisticsObjectConfiguration> subscriber : subscribers)
subscriber.accept(newStatCfg);
mgmtBusyExecutor.execute(() -> {
try {
while (!updateLocalStatisticsAsync((StatisticsObjectConfiguration)newV).get())
; // No-op
}
catch (IgniteCheckedException e) {
log.warning("Unexpected error during statistics collection: " + e.getMessage(), e);
}
});
}
);
}
};
/** Schema change listener */
private final SchemaChangeListener schemaLsnr = new AbstractSchemaChangeListener() {
@Override public void onColumnsDropped(
String schemaName,
GridQueryTypeDescriptor typeDesc,
GridCacheContextInfo<?, ?> cacheInfo,
List<String> cols
) {
if (!active)
return;
assert !F.isEmpty(cols);
// Drop statistics after columns dropped.
dropStatistics(
Collections.singletonList(
new StatisticsTarget(schemaName, typeDesc.tableName(), cols.toArray(EMPTY_STRINGS))
),
false
);
}
@Override public void onSqlTypeDropped(
String schemaName,
GridQueryTypeDescriptor typeDesc,
boolean destroy
) {
if (!active || !destroy)
return;
String name = typeDesc.tableName();
assert !F.isEmpty(schemaName) && !F.isEmpty(name) : schemaName + ":" + name;
StatisticsKey key = new StatisticsKey(schemaName, name);
try {
StatisticsObjectConfiguration cfg = config(key);
if (cfg != null && !F.isEmpty(cfg.columns()))
dropStatistics(Collections.singletonList(new StatisticsTarget(schemaName, name)), false);
}
catch (Throwable e) {
if (!X.hasCause(e, NodeStoppingException.class))
throw new IgniteSQLException("Error on drop statistics for dropped table [key=" + key + ']', e);
}
}
};
/**
* Constructor.
*
* @param schemaMgr Schema manager.
* @param subscriptionProcessor Subscription processor.
* @param sysViewMgr System view manager.
* @param cluster Cluster state processor.
* @param statProc Staitistics processor.
* @param persistence Persistence enabled flag.
* @param mgmtPool Statistics management pool
* @param stopping Stopping state supplier.
* @param logSupplier Log supplier.
* @param isServerNode Server node flag.
*/
public IgniteStatisticsConfigurationManager(
SchemaManager schemaMgr,
GridInternalSubscriptionProcessor subscriptionProcessor,
GridSystemViewManager sysViewMgr,
GridClusterStateProcessor cluster,
StatisticsProcessor statProc,
boolean persistence,
IgniteThreadPoolExecutor mgmtPool,
Supplier<Boolean> stopping,
Function<Class<?>, IgniteLogger> logSupplier,
boolean isServerNode
) {
this.schemaMgr = schemaMgr;
log = logSupplier.apply(IgniteStatisticsConfigurationManager.class);
this.persistence = persistence;
this.mgmtBusyExecutor = new BusyExecutor("configuration", mgmtPool, stopping, logSupplier);
this.statProc = statProc;
this.cluster = cluster;
this.isServerNode = isServerNode;
subscriptionProcessor.registerDistributedMetastorageListener(distrMetaStoreLsnr);
if (isServerNode)
subscriptionProcessor.registerSchemaChangeListener(schemaLsnr);
ColumnConfigurationViewSupplier colCfgViewSupplier = new ColumnConfigurationViewSupplier(this,
logSupplier);
sysViewMgr.registerFiltrableView(STAT_CFG_VIEW_NAME, STAT_CFG_VIEW_DESCRIPTION,
new StatisticsColumnConfigurationViewWalker(),
colCfgViewSupplier::columnConfigurationViewSupplier,
Function.identity());
}
/**
* Update statistics after topology change, if necessary.
*
* @param fut Topology change future.
*/
public void afterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
topVer = fut.topologyVersion();
// Skip join/left client nodes.
if (fut.exchangeType() != GridDhtPartitionsExchangeFuture.ExchangeType.ALL ||
(persistence && cluster.clusterState().lastState() != ClusterState.ACTIVE))
return;
DiscoveryEvent evt = fut.firstEvent();
// Skip create/destroy caches.
if (evt.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
if (msg instanceof DynamicCacheChangeBatch)
return;
}
mgmtBusyExecutor.execute(this::updateAllLocalStatistics);
}
/**
* Pass all necessary parameters to schedule statistics key update.
*
* @param cfg Statistics object configuration to update statistics by.
*/
private IgniteInternalFuture<Boolean> updateLocalStatisticsAsync(StatisticsObjectConfiguration cfg) {
TableDescriptor tbl = schemaMgr.table(cfg.key().schema(), cfg.key().obj());
GridQueryTypeDescriptor typeDesc = tbl != null ? tbl.type() : null;
GridCacheContextInfo<?, ?> cacheInfo = tbl != null ? tbl.cacheInfo() : null;
GridCacheContext<?, ?> cctx = cacheInfo != null ? cacheInfo.cacheContext() : null;
if (tbl == null || cfg.columns().isEmpty()) {
// Can be drop table event, need to ensure that there is no stored data left for this table.
if (log.isDebugEnabled()) {
if (tbl == null)
log.debug("Can't find table by key " + cfg.key() + ". Check statistics empty.");
else if (cfg == null)
log.debug("Tombstone configuration by key " + cfg.key() + ". Check statistics empty.");
}
// Ensure to clean local metastorage.
LocalStatisticsGatheringContext ctx = new LocalStatisticsGatheringContext(false, typeDesc,
cacheInfo, cfg, Collections.emptySet(), topVer);
statProc.updateLocalStatistics(ctx);
if (tbl == null && !cfg.columns().isEmpty()) {
if (log.isDebugEnabled())
log.debug("Removing config for non existing object " + cfg.key());
return dropStatisticsAsync(Collections.singletonList(new StatisticsTarget(cfg.key())), false);
}
return new GridFinishedFuture<>(true);
}
if (cctx == null || !cctx.gate().enterIfNotStopped()) {
if (log.isDebugEnabled())
log.debug("Unable to lock table by key " + cfg.key() + ". Skipping statistics collection.");
return new GridFinishedFuture<>(true);
}
try {
AffinityTopologyVersion topVer0 = cctx.affinity().affinityReadyFuture(topVer).get();
final Set<Integer> primParts = cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer0);
LocalStatisticsGatheringContext ctx = new LocalStatisticsGatheringContext(false, typeDesc,
cacheInfo, cfg, primParts, topVer0);
statProc.updateLocalStatistics(ctx);
}
catch (IgniteCheckedException e) {
log.warning("Unexpected error during statistics collection: " + e.getMessage(), e);
}
finally {
cctx.gate().leave();
}
return new GridFinishedFuture<>(true);
}
/**
* Get statistics configurations for all objects.
*
* @return Collection of all statistics configuration.
* @throws IgniteCheckedException In case of error.
*/
public Collection<StatisticsObjectConfiguration> getAllConfig() throws IgniteCheckedException {
List<StatisticsObjectConfiguration> res = new ArrayList<>();
distrMetaStorage.iterate(STAT_OBJ_PREFIX, (k, v) -> res.add((StatisticsObjectConfiguration)v));
return res;
}
/**
* Start tracking configuration changes and do initial loading.
*/
public void start() {
if (log.isTraceEnabled())
log.trace("Statistics configuration manager starting...");
active = true;
mgmtBusyExecutor.activate();
if (log.isDebugEnabled())
log.debug("Statistics configuration manager started.");
if (distrMetaStorage != null && isServerNode)
mgmtBusyExecutor.execute(this::updateAllLocalStatistics);
}
/**
* Scan statistics configuration and update each key it contains.
*/
public void updateAllLocalStatistics() {
try {
GridCompoundFuture<Boolean, Boolean> compoundFut = new GridCompoundFuture<>(CU.boolReducer());
distrMetaStorage.iterate(STAT_OBJ_PREFIX, (k, v) -> {
StatisticsObjectConfiguration cfg = (StatisticsObjectConfiguration)v;
compoundFut.add(updateLocalStatisticsAsync(cfg));
});
compoundFut.markInitialized();
compoundFut.listen(() -> {
if (compoundFut.error() == null && !compoundFut.result())
mgmtBusyExecutor.execute(this::updateAllLocalStatistics);
});
}
catch (IgniteCheckedException e) {
log.warning("Unexpected statistics configuration processing error", e);
}
}
/**
* Stop tracking configuration changes.
*/
public void stop() {
if (log.isTraceEnabled())
log.trace("Statistics configuration manager stopping...");
active = false;
mgmtBusyExecutor.deactivate();
if (log.isDebugEnabled())
log.debug("Statistics configuration manager stopped.");
}
/**
* Update local statistic for specified database objects on the cluster.
* Each node will scan local primary partitions to collect and update local statistic.
*
* @param targets DB objects to statistics update.
*/
public void updateStatistics(StatisticsObjectConfiguration... targets) {
if (log.isDebugEnabled())
log.debug("Update statistics [targets=" + targets + ']');
for (StatisticsObjectConfiguration target : targets) {
TableDescriptor tbl = schemaMgr.table(target.key().schema(), target.key().obj());
validate(target, tbl);
List<StatisticsColumnConfiguration> colCfgs;
if (F.isEmpty(target.columns()))
colCfgs = tbl.type().fields().keySet().stream()
.filter(col -> !QueryUtils.KEY_FIELD_NAME.equals(col) && !QueryUtils.VAL_FIELD_NAME.equals(col))
.map(col -> new StatisticsColumnConfiguration(col, null))
.collect(Collectors.toList());
else
colCfgs = new ArrayList<>(target.columns().values());
StatisticsObjectConfiguration newCfg = new StatisticsObjectConfiguration(target.key(), colCfgs,
target.maxPartitionObsolescencePercent());
try {
while (true) {
String key = key2String(newCfg.key());
StatisticsObjectConfiguration oldCfg = distrMetaStorage.read(key);
StatisticsObjectConfiguration resultCfg = (oldCfg == null) ? newCfg :
StatisticsObjectConfiguration.merge(oldCfg, newCfg);
if (distrMetaStorage.compareAndSet(key, oldCfg, resultCfg))
break;
}
}
catch (IgniteCheckedException ex) {
throw new IgniteSQLException("Error on get or update statistic schema",
IgniteQueryErrorCode.UNKNOWN, ex);
}
}
}
/**
* Drop local statistic for specified database objects on the cluster.
* Remove local aggregated and partitioned statistics that are stored at the local metastorage.
*
* @param targets DB objects to update statistics by.
* @param validate if {@code true} - validate statistics existence, otherwise - just try to remove.
*/
public void dropStatistics(List<StatisticsTarget> targets, boolean validate) {
try {
while (!dropStatisticsAsync(targets, validate).get())
; // No-op
}
catch (IgniteCheckedException ex) {
if (ex.getCause() instanceof IgniteSQLException)
throw (IgniteSQLException)ex.getCause();
throw new IgniteSQLException("Error occurs while updating statistics schema",
IgniteQueryErrorCode.UNKNOWN, ex);
}
}
/**
* Drop local statistic for specified database objects on the cluster.
* Remove local aggregated and partitioned statistics that are stored at the local metastorage.
*
* @param targets DB objects to update statistics by.
* @param validate if {@code true} - validate statistics existence, otherwise - just try to remove.
*/
public IgniteInternalFuture<Boolean> dropStatisticsAsync(List<StatisticsTarget> targets, boolean validate) {
if (log.isDebugEnabled())
log.debug("Drop statistics [targets=" + targets + ']');
GridFutureAdapter<Boolean> resultFut = new GridFutureAdapter<>();
IgniteInternalFuture<Boolean> chainFut = new GridFinishedFuture<>(true);
for (StatisticsTarget target : targets) {
chainFut = chainFut.chainCompose(f -> {
if (f.error() == null && f.result() == Boolean.TRUE)
return removeFromMetastore(target, validate);
return f;
});
}
chainFut.listen(f -> {
if (f.error() != null)
resultFut.onDone(f.error());
else
resultFut.onDone(f.result() == null || f.result().booleanValue());
});
return resultFut;
}
/**
* Drop statistics target configuration from metastore.
*
* @param target Statistics target.
* @param validate Validation flag.
* @return Operation future.
*/
private IgniteInternalFuture<Boolean> removeFromMetastore(StatisticsTarget target, boolean validate) {
String key = key2String(target.key());
try {
StatisticsObjectConfiguration oldCfg = distrMetaStorage.read(key);
if (validate)
validateDropRefresh(target, oldCfg);
if (oldCfg == null)
return new GridFinishedFuture<>(null); //Stop future chaining. Other thread\node makes the progress.
Set<String> dropColNames = (target.columns() == null) ? Collections.emptySet() :
Arrays.stream(target.columns()).collect(Collectors.toSet());
StatisticsObjectConfiguration newCfg = oldCfg.dropColumns(dropColNames);
if (oldCfg.equals(newCfg))
return new GridFinishedFuture<>(true); //Skip. Nothing to do.
return distrMetaStorage.compareAndSetAsync(key, oldCfg, newCfg);
}
catch (Throwable ex) {
return new GridFinishedFuture<>(ex);
}
}
/**
* Drop all local statistics on the cluster.
*/
public void dropAll() {
try {
final List<StatisticsTarget> targetsToRemove = new ArrayList<>();
distrMetaStorage.iterate(STAT_OBJ_PREFIX, (k, v) -> {
StatisticsKey statKey = ((StatisticsObjectConfiguration)v).key();
StatisticsObjectConfiguration cfg = (StatisticsObjectConfiguration)v;
if (!F.isEmpty(cfg.columns()))
targetsToRemove.add(new StatisticsTarget(statKey, null));
}
);
dropStatistics(targetsToRemove, false);
}
catch (IgniteCheckedException e) {
throw new IgniteSQLException(
"Unexpected exception drop all statistics", IgniteQueryErrorCode.UNKNOWN, e);
}
}
/**
* Refresh local statistic for specified database objects on the cluster.
*
* @param targets DB objects to statistics update.
*/
public void refreshStatistics(List<StatisticsTarget> targets) {
if (log.isDebugEnabled())
log.debug("Drop statistics [targets=" + targets + ']');
for (StatisticsTarget target : targets) {
String key = key2String(target.key());
try {
while (true) {
StatisticsObjectConfiguration oldCfg = distrMetaStorage.read(key);
validateDropRefresh(target, oldCfg);
Set<String> cols;
if (F.isEmpty(target.columns())) {
cols = oldCfg.columns().values().stream().map(StatisticsColumnConfiguration::name)
.collect(Collectors.toSet());
}
else
cols = Arrays.stream(target.columns()).collect(Collectors.toSet());
StatisticsObjectConfiguration newCfg = oldCfg.refresh(cols);
if (distrMetaStorage.compareAndSet(key, oldCfg, newCfg))
break;
}
}
catch (IgniteCheckedException ex) {
throw new IgniteSQLException(
"Error on get or update statistic schema", IgniteQueryErrorCode.UNKNOWN, ex);
}
}
}
/**
* Validate that drop/refresh target exists in specified configuration. For statistics refresh/drop operations.
*
* @param target Operation targer.
* @param cfg Current statistics configuration.
*/
private void validateDropRefresh(@NotNull StatisticsTarget target, StatisticsObjectConfiguration cfg) {
if (cfg == null || F.isEmpty(cfg.columns())) {
throw new IgniteSQLException(
"Statistic doesn't exist for [schema=" + target.schema() + ", obj=" + target.obj() + ']',
IgniteQueryErrorCode.TABLE_NOT_FOUND
);
}
if (!F.isEmpty(target.columns())) {
for (String col : target.columns()) {
if (!cfg.columns().containsKey(col)) {
throw new IgniteSQLException(
"Statistic doesn't exist for [" +
"schema=" + cfg.key().schema() +
", obj=" + cfg.key().obj() +
", col=" + col + ']',
IgniteQueryErrorCode.COLUMN_NOT_FOUND
);
}
}
}
}
/**
* Read statistics object configuration by key.
*
* @param key Statistics key to read configuration by.
* @return Statistics object configuration of {@code null} if there are no such configuration.
* @throws IgniteCheckedException In case of errors.
*/
public StatisticsObjectConfiguration config(StatisticsKey key) throws IgniteCheckedException {
return distrMetaStorage.read(key2String(key));
}
/**
* Validate specified configuration: check that specified table exist and contains all specified columns.
*
* @param cfg Statistics object configuration to check.
* @param tbl Corresponding table (if exists).
*/
private void validate(StatisticsObjectConfiguration cfg, TableDescriptor tbl) {
if (tbl == null) {
throw new IgniteSQLException(
"Table doesn't exist [schema=" + cfg.key().schema() + ", table=" + cfg.key().obj() + ']',
IgniteQueryErrorCode.TABLE_NOT_FOUND);
}
if (!F.isEmpty(cfg.columns())) {
for (String col : cfg.columns().keySet()) {
if (!tbl.type().fields().containsKey(col)) {
throw new IgniteSQLException(
"Column doesn't exist [schema=" + cfg.key().schema() +
", table=" + cfg.key().obj() +
", column=" + col + ']',
IgniteQueryErrorCode.COLUMN_NOT_FOUND);
}
}
}
}
/**
* Generate metastorage key by specified statistics key.
*
* @param key Statistics key.
* @return Metastorage key.
*/
private static String key2String(StatisticsKey key) {
StringBuilder sb = new StringBuilder(STAT_OBJ_PREFIX);
sb.append(key.schema()).append('.').append(key.obj());
return sb.toString();
}
/**
* Subscribe to statistics configuration changed.
*
* @param subscriber Subscriber.
*/
public void subscribe(Consumer<StatisticsObjectConfiguration> subscriber) {
subscribers.add(subscriber);
}
}