/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.exec.mapping;
import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntObjectPair;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.sql.engine.exec.mapping.MappingServiceImpl.LogicalTopologyHolder.TopologySnapshot;
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.PlanId;
import org.apache.ignite.internal.sql.engine.prepare.pruning.PartitionPruner;
import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
import org.apache.ignite.internal.sql.engine.schema.IgniteDataSource;
import org.apache.ignite.internal.sql.engine.util.cache.Cache;
import org.apache.ignite.internal.sql.engine.util.cache.CacheFactory;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.lang.ErrorGroups.Sql;
/**
* An implementation of {@link MappingService}.
*
* <p>This particular implementation keeps track of changes in the logical cluster topology
* and always uses the latest topology snapshot to map a query.
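*
* <p>A minimal wiring sketch. Everything shown besides this class is assumed to be supplied
* by the surrounding component ({@code targetProvider}, {@code cacheFactory},
* {@code partitionPruner}, {@code taskExecutor}, and {@code logicalTopologyService} are
* hypothetical placeholders here):
*
* <pre>{@code
* MappingServiceImpl mappingService = new MappingServiceImpl(
*         localNodeName, targetProvider, cacheFactory, 1024, partitionPruner, taskExecutor);
*
* // The service must receive topology events, otherwise map() never completes.
* logicalTopologyService.addEventListener(mappingService);
*
* CompletableFuture<List<MappedFragment>> mapped = mappingService.map(plan, parameters);
* }</pre>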
*/
public class MappingServiceImpl implements MappingService, LogicalTopologyEventListener {
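/** Maximum number of mapping passes; each failed pass splits the offending fragment before retrying. */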
private static final int MAPPING_ATTEMPTS = 3;
private final LogicalTopologyHolder topologyHolder = new LogicalTopologyHolder();
private final CompletableFuture<Void> initialTopologyFuture = new CompletableFuture<>();
private final String localNodeName;
private final ExecutionTargetProvider targetProvider;
private final Cache<PlanId, FragmentsTemplate> templatesCache;
private final Cache<PlanId, MappingsCacheValue> mappingsCache;
private final Executor taskExecutor;
private final PartitionPruner partitionPruner;
/**
* Constructor.
*
* @param localNodeName Name of the current Ignite node.
* @param targetProvider Execution target provider.
* @param cacheFactory A factory used to create the template and mapping caches.
* @param cacheSize Size of the cache of query plans. Must be non-negative.
* @param partitionPruner Partition pruner.
* @param taskExecutor Mapper service task executor.
*/
public MappingServiceImpl(
String localNodeName,
ExecutionTargetProvider targetProvider,
CacheFactory cacheFactory,
int cacheSize,
PartitionPruner partitionPruner,
Executor taskExecutor
) {
this.localNodeName = localNodeName;
this.targetProvider = targetProvider;
this.templatesCache = cacheFactory.create(cacheSize);
this.mappingsCache = cacheFactory.create(cacheSize);
this.taskExecutor = taskExecutor;
this.partitionPruner = partitionPruner;
}
@Override
public CompletableFuture<List<MappedFragment>> map(MultiStepPlan multiStepPlan, MappingParameters parameters) {
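// Mapping needs a topology snapshot that includes the local node; until the first such
// snapshot arrives, defer the work to the task executor.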
if (initialTopologyFuture.isDone()) {
return map0(multiStepPlan, parameters);
}
return initialTopologyFuture.thenComposeAsync(ignore -> map0(multiStepPlan, parameters), taskExecutor);
}
/** Called when a primary replica has expired. */
public CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
assert parameters != null;
assert parameters.groupId() instanceof TablePartitionId;
int tableId = ((TablePartitionId) parameters.groupId()).tableId();
// TODO https://issues.apache.org/jira/browse/IGNITE-21201 Move complex computations to a different thread.
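// Drop every cached mapping that references this table; it will be remapped on next use.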
mappingsCache.removeIfValue(value -> value.tableIds.contains(tableId));
return CompletableFutures.falseCompletedFuture();
}
private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan multiStepPlan, MappingParameters parameters) {
TopologySnapshot topology = topologyHolder.topology();
MappingContext context = new MappingContext(localNodeName, topology.nodes());
FragmentsTemplate template = getOrCreateTemplate(multiStepPlan, context);
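// Reuse the cached mapping if it is still valid for the current topology version;
// otherwise compute a fresh one atomically inside the cache to avoid duplicate work.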
MappingsCacheValue cacheValue = mappingsCache.compute(multiStepPlan.id(), (key, val) -> {
if (val == null) {
IntSet tableIds = new IntOpenHashSet();
boolean topologyAware = false;
for (Fragment fragment : template.fragments) {
topologyAware = topologyAware || !fragment.systemViews().isEmpty();
for (IgniteDataSource source : fragment.tables().values()) {
tableIds.add(source.id());
}
}
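// Only plans that touch system views depend on the topology; Long.MAX_VALUE makes
// table-only plans immune to version-based invalidation.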
long topVer = topologyAware ? topology.version() : Long.MAX_VALUE;
return new MappingsCacheValue(topVer, tableIds, mapFragments(context, template));
}
if (val.topVer < topology.version()) {
return new MappingsCacheValue(topology.version(), val.tableIds, mapFragments(context, template));
}
return val;
});
return cacheValue.mappedFragments.thenApply(mappedFragments -> applyPartitionPruning(mappedFragments.fragments, parameters));
}
private CompletableFuture<MappedFragments> mapFragments(MappingContext context, FragmentsTemplate template) {
IdGenerator idGenerator = new IdGenerator(template.nextId);
List<Fragment> fragments = new ArrayList<>(template.fragments);
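// Asynchronously resolve an execution target for every table and system view
// referenced by any fragment of the plan.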
List<CompletableFuture<IntObjectPair<ExecutionTarget>>> targets =
fragments.stream().flatMap(fragment -> Stream.concat(
fragment.tables().values().stream()
.map(table -> targetProvider.forTable(context.targetFactory(), table)
.thenApply(target -> IntObjectPair.of(table.id(), target))
),
fragment.systemViews().stream()
.map(view -> targetProvider.forSystemView(context.targetFactory(), view)
.thenApply(target -> IntObjectPair.of(view.id(), target))
)
))
.collect(Collectors.toList());
return allOf(targets.toArray(new CompletableFuture[0]))
.thenApply(ignored -> {
Int2ObjectMap<ExecutionTarget> targetsById = new Int2ObjectOpenHashMap<>();
for (CompletableFuture<IntObjectPair<ExecutionTarget>> fut : targets) {
// This join is safe: the allOf above guarantees every future has already completed.
IntObjectPair<ExecutionTarget> pair = fut.join();
targetsById.put(pair.firstInt(), pair.second());
}
FragmentMapper mapper = new FragmentMapper(template.cluster.getMetadataQuery(), context, targetsById);
Long2ObjectMap<FragmentMapping> mappingByFragmentId = new Long2ObjectOpenHashMap<>();
Long2ObjectMap<ColocationGroup> groupsBySourceId = new Long2ObjectOpenHashMap<>();
Long2ObjectMap<List<String>> allSourcesByExchangeId = new Long2ObjectOpenHashMap<>();
Exception ex = null;
boolean lastAttemptSucceed = false;
List<Fragment> fragmentsToMap = fragments;
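// Map the fragments with a bounded number of attempts: on a FragmentMappingException
// the offending fragment is split at the problematic node and mapping is retried,
// reusing the results of fragments that were already mapped successfully.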
for (int attempt = 0; attempt < MAPPING_ATTEMPTS && !lastAttemptSucceed; attempt++) {
Fragment currentFragment = null;
try {
for (Fragment fragment : fragmentsToMap) {
currentFragment = fragment;
if (mappingByFragmentId.containsKey(fragment.fragmentId())) {
continue;
}
FragmentMapping mapping = mapper.map(fragment);
mappingByFragmentId.put(fragment.fragmentId(), mapping);
for (ColocationGroup group : mapping.groups()) {
for (long sourceId : group.sourceIds()) {
groupsBySourceId.put(sourceId, group);
}
}
if (!fragment.rootFragment()) {
IgniteSender sender = (IgniteSender) fragment.root();
List<String> nodeNames = mapping.groups().stream()
.flatMap(g -> g.nodeNames().stream())
.distinct().collect(Collectors.toList());
allSourcesByExchangeId.put(sender.exchangeId(), nodeNames);
}
}
lastAttemptSucceed = true;
} catch (FragmentMappingException mappingException) {
if (ex == null) {
ex = mappingException;
} else {
ex.addSuppressed(mappingException);
}
fragmentsToMap = replace(
fragmentsToMap,
currentFragment,
new FragmentSplitter(idGenerator, mappingException.node()).go(currentFragment)
);
}
}
if (!lastAttemptSucceed) {
throw new IgniteInternalException(Sql.MAPPING_ERR, "Unable to map query: " + ex.getMessage(), ex);
}
List<MappedFragment> mappedFragmentsList = new ArrayList<>(fragmentsToMap.size());
Set<String> targetNodes = new HashSet<>();
for (Fragment fragment : fragmentsToMap) {
FragmentMapping mapping = mappingByFragmentId.get(fragment.fragmentId());
ColocationGroup targetGroup = null;
if (!fragment.rootFragment()) {
IgniteSender sender = (IgniteSender) fragment.root();
targetGroup = groupsBySourceId.get(sender.exchangeId());
}
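// Lazily collect, per receiver, the node names that act as sources for that exchange.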
Long2ObjectMap<List<String>> sourcesByExchangeId = null;
for (IgniteReceiver receiver : fragment.remotes()) {
if (sourcesByExchangeId == null) {
sourcesByExchangeId = new Long2ObjectOpenHashMap<>();
}
long exchangeId = receiver.exchangeId();
sourcesByExchangeId.put(exchangeId, allSourcesByExchangeId.get(exchangeId));
}
MappedFragment mappedFragment = new MappedFragment(
fragment,
mapping.groups(),
sourcesByExchangeId,
targetGroup,
null
);
mappedFragmentsList.add(mappedFragment);
targetNodes.addAll(mappedFragment.nodes());
}
return new MappedFragments(mappedFragmentsList, targetNodes);
});
}
@Override
public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot newTopology) {
topologyHolder.update(newTopology);
}
@Override
public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot newTopology) {
topologyHolder.update(newTopology);
mappingsCache.removeIfValue(value -> {
if (value.mappedFragments.isDone()) {
return value.mappedFragments.join().nodes.contains(leftNode.name());
}
// Invalidate incomplete mappings to reduce the chance of keeping a stale value.
return true;
});
}
@Override
public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
topologyHolder.update(newTopology);
}
private static List<Fragment> replace(
List<Fragment> originalFragments,
Fragment fragmentToReplace,
List<Fragment> replacement
) {
assert !nullOrEmpty(replacement);
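// For every exchange consumed by a replacement fragment, record the id of the new
// target fragment so senders in the remaining fragments can be retargeted below.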
Long2LongOpenHashMap newTargets = new Long2LongOpenHashMap();
for (Fragment fragment0 : replacement) {
for (IgniteReceiver remote : fragment0.remotes()) {
newTargets.put(remote.exchangeId(), fragment0.fragmentId());
}
}
List<Fragment> newFragments = new ArrayList<>(originalFragments.size() + replacement.size() - 1);
for (Fragment fragment : originalFragments) {
if (fragment == fragmentToReplace) {
fragment = first(replacement);
} else if (!fragment.rootFragment()) {
IgniteSender sender = (IgniteSender) fragment.root();
long newTargetId = newTargets.getOrDefault(sender.exchangeId(), Long.MIN_VALUE);
if (newTargetId != Long.MIN_VALUE) {
sender = new IgniteSender(sender.getCluster(), sender.getTraitSet(),
sender.getInput(), sender.exchangeId(), newTargetId, sender.distribution());
fragment = new Fragment(fragment.fragmentId(), fragment.correlated(), sender,
fragment.remotes(), fragment.tables(), fragment.systemViews());
}
}
newFragments.add(fragment);
}
newFragments.addAll(replacement.subList(1, replacement.size()));
return newFragments;
}
private List<MappedFragment> applyPartitionPruning(List<MappedFragment> mappedFragments, MappingParameters parameters) {
return partitionPruner.apply(mappedFragments, parameters.dynamicParameters());
}
/**
* Holder for topology snapshots that guarantees monotonically increasing versions.
*/
class LogicalTopologyHolder {
private volatile TopologySnapshot topology = new TopologySnapshot(Long.MIN_VALUE, List.of());
void update(LogicalTopologySnapshot topologySnapshot) {
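// Versions are compared under the lock, so the stored snapshot is monotonically non-decreasing.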
synchronized (this) {
if (topology.version() < topologySnapshot.version()) {
topology = new TopologySnapshot(topologySnapshot.version(), deriveNodeNames(topologySnapshot));
}
if (initialTopologyFuture.isDone() || !topology.nodes().contains(localNodeName)) {
return;
}
}
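// Complete the future outside the lock so dependent stages never run while holding the monitor.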
initialTopologyFuture.complete(null);
}
TopologySnapshot topology() {
return topology;
}
private List<String> deriveNodeNames(LogicalTopologySnapshot topology) {
return topology.nodes().stream()
.map(LogicalNode::name)
.collect(Collectors.toUnmodifiableList());
}
class TopologySnapshot {
private final List<String> nodes;
private final long version;
TopologySnapshot(long version, List<String> nodes) {
this.version = version;
this.nodes = nodes;
}
public List<String> nodes() {
return nodes;
}
public long version() {
return version;
}
}
}
private FragmentsTemplate getOrCreateTemplate(MultiStepPlan plan, MappingContext context) {
// QuerySplitter is deterministic, so the split result can be cached and reused
// for subsequent mappings of the same plan.
return templatesCache.get(plan.id(), key -> {
IdGenerator idGenerator = new IdGenerator(0);
List<Fragment> fragments = new QuerySplitter(idGenerator, context.cluster()).split(plan.root());
return new FragmentsTemplate(
idGenerator.nextId(), context.cluster(), fragments
);
});
}
private static class FragmentsTemplate {
private final long nextId;
private final RelOptCluster cluster;
private final List<Fragment> fragments;
FragmentsTemplate(long nextId, RelOptCluster cluster, List<Fragment> fragments) {
this.nextId = nextId;
this.cluster = cluster;
this.fragments = fragments;
}
}
/** Wraps list of mapped fragments with target node names. */
private static class MappedFragments {
final List<MappedFragment> fragments;
final Set<String> nodes;
MappedFragments(List<MappedFragment> fragments, Set<String> nodes) {
this.fragments = fragments;
this.nodes = nodes;
}
}
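/**
* Cached mapping together with the ids of the tables it touches and the topology version it
* was mapped against ({@code Long.MAX_VALUE} for plans that don't depend on the topology).
*/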
private static class MappingsCacheValue {
private final long topVer;
private final IntSet tableIds;
private final CompletableFuture<MappedFragments> mappedFragments;
MappingsCacheValue(long topVer, IntSet tableIds, CompletableFuture<MappedFragments> mappedFragments) {
this.topVer = topVer;
this.tableIds = tableIds;
this.mappedFragments = mappedFragments;
}
}
}