blob: 5a2a8939826cd4b4f9f2afa861605aa61f43e8f6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.locking;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockHeartbeatResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos.LockType;
@Category({ClientTests.class, SmallTests.class})
public class TestEntityLocks {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestEntityLocks.class);
private static final Logger LOG = LoggerFactory.getLogger(TestEntityLocks.class);
private final Configuration conf = HBaseConfiguration.create();
private final LockService.BlockingInterface master =
Mockito.mock(LockService.BlockingInterface.class);
private LockServiceClient admin;
private ArgumentCaptor<LockRequest> lockReqArgCaptor;
private ArgumentCaptor<LockHeartbeatRequest> lockHeartbeatReqArgCaptor;
private static final LockHeartbeatResponse UNLOCKED_RESPONSE =
LockHeartbeatResponse.newBuilder().setLockStatus(
LockHeartbeatResponse.LockStatus.UNLOCKED).build();
// timeout such that worker thread waits for 500ms for each heartbeat.
private static final LockHeartbeatResponse LOCKED_RESPONSE =
LockHeartbeatResponse.newBuilder().setLockStatus(
LockHeartbeatResponse.LockStatus.LOCKED).setTimeoutMs(10000).build();
private long procId;
// Setup mock admin.
LockServiceClient getAdmin() throws Exception {
conf.setInt("hbase.client.retries.number", 3);
conf.setInt("hbase.client.pause", 1); // 1ms. Immediately retry rpc on failure.
return new LockServiceClient(conf, master, PerClientRandomNonceGenerator.get());
}
@Before
public void setUp() throws Exception {
admin = getAdmin();
lockReqArgCaptor = ArgumentCaptor.forClass(LockRequest.class);
lockHeartbeatReqArgCaptor = ArgumentCaptor.forClass(LockHeartbeatRequest.class);
procId = new Random().nextLong();
}
private boolean waitLockTimeOut(EntityLock lock, long maxWaitTimeMillis) {
long startMillis = System.currentTimeMillis();
while (lock.isLocked()) {
LOG.info("Sleeping...");
Threads.sleepWithoutInterrupt(100);
if (!lock.isLocked()) {
return true;
}
if (System.currentTimeMillis() - startMillis > maxWaitTimeMillis) {
LOG.info("Timedout...");
return false;
}
}
return true; // to make compiler happy.
}
/**
* Test basic lock function - requestLock, await, unlock.
* @throws Exception
*/
@Test
public void testEntityLock() throws Exception {
final long procId = 100;
final long workerSleepTime = 200; // in ms
EntityLock lock = admin.namespaceLock("namespace", "description", null);
lock.setTestingSleepTime(workerSleepTime);
when(master.requestLock(any(), any())).thenReturn(
LockResponse.newBuilder().setProcId(procId).build());
when(master.lockHeartbeat(any(), any())).thenReturn(
UNLOCKED_RESPONSE, UNLOCKED_RESPONSE, UNLOCKED_RESPONSE, LOCKED_RESPONSE);
lock.requestLock();
// we return unlock response 3 times, so actual wait time should be around 2 * workerSleepTime
lock.await(4 * workerSleepTime, TimeUnit.MILLISECONDS);
assertTrue(lock.isLocked());
lock.unlock();
assertTrue(!lock.getWorker().isAlive());
assertFalse(lock.isLocked());
// check LockRequest in requestLock()
verify(master, times(1)).requestLock(any(), lockReqArgCaptor.capture());
LockRequest request = lockReqArgCaptor.getValue();
assertEquals("namespace", request.getNamespace());
assertEquals("description", request.getDescription());
assertEquals(LockType.EXCLUSIVE, request.getLockType());
assertEquals(0, request.getRegionInfoCount());
// check LockHeartbeatRequest in lockHeartbeat()
verify(master, atLeastOnce()).lockHeartbeat(any(), lockHeartbeatReqArgCaptor.capture());
for (LockHeartbeatRequest req : lockHeartbeatReqArgCaptor.getAllValues()) {
assertEquals(procId, req.getProcId());
}
}
/**
* Test that abort is called when lock times out.
*/
@Test
public void testEntityLockTimeout() throws Exception {
final long workerSleepTime = 200; // in ms
Abortable abortable = Mockito.mock(Abortable.class);
EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
lock.setTestingSleepTime(workerSleepTime);
when(master.requestLock(any(), any()))
.thenReturn(LockResponse.newBuilder().setProcId(procId).build());
// Acquires the lock, but then it times out (since we don't call unlock() on it).
when(master.lockHeartbeat(any(), any()))
.thenReturn(LOCKED_RESPONSE, UNLOCKED_RESPONSE);
lock.requestLock();
lock.await();
assertTrue(lock.isLocked());
// Should get unlocked in next heartbeat i.e. after workerSleepTime. Wait 10x time to be sure.
assertTrue(waitLockTimeOut(lock, 10 * workerSleepTime));
// Works' run() returns, there is a small gap that the thread is still alive(os
// has not declare it is dead yet), so remove the following assertion.
// assertFalse(lock.getWorker().isAlive());
verify(abortable, times(1)).abort(any(), eq(null));
}
/**
* Test that abort is called when lockHeartbeat fails with IOException.
*/
@Test
public void testHeartbeatException() throws Exception {
final long workerSleepTime = 100; // in ms
Abortable abortable = Mockito.mock(Abortable.class);
EntityLock lock = admin.namespaceLock("namespace", "description", abortable);
lock.setTestingSleepTime(workerSleepTime);
when(master.requestLock(any(), any()))
.thenReturn(LockResponse.newBuilder().setProcId(procId).build());
when(master.lockHeartbeat(any(), any()))
.thenReturn(LOCKED_RESPONSE)
.thenThrow(new ServiceException("Failed heartbeat!"));
lock.requestLock();
lock.await();
assertTrue(waitLockTimeOut(lock, 100 * workerSleepTime));
while (lock.getWorker().isAlive()) {
TimeUnit.MILLISECONDS.sleep(100);
}
verify(abortable, times(1)).abort(any(), isA(HBaseIOException.class));
assertFalse(lock.getWorker().isAlive());
}
}