blob: 6b596fe14f4c3ee7f6e1b74c4b5fa7b29c35d4cd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.states.datanode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.endpoint.HeartbeatEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint.RegisterEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint.VersionEndpointTask;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Class that implements handshake with SCM.
*/
public class RunningDatanodeState implements DatanodeState {
  static final Logger
      LOG = LoggerFactory.getLogger(RunningDatanodeState.class);

  /** Source of the per-SCM endpoint state machines this state drives. */
  private final SCMConnectionManager connectionManager;
  /** Configuration handed to every endpoint task that gets created. */
  private final Configuration conf;
  /** State context; used to reach the parent and to request SHUTDOWN. */
  private final StateContext context;
  /**
   * Completion service wrapping the executor passed to the most recent
   * {@link #execute(ExecutorService)} call. It is {@code null} until
   * {@code execute} has been called at least once.
   */
  private CompletionService<EndpointStateMachine.EndPointStates> ecs;

  /**
   * Creates the running state.
   *
   * @param conf - Configuration passed on to the endpoint tasks.
   * @param connectionManager - Provides the endpoints to run tasks against.
   * @param context - State context of the datanode state machine.
   */
  public RunningDatanodeState(Configuration conf,
      SCMConnectionManager connectionManager,
      StateContext context) {
    this.connectionManager = connectionManager;
    this.conf = conf;
    this.context = context;
  }

  /**
   * Called before entering this state. Only emits a trace log line.
   */
  @Override
  public void onEnter() {
    LOG.trace("Entering handshake task.");
  }

  /**
   * Called after exiting this state. Only emits a trace log line.
   */
  @Override
  public void onExit() {
    LOG.trace("Exiting handshake task.");
  }

  /**
   * Submits one task per endpoint, chosen according to the endpoint's current
   * state. Results are collected later by {@link #await(long, TimeUnit)}.
   * <p>
   * If an endpoint is already in the SHUTDOWN state no task is submitted for
   * it and the datanode state is set to SHUTDOWN instead.
   *
   * @param executor - ExecutorService the endpoint tasks are submitted to.
   */
  @Override
  public void execute(ExecutorService executor) {
    ecs = new ExecutorCompletionService<>(executor);
    for (EndpointStateMachine endpoint : connectionManager.getValues()) {
      Callable<EndpointStateMachine.EndPointStates> endpointTask
          = getEndPointTask(endpoint);
      if (endpointTask != null) {
        ecs.submit(endpointTask);
      } else {
        // This can happen if a task is taking more time than the timeOut
        // specified for the task in await, and when it is completed the task
        // has set the state to Shutdown, we may see the state as shutdown
        // here. So, we need to Shutdown DatanodeStateMachine.
        LOG.error("State is Shutdown in RunningDatanodeState");
        context.setState(DatanodeStateMachine.DatanodeStates.SHUTDOWN);
      }
    }
  }

  //TODO : Cache some of these tasks instead of creating them
  //all the time.
  /**
   * Builds the task matching the current state of the given endpoint.
   *
   * @param endpoint - Endpoint whose state selects the task.
   * @return a version, register or heartbeat task, or {@code null} when the
   * endpoint is in the SHUTDOWN state.
   * @throws IllegalArgumentException if the endpoint reports a state that is
   * not handled here.
   */
  private Callable<EndpointStateMachine.EndPointStates>
      getEndPointTask(EndpointStateMachine endpoint) {
    switch (endpoint.getState()) {
    case GETVERSION:
      return new VersionEndpointTask(endpoint, conf, context.getParent()
          .getContainer());
    case REGISTER:
      return RegisterEndpointTask.newBuilder()
          .setConfig(conf)
          .setEndpointStateMachine(endpoint)
          .setContext(context)
          .setDatanodeDetails(context.getParent().getDatanodeDetails())
          .setOzoneContainer(context.getParent().getContainer())
          .build();
    case HEARTBEAT:
      return HeartbeatEndpointTask.newBuilder()
          .setConfig(conf)
          .setEndpointStateMachine(endpoint)
          .setDatanodeDetails(context.getParent().getDatanodeDetails())
          .setContext(context)
          .build();
    case SHUTDOWN:
      // Nothing to run; the caller treats the null result as "shut down".
      break;
    default:
      throw new IllegalArgumentException("Illegal Argument.");
    }
    return null;
  }

  /**
   * Computes the next state the container state machine must move to by looking
   * at all the state of endpoints.
   * <p>
   * if any endpoint state has moved to Shutdown, either we have an
   * unrecoverable error or we have been told to shutdown. Either case the
   * datanode state machine should move to Shutdown state, otherwise we
   * remain in the Running state.
   * <p>
   * A task that failed is logged and otherwise ignored; it does not by itself
   * cause a move to Shutdown.
   *
   * @param results - Futures of the endpoint tasks collected by await.
   * @return next container state.
   */
  private DatanodeStateMachine.DatanodeStates
      computeNextContainerState(
      List<Future<EndpointStateMachine.EndPointStates>> results) {
    for (Future<EndpointStateMachine.EndPointStates> state : results) {
      try {
        if (state.get() == EndpointStateMachine.EndPointStates.SHUTDOWN) {
          // if any endpoint tells us to shutdown we move to shutdown state.
          return DatanodeStateMachine.DatanodeStates.SHUTDOWN;
        }
      } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still
        // observe that this thread was interrupted.
        Thread.currentThread().interrupt();
        LOG.error("Error in executing end point task.", e);
      } catch (ExecutionException e) {
        LOG.error("Error in executing end point task.", e);
      }
    }
    return DatanodeStateMachine.DatanodeStates.RUNNING;
  }

  /**
   * Waits for the tasks submitted by {@link #execute(ExecutorService)} to
   * finish, for at most the given duration in total, and derives the next
   * datanode state from the results that arrived in time.
   * <p>
   * {@code execute} must have been called before this method, since it is
   * what creates the completion service polled here.
   *
   * @param duration - Maximum total time to wait.
   * @param timeUnit - Unit of duration.
   * @return SHUTDOWN if any collected task reported SHUTDOWN, otherwise
   * RUNNING.
   * @throws InterruptedException if interrupted while polling for results.
   */
  @Override
  public DatanodeStateMachine.DatanodeStates
      await(long duration, TimeUnit timeUnit)
      throws InterruptedException, ExecutionException, TimeoutException {
    // NOTE(review): execute() submits no task for endpoints that are already
    // in SHUTDOWN, so count can exceed the number of submitted tasks; in that
    // case this loop only ends once the time budget is used up.
    int count = connectionManager.getValues().size();
    int returned = 0;
    long durationMillis = timeUnit.toMillis(duration);
    long timeLeft = durationMillis;
    long startTime = Time.monotonicNow();
    List<Future<EndpointStateMachine.EndPointStates>> results = new
        LinkedList<>();
    while (returned < count && timeLeft > 0) {
      Future<EndpointStateMachine.EndPointStates> result =
          ecs.poll(timeLeft, TimeUnit.MILLISECONDS);
      if (result != null) {
        results.add(result);
        returned++;
      }
      // Remaining time is always the full budget minus the total elapsed
      // time. Subtracting the total elapsed time from the previous remainder
      // would count earlier waits again on every pass and end the wait early.
      timeLeft = durationMillis - (Time.monotonicNow() - startTime);
    }
    return computeNextContainerState(results);
  }
}