blob: 2448c9992b613d2acf4443b0d703159f134e4238 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ambari.server.agent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.Role;
import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.ServiceComponentHostNotFoundException;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.ServiceNotFoundException;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.controller.MaintenanceStateHelper;
import org.apache.ambari.server.events.ActionFinalReportReceivedEvent;
import org.apache.ambari.server.events.AlertEvent;
import org.apache.ambari.server.events.AlertReceivedEvent;
import org.apache.ambari.server.events.HostComponentVersionAdvertisedEvent;
import org.apache.ambari.server.events.publishers.AlertEventPublisher;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.events.publishers.VersionEventPublisher;
import org.apache.ambari.server.metadata.ActionMetadata;
import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
import org.apache.ambari.server.state.Alert;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostHealthStatus;
import org.apache.ambari.server.state.MaintenanceState;
import org.apache.ambari.server.state.SecurityState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.UpgradeState;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.scheduler.RequestExecution;
import org.apache.ambari.server.state.stack.upgrade.Direction;
import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpFailedEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpInProgressEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostOpSucceededEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartedEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStoppedEvent;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.gson.annotations.SerializedName;
import com.google.inject.Inject;
import com.google.inject.Injector;
/**
* Processes the heartbeat data received from agents in bulk, on a background thread pool.
*
*/
public class HeartbeatProcessor extends AbstractService{
private static final Logger LOG = LoggerFactory.getLogger(HeartbeatProcessor.class);
// Scheduler that runs the heartbeat processing tasks; created in the constructor.
private ScheduledExecutorService executor;
// Heartbeats waiting to be processed: filled by addHeartbeat(), drained by the processing task.
private ConcurrentLinkedQueue<HeartBeat> heartBeatsQueue = new ConcurrentLinkedQueue<>();
// Set to false in doStop() so that a running processing task leaves its polling loop.
private volatile boolean shouldRun = true;
//TODO rewrite to correlate with heartbeat frequency, hardcoded in agent as of now
// Delay before the first processing run, in milliseconds.
private long delay = 5000;
// Period between the starts of successive processing runs, in milliseconds.
private long period = 1000;
// Thread pool size of the scheduler and number of processing tasks scheduled in doStart().
private int poolSize = 1;
// Used to look up hosts, clusters and the clusters a host belongs to.
private Clusters clusterFsm;
// Receives the per-component "send execution command details" flag from status reports.
private HeartbeatMonitor heartbeatMonitor;
// Injector passed to the constructor, where it also injects the @Inject members below.
private Injector injector;
// Provides the HostRoleCommand of reported tasks and receives the command reports.
private ActionManager actionManager;
/**
* Publishes {@link AlertEvent} instances.
*/
@Inject
AlertEventPublisher alertEventPublisher;
// Publishes ActionFinalReportReceivedEvent instances for finished actions.
@Inject
AmbariEventPublisher ambariEventPublisher;
// Publishes HostComponentVersionAdvertisedEvent instances.
@Inject
VersionEventPublisher versionEventPublisher;
// Used to tell whether a reported role is an action of a service rather than a component.
@Inject
ActionMetadata actionMetadata;
// Used to skip components that are in maintenance mode when calculating the host status.
@Inject
MaintenanceStateHelper maintenanceStateHelper;
// Used to look up the category (MASTER, SLAVE, ...) of a component.
@Inject
AmbariMetaInfo ambariMetaInfo;
// Stores which Kerberos principals have a keytab distributed to which host.
@Inject
KerberosPrincipalHostDAO kerberosPrincipalHostDAO;
// Parses the JSON structured output and stack version sent by agents.
@Inject
Gson gson;
/**
 * Creates the processor together with the scheduler that will run its processing tasks.
 * No task is scheduled here; that happens in {@link #doStart()}.
 *
 * @param clusterFsm cluster and host state
 * @param am action manager
 * @param heartbeatMonitor heartbeat monitor
 * @param injector injector used to populate the {@code @Inject} annotated members
 */
@Inject
public HeartbeatProcessor(Clusters clusterFsm, ActionManager am, HeartbeatMonitor heartbeatMonitor,
    Injector injector) {
  // Populate the @Inject annotated members before anything else, as the original code did
  injector.injectMembers(this);

  this.clusterFsm = clusterFsm;
  this.actionManager = am;
  this.heartbeatMonitor = heartbeatMonitor;
  this.injector = injector;

  this.executor = Executors.newScheduledThreadPool(poolSize,
      new ThreadFactoryBuilder().setNameFormat("ambari-heartbeat-processor-%d").build());
}
/**
 * Schedules the heartbeat processing tasks and reports the service as started.
 *
 * The {@link AbstractService} contract requires {@code doStart()} to cause a call to
 * {@code notifyStarted()}; without it the service never leaves the STARTING state.
 */
@Override
protected void doStart() {
  LOG.info("**** Starting heartbeats processing threads ****");
  for (int i = 0; i < poolSize; i++) {
    executor.scheduleAtFixedRate(new HeartbeatProcessingTask(), delay, period, TimeUnit.MILLISECONDS);
  }
  // Startup is complete once the tasks are scheduled; move the service to RUNNING
  notifyStarted();
}
/**
 * Stops the heartbeat processing tasks and reports the service as stopped.
 *
 * The {@link AbstractService} contract requires {@code doStop()} to cause a call to
 * {@code notifyStopped()}; without it the service never reaches the TERMINATED state.
 */
@Override
protected void doStop() {
  LOG.info("**** Stopping heartbeats processing threads ****");
  // Makes a task that is currently draining the queue leave its loop after the current heartbeat
  shouldRun = false;
  // Orderly shutdown: a task that is already running is allowed to finish
  executor.shutdown();
  notifyStopped();
}
/**
 * Appends a heartbeat to the queue of heartbeats waiting to be processed.
 *
 * @param heartBeat heartbeat received from an agent
 */
public void addHeartbeat(HeartBeat heartBeat) {
  // offer() on the unbounded queue is what add() delegates to
  heartBeatsQueue.offer(heartBeat);
}
/**
 * Removes and returns the oldest queued heartbeat.
 *
 * @return the next heartbeat, or {@code null} when the queue is empty
 */
private HeartBeat pollHeartbeat() {
  final HeartBeat oldest = heartBeatsQueue.poll();
  return oldest;
}
/**
 * Processing task to be scheduled for execution. Each run drains the heartbeat queue:
 * it processes queued heartbeats one by one until the queue is empty or the processor
 * has been asked to stop.
 */
private class HeartbeatProcessingTask implements Runnable {
  @Override
  public void run() {
    for (;;) {
      if (!shouldRun) {
        return;
      }
      try {
        HeartBeat next = pollHeartbeat();
        if (null == next) {
          // Queue drained, wait for the next scheduled run
          return;
        }
        processHeartbeat(next);
      } catch (Exception e) {
        LOG.error("Exception received while processing heartbeat", e);
      } catch (Throwable throwable) {
        //catch everything to prevent task suppression
        LOG.error("ERROR: ", throwable);
      }
    }
  }
}
/**
 * Encapsulates the logic for processing the data of a single agent heartbeat: alerts,
 * status reports, command reports and finally the host status.
 *
 * @param heartbeat Agent heartbeat object
 * @throws AmbariException
 */
public void processHeartbeat(HeartBeat heartbeat) throws AmbariException {
  // One timestamp is taken up front and reused for everything derived from the command reports
  final long processingStarted = System.currentTimeMillis();

  processAlerts(heartbeat);

  //process status reports before command reports to prevent status override immediately after task finish
  processStatusReports(heartbeat);
  processCommandReports(heartbeat, processingStarted);

  //host status calculation are based on task and status reports, should be performed last
  processHostStatus(heartbeat);
}
/**
 * Publishes the {@link Alert}s carried by the heartbeat as a single
 * {@link AlertReceivedEvent}. Alerts that do not name a host are attributed to the
 * host that sent the heartbeat. A {@code null} heartbeat or a heartbeat without
 * alerts is ignored.
 *
 * @param heartbeat
 *          the heartbeat to process.
 */
protected void processAlerts(HeartBeat heartbeat) {
  if (null == heartbeat) {
    return;
  }

  String reportingHost = heartbeat.getHostname();
  if (heartbeat.getAlerts() == null) {
    return;
  }

  AlertEvent received = new AlertReceivedEvent(heartbeat.getAlerts());
  for (Alert current : received.getAlerts()) {
    if (null == current.getHostName()) {
      current.setHostName(reportingHost);
    }
  }
  alertEventPublisher.publish(received);
}
/**
 * Updates the status of the reporting host based on the states of its components.
 *
 * Nothing happens while the health status of the host is UNKNOWN. Otherwise the status is
 * recalculated when the heartbeat carries component statuses or, failing that, a COMPLETED
 * report of a command that is neither an ACTIONEXECUTE nor an action of its service.
 * A host that belongs to no cluster is marked HEALTHY.
 *
 * @param heartbeat heartbeat to process
 * @throws AmbariException
 */
protected void processHostStatus(HeartBeat heartbeat) throws AmbariException {
  Host host = clusterFsm.getHost(heartbeat.getHostname());
  if (host.getHealthStatus().getHealthStatus().equals(HostHealthStatus.HealthStatus.UNKNOWN)) {
    return;
  }

  //Host status info could be calculated only if agent returned statuses in heartbeat
  //Or, if a command is executed that can change component status
  String clusterName = null;
  boolean recalculate = false;
  List<ComponentStatus> liveStatuses = heartbeat.getComponentStatus();
  if (!liveStatuses.isEmpty()) {
    recalculate = true;
    clusterName = liveStatuses.get(0).getClusterName();
  } else {
    for (CommandReport commandReport : heartbeat.getReports()) {
      if (RoleCommand.ACTIONEXECUTE.toString().equals(commandReport.getRoleCommand())) {
        continue;
      }
      String serviceKey = commandReport.getServiceName().toLowerCase();
      if (actionMetadata.getActions(serviceKey).contains(commandReport.getRole())) {
        continue;
      }
      if (commandReport.getStatus().equals("COMPLETED")) {
        recalculate = true;
        clusterName = commandReport.getClusterName();
        break;
      }
    }
  }

  if (recalculate) {
    //Use actual component status to compute the host status
    int masters = 0;
    int startedMasters = 0;
    int slaves = 0;
    int startedSlaves = 0;

    Cluster cluster = clusterFsm.getCluster(clusterName);
    StackId stackId = cluster.getDesiredStackVersion();

    for (ServiceComponentHost sch : cluster.getServiceComponentHosts(heartbeat.getHostname())) {
      ComponentInfo info = ambariMetaInfo.getComponent(stackId.getStackName(),
          stackId.getStackVersion(), sch.getServiceName(), sch.getServiceComponentName());
      String stateName = sch.getState().name();
      String category = info.getCategory();

      // Components in maintenance mode do not take part in the calculation
      if (MaintenanceState.OFF != maintenanceStateHelper.getEffectiveState(sch, host)) {
        continue;
      }

      boolean started = stateName.equals("STARTED");
      if (category.equals("MASTER")) {
        masters++;
        if (started) {
          startedMasters++;
        }
      } else if (category.equals("SLAVE")) {
        slaves++;
        if (started) {
          startedSlaves++;
        }
      }
    }

    HostHealthStatus.HealthStatus calculated;
    if (masters == startedMasters && slaves == startedSlaves) {
      calculated = HostHealthStatus.HealthStatus.HEALTHY;
    } else if (masters > 0 && startedMasters < masters) {
      calculated = HostHealthStatus.HealthStatus.UNHEALTHY;
    } else {
      calculated = HostHealthStatus.HealthStatus.ALERT;
    }
    host.setStatus(calculated.name());
  }

  //If host doesn't belong to any cluster
  if (clusterFsm.getClustersForHost(host.getHostName()).isEmpty()) {
    host.setStatus(HostHealthStatus.HealthStatus.HEALTHY.name());
  }
}
/**
 * Processes the reports of tasks executed on an agent.
 *
 * For every command report of the heartbeat this method
 * publishes an {@link ActionFinalReportReceivedEvent} for actions in a completed state,
 * records the start time of tasks that went from QUEUED to IN_PROGRESS,
 * records keytab distribution results of Kerberos client commands and
 * sends the matching events to the {@link ServiceComponentHost} state machine.
 * Reports of ABORTED commands are skipped. Afterwards all reports are handed to the
 * {@link ActionManager}.
 *
 * @param heartbeat heartbeat to process
 * @param now cached current time
 * @throws AmbariException propagated from host, cluster and service lookups, or thrown when a
 *         report that needs component handling carries no service name
 */
protected void processCommandReports(
    HeartBeat heartbeat, long now)
    throws AmbariException {
  String hostname = heartbeat.getHostname();
  List<CommandReport> reports = heartbeat.getReports();

  // Cache HostRoleCommand entities because we will need them few times
  List<Long> taskIds = new ArrayList<>();
  for (CommandReport report : reports) {
    taskIds.add(report.getTaskId());
  }
  Map<Long, HostRoleCommand> commands = actionManager.getTasksMap(taskIds);

  for (CommandReport report : reports) {
    Long clusterId = null;
    if (report.getClusterName() != null) {
      try {
        Cluster cluster = clusterFsm.getCluster(report.getClusterName());
        clusterId = cluster.getClusterId();
      } catch (AmbariException e) {
        // Best effort: the id is only attached to the final action report event below
        LOG.debug("Unable to resolve cluster {} of a command report, continuing without cluster id",
            report.getClusterName(), e);
      }
    }

    LOG.debug("Received command report: {}", report);

    // The host is resolved through clusterFsm on purpose: don't touch database
    Host host = clusterFsm.getHost(hostname);
    if (host == null) {
      LOG.error("Received a command report and was unable to retrieve Host for hostname = " + hostname);
      continue;
    }

    // Send event for final command reports for actions
    if (RoleCommand.valueOf(report.getRoleCommand()) == RoleCommand.ACTIONEXECUTE &&
        HostRoleStatus.valueOf(report.getStatus()).isCompletedState()) {
      ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
          clusterId, hostname, report, false);
      ambariEventPublisher.publish(event);
    }

    // Fetch HostRoleCommand that corresponds to a given task ID
    HostRoleCommand hostRoleCommand = commands.get(report.getTaskId());
    if (hostRoleCommand == null) {
      LOG.warn("Can't fetch HostRoleCommand with taskId = " + report.getTaskId());
    } else {
      // Skip sending events for command reports for ABORTed commands
      if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
        continue;
      }
      if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
          report.getStatus().equals("IN_PROGRESS")) {
        hostRoleCommand.setStartTime(now);

        // Because the task may be retried several times, set the original start time only once.
        if (hostRoleCommand.getOriginalStartTime() == -1) {
          hostRoleCommand.setOriginalStartTime(now);
        }
      }
    }

    // If the report indicates the keytab file was successfully transferred to a host or removed
    // from a host, record this for future reference
    recordKeytabDistribution(report, host);

    //pass custom START, STOP and RESTART
    if (RoleCommand.ACTIONEXECUTE.toString().equals(report.getRoleCommand()) ||
        (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
            !("RESTART".equals(report.getCustomCommand()) ||
                "START".equals(report.getCustomCommand()) ||
                "STOP".equals(report.getCustomCommand())))) {
      continue;
    }

    Cluster cl = clusterFsm.getCluster(report.getClusterName());
    String service = report.getServiceName();
    if (service == null || service.isEmpty()) {
      throw new AmbariException("Invalid command report, service: " + service);
    }

    if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
      LOG.debug("{} is an action - skip component lookup", report.getRole());
    } else {
      try {
        Service svc = cl.getService(service);
        ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
        ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostname);
        String schName = scHost.getServiceComponentName();

        if (report.getStatus().equals(HostRoleStatus.COMPLETED.toString())) {

          // Reading component version if it is present
          if (StringUtils.isNotBlank(report.getStructuredOut())) {
            ComponentVersionStructuredOut structuredOutput = null;
            try {
              structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
            } catch (JsonSyntaxException ex) {
              //Json structure for component version was incorrect
              //do nothing, pass this data further for processing
            }

            String newVersion = structuredOutput == null ? null : structuredOutput.version;
            Long repoVersionId = structuredOutput == null ? null : structuredOutput.repositoryVersionId;

            HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(
                cl, scHost, newVersion, repoVersionId);

            versionEventPublisher.publish(event);
          }

          // Updating stack version, if needed (this is not actually for express/rolling upgrades!)
          if (scHost.getState().equals(org.apache.ambari.server.state.State.UPGRADING)) {
            scHost.setStackVersion(scHost.getDesiredStackVersion());
          } else if ((report.getRoleCommand().equals(RoleCommand.START.toString()) ||
              (report.getRoleCommand().equals(RoleCommand.CUSTOM_COMMAND.toString()) &&
                  ("START".equals(report.getCustomCommand()) ||
                      "RESTART".equals(report.getCustomCommand()))))
              && null != report.getConfigurationTags()
              && !report.getConfigurationTags().isEmpty()) {
            LOG.info("Updating applied config on service " + scHost.getServiceName() +
                ", component " + scHost.getServiceComponentName() + ", host " + scHost.getHostName());
            scHost.updateActualConfigs(report.getConfigurationTags());
            scHost.setRestartRequired(false);
          }

          // Necessary for resetting clients stale configs after starting service
          if ((RoleCommand.INSTALL.toString().equals(report.getRoleCommand()) ||
              (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
                  "INSTALL".equals(report.getCustomCommand()))) && svcComp.isClientComponent()) {
            scHost.updateActualConfigs(report.getConfigurationTags());
            scHost.setRestartRequired(false);
          }

          if (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
              !("START".equals(report.getCustomCommand()) ||
                  "STOP".equals(report.getCustomCommand()))) {
            //do not affect states for custom commands except START and STOP
            //lets status commands to be responsible for this
            continue;
          }

          if (RoleCommand.START.toString().equals(report.getRoleCommand()) ||
              (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
                  "START".equals(report.getCustomCommand()))) {
            scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
                hostname, now));
            scHost.setRestartRequired(false);
          } else if (RoleCommand.STOP.toString().equals(report.getRoleCommand()) ||
              (RoleCommand.CUSTOM_COMMAND.toString().equals(report.getRoleCommand()) &&
                  "STOP".equals(report.getCustomCommand()))) {
            scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
                hostname, now));
          } else {
            scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
                hostname, now));
          }
        } else if (report.getStatus().equals("FAILED")) {

          if (StringUtils.isNotBlank(report.getStructuredOut())) {
            try {
              ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);

              // fromJson yields null for a JSON null document, so guard before dereferencing
              if (null != structuredOutput && null != structuredOutput.upgradeDirection) {
                scHost.setUpgradeState(UpgradeState.FAILED);
              }
            } catch (JsonSyntaxException ex) {
              LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
            }
          }

          LOG.error("Operation failed - may be retried. Service component host: "
              + schName + ", host: " + hostname + " Action id " + report.getActionId() + " and Task id " + report.getTaskId());
          if (actionManager.isInProgressCommand(report)) {
            scHost.handleEvent(new ServiceComponentHostOpFailedEvent
                (schName, hostname, now));
          } else {
            LOG.info("Received report for a command that is no longer active. " + report);
          }
        } else if (report.getStatus().equals("IN_PROGRESS")) {
          scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
              hostname, now));
        }
      } catch (ServiceComponentNotFoundException scnex) {
        LOG.warn("Service component not found ", scnex);
      } catch (InvalidStateTransitionException ex) {
        if (LOG.isDebugEnabled()) {
          LOG.warn("State machine exception.", ex);
        } else {
          LOG.warn("State machine exception. " + ex.getMessage());
        }
      }
    }
  }

  //Update state machines from reports
  actionManager.processTaskResponse(hostname, reports, commands);
}

