blob: 2071fcc8b25e48d93a805e7b82d8b80ec98ca7ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.execution.fragment;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.StateMachine;
import org.apache.iotdb.db.mpp.execution.StateMachine.StateChangeListener;
import org.apache.iotdb.db.utils.SetThreadName;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.ABORTED;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.CANCELLED;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.FAILED;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.FINISHED;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.FLUSHING;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.RUNNING;
import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState.TERMINAL_INSTANCE_STATES;
/**
 * Tracks the lifecycle state of one fragment instance, the failure causes recorded for it, and
 * the failures reported for its source instances.
 *
 * <p>The state itself is held in a {@link StateMachine} that is constructed with {@code RUNNING}
 * and {@code TERMINAL_INSTANCE_STATES}. The source-failure map and listener list are guarded by
 * this object's monitor; listener callbacks for source failures are dispatched through the
 * executor passed to the constructor, outside of that monitor.
 */
@ThreadSafe
public class FragmentInstanceStateMachine {
  private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceStateMachine.class);

  private final long createdTime = System.currentTimeMillis();

  private final FragmentInstanceId instanceId;
  private final Executor executor;
  private final StateMachine<FragmentInstanceState> instanceState;
  private final LinkedBlockingQueue<Throwable> failureCauses = new LinkedBlockingQueue<>();

  @GuardedBy("this")
  private final Map<FragmentInstanceId, Throwable> sourceInstanceFailures = new HashMap<>();

  @GuardedBy("this")
  private final List<FragmentInstanceFailureListener> sourceInstanceFailureListeners =
      new ArrayList<>();

  /**
   * Creates a state machine for the given fragment instance.
   *
   * @param fragmentInstanceId id of the fragment instance being tracked
   * @param executor executor handed to the underlying {@link StateMachine} and used to dispatch
   *     source-failure notifications
   * @throws NullPointerException if either argument is null
   */
  public FragmentInstanceStateMachine(FragmentInstanceId fragmentInstanceId, Executor executor) {
    this.instanceId = requireNonNull(fragmentInstanceId, "fragmentInstanceId is null");
    this.executor = requireNonNull(executor, "executor is null");
    instanceState =
        new StateMachine<>(
            "FragmentInstance " + fragmentInstanceId, executor, RUNNING, TERMINAL_INSTANCE_STATES);
    // The debug listener is only registered when debug logging is enabled at construction time.
    if (LOGGER.isDebugEnabled()) {
      instanceState.addStateChangeListener(
          newState -> {
            try (SetThreadName threadName = new SetThreadName(fragmentInstanceId.getFullId())) {
              LOGGER.debug("[StateChanged] To {}", newState);
            }
          });
    }
  }

  /** Returns the wall-clock time, in epoch milliseconds, at which this object was created. */
  public long getCreatedTime() {
    return createdTime;
  }

  /** Returns the id of the fragment instance tracked by this state machine. */
  public FragmentInstanceId getFragmentInstanceId() {
    return instanceId;
  }

  /** Returns the current state of the fragment instance. */
  public FragmentInstanceState getState() {
    return instanceState.get();
  }

  /**
   * Returns a future for a state change relative to {@code currentState}.
   *
   * <p>The future is obtained from the underlying state machine first; if the state read
   * afterwards is already done, an already-completed future holding that done state is returned
   * instead.
   *
   * @param currentState the state the caller last observed; must not be a done state
   * @return a future yielding the changed state
   * @throws NullPointerException if {@code currentState} is null
   * @throws IllegalArgumentException if {@code currentState} is already done
   */
  public ListenableFuture<FragmentInstanceState> getStateChange(
      FragmentInstanceState currentState) {
    requireNonNull(currentState, "currentState is null");
    checkArgument(!currentState.isDone(), "Current state is already done");

    ListenableFuture<FragmentInstanceState> future = instanceState.getStateChange(currentState);
    FragmentInstanceState state = instanceState.get();
    if (state.isDone()) {
      return immediateFuture(state);
    }
    return future;
  }

  /**
   * Returns the queue of failure causes recorded through {@link #failed(Throwable)}.
   *
   * <p>This is the internal queue itself, not a copy.
   */
  public LinkedBlockingQueue<Throwable> getFailureCauses() {
    return failureCauses;
  }

  /** Requests a transition to {@code FLUSHING}, conditional on the current state being RUNNING. */
  public void transitionToFlushing() {
    instanceState.setIf(FLUSHING, currentState -> currentState == RUNNING);
  }

  /** Requests a transition to {@code FINISHED} unless the instance is already done. */
  public void finished() {
    transitionToDoneState(FINISHED);
  }

  /** Requests a transition to {@code CANCELLED} unless the instance is already done. */
  public void cancel() {
    transitionToDoneState(CANCELLED);
  }

  /** Requests a transition to {@code ABORTED} unless the instance is already done. */
  public void abort() {
    transitionToDoneState(ABORTED);
  }

  /**
   * Records {@code cause} and requests a transition to {@code FAILED}.
   *
   * <p>The cause is appended to the failure queue before the transition is attempted, and it is
   * appended even when the instance is already in a done state.
   *
   * @param cause the failure to record
   * @throws NullPointerException if {@code cause} is null
   */
  public void failed(Throwable cause) {
    // The queue rejects null elements anyway; check explicitly so the error names the argument.
    requireNonNull(cause, "cause is null");
    failureCauses.add(cause);
    transitionToDoneState(FAILED);
  }

  /**
   * Requests a transition to the given done state, conditional on the current state not being
   * done already.
   *
   * @throws NullPointerException if {@code doneState} is null
   * @throws IllegalArgumentException if {@code doneState} is not a done state
   */
  private void transitionToDoneState(FragmentInstanceState doneState) {
    requireNonNull(doneState, "doneState is null");
    checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);

    instanceState.setIf(doneState, currentState -> !currentState.isDone());
  }

  /**
   * Listener is always notified asynchronously using a dedicated notification thread pool so, care
   * should be taken to avoid leaking {@code this} when adding a listener in a constructor.
   * Additionally, it is possible notifications are observed out of order due to the asynchronous
   * execution.
   */
  public void addStateChangeListener(
      StateChangeListener<FragmentInstanceState> stateChangeListener) {
    instanceState.addStateChangeListener(stateChangeListener);
  }

  /**
   * Registers a listener for source instance failures.
   *
   * <p>Failures already recorded at registration time are replayed to the new listener on the
   * executor.
   *
   * @param listener the listener to register
   * @throws NullPointerException if {@code listener} is null
   */
  public void addSourceTaskFailureListener(FragmentInstanceFailureListener listener) {
    // A null listener would be stored and then break every later notification task.
    requireNonNull(listener, "listener is null");

    Map<FragmentInstanceId, Throwable> failures;
    synchronized (this) {
      sourceInstanceFailureListeners.add(listener);
      // Snapshot under the lock so the callbacks below run without holding it.
      failures = ImmutableMap.copyOf(sourceInstanceFailures);
    }
    executor.execute(
        () -> {
          failures.forEach(listener::onTaskFailed);
        });
  }

  /**
   * Records the failure of a source instance and notifies the registered listeners on the
   * executor.
   *
   * <p>Only the first failure reported for a given instance id is stored, but listeners are
   * notified with the arguments of every call.
   *
   * @param instanceId id of the source instance that failed
   * @param failure the failure reported for that instance
   * @throws NullPointerException if either argument is null
   */
  public void sourceTaskFailed(FragmentInstanceId instanceId, Throwable failure) {
    // HashMap would accept nulls, but the ImmutableMap snapshot taken when a listener is added
    // rejects them, so a null stored here would make every later registration fail.
    requireNonNull(instanceId, "instanceId is null");
    requireNonNull(failure, "failure is null");

    List<FragmentInstanceFailureListener> listeners;
    synchronized (this) {
      sourceInstanceFailures.putIfAbsent(instanceId, failure);
      // Snapshot under the lock so the callbacks below run without holding it.
      listeners = ImmutableList.copyOf(sourceInstanceFailureListeners);
    }
    executor.execute(
        () -> {
          for (FragmentInstanceFailureListener listener : listeners) {
            listener.onTaskFailed(instanceId, failure);
          }
        });
  }

  @Override
  public String toString() {
    return toStringHelper(this)
        .add("FragmentInstanceId", instanceId)
        .add("FragmentInstanceState", instanceState)
        .add("failureCauses", failureCauses)
        .toString();
  }
}