blob: b7c8e89c7069ff0f5653a2595b9162b2bbf12f4a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.schedule;
import backtype.storm.Config;
import backtype.storm.generated.TaskHeartbeat;
import backtype.storm.generated.TopologyTaskHbInfo;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.nimbus.NimbusData;
import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
import com.alibaba.jstorm.daemon.nimbus.StatusType;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.TaskDeadEvent;
import java.util.*;
/**
* Scan all task's heartbeat, if task isn't alive, DO NimbusUtils.transition(monitor)
*
* @author Longda
*/
public class MonitorRunnable implements Runnable {
private static Logger LOG = LoggerFactory.getLogger(MonitorRunnable.class);
private NimbusData data;
public MonitorRunnable(NimbusData data) {
this.data = data;
}
/**
* @@@ Todo when one topology is being reassigned, the topology should skip check
*/
@Override
public void run() {
StormClusterState clusterState = data.getStormClusterState();
try {
// Attetion, need first check Assignments
List<String> active_topologys = clusterState.assignments(null);
if (active_topologys == null) {
LOG.info("Failed to get active topologies");
return;
}
for (String topologyid : active_topologys) {
if (clusterState.storm_base(topologyid, null) == null) {
continue;
}
LOG.debug("Check tasks " + topologyid);
// Attention, here don't check /ZK-dir/taskbeats/topologyid to
// get task ids
Set<Integer> taskIds = clusterState.task_ids(topologyid);
if (taskIds == null) {
LOG.info("Failed to get task ids of " + topologyid);
continue;
}
Assignment assignment = clusterState.assignment_info(topologyid, null);
Set<Integer> deadTasks = new HashSet<Integer>();
boolean needReassign = false;
for (Integer task : taskIds) {
boolean isTaskDead = NimbusUtils.isTaskDead(data, topologyid, task);
if (isTaskDead == true) {
deadTasks.add(task);
needReassign = true;
}
}
TopologyTaskHbInfo topologyHbInfo = data.getTasksHeartbeat().get(topologyid);
if (needReassign == true) {
if (topologyHbInfo != null) {
int topologyMasterId = topologyHbInfo.get_topologyMasterId();
if (deadTasks.contains(topologyMasterId)) {
deadTasks.clear();
if (assignment != null) {
ResourceWorkerSlot resource = assignment.getWorkerByTaskId(topologyMasterId);
if (resource != null)
deadTasks.addAll(resource.getTasks());
else
deadTasks.add(topologyMasterId);
}
} else {
Map<Integer, TaskHeartbeat> taskHbs = topologyHbInfo.get_taskHbs();
int launchTime = JStormUtils.parseInt(data.getConf().get(Config.NIMBUS_TASK_LAUNCH_SECS));
if (taskHbs == null || taskHbs.get(topologyMasterId) == null || taskHbs.get(topologyMasterId).get_uptime() < launchTime) {
/*try {
clusterState.topology_heartbeat(topologyid, topologyHbInfo);
} catch (Exception e) {
LOG.error("Failed to update task heartbeat info to ZK for " + topologyid, e);
}*/
return;
}
}
Map<Integer, ResourceWorkerSlot> deadTaskWorkers = new HashMap<>();
for (Integer task : deadTasks) {
LOG.info("Found " + topologyid + ",taskid:" + task + " is dead");
ResourceWorkerSlot resource = null;
if (assignment != null)
resource = assignment.getWorkerByTaskId(task);
if (resource != null) {
deadTaskWorkers.put(task, resource);
Date now = new Date();
String nowStr = TimeFormat.getSecond(now);
String errorInfo = "Task-" + task + " is dead on " + resource.getHostname() + ":" + resource.getPort() + ", " + nowStr;
LOG.info(errorInfo);
clusterState.report_task_error(topologyid, task, errorInfo, null);
}
}
if (deadTaskWorkers.size() > 0) {
// notify jstorm monitor
TaskDeadEvent event = new TaskDeadEvent();
event.clusterName = data.getClusterName();
event.topologyId = topologyid;
event.deadTasks = deadTaskWorkers;
event.timestamp = System.currentTimeMillis();
data.getMetricRunnable().pushEvent(event);
}
}
NimbusUtils.transition(data, topologyid, false, StatusType.monitor);
}
if (topologyHbInfo != null) {
try {
clusterState.topology_heartbeat(topologyid, topologyHbInfo);
} catch (Exception e) {
LOG.error("Failed to update task heartbeat info to ZK for " + topologyid, e);
}
}
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
}