/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.debezium.configuration;

import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.embedded.spi.OffsetCommitPolicy;
import org.apache.camel.component.debezium.DebeziumConstants;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.ObjectHelper;
import org.apache.kafka.connect.json.JsonConverter;

@UriParams
public abstract class EmbeddedDebeziumConfiguration {

    private static final String LABEL_NAME = "consumer";

    private Class<?> connectorClass;
    // name
    @UriPath(label = LABEL_NAME)
    @Metadata(required = true)
    private String name;
    // offset.storage
    @UriParam(label = LABEL_NAME, defaultValue = "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    private String offsetStorage = DebeziumConstants.DEFAULT_OFFSET_STORAGE;
    // offset.storage.file.filename
    @UriParam(label = LABEL_NAME)
    private String offsetStorageFileName;
    // offset.storage.topic
    @UriParam(label = LABEL_NAME)
    private String offsetStorageTopic;
    // offset.storage.partitions
    @UriParam(label = LABEL_NAME)
    private int offsetStoragePartitions;
    // offset.storage.replication.factor
    @UriParam(label = LABEL_NAME)
    private int offsetStorageReplicationFactor;
    // offset.commit.policy
    @UriParam(label = LABEL_NAME, defaultValue = "io.debezium.embedded.spi.OffsetCommitPolicy.PeriodicCommitOffsetPolicy")
    private String offsetCommitPolicy = OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName();
    // offset.flush.interval.ms
    @UriParam(label = LABEL_NAME, defaultValue = "60000")
    private long offsetFlushIntervalMs = 60000;
    // offset.commit.timeout.ms
    @UriParam(label = LABEL_NAME, defaultValue = "5000")
    private long offsetCommitTimeoutMs = 5000;
    // internal.key.converter
    @UriParam(label = LABEL_NAME, defaultValue = "org.apache.kafka.connect.json.JsonConverter")
    private String internalKeyConverter = JsonConverter.class.getName();
    // internal.value.converter
    @UriParam(label = LABEL_NAME, defaultValue = "org.apache.kafka.connect.json.JsonConverter")
    private String internalValueConverter = JsonConverter.class.getName();

    public EmbeddedDebeziumConfiguration() {
        ObjectHelper.notNull(configureConnectorClass(), "connectorClass");
        this.connectorClass = configureConnectorClass();
    }

    /**
     * Configure the Debezium connector class that is supported by Debezium
     *
     * @return {@link Class}
     */
    protected abstract Class<?> configureConnectorClass();

    /**
     * Create a specific {@link Configuration} for a concrete configuration
     *
     * @return {@link Configuration}
     */
    protected abstract Configuration createConnectorConfiguration();

    /**
     * Validate a concrete configuration
     *
     * @return {@link ConfigurationValidation}
     */
    protected abstract ConfigurationValidation validateConnectorConfiguration();

    /**
     * The Debezium connector type that is supported by Camel Debezium component.
     *
     * @return {@link String}
     */
    public abstract String getConnectorDatabaseType();

    /**
     * Creates a Debezium configuration of type {@link Configuration} in order to be
     * used in the engine.
     *
     * @return {@link Configuration}
     */
    public Configuration createDebeziumConfiguration() {
        final Configuration connectorConfiguration = createConnectorConfiguration();

        ObjectHelper.notNull(connectorConfiguration, "createConnectorConfiguration");

        return Configuration.create().with(createDebeziumEmbeddedEngineConfiguration())
            .with(createConnectorConfiguration()).build();
    }

    private Configuration createDebeziumEmbeddedEngineConfiguration() {
        final Configuration.Builder configBuilder = Configuration.create();

        addPropertyIfNotNull(configBuilder, EmbeddedEngine.ENGINE_NAME, name);
        addPropertyIfNotNull(configBuilder, EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName());
        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE, offsetStorage);
        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME,
                             offsetStorageFileName);
        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_KAFKA_TOPIC, offsetStorageTopic);
        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_KAFKA_PARTITIONS,
                             offsetStoragePartitions);
        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR,
                             offsetStorageReplicationFactor);
        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_COMMIT_POLICY, offsetCommitPolicy);
        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, offsetFlushIntervalMs);
        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_COMMIT_TIMEOUT_MS, offsetCommitTimeoutMs);

        if (internalKeyConverter != null && internalValueConverter != null) {
            configBuilder.with("internal.key.converter", internalKeyConverter);
            configBuilder.with("internal.value.converter", internalValueConverter);
        }

        return configBuilder.build();
    }

    protected static <T> void addPropertyIfNotNull(final Configuration.Builder configBuilder,
                                                   final Field field, final T value) {
        if (value != null) {
            configBuilder.with(field, value);
        }
    }

    protected static <T> void addPropertyIfNotNull(final Configuration.Builder configBuilder,
                                                   final String key, final T value) {
        if (value != null) {
            configBuilder.with(key, value);
        }
    }

    /**
     * Validate all configurations defined and return
     * {@link ConfigurationValidation} instance which contains the validation
     * results
     *
     * @return {@link ConfigurationValidation}
     */
    public ConfigurationValidation validateConfiguration() {
        final ConfigurationValidation embeddedEngineValidation = validateDebeziumEmbeddedEngineConfiguration();
        // only if embeddedEngineValidation is true, we check the connector validation
        if (embeddedEngineValidation.isValid()) {
            final ConfigurationValidation connectorValidation = validateConnectorConfiguration();

            ObjectHelper.notNull(connectorValidation, "validateConnectorConfiguration");

            return connectorValidation;
        }
        return embeddedEngineValidation;
    }

    private ConfigurationValidation validateDebeziumEmbeddedEngineConfiguration() {
        if (isFieldValueNotSet(name)) {
            return ConfigurationValidation.notValid("Required field 'name' must be set.");
        }
        // check for offsetStorageFileName
        if (offsetStorage.equals(DebeziumConstants.DEFAULT_OFFSET_STORAGE)
            && isFieldValueNotSet(offsetStorageFileName)) {
            return ConfigurationValidation.notValid(String
                .format("Required field 'offsetStorageFileName' must be set since 'offsetStorage' is set to '%s'",
                        DebeziumConstants.DEFAULT_OFFSET_STORAGE));
        }
        return ConfigurationValidation.valid();
    }

    protected static boolean isFieldValueNotSet(final Object field) {
        return ObjectHelper.isEmpty(field);
    }

    /**
     * The name of the Java class for the connector
     */
    public Class<?> getConnectorClass() {
        return connectorClass;
    }

    public void setConnectorClass(Class<?> connectorClass) {
        this.connectorClass = connectorClass;
    }

    /**
     * Unique name for the connector. Attempting to register again with the same
     * name will fail.
     */
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /**
     * The name of the Java class that is responsible for persistence of connector
     * offsets.
     */
    public String getOffsetStorage() {
        return offsetStorage;
    }

    public void setOffsetStorage(String offsetStorage) {
        this.offsetStorage = offsetStorage;
    }

    /**
     * Path to file where offsets are to be stored. Required when offset.storage is
     * set to the FileOffsetBackingStore
     */
    public String getOffsetStorageFileName() {
        return offsetStorageFileName;
    }

    public void setOffsetStorageFileName(String offsetStorageFileName) {
        this.offsetStorageFileName = offsetStorageFileName;
    }

    /**
     * The name of the Kafka topic where offsets are to be stored. Required when
     * offset.storage is set to the KafkaOffsetBackingStore.
     */
    public String getOffsetStorageTopic() {
        return offsetStorageTopic;
    }

    public void setOffsetStorageTopic(String offsetStorageTopic) {
        this.offsetStorageTopic = offsetStorageTopic;
    }

    /**
     * Replication factor used when creating the offset storage topic. Required when
     * offset.storage is set to the KafkaOffsetBackingStore
     */
    public int getOffsetStorageReplicationFactor() {
        return offsetStorageReplicationFactor;
    }

    public void setOffsetStorageReplicationFactor(int offsetStorageReplicationFactor) {
        this.offsetStorageReplicationFactor = offsetStorageReplicationFactor;
    }

    /**
     * The name of the Java class of the commit policy. It defines when offsets
     * commit has to be triggered based on the number of events processed and the
     * time elapsed since the last commit. This class must implement the interface
     * 'OffsetCommitPolicy'. The default is a periodic commit policy based upon
     * time intervals.
     */
    public String getOffsetCommitPolicy() {
        return offsetCommitPolicy;
    }

    public void setOffsetCommitPolicy(String offsetCommitPolicy) {
        this.offsetCommitPolicy = offsetCommitPolicy;
    }

    /**
     * Interval at which to try committing offsets. The default is 1 minute.
     */
    public long getOffsetFlushIntervalMs() {
        return offsetFlushIntervalMs;
    }

    public void setOffsetFlushIntervalMs(long offsetFlushIntervalMs) {
        this.offsetFlushIntervalMs = offsetFlushIntervalMs;
    }

    /**
     * Maximum number of milliseconds to wait for records to flush and partition
     * offset data to be committed to offset storage before cancelling the process
     * and restoring the offset data to be committed in a future attempt. The
     * default is 5 seconds.
     */
    public long getOffsetCommitTimeoutMs() {
        return offsetCommitTimeoutMs;
    }

    public void setOffsetCommitTimeoutMs(long offsetCommitTimeoutMs) {
        this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
    }

    /**
     * The number of partitions used when creating the offset storage topic.
     * Required when offset.storage is set to the 'KafkaOffsetBackingStore'.
     */
    public int getOffsetStoragePartitions() {
        return offsetStoragePartitions;
    }

    public void setOffsetStoragePartitions(int offsetStoragePartitions) {
        this.offsetStoragePartitions = offsetStoragePartitions;
    }

    /**
     * The Converter class that should be used to serialize and deserialize key data
     * for offsets. The default is JSON converter.
     */
    public String getInternalKeyConverter() {
        return internalKeyConverter;
    }

    public void setInternalKeyConverter(String internalKeyConverter) {
        this.internalKeyConverter = internalKeyConverter;
    }

    /**
     * The Converter class that should be used to serialize and deserialize value
     * data for offsets. The default is JSON converter.
     */
    public String getInternalValueConverter() {
        return internalValueConverter;
    }

    public void setInternalValueConverter(String internalValueConverter) {
        this.internalValueConverter = internalValueConverter;
    }
}
