blob: 30d641cd1bf1b861dd3887f5ecb719b99438384a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.jstorm.task.group;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.JavaObject;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.RandomRange;
import com.alibaba.jstorm.utils.Thrift;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
/**
* Grouper, get which task should be send to for one tuple
*
* @author yannian
*
*/
public class MkGrouper {
private static final Logger LOG = LoggerFactory.getLogger(MkGrouper.class);
private TopologyContext topology_context;
// this component output fields
private Fields out_fields;
private Grouping thrift_grouping;
private Grouping._Fields fields;
private GrouperType grouptype;
private List<Integer> out_tasks;
private List<Integer> local_tasks;
private String streamId;
// grouping method
private RandomRange randomrange;
private Random random;
private MkShuffer shuffer;
private MkCustomGrouper custom_grouper;
private MkFieldsGrouper fields_grouper;
private MkLocalShuffer local_shuffer_grouper;
private MkLocalFirst localFirst;
public MkGrouper(TopologyContext _topology_context, Fields _out_fields, Grouping _thrift_grouping, List<Integer> _outTasks, String streamId,
WorkerData workerData) {
this.topology_context = _topology_context;
this.out_fields = _out_fields;
this.thrift_grouping = _thrift_grouping;
this.streamId = streamId;
this.out_tasks = new ArrayList<Integer>();
this.out_tasks.addAll(_outTasks);
Collections.sort(this.out_tasks);
this.local_tasks = _topology_context.getThisWorkerTasks();
this.fields = Thrift.groupingType(thrift_grouping);
this.grouptype = this.parseGroupType(workerData);
String id = _topology_context.getThisTaskId() + ":" + streamId;
LOG.info(id + " grouptype is " + grouptype + ", out_tasks is " + out_tasks + ", local_tasks" + local_tasks);
}
public GrouperType gettype() {
return grouptype;
}
private GrouperType parseGroupType(WorkerData workerData) {
GrouperType grouperType = null;
if (Grouping._Fields.FIELDS.equals(fields)) {
if (Thrift.isGlobalGrouping(thrift_grouping)) {
// global grouping, just send tuple to first task
grouperType = GrouperType.global;
} else {
List<String> fields_group = Thrift.fieldGrouping(thrift_grouping);
Fields fields = new Fields(fields_group);
fields_grouper = new MkFieldsGrouper(out_fields, fields, out_tasks);
// hashcode by fields
grouperType = GrouperType.fields;
}
} else if (Grouping._Fields.ALL.equals(fields)) {
// send to every task
grouperType = GrouperType.all;
} else if (Grouping._Fields.SHUFFLE.equals(fields)) {
grouperType = GrouperType.shuffle;
shuffer = new MkShuffer(out_tasks, workerData);
} else if (Grouping._Fields.NONE.equals(fields)) {
// random send one task
this.random = new Random();
grouperType = GrouperType.none;
} else if (Grouping._Fields.CUSTOM_OBJECT.equals(fields)) {
// user custom grouping by JavaObject
JavaObject jobj = thrift_grouping.get_custom_object();
CustomStreamGrouping g = Thrift.instantiateJavaObject(jobj);
int myTaskId = topology_context.getThisTaskId();
String componentId = topology_context.getComponentId(myTaskId);
GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId);
grouperType = GrouperType.custom_obj;
} else if (Grouping._Fields.CUSTOM_SERIALIZED.equals(fields)) {
// user custom group by serialized Object
byte[] obj = thrift_grouping.get_custom_serialized();
CustomStreamGrouping g = (CustomStreamGrouping) Utils.javaDeserialize(obj);
int myTaskId = topology_context.getThisTaskId();
String componentId = topology_context.getComponentId(myTaskId);
GlobalStreamId stream = new GlobalStreamId(componentId, streamId);
custom_grouper = new MkCustomGrouper(topology_context, g, stream, out_tasks, myTaskId);
grouperType = GrouperType.custom_serialized;
} else if (Grouping._Fields.DIRECT.equals(fields)) {
// directly send to a special task
grouperType = GrouperType.direct;
} else if (Grouping._Fields.LOCAL_OR_SHUFFLE.equals(fields)) {
grouperType = GrouperType.local_or_shuffle;
local_shuffer_grouper = new MkLocalShuffer(local_tasks, out_tasks, workerData);
} else if (Grouping._Fields.LOCAL_FIRST.equals(fields)) {
grouperType = GrouperType.localFirst;
localFirst = new MkLocalFirst(local_tasks, out_tasks, workerData);
}
return grouperType;
}
/**
* get which task should tuple be sent to
*
* @param values
* @return
*/
public List<Integer> grouper(List<Object> values) {
if (GrouperType.global.equals(grouptype)) {
// send to task which taskId is 0
return JStormUtils.mk_list(out_tasks.get(0));
} else if (GrouperType.fields.equals(grouptype)) {
// field grouping
return fields_grouper.grouper(values);
} else if (GrouperType.all.equals(grouptype)) {
// send to every task
return out_tasks;
} else if (GrouperType.shuffle.equals(grouptype)) {
// random, but the random is different from none
return shuffer.grouper(values);
} else if (GrouperType.none.equals(grouptype)) {
int rnd = Math.abs(random.nextInt() % out_tasks.size());
return JStormUtils.mk_list(out_tasks.get(rnd));
} else if (GrouperType.custom_obj.equals(grouptype)) {
return custom_grouper.grouper(values);
} else if (GrouperType.custom_serialized.equals(grouptype)) {
return custom_grouper.grouper(values);
} else if (GrouperType.local_or_shuffle.equals(grouptype)) {
return local_shuffer_grouper.grouper(values);
} else if (GrouperType.localFirst.equals(grouptype)) {
return localFirst.grouper(values);
} else {
LOG.warn("Unsupportted group type");
}
return new ArrayList<Integer>();
}
}