/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.stat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
import org.apache.ignite.internal.processors.query.stat.task.GatherPartitionStatistics;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
/**
* Processes all tasks related to the statistics repository: mostly statistics collection,
* invalidation (due to configuration, topology or obsolescence changes) and loading.
* Input tasks should be scheduled through the management pool, while the gathering pool is used to process heavy
* operations in parallel.
*
* Manages the gathering pool and its jobs. To guarantee graceful shutdown:
* 1) A job can be added to gatheringInProgress only in the active state (checked after adding).
* 2) The state can be deactivated only after cancelling all jobs and acquiring the busyLock block.
* 3) Each job should do its work under the busyLock, periodically checking its cancellation status.
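*
* <p>A minimal lifecycle sketch (hypothetical caller code; the gathering context {@code ctx} is assumed to be
* prepared by the statistics manager elsewhere):
* <pre>{@code
* StatisticsProcessor proc = new StatisticsProcessor(repo, gatherPool, stopping, logSupplier);
*
* proc.start();                    // Activate the gathering executor.
* proc.updateLocalStatistics(ctx); // Schedule collection for the prepared gathering context.
* proc.stop();                     // Cancel remaining jobs and deactivate the executor.
* }</pre>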
*/
public class StatisticsProcessor {
/** Logger. */
private final IgniteLogger log;
/** Ignite statistics repository. */
private final IgniteStatisticsRepository statRepo;
/** Busy executor over the gathering thread pool, used to run statistics collection tasks. */
private final BusyExecutor gatheringBusyExecutor;
/** In-progress gatherings (statistics key -> gathering context). */
private final ConcurrentMap<StatisticsKey, LocalStatisticsGatheringContext> gatheringInProgress =
new ConcurrentHashMap<>();
/**
* Constructor.
*
* @param repo IgniteStatisticsRepository.
* @param gatherPool Thread pool to gather statistics in.
* @param stopping External stopping state supplier.
* @param logSupplier Log supplier function.
*/
public StatisticsProcessor(
IgniteStatisticsRepository repo,
IgniteThreadPoolExecutor gatherPool,
Supplier<Boolean> stopping,
Function<Class<?>, IgniteLogger> logSupplier
) {
this.statRepo = repo;
this.gatheringBusyExecutor = new BusyExecutor("gathering", gatherPool, stopping, logSupplier);
this.log = logSupplier.apply(StatisticsProcessor.class);
}
/**
* Update statistics for the given key to the actual state.
* If byObsolescence and the table is not {@code null} - does not clear any other partitions.
*
* 1) Replace the previous gathering context if it exists and a replacement is needed (replace a byObsolescence
* gathering with a new one, or replace a gathering with an older configuration or topology version with a new one).
* 2) If byObsolescence and no table is available - clean obsolescence and partition statistics for the given key.
* 3) Submit tasks for each specified partition.
* 4) After the last task finishes gathering, aggregation starts.
* 5) Read all partitions and obsolescence info from the repository and
* if byObsolescence = {@code true} - remove unnecessary ones and aggregate by the specified list,
* if byObsolescence = {@code false} - aggregate all present in the store (because it should contain only actual ones).
* 6) Save the aggregated local statistics.
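*
* <p>Illustrative caller sketch (the gathering context {@code ctx} is assumed to be prepared elsewhere; only
* methods visible in this class are used):
* <pre>{@code
* statProcessor.updateLocalStatistics(ctx);
*
* // The context future completes after the last partition is gathered and aggregated.
* ctx.future().whenComplete((res, err) -> {
*     if (err != null)
*         log.warning("Local statistics update failed.", err);
* });
* }</pre>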
*
* @param ctx Statistics Gathering context.
*/
public void updateLocalStatistics(LocalStatisticsGatheringContext ctx) {
if (log.isDebugEnabled()) {
log.debug(String.format(
"Start statistics processing: forceRecollect=%b, cfg=%s, partToProcess = %s, topVer=%s",
ctx.forceRecollect(), ctx.configuration(), ctx.allParts(), ctx.topologyVersion()));
}
if (registerNewTask(ctx)) {
try {
if (ctx.forceRecollect())
// Save all obsolescence info even if there are no partitions to recollect.
statRepo.saveObsolescenceInfo(ctx.configuration().key());
if (ctx.table() == null || ctx.configuration() == null || ctx.configuration().columns().isEmpty()) {
statRepo.clearLocalPartitionsStatistics(ctx.configuration().key(), null);
ctx.future().complete(null);
return;
}
if (ctx.remainingParts().isEmpty())
ctx.future().complete(null);
else
submitTasks(ctx);
}
catch (Throwable t) {
// Submitting tasks can't fail with an error, so there is no need to cancel and wait for tasks to finish here.
ctx.future().completeExceptionally(t);
}
}
else {
if (log.isDebugEnabled())
log.debug("Gathered by key " + ctx.configuration().key() + " were skipped due to previous one.");
}
}
/**
* Try to register a new task. The registered context will remove itself from gatheringInProgress after completion.
* If there is another task for the given key, the operation will be scheduled right after it if necessary
* (i.e. the current task has a newer configuration or topology).
*
* @param ctx Task to register.
* @return {@code true} if the task was actually registered as the gathering-in-progress task, {@code false} otherwise.
*/
private boolean registerNewTask(LocalStatisticsGatheringContext ctx) {
boolean[] res = new boolean[1];
gatheringInProgress.compute(ctx.configuration().key(), (k, v) -> {
if (v == null) {
// No old context - start new
res[0] = true;
ctx.future().whenComplete((r, t) -> {
if (t != null) {
if (t instanceof CancellationException || t instanceof NodeStoppingException) {
if (log.isDebugEnabled())
log.debug("Got " + t.getClass() + " exception during statistics collection by key "
+ ctx.configuration().key() + "." );
}
else
log.warning("Unexpected error during statistics collection by key " +
ctx.configuration().key() + " . " + t.getMessage(), t);
}
// Anyway - try to remove itself from gathering map.
gatheringInProgress.remove(ctx.configuration().key(), ctx);
});
return ctx;
}
// There is already a context for the key - check whether we can cancel it.
if (v.topologyVersion() == null ||
(ctx.topologyVersion() != null && v.topologyVersion().compareTo(ctx.topologyVersion()) < 0) ||
v.configuration().compareTo(ctx.configuration()) < 0) {
// Old context for older topology or config - cancel and start new
v.cancel();
v.future().whenComplete((r, t) -> {
// Executed before the original completion handler, so remove the previous context here to make room for the new one.
gatheringInProgress.remove(ctx.configuration().key(), v);
boolean rescheduled = gatheringBusyExecutor.busyRun(() -> updateLocalStatistics(ctx));
if (!rescheduled && log.isDebugEnabled()) {
log.debug("Unable to reschedule statistics task by key " + ctx.configuration().key()
+ " due to inactive state.");
}
});
res[0] = false;
return v;
}
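// A newer or equal gathering for the same key is already in progress - keep it and skip the new context.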
res[0] = false;
return v;
});
return res[0];
}
/**
* Generate and submit tasks into the gathering pool for the existing gathering context.
*
* @param ctx Context to generate tasks by.
*/
private void submitTasks(LocalStatisticsGatheringContext ctx) {
for (int part : ctx.remainingParts()) {
final GatherPartitionStatistics task = new GatherPartitionStatistics(
statRepo,
ctx,
part,
log
);
gatheringBusyExecutor.submit(new CancellableTask() {
/** {@inheritDoc} */
@Override public void run() {
processPartitionTask(task);
}
/** {@inheritDoc} */
@Override public void cancel() {
task.context().cancel();
}
})
.thenAccept(success -> {
if (!success) {
if (log.isDebugEnabled()) {
log.debug(String.format("Gathering failed for key %s.%d ", ctx.configuration().key(),
task.partition()));
}
ctx.partitionNotAvailable(task.partition());
}
});
}
}
/**
* Aggregate partition statistics into the local one.
* @param ctx Context to use in aggregation.
*/
private void aggregateStatistics(LocalStatisticsGatheringContext ctx) {
if (ctx.cancelled())
return;
StatisticsKey key = ctx.configuration().key();
Collection<ObjectPartitionStatisticsImpl> allParts = statRepo.getLocalPartitionsStatistics(key);
if (ctx.forceRecollect())
statRepo.aggregatedLocalStatistics(allParts, ctx.configuration(), ctx.topologyVersion());
else {
Set<Integer> partsToRemove = new HashSet<>();
Collection<ObjectPartitionStatisticsImpl> partsToAggregate = new ArrayList<>();
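// Partitions outside the collected set are stale - drop their statistics and aggregate only the requested ones.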
for (ObjectPartitionStatisticsImpl partStat : allParts) {
if (ctx.allParts() == null || !ctx.allParts().contains(partStat.partId()))
partsToRemove.add(partStat.partId());
else
partsToAggregate.add(partStat);
}
if (!partsToRemove.isEmpty())
statRepo.clearLocalPartitionsStatistics(ctx.configuration().key(), partsToRemove);
if (!partsToAggregate.isEmpty())
statRepo.aggregatedLocalStatistics(partsToAggregate, ctx.configuration(), ctx.topologyVersion());
}
}
/**
* Process a partition: call the gathering task, mark the partition as done, then aggregate and complete the future if needed.
* @param task Partition gathering task to process.
*/
private void processPartitionTask(final GatherPartitionStatistics task) {
LocalStatisticsGatheringContext ctx = task.context();
try {
task.call();
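// partitionDone() reports whether this was the last remaining partition: if so, aggregate and complete the context future.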
if (ctx.partitionDone(task.partition())) {
if (log.isDebugEnabled())
log.debug("Local partitions statistics successfully gathered by key " +
ctx.configuration().key());
aggregateStatistics(ctx);
ctx.future().complete(null);
}
}
catch (Throwable t) {
ctx.partitionNotAvailable(task.partition());
if (t instanceof GatherStatisticCancelException) {
if (log.isDebugEnabled()) {
log.debug("Collect statistics task was cancelled " +
"[key=" + ctx.configuration().key() + ", part=" + task.partition() + ']');
}
}
else if (t.getCause() instanceof NodeStoppingException) {
if (log.isDebugEnabled())
log.debug("Node stopping during statistics collection on " +
"[key=" + ctx.configuration().key() + ", part=" + task.partition() + ']');
}
else
log.warning("Unexpected error on statistic gathering", t);
}
}
/**
* Check if the specified object statistics fully meet the specified statistics object configuration.
*
* @param stat Object statistics to test.
* @param cfg Statistics object configuration to compare with.
* @return {@code true} if the specified statistics fully meet the specified configuration requirements,
* {@code false} otherwise.
*/
private boolean partStatSuitToConfiguration(ObjectStatisticsImpl stat, StatisticsObjectConfiguration cfg) {
if (stat == null)
return false;
if (stat.columnsStatistics().size() != cfg.columns().size())
return false;
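// Every configured column must have statistics collected with at least the configured version.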
for (StatisticsColumnConfiguration colCfg : cfg.columns().values()) {
ColumnStatistics colStat = stat.columnStatistics(colCfg.name());
if (colStat == null || colCfg.version() > colStat.version())
return false;
}
return true;
}
/**
* Start gathering.
*/
public void start() {
if (log.isDebugEnabled())
log.debug("Statistics gathering started.");
gatheringBusyExecutor.activate();
}
/**
* Stop gathering.
*/
public void stop() {
if (log.isTraceEnabled())
log.trace(String.format("Statistics gathering stopping %d task...", gatheringInProgress.size()));
// Can skip waiting for each task finished because of global busyLock.
gatheringBusyExecutor.deactivate();
if (log.isDebugEnabled())
log.debug("Statistics gathering stopped.");
}
}