blob: 042dc2840aa8175ef278c5bb167a362061f3563d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.actionmanager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.orm.dao.HostDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.StageDAO;
import org.apache.ambari.server.orm.entities.HostEntity;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.state.ServiceComponentHostEvent;
import org.apache.ambari.server.utils.StageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Injector;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
//This class encapsulates the stage. The stage encapsulates all the information
//required to persist an action.
public class Stage {
private static Logger LOG = LoggerFactory.getLogger(Stage.class);
private final long requestId;
private final String clusterName;
private long stageId = -1;
private final String logDir;
private int taskTimeout = -1;
private int perTaskTimeFactor = 60000;
//Map of roles to successFactors for this stage. Default is 1 i.e. 100%
private Map<Role, Float> successFactors = new HashMap<Role, Float>();
//Map of host to host-roles
Map<String, Map<String, HostRoleCommand>> hostRoleCommands =
new TreeMap<String, Map<String, HostRoleCommand>>();
private Map<String, List<ExecutionCommandWrapper>> commandsToSend =
new TreeMap<String, List<ExecutionCommandWrapper>>();
@AssistedInject
public Stage(@Assisted long requestId, @Assisted("logDir") String logDir, @Assisted("clusterName") String clusterName) {
this.requestId = requestId;
this.logDir = logDir;
this.clusterName = clusterName;
}
/**
* Creates Stage existing in database
* @param actionId "requestId-stageId" string
*/
@AssistedInject
public Stage(@Assisted String actionId, Injector injector) {
this(injector.getInstance(StageDAO.class).findByActionId(actionId), injector);
}
@AssistedInject
public Stage(@Assisted StageEntity stageEntity, Injector injector) {
HostRoleCommandDAO hostRoleCommandDAO = injector.getInstance(HostRoleCommandDAO.class);
HostDAO hostDAO = injector.getInstance(HostDAO.class);
HostRoleCommandFactory hostRoleCommandFactory = injector.getInstance(HostRoleCommandFactory.class);
requestId = stageEntity.getRequestId();
stageId = stageEntity.getStageId();
logDir = stageEntity.getLogInfo();
clusterName = stageEntity.getCluster().getClusterName();
for (HostEntity hostEntity : hostDAO.findByStage(stageEntity)) {
List<HostRoleCommandEntity> commands = hostRoleCommandDAO.findSortedCommandsByStageAndHost(stageEntity, hostEntity);
commandsToSend.put(hostEntity.getHostName(), new ArrayList<ExecutionCommandWrapper>());
hostRoleCommands.put(hostEntity.getHostName(), new TreeMap<String, HostRoleCommand>());
for (HostRoleCommandEntity command : commands) {
HostRoleCommand hostRoleCommand = hostRoleCommandFactory.createExisting(command);
hostRoleCommands.get(hostEntity.getHostName()).put(hostRoleCommand.getRole().toString(), hostRoleCommand);
commandsToSend.get(hostEntity.getHostName()).add(hostRoleCommand.getExecutionCommandWrapper());
}
}
for (RoleSuccessCriteriaEntity successCriteriaEntity : stageEntity.getRoleSuccessCriterias()) {
successFactors.put(successCriteriaEntity.getRole(), successCriteriaEntity.getSuccessFactor().floatValue());
}
}
/**
* Creates object to be persisted in database
* @return StageEntity
*/
public synchronized StageEntity constructNewPersistenceEntity() {
StageEntity stageEntity = new StageEntity();
stageEntity.setRequestId(requestId);
stageEntity.setStageId(getStageId());
stageEntity.setLogInfo(logDir);
stageEntity.setHostRoleCommands(new ArrayList<HostRoleCommandEntity>());
stageEntity.setRoleSuccessCriterias(new ArrayList<RoleSuccessCriteriaEntity>());
for (Role role : successFactors.keySet()) {
RoleSuccessCriteriaEntity roleSuccessCriteriaEntity = new RoleSuccessCriteriaEntity();
roleSuccessCriteriaEntity.setRole(role);
roleSuccessCriteriaEntity.setStage(stageEntity);
roleSuccessCriteriaEntity.setSuccessFactor(successFactors.get(role).doubleValue());
stageEntity.getRoleSuccessCriterias().add(roleSuccessCriteriaEntity);
}
return stageEntity;
}
public List<HostRoleCommand> getOrderedHostRoleCommands() {
List<HostRoleCommand> commands = new ArrayList<HostRoleCommand>();
//TODO trick for proper storing order, check it
for (String hostName : hostRoleCommands.keySet()) {
for (ExecutionCommandWrapper executionCommandWrapper : commandsToSend.get(hostName)) {
for (HostRoleCommand hostRoleCommand : hostRoleCommands.get(hostName).values()) {
if (hostRoleCommand.getExecutionCommandWrapper() == executionCommandWrapper) {
commands.add(hostRoleCommand);
}
}
}
}
return commands;
}
public synchronized void setStageId(long stageId) {
if (this.stageId != -1) {
throw new RuntimeException("Attempt to set stageId again! Not allowed.");
}
this.stageId = stageId;
for (String host: this.commandsToSend.keySet()) {
for (ExecutionCommandWrapper wrapper : this.commandsToSend.get(host)) {
ExecutionCommand cmd = wrapper.getExecutionCommand();
cmd.setCommandId(StageUtils.getActionId(requestId, stageId));
}
}
}
public synchronized long getStageId() {
return stageId;
}
public String getActionId() {
return StageUtils.getActionId(requestId, getStageId());
}
/**
* A new host role command is created for execution.
* Creates both ExecutionCommand and HostRoleCommand objects and
* adds them to the Stage. This should be called only once for a host-role
* for a given stage.
*/
public synchronized void addHostRoleExecutionCommand(String host, Role role, RoleCommand command,
ServiceComponentHostEvent event, String clusterName, String serviceName) {
HostRoleCommand hrc = new HostRoleCommand(host, role, event, command);
ExecutionCommand cmd = new ExecutionCommand();
ExecutionCommandWrapper wrapper = new ExecutionCommandWrapper(cmd);
hrc.setExecutionCommandWrapper(wrapper);
cmd.setHostname(host);
cmd.setClusterName(clusterName);
cmd.setServiceName(serviceName);
cmd.setCommandId(this.getActionId());
cmd.setRole(role);
cmd.setRoleCommand(command);
Map<String, HostRoleCommand> hrcMap = this.hostRoleCommands.get(host);
if (hrcMap == null) {
hrcMap = new TreeMap<String, HostRoleCommand>();
this.hostRoleCommands.put(host, hrcMap);
}
if (hrcMap.get(role.toString()) != null) {
throw new RuntimeException(
"Setting the host role command second time for same stage: stage="
+ this.getActionId() + ", host=" + host + ", role=" + role);
}
hrcMap.put(role.toString(), hrc);
List<ExecutionCommandWrapper> execCmdList = this.commandsToSend.get(host);
if (execCmdList == null) {
execCmdList = new ArrayList<ExecutionCommandWrapper>();
this.commandsToSend.put(host, execCmdList);
}
if (execCmdList.contains(wrapper)) {
//todo: proper exception
throw new RuntimeException(
"Setting the execution command second time for same stage: stage="
+ this.getActionId() + ", host=" + host + ", role=" + role);
}
execCmdList.add(wrapper);
}
/**
*
* @return list of hosts
*/
public synchronized List<String> getHosts() { // TODO: Check whether method should be synchronized
List<String> hlist = new ArrayList<String>();
for (String h : this.hostRoleCommands.keySet()) {
hlist.add(h);
}
return hlist;
}
synchronized float getSuccessFactor(Role r) {
Float f = successFactors.get(r);
if (f == null) {
if (r.equals(Role.DATANODE) || r.equals(Role.TASKTRACKER) || r.equals(Role.GANGLIA_MONITOR) ||
r.equals(Role.HBASE_REGIONSERVER)) {
return (float) 0.5;
} else {
return 1;
}
} else {
return f;
}
}
public synchronized void setSuccessFactors(Map<Role, Float> suc) {
successFactors = suc;
}
public synchronized Map<Role, Float> getSuccessFactors() {
return successFactors;
}
public long getRequestId() {
return requestId;
}
public String getClusterName() {
return clusterName;
}
public long getLastAttemptTime(String host, String role) {
return this.hostRoleCommands.get(host).get(role).getLastAttemptTime();
}
public short getAttemptCount(String host, String role) {
return this.hostRoleCommands.get(host).get(role).getAttemptCount();
}
public void incrementAttemptCount(String hostname, String role) {
this.hostRoleCommands.get(hostname).get(role).incrementAttemptCount();
}
public void setLastAttemptTime(String host, String role, long t) {
this.hostRoleCommands.get(host).get(role).setLastAttemptTime(t);
}
public ExecutionCommandWrapper getExecutionCommandWrapper(String hostname,
String role) {
HostRoleCommand hrc = hostRoleCommands.get(hostname).get(role);
if (hrc != null) {
return hrc.getExecutionCommandWrapper();
} else {
return null;
}
}
public List<ExecutionCommandWrapper> getExecutionCommands(String hostname) {
return commandsToSend.get(hostname);
}
public long getStartTime(String hostname, String role) {
return this.hostRoleCommands.get(hostname).get(role).getStartTime();
}
public void setStartTime(String hostname, String role, long startTime) {
this.hostRoleCommands.get(hostname).get(role).setStartTime(startTime);
}
public HostRoleStatus getHostRoleStatus(String hostname, String role) {
return this.hostRoleCommands.get(hostname).get(role).getStatus();
}
public void setHostRoleStatus(String host, String role,
HostRoleStatus status) {
this.hostRoleCommands.get(host).get(role).setStatus(status);
}
public ServiceComponentHostEventWrapper getFsmEvent(String hostname, String roleStr) {
return this.hostRoleCommands.get(hostname).get(roleStr).getEvent();
}
public void setExitCode(String hostname, String role, int exitCode) {
this.hostRoleCommands.get(hostname).get(role).setExitCode(exitCode);
}
public int getExitCode(String hostname, String role) {
return this.hostRoleCommands.get(hostname).get(role).getExitCode();
}
public void setStderr(String hostname, String role, String stdErr) {
this.hostRoleCommands.get(hostname).get(role).setStderr(stdErr);
}
public void setStdout(String hostname, String role, String stdOut) {
this.hostRoleCommands.get(hostname).get(role).setStdout(stdOut);
}
public synchronized boolean isStageInProgress() {
for(String host: hostRoleCommands.keySet()) {
for (String role : hostRoleCommands.get(host).keySet()) {
HostRoleCommand hrc = hostRoleCommands.get(host).get(role);
if (hrc == null) {
return false;
}
if (hrc.getStatus().equals(HostRoleStatus.PENDING) ||
hrc.getStatus().equals(HostRoleStatus.QUEUED) ||
hrc.getStatus().equals(HostRoleStatus.IN_PROGRESS)) {
return true;
}
}
}
return false;
}
public synchronized boolean doesStageHaveHostRoleStatus(
Set<HostRoleStatus> statuses) {
for(String host: hostRoleCommands.keySet()) {
for (String role : hostRoleCommands.get(host).keySet()) {
HostRoleCommand hrc = hostRoleCommands.get(host).get(role);
if (hrc == null) {
return false;
}
for (HostRoleStatus status : statuses)
if (hrc.getStatus().equals(status)) {
return true;
}
}
}
return false;
}
public Map<String, List<ExecutionCommandWrapper>> getExecutionCommands() {
return this.commandsToSend;
}
public String getLogDir() {
return this.logDir;
}
public Map<String, Map<String, HostRoleCommand>> getHostRoleCommands() {
return hostRoleCommands;
}
/**
* This method should be used only in stage planner. To add
* a new execution command use
* {@link #addHostRoleExecutionCommand(String, Role, RoleCommand,
* ServiceComponentHostEvent, String, String)}
*/
public synchronized void addExecutionCommandWrapper(Stage origStage,
String hostname, Role r) {
String role = r.toString();
if (commandsToSend.get(hostname) == null) {
commandsToSend.put(hostname, new ArrayList<ExecutionCommandWrapper>());
}
commandsToSend.get(hostname).add(
origStage.getExecutionCommandWrapper(hostname, role));
if (hostRoleCommands.get(hostname) == null) {
hostRoleCommands.put(hostname, new TreeMap<String, HostRoleCommand>());
}
// TODO add reference to ExecutionCommand into HostRoleCommand
hostRoleCommands.get(hostname).put(role,
origStage.getHostRoleCommand(hostname, role));
}
HostRoleCommand getHostRoleCommand(String hostname, String role) {
return hostRoleCommands.get(hostname).get(role);
}
public synchronized int getTaskTimeout() {
if (taskTimeout == -1) {
int maxTasks = 0;
for (String host: commandsToSend.keySet()) {
if (commandsToSend.get(host).size() > maxTasks) {
maxTasks = commandsToSend.get(host).size();
}
}
taskTimeout = maxTasks * perTaskTimeFactor;
}
return taskTimeout;
}
@Override //Object
public synchronized String toString() {
StringBuilder builder = new StringBuilder();
builder.append("STAGE DESCRIPTION BEGIN\n");
builder.append("requestId="+requestId+"\n");
builder.append("stageId="+stageId+"\n");
builder.append("clusterName="+clusterName+"\n");
builder.append("logDir=" + logDir+"\n");
builder.append("Success Factors:\n");
for (Role r : successFactors.keySet()) {
builder.append(" role: "+r+", factor: "+successFactors.get(r)+"\n");
}
for (HostRoleCommand hostRoleCommand : getOrderedHostRoleCommands()) {
builder.append("HOST: ").append(hostRoleCommand.getHostName()).append(" :\n");
builder.append(hostRoleCommand.getExecutionCommandWrapper().getJson());
builder.append("\n");
builder.append(hostRoleCommand.toString());
builder.append("\n");
}
builder.append("STAGE DESCRIPTION END\n");
return builder.toString();
}
}