/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.rel;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.calcite.avatica.ColumnMetaData;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.math.expr.Evals;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.io.IOException;
import java.sql.Array;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

public class QueryMaker
{
private final QueryLifecycleFactory queryLifecycleFactory;
private final PlannerContext plannerContext;
private final ObjectMapper jsonMapper;

  public QueryMaker(
final QueryLifecycleFactory queryLifecycleFactory,
final PlannerContext plannerContext,
final ObjectMapper jsonMapper
)
{
this.queryLifecycleFactory = queryLifecycleFactory;
this.plannerContext = plannerContext;
this.jsonMapper = jsonMapper;
}

  public PlannerContext getPlannerContext()
{
return plannerContext;
}

  public ObjectMapper getJsonMapper()
{
return jsonMapper;
}
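
  /**
   * Runs a {@link DruidQuery} and returns the results as a sequence of SQL rows, one Object[] per row, ordered
   * according to the query's output row signature. Throws {@link CannotBuildQueryException} when
   * "requireTimeCondition" is enabled but the query does not filter on the __time column.
   */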
public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
{
final Query<?> query = druidQuery.getQuery();
if (plannerContext.getPlannerConfig().isRequireTimeCondition()) {
if (Intervals.ONLY_ETERNITY.equals(findBaseDataSourceIntervals(query))) {
throw new CannotBuildQueryException(
"requireTimeCondition is enabled, all queries must include a filter condition on the __time column"
);
}
}
final List<String> rowOrder;
if (query instanceof TimeseriesQuery && !druidQuery.getGrouping().getDimensions().isEmpty()) {
// Hack for timeseries queries: when generating them, DruidQuery.toTimeseriesQuery translates a dimension
// based on a timestamp_floor expression into a 'granularity'. This is not reflected in the druidQuery's
// output row signature, so we have to account for it here.
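      // For example (with a hypothetical generated name): a dimension "d0" produced by FLOOR(__time TO DAY) becomes
      // the native query's granularity, so we map "d0" back to the "__time" column in the row order.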
// TODO: We can remove this once https://github.com/apache/druid/issues/9974 is done.
final String timeDimension = Iterables.getOnlyElement(druidQuery.getGrouping().getDimensions()).getOutputName();
rowOrder = druidQuery.getOutputRowSignature().getColumnNames().stream()
.map(f -> timeDimension.equals(f) ? ColumnHolder.TIME_COLUMN_NAME : f)
.collect(Collectors.toList());
} else {
rowOrder = druidQuery.getOutputRowSignature().getColumnNames();
}
return execute(
query,
rowOrder,
druidQuery.getOutputRowType()
.getFieldList()
.stream()
.map(f -> f.getType().getSqlTypeName())
.collect(Collectors.toList())
);
}
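
  /**
   * Returns the intervals the query's base datasource will scan: the base query segment spec's intervals when the
   * datasource analysis provides one, otherwise the query's own intervals. Used by {@link #runQuery} to enforce
   * "requireTimeCondition".
   */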
private List<Interval> findBaseDataSourceIntervals(Query<?> query)
{
return DataSourceAnalysis.forDataSource(query.getDataSource())
.getBaseQuerySegmentSpec()
.map(QuerySegmentSpec::getIntervals)
.orElseGet(query::getIntervals);
}
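
  /**
   * Runs a native query and adapts the results for SQL: assigns query IDs if not already set, executes through the
   * {@link QueryLifecycle}, converts the results to arrays via the toolChest, and remaps them to "newFields" with
   * values coerced to "newTypes".
   */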
private <T> Sequence<Object[]> execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)
{
Hook.QUERY_PLAN.run(query);
if (query.getId() == null) {
final String queryId = UUID.randomUUID().toString();
plannerContext.addNativeQueryId(queryId);
query = query.withId(queryId);
}
query = query.withSqlQueryId(plannerContext.getSqlQueryId());
final AuthenticationResult authenticationResult = plannerContext.getAuthenticationResult();
final Access authorizationResult = plannerContext.getAuthorizationResult();
final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize();
// After calling "runSimple" the query will start running. We need to do this before reading the toolChest, since
// otherwise it won't yet be initialized. (A bummer, since ideally, we'd verify the toolChest exists and can do
// array-based results before starting the query; but in practice we don't expect this to happen since we keep
// tight control over which query types we generate in the SQL layer. They all support array-based results.)
final Sequence<T> results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult);
//noinspection unchecked
final QueryToolChest<T, Query<T>> toolChest = queryLifecycle.getToolChest();
final List<String> resultArrayFields = toolChest.resultArraySignature(query).getColumnNames();
final Sequence<Object[]> resultArrays = toolChest.resultsAsArrays(query, results);
return remapFields(resultArrays, resultArrayFields, newFields, newTypes);
}
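
  /**
   * Remaps rows from the native result field order into the SQL output field order, coercing each value to its SQL
   * type. For example (with hypothetical names): given originalFields = [a, b, c] and newFields = [c, a], the
   * mapping array is [2, 0], so a row [x, y, z] becomes [coerce(z), coerce(x)].
   */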
private Sequence<Object[]> remapFields(
final Sequence<Object[]> sequence,
final List<String> originalFields,
final List<String> newFields,
final List<SqlTypeName> newTypes
)
{
    // Build a hash map for looking up original field positions, so lookups stay fast even when the number of fields
    // is very high.
final Object2IntMap<String> originalFieldsLookup = new Object2IntOpenHashMap<>();
originalFieldsLookup.defaultReturnValue(-1);
for (int i = 0; i < originalFields.size(); i++) {
originalFieldsLookup.put(originalFields.get(i), i);
}
// Build "mapping" array of new field index -> old field index.
final int[] mapping = new int[newFields.size()];
for (int i = 0; i < newFields.size(); i++) {
final String newField = newFields.get(i);
final int idx = originalFieldsLookup.getInt(newField);
if (idx < 0) {
throw new ISE(
"newField[%s] not contained in originalFields[%s]",
newField,
String.join(", ", originalFields)
);
}
mapping[i] = idx;
}
return Sequences.map(
sequence,
array -> {
final Object[] newArray = new Object[mapping.length];
for (int i = 0; i < mapping.length; i++) {
newArray[i] = coerce(array[mapping[i]], newTypes.get(i));
}
return newArray;
}
);
}
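
  /**
   * Returns the Avatica representation class used to transport values of the given SQL type over JDBC. These are
   * meant to line up with the Java types produced by {@link #coerce}; for example, TIMESTAMP values travel as longs
   * and DATE values as ints, following Calcite's millis-since-epoch and days-since-epoch conventions.
   */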
public static ColumnMetaData.Rep rep(final SqlTypeName sqlType)
{
if (SqlTypeName.CHAR_TYPES.contains(sqlType)) {
return ColumnMetaData.Rep.of(String.class);
} else if (sqlType == SqlTypeName.TIMESTAMP) {
return ColumnMetaData.Rep.of(Long.class);
} else if (sqlType == SqlTypeName.DATE) {
return ColumnMetaData.Rep.of(Integer.class);
    } else if (sqlType == SqlTypeName.INTEGER || sqlType == SqlTypeName.BIGINT) {
      // Use Number.class for exact numeric types, since JSON transport might switch longs to integers.
      return ColumnMetaData.Rep.of(Number.class);
} else if (sqlType == SqlTypeName.FLOAT) {
return ColumnMetaData.Rep.of(Float.class);
} else if (sqlType == SqlTypeName.DOUBLE || sqlType == SqlTypeName.DECIMAL) {
return ColumnMetaData.Rep.of(Double.class);
} else if (sqlType == SqlTypeName.BOOLEAN) {
return ColumnMetaData.Rep.of(Boolean.class);
} else if (sqlType == SqlTypeName.OTHER) {
return ColumnMetaData.Rep.of(Object.class);
} else if (sqlType == SqlTypeName.ARRAY) {
return ColumnMetaData.Rep.of(Array.class);
} else {
throw new ISE("No rep for SQL type[%s]", sqlType);
}
}
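
  /**
   * Coerces a value from a native result row into the Java representation expected for the given SQL type (see
   * {@link #rep}). For example, DATE and TIMESTAMP values become Calcite date/timestamp numbers in the session time
   * zone, and a Collection under a CHAR type (e.g. a multi-value dimension) is serialized to a JSON string.
   */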
private Object coerce(final Object value, final SqlTypeName sqlType)
{
final Object coercedValue;
if (SqlTypeName.CHAR_TYPES.contains(sqlType)) {
if (value == null || value instanceof String) {
coercedValue = NullHandling.nullToEmptyIfNeeded((String) value);
} else if (value instanceof NlsString) {
coercedValue = ((NlsString) value).getValue();
} else if (value instanceof Number) {
coercedValue = String.valueOf(value);
} else if (value instanceof Collection) {
// Iterate through the collection, coercing each value. Useful for handling selects of multi-value dimensions.
final List<String> valueStrings = ((Collection<?>) value).stream()
.map(v -> (String) coerce(v, sqlType))
.collect(Collectors.toList());
try {
coercedValue = jsonMapper.writeValueAsString(valueStrings);
}
catch (IOException e) {
throw new RuntimeException(e);
}
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
} else if (value == null) {
coercedValue = null;
    } else if (sqlType == SqlTypeName.DATE) {
      coercedValue = Calcites.jodaToCalciteDate(coerceDateTime(value, sqlType), plannerContext.getTimeZone());
    } else if (sqlType == SqlTypeName.TIMESTAMP) {
      coercedValue = Calcites.jodaToCalciteTimestamp(coerceDateTime(value, sqlType), plannerContext.getTimeZone());
} else if (sqlType == SqlTypeName.BOOLEAN) {
if (value instanceof String) {
coercedValue = Evals.asBoolean(((String) value));
} else if (value instanceof Number) {
coercedValue = Evals.asBoolean(((Number) value).longValue());
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
} else if (sqlType == SqlTypeName.INTEGER) {
if (value instanceof String) {
coercedValue = Ints.tryParse((String) value);
} else if (value instanceof Number) {
coercedValue = ((Number) value).intValue();
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
} else if (sqlType == SqlTypeName.BIGINT) {
try {
coercedValue = DimensionHandlerUtils.convertObjectToLong(value);
}
catch (Exception e) {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
} else if (sqlType == SqlTypeName.FLOAT) {
try {
coercedValue = DimensionHandlerUtils.convertObjectToFloat(value);
}
catch (Exception e) {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
} else if (SqlTypeName.FRACTIONAL_TYPES.contains(sqlType)) {
try {
coercedValue = DimensionHandlerUtils.convertObjectToDouble(value);
}
catch (Exception e) {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
} else if (sqlType == SqlTypeName.OTHER) {
      // Complex type: serialize to JSON if the planner config says to, otherwise just report the value's class name.
if (plannerContext.getPlannerConfig().shouldSerializeComplexValues()) {
try {
coercedValue = jsonMapper.writeValueAsString(value);
}
catch (JsonProcessingException jex) {
throw new ISE(jex, "Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
} else {
coercedValue = value.getClass().getName();
}
} else if (sqlType == SqlTypeName.ARRAY) {
if (plannerContext.isStringifyArrays()) {
if (value instanceof String) {
coercedValue = NullHandling.nullToEmptyIfNeeded((String) value);
} else if (value instanceof NlsString) {
coercedValue = ((NlsString) value).getValue();
} else {
try {
coercedValue = jsonMapper.writeValueAsString(value);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
} else {
        // The protobuf JDBC handler prefers lists: it cannot handle Java arrays as SQL arrays, only Java lists. The
        // JSON handler could handle arrays just fine, but it also treats lists as SQL arrays, so convert here if
        // needed.
if (value instanceof List) {
coercedValue = value;
} else if (value instanceof String[]) {
coercedValue = Arrays.asList((String[]) value);
} else if (value instanceof Long[]) {
coercedValue = Arrays.asList((Long[]) value);
} else if (value instanceof Double[]) {
coercedValue = Arrays.asList((Double[]) value);
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
}
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
return coercedValue;
}
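
  /**
   * Interprets a value as a Joda {@link DateTime}: a Number is treated as UTC millis, a String is parsed as a long
   * of UTC millis, and a DateTime passes through unchanged.
   */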
private static DateTime coerceDateTime(Object value, SqlTypeName sqlType)
{
final DateTime dateTime;
if (value instanceof Number) {
dateTime = DateTimes.utc(((Number) value).longValue());
} else if (value instanceof String) {
dateTime = DateTimes.utc(Long.parseLong((String) value));
} else if (value instanceof DateTime) {
dateTime = (DateTime) value;
} else {
throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType);
}
return dateTime;
}
}