/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.distributedlog.lock;

import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClientUtils;
import org.apache.distributedlog.ZooKeeperClusterTestCase;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.lock.ZKSessionLock.State;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import com.twitter.util.Await;
import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.distributedlog.lock.ZKSessionLock.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Distributed Lock Tests
 */
public class TestZKSessionLock extends ZooKeeperClusterTestCase {

    // Exposes the name of the currently running test method; several tests derive
    // their lock path and client id from it.
    @Rule
    public TestName testNames = new TestName();

    static final Logger logger = LoggerFactory.getLogger(TestZKSessionLock.class);

    // ZooKeeper session timeout in milliseconds; used when building both clients
    // and passed to ZooKeeperClientUtils.expireSession by the expiry tests.
    private final static int sessionTimeoutMs = 2000;

    private ZooKeeperClient zkc;
    private ZooKeeperClient zkc0; // used for checking
    // Scheduler passed to every ZKSessionLock constructed by these tests.
    private OrderedScheduler lockStateExecutor;

    /**
     * Builds the two ZooKeeper clients ("zkc" and "zkc0") and the single-threaded
     * lock state scheduler that every test in this class relies on.
     */
    @Before
    public void setup() throws Exception {
        zkc = buildLockTestClient("zkc");
        zkc0 = buildLockTestClient("zkc0");
        lockStateExecutor = OrderedScheduler.newBuilder().corePoolSize(1).build();
    }

    /** Builds a ZooKeeper client with the given name against the test cluster. */
    private ZooKeeperClient buildLockTestClient(String clientName) throws Exception {
        return ZooKeeperClientBuilder.newBuilder()
                .name(clientName)
                .uri(DLMTestUtil.createDLMURI(zkPort, "/"))
                .sessionTimeoutMs(sessionTimeoutMs)
                .zkServers(zkServers)
                .zkAclId(null)
                .build();
    }

    /**
     * Releases the ZooKeeper clients and the lock state scheduler.
     *
     * <p>Each resource is released even if releasing an earlier one throws, and
     * resources that were never created (because {@link #setup()} failed partway)
     * are skipped so that the original failure is not masked by a
     * {@link NullPointerException}.
     *
     * @throws Exception if closing one of the clients fails
     */
    @After
    public void teardown() throws Exception {
        try {
            if (null != zkc) {
                zkc.close();
            }
        } finally {
            try {
                if (null != zkc0) {
                    zkc0.close();
                }
            } finally {
                if (null != lockStateExecutor) {
                    lockStateExecutor.shutdown();
                }
            }
        }
    }

    /**
     * Creates the persistent, empty parent znode under which lock member nodes live.
     *
     * @param zk       zookeeper handle to create the node with
     * @param lockPath path of the lock znode
     */
    private static void createLockPath(ZooKeeper zk, String lockPath) throws Exception {
        final byte[] emptyData = new byte[0];
        zk.create(lockPath, emptyData, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    /**
     * Creates an ephemeral sequential lock member node using the V1 path prefix.
     *
     * @return the actual path of the created node
     */
    private static String createLockNodeV1(ZooKeeper zk, String lockPath, String clientId) throws Exception {
        final String memberPrefix = getLockPathPrefixV1(lockPath);
        final byte[] memberData = serializeClientId(clientId);
        return zk.create(memberPrefix, memberData,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    /**
     * Creates an ephemeral sequential lock member node using the V2 path prefix,
     * which is derived from the lock path and the client id.
     *
     * @return the actual path of the created node
     */
    private static String createLockNodeV2(ZooKeeper zk, String lockPath, String clientId) throws Exception {
        final String memberPrefix = getLockPathPrefixV2(lockPath, clientId);
        final byte[] memberData = serializeClientId(clientId);
        return zk.create(memberPrefix, memberData,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    /**
     * Creates an ephemeral sequential lock member node using the V3 path prefix,
     * which is derived from the lock path, the client id and the session id of {@code zk}.
     *
     * @return the actual path of the created node
     */
    private static String createLockNodeV3(ZooKeeper zk, String lockPath, String clientId) throws Exception {
        final String memberPrefix = getLockPathPrefixV3(lockPath, clientId, zk.getSessionId());
        final byte[] memberData = serializeClientId(clientId);
        return zk.create(memberPrefix, memberData,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    /**
     * Creates an ephemeral (non-sequential) child of the lock path with an arbitrary node name,
     * storing the serialized client id as its data.
     *
     * @param badNodeName node name to use verbatim under {@code lockPath}
     * @return the actual path of the created node
     */
    private static String createLockNodeWithBadNodeName(ZooKeeper zk, String lockPath, String clientId, String badNodeName)
            throws Exception {
        final String nodePath = lockPath + "/" + badNodeName;
        final byte[] nodeData = serializeClientId(clientId);
        return zk.create(nodePath, nodeData,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    /**
     * Lists the children of the lock path, ordered by {@link ZKSessionLock#MEMBER_COMPARATOR}.
     *
     * @param zkc      client used to read the children (no watch is set)
     * @param lockPath path of the lock znode
     * @return sorted child node names
     */
    private static List<String> getLockWaiters(ZooKeeperClient zkc, String lockPath) throws Exception {
        final ZooKeeper zk = zkc.get();
        final List<String> waiters = zk.getChildren(lockPath, false);
        Collections.sort(waiters, ZKSessionLock.MEMBER_COMPARATOR);
        return waiters;
    }

    /**
     * Verifies that the (client id, session id) pair can be recovered from lock member nodes
     * created with each prefix version, from nodes whose names do not follow the member
     * naming scheme, and from a node whose name embeds its own client id and session id.
     */
    @Test(timeout = 60000)
    public void testParseClientID() throws Exception {
        final ZooKeeper zk = zkc.get();

        final String lockPath = "/test-parse-clientid";
        final String clientId = "test-parse-clientid-" + System.currentTimeMillis();
        final Pair<String, Long> expectedLockId = Pair.of(clientId, zk.getSessionId());

        createLockPath(zk, lockPath);

        // Correct data: create all three versions first, then parse each of them.
        final String[] wellFormedNodes = {
            getLockIdFromPath(createLockNodeV1(zk, lockPath, clientId)),
            getLockIdFromPath(createLockNodeV2(zk, lockPath, clientId)),
            getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId)),
        };
        for (String node : wellFormedNodes) {
            assertEquals(expectedLockId, Await.result(asyncParseClientID(zk, lockPath, node)));
        }

        // Bad Lock Node Name: the expected lock id is still the creator's client id and session.
        final String[] badNodeNames = {
            "member",
            "member_badnode",
            "member_badnode_badnode",
            "member_badnode_badnode_badnode",
            "member_badnode_badnode_badnode_badnode",
        };
        final String[] badNodes = new String[badNodeNames.length];
        for (int i = 0; i < badNodeNames.length; i++) {
            badNodes[i] = getLockIdFromPath(
                    createLockNodeWithBadNodeName(zk, lockPath, clientId, badNodeNames[i]));
        }
        for (String node : badNodes) {
            assertEquals(expectedLockId, Await.result(asyncParseClientID(zk, lockPath, node)));
        }

        // Malformed Node Name: the id is taken from the node name rather than from the creator.
        final String malformedNode = getLockIdFromPath(
                createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_malformed_s12345678_999999"));
        assertEquals(Pair.of("malformed", 12345678L),
                Await.result(asyncParseClientID(zk, lockPath, malformedNode)));
    }

    /**
     * Verifies member id parsing: names that do not carry a member sequence number map to
     * {@link Integer#MAX_VALUE}, while well-formed names yield their numeric suffix.
     */
    @Test(timeout = 60000)
    public void testParseMemberID() throws Exception {
        final String[] nodeNames = {"badnode", "badnode_badnode", "member_000000", "member_000123"};
        final int[] expectedIds = {Integer.MAX_VALUE, Integer.MAX_VALUE, 0, 123};

        for (int i = 0; i < nodeNames.length; i++) {
            assertEquals(expectedIds[i], parseMemberID(nodeNames[i]));
        }
    }

    /**
     * Verifies {@code areLockWaitersInSameSession} on three V3 member nodes: the first and
     * third are created with one client id, the second with another.
     */
    @Test(timeout = 60000)
    public void testAreLockWaitersInSameSession() throws Exception {
        final ZooKeeper zk = zkc.get();

        final String lockPath = "/test-are-lock-waiters-in-same-session";
        final String firstClient = "test-are-lock-waiters-in-same-session-1";
        final String secondClient = "test-are-lock-waiters-in-same-session-2";

        createLockPath(zk, lockPath);

        final String node1 = getLockIdFromPath(createLockNodeV3(zk, lockPath, firstClient));
        final String node2 = getLockIdFromPath(createLockNodeV3(zk, lockPath, secondClient));
        final String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, firstClient));

        final String sameMsg13 = node1 + " and " + node3 + " should be in same session.";
        assertEquals(sameMsg13, true, areLockWaitersInSameSession(node1, node3));

        final String diffMsg12 = node1 + " and " + node2 + " should be not in same session.";
        assertEquals(diffMsg12, false, areLockWaitersInSameSession(node1, node2));

        final String diffMsg32 = node3 + " and " + node2 + " should be not in same session.";
        assertEquals(diffMsg32, false, areLockWaitersInSameSession(node3, node2));
    }

    /**
     * Verifies epoch gating of {@code executeLockAction}: an action submitted with the lock's
     * current epoch runs, an action submitted with a different epoch is skipped, and when a
     * promise is supplied alongside a mismatching epoch the promise fails with
     * {@link EpochChangedException}.
     */
    @Test(timeout = 60000)
    public void testExecuteLockAction() throws Exception {
        String lockPath = "/test-execute-lock-action";
        String clientId = "test-execute-lock-action-" + System.currentTimeMillis();

        ZKSessionLock lock =
                new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);

        // Counts how many of the "increment" actions were actually executed.
        final AtomicInteger counter = new AtomicInteger(0);

        // lock action would be executed in same epoch
        final CountDownLatch latch1 = new CountDownLatch(1);
        lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
            @Override
            public void execute() {
                counter.incrementAndGet();
                latch1.countDown();
            }

            @Override
            public String getActionName() {
                return "increment1";
            }
        });
        latch1.await();
        assertEquals("counter should be increased in same epoch", 1, counter.get());

        // lock action submitted with a different epoch (current + 1) would not be executed
        final CountDownLatch latch2 = new CountDownLatch(1);
        lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
            @Override
            public void execute() {
                counter.incrementAndGet();
            }

            @Override
            public String getActionName() {
                return "increment2";
            }
        });
        // A second action in the current epoch serves as a completion marker for the skipped one.
        // NOTE(review): this presumes actions for the same lock are processed in submission
        // order by the lock state executor -- confirm against ZKSessionLock.
        lock.executeLockAction(lock.getEpoch().get(), new LockAction() {
            @Override
            public void execute() {
                latch2.countDown();
            }

            @Override
            public String getActionName() {
                return "countdown";
            }
        });
        latch2.await();
        assertEquals("counter should not be increased in different epochs", 1, counter.get());

        // lock action with a different epoch would not be executed and the promise would be
        // satisfied with an exception
        Promise<BoxedUnit> promise = new Promise<BoxedUnit>();
        lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
            @Override
            public void execute() {
                counter.incrementAndGet();
            }

            @Override
            public String getActionName() {
                return "increment3";
            }
        }, promise);
        try {
            Await.result(promise);
            fail("Should satisfy promise with epoch changed exception.");
        } catch (EpochChangedException ece) {
            // expected
        }
        assertEquals("counter should not be increased in different epochs", 1, counter.get());

        // The executor is shut down here in addition to the shutdown performed in teardown().
        lockStateExecutor.shutdown();
    }

