blob: ce7cd5ae68edc3c4cce07a4b8b81b2a831002394 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.stat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.schema.management.SchemaManager;
import org.apache.ignite.internal.processors.query.schema.management.TableDescriptor;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsColumnConfiguration;
import org.apache.ignite.internal.processors.query.stat.config.StatisticsObjectConfiguration;
import org.apache.ignite.internal.processors.query.stat.messages.StatisticsKeyMessage;
import org.apache.ignite.internal.processors.query.stat.messages.StatisticsRequest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.jetbrains.annotations.Nullable;
/**
 * Utility methods for statistics message generation and statistics aggregation.
 */
public class IgniteStatisticsHelper {
    /** Logger. */
    private final IgniteLogger log;

    /** Schema manager. */
    private final SchemaManager schemaMgr;

    /**
     * Constructor.
     *
     * @param locNodeId Local node id (currently not used by this helper).
     * @param schemaMgr Schema manager.
     * @param logSupplier Ignite logger supplier to get logger from.
     */
    public IgniteStatisticsHelper(
        UUID locNodeId,
        SchemaManager schemaMgr,
        Function<Class<?>, IgniteLogger> logSupplier
    ) {
        this.schemaMgr = schemaMgr;
        this.log = logSupplier.apply(IgniteStatisticsHelper.class);
    }

    /**
     * Get cache group context by specified statistics key.
     *
     * @param key Statistics key to get context by.
     * @return Cache group context for the given key.
     * @throws IgniteCheckedException If unable to find table by specified key.
     */
    public CacheGroupContext groupContext(StatisticsKey key) throws IgniteCheckedException {
        TableDescriptor tbl = schemaMgr.table(key.schema(), key.obj());

        if (tbl == null)
            throw new IgniteCheckedException(String.format("Can't find object %s.%s", key.schema(), key.obj()));

        return tbl.cacheInfo().cacheContext().group();
    }

