blob: d2700782e6254cc25219069d69dc420628563939 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.task.backpressure;
import backtype.storm.generated.*;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.*;
import com.alibaba.jstorm.task.acker.Acker;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.Map.Entry;
/**
* The coordinator handles the backpressure requests coming from triggers and controllers.
* - Event from a trigger:
* Find the related controllers (source spouts) and decide whether the request needs to be sent out.
* - Event from a controller:
* On a backpressure stop event, send the stop request to all target triggers.
*
* @author Basti Li
*/
public class BackpressureCoordinator extends Backpressure {
    private static final Logger LOG = LoggerFactory.getLogger(BackpressureCoordinator.class);

    /** Prefix of the reported status message; also passed as the tag when the status is reported. */
    private static final String BACKPRESSURE_TAG = "Backpressure has been ";

    /**
     * Threshold used for start requests arriving within one period: such a request qualifies for
     * forwarding when its flow-control time exceeds the largest recorded one by more than this amount.
     */
    private static final int ADJUSTED_TIME = 5;

    private final TopologyContext context;
    private final StormTopology topology;
    private final OutputCollector output;
    private final int topologyMasterId;
    private final Map<Integer, String> taskIdToComponentId;
    private final Map<String, SpoutSpec> spouts;
    private final Map<String, Bolt> bolts;

    // Source spout component id -> backpressure info of that spout (which in turn holds the
    // per-target-component info).
    private final Map<String, SourceBackpressureInfo> sourceToBackpressureInfo;

    // Product of the configured check interval and trigger sample number; compared against
    // System.currentTimeMillis() differences.
    private final int period;
    private final StormClusterState zkCluster;

    /**
     * Creates the coordinator and restores any backpressure state previously stored in the cluster state.
     *
     * @param output          collector used to emit control events directly to tasks
     * @param topologyContext context of the topology this coordinator belongs to
     * @param taskId          id stored as the topology master task id; this task is excluded
     *                        from configuration broadcasts
     */
    public BackpressureCoordinator(OutputCollector output, TopologyContext topologyContext, Integer taskId) {
        super(topologyContext.getStormConf());
        this.context = topologyContext;
        this.topology = topologyContext.getRawTopology();

        this.spouts = new HashMap<String, SpoutSpec>();
        if (this.topology.get_spouts() != null) {
            this.spouts.putAll(this.topology.get_spouts());
        }
        this.bolts = new HashMap<String, Bolt>();
        if (this.topology.get_bolts() != null) {
            this.bolts.putAll(this.topology.get_bolts());
        }

        this.taskIdToComponentId = topologyContext.getTaskToComponent();
        this.topologyMasterId = taskId;
        this.output = output;

        int checkInterval = ConfigExtension.getBackpressureCheckIntervl(context.getStormConf());
        int sampleNum = ConfigExtension.getBackpressureTriggerSampleNumber(context.getStormConf());
        this.period = checkInterval * sampleNum;

        this.zkCluster = topologyContext.getZkCluster();
        this.sourceToBackpressureInfo = loadBackpressureInfo();
    }

    /**
     * Reads the stored backpressure info of this topology from the cluster state.
     *
     * @return the stored map, or a new empty map when nothing is stored or reading fails
     */
    private Map<String, SourceBackpressureInfo> loadBackpressureInfo() {
        try {
            Map<String, SourceBackpressureInfo> stored = zkCluster.get_backpressure_info(context.getTopologyId());
            if (stored != null) {
                LOG.info("Successfully retrieve existing SourceTobackpressureInfo from zk: " + stored);
                return stored;
            }
        } catch (Exception e) {
            LOG.warn("Failed to get SourceTobackpressureInfo from zk", e);
        }
        return new HashMap<String, SourceBackpressureInfo>();
    }

    /**
     * Collects the spouts that feed the given bolt, directly or through other bolts.
     *
     * @param topology            unused; kept so existing call sites stay unchanged
     * @param boltComponentId     component id of the bolt whose upstream spouts are wanted
     * @param componentsTraversed components already visited in this traversal, or {@code null} to start a new one
     * @return sorted set of upstream spout component ids; empty when the id is not a known bolt
     */
    private Set<String> getInputSpoutsForBolt(StormTopology topology, String boltComponentId, Set<String> componentsTraversed) {
        Set<String> ret = new TreeSet<String>();
        if (componentsTraversed == null) {
            componentsTraversed = new HashSet<String>();
        }

        Bolt bolt = bolts.get(boltComponentId);
        if (bolt == null) {
            return ret;
        }

        for (GlobalStreamId streamId : bolt.get_common().get_inputs().keySet()) {
            String inputComponent = streamId.get_componentId();
            // Visit every component only once, so that bolts forming a loop cannot cause endless recursion.
            if (!componentsTraversed.add(inputComponent)) {
                continue;
            }
            if (spouts.containsKey(inputComponent)) {
                ret.add(inputComponent);
            } else if (bolts.containsKey(inputComponent)) {
                ret.addAll(getInputSpoutsForBolt(topology, inputComponent, componentsTraversed));
            }
        }
        return ret;
    }

