blob: dc2821496dfcb5273ce37af2e479442bdba9785c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.tserver.scan;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.tserver.TabletServer;
public abstract class ScanTask<T> implements RunnableFuture<T> {
protected final TabletServer server;
protected AtomicBoolean interruptFlag;
protected ArrayBlockingQueue<Object> resultQueue;
protected AtomicInteger state;
protected AtomicReference<ScanRunState> runState;
private static final int INITIAL = 1;
private static final int ADDED = 2;
private static final int CANCELED = 3;
ScanTask(TabletServer server) {
this.server = server;
interruptFlag = new AtomicBoolean(false);
runState = new AtomicReference<>(ScanRunState.QUEUED);
state = new AtomicInteger(INITIAL);
resultQueue = new ArrayBlockingQueue<>(1);
}
protected void addResult(Object o) {
if (state.compareAndSet(INITIAL, ADDED))
resultQueue.add(o);
else if (state.get() == ADDED)
throw new IllegalStateException("Tried to add more than one result");
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!mayInterruptIfRunning)
throw new IllegalArgumentException(
"Cancel will always attempt to interupt running next batch task");
if (state.get() == CANCELED)
return true;
if (state.compareAndSet(INITIAL, CANCELED)) {
interruptFlag.set(true);
resultQueue = null;
return true;
}
return false;
}
@Override
public T get() throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
ArrayBlockingQueue<Object> localRQ = resultQueue;
if (isCancelled())
throw new CancellationException();
if (localRQ == null) {
int st = state.get();
String stateStr;
switch (st) {
case ADDED:
stateStr = "ADDED";
break;
case CANCELED:
stateStr = "CANCELED";
break;
case INITIAL:
stateStr = "INITIAL";
break;
default:
stateStr = "UNKNOWN";
break;
}
throw new IllegalStateException(
"Tried to get result twice [state=" + stateStr + "(" + st + ")]");
}
Object r = localRQ.poll(timeout, unit);
// could have been canceled while waiting
if (isCancelled()) {
if (r != null)
throw new IllegalStateException("Nothing should have been added when in canceled state");
throw new CancellationException();
}
if (r == null)
throw new TimeoutException();
// make this method stop working now that something is being
// returned
resultQueue = null;
if (r instanceof Error)
throw (Error) r; // don't wrap an Error
if (r instanceof Throwable)
throw new ExecutionException((Throwable) r);
@SuppressWarnings("unchecked")
T rAsT = (T) r;
return rAsT;
}
@Override
public boolean isCancelled() {
return state.get() == CANCELED;
}
@Override
public boolean isDone() {
return runState.get().equals(ScanRunState.FINISHED);
}
public ScanRunState getScanRunState() {
return runState.get();
}
}