/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.jstorm.callback.impl;

import java.util.*;
import java.util.Map.Entry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.utils.Utils;

import com.alibaba.jstorm.callback.BaseCallback;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.cluster.StormStatus;
import com.alibaba.jstorm.daemon.nimbus.*;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.TkHbCacheTime;
import com.alibaba.jstorm.utils.JStormUtils;

/**
 * Does the real rebalance action.
 *
 * After Nimbus receives a rebalance command, it proceeds as follows:
 * 1. sets the topology status to rebalancing;
 * 2. delays for 2 * timeout seconds;
 * 3. executes this callback.
 *
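 * A minimal invocation sketch (hypothetical wiring; in practice the nimbus
 * status-transition machinery constructs this callback and supplies the arguments,
 * and {@code nimbusData}, {@code oldStatus} and {@code newConf} below are assumed
 * to be in scope):
 *
 * <pre>{@code
 * DoRebalanceTransitionCallback cb =
 *         new DoRebalanceTransitionCallback(nimbusData, "demo-topology-1", oldStatus);
 * // args[0] = delay, args[1] = reassign flag, args[2] = new conf (null if unchanged)
 * cb.execute(30, Boolean.TRUE, newConf);
 * }</pre>
 *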
 * @author Xin.Li/Longda
 */
public class DoRebalanceTransitionCallback extends BaseCallback {
    private static final Logger LOG = LoggerFactory.getLogger(DoRebalanceTransitionCallback.class);

    private NimbusData data;
    private String topologyid;
    private StormStatus oldStatus;
    private Set<Integer> newTasks;

    public DoRebalanceTransitionCallback(NimbusData data, String topologyid, StormStatus status) {
        this.data = data;
        this.topologyid = topologyid;
        this.oldStatus = status;
        this.newTasks = new HashSet<Integer>();
    }
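
    /*
     * Flow: (1) when a new conf is supplied, rewrite the topology code/conf on nimbus and
     * register TaskInfo entries for any scaled-out components; (2) push a TopologyAssignEvent
     * and wait for it to finish; (3) on failure, roll back the task-info changes on ZK;
     * (4) finally delegate to DelayStatusTransitionCallback to finish the status transition.
     */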
    @Override
    public <T> Object execute(T... args) {
        boolean isSetTaskInfo = false;
        try {
            // args[0]: delay, args[1]: reassign flag, args[2]: conf
            Boolean reassign = (Boolean) args[1];
            Map<Object, Object> conf = (Map<Object, Object>) args[2];
            if (conf != null) {
                boolean isConfUpdate = false;
                Map stormConf = data.getConf();

                // Update topology code
                Map topoConf = StormConfig.read_nimbus_topology_conf(stormConf, topologyid);
                StormTopology rawOldTopology = StormConfig.read_nimbus_topology_code(stormConf, topologyid);
                StormTopology rawNewTopology = NimbusUtils.normalizeTopology(conf, rawOldTopology, true);
                StormTopology sysOldTopology = rawOldTopology.deepCopy();
                StormTopology sysNewTopology = rawNewTopology.deepCopy();
                if (conf.get(Config.TOPOLOGY_ACKER_EXECUTORS) != null) {
                    Common.add_acker(topoConf, sysOldTopology);
                    Common.add_acker(conf, sysNewTopology);
                    int ackerNum = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
                    int oldAckerNum = JStormUtils.parseInt(topoConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
                    LOG.info("Update acker from oldAckerNum=" + oldAckerNum + " to ackerNum=" + ackerNum);
                    topoConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, ackerNum);
                    isConfUpdate = true;
                }

                // If scaling out, set up task info for the newly added tasks
                setTaskInfo(sysOldTopology, sysNewTopology);
                isSetTaskInfo = true;

                // If everything is OK, write the topology code to disk
                StormConfig.write_nimbus_topology_code(stormConf, topologyid, Utils.serialize(rawNewTopology));

                // Update the topology conf if the worker number has been updated
                Integer workerNum = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_WORKERS));
                if (workerNum != null) {
                    Integer oldWorkerNum = JStormUtils.parseInt(topoConf.get(Config.TOPOLOGY_WORKERS));
                    topoConf.put(Config.TOPOLOGY_WORKERS, workerNum);
                    isConfUpdate = true;
                    LOG.info("Update worker num from " + oldWorkerNum + " to " + workerNum);
                }

                if (conf.containsKey(Config.ISOLATION_SCHEDULER_MACHINES)) {
                    topoConf.put(Config.ISOLATION_SCHEDULER_MACHINES, conf.get(Config.ISOLATION_SCHEDULER_MACHINES));
                }

                if (isConfUpdate) {
                    StormConfig.write_nimbus_topology_conf(stormConf, topologyid, topoConf);
                }
            }
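
            // Trigger a reassignment; setScratch(true) asks TopologyAssign to compute the
            // assignment from scratch instead of reusing the existing one.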
            TopologyAssignEvent event = new TopologyAssignEvent();
            event.setTopologyId(topologyid);
            event.setScratch(true);
            event.setOldStatus(oldStatus);
            event.setReassign(reassign);
            if (conf != null)
                event.setScaleTopology(true);
            TopologyAssign.push(event);
            event.waitFinish();
        } catch (Exception e) {
            LOG.error("do-rebalance error!", e);
            // Roll back the changes on ZK
            if (isSetTaskInfo) {
                try {
                    StormClusterState clusterState = data.getStormClusterState();
                    clusterState.remove_task(topologyid, newTasks);
                } catch (Exception e1) {
                    LOG.error("Failed to rollback the changes on ZK for task-" + newTasks, e1);
                }
            }
        }

        DelayStatusTransitionCallback delayCallback =
                new DelayStatusTransitionCallback(data, topologyid, oldStatus, StatusType.rebalancing, StatusType.done_rebalance);
        return delayCallback.execute();
    }
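
    /*
     * Scale-out bookkeeping: new task IDs are allocated contiguously after the current
     * maximum task ID of the topology, so the IDs of existing tasks stay stable.
     */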
    private void setTaskInfo(StormTopology oldTopology, StormTopology newTopology) throws Exception {
        StormClusterState clusterState = data.getStormClusterState();
        // Retrieve the max task ID currently assigned to this topology
        TreeSet<Integer> taskIds = new TreeSet<Integer>(clusterState.task_ids(topologyid));
        int cnt = taskIds.last();
        cnt = setBoltInfo(oldTopology, newTopology, cnt, clusterState);
        cnt = setSpoutInfo(oldTopology, newTopology, cnt, clusterState);
    }
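
    /*
     * Compares the parallelism hint of each bolt present in the old topology: scale-in
     * removes the component's highest task IDs, equal parallelism is a no-op, and
     * scale-out registers new TaskInfo entries (tracked in newTasks for rollback).
     * Components that exist only in the new topology are not handled here.
     */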
    private int setBoltInfo(StormTopology oldTopology, StormTopology newTopology, int cnt, StormClusterState clusterState) throws Exception {
        Map<String, Bolt> oldBolts = oldTopology.get_bolts();
        Map<String, Bolt> bolts = newTopology.get_bolts();
        for (Entry<String, Bolt> entry : oldBolts.entrySet()) {
            String boltName = entry.getKey();
            Bolt oldBolt = entry.getValue();
            Bolt bolt = bolts.get(boltName);
            if (oldBolt.get_common().get_parallelism_hint() > bolt.get_common().get_parallelism_hint()) {
                int removedTaskNum = oldBolt.get_common().get_parallelism_hint() - bolt.get_common().get_parallelism_hint();
                TreeSet<Integer> taskIds = new TreeSet<Integer>(clusterState.task_ids_by_componentId(topologyid, boltName));
                Iterator<Integer> descendIterator = taskIds.descendingIterator();
                while (--removedTaskNum >= 0) {
                    int taskId = descendIterator.next();
                    removeTask(topologyid, taskId, clusterState);
                    LOG.info("Remove bolt task, taskId=" + taskId + " for " + boltName);
                }
            } else if (oldBolt.get_common().get_parallelism_hint() == bolt.get_common().get_parallelism_hint()) {
                continue;
            } else {
                int delta = bolt.get_common().get_parallelism_hint() - oldBolt.get_common().get_parallelism_hint();
                Map<Integer, TaskInfo> taskInfoMap = new HashMap<Integer, TaskInfo>();
                for (int i = 1; i <= delta; i++) {
                    cnt++;
                    TaskInfo taskInfo = new TaskInfo(boltName, "bolt");
                    taskInfoMap.put(cnt, taskInfo);
                    newTasks.add(cnt);
                    LOG.info("Setup new bolt task, taskId=" + cnt + " for " + boltName);
                }
                clusterState.add_task(topologyid, taskInfoMap);
            }
        }
        return cnt;
    }
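
    // Mirrors setBoltInfo for spouts: scale-in drops the spout's highest task IDs,
    // scale-out registers new TaskInfo entries under freshly allocated IDs.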
    private int setSpoutInfo(StormTopology oldTopology, StormTopology newTopology, int cnt, StormClusterState clusterState) throws Exception {
        Map<String, SpoutSpec> oldSpouts = oldTopology.get_spouts();
        Map<String, SpoutSpec> spouts = newTopology.get_spouts();
        for (Entry<String, SpoutSpec> entry : oldSpouts.entrySet()) {
            String spoutName = entry.getKey();
            SpoutSpec oldSpout = entry.getValue();
            SpoutSpec spout = spouts.get(spoutName);
            if (oldSpout.get_common().get_parallelism_hint() > spout.get_common().get_parallelism_hint()) {
                int removedTaskNum = oldSpout.get_common().get_parallelism_hint() - spout.get_common().get_parallelism_hint();
                TreeSet<Integer> taskIds = new TreeSet<Integer>(clusterState.task_ids_by_componentId(topologyid, spoutName));
                Iterator<Integer> descendIterator = taskIds.descendingIterator();
                while (--removedTaskNum >= 0) {
                    int taskId = descendIterator.next();
                    removeTask(topologyid, taskId, clusterState);
                    LOG.info("Remove spout task, taskId=" + taskId + " for " + spoutName);
                }
            } else if (oldSpout.get_common().get_parallelism_hint() == spout.get_common().get_parallelism_hint()) {
                continue;
            } else {
                int delta = spout.get_common().get_parallelism_hint() - oldSpout.get_common().get_parallelism_hint();
                Map<Integer, TaskInfo> taskInfoMap = new HashMap<Integer, TaskInfo>();
                for (int i = 1; i <= delta; i++) {
                    cnt++;
                    TaskInfo taskInfo = new TaskInfo(spoutName, "spout");
                    taskInfoMap.put(cnt, taskInfo);
                    newTasks.add(cnt);
                    LOG.info("Setup new spout task, taskId=" + cnt + " for " + spoutName);
                }
                clusterState.add_task(topologyid, taskInfoMap);
            }
        }
        return cnt;
    }
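
    /*
     * Removes a single task from ZK and evicts it from the in-memory task-heartbeat
     * cache so the removed task is no longer tracked.
     */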
    private void removeTask(String topologyId, int taskId, StormClusterState clusterState) throws Exception {
        Set<Integer> taskIds = new HashSet<Integer>(Arrays.asList(taskId));
        clusterState.remove_task(topologyId, taskIds);
        Map<Integer, TkHbCacheTime> taskHbs = data.getTaskHeartbeatsCache(topologyId, false);
        if (taskHbs != null) {
            taskHbs.remove(taskId);
        }
    }
}