    /**
     * Dispatches a control tuple according to the component of the task that sent it.
     * Nothing happens while backpressure is disabled. Tuples from spouts are only handled
     * when backpressure info is recorded for that spout.
     *
     * @param input tuple carrying the control event in its "ctrlEvent" field
     */
    public void process(Tuple input) {
        if (!isBackpressureEnable) {
            return;
        }

        int sourceTask = input.getSourceTask();
        String componentId = taskIdToComponentId.get(sourceTask);
        if (componentId == null) {
            LOG.warn("Receive tuple from unknown task-" + sourceTask);
            return;
        }

        if (spouts.containsKey(componentId)) {
            if (sourceToBackpressureInfo.get(componentId) != null) {
                handleEventFromSpout(sourceTask, input);
            }
        } else if (bolts.containsKey(componentId)) {
            handleEventFromBolt(sourceTask, input);
        }
    }

    /**
     * Applies a new configuration, drops all recorded state when backpressure got disabled,
     * forwards the configuration to every task except this one and the acker tasks, and
     * reports the resulting status.
     *
     * @param conf the new configuration
     */
    public void updateBackpressureConfig(Map conf) {
        updateConfig(conf);

        if (!isBackpressureEnable) {
            LOG.info("Disable backpressure in coordinator.");
            sourceToBackpressureInfo.clear();
        } else {
            LOG.info("Enable backpressure in coordinator.");
        }

        TopoMasterCtrlEvent updateBpConfig = new TopoMasterCtrlEvent(EventType.updateBackpressureConfig, new ArrayList<Object>());
        updateBpConfig.addEventValue(conf);
        Values values = new Values(updateBpConfig);

        Set<Integer> targetTasks = new TreeSet<Integer>(taskIdToComponentId.keySet());
        targetTasks.remove(topologyMasterId);
        targetTasks.removeAll(context.getComponentTasks(Acker.ACKER_COMPONENT_ID));
        sendBackpressureMessage(targetTasks, values, EventType.updateBackpressureConfig);

        reportBackpressureStatus();
    }

