/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker;

import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.getMLTransactionLogName;
import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.ABORTING;
import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTING;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException.BrokerPersistenceException;
import org.apache.pulsar.client.api.PulsarClientException.ConnectException;
import org.apache.pulsar.client.api.PulsarClientException.LookupException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionMetadataStoreService {

    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetadataStoreService.class);

    private final Map<TransactionCoordinatorID, TransactionMetadataStore> stores;
    private final TransactionMetadataStoreProvider transactionMetadataStoreProvider;
    private final PulsarService pulsarService;
    private final TransactionBufferClient tbClient;
    private final TransactionTimeoutTrackerFactory timeoutTrackerFactory;
    private static final long endTransactionRetryIntervalTime = 1000;
    private final Timer transactionOpRetryTimer;
    // this semaphore for loading one transaction coordinator with the same tc id on the same time
    private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores;
    // one connect request opens the transactionMetaStore the other request will add to the queue, when the open op
    // finishes the request will be polled and will complete the future
    private final ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> pendingConnectRequests;
    private final ExecutorService internalPinnedExecutor;

    private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;


    public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
                                           PulsarService pulsarService, TransactionBufferClient tbClient,
                                           HashedWheelTimer timer) {
        this.pulsarService = pulsarService;
        this.stores = new ConcurrentHashMap<>();
        this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
        this.tbClient = tbClient;
        this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer);
        this.transactionOpRetryTimer = timer;
        this.tcLoadSemaphores = ConcurrentLongHashMap.<Semaphore>newBuilder().build();
        this.pendingConnectRequests =
                ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build();
        ThreadFactory threadFactory =
                new ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
        this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
    }

    public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        internalPinnedExecutor.execute(() -> {
            if (stores.get(tcId) != null) {
                completableFuture.complete(null);
            } else {
                pulsarService.getBrokerService().checkTopicNsOwnership(SystemTopicNames
                        .TRANSACTION_COORDINATOR_ASSIGN.getPartition((int) tcId.getId()).toString())
                        .thenRun(() -> internalPinnedExecutor.execute(() -> {
                    final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
                            .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
                    Deque<CompletableFuture<Void>> deque = pendingConnectRequests
                            .computeIfAbsent(tcId.getId(), (id) -> new ConcurrentLinkedDeque<>());
                    if (tcLoadSemaphore.tryAcquire()) {
                        // when tcLoadSemaphore.release(), this command will acquire semaphore,
                        // so we should jude the store exist again.
                        if (stores.get(tcId) != null) {
                            completableFuture.complete(null);
                            tcLoadSemaphore.release();
                            return;
                        }

                        TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
                        TransactionRecoverTracker recoverTracker =
                                new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
                                        timeoutTracker, tcId.getId());
                        openTransactionMetadataStore(tcId, timeoutTracker, recoverTracker).thenAccept(
                                store -> internalPinnedExecutor.execute(() -> {
                                    // TransactionMetadataStore initialization
                                    // need to use TransactionMetadataStore itself.
                                    // we need to put store into stores map before
                                    // handle committing and aborting transaction.
                                    stores.put(tcId, store);
                                    LOG.info("Added new transaction meta store {}", tcId);
                                    recoverTracker.handleCommittingAndAbortingTransaction();
                                    timeoutTracker.start();

                                    long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
                                    while (true) {
                                        // prevent thread in a busy loop.
                                        if (System.currentTimeMillis() < endTime) {
                                            CompletableFuture<Void> future = deque.poll();
                                            if (future != null) {
                                                // complete queue request future
                                                future.complete(null);
                                            } else {
                                                break;
                                            }
                                        } else {
                                            deque.clear();
                                            break;
                                        }
                                    }

                                    completableFuture.complete(null);
                                    tcLoadSemaphore.release();
                                })).exceptionally(e -> {
                            internalPinnedExecutor.execute(() -> {
                                Throwable realCause = FutureUtil.unwrapCompletionException(e);
                                completableFuture.completeExceptionally(realCause);
                                // release before handle request queue,
                                //in order to client reconnect infinite loop
                                tcLoadSemaphore.release();
                                long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
                                while (true) {
                                    // prevent thread in a busy loop.
                                    if (System.currentTimeMillis() < endTime) {
                                        CompletableFuture<Void> future = deque.poll();
                                        if (future != null) {
                                            // this means that this tc client connection connect fail
                                            future.completeExceptionally(realCause);
                                        } else {
                                            break;
                                        }
                                    } else {
                                        deque.clear();
                                        break;
                                    }
                                }
                                LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
                            });
                            return null;
                        });
                    } else {
                        // only one command can open transaction metadata store,
                        // other will be added to the deque, when the op of openTransactionMetadataStore finished
                        // then handle the requests witch in the queue
                        deque.add(completableFuture);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId);
                        }
                    }
                })).exceptionally(ex -> {
                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
                    completableFuture.completeExceptionally(realCause);
                    return null;
                });
            }
        });
        return completableFuture;
    }

    public CompletableFuture<TransactionMetadataStore>
    openTransactionMetadataStore(TransactionCoordinatorID tcId,
                                 TransactionTimeoutTracker timeoutTracker,
                                 TransactionRecoverTracker recoverTracker) {
        final Timer brokerClientSharedTimer = pulsarService.getBrokerClientSharedTimer();
        final ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();
        final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
        txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionLogBatchedWriteEnabled());
        txnLogBufferedWriterConfig
                .setBatchedWriteMaxRecords(serviceConfiguration.getTransactionLogBatchedWriteMaxRecords());
        txnLogBufferedWriterConfig.setBatchedWriteMaxSize(serviceConfiguration.getTransactionLogBatchedWriteMaxSize());
        txnLogBufferedWriterConfig
                .setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());

        return pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(
                v -> transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
                        timeoutTracker, recoverTracker,
                        pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(), txnLogBufferedWriterConfig,
                        brokerClientSharedTimer));
    }

    public CompletableFuture<Void> removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
        final Semaphore tcLoadSemaphore = this.tcLoadSemaphores
                .computeIfAbsent(tcId.getId(), (id) -> new Semaphore(1));
        if (tcLoadSemaphore.tryAcquire()) {
            TransactionMetadataStore metadataStore = stores.remove(tcId);
            if (metadataStore != null) {
                metadataStore.closeAsync().whenComplete((v, ex) -> {
                    if (ex != null) {
                        LOG.error("Close transaction metadata store with id " + tcId, ex);
                    } else {
                        LOG.info("Removed and closed transaction meta store {}", tcId);
                    }
                });
            }
            tcLoadSemaphore.release();
            return CompletableFuture.completedFuture(null);
        } else {
            return FutureUtil.failedFuture(
                    new ServiceUnitNotReadyException("Could not remove "
                            + "TransactionMetadataStore, it is doing other operations!"));
        }
    }

    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills,
                                                   String owner) {
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.newTransaction(timeoutInMills, owner);
    }

    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.addProducedPartitionToTxn(txnId, partitions);
    }

    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<TransactionSubscription> partitions) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.addAckedPartitionToTxn(txnId, partitions);
    }

    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnId) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.getTxnMeta(txnId);
    }

    public long getLowWaterMark(TxnID txnID) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore store = stores.get(tcId);

        if (store == null) {
            return 0;
        }
        return store.getLowWaterMark();
    }

    public CompletableFuture<Void> updateTxnStatus(TxnID txnId, TxnStatus newStatus, TxnStatus expectedStatus,
                                                   boolean isTimeout) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.updateTxnStatus(txnId, newStatus, expectedStatus, isTimeout);
    }

    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        endTransaction(txnID, txnAction, isTimeout, future);
        return future;
    }

    public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout,
                                                  CompletableFuture<Void> future) {
        TxnStatus newStatus;
        switch (txnAction) {
            case TxnAction.COMMIT_VALUE:
                newStatus = COMMITTING;
                break;
            case TxnAction.ABORT_VALUE:
                newStatus = ABORTING;
                break;
            default:
                TransactionCoordinatorException.UnsupportedTxnActionException exception =
                        new TransactionCoordinatorException.UnsupportedTxnActionException(txnID, txnAction);
                LOG.error(exception.getMessage());
                future.completeExceptionally(exception);
                return;
        }
        getTxnMeta(txnID)
                .thenCompose(txnMeta -> {
                    if (txnMeta.status() == TxnStatus.OPEN) {
                        return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout)
                                .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
                    }
                    return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus)
                            .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
                }).whenComplete((__, ex)-> {
                    if (ex == null) {
                        future.complete(null);
                        return;
                    }
                    if (!isRetryableException(ex)) {
                        LOG.error("End transaction fail! TxnId : {}, "
                                + "TxnAction : {}", txnID, txnAction, ex);
                        future.completeExceptionally(ex);
                        return;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
                                + "TxnAction : {}", txnID, txnAction, ex);
                    }
                    transactionOpRetryTimer.newTimeout(timeout ->
                                    endTransaction(txnID, txnAction, isTimeout, future),
                            endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
                });
    }

    private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, int txnAction,
                                                          TxnID txnID, TxnStatus expectStatus) {
        boolean isLegal = switch (txnStatus) {
            case COMMITTING -> (txnAction == TxnAction.COMMIT.getValue());
            case ABORTING -> (txnAction == TxnAction.ABORT.getValue());
            default -> false;
        };
        if (!isLegal) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction);
            }
            return FutureUtil.failedFuture(
                    new InvalidTxnStatusException(txnID, expectStatus, txnStatus));
        }
       return CompletableFuture.completedFuture(null);
    }

    // when managedLedger fence will remove this tc and reload
    public void handleOpFail(Throwable e, TransactionCoordinatorID tcId) {
        if (e instanceof ManagedLedgerException.ManagedLedgerFencedException) {
            removeTransactionMetadataStore(tcId);
        }
    }

    public void endTransactionForTimeout(TxnID txnID) {
        getTxnMeta(txnID).thenCompose(txnMeta -> {
            if (txnMeta.status() == TxnStatus.OPEN) {
                return endTransaction(txnID, TxnAction.ABORT_VALUE, true);
            } else {
                return null;
            }
        }).exceptionally(e -> {
            if (isRetryableException(e)) {
                endTransaction(txnID, TxnAction.ABORT_VALUE, true);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Transaction have been handle complete, "
                            + "don't need to handle by transaction timeout! TxnId : {}", txnID);
                }
            }
            return null;
        });
    }

    private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction) {
        return getTxnMeta(txnID)
                .thenCompose(txnMeta -> {
                    long lowWaterMark = getLowWaterMark(txnID);
                    Stream<CompletableFuture<?>> onSubFutureStream = txnMeta.ackedPartitions().stream().map(tbSub -> {
                        switch (txnAction) {
                            case TxnAction.COMMIT_VALUE:
                                return tbClient.commitTxnOnSubscription(
                                        tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
                                        txnID.getLeastSigBits(), lowWaterMark);
                            case TxnAction.ABORT_VALUE:
                                return tbClient.abortTxnOnSubscription(
                                        tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
                                        txnID.getLeastSigBits(), lowWaterMark);
                            default:
                                return FutureUtil.failedFuture(
                                        new IllegalStateException("Unsupported txnAction " + txnAction));
                        }
                    });
                    Stream<CompletableFuture<?>> onTopicFutureStream =
                            txnMeta.producedPartitions().stream().map(partition -> {
                                switch (txnAction) {
                                    case TxnAction.COMMIT_VALUE:
                                        return tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(),
                                                txnID.getLeastSigBits(), lowWaterMark);
                                    case TxnAction.ABORT_VALUE:
                                        return tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(),
                                            txnID.getLeastSigBits(), lowWaterMark);
                                    default:
                                        return FutureUtil.failedFuture(
                                                new IllegalStateException("Unsupported txnAction " + txnAction));
                        }
                    });
                    return FutureUtil.waitForAll(Stream.concat(onSubFutureStream, onTopicFutureStream)
                                    .collect(Collectors.toList()))
                            .thenCompose(__ -> endTxnInTransactionMetadataStore(txnID, txnAction));
                });
    }

    private static boolean isRetryableException(Throwable ex) {
        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
        return (realCause instanceof TransactionMetadataStoreStateException
                || realCause instanceof RequestTimeoutException
                || realCause instanceof ManagedLedgerException
                || realCause instanceof BrokerPersistenceException
                || realCause instanceof LookupException
                || realCause instanceof ReachMaxPendingOpsException
                || realCause instanceof ConnectException)
                && !(realCause instanceof ManagedLedgerException.ManagedLedgerFencedException);
    }

    private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
        if (TxnAction.COMMIT.getValue() == txnAction) {
            return updateTxnStatus(txnID, TxnStatus.COMMITTED, COMMITTING, false);
        } else if (TxnAction.ABORT.getValue() == txnAction) {
            return updateTxnStatus(txnID, TxnStatus.ABORTED, ABORTING, false);
        } else {
            return FutureUtil.failedFuture(new InvalidTxnStatusException("Unsupported txnAction " + txnAction));
        }
    }

    private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) {
        return new TransactionCoordinatorID(txnId.getMostSigBits());
    }

    @VisibleForTesting
    public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
        return Collections.unmodifiableMap(stores);
    }

    public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, String checkOwner) {
        return getTxnMeta(txnID)
                .thenCompose(meta -> {
                    // owner was null in the old versions or no auth enabled
                    if (meta.getOwner() == null) {
                        return CompletableFuture.completedFuture(true);
                    }
                    if (meta.getOwner().equals(checkOwner)) {
                        return CompletableFuture.completedFuture(true);
                    }
                    return CompletableFuture.completedFuture(false);
                });
    }


    public void close () {
        this.internalPinnedExecutor.shutdown();
        stores.forEach((tcId, metadataStore) ->
            metadataStore.closeAsync().whenComplete((v, ex) -> {
                if (ex != null) {
                    LOG.error("Close transaction metadata store with id " + tcId, ex);
                } else {
                    LOG.info("Removed and closed transaction meta store {}", tcId);
                }
        }));
        stores.clear();
    }
}
