blob: 27a54e8d6a9640c57764181ed42015e232f6f69e [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.storm.daemon;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.Thrift;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
import org.apache.storm.grouping.LoadAwareShuffleGrouping;
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.grouping.ShuffleGrouping;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.TupleUtils;
import org.apache.storm.utils.Utils;
public class GrouperFactory {
// A no-op grouper
public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
public void refreshLoad(LoadMapping loadMapping) {
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
public List<Integer> chooseTasks(int taskId, List<Object> values) {
return null;
public static LoadAwareCustomStreamGrouping mkGrouper(WorkerTopologyContext context, String componentId, String streamId,
Fields outFields,
Grouping thriftGrouping,
List<Integer> unsortedTargetTasks,
Map<String, Object> topoConf) {
List<Integer> targetTasks = Ordering.natural().sortedCopy(unsortedTargetTasks);
final boolean isNotLoadAware = (null != topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING) && (boolean) topoConf
CustomStreamGrouping result = null;
switch (Thrift.groupingType(thriftGrouping)) {
case FIELDS:
if (Thrift.isGlobalGrouping(thriftGrouping)) {
result = new GlobalGrouper();
} else {
result = new FieldsGrouper(outFields, thriftGrouping);
if (isNotLoadAware) {
result = new ShuffleGrouping();
} else {
result = new LoadAwareShuffleGrouping();
case ALL:
result = new AllGrouper();
// Prefer local tasks as target tasks if possible
Set<Integer> sameTasks = Sets.intersection(Sets.newHashSet(targetTasks), Sets.newHashSet(context.getThisWorkerTasks()));
targetTasks = (sameTasks.isEmpty()) ? targetTasks : new ArrayList<>(sameTasks);
if (isNotLoadAware) {
result = new ShuffleGrouping();
} else {
result = new LoadAwareShuffleGrouping();
case NONE:
result = new NoneGrouper();
result = (CustomStreamGrouping) Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
result = Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class);
case DIRECT:
result = DIRECT;
result = null;
if (null != result) {
result.prepare(context, new GlobalStreamId(componentId, streamId), targetTasks);
if (result instanceof LoadAwareCustomStreamGrouping) {
return (LoadAwareCustomStreamGrouping) result;
} else {
return new BasicLoadAwareCustomStreamGrouping(result);
* A bridge between CustomStreamGrouping and LoadAwareCustomStreamGrouping
public static class BasicLoadAwareCustomStreamGrouping implements LoadAwareCustomStreamGrouping {
private final CustomStreamGrouping customStreamGrouping;
public BasicLoadAwareCustomStreamGrouping(CustomStreamGrouping customStreamGrouping) {
this.customStreamGrouping = customStreamGrouping;
public void refreshLoad(LoadMapping loadMapping) {
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
customStreamGrouping.prepare(context, stream, targetTasks);
public List<Integer> chooseTasks(int taskId, List<Object> values) {
return customStreamGrouping.chooseTasks(taskId, values);
public static class FieldsGrouper implements CustomStreamGrouping {
private Fields outFields;
private List<List<Integer>> targetTasks;
private Fields groupFields;
private int numTasks;
public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
this.outFields = outFields;
this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = new ArrayList<List<Integer>>();
for (Integer targetTask : targetTasks) {
this.numTasks = targetTasks.size();
public List<Integer> chooseTasks(int taskId, List<Object> values) {
int targetTaskIndex = TupleUtils.chooseTaskIndex(, values), numTasks);
return targetTasks.get(targetTaskIndex);
public static class GlobalGrouper implements CustomStreamGrouping {
private List<Integer> targetTasks;
public GlobalGrouper() {
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
public List<Integer> chooseTasks(int taskId, List<Object> values) {
if (targetTasks.isEmpty()) {
return null;
// It's possible for target to have multiple tasks if it reads multiple sources
return Collections.singletonList(targetTasks.get(0));
public static class NoneGrouper implements CustomStreamGrouping {
private final Random random;
private List<Integer> targetTasks;
private int numTasks;
public NoneGrouper() {
random = new Random();
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
this.numTasks = targetTasks.size();
public List<Integer> chooseTasks(int taskId, List<Object> values) {
int index = random.nextInt(numTasks);
return Collections.singletonList(targetTasks.get(index));
public static class AllGrouper implements CustomStreamGrouping {
private List<Integer> targetTasks;
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
public List<Integer> chooseTasks(int taskId, List<Object> values) {
return targetTasks;