/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.sql;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngines;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
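
/**
 * {@link SqlEngine} that runs SQL queries as multi-stage (MSQ) tasks submitted to the Overlord, rather than
 * executing them directly as native queries.
 */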
public class MSQTaskSqlEngine implements SqlEngine
{
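  /**
   * Context keys that Druid populates internally and that callers must not set themselves.
   * {@link #validateContext} rejects any query whose context contains one of these keys.
   */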
public static final Set<String> SYSTEM_CONTEXT_PARAMETERS =
ImmutableSet.<String>builder()
.addAll(NativeSqlEngine.SYSTEM_CONTEXT_PARAMETERS)
.add(QueryKitUtils.CTX_TIME_COLUMN_NAME)
          .build();

  public static final List<String> TASK_STRUCT_FIELD_NAMES = ImmutableList.of("TASK");

  private static final String NAME = "msq-task";

  private final OverlordClient overlordClient;
  private final ObjectMapper jsonMapper;

  @Inject
public MSQTaskSqlEngine(
final OverlordClient overlordClient,
final ObjectMapper jsonMapper
)
{
this.overlordClient = overlordClient;
this.jsonMapper = jsonMapper;
  }

  @Override
public String name()
{
return NAME;
  }

  @Override
public void validateContext(Map<String, Object> queryContext)
{
SqlEngines.validateNoSpecialContextKeys(queryContext, SYSTEM_CONTEXT_PARAMETERS);
  }

  @Override
public RelDataType resultTypeForSelect(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
return getMSQStructType(typeFactory);
  }

  @Override
public RelDataType resultTypeForInsert(RelDataTypeFactory typeFactory, RelDataType validatedRowType)
{
return getMSQStructType(typeFactory);
  }

  @Override
public boolean featureAvailable(EngineFeature feature, PlannerContext plannerContext)
{
switch (feature) {
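      // Features this engine reports as unavailable.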
case ALLOW_BINDABLE_PLAN:
case ALLOW_BROADCAST_RIGHTY_JOIN:
case TIMESERIES_QUERY:
case TOPN_QUERY:
case TIME_BOUNDARY_QUERY:
case GROUPING_SETS:
case WINDOW_FUNCTIONS:
case UNNEST:
return false;
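
      // Features this engine reports as available.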
case CAN_SELECT:
case CAN_INSERT:
case CAN_REPLACE:
case READ_EXTERNAL_DATA:
case SCAN_ORDER_BY_NON_TIME:
case SCAN_NEEDS_SIGNATURE:
return true;
default:
throw SqlEngines.generateUnrecognizedFeatureException(MSQTaskSqlEngine.class.getSimpleName(), feature);
}
  }

  @Override
public QueryMaker buildQueryMakerForSelect(
final RelRoot relRoot,
final PlannerContext plannerContext
)
{
validateSelect(plannerContext);
return new MSQTaskQueryMaker(
null,
overlordClient,
plannerContext,
jsonMapper,
relRoot.fields
);
}
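
  /**
   * Returns the {@link OverlordClient} that this engine uses to submit MSQ controller tasks.
   */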
public OverlordClient overlordClient()
{
return overlordClient;
  }

  @Override
public QueryMaker buildQueryMakerForInsert(
final String targetDataSource,
final RelRoot relRoot,
final PlannerContext plannerContext
)
{
validateInsert(relRoot.rel, relRoot.fields, plannerContext);
return new MSQTaskQueryMaker(
targetDataSource,
overlordClient,
plannerContext,
jsonMapper,
relRoot.fields
);
  }

  /**
   * Checks that the SELECT query's context does not contain {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY}.
   * This is a defensive check, because {@link org.apache.druid.sql.calcite.planner.DruidPlanner} should already
   * have called {@link #validateContext}.
   */
private static void validateSelect(final PlannerContext plannerContext)
{
if (plannerContext.queryContext().containsKey(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)) {
throw DruidException
.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build(
"The SELECT query's context contains invalid parameter [%s] which is supposed to be populated "
+ "by Druid for INSERT queries. If the user is seeing this exception, that means there's a bug in Druid "
+ "that is populating the query context with the segment's granularity.",
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
);
}
}
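
  /**
   * Validates an INSERT or REPLACE query prior to execution: output field names must be unique, a __time column
   * (if present) must be of type TIMESTAMP, a PARTITIONED BY granularity other than ALL requires a __time column,
   * and outer LIMIT/OFFSET clauses are restricted (see {@link #validateLimitAndOffset}).
   */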
private static void validateInsert(
final RelNode rootRel,
final List<Pair<Integer, String>> fieldMappings,
final PlannerContext plannerContext
)
{
validateNoDuplicateAliases(fieldMappings);
// Find the __time field.
int timeFieldIndex = -1;
for (final Pair<Integer, String> field : fieldMappings) {
if (field.right.equals(ColumnHolder.TIME_COLUMN_NAME)) {
timeFieldIndex = field.left;
// Validate the __time field has the proper type.
final SqlTypeName timeType = rootRel.getRowType().getFieldList().get(field.left).getType().getSqlTypeName();
if (timeType != SqlTypeName.TIMESTAMP) {
throw InvalidSqlInput.exception(
"Field [%s] was the wrong type [%s], expected TIMESTAMP",
ColumnHolder.TIME_COLUMN_NAME,
timeType
);
}
}
}
// Validate that if segmentGranularity is not ALL then there is also a __time field.
final Granularity segmentGranularity;
try {
segmentGranularity = QueryKitUtils.getSegmentGranularityFromContext(
plannerContext.getJsonMapper(),
plannerContext.queryContextMap()
);
}
catch (Exception e) {
      // This is a defensive check, since DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY in the query context is
      // populated by Druid itself. If the user entered an incorrect granularity, it should have been flagged
      // before reaching this point.
throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.DEFENSIVE)
.build(
e,
"[%s] is not a valid value for [%s]",
plannerContext.queryContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY),
DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY
);
}
final boolean hasSegmentGranularity = !Granularities.ALL.equals(segmentGranularity);
// Validate that the query does not have an inappropriate LIMIT or OFFSET. LIMIT prevents gathering result key
// statistics, which INSERT execution logic depends on. (In QueryKit, LIMIT disables statistics generation and
// funnels everything through a single partition.)
validateLimitAndOffset(rootRel, !hasSegmentGranularity);
if (hasSegmentGranularity && timeFieldIndex < 0) {
throw InvalidInput.exception(
"The granularity [%s] specified in the PARTITIONED BY clause of the INSERT query is different from ALL. "
+ "Therefore, the query must specify a time column (named __time).",
segmentGranularity
);
}
  }

  /**
* SQL allows multiple output columns with the same name. However, we don't allow this for INSERT or REPLACE
* queries, because we use these output names to generate columns in segments. They must be unique.
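   *
   * <p>For example, a query such as {@code INSERT INTO dst SELECT x AS col, y AS col FROM src ...} (illustrative
   * SQL, not taken from this file) would be rejected here, because both output fields are named "col".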
*/
private static void validateNoDuplicateAliases(final List<Pair<Integer, String>> fieldMappings)
{
final Set<String> aliasesSeen = new HashSet<>();
for (final Pair<Integer, String> field : fieldMappings) {
if (!aliasesSeen.add(field.right)) {
throw InvalidSqlInput.exception("Duplicate field in SELECT: [%s]", field.right);
}
}
}
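
  /**
   * Rejects queries that have an outer OFFSET, and queries that have an outer LIMIT when "limitOk" is false (i.e.,
   * when PARTITIONED BY is something other than ALL). As an illustration (hypothetical SQL, not taken from this
   * file):
   *
   * <pre>
   *   INSERT INTO dst SELECT __time, dim FROM src LIMIT 10 PARTITIONED BY DAY  -- rejected
   *   INSERT INTO dst SELECT __time, dim FROM src LIMIT 10 PARTITIONED BY ALL  -- allowed
   * </pre>
   */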
private static void validateLimitAndOffset(final RelNode topRel, final boolean limitOk)
{
Sort sort = null;
if (topRel instanceof Sort) {
sort = (Sort) topRel;
} else if (topRel instanceof Project) {
      // The root may be a Project on top of a Sort: look through a mapping-only Project to find the Sort beneath.
final Project project = (Project) topRel;
if (project.isMapping()) {
final RelNode projectInput = project.getInput();
if (projectInput instanceof Sort) {
sort = (Sort) projectInput;
}
}
}
if (sort != null && sort.fetch != null && !limitOk) {
      // Found an outer LIMIT that is not allowed.
      // When PARTITIONED BY is not ALL, the segment generator relies on shuffle statistics to determine segment
      // intervals, and LIMIT/OFFSET prevent those statistics from being generated: they funnel everything into a
      // single partition, leaving nothing to compute statistics over.
throw InvalidSqlInput.exception(
"INSERT and REPLACE queries cannot have a LIMIT unless PARTITIONED BY is \"ALL\"."
);
}
if (sort != null && sort.offset != null) {
// Found an outer OFFSET that is not allowed.
throw InvalidSqlInput.exception("INSERT and REPLACE queries cannot have an OFFSET.");
}
}
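
  /**
   * Row type returned by {@link #resultTypeForSelect} and {@link #resultTypeForInsert}: a single VARCHAR column
   * named TASK. MSQ queries return the ID of the controller task that was launched, rather than a result set.
   */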
private static RelDataType getMSQStructType(RelDataTypeFactory typeFactory)
{
return typeFactory.createStructType(
ImmutableList.of(
Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR)
),
TASK_STRUCT_FIELD_NAMES
);
}
}