// blob: 9e0f5ed564f5edd4ecd21904d099b0170a729394 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.druid;
import org.apache.calcite.avatica.util.TimeUnitRange;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.DateString;
import org.apache.calcite.util.RangeSets;
import org.apache.calcite.util.Sarg;
import org.apache.calcite.util.TimestampString;
import org.apache.calcite.util.Util;
import org.apache.calcite.util.trace.CalciteTrace;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableRangeSet;
import com.google.common.collect.Range;
import com.google.common.collect.TreeRangeSet;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.chrono.ISOChronology;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
/**
 * Utilities for generating intervals from RexNode.
 *
 * <p>Time predicates are first translated into Guava {@link Range}s of
 * milliseconds since the epoch, then condensed and converted into Joda
 * {@link Interval}s. Every method that cannot translate its input returns
 * {@code null} so that the caller can fall back to another strategy.
 */
@SuppressWarnings({"rawtypes", "unchecked" })
public class DruidDateTimeUtils {

  protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();

  private DruidDateTimeUtils() {
  }

  /**
   * Generates a list of {@link Interval}s equivalent to a given
   * expression. Assumes that all the predicates in the input
   * reference a single column: the timestamp column.
   *
   * @param e predicate over the timestamp column
   * @return the intervals covered by the predicate, or null if the predicate
   * could not be translated
   */
  @Nullable
  public static List<Interval> createInterval(RexNode e) {
    final List<Range<Long>> ranges = extractRanges(e, false);
    if (ranges == null) {
      // We did not succeed, bail out
      return null;
    }
    // A range set merges connected ranges and drops empty ones.
    final TreeRangeSet<Long> condensedRanges = TreeRangeSet.create();
    for (Range<Long> range : ranges) {
      condensedRanges.add(range);
    }
    LOGGER.debug("Inferred ranges on interval : {}", condensedRanges);
    return toInterval(ImmutableList.<Range<Long>>copyOf(condensedRanges.asRanges()));
  }

  /**
   * Converts ranges of epoch milliseconds into UTC {@link Interval}s.
   *
   * <p>A missing bound is replaced by the corresponding bound of
   * {@link DruidTable#DEFAULT_INTERVAL}. Because an {@link Interval} includes
   * its start and excludes its end, an open lower bound and a closed upper
   * bound are each moved forward by one millisecond.
   *
   * @param ranges ranges of milliseconds since the epoch
   * @return one interval per range, in the same order
   */
  protected static List<Interval> toInterval(
      List<Range<Long>> ranges) {
    List<Interval> intervals = Util.transform(ranges, range -> {
      if (!range.hasLowerBound() && !range.hasUpperBound()) {
        return DruidTable.DEFAULT_INTERVAL;
      }
      long start = range.hasLowerBound()
          ? range.lowerEndpoint().longValue()
          : DruidTable.DEFAULT_INTERVAL.getStartMillis();
      long end = range.hasUpperBound()
          ? range.upperEndpoint().longValue()
          : DruidTable.DEFAULT_INTERVAL.getEndMillis();
      if (range.hasLowerBound()
          && range.lowerBoundType() == BoundType.OPEN) {
        start++;
      }
      if (range.hasUpperBound()
          && range.upperBoundType() == BoundType.CLOSED) {
        end++;
      }
      return new Interval(start, end, ISOChronology.getInstanceUTC());
    });
    LOGGER.debug("Converted time ranges {} to interval {}", ranges, intervals);
    return intervals;
  }

  /**
   * Translates a predicate into the ranges of epoch milliseconds it accepts.
   *
   * <p>Negation is pushed down to the leaves using De Morgan's laws:
   * {@code NOT (a OR b)} is handled as {@code (NOT a) AND (NOT b)} and
   * {@code NOT (a AND b)} as {@code (NOT a) OR (NOT b)}.
   *
   * @param node predicate to translate
   * @param withNot whether {@code node} appears under an odd number of NOTs
   * @return the accepted ranges (their union is the result), or null if the
   * predicate could not be translated
   */
  @Nullable
  protected static List<Range<Long>> extractRanges(RexNode node, boolean withNot) {
    switch (node.getKind()) {
    case EQUALS:
    case LESS_THAN:
    case LESS_THAN_OR_EQUAL:
    case GREATER_THAN:
    case GREATER_THAN_OR_EQUAL:
    case DRUID_IN:
    case SEARCH:
      return leafToRanges((RexCall) node, withNot);

    case NOT:
      return extractRanges(((RexCall) node).getOperands().get(0), !withNot);

    case OR:
      return withNot
          ? intersectOperands((RexCall) node, true)
          : unionOperands((RexCall) node, false);

    case AND:
      return withNot
          ? unionOperands((RexCall) node, true)
          : intersectOperands((RexCall) node, false);

    default:
      return null;
    }
  }