    /**
     * Test lock after unlock is called: once a never-acquired lock has been unlocked it is
     * CLOSED, and every later tryLock is rejected without changing that state, regardless of
     * the wait timeout.
     *
     * @throws Exception
     */
    @Test(timeout = 60000)
    public void testLockAfterUnlock() throws Exception {
        final String lockPath = "/test-lock-after-unlock";
        final String clientId = "test-lock-after-unlock";

        final ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
        lock.unlock();
        assertEquals(State.CLOSED, lock.getLockState());

        expectTryLockRejectedWhileClosed(lock, 0,
                "Should fail on tryLock since lock state has changed.");
        expectTryLockRejectedWhileClosed(lock, Long.MAX_VALUE,
                "Should fail on tryLock immediately if lock state has changed.");
    }

    /**
     * Attempts tryLock with the given timeout, expects a {@link LockStateChangedException},
     * and checks that the lock is still CLOSED afterwards.
     */
    private void expectTryLockRejectedWhileClosed(ZKSessionLock lock, long timeout, String failureMessage)
            throws Exception {
        try {
            lock.tryLock(timeout, TimeUnit.MILLISECONDS);
            fail(failureMessage);
        } catch (LockStateChangedException lsce) {
            // expected
        }
        assertEquals(State.CLOSED, lock.getLockState());
    }

    /**
     * Fail point action that stalls the calling thread for a fixed amount of time and then
     * reports the fail point as triggered.
     */
    class DelayFailpointAction extends FailpointUtils.AbstractFailPointAction {
        // How long checkFailPoint() sleeps, in milliseconds.
        long timeout;

        /**
         * @param timeout
         *          delay in milliseconds applied each time the fail point is checked
         */
        DelayFailpointAction(long timeout) {
            this.timeout = timeout;
        }

        /**
         * Sleeps for the configured delay. If the sleep is interrupted, the delay is cut
         * short and the thread's interrupt status is restored so callers can still see it.
         *
         * @return always {@code true}
         */
        @Override
        public boolean checkFailPoint() throws IOException {
            try {
                Thread.sleep(timeout);
            } catch (InterruptedException ie) {
                // Do not swallow the interruption: preserve it for code further up the stack.
                Thread.currentThread().interrupt();
            }
            return true;
        }
    }

