blob: 3a855008c1df0d431777b31ab52fa591befc65f6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.operator.transform.function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.function.DateTimeUtils;
import org.apache.pinot.common.function.TimeZoneKey;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.core.operator.DocIdSetOperator;
import org.apache.pinot.core.operator.ProjectionOperator;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Tests for {@link DateTruncTransformFunction}.
 *
 * <p>Each case writes a single-row segment whose time column holds the input instant (epoch millis). It evaluates
 * {@code dateTrunc(unit, time, 'MILLISECONDS', tz)} over that segment and compares the result with an expected value
 * computed independently with Joda-Time.
 */
public class DateTruncTransformFunctionTest {
  private static final String TIME_COLUMN = "time";
  // A non-whole-hour offset (+07:09), so truncation is also exercised in a zone not aligned to UTC hour boundaries.
  private static final ZoneOffset WEIRD_ZONE = ZoneOffset.ofHoursMinutes(7, 9);
  private static final DateTimeZone WEIRD_DATE_TIME_ZONE = DateTimeZone.forID(WEIRD_ZONE.getId());
  private static final DateTime WEIRD_TIMESTAMP = new DateTime(2001, 8, 22, 3, 4, 5, 321, WEIRD_DATE_TIME_ZONE);
  // Must denote the same local date-time and offset as WEIRD_TIMESTAMP.
  private static final String WEIRD_TIMESTAMP_ISO8601_STRING = "2001-08-22T03:04:05.321+07:09";
  private static final DateTimeZone UTC_TIME_ZONE =
      DateTimeUtils.DateTimeZoneIndex.getDateTimeZone(TimeZoneKey.UTC_KEY);
  // Must denote the same instant as TIMESTAMP.
  private static final String TIMESTAMP_ISO8601_STRING = "2001-08-22T03:04:05.321+00:00";
  private static final DateTime TIMESTAMP = new DateTime(2001, 8, 22, 3, 4, 5, 321, UTC_TIME_ZONE);

  /**
   * Parses an ISO-8601 timestamp that carries an explicit offset and returns its epoch milliseconds.
   *
   * <p>The parsed offset is honoured ({@code withOffsetParsed()}), so the result identifies the same instant
   * regardless of the JVM default time zone.
   *
   * @param iso8601 ISO-8601 timestamp with an explicit offset, e.g. {@code 2001-08-22T03:04:05.321+07:09}
   * @return milliseconds since the Unix epoch for that instant
   */
  private static long iso8601ToUtcEpochMillis(String iso8601) {
    DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser().withOffsetParsed();
    return formatter.parseDateTime(iso8601).getMillis();
  }

