/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.lock;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.common.concurrent.AsyncSemaphore;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Distributed lock, using ZooKeeper.
 * <p/>
 * The lock is vulnerable to timing issues. For example, the process could
 * encounter a really long GC cycle between acquiring the lock, and writing to
 * a ledger. This could have timed out the lock, and another process could have
 * acquired the lock and started writing to bookkeeper. Therefore other
 * mechanisms are required to ensure correctness (i.e. Fencing).
 * <p/>
 * The lock may only be acquired once. If the lock is acquired successfully,
 * the caller holds the ownership until it loses it, either because another
 * process acquired the lock after the session expired or because the caller
 * explicitly closed the lock.
 * <p>
 * The caller could use {@link #checkOwnership()} or {@link #checkOwnershipAndReacquire()}
 * to check if it still holds the lock. If it doesn't hold the lock, the caller should
 * give up the ownership and close the lock.
 * <h3>Metrics</h3>
 * All the lock related stats are exposed under `lock`.
 * <ul>
 * <li>lock/acquire: opstats. latency spent on acquiring a lock.
 * <li>lock/reacquire: opstats. latency spent on re-acquiring a lock.
 * <li>lock/internalTryRetries: counter. the number of retries on re-creating internal locks.
 * </ul>
 * Other internal lock related stats are also exposed under `lock`. See {@link SessionLock}
 * for details.
 */
public class ZKDistributedLock implements LockListener, DistributedLock {

    static final Logger LOG = LoggerFactory.getLogger(ZKDistributedLock.class);

    private final SessionLockFactory lockFactory;
    private final OrderedScheduler lockStateExecutor;
    private final String lockPath;
    private final long lockTimeout;
    private final DistributedLockContext lockContext = new DistributedLockContext();

    private final AsyncSemaphore lockSemaphore = new AsyncSemaphore(1, Optional.empty());
    // We have two lock acquire futures:
    // 1. lock acquire future: for the initial acquire op
    // 2. lock reacquire future: for reacquire necessary when session expires, lock is closed
    private CompletableFuture<ZKDistributedLock> lockAcquireFuture = null;
    private CompletableFuture<ZKDistributedLock> lockReacquireFuture = null;
    // following variable tracking the status of acquire process
    //   => create (internalLock) => tryLock (tryLockFuture) => waitForAcquire (lockWaiter)
    private SessionLock internalLock = null;
    private CompletableFuture<LockWaiter> tryLockFuture = null;
    private LockWaiter lockWaiter = null;
    // exception indicating if the reacquire failed
    private LockingException lockReacquireException = null;
    // closeFuture
    private volatile boolean closed = false;
    private CompletableFuture<Void> closeFuture = null;

    // A counter to track how many re-acquires happened during a lock's life cycle.
    private final AtomicInteger reacquireCount = new AtomicInteger(0);
    private final StatsLogger lockStatsLogger;
    private final OpStatsLogger acquireStats;
    private final OpStatsLogger reacquireStats;
    private final Counter internalTryRetries;

    public ZKDistributedLock(
            OrderedScheduler lockStateExecutor,
            SessionLockFactory lockFactory,
            String lockPath,
            long lockTimeout,
            StatsLogger statsLogger) {
        this.lockStateExecutor = lockStateExecutor;
        this.lockPath = lockPath;
        this.lockTimeout = lockTimeout;
        this.lockFactory = lockFactory;

        lockStatsLogger = statsLogger.scope("lock");
        acquireStats = lockStatsLogger.getOpStatsLogger("acquire");
        reacquireStats = lockStatsLogger.getOpStatsLogger("reacquire");
        internalTryRetries = lockStatsLogger.getCounter("internalTryRetries");
    }

    private LockClosedException newLockClosedException() {
        return new LockClosedException(lockPath, "Lock is already closed");
    }

    private synchronized void checkLockState() throws LockingException {
        if (closed) {
            throw newLockClosedException();
        }
        if (null != lockReacquireException) {
            throw lockReacquireException;
        }
    }

    /**
     * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter
     * list--is executed synchronously, but the lock wait itself doesn't block.
     */
    public synchronized CompletableFuture<ZKDistributedLock> asyncAcquire() {
        if (null != lockAcquireFuture) {
            return FutureUtils.exception(new UnexpectedException("Someone is already acquiring/acquired lock " + lockPath));
        }
        final CompletableFuture<ZKDistributedLock> promise = FutureUtils.createFuture();
        promise.whenComplete((zkDistributedLock, throwable) -> {
            if (null == throwable || !(throwable instanceof CancellationException)) {
                return;
            }
            lockStateExecutor.submit(lockPath, () -> asyncClose());
        });
        final Stopwatch stopwatch = Stopwatch.createStarted();
        promise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
            @Override
            public void onSuccess(ZKDistributedLock lock) {
                acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            }
            @Override
            public void onFailure(Throwable cause) {
                acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
                // release the lock if fail to acquire
                asyncClose();
            }
        });
        this.lockAcquireFuture = promise;
        lockStateExecutor.submit(lockPath, new Runnable() {
            @Override
            public void run() {
                doAsyncAcquireWithSemaphore(promise, lockTimeout);
            }
        });
        return promise;
    }

    void doAsyncAcquireWithSemaphore(final CompletableFuture<ZKDistributedLock> acquirePromise,
                                     final long lockTimeout) {
        lockSemaphore.acquireAndRun(() -> {
            doAsyncAcquire(acquirePromise, lockTimeout);
            return acquirePromise;
        });
    }

    void doAsyncAcquire(final CompletableFuture<ZKDistributedLock> acquirePromise,
                        final long lockTimeout) {
        LOG.trace("Async Lock Acquire {}", lockPath);
        try {
            checkLockState();
        } catch (IOException ioe) {
            FutureUtils.completeExceptionally(acquirePromise, ioe);
            return;
        }

        if (haveLock()) {
            // it already hold the lock
            FutureUtils.complete(acquirePromise, this);
            return;
        }

        lockFactory
            .createLock(lockPath, lockContext)
            .whenCompleteAsync(new FutureEventListener<SessionLock>() {
            @Override
            public void onSuccess(SessionLock lock) {
                synchronized (ZKDistributedLock.this) {
                    if (closed) {
                        LOG.info("Skipping tryLocking lock {} since it is already closed", lockPath);
                        FutureUtils.completeExceptionally(acquirePromise, newLockClosedException());
                        return;
                    }
                }
                synchronized (ZKDistributedLock.this) {
                    internalLock = lock;
                    internalLock.setLockListener(ZKDistributedLock.this);
                }
                asyncTryLock(lock, acquirePromise, lockTimeout);
            }

            @Override
            public void onFailure(Throwable cause) {
                FutureUtils.completeExceptionally(acquirePromise, cause);
            }
        }, lockStateExecutor.chooseExecutor(lockPath));
    }

    void asyncTryLock(SessionLock lock,
                      final CompletableFuture<ZKDistributedLock> acquirePromise,
                      final long lockTimeout) {
        if (null != tryLockFuture) {
            tryLockFuture.cancel(true);
        }
        tryLockFuture = lock.asyncTryLock(lockTimeout, TimeUnit.MILLISECONDS);
        tryLockFuture.whenCompleteAsync(
            new FutureEventListener<LockWaiter>() {
                @Override
                public void onSuccess(LockWaiter waiter) {
                    synchronized (ZKDistributedLock.this) {
                        if (closed) {
                            LOG.info("Skipping acquiring lock {} since it is already closed", lockPath);
                            waiter
                                .getAcquireFuture()
                                .completeExceptionally(new LockingException(lockPath, "lock is already closed."));
                            FutureUtils.completeExceptionally(acquirePromise, newLockClosedException());
                            return;
                        }
                    }
                    tryLockFuture = null;
                    lockWaiter = waiter;
                    waitForAcquire(waiter, acquirePromise);
                }

                @Override
                public void onFailure(Throwable cause) {
                    FutureUtils.completeExceptionally(acquirePromise, cause);
                }
            }, lockStateExecutor.chooseExecutor(lockPath));
    }

    void waitForAcquire(final LockWaiter waiter,
                        final CompletableFuture<ZKDistributedLock> acquirePromise) {
        waiter.getAcquireFuture().whenCompleteAsync(
            new FutureEventListener<Boolean>() {
                @Override
                public void onSuccess(Boolean acquired) {
                    LOG.info("{} acquired lock {}", waiter, lockPath);
                    if (acquired) {
                        FutureUtils.complete(acquirePromise, ZKDistributedLock.this);
                    } else {
                        FutureUtils.completeExceptionally(acquirePromise,
                                new OwnershipAcquireFailedException(lockPath, waiter.getCurrentOwner()));
                    }
                }

                @Override
                public void onFailure(Throwable cause) {
                    FutureUtils.completeExceptionally(acquirePromise, cause);
                }
            }, lockStateExecutor.chooseExecutor(lockPath));
    }

    /**
     * NOTE: The {@link LockListener#onExpired()} is already executed in lock executor.
     */
    @Override
    public void onExpired() {
        try {
            reacquireLock(false);
        } catch (LockingException le) {
            // should not happen
            LOG.error("Locking exception on re-acquiring lock {} : ", lockPath, le);
        }
    }

    /**
     * Check if hold lock, if it doesn't, then re-acquire the lock.
     *
     * @throws LockingException     if the lock attempt fails
     */
    public synchronized void checkOwnershipAndReacquire() throws LockingException {
        if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
            throw new LockingException(lockPath, "check ownership before acquiring");
        }

        if (haveLock()) {
            return;
        }

        // We may have just lost the lock because of a ZK session timeout
        // not necessarily because someone else acquired the lock.
        // In such cases just try to reacquire. If that fails, it will throw
        reacquireLock(true);
    }

    /**
     * Check if lock is held.
     * If not, error out and do not reacquire. Use this in cases where there are many waiters by default
     * and reacquire is unlikley to succeed.
     *
     * @throws LockingException     if the lock attempt fails
     */
    public synchronized void checkOwnership() throws LockingException {
        if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) {
            throw new LockingException(lockPath, "check ownership before acquiring");
        }
        if (!haveLock()) {
            throw new LockingException(lockPath, "Lost lock ownership");
        }
    }

    @VisibleForTesting
    int getReacquireCount() {
        return reacquireCount.get();
    }

    @VisibleForTesting
    synchronized CompletableFuture<ZKDistributedLock> getLockReacquireFuture() {
        return lockReacquireFuture;
    }

    @VisibleForTesting
    synchronized CompletableFuture<ZKDistributedLock> getLockAcquireFuture() {
        return lockAcquireFuture;
    }

    @VisibleForTesting
    synchronized SessionLock getInternalLock() {
        return internalLock;
    }

    @VisibleForTesting
    LockWaiter getLockWaiter() {
        return lockWaiter;
    }

    synchronized boolean haveLock() {
        return !closed && internalLock != null && internalLock.isLockHeld();
    }

    void closeWaiter(final LockWaiter waiter,
                     final CompletableFuture<Void> closePromise) {
        if (null == waiter) {
            interruptTryLock(tryLockFuture, closePromise);
        } else {
            waiter.getAcquireFuture().whenCompleteAsync(
                new FutureEventListener<Boolean>() {
                    @Override
                    public void onSuccess(Boolean value) {
                        unlockInternalLock(closePromise);
                    }
                    @Override
                    public void onFailure(Throwable cause) {
                        unlockInternalLock(closePromise);
                    }
                }, lockStateExecutor.chooseExecutor(lockPath));
            waiter.getAcquireFuture().cancel(true);
        }
    }

    void interruptTryLock(final CompletableFuture<LockWaiter> tryLockFuture,
                          final CompletableFuture<Void> closePromise) {
        if (null == tryLockFuture) {
            unlockInternalLock(closePromise);
        } else {
            tryLockFuture.whenCompleteAsync(
                new FutureEventListener<LockWaiter>() {
                    @Override
                    public void onSuccess(LockWaiter waiter) {
                        closeWaiter(waiter, closePromise);
                    }
                    @Override
                    public void onFailure(Throwable cause) {
                        unlockInternalLock(closePromise);
                    }
                }, lockStateExecutor.chooseExecutor(lockPath));
            tryLockFuture.cancel(true);
        }
    }

    synchronized void unlockInternalLock(final CompletableFuture<Void> closePromise) {
        if (internalLock == null) {
            FutureUtils.complete(closePromise, null);
        } else {
            internalLock.asyncUnlock().whenComplete((value, cause) -> closePromise.complete(null));
        }
    }

    @Override
    public CompletableFuture<Void> asyncClose() {
        final CompletableFuture<Void> closePromise;
        synchronized (this) {
            if (closed) {
                return closeFuture;
            }
            closed = true;
            closeFuture = closePromise = new CompletableFuture<Void>();
        }
        final CompletableFuture<Void> closeWaiterFuture = new CompletableFuture<Void>();
        closeWaiterFuture.whenCompleteAsync(new FutureEventListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                complete();
            }
            @Override
            public void onFailure(Throwable cause) {
                complete();
            }

            private void complete() {
                FutureUtils.complete(closePromise, null);
            }
        }, lockStateExecutor.chooseExecutor(lockPath));
        lockStateExecutor.submit(lockPath, new Runnable() {
            @Override
            public void run() {
                closeWaiter(lockWaiter, closeWaiterFuture);
            }
        });
        return closePromise;
    }

    void internalReacquireLock(final AtomicInteger numRetries,
                               final long lockTimeout,
                               final CompletableFuture<ZKDistributedLock> reacquirePromise) {
        lockStateExecutor.submit(lockPath, new Runnable() {
            @Override
            public void run() {
                doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise);
            }
        });
    }

    void doInternalReacquireLock(final AtomicInteger numRetries,
                                 final long lockTimeout,
                                 final CompletableFuture<ZKDistributedLock> reacquirePromise) {
        internalTryRetries.inc();
        CompletableFuture<ZKDistributedLock> tryPromise = new CompletableFuture<ZKDistributedLock>();
        tryPromise.whenComplete(new FutureEventListener<ZKDistributedLock>() {
            @Override
            public void onSuccess(ZKDistributedLock lock) {
                FutureUtils.complete(reacquirePromise, lock);
            }

            @Override
            public void onFailure(Throwable cause) {
                if (cause instanceof OwnershipAcquireFailedException) {
                    // the lock has been acquired by others
                    FutureUtils.completeExceptionally(reacquirePromise, cause);
                } else {
                    if (numRetries.getAndDecrement() > 0 && !closed) {
                        internalReacquireLock(numRetries, lockTimeout, reacquirePromise);
                    } else {
                        FutureUtils.completeExceptionally(reacquirePromise, cause);
                    }
                }
            }
        });
        doAsyncAcquireWithSemaphore(tryPromise, 0);
    }

    private CompletableFuture<ZKDistributedLock> reacquireLock(boolean throwLockAcquireException) throws LockingException {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        CompletableFuture<ZKDistributedLock> lockPromise;
        synchronized (this) {
            if (closed) {
                throw newLockClosedException();
            }
            if (null != lockReacquireException) {
                if (throwLockAcquireException) {
                    throw lockReacquireException;
                } else {
                    return null;
                }
            }
            if (null != lockReacquireFuture) {
                return lockReacquireFuture;
            }
            LOG.info("reacquiring lock at {}", lockPath);
            lockReacquireFuture = lockPromise = new CompletableFuture<ZKDistributedLock>();
            lockReacquireFuture.whenComplete(new FutureEventListener<ZKDistributedLock>() {
                @Override
                public void onSuccess(ZKDistributedLock lock) {
                    // if re-acquire successfully, clear the state.
                    synchronized (ZKDistributedLock.this) {
                        lockReacquireFuture = null;
                    }
                    reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
                }

                @Override
                public void onFailure(Throwable cause) {
                    synchronized (ZKDistributedLock.this) {
                        if (cause instanceof LockingException) {
                            lockReacquireException = (LockingException) cause;
                        } else {
                            lockReacquireException = new LockingException(lockPath,
                                    "Exception on re-acquiring lock", cause);
                        }
                    }
                    reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
                }
            });
        }
        reacquireCount.incrementAndGet();
        internalReacquireLock(new AtomicInteger(Integer.MAX_VALUE), 0, lockPromise);
        return lockPromise;
    }

}
