/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.daemon.nimbus;
import backtype.storm.Config;
import backtype.storm.daemon.Shutdownable;
import backtype.storm.generated.*;
import backtype.storm.generated.Nimbus.Iface;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.callback.impl.RemoveTransitionCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.*;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.metric.MetaType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.metric.SimpleJStormMetric;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.error.TaskError;
import com.alibaba.jstorm.utils.*;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileExistsException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.security.InvalidParameterException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import static com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.*;
/**
 * Thrift callback; the entry point for handling all nimbus commands.
 *
 * @author version 1: lixin, version 2: Longda
*/
public class ServiceHandler implements Iface, Shutdownable, DaemonCommon {
private final static Logger LOG = LoggerFactory.getLogger(ServiceHandler.class);
public final static int THREAD_NUM = 64;
private NimbusData data;
private Map<Object, Object> conf;
public ServiceHandler(NimbusData data) {
this.data = data;
conf = data.getConf();
}
/**
* Shutdown the nimbus
*/
@Override
public void shutdown() {
LOG.info("Begin to shut down master");
// Timer.cancelTimer(nimbus.getTimer());
LOG.info("Successfully shut down master");
}
@Override
public boolean waiting() {
// @@@ TODO
return false;
}
@Override
public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws TException, AlreadyAliveException,
InvalidTopologyException, TopologyAssignException {
SubmitOptions options = new SubmitOptions(TopologyInitialStatus.ACTIVE);
submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, options);
}
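/**
 * Push a TopologyAssignEvent onto the TopologyAssign queue and block until
 * the assignment finishes, either successfully or with an error.
 */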
private void makeAssignment(String topologyName, String topologyId, TopologyInitialStatus status) throws FailedAssignTopologyException {
TopologyAssignEvent assignEvent = new TopologyAssignEvent();
assignEvent.setTopologyId(topologyId);
assignEvent.setScratch(false);
assignEvent.setTopologyName(topologyName);
assignEvent.setOldStatus(Thrift.topologyInitialStatusToStormStatus(status));
TopologyAssign.push(assignEvent);
boolean isSuccess = assignEvent.waitFinish();
if (isSuccess) {
LOG.info("Finish submit for " + topologyName);
} else {
throw new FailedAssignTopologyException(assignEvent.getErrorMsg());
}
}
/**
* Submit one Topology
*
* @param topologyName String: topology name
 * @param uploadedJarLocation String: path of the already-uploaded jar
 * @param jsonConf String: all topology configuration serialized as JSON
 * @param topology StormTopology: topology object
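 * <p>
 * A minimal client-side sketch of driving this RPC (the topology name and
 * the builder variables below are illustrative, not part of this class):
 * <pre>{@code
 * Map conf = Utils.readStormConfig();
 * NimbusClient nimbus = NimbusClient.getConfiguredClient(conf);
 * // upload the jar first via beginFileUpload/uploadChunk/finishFileUpload,
 * // then submit using the resulting upload location:
 * nimbus.getClient().submitTopologyWithOpts("demo-topology", uploadedJarLocation,
 *         jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE));
 * }</pre>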
*/
@SuppressWarnings("unchecked")
@Override
public void submitTopologyWithOpts(String topologyName, String uploadedJarLocation, String jsonConf, StormTopology topology, SubmitOptions options)
throws AlreadyAliveException, InvalidTopologyException, TopologyAssignException, TException {
LOG.info("Receive " + topologyName + ", uploadedJarLocation:" + uploadedJarLocation);
long start = System.nanoTime();
// check that the topology name is valid
if (!Common.charValidate(topologyName)) {
throw new InvalidTopologyException(topologyName + " is not a valid topology name");
}
try {
checkTopologyActive(data, topologyName, false);
} catch (AlreadyAliveException e) {
LOG.info(topologyName + " already exists ");
throw e;
} catch (Throwable e) {
LOG.info("Failed to check whether topology is alive or not", e);
throw new TException(e);
}
String topologyId = null;
synchronized (data) {
// avoid duplicate submissions of the same topology at the same time
Set<String> pendingTopologies = data.getPendingSubmitTopoloygs().keySet();
for (String cachedTopologyId : pendingTopologies) {
if (cachedTopologyId.contains(topologyName + "-"))
throw new AlreadyAliveException(topologyName + " has already been submitted");
}
int counter = data.getSubmittedCount().incrementAndGet();
topologyId = Common.topologyNameToId(topologyName, counter);
data.getPendingSubmitTopoloygs().put(topologyId, null);
}
try {
Map<Object, Object> serializedConf = (Map<Object, Object>) JStormUtils.from_json(jsonConf);
if (serializedConf == null) {
LOG.warn("Failed to serialized Configuration");
throw new InvalidTopologyException("Failed to serialize topology configuration");
}
serializedConf.put(Config.TOPOLOGY_ID, topologyId);
serializedConf.put(Config.TOPOLOGY_NAME, topologyName);
Map<Object, Object> stormConf;
stormConf = NimbusUtils.normalizeConf(conf, serializedConf, topology);
LOG.info("Normalized configuration:" + stormConf);
Map<Object, Object> totalStormConf = new HashMap<Object, Object>(conf);
totalStormConf.putAll(stormConf);
StormTopology normalizedTopology = NimbusUtils.normalizeTopology(stormConf, topology, true);
// this validates the structure of the topology
Common.validate_basic(normalizedTopology, totalStormConf, topologyId);
// no need to generate the real (system) topology, so skip Common.system_topology
// Common.system_topology(totalStormConf, topology);
StormClusterState stormClusterState = data.getStormClusterState();
double metricsSampleRate = ConfigExtension.getMetricSampleRate(stormConf);
// create /local-dir/nimbus/topologyId/xxxx files
setupStormCode(conf, topologyId, uploadedJarLocation, stormConf, normalizedTopology);
// generate TaskInfo for every bolt or spout in ZK
// /ZK/tasks/topologyId/xxx
setupZkTaskInfo(conf, topologyId, stormClusterState);
// make assignments for a topology
LOG.info("Submit for " + topologyName + " with conf " + serializedConf);
makeAssignment(topologyName, topologyId, options.get_initial_status());
// the assignment has been made, so remove the topology id from
// pendingSubmitTopologys
data.getPendingSubmitTopoloygs().remove(topologyId);
// push start event after startup
StartTopologyEvent startEvent = new StartTopologyEvent();
startEvent.clusterName = this.data.getClusterName();
startEvent.topologyId = topologyId;
startEvent.timestamp = System.currentTimeMillis();
startEvent.sampleRate = metricsSampleRate;
this.data.getMetricRunnable().pushEvent(startEvent);
} catch (FailedAssignTopologyException e) {
StringBuilder sb = new StringBuilder();
sb.append("Fail to sumbit topology, Root cause:");
if (e.getMessage() == null) {
sb.append("submit timeout");
} else {
sb.append(e.getMessage());
}
sb.append("\n\n");
sb.append("topologyId:" + topologyId);
sb.append(", uploadedJarLocation:" + uploadedJarLocation + "\n");
LOG.error(sb.toString(), e);
data.getPendingSubmitTopoloygs().remove(topologyId);
throw new TopologyAssignException(sb.toString());
} catch (InvalidParameterException e) {
StringBuilder sb = new StringBuilder();
sb.append("Fail to sumbit topology ");
sb.append(e.getMessage());
sb.append(", cause:" + e.getCause());
sb.append("\n\n");
sb.append("topologyId:" + topologyId);
sb.append(", uploadedJarLocation:" + uploadedJarLocation + "\n");
LOG.error(sb.toString(), e);
data.getPendingSubmitTopoloygs().remove(topologyId);
throw new InvalidParameterException(sb.toString());
} catch (InvalidTopologyException e) {
LOG.error("Topology is invalid. " + e.get_msg());
data.getPendingSubmitTopoloygs().remove(topologyId);
throw e;
} catch (Throwable e) {
StringBuilder sb = new StringBuilder();
sb.append("Fail to sumbit topology ");
sb.append(e.getMessage());
sb.append(", cause:" + e.getCause());
sb.append("\n\n");
sb.append("topologyId:" + topologyId);
sb.append(", uploadedJarLocation:" + uploadedJarLocation + "\n");
LOG.error(sb.toString(), e);
data.getPendingSubmitTopoloygs().remove(topologyId);
throw new TopologyAssignException(sb.toString());
} finally {
double spend = (System.nanoTime() - start) / TimeUtils.NS_PER_US;
SimpleJStormMetric.updateNimbusHistogram("submitTopologyWithOpts", spend);
LOG.info("submitTopologyWithOpts {} costs {}ms", topologyName, spend);
}
}
/**
* kill topology
*
* @param topologyName String topology name
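 * <p>
 * To control the kill grace period, use {@link #killTopologyWithOpts}.
 * A hedged sketch ("client" is a Nimbus.Iface stub and the 30s wait is an
 * illustrative value):
 * <pre>{@code
 * KillOptions opts = new KillOptions();
 * opts.set_wait_secs(30); // let workers drain in-flight work first
 * client.killTopologyWithOpts("demo-topology", opts);
 * }</pre>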
*/
@Override
public void killTopology(String topologyName) throws TException, NotAliveException {
killTopologyWithOpts(topologyName, new KillOptions());
}
@Override
public void killTopologyWithOpts(String topologyName, KillOptions options) throws TException, NotAliveException {
try {
checkTopologyActive(data, topologyName, true);
String topologyId = getTopologyId(topologyName);
Integer wait_amt = null;
if (options.is_set_wait_secs()) {
wait_amt = options.get_wait_secs();
}
NimbusUtils.transitionName(data, topologyName, true, StatusType.kill, wait_amt);
Remove event = new Remove();
event.topologyId = topologyId;
data.getMetricRunnable().pushEvent(event);
} catch (NotAliveException e) {
String errMsg = "KillTopology Error, no this topology " + topologyName;
LOG.error(errMsg, e);
throw new NotAliveException(errMsg);
} catch (Exception e) {
String errMsg = "Failed to kill topology " + topologyName;
LOG.error(errMsg, e);
throw new TException(errMsg);
}
}
/**
* set topology status as active
*
* @param topologyName
*/
@Override
public void activate(String topologyName) throws TException, NotAliveException {
try {
NimbusUtils.transitionName(data, topologyName, true, StatusType.activate);
} catch (NotAliveException e) {
String errMsg = "Activate Error, no this topology " + topologyName;
LOG.error(errMsg, e);
throw new NotAliveException(errMsg);
} catch (Exception e) {
String errMsg = "Failed to active topology " + topologyName;
LOG.error(errMsg, e);
throw new TException(errMsg);
}
}
/**
 * set topology status to inactive
*
* @param topologyName
*/
@Override
public void deactivate(String topologyName) throws TException, NotAliveException {
try {
NimbusUtils.transitionName(data, topologyName, true, StatusType.inactivate);
} catch (NotAliveException e) {
String errMsg = "Deactivate Error, no this topology " + topologyName;
LOG.error(errMsg, e);
throw new NotAliveException(errMsg);
} catch (Exception e) {
String errMsg = "Failed to deactivate topology " + topologyName;
LOG.error(errMsg, e);
throw new TException(errMsg);
}
}
/**
* rebalance one topology
*
* @param topologyName topology name
* @param options RebalanceOptions
 * NOTE: rebalance options are not fully implemented yet.
 * <p/>
 * wait_secs lets workers finish in-flight jobs for several seconds before rebalancing.
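 * <p>
 * A hedged example of invoking this RPC ("client" is a Nimbus.Iface stub;
 * the values are illustrative):
 * <pre>{@code
 * RebalanceOptions opts = new RebalanceOptions();
 * opts.set_wait_secs(10);   // give workers 10s to finish in-flight jobs
 * opts.set_reassign(true);  // force task reassignment
 * client.rebalance("demo-topology", opts);
 * }</pre>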
*/
@Override
public void rebalance(String topologyName, RebalanceOptions options) throws TException, NotAliveException {
try {
checkTopologyActive(data, topologyName, true);
Integer wait_amt = null;
String jsonConf = null;
Boolean reassign = false;
if (options != null) {
if (options.is_set_wait_secs())
wait_amt = options.get_wait_secs();
if (options.is_set_reassign())
reassign = options.is_reassign();
if (options.is_set_conf())
jsonConf = options.get_conf();
}
LOG.info("Begin to rebalance " + topologyName + "wait_time:" + wait_amt + ", reassign: " + reassign + ", new worker/bolt configuration:" + jsonConf);
Map<Object, Object> rebalanceConf = (Map<Object, Object>) JStormUtils.from_json(jsonConf);
NimbusUtils.transitionName(data, topologyName, true, StatusType.rebalance, wait_amt, reassign, rebalanceConf);
} catch (NotAliveException e) {
String errMsg = "Rebalance Error, no this topology " + topologyName;
LOG.error(errMsg, e);
throw new NotAliveException(errMsg);
} catch (Exception e) {
String errMsg = "Failed to rebalance topology " + topologyName;
LOG.error(errMsg, e);
throw new TException(errMsg);
}
}
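/**
 * Restart a topology in place. The flow is: deactivate, back up the old
 * jar/conf/code to the inbox, remove the old assignment, then re-submit
 * with the optionally merged new JSON configuration.
 */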
@Override
public void restart(String name, String jsonConf) throws TException, NotAliveException, InvalidTopologyException, TopologyAssignException {
LOG.info("Begin to restart " + name + ", new configuration:" + jsonConf);
// 1. get topologyId
StormClusterState stormClusterState = data.getStormClusterState();
String topologyId;
try {
topologyId = Cluster.get_topology_id(stormClusterState, name);
} catch (Exception e2) {
topologyId = null;
}
if (topologyId == null) {
LOG.info("No topology of " + name);
throw new NotAliveException("No topology of " + name);
}
// Restart the topology: Deactivate -> Kill -> Submit
// 2. Deactivate
deactivate(name);
JStormUtils.sleepMs(5000);
LOG.info("Deactivate " + name);
// 3. backup old jar/configuration/topology
StormTopology topology;
Map topologyConf;
String topologyCodeLocation = null;
try {
topology = StormConfig.read_nimbus_topology_code(conf, topologyId);
topologyConf = StormConfig.read_nimbus_topology_conf(conf, topologyId);
if (jsonConf != null) {
Map<Object, Object> newConf = (Map<Object, Object>) JStormUtils.from_json(jsonConf);
topologyConf.putAll(newConf);
}
// Back up the storm files from the stormdist dir to a tmp dir under the inbox
String oldDistDir = StormConfig.masterStormdistRoot(conf, topologyId);
String parent = StormConfig.masterInbox(conf);
topologyCodeLocation = parent + PathUtils.SEPERATOR + topologyId;
FileUtils.forceMkdir(new File(topologyCodeLocation));
FileUtils.cleanDirectory(new File(topologyCodeLocation));
File stormDistDir = new File(oldDistDir);
stormDistDir.setLastModified(System.currentTimeMillis());
FileUtils.copyDirectory(stormDistDir, new File(topologyCodeLocation));
LOG.info("Successfully read old jar/conf/topology " + name);
} catch (Exception e) {
LOG.error("Failed to read old jar/conf/topology", e);
if (topologyCodeLocation != null) {
try {
PathUtils.rmr(topologyCodeLocation);
} catch (IOException ignored) {
}
}
throw new TException("Failed to read old jar/conf/topology ");
}
// 4. Kill
// directly use remove command to kill, more stable than issue kill cmd
RemoveTransitionCallback killCb = new RemoveTransitionCallback(data, topologyId);
killCb.execute(new Object[0]);
LOG.info("Successfully kill the topology " + name);
// send metric events
TopologyMetricsRunnable.KillTopologyEvent killEvent = new TopologyMetricsRunnable.KillTopologyEvent();
killEvent.clusterName = this.data.getClusterName();
killEvent.topologyId = topologyId;
killEvent.timestamp = System.currentTimeMillis();
this.data.getMetricRunnable().pushEvent(killEvent);
Remove removeEvent = new Remove();
removeEvent.topologyId = topologyId;
this.data.getMetricRunnable().pushEvent(removeEvent);
// 5. submit
try {
submitTopology(name, topologyCodeLocation, JStormUtils.to_json(topologyConf), topology);
} catch (AlreadyAliveException e) {
LOG.info("Failed to kill the topology" + name);
throw new TException("Failed to kill the topology" + name);
} finally {
try {
PathUtils.rmr(topologyCodeLocation);
} catch (IOException ignored) {
}
}
}
@Override
public void beginLibUpload(String libName) throws TException {
try {
String parent = PathUtils.parent_path(libName);
PathUtils.local_mkdirs(parent);
data.getUploaders().put(libName, Channels.newChannel(new FileOutputStream(libName)));
LOG.info("Begin upload file from client to " + libName);
} catch (Exception e) {
LOG.error("Fail to upload jar " + libName, e);
throw new TException(e);
}
}
/**
 * prepare to upload the topology jar; returns the upload path
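 * <p>
 * The upload handshake is beginFileUpload -> uploadChunk* -> finishFileUpload,
 * keyed by the upload location. A hedged client-side sketch in the generic
 * Storm style (jar path and buffer size are illustrative):
 * <pre>{@code
 * String location = client.beginFileUpload();
 * BufferFileInputStream in = new BufferFileInputStream("/tmp/demo.jar", 4096);
 * byte[] chunk;
 * while ((chunk = in.read()).length > 0) {
 *     client.uploadChunk(location, ByteBuffer.wrap(chunk));
 * }
 * client.finishFileUpload(location);
 * }</pre>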
*/
@Override
public String beginFileUpload() throws TException {
String fileLoc = null;
try {
String path;
String key = UUID.randomUUID().toString();
path = StormConfig.masterInbox(conf) + "/" + key;
FileUtils.forceMkdir(new File(path));
FileUtils.cleanDirectory(new File(path));
fileLoc = path + "/stormjar-" + key + ".jar";
data.getUploaders().put(fileLoc, Channels.newChannel(new FileOutputStream(fileLoc)));
LOG.info("Begin upload file from client to " + fileLoc);
return path;
} catch (FileNotFoundException e) {
LOG.error("File not found: " + fileLoc, e);
throw new TException(e);
} catch (IOException e) {
LOG.error("Upload file error: " + fileLoc, e);
throw new TException(e);
}
}
/**
* uploading topology jar data
*/
@Override
public void uploadChunk(String location, ByteBuffer chunk) throws TException {
TimeCacheMap<Object, Object> uploaders = data.getUploaders();
Object obj = uploaders.get(location);
if (obj == null) {
throw new TException("File for that location does not exist (or timed out) " + location);
}
try {
if (obj instanceof WritableByteChannel) {
WritableByteChannel channel = (WritableByteChannel) obj;
channel.write(chunk);
uploaders.put(location, channel);
} else {
throw new TException("Object isn't WritableByteChannel for " + location);
}
} catch (IOException e) {
String errMsg = " WritableByteChannel write filed when uploadChunk " + location;
LOG.error(errMsg);
throw new TException(e);
}
}
@Override
public void finishFileUpload(String location) throws TException {
TimeCacheMap<Object, Object> uploaders = data.getUploaders();
Object obj = uploaders.get(location);
if (obj == null) {
throw new TException("File for that location does not exist (or timed out)");
}
try {
if (obj instanceof WritableByteChannel) {
WritableByteChannel channel = (WritableByteChannel) obj;
channel.close();
uploaders.remove(location);
LOG.info("Finished uploading file from client: " + location);
} else {
throw new TException("Object isn't WritableByteChannel for " + location);
}
} catch (IOException e) {
LOG.error(" WritableByteChannel close failed when finishFileUpload " + location);
}
}
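/**
 * Prepare a server-side stream for downloading {@code file} and return a
 * download id for subsequent downloadChunk/finishFileDownload calls.
 * A hedged client-side sketch (the path and output stream are illustrative):
 * <pre>{@code
 * String id = client.beginFileDownload("/nimbus/stormdist/topologyId/stormjar.jar");
 * ByteBuffer chunk;
 * while ((chunk = client.downloadChunk(id)).remaining() > 0) {
 *     out.write(chunk.array(), chunk.position(), chunk.remaining());
 * }
 * client.finishFileDownload(id);
 * }</pre>
 */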
@Override
public String beginFileDownload(String file) throws TException {
BufferFileInputStream is;
String id;
try {
int bufferSize = JStormUtils.parseInt(conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE), 1024 * 1024) / 2;
is = new BufferFileInputStream(file, bufferSize);
id = UUID.randomUUID().toString();
data.getDownloaders().put(id, is);
} catch (FileNotFoundException e) {
LOG.error(e + "file:" + file + " not found");
throw new TException(e);
}
return id;
}
@Override
public ByteBuffer downloadChunk(String id) throws TException {
TimeCacheMap<Object, Object> downloaders = data.getDownloaders();
Object obj = downloaders.get(id);
if (obj == null) {
throw new TException("Could not find input stream for that id");
}
try {
if (obj instanceof BufferFileInputStream) {
BufferFileInputStream is = (BufferFileInputStream) obj;
byte[] ret = is.read();
if (ret != null) {
downloaders.put(id, is);
return ByteBuffer.wrap(ret);
}
} else {
throw new TException("Object isn't BufferFileInputStream for " + id);
}
} catch (IOException e) {
LOG.error("BufferFileInputStream read failed when downloadChunk ", e);
throw new TException(e);
}
byte[] empty = {};
return ByteBuffer.wrap(empty);
}
@Override
public void finishFileDownload(String id) throws TException {
data.getDownloaders().remove(id);
}
/**
 * get the cluster summary, which contains the nimbus, supervisor and topology summaries
*
* @return ClusterSummary
*/
@Override
public ClusterSummary getClusterInfo() throws TException {
long start = System.nanoTime();
try {
StormClusterState stormClusterState = data.getStormClusterState();
Map<String, Assignment> assignments = new HashMap<String, Assignment>();
// get TopologySummary
List<TopologySummary> topologySummaries = NimbusUtils.getTopologySummary(stormClusterState, assignments);
// all supervisors
Map<String, SupervisorInfo> supervisorInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
// generate SupervisorSummaries
List<SupervisorSummary> supervisorSummaries = NimbusUtils.mkSupervisorSummaries(supervisorInfos, assignments);
NimbusSummary nimbusSummary = NimbusUtils.getNimbusSummary(stormClusterState, supervisorSummaries, data);
return new ClusterSummary(nimbusSummary, supervisorSummaries, topologySummaries);
} catch (TException e) {
LOG.info("Failed to get ClusterSummary ", e);
throw e;
} catch (Exception e) {
LOG.info("Failed to get ClusterSummary ", e);
throw new TException(e);
} finally {
long end = System.nanoTime();
SimpleJStormMetric.updateNimbusHistogram("getClusterInfo", (end - start) / TimeUtils.NS_PER_US);
}
}
@Override
public String getVersion() throws TException {
return Utils.getVersion();
}
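/**
 * Build the per-supervisor view for a host: resolve the supervisor by
 * hostname or ip, then collect its worker summaries, task-to-component
 * mappings and worker-level metrics from all assignments on that node.
 */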
@Override
public SupervisorWorkers getSupervisorWorkers(String host) throws NotAliveException, TException {
long start = System.nanoTime();
try {
StormClusterState stormClusterState = data.getStormClusterState();
String supervisorId = null;
SupervisorInfo supervisorInfo = null;
String ip = NetWorkUtils.host2Ip(host);
String hostName = NetWorkUtils.ip2Host(host);
// all supervisors
Map<String, SupervisorInfo> supervisorInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
for (Entry<String, SupervisorInfo> entry : supervisorInfos.entrySet()) {
SupervisorInfo info = entry.getValue();
if (info.getHostName().equals(hostName) || info.getHostName().equals(ip)) {
supervisorId = entry.getKey();
supervisorInfo = info;
break;
}
}
if (supervisorId == null) {
throw new TException("No supervisor of " + host);
}
Map<String, Assignment> assignments = Cluster.get_all_assignment(stormClusterState, null);
Map<Integer, WorkerSummary> portWorkerSummarys = new TreeMap<Integer, WorkerSummary>();
int usedSlotNumber = 0;
Map<String, Map<Integer, String>> topologyTaskToComponent = new HashMap<String, Map<Integer, String>>();
Map<String, MetricInfo> metricInfoMap = new HashMap<String, MetricInfo>();
for (Entry<String, Assignment> entry : assignments.entrySet()) {
String topologyId = entry.getKey();
Assignment assignment = entry.getValue();
Set<ResourceWorkerSlot> workers = assignment.getWorkers();
for (ResourceWorkerSlot worker : workers) {
if (!supervisorId.equals(worker.getNodeId())) {
continue;
}
usedSlotNumber++;
Integer port = worker.getPort();
WorkerSummary workerSummary = portWorkerSummarys.get(port);
if (workerSummary == null) {
workerSummary = new WorkerSummary();
workerSummary.set_port(port);
workerSummary.set_topology(topologyId);
workerSummary.set_tasks(new ArrayList<TaskComponent>());
portWorkerSummarys.put(port, workerSummary);
}
Map<Integer, String> taskToComponent = topologyTaskToComponent.get(topologyId);
if (taskToComponent == null) {
taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
topologyTaskToComponent.put(topologyId, taskToComponent);
}
int earliest = TimeUtils.current_time_secs();
for (Integer taskId : worker.getTasks()) {
TaskComponent taskComponent = new TaskComponent();
taskComponent.set_component(taskToComponent.get(taskId));
taskComponent.set_taskId(taskId);
Integer startTime = assignment.getTaskStartTimeSecs().get(taskId);
if (startTime != null && startTime < earliest) {
earliest = startTime;
}
workerSummary.add_to_tasks(taskComponent);
}
workerSummary.set_uptime(TimeUtils.time_delta(earliest));
String workerSlotName = getWorkerSlotName(supervisorInfo.getHostName(), port);
List<MetricInfo> workerMetricInfoList = this.data.getMetricCache().getMetricData(topologyId, MetaType.WORKER);
if (workerMetricInfoList.size() > 0) {
MetricInfo workerMetricInfo = workerMetricInfoList.get(0);
// remove metrics that don't belong to current worker
for (Iterator<String> itr = workerMetricInfo.get_metrics().keySet().iterator();
itr.hasNext(); ) {
String metricName = itr.next();
if (!metricName.contains(host)) {
itr.remove();
}
}
metricInfoMap.put(workerSlotName, workerMetricInfo);
}
}
}
List<WorkerSummary> workerList = new ArrayList<WorkerSummary>();
workerList.addAll(portWorkerSummarys.values());
Map<String, Integer> supervisorToUsedSlotNum = new HashMap<String, Integer>();
supervisorToUsedSlotNum.put(supervisorId, usedSlotNumber);
SupervisorSummary supervisorSummary = NimbusUtils.mkSupervisorSummary(supervisorInfo, supervisorId, supervisorToUsedSlotNum);
return new SupervisorWorkers(supervisorSummary, workerList, metricInfoMap);
} catch (TException e) {
LOG.info("Failed to get ClusterSummary ", e);
throw e;
} catch (Exception e) {
LOG.info("Failed to get ClusterSummary ", e);
throw new TException(e);
} finally {
long end = System.nanoTime();
SimpleJStormMetric.updateNimbusHistogram("getSupervisorWorkers", (end - start) / TimeUtils.NS_PER_US);
}
}
/**
 * Get TopologyInfo; it contains all runtime status data of the topology
*
* @return TopologyInfo
*/
@Override
public TopologyInfo getTopologyInfo(String topologyId) throws NotAliveException, TException {
long start = System.nanoTime();
StormClusterState stormClusterState = data.getStormClusterState();
try {
// get topology's StormBase
StormBase base = stormClusterState.storm_base(topologyId, null);
if (base == null) {
throw new NotAliveException("No topology of " + topologyId);
}
Assignment assignment = stormClusterState.assignment_info(topologyId, null);
if (assignment == null) {
throw new NotAliveException("No topology of " + topologyId);
}
TopologyTaskHbInfo topologyTaskHbInfo = data.getTasksHeartbeat().get(topologyId);
Map<Integer, TaskHeartbeat> taskHbMap = null;
if (topologyTaskHbInfo != null)
taskHbMap = topologyTaskHbInfo.get_taskHbs();
Map<Integer, TaskInfo> taskInfoMap = Cluster.get_all_taskInfo(stormClusterState, topologyId);
Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, taskInfoMap);
Map<Integer, String> taskToType = Cluster.get_all_task_type(stormClusterState, topologyId, taskInfoMap);
String errorString;
if (Cluster.is_topology_exist_error(stormClusterState, topologyId)) {
errorString = "Y";
} else {
errorString = "";
}
TopologySummary topologySummary = new TopologySummary();
topologySummary.set_id(topologyId);
topologySummary.set_name(base.getStormName());
topologySummary.set_uptimeSecs(TimeUtils.time_delta(base.getLanchTimeSecs()));
topologySummary.set_status(base.getStatusString());
topologySummary.set_numTasks(NimbusUtils.getTopologyTaskNum(assignment));
topologySummary.set_numWorkers(assignment.getWorkers().size());
topologySummary.set_errorInfo(errorString);
Map<String, ComponentSummary> componentSummaryMap = new HashMap<String, ComponentSummary>();
HashMap<String, List<Integer>> componentToTasks = JStormUtils.reverse_map(taskToComponent);
for (Entry<String, List<Integer>> entry : componentToTasks.entrySet()) {
String name = entry.getKey();
List<Integer> taskIds = entry.getValue();
if (taskIds == null || taskIds.size() == 0) {
LOG.warn("No task of component " + name);
continue;
}
ComponentSummary componentSummary = new ComponentSummary();
componentSummaryMap.put(name, componentSummary);
componentSummary.set_name(name);
componentSummary.set_type(taskToType.get(taskIds.get(0)));
componentSummary.set_parallel(taskIds.size());
componentSummary.set_taskIds(taskIds);
}
Map<Integer, TaskSummary> taskSummaryMap = new TreeMap<Integer, TaskSummary>();
Map<Integer, List<TaskError>> taskErrors = Cluster.get_all_task_errors(stormClusterState, topologyId);
for (Integer taskId : taskInfoMap.keySet()) {
TaskSummary taskSummary = new TaskSummary();
taskSummaryMap.put(taskId, taskSummary);
taskSummary.set_taskId(taskId);
if (taskHbMap == null) {
taskSummary.set_status("Starting");
taskSummary.set_uptime(0);
} else {
TaskHeartbeat hb = taskHbMap.get(taskId);
if (hb == null) {
taskSummary.set_status("Starting");
taskSummary.set_uptime(0);
} else {
boolean isInactive = NimbusUtils.isTaskDead(data, topologyId, taskId);
if (isInactive)
taskSummary.set_status("INACTIVE");
else
taskSummary.set_status("ACTIVE");
taskSummary.set_uptime(hb.get_uptime());
}
}
if (StringUtils.isBlank(errorString)) {
continue;
}
List<TaskError> taskErrorList = taskErrors.get(taskId);
if (taskErrorList != null && taskErrorList.size() != 0) {
for (TaskError taskError : taskErrorList) {
ErrorInfo errorInfo = new ErrorInfo(taskError.getError(), taskError.getTimSecs());
taskSummary.add_to_errors(errorInfo);
String component = taskToComponent.get(taskId);
componentSummaryMap.get(component).add_to_errors(errorInfo);
}
}
}
for (ResourceWorkerSlot workerSlot : assignment.getWorkers()) {
String hostname = workerSlot.getHostname();
int port = workerSlot.getPort();
for (Integer taskId : workerSlot.getTasks()) {
TaskSummary taskSummary = taskSummaryMap.get(taskId);
taskSummary.set_host(hostname);
taskSummary.set_port(port);
}
}
TopologyInfo topologyInfo = new TopologyInfo();
topologyInfo.set_topology(topologySummary);
topologyInfo.set_components(JStormUtils.mk_list(componentSummaryMap.values()));
topologyInfo.set_tasks(JStormUtils.mk_list(taskSummaryMap.values()));
// return topology metric & component metric only
List<MetricInfo> tpMetricList = data.getMetricCache().getMetricData(topologyId, MetaType.TOPOLOGY);
List<MetricInfo> compMetricList = data.getMetricCache().getMetricData(topologyId, MetaType.COMPONENT);
List<MetricInfo> workerMetricList = data.getMetricCache().getMetricData(topologyId, MetaType.WORKER);
MetricInfo taskMetric = MetricUtils.mkMetricInfo();
MetricInfo streamMetric = MetricUtils.mkMetricInfo();
MetricInfo nettyMetric = MetricUtils.mkMetricInfo();
MetricInfo tpMetric, compMetric, workerMetric;
if (tpMetricList == null || tpMetricList.size() == 0) {
tpMetric = MetricUtils.mkMetricInfo();
} else {
// get the most recent (last-minute) topology metric
tpMetric = tpMetricList.get(tpMetricList.size() - 1);
}
if (compMetricList == null || compMetricList.size() == 0) {
compMetric = MetricUtils.mkMetricInfo();
} else {
compMetric = compMetricList.get(0);
}
if (workerMetricList == null || workerMetricList.size() == 0) {
workerMetric = MetricUtils.mkMetricInfo();
} else {
workerMetric = workerMetricList.get(0);
}
TopologyMetric topologyMetrics = new TopologyMetric(tpMetric, compMetric, workerMetric,
taskMetric, streamMetric, nettyMetric);
topologyInfo.set_metrics(topologyMetrics);
return topologyInfo;
} catch (TException e) {
LOG.info("Failed to get topologyInfo " + topologyId, e);
throw e;
} catch (Exception e) {
LOG.info("Failed to get topologyInfo " + topologyId, e);
throw new TException("Failed to get topologyInfo" + topologyId);
} finally {
long end = System.nanoTime();
SimpleJStormMetric.updateNimbusHistogram("getTopologyInfo", (end - start) / TimeUtils.NS_PER_US);
}
}
@Override
public TopologyInfo getTopologyInfoByName(String topologyName) throws NotAliveException, TException {
String topologyId = getTopologyId(topologyName);
return getTopologyInfo(topologyId);
}
@Override
public String getNimbusConf() throws TException {
try {
return JStormUtils.to_json(data.getConf());
} catch (Exception e) {
String err = "Failed to generate Nimbus configuration";
LOG.error(err, e);
throw new TException(err);
}
}
/**
* get topology configuration
*
* @param id String: topology id
* @return String
*/
@Override
public String getTopologyConf(String id) throws NotAliveException, TException {
String rtn;
try {
Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(conf, id);
rtn = JStormUtils.to_json(topologyConf);
} catch (IOException e) {
LOG.info("Failed to get configuration of " + id, e);
throw new TException(e);
}
return rtn;
}
@Override
public String getTopologyId(String topologyName) throws NotAliveException, TException {
StormClusterState stormClusterState = data.getStormClusterState();
try {
// look up the active topology id by name
String topologyId = Cluster.get_topology_id(stormClusterState, topologyName);
if (topologyId != null) {
return topologyId;
}
} catch (Exception e) {
LOG.info("Failed to get getTopologyId " + topologyName, e);
throw new TException("Failed to get getTopologyId " + topologyName);
}
// topologyId == null
throw new NotAliveException("No topology of " + topologyName);
}
/**
 * get StormTopology by deserializing local files
*
* @param id String: topology id
* @return StormTopology
*/
@Override
public StormTopology getTopology(String id) throws NotAliveException, TException {
StormTopology topology;
try {
StormTopology stormtopology = StormConfig.read_nimbus_topology_code(conf, id);
if (stormtopology == null) {
throw new NotAliveException("No topology of " + id);
}
Map<Object, Object> topologyConf = (Map<Object, Object>) StormConfig.read_nimbus_topology_conf(conf, id);
topology = Common.system_topology(topologyConf, stormtopology);
} catch (Exception e) {
LOG.error("Failed to get topology " + id + ",", e);
throw new TException("Failed to get system_topology");
}
return topology;
}
@Override
public StormTopology getUserTopology(String id) throws NotAliveException, TException {
try {
StormTopology stormtopology = StormConfig.read_nimbus_topology_code(conf, id);
if (stormtopology == null) {
throw new NotAliveException("No such topology: " + id);
}
return stormtopology;
} catch (NotAliveException e) {
throw e;
} catch (Exception e) {
LOG.error("Failed to get user topology " + id, e);
throw new TException("Failed to get user topology " + id);
}
}
/**
 * check whether the topology's active state matches bActive
*
* @param nimbus
* @param topologyName
* @param bActive
* @throws Exception
*/
public void checkTopologyActive(NimbusData nimbus, String topologyName, boolean bActive) throws Exception {
if (isTopologyActive(nimbus.getStormClusterState(), topologyName) != bActive) {
if (bActive) {
throw new NotAliveException(topologyName + " is not alive");
} else {
throw new AlreadyAliveException(topologyName + " is already active");
}
}
}
/**
 * check whether the topology is active, by topology name
*
* @param stormClusterState see Cluster_clj
* @param topologyName
 * @return true if the topology is active, false otherwise
* @throws Exception
*/
public boolean isTopologyActive(StormClusterState stormClusterState, String topologyName) throws Exception {
return Cluster.get_topology_id(stormClusterState, topologyName) != null;
}
/**
* create local topology files /local-dir/nimbus/topologyId/stormjar.jar
* /local-dir/nimbus/topologyId/stormcode.ser
* /local-dir/nimbus/topologyId/stormconf.ser
*
* @param conf
* @param topologyId
* @param tmpJarLocation
* @param stormConf
* @param topology
* @throws IOException
*/
private void setupStormCode(Map<Object, Object> conf, String topologyId, String tmpJarLocation, Map<Object, Object> stormConf, StormTopology topology)
throws IOException {
// local-dir/nimbus/stormdist/topologyId
String stormroot = StormConfig.masterStormdistRoot(conf, topologyId);
FileUtils.forceMkdir(new File(stormroot));
FileUtils.cleanDirectory(new File(stormroot));
// copy jar to /local-dir/nimbus/topologyId/stormjar.jar
setupJar(conf, tmpJarLocation, stormroot);
// serialize to file /local-dir/nimbus/topologyId/stormcode.ser
FileUtils.writeByteArrayToFile(new File(StormConfig.stormcode_path(stormroot)), Utils.serialize(topology));
// serialize to file /local-dir/nimbus/topologyId/stormconf.ser
FileUtils.writeByteArrayToFile(new File(StormConfig.stormconf_path(stormroot)), Utils.serialize(stormConf));
// Update downloadCode timeStamp
StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId, System.currentTimeMillis());
}
private boolean copyLibJars(String tmpJarLocation, String stormroot) throws IOException {
String srcLibPath = StormConfig.stormlib_path(tmpJarLocation);
String destLibPath = StormConfig.stormlib_path(stormroot);
LOG.info("Begin to copy from " + srcLibPath + " to " + destLibPath);
File srcFile = new File(srcLibPath);
if (!srcFile.exists()) {
LOG.info("No lib jars under " + srcLibPath);
return false;
}
File destFile = new File(destLibPath);
FileUtils.copyDirectory(srcFile, destFile);
PathUtils.rmr(srcLibPath);
LOG.info("Successfully copy libs " + destLibPath);
return true;
}
/**
* Copy jar to /local-dir/nimbus/topologyId/stormjar.jar
*
* @param conf
* @param tmpJarLocation
* @param stormroot
* @throws IOException
*/
private void setupJar(Map<Object, Object> conf, String tmpJarLocation, String stormroot) throws IOException {
if (!StormConfig.local_mode(conf)) {
boolean existLibs = copyLibJars(tmpJarLocation, stormroot);
String jarPath = null;
List<String> files = PathUtils.read_dir_contents(tmpJarLocation);
for (String file : files) {
if (file.endsWith(".jar")) {
jarPath = tmpJarLocation + PathUtils.SEPERATOR + file;
break;
}
}
if (jarPath == null) {
if (!existLibs) {
throw new IllegalArgumentException("No jar under " + tmpJarLocation);
} else {
LOG.info("No submit jar");
return;
}
}
File srcFile = new File(jarPath);
if (!srcFile.exists()) {
throw new IllegalArgumentException(jarPath + " to copy to " + stormroot + " does not exist!");
}
String path = StormConfig.stormjar_path(stormroot);
File destFile = new File(path);
FileUtils.copyFile(srcFile, destFile);
srcFile.delete();
}
}
/**
 * generate TaskInfo for every bolt or spout in ZK /ZK/tasks/topologyId/xxx
*
* @param conf
* @param topologyId
* @param stormClusterState
* @throws Exception
*/
public void setupZkTaskInfo(Map<Object, Object> conf, String topologyId, StormClusterState stormClusterState) throws Exception {
Map<Integer, TaskInfo> taskToTaskInfo = mkTaskComponentAssignments(conf, topologyId);
if (taskToTaskInfo == null || taskToTaskInfo.size() == 0) {
throw new InvalidTopologyException("Failed to generate TaskIDs map");
}
// mkdir /ZK/taskbeats/topologyId
int masterId = NimbusUtils.getTopologyMasterId(taskToTaskInfo);
TopologyTaskHbInfo topoTaskHbinfo = new TopologyTaskHbInfo(topologyId, masterId);
data.getTasksHeartbeat().put(topologyId, topoTaskHbinfo);
stormClusterState.topology_heartbeat(topologyId, topoTaskHbinfo);
// key is taskid, value is taskinfo
stormClusterState.set_task(topologyId, taskToTaskInfo);
}
/**
* generate a taskid(Integer) for every task
*
* @param conf
* @param topologyid
 * @return Map<Integer, TaskInfo>: from task id to TaskInfo
* @throws IOException
* @throws InvalidTopologyException
*/
public Map<Integer, TaskInfo> mkTaskComponentAssignments(Map<Object, Object> conf, String topologyid) throws IOException, InvalidTopologyException {
// NOTE: there is a minor inefficiency here;
// we could pass stormConf directly from the submit method
Map<Object, Object> stormConf = StormConfig.read_nimbus_topology_conf(conf, topologyid);
StormTopology stopology = StormConfig.read_nimbus_topology_code(conf, topologyid);
StormTopology topology = Common.system_topology(stormConf, stopology);
return Common.mkTaskInfo(stormConf, topology, topologyid);
}
@Override
public void metricMonitor(String topologyName, MonitorOptions options) throws TException {
boolean isEnable = options.is_isEnable();
StormClusterState clusterState = data.getStormClusterState();
try {
String topologyId = Cluster.get_topology_id(clusterState, topologyName);
if (null != topologyId) {
clusterState.set_storm_monitor(topologyId, isEnable);
} else {
throw new NotAliveException("Failed to update metricsMonitor status as " + topologyName + " is not alive");
}
} catch (Exception e) {
String errMsg = "Failed to update metricsMonitor " + topologyName;
LOG.error(errMsg, e);
throw new TException(e);
}
}
@Override
public TopologyMetric getTopologyMetrics(String topologyId) throws TException {
LOG.debug("Nimbus service handler, getTopologyMetric, topology ID: " + topologyId);
long start = System.nanoTime();
try {
return data.getMetricRunnable().getTopologyMetric(topologyId);
} finally {
long end = System.nanoTime();
SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", (end - start) / TimeUtils.NS_PER_US);
}
}
@Override
public void uploadTopologyMetrics(String topologyId, TopologyMetric uploadMetrics) throws TException {
LOG.info("Received topology metrics:{}", topologyId);
Update event = new Update();
event.timestamp = System.currentTimeMillis();
event.topologyMetrics = uploadMetrics;
event.topologyId = topologyId;
data.getMetricRunnable().pushEvent(event);
}
@Override
public Map<String, Long> registerMetrics(String topologyId, Set<String> metrics) throws TException {
try {
return data.getMetricRunnable().registerMetrics(topologyId, metrics);
} catch (Exception ex) {
LOG.error("Failed to register metrics for " + topologyId, ex);
return null;
}
}
public void uploadNewCredentials(String topologyName, Credentials creds) {
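// no-op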
}
@Override
public List<MetricInfo> getMetrics(String topologyId, int type) throws TException {
MetaType metaType = MetaType.parse(type);
return data.getMetricCache().getMetricData(topologyId, metaType);
}
@Override
public MetricInfo getNettyMetrics(String topologyId) throws TException {
List<MetricInfo> metricInfoList = data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
if (metricInfoList != null && metricInfoList.size() > 0) {
return metricInfoList.get(0);
}
return new MetricInfo();
}
@Override
public MetricInfo getNettyMetricsByHost(String topologyId, String host) throws TException {
MetricInfo ret = new MetricInfo();
List<MetricInfo> metricInfoList = data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
if (metricInfoList != null && metricInfoList.size() > 0) {
MetricInfo metricInfo = metricInfoList.get(0);
for (Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metricInfo.get_metrics().entrySet()) {
String metricName = metricEntry.getKey();
Map<Integer, MetricSnapshot> snapshots = metricEntry.getValue();
if (metricName.contains(host)) {
ret.put_to_metrics(metricName, snapshots);
}
}
}
LOG.info("getNettyMetricsByHost, total size:{}", ret.get_metrics_size());
return ret;
}
@Override
public int getNettyMetricSizeByHost(String topologyId, String host) throws TException {
return getNettyMetricsByHost(topologyId, host).get_metrics_size();
}
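/**
 * Return one page (1-based) of netty metrics for the given host,
 * NETTY_METRIC_PAGE_SIZE entries per page, e.g. page 2 covers the
 * half-open index range [PAGE_SIZE, 2 * PAGE_SIZE).
 */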
@Override
public MetricInfo getPagingNettyMetrics(String topologyId, String host, int page) throws TException {
MetricInfo ret = new MetricInfo();
int start = (page - 1) * MetricUtils.NETTY_METRIC_PAGE_SIZE;
int end = page * MetricUtils.NETTY_METRIC_PAGE_SIZE;
int cur = -1;
List<MetricInfo> metricInfoList = data.getMetricCache().getMetricData(topologyId, MetaType.NETTY);
if (metricInfoList != null && metricInfoList.size() > 0) {
MetricInfo metricInfo = metricInfoList.get(0);
for (Entry<String, Map<Integer, MetricSnapshot>> metricEntry : metricInfo.get_metrics().entrySet()) {
String metricName = metricEntry.getKey();
Map<Integer, MetricSnapshot> snapshots = metricEntry.getValue();
if (metricName.contains(host)) {
++cur;
if (cur >= start && cur < end) {
ret.put_to_metrics(metricName, snapshots);
}
if (cur >= end) {
break;
}
}
}
}
LOG.info("getNettyMetricsByHost, total size:{}", ret.get_metrics_size());
return ret;
}
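/**
 * Filter the latest task-level MetricInfo down to a single component.
 * Metric names are DELIM-separated; judging from the filter below, the
 * component id is expected in the third field (index 2).
 */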
@Override
public MetricInfo getTaskMetrics(String topologyId, String component) throws TException {
List<MetricInfo> taskMetricList = getMetrics(topologyId, MetaType.TASK.getT());
if (taskMetricList != null && taskMetricList.size() > 0) {
MetricInfo metricInfo = taskMetricList.get(0);
Map<String, Map<Integer, MetricSnapshot>> metrics = metricInfo.get_metrics();
for (Iterator<String> itr = metrics.keySet().iterator(); itr.hasNext(); ) {
String metricName = itr.next();
String[] parts = metricName.split(MetricUtils.DELIM);
if (parts.length < 7 || !parts[2].equals(component)) {
itr.remove();
}
}
LOG.info("taskMetric, total size:{}", metricInfo.get_metrics_size());
return metricInfo;
}
return MetricUtils.mkMetricInfo();
}
@Override
public List<MetricInfo> getTaskAndStreamMetrics(String topologyId, int taskId) throws TException {
List<MetricInfo> taskMetricList = getMetrics(topologyId, MetaType.TASK.getT());
List<MetricInfo> streamMetricList = getMetrics(topologyId, MetaType.STREAM.getT());
String taskIdStr = taskId + "";
MetricInfo taskMetricInfo;
if (taskMetricList != null && taskMetricList.size() > 0) {
taskMetricInfo = taskMetricList.get(0);
Map<String, Map<Integer, MetricSnapshot>> metrics = taskMetricInfo.get_metrics();
for (Iterator<String> itr = metrics.keySet().iterator(); itr.hasNext(); ) {
String metricName = itr.next();
String[] parts = metricName.split(MetricUtils.DELIM);
if (parts.length < 7 || !parts[3].equals(taskIdStr)) {
itr.remove();
}
}
} else {
taskMetricInfo = MetricUtils.mkMetricInfo();
}
MetricInfo streamMetricInfo;
if (streamMetricList != null && streamMetricList.size() > 0) {
streamMetricInfo = streamMetricList.get(0);
Map<String, Map<Integer, MetricSnapshot>> metrics = streamMetricInfo.get_metrics();
for (Iterator<String> itr = metrics.keySet().iterator(); itr.hasNext(); ) {
String metricName = itr.next();
String[] parts = metricName.split(MetricUtils.DELIM);
if (parts.length < 7 || !parts[3].equals(taskIdStr)) {
itr.remove();
}
}
} else {
streamMetricInfo = MetricUtils.mkMetricInfo();
}
return Lists.newArrayList(taskMetricInfo, streamMetricInfo);
}
@Override
public List<MetricInfo> getSummarizedTopologyMetrics(String topologyId) throws TException {
return data.getMetricCache().getMetricData(topologyId, MetaType.TOPOLOGY);
}
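/**
 * Update a running topology in place: swap in the newly uploaded jar under
 * stormdist (if any), merge the JSON conf into the stored topology conf,
 * then trigger an update_topology state transition.
 */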
@Override
public void updateTopology(String name, String uploadedLocation,
String updateConf) throws NotAliveException,
InvalidTopologyException, TException {
try {
checkTopologyActive(data, name, true);
String topologyId = null;
StormClusterState stormClusterState = data.getStormClusterState();
topologyId = Cluster.get_topology_id(stormClusterState, name);
if (topologyId == null) {
throw new NotAliveException(name);
}
if (uploadedLocation != null) {
String stormroot =
StormConfig.masterStormdistRoot(conf, topologyId);
int lastIndexOf = uploadedLocation.lastIndexOf("/");
// /local-dir/nimbus/inbox/xxxx/
String tmpDir = uploadedLocation.substring(0, lastIndexOf);
// /local-dir/nimbus/inbox/xxxx/stormjar.jar
String stormJarPath = StormConfig.stormjar_path(tmpDir);
File file = new File(uploadedLocation);
if (file.exists()) {
file.renameTo(new File(stormJarPath));
} else {
throw new FileNotFoundException("Source \'"
+ uploadedLocation + "\' does not exist");
}
// move fileDir to /local-dir/nimbus/topologyid/
File srcDir = new File(tmpDir);
File destDir = new File(stormroot);
try {
FileUtils.moveDirectory(srcDir, destDir);
} catch (FileExistsException e) {
FileUtils.copyDirectory(srcDir, destDir);
FileUtils.deleteQuietly(srcDir);
}
// Update downloadCode timeStamp
StormConfig.write_nimbus_topology_timestamp(data.getConf(), topologyId, System.currentTimeMillis());
LOG.info("update jar of " + name + " successfully");
}
Map topoConf = StormConfig.read_nimbus_topology_conf(data.getConf(),
topologyId);
Map<Object, Object> config =
(Map<Object, Object>) JStormUtils.from_json(updateConf);
topoConf.putAll(config);
StormConfig.write_nimbus_topology_conf(data.getConf(), topologyId,
topoConf);
LOG.info("update topology " + name + " successfully");
NimbusUtils.transitionName(data, name, true,
StatusType.update_topology, config);
} catch (NotAliveException e) {
String errMsg = "Error, no this topology " + name;
LOG.error(errMsg, e);
throw new NotAliveException(errMsg);
} catch (Exception e) {
String errMsg = "Failed to update topology " + name;
LOG.error(errMsg, e);
throw new TException(errMsg);
}
}
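/**
 * Merge the task heartbeats reported by a topology master into nimbus's
 * in-memory heartbeat cache, creating the per-topology entry lazily.
 */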
@Override
public void updateTaskHeartbeat(TopologyTaskHbInfo taskHbs) throws TException {
String topologyId = taskHbs.get_topologyId();
Integer topologyMasterId = taskHbs.get_topologyMasterId();
TopologyTaskHbInfo nimbusTaskHbs = data.getTasksHeartbeat().get(topologyId);
if (nimbusTaskHbs == null) {
nimbusTaskHbs = new TopologyTaskHbInfo(topologyId, topologyMasterId);
data.getTasksHeartbeat().put(topologyId, nimbusTaskHbs);
}
Map<Integer, TaskHeartbeat> nimbusTaskHbMap = nimbusTaskHbs.get_taskHbs();
if (nimbusTaskHbMap == null) {
nimbusTaskHbMap = new ConcurrentHashMap<Integer, TaskHeartbeat>();
nimbusTaskHbs.set_taskHbs(nimbusTaskHbMap);
}
Map<Integer, TaskHeartbeat> taskHbMap = taskHbs.get_taskHbs();
if (taskHbMap != null) {
for (Entry<Integer, TaskHeartbeat> entry : taskHbMap.entrySet()) {
nimbusTaskHbMap.put(entry.getKey(), entry.getValue());
}
}
}
}