blob: 61e7b4d48a5a5977ab5858f2ea265990e8ce90e3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.procedure;
import org.apache.iotdb.confignode.procedure.exception.ProcedureAbortedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureTimeoutException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.ProcedureState;
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* Abstract class of all procedures.
*
* @param <Env>
*/
public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
public static final long NO_PROC_ID = -1;
public static final long NO_TIMEOUT = -1;
private long parentProcId = NO_PROC_ID;
private long rootProcId = NO_PROC_ID;
private long procId = NO_PROC_ID;
private long submittedTime;
private ProcedureState state = ProcedureState.INITIALIZING;
private int childrenLatch = 0;
private ProcedureException exception;
private volatile long timeout = NO_TIMEOUT;
private volatile long lastUpdate;
private volatile byte[] result = null;
private volatile boolean locked = false;
private boolean lockedWhenLoading = false;
private int[] stackIndexes = null;
private boolean persist = true;
public boolean needPersistance() {
return this.persist;
}
public void resetPersistance() {
this.persist = true;
}
public final void skipPersistance() {
this.persist = false;
}
public final boolean hasLock() {
return locked;
}
// User level code, override it if necessary
/**
* The main code of the procedure. It must be idempotent since execute() may be called multiple
* times in case of machine failure in the middle of the execution.
*
* @param env the environment passed to the ProcedureExecutor
* @return a set of sub-procedures to run or ourselves if there is more work to do or null if the
* procedure is done.
* @throws ProcedureYieldException the procedure will be added back to the queue and retried
* later.
* @throws InterruptedException the procedure will be added back to the queue and retried later.
* @throws ProcedureSuspendedException Signal to the executor that Procedure has suspended itself
* and has set itself up waiting for an external event to wake it back up again.
*/
protected abstract Procedure<Env>[] execute(Env env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException;
/**
* The code to undo what was done by the execute() code. It is called when the procedure or one of
* the sub-procedures failed or an abort was requested. It should cleanup all the resources
* created by the execute() call. The implementation must be idempotent since rollback() may be
* called multiple time in case of machine failure in the middle of the execution.
*
* @param env the environment passed to the ProcedureExecutor
* @throws IOException temporary failure, the rollback will retry later
* @throws InterruptedException the procedure will be added back to the queue and retried later
*/
protected abstract void rollback(Env env)
throws IOException, InterruptedException, ProcedureException;
/**
* The abort() call is asynchronous and each procedure must decide how to deal with it, if they
* want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
* abort() method and then the execute() will check if the abort flag is set or not. abort() may
* be called multiple times from the client, so the implementation must be idempotent.
*
* <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
* procedure implementor abort.
*/
protected abstract boolean abort(Env env);
public void serialize(DataOutputStream stream) throws IOException {
// procid
stream.writeLong(this.procId);
// state
stream.writeInt(this.state.ordinal());
// submit time
stream.writeLong(this.submittedTime);
// last updated
stream.writeLong(this.lastUpdate);
// parent id
stream.writeLong(this.parentProcId);
// time out
stream.writeLong(this.timeout);
// stack indexes
if (stackIndexes != null) {
stream.writeInt(stackIndexes.length);
for (int index : stackIndexes) {
stream.writeInt(index);
}
} else {
stream.writeInt(-1);
}
// exceptions
if (hasException()) {
stream.write((byte) 1);
String exceptionClassName = exception.getClass().getName();
byte[] exceptionClassNameBytes = exceptionClassName.getBytes(StandardCharsets.UTF_8);
stream.writeInt(exceptionClassNameBytes.length);
stream.write(exceptionClassNameBytes);
String message = this.exception.getMessage();
if (message != null) {
byte[] messageBytes = message.getBytes(StandardCharsets.UTF_8);
stream.writeInt(messageBytes.length);
stream.write(messageBytes);
} else {
stream.writeInt(-1);
}
} else {
stream.write((byte) 0);
}
// result
if (result != null) {
stream.writeInt(result.length);
stream.write(result);
} else {
stream.writeInt(-1);
}
// has lock
stream.write(this.hasLock() ? (byte) 1 : (byte) 0);
}
public void deserialize(ByteBuffer byteBuffer) {
// procid
this.setProcId(byteBuffer.getLong());
// state
this.setState(ProcedureState.values()[byteBuffer.getInt()]);
// submit time
this.setSubmittedTime(byteBuffer.getLong());
// last updated
this.setLastUpdate(byteBuffer.getLong());
// parent id
this.setParentProcId(byteBuffer.getLong());
// time out
this.setTimeout(byteBuffer.getLong());
// stack index
int stackIndexesLen = byteBuffer.getInt();
if (stackIndexesLen >= 0) {
List<Integer> indexList = new ArrayList<>(stackIndexesLen);
for (int i = 0; i < stackIndexesLen; i++) {
indexList.add(byteBuffer.getInt());
}
this.setStackIndexes(indexList);
}
// exceptions
if (byteBuffer.get() == 1) {
Class<?> exceptionClass = deserializeTypeInfo(byteBuffer);
int messageBytesLength = byteBuffer.getInt();
String errMsg = null;
if (messageBytesLength > 0) {
byte[] messageBytes = new byte[messageBytesLength];
byteBuffer.get(messageBytes);
errMsg = new String(messageBytes, StandardCharsets.UTF_8);
}
ProcedureException exception;
try {
exception =
(ProcedureException) exceptionClass.getConstructor(String.class).newInstance(errMsg);
} catch (InstantiationException
| IllegalAccessException
| InvocationTargetException
| NoSuchMethodException e) {
LOG.warn("Instantiation exception class failed", e);
exception = new ProcedureException(errMsg);
}
setFailure(exception);
}
// result
int resultLen = byteBuffer.getInt();
if (resultLen > 0) {
byte[] resultArr = new byte[resultLen];
byteBuffer.get(resultArr);
}
// has lock
if (byteBuffer.get() == 1) {
this.lockedWhenLoading();
}
}
/**
* Deserialize class Name and load class
*
* @param byteBuffer bytebuffer
* @return Procedure
*/
public static Class<?> deserializeTypeInfo(ByteBuffer byteBuffer) {
int classNameBytesLen = byteBuffer.getInt();
byte[] classNameBytes = new byte[classNameBytesLen];
byteBuffer.get(classNameBytes);
String className = new String(classNameBytes, StandardCharsets.UTF_8);
Class<?> clazz;
try {
clazz = Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Invalid procedure class", e);
}
return clazz;
}
public static Procedure newInstance(ByteBuffer byteBuffer) {
Class<?> procedureClass = deserializeTypeInfo(byteBuffer);
Procedure procedure;
try {
procedure = (Procedure) procedureClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Instantiation failed", e);
}
return procedure;
}
/**
* The {@link #doAcquireLock(Object, IProcedureStore)} will be split into two steps, first, it
* will call us to determine whether we need to wait for initialization, second, it will call
* {@link #acquireLock(Object)} to actually handle the lock for this procedure.
*
* @return true means we need to wait until the environment has been initialized, otherwise true.
*/
protected boolean waitInitialized(Env env) {
return false;
}
/**
* Acquire a lock, user should override it if necessary.
*
* @param env environment
* @return state of lock
*/
protected ProcedureLockState acquireLock(Env env) {
return ProcedureLockState.LOCK_ACQUIRED;
}
/**
* Release a lock, user should override it if necessary.
*
* @param env env
*/
protected void releaseLock(Env env) {
// no op
}
/**
* Used to keep procedure lock even when the procedure is yielded or suspended.
*
* @param env env
* @return true if hold the lock
*/
protected boolean holdLock(Env env) {
return false;
}
/**
* Called before the procedure is recovered and added into the queue.
*
* @param env environment
*/
protected final void beforeRecover(Env env) {
// no op
}
/**
* Called when the procedure is recovered and added into the queue.
*
* @param env environment
*/
protected final void afterRecover(Env env) {
// no op
}
/**
* Called when the procedure is completed (success or rollback). The procedure may use this method
* to clean up in-memory states. This operation will not be retried on failure.
*
* @param env environment
*/
protected void completionCleanup(Env env) {
// no op
}
/**
* To make executor yield between each execution step to give other procedures a chance to run.
*
* @param env environment
* @return return true if yield is allowed.
*/
protected boolean isYieldAfterExecution(Env env) {
return false;
}
// -------------------------Internal methods - called by the procedureExecutor------------------
/**
* Internal method called by the ProcedureExecutor that starts the user-level code execute().
*
* @param env execute environment
* @return sub procedures
*/
protected Procedure<Env>[] doExecute(Env env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
try {
updateTimestamp();
return execute(env);
} finally {
updateTimestamp();
}
}
/**
* Internal method called by the ProcedureExecutor that starts the user-level code rollback().
*
* @param env execute environment
* @throws IOException ioe
* @throws InterruptedException interrupted exception
*/
public void doRollback(Env env) throws IOException, InterruptedException, ProcedureException {
try {
updateTimestamp();
rollback(env);
} finally {
updateTimestamp();
}
}
/**
* Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
*
* @param env environment
* @param store ProcedureStore
* @return ProcedureLockState
*/
public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
if (waitInitialized(env)) {
return ProcedureLockState.LOCK_EVENT_WAIT;
}
if (lockedWhenLoading) {
lockedWhenLoading = false;
locked = true;
return ProcedureLockState.LOCK_ACQUIRED;
}
ProcedureLockState state = acquireLock(env);
if (state == ProcedureLockState.LOCK_ACQUIRED) {
locked = true;
store.update(this);
}
return state;
}
/**
* Presist lock state of the procedure
*
* @param env environment
* @param store ProcedureStore
*/
public final void doReleaseLock(Env env, IProcedureStore store) {
locked = false;
if (getState() != ProcedureState.ROLLEDBACK) {
store.update(this);
}
releaseLock(env);
}
public final void restoreLock(Env env) {
if (!lockedWhenLoading) {
LOG.debug("{} didn't hold the lock before restarting, skip acquiring lock", this);
return;
}
if (isFinished()) {
LOG.debug("{} is already bypassed, skip acquiring lock.", this);
return;
}
if (getState() == ProcedureState.WAITING && !holdLock(env)) {
LOG.debug("{} is in WAITING STATE, and holdLock= false , skip acquiring lock.", this);
return;
}
LOG.debug("{} held the lock before restarting, call acquireLock to restore it.", this);
acquireLock(env);
}
@Override
public String toString() {
// Return the simple String presentation of the procedure.
return toStringSimpleSB().toString();
}
/**
* Build the StringBuilder for the simple form of procedure string.
*
* @return the StringBuilder
*/
protected StringBuilder toStringSimpleSB() {
final StringBuilder sb = new StringBuilder();
sb.append("pid=");
sb.append(getProcId());
if (hasParent()) {
sb.append(", ppid=");
sb.append(getParentProcId());
}
/*
* TODO
* Enable later when this is being used.
* Currently owner not used.
if (hasOwner()) {
sb.append(", owner=");
sb.append(getOwner());
}*/
sb.append(", state="); // pState for Procedure State as opposed to any other kind.
toStringState(sb);
// Only print out locked if actually locked. Most of the time it is not.
if (this.locked) {
sb.append(", locked=").append(locked);
}
if (hasException()) {
sb.append(", exception=" + getException());
}
sb.append("; ");
toStringClassDetails(sb);
return sb;
}
/** Extend the toString() information with more procedure details */
public String toStringDetails() {
final StringBuilder sb = toStringSimpleSB();
sb.append(" submittedTime=");
sb.append(getSubmittedTime());
sb.append(", lastUpdate=");
sb.append(getLastUpdate());
final int[] stackIndices = getStackIndexes();
if (stackIndices != null) {
sb.append("\n");
sb.append("stackIndexes=");
sb.append(Arrays.toString(stackIndices));
}
return sb.toString();
}
protected String toStringClass() {
StringBuilder sb = new StringBuilder();
toStringClassDetails(sb);
return sb.toString();
}
/**
* Called from {@link #toString()} when interpolating {@link Procedure} State. Allows decorating
* generic Procedure State with Procedure particulars.
*
* @param builder Append current {@link ProcedureState}
*/
protected void toStringState(StringBuilder builder) {
builder.append(getState());
}
/**
* Extend the toString() information with the procedure details e.g. className and parameters
*
* @param builder the string builder to use to append the proc specific information
*/
protected void toStringClassDetails(StringBuilder builder) {
builder.append(getClass().getName());
}
// ==========================================================================
// Those fields are unchanged after initialization.
//
// Each procedure will get created from the user or during
// ProcedureExecutor.start() during the load() phase and then submitted
// to the executor. these fields will never be changed after initialization
// ==========================================================================
public long getProcId() {
return procId;
}
public boolean hasParent() {
return parentProcId != NO_PROC_ID;
}
public long getParentProcId() {
return parentProcId;
}
public long getRootProcId() {
return rootProcId;
}
public String getProcName() {
return toStringClass();
}
public long getSubmittedTime() {
return submittedTime;
}
/** Called by the ProcedureExecutor to assign the ID to the newly created procedure. */
protected void setProcId(long procId) {
this.procId = procId;
}
public void setProcRunnable() {
this.submittedTime = System.currentTimeMillis();
setState(ProcedureState.RUNNABLE);
}
/** Called by the ProcedureExecutor to assign the parent to the newly created procedure. */
protected void setParentProcId(long parentProcId) {
this.parentProcId = parentProcId;
}
protected void setRootProcId(long rootProcId) {
this.rootProcId = rootProcId;
}
/**
* Called on store load to initialize the Procedure internals after the creation/deserialization.
*/
protected void setSubmittedTime(long submittedTime) {
this.submittedTime = submittedTime;
}
// ==========================================================================
// runtime state - timeout related
// ==========================================================================
/** @param timeout timeout interval in msec */
protected void setTimeout(long timeout) {
this.timeout = timeout;
}
public boolean hasTimeout() {
return timeout != NO_TIMEOUT;
}
/** @return the timeout in msec */
public long getTimeout() {
return timeout;
}
/**
* Called on store load to initialize the Procedure internals after the creation/deserialization.
*/
protected void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
/** Called by ProcedureExecutor after each time a procedure step is executed. */
protected void updateTimestamp() {
this.lastUpdate = System.currentTimeMillis();
}
public long getLastUpdate() {
return lastUpdate;
}
/**
* Timeout of the next timeout. Called by the ProcedureExecutor if the procedure has timeout set
* and the procedure is in the waiting queue.
*
* @return the timestamp of the next timeout.
*/
protected long getTimeoutTimestamp() {
return getLastUpdate() + getTimeout();
}
// ==========================================================================
// runtime state
// ==========================================================================
/** @return the time elapsed between the last update and the start time of the procedure. */
public long elapsedTime() {
return getLastUpdate() - getSubmittedTime();
}
/** @return the serialized result if any, otherwise null */
public byte[] getResult() {
return result;
}
/**
* The procedure may leave a "result" on completion.
*
* @param result the serialized result that will be passed to the client
*/
protected void setResult(byte[] result) {
this.result = result;
}
/**
* Will only be called when loading procedures from procedure store, where we need to record
* whether the procedure has already held a lock. Later we will call {@link #restoreLock(Object)}
* to actually acquire the lock.
*/
final void lockedWhenLoading() {
this.lockedWhenLoading = true;
}
/**
* Can only be called when restarting, before the procedure actually being executed, as after we
* actually call the {@link #doAcquireLock(Object, IProcedureStore)} method, we will reset {@link
* #lockedWhenLoading} to false.
*
* <p>Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure
* in front of a queue.
*/
public boolean isLockedWhenLoading() {
return lockedWhenLoading;
}
// ==============================================================================================
// Runtime state, updated every operation by the ProcedureExecutor
//
// There is always 1 thread at the time operating on the state of the procedure.
// The ProcedureExecutor may check and set states, or some Procecedure may
// update its own state. but no concurrent updates. we use synchronized here
// just because the procedure can get scheduled on different executor threads on each step.
// ==============================================================================================
/** @return true if the procedure is in a RUNNABLE state. */
public synchronized boolean isRunnable() {
return state == ProcedureState.RUNNABLE;
}
public synchronized boolean isInitializing() {
return state == ProcedureState.INITIALIZING;
}
/** @return true if the procedure has failed. It may or may not have rolled back. */
public synchronized boolean isFailed() {
return state == ProcedureState.FAILED || state == ProcedureState.ROLLEDBACK;
}
/** @return true if the procedure is finished successfully. */
public synchronized boolean isSuccess() {
return state == ProcedureState.SUCCESS && !hasException();
}
/**
* @return true if the procedure is finished. The Procedure may be completed successfully or
* rolledback.
*/
public synchronized boolean isFinished() {
return isSuccess() || state == ProcedureState.ROLLEDBACK;
}
/** @return true if the procedure is waiting for a child to finish or for an external event. */
public synchronized boolean isWaiting() {
switch (state) {
case WAITING:
case WAITING_TIMEOUT:
return true;
default:
break;
}
return false;
}
protected synchronized void setState(final ProcedureState state) {
this.state = state;
updateTimestamp();
}
public synchronized ProcedureState getState() {
return state;
}
protected void setFailure(final String source, final Throwable cause) {
setFailure(new ProcedureException(source, cause));
}
protected synchronized void setFailure(final ProcedureException exception) {
this.exception = exception;
if (!isFinished()) {
setState(ProcedureState.FAILED);
}
}
protected void setAbortFailure(final String source, final String msg) {
setFailure(source, new ProcedureAbortedException(msg));
}
/**
* Called by the ProcedureExecutor when the timeout set by setTimeout() is expired.
*
* <p>Another usage for this method is to implement retrying. A procedure can set the state to
* {@code WAITING_TIMEOUT} by calling {@code setState} method, and throw a {@link
* ProcedureSuspendedException} to halt the execution of the procedure, and do not forget a call
* {@link #setTimeout(long)} method to set the timeout. And you should also override this method
* to wake up the procedure, and also return false to tell the ProcedureExecutor that the timeout
* event has been handled.
*
* @return true to let the framework handle the timeout as abort, false in case the procedure
* handled the timeout itself.
*/
protected synchronized boolean setTimeoutFailure(Env env) {
if (state == ProcedureState.WAITING_TIMEOUT) {
long timeDiff = System.currentTimeMillis() - lastUpdate;
setFailure(
"ProcedureExecutor",
new ProcedureTimeoutException("Operation timed out after " + timeDiff + " ms."));
return true;
}
return false;
}
public synchronized boolean hasException() {
return exception != null;
}
public synchronized ProcedureException getException() {
return exception;
}
/** Called by the ProcedureExecutor on procedure-load to restore the latch state */
protected synchronized void setChildrenLatch(int numChildren) {
this.childrenLatch = numChildren;
if (LOG.isTraceEnabled()) {
LOG.trace("CHILD LATCH INCREMENT SET " + this.childrenLatch, new Throwable(this.toString()));
}
}
/** Called by the ProcedureExecutor on procedure-load to restore the latch state */
protected synchronized void incChildrenLatch() {
// TODO: can this be inferred from the stack? I think so...
this.childrenLatch++;
if (LOG.isTraceEnabled()) {
LOG.trace("CHILD LATCH INCREMENT " + this.childrenLatch, new Throwable(this.toString()));
}
}
/** Called by the ProcedureExecutor to notify that one of the sub-procedures has completed. */
private synchronized boolean childrenCountDown() {
assert childrenLatch > 0 : this;
boolean b = --childrenLatch == 0;
if (LOG.isTraceEnabled()) {
LOG.trace("CHILD LATCH DECREMENT " + childrenLatch, new Throwable(this.toString()));
}
return b;
}
/**
* Try to set this procedure into RUNNABLE state. Succeeds if all subprocedures/children are done.
*
* @return True if we were able to move procedure to RUNNABLE state.
*/
synchronized boolean tryRunnable() {
// Don't use isWaiting in the below; it returns true for WAITING and WAITING_TIMEOUT
if (getState() == ProcedureState.WAITING && childrenCountDown()) {
setState(ProcedureState.RUNNABLE);
return true;
} else {
return false;
}
}
protected synchronized boolean hasChildren() {
return childrenLatch > 0;
}
protected synchronized int getChildrenLatch() {
return childrenLatch;
}
/**
* Called by the RootProcedureState on procedure execution. Each procedure store its stack-index
* positions.
*/
protected synchronized void addStackIndex(final int index) {
if (stackIndexes == null) {
stackIndexes = new int[] {index};
} else {
int count = stackIndexes.length;
stackIndexes = Arrays.copyOf(stackIndexes, count + 1);
stackIndexes[count] = index;
}
}
protected synchronized boolean removeStackIndex() {
if (stackIndexes != null && stackIndexes.length > 1) {
stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1);
return false;
} else {
stackIndexes = null;
return true;
}
}
/**
* Called on store load to initialize the Procedure internals after the creation/deserialization.
*/
protected synchronized void setStackIndexes(final List<Integer> stackIndexes) {
this.stackIndexes = new int[stackIndexes.size()];
for (int i = 0; i < this.stackIndexes.length; ++i) {
this.stackIndexes[i] = stackIndexes.get(i);
}
}
protected synchronized boolean wasExecuted() {
return stackIndexes != null;
}
protected synchronized int[] getStackIndexes() {
return stackIndexes;
}
/** Helper to lookup the root Procedure ID given a specified procedure. */
protected static long getRootProcedureId(Map<Long, Procedure> procedures, Procedure proc) {
while (proc.hasParent()) {
proc = procedures.get(proc.getParentProcId());
if (proc == null) {
return NO_PROC_ID;
}
}
return proc.getProcId();
}
public void setRootProcedureId(long rootProcedureId) {
this.rootProcId = rootProcedureId;
}
/**
* @param a the first procedure to be compared.
* @param b the second procedure to be compared.
* @return true if the two procedures have the same parent
*/
public static boolean haveSameParent(Procedure<?> a, Procedure<?> b) {
return a.hasParent() && b.hasParent() && (a.getParentProcId() == b.getParentProcId());
}
@Override
public int compareTo(Procedure<Env> other) {
return Long.compare(getProcId(), other.getProcId());
}
}