// blob: 9b864582d046ad07dfb854570bb3921866902ab3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.task.comm;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import com.alibaba.jstorm.task.TaskBaseMetric;
import com.alibaba.jstorm.task.group.GrouperType;
import com.alibaba.jstorm.task.group.MkGrouper;
import com.alibaba.jstorm.utils.JStormUtils;
/**
 * Tuple sending helper: works out which task ids an emitted tuple should be sent to and records
 * the emit on the task statistics.
 *
 * <p>Two lookups are offered: a direct one, where the caller names the target task, and a
 * grouping-based one, where every {@link MkGrouper} registered for the stream picks its targets.
 *
 * <p>The stream-to-grouper table is kept in a {@code volatile} field that is replaced as a whole
 * by {@link #updateStreamCompGrouper(Map)}; each lookup reads the field exactly once.
 *
 * @author yannian/Longda
 */
public class TaskSendTargets {
    private static final Logger LOG = LoggerFactory.getLogger(TaskSendTargets.class);

    private final Map<Object, Object> stormConf;
    // it is system TopologyContext
    private final TopologyContext topologyContext;
    // <Stream_id,<component, Grouping>>; swapped wholesale by updateStreamCompGrouper
    private volatile Map<String, Map<String, MkGrouper>> streamComponentgrouper;
    // SpoutTaskStatsRolling or BoltTaskStatsRolling
    private final TaskBaseMetric taskStats;
    private final String componentId;
    private final int taskId;
    // value of Config.TOPOLOGY_DEBUG, false when it is absent
    private final boolean isDebuging;
    // prefix identifying the emitting component and task in log lines
    private final String debugIdStr;

    /**
     * Creates the sender for one task.
     *
     * @param _storm_conf configuration map; {@code Config.TOPOLOGY_DEBUG} is read from it to decide
     *        whether every emit is logged
     * @param _component id of the component the emitting task belongs to
     * @param _stream_component_grouper groupers keyed by stream id, then by component id
     * @param _topology_context context used to obtain the id of the emitting task
     * @param _task_stats metrics object that is told about every emit
     */
    public TaskSendTargets(Map<Object, Object> _storm_conf, String _component, Map<String, Map<String, MkGrouper>> _stream_component_grouper,
            TopologyContext _topology_context, TaskBaseMetric _task_stats) {
        this.stormConf = _storm_conf;
        this.componentId = _component;
        this.streamComponentgrouper = _stream_component_grouper;
        this.topologyContext = _topology_context;
        this.taskStats = _task_stats;

        this.isDebuging = JStormUtils.parseBoolean(_storm_conf.get(Config.TOPOLOGY_DEBUG), false);
        this.taskId = _topology_context.getThisTaskId();
        this.debugIdStr = " Emit from " + componentId + ":" + taskId + " ";
    }

    /**
     * Direct send: the tuple goes to exactly the task named by the caller.
     *
     * @param out_task_id id of the task the tuple is sent to
     * @param stream id of the stream the tuple is emitted on
     * @param tuple the emitted values; only used for debug logging here
     * @return a new mutable list holding just {@code out_task_id}
     */
    public List<Integer> get(Integer out_task_id, String stream, List<Object> tuple) {
        // Whether the target really subscribes with a direct grouping is deliberately not
        // verified here: the check was dropped to improve the acker's speed.
        if (isDebuging) {
            // plain concatenation (no explicit toString()) so a null tuple is logged, not thrown on
            LOG.info(debugIdStr + stream + " to " + out_task_id + ":" + tuple);
        }

        taskStats.send_tuple(stream, 1);

        List<Integer> outTasks = new ArrayList<Integer>(1);
        outTasks.add(out_task_id);
        return outTasks;
    }

    /**
     * Grouping-based send: every grouper registered for {@code stream} selects its target tasks.
     *
     * @param stream id of the stream the tuple is emitted on
     * @param tuple the emitted values, handed to each grouper
     * @return the concatenated targets of all groupers of the stream; empty when no grouper is
     *         registered for the stream (in which case nothing is recorded on the statistics)
     * @throws IllegalArgumentException if one of the stream's groupers is a direct grouper
     */
    public List<Integer> get(String stream, List<Object> tuple) {
        List<Integer> outTasks = new ArrayList<Integer>();

        // get grouper, then get which task should tuple be sent to.
        Map<String, MkGrouper> componentGrouping = streamComponentgrouper.get(stream);
        if (componentGrouping == null) {
            // if the target component's parallelism is 0, don't need send to them
            LOG.debug("Failed to get Grouper of {} in {}", stream, debugIdStr);
            return outTasks;
        }

        // Only the groupers matter here; the component ids (map keys) are not needed.
        for (MkGrouper grouper : componentGrouping.values()) {
            if (GrouperType.direct.equals(grouper.gettype())) {
                throw new IllegalArgumentException("Cannot do regular emit to direct stream");
            }
            outTasks.addAll(grouper.grouper(tuple));
        }

        if (isDebuging) {
            LOG.info(debugIdStr + stream + " to " + outTasks + ":" + tuple);
        }

        taskStats.send_tuple(stream, outTasks.size());
        return outTasks;
    }

    /**
     * Replaces the whole stream-to-grouper table; later lookups use the new table.
     *
     * @param streamComponentgrouper new groupers keyed by stream id, then by component id
     */
    public void updateStreamCompGrouper(Map<String, Map<String, MkGrouper>> streamComponentgrouper) {
        this.streamComponentgrouper = streamComponentgrouper;
    }
}