// blob: 14e4020bd254349817d9dad0d7570e169816ade8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.transaction.management.service.locking;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.ConsoleHandler;
import java.util.logging.Logger;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ILockManager;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.transaction.management.service.locking.Request.Kind;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class LockManagerUnitTest {
// The three values below are the constructor arguments of the ConcurrentLockManager
// that setUp() creates for every test.
public static int LOCK_MGR_SHRINK_TIMER = 5000;
public static int LOCK_MGR_ARENAS = 2;
public static int LOCK_MGR_TABLE_SIZE = 10;
// Initial value of the shared timestamp counter that execute() hands to every Locker.
static int INITIAL_TIMESTAMP = 0;
// Passed to Thread.sleep() by the coordinator loop in execute(), i.e. a duration in milliseconds.
static long COORDINATOR_SLEEP = 20;
// NOTE(review): not referenced inside this class; presumably read by Locker to decide
// when a request has timed out - confirm against Locker.
static int TIMEOUT_MS = 100;
// Attach a console handler to the logger named after ConcurrentLockManager so that
// its log records are also published to the console while the tests run.
static {
Logger.getLogger(ConcurrentLockManager.class.getName()).addHandler(new ConsoleHandler());
}
// Mock transaction contexts keyed by their integer job id; filled lazily by j().
Map<Integer, ITransactionContext> jobId2TxnCtxMap;
// The lock manager under test; created in setUp() and cleared in tearDown().
ILockManager lockMgr;
// "out" receives the test results ("no errors", "caught expected ...") and is handed to
// the requests created by req(Kind). "err" receives trace output of the helper methods
// when it is non-null; set it to e.g. System.err to get some output.
PrintStream out = System.out;
PrintStream err = null; //System.err;
//--------------------------------------------------------------------
// JUnit methods
//--------------------------------------------------------------------
/**
 * Prepares a clean fixture for each test: an empty job-id-to-context map and a
 * freshly constructed {@link ConcurrentLockManager}.
 */
@Before
public void setUp() throws Exception {
    this.jobId2TxnCtxMap = new HashMap<>();
    final ILockManager freshLockManager =
            new ConcurrentLockManager(LOCK_MGR_SHRINK_TIMER, LOCK_MGR_ARENAS, LOCK_MGR_TABLE_SIZE);
    this.lockMgr = freshLockManager;
}
/**
 * Drops the references to the lock manager and to the job-id-to-context map
 * after each test.
 */
@After
public void tearDown() throws Exception {
    this.lockMgr = null;
    this.jobId2TxnCtxMap = null;
}
/**
 * Job 1 takes a shared lock on hash value 1 of dataset 1 and then unlocks it
 * again; the run must not produce any error.
 */
@Test
public void testSimpleSharedUnlock() throws Exception {
    final ITransactionContext job1 = j(1);
    final List<Request> requests = new ArrayList<>();
    requests.add(req(Kind.LOCK, job1, d(1), e(1), LockMode.S));
    requests.add(req(Kind.UNLOCK, job1, d(1), e(1), LockMode.S));
    final Map<String, Throwable> errors = execute(requests);
    reportErrors(errors);
}
/**
 * Job 1 takes a shared lock on hash value 1 of dataset 1 and then issues a
 * RELEASE request for the job; the run must not produce any error.
 */
@Test
public void testSimpleSharedRelease() throws Exception {
    final ITransactionContext job1 = j(1);
    final List<Request> requests = new ArrayList<>();
    requests.add(req(Kind.LOCK, job1, d(1), e(1), LockMode.S));
    requests.add(req(Kind.RELEASE, job1));
    final Map<String, Throwable> errors = execute(requests);
    reportErrors(errors);
}
/**
 * Job 1 requests the same shared lock twice in a row and then issues a RELEASE
 * request; the run must not produce any error.
 */
@Test
public void testReacquire() throws Exception {
    final ITransactionContext job1 = j(1);
    final List<Request> requests = new ArrayList<>();
    requests.add(req(Kind.LOCK, job1, d(1), e(1), LockMode.S));
    requests.add(req(Kind.LOCK, job1, d(1), e(1), LockMode.S));
    requests.add(req(Kind.RELEASE, job1));
    final Map<String, Throwable> errors = execute(requests);
    reportErrors(errors);
}
/**
 * Job 1 issues an instant X lock, job 2 then takes an X lock and, after a PRINT
 * request, job 3 issues an instant S lock - all on hash value 1 of dataset 1.
 * The test expects job 3 to end up with a {@link WaitInterruptedException}
 * (presumably because its request stays blocked behind job 2 until the
 * coordinator interrupts it - confirm against Locker/ConcurrentLockManager).
 */
@Test
public void testInstant() throws Exception {
    final ITransactionContext job1 = j(1);
    final ITransactionContext job2 = j(2);
    final ITransactionContext job3 = j(3);
    final List<Request> requests = new ArrayList<>();
    requests.add(req(Kind.INSTANT_LOCK, job1, d(1), e(1), LockMode.X));
    requests.add(req(Kind.LOCK, job2, d(1), e(1), LockMode.X));
    requests.add(req(Kind.PRINT));
    requests.add(req(Kind.INSTANT_LOCK, job3, d(1), e(1), LockMode.S));
    final Map<String, Throwable> errors = execute(requests);
    expectError(errors, job3, WaitInterruptedException.class);
}
/**
 * Job 1 try-locks S, job 2 locks S and, after a PRINT request, job 3 try-locks
 * X - all on hash value 1 of dataset 1. The run must not produce any error
 * (presumably an unsuccessful try-lock is not reported as an error - confirm
 * against Locker).
 */
@Test
public void testTry() throws Exception {
    final ITransactionContext job1 = j(1);
    final ITransactionContext job2 = j(2);
    final ITransactionContext job3 = j(3);
    final List<Request> requests = new ArrayList<>();
    requests.add(req(Kind.TRY_LOCK, job1, d(1), e(1), LockMode.S));
    requests.add(req(Kind.LOCK, job2, d(1), e(1), LockMode.S));
    requests.add(req(Kind.PRINT));
    requests.add(req(Kind.TRY_LOCK, job3, d(1), e(1), LockMode.X));
    final Map<String, Throwable> errors = execute(requests);
    reportErrors(errors);
}
/**
 * Job 1 issues an instant X lock, job 2 takes an X lock and, after a PRINT
 * request, job 3 issues an instant try-lock in S mode - all on hash value 1 of
 * dataset 1. The run must not produce any error.
 */
@Test
public void testInstantTry() throws Exception {
    final ITransactionContext job1 = j(1);
    final ITransactionContext job2 = j(2);
    final ITransactionContext job3 = j(3);
    final List<Request> requests = new ArrayList<>();
    requests.add(req(Kind.INSTANT_LOCK, job1, d(1), e(1), LockMode.X));
    requests.add(req(Kind.LOCK, job2, d(1), e(1), LockMode.X));
    requests.add(req(Kind.PRINT));
    requests.add(req(Kind.INSTANT_TRY_LOCK, job3, d(1), e(1), LockMode.S));
    final Map<String, Throwable> errors = execute(requests);
    reportErrors(errors);
}
/**
 * Lock conversion/upgrade is not supported when the deadlock-free locking
 * protocol is enabled: job 1 takes an S lock and then requests an X lock on
 * the same hash value of the same dataset, and the test expects an
 * {@link IllegalStateException} to be recorded for job 1.
 */
@Test
public void testUpgrade() throws Exception {
    final ITransactionContext job1 = j(1);
    final List<Request> requests = new ArrayList<>();
    requests.add(req(Kind.LOCK, job1, d(1), e(1), LockMode.S));
    requests.add(req(Kind.LOCK, job1, d(1), e(1), LockMode.X));
    requests.add(req(Kind.RELEASE, job1));
    final Map<String, Throwable> errors = execute(requests);
    expectError(errors, job1, IllegalStateException.class);
}
//--------------------------------------------------------------------
// Helper methods
//--------------------------------------------------------------------
/**
 * Executes a list of requests where
 * a) each job runs in its own thread and
 * b) the threads/jobs are synchronized through a shared timestamp counter.
 * The synchronization ensures that the requests are submitted to the
 * LockManager in list order; they are, however, fulfilled in the order
 * decided by the LockManager.
 * <p>
 * The calling thread acts as coordinator: while any locker is still active it
 * watches the timestamp. If the timestamp did not move since the last look, it
 * sleeps for {@code COORDINATOR_SLEEP} ms; if the timestamp still did not move
 * and some locker reports a timeout, the coordinator stops waiting. Finally all
 * threads are stopped and their errors are collected.
 *
 * @param reqs
 *            a list of requests that will be executed in order
 * @return a map of (JobId, exception) pairs that can either be handled
 *         by the test or thrown using #reportErrors
 * @throws InterruptedException
 *             if the coordinating thread is interrupted while sleeping or joining
 */
private Map<String, Throwable> execute(List<Request> reqs) throws InterruptedException {
    if (err != null) {
        err.println("*** start ***");
    }
    final AtomicInteger timeStamp = new AtomicInteger(INITIAL_TIMESTAMP);
    final Set<Locker> lockers = createLockers(reqs, timeStamp);
    final Map<String, Thread> threads = startThreads(lockers);

    int lastSeenTime = timeStamp.get();
    while (active(lockers)) {
        if (err != null) {
            err.println("coordinatorTime = " + lastSeenTime);
        }
        boolean stalled = lastSeenTime == timeStamp.get();
        if (stalled) {
            // give the lockers a chance to make progress before checking again
            Thread.sleep(COORDINATOR_SLEEP);
            stalled = lastSeenTime == timeStamp.get();
        }
        if (stalled) {
            final Locker expired = timedOut(lockers);
            if (expired != null) {
                if (err != null) {
                    err.println(expired.name + " timed out");
                }
                break;
            }
        }
        lastSeenTime = timeStamp.get();
    }
    return stopThreads(lockers, threads);
}
/**
 * Tells whether at least one of the given lockers still reports itself as active.
 *
 * @param lockers
 *            the lockers to inspect
 * @return true as soon as one locker is active, false if none is
 */
private boolean active(Set<Locker> lockers) {
    return lockers.stream().anyMatch(candidate -> candidate.active());
}
/**
 * Looks for a locker that reports a timeout.
 *
 * @param lockers
 *            the lockers to inspect
 * @return the first locker (in the set's iteration order) whose timedOut() is
 *         true, or null if there is none
 */
private Locker timedOut(Set<Locker> lockers) {
    return lockers.stream().filter(candidate -> candidate.timedOut()).findFirst().orElse(null);
}
/**
 * Creates the lockers for one run: one locker without a transaction context
 * plus one locker per transaction context registered in jobId2TxnCtxMap. Every
 * locker receives the complete request list, the shared timestamp and the
 * trace stream.
 *
 * @param reqs
 *            the complete list of requests of the run
 * @param timeStamp
 *            the counter shared by all lockers and the coordinator
 * @return the set of created lockers
 */
private Set<Locker> createLockers(List<Request> reqs, AtomicInteger timeStamp) {
    final Set<Locker> created = new HashSet<>();
    // the context-less locker comes first, exactly one per run
    final Locker contextLess = new Locker(lockMgr, null, reqs, timeStamp, err);
    created.add(contextLess);
    jobId2TxnCtxMap.values().forEach(ctx -> created.add(new Locker(lockMgr, ctx, reqs, timeStamp, err)));
    return created;
}
/**
 * Starts one thread per locker; the thread is named after the locker.
 *
 * @param lockers
 *            the lockers to run
 * @return the started threads, keyed by locker name
 */
private Map<String, Thread> startThreads(Set<Locker> lockers) {
    final Map<String, Thread> threadsByName = new HashMap<>(lockers.size());
    lockers.forEach(locker -> {
        final Thread worker = new Thread(locker, locker.name);
        // register the thread before starting it
        threadsByName.put(locker.name, worker);
        worker.start();
    });
    return threadsByName;
}
/**
 * Stops the thread of every locker and collects the errors the lockers recorded.
 * If a locker recorded several errors, each one overwrites the previous entry,
 * so only the last one ends up in the result.
 *
 * @param lockers
 *            the lockers whose threads are to be stopped
 * @param threads
 *            the threads keyed by locker name, as returned by startThreads
 * @return a map from locker name to the (last) error recorded by that locker
 * @throws InterruptedException
 *             if the calling thread is interrupted while joining a thread
 */
private Map<String, Throwable> stopThreads(Set<Locker> lockers, Map<String, Thread> threads)
        throws InterruptedException {
    final Map<String, Throwable> errorsByName = new HashMap<>();
    for (final Locker locker : lockers) {
        stopThread(threads.get(locker.name));
        final List<Throwable> caught = locker.getErrors();
        if (caught == null) {
            continue;
        }
        for (final Throwable problem : caught) {
            errorsByName.put(locker.name, problem);
        }
    }
    return errorsByName;
}
/**
 * Stops a single thread: as long as the thread is neither NEW, RUNNABLE nor
 * TERMINATED it is interrupted (repeatedly, without pausing); afterwards the
 * calling thread joins it.
 *
 * @param t
 *            the thread to stop
 * @throws InterruptedException
 *             if the calling thread is interrupted while joining
 */
private void stopThread(Thread t) throws InterruptedException {
    if (err != null) {
        err.println("stopping " + t.getName() + " " + t.getState());
    }
    Thread.State state = t.getState();
    while (state != Thread.State.NEW && state != Thread.State.RUNNABLE && state != Thread.State.TERMINATED) {
        // the thread is blocked or waiting: interrupt it and look again
        if (err != null) {
            err.println("interrupting " + t.getName());
        }
        t.interrupt();
        state = t.getState();
    }
    if (err != null) {
        err.println("joining " + t.getName());
    }
    t.join();
}
/**
 * Throws an AssertionError for the first entry found in the map (in the map's
 * iteration order), using the entry's Throwable as cause. If the map is empty,
 * "no errors" is printed instead.
 * This is the default way to handle the errors returned by #execute
 *
 * @param errors
 *            a map of (JobId, exception) pairs
 */
void reportErrors(Map<String, Throwable> errors) {
    if (!errors.isEmpty()) {
        final Map.Entry<String, Throwable> first = errors.entrySet().iterator().next();
        throw new AssertionError("job " + first.getKey() + " caught something", first.getValue());
    }
    out.println("no errors");
}
/**
 * Prints one line per entry of the map to the result stream instead of throwing.
 *
 * @param errors
 *            a map of (thread name, throwable) pairs
 */
void printErrors(Map<String, Throwable> errors) {
    errors.forEach((name, problem) -> out.println("Thread " + name + " caught " + problem));
}
/**
 * Looks up the error recorded for a specific job. The map is keyed by the
 * string form of the job id.
 *
 * @param errors
 *            a map of (JobId, throwable) pairs
 * @param txnCtx
 *            the transaction context of the job whose error is requested
 * @return the throwable recorded for that job, or null if the map has no entry for it
 */
private static Throwable getError(Map<String, Throwable> errors, ITransactionContext txnCtx) {
    final String jobName = txnCtx.getJobId().toString();
    return errors.get(jobName);
}
/**
 * Asserts that the error recorded for a specific job is an instance of a
 * specific class. On success the error is printed to the result stream.
 *
 * @param errors
 *            a map of (JobId, throwable) pairs
 * @param txnCtx
 *            the transaction context of the job whose error is requested
 * @param clazz
 *            the expected exception class
 * @throws AssertionError
 *             if no error was recorded for the job, or if the recorded error
 *             is not an instance of {@code clazz}; in the latter case the
 *             recorded error is attached as cause
 */
private void expectError(Map<String, Throwable> errors, ITransactionContext txnCtx,
        Class<? extends Throwable> clazz) throws Exception {
    final Throwable error = getError(errors, txnCtx);
    if (error == null) {
        throw new AssertionError(
                "expected " + clazz.getSimpleName() + " for " + txnCtx.getJobId() + ", got no exception");
    }
    if (!clazz.isInstance(error)) {
        // Say what was expected and for which job (like the branch above does) and
        // keep the unexpected error as cause so that its stack trace is not lost.
        throw new AssertionError(
                "expected " + clazz.getSimpleName() + " for " + txnCtx.getJobId() + ", got " + error, error);
    }
    out.println("caught expected " + error);
}
//--------------------------------------------------------------------
// Convenience methods to make test description more compact
//--------------------------------------------------------------------
/**
 * Shorthand for a request that carries a transaction context, a dataset, a
 * hash value and a lock mode.
 *
 * @param kind
 *            the kind of request
 * @param txnCtx
 *            the transaction context of the requesting job
 * @param dsId
 *            the dataset the request refers to
 * @param hashValue
 *            the hash value the request refers to
 * @param lockMode
 *            the lock mode of the request
 * @return the request created by Request.create
 */
private Request req(Kind kind, ITransactionContext txnCtx, DatasetId dsId, int hashValue, byte lockMode) {
    final Request lockRequest = Request.create(kind, txnCtx, dsId, hashValue, lockMode);
    return lockRequest;
}

/**
 * Shorthand for a request that only carries a transaction context.
 *
 * @param kind
 *            the kind of request
 * @param txnCtx
 *            the transaction context of the requesting job
 * @return the request created by Request.create
 */
private Request req(Kind kind, ITransactionContext txnCtx) {
    final Request jobRequest = Request.create(kind, txnCtx);
    return jobRequest;
}

/**
 * Shorthand for a request without a transaction context; the request is
 * created with the result stream of this test.
 *
 * @param kind
 *            the kind of request
 * @return the request created by Request.create
 */
private Request req(Kind kind) {
    final Request streamRequest = Request.create(kind, out);
    return streamRequest;
}
/**
 * Shorthand for creating a dataset id.
 *
 * @param id
 *            the numeric id of the dataset
 * @return a new DatasetId for that id
 */
private static DatasetId d(int id) {
    final DatasetId datasetId = new DatasetId(id);
    return datasetId;
}
/**
 * Identity function; it only exists so that hash values read as e(n) next to
 * d(n) and j(n) in the request lists of the tests.
 *
 * @param i
 *            the hash value
 * @return the same value
 */
private static int e(int i) {
    final int hashValue = i;
    return hashValue;
}
/**
 * Returns the mock transaction context for a job id. The mock is created and
 * registered in jobId2TxnCtxMap on first use, so repeated calls with the same
 * id return the same object.
 *
 * @param jId
 *            the numeric job id
 * @return the mock transaction context whose getJobId() returns a JobId for jId
 */
private ITransactionContext j(int jId) {
    final ITransactionContext known = jobId2TxnCtxMap.get(jId);
    if (known != null) {
        return known;
    }
    final ITransactionContext created = mock(ITransactionContext.class);
    when(created.getJobId()).thenReturn(new JobId(jId));
    jobId2TxnCtxMap.put(jId, created);
    return created;
}
}