// blob: baf709fa1fff8a3059741707ccc1ad7a08631f51 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.task.execute.spout;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.jstorm.common.metric.AsmHistogram;
import com.alibaba.jstorm.metric.JStormMetrics;
import com.alibaba.jstorm.metric.MetricDef;
import com.alibaba.jstorm.metric.MetricType;
import com.alibaba.jstorm.metric.MetricUtils;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.task.acker.Acker;
import com.alibaba.jstorm.task.comm.TaskSendTargets;
import com.alibaba.jstorm.task.comm.TupleInfo;
import com.alibaba.jstorm.task.comm.UnanchoredSend;
import com.alibaba.jstorm.task.error.ITaskReportErr;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.TimeOutMap;
import com.alibaba.jstorm.utils.TimeUtils;
import backtype.storm.Config;
import backtype.storm.spout.ISpout;
import backtype.storm.spout.ISpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.TupleImplExt;
import backtype.storm.utils.DisruptorQueue;
/**
 * Output collector handed to a spout; every tuple the spout emits is sent through this object.
 *
 * <p>For each emit the collector asks {@link TaskSendTargets} for the target tasks, wraps the values in one
 * {@link TupleImplExt} per target and passes it to the task's {@link TaskTransfer}. If the spout supplied a
 * message id and the configured acker count is positive, the tuple is recorded in the pending map and an init
 * tuple is sent on {@link Acker#ACKER_INIT_STREAM_ID}. If a message id was supplied but acking is off, an
 * {@code AckSpoutMsg} is run synchronously instead.
 *
 * @author yannian/Longda
 */
public class SpoutCollector implements ISpoutOutputCollector {
    private static final Logger LOG = LoggerFactory.getLogger(SpoutCollector.class);

    private final TaskSendTargets sendTargets;
    private final Map storm_conf;
    private final TaskTransfer transfer_fn;
    /** Tuples emitted with a message id that are still tracked, keyed by root id. */
    private final TimeOutMap<Long, TupleInfo> pending;
    /** topology_context is the system topology context. */
    private final TopologyContext topology_context;
    private final DisruptorQueue disruptorAckerQueue;
    private final TaskBaseMetric task_stats;
    private final backtype.storm.spout.ISpout spout;
    private final ITaskReportErr report_error;
    private final Integer task_id;
    // NOTE(review): parseInt(Object) may yield null when TOPOLOGY_ACKER_EXECUTORS is absent, in which case
    // the "ackerNum > 0" check in sendSpoutMsg would throw on unboxing -- confirm the config is always set.
    private final Integer ackerNum;
    private final boolean isDebug;
    /** Histogram fed with the duration of every sendSpoutMsg call. */
    private final AsmHistogram emitTotalTimer;
    /** Source of root ids and per-target ack ids. */
    Random random;

    /**
     * Builds a collector from the pieces owned by {@code task}.
     *
     * @param task                the spout task; its send targets, configuration, transfer function, topology
     *                            context, stats, task object, id and error reporter are read here
     * @param pending             map in which tuples awaiting ack/fail are recorded
     * @param disruptorAckerQueue queue kept on the collector (not used by the code in this class)
     */
    public SpoutCollector(Task task, TimeOutMap<Long, TupleInfo> pending, DisruptorQueue disruptorAckerQueue) {
        this.sendTargets = task.getTaskSendTargets();
        this.storm_conf = task.getStormConf();
        this.transfer_fn = task.getTaskTransfer();
        this.pending = pending;
        this.topology_context = task.getTopologyContext();
        this.disruptorAckerQueue = disruptorAckerQueue;
        this.task_stats = task.getTaskStats();
        this.spout = (ISpout) task.getTaskObj();
        this.task_id = task.getTaskId();
        this.report_error = task.getReportErrorDie();

        this.ackerNum = JStormUtils.parseInt(storm_conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
        this.isDebug = JStormUtils.parseBoolean(storm_conf.get(Config.TOPOLOGY_DEBUG), false);

        // Rely on Random's default seed. Re-seeding with System.currentTimeMillis() made every collector
        // created within the same millisecond generate the same sequence of root ids / ack ids.
        this.random = new Random();

        String componentId = topology_context.getThisComponentId();
        String metricName = MetricUtils.taskMetricName(topology_context.getTopologyId(), componentId, task_id,
                MetricDef.COLLECTOR_EMIT_TIME, MetricType.HISTOGRAM);
        this.emitTotalTimer = (AsmHistogram) JStormMetrics.registerTaskMetric(metricName, new AsmHistogram());
    }

    /**
     * Emits {@code tuple} on {@code streamId} to the tasks selected by the send targets.
     *
     * @param streamId  stream to emit on
     * @param tuple     tuple values
     * @param messageId spout-supplied message id, or {@code null} for an untracked emit
     * @return the target task ids the tuple was sent to
     */
    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return sendSpoutMsg(streamId, tuple, messageId, null);
    }

