blob: ee83867fdefa5d5533d1442763d0720c81fe243c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.util.UUID;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.oozie.lock.LockToken;
import org.apache.oozie.lock.TestMemoryLocks;
import org.apache.oozie.service.ZKLocksService.ZKLockToken;
import org.apache.oozie.test.ZKXTestCase;
import org.apache.oozie.util.Locker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.ZKUtils;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.data.Stat;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.concurrent.TimeUnit;
public class TestZKLocksService extends ZKXTestCase {
private XLog log = XLog.getLog(getClass());
@Override
protected void setUp() throws Exception {
super.setUp();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
public void testRegisterUnregister() throws Exception {
assertEquals(0, ZKUtils.getUsers().size());
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
assertEquals(1, ZKUtils.getUsers().size());
assertEquals(zkls, ZKUtils.getUsers().iterator().next());
zkls.destroy();
assertEquals(0, ZKUtils.getUsers().size());
}
finally {
zkls.destroy();
}
}
public class ReadLocker extends Locker {
private final ZKLocksService zkls;
public ReadLocker(String name, int nameIndex, long timeout, StringBuffer buffer, ZKLocksService zkls) {
super(name, nameIndex, timeout, buffer);
this.zkls = zkls;
}
@Override
protected ZKLocksService.ZKLockToken getLock() throws InterruptedException {
return (ZKLocksService.ZKLockToken)zkls.getReadLock(name, timeout);
}
}
public class WriteLocker extends Locker {
private final ZKLocksService zkls;
public WriteLocker(String name, int nameIndex, long timeout, StringBuffer buffer, ZKLocksService zkls) {
super(name, nameIndex, timeout, buffer);
this.zkls = zkls;
}
@Override
protected ZKLocksService.ZKLockToken getLock() throws InterruptedException {
return (ZKLocksService.ZKLockToken)zkls.getWriteLock(name, timeout);
}
}
public void testWaitWriteLockThreads() throws Exception {
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
checkWaitWriteLock(zkls, zkls);
}
finally {
zkls.destroy();
}
}
public void testWaitWriteLockOozies() throws Exception {
// Simulate having two different Oozies by using two different ZKLocksServices instead of using the same one in two threads
ZKLocksService zkls1 = new ZKLocksService();
ZKLocksService zkls2 = new ZKLocksService();
try {
zkls1.init(Services.get());
zkls2.init(Services.get());
checkWaitWriteLock(zkls1, zkls2);
}
finally {
zkls1.destroy();
zkls2.destroy();
}
}
public void checkWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
StringBuffer sb = new StringBuffer("");
Locker l1 = new WriteLocker("a", 1, -1, sb, zkls1);
Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2);
new Thread(l1).start();
l1.awaitLockAcquire();
new Thread(l2).start();
l2.awaitStart();
l1.proceed();
l2.proceed();
l1.awaitTermination();
l2.awaitTermination();
assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
}
public void testNoWaitWriteLockThreads() throws Exception {
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
checkNoWaitWriteLock(zkls, zkls);
}
finally {
zkls.destroy();
}
}
public void testNoWaitWriteLockOozies() throws Exception {
// Simulate having two different Oozies by using two different ZKLocksServices instead of using the same one in two threads
ZKLocksService zkls1 = new ZKLocksService();
ZKLocksService zkls2 = new ZKLocksService();
try {
zkls1.init(Services.get());
zkls2.init(Services.get());
checkNoWaitWriteLock(zkls1, zkls2);
}
finally {
zkls1.destroy();
zkls2.destroy();
}
}
public void checkNoWaitWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
StringBuffer sb = new StringBuffer("");
Locker l1 = new WriteLocker("a", 1, 0, sb, zkls1);
Locker l2 = new WriteLocker("a", 2, 0, sb, zkls2);
new Thread(l1).start();
l1.awaitLockAcquire();
new Thread(l2).start();
l2.awaitStart();
l2.proceed();
l2.awaitTermination();
l1.proceed();
l1.awaitTermination();
assertEquals("a:1-L a:2-N a:1-U", sb.toString().trim());
}
public void testTimeoutWaitingWriteLockThreads() throws Exception {
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
checkTimeoutWaitingWriteLock(zkls, zkls);
}
finally {
zkls.destroy();
}
}
public void testTimeoutWaitingWriteLockOozies() throws Exception {
// Simulate having two different Oozies by using two different ZKLocksServices instead of using the same one in two threads
ZKLocksService zkls1 = new ZKLocksService();
ZKLocksService zkls2 = new ZKLocksService();
try {
zkls1.init(Services.get());
zkls2.init(Services.get());
checkTimeoutWaitingWriteLock(zkls1, zkls2);
}
finally {
zkls1.destroy();
zkls2.destroy();
}
}
public void checkTimeoutWaitingWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
StringBuffer sb = new StringBuffer("");
Locker l1 = new WriteLocker("a", 1, 0, sb, zkls1);
Locker l2 = new WriteLocker("a", 2, (long) (WAITFOR_RATIO * 2000), sb, zkls2);
new Thread(l1).start();
l1.awaitLockAcquire();
new Thread(l2).start();
l2.awaitStart();
l1.proceed();
l1.awaitTermination();
l2.proceed();
l2.awaitTermination();
assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
}
public void testTimeoutTimingOutWriteLockThreads() throws Exception {
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
checkTimeoutTimingOutWriteLock(zkls, zkls);
}
finally {
zkls.destroy();
}
}
public void testTimeoutTimingOutWriteLockOozies() throws Exception {
// Simulate having two different Oozies by using two different ZKLocksServices instead of using the same one in two threads
ZKLocksService zkls1 = new ZKLocksService();
ZKLocksService zkls2 = new ZKLocksService();
try {
zkls1.init(Services.get());
zkls2.init(Services.get());
checkTimeoutTimingOutWriteLock(zkls1, zkls2);
}
finally {
zkls1.destroy();
zkls2.destroy();
}
}
public void checkTimeoutTimingOutWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
StringBuffer sb = new StringBuffer("");
Locker l1 = new WriteLocker("a", 1, 0, sb, zkls1);
Locker l2 = new WriteLocker("a", 2, 50, sb, zkls2);
new Thread(l1).start();
l1.awaitLockAcquire();
new Thread(l2).start();
l2.awaitStart();
l2.proceed();
l2.awaitTermination();
l1.proceed();
l1.awaitTermination();
assertEquals("a:1-L a:2-N a:1-U", sb.toString().trim());
}
public void testReadLockThreads() throws Exception {
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
checkReadLock(zkls, zkls);
}
finally {
zkls.destroy();
}
}
public void testReadLockOozies() throws Exception {
// Simulate having two different Oozies by using two different ZKLocksServices instead of using the same one in two threads
ZKLocksService zkls1 = new ZKLocksService();
ZKLocksService zkls2 = new ZKLocksService();
try {
zkls1.init(Services.get());
zkls2.init(Services.get());
checkReadLock(zkls1, zkls2);
}
finally {
zkls1.destroy();
zkls2.destroy();
}
}
public void checkReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
StringBuffer sb = new StringBuffer("");
Locker l1 = new ReadLocker("a", 1, -1, sb, zkls1);
Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2);
new Thread(l1).start();
l1.awaitLockAcquire();
new Thread(l2).start();
l2.awaitLockAcquire();
l1.proceed();
l1.awaitTermination();
l2.proceed();
l2.awaitTermination();
assertEquals("a:1-L a:2-L a:1-U a:2-U", sb.toString().trim());
}
public void testReadWriteLockThreads() throws Exception {
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
checkReadWriteLock(zkls, zkls);
}
finally {
zkls.destroy();
}
}
public void testReadWriteLockOozies() throws Exception {
// Simulate having two different Oozies by using two different ZKLocksServices instead of using the same one in two threads
ZKLocksService zkls1 = new ZKLocksService();
ZKLocksService zkls2 = new ZKLocksService();
try {
zkls1.init(Services.get());
zkls2.init(Services.get());
checkReadWriteLock(zkls1, zkls2);
}
finally {
zkls1.destroy();
zkls2.destroy();
}
}
public void checkReadWriteLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
StringBuffer sb = new StringBuffer("");
Locker l1 = new ReadLocker("a", 1, -1, sb, zkls1);
Locker l2 = new WriteLocker("a", 2, -1, sb, zkls2);
new Thread(l1).start();
l1.awaitLockAcquire();
new Thread(l2).start();
l2.awaitStart();
l1.proceed();
l1.awaitTermination();
l2.proceed();
l2.awaitTermination();
assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
}
public void testWriteReadLockThreads() throws Exception {
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
checkWriteReadLock(zkls, zkls);
}
finally {
zkls.destroy();
}
}
public void testWriteReadLockOozies() throws Exception {
// Simulate having two different Oozies by using two different ZKLocksServices instead of using the same one in two threads
ZKLocksService zkls1 = new ZKLocksService();
ZKLocksService zkls2 = new ZKLocksService();
try {
zkls1.init(Services.get());
zkls2.init(Services.get());
checkWriteReadLock(zkls1, zkls2);
}
finally {
zkls1.destroy();
zkls2.destroy();
}
}
public void checkWriteReadLock(ZKLocksService zkls1, ZKLocksService zkls2) throws Exception {
StringBuffer sb = new StringBuffer("");
Locker l1 = new WriteLocker("a", 1, -1, sb, zkls1);
Locker l2 = new ReadLocker("a", 2, -1, sb, zkls2);
new Thread(l1).start();
l1.awaitLockAcquire();
new Thread(l2).start();
l2.awaitStart();
l1.proceed();
l1.awaitTermination();
l2.proceed();
l2.awaitTermination();
assertEquals("a:1-L a:1-U a:2-L a:2-U", sb.toString().trim());
}
public void testLockRelease() throws ServiceException, InterruptedException {
final String path = UUID.randomUUID().toString();
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
assertTrue(zkls.getLocks().containsKey(path));
lock.release();
checkLockRelease(path, zkls);
}
finally {
zkls.destroy();
}
}
public void testReentrantMultipleCall() throws ServiceException, InterruptedException {
final String path = UUID.randomUUID().toString();
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
ZKLockToken lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
lock = (ZKLockToken) zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
assertTrue(zkls.getLocks().containsKey(path));
lock.release();
assertTrue(zkls.getLocks().containsKey(path));
lock.release();
assertTrue(zkls.getLocks().containsKey(path));
lock.release();
checkLockRelease(path, zkls);
}
catch (Exception e) {
fail("Reentrant property, it should have acquired lock");
}
finally {
zkls.destroy();
}
}
public void testReentrantMultipleThread() throws ServiceException, InterruptedException {
final String path = UUID.randomUUID().toString();
final ZKLocksService zkls = new ZKLocksService();
zkls.init(Services.get());
try {
ThreadLock t1 = new ThreadLock(zkls, path);
ThreadLock t2 = new ThreadLock(zkls, path);
t1.start();
t1.join();
checkLockRelease(path, zkls);
t2.start();
t2.join();
checkLockRelease(path, zkls);
}
finally {
zkls.destroy();
}
}
public void testLockReaper() throws Exception {
ConfigurationService.set(ZKLocksService.REAPING_THRESHOLD, "1");
ZKLocksService zkls = new ZKLocksService();
try {
zkls.init(Services.get());
for (int i = 0; i < 10; ++i) {
LockToken l = zkls.getReadLock(String.valueOf(i), 1);
l.release();
}
waitFor(10000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
Stat stat = getClient().checkExists().forPath(ZKLocksService.LOCKS_NODE);
return stat.getNumChildren() == 0;
}
});
Stat stat = getClient().checkExists().forPath(ZKLocksService.LOCKS_NODE);
assertEquals(0, stat.getNumChildren());
}
finally {
zkls.destroy();
}
}
public void testLocksAreGarbageCollected() throws ServiceException, InterruptedException {
String path = new String("a");
String path1 = new String("a");
ZKLocksService lockService = new ZKLocksService();
try {
lockService.init(Services.get());
LockToken lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
lock.release();
assertEquals(lockService.getLocks().size(), 1);
int oldHash = lockService.getLocks().get(path).hashCode();
lock = lockService.getWriteLock(path1, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
int newHash = lockService.getLocks().get(path1).hashCode();
assertTrue(oldHash == newHash);
lock = null;
System.gc();
lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
newHash = lockService.getLocks().get(path).hashCode();
assertFalse(oldHash == newHash);
}
finally {
lockService.destroy();
}
}
public void testLocksAreReused() throws ServiceException, InterruptedException {
String path = "a";
ZKLocksService lockService = new ZKLocksService();
try {
lockService.init(Services.get());
LockToken lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
int oldHash = System.identityHashCode(lockService.getLocks().get(path));
System.gc();
lock.release();
lock = lockService.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
assertEquals(lockService.getLocks().size(), 1);
int newHash = System.identityHashCode(lockService.getLocks().get(path));
assertTrue(oldHash == newHash);
}
finally {
lockService.destroy();
}
}
public void testRetriableRelease() throws Exception {
final String path = UUID.randomUUID().toString();
ZKLocksService zkls = new ZKLocksService();
try{
zkls.init(Services.get());
InterProcessReadWriteLock lockEntry = Mockito.mock(InterProcessReadWriteLock.class);
final InterProcessMutex writeLock = Mockito.mock(InterProcessMutex.class);
Mockito.when(lockEntry.writeLock()).thenReturn(writeLock);
Mockito.doThrow(new ConnectionLossException()).when(writeLock).release();
Mockito.doNothing().when(writeLock).acquire();
// put mocked lockEntry
zkls.getLocks().putIfAbsent(path,lockEntry);
LockToken lock = zkls.getWriteLock(path, -1);
final boolean [] lockReleased = new boolean [] {false};
Runnable exceptionStopper = new Runnable() {
@Override
public void run() {
try {
// Stop the exception on release() after some time in other thread
Thread.sleep(TimeUnit.SECONDS.toMillis(13));
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
lockReleased[0] = true;
return null;
}
}).when(writeLock).release();
}
catch (Exception e) {
log.error(e);
fail("Test case failed due to " + e.getMessage());
}
}
};
new Thread(exceptionStopper).start();
//Try to release the lock
lock.release();
assertEquals("Failing the test case. The lock should have been released", true, lockReleased[0]);
}
finally {
zkls.destroy();
}
}
private void checkLockRelease(String path, ZKLocksService zkls) {
if (zkls.getLocks().get(path) == null) {
// good, lock is removed from memory after gc.
}
else {
assertFalse(zkls.getLocks().get(path).writeLock().isAcquiredInThisProcess());
}
}
static class ThreadLock extends Thread {
ZKLocksService zkls;
String path;
LockToken lock = null;
public ThreadLock(ZKLocksService zkls, String path) {
this.zkls = zkls;
this.path = path;
}
public void run() {
try {
lock = zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
if (lock != null) {
lock = zkls.getWriteLock(path, TestMemoryLocks.DEFAULT_LOCK_TIMEOUT);
Thread.sleep(1000);
lock.release();
Thread.sleep(1000);
lock.release();
}
}
catch (InterruptedException e) {
}
}
}
}