/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;

/**
 * The in-memory implementation of {@link TransactionBuffer}.
 */
class InMemTransactionBuffer implements TransactionBuffer {

    /**
     * A class represents the buffer of a transaction.
     */
    private static class TxnBuffer implements TransactionMeta, AutoCloseable {

        private final TxnID txnid;
        private final SortedMap<Long, ByteBuf> entries;
        private TxnStatus status;
        private long committedAtLedgerId = -1L;
        private long committedAtEntryId = -1L;

        TxnBuffer(TxnID txnid) {
            this.txnid = txnid;
            this.entries = new TreeMap<>();
            this.status = TxnStatus.OPEN;
        }

        @Override
        public TxnID id() {
            return txnid;
        }

        @Override
        public synchronized TxnStatus status() {
            return status;
        }

        @Override
        public int numEntries() {
            synchronized (entries) {
                return entries.size();
            }
        }

        @Override
        public int numMessageInTxn() throws TransactionBufferException.TransactionStatusException {
            return -1;
        }

        @Override
        public long committedAtLedgerId() {
            return committedAtLedgerId;
        }

        @Override
        public long committedAtEntryId() {
            return committedAtEntryId;
        }

        @Override
        public long lastSequenceId() {
            return entries.lastKey();
        }

        @Override
        public CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId) {
            return FutureUtil.failedFuture(new UnsupportedOperationException());
        }

        @Override
        public CompletableFuture<Position> appendEntry(long sequenceId, Position position, int batchSize) {
            return FutureUtil.failedFuture(new UnsupportedOperationException());
        }

        @Override
        public CompletableFuture<TransactionMeta> committingTxn() {
            status = TxnStatus.COMMITTING;
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId) {
            try {
                return CompletableFuture.completedFuture(commitAt(committedAtLedgerId, committedAtEntryId));
            } catch (TransactionBufferException.TransactionStatusException e) {
                return FutureUtil.failedFuture(e);
            }
        }

        @Override
        public CompletableFuture<TransactionMeta> abortTxn() {
            try {
                return CompletableFuture.completedFuture(abort());
            } catch (TransactionBufferException.TransactionStatusException e) {
                return FutureUtil.failedFuture(e);
            }
        }

        synchronized TxnBuffer abort() throws TransactionBufferException.TransactionStatusException {
            if (TxnStatus.OPEN != status) {
                throw new TransactionBufferException.TransactionStatusException(txnid, TxnStatus.OPEN, status);
            }
            this.status = TxnStatus.ABORTED;
            return this;
        }

        synchronized TxnBuffer commitAt(long committedAtLedgerId, long committedAtEntryId)
                throws TransactionBufferException.TransactionStatusException {
            if (TxnStatus.OPEN != status) {
                throw new TransactionBufferException.TransactionStatusException(txnid, TxnStatus.OPEN, status);
            }

            this.committedAtLedgerId = committedAtLedgerId;
            this.committedAtEntryId = committedAtEntryId;
            this.status = TxnStatus.COMMITTED;

            return this;
        }

        @Override
        public void close() {
            synchronized (entries) {
                entries.forEach((sequenceId, buffer) -> {
                    buffer.release();
                });
                entries.clear();
            }
        }

        public void appendEntry(long sequenceId, ByteBuf entry) throws
                TransactionBufferException.TransactionSealedException {
            synchronized (this) {
                if (TxnStatus.OPEN != status) {
                    // the transaction is not open anymore, reject the append operations
                    throw new TransactionBufferException
                            .TransactionSealedException("Transaction `" + txnid + "` is already sealed");
                }
            }

            synchronized (this.entries) {
                this.entries.put(sequenceId, entry);
            }
        }