  /**
   * Returns the ranges accepted by at least one operand of a call.
   *
   * @param call call whose operands are combined
   * @param withNot whether each operand is to be negated before combining
   * @return concatenation of the operands' ranges, or null if any operand
   * could not be translated
   */
  @Nullable
  private static List<Range<Long>> unionOperands(RexCall call, boolean withNot) {
    final List<Range<Long>> union = new ArrayList<>();
    for (RexNode child : call.getOperands()) {
      final List<Range<Long>> extracted = extractRanges(child, withNot);
      if (extracted == null) {
        // An operand we cannot translate may accept any timestamp; dropping
        // it would make the result narrower than the predicate, so bail out.
        return null;
      }
      union.addAll(extracted);
    }
    return union;
  }

  /**
   * Returns the ranges accepted by every operand of a call.
   *
   * @param call call whose operands are combined
   * @param withNot whether each operand is to be negated before combining
   * @return pairwise intersections of the operands' ranges (empty if the
   * operands do not overlap), or null if any operand could not be translated
   * or produced no range
   */
  @Nullable
  private static List<Range<Long>> intersectOperands(RexCall call, boolean withNot) {
    // Null means "no operand seen yet". An empty list cannot be used for
    // that purpose because it is also a legitimate intermediate result.
    List<Range<Long>> ranges = null;
    for (RexNode child : call.getOperands()) {
      final List<Range<Long>> extracted = extractRanges(child, withNot);
      if (extracted == null || extracted.isEmpty()) {
        // We could not extract, we bail out
        return null;
      }
      if (ranges == null) {
        ranges = new ArrayList<>(extracted);
        continue;
      }
      final List<Range<Long>> overlapped = new ArrayList<>();
      for (Range<Long> current : ranges) {
        for (Range<Long> candidate : extracted) {
          // Range.intersection throws unless the ranges are connected.
          if (current.isConnected(candidate)) {
            overlapped.add(current.intersection(candidate));
          }
        }
      }
      ranges = overlapped;
    }
    return ranges == null ? new ArrayList<>() : ranges;
  }

  /**
   * Returns the complement of the union of the given ranges.
   *
   * @param ranges ranges to negate as a whole
   * @return the ranges not covered by any element of {@code ranges}
   */
  private static List<Range<Long>> complement(List<Range<Long>> ranges) {
    final TreeRangeSet<Long> union = TreeRangeSet.create();
    for (Range<Long> range : ranges) {
      union.add(range);
    }
    return ImmutableList.<Range<Long>>copyOf(
        ImmutableRangeSet.copyOf(union).complement().asRanges());
  }

