blob: 7e4495a18ac7bf31bc18db8ba527f567f8ead04f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.task.execute;
import backtype.storm.Config;
import backtype.storm.Constants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.Utils;
import backtype.storm.utils.WorkerClassLoader;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.common.metric.AsmGauge;
import com.alibaba.jstorm.common.metric.QueueGauge;
import com.alibaba.jstorm.daemon.worker.timer.RotatingMapTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TaskBatchFlushTrigger;
import com.alibaba.jstorm.daemon.worker.timer.TaskHeartbeatTrigger;
import com.alibaba.jstorm.metric.*;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskBatchTransfer;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
//import com.alibaba.jstorm.message.zeroMq.IRecvConnection;
/**
* Base executor share between spout and bolt
*
*
* @author Longda
*
*/
public class BaseExecutors extends RunnableCallback {
private static Logger LOG = LoggerFactory.getLogger(BaseExecutors.class);
protected final String topologyId;
protected final String componentId;
protected final int taskId;
protected final String idStr;
protected Map storm_conf;
protected final boolean isDebug;
protected TopologyContext userTopologyCtx;
protected TopologyContext sysTopologyCtx;
protected TaskBaseMetric task_stats;
protected volatile TaskStatus taskStatus;
protected int message_timeout_secs = 30;
protected Throwable error = null;
protected ITaskReportErr report_error;
protected DisruptorQueue exeQueue;
protected BlockingQueue<Object> controlQueue;
protected Map<Integer, DisruptorQueue> innerTaskTransfer;
protected Task task;
protected long assignmentTs;
protected TaskTransfer taskTransfer;
protected JStormMetricsReporter metricsReporter;
protected boolean isFinishInit = false;
protected RotatingMapTrigger rotatingMapTrigger;
protected TaskHeartbeatTrigger taskHbTrigger;
// protected IntervalCheck intervalCheck = new IntervalCheck();
public BaseExecutors(Task task) {
this.task = task;
this.storm_conf = task.getStormConf();
this.userTopologyCtx = task.getUserContext();
this.sysTopologyCtx = task.getTopologyContext();
this.task_stats = task.getTaskStats();
this.taskId = sysTopologyCtx.getThisTaskId();
this.innerTaskTransfer = task.getInnerTaskTransfer();
this.topologyId = sysTopologyCtx.getTopologyId();
this.componentId = sysTopologyCtx.getThisComponentId();
this.idStr = JStormServerUtils.getName(componentId, taskId);
this.taskStatus = task.getTaskStatus();
this.report_error = task.getReportErrorDie();
this.taskTransfer = task.getTaskTransfer();
this.metricsReporter = task.getWorkerData().getMetricsReporter();
this.isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false);
message_timeout_secs = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 30);
int queue_size = Utils.getInt(storm_conf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE), 256);
WaitStrategy waitStrategy = (WaitStrategy) JStormUtils.createDisruptorWaitStrategy(storm_conf);
this.exeQueue = DisruptorQueue.mkInstance(idStr, ProducerType.MULTI, queue_size, waitStrategy);
this.exeQueue.consumerStarted();
this.controlQueue = new LinkedBlockingDeque<Object>();
this.registerInnerTransfer(exeQueue);
QueueGauge exeQueueGauge = new QueueGauge(exeQueue, idStr, MetricDef.EXECUTE_QUEUE);
JStormMetrics.registerTaskMetric(MetricUtils.taskMetricName(topologyId, componentId, taskId, MetricDef.EXECUTE_QUEUE, MetricType.GAUGE), new AsmGauge(
exeQueueGauge));
JStormHealthCheck.registerTaskHealthCheck(taskId, MetricDef.EXECUTE_QUEUE, exeQueueGauge);
rotatingMapTrigger = new RotatingMapTrigger(storm_conf, idStr + "_rotating", exeQueue);
rotatingMapTrigger.register();
taskHbTrigger = new TaskHeartbeatTrigger(storm_conf, idStr + "_taskHeartbeat", exeQueue, controlQueue, taskId, componentId, sysTopologyCtx, report_error);
taskHbTrigger.register();
assignmentTs = System.currentTimeMillis();
}
public void init() throws Exception {
// this function will be override by SpoutExecutor or BoltExecutor
throw new RuntimeException("Should implement this function");
}
public void initWrapper() {
try {
LOG.info("{} begin to init", idStr);
init();
if (taskId == getMinTaskIdOfWorker()) {
metricsReporter.setOutputCollector(getOutputCollector());
}
isFinishInit = true;
} catch (Throwable e) {
error = e;
LOG.error("Init error ", e);
report_error.report(e);
} finally {
LOG.info("{} initialization finished", idStr);
}
}
@Override
public void preRun() {
WorkerClassLoader.switchThreadContext();
}
@Override
public void postRun() {
WorkerClassLoader.restoreThreadContext();
}
@Override
public void run() {
// this function will be override by SpoutExecutor or BoltExecutor
throw new RuntimeException("Should implement this function");
}
@Override
public Exception error() {
if (error == null) {
return null;
}
return new Exception(error);
}
@Override
public void shutdown() {
LOG.info("Shutdown executing thread of " + idStr);
if (taskStatus.isShutdown() == false) {
LOG.info("Taskstatus isn't shutdown, but enter shutdown method, Occur exception");
}
this.unregistorInnerTransfer();
}
protected void registerInnerTransfer(DisruptorQueue disruptorQueue) {
LOG.info("Registor inner transfer for executor thread of " + idStr);
DisruptorQueue existInnerTransfer = innerTaskTransfer.get(taskId);
if (existInnerTransfer != null) {
LOG.info("Exist inner task transfer for executing thread of " + idStr);
if (existInnerTransfer != disruptorQueue) {
throw new RuntimeException("Inner task transfer must be only one in executing thread of " + idStr);
}
}
innerTaskTransfer.put(taskId, disruptorQueue);
}
protected void unregistorInnerTransfer() {
LOG.info("Unregistor inner transfer for executor thread of " + idStr);
innerTaskTransfer.remove(taskId);
}
protected int getMinTaskIdOfWorker() {
SortedSet<Integer> tasks = new TreeSet<Integer>(sysTopologyCtx.getThisWorkerTasks());
return tasks.first();
}
public Object getOutputCollector() {
return null;
}
}