    /**
     * Test unlock timeout: with the unlock-cleanup fail point stalled for an hour and a
     * one second operation timeout, unlock() returns while the lock is still CLOSING.
     *
     * @throws Exception
     */
    @Test(timeout = 60000)
    public void testUnlockTimeout() throws Exception {
        final String testName = testNames.getMethodName();
        final String lockPath = "/" + testName;
        final int opTimeoutMs = 1*1000;
        final long cleanupDelayMs = 60*60*1000;

        createLockPath(zkc.get(), lockPath);

        final ZKSessionLock lock = new ZKSessionLock(
                zkc, lockPath, testName, lockStateExecutor,
                opTimeoutMs, NullStatsLogger.INSTANCE,
                new DistributedLockContext());

        lock.tryLock(0, TimeUnit.MILLISECONDS);
        assertEquals(State.CLAIMED, lock.getLockState());

        try {
            FailpointUtils.setFailpoint(
                    FailpointUtils.FailPointName.FP_LockUnlockCleanup,
                    new DelayFailpointAction(cleanupDelayMs));

            lock.unlock();
            assertEquals(State.CLOSING, lock.getLockState());
        } finally {
            // Always clear the fail point so it cannot leak into other tests.
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockUnlockCleanup);
        }
    }

    /**
     * Test try-create after close race condition: with the race fail point enabled, the lock
     * must end up CLOSED and leave no member node behind under the lock path.
     *
     * @throws Exception
     */
    @Test(timeout = 60000)
    public void testTryCloseRaceCondition() throws Exception {
        final String testName = testNames.getMethodName();
        final String lockPath = "/" + testName;
        final int opTimeoutMs = 1*1000;

        createLockPath(zkc.get(), lockPath);

        final ZKSessionLock lock = new ZKSessionLock(
                zkc, lockPath, testName, lockStateExecutor,
                opTimeoutMs, NullStatsLogger.INSTANCE,
                new DistributedLockContext());

        try {
            FailpointUtils.setFailpoint(
                    FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition,
                    FailpointUtils.DEFAULT_ACTION);

            lock.tryLock(0, TimeUnit.MILLISECONDS);
        } catch (LockClosedException ex) {
            // tolerated: the lock may report that it was closed while acquiring
        } finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryCloseRaceCondition);
        }

        assertEquals(State.CLOSED, lock.getLockState());
        final List<String> remainingWaiters = getLockWaiters(zkc, lockPath);
        assertEquals(0, remainingWaiters.size());
    }

    /**
     * Test try acquire timeout: with the try-acquire fail point stalled for an hour and a
     * one millisecond operation timeout, tryLock either throws a {@link LockingException}
     * or returns with the lock CLOSED; any other exception fails the test.
     *
     * @throws Exception
     */
    @Test(timeout = 60000)
    public void testTryAcquireTimeout() throws Exception {
        final String testName = testNames.getMethodName();
        final String lockPath = "/" + testName;
        final long acquireDelayMs = 60*60*1000;

        createLockPath(zkc.get(), lockPath);

        final ZKSessionLock lock = new ZKSessionLock(
                zkc, lockPath, testName, lockStateExecutor,
                1 /* op timeout */, NullStatsLogger.INSTANCE,
                new DistributedLockContext());

        try {
            FailpointUtils.setFailpoint(
                    FailpointUtils.FailPointName.FP_LockTryAcquire,
                    new DelayFailpointAction(acquireDelayMs));

            lock.tryLock(0, TimeUnit.MILLISECONDS);
            assertEquals(State.CLOSED, lock.getLockState());
        } catch (LockingException le) {
            // accepted outcome
        } catch (Exception e) {
            fail("expected locking exception");
        } finally {
            FailpointUtils.removeFailpoint(FailpointUtils.FailPointName.FP_LockTryAcquire);
        }
    }

    /** Runs the basic lock/unlock scenario with a zero wait timeout. */
    @Test(timeout = 60000)
    public void testBasicLockUnlock0() throws Exception {
        final long noWait = 0;
        testBasicLockUnlock(noWait);
    }

    /** Runs the basic lock/unlock scenario with the maximum wait timeout. */
    @Test(timeout = 60000)
    public void testBasicLockUnlock1() throws Exception {
        final long waitForever = Long.MAX_VALUE;
        testBasicLockUnlock(waitForever);
    }

    /**
     * Test Basic Lock and Unlock
     *
     * - lock should succeed if there is no lock held
     * - a second tryLock on the already claimed lock should be rejected and leave it claimed
     * - unlock should release the held lock and remove its member node
     *
     * @param timeout
     *          timeout to wait for the lock
     * @throws Exception
     */
    private void testBasicLockUnlock(long timeout) throws Exception {
        // Separate the timeout from the timestamp so the path is unambiguous, as in sibling tests.
        String lockPath = "/test-basic-lock-unlock-" + timeout + "-" + System.currentTimeMillis();
        String clientId = "test-basic-lock-unlock";

        createLockPath(zkc.get(), lockPath);

        ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
        // lock
        lock.tryLock(timeout, TimeUnit.MILLISECONDS);
        // verification after lock
        assertEquals(State.CLAIMED, lock.getLockState());
        List<String> children = getLockWaiters(zkc, lockPath);
        assertEquals(1, children.size());
        assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));

        // lock should fail on an already claimed lock
        try {
            lock.tryLock(timeout, TimeUnit.MILLISECONDS);
            fail("Should fail on locking an already claimed lock.");
        } catch (LockStateChangedException lsce) {
            // expected
        }
        // the rejected attempt must not disturb the claimed lock or add another member node
        assertEquals(State.CLAIMED, lock.getLockState());
        children = getLockWaiters(zkc, lockPath);
        assertEquals(1, children.size());
        assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));

        // unlock
        lock.unlock();
        // verification after unlock
        assertEquals(State.CLOSED, lock.getLockState());
        assertEquals(0, getLockWaiters(zkc, lockPath).size());
    }

    /**
     * Test lock on non existed lock.
     *
     * - locking fails with a NONODE cause when the lock path was never created
     * - the lock is then CLOSED and any further tryLock is rejected
     *
     * @throws Exception
     */
    @Test(timeout = 60000)
    public void testLockOnNonExistedLock() throws Exception {
        final String lockPath = "/test-lock-on-non-existed-lock";
        final String clientId = "test-lock-on-non-existed-lock";

        final ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);

        // first attempt: the lock path does not exist
        try {
            lock.tryLock(0, TimeUnit.MILLISECONDS);
            fail("Should fail on locking a non-existed lock.");
        } catch (LockingException le) {
            final Throwable rootCause = le.getCause();
            assertTrue(rootCause instanceof KeeperException);
            final KeeperException zkFailure = (KeeperException) rootCause;
            assertEquals(KeeperException.Code.NONODE, zkFailure.code());
        }
        assertEquals(State.CLOSED, lock.getLockState());

        // second attempt: the lock already failed, so its state no longer allows locking
        try {
            lock.tryLock(0, TimeUnit.MILLISECONDS);
            fail("Should fail on locking a failure lock.");
        } catch (LockStateChangedException lsce) {
            // expected
        }
        assertEquals(State.CLOSED, lock.getLockState());
    }

    /** Runs the "lock already held by someone else" scenario with a zero wait timeout. */
    @Test(timeout = 60000)
    public void testLockWhenSomeoneHeldLock0() throws Exception {
        final long noWait = 0;
        testLockWhenSomeoneHeldLock(noWait);
    }

    /** Runs the "lock already held by someone else" scenario with a 500 ms wait timeout. */
    @Test(timeout = 60000)
    public void testLockWhenSomeoneHeldLock1() throws Exception {
        final long halfSecondMs = 500;
        testLockWhenSomeoneHeldLock(halfSecondMs);
    }

    /**
     * Test lock if the lock is already held by someone else. Any lock in this situation will
     * fail with current owner. Once the owner unlocks, a fresh lock instance can claim the lock.
     *
     * @param timeout
     *          timeout to wait for the lock
     * @throws Exception
     */
    private void testLockWhenSomeoneHeldLock(long timeout) throws Exception {
        final String lockPath = "/test-lock-nowait-" + timeout + "-" + System.currentTimeMillis();
        final String ownerId = "test-lock-nowait-0-" + System.currentTimeMillis();
        final String contenderId = "test-lock-nowait-1-" + System.currentTimeMillis();
        final String successorId = "test-lock-nowait-2-" + System.currentTimeMillis();

        createLockPath(zkc.get(), lockPath);

        final ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, ownerId, lockStateExecutor);
        final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, contenderId, lockStateExecutor);

        // lock0 claims the lock and is the only member
        lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        assertEquals(State.CLAIMED, lock0.getLockState());
        assertSoleLockMemberIs(zkc0, lockPath, lock0);

        // lock1 is rejected and told who the current owner is
        try {
            lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
            fail("lock1 should fail on locking since lock0 is holding the lock.");
        } catch (OwnershipAcquireFailedException oafe) {
            assertEquals(lock0.getLockId().getLeft(), oafe.getCurrentOwner());
        }
        assertEquals(State.CLAIMED, lock0.getLockState());
        assertEquals(State.CLOSED, lock1.getLockState());
        assertSoleLockMemberIs(zkc0, lockPath, lock0);

        // releasing lock0 leaves no members behind
        lock0.unlock();
        assertEquals(State.CLOSED, lock0.getLockState());
        assertEquals(0, getLockWaiters(zkc, lockPath).size());

        // a fresh lock can now claim the lock
        final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, successorId, lockStateExecutor);
        lock2.tryLock(timeout, TimeUnit.MILLISECONDS);
        assertEquals(State.CLOSED, lock0.getLockState());
        assertEquals(State.CLOSED, lock1.getLockState());
        assertEquals(State.CLAIMED, lock2.getLockState());
        assertSoleLockMemberIs(zkc, lockPath, lock2);

        lock2.unlock();
    }

    /**
     * Asserts that the lock path has exactly one member node and that it parses to the
     * lock id of {@code expectedOwner}.
     */
    private void assertSoleLockMemberIs(ZooKeeperClient client, String lockPath, ZKSessionLock expectedOwner)
            throws Exception {
        final List<String> members = getLockWaiters(client, lockPath);
        assertEquals(1, members.size());
        assertEquals(expectedOwner.getLockId(),
                Await.result(asyncParseClientID(client.get(), lockPath, members.get(0))));
    }

    /**
     * Verifies that a lock whose context already contains the lock id of the current holder
     * can claim the lock while the holder's znode still exists, both with a zero wait timeout
     * and with the maximum wait timeout.
     */
    @Test(timeout = 60000)
    public void testLockWhenPreviousLockZnodeStillExists() throws Exception {
        final String lockPath = "/test-lock-when-previous-lock-znode-still-exists-" +
                System.currentTimeMillis();
        final String clientId = "client-id";

        final ZooKeeper zk = zkc.get();
        createLockPath(zk, lockPath);

        // the original holder, on the other client
        final ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId, lockStateExecutor);
        lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

        // simulate lock0 expires but znode still exists: zero wait timeout
        final DistributedLockContext noWaitContext = new DistributedLockContext();
        noWaitContext.addLockId(lock0.getLockId());
        final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
                60000, NullStatsLogger.INSTANCE, noWaitContext);
        lock1.tryLock(0L, TimeUnit.MILLISECONDS);
        assertEquals(State.CLAIMED, lock1.getLockState());
        lock1.unlock();

        // same scenario with the maximum wait timeout
        final DistributedLockContext waitContext = new DistributedLockContext();
        waitContext.addLockId(lock0.getLockId());
        final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor,
                60000, NullStatsLogger.INSTANCE, waitContext);
        lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        assertEquals(State.CLAIMED, lock2.getLockState());
        lock2.unlock();

        lock0.unlock();
    }

    /** Waiter acquires the lock after the owner explicitly unlocks it. */
    @Test(timeout = 60000)
    public void testWaitForLockUnlock() throws Exception {
        final boolean releaseByUnlock = true;
        testWaitForLockReleased("/test-wait-for-lock-unlock", releaseByUnlock);
    }

    /** Waiter acquires the lock after the owner's session is expired. */
    @Test(timeout = 60000)
    public void testWaitForLockExpired() throws Exception {
        final boolean releaseByUnlock = false;
        testWaitForLockReleased("/test-wait-for-lock-expired", releaseByUnlock);
    }

    /**
     * Test lock wait for the lock owner to release the lock. The lock waiter should acquire lock successfully
     * if the lock owner unlock or it is expired.
     *
     * <p>The waiter runs on its own thread. Its completion latch is released whether or not
     * the lock attempt succeeded, so that a failed attempt is reported by the state assertions
     * below instead of stalling the test until the JUnit timeout fires.
     *
     * @param lockPath
     *          lock path
     * @param isUnlock
     *          whether to unlock or expire the lock
     * @throws Exception
     */
    private void testWaitForLockReleased(String lockPath, boolean isUnlock) throws Exception {
        String clientId0 = "test-wait-for-lock-released-0-" + System.currentTimeMillis();
        String clientId1 = "test-wait-for-lock-released-1-" + System.currentTimeMillis();

        createLockPath(zkc.get(), lockPath);

        final ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId0, lockStateExecutor);
        final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);

        lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        // verification after lock0 lock
        assertEquals(State.CLAIMED, lock0.getLockState());
        List<String> children = getLockWaiters(zkc0, lockPath);
        assertEquals(1, children.size());
        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));

        final CountDownLatch lock1DoneLatch = new CountDownLatch(1);

        Thread lock1Thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                } catch (LockingException e) {
                    logger.error("Failed on locking lock1 : ", e);
                } finally {
                    // Always release the test thread; success is verified via lock1's state.
                    lock1DoneLatch.countDown();
                }
            }
        }, "lock1-thread");
        lock1Thread.start();

        // ensure lock1 is waiting for lock0
        children = awaitWaiters(2, zkc, lockPath);

        if (isUnlock) {
            lock0.unlock();
        } else {
            ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
        }

        lock1DoneLatch.await();
        lock1Thread.join();

        // verification after lock1 lock
        if (isUnlock) {
            assertEquals(State.CLOSED, lock0.getLockState());
        } else {
            assertEquals(State.EXPIRED, lock0.getLockState());
        }
        assertEquals(State.CLAIMED, lock1.getLockState());
        children = getLockWaiters(zkc, lockPath);
        assertEquals(1, children.size());
        assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));

        lock1.unlock();
    }

    /**
     * Test session expired after claimed the lock: lock state should be changed to expired and notify
     * the lock listener about expiry. Afterwards the member node is gone and tryLock is rejected.
     *
     * @throws Exception
     */
    @Test(timeout = 60000)
    public void testLockListenerOnExpired() throws Exception {
        final String lockPath = "/test-lock-listener-on-expired";
        final String clientId = "test-lock-listener-on-expired-" + System.currentTimeMillis();

        createLockPath(zkc.get(), lockPath);

        // released by the listener once the lock reports expiry
        final CountDownLatch expirySignal = new CountDownLatch(1);
        final LockListener expiryListener = new LockListener() {
            @Override
            public void onExpired() {
                expirySignal.countDown();
            }
        };
        final ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor)
                .setLockListener(expiryListener);

        // claim the lock and check that it is the only member
        lock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        assertEquals(State.CLAIMED, lock.getLockState());
        final List<String> membersWhileClaimed = getLockWaiters(zkc, lockPath);
        assertEquals(1, membersWhileClaimed.size());
        assertEquals(lock.getLockId(),
                Await.result(asyncParseClientID(zkc.get(), lockPath, membersWhileClaimed.get(0))));

        // expire the owning session and wait for the listener callback
        ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
        expirySignal.await();
        assertEquals(State.EXPIRED, lock.getLockState());
        final List<String> membersAfterExpiry = getLockWaiters(zkc, lockPath);
        assertEquals(0, membersAfterExpiry.size());

        // an expired lock can no longer be locked
        try {
            lock.tryLock(0, TimeUnit.MILLISECONDS);
            fail("Should fail on tryLock since lock state has changed.");
        } catch (LockStateChangedException lsce) {
            // expected
        }

        lock.unlock();
    }

    /** Runs the "session expired before locking" scenario with a zero wait timeout. */
    @Test(timeout = 60000)
    public void testSessionExpiredBeforeLock0() throws Exception {
        final long noWait = 0;
        testSessionExpiredBeforeLock(noWait);
    }

    /** Runs the "session expired before locking" scenario with the maximum wait timeout. */
    @Test(timeout = 60000)
    public void testSessionExpiredBeforeLock1() throws Exception {
        final long waitForever = Long.MAX_VALUE;
        testSessionExpiredBeforeLock(waitForever);
    }

    /**
     * Test Session Expired Before Lock does locking. The lock should be closed since
     * all zookeeper operations would be failed.
     *
     * Sequence: create the lock object, expire the session of its client, flush the lock
     * state executor, check the lock is still INIT, then expect tryLock to fail with a
     * SessionExpiredException cause, leaving the lock CLOSED with no member nodes.
     *
     * @param timeout
     *          timeout to wait for the lock
     * @throws Exception
     */
    private void testSessionExpiredBeforeLock(long timeout) throws Exception {
        String lockPath = "/test-session-expired-before-lock-" + timeout + "-" + System.currentTimeMillis();
        String clientId = "test-session-expired-before-lock-" + System.currentTimeMillis();

        createLockPath(zkc.get(), lockPath);
        // NOTE(review): expireCounter is incremented by the listener but never asserted in this method.
        final AtomicInteger expireCounter = new AtomicInteger(0);
        // Despite its name, this latch is released by the runnable submitted below, not by the listener.
        final CountDownLatch expiredLatch = new CountDownLatch(1);
        LockListener listener = new LockListener() {
            @Override
            public void onExpired() {
                expireCounter.incrementAndGet();
            }
        };
        final ZKSessionLock lock = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor)
                .setLockListener(listener);
        // expire session
        ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
        // submit a runnable to lock state executor to ensure any state changes happened when session expired
        lockStateExecutor.submit(lockPath, new SafeRunnable() {
            @Override
            public void safeRun() {
                expiredLatch.countDown();
            }
        });
        expiredLatch.await();
        // no watcher was registered if never acquired lock successfully
        assertEquals(State.INIT, lock.getLockState());
        try {
            lock.tryLock(timeout, TimeUnit.MILLISECONDS);
            fail("Should fail locking using an expired lock");
        } catch (LockingException le) {
            assertTrue(le.getCause() instanceof KeeperException.SessionExpiredException);
        }
        // the failed attempt closes the lock and must not leave a member node behind
        assertEquals(State.CLOSED, lock.getLockState());
        List<String> children = getLockWaiters(zkc, lockPath);
        assertEquals(0, children.size());
    }

    /**
     * Verifies what happens when the session of a lock waiter expires while it waits:
     * lock0 (on zkc0) holds the lock, lock1 (on zkc) waits on a helper thread, then the
     * session of zkc is expired. Afterwards lock0 must still be CLAIMED and be the only
     * member, and lock1 must be CLOSED.
     */
    @Test(timeout = 60000)
    public void testSessionExpiredForLockWaiter() throws Exception {
        String lockPath = "/test-session-expired-for-lock-waiter";
        String clientId0 = "test-session-expired-for-lock-waiter-0";
        String clientId1 = "test-session-expired-for-lock-waiter-1";

        createLockPath(zkc.get(), lockPath);

        // lock0 claims the lock and is the only member
        final ZKSessionLock lock0 = new ZKSessionLock(zkc0, lockPath, clientId0, lockStateExecutor);
        lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        assertEquals(State.CLAIMED, lock0.getLockState());
        List<String> children = getLockWaiters(zkc0, lockPath);
        assertEquals(1, children.size());
        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));

        final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);
        final CountDownLatch lock1DoneLatch = new CountDownLatch(1);

        // lock1 blocks in tryLock on its own thread; any LockingException ends the thread
        Thread lock1Thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                } catch (OwnershipAcquireFailedException oafe) {
                    lock1DoneLatch.countDown();
                } catch (LockingException e) {
                    logger.error("Failed on locking lock1 : ", e);
                }
            }
        }, "lock1-thread");
        lock1Thread.start();

        // check lock1 is waiting for lock0
        children = awaitWaiters(2, zkc, lockPath);

        assertEquals(2, children.size());
        assertEquals(State.CLAIMED, lock0.getLockState());
        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
        awaitState(State.WAITING, lock1);
        assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));

        // expire lock1
        ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);

        // NOTE(review): the latch is counted down here by the test thread and never awaited,
        // so it does not synchronize anything; the join() below is what waits for lock1Thread.
        // Possibly await() was intended -- confirm which exception tryLock throws on expiry.
        lock1DoneLatch.countDown();
        lock1Thread.join();
        assertEquals(State.CLAIMED, lock0.getLockState());
        assertEquals(State.CLOSED, lock1.getLockState());
        children = getLockWaiters(zkc0, lockPath);
        assertEquals(1, children.size());
        assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
    }

    /**
     * Blocks the calling thread until {@code lock} reports the requested state.
     *
     * <p>The state is polled every 50 ms. The loop has no upper bound of its own, so it only
     * ends when the state matches or the thread is interrupted.
     *
     * @param state the state to wait for
     * @param lock the lock whose state is polled
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    public void awaitState(State state, ZKSessionLock lock) throws InterruptedException {
        for (State observed = lock.getLockState(); observed != state; observed = lock.getLockState()) {
            Thread.sleep(50);
        }
    }

    /**
     * Polls the lock waiters under {@code lockPath} until at least {@code waiters} are present.
     *
     * <p>The waiter list is re-read every 50 ms via {@code getLockWaiters}. The loop has no upper
     * bound of its own.
     *
     * @param waiters minimum number of waiters to wait for
     * @param zkc client used to read the waiters
     * @param lockPath lock path whose waiters are read
     * @return the first waiter list observed whose size is at least {@code waiters}
     * @throws Exception if reading the waiters fails or the thread is interrupted while sleeping
     */
    public List<String> awaitWaiters(int waiters, ZooKeeperClient zkc, String lockPath) throws Exception {
        List<String> current;
        while ((current = getLockWaiters(zkc, lockPath)).size() < waiters) {
            Thread.sleep(50);
        }
        return current;
    }

    /**
     * Runs {@link #testLockUseSameClientIdButDifferentSessions(boolean)} with the first lock
     * released through {@code unlock()}.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test(timeout = 60000)
    public void testLockUseSameClientIdButDifferentSessions0() throws Exception {
        final boolean releaseByUnlock = true;
        testLockUseSameClientIdButDifferentSessions(releaseByUnlock);
    }

    /**
     * Runs {@link #testLockUseSameClientIdButDifferentSessions(boolean)} with the first lock's
     * session expired instead of calling {@code unlock()}.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test(timeout = 60000)
    public void testLockUseSameClientIdButDifferentSessions1() throws Exception {
        final boolean releaseByUnlock = false;
        testLockUseSameClientIdButDifferentSessions(releaseByUnlock);
    }

    /**
     * Shared scenario for locks that use the same client id but are created on different
     * ZooKeeper clients ({@code zkc0} and {@code zkc}).
     *
     * <p>Steps: a lock on {@code zkc0} claims ownership; a zero-timeout attempt on {@code zkc}
     * is expected to fail with {@link OwnershipAcquireFailedException} and end up
     * {@link State#CLOSED}; a second attempt on {@code zkc} blocks in a background thread until
     * the owner goes away; finally the background lock is expected to be {@link State#CLAIMED}
     * and to be the only remaining waiter.
     *
     * @param isUnlock {@code true} to release the owner with {@code unlock()} (owner expected
     *                 {@link State#CLOSED}); {@code false} to expire the {@code zkc0} session
     *                 (owner expected {@link State#EXPIRED})
     * @throws Exception if any step of the scenario fails unexpectedly
     */
    private void testLockUseSameClientIdButDifferentSessions(boolean isUnlock) throws Exception {
        final String path =
                "/test-lock-use-same-client-id-but-different-sessions-" + isUnlock + System.currentTimeMillis();
        final String sharedClientId = "test-lock-use-same-client-id-but-different-sessions";

        createLockPath(zkc.get(), path);

        // the owner is created on zkc0 and claims the lock first
        final ZKSessionLock ownerLock = new ZKSessionLock(zkc0, path, sharedClientId, lockStateExecutor);
        ownerLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

        // a zero-timeout attempt from zkc must be rejected: the owner is in a different zk session
        final ZKSessionLock rejectedLock = new ZKSessionLock(zkc, path, sharedClientId, lockStateExecutor);
        try {
            rejectedLock.tryLock(0, TimeUnit.MILLISECONDS);
            fail("Should fail locking since the lock is held in a different zk session.");
        } catch (OwnershipAcquireFailedException expected) {
            assertEquals(sharedClientId, expected.getCurrentOwner());
        }
        assertEquals(State.CLOSED, rejectedLock.getLockState());
        List<String> waiterNodes = getLockWaiters(zkc0, path);
        assertEquals(1, waiterNodes.size());
        assertEquals(ownerLock.getLockId(),
                Await.result(asyncParseClientID(zkc0.get(), path, waiterNodes.get(0))));

        // a blocking attempt from zkc queues up behind the owner on its own thread
        final ZKSessionLock waitingLock = new ZKSessionLock(zkc, path, sharedClientId, lockStateExecutor);
        final CountDownLatch waitingLockClaimed = new CountDownLatch(1);
        final Runnable blockingAttempt = new Runnable() {
            @Override
            public void run() {
                try {
                    waitingLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                    waitingLockClaimed.countDown();
                } catch (LockingException e) {
                    logger.error("Failed on locking lock1 : ", e);
                }
            }
        };
        final Thread waiterThread = new Thread(blockingAttempt, "lock1-thread");
        waiterThread.start();

        // make sure the blocking attempt is really queued behind the owner
        waiterNodes = awaitWaiters(2, zkc, path);

        logger.info("Found {} lock waiters : {}", waiterNodes.size(), waiterNodes);

        assertEquals(2, waiterNodes.size());
        assertEquals(State.CLAIMED, ownerLock.getLockState());
        assertEquals(ownerLock.getLockId(),
                Await.result(asyncParseClientID(zkc0.get(), path, waiterNodes.get(0))));
        awaitState(State.WAITING, waitingLock);
        assertEquals(waitingLock.getLockId(),
                Await.result(asyncParseClientID(zkc.get(), path, waiterNodes.get(1))));

        // take the owner away, either by expiring its session or by an explicit unlock
        if (!isUnlock) {
            ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
        } else {
            ownerLock.unlock();
        }
        waitingLockClaimed.await();
        waiterThread.join();

        // verification
        final State expectedOwnerState = isUnlock ? State.CLOSED : State.EXPIRED;
        assertEquals(expectedOwnerState, ownerLock.getLockState());
        assertEquals(State.CLAIMED, waitingLock.getLockState());
        waiterNodes = getLockWaiters(zkc, path);
        assertEquals(1, waiterNodes.size());
        assertEquals(waitingLock.getLockId(),
                Await.result(asyncParseClientID(zkc.get(), path, waiterNodes.get(0))));

        waitingLock.unlock();
    }

    /**
     * Three locks created on the same client ({@code zkc}) with the same client id all call
     * {@code tryLock} in turn; the test expects three waiter nodes and every lock to report
     * {@link State#CLAIMED}, then unlocks them in creation order.
     *
     * @throws Exception if any step of the scenario fails unexpectedly
     */
    @Test(timeout = 60000)
    public void testLockWithMultipleSiblingWaiters() throws Exception {
        final String path = "/test-lock-with-multiple-sibling-waiters";
        final String sharedClientId = "client-id";

        createLockPath(zkc.get(), path);

        // construct every sibling before any of them tries to lock
        final ZKSessionLock[] siblings = new ZKSessionLock[3];
        for (int i = 0; i < siblings.length; i++) {
            siblings[i] = new ZKSessionLock(zkc, path, sharedClientId, lockStateExecutor);
        }

        for (ZKSessionLock sibling : siblings) {
            sibling.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        }

        final List<String> waiterNodes = awaitWaiters(3, zkc, path);

        assertEquals(3, waiterNodes.size());
        for (ZKSessionLock sibling : siblings) {
            assertEquals(State.CLAIMED, sibling.getLockState());
        }

        for (ZKSessionLock sibling : siblings) {
            sibling.unlock();
        }
    }

    /**
     * Sibling with a different lock id makes a zero-timeout attempt; the first lock is then
     * released with {@code unlock()}.
     *
     * @throws Exception if the shared scenario fails
     * @see #testLockWhenSiblingUseDifferentLockId(long, boolean)
     */
    @Test(timeout = 60000)
    public void testLockWhenSiblingUseDifferentLockId0() throws Exception {
        final long siblingTimeout = 0;
        final boolean releaseByUnlock = true;
        testLockWhenSiblingUseDifferentLockId(siblingTimeout, releaseByUnlock);
    }

    /**
     * Sibling with a different lock id makes a zero-timeout attempt; the first lock's session is
     * then expired.
     *
     * @throws Exception if the shared scenario fails
     * @see #testLockWhenSiblingUseDifferentLockId(long, boolean)
     */
    @Test(timeout = 60000)
    public void testLockWhenSiblingUseDifferentLockId1() throws Exception {
        final long siblingTimeout = 0;
        final boolean releaseByUnlock = false;
        testLockWhenSiblingUseDifferentLockId(siblingTimeout, releaseByUnlock);
    }

    /**
     * Sibling with a different lock id waits for the lock; {@code lock0_0} and then {@code lock1}
     * are released with {@code unlock()}.
     *
     * @throws Exception if the shared scenario fails
     * @see #testLockWhenSiblingUseDifferentLockId(long, boolean)
     */
    @Test(timeout = 60000)
    public void testLockWhenSiblingUseDifferentLockId2() throws Exception {
        final long siblingTimeout = Long.MAX_VALUE;
        final boolean releaseByUnlock = true;
        testLockWhenSiblingUseDifferentLockId(siblingTimeout, releaseByUnlock);
    }

    /**
     * Sibling with a different lock id waits for the lock; the session shared by the first and
     * third lock ({@code zkc0}) is then expired.
     *
     * @throws Exception if the shared scenario fails
     * @see #testLockWhenSiblingUseDifferentLockId(long, boolean)
     */
    @Test(timeout = 60000)
    public void testLockWhenSiblingUseDifferentLockId3() throws Exception {
        final long siblingTimeout = Long.MAX_VALUE;
        final boolean releaseByUnlock = false;
        testLockWhenSiblingUseDifferentLockId(siblingTimeout, releaseByUnlock);
    }

    /**
     * Shared scenario for three locks on one path: {@code lock0_0} and {@code lock0_1} are
     * created on {@code zkc0} with client id {@code client-id-0}, and {@code lock1} is created on
     * {@code zkc} with client id {@code client-id-1}.
     *
     * <p>{@code lock0_0} claims the lock and {@code lock1} queues behind it on a background
     * thread. {@code lock0_1} then either makes a zero-timeout attempt on the test thread, which
     * is expected to fail, or a blocking attempt on a second background thread. Afterwards
     * {@code lock0_0} is taken away (unlock or session expiry) and the resulting states and
     * waiter lists are asserted.
     *
     * @param timeout only compared against {@code 0} to select the branch: {@code 0} means
     *                {@code lock0_1} calls {@code tryLock(0, ...)} inline; any other value means
     *                {@code lock0_1} calls {@code tryLock(Long.MAX_VALUE, ...)} in a background
     *                thread. The value itself is not passed to {@code tryLock}.
     * @param isUnlock {@code true} to release {@code lock0_0} with {@code unlock()};
     *                 {@code false} to expire the {@code zkc0} session
     * @throws Exception if any step of the scenario fails unexpectedly
     */
    private void testLockWhenSiblingUseDifferentLockId(long timeout, final boolean isUnlock) throws Exception {
        // the timestamp suffix keeps the lock path distinct between parameterized runs
        String lockPath = "/test-lock-when-sibling-use-different-lock-id-" + timeout
                + "-" + isUnlock + "-" + System.currentTimeMillis();
        String clientId0 = "client-id-0";
        String clientId1 = "client-id-1";

        createLockPath(zkc.get(), lockPath);

        final ZKSessionLock lock0_0 = new ZKSessionLock(zkc0, lockPath, clientId0, lockStateExecutor);
        final ZKSessionLock lock0_1 = new ZKSessionLock(zkc0, lockPath, clientId0, lockStateExecutor);
        final ZKSessionLock lock1   = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);

        // lock0_0 becomes the owner
        lock0_0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

        // lock1 wait for the lock ownership.
        final CountDownLatch lock1DoneLatch = new CountDownLatch(1);
        Thread lock1Thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                    // only a successful tryLock releases the latch awaited by the test thread
                    lock1DoneLatch.countDown();
                } catch (LockingException e) {
                    logger.error("Failed on locking lock1 : ", e);
                }
            }
        }, "lock1-thread");
        lock1Thread.start();

        // check lock1 is waiting for lock0_0
        List<String> children = awaitWaiters(2, zkc, lockPath);

        assertEquals(2, children.size());
        assertEquals(State.CLAIMED, lock0_0.getLockState());
        assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
        awaitState(State.WAITING, lock1);
        assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));

        final CountDownLatch lock0DoneLatch = new CountDownLatch(1);
        final AtomicReference<String> ownerFromLock0 = new AtomicReference<String>(null);
        // stays null in the zero-timeout branch; only the blocking branch starts a thread for lock0_1
        Thread lock0Thread = null;
        if (timeout == 0) {
            // zero-timeout attempt by lock0_1 on the test thread: expected to be rejected
            try {
                lock0_1.tryLock(0, TimeUnit.MILLISECONDS);
                fail("Should fail on locking if sibling is using differnt lock id.");
            } catch (OwnershipAcquireFailedException oafe) {
                assertEquals(clientId0, oafe.getCurrentOwner());
            }
            // the rejected lock is closed and the waiter list is unchanged (lock0_0, then lock1)
            assertEquals(State.CLOSED, lock0_1.getLockState());
            children = getLockWaiters(zkc, lockPath);
            assertEquals(2, children.size());
            assertEquals(State.CLAIMED, lock0_0.getLockState());
            assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
            assertEquals(State.WAITING, lock1.getLockState());
            assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
        } else {
            // blocking attempt by lock0_1 on its own thread; which outcome releases lock0DoneLatch
            // depends on isUnlock: success when unlocking, OwnershipAcquireFailedException when expiring
            lock0Thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock0_1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
                        if (isUnlock) {
                            lock0DoneLatch.countDown();
                        }
                    } catch (OwnershipAcquireFailedException oafe) {
                        if (!isUnlock) {
                            // remember the reported owner so the test thread can assert on it
                            ownerFromLock0.set(oafe.getCurrentOwner());
                            lock0DoneLatch.countDown();
                        }
                    } catch (LockingException le) {
                        logger.error("Failed on locking lock0_1 : ", le);
                    }
                }
            }, "lock0-thread");
            lock0Thread.start();

            // check lock1 and lock0_1 are both waiting behind lock0_0, in that order
            children = awaitWaiters(3, zkc, lockPath);

            assertEquals(3, children.size());
            assertEquals(State.CLAIMED, lock0_0.getLockState());
            assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
            awaitState(State.WAITING, lock1);
            assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
            awaitState(State.WAITING, lock0_1);
            assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(2))));
        }

        // take lock0_0 away: explicit unlock, or expiry of the zkc0 session that lock0_0 and lock0_1 share
        if (isUnlock) {
            lock0_0.unlock();
        } else {
            ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
        }

        // lock1 is next in line and is expected to claim the lock in both cases
        lock1DoneLatch.await();
        lock1Thread.join();

        // check the state of lock0_0
        if (isUnlock) {
            assertEquals(State.CLOSED, lock0_0.getLockState());
        } else {
            assertEquals(State.EXPIRED, lock0_0.getLockState());
        }

        if (timeout == 0) {
            // lock0_1 was already rejected, so lock1 is the only waiter left
            children = getLockWaiters(zkc, lockPath);
            assertEquals(1, children.size());
            assertEquals(State.CLAIMED, lock1.getLockState());
            assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
        } else {
            assertNotNull(lock0Thread);
            if (!isUnlock) {
                // both lock0_0 and lock0_1 would be expired
                lock0DoneLatch.await();
                lock0Thread.join();

                assertEquals(clientId0, ownerFromLock0.get());
                assertEquals(State.CLOSED, lock0_1.getLockState());

                children = getLockWaiters(zkc, lockPath);
                assertEquals(1, children.size());
                assertEquals(State.CLAIMED, lock1.getLockState());
                assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
            } else {
                // lock0_0 was unlocked: lock1 now owns the lock and lock0_1 is still queued behind it
                children = getLockWaiters(zkc, lockPath);
                assertEquals(2, children.size());
                assertEquals(State.CLAIMED, lock1.getLockState());
                assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
                assertEquals(State.WAITING, lock0_1.getLockState());
                assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(1))));
            }
        }

        lock1.unlock();

        if (timeout != 0 && isUnlock) {
            // with lock1 released, the still-waiting lock0_1 is expected to claim the lock
            lock0DoneLatch.await();
            lock0Thread.join();

            children = getLockWaiters(zkc, lockPath);
            assertEquals(1, children.size());
            assertEquals(State.CLAIMED, lock0_1.getLockState());
            assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
        }
    }

    /**
     * Sibling with the same lock id locks with a timeout of 0; the first lock is then released
     * with {@code unlock()}.
     *
     * @throws Exception if the shared scenario fails
     * @see #testLockWhenSiblingUseSameLockId(long, boolean)
     */
    @Test(timeout = 60000)
    public void testLockWhenSiblingUseSameLockId0() throws Exception {
        final long siblingTimeout = 0;
        final boolean releaseByUnlock = true;
        testLockWhenSiblingUseSameLockId(siblingTimeout, releaseByUnlock);
    }

    /**
     * Sibling with the same lock id locks with a timeout of 0; the shared {@code zkc0} session is
     * then expired.
     *
     * @throws Exception if the shared scenario fails
     * @see #testLockWhenSiblingUseSameLockId(long, boolean)
     */
    @Test(timeout = 60000)
    public void testLockWhenSiblingUseSameLockId1() throws Exception {
        final long siblingTimeout = 0;
        final boolean releaseByUnlock = false;
        testLockWhenSiblingUseSameLockId(siblingTimeout, releaseByUnlock);
    }

    /**
     * Sibling with the same lock id locks with a timeout of {@link Long#MAX_VALUE}; the first
     * lock is then released with {@code unlock()}.
     *
     * @throws Exception if the shared scenario fails
     * @see #testLockWhenSiblingUseSameLockId(long, boolean)
     */
    @Test(timeout = 60000)
    public void testLockWhenSiblingUseSameLockId2() throws Exception {
        final long siblingTimeout = Long.MAX_VALUE;
        final boolean releaseByUnlock = true;
        testLockWhenSiblingUseSameLockId(siblingTimeout, releaseByUnlock);
    }

    /**
     * Sibling with the same lock id locks with a timeout of {@link Long#MAX_VALUE}; the shared
     * {@code zkc0} session is then expired.
     *
     * @throws Exception if the shared scenario fails
     * @see #testLockWhenSiblingUseSameLockId(long, boolean)
     */
    @Test(timeout = 60000)
    public void testLockWhenSiblingUseSameLockId3() throws Exception {
        final long siblingTimeout = Long.MAX_VALUE;
        final boolean releaseByUnlock = false;
        testLockWhenSiblingUseSameLockId(siblingTimeout, releaseByUnlock);
    }

    /**
     * Shared scenario for two locks created on the same client ({@code zkc0}) with the same
     * client id.
     *
     * <p>The first lock claims ownership, then the second calls {@code tryLock} with the given
     * timeout and is expected to be {@link State#CLAIMED} as well, with two waiter nodes present.
     * Afterwards either the first lock is unlocked (the second must stay claimed and be the only
     * waiter left) or the {@code zkc0} session is expired (both locks must be
     * {@link State#EXPIRED} and no waiter may remain).
     *
     * @param timeout timeout in milliseconds passed to the second lock's {@code tryLock}
     * @param isUnlock {@code true} to release the first lock with {@code unlock()};
     *                 {@code false} to expire the {@code zkc0} session
     * @throws Exception if any step of the scenario fails unexpectedly
     */
    private void testLockWhenSiblingUseSameLockId(long timeout, final boolean isUnlock) throws Exception {
        final String path = "/test-lock-when-sibling-use-same-lock-id-" + timeout
                + "-" + isUnlock + "-" + System.currentTimeMillis();
        final String sharedClientId = "client-id";

        createLockPath(zkc.get(), path);

        final ZKSessionLock first = new ZKSessionLock(zkc0, path, sharedClientId, lockStateExecutor);
        final ZKSessionLock second = new ZKSessionLock(zkc0, path, sharedClientId, lockStateExecutor);

        // the first lock claims ownership on its own
        first.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        List<String> waiterNodes = getLockWaiters(zkc0, path);
        assertEquals(1, waiterNodes.size());
        assertEquals(State.CLAIMED, first.getLockState());
        assertEquals(first.getLockId(),
                Await.result(asyncParseClientID(zkc0.get(), path, waiterNodes.get(0))));

        // the sibling with the same lock id is expected to be claimed too
        second.tryLock(timeout, TimeUnit.MILLISECONDS);
        waiterNodes = getLockWaiters(zkc0, path);
        assertEquals(2, waiterNodes.size());
        assertEquals(State.CLAIMED, first.getLockState());
        assertEquals(first.getLockId(),
                Await.result(asyncParseClientID(zkc0.get(), path, waiterNodes.get(0))));
        assertEquals(State.CLAIMED, second.getLockState());
        assertEquals(second.getLockId(),
                Await.result(asyncParseClientID(zkc0.get(), path, waiterNodes.get(1))));

        if (!isUnlock) {
            ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
            // run a marker task through the lock state executor for this path and wait for it,
            // so the assertions below happen after the work submitted ahead of it
            final CountDownLatch markerRan = new CountDownLatch(1);
            final SafeRunnable marker = new SafeRunnable() {
                @Override
                public void safeRun() {
                    markerRan.countDown();
                }
            };
            lockStateExecutor.submit(path, marker);
            markerRan.await();
            waiterNodes = getLockWaiters(zkc, path);
            assertEquals(0, waiterNodes.size());
            assertEquals(State.EXPIRED, first.getLockState());
            assertEquals(State.EXPIRED, second.getLockState());
            return;
        }

        // unlock path: releasing the first lock must leave the sibling as the sole, claimed waiter
        first.unlock();
        assertEquals(State.CLOSED, first.getLockState());
        waiterNodes = getLockWaiters(zkc0, path);
        assertEquals(1, waiterNodes.size());
        assertEquals(State.CLAIMED, second.getLockState());
        assertEquals(second.getLockId(),
                Await.result(asyncParseClientID(zkc0.get(), path, waiterNodes.get(0))));
        second.unlock();
    }

}
