blob: 7afc6746ed8e69b2e1579e6a2b4b007a47154161 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.calcite;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.calcite.DataContexts;
import org.apache.calcite.config.Lex;
import org.apache.calcite.config.NullCollation;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.QueryEngineConfiguration;
import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.RunningQuery;
import org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionService;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityService;
import org.apache.ignite.internal.processors.query.calcite.metadata.AffinityServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
import org.apache.ignite.internal.processors.query.calcite.metadata.MappingServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
import org.apache.ignite.internal.processors.query.calcite.prepare.CacheKey;
import org.apache.ignite.internal.processors.query.calcite.prepare.ExplainPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.FieldsMetadata;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteConvertletTable;
import org.apache.ignite.internal.processors.query.calcite.prepare.IgniteTypeCoercion;
import org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.PrepareServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlan;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCache;
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCacheImpl;
import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolderImpl;
import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlConformance;
import org.apache.ignite.internal.processors.query.calcite.sql.fun.IgniteOwnSqlOperatorTable;
import org.apache.ignite.internal.processors.query.calcite.sql.fun.IgniteStdSqlOperatorTable;
import org.apache.ignite.internal.processors.query.calcite.sql.generated.IgniteSqlParserImpl;
import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTraitDef;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
/** */
public class CalciteQueryProcessor extends GridProcessorAdapter implements QueryEngine {
/**
* Default planner timeout, in ms.
*/
private static final long DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT = 15000;
/**
* Planner timeout property name.
*/
@SystemProperty(value = "Timeout of calcite based sql engine's planner, in ms", type = Long.class,
defaults = "" + DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT)
public static final String IGNITE_CALCITE_PLANNER_TIMEOUT = "IGNITE_CALCITE_PLANNER_TIMEOUT";
/** */
public static final FrameworkConfig FRAMEWORK_CONFIG = Frameworks.newConfigBuilder()
.executor(new RexExecutorImpl(DataContexts.EMPTY))
.sqlToRelConverterConfig(SqlToRelConverter.config()
.withTrimUnusedFields(true)
// currently SqlToRelConverter creates not optimal plan for both optimization and execution
// so it's better to disable such rewriting right now
// TODO: remove this after IGNITE-14277
.withInSubQueryThreshold(Integer.MAX_VALUE)
.withDecorrelationEnabled(true)
.withExpand(false)
.withHintStrategyTable(
HintStrategyTable.builder()
.hintStrategy("DISABLE_RULE", (hint, rel) -> true)
.hintStrategy("EXPAND_DISTINCT_AGG", (hint, rel) -> rel instanceof Aggregate)
// QUERY_ENGINE hint preprocessed by regexp, but to avoid warnings should be also in HintStrategyTable.
.hintStrategy("QUERY_ENGINE", (hint, rel) -> true)
.build()
)
)
.convertletTable(IgniteConvertletTable.INSTANCE)
.parserConfig(
SqlParser.config()
.withParserFactory(IgniteSqlParserImpl.FACTORY)
.withLex(Lex.ORACLE)
.withConformance(IgniteSqlConformance.INSTANCE))
.sqlValidatorConfig(SqlValidator.Config.DEFAULT
.withIdentifierExpansion(true)
.withDefaultNullCollation(NullCollation.LOW)
.withSqlConformance(IgniteSqlConformance.INSTANCE)
.withTypeCoercionFactory(IgniteTypeCoercion::new))
// Dialects support.
.operatorTable(SqlOperatorTables.chain(IgniteStdSqlOperatorTable.INSTANCE, IgniteOwnSqlOperatorTable.instance()))
// Context provides a way to store data within the planner session that can be accessed in planner rules.
.context(Contexts.empty())
// Custom cost factory to use during optimization
.costFactory(new IgniteCostFactory())
.typeSystem(IgniteTypeSystem.INSTANCE)
.traitDefs(new RelTraitDef<?>[] {
ConventionTraitDef.INSTANCE,
RelCollationTraitDef.INSTANCE,
DistributionTraitDef.INSTANCE,
RewindabilityTraitDef.INSTANCE,
CorrelationTraitDef.INSTANCE,
})
.build();
/** Query planner timeout. */
private final long queryPlannerTimeout = getLong(IGNITE_CALCITE_PLANNER_TIMEOUT,
DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT);
/** */
private final QueryPlanCache qryPlanCache;
/** */
private final QueryTaskExecutor taskExecutor;
/** */
private final FailureProcessor failureProcessor;
/** */
private final AffinityService partSvc;
/** */
private final SchemaHolder schemaHolder;
/** */
private final MessageService msgSvc;
/** */
private final ExchangeService exchangeSvc;
/** */
private final MappingService mappingSvc;
/** */
private final MailboxRegistry mailboxRegistry;
/** */
private final ExecutionService<Object[]> executionSvc;
/** */
private final PrepareServiceImpl prepareSvc;
/** */
private final QueryRegistry qryReg;
/** */
private final CalciteQueryEngineConfiguration cfg;
/** */
private volatile boolean started;
/**
* @param ctx Kernal context.
*/
public CalciteQueryProcessor(GridKernalContext ctx) {
super(ctx);
failureProcessor = ctx.failure();
schemaHolder = new SchemaHolderImpl(ctx);
qryPlanCache = new QueryPlanCacheImpl(ctx);
mailboxRegistry = new MailboxRegistryImpl(ctx);
taskExecutor = new QueryTaskExecutorImpl(ctx);
executionSvc = new ExecutionServiceImpl<>(ctx, ArrayRowHandler.INSTANCE);
partSvc = new AffinityServiceImpl(ctx);
msgSvc = new MessageServiceImpl(ctx);
mappingSvc = new MappingServiceImpl(ctx);
exchangeSvc = new ExchangeServiceImpl(ctx);
prepareSvc = new PrepareServiceImpl(ctx);
qryReg = new QueryRegistryImpl(ctx);
QueryEngineConfiguration[] qryEnginesCfg = ctx.config().getSqlConfiguration().getQueryEnginesConfiguration();
if (F.isEmpty(qryEnginesCfg))
cfg = new CalciteQueryEngineConfiguration();
else {
cfg = (CalciteQueryEngineConfiguration)Arrays.stream(qryEnginesCfg)
.filter(c -> c instanceof CalciteQueryEngineConfiguration)
.findAny()
.orElse(new CalciteQueryEngineConfiguration());
}
}
/**
* @return Affinity service.
*/
public AffinityService affinityService() {
return partSvc;
}
/**
* @return Query cache.
*/
public QueryPlanCache queryPlanCache() {
return qryPlanCache;
}
/**
* @return Task executor.
*/
public QueryTaskExecutor taskExecutor() {
return taskExecutor;
}
/**
* @return Schema holder.
*/
public SchemaHolder schemaHolder() {
return schemaHolder;
}
/**
* @return Message service.
*/
public MessageService messageService() {
return msgSvc;
}
/**
* @return Mapping service.
*/
public MappingService mappingService() {
return mappingSvc;
}
/**
* @return Exchange service.
*/
public ExchangeService exchangeService() {
return exchangeSvc;
}
/**
* @return Mailbox registry.
*/
public MailboxRegistry mailboxRegistry() {
return mailboxRegistry;
}
/**
* @return Failure processor.
*/
public FailureProcessor failureProcessor() {
return failureProcessor;
}
/** */
public PrepareServiceImpl prepareService() {
return prepareSvc;
}
/** */
public ExecutionService<Object[]> executionService() {
return executionSvc;
}
/** {@inheritDoc} */
@Override public void onKernalStart(boolean active) {
onStart(ctx,
executionSvc,
mailboxRegistry,
partSvc,
schemaHolder,
msgSvc,
taskExecutor,
mappingSvc,
qryPlanCache,
exchangeSvc,
qryReg
);
started = true;
}
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
if (started) {
started = false;
onStop(
qryReg,
executionSvc,
mailboxRegistry,
partSvc,
schemaHolder,
msgSvc,
taskExecutor,
mappingSvc,
qryPlanCache,
exchangeSvc
);
}
}
/** {@inheritDoc} */
@Override public List<FieldsQueryCursor<List<?>>> query(
@Nullable QueryContext qryCtx,
@Nullable String schemaName,
String sql,
Object... params
) throws IgniteSQLException {
return parseAndProcessQuery(qryCtx, executionSvc::executePlan, schemaName, sql, params);
}
/** {@inheritDoc} */
@Override public List<List<GridQueryFieldMetadata>> parameterMetaData(
@Nullable QueryContext ctx,
String schemaName,
String sql
) throws IgniteSQLException {
return parseAndProcessQuery(ctx, (qry, plan) -> {
try {
return fieldsMeta(plan, true);
}
finally {
qryReg.unregister(qry.id());
}
}, schemaName, sql);
}
/** {@inheritDoc} */
@Override public List<List<GridQueryFieldMetadata>> resultSetMetaData(
@Nullable QueryContext ctx,
String schemaName,
String sql
) throws IgniteSQLException {
return parseAndProcessQuery(ctx, (qry, plan) -> {
try {
return fieldsMeta(plan, false);
}
finally {
qryReg.unregister(qry.id());
}
}, schemaName, sql);
}
/** {@inheritDoc} */
@Override public List<FieldsQueryCursor<List<?>>> queryBatched(
@Nullable QueryContext qryCtx,
String schemaName,
String sql,
List<Object[]> batchedParams
) throws IgniteSQLException {
SchemaPlus schema = schemaHolder.schema(schemaName);
assert schema != null : "Schema not found: " + schemaName;
SqlNodeList qryNodeList = Commons.parse(sql, FRAMEWORK_CONFIG.getParserConfig());
if (qryNodeList.size() != 1) {
throw new IgniteSQLException("Multiline statements are not supported in batched query",
IgniteQueryErrorCode.PARSING);
}
SqlNode qryNode = qryNodeList.get(0);
if (qryNode.getKind() != SqlKind.INSERT && qryNode.getKind() != SqlKind.UPDATE
&& qryNode.getKind() != SqlKind.MERGE && qryNode.getKind() != SqlKind.DELETE) {
throw new IgniteSQLException("Unexpected operation kind for batched query [kind=" + qryNode.getKind() + "]",
IgniteQueryErrorCode.UNEXPECTED_OPERATION);
}
List<FieldsQueryCursor<List<?>>> cursors = new ArrayList<>(batchedParams.size());
List<RootQuery<Object[]>> qrys = new ArrayList<>(batchedParams.size());
BiFunction<RootQuery<Object[]>, Object[], QueryPlan> planSupplier =
new BiFunction<RootQuery<Object[]>, Object[], QueryPlan>() {
private QueryPlan plan;
@Override public QueryPlan apply(RootQuery<Object[]> qry, Object[] params) {
if (plan == null) {
plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql, null, params), () ->
prepareSvc.prepareSingle(qryNode, qry.planningContext())
);
}
return plan;
}
};
for (final Object[] batch: batchedParams) {
FieldsQueryCursor<List<?>> cur = processQuery(qryCtx, qry ->
executionSvc.executePlan(qry, planSupplier.apply(qry, batch)), schema.getName(), sql, qrys, batch);
cursors.add(cur);
}
return cursors;
}
/** */
private <T> List<T> parseAndProcessQuery(
@Nullable QueryContext qryCtx,
BiFunction<RootQuery<Object[]>, QueryPlan, T> action,
@Nullable String schemaName,
String sql,
Object... params
) throws IgniteSQLException {
SchemaPlus schema = schemaHolder.schema(schemaName);
assert schema != null : "Schema not found: " + schemaName;
QueryPlan plan = queryPlanCache().queryPlan(new CacheKey(schema.getName(), sql, null, params));
if (plan != null) {
return Collections.singletonList(
processQuery(qryCtx, qry -> action.apply(qry, plan), schema.getName(), sql, null, params)
);
}
SqlNodeList qryList = Commons.parse(sql, FRAMEWORK_CONFIG.getParserConfig());
List<T> res = new ArrayList<>(qryList.size());
List<RootQuery<Object[]>> qrys = new ArrayList<>(qryList.size());
for (final SqlNode sqlNode: qryList) {
T singleRes = processQuery(qryCtx, qry -> {
QueryPlan plan0;
if (qryList.size() == 1) {
plan0 = queryPlanCache().queryPlan(
// Use source SQL to avoid redundant parsing next time.
new CacheKey(schema.getName(), sql, null, params),
() -> prepareSvc.prepareSingle(sqlNode, qry.planningContext())
);
}
else
plan0 = prepareSvc.prepareSingle(sqlNode, qry.planningContext());
return action.apply(qry, plan0);
}, schema.getName(), sqlNode.toString(), qrys, params);
res.add(singleRes);
}
return res;
}
/** */
private <T> T processQuery(
@Nullable QueryContext qryCtx,
Function<RootQuery<Object[]>, T> action,
String schema,
String sql,
@Nullable List<RootQuery<Object[]>> qrys,
Object... params
) {
RootQuery<Object[]> qry = new RootQuery<>(
sql,
schemaHolder.schema(schema),
params,
qryCtx,
exchangeSvc,
(q) -> qryReg.unregister(q.id()),
log,
queryPlannerTimeout
);
if (qrys != null)
qrys.add(qry);
qryReg.register(qry);
if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
ctx.event().record(new SqlQueryExecutionEvent(
ctx.discovery().localNode(),
"SQL query execution.",
sql,
params,
SecurityUtils.securitySubjectId(ctx))
);
}
try {
return action.apply(qry);
}
catch (Throwable e) {
boolean isCanceled = qry.isCancelled();
if (qrys != null)
qrys.forEach(RootQuery::cancel);
qryReg.unregister(qry.id());
if (isCanceled)
throw new IgniteSQLException("The query was cancelled while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
else
throw e;
}
}
/**
* @param plan Query plan.
* @param isParamsMeta If {@code true}, return parameter metadata, otherwise result set metadata.
* @return Return query fields metadata.
*/
private List<GridQueryFieldMetadata> fieldsMeta(QueryPlan plan, boolean isParamsMeta) {
IgniteTypeFactory typeFactory = Commons.typeFactory();
switch (plan.type()) {
case QUERY:
case DML:
MultiStepPlan msPlan = (MultiStepPlan)plan;
FieldsMetadata meta = isParamsMeta ? msPlan.paramsMetadata() : msPlan.fieldsMetadata();
return meta.queryFieldsMetadata(typeFactory);
case EXPLAIN:
ExplainPlan exPlan = (ExplainPlan)plan;
return isParamsMeta ? Collections.emptyList() : exPlan.fieldsMeta().queryFieldsMetadata(typeFactory);
default:
return Collections.emptyList();
}
}
/** */
private void onStart(GridKernalContext ctx, Service... services) {
for (Service service : services) {
if (service instanceof LifecycleAware)
((LifecycleAware)service).onStart(ctx);
}
}
/** */
private void onStop(Service... services) {
for (Service service : services) {
if (service instanceof LifecycleAware)
((LifecycleAware)service).onStop();
}
}
/** {@inheritDoc} */
@Override public RunningQuery runningQuery(UUID id) {
return qryReg.query(id);
}
/** {@inheritDoc} */
@Override public Collection<? extends RunningQuery> runningQueries() {
return qryReg.runningQueries();
}
/** */
public QueryRegistry queryRegistry() {
return qryReg;
}
/** */
public CalciteQueryEngineConfiguration config() {
return cfg;
}
}