        public TransactionBufferReader newReader(long sequenceId) throws
                TransactionBufferException.TransactionNotSealedException {
            synchronized (this) {
                if (TxnStatus.COMMITTED != status) {
                    // the transaction is not committed yet, hence the buffer is not sealed
                    throw new TransactionBufferException
                            .TransactionNotSealedException("Transaction `" + txnid + "` is not sealed yet");
                }
            }

            final SortedMap<Long, ByteBuf> entriesToRead = new TreeMap<>();
            synchronized (entries) {
                SortedMap<Long, ByteBuf> subEntries = entries.tailMap(sequenceId);
                subEntries.values().forEach(value -> value.retain());
                entriesToRead.putAll(subEntries);
            }

            return new InMemTransactionBufferReader(
                txnid,
                entriesToRead.entrySet().iterator(),
                committedAtLedgerId,
                committedAtEntryId
            );
        }

    }

    final ConcurrentMap<TxnID, TxnBuffer> buffers;
    final Map<Long, Set<TxnID>> txnIndex;
    public InMemTransactionBuffer(Topic topic) {
        this.buffers = new ConcurrentHashMap<>();
        this.txnIndex = new HashMap<>();
    }

    @Override
    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
        CompletableFuture<TransactionMeta> getFuture = new CompletableFuture<>();
        try {
            getFuture.complete(getTxnBufferOrThrowNotFoundException(txnID));
        } catch (TransactionBufferException.TransactionNotFoundException e) {
            getFuture.completeExceptionally(e);
        }
        return getFuture;
    }

    private TxnBuffer getTxnBufferOrThrowNotFoundException(TxnID txnID)
            throws TransactionBufferException.TransactionNotFoundException {
        TxnBuffer buffer = buffers.get(txnID);
        if (null == buffer) {
            throw new TransactionBufferException.TransactionNotFoundException(
                "Transaction `" + txnID + "` doesn't exist in the transaction buffer");
        }
        return buffer;
    }

    private TxnBuffer getTxnBufferOrCreateIfNotExist(TxnID txnID) {
        TxnBuffer buffer = buffers.get(txnID);
        if (null == buffer) {
            TxnBuffer newBuffer = new TxnBuffer(txnID);
            TxnBuffer oldBuffer = buffers.putIfAbsent(txnID, newBuffer);
            if (null != oldBuffer) {
                newBuffer.close();
                return oldBuffer;
            } else {
                return newBuffer;
            }
        } else {
            return buffer;
        }
    }

    @Override
    public CompletableFuture<Position> appendBufferToTxn(TxnID txnId,
                                                     long sequenceId,
                                                     ByteBuf buffer) {
        TxnBuffer txnBuffer = getTxnBufferOrCreateIfNotExist(txnId);

        CompletableFuture appendFuture = new CompletableFuture();
        try {
            txnBuffer.appendEntry(sequenceId, buffer);
            appendFuture.complete(null);
        } catch (TransactionBufferException.TransactionSealedException e) {
            appendFuture.completeExceptionally(e);
        }
        return appendFuture;
    }

    @Override
    public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(
            TxnID txnID, long startSequenceId) {
        CompletableFuture<TransactionBufferReader> openFuture = new CompletableFuture<>();
        try {
            TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
            TransactionBufferReader reader = txnBuffer.newReader(startSequenceId);
            openFuture.complete(reader);
        } catch (TransactionBufferException.TransactionNotFoundException
                | TransactionBufferException.TransactionNotSealedException e) {
            openFuture.completeExceptionally(e);
        }
        return openFuture;
    }

    @Override
    public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {
        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
        try {
            TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
            synchronized (txnBuffer) {
                // the committed position should be generated
                long committedAtLedgerId = -1L;
                long committedAtEntryId = -1L;
                txnBuffer.commitAt(committedAtLedgerId, committedAtEntryId);
                addTxnToTxnIdex(txnID, committedAtLedgerId);
            }
            commitFuture.complete(null);
        } catch (TransactionBufferException.TransactionNotFoundException
                | TransactionBufferException.TransactionStatusException e) {
            commitFuture.completeExceptionally(e);
        }
        return commitFuture;
    }

    private void addTxnToTxnIdex(TxnID txnId, long committedAtLedgerId) {
        synchronized (txnIndex) {
            txnIndex
                .computeIfAbsent(committedAtLedgerId, ledgerId -> new HashSet<>())
                .add(txnId);
        }
    }

    @Override
    public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
        CompletableFuture<Void> abortFuture = new CompletableFuture<>();

        try {
            TxnBuffer txnBuffer = getTxnBufferOrThrowNotFoundException(txnID);
            txnBuffer.abort();
            buffers.remove(txnID, txnBuffer);
            abortFuture.complete(null);
        } catch (TransactionBufferException.TransactionNotFoundException
                | TransactionBufferException.TransactionStatusException e) {
            abortFuture.completeExceptionally(e);
        }

        return abortFuture;
    }

    @Override
    public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
        List<TxnBuffer> buffersToPurge = new ArrayList<>();
        synchronized (txnIndex) {
            dataLedgers.forEach(ledger -> {
                Set<TxnID> txns = txnIndex.remove(ledger);
                txns.forEach(txnId -> {
                    TxnBuffer tb = buffers.remove(txnId);
                    if (null != tb) {
                        buffersToPurge.add(tb);
                    }
                });
            });
        }

        buffersToPurge.forEach(TxnBuffer::close);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> clearSnapshot() {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        buffers.values().forEach(TxnBuffer::close);
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public boolean isTxnAborted(TxnID txnID) {
        return false;
    }

    @Override
    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
        //no-op
    }

    @Override
    public PositionImpl getMaxReadPosition() {
        return PositionImpl.LATEST;
    }

    @Override
    public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
        return null;
    }

    @Override
    public TransactionBufferStats getStats(boolean lowWaterMarks) {
        return null;
    }

    @Override
    public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxn) {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public long getOngoingTxnCount() {
        return this.buffers.values().stream()
                .filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.OPEN)
                        || txnBuffer.status.equals(TxnStatus.COMMITTING)
                        || txnBuffer.status.equals(TxnStatus.ABORTING)
                )
                .count();
    }

    @Override
    public long getAbortedTxnCount() {
        return this.buffers.values().stream()
                .filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.ABORTED))
                .count();
    }

    @Override
    public long getCommittedTxnCount() {
        return this.buffers.values().stream()
                .filter(txnBuffer -> txnBuffer.status.equals(TxnStatus.COMMITTED))
                .count();
    }
}
