// blob: fd0cf9ee99249290ebe514e4bbf000ebb0ee22ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.prepare.AbstractQueryContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.BaseDataContext;
import org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.jetbrains.annotations.NotNull;
import static org.apache.ignite.internal.processors.query.calcite.util.Commons.checkRange;
/**
 * Runtime context of a single query fragment execution. Allows access to the tables in a database
 * (through the {@link DataContext} contract), to the query parameters and to the fragment mapping,
 * and schedules fragment tasks on the query task executor.
 */
public class ExecutionContext<Row> extends AbstractQueryContext implements DataContext {
    /** Placeholder for values whose expression is not specified. */
    private static final Object UNSPECIFIED_VALUE = new Object();

    /** Query ID. */
    private final UUID qryId;

    /** Local node ID. */
    private final UUID locNodeId;

    /** Originating node ID. */
    private final UUID originatingNodeId;

    /** Topology version. */
    private final AffinityTopologyVersion topVer;

    /** Description of the fragment this context belongs to. */
    private final FragmentDescription fragmentDesc;

    /** Query parameters by name. */
    private final Map<String, Object> params;

    /** Executor the fragment tasks are submitted to. */
    private final QueryTaskExecutor executor;

    /** Handler to access row fields. */
    private final RowHandler<Row> handler;

    /** Expression factory bound to this context. */
    private final ExpressionFactory<Row> expressionFactory;

    /** Cancel flag, also exposed through {@link #get(String)} under the {@code CANCEL_FLAG} variable name. */
    private final AtomicBoolean cancelFlag = new AtomicBoolean();

    /** Data context the {@link DataContext} calls are delegated to. */
    private final BaseDataContext baseDataContext;

    /** Correlated values indexed by correlation ID; replaced by a bigger array on demand. */
    private Object[] correlations = new Object[16];

    /**
     * @param qctx Parent base query context.
     * @param executor Executor the tasks passed to {@link #execute(RunnableX, Consumer)} are submitted to.
     * @param qryId Query ID.
     * @param locNodeId Local node ID.
     * @param originatingNodeId Originating node ID (the node, who started the execution).
     * @param topVer Topology version.
     * @param fragmentDesc Partitions information.
     * @param handler Handler to access row fields.
     * @param params Parameters. The map is stored by reference, no copy is made.
     */
    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
    public ExecutionContext(
        BaseQueryContext qctx,
        QueryTaskExecutor executor,
        UUID qryId,
        UUID locNodeId,
        UUID originatingNodeId,
        AffinityTopologyVersion topVer,
        FragmentDescription fragmentDesc,
        RowHandler<Row> handler,
        Map<String, Object> params
    ) {
        super(qctx);

        // Identity of the query and of the nodes involved.
        this.qryId = qryId;
        this.locNodeId = locNodeId;
        this.originatingNodeId = originatingNodeId;
        this.topVer = topVer;

        // Fragment-level state and collaborators.
        this.fragmentDesc = fragmentDesc;
        this.params = params;
        this.executor = executor;
        this.handler = handler;

        baseDataContext = new BaseDataContext(qctx.typeFactory());

        // Created last: the factory receives a reference to this context.
        expressionFactory = new ExpressionFactoryImpl<>(
            this,
            qctx.typeFactory(),
            qctx.config().getParserConfig().conformance(),
            qctx.rexBuilder()
        );
    }

    /**
     * @return Query ID.
     */
    public UUID queryId() {
        return qryId;
    }

    /**
     * @return Fragment ID.
     */
    public long fragmentId() {
        return fragmentDesc.fragmentId();
    }

    /**
     * @return Target mapping.
     */
    public ColocationGroup target() {
        return fragmentDesc.target();
    }

    /**
     * @param exchangeId Exchange ID.
     * @return Remote node IDs registered in the fragment description for the given exchange.
     */
    public List<UUID> remotes(long exchangeId) {
        return fragmentDesc.remotes().get(exchangeId);
    }

    /**
     * @param sourceId Source ID.
     * @return Colocation group found in the fragment mapping for the given source.
     */
    public ColocationGroup group(long sourceId) {
        return fragmentDesc.mapping().findGroup(sourceId);
    }

    /**
     * @return Keep binary flag. Currently always {@code true}.
     */
    public boolean keepBinary() {
        return true; // TODO
    }

    /**
     * @return MVCC snapshot. Currently always {@code null}.
     */
    public MvccSnapshot mvccSnapshot() {
        return null; // TODO
    }

    /**
     * @return Handler to access row fields.
     */
    public RowHandler<Row> rowHandler() {
        return handler;
    }

