/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog;

import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.function.GetLastTxIdFunction;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.logsegment.LogSegmentFilter;
import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
import org.apache.distributedlog.logsegment.RollingPolicy;
import org.apache.distributedlog.logsegment.SizeBasedRollingPolicy;
import org.apache.distributedlog.logsegment.TimeBasedRollingPolicy;
import org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.distributedlog.metadata.MetadataUpdater;
import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.util.Allocator;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Log Handler for Writers.
 *
 * <h3>Metrics</h3>
 * All the metrics about log write handler are exposed under scope `segments`.
 * <ul>
 * <li> `segments`/open : opstats. latency characteristics on starting a new log segment.
 * <li> `segments`/close : opstats. latency characteristics on completing an inprogress log segment.
 * <li> `segments`/recover : opstats. latency characteristics on recovering a log segment.
 * <li> `segments`/delete : opstats. latency characteristics on deleting a log segment.
 * </ul>
 */
class BKLogWriteHandler extends BKLogHandler {
    // Logger for the write handler. It is named after this class so that log lines emitted
    // here are attributed to the write handler rather than to the read handler.
    static final Logger LOG = LoggerFactory.getLogger(BKLogWriteHandler.class);

    // Shared listener that ignores both commit and abort notifications. It is handed to the
    // allocator when obtaining an entry writer (see doStartLogSegment). Declared final because
    // it is a stateless constant that must never be reassigned.
    private static final Transaction.OpListener<LogSegmentEntryWriter> NULL_OP_LISTENER =
            new Transaction.OpListener<LogSegmentEntryWriter>() {
        @Override
        public void onCommit(LogSegmentEntryWriter r) {
            // no-op
        }

        @Override
        public void onAbort(Throwable t) {
            // no-op
        }
    };

    // Writer-side log metadata handed to the constructor (the same object passed to the super class).
    protected final LogMetadataForWriter logMetadataForWriter;
    // Allocator that hands out the entry writer backing each newly started log segment.
    protected final Allocator<LogSegmentEntryWriter, Object> logSegmentAllocator;
    // Distributed lock of this handler; acquired in lockHandler() and closed in unlockHandler().
    protected final DistributedLock lock;
    // Max transaction id tracker, initialized from the max-txid data of the log metadata.
    protected final MaxTxId maxTxId;
    // Max log segment sequence number tracker, initialized from the max-LSSN data of the log metadata.
    protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
    // When set, the sequence numbers of inprogress log segments are tracked in inprogressLSSNs.
    protected final boolean validateLogSegmentSequenceNumber;
    // Region id set on new log segment metadata; LOCAL_REGION_ID when the conf disables region encoding.
    protected final int regionId;
    // Time-based when a rolling interval is configured, size-based otherwise (see the constructor).
    protected final RollingPolicy rollingPolicy;
    // Result of the lock acquisition issued by lockHandler(); stays null until lockHandler() is called.
    protected CompletableFuture<? extends DistributedLock> lockFuture = null;
    // The next three are passed to each BKLogSegmentWriter built in createInprogressLogSegment.
    protected final PermitLimiter writeLimiter;
    protected final FeatureProvider featureProvider;
    protected final DynamicDistributedLogConfiguration dynConf;
    // Created in the constructor from the configuration and the metadata store.
    protected final MetadataUpdater metadataUpdater;
    // tracking the inprogress log segments (guarded by synchronizing on the list itself)
    protected final LinkedList<Long> inprogressLSSNs;

    // Fetch LogSegments State: write can continue without the full list of log segments,
    // while truncation needs the full list.
    // fetchForWrite is issued in the constructor using WRITE_HANDLE_FILTER.
    private final CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForWrite;
    // fetchForTruncation is issued lazily (under the handler monitor) using DEFAULT_FILTER.
    private CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForTruncation;

    // Recover Functions
    // Per-segment recovery step applied to every element of the list below.
    private final RecoverLogSegmentFunction recoverLogSegmentFunction =
            new RecoverLogSegmentFunction();
    // Recovers a whole list of segments and maps the outcome through GetLastTxIdFunction.
    // NOTE: kept as an anonymous class on purpose - a lambda in a field initializer is not
    // allowed to read the blank final fields used below.
    private final Function<List<LogSegmentMetadata>, CompletableFuture<Long>> recoverLogSegmentsFunction =
            new Function<List<LogSegmentMetadata>, CompletableFuture<Long>>() {
                @Override
                public CompletableFuture<Long> apply(List<LogSegmentMetadata> segments) {
                    LOG.info("Initiating Recovery For {} : {}", getFullyQualifiedName(), segments);

                    // a negative rolling time means it was never updated: default it to now.
                    synchronized (BKLogWriteHandler.this) {
                        if (lastLedgerRollingTimeMillis < 0) {
                            lastLedgerRollingTimeMillis = Utils.nowInMillis();
                        }
                    }

                    // remember the sequence numbers of all inprogress segments, in list order.
                    if (validateLogSegmentSequenceNumber) {
                        synchronized (inprogressLSSNs) {
                            for (LogSegmentMetadata candidate : segments) {
                                if (!candidate.isInProgress()) {
                                    continue;
                                }
                                inprogressLSSNs.addLast(candidate.getLogSegmentSequenceNumber());
                            }
                        }
                    }

                    return FutureUtils
                        .processList(segments, recoverLogSegmentFunction, scheduler)
                        .thenApply(GetLastTxIdFunction.INSTANCE);
                }
            };

    // Stats
    // Per-log stats logger; handed to each BKLogSegmentWriter built in createInprogressLogSegment.
    private final StatsLogger perLogStatsLogger;
    // The four op stats below are registered in the constructor under the `segments` scope
    // as `close`, `open`, `recover` and `delete` respectively.
    private final OpStatsLogger closeOpStats;
    private final OpStatsLogger openOpStats;
    private final OpStatsLogger recoverOpStats;
    private final OpStatsLogger deleteOpStats;

    /**
     * Construct a Bookkeeper journal manager.
     */
    BKLogWriteHandler(LogMetadataForWriter logMetadata,
                      DistributedLogConfiguration conf,
                      LogStreamMetadataStore streamMetadataStore,
                      LogSegmentMetadataCache metadataCache,
                      LogSegmentEntryStore entryStore,
                      OrderedScheduler scheduler,
                      Allocator<LogSegmentEntryWriter, Object> segmentAllocator,
                      StatsLogger statsLogger,
                      StatsLogger perLogStatsLogger,
                      AlertStatsLogger alertStatsLogger,
                      String clientId,
                      int regionId,
                      PermitLimiter writeLimiter,
                      FeatureProvider featureProvider,
                      DynamicDistributedLogConfiguration dynConf,
                      DistributedLock lock /** owned by handler **/) {
        super(logMetadata,
                conf,
                streamMetadataStore,
                metadataCache,
                entryStore,
                scheduler,
                statsLogger,
                alertStatsLogger,
                clientId);
        this.logMetadataForWriter = logMetadata;
        this.logSegmentAllocator = segmentAllocator;
        this.perLogStatsLogger = perLogStatsLogger;
        this.writeLimiter = writeLimiter;
        this.featureProvider = featureProvider;
        this.dynConf = dynConf;
        this.lock = lock;
        this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);

        if (conf.getEncodeRegionIDInLogSegmentMetadata()) {
            this.regionId = regionId;
        } else {
            this.regionId = DistributedLogConstants.LOCAL_REGION_ID;
        }
        this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled();

