/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.statefun.flink.io.kinesis;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import org.apache.flink.statefun.flink.io.spi.SourceProvider;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressSpec;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;

/**
 * A {@link SourceProvider} that translates a {@link KinesisIngressSpec} into a {@link
 * FlinkKinesisConsumer}.
 *
 * <p>The consumer is configured from three parts of the spec: the user supplied properties (with
 * their keys normalized, see {@link #resolveProperties(Properties)}), the AWS region / credentials
 * settings, and the startup position.
 */
public final class KinesisSourceProvider implements SourceProvider {

  /**
   * Formatter used to render a date startup position into the consumer configuration. {@link
   * DateTimeFormatter} is immutable and thread-safe, so a single shared instance is sufficient.
   */
  private static final DateTimeFormatter STARTUP_TIMESTAMP_FORMATTER =
      DateTimeFormatter.ofPattern(ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);

  /**
   * Creates a Kinesis consumer source for the given ingress spec.
   *
   * @param spec the ingress spec; must be a {@link KinesisIngressSpec}.
   * @return a {@link FlinkKinesisConsumer} reading the spec's streams with the spec's deserializer
   *     and the properties derived from the spec.
   * @throws NullPointerException if {@code spec} is {@code null}.
   * @throws IllegalArgumentException if {@code spec} is not a {@link KinesisIngressSpec}, or if
   *     the spec's properties contain an empty key.
   */
  @Override
  public <T> SourceFunction<T> forSpec(IngressSpec<T> spec) {
    final KinesisIngressSpec<T> kinesisIngressSpec = asKinesisSpec(spec);

    return new FlinkKinesisConsumer<>(
        kinesisIngressSpec.streams(),
        deserializationSchemaFromSpec(kinesisIngressSpec),
        propertiesFromSpec(kinesisIngressSpec));
  }

  /**
   * Narrows a generic ingress spec to a {@link KinesisIngressSpec}.
   *
   * @throws NullPointerException if {@code spec} is {@code null}.
   * @throws IllegalArgumentException if {@code spec} is of any other type.
   */
  private static <T> KinesisIngressSpec<T> asKinesisSpec(IngressSpec<T> spec) {
    // Reject null first so that the precondition is visible before any type inspection.
    if (spec == null) {
      throw new NullPointerException("Unable to translate a NULL spec");
    }
    if (spec instanceof KinesisIngressSpec) {
      return (KinesisIngressSpec<T>) spec;
    }
    throw new IllegalArgumentException(String.format("Wrong type %s", spec.type()));
  }

  /** Wraps the spec's deserializer in a {@link KinesisDeserializationSchemaDelegate}. */
  private static <T> KinesisDeserializationSchema<T> deserializationSchemaFromSpec(
      KinesisIngressSpec<T> spec) {
    return new KinesisDeserializationSchemaDelegate<>(spec.deserializer());
  }

  /**
   * Builds the consumer configuration for the given spec.
   *
   * <p>The user supplied properties are applied first (after key normalization), followed by the
   * region and credentials settings, and finally the startup position properties.
   */
  private static Properties propertiesFromSpec(KinesisIngressSpec<?> spec) {
    final Properties properties = new Properties();

    properties.putAll(resolveProperties(spec.properties()));
    // NOTE(review): the callbacks below write to the captured `properties` rather than to the
    // `props` argument they receive; presumably both refer to the same instance -- confirm against
    // transformPropertiesIfPresent before changing.
    spec.awsRegion()
        .transformPropertiesIfPresent(
            properties,
            ConsumerConfigConstants.AWS_REGION,
            (props, region) ->
                properties.putAll(AwsAuthConfigProperties.forAwsRegionConsumerProps(region)));
    spec.awsCredentials()
        .transformPropertiesIfPresent(
            properties,
            ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER,
            (props, credentials) ->
                properties.putAll(AwsAuthConfigProperties.forAwsCredentials(credentials)));

    setStartupPositionProperties(properties, spec.startupPosition());

    return properties;
  }

  /**
   * Normalizes the keys of the user supplied properties.
   *
   * <p>Keys that start with {@code "flink."} or {@code "aws."} are copied unchanged. Every other
   * key is rewritten by {@link #asAwsClientPropertyKey(String)}.
   *
   * @param properties the user supplied properties.
   * @return a new {@link Properties} instance holding the normalized entries.
   * @throws IllegalArgumentException if {@code properties} contains an empty key.
   */
  private static Properties resolveProperties(Properties properties) {
    final Properties resolvedProps = new Properties();
    for (String property : properties.stringPropertyNames()) {
      if (property.isEmpty()) {
        // An empty key cannot be mapped to a meaningful client configuration entry; fail with a
        // clear message instead of an index error further down.
        throw new IllegalArgumentException(
            "Kinesis ingress properties must not contain an empty property key");
      }
      if (property.startsWith("flink.") || property.startsWith("aws.")) {
        resolvedProps.setProperty(property, properties.getProperty(property));
      } else {
        // all other configs are assumed to be AWS client configs
        resolvedProps.setProperty(
            asAwsClientPropertyKey(property), properties.getProperty(property));
      }
    }
    return resolvedProps;
  }

  /**
   * Writes the consumer's initial position settings for the given startup position.
   *
   * <p>Earliest maps to {@code TRIM_HORIZON}, latest maps to {@code LATEST}, and a date maps to
   * {@code AT_TIMESTAMP} together with the formatted date as the initial timestamp.
   *
   * @throws IllegalStateException if the startup position is none of the above.
   */
  private static void setStartupPositionProperties(
      Properties properties, KinesisIngressStartupPosition startupPosition) {
    if (startupPosition.isEarliest()) {
      properties.setProperty(
          ConsumerConfigConstants.STREAM_INITIAL_POSITION,
          ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
    } else if (startupPosition.isLatest()) {
      properties.setProperty(
          ConsumerConfigConstants.STREAM_INITIAL_POSITION,
          ConsumerConfigConstants.InitialPosition.LATEST.name());
    } else if (startupPosition.isDate()) {
      properties.setProperty(
          ConsumerConfigConstants.STREAM_INITIAL_POSITION,
          ConsumerConfigConstants.InitialPosition.AT_TIMESTAMP.name());

      final ZonedDateTime startupDate = startupPosition.asDate().date();
      properties.setProperty(
          ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
          startupDate.format(STARTUP_TIMESTAMP_FORMATTER));
    } else {
      throw new IllegalStateException(
          "Unrecognized ingress startup position type: " + startupPosition);
    }
  }

  /**
   * Prefixes the key with {@link AWSUtil#AWS_CLIENT_CONFIG_PREFIX}, after lower-casing the key's
   * first character.
   */
  private static String asAwsClientPropertyKey(String key) {
    return AWSUtil.AWS_CLIENT_CONFIG_PREFIX + lowercaseFirstLetter(key);
  }

  /**
   * Returns the given string with its first character converted to lower case. An empty string is
   * returned unchanged.
   */
  private static String lowercaseFirstLetter(String string) {
    if (string.isEmpty()) {
      return string;
    }
    final char[] chars = string.toCharArray();
    chars[0] = Character.toLowerCase(chars[0]);
    return new String(chars);
  }
}