    /**
     * @param spouts spout component ids to check, may be {@code null}
     * @return {@code true} if at least one of the spouts has recorded info with a non-empty task set
     */
    private boolean checkSpoutsUnderBackpressure(Set<String> spouts) {
        if (spouts == null) {
            return false;
        }
        for (String spout : spouts) {
            SourceBackpressureInfo backpressureInfo = sourceToBackpressureInfo.get(spout);
            if (backpressureInfo != null && backpressureInfo.getTasks().size() > 0) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the info recorded for a (source spout, target component) pair.
     *
     * @param sourceSpout       component id of the source spout
     * @param targetComponentId component id of the target component
     * @param created           when {@code true}, missing source and target entries are created and registered
     * @return the recorded (or newly created) info, or {@code null} when it is missing and {@code created} is false
     */
    private TargetBackpressureInfo getBackpressureInfoBySourceSpout(String sourceSpout, String targetComponentId, boolean created) {
        SourceBackpressureInfo info = sourceToBackpressureInfo.get(sourceSpout);
        if (info == null) {
            if (!created) {
                return null;
            }
            info = new SourceBackpressureInfo();
            sourceToBackpressureInfo.put(sourceSpout, info);
        }

        TargetBackpressureInfo ret = info.getTargetTasks().get(targetComponentId);
        if (ret == null && created) {
            ret = new TargetBackpressureInfo();
            info.getTargetTasks().put(targetComponentId, ret);
        }
        return ret;
    }

    /**
     * @param time timestamp in milliseconds; {@code 0} is treated as "never set"
     * @return {@code true} if {@code time} is set and more than one period has passed since then
     */
    private boolean checkIntervalExpired(long time) {
        return time != 0 && System.currentTimeMillis() - time > period;
    }

    /**
     * Emits the given control event directly to each target task on the topology master control stream.
     *
     * @param targetTasks      ids of the tasks to notify
     * @param value            tuple values carrying the control event
     * @param backpressureType event type, used for logging only
     */
    private void sendBackpressureMessage(Set<Integer> targetTasks, Values value, EventType backpressureType) {
        for (Integer taskId : targetTasks) {
            output.emitDirect(taskId, Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, value);
            LOG.debug("Send {} request to taskId-{}", backpressureType, taskId);
        }
    }

    /**
     * Handles a control event sent by a spout task. Only stop events are expected here.
     *
     * @param sourceTask id of the spout task that sent the event
     * @param input      tuple carrying the control event in its "ctrlEvent" field
     */
    private void handleEventFromSpout(int sourceTask, Tuple input) {
        TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent");
        EventType type = ctrlEvent.getEventType();

        if (!type.equals(EventType.stopBackpressure)) {
            LOG.warn("Received unexpected event, " + type.toString());
            return;
        }

        String spoutComponentId = taskIdToComponentId.get(sourceTask);
        // NOTE(review): the whole spout entry is removed on the first stop event of any of its tasks.
        // process() ignores spout events without a recorded entry, so stop events of the remaining
        // tasks of the same spout are dropped, and the "all tasks stopped" branch below is only
        // reached when the removed entry held no task other than the sender. Confirm this is intended.
        SourceBackpressureInfo info = sourceToBackpressureInfo.remove(spoutComponentId);
        if (info == null) {
            LOG.error("Received event from non-recorded spout-" + sourceTask);
            return;
        }

        info.getTasks().remove(sourceTask);
        if (info.getTasks().isEmpty()) {
            for (String componentId : info.getTargetTasks().keySet()) {
                // Notify the target only if none of its source spouts is still under backpressure.
                Set<String> sourceSpouts = getInputSpoutsForBolt(topology, componentId, null);
                if (!checkSpoutsUnderBackpressure(sourceSpouts)) {
                    Set<Integer> tasks = new TreeSet<Integer>(context.getComponentTasks(componentId));
                    sendBackpressureMessage(tasks, new Values(ctrlEvent), type);
                }
            }
        }

        // The set of tasks under backpressure has changed, so report the latest status.
        reportBackpressureStatus();
    }

    /**
     * Handles a start or stop backpressure event sent by a bolt task: updates the recorded state for
     * every upstream spout of the bolt, notifies the affected tasks and reports the status when the
     * recorded task set changed.
     *
     * @param sourceTask id of the bolt task that sent the event
     * @param input      tuple carrying the control event in its "ctrlEvent" field
     */
    private void handleEventFromBolt(int sourceTask, Tuple input) {
        String componentId = taskIdToComponentId.get(sourceTask);
        Set<String> inputSpouts = getInputSpoutsForBolt(topology, componentId, null);
        TopoMasterCtrlEvent ctrlEvent = (TopoMasterCtrlEvent) input.getValueByField("ctrlEvent");
        EventType type = ctrlEvent.getEventType();

        Set<Integer> notifyList = new TreeSet<Integer>();
        Values values = null;
        TargetBackpressureInfo info = null;
        boolean update = false;

        if (type.equals(EventType.startBackpressure)) {
            int flowCtrlTime = (Integer) ctrlEvent.getEventValue().get(0);

            for (String spout : inputSpouts) {
                info = getBackpressureInfoBySourceSpout(spout, componentId, true);
                SourceBackpressureInfo sourceInfo = sourceToBackpressureInfo.get(spout);
                boolean taskAdded = info.getTasks().add(sourceTask);

                // Decide whether this request qualifies for forwarding to the spout. The source
                // info is read before the target info below is modified.
                boolean add = false;
                if (System.currentTimeMillis() - sourceInfo.getLastestTimeStamp() > period) {
                    add = true;
                } else {
                    EventType lastestBpEvent = sourceInfo.getLastestBackpressureEvent();
                    if (lastestBpEvent != null && !lastestBpEvent.equals(EventType.startBackpressure)) {
                        add = true;
                    }

                    int maxFlowCtrlTime = sourceInfo.getMaxFlowCtrlTime();
                    if ((flowCtrlTime - maxFlowCtrlTime > ADJUSTED_TIME || maxFlowCtrlTime == -1)
                            && flowCtrlTime >= 0) {
                        add = true;
                    }
                }

                info.setFlowCtrlTime(flowCtrlTime);
                info.setBackpressureStatus(type);

                if (add) {
                    info.setTimeStamp(System.currentTimeMillis());

                    // Only when the share of bolt tasks sending the request reaches the configured
                    // ratio does the coordinator send the backpressure request to the controller.
                    // This avoids over-control caused by very few tasks.
                    double taskBpRatio = (double) info.getTasks().size()
                            / (double) context.getComponentTasks(componentId).size();
                    if (taskBpRatio >= triggerBpRatio) {
                        Set<Integer> spoutTasks = new TreeSet<Integer>(context.getComponentTasks(spout));
                        sourceInfo.getTasks().addAll(spoutTasks);
                        notifyList.addAll(spoutTasks);
                        // Accumulate over all spouts, so that a change recorded for one spout is
                        // not lost when a later spout yields no change.
                        update |= taskAdded;
                    }
                }
            }

            List<Object> value = new ArrayList<Object>();
            // info is still null when the bolt has no upstream spout; fall back to the requested time.
            value.add(info != null ? info.getFlowCtrlTime() : flowCtrlTime);
            TopoMasterCtrlEvent startBp = new TopoMasterCtrlEvent(EventType.startBackpressure, value);
            values = new Values(startBp);
        } else if (type.equals(EventType.stopBackpressure)) {
            for (String spout : inputSpouts) {
                info = getBackpressureInfoBySourceSpout(spout, componentId, false);
                if (info == null) {
                    // Nothing is recorded for this bolt under this spout, so there is nothing to stop.
                    continue;
                }
                // A non-null target info implies that the source entry exists.
                SourceBackpressureInfo sourceInfo = sourceToBackpressureInfo.get(spout);

                Set<Integer> tasks = info.getTasks();
                if (tasks != null && tasks.remove(sourceTask)) {
                    update = true;
                }

                if (sourceInfo != null && checkIntervalExpired(sourceInfo.getLastestTimeStamp())) {
                    info.setTimeStamp(System.currentTimeMillis());
                    notifyList.addAll(context.getComponentTasks(spout));
                    info.setBackpressureStatus(type);
                }
            }

            // If none of the source spouts is under backpressure, notify the bolt task itself.
            if (!checkSpoutsUnderBackpressure(inputSpouts)) {
                notifyList.add(sourceTask);
            }

            TopoMasterCtrlEvent stoptBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null);
            values = new Values(stoptBp);
        } else {
            LOG.warn("Received unknown event " + type.toString());
        }

        sendBackpressureMessage(notifyList, values, type);

        // If the set of tasks under backpressure has changed, report the latest status.
        if (update) {
            LOG.info("inputspouts=" + inputSpouts + " for " + componentId + "-" + sourceTask + ", eventType=" + type.toString());
            reportBackpressureStatus();
        }
    }

    /**
     * @return ids of all tasks recorded for every source spout that has a non-empty task set,
     *         including the tasks recorded for that spout's target components
     */
    private Set<Integer> getTasksUnderBackpressure() {
        Set<Integer> ret = new HashSet<Integer>();
        for (SourceBackpressureInfo sourceInfo : sourceToBackpressureInfo.values()) {
            if (sourceInfo.getTasks().size() > 0) {
                ret.addAll(sourceInfo.getTasks());
                for (TargetBackpressureInfo targetInfo : sourceInfo.getTargetTasks().values()) {
                    ret.addAll(targetInfo.getTasks());
                }
            }
        }
        return ret;
    }

    /**
     * Reports the current status message through the cluster state and stores the recorded
     * backpressure info there. Failures are logged and not propagated.
     */
    private void reportBackpressureStatus() {
        try {
            StringBuilder stringBuilder = new StringBuilder();
            Set<Integer> underTasks = getTasksUnderBackpressure();
            stringBuilder.append(BACKPRESSURE_TAG);
            if (underTasks.isEmpty()) {
                stringBuilder.append("closed ");
            } else {
                stringBuilder.append("opened: ");
                stringBuilder.append(underTasks);
            }
            zkCluster.report_task_error(context.getTopologyId(), context.getThisTaskId(), stringBuilder.toString(), BACKPRESSURE_TAG);
            zkCluster.set_backpressure_info(context.getTopologyId(), sourceToBackpressureInfo);
            LOG.info(stringBuilder.toString());
        } catch (Exception e) {
            LOG.error("can't update backpressure state ", e);
        }
    }
}