blob: c3b47bc492b95ff78d9a21a309792ca5f46995d4 [file] [log] [blame]
package edu.uci.ics.asterix.transaction.management.service.locking;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.NoSuchElementException;
import java.util.Scanner;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager.TransactionState;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
public class LockManagerDeterministicUnitTest {
public static void main(String args[]) throws ACIDException, IOException {
//initialize controller thread
String requestFileName = new String(
"src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockRequestFile");
Thread t = new Thread(new LockRequestController(requestFileName));
t.start();
}
}
class LockRequestController implements Runnable {
public static final boolean IS_DEBUG_MODE = false;
TransactionSubsystem txnProvider;
WorkerReadyQueue workerReadyQueue;
ArrayList<LockRequest> requestList;
ArrayList<ArrayList<Integer>> expectedResultList;
int resultListIndex;
LockManager lockMgr;
String requestFileName;
long defaultWaitTime;
public LockRequestController(String requestFileName) throws ACIDException {
this.txnProvider = new TransactionSubsystem("LockManagerPredefinedUnitTest", null);;
this.workerReadyQueue = new WorkerReadyQueue();
this.requestList = new ArrayList<LockRequest>();
this.expectedResultList = new ArrayList<ArrayList<Integer>>();
this.lockMgr = (LockManager) txnProvider.getLockManager();
this.requestFileName = new String(requestFileName);
this.resultListIndex = 0;
this.defaultWaitTime = 10;
}
@Override
public void run() {
Thread.currentThread().setName("Thread-0");
HashMap<String, Thread> threadMap = new HashMap<String, Thread>();
Thread t = null;
LockRequest lockRequest = null;
boolean isSuccess = true;
try {
readRequest();
} catch (IOException e) {
e.printStackTrace();
System.exit(-1);
} catch (ACIDException e) {
e.printStackTrace();
System.exit(-1);
}
//initialize workerThread
int size = requestList.size();
for (int i = 0; i < size; i++) {
lockRequest = requestList.get(i);
if (lockRequest.threadName.equals("Thread-0")) {
//Thread-0 is controller thread.
continue;
}
t = threadMap.get(lockRequest.threadName);
if (t == null) {
t = new Thread(new LockRequestWorker(txnProvider, workerReadyQueue, lockRequest.threadName),
lockRequest.threadName);
threadMap.put(lockRequest.threadName, t);
t.start();
log("Created " + lockRequest.threadName);
}
}
//wait for all workerThreads to be ready
try {
log("waiting for all workerThreads to complete initialization ...");
Thread.sleep(5);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
while (workerReadyQueue.size() != threadMap.size()) {
try {
log(" .");
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//make workerThread work
while (requestList.size() != 0) {
lockRequest = requestList.remove(0);
log("Processing: " + lockRequest.prettyPrint());
try {
if (!handleRequest(lockRequest)) {
log("\n*** Test Failed ***");
isSuccess = false;
break;
} else {
log("Processed: " + lockRequest.prettyPrint());
}
} catch (ACIDException e) {
e.printStackTrace();
break;
}
}
if (isSuccess) {
log("\n*** Test Passed ***");
}
}
public boolean handleRequest(LockRequest request) throws ACIDException {
LockRequestWorker worker = null;
int i = 0;
if (request.requestType == RequestType.CHECK_SEQUENCE) {
return validateExpectedResult(true);
} else if (request.requestType == RequestType.CHECK_SET) {
return validateExpectedResult(false);
} else if (request.requestType == RequestType.WAIT) {
try {
Thread.sleep((long) request.entityHashValue);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
} else if (request.requestType == RequestType.END) {
worker = workerReadyQueue.pop(request.threadName);
while (worker == null) {
if (!IS_DEBUG_MODE) {
log(request.threadName + " is not in the workerReadyQueue");
return false;
}
log(Thread.currentThread().getName() + " waiting for " + request.threadName
+ " to be in the workerReadyQueue[" + i++ + "].");
try {
Thread.sleep((long) 10);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
worker = workerReadyQueue.pop(request.threadName);
}
synchronized (worker) {
worker.setDone(true);
worker.setWait(false);
worker.notify();
}
try {
Thread.sleep((long) defaultWaitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
worker = workerReadyQueue.pop(request.threadName);
while (worker == null) {
if (!IS_DEBUG_MODE) {
log(request.threadName + " is not in the workerReadyQueue");
return false;
}
log(Thread.currentThread().getName() + " waiting for " + request.threadName
+ " to be in the workerReadyQueue[" + i++ + "].");
try {
Thread.sleep((long) 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
worker = workerReadyQueue.pop(request.threadName);
}
synchronized (worker) {
worker.setLockRequest(request);
worker.setWait(false);
worker.notify();
}
try {
Thread.sleep((long) defaultWaitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return true;
}
public boolean validateExpectedResult(boolean isSequence) {
if (isSequence) {
return workerReadyQueue.checkSequence(expectedResultList.get(resultListIndex++));
} else {
return workerReadyQueue.checkSet(expectedResultList.get(resultListIndex++));
}
}
public void readRequest() throws IOException, ACIDException {
int i = 0;
LockRequest lockRequest = null;
TransactionContext txnContext = null;
HashMap<Integer, TransactionContext> jobMap = new HashMap<Integer, TransactionContext>();
int threadId;
String requestType;
int jobId;
int datasetId;
int PKHashVal;
int waitTime;
ArrayList<Integer> list = null;
String lockMode;
Scanner scanner = new Scanner(new FileInputStream(requestFileName));
while (scanner.hasNextLine()) {
try {
threadId = Integer.parseInt(scanner.next().substring(1));
requestType = scanner.next();
if (requestType.equals("CSQ") || requestType.equals("CST") || requestType.equals("END")) {
log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType);
lockRequest = new LockRequest("Thread-" + threadId, getRequestType(requestType));
if (requestType.equals("CSQ") || requestType.equals("CST")) {
list = new ArrayList<Integer>();
while (scanner.hasNextInt()) {
threadId = scanner.nextInt();
if (threadId < 0) {
break;
}
list.add(threadId);
}
expectedResultList.add(list);
}
} else if (requestType.equals("DW")) {
defaultWaitTime = scanner.nextInt();
log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType + "," + defaultWaitTime);
continue;
} else if (requestType.equals("W")) {
waitTime = scanner.nextInt();
log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType);
lockRequest = new LockRequest("Thread-" + threadId, getRequestType(requestType), waitTime);
} else {
jobId = Integer.parseInt(scanner.next().substring(1));
datasetId = Integer.parseInt(scanner.next().substring(1));
PKHashVal = Integer.parseInt(scanner.next().substring(1));
lockMode = scanner.next();
txnContext = jobMap.get(jobId);
if (txnContext == null) {
txnContext = new TransactionContext(new JobId(jobId), txnProvider);
jobMap.put(jobId, txnContext);
}
log("LockRequest[" + i++ + "]:T" + threadId + "," + requestType + ",J" + jobId + ",D" + datasetId
+ ",E" + PKHashVal + "," + lockMode);
lockRequest = new LockRequest("Thread-" + threadId, getRequestType(requestType), new DatasetId(
datasetId), PKHashVal, getLockMode(lockMode), txnContext);
}
requestList.add(lockRequest);
} catch (NoSuchElementException e) {
scanner.close();
break;
}
}
}
public void log(String s) {
System.out.println(s);
}
private int getRequestType(String s) {
if (s.equals("L")) {
return RequestType.LOCK;
}
if (s.equals("TL")) {
return RequestType.TRY_LOCK;
}
if (s.equals("IL")) {
return RequestType.INSTANT_LOCK;
}
if (s.equals("ITL")) {
return RequestType.INSTANT_TRY_LOCK;
}
if (s.equals("UL")) {
return RequestType.UNLOCK;
}
if (s.equals("RL")) {
return RequestType.RELEASE_LOCKS;
}
if (s.equals("CSQ")) {
return RequestType.CHECK_SEQUENCE;
}
if (s.equals("CST")) {
return RequestType.CHECK_SET;
}
if (s.equals("END")) {
return RequestType.END;
}
if (s.equals("W")) {
return RequestType.WAIT;
}
try {
throw new UnsupportedOperationException("Invalid request type:" + s);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.exit(0);
}
return -1;
}
private byte getLockMode(String s) {
if (s.equals("S")) {
return LockMode.S;
}
if (s.equals("X")) {
return LockMode.X;
}
try {
throw new UnsupportedOperationException("Invalid lock mode type:" + s);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.exit(0);
}
return -1;
}
}
class LockRequestWorker implements Runnable {
String threadName;
TransactionSubsystem txnProvider;
ILockManager lockMgr;
WorkerReadyQueue workerReadyQueue;
LockRequest lockRequest;
boolean needWait;
boolean isAwaken;
boolean isDone;
public LockRequestWorker(TransactionSubsystem txnProvider, WorkerReadyQueue workerReadyQueue, String threadName) {
this.txnProvider = txnProvider;
this.lockMgr = txnProvider.getLockManager();
this.workerReadyQueue = workerReadyQueue;
this.threadName = new String(threadName);
this.lockRequest = null;
needWait = true;
isDone = false;
isAwaken = false;
}
public boolean isAwaken() {
return isAwaken;
}
@Override
public void run() {
//initial wait
needWait = true;
isAwaken = false;
while (!isDone) {
while (needWait) {
synchronized (this) {
workerReadyQueue.push(this);
try {
this.wait();
isAwaken = true;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
if (isDone) {
break;
}
try {
sendRequest(lockRequest);
} catch (ACIDException e) {
if (lockRequest.txnContext.getStatus() == TransactionContext.TIMED_OUT_STATUS) {
if (lockRequest.txnContext.getTxnState() != TransactionState.ABORTED) {
lockRequest.txnContext.setTxnState(TransactionState.ABORTED);
log("*** " + lockRequest.txnContext.getJobId() + " lock request causing deadlock ***");
log("Abort --> Releasing all locks acquired by " + lockRequest.txnContext.getJobId());
try {
lockMgr.releaseLocks(lockRequest.txnContext);
} catch (ACIDException e1) {
e1.printStackTrace();
}
log("Abort --> Released all locks acquired by " + lockRequest.txnContext.getJobId());
}
isDone = true;
} else {
e.printStackTrace();
System.exit(-1);
}
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
needWait = true;
isAwaken = false;
}
}
public void sendRequest(LockRequest request) throws ACIDException {
switch (request.requestType) {
case RequestType.LOCK:
lockMgr.lock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
break;
case RequestType.INSTANT_LOCK:
lockMgr.instantLock(request.datasetIdObj, request.entityHashValue, request.lockMode, request.txnContext);
break;
case RequestType.TRY_LOCK:
request.isTryLockFailed = !lockMgr.tryLock(request.datasetIdObj, request.entityHashValue,
request.lockMode, request.txnContext);
break;
case RequestType.INSTANT_TRY_LOCK:
lockMgr.instantTryLock(request.datasetIdObj, request.entityHashValue, request.lockMode,
request.txnContext);
break;
case RequestType.UNLOCK:
lockMgr.unlock(request.datasetIdObj, request.entityHashValue, request.txnContext);
break;
case RequestType.RELEASE_LOCKS:
lockMgr.releaseLocks(request.txnContext);
break;
default:
throw new UnsupportedOperationException("Unsupported lock method");
}
}
public void setLockRequest(LockRequest request) {
this.lockRequest = request;
}
public void setWait(boolean wait) {
needWait = wait;
}
public void setDone(boolean done) {
isDone = done;
}
public String getThreadName() {
return threadName;
}
public void log(String s) {
System.out.println(s);
}
}
class WorkerReadyQueue {
ArrayList<LockRequestWorker> workerReadyQueue;
public WorkerReadyQueue() {
workerReadyQueue = new ArrayList<LockRequestWorker>();
}
public synchronized void push(LockRequestWorker worker) {
workerReadyQueue.add(worker);
}
public synchronized LockRequestWorker pop(String threadName) {
int i;
LockRequestWorker worker = null;
int size = workerReadyQueue.size();
for (i = 0; i < size; i++) {
worker = workerReadyQueue.get(i);
if (worker.getThreadName().equals(threadName)) {
workerReadyQueue.remove(i);
break;
}
}
if (i == size) {
return null;
} else {
return worker;
}
}
public synchronized int size() {
return workerReadyQueue.size();
}
public boolean checkSet(ArrayList<Integer> threadIdList) {
int i;
int j;
StringBuilder s = new StringBuilder();
LockRequestWorker worker = null;
int resultListSize = 0;
int queueSize = workerReadyQueue.size();
int listSize = threadIdList.size();
s.append("ExpectedList(Set):\t");
for (i = 0; i < listSize; i++) {
s.append(threadIdList.get(i)).append(" ");
}
s.append("\n");
while (queueSize < listSize) {
//wait until workers finish its task
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log(Thread.currentThread().getName() + " waiting for worker to finish its task...");
queueSize = workerReadyQueue.size();
}
if (listSize != queueSize) {
log("listSize:" + listSize + ", queueSize:" + queueSize);
return false;
}
s.append("ResultList(Set):\t");
for (i = 0; i < listSize; i++) {
for (j = 0; j < queueSize; j++) {
worker = workerReadyQueue.get(j);
if (worker.getThreadName().equals("Thread-" + threadIdList.get(i))) {
s.append(threadIdList.get(i)).append(" ");
resultListSize++;
break;
}
}
}
log(s.toString());
if (listSize != resultListSize) {
return false;
}
return true;
}
public boolean checkSequence(ArrayList<Integer> threadIdList) {
int i;
StringBuilder s = new StringBuilder();
LockRequestWorker worker = null;
int queueSize = workerReadyQueue.size();
int listSize = threadIdList.size();
s.append("ExpectedList(Sequence):\t");
for (i = 0; i < listSize; i++) {
s.append(threadIdList.get(i)).append(" ");
}
s.append("\n");
while (queueSize < listSize) {
//wait until workers finish its task
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log(Thread.currentThread().getName() + "Waiting for worker to finish its task...");
queueSize = workerReadyQueue.size();
}
if (queueSize != listSize) {
return false;
}
s.append("ResultList(Sequence):\t");
for (i = 0; i < listSize; i++) {
worker = workerReadyQueue.get(i);
if (!worker.getThreadName().equals("Thread-" + threadIdList.get(i))) {
log(s.toString());
return false;
} else {
s.append(threadIdList.get(i)).append(" ");
}
}
log(s.toString());
return true;
}
public void log(String s) {
System.out.println(s);
}
}