/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.calcite.adapter.druid;

import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.interpreter.BindableConvention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelectKeyword;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Table mapped onto a Druid table.
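*
* <p>For example (an illustrative sketch; the data source name and columns
* are hypothetical), a table over a "wikipedia" data source might be built
* with:
*
* <blockquote><pre>{@code
* Table table = DruidTable.create(druidSchema, "wikipedia",
*     null,                        // null means use DEFAULT_INTERVAL
*     ImmutableMap.of("__time", SqlTypeName.TIMESTAMP,
*         "countryName", SqlTypeName.VARCHAR,
*         "added", SqlTypeName.BIGINT),
*     ImmutableSet.of("added"),    // metric columns
*     "__time",                    // timestamp column
*     ImmutableMap.of());          // no complex metrics
* }</pre></blockquote>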
*/
public class DruidTable extends AbstractTable implements TranslatableTable {
public static final String DEFAULT_TIMESTAMP_COLUMN = "__time";
public static final Interval DEFAULT_INTERVAL =
new Interval(new DateTime("1900-01-01", ISOChronology.getInstanceUTC()),
new DateTime("3000-01-01", ISOChronology.getInstanceUTC()));
final DruidSchema schema;
final String dataSource;
final RelProtoDataType protoRowType;
final ImmutableSet<String> metricFieldNames;
final ImmutableList<Interval> intervals;
final String timestampFieldName;
final ImmutableMap<String, List<ComplexMetric>> complexMetrics;
final ImmutableMap<String, SqlTypeName> allFields;

/**
* Creates a Druid table.
*
* @param schema Druid schema that contains this table
* @param dataSource Druid data source name
* @param protoRowType Field names and types
* @param metricFieldNames Names of fields that are metrics
* @param timestampFieldName Name of the column that contains the time
* @param intervals Default interval if query does not constrain the time, or null
* @param complexMetrics Complex metrics grouped by the alias they are registered under, or null
* @param allFields Names and types of all fields (dimensions and metrics), or null
*/
public DruidTable(DruidSchema schema, String dataSource,
RelProtoDataType protoRowType, Set<String> metricFieldNames,
String timestampFieldName, List<Interval> intervals,
Map<String, List<ComplexMetric>> complexMetrics, Map<String, SqlTypeName> allFields) {
this.timestampFieldName = Objects.requireNonNull(timestampFieldName);
this.schema = Objects.requireNonNull(schema);
this.dataSource = Objects.requireNonNull(dataSource);
this.protoRowType = protoRowType;
this.metricFieldNames = ImmutableSet.copyOf(metricFieldNames);
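// Null-tolerant arguments: fall back to the default interval and to empty
// maps when intervals, complexMetrics or allFields are not supplied.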
this.intervals = intervals != null ? ImmutableList.copyOf(intervals)
: ImmutableList.of(DEFAULT_INTERVAL);
this.complexMetrics = complexMetrics == null ? ImmutableMap.of()
: ImmutableMap.copyOf(complexMetrics);
this.allFields = allFields == null ? ImmutableMap.of()
: ImmutableMap.copyOf(allFields);
}

/** Creates a {@link DruidTable} by using the given {@link DruidConnectionImpl}
* to fill in any missing metadata. The field map and metric-name set may be
* only partially populated; the connection supplies the rest.
*
* @param druidSchema Druid schema
* @param dataSourceName Data source name in Druid, also table name
* @param intervals Intervals, or null to use default
* @param fieldMap Partially populated map of fields (dimensions plus metrics)
* @param metricNameSet Partially populated set of metric names
* @param timestampColumnName Name of timestamp column, or null
* @param connection Connection used to find column definitions; must not be null
* @param complexMetrics List of complex metrics in Druid (thetaSketch, hyperUnique)
*
* @return A table
*/
static Table create(DruidSchema druidSchema, String dataSourceName,
List<Interval> intervals, Map<String, SqlTypeName> fieldMap,
Set<String> metricNameSet, String timestampColumnName,
DruidConnectionImpl connection, Map<String, List<ComplexMetric>> complexMetrics) {
assert connection != null;
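// Ask Druid for the data source's metadata; this populates any missing
// entries in fieldMap, metricNameSet and complexMetrics in place.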
connection.metadata(dataSourceName, timestampColumnName, intervals,
fieldMap, metricNameSet, complexMetrics);
return DruidTable.create(druidSchema, dataSourceName, intervals, fieldMap,
metricNameSet, timestampColumnName, complexMetrics);
}

/** Creates a {@link DruidTable} by copying the given parameters.
*
* @param druidSchema Druid schema
* @param dataSourceName Data source name in Druid, also table name
* @param intervals Intervals, or null to use default
* @param fieldMap Fully populated map of fields (dimensions plus metrics)
* @param metricNameSet Fully populated set of metric names
* @param timestampColumnName Name of timestamp column, or null
* @param complexMetrics List of complex metrics in Druid (thetaSketch, hyperUnique)
*
* @return A table
*/
static Table create(DruidSchema druidSchema, String dataSourceName,
List<Interval> intervals, Map<String, SqlTypeName> fieldMap,
Set<String> metricNameSet, String timestampColumnName,
Map<String, List<ComplexMetric>> complexMetrics) {
final ImmutableMap<String, SqlTypeName> fields =
ImmutableMap.copyOf(fieldMap);
return new DruidTable(druidSchema,
dataSourceName,
new MapRelProtoDataType(fields, timestampColumnName),
ImmutableSet.copyOf(metricNameSet),
timestampColumnName,
intervals,
complexMetrics,
fields);
}

/**
* Returns the first {@link ComplexMetric} registered under the given
* <code>alias</code> that can be used with the given {@link AggregateCall},
* or <code>null</code> if there is none.
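*
* <p>A hypothetical example: if "user_unique" is registered as a
* {@code hyperUnique} complex metric, this returns that metric for a
* {@code COUNT(DISTINCT "user_unique")} call and {@code null} for a
* {@code SUM} call over the same column.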
*/
public ComplexMetric resolveComplexMetric(String alias, AggregateCall call) {
List<ComplexMetric> potentialMetrics = getComplexMetricsFrom(alias);
// It's possible that multiple complex metrics match the AggregateCall,
// but for now we only return the first that matches
for (ComplexMetric complexMetric : potentialMetrics) {
if (complexMetric.canBeUsed(call)) {
return complexMetric;
}
}
return null;
}

@Override public boolean isRolledUp(String column) {
// The only rolled-up columns we care about are complex metrics (i.e. sketches),
// but a VARCHAR column of the same name is a dimension, not a rolled-up metric.
return complexMetrics.get(column) != null
&& allFields.get(column) != SqlTypeName.VARCHAR;
}

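// Illustrative example (table and column names are hypothetical): with
// approximate results enabled, a rolled-up hyperUnique column "user_unique"
// may be queried as
//   SELECT COUNT(DISTINCT "user_unique") FROM "wiki"
// but not with an exact COUNT or in other aggregate functions.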
@Override public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call,
SqlNode parent, CalciteConnectionConfig config) {
assert isRolledUp(column);
// Our rolled up columns are only allowed in COUNT(DISTINCT ...) aggregate functions.
// We only allow this when approximate results are acceptable.
return ((config != null
&& config.approximateDistinctCount()
&& isCountDistinct(call))
|| call.getOperator() == SqlStdOperatorTable.APPROX_COUNT_DISTINCT)
&& call.getOperandList().size() == 1 // for COUNT(a_1, a_2, ... a_n). n should be 1
&& isValidParentKind(parent);
}

private boolean isValidParentKind(SqlNode node) {
return node.getKind() == SqlKind.SELECT
|| node.getKind() == SqlKind.FILTER
|| isSupportedPostAggOperation(node.getKind());
}

private boolean isCountDistinct(SqlCall call) {
return call.getKind() == SqlKind.COUNT
&& call.getFunctionQuantifier() != null
&& call.getFunctionQuantifier().getValue() == SqlSelectKeyword.DISTINCT;
}

// Druid post-aggregations support +, -, / and *, so the parent of a
// COUNT(DISTINCT ...) call may be any one of those operators.
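// For example, COUNT(DISTINCT "user_unique") / 2 stays valid, since DIVIDE
// can become a Druid post-aggregation ("user_unique" is a hypothetical column).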
private boolean isSupportedPostAggOperation(SqlKind kind) {
return kind == SqlKind.PLUS
|| kind == SqlKind.MINUS
|| kind == SqlKind.DIVIDE
|| kind == SqlKind.TIMES;
}

/** Returns the list of {@link ComplexMetric}s registered under the given
* <code>alias</code>, or an empty list (never <code>null</code>) if there
* are none. */
public List<ComplexMetric> getComplexMetricsFrom(String alias) {
return complexMetrics.containsKey(alias)
? complexMetrics.get(alias)
: new ArrayList<>();
}

/** Returns whether the given <code>alias</code> is a reference to a
* registered {@link ComplexMetric}. */
public boolean isComplexMetric(String alias) {
return complexMetrics.get(alias) != null;
}

@Override public RelDataType getRowType(RelDataTypeFactory typeFactory) {
final RelDataType rowType = protoRowType.apply(typeFactory);
final List<String> fieldNames = rowType.getFieldNames();
Preconditions.checkArgument(fieldNames.contains(timestampFieldName));
Preconditions.checkArgument(fieldNames.containsAll(metricFieldNames));
return rowType;
}

@Override public RelNode toRel(RelOptTable.ToRelContext context,
RelOptTable relOptTable) {
final RelOptCluster cluster = context.getCluster();
final TableScan scan = LogicalTableScan.create(cluster, relOptTable, ImmutableList.of());
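// Wrap the bare scan in a DruidQuery; planner rules can later push filters,
// projects and aggregations into the query that is sent to Druid.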
return DruidQuery.create(cluster,
cluster.traitSetOf(BindableConvention.INSTANCE), relOptTable, this,
ImmutableList.of(scan));
}

public boolean isMetric(String name) {
return metricFieldNames.contains(name);
}

/** Creates a {@link RelDataType} from a map of
* field names and types. */
private static class MapRelProtoDataType implements RelProtoDataType {
private final ImmutableMap<String, SqlTypeName> fields;
private final String timestampColumn;

MapRelProtoDataType(ImmutableMap<String, SqlTypeName> fields) {
this.fields = fields;
this.timestampColumn = DruidTable.DEFAULT_TIMESTAMP_COLUMN;
}

MapRelProtoDataType(ImmutableMap<String, SqlTypeName> fields, String timestampColumn) {
this.fields = fields;
this.timestampColumn = timestampColumn;
}

@Override public RelDataType apply(RelDataTypeFactory typeFactory) {
final RelDataTypeFactory.Builder builder = typeFactory.builder();
for (Map.Entry<String, SqlTypeName> field : fields.entrySet()) {
final String key = field.getKey();
builder.add(key, field.getValue())
// Druid's time column is the only non-nullable column; all others are nullable.
.nullable(!timestampColumn.equals(key));
}
return builder.build();
}
}
}