blob: 13cd7c2ea878732901ba727a136d7d6daadf959d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.sql.engine.prepare;
import static org.apache.ignite.internal.sql.engine.externalize.RelJsonWriter.toJson;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.ignite.internal.sql.engine.metadata.ColocationMappingException;
import org.apache.ignite.internal.sql.engine.metadata.FragmentMapping;
import org.apache.ignite.internal.sql.engine.metadata.FragmentMappingException;
import org.apache.ignite.internal.sql.engine.metadata.IgniteMdFragmentMapping;
import org.apache.ignite.internal.sql.engine.metadata.MappingService;
import org.apache.ignite.internal.sql.engine.metadata.NodeMappingException;
import org.apache.ignite.internal.sql.engine.rel.IgniteReceiver;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteSender;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
 * A single fragment of a distributed query plan.
 *
 * <p>A fragment holds its id, the root relational node, the serialized form of that root, the list of
 * receivers it reads from and, optionally, a {@link FragmentMapping}.
 */
public class Fragment {
    /** Fragment id. */
    private final long id;

    /** Root relational node of the fragment. */
    private final IgniteRel root;

    /** Serialized root representation. */
    @IgniteToStringExclude
    private final String rootSer;

    /** Fragment mapping; {@code null} when the fragment was created without one. */
    private final FragmentMapping mapping;

    /** Immutable copy of the receivers passed to the constructor. */
    private final List<IgniteReceiver> remotes;

    /**
     * Creates a fragment without a mapping; the root is serialized by the delegated constructor.
     *
     * @param id Fragment id.
     * @param root Root node of the fragment.
     * @param remotes Remote sources of the fragment.
     */
    public Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes) {
        this(id, root, remotes, null, null);
    }

    /**
     * Creates a fragment, optionally reusing an already serialized root and an already computed mapping.
     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
     *
     * @param id Fragment id.
     * @param root Root node of the fragment.
     * @param remotes Remote sources of the fragment; an immutable copy is stored.
     * @param rootSer Serialized root, or {@code null} to serialize {@code root} here.
     * @param mapping Fragment mapping, or {@code null} if there is none.
     */
    Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer, @Nullable FragmentMapping mapping) {
        this.id = id;
        this.root = root;
        this.remotes = List.copyOf(remotes);

        if (rootSer == null) {
            this.rootSer = toJson(root);
        } else {
            this.rootSer = rootSer;
        }

        this.mapping = mapping;
    }

    /**
     * Returns the id of this fragment.
     *
     * @return Fragment id.
     */
    public long fragmentId() {
        return id;
    }

    /**
     * Returns the root relational node of this fragment.
     *
     * @return Root node.
     */
    public IgniteRel root() {
        return root;
    }

    /**
     * Returns the serialized representation of the root node. The value is either the one handed to
     * the constructor or the JSON produced from the root at construction time.
     *
     * @return Serialized form.
     */
    public String serialized() {
        return rootSer;
    }

    /**
     * Returns the mapping held by this fragment.
     *
     * @return Fragment mapping; {@code null} when the fragment was created without one.
     */
    public FragmentMapping mapping() {
        return mapping;
    }

    /**
     * Computes the mapping for this fragment from the metadata of its root node.
     *
     * @param ctx Mapping query context.
     * @param mq Metadata query.
     * @param nodesSource Supplier of node ids handed to {@code FragmentMapping#finalize}.
     * @return Finalized mapping.
     * @throws FragmentMappingException If a node or colocation mapping error occurs.
     */
    private FragmentMapping mapping(MappingQueryContext ctx, RelMetadataQuery mq, Supplier<List<String>> nodesSource) {
        try {
            FragmentMapping res = IgniteMdFragmentMapping.fragmentMappingForMetadataQuery(root, mq, ctx);

            if (rootFragment()) {
                // The root fragment is colocated with the local node.
                FragmentMapping localOnly = FragmentMapping.create(ctx.localNodeId());

                res = localOnly.colocate(res);
            }

            if (single() && res.nodeIds().size() > 1) {
                // A scan of a replicated cache may bring several nodes (all nodes holding the data) into
                // the colocation group, yet this fragment has to run on exactly one node, so one of the
                // candidates is picked at random.
                int pick = ThreadLocalRandom.current().nextInt(res.nodeIds().size());

                res = FragmentMapping.create(res.nodeIds().get(pick)).colocate(res);
            }

            return res.finalize(nodesSource);
        } catch (NodeMappingException e) {
            throw new FragmentMappingException("Failed to calculate physical distribution", this, e.node(), e);
        } catch (ColocationMappingException e) {
            throw new FragmentMappingException("Failed to calculate physical distribution", this, root, e);
        }
    }

    /**
     * Returns the remote sources of this fragment.
     *
     * @return Immutable list of receivers.
     */
    public List<IgniteReceiver> remotes() {
        return remotes;
    }

    /**
     * Tells whether this is the root fragment, i.e. its root node is not an {@link IgniteSender}.
     *
     * @return {@code true} if the root node is not a sender.
     */
    public boolean rootFragment() {
        if (root instanceof IgniteSender) {
            return false;
        }

        return true;
    }

    /**
     * Returns a fragment whose root belongs to the given cluster.
     *
     * @param cluster Target cluster.
     * @return This fragment if its root already uses {@code cluster}, otherwise the result of cloning
     *      this fragment with a {@code Cloner} created for that cluster.
     */
    public Fragment attach(RelOptCluster cluster) {
        if (root.getCluster() == cluster) {
            return this;
        }

        return new Cloner(cluster).go(this);
    }

    /**
     * Maps the fragment to its data location.
     *
     * @param mappingSrvc Mapping service used to obtain execution nodes.
     * @param ctx Planner context.
     * @param mq Metadata query.
     * @return This fragment if it already has a mapping, otherwise a new fragment with the same id,
     *      root, remotes and serialized root plus the computed mapping.
     * @throws FragmentMappingException If the mapping cannot be calculated.
     */
    Fragment map(MappingService mappingSrvc, MappingQueryContext ctx, RelMetadataQuery mq) throws FragmentMappingException {
        if (mapping == null) {
            Supplier<List<String>> nodes = nodesSource(mappingSrvc, ctx);

            FragmentMapping computed = mapping(ctx, mq, nodes);

            return new Fragment(id, root, remotes, rootSer, computed);
        }

        return this;
    }

    /**
     * Builds a supplier that asks the mapping service for execution nodes on demand.
     *
     * @param mappingSrvc Mapping service.
     * @param ctx Mapping query context (not used by the supplier).
     * @return Supplier of node ids.
     */
    @NotNull
    private Supplier<List<String>> nodesSource(MappingService mappingSrvc, MappingQueryContext ctx) {
        return () -> {
            return mappingSrvc.executionNodes(single(), null);
        };
    }

    /**
     * Tells whether the root is a sender whose source distribution satisfies the single distribution.
     *
     * @return {@code true} if so, {@code false} otherwise (including when the root is not a sender).
     */
    private boolean single() {
        if (!(root instanceof IgniteSender)) {
            return false;
        }

        IgniteSender sender = (IgniteSender) root;

        return sender.sourceDistribution().satisfies(IgniteDistributions.single());
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        String plan = RelOptUtil.toString(root);

        return S.toString(Fragment.class, this, "root", plan);
    }
}