/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.bk;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.common.concurrent.FutureEventListener;

import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.DLUtils;

import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Transaction.OpListener;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.distributedlog.zk.ZKVersionedSetOp;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



/**
 * Allocator to allocate ledgers.
 */
public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListener<LedgerHandle>, OpListener<Version> {

    static final Logger LOG = LoggerFactory.getLogger(SimpleLedgerAllocator.class);

    enum Phase {
        ALLOCATING, ALLOCATED, HANDING_OVER, HANDED_OVER, ERROR
    }

    static class AllocationException extends IOException {

        private static final long serialVersionUID = -1111397872059426882L;

        private final Phase phase;

        public AllocationException(Phase phase, String msg) {
            super(msg);
            this.phase = phase;
        }

        public Phase getPhase() {
            return this.phase;
        }

    }

    static class ConcurrentObtainException extends AllocationException {

        private static final long serialVersionUID = -8532471098537176913L;

        public ConcurrentObtainException(Phase phase, String msg) {
            super(phase, msg);
        }
    }

    // zookeeper client
    final ZooKeeperClient zkc;
    // bookkeeper client
    final BookKeeperClient bkc;
    // znode path
    final String allocatePath;
    // allocation phase
    Phase phase = Phase.HANDED_OVER;
    // version
    ZkVersion version = new ZkVersion(-1);
    // outstanding allocation
    CompletableFuture<LedgerHandle> allocatePromise;
    // outstanding tryObtain transaction
    Transaction<Object> tryObtainTxn = null;
    OpListener<LedgerHandle> tryObtainListener = null;
    // ledger id left from previous allocation
    Long ledgerIdLeftFromPrevAllocation = null;
    // Allocated Ledger
    LedgerHandle allocatedLh = null;

    CompletableFuture<Void> closeFuture = null;
    final LinkedList<CompletableFuture<Void>> ledgerDeletions =
            new LinkedList<CompletableFuture<Void>>();

    // Ledger configuration
    private final QuorumConfigProvider quorumConfigProvider;

    static CompletableFuture<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath,
                                                                final ZooKeeperClient zkc) {
        return Utils.zkGetData(zkc, allocatePath, false)
                .thenCompose(new Function<Versioned<byte[]>, CompletionStage<Versioned<byte[]>>>() {
            @Override
            public CompletableFuture<Versioned<byte[]>> apply(Versioned<byte[]> result) {
                if (null != result && null != result.getVersion() && null != result.getValue()) {
                    return FutureUtils.value(result);
                }
                return createAllocationData(allocatePath, zkc);
            }
        });
    }

    private static CompletableFuture<Versioned<byte[]>> createAllocationData(final String allocatePath,
                                                                  final ZooKeeperClient zkc) {
        try {
            final CompletableFuture<Versioned<byte[]>> promise = new CompletableFuture<Versioned<byte[]>>();
            zkc.get().create(allocatePath, DistributedLogConstants.EMPTY_BYTES,
                    zkc.getDefaultACL(), CreateMode.PERSISTENT,
                    new org.apache.zookeeper.AsyncCallback.Create2Callback() {
                        @Override
                        public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
                            if (KeeperException.Code.OK.intValue() == rc) {
                                promise.complete(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
                                        new ZkVersion(stat.getVersion())));
                            } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                                FutureUtils.proxyTo(
                                  Utils.zkGetData(zkc, allocatePath, false),
                                  promise
                                );
                            } else {
                                promise.completeExceptionally(Utils.zkException(
                                        KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
                            }
                        }
                    }, null);
            return promise;
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            return FutureUtils.exception(Utils.zkException(e, allocatePath));
        } catch (InterruptedException e) {
            return FutureUtils.exception(Utils.zkException(e, allocatePath));
        }
    }

    public static CompletableFuture<SimpleLedgerAllocator> of(final String allocatePath,
                                                   final Versioned<byte[]> allocationData,
                                                   final QuorumConfigProvider quorumConfigProvider,
                                                   final ZooKeeperClient zkc,
                                                   final BookKeeperClient bkc) {
        if (null != allocationData && null != allocationData.getValue()
                && null != allocationData.getVersion()) {
            return FutureUtils.value(new SimpleLedgerAllocator(allocatePath, allocationData,
                    quorumConfigProvider, zkc, bkc));
        }
        return getAndCreateAllocationData(allocatePath, zkc)
            .thenApply(allocationData1 -> new SimpleLedgerAllocator(allocatePath, allocationData1,
                        quorumConfigProvider, zkc, bkc));
    }

    /**
     * Construct a ledger allocator.
     *
     * @param allocatePath
     *          znode path to store the allocated ledger.
     * @param allocationData
     *          allocation data.
     * @param quorumConfigProvider
     *          Quorum configuration provider.
     * @param zkc
     *          zookeeper client.
     * @param bkc
     *          bookkeeper client.
     */
    public SimpleLedgerAllocator(String allocatePath,
                                 Versioned<byte[]> allocationData,
                                 QuorumConfigProvider quorumConfigProvider,
                                 ZooKeeperClient zkc,
                                 BookKeeperClient bkc) {
        this.zkc = zkc;
        this.bkc = bkc;
        this.allocatePath = allocatePath;
        this.quorumConfigProvider = quorumConfigProvider;
        initialize(allocationData);
    }

    /**
     * Initialize the allocator.
     *
     * @param allocationData
     *          Allocation Data.
     */
    private void initialize(Versioned<byte[]> allocationData) {
        setVersion((ZkVersion) allocationData.getVersion());
        byte[] data = allocationData.getValue();
        if (null != data && data.length > 0) {
            // delete the allocated ledger since this is left by last allocation.
            try {
                ledgerIdLeftFromPrevAllocation = DLUtils.bytes2LogSegmentId(data);
            } catch (NumberFormatException nfe) {
                LOG.warn("Invalid data found in allocator path {} : ", allocatePath, nfe);
            }
        }

    }

    private synchronized void deleteLedgerLeftFromPreviousAllocationIfNecessary() {
        if (null != ledgerIdLeftFromPrevAllocation) {
            LOG.info("Deleting allocated-but-unused ledger left from previous allocation {}.",
                    ledgerIdLeftFromPrevAllocation);
            deleteLedger(ledgerIdLeftFromPrevAllocation);
            ledgerIdLeftFromPrevAllocation = null;
        }
    }

    @Override
    public synchronized void allocate() throws IOException {
        if (Phase.ERROR == phase) {
            throw new AllocationException(Phase.ERROR, "Error on ledger allocator for " + allocatePath);
        }
        if (Phase.HANDED_OVER == phase) {
            // issue an allocate request when ledger is already handed over.
            allocateLedger();
        }
    }

    @Override
    public synchronized CompletableFuture<LedgerHandle> tryObtain(final Transaction<Object> txn,
                                                                  final OpListener<LedgerHandle> listener) {
        if (Phase.ERROR == phase) {
            return FutureUtils.exception(new AllocationException(Phase.ERROR,
                    "Error on allocating ledger under " + allocatePath));
        }
        if (Phase.HANDING_OVER == phase || Phase.HANDED_OVER == phase || null != tryObtainTxn) {
            return FutureUtils.exception(new ConcurrentObtainException(phase,
                    "Ledger handle is handling over to another thread : " + phase));
        }
        tryObtainTxn = txn;
        tryObtainListener = listener;
        if (null != allocatedLh) {
            completeAllocation(allocatedLh);
        }
        return allocatePromise;
    }

    @Override
    public void onCommit(Version r) {
        confirmObtain((ZkVersion) r);
    }

    private void confirmObtain(ZkVersion zkVersion) {
        boolean shouldAllocate = false;
        OpListener<LedgerHandle> listenerToNotify = null;
        LedgerHandle lhToNotify = null;
        synchronized (this) {
            if (Phase.HANDING_OVER == phase) {
                setPhase(Phase.HANDED_OVER);
                setVersion(zkVersion);
                listenerToNotify = tryObtainListener;
                lhToNotify = allocatedLh;
                // reset the state
                allocatedLh = null;
                allocatePromise = null;
                tryObtainTxn = null;
                tryObtainListener = null;
                // mark flag to issue an allocation request
                shouldAllocate = true;
            }
        }
        if (null != listenerToNotify && null != lhToNotify) {
            // notify the listener
            listenerToNotify.onCommit(lhToNotify);
        }
        if (shouldAllocate) {
            // issue an allocation request
            allocateLedger();
        }
    }

    @Override
    public void onAbort(Throwable t) {
        OpListener<LedgerHandle> listenerToNotify;
        synchronized (this) {
            listenerToNotify = tryObtainListener;
            if (t instanceof KeeperException
                    && ((KeeperException) t).code() == KeeperException.Code.BADVERSION) {
                LOG.info("Set ledger allocator {} to ERROR state after hit bad version : version = {}",
                        allocatePath, getVersion());
                setPhase(Phase.ERROR);
            } else {
                if (Phase.HANDING_OVER == phase) {
                    setPhase(Phase.ALLOCATED);
                    tryObtainTxn = null;
                    tryObtainListener = null;
                }
            }
        }
        if (null != listenerToNotify) {
            listenerToNotify.onAbort(t);
        }
    }

    private synchronized void setPhase(Phase phase) {
        this.phase = phase;
        LOG.info("Ledger allocator {} moved to phase {} : version = {}.",
                new Object[] { allocatePath, phase, version });
    }

    private synchronized void allocateLedger() {
        // make sure previous allocation is already handed over.
        if (Phase.HANDED_OVER != phase) {
            LOG.error("Trying allocate ledger for {} in phase {}, giving up.", allocatePath, phase);
            return;
        }
        setPhase(Phase.ALLOCATING);
        allocatePromise = new CompletableFuture<LedgerHandle>();
        QuorumConfig quorumConfig = quorumConfigProvider.getQuorumConfig();
        bkc.createLedger(
                quorumConfig.getEnsembleSize(),
                quorumConfig.getWriteQuorumSize(),
                quorumConfig.getAckQuorumSize()
        ).whenComplete(this);
    }

    private synchronized void completeAllocation(LedgerHandle lh) {
        allocatedLh = lh;
        if (null == tryObtainTxn) {
            return;
        }
        org.apache.zookeeper.Op zkSetDataOp = org.apache.zookeeper.Op.setData(
                allocatePath, DistributedLogConstants.EMPTY_BYTES, version.getZnodeVersion());
        ZKVersionedSetOp commitOp = new ZKVersionedSetOp(zkSetDataOp, this);
        tryObtainTxn.addOp(commitOp);
        setPhase(Phase.HANDING_OVER);
        allocatePromise.complete(lh);
    }

    private synchronized void failAllocation(Throwable cause) {
        allocatePromise.completeExceptionally(cause);
    }

    @Override
    public void onSuccess(LedgerHandle lh) {
        // a ledger is created, update the ledger to allocation path before handling it over for usage.
        markAsAllocated(lh);
    }

    @Override
    public void onFailure(Throwable cause) {
        LOG.error("Error creating ledger for allocating {} : ", allocatePath, cause);
        setPhase(Phase.ERROR);
        failAllocation(cause);
    }

    private synchronized ZkVersion getVersion() {
        return version;
    }

    private synchronized void setVersion(ZkVersion newVersion) {
        Version.Occurred occurred = newVersion.compare(version);
        if (occurred == Version.Occurred.AFTER) {
            LOG.info("Ledger allocator for {} moved version from {} to {}.",
                    new Object[] { allocatePath, version, newVersion });
            version = newVersion;
        } else {
            LOG.warn("Ledger allocator for {} received an old version {}, current version is {}.",
                    new Object[] { allocatePath, newVersion , version });
        }
    }

    private void markAsAllocated(final LedgerHandle lh) {
        byte[] data = DLUtils.logSegmentId2Bytes(lh.getId());
        Utils.zkSetData(zkc, allocatePath, data, getVersion())
            .whenComplete(new FutureEventListener<ZkVersion>() {
                @Override
                public void onSuccess(ZkVersion version) {
                    // we only issue deleting ledger left from previous allocation when we could allocate first ledger
                    // as zookeeper version could prevent us doing stupid things.
                    deleteLedgerLeftFromPreviousAllocationIfNecessary();
                    setVersion(version);
                    setPhase(Phase.ALLOCATED);
                    // complete the allocation after it is marked as allocated
                    completeAllocation(lh);
                }

                @Override
                public void onFailure(Throwable cause) {
                    setPhase(Phase.ERROR);
                    deleteLedger(lh.getId());
                    LOG.error("Fail mark ledger {} as allocated under {} : ",
                            new Object[] { lh.getId(), allocatePath, cause });
                    // fail the allocation since failed to mark it as allocated
                    failAllocation(cause);
                }
            });
    }

    void deleteLedger(final long ledgerId) {
        final CompletableFuture<Void> deleteFuture = bkc.deleteLedger(ledgerId, true);
        synchronized (ledgerDeletions) {
            ledgerDeletions.add(deleteFuture);
        }
        deleteFuture.whenComplete((value, cause) -> {
            if (null != cause) {
                LOG.error("Error deleting ledger {} for ledger allocator {}, retrying : ",
                        new Object[] { ledgerId, allocatePath, cause });
                if (!isClosing()) {
                    deleteLedger(ledgerId);
                }
            }
            synchronized (ledgerDeletions) {
                ledgerDeletions.remove(deleteFuture);
            }
        });
    }

    private synchronized boolean isClosing() {
        return closeFuture != null;
    }

    private CompletableFuture<Void> closeInternal(boolean cleanup) {
        CompletableFuture<Void> closePromise;
        synchronized (this) {
            if (null != closeFuture) {
                return closeFuture;
            }
            closePromise = new CompletableFuture<Void>();
            closeFuture = closePromise;
        }
        if (!cleanup) {
            LOG.info("Abort ledger allocator without cleaning up on {}.", allocatePath);
            closePromise.complete(null);
            return closePromise;
        }
        cleanupAndClose(closePromise);
        return closePromise;
    }

    private void cleanupAndClose(final CompletableFuture<Void> closePromise) {
        LOG.info("Closing ledger allocator on {}.", allocatePath);
        final ZKTransaction txn = new ZKTransaction(zkc);
        // try obtain ledger handle
        tryObtain(txn, new OpListener<LedgerHandle>() {
            @Override
            public void onCommit(LedgerHandle r) {
                // no-op
                complete();
            }

            @Override
            public void onAbort(Throwable t) {
                // no-op
                complete();
            }

            private void complete() {
                closePromise.complete(null);
                LOG.info("Closed ledger allocator on {}.", allocatePath);
            }
        }).whenComplete(new FutureEventListener<LedgerHandle>() {
            @Override
            public void onSuccess(LedgerHandle lh) {
                // try obtain succeed
                // if we could obtain the ledger handle, we have the responsibility to close it
                deleteLedger(lh.getId());
                // wait for deletion to be completed
                List<CompletableFuture<Void>> outstandingDeletions;
                synchronized (ledgerDeletions) {
                    outstandingDeletions = Lists.newArrayList(ledgerDeletions);
                }
                FutureUtils.collect(outstandingDeletions).whenComplete(new FutureEventListener<List<Void>>() {
                    @Override
                    public void onSuccess(List<Void> values) {
                        txn.execute();
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
                        closePromise.complete(null);
                    }
                });
            }

            @Override
            public void onFailure(Throwable cause) {
                LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
                closePromise.complete(null);
            }
        });

    }

    @Override
    public void start() {
        // nop
    }

    @Override
    public CompletableFuture<Void> asyncClose() {
        return closeInternal(false);
    }

    @Override
    public CompletableFuture<Void> delete() {
        return closeInternal(true).thenCompose(value -> Utils.zkDelete(zkc, allocatePath, getVersion()));
    }

}