        // Construct the max sequence no
        maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData());
        inprogressLSSNs = new LinkedList<Long>();
        // Construct the max txn id.
        maxTxId = new MaxTxId(logMetadata.getMaxTxIdData());

        // Schedule fetching log segment list in background before we access it.
        // We don't need to watch the log segment list changes for writer, as it manages log segment list.
        fetchForWrite = readLogSegmentsFromStore(
                LogSegmentMetadata.COMPARATOR,
                WRITE_HANDLE_FILTER,
                null);

        // Initialize other parameters.
        setLastLedgerRollingTimeMillis(Utils.nowInMillis());

        // Rolling Policy
        if (conf.getLogSegmentRollingIntervalMinutes() > 0) {
            rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000L);
        } else {
            rollingPolicy = new SizeBasedRollingPolicy(conf.getMaxLogSegmentBytes());
        }

        // Stats
        StatsLogger segmentsStatsLogger = statsLogger.scope("segments");
        openOpStats = segmentsStatsLogger.getOpStatsLogger("open");
        closeOpStats = segmentsStatsLogger.getOpStatsLogger("close");
        recoverOpStats = segmentsStatsLogger.getOpStatsLogger("recover");
        deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete");
    }

    /**
     * Return the cached log segments once the fetch issued in the constructor has completed.
     *
     * @param comparator ordering applied when reading the cached log segments
     * @return future holding the cached segments; failed with the fetch error, or with the
     *         {@link UnexpectedException} raised while reading the cache
     */
    private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch(
            final Comparator<LogSegmentMetadata> comparator) {
        final CompletableFuture<List<LogSegmentMetadata>> cachedSegments = new CompletableFuture<>();
        fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
            @Override
            public void onSuccess(Versioned<List<LogSegmentMetadata>> fetched) {
                // the fetched value itself is not used: the cache is the source of truth
                final List<LogSegmentMetadata> sorted;
                try {
                    sorted = getCachedLogSegments(comparator);
                } catch (UnexpectedException e) {
                    cachedSegments.completeExceptionally(e);
                    return;
                }
                cachedSegments.complete(sorted);
            }

            @Override
            public void onFailure(Throwable cause) {
                cachedSegments.completeExceptionally(cause);
            }
        });
        return cachedSegments;
    }

    /**
     * Return the cached log segments once a fetch using the default filter has completed.
     * The fetch is issued on first use and its future is reused by subsequent calls.
     *
     * @param comparator ordering applied when reading the cached log segments
     * @return future holding the cached segments; failed with the fetch error, or with the
     *         {@link UnexpectedException} raised while reading the cache
     */
    private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch(
            final Comparator<LogSegmentMetadata> comparator) {
        final CompletableFuture<Versioned<List<LogSegmentMetadata>>> fullFetch;
        synchronized (this) {
            // issue the full fetch only once
            if (null == fetchForTruncation) {
                fetchForTruncation = readLogSegmentsFromStore(
                        LogSegmentMetadata.COMPARATOR,
                        LogSegmentFilter.DEFAULT_FILTER,
                        null);
            }
            fullFetch = fetchForTruncation;
        }

        final CompletableFuture<List<LogSegmentMetadata>> cachedSegments = new CompletableFuture<>();
        fullFetch.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
            @Override
            public void onSuccess(Versioned<List<LogSegmentMetadata>> fetched) {
                // the fetched value itself is not used: the cache is the source of truth
                final List<LogSegmentMetadata> sorted;
                try {
                    sorted = getCachedLogSegments(comparator);
                } catch (UnexpectedException e) {
                    FutureUtils.completeExceptionally(cachedSegments, e);
                    return;
                }
                FutureUtils.complete(cachedSegments, sorted);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(cachedSegments, cause);
            }
        });
        return cachedSegments;
    }

    // Transactional operations for MaxLogSegmentSequenceNo
    /**
     * Add the "store max log segment sequence number" operation to the given transaction.
     *
     * <p>On commit, when sequence number validation is enabled, the inprogress tracking list
     * is updated ({@code seqNo} appended for an inprogress segment, the head removed
     * otherwise); then {@code maxSeqNo} is updated with the committed version.
     *
     * @param txn transaction the operation is added to
     * @param maxSeqNo tracker to update once the transaction commits
     * @param seqNo log segment sequence number to store
     * @param isInprogress whether the operation is issued for an inprogress log segment
     */
    void storeMaxSequenceNumber(final Transaction<Object> txn,
                                final MaxLogSegmentSequenceNo maxSeqNo,
                                final long seqNo,
                                final boolean isInprogress) {
        final Transaction.OpListener<Version> onStored = new Transaction.OpListener<Version>() {
            @Override
            public void onAbort(Throwable t) {
                // no-op
            }

            @Override
            public void onCommit(Version version) {
                if (validateLogSegmentSequenceNumber) {
                    synchronized (inprogressLSSNs) {
                        if (!isInprogress) {
                            inprogressLSSNs.removeFirst();
                        } else {
                            inprogressLSSNs.add(seqNo);
                        }
                    }
                }
                maxSeqNo.update(version, seqNo);
            }
        };
        metadataStore.storeMaxLogSegmentSequenceNumber(
                txn, logMetadata, maxSeqNo.getVersionedData(seqNo), onStored);
    }

    // Transactional operations for MaxTxId
    /**
     * Add the "store max transaction id" operation to the given transaction. The tracker is
     * updated with the committed version only when the transaction commits.
     *
     * @param txn transaction the operation is added to
     * @param maxTxId tracker to update once the transaction commits
     * @param txId transaction id to store
     */
    void storeMaxTxId(final Transaction<Object> txn,
                      final MaxTxId maxTxId,
                      final long txId) {
        final Transaction.OpListener<Version> onStored = new Transaction.OpListener<Version>() {
            @Override
            public void onAbort(Throwable t) {
                // no-op
            }

            @Override
            public void onCommit(Version version) {
                maxTxId.update(version, txId);
            }
        };
        metadataStore.storeMaxTxnId(
                txn, logMetadataForWriter, maxTxId.getVersionedData(txId), onStored);
    }

    // Transactional operations for logsegment
    /**
     * Add the creation of the given log segment to the transaction. The segment is added to
     * the local cache, keyed by its segment name, only when the transaction commits.
     *
     * @param txn transaction the operation is added to
     * @param metadata log segment to create
     */
    void writeLogSegment(final Transaction<Object> txn,
                         final LogSegmentMetadata metadata) {
        final Transaction.OpListener<Void> onCreated = new Transaction.OpListener<Void>() {
            @Override
            public void onAbort(Throwable t) {
                // no-op
            }

            @Override
            public void onCommit(Void r) {
                addLogSegmentToCache(metadata.getSegmentName(), metadata);
            }
        };
        metadataStore.createLogSegment(txn, metadata, onCreated);
    }

    /**
     * Add the deletion of the given log segment to the transaction. The segment is removed
     * from the local cache, by its segment name, only when the transaction commits.
     *
     * @param txn transaction the operation is added to
     * @param metadata log segment to delete
     */
    void deleteLogSegment(final Transaction<Object> txn,
                          final LogSegmentMetadata metadata) {
        final Transaction.OpListener<Void> onDeleted = new Transaction.OpListener<Void>() {
            @Override
            public void onAbort(Throwable t) {
                // no-op
            }

            @Override
            public void onCommit(Void r) {
                removeLogSegmentFromCache(metadata.getSegmentName());
            }
        };
        metadataStore.deleteLogSegment(txn, metadata, onDeleted);
    }

    /**
     * Acquire the lock on behalf of the write handler for its whole lifecycle. Callers may
     * invoke this before any other action; the lock is then only released when the write
     * handler is closed.
     *
     * <p>Holding the lock this way avoids releasing the underlying zookeeper lock while log
     * segments are being recovered/completed. Releasing it would 1) increase the latency of
     * re-locking when starting a new log segment and 2) increase the possibility of the
     * stream being re-acquired by other instances.
     *
     * <p>The acquisition is issued once; later calls return the same future.
     *
     * @return future represents the lock result
     */
    CompletableFuture<? extends DistributedLock> lockHandler() {
        if (null == lockFuture) {
            lockFuture = lock.asyncAcquire();
        }
        return lockFuture;
    }

    /**
     * Close the handler lock if {@link #lockHandler()} was ever called.
     *
     * @return future of closing the lock, or the void future of {@code FutureUtils.Void()}
     *         when no acquisition was issued through {@link #lockHandler()}
     */
    CompletableFuture<Void> unlockHandler() {
        if (null == lockFuture) {
            // lockHandler() was never called: nothing to close
            return FutureUtils.Void();
        }
        return lock.asyncClose();
    }

    /**
     * Start a new log segment in a BookKeeper ledger.
     *
     * <p>The write lock for this journal is checked first, then a ledger and a stream based
     * on that ledger are created. The ledger id is written to the inprogress znode, so that
     * after a crash a recovery process can find the ledger that was being written to.
     *
     * <p>Same as {@link #startLogSegment(long, boolean, boolean)} with both flags disabled.
     *
     * @param txId First transaction id to be written to the stream
     * @return the writer of the newly started log segment
     * @throws IOException if the log segment can not be started
     */
    public BKLogSegmentWriter startLogSegment(long txId) throws IOException {
        final boolean bestEffort = false;
        final boolean allowMaxTxID = false;
        return startLogSegment(txId, bestEffort, allowMaxTxID);
    }

    /**
     * Start a new log segment in a BookKeeper ledger and record the latency of the attempt
     * under the `open` op stats (as a successful or failed event).
     *
     * <p>The write lock for this journal is checked first, then a ledger and a stream based
     * on that ledger are created. The ledger id is written to the inprogress znode, so that
     * after a crash a recovery process can find the ledger that was being written to.
     *
     * @param txId First transaction id to be written to the stream
     * @param bestEffort
     *          when set, failures before the metadata transaction is executed complete the
     *          start with a {@code null} writer instead of an error
     * @param allowMaxTxID
     *          allow using max tx id to start log segment
     * @return the writer of the newly started log segment
     * @throws IOException if the log segment can not be started
     */
    public BKLogSegmentWriter startLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID)
            throws IOException {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        boolean started = false;
        try {
            final BKLogSegmentWriter segmentWriter = doStartLogSegment(txId, bestEffort, allowMaxTxID);
            started = true;
            return segmentWriter;
        } finally {
            final long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
            if (!started) {
                openOpStats.registerFailedEvent(elapsedMicros);
            } else {
                openOpStats.registerSuccessfulEvent(elapsedMicros);
            }
        }
    }

    /**
     * Work out the sequence number of the next log segment and validate it against the
     * recorded max log segment sequence number.
     *
     * @return the sequence number to assign to the new log segment
     * @throws IOException if the cached log segments can not be read
     * @throws DLIllegalStateException if the recorded max sequence number is not the
     *          predecessor of the computed sequence number
     */
    protected long assignLogSegmentSequenceNumber() throws IOException {
        // An active stream always has at least one active ledger (except when the stream
        // first starts out). Hence seeing no ledger metadata for a stream is taken to mean
        // that this is the first ledger in the stream.
        long assignedSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
        boolean foundSegments = false;

        if (LogSegmentMetadata.supportsLogSegmentSequenceNo(conf.getDLLedgerMetadataLayoutVersion())) {
            List<LogSegmentMetadata> segmentsDesc = getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
            Long nextSeqNo = DLUtils.nextLogSegmentSequenceNumber(segmentsDesc);

            if (null != nextSeqNo) {
                // latest log segment is assigned with a sequence number, start with next sequence number
                foundSegments = true;
                assignedSeqNo = nextSeqNo;
            } else {
                // no last assigned log segment sequence number was found:
                // start the log segment with the configured FirstLogSegmentSequenceNumber.
                assignedSeqNo = conf.getFirstLogSegmentSequenceNumber();
            }
        }

        // Validation is skipped only when no log segments were found AND
        // the maximum log segment sequence number is "UNASSIGNED".
        final boolean nothingRecorded = !foundSegments
            && (DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO == maxLogSegmentSequenceNo.getSequenceNumber());
        if (nothingRecorded) {
            // no ledger seqno stored in /ledgers before
            LOG.info("No max ledger sequence number found while creating log segment {} for {}.",
                assignedSeqNo, getFullyQualifiedName());
            return assignedSeqNo;
        }

        if (maxLogSegmentSequenceNo.getSequenceNumber() + 1 == assignedSeqNo) {
            // the recorded max is the direct predecessor: all good
            return assignedSeqNo;
        }

        // a max log segment sequence number is recorded and it does not match: fail.
        LOG.warn("Unexpected max log segment sequence number {} for {} : list of cached segments = {}",
            new Object[]{maxLogSegmentSequenceNo.getSequenceNumber(), getFullyQualifiedName(),
                getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR)});
        throw new DLIllegalStateException("Unexpected max log segment sequence number "
            + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
            + ", expected " + (assignedSeqNo - 1));
    }

    /**
     * Blocking variant of {@link #asyncStartLogSegment(long, boolean, boolean)}: issues the
     * asynchronous start and waits for its result through {@code Utils.ioResult}.
     */
    protected BKLogSegmentWriter doStartLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID)
            throws IOException {
        final CompletableFuture<BKLogSegmentWriter> pendingWriter =
                asyncStartLogSegment(txId, bestEffort, allowMaxTxID);
        return Utils.ioResult(pendingWriter);
    }

    /**
     * Start a new log segment asynchronously.
     *
     * <p>The lock ownership is checked (and re-acquired) up front; the actual start is only
     * attempted after the fetch of the log segment list issued in the constructor completed.
     *
     * @param txId first transaction id of the new log segment
     * @param bestEffort see {@link #startLogSegment(long, boolean, boolean)}
     * @param allowMaxTxID allow using max tx id to start log segment
     * @return future of the writer; failed on lock, fetch or start failure
     */
    protected CompletableFuture<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
                                                              final boolean bestEffort,
                                                              final boolean allowMaxTxID) {
        final CompletableFuture<BKLogSegmentWriter> writerPromise = new CompletableFuture<>();
        try {
            lock.checkOwnershipAndReacquire();
        } catch (LockingException lockFailure) {
            FutureUtils.completeExceptionally(writerPromise, lockFailure);
            return writerPromise;
        }

        fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
            @Override
            public void onSuccess(Versioned<List<LogSegmentMetadata>> fetched) {
                doStartLogSegment(txId, bestEffort, allowMaxTxID, writerPromise);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(writerPromise, cause);
            }
        });
        return writerPromise;
    }

    /**
     * Validate the transaction id, request an allocation and, once an entry writer has been
     * obtained from the allocator, create the inprogress log segment. The outcome is
     * reported through {@code promise}.
     *
     * @param txId first transaction id of the new log segment
     * @param bestEffort when set, allocation/failpoint failures complete the promise with
     *          {@code null} rather than exceptionally
     * @param allowMaxTxID allow using max tx id to start log segment
     * @param promise promise completed with the writer or with the failure
     */
    protected void doStartLogSegment(final long txId,
                                     final boolean bestEffort,
                                     final boolean allowMaxTxID,
                                     final CompletableFuture<BKLogSegmentWriter> promise) {
        // validate the tx id
        final boolean maxTxIdRejected = !allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID);
        if (txId < 0 || maxTxIdRejected) {
            FutureUtils.completeExceptionally(promise, new IOException("Invalid Transaction Id " + txId));
            return;
        }

        final long highestTxIdWritten = maxTxId.get();
        if (txId < highestTxIdWritten) {
            if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
                LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
                FutureUtils.completeExceptionally(promise,
                        new EndOfStreamException("Writing to a stream after it has been marked as completed"));
                return;
            }
            LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
            FutureUtils.completeExceptionally(promise,
                    new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
            return;
        }

        try {
            logSegmentAllocator.allocate();
        } catch (IOException allocateFailure) {
            // failed to issue an allocation request
            failStartLogSegment(promise, bestEffort, allocateFailure);
            return;
        }

        // start the transaction from zookeeper
        final Transaction<Object> txn = streamMetadataStore.newTransaction();

        // failpoint injected before creating ledger
        try {
            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate);
        } catch (IOException failpointFailure) {
            failStartLogSegment(promise, bestEffort, failpointFailure);
            return;
        }

        final FutureEventListener<LogSegmentEntryWriter> onObtained =
                new FutureEventListener<LogSegmentEntryWriter>() {
            @Override
            public void onFailure(Throwable cause) {
                failStartLogSegment(promise, bestEffort, cause);
            }

            @Override
            public void onSuccess(LogSegmentEntryWriter entryWriter) {
                // try-obtain succeed
                createInprogressLogSegment(txn, txId, entryWriter, bestEffort, promise);
            }
        };
        logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER).whenComplete(onObtained);
    }

    /**
     * Report a failure of starting a log segment.
     *
     * @param promise promise of the start operation
     * @param bestEffort when set, the promise is completed with {@code null} instead of failing
     * @param cause the failure
     */
    private void failStartLogSegment(CompletableFuture<BKLogSegmentWriter> promise,
                                     boolean bestEffort,
                                     Throwable cause) {
        if (!bestEffort) {
            FutureUtils.completeExceptionally(promise, cause);
            return;
        }
        // best effort: the failure is not propagated, the caller gets no writer
        FutureUtils.complete(promise, null);
    }

    // Once the ledger handle has been obtained from the allocator, this function should
    // guarantee that the transaction is either executed or aborted. Otherwise, the ledger
    // handle will just leak from the allocation pool - hence cause "No Ledger Allocator".
    //
    // The transaction gets, in this order: the creation of the inprogress segment, the store
    // of the max log segment sequence number and the store of the max tx id.
    private void createInprogressLogSegment(Transaction<Object> txn,
                                            final long txId,
                                            final LogSegmentEntryWriter entryWriter,
                                            boolean bestEffort,
                                            final CompletableFuture<BKLogSegmentWriter> promise) {
        final long logSegmentSeqNo;
        try {
            FailpointUtils.checkFailPoint(
                    FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber);
            logSegmentSeqNo = assignLogSegmentSequenceNumber();
        } catch (IOException assignFailure) {
            // abort the current prepared transaction
            txn.abort(assignFailure);
            failStartLogSegment(promise, bestEffort, assignFailure);
            return;
        }

        final String inprogressPath = inprogressZNode(
                entryWriter.getLogSegmentId(), txId, logSegmentSeqNo);
        final LogSegmentMetadata inprogressSegment =
            new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressPath,
                conf.getDLLedgerMetadataLayoutVersion(), entryWriter.getLogSegmentId(), txId)
                    .setLogSegmentSequenceNo(logSegmentSeqNo)
                    .setRegionId(regionId)
                    .setEnvelopeEntries(
                            LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion()))
                    .build();

        // 1) Create an inprogress segment
        writeLogSegment(txn, inprogressSegment);

        // 2) Try storing max sequence number.
        LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressPath, logSegmentSeqNo);
        storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, logSegmentSeqNo, true);

        // 3) Try storing max tx id.
        LOG.debug("Try storing MaxTxId in startLogSegment  {} {}", inprogressPath, txId);
        storeMaxTxId(txn, maxTxId, txId);

        // Past this point failures are always reported as failures (bestEffort is not honored).
        final FutureEventListener<Void> onExecuted = new FutureEventListener<Void>() {
            @Override
            public void onFailure(Throwable cause) {
                failStartLogSegment(promise, false, cause);
            }

            @Override
            public void onSuccess(Void value) {
                try {
                    FutureUtils.complete(promise, new BKLogSegmentWriter(
                            getFullyQualifiedName(),
                            inprogressSegment.getSegmentName(),
                            conf,
                            conf.getDLLedgerMetadataLayoutVersion(),
                            entryWriter,
                            lock,
                            txId,
                            logSegmentSeqNo,
                            scheduler,
                            statsLogger,
                            perLogStatsLogger,
                            alertStatsLogger,
                            writeLimiter,
                            featureProvider,
                            dynConf));
                } catch (IOException writerFailure) {
                    failStartLogSegment(promise, false, writerFailure);
                }
            }
        };
        txn.execute().whenCompleteAsync(onExecuted, scheduler);
    }

    /**
     * Ask the rolling policy whether the given writer should be rolled over, given the
     * last time a ledger was rolled.
     *
     * @param writer writer of the current log segment
     * @return the decision of the rolling policy
     */
    boolean shouldStartNewSegment(BKLogSegmentWriter writer) {
        final boolean rollover = rollingPolicy.shouldRollover(writer, lastLedgerRollingTimeMillis);
        return rollover;
    }

    /**
     * Finalize the log segment written by the given writer: the writer is closed first and
     * the log segment is completed afterwards.
     *
     * @param writer writer of the log segment to finalize
     * @return future of the metadata of the completed log segment
     */
    CompletableFuture<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) {
        final CompletableFuture<LogSegmentMetadata> completedSegment = new CompletableFuture<>();
        completeAndCloseLogSegment(writer, completedSegment);
        return completedSegment;
    }

    /**
     * Close the writer and, when that succeeds and the writer is not in error, complete its
     * log segment using the positions reported by the writer.
     *
     * @param writer writer of the log segment to finalize
     * @param promise promise completed with the outcome
     */
    private void completeAndCloseLogSegment(final BKLogSegmentWriter writer,
                                            final CompletableFuture<LogSegmentMetadata> promise) {
        final FutureEventListener<Void> onClosed = new FutureEventListener<Void>() {
            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(promise, cause);
            }

            @Override
            public void onSuccess(Void value) {
                // In theory closeToFinalize should throw an exception if a stream is in error.
                // Just in case, check again here so that a log segment is never completed
                // while its stream is in error.
                if (writer.shouldFailCompleteLogSegment()) {
                    FutureUtils.completeExceptionally(promise,
                            new IOException("LogSegmentWriter for " + writer.getFullyQualifiedLogSegment() + " is already in error."));
                    return;
                }
                doCompleteAndCloseLogSegment(
                        inprogressZNodeName(
                                writer.getLogSegmentId(),
                                writer.getStartTxId(),
                                writer.getLogSegmentSequenceNumber()),
                        writer.getLogSegmentSequenceNumber(),
                        writer.getLogSegmentId(),
                        writer.getStartTxId(),
                        writer.getLastTxId(),
                        writer.getPositionWithinLogSegment(),
                        writer.getLastDLSN().getEntryId(),
                        writer.getLastDLSN().getSlotId(),
                        promise);
            }
        };
        writer.asyncClose().whenComplete(onClosed);
    }

    /**
     * Test-only shortcut: derives the inprogress znode name from the given ids and completes
     * the log segment with {@code -1} as both last entry id and last slot id.
     */
    @VisibleForTesting
    LogSegmentMetadata completeAndCloseLogSegment(long logSegmentSeqNo,
                                                  long logSegmentId,
                                                  long firstTxId,
                                                  long lastTxId,
                                                  int recordCount)
        throws IOException {
        final String inprogressName = inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo);
        return completeAndCloseLogSegment(
            inprogressName,
            logSegmentSeqNo,
            logSegmentId,
            firstTxId,
            lastTxId,
            recordCount,
            -1,
            -1);
    }

    /**
     * Finalize a log segment and record how long the operation took.
     *
     * <p>If the journal manager is currently writing to a ledger, ensure that this is the
     * ledger of the log segment being finalized.
     *
     * <p>Otherwise this is the recovery case. In the recovery case, ensure that the firstTxId
     * of the ledger matches firstTxId for the segment we are trying to finalize.
     *
     * <p>The elapsed time (in microseconds) is registered on {@code closeOpStats} as a
     * successful event when the completion returns normally and as a failed event otherwise.
     *
     * @return the metadata of the completed log segment
     * @throws IOException if the log segment could not be completed
     */
    LogSegmentMetadata completeAndCloseLogSegment(String inprogressZnodeName, long logSegmentSeqNo,
                                                  long logSegmentId, long firstTxId, long lastTxId,
                                                  int recordCount, long lastEntryId, long lastSlotId)
            throws IOException {
        final Stopwatch timer = Stopwatch.createStarted();
        boolean completed = false;
        try {
            final LogSegmentMetadata result = doCompleteAndCloseLogSegment(
                    inprogressZnodeName,
                    logSegmentSeqNo,
                    logSegmentId,
                    firstTxId,
                    lastTxId,
                    recordCount,
                    lastEntryId,
                    lastSlotId);
            completed = true;
            return result;
        } finally {
            // runs for both the normal return and any exception, so every attempt is counted
            final long elapsedMicros = timer.stop().elapsed(TimeUnit.MICROSECONDS);
            if (completed) {
                closeOpStats.registerSuccessfulEvent(elapsedMicros);
            } else {
                closeOpStats.registerFailedEvent(elapsedMicros);
            }
        }
    }

    /**
     * Determine the start sequence id to record for the given log segment.
     *
     * <p>A segment that is no longer in progress simply reports its stored start sequence id.
     * For an inprogress segment the id is computed from the cached log segments (in descending
     * order), but only when both the configured metadata layout version and the segment itself
     * support sequence ids; otherwise {@link DistributedLogConstants#UNASSIGNED_SEQUENCE_ID}
     * is returned.
     *
     * @param segment the log segment to compute the start sequence id for
     * @return the start sequence id, or the unassigned marker when sequence ids are unsupported
     * @throws IOException declared for callers; propagated from the helpers used here
     */
    protected long computeStartSequenceId(LogSegmentMetadata segment) throws IOException {
        if (!segment.isInProgress()) {
            return segment.getStartSequenceId();
        }

        // we only record sequence id when both write version and logsegment's version support sequence id
        final boolean writeVersionSupported =
                LogSegmentMetadata.supportsSequenceId(conf.getDLLedgerMetadataLayoutVersion());
        if (!writeVersionSupported || !segment.supportsSequenceId()) {
            return DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
        }

        final List<LogSegmentMetadata> segmentsNewestFirst =
                getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
        return DLUtils.computeStartSequenceId(segmentsNewestFirst, segment);
    }

    /**
     * Complete and close a log segment, blocking until the result is available.
     *
     * <p>This is the synchronous counterpart of the promise-based overload: it starts the
     * asynchronous completion and then waits on its future via {@code Utils.ioResult}.
     *
     * @param inprogressZnodeName name of the inprogress log segment being completed
     * @param logSegmentSeqNo log segment sequence number
     * @param logSegmentId log segment id
     * @param firstTxId first transaction id of the log segment
     * @param lastTxId last transaction id of the log segment
     * @param recordCount number of records in the log segment
     * @param lastEntryId entry id of the last record
     * @param lastSlotId slot id of the last record
     * @return the metadata of the completed log segment
     * @throws IOException if the completion fails
     */
    protected LogSegmentMetadata doCompleteAndCloseLogSegment(
            String inprogressZnodeName,
            long logSegmentSeqNo,
            long logSegmentId,
            long firstTxId,
            long lastTxId,
            int recordCount,
            long lastEntryId,
            long lastSlotId) throws IOException {
        final CompletableFuture<LogSegmentMetadata> completion = new CompletableFuture<>();
        doCompleteAndCloseLogSegment(
                inprogressZnodeName, logSegmentSeqNo, logSegmentId,
                firstTxId, lastTxId,
                recordCount, lastEntryId, lastSlotId,
                completion);
        final LogSegmentMetadata completedSegment = Utils.ioResult(completion);
        return completedSegment;
    }

    /**
     * Complete and close a log segment asynchronously.
     *
     * <p>The actual completion is deferred until {@code fetchForWrite} has completed: if that
     * fetch failed, {@code promise} fails with the same cause; otherwise the completion
     * continues in {@code doCompleteAndCloseLogSegmentAfterLogSegmentListFetched}. The fetched
     * segment list itself is not passed on.
     *
     * @param inprogressZnodeName name of the inprogress log segment being completed
     * @param logSegmentSeqNo log segment sequence number
     * @param logSegmentId log segment id
     * @param firstTxId first transaction id of the log segment
     * @param lastTxId last transaction id of the log segment
     * @param recordCount number of records in the log segment
     * @param lastEntryId entry id of the last record
     * @param lastSlotId slot id of the last record
     * @param promise future that receives the completed log segment metadata or the failure
     */
    protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName,
                                                final long logSegmentSeqNo,
                                                final long logSegmentId,
                                                final long firstTxId,
                                                final long lastTxId,
                                                final int recordCount,
                                                final long lastEntryId,
                                                final long lastSlotId,
                                                final CompletableFuture<LogSegmentMetadata> promise) {
        final FutureEventListener<Versioned<List<LogSegmentMetadata>>> onSegmentsFetched =
                new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
            @Override
            public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
                // the segment list is now available; carry on with the completion
                doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
                        inprogressZnodeName, logSegmentSeqNo, logSegmentId,
                        firstTxId, lastTxId,
                        recordCount, lastEntryId, lastSlotId,
                        promise);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(promise, cause);
            }
        };
        fetchForWrite.whenComplete(onSegmentsFetched);
    }

    /**
     * Complete an inprogress log segment once the log segment list has been fetched.
     *
     * <p>The method performs, in order:
     * <ol>
     *   <li>a lock ownership check (re-acquiring the lock if needed);</li>
     *   <li>a lookup of the inprogress segment in the log segment cache;</li>
     *   <li>validation of the log segment id, the first transaction id and - when
     *       {@code validateLogSegmentSequenceNumber} is set - the log segment sequence number;</li>
     *   <li>a sanity check of the currently known max log segment sequence number;</li>
     *   <li>one metadata transaction that writes the completed segment, deletes the inprogress
     *       segment and stores the max sequence number and the max transaction id.</li>
     * </ol>
     *
     * <p>Every failure detected here is reported by failing {@code promise}; on success the
     * promise is completed with the metadata of the completed log segment.
     *
     * @param inprogressZnodeName name of the inprogress log segment being completed
     * @param logSegmentSeqNo log segment sequence number
     * @param logSegmentId log segment id
     * @param firstTxId first transaction id of the log segment
     * @param lastTxId last transaction id of the log segment
     * @param recordCount number of records in the log segment
     * @param lastEntryId entry id of the last record
     * @param lastSlotId slot id of the last record
     * @param promise future that receives the completed log segment metadata or the failure
     */
    private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
            final String inprogressZnodeName,
            long logSegmentSeqNo,
            long logSegmentId,
            long firstTxId,
            long lastTxId,
            int recordCount,
            long lastEntryId,
            long lastSlotId,
            final CompletableFuture<LogSegmentMetadata> promise) {
        try {
            lock.checkOwnershipAndReacquire();
        } catch (IOException ioe) {
            FutureUtils.completeExceptionally(promise, ioe);
            return;
        }

        LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
        LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);

        // This method runs inside a future callback. Dereferencing a missing cache entry would
        // throw there and leave the promise uncompleted, so fail the promise explicitly instead.
        if (null == inprogressLogSegment) {
            FutureUtils.completeExceptionally(promise, new UnexpectedException(
                    "Inprogress log segment " + inprogressZnodeName
                            + " is not found in the log segment cache for " + getFullyQualifiedName()));
            return;
        }

        // validate log segment
        if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
            FutureUtils.completeExceptionally(promise, new IOException(
                "Active ledger has different ID to inprogress. "
                    + inprogressLogSegment.getLogSegmentId() + " found, "
                    + logSegmentId + " expected"));
            return;
        }
        // validate the transaction id
        if (inprogressLogSegment.getFirstTxId() != firstTxId) {
            FutureUtils.completeExceptionally(promise, new IOException("Transaction id not as expected, "
                + inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
            return;
        }
        // validate the log sequence number
        if (validateLogSegmentSequenceNumber) {
            synchronized (inprogressLSSNs) {
                if (inprogressLSSNs.isEmpty()) {
                    FutureUtils.completeExceptionally(promise, new UnexpectedException(
                            "Didn't find matched inprogress log segments when completing inprogress "
                                    + inprogressLogSegment));
                    return;
                }
                long leastInprogressLSSN = inprogressLSSNs.getFirst();
                // the log segment sequence number in metadata {@link inprogressLogSegment.getLogSegmentSequenceNumber()}
                // should be same as the sequence number we are completing (logSegmentSeqNo)
                // and
                // it should also be same as the least inprogress log segment sequence number tracked in {@link inprogressLSSNs}
                if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo) ||
                        (leastInprogressLSSN != logSegmentSeqNo)) {
                    FutureUtils.completeExceptionally(promise, new UnexpectedException(
                            "Didn't find matched inprogress log segments when completing inprogress "
                                    + inprogressLogSegment));
                    return;
                }
            }
        }

        // store max sequence number.
        long maxSeqNo = Math.max(logSegmentSeqNo, maxLogSegmentSequenceNo.getSequenceNumber());
        if (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo ||
                (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo + 1)) {
            // ignore the case that a new inprogress log segment is pre-allocated
            // before completing current inprogress one
            LOG.info("Try storing max sequence number {} in completing {}.",
                    new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
        } else {
            LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
                    new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
            if (validateLogSegmentSequenceNumber) {
                // report the values the check above actually accepts
                FutureUtils.completeExceptionally(promise, new DLIllegalStateException("Unexpected max log segment sequence number "
                        + maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
                        + ", expected " + logSegmentSeqNo + " or " + (logSegmentSeqNo + 1)));
                return;
            }
        }

        // Prepare the completion
        final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
        long startSequenceId;
        try {
            startSequenceId = computeStartSequenceId(inprogressLogSegment);
        } catch (IOException ioe) {
            FutureUtils.completeExceptionally(promise, ioe);
            return;
        }
        // write completed ledger znode
        final LogSegmentMetadata completedLogSegment =
                inprogressLogSegment.completeLogSegment(
                        pathForCompletedLedger,
                        lastTxId,
                        recordCount,
                        lastEntryId,
                        lastSlotId,
                        startSequenceId);
        // note: the rolling time is updated before the transaction below is executed
        setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());

        // prepare the transaction
        Transaction<Object> txn = streamMetadataStore.newTransaction();

        // create completed log segment
        writeLogSegment(txn, completedLogSegment);
        // delete inprogress log segment
        deleteLogSegment(txn, inprogressLogSegment);
        // store max sequence number
        storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
        // update max txn id.
        LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", pathForCompletedLedger, lastTxId);
        storeMaxTxId(txn, maxTxId, lastTxId);

        // the outcome of the transaction is delivered to the promise on the scheduler
        txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                LOG.info("Completed {} to {} for {} : {}",
                        new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
                                getFullyQualifiedName(), completedLogSegment });
                FutureUtils.complete(promise, completedLogSegment);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(promise, cause);
            }
        }, scheduler);
    }

    /**
     * Recover the log segments that are still in progress.
     *
     * <p>The configured failpoint is checked first; if it raises an {@link IOException} the
     * returned future fails with it. Otherwise the cached log segments (available after the
     * first fetch, ordered by {@code LogSegmentMetadata.COMPARATOR}) are handed to
     * {@code recoverLogSegmentsFunction}.
     *
     * @return a future holding the value produced by {@code recoverLogSegmentsFunction}
     */
    public CompletableFuture<Long> recoverIncompleteLogSegments() {
        try {
            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
        } catch (IOException failpointFailure) {
            return FutureUtils.exception(failpointFailure);
        }
        return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR)
                .thenCompose(recoverLogSegmentsFunction);
    }

    /**
     * Function that recovers a single log segment.
     *
     * <p>Segments that are not in progress are returned unchanged. For an inprogress segment
     * the last record is read and the segment is then completed using the transaction id,
     * record count and DLSN of that record.
     */
    class RecoverLogSegmentFunction implements Function<LogSegmentMetadata, CompletableFuture<LogSegmentMetadata>> {

        @Override
        public CompletableFuture<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
            if (l.isInProgress()) {
                LOG.info("Recovering last record in log segment {} for {}.", l, getFullyQualifiedName());
                return asyncReadLastRecord(l, true, true, true).thenCompose(
                    recovered -> completeLogSegment(l, recovered));
            }
            // nothing to recover for a segment that is already completed
            return FutureUtils.value(l);
        }

        /**
         * Complete the given inprogress segment based on its last record.
         *
         * @param l the inprogress log segment
         * @param lastRecord the last record of the segment, or {@code null} if none was found
         * @return a future with the completed log segment metadata
         */
        private CompletableFuture<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
                                                              LogRecordWithDLSN lastRecord) {
            LOG.info("Recovered last record in log segment {} for {}.", l, getFullyQualifiedName());

            // without a last record the segment is treated as empty
            final boolean hasLastRecord = null != lastRecord;
            long endTxId = hasLastRecord
                    ? lastRecord.getTransactionId()
                    : DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID;
            final int recordCount = hasLastRecord ? lastRecord.getLastPositionWithinLogSegment() : 0;
            final long lastEntryId = hasLastRecord ? lastRecord.getDlsn().getEntryId() : -1;
            final long lastSlotId = hasLastRecord ? lastRecord.getDlsn().getSlotId() : -1;

            if (endTxId == DistributedLogConstants.INVALID_TXID) {
                LOG.error("Unrecoverable corruption has occurred in segment "
                    + l.toString() + " at path " + l.getZkPath()
                    + ". Unable to continue recovery.");
                return FutureUtils.exception(new IOException("Unrecoverable corruption,"
                    + " please check logs."));
            }
            if (endTxId == DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
                // TODO: Empty ledger - Ideally we should just remove it?
                endTxId = l.getFirstTxId();
            }

            final CompletableFuture<LogSegmentMetadata> completion = new CompletableFuture<>();
            doCompleteAndCloseLogSegment(
                    l.getZNodeName(),
                    l.getLogSegmentSequenceNumber(),
                    l.getLogSegmentId(),
                    l.getFirstTxId(),
                    endTxId,
                    recordCount,
                    lastEntryId,
                    lastSlotId,
                    completion);
            return completion;
        }

    }

    /**
     * Mark the log segments older than the given DLSN as truncated.
     *
     * <p>When {@code dlsn} is the invalid DLSN there is nothing to truncate and an empty list
     * is returned immediately. The sentinel is matched with {@code equals} rather than by
     * reference so that an equal but distinct instance also takes the short cut instead of
     * going through a fetch and an empty truncation. (If {@code DLSN} does not override
     * {@code equals}, this is identical to a reference comparison.)
     *
     * @param dlsn the DLSN before which log segments should be truncated
     * @return a future with the log segments whose truncation status was updated
     */
    CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
        if (DLSN.InvalidDLSN.equals(dlsn)) {
            List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
            return FutureUtils.value(emptyList);
        }
        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
            logSegments -> setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn));
    }

    /**
     * Classify the given log segments against {@code dlsn} and update their truncation status.
     *
     * <p>The segments are walked in list order until the first inprogress segment or the first
     * segment that starts at or after {@code dlsn}. A completed segment whose last DLSN is
     * before {@code dlsn} is truncated entirely; a completed segment that only starts before
     * {@code dlsn} is truncated partially. Finding a second partially truncated segment is
     * reported as a metadata inconsistency.
     *
     * @param logSegments the log segments to inspect
     * @param dlsn the DLSN before which data is to be truncated
     * @return a future with the log segments whose truncation status was updated
     */
    private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
                                                                                  final DLSN dlsn) {
        LOG.debug("Setting truncation status on logs older than {} from {} for {}",
                new Object[]{dlsn, logSegments, getFullyQualifiedName()});
        final List<LogSegmentMetadata> fullyTruncated = new ArrayList<LogSegmentMetadata>(logSegments.size());
        LogSegmentMetadata partiallyTruncated = null;
        LOG.info("{}: Truncating log segments older than {}", getFullyQualifiedName(), dlsn);
        for (LogSegmentMetadata segment : logSegments) {
            if (segment.isInProgress()) {
                // never touch the inprogress segment nor anything after it
                break;
            }
            if (segment.getLastDLSN().compareTo(dlsn) < 0) {
                LOG.debug("{}: Truncating log segment {} ", getFullyQualifiedName(), segment);
                fullyTruncated.add(segment);
                continue;
            }
            if (segment.getFirstDLSN().compareTo(dlsn) >= 0) {
                // this segment lies entirely at or after the truncation point
                break;
            }
            // Can be satisfied by at most one segment
            if (null != partiallyTruncated) {
                String logMsg = String.format("Potential metadata inconsistency for stream %s at segment %s", getFullyQualifiedName(), segment);
                LOG.error(logMsg);
                return FutureUtils.exception(new DLIllegalStateException(logMsg));
            }
            LOG.info("{}: Partially truncating log segment {} older than {}.", new Object[] {getFullyQualifiedName(), segment, dlsn});
            partiallyTruncated = segment;
        }
        return setLogSegmentTruncationStatus(fullyTruncated, partiallyTruncated, dlsn);
    }

    /**
     * Count how many leading log segments may be considered for purging.
     *
     * <p>Only the completed segments that precede the first inprogress segment are candidates,
     * and one of them is always kept because at least one completed log segment is needed for
     * the sequence id. The result is never negative: a list that is empty or starts with an
     * inprogress segment yields {@code 0} (previously the latter yielded {@code -1}, which
     * callers then used as a collection size).
     *
     * @param logSegments the log segments, in the order in which they are purged
     * @return the number of leading segments that are purge candidates, always {@code >= 0}
     */
    private int getNumCandidateLogSegmentsToPurge(List<LogSegmentMetadata> logSegments) {
        int numCompletedLogSegments = 0;
        for (LogSegmentMetadata segment : logSegments) {
            if (segment.isInProgress()) {
                break;
            }
            ++numCompletedLogSegments;
        }
        // we have to keep at least one completed log segment for sequence id
        return Math.max(0, numCompletedLogSegments - 1);
    }

    /**
     * Delete the log segments that were completed before the given timestamp.
     *
     * <p>Purging walks the candidate segments in order and stops at the first segment that
     * does not qualify. When the application truncates explicitly, only segments already
     * marked as truncated qualify.
     *
     * @param minTimestampToKeep completion time (in milliseconds) below which segments are purged;
     *                           must be earlier than the current time
     * @return a future with the deleted log segments; it fails with an
     *         {@link IllegalArgumentException} when the timestamp is not in the past
     */
    CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
        if (minTimestampToKeep >= Utils.nowInMillis()) {
            return FutureUtils.exception(new IllegalArgumentException(
                    "Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
        }
        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(logSegments -> {
            final List<LogSegmentMetadata> toPurge = new ArrayList<LogSegmentMetadata>(logSegments.size());
            final int candidates = getNumCandidateLogSegmentsToPurge(logSegments);

            for (int idx = 0; idx < candidates; idx++) {
                final LogSegmentMetadata segment = logSegments.get(idx);
                // When application explicitly truncates segments; timestamp based purge is
                // only used to cleanup log segments that have been marked for truncation
                final boolean purgeable =
                        (segment.isTruncated() || !conf.getExplicitTruncationByApplication())
                                && !segment.isInProgress()
                                && (segment.getCompletionTime() < minTimestampToKeep);
                if (!purgeable) {
                    // stop truncating log segments if we find either an inprogress or a partially
                    // truncated log segment
                    break;
                }
                toPurge.add(segment);
            }
            LOG.info("Deleting log segments older than {} for {} : {}",
                    new Object[] { minTimestampToKeep, getFullyQualifiedName(), toPurge });
            return deleteLogSegments(toPurge);
        });
    }

    /**
     * Delete the log segments whose last transaction id is below the given transaction id.
     *
     * <p>A negative {@code minTxIdToKeep} means the whole log is being deleted, in which case
     * every log segment is purged. Otherwise purging walks the candidate segments in order and
     * stops at the first segment that does not qualify; when the application truncates
     * explicitly, only segments already marked as truncated qualify.
     *
     * @param minTxIdToKeep the smallest transaction id to keep, or a negative value to purge
     *                      all log segments
     * @return a future with the deleted log segments
     */
    CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
        return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
            logSegments -> {
                final boolean deleteAll = minTxIdToKeep < 0;
                int numLogSegmentsToProcess;

                if (deleteAll) {
                    // we are deleting the log, we can remove whole log segments
                    numLogSegmentsToProcess = logSegments.size();
                } else {
                    numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments);
                }
                // The candidate count can be negative when there is no completed segment to
                // purge; a negative expected size would make the list factory throw.
                List<LogSegmentMetadata> purgeList =
                        Lists.newArrayListWithExpectedSize(Math.max(0, numLogSegmentsToProcess));
                for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
                    LogSegmentMetadata l = logSegments.get(iterator);
                    if (deleteAll ||
                        ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
                        !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
                        purgeList.add(l);
                    } else {
                        // stop truncating log segments if we find either an inprogress or a partially
                        // truncated log segment
                        break;
                    }
                }
                return deleteLogSegments(purgeList);
            });
    }

    /**
     * Update the truncation status of the given log segments in one metadata transaction.
     *
     * <p>Segments in {@code truncateList} that are not yet truncated are marked truncated.
     * {@code partialTruncate}, when present, is marked partially truncated if it is not
     * truncated at all or if its current min active DLSN is before {@code minActiveDLSN}.
     * After the transaction has executed, the cache entries of the changed segments are
     * replaced by their updated metadata.
     *
     * @param truncateList log segments to mark as truncated
     * @param partialTruncate log segment to mark as partially truncated, may be {@code null}
     * @param minActiveDLSN the min active DLSN to record on the partially truncated segment
     * @return a future with the updated metadata of every changed log segment
     */
    private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
            final List<LogSegmentMetadata> truncateList,
            LogSegmentMetadata partialTruncate,
            DLSN minActiveDLSN) {
        final int maxChanges = truncateList.size() + 1;
        // parallel lists: element i of 'updated' is the new metadata for element i of 'stale'
        final List<LogSegmentMetadata> stale = Lists.newArrayListWithCapacity(maxChanges);
        final List<LogSegmentMetadata> updated = Lists.newArrayListWithCapacity(maxChanges);
        final Transaction<Object> updateTxn = metadataUpdater.transaction();

        for (LogSegmentMetadata segment : truncateList) {
            if (segment.isTruncated()) {
                // already marked, nothing to change
                continue;
            }
            final LogSegmentMetadata truncated = metadataUpdater.setLogSegmentTruncated(updateTxn, segment);
            stale.add(segment);
            updated.add(truncated);
        }

        if (null != partialTruncate) {
            final boolean needsUpdate = partialTruncate.isNonTruncated()
                    || (partialTruncate.isPartiallyTruncated()
                            && (partialTruncate.getMinActiveDLSN().compareTo(minActiveDLSN) < 0));
            if (needsUpdate) {
                final LogSegmentMetadata partiallyTruncated = metadataUpdater.setLogSegmentPartiallyTruncated(
                        updateTxn, partialTruncate, minActiveDLSN);
                stale.add(partialTruncate);
                updated.add(partiallyTruncated);
            }
        }

        return updateTxn.execute().thenApply(ignored -> {
            for (int idx = 0; idx < stale.size(); idx++) {
                removeLogSegmentFromCache(stale.get(idx).getSegmentName());
                final LogSegmentMetadata replacement = updated.get(idx);
                addLogSegmentToCache(replacement.getSegmentName(), replacement);
            }
            return updated;
        });
    }

    /**
     * Delete the given log segments by applying {@code deleteLogSegment} to each of them
     * through {@code FutureUtils.processList}, using the handler's scheduler.
     *
     * @param logs the log segments to delete
     * @return a future with the list produced by processing every segment
     */
    private CompletableFuture<List<LogSegmentMetadata>> deleteLogSegments(
            final List<LogSegmentMetadata> logs) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
        }
        return FutureUtils.processList(
            logs,
            toDelete -> deleteLogSegment(toDelete),
            scheduler);
    }

    /**
     * Delete a single log segment: first its entries from the entry store, then its metadata.
     *
     * <p>The time until the returned future completes is registered on {@code deleteOpStats}
     * (in microseconds) as a successful or failed event.
     *
     * @param ledgerMetadata the log segment to delete
     * @return a future completed with the deleted segment, or failed with the deletion error
     */
    private CompletableFuture<LogSegmentMetadata> deleteLogSegment(
            final LogSegmentMetadata ledgerMetadata) {
        LOG.info("Deleting ledger {} for {}", ledgerMetadata, getFullyQualifiedName());
        final CompletableFuture<LogSegmentMetadata> result = new CompletableFuture<>();
        final Stopwatch timer = Stopwatch.createStarted();

        // record the latency of the whole deletion once its outcome is known
        final FutureEventListener<LogSegmentMetadata> statsRecorder =
                new FutureEventListener<LogSegmentMetadata>() {
            @Override
            public void onFailure(Throwable cause) {
                deleteOpStats.registerFailedEvent(timer.stop().elapsed(TimeUnit.MICROSECONDS));
            }

            @Override
            public void onSuccess(LogSegmentMetadata segment) {
                deleteOpStats.registerSuccessfulEvent(timer.stop().elapsed(TimeUnit.MICROSECONDS));
            }
        };
        result.whenComplete(statsRecorder);

        // the metadata is only removed after the entry store deletion succeeded
        final FutureEventListener<LogSegmentMetadata> onEntriesDeleted =
                new FutureEventListener<LogSegmentMetadata>() {
            @Override
            public void onSuccess(LogSegmentMetadata segment) {
                deleteLogSegmentMetadata(segment, result);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(result, cause);
            }
        };
        entryStore.deleteLogSegment(ledgerMetadata).whenComplete(onEntriesDeleted);
        return result;
    }

    /**
     * Delete the metadata of a log segment in its own metadata transaction.
     *
     * <p>On commit the segment is removed from the cache and {@code promise} is completed with
     * it. An abort caused by {@link LogSegmentNotFoundException} is handled the same way;
     * any other abort fails {@code promise} with the cause.
     *
     * @param segmentMetadata the log segment whose metadata is deleted
     * @param promise the future that receives the deleted segment or the failure
     */
    private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
                                          final CompletableFuture<LogSegmentMetadata> promise) {
        final Transaction<Object> deleteTxn = metadataStore.transaction();
        final Transaction.OpListener<Void> onDeleted = new Transaction.OpListener<Void>() {
            @Override
            public void onAbort(Throwable t) {
                if (!(t instanceof LogSegmentNotFoundException)) {
                    LOG.error("Couldn't purge {} for {}: with error {}",
                            new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
                    promise.completeExceptionally(t);
                    return;
                }
                // the metadata is already gone: purge log segment
                removeLogSegmentFromCache(segmentMetadata.getZNodeName());
                promise.complete(segmentMetadata);
            }

            @Override
            public void onCommit(Void r) {
                // purge log segment
                removeLogSegmentFromCache(segmentMetadata.getZNodeName());
                promise.complete(segmentMetadata);
            }
        };
        metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, onDeleted);
        deleteTxn.execute();
    }

    /**
     * Close this handler asynchronously by passing the lock and the log segment allocator to
     * {@code Utils.closeSequence} together with the scheduler.
     *
     * @return the future returned by {@code Utils.closeSequence}
     */
    @Override
    public CompletableFuture<Void> asyncClose() {
        return Utils.closeSequence(scheduler, lock, logSegmentAllocator);
    }

    /**
     * Abort this handler; aborting is the same as closing.
     *
     * @return the future returned by {@link #asyncClose()}
     */
    @Override
    public CompletableFuture<Void> asyncAbort() {
        return this.asyncClose();
    }

    /**
     * Build the name of the znode of a completed log segment.
     *
     * <p>When the configured log segment name version equals
     * {@code DistributedLogConstants.LOGSEGMENT_NAME_VERSION} the name is made of the completed
     * prefix and the zero-padded log segment sequence number; otherwise it is made of the
     * prefix and the zero-padded first and last transaction ids.
     *
     * @return name of the completed log segment znode
     */
    String completedLedgerZNodeName(long firstTxId, long lastTxId, long logSegmentSeqNo) {
        final boolean namedBySequenceNo =
                conf.getLogSegmentNameVersion() == DistributedLogConstants.LOGSEGMENT_NAME_VERSION;
        if (!namedBySequenceNo) {
            return String.format("%s_%018d_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX,
                    firstTxId, lastTxId);
        }
        return String.format("%s_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, logSegmentSeqNo);
    }

    /**
     * Get the znode path for a finalized (completed) ledger: the log segments path followed by
     * the completed znode name.
     *
     * @return full path of the completed log segment znode
     */
    String completedLedgerZNode(long firstTxId, long lastTxId, long logSegmentSeqNo) {
        final String segmentsRoot = logMetadata.getLogSegmentsPath();
        final String completedName = completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo);
        return String.format("%s/%s", segmentsRoot, completedName);
    }

    /**
     * Get the name of the inprogress znode.
     *
     * <p>When the configured log segment name version equals
     * {@code DistributedLogConstants.LOGSEGMENT_NAME_VERSION} the name is the inprogress prefix
     * followed by the zero-padded log segment sequence number; otherwise it is the prefix
     * followed by the first transaction id in base 16. {@code logSegmentId} is not part of
     * the name.
     *
     * @return name of the inprogress znode.
     */
    String inprogressZNodeName(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
        final boolean namedBySequenceNo =
                conf.getLogSegmentNameVersion() == DistributedLogConstants.LOGSEGMENT_NAME_VERSION;
        if (!namedBySequenceNo) {
            final String firstTxIdHex = Long.toString(firstTxId, 16);
            return DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX + "_" + firstTxIdHex;
        }
        // Lots of the problems are introduced due to different inprogress names with same ledger sequence number.
        return String.format("%s_%018d", DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX, logSegmentSeqNo);
    }

    /**
     * Get the znode path for the inprogress znode: the log segments path, a slash and the
     * inprogress znode name derived from the given ids.
     *
     * @return full path of the inprogress znode
     */
    String inprogressZNode(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
        final String segmentsRoot = logMetadata.getLogSegmentsPath();
        final String inprogressName = inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo);
        return segmentsRoot + "/" + inprogressName;
    }

    /**
     * Get the znode path for an inprogress znode with the given name.
     *
     * @param inprogressZNodeName name of the inprogress znode
     * @return the log segments path, a slash and the given name
     */
    String inprogressZNode(String inprogressZNodeName) {
        final String segmentsRoot = logMetadata.getLogSegmentsPath();
        return segmentsRoot + "/" + inprogressZNodeName;
    }
}