/**
 * Records which principals have a keytab on the host, based on the report of a COMPLETED
 * SET_KEYTAB or REMOVE_KEYTAB custom command of the Kerberos client. Any other report, and a
 * report whose structured output cannot be parsed, is ignored.
 *
 * @param report command report to inspect
 * @param host host that sent the report
 */
private void recordKeytabDistribution(CommandReport report, Host host) {
  if (!(Service.Type.KERBEROS.name().equalsIgnoreCase(report.getServiceName()) &&
      Role.KERBEROS_CLIENT.name().equalsIgnoreCase(report.getRole()) &&
      RoleCommand.CUSTOM_COMMAND.name().equalsIgnoreCase(report.getRoleCommand()) &&
      RequestExecution.Status.COMPLETED.name().equalsIgnoreCase(report.getStatus()))) {
    return;
  }

  String customCommand = report.getCustomCommand();
  boolean adding = "SET_KEYTAB".equalsIgnoreCase(customCommand);
  if (!adding && !"REMOVE_KEYTAB".equalsIgnoreCase(customCommand)) {
    return;
  }

  WriteKeytabsStructuredOut writeKeytabsStructuredOut;
  try {
    writeKeytabsStructuredOut = gson.fromJson(report.getStructuredOut(), WriteKeytabsStructuredOut.class);
  } catch (JsonSyntaxException ex) {
    //Json structure was incorrect do nothing, pass this data further for processing
    writeKeytabsStructuredOut = null;
  }

  Map<String, String> keytabs =
      writeKeytabsStructuredOut == null ? null : writeKeytabsStructuredOut.getKeytabs();
  if (keytabs == null) {
    return;
  }

  for (Map.Entry<String, String> entry : keytabs.entrySet()) {
    String principal = entry.getKey();
    boolean recorded = kerberosPrincipalHostDAO.exists(principal, host.getHostId());

    if (adding) {
      // Only create the record once
      if (!recorded) {
        kerberosPrincipalHostDAO.create(principal, host.getHostId());
      }
    } else if (recorded && "_REMOVED_".equalsIgnoreCase(entry.getValue())) {
      // A removal only makes sense for a record that exists; the previous code attempted
      // it only when the record was absent, so stale records were never deleted
      kerberosPrincipalHostDAO.remove(principal, host.getHostId());
    }
  }
}
/**
 * Processes the reports of status commands: for every component status that belongs to one
 * of the clusters of the reporting host, the state, security state, stack version, actual
 * configs and extra data of the matching {@link ServiceComponentHost} are updated.
 * Statuses that refer to unknown services or components, or that carry an invalid payload,
 * are logged and skipped.
 *
 * @param heartbeat heartbeat to process
 * @throws AmbariException
 */
