blob: 0c7c1cda6eaae3cab64b8264db88c60bda532c46 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.transform.function;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.transformer.datetime.BaseDateTimeTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetime.DateTimeTransformerFactory;
import org.apache.pinot.core.operator.transform.transformer.datetime.EpochToEpochTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetime.EpochToSDFTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetime.SDFToEpochTransformer;
import org.apache.pinot.core.operator.transform.transformer.datetime.SDFToSDFTransformer;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
/**
* The <code>DateTimeConversionTransformFunction</code> class implements the date time conversion transform function.
* <ul>
* <li>
* This transform function should be invoked with arguments:
* <ul>
* <li>
* Column name to convert. E.g. Date
* </li>
* <li>
* Format (as defined in {@link DateTimeFieldSpec}) of the input column. E.g. 1:HOURS:EPOCH;
* 1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd
* </li>
* <li>
* Format (as defined in {@link DateTimeFieldSpec}) of the output. E.g. 1:MILLISECONDS:EPOCH; 1:WEEKS:EPOCH;
* 1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd
* </li>
* <li>
* Granularity to bucket data into. E.g. 1:HOURS; 15:MINUTES
* </li>
* </ul>
* </li>
* <li>
* End to end example:
* <ul>
* <li>
* If Date column is expressed in millis, and output is expected in millis but bucketed to 15 minutes:
* dateTimeConvert(Date, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '15:MINUTES')
* </li>
* <li>
* If Date column is expressed in hoursSinceEpoch, and output is expected in weeksSinceEpoch bucketed to
* weeks: dateTimeConvert(Date, '1:HOURS:EPOCH', '1:WEEKS:EPOCH', '1:WEEKS')
* </li>
* </ul>
* </li>
* <li>
* Outputs:
* <ul>
* <li>
* Time values converted to the desired format and bucketed to desired granularity
* </li>
* </ul>
* </li>
* </ul>
*/
public class DateTimeConversionTransformFunction extends BaseTransformFunction {
public static final String FUNCTION_NAME = "dateTimeConvert";
private TransformFunction _mainTransformFunction;
private BaseDateTimeTransformer _dateTimeTransformer;
private TransformResultMetadata _resultMetadata;
private long[] _longOutputTimes;
private String[] _stringOutputTimes;
@Override
public String getName() {
return FUNCTION_NAME;
}
@Override
public void init(List<TransformFunction> arguments, Map<String, DataSource> dataSourceMap) {
// Check that there are exactly 4 arguments
if (arguments.size() != 4) {
throw new IllegalArgumentException("Exactly 4 arguments are required for DATE_TIME_CONVERT transform function");
}
TransformFunction firstArgument = arguments.get(0);
if (firstArgument instanceof LiteralTransformFunction || !firstArgument.getResultMetadata().isSingleValue()) {
throw new IllegalArgumentException(
"The first argument of DATE_TIME_CONVERT transform function must be a single-valued column or a transform "
+ "function");
}
_mainTransformFunction = firstArgument;
_dateTimeTransformer = DateTimeTransformerFactory
.getDateTimeTransformer(((LiteralTransformFunction) arguments.get(1)).getLiteral(),
((LiteralTransformFunction) arguments.get(2)).getLiteral(),
((LiteralTransformFunction) arguments.get(3)).getLiteral());
if (_dateTimeTransformer instanceof EpochToEpochTransformer
|| _dateTimeTransformer instanceof SDFToEpochTransformer) {
_resultMetadata = LONG_SV_NO_DICTIONARY_METADATA;
} else {
_resultMetadata = STRING_SV_NO_DICTIONARY_METADATA;
}
}
@Override
public TransformResultMetadata getResultMetadata() {
return _resultMetadata;
}
@Override
public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
if (_resultMetadata == LONG_SV_NO_DICTIONARY_METADATA) {
int length = projectionBlock.getNumDocs();
if (_longOutputTimes == null || _longOutputTimes.length < length) {
_longOutputTimes = new long[length];
}
if (_dateTimeTransformer instanceof EpochToEpochTransformer) {
EpochToEpochTransformer dateTimeTransformer = (EpochToEpochTransformer) _dateTimeTransformer;
dateTimeTransformer
.transform(_mainTransformFunction.transformToLongValuesSV(projectionBlock), _longOutputTimes, length);
} else {
SDFToEpochTransformer dateTimeTransformer = (SDFToEpochTransformer) _dateTimeTransformer;
dateTimeTransformer
.transform(_mainTransformFunction.transformToStringValuesSV(projectionBlock), _longOutputTimes, length);
}
return _longOutputTimes;
} else {
return super.transformToLongValuesSV(projectionBlock);
}
}
@Override
public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
if (_resultMetadata == STRING_SV_NO_DICTIONARY_METADATA) {
int length = projectionBlock.getNumDocs();
if (_stringOutputTimes == null || _stringOutputTimes.length < length) {
_stringOutputTimes = new String[length];
}
if (_dateTimeTransformer instanceof EpochToSDFTransformer) {
EpochToSDFTransformer dateTimeTransformer = (EpochToSDFTransformer) _dateTimeTransformer;
dateTimeTransformer
.transform(_mainTransformFunction.transformToLongValuesSV(projectionBlock), _stringOutputTimes, length);
} else {
SDFToSDFTransformer dateTimeTransformer = (SDFToSDFTransformer) _dateTimeTransformer;
dateTimeTransformer
.transform(_mainTransformFunction.transformToStringValuesSV(projectionBlock), _stringOutputTimes, length);
}
return _stringOutputTimes;
} else {
return super.transformToStringValuesSV(projectionBlock);
}
}
}