/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pinot.core.operator.transform.function;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.function.DateTimeUtils;
import org.apache.pinot.common.function.TimeZoneKey;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.core.operator.DocIdSetOperator;
import org.apache.pinot.core.operator.ProjectionOperator;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.testng.Assert;
import org.testng.annotations.Test;


public class DateTruncTransformFunctionTest {
  private static final String TIME_COLUMN = "time";
  private static final ZoneOffset WEIRD_ZONE = ZoneOffset.ofHoursMinutes(7, 9);
  private static final DateTimeZone WEIRD_DATE_TIME_ZONE = DateTimeZone.forID(WEIRD_ZONE.getId());
  private static final DateTime WEIRD_TIMESTAMP = new DateTime(2001, 8, 22, 3, 4, 5, 321, WEIRD_DATE_TIME_ZONE);
  private static final String WEIRD_TIMESTAMP_ISO8601_STRING = "2001-08-22T03:04:05.321+07:09";
  private static final DateTimeZone UTC_TIME_ZONE =
      DateTimeUtils.DateTimeZoneIndex.getDateTimeZone(TimeZoneKey.UTC_KEY);
  private static final String TIMESTAMP_ISO8601_STRING = "2001-08-22T03:04:05.321+00:00";

  private static final DateTime TIMESTAMP = new DateTime(2001, 8, 22, 3, 4, 5, 321, UTC_TIME_ZONE);
  // This is TIMESTAMP w/o TZ

  private static long iso8601ToUtcEpochMillis(String iso8601) {
    DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser().withOffsetParsed();
    return formatter.parseDateTime(iso8601).getMillis();
  }

  private static void testDateTruncHelper(Schema schema, String literalInput, String unit, String tz, long expected)
      throws Exception {
    long zmillisInput = iso8601ToUtcEpochMillis(literalInput);
    GenericRow row = new GenericRow();
    row.init(ImmutableMap.of(TIME_COLUMN, zmillisInput));
    List<GenericRow> rows = ImmutableList.of(row);
    TableConfig tableConfig =
        new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME_COLUMN).build();

    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
    String segmentName = "testSegment";
    String indexDirPath =
        Paths.get(Files.createTempDirectory("pinot_date_trunc_test").toAbsolutePath().toString(), segmentName)
            .toAbsolutePath().toString();
    try {
      FileUtils.deleteQuietly(new File(indexDirPath));
      config.setOutDir(indexDirPath);
      config.setSegmentName(segmentName);
      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
      driver.init(config, new GenericRowRecordReader(rows));
      driver.build();

      IndexSegment indexSegment = ImmutableSegmentLoader.load(new File(indexDirPath, segmentName), ReadMode.heap);
      Set<String> columnNames = indexSegment.getPhysicalColumnNames();
      HashMap<String, DataSource> dataSourceMap = new HashMap<>(columnNames.size());
      for (String columnName : columnNames) {
        dataSourceMap.put(columnName, indexSegment.getDataSource(columnName));
      }

      ProjectionBlock projectionBlock = new ProjectionOperator(dataSourceMap,
          new DocIdSetOperator(new MatchAllFilterOperator(rows.size()), DocIdSetPlanNode.MAX_DOC_PER_CALL)).nextBlock();

      ExpressionContext expression = RequestContextUtils.getExpression(
          String.format("dateTrunc('%s', \"%s\", '%s', '%s')", unit, TIME_COLUMN, TimeUnit.MILLISECONDS, tz));
      TransformFunction transformFunction = TransformFunctionFactory.get(expression, dataSourceMap);
      Assert.assertTrue(transformFunction instanceof DateTruncTransformFunction);
      Assert.assertEquals(transformFunction.getName(), DateTruncTransformFunction.FUNCTION_NAME);
      long[] longValues = transformFunction.transformToLongValuesSV(projectionBlock);
      Assert.assertEquals(longValues[0], expected);
    } finally {
      FileUtils.deleteDirectory(new File(indexDirPath));
    }
  }

  @Test
  public void testPrestoCompatibleDateTimeConversionTransformFunction()
      throws Exception {
    Schema schemaTimeFieldSpec = new Schema.SchemaBuilder()
        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN), null).build();
    testDateTrunc(schemaTimeFieldSpec);

    Schema schemaDateTimeFieldSpec = new Schema.SchemaBuilder()
        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
    testDateTrunc(schemaDateTimeFieldSpec);
  }

  private void testDateTrunc(Schema schema)
      throws Exception {

    DateTime result = TIMESTAMP;
    result = result.withMillisOfSecond(0);
    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "second", UTC_TIME_ZONE.getID(), result.getMillis());

    result = result.withSecondOfMinute(0);
    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "minute", UTC_TIME_ZONE.getID(), result.getMillis());

    result = result.withMinuteOfHour(0);
    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "hour", UTC_TIME_ZONE.getID(), result.getMillis());

    result = result.withHourOfDay(0);
    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "day", UTC_TIME_ZONE.getID(), result.getMillis());

    // ISO8601 week begins on Monday. For this timestamp (2001-08-22), 20th is the Monday of that week
    result = result.withDayOfMonth(20);
    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "week", UTC_TIME_ZONE.getID(), result.getMillis());

    result = result.withDayOfMonth(1);
    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "month", UTC_TIME_ZONE.getID(), result.getMillis());

    result = result.withMonthOfYear(7);
    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "quarter", UTC_TIME_ZONE.getID(), result.getMillis());

    result = result.withMonthOfYear(1);
    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "year", UTC_TIME_ZONE.getID(), result.getMillis());

    result = WEIRD_TIMESTAMP;
    result = result.withMillisOfSecond(0);
    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "second", WEIRD_DATE_TIME_ZONE.getID(),
        result.getMillis());

    result = result.withSecondOfMinute(0);
    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "minute", WEIRD_DATE_TIME_ZONE.getID(),
        result.getMillis());

    result = result.withMinuteOfHour(0);
    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "hour", WEIRD_DATE_TIME_ZONE.getID(),
        result.getMillis());

    result = result.withHourOfDay(0);
    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "day", WEIRD_DATE_TIME_ZONE.getID(),
        result.getMillis());

    result = result.withDayOfMonth(20);
    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "week", WEIRD_DATE_TIME_ZONE.getID(),
        result.getMillis());

    result = result.withDayOfMonth(1);
    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "month", WEIRD_DATE_TIME_ZONE.getID(),
        result.getMillis());

    result = result.withMonthOfYear(7);
    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "quarter", WEIRD_DATE_TIME_ZONE.getID(),
        result.getMillis());

    result = result.withMonthOfYear(1);
    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "year", WEIRD_DATE_TIME_ZONE.getID(),
        result.getMillis());
  }
}