  /**
   * Translates a single comparison, BETWEEN, IN or SEARCH call into ranges of
   * epoch milliseconds.
   *
   * @param call call to translate
   * @param withNot whether the call is negated
   * @return the accepted ranges (their union is the result), or null if the
   * call is not of a supported kind or an operand is not a datetime literal
   */
  @Nullable
  protected static List<Range<Long>> leafToRanges(RexCall call, boolean withNot) {
    switch (call.getKind()) {
    case EQUALS:
    case LESS_THAN:
    case LESS_THAN_OR_EQUAL:
    case GREATER_THAN:
    case GREATER_THAN_OR_EQUAL: {
      final RexNode left = call.getOperands().get(0);
      final RexNode right = call.getOperands().get(1);
      SqlKind kind = call.getKind();
      Long value = null;
      if (left instanceof RexInputRef) {
        value = literalValue(right);
      }
      if (value == null && right instanceof RexInputRef) {
        value = literalValue(left);
        // "literal op column" is rewritten as "column reverse(op) literal".
        kind = kind.reverse();
      }
      if (value == null) {
        return null;
      }
      switch (kind) {
      case LESS_THAN:
        return ImmutableList.of(withNot ? Range.atLeast(value) : Range.lessThan(value));
      case LESS_THAN_OR_EQUAL:
        return ImmutableList.of(withNot ? Range.greaterThan(value) : Range.atMost(value));
      case GREATER_THAN:
        return ImmutableList.of(withNot ? Range.atMost(value) : Range.greaterThan(value));
      case GREATER_THAN_OR_EQUAL:
        return ImmutableList.of(withNot ? Range.lessThan(value) : Range.atLeast(value));
      default:
        // EQUALS
        if (!withNot) {
          return ImmutableList.of(Range.closed(value, value));
        }
        return ImmutableList.of(Range.lessThan(value), Range.greaterThan(value));
      }
    }

    case BETWEEN: {
      // NOTE(review): the bounds are read from operands 2 and 3; confirm this
      // matches the operand layout of the BETWEEN calls that reach this code.
      final Long value1 = literalValue(call.getOperands().get(2));
      if (value1 == null) {
        return null;
      }
      final Long value2 = literalValue(call.getOperands().get(3));
      if (value2 == null) {
        return null;
      }
      // Tolerate bounds given in descending order.
      final boolean inverted = value1.compareTo(value2) > 0;
      final Long low = inverted ? value2 : value1;
      final Long high = inverted ? value1 : value2;
      if (!withNot) {
        return ImmutableList.of(Range.closed(low, high));
      }
      return ImmutableList.of(Range.lessThan(low), Range.greaterThan(high));
    }

    case DRUID_IN: {
      final ImmutableList.Builder<Range<Long>> points = ImmutableList.builder();
      // The first operand is the tested expression; the rest are the values.
      for (RexNode operand : Util.skip(call.operands)) {
        final Long element = literalValue(operand);
        if (element == null) {
          return null;
        }
        points.add(Range.closed(element, element));
      }
      final List<Range<Long>> ranges = points.build();
      // NOT IN accepts what none of the values covers, so the values must be
      // complemented together rather than one at a time.
      return withNot ? complement(ranges) : ranges;
    }

    case SEARCH: {
      final RexLiteral right = (RexLiteral) call.operands.get(1);
      final Sarg<?> sarg = right.getValueAs(Sarg.class);
      if (sarg == null) {
        return null;
      }
      final ImmutableList.Builder<Range<Long>> converted = ImmutableList.builder();
      for (Range range : sarg.rangeSet.asRanges()) {
        final Range<Long> range2 = RangeSets.copy(range, DruidDateTimeUtils::toLong);
        converted.add(range2);
      }
      final List<Range<Long>> ranges = converted.build();
      // As for NOT IN, negate the search argument as a whole.
      return withNot ? complement(ranges) : ranges;
    }

    default:
      return null;
    }
  }

  /**
   * Converts an endpoint of a search argument to milliseconds since the epoch.
   *
   * @param comparable a {@link TimestampString} or {@link DateString}
   * @return milliseconds since the epoch
   * @throws AssertionError if the value is of any other type
   */
  private static Long toLong(Comparable comparable) {
    if (comparable instanceof TimestampString) {
      return ((TimestampString) comparable).getMillisSinceEpoch();
    }
    if (comparable instanceof DateString) {
      // Same conversion as literalValue applies to DATE literals.
      return ((DateString) comparable).getMillisSinceEpoch();
    }
    throw new AssertionError("unsupported type: " + comparable.getClass());
  }