    /**
     * Emits {@code tuple} on {@code streamId}, passing {@code taskId} to the send targets as the direct target.
     *
     * @param taskId    task the tuple is addressed to
     * @param streamId  stream to emit on
     * @param tuple     tuple values
     * @param messageId spout-supplied message id, or {@code null} for an untracked emit
     */
    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        sendSpoutMsg(streamId, tuple, messageId, taskId);
    }

    /**
     * Shared implementation of {@link #emit} and {@link #emitDirect}.
     *
     * @param out_stream_id stream to emit on
     * @param values        tuple values
     * @param message_id    spout-supplied message id, may be {@code null}
     * @param out_task_id   direct target task, or {@code null} to let the send targets choose
     * @return the target task ids returned by the send targets (possibly empty)
     */
    private List<Integer> sendSpoutMsg(String out_stream_id, List<Object> values, Object message_id, Integer out_task_id) {
        final long startTime = System.nanoTime();
        try {
            List<Integer> out_tasks;
            if (out_task_id != null) {
                out_tasks = sendTargets.get(out_task_id, out_stream_id, values);
            } else {
                out_tasks = sendTargets.get(out_stream_id, values);
            }

            if (out_tasks.isEmpty()) {
                // don't need send tuple to other task
                // NOTE(review): this returns before the ack handling below, so a non-null message_id is
                // neither tracked nor acked on this path -- confirm that is intended.
                return out_tasks;
            }

            final boolean needAck = (message_id != null) && (ackerNum > 0);
            List<Long> ackSeq = new ArrayList<Long>();

            // This differs from Storm: Storm takes MessageId.generateId(random) as root id without checking
            // for uniqueness, and a duplicated root id makes it miss an ack/fail call. Here the id is
            // regenerated until it is not already a key of the pending map.
            Long root_id = MessageId.generateId(random);
            if (needAck) {
                while (pending.containsKey(root_id)) {
                    root_id = MessageId.generateId(random);
                }
            }

            for (Integer t : out_tasks) {
                MessageId msgid;
                if (needAck) {
                    // one fresh ack id per target; their XOR is reported to the acker below
                    Long as = MessageId.generateId(random);
                    msgid = MessageId.makeRootId(root_id, as);
                    ackSeq.add(as);
                } else {
                    msgid = MessageId.makeUnanchored();
                }

                TupleImplExt tp = new TupleImplExt(topology_context, values, task_id, out_stream_id, msgid);
                tp.setTargetTaskId(t);
                transfer_fn.transfer(tp);
            }

            if (needAck) {
                TupleInfo info = newTupleInfo(out_stream_id, values, message_id, System.nanoTime());
                pending.putHead(root_id, info);

                List<Object> ackerTuple = JStormUtils.mk_list((Object) root_id, JStormUtils.bit_xor_vals(ackSeq), task_id);
                UnanchoredSend.send(topology_context, sendTargets, transfer_fn, Acker.ACKER_INIT_STREAM_ID, ackerTuple);
            } else if (message_id != null) {
                // message id supplied but acking is off: run the ack message right away, timestamp 0
                TupleInfo info = newTupleInfo(out_stream_id, values, message_id, 0);
                AckSpoutMsg ack = new AckSpoutMsg(spout, null, info, task_stats, isDebug);
                ack.run();
            }

            return out_tasks;
        } finally {
            // record the elapsed time, scaled by TimeUtils.NS_PER_US, on every exit path
            long endTime = System.nanoTime();
            emitTotalTimer.update((endTime - startTime) / TimeUtils.NS_PER_US);
        }
    }

    /** Creates a TupleInfo populated with the given stream, values, message id and timestamp. */
    private static TupleInfo newTupleInfo(String stream, List<Object> values, Object messageId, long timestamp) {
        TupleInfo info = new TupleInfo();
        info.setStream(stream);
        info.setValues(values);
        info.setMessageId(messageId);
        info.setTimestamp(timestamp);
        return info;
    }

    /**
     * Forwards {@code error} to the task's error reporter.
     *
     * @param error the error raised by the spout
     */
    @Override
    public void reportError(Throwable error) {
        report_error.report(error);
    }
}