blob: b5d85aa8026e28875b5978dc28bd814939cc7a2c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.zk;
import java.time.Duration;
import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.testUtils.EmbeddedZookeeper;
import org.apache.samza.util.NoOpMetricsRegistry;
import java.util.List;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static junit.framework.Assert.*;
public class TestZkDistributedLock {
private static EmbeddedZookeeper zkServer = null;
private static String testZkConnectionString = null;
private ZkUtils zkUtils1;
private ZkUtils zkUtils2;
public static void test() {
zkServer = new EmbeddedZookeeper();
testZkConnectionString = String.format("", zkServer.getPort());
public void testSetup() {
ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
ZkClient zkClient2 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
this.zkUtils2 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient2, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
public void testTearDown() {
public static void teardown() {
private List<String> getParticipants(ZkUtils zkUtils, String lockId) {
String lockPath = String.format("%s/lock_%s", zkUtils1.getKeyBuilder().getRootPath(), lockId);
return zkUtils.getZkClient().getChildren(lockPath);
public void testLockSingleProcessor() {
String lockId = "FAKE_LOCK_ID_1";
ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId);
assertEquals("Lock has participants before any processor tried to lock.", 0, getParticipants(zkUtils1, lockId).size());
boolean lock1Status = lock1.lock(Duration.ofMillis(10000));
assertEquals("Lock does not have 1 participant after first processor tries to lock.", 1, getParticipants(zkUtils1, lockId).size());
assertEquals("1st processor requesting to lock did not acquire the lock.", true, lock1Status);
assertEquals("Lock does have 1 participant after first processor tries to unlock.", 0, getParticipants(zkUtils1, lockId).size());
public void testLockTwoProcessors() {
// second processor should acquire lock after first one unlocks
String lockId = "FAKE_LOCK_ID_2";
ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId);
ZkDistributedLock lock2 = new ZkDistributedLock("p2", zkUtils2, lockId);
assertEquals("Lock has participants before any processor tried to lock.", 0, getParticipants(zkUtils1, lockId).size());
boolean lock1Status = lock1.lock(Duration.ofMillis(10000));
assertEquals("First processor requesting to lock did not acquire the lock.", true, lock1Status);
boolean lock2Status = lock2.lock(Duration.ofMillis(10000));
assertEquals("Second processor requesting to lock did not acquire the lock.", true, lock2Status);
assertEquals("Lock does have participants after processors unlocked.", 0, getParticipants(zkUtils1, lockId).size());
public void testLockFirstProcessorClosing() {
// first processor dies before unlock then second processor should acquire
String lockId = "FAKE_LOCK_ID_3";
ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId);
ZkDistributedLock lock2 = new ZkDistributedLock("p2", zkUtils2, lockId);
assertEquals("Lock has participants before any processor tried to lock!", 0, getParticipants(zkUtils1, lockId).size());
boolean lock1Status = lock1.lock(Duration.ofMillis(10000));
assertEquals("First processor requesting to lock did not acquire the lock.", true, lock1Status);
// first processor dies before unlock
boolean lock2Status = lock2.lock(Duration.ofMillis(10000));
assertEquals("Second processor requesting to lock did not acquire the lock.", true, lock2Status);
assertEquals("Lock does have participants after processors unlocked.", 0, getParticipants(zkUtils2, lockId).size());