    /**
     * @return Expression factory.
     */
    public ExpressionFactory<Row> expressionFactory() {
        return expressionFactory;
    }

    /**
     * @return Local node ID.
     */
    public UUID localNodeId() {
        return locNodeId;
    }

    /**
     * @return Originating node ID (the node, who started the execution).
     */
    public UUID originatingNodeId() {
        return originatingNodeId;
    }

    /**
     * @return Topology version.
     */
    public AffinityTopologyVersion topologyVersion() {
        return topVer;
    }

    /**
     * @return Logger of the parent base query context.
     */
    public IgniteLogger logger() {
        BaseQueryContext baseQryCtx = unwrap(BaseQueryContext.class);

        return baseQryCtx.logger();
    }

    /** {@inheritDoc} */
    @Override public SchemaPlus getRootSchema() {
        return baseDataContext.getRootSchema();
    }

    /** {@inheritDoc} */
    @Override public IgniteTypeFactory getTypeFactory() {
        return baseDataContext.getTypeFactory();
    }

    /** {@inheritDoc} */
    @Override public QueryProvider getQueryProvider() {
        return baseDataContext.getQueryProvider();
    }

    /**
     * {@inheritDoc}
     * <p>
     * The cancel flag variable resolves to this context's cancel flag, names starting with {@code ?}
     * resolve to query parameters converted by {@code TypeUtils.toInternal}, any other name is
     * delegated to the base data context.
     */
    @Override public Object get(String name) {
        if (Variable.CANCEL_FLAG.camelName.equals(name))
            return cancelFlag;

        if (!name.startsWith("?"))
            return baseDataContext.get(name);

        // Dynamic parameter: look it up by its full name (including the leading '?').
        Object paramVal = params.get(name);

        return TypeUtils.toInternal(this, paramVal);
    }

    /**
     * Gets correlated value.
     *
     * @param id Correlation ID, validated against the current storage by {@code Commons.checkRange}.
     * @return Correlated value.
     */
    public @NotNull Object getCorrelated(int id) {
        checkRange(correlations, id);

        return correlations[id];
    }

    /**
     * Sets correlated value.
     *
     * @param value Correlated value.
     * @param id Correlation ID.
     */
    public void setCorrelated(@NotNull Object value, int id) {
        // Make sure the storage has a slot for the given ID before writing to it.
        Object[] slots = Commons.ensureCapacity(correlations, id + 1);

        correlations = slots;

        slots[id] = value;
    }

    /**
     * Executes a query task. Nothing is submitted if the context is already cancelled, and
     * a submitted task is not run if the context gets cancelled before the task starts.
     *
     * @param task Query task.
     * @param onError Callback receiving any {@link Throwable} thrown by the task; after the callback
     *      returns the error is rethrown wrapped into an {@link IgniteException}.
     */
    public void execute(RunnableX task, Consumer<Throwable> onError) {
        if (!isCancelled()) {
            executor.execute(qryId, fragmentId(), () -> {
                try {
                    if (isCancelled())
                        return;

                    task.run();
                }
                catch (Throwable err) {
                    onError.accept(err);

                    throw new IgniteException("Unexpected exception", err);
                }
            });
        }
    }

    /** Task that is allowed to throw a checked exception. */
    @FunctionalInterface
    public interface RunnableX {
        /**
         * Runs the task.
         *
         * @throws Exception If failed.
         */
        void run() throws Exception;
    }

    /**
     * Sets cancel flag, returns {@code true} if flag was changed by this call.
     *
     * @return {@code True} if flag was changed by this call.
     */
    public boolean cancel() {
        // Plain read first: no CAS attempt when the flag is already set.
        if (cancelFlag.get())
            return false;

        return cancelFlag.compareAndSet(false, true);
    }

    /**
     * @return {@code True} if the cancel flag is set.
     */
    public boolean isCancelled() {
        return cancelFlag.get();
    }

    /**
     * @return Shared placeholder object for values whose expression is not specified.
     */
    public Object unspecifiedValue() {
        return UNSPECIFIED_VALUE;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o == null)
            return false;

        if (o.getClass() != getClass())
            return false;

        ExecutionContext<?> that = (ExecutionContext<?>)o;

        // Contexts are identified by the (query ID, fragment ID) pair.
        if (!qryId.equals(that.qryId))
            return false;

        return fragmentDesc.fragmentId() == that.fragmentDesc.fragmentId();
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        long fragId = fragmentDesc.fragmentId();

        return Objects.hash(qryId, fragId);
    }
}