  /**
   * Returns the literal value for the given node, assuming it is a literal with
   * datetime type, or a cast that only alters nullability on top of a literal with
   * datetime type.
   *
   * @param node node to evaluate
   * @return milliseconds since the epoch, or null if the node is not such a
   * literal or its value is null
   */
  @Nullable
  protected static Long literalValue(RexNode node) {
    switch (node.getKind()) {
    case LITERAL:
      switch (((RexLiteral) node).getTypeName()) {
      case TIMESTAMP:
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        TimestampString tsVal = ((RexLiteral) node).getValueAs(TimestampString.class);
        if (tsVal == null) {
          return null;
        }
        return tsVal.getMillisSinceEpoch();
      case DATE:
        DateString dateVal = ((RexLiteral) node).getValueAs(DateString.class);
        if (dateVal == null) {
          return null;
        }
        return dateVal.getMillisSinceEpoch();
      default:
        break;
      }
      break;
    case CAST:
      // Normally all CASTs are eliminated by now by constant reduction.
      // But when HiveExecutor is used there may be a cast that changes only
      // nullability, from TIMESTAMP NOT NULL literal to TIMESTAMP literal.
      // We can handle that case by traversing the dummy CAST.
      assert node instanceof RexCall;
      final RexCall call = (RexCall) node;
      final RexNode operand = call.getOperands().get(0);
      final RelDataType callType = call.getType();
      final RelDataType operandType = operand.getType();
      if (operand.getKind() == SqlKind.LITERAL
          && callType.getSqlTypeName() == operandType.getSqlTypeName()
          && (callType.getSqlTypeName() == SqlTypeName.DATE
          || callType.getSqlTypeName() == SqlTypeName.TIMESTAMP
          || callType.getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
          && callType.isNullable()
          && !operandType.isNullable()) {
        return literalValue(operand);
      }
      break;
    default:
      break;
    }
    return null;
  }

  /**
   * Infers granularity from a time unit.
   * It supports {@code FLOOR(<time> TO <timeunit>)}
   * and {@code EXTRACT(<timeunit> FROM <time>)}.
   * Returns null if it cannot be inferred.
   *
   * @param node the Rex node
   * @param timeZone time zone used when the time operand has type
   * TIMESTAMP WITH LOCAL TIME ZONE; DATE and TIMESTAMP operands use UTC
   * @return the granularity, or null if it cannot be inferred
   */
  @Nullable
  public static Granularity extractGranularity(RexNode node, String timeZone) {
    final int valueIndex;
    final int flagIndex;

    if (TimeExtractionFunction.isValidTimeExtract(node)) {
      flagIndex = 0;
      valueIndex = 1;
    } else if (TimeExtractionFunction.isValidTimeFloor(node)) {
      valueIndex = 0;
      flagIndex = 1;
    } else {
      // We can only infer granularity from floor and extract.
      return null;
    }
    final RexCall call = (RexCall) node;
    final RexNode value = call.operands.get(valueIndex);
    final RexLiteral flag = (RexLiteral) call.operands.get(flagIndex);
    final TimeUnitRange timeUnit = (TimeUnitRange) flag.getValue();

    final SqlTypeName valueTypeName = value.getType().getSqlTypeName();
    if (valueTypeName == SqlTypeName.DATE
        || valueTypeName == SqlTypeName.TIMESTAMP) {
      // We use 'UTC' for date/timestamp type as Druid needs timezone information
      return Granularities.createGranularity(timeUnit, "UTC");
    } else if (valueTypeName == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
      return Granularities.createGranularity(timeUnit, timeZone);
    }
    // Type not recognized
    return null;
  }

  /**
   * Converts a granularity to ISO period format.
   *
   * @param type Druid Granularity to translate as period of time
   *
   * @return String representing the granularity as ISO8601 Period of Time; null
   * for unknown case, including a null {@code type}
   */
  @Nullable
  public static String toISOPeriodFormat(Granularity.Type type) {
    if (type == null) {
      // Switching on null would throw; treat it as an unknown granularity,
      // as toDruidGranularity does for a null time unit.
      return null;
    }
    switch (type) {
    case SECOND:
      return Period.seconds(1).toString();
    case MINUTE:
      return Period.minutes(1).toString();
    case HOUR:
      return Period.hours(1).toString();
    case DAY:
      return Period.days(1).toString();
    case WEEK:
      return Period.weeks(1).toString();
    case MONTH:
      return Period.months(1).toString();
    case QUARTER:
      return Period.months(3).toString();
    case YEAR:
      return Period.years(1).toString();
    default:
      return null;
    }
  }

  /**
   * Translates a Calcite {@link TimeUnitRange} to a Druid {@link Granularity}.
   *
   * @param timeUnit Calcite Time unit to convert
   *
   * @return Druid Granularity or null if {@code timeUnit} is null or has no
   * Druid counterpart
   */
  @Nullable
  public static Granularity.Type toDruidGranularity(TimeUnitRange timeUnit) {
    if (timeUnit == null) {
      return null;
    }
    switch (timeUnit) {
    case YEAR:
      return Granularity.Type.YEAR;
    case QUARTER:
      return Granularity.Type.QUARTER;
    case MONTH:
      return Granularity.Type.MONTH;
    case WEEK:
      return Granularity.Type.WEEK;
    case DAY:
      return Granularity.Type.DAY;
    case HOUR:
      return Granularity.Type.HOUR;
    case MINUTE:
      return Granularity.Type.MINUTE;
    case SECOND:
      return Granularity.Type.SECOND;
    default:
      return null;
    }
  }
}