blob: 6ca6581aaeb6cdd7c0d176fca9b3a4e3d0addc66 [file] [log] [blame]
/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMComponent;
public abstract class AbstractMutableLSMComponent implements ILSMComponent {
private int readerCount;
private int writerCount;
private ComponentState state;
private boolean isModified;
private enum ComponentState {
READABLE_WRITABLE,
READABLE_UNWRITABLE,
READABLE_UNWRITABLE_FLUSHING,
UNREADABLE_UNWRITABLE
}
public AbstractMutableLSMComponent() {
readerCount = 0;
writerCount = 0;
state = ComponentState.READABLE_WRITABLE;
isModified = false;
}
@Override
public synchronized boolean threadEnter(LSMOperationType opType) throws InterruptedException {
switch (opType) {
case FORCE_MODIFICATION:
if (state != ComponentState.READABLE_WRITABLE && state != ComponentState.READABLE_UNWRITABLE) {
return false;
}
writerCount++;
break;
case MODIFICATION:
if (state != ComponentState.READABLE_WRITABLE) {
return false;
}
writerCount++;
break;
case SEARCH:
if (state == ComponentState.UNREADABLE_UNWRITABLE) {
return false;
}
readerCount++;
break;
case FLUSH:
if (state == ComponentState.READABLE_UNWRITABLE_FLUSHING
|| state == ComponentState.UNREADABLE_UNWRITABLE) {
return false;
}
state = ComponentState.READABLE_UNWRITABLE_FLUSHING;
while (writerCount > 0) {
wait();
}
readerCount++;
break;
default:
throw new UnsupportedOperationException("Unsupported operation " + opType);
}
return true;
}
@Override
public synchronized void threadExit(LSMOperationType opType, boolean failedOperation) throws HyracksDataException {
switch (opType) {
case FORCE_MODIFICATION:
case MODIFICATION:
writerCount--;
if (state == ComponentState.READABLE_WRITABLE && isFull()) {
state = ComponentState.READABLE_UNWRITABLE;
}
break;
case SEARCH:
readerCount--;
if (state == ComponentState.UNREADABLE_UNWRITABLE && readerCount == 0) {
reset();
state = ComponentState.READABLE_WRITABLE;
} else if (state == ComponentState.READABLE_WRITABLE && isFull()) {
state = ComponentState.READABLE_UNWRITABLE;
}
break;
case FLUSH:
if (failedOperation) {
state = isFull() ? ComponentState.READABLE_UNWRITABLE : ComponentState.READABLE_WRITABLE;
}
readerCount--;
if (readerCount == 0) {
reset();
state = ComponentState.READABLE_WRITABLE;
} else if (state == ComponentState.READABLE_UNWRITABLE_FLUSHING) {
state = ComponentState.UNREADABLE_UNWRITABLE;
}
break;
default:
throw new UnsupportedOperationException("Unsupported operation " + opType);
}
notifyAll();
}
public void setIsModified() {
isModified = true;
}
public boolean isModified() {
return isModified;
}
protected abstract boolean isFull();
protected void reset() throws HyracksDataException {
isModified = false;
}
}