  /**
   * Builds a single-row segment holding {@code literalInput} in {@link #TIME_COLUMN} and evaluates
   * {@code dateTrunc(unit, time, 'MILLISECONDS', tz)} over it. Asserts that the result equals {@code expected}.
   *
   * <p>The segment is written to a fresh temporary directory. The loaded segment is destroyed and the whole temporary
   * directory is deleted afterwards, even when the assertion fails.
   *
   * @param schema schema expected to declare {@link #TIME_COLUMN} as a LONG epoch-millisecond column
   * @param literalInput ISO-8601 timestamp (with offset) stored in the time column
   * @param unit truncation unit passed to dateTrunc, e.g. {@code "day"}
   * @param tz time zone id passed to dateTrunc
   * @param expected expected truncated value, in epoch milliseconds
   * @throws Exception if segment creation, loading or evaluation fails
   */
  private static void testDateTruncHelper(Schema schema, String literalInput, String unit, String tz, long expected)
      throws Exception {
    long zmillisInput = iso8601ToUtcEpochMillis(literalInput);
    GenericRow row = new GenericRow();
    row.init(ImmutableMap.of(TIME_COLUMN, zmillisInput));
    List<GenericRow> rows = ImmutableList.of(row);
    TableConfig tableConfig =
        new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME_COLUMN).build();
    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
    String segmentName = "testSegment";
    // Keep a handle on the freshly created parent directory. Deleting only the segment sub-directory would leak one
    // empty temp directory per invocation.
    File tempDir = Files.createTempDirectory("pinot_date_trunc_test").toAbsolutePath().toFile();
    String indexDirPath = Paths.get(tempDir.getPath(), segmentName).toAbsolutePath().toString();
    try {
      config.setOutDir(indexDirPath);
      config.setSegmentName(segmentName);
      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
      driver.init(config, new GenericRowRecordReader(rows));
      driver.build();
      IndexSegment indexSegment = ImmutableSegmentLoader.load(new File(indexDirPath, segmentName), ReadMode.heap);
      try {
        Set<String> columnNames = indexSegment.getPhysicalColumnNames();
        HashMap<String, DataSource> dataSourceMap = new HashMap<>(columnNames.size());
        for (String columnName : columnNames) {
          dataSourceMap.put(columnName, indexSegment.getDataSource(columnName));
        }
        ProjectionBlock projectionBlock = new ProjectionOperator(dataSourceMap,
            new DocIdSetOperator(new MatchAllFilterOperator(rows.size()), DocIdSetPlanNode.MAX_DOC_PER_CALL))
            .nextBlock();
        ExpressionContext expression = RequestContextUtils.getExpression(
            String.format("dateTrunc('%s', \"%s\", '%s', '%s')", unit, TIME_COLUMN, TimeUnit.MILLISECONDS, tz));
        TransformFunction transformFunction = TransformFunctionFactory.get(expression, dataSourceMap);
        Assert.assertTrue(transformFunction instanceof DateTruncTransformFunction);
        Assert.assertEquals(transformFunction.getName(), DateTruncTransformFunction.FUNCTION_NAME);
        long[] longValues = transformFunction.transformToLongValuesSV(projectionBlock);
        Assert.assertEquals(longValues[0], expected,
            "dateTrunc('" + unit + "') of " + literalInput + " in zone " + tz);
      } finally {
        indexSegment.destroy();
      }
    } finally {
      FileUtils.deleteDirectory(tempDir);
    }
  }

  @Test
  public void testPrestoCompatibleDateTimeConversionTransformFunction()
      throws Exception {
    Schema schemaTimeFieldSpec = new Schema.SchemaBuilder()
        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN), null).build();
    testDateTrunc(schemaTimeFieldSpec);
    Schema schemaDateTimeFieldSpec = new Schema.SchemaBuilder()
        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
    testDateTrunc(schemaDateTimeFieldSpec);
  }

  /**
   * Runs every truncation unit against both the UTC and the +07:09 fixtures using {@code schema}.
   */
  private void testDateTrunc(Schema schema)
      throws Exception {
    assertAllTruncations(schema, TIMESTAMP_ISO8601_STRING, TIMESTAMP, UTC_TIME_ZONE);
    assertAllTruncations(schema, WEIRD_TIMESTAMP_ISO8601_STRING, WEIRD_TIMESTAMP, WEIRD_DATE_TIME_ZONE);
  }

  /**
   * Checks second, minute, hour, day, week, month, quarter and year truncation of one input.
   *
   * <p>Expected values are derived from {@code timestamp} viewed in {@code zone}, the same zone passed to dateTrunc.
   *
   * @param schema schema used for the test segment
   * @param iso8601Input ISO-8601 string denoting the same instant as {@code timestamp}
   * @param timestamp the input instant
   * @param zone zone in which truncation is performed
   * @throws Exception if any evaluation fails
   */
  private static void assertAllTruncations(Schema schema, String iso8601Input, DateTime timestamp, DateTimeZone zone)
      throws Exception {
    String tz = zone.getID();
    DateTime local = timestamp.withZone(zone);
    DateTime second = local.withMillisOfSecond(0);
    DateTime minute = second.withSecondOfMinute(0);
    DateTime hour = minute.withMinuteOfHour(0);
    DateTime day = hour.withHourOfDay(0);
    // ISO-8601 weeks begin on Monday, the minimum dayOfWeek value in Joda's ISO chronology
    // (e.g. 2001-08-22, a Wednesday, truncates to 2001-08-20).
    DateTime week = day.dayOfWeek().withMinimumValue();
    // Derive month/year from `day`, not `week`: a week may start in the previous month.
    DateTime month = day.withDayOfMonth(1);
    // Quarters start in January, April, July and October.
    DateTime quarter = month.withMonthOfYear((month.getMonthOfYear() - 1) / 3 * 3 + 1);
    DateTime year = month.withMonthOfYear(1);

    testDateTruncHelper(schema, iso8601Input, "second", tz, second.getMillis());
    testDateTruncHelper(schema, iso8601Input, "minute", tz, minute.getMillis());
    testDateTruncHelper(schema, iso8601Input, "hour", tz, hour.getMillis());
    testDateTruncHelper(schema, iso8601Input, "day", tz, day.getMillis());
    testDateTruncHelper(schema, iso8601Input, "week", tz, week.getMillis());
    testDateTruncHelper(schema, iso8601Input, "month", tz, month.getMillis());
    testDateTruncHelper(schema, iso8601Input, "quarter", tz, quarter.getMillis());
    testDateTruncHelper(schema, iso8601Input, "year", tz, year.getMillis());
  }
}