/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.io.debezium;

import static org.apache.commons.lang.StringUtils.isBlank;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import java.io.IOException;
import java.util.UUID;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;

/**
 * A {@link DatabaseHistory} implementation that records schema changes as normal pulsar messages on the specified
 * topic, and that recovers the history by establishing a Kafka Consumer re-processing all messages on that topic.
 */
@Slf4j
@ThreadSafe
public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {

    public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
        .withDisplayName("Database history topic name")
        .withType(Type.STRING)
        .withWidth(Width.LONG)
        .withImportance(Importance.HIGH)
        .withDescription("The name of the topic for the database schema history")
        .withValidation(Field::isRequired);

    public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url")
        .withDisplayName("Pulsar service url")
        .withType(Type.STRING)
        .withWidth(Width.LONG)
        .withImportance(Importance.HIGH)
        .withDescription("Pulsar service url")
        .withValidation(Field::isOptional);

    public static final Field CLIENT_BUILDER = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder")
        .withDisplayName("Pulsar client builder")
        .withType(Type.STRING)
        .withWidth(Width.LONG)
        .withImportance(Importance.HIGH)
        .withDescription("Pulsar client builder")
        .withValidation(Field::isOptional);

    public static final Field.Set ALL_FIELDS = Field.setOf(
        TOPIC,
        SERVICE_URL,
        CLIENT_BUILDER,
        DatabaseHistory.NAME);

    private final DocumentReader reader = DocumentReader.defaultReader();
    private String topicName;
    private String dbHistoryName;
    private ClientBuilder clientBuilder;
    private volatile PulsarClient pulsarClient;
    private volatile Producer<String> producer;

    @Override
    public void configure(
            Configuration config,
            HistoryRecordComparator comparator,
            DatabaseHistoryListener listener,
            boolean useCatalogBeforeSchema) {
        super.configure(config, comparator, listener, useCatalogBeforeSchema);
        if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
            throw new IllegalArgumentException("Error configuring an instance of "
                + getClass().getSimpleName() + "; check the logs for details");
        }
        this.topicName = config.getString(TOPIC);

        String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
        if (isBlank(clientBuilderBase64Encoded) && isBlank(config.getString(SERVICE_URL))) {
            throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
        }
        this.clientBuilder = PulsarClient.builder();
        if (!isBlank(clientBuilderBase64Encoded)) {
            // deserialize the client builder to the same classloader
            this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded,
                    this.clientBuilder.getClass().getClassLoader());
        } else {
            this.clientBuilder.serviceUrl(config.getString(SERVICE_URL));
        }

        // Copy the relevant portions of the configuration and add useful defaults ...
        this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());

        log.info("Configure to store the debezium database history {} to pulsar topic {}",
            dbHistoryName, topicName);
    }

    @Override
    public void initializeStorage() {
        super.initializeStorage();

        // try simple to publish an empty string to create topic
        try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
            p.send("");
        } catch (PulsarClientException pce) {
            log.error("Failed to initialize storage", pce);
            throw new RuntimeException("Failed to initialize storage", pce);
        }
    }

    void setupClientIfNeeded() {
        if (null == this.pulsarClient) {
            try {
                pulsarClient = clientBuilder.build();
            } catch (PulsarClientException e) {
                throw new RuntimeException("Failed to create pulsar client to pulsar cluster", e);
            }
        }
    }

    void setupProducerIfNeeded() {
        setupClientIfNeeded();
        if (null == this.producer) {
            try {
                this.producer = pulsarClient.newProducer(Schema.STRING)
                    .topic(topicName)
                    .producerName(dbHistoryName)
                    .blockIfQueueFull(true)
                    .create();
            } catch (PulsarClientException e) {
                log.error("Failed to create pulsar producer to topic '{}'", topicName);
                throw new RuntimeException("Failed to create pulsar producer to topic '"
                    + topicName, e);
            }
        }
    }

    @Override
    public void start() {
        super.start();
        setupProducerIfNeeded();
    }

    @Override
    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
        if (this.producer == null) {
            throw new IllegalStateException("No producer is available. Ensure that 'start()'"
                    + " is called before storing database history records.");
        }
        if (log.isTraceEnabled()) {
            log.trace("Storing record into database history: {}", record);
        }
        try {
            producer.send(record.toString());
        } catch (PulsarClientException e) {
            throw new DatabaseHistoryException(e);
        }
    }

    @Override
    public void stop() {
        try {
            if (this.producer != null) {
                try {
                    producer.flush();
                } catch (PulsarClientException pce) {
                    // ignore the error to ensure the client is eventually closed
                } finally {
                    this.producer.close();
                }
                this.producer = null;
            }
            if (this.pulsarClient != null) {
                pulsarClient.close();
                this.pulsarClient = null;
            }
        } catch (PulsarClientException pe) {
            log.warn("Failed to closing pulsar client", pe);
        }
    }

    @Override
    protected void recoverRecords(Consumer<HistoryRecord> records) {
        setupClientIfNeeded();
        try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
                .topic(topicName)
                .startMessageId(MessageId.earliest)
            .create()
        ) {
            log.info("Scanning the database history topic '{}'", topicName);

            // Read all messages in the topic ...
            MessageId lastProcessedMessageId = null;

            // read the topic until the end
            while (historyReader.hasMessageAvailable()) {
                Message<String> msg = historyReader.readNext();
                try {
                    if (null == lastProcessedMessageId || lastProcessedMessageId.compareTo(msg.getMessageId()) < 0) {
                        if (!isBlank(msg.getValue())) {
                            HistoryRecord recordObj = new HistoryRecord(reader.read(msg.getValue()));
                            if (log.isTraceEnabled()) {
                                log.trace("Recovering database history: {}", recordObj);
                            }
                            if (!recordObj.isValid()) {
                                log.warn("Skipping invalid database history record '{}'. This is often not an issue,"
                                                + " but if it happens repeatedly please check the '{}' topic.",
                                    recordObj, topicName);
                            } else {
                                records.accept(recordObj);
                                log.trace("Recovered database history: {}", recordObj);
                            }
                        }
                        lastProcessedMessageId = msg.getMessageId();
                    }
                } catch (IOException ioe) {
                    log.error("Error while deserializing history record '{}'", msg.getValue(), ioe);
                } catch (final Exception e) {
                    throw e;
                }
            }
            log.info("Successfully completed scanning the database history topic '{}'", topicName);
        } catch (IOException ioe) {
            log.error("Encountered issues on recovering history records", ioe);
            throw new RuntimeException("Encountered issues on recovering history records", ioe);
        }
    }

    @Override
    public boolean exists() {
        setupClientIfNeeded();
        try (Reader<String> historyReader = pulsarClient.newReader(Schema.STRING)
                .topic(topicName)
                .startMessageId(MessageId.earliest)
            .create()
        ) {
            return historyReader.hasMessageAvailable();
        } catch (IOException e) {
            log.error("Encountered issues on checking existence of database history", e);
            throw new RuntimeException("Encountered issues on checking existence of database history", e);
        }
    }

    @Override
    public boolean storageExists() {
        return true;
    }

    @Override
    public String toString() {
        if (topicName != null) {
            return "Pulsar topic (" + topicName + ")";
        }
        return "Pulsar topic";
    }
}
