blob: 82dd938f2570f775edb9438f7694091802bb2035 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.task.backpressure;
import java.util.List;
import java.util.Map;
import com.alibaba.jstorm.utils.JStormUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.task.TaskTransfer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.tuple.Values;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent;
import com.alibaba.jstorm.task.master.TopoMasterCtrlEvent.EventType;
/**
* Flow Control
*
* @author Basti Liu
*/
public class BackpressureController extends Backpressure {
private static Logger LOG = LoggerFactory.getLogger(BackpressureController.class);
private int taskId;
private DisruptorQueue queueControlled;
private int totalQueueSize;
private int queueSizeReduced;
private boolean isBackpressureMode = false;
private SpoutOutputCollector outputCollector;
private long maxBound, minBound;
public BackpressureController(Map conf, int taskId, DisruptorQueue queue, int queueSize) {
super(conf);
this.queueControlled = queue;
this.totalQueueSize = queueSize;
this.queueSizeReduced = queueSize;
this.taskId = taskId;
this.maxBound = 0l;
this.minBound = 0l;
}
public void setOutputCollector(SpoutOutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
public void control(TopoMasterCtrlEvent ctrlEvent) {
if (isBackpressureEnable == false) {
return;
}
EventType eventType = ctrlEvent.getEventType();
LOG.debug("Received control event, " + eventType.toString());
if (eventType.equals(EventType.startBackpressure)) {
List<Object> value = ctrlEvent.getEventValue();
int flowCtrlTime = value.get(0) != null ? (Integer) value.get(0) : 0;
start(flowCtrlTime);
} else if (eventType.equals(EventType.stopBackpressure)) {
stop();
} else if (eventType.equals(EventType.updateBackpressureConfig)) {
List<Object> value = ctrlEvent.getEventValue();
if (value != null) {
Map stormConf = (Map) value.get(0);
updateConfig(stormConf);
if (isBackpressureEnable == false) {
LOG.info("Disable backpressure in controller.");
resetBackpressureInfo();
} else {
LOG.info("Enable backpressure in controller");
}
}
}
}
public void flowControl() {
if (isBackpressureEnable == false) {
return;
}
try {
Thread.sleep(sleepTime);
while (isQueueCapacityAvailable() == false) {
Thread.sleep(1);
}
} catch (InterruptedException e) {
LOG.error("Sleep was interrupted!");
}
}
private void resetBackpressureInfo() {
sleepTime = 0l;
maxBound = 0l;
minBound = 0l;
queueSizeReduced = totalQueueSize;
isBackpressureMode = false;
}
private void start(int flowCtrlTime) {
if (flowCtrlTime > 0) {
if (maxBound < flowCtrlTime) {
sleepTime = flowCtrlTime;
} else if (maxBound == flowCtrlTime) {
if (sleepTime >= maxBound) {
sleepTime++;
} else {
sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, sleepTime, true);
}
} else {
if (maxBound <= sleepTime) {
sleepTime++;
} else {
if (sleepTime >= flowCtrlTime) {
sleepTime = JStormUtils.halfValueOfSum(maxBound, sleepTime, true);
} else {
sleepTime = JStormUtils.halfValueOfSum(flowCtrlTime, sleepTime, true);
}
}
}
} else {
sleepTime++;
}
if (sleepTime > maxBound) {
maxBound = sleepTime;
}
int size = totalQueueSize / 100;
queueSizeReduced = size > 10 ? size : 10;
isBackpressureMode = true;
LOG.info("Start backpressure at spout-{}, sleepTime={}, queueSizeReduced={}, flowCtrlTime={}", taskId, sleepTime, queueSizeReduced, flowCtrlTime);
}
private void stop() {
if (sleepTime == minBound) {
minBound = 0;
}
sleepTime = JStormUtils.halfValueOfSum(minBound, sleepTime, false);
if (sleepTime == 0) {
resetBackpressureInfo();
TopoMasterCtrlEvent stopBp = new TopoMasterCtrlEvent(EventType.stopBackpressure, null);
outputCollector.emit(Common.TOPOLOGY_MASTER_CONTROL_STREAM_ID, new Values(stopBp));
} else {
minBound = sleepTime;
}
LOG.info("Stop backpressure at spout-{}, sleepTime={}, queueSizeReduced={}, flowCtrlTime={}", taskId, sleepTime, queueSizeReduced);
}
public boolean isBackpressureMode() {
return isBackpressureMode & isBackpressureEnable;
}
public boolean isQueueCapacityAvailable() {
return (queueControlled.population() < queueSizeReduced);
}
}