/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.task;

import backtype.storm.messaging.IContext;
import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.spout.ISpout;
import backtype.storm.task.IBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.DisruptorQueue;
import backtype.storm.utils.WorkerClassLoader;
import clojure.lang.Atom;
import com.alibaba.jstorm.callback.AsyncLoopDefaultKill;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.task.error.TaskReportError;
import com.alibaba.jstorm.task.error.TaskReportErrorAndDie;
import com.alibaba.jstorm.task.execute.BaseExecutors;
import com.alibaba.jstorm.task.execute.BoltExecutors;
import com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SingleThreadSpoutExecutors;
import com.alibaba.jstorm.task.execute.spout.SpoutExecutors;
import com.alibaba.jstorm.task.group.MkGrouper;
import com.alibaba.jstorm.utils.JStormServerUtils;
import com.alibaba.jstorm.utils.JStormUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Task instance
*
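 * <p>A typical entry point is the static factory {@code mk_task} below (a
 * sketch; in JStorm the worker daemon normally drives this):
 *
 * <pre>
 * TaskShutdownDameon shutdown = Task.mk_task(workerData, taskId);
 * </pre>
 *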
* @author yannian/Longda
*/
public class Task implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Map<Object, Object> stormConf;
private TopologyContext topologyContext;
private TopologyContext userContext;
private IContext context;
private TaskTransfer taskTransfer;
private TaskReceiver taskReceiver;
private Map<Integer, DisruptorQueue> innerTaskTransfer;
private Map<Integer, DisruptorQueue> deserializeQueues;
private AsyncLoopDefaultKill workHalt;
private String topologyId;
private Integer taskId;
private String componentId;
private volatile TaskStatus taskStatus;
private Atom openOrPrepareWasCalled;
// running time counter
private UptimeComputer uptime = new UptimeComputer();
private StormClusterState zkCluster;
private Object taskObj;
private TaskBaseMetric taskStats;
private WorkerData workerData;
private TaskSendTargets taskSendTargets;
private TaskReportErrorAndDie reportErrorDie;
private boolean isTaskBatchTuple;
private TaskShutdownDameon taskShutdownDameon;
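/**
 * Builds the task context: creates the system and user TopologyContexts,
 * resolves the component-level configuration, wires the worker-level
 * queues, and deserializes the real spout/bolt object under the worker
 * class loader.
 */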
@SuppressWarnings("rawtypes")
public Task(WorkerData workerData, int taskId) throws Exception {
openOrPrepareWasCalled = new Atom(Boolean.valueOf(false));
this.workerData = workerData;
this.topologyContext = workerData.getContextMaker().makeTopologyContext(workerData.getSysTopology(), taskId, openOrPrepareWasCalled);
this.userContext = workerData.getContextMaker().makeTopologyContext(workerData.getRawTopology(), taskId, openOrPrepareWasCalled);
this.taskId = taskId;
this.componentId = topologyContext.getThisComponentId();
this.stormConf = Common.component_conf(workerData.getStormConf(), topologyContext, componentId);
this.taskStatus = new TaskStatus();
this.innerTaskTransfer = workerData.getInnerTaskTransfer();
this.deserializeQueues = workerData.getDeserializeQueues();
this.topologyId = workerData.getTopologyId();
this.context = workerData.getContext();
this.workHalt = workerData.getWorkHalt();
this.zkCluster = workerData.getZkCluster();
this.taskStats = new TaskBaseMetric(topologyId, componentId, taskId,
ConfigExtension.isEnableMetrics(workerData.getStormConf()));
LOG.info("Begin to deserialize taskObj " + componentId + ":" + this.taskId);
WorkerClassLoader.switchThreadContext();
// get real task object -- spout/bolt/spoutspec
this.taskObj = Common.get_task_object(topologyContext.getRawTopology(), componentId, WorkerClassLoader.getInstance());
WorkerClassLoader.restoreThreadContext();
isTaskBatchTuple = ConfigExtension.isTaskBatchTuple(stormConf);
LOG.info("Transfer/receive in batch mode :" + isTaskBatchTuple);
LOG.info("Loading task " + componentId + ":" + this.taskId);
}
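/**
 * Creates the send targets of this task, i.e. the mapping from each
 * output stream to its downstream components and their groupings.
 */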
private TaskSendTargets makeSendTargets() {
String component = topologyContext.getThisComponentId();
// get current task's output
// <Stream_id,<component, Grouping>>
Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(topologyContext, workerData);
return new TaskSendTargets(stormConf, component, streamComponentGrouper, topologyContext, taskStats);
}
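/**
 * Refreshes the stream/component/grouping map of the existing send
 * targets, e.g. after the assignment has changed.
 */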
private void updateSendTargets() {
if (taskSendTargets != null) {
Map<String, Map<String, MkGrouper>> streamComponentGrouper = Common.outbound_components(topologyContext, workerData);
taskSendTargets.updateStreamCompGrouper(streamComponentGrouper);
} else {
LOG.error("taskSendTargets is null when trying to update it.");
}
}
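/**
 * Announces task startup by sending a "startup" tuple on the system
 * stream, and returns the send targets created along the way.
 */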
public TaskSendTargets echoToSystemBolt() {
// send "startup" tuple to system bolt
List<Object> msg = new ArrayList<Object>();
msg.add("startup");
// create task send targets
TaskSendTargets sendTargets = makeSendTargets();
UnanchoredSend.send(topologyContext, sendTargets, taskTransfer, Common.SYSTEM_STREAM_ID, msg);
return sendTargets;
}
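/**
 * A spout runs in single-thread mode when the topology allows only one
 * pending tuple, or when single-thread mode is explicitly configured.
 */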
public boolean isSingleThread(Map conf) {
if (JStormServerUtils.isOnePending(conf)) {
return true;
}
return ConfigExtension.isSpoutSingleThread(conf);
}
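/**
 * Creates the executor matching the task object: BoltExecutors for a
 * bolt, and a single- or multi-threaded spout executor for a spout.
 */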
public BaseExecutors mkExecutor() {
BaseExecutors baseExecutor = null;
if (taskObj instanceof IBolt) {
baseExecutor = new BoltExecutors(this);
} else if (taskObj instanceof ISpout) {
if (isSingleThread(stormConf)) {
baseExecutor = new SingleThreadSpoutExecutors(this);
} else {
baseExecutor = new MultipleThreadSpoutExecutors(this);
}
}
return baseExecutor;
}
/**
 * Creates the executor that receives tuples and runs the bolt/spout execute function.
 */
private RunnableCallback prepareExecutor() {
// create report error callback,
// in fact it is storm_cluster.report-task-error
ITaskReportErr reportError = new TaskReportError(zkCluster, topologyId, taskId);
// report error and halt worker
reportErrorDie = new TaskReportErrorAndDie(reportError, workHalt);
final BaseExecutors baseExecutor = mkExecutor();
return baseExecutor;
}
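/**
 * Creates the receive side of this task (batch or non-batch) and
 * registers its deserialize queue with the worker, so that incoming
 * tuples can be routed to this task.
 */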
public TaskReceiver mkTaskReceiver() {
String taskName = JStormServerUtils.getName(componentId, taskId);
if (isTaskBatchTuple) {
taskReceiver = new TaskBatchReceiver(this, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName);
} else {
taskReceiver = new TaskReceiver(this, taskId, stormConf, topologyContext, innerTaskTransfer, taskStatus, taskName);
}
deserializeQueues.put(taskId, taskReceiver.getDeserializeQueue());
return taskReceiver;
}
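/**
 * Wires the task together: sets up the send targets, the sending side,
 * the executor thread, and the receiver, then returns the shutdown
 * daemon that controls all of the task's threads.
 */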
public TaskShutdownDameon execute() throws Exception {
taskSendTargets = echoToSystemBolt();
// create the sending side: all tuples produced by this task
// are serialized and transferred through taskTransfer
taskTransfer = mkTaskSending(workerData);
// create the executor that takes tuples from the queues and
// runs the bolt/spout execute function
RunnableCallback baseExecutor = prepareExecutor();
AsyncLoopThread executor_threads = new AsyncLoopThread(baseExecutor, false, Thread.MAX_PRIORITY, true);
taskReceiver = mkTaskReceiver();
List<AsyncLoopThread> allThreads = new ArrayList<AsyncLoopThread>();
allThreads.add(executor_threads);
LOG.info("Finished loading task " + componentId + ":" + taskId);
taskShutdownDameon = getShutdown(allThreads, taskReceiver.getDeserializeQueue(),
baseExecutor);
return taskShutdownDameon;
}
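/**
 * Creates the TaskTransfer (batch or non-batch) that serializes and
 * sends every tuple produced by this task.
 */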
private TaskTransfer mkTaskSending(WorkerData workerData) {
// serializer for outgoing tuples
KryoTupleSerializer serializer = new KryoTupleSerializer(workerData.getStormConf(), topologyContext);
String taskName = JStormServerUtils.getName(componentId, taskId);
// the task sends all of its tuples through this object
TaskTransfer taskTransfer;
if (isTaskBatchTuple) {
taskTransfer = new TaskBatchTransfer(this, taskName, serializer, taskStatus, workerData);
} else {
taskTransfer = new TaskTransfer(this, taskName, serializer, taskStatus, workerData);
}
return taskTransfer;
}
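/**
 * Collects all threads belonging to this task (executor, optional spout
 * acker, deserialize and serialize threads) into a shutdown daemon.
 */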
public TaskShutdownDameon getShutdown(List<AsyncLoopThread> allThreads, DisruptorQueue deserializeQueue, RunnableCallback baseExecutor) {
AsyncLoopThread ackerThread = null;
if (baseExecutor instanceof SpoutExecutors) {
ackerThread = ((SpoutExecutors) baseExecutor).getAckerRunnableThread();
if (ackerThread != null) {
allThreads.add(ackerThread);
}
}
AsyncLoopThread recvThread = taskReceiver.getDeserializeThread();
allThreads.add(recvThread);
AsyncLoopThread serializeThread = taskTransfer.getSerializeThread();
allThreads.add(serializeThread);
TaskShutdownDameon shutdown = new TaskShutdownDameon(taskStatus, topologyId, taskId, allThreads, zkCluster, taskObj, this);
return shutdown;
}
public TaskShutdownDameon getTaskShutdownDameon() {
return taskShutdownDameon;
}
@Override
public void run() {
try {
taskShutdownDameon = this.execute();
} catch (Throwable e) {
LOG.error("Failed to initialize task " + componentId + ":" + taskId, e);
}
}
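/**
 * Convenience factory: constructs a Task and immediately executes it,
 * returning the shutdown daemon of the running task.
 */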
public static TaskShutdownDameon mk_task(WorkerData workerData, int taskId) throws Exception {
Task t = new Task(workerData, taskId);
return t.execute();
}
/**
 * Updates the data that can change dynamically, e.g. when the parallelism of a task is scaled out.
 */
public void updateTaskData() {
// Only update the local task list of topologyContext here. The other
// related parts of the context are updated during the update of
// WorkerData (the Task2Component and Component2Task maps).
List<Integer> localTasks = JStormUtils.mk_list(workerData.getTaskids());
topologyContext.setThisWorkerTasks(localTasks);
userContext.setThisWorkerTasks(localTasks);
// Update the TaskSendTargets
updateSendTargets();
}
public long getWorkerAssignmentTs() {
return workerData.getAssignmentTs();
}
public AssignmentType getWorkerAssignmentType() {
return workerData.getAssignmentType();
}
public void unregisterDeserializeQueue() {
deserializeQueues.remove(taskId);
}
public String getComponentId() {
return componentId;
}
public Integer getTaskId() {
return taskId;
}
public DisruptorQueue getExecuteQueue() {
return innerTaskTransfer.get(taskId);
}
public DisruptorQueue getDeserializeQueue() {
return deserializeQueues.get(taskId);
}
public Map<Object, Object> getStormConf() {
return stormConf;
}
public TopologyContext getTopologyContext() {
return topologyContext;
}
public TopologyContext getUserContext() {
return userContext;
}
public TaskTransfer getTaskTransfer() {
return taskTransfer;
}
public TaskReceiver getTaskReceiver() {
return taskReceiver;
}
public Map<Integer, DisruptorQueue> getInnerTaskTransfer() {
return innerTaskTransfer;
}
public Map<Integer, DisruptorQueue> getDeserializeQueues() {
return deserializeQueues;
}
public String getTopologyId() {
return topologyId;
}
public TaskStatus getTaskStatus() {
return taskStatus;
}
public StormClusterState getZkCluster() {
return zkCluster;
}
public Object getTaskObj() {
return taskObj;
}
public TaskBaseMetric getTaskStats() {
return taskStats;
}
public WorkerData getWorkerData() {
return workerData;
}
public TaskSendTargets getTaskSendTargets() {
return taskSendTargets;
}
public TaskReportErrorAndDie getReportErrorDie() {
return reportErrorDie;
}
public boolean isTaskBatchTuple() {
return isTaskBatchTuple;
}
}