    /**
     * Generate local statistics gathering requests.
     * <p>
     * A single {@link StatisticsType#LOCAL} request is built for the group's last affinity topology version. It is
     * addressed to every distinct node that is the first node of some non-empty partition assignment (presumably the
     * primary). All returned requests share the same request id and payload and differ only in the addressee.
     *
     * @param target Statistics target to request local statistics by.
     * @param cfg Statistics configuration; its per-column versions are sent with the request.
     * @return Collection of addressed statistics requests, one per selected node.
     * @throws IgniteCheckedException If unable to find table by the target key.
     */
    public List<StatisticsAddressedRequest> generateGatheringRequests(
        StatisticsTarget target,
        StatisticsObjectConfiguration cfg
    ) throws IgniteCheckedException {
        List<String> cols = (target.columns() == null) ? null : Arrays.asList(target.columns());

        StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(target.schema(), target.obj(), cols);

        Map<String, Long> versions = cfg.columns().entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().version()));

        CacheGroupContext grpCtx = groupContext(target.key());

        AffinityTopologyVersion topVer = grpCtx.affinity().lastVersion();

        StatisticsRequest req = new StatisticsRequest(UUID.randomUUID(), keyMsg, StatisticsType.LOCAL, topVer, versions);

        List<List<ClusterNode>> assignments = grpCtx.affinity().assignments(topVer);

        Set<UUID> nodes = new HashSet<>();

        for (List<ClusterNode> partNodes : assignments) {
            // Partitions without owners have nobody to ask.
            if (F.isEmpty(partNodes))
                continue;

            nodes.add(partNodes.get(0).id());
        }

        List<StatisticsAddressedRequest> res = new ArrayList<>(nodes.size());

        for (UUID nodeId : nodes)
            res.add(new StatisticsAddressedRequest(req, nodeId));

        return res;
    }

    /**
     * Aggregate specified partition level statistics to local level statistics.
     *
     * @param cfg Statistics object configuration.
     * @param stats Collection of all local partition level or local level statistics by specified key to aggregate.
     * @return Local level aggregated statistics, or {@code null} if the table for the configuration key doesn't exist.
     */
    @Nullable public ObjectStatisticsImpl aggregateLocalStatistics(
        StatisticsObjectConfiguration cfg,
        Collection<? extends ObjectStatisticsImpl> stats
    ) {
        StatisticsKeyMessage keyMsg = new StatisticsKeyMessage(
            cfg.key().schema(),
            cfg.key().obj(),
            new ArrayList<>(cfg.columns().keySet())
        );

        // For now there can be only tables
        TableDescriptor tbl = schemaMgr.table(keyMsg.schema(), keyMsg.obj());

        if (tbl == null) {
            // Table doesn't exist: nothing to aggregate. NOTE(review): the caller presumably removes the loaded
            // statistics on a null result - confirm.
            if (log.isDebugEnabled())
                log.debug(String.format("Removing statistics for object %s.%s cause table doesn't exists.",
                    keyMsg.schema(), keyMsg.obj()));

            return null;
        }

        return aggregateLocalStatistics(tbl.type(), cfg, stats, log);
    }

    /**
     * Aggregate partition level statistics to local level one or local statistics to global one.
     * <p>
     * Columns are selected with {@link #filterColumns(GridQueryTypeDescriptor, Collection)} using the configured
     * column names. Row count is the sum of the row counts of the given statistics, possibly replaced by an
     * override (see {@link #calculateRowCount(StatisticsObjectConfiguration, long)}).
     *
     * @param tbl Table to aggregate statistics by.
     * @param cfg Statistics object configuration.
     * @param stats Collection of partition level or local level statistics to aggregate, expected to be non-empty.
     * @param log Logger.
     * @return Aggregated statistics.
     */
    public static ObjectStatisticsImpl aggregateLocalStatistics(
        GridQueryTypeDescriptor tbl,
        StatisticsObjectConfiguration cfg,
        Collection<? extends ObjectStatisticsImpl> stats,
        IgniteLogger log
    ) {
        assert !stats.isEmpty();

        List<String> selectedCols = filterColumns(tbl, cfg.columns().keySet()).stream().map(T2::getValue)
            .collect(Collectors.toList());

        Map<String, List<ColumnStatistics>> colPartStats = new HashMap<>(selectedCols.size());

        long rowCnt = 0;

        for (String col : selectedCols)
            colPartStats.put(col, new ArrayList<>());

        for (ObjectStatisticsImpl partStat : stats) {
            for (String col : selectedCols) {
                ColumnStatistics colPartStat = partStat.columnStatistics(col);

                if (colPartStat != null)
                    colPartStats.get(col).add(colPartStat);
            }

            rowCnt += partStat.rowCount();
        }

        Map<String, ColumnStatistics> colStats = new HashMap<>(selectedCols.size());

        for (String col : selectedCols) {
            StatisticsColumnConfiguration colCfg = cfg.columns().get(col);

            // With an empty column configuration filterColumns() selects all table columns, so there may be no
            // column configuration here: aggregate without overrides instead of failing with NPE.
            ColumnStatistics stat = ColumnStatisticsCollector.aggregate(colPartStats.get(col),
                colCfg == null ? null : colCfg.overrides());

            if (log.isDebugEnabled())
                log.debug("Aggregate column statistic done [col=" + col + ", stat=" + stat + ']');

            colStats.put(col, stat);
        }

        rowCnt = calculateRowCount(cfg, rowCnt);

        return new ObjectStatisticsImpl(rowCnt, colStats);
    }

    /**
     * Calculate effective row count. If there are some total overrides in the statistics configuration, the maximum
     * of them is chosen. If not, {@code actualRowCount} is returned.
     * <p>
     * Note: {@code -1} is used internally as the "no override" marker, so negative override totals are effectively
     * ignored.
     *
     * @param cfg Statistics configuration to dig overrides row count from.
     * @param actualRowCount Actual row count.
     * @return Effective row count.
     */
    public static long calculateRowCount(StatisticsObjectConfiguration cfg, long actualRowCount) {
        long overriddenRowCnt = -1;

        for (StatisticsColumnConfiguration ccfg : cfg.columns().values()) {
            if (ccfg.overrides() != null && ccfg.overrides().total() != null) {
                Long colRowCnt = ccfg.overrides().total();

                overriddenRowCnt = Math.max(overriddenRowCnt, colRowCnt);
            }
        }

        return (overriddenRowCnt == -1) ? actualRowCount : overriddenRowCnt;
    }

    /**
     * Build object configurations array with all default parameters from specified targets.
     * Column configurations are created without overrides; a target without columns gets an empty column list.
     *
     * @param targets Targets to build configurations from.
     * @return StatisticsObjectConfiguration array, one element per target in the same order.
     */
    public static StatisticsObjectConfiguration[] buildDefaultConfigurations(StatisticsTarget... targets) {
        return Arrays.stream(targets)
            .map(t -> {
                List<StatisticsColumnConfiguration> colCfgs;

                if (t.columns() == null)
                    colCfgs = Collections.emptyList();
                else
                    colCfgs = Arrays.stream(t.columns()).map(name -> new StatisticsColumnConfiguration(name, null))
                        .collect(Collectors.toList());

                return new StatisticsObjectConfiguration(t.key(), colCfgs,
                    StatisticsObjectConfiguration.DEFAULT_OBSOLESCENCE_MAX_PERCENT);
            }).toArray(StatisticsObjectConfiguration[]::new);
    }

    /**
     * Filter columns by specified names.
     * <p>
     * Fields of the type descriptor are numbered in their iteration order starting from
     * {@link QueryUtils#DEFAULT_COLUMNS_COUNT}. If {@code colNames} is {@code null} or empty, all fields except
     * {@link QueryUtils#KEY_FIELD_NAME} and {@link QueryUtils#VAL_FIELD_NAME} are returned. Otherwise only fields
     * whose names are in {@code colNames} are returned; unknown names are ignored.
     *
     * @param typeDescriptor Table descriptor.
     * @param colNames Column names.
     * @return Pairs of (column index, column name) for the selected columns.
     */
    public static List<T2<Integer, String>> filterColumns(
        GridQueryTypeDescriptor typeDescriptor,
        @Nullable Collection<String> colNames
    ) {
        Stream<T2<Integer, String>> colStream = enumerate(typeDescriptor.fields().keySet().stream(),
            QueryUtils.DEFAULT_COLUMNS_COUNT);

        if (F.isEmpty(colNames)) {
            return colStream.filter(col -> !QueryUtils.KEY_FIELD_NAME.equals(col.getValue()) &&
                !QueryUtils.VAL_FIELD_NAME.equals(col.getValue())).collect(Collectors.toList());
        }

        Set<String> colNamesSet = new HashSet<>(colNames);

        return colStream.filter(col -> colNamesSet.contains(col.getValue())).collect(Collectors.toList());
    }

    /**
     * Pair every element of a stream with a running index.
     *
     * @param stream Source stream (consumed lazily).
     * @param startIdx Index assigned to the first element.
     * @return Sequential ordered stream of (index, element) pairs.
     */
    private static <T> Stream<T2<Integer, T>> enumerate(Stream<? extends T> stream, int startIdx) {
        Iterator<T2<Integer, T>> iter = new Iterator<T2<Integer, T>>() {
            private final Iterator<? extends T> streamIter = stream.iterator();

            private int idx = startIdx;

            @Override public boolean hasNext() {
                return streamIter.hasNext();
            }

            @Override public T2<Integer, T> next() {
                // Fetch first so an exhausted iterator doesn't advance the index.
                T elem = streamIter.next();

                return new T2<>(idx++, elem);
            }
        };

        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED |
            Spliterator.IMMUTABLE), false);
    }
}