protected void processStatusReports(HeartBeat heartbeat) throws AmbariException {
  final String hostname = heartbeat.getHostname();

  for (Cluster cl : clusterFsm.getClustersForHost(hostname)) {
    for (ComponentStatus status : heartbeat.componentStatus) {
      if (!status.getClusterName().equals(cl.getClusterName())) {
        continue;
      }

      try {
        Service svc = cl.getService(status.getServiceName());
        String componentName = status.getComponentName();
        if (!svc.getServiceComponents().containsKey(componentName)) {
          // TODO: What should be done otherwise?
          continue;
        }

        ServiceComponentHost scHost =
            svc.getServiceComponent(componentName).getServiceComponentHost(hostname);

        org.apache.ambari.server.state.State prevState = scHost.getState();
        org.apache.ambari.server.state.State liveState =
            org.apache.ambari.server.state.State.valueOf(
                org.apache.ambari.server.state.State.class, status.getStatus());

        //ignore reports from status commands if component is in INIT or any "in progress" state
        boolean acceptsLiveState =
            prevState.equals(org.apache.ambari.server.state.State.INSTALLED)
                || prevState.equals(org.apache.ambari.server.state.State.STARTED)
                || prevState.equals(org.apache.ambari.server.state.State.UNKNOWN);
        if (acceptsLiveState) {
          scHost.setState(liveState);
          if (!prevState.equals(liveState)) {
            LOG.info("State of service component " + componentName
                + " of service " + status.getServiceName()
                + " of cluster " + status.getClusterName()
                + " has changed from " + prevState + " to " + liveState
                + " at host " + hostname
                + " according to STATUS_COMMAND report");
          }
        }

        SecurityState prevSecurityState = scHost.getSecurityState();
        SecurityState currentSecurityState = SecurityState.valueOf(status.getSecurityState());
        if (prevSecurityState != currentSecurityState) {
          if (!prevSecurityState.isEndpoint()) {
            LOG.debug(String.format("Security of service component %s of service %s of cluster %s " +
                    "has changed from %s to %s on host %s but will be ignored since %s is a " +
                    "transitional state",
                componentName, status.getServiceName(), status.getClusterName(),
                prevSecurityState, currentSecurityState, hostname, prevSecurityState));
          } else {
            scHost.setSecurityState(currentSecurityState);
            LOG.info(String.format("Security of service component %s of service %s of cluster %s " +
                    "has changed from %s to %s on host %s",
                componentName, status.getServiceName(), status.getClusterName(), prevSecurityState,
                currentSecurityState, hostname));
          }
        }

        String reportedStackVersion = status.getStackVersion();
        if (reportedStackVersion != null && !reportedStackVersion.isEmpty()) {
          scHost.setStackVersion(gson.fromJson(status.getStackVersion(), StackId.class));
        }

        if (status.getConfigTags() != null) {
          scHost.updateActualConfigs(status.getConfigTags());
        }

        Map<String, Object> extra = status.getExtra();
        if (extra != null && !extra.isEmpty()) {
          try {
            if (extra.containsKey("processes")) {
              @SuppressWarnings("unchecked")
              List<Map<String, String>> processes = (List<Map<String, String>>) extra.get("processes");
              scHost.setProcesses(processes);
            }
            if (extra.containsKey("version")) {
              String advertisedVersion = extra.get("version").toString();
              versionEventPublisher.publish(
                  new HostComponentVersionAdvertisedEvent(cl, scHost, advertisedVersion));
            }
          } catch (Exception e) {
            LOG.error("Could not access extra JSON for " +
                scHost.getServiceComponentName() + " from " +
                scHost.getHostName() + ": " + status.getExtra() +
                " (" + e.getMessage() + ")");
          }
        }

        heartbeatMonitor.getAgentRequests()
            .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
      } catch (ServiceNotFoundException e) {
        // FIXME ignore invalid live update and continue for now?
        LOG.warn("Received a live status update for a non-initialized service"
            + ", clusterName=" + status.getClusterName()
            + ", serviceName=" + status.getServiceName());
      } catch (ServiceComponentNotFoundException e) {
        // FIXME ignore invalid live update and continue for now?
        LOG.warn("Received a live status update for a non-initialized servicecomponent"
            + ", clusterName=" + status.getClusterName()
            + ", serviceName=" + status.getServiceName()
            + ", componentName=" + status.getComponentName());
      } catch (ServiceComponentHostNotFoundException e) {
        // FIXME ignore invalid live update and continue for now?
        LOG.warn("Received a live status update for a non-initialized service"
            + ", clusterName=" + status.getClusterName()
            + ", serviceName=" + status.getServiceName()
            + ", componentName=" + status.getComponentName()
            + ", hostname=" + hostname);
      } catch (RuntimeException e) {
        LOG.warn("Received a live status with invalid payload service"
            + ", clusterName=" + status.getClusterName()
            + ", serviceName=" + status.getServiceName()
            + ", componentName=" + status.getComponentName()
            + ", hostname=" + hostname
            + ", error=" + e.getMessage());
      }
    }
  }
}
/**
 * Maps the JSON structured output of keytab distribution actions.
 */
private static class WriteKeytabsStructuredOut {
  /** Content of the "keytabs" member of the structured output. */
  @SerializedName("keytabs")
  private Map<String, String> keytabs;

  /**
   * @param keytabs the new content of the "keytabs" member
   */
  public void setKeytabs(Map<String, String> keytabs) {
    this.keytabs = keytabs;
  }

  /**
   * @return the content of the "keytabs" member, {@code null} if it was never set
   */
  public Map<String, String> getKeytabs() {
    return this.keytabs;
  }
}
/**
 * Maps the JSON structured output of the component START action.
 * All members stay {@code null} unless the corresponding JSON member is present.
 */
private static class ComponentVersionStructuredOut {
  /** Value of the "version" member. */
  @SerializedName("version")
  private String version;

  /** Value of the "upgrade_type" member. */
  @SerializedName("upgrade_type")
  private UpgradeType upgradeType;

  /** Value of the "direction" member. */
  @SerializedName("direction")
  private Direction upgradeDirection;

  /** Value of the member named by {@link KeyNames#REPO_VERSION_ID}. */
  @SerializedName(KeyNames.REPO_VERSION_ID)
  private Long repositoryVersionId;
}
}