blob: 0f8359783c2cf676c1841479be27e22679d01790 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.daemon;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.Thrift;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.grouping.CustomStreamGrouping;
import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
import org.apache.storm.grouping.LoadAwareShuffleGrouping;
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.grouping.ShuffleGrouping;
import org.apache.storm.shade.com.google.common.collect.Ordering;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.TupleUtils;
import org.apache.storm.utils.Utils;
/**
 * Factory for the stream groupings that decide which target tasks receive each tuple emitted on a stream.
 *
 * <p>Every grouper returned by {@link #mkGrouper} implements {@link LoadAwareCustomStreamGrouping}; groupers that only
 * implement {@link CustomStreamGrouping} are wrapped in {@link BasicLoadAwareCustomStreamGrouping}.
 */
public class GrouperFactory {
    /**
     * A no-op grouper used for direct groupings: {@code prepare} and {@code refreshLoad} do nothing and
     * {@code chooseTasks} always returns {@code null}. NOTE(review): presumably the emitter names the destination task
     * explicitly on direct streams — confirm against callers.
     */
    public static final LoadAwareCustomStreamGrouping DIRECT = new LoadAwareCustomStreamGrouping() {
        @Override
        public void refreshLoad(LoadMapping loadMapping) {
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            return null;
        }
    };

    /**
     * Creates and prepares the grouper for one subscription to a stream.
     *
     * @param context the worker topology context; passed to the grouper's {@code prepare} and, for local-or-shuffle
     *     groupings, used to find which target tasks run in this worker
     * @param componentId component id used, together with {@code streamId}, to build the {@link GlobalStreamId}
     *     handed to {@code prepare}
     * @param streamId stream id used to build the {@link GlobalStreamId} handed to {@code prepare}
     * @param outFields the declared output fields of the stream; fields groupings select their key values from it
     * @param thriftGrouping the declared grouping
     * @param unsortedTargetTasks candidate target task ids; they are sorted first so that task order does not depend
     *     on the order the caller supplied them in
     * @param topoConf topology configuration; when {@link Config#TOPOLOGY_DISABLE_LOADAWARE_MESSAGING} is set to
     *     {@code true}, shuffle groupings use {@link ShuffleGrouping} instead of {@link LoadAwareShuffleGrouping}
     * @return the prepared grouper, wrapped in {@link BasicLoadAwareCustomStreamGrouping} if it is not already
     *     load-aware
     * @throws IllegalArgumentException if the grouping type is not one this factory supports
     */
    public static LoadAwareCustomStreamGrouping mkGrouper(WorkerTopologyContext context, String componentId, String streamId,
                                                          Fields outFields,
                                                          Grouping thriftGrouping,
                                                          List<Integer> unsortedTargetTasks,
                                                          Map<String, Object> topoConf) {
        List<Integer> targetTasks = Ordering.natural().sortedCopy(unsortedTargetTasks);
        Object disableLoadAware = topoConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING);
        final boolean isNotLoadAware = null != disableLoadAware && (boolean) disableLoadAware;

        final CustomStreamGrouping result;
        switch (Thrift.groupingType(thriftGrouping)) {
            case FIELDS:
                if (Thrift.isGlobalGrouping(thriftGrouping)) {
                    result = new GlobalGrouper();
                } else {
                    result = new FieldsGrouper(outFields, thriftGrouping);
                }
                break;
            case SHUFFLE:
                result = mkShuffleGrouping(isNotLoadAware);
                break;
            case ALL:
                result = new AllGrouper();
                break;
            case LOCAL_OR_SHUFFLE:
                // Prefer tasks running in this worker when there are any.
                targetTasks = preferLocalTasks(targetTasks, context);
                result = mkShuffleGrouping(isNotLoadAware);
                break;
            case NONE:
                result = new NoneGrouper();
                break;
            case CUSTOM_OBJECT:
                result = (CustomStreamGrouping) Thrift.instantiateJavaObject(thriftGrouping.get_custom_object());
                break;
            case CUSTOM_SERIALIZED:
                result = Utils.javaDeserialize(thriftGrouping.get_custom_serialized(), CustomStreamGrouping.class);
                break;
            case DIRECT:
                result = DIRECT;
                break;
            default:
                // Fail fast: wrapping a null grouper would only surface later as an NPE on the first chooseTasks call.
                throw new IllegalArgumentException("Unsupported grouping: " + thriftGrouping);
        }

        result.prepare(context, new GlobalStreamId(componentId, streamId), targetTasks);

        if (result instanceof LoadAwareCustomStreamGrouping) {
            return (LoadAwareCustomStreamGrouping) result;
        }
        return new BasicLoadAwareCustomStreamGrouping(result);
    }

    /**
     * Chooses the shuffle implementation according to whether load-aware messaging is disabled.
     */
    private static CustomStreamGrouping mkShuffleGrouping(boolean isNotLoadAware) {
        return isNotLoadAware ? new ShuffleGrouping() : new LoadAwareShuffleGrouping();
    }

    /**
     * Returns the subset of {@code sortedTargetTasks} that run in this worker, keeping their sorted order.
     * Returns {@code sortedTargetTasks} unchanged if none of them are local.
     */
    private static List<Integer> preferLocalTasks(List<Integer> sortedTargetTasks, WorkerTopologyContext context) {
        Set<Integer> workerTasks = Sets.newHashSet(context.getThisWorkerTasks());
        List<Integer> localTasks = new ArrayList<>();
        for (Integer task : sortedTargetTasks) {
            if (workerTasks.contains(task)) {
                localTasks.add(task);
            }
        }
        return localTasks.isEmpty() ? sortedTargetTasks : localTasks;
    }

    /**
     * A bridge between CustomStreamGrouping and LoadAwareCustomStreamGrouping: delegates {@code prepare} and
     * {@code chooseTasks} to the wrapped grouping and ignores load updates.
     */
    public static class BasicLoadAwareCustomStreamGrouping implements LoadAwareCustomStreamGrouping {
        private final CustomStreamGrouping customStreamGrouping;

        public BasicLoadAwareCustomStreamGrouping(CustomStreamGrouping customStreamGrouping) {
            this.customStreamGrouping = customStreamGrouping;
        }

        @Override
        public void refreshLoad(LoadMapping loadMapping) {
            // The wrapped grouping is not load-aware, so load information is ignored.
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            customStreamGrouping.prepare(context, stream, targetTasks);
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            return customStreamGrouping.chooseTasks(taskId, values);
        }
    }

    /**
     * Fields grouping: selects the grouping fields from the tuple values and maps them to one target task via
     * {@link TupleUtils#chooseTaskIndex}.
     */
    public static class FieldsGrouper implements CustomStreamGrouping {
        private final Fields outFields;
        private final Fields groupFields;
        // One pre-built singleton list per target task, so chooseTasks does not allocate.
        private List<List<Integer>> targetTasks;
        private int numTasks;

        public FieldsGrouper(Fields outFields, Grouping thriftGrouping) {
            this.outFields = outFields;
            this.groupFields = new Fields(Thrift.fieldGrouping(thriftGrouping));
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = new ArrayList<>(targetTasks.size());
            for (Integer targetTask : targetTasks) {
                this.targetTasks.add(Collections.singletonList(targetTask));
            }
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int targetTaskIndex = TupleUtils.chooseTaskIndex(outFields.select(groupFields, values), numTasks);
            return targetTasks.get(targetTaskIndex);
        }
    }

    /**
     * Global grouping: always routes to the first (lowest id, when prepared by {@link GrouperFactory#mkGrouper})
     * target task, or returns {@code null} when there are no target tasks.
     */
    public static class GlobalGrouper implements CustomStreamGrouping {
        // Pre-built result; null when there are no target tasks.
        private List<Integer> target;

        public GlobalGrouper() {
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            // It's possible for the target to have multiple tasks if it reads multiple sources; always use the first.
            this.target = targetTasks.isEmpty() ? null : Collections.singletonList(targetTasks.get(0));
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            return target;
        }
    }

    /**
     * None grouping: routes each tuple to one target task chosen uniformly at random.
     */
    public static class NoneGrouper implements CustomStreamGrouping {
        private final Random random;
        // One pre-built singleton list per target task, so chooseTasks does not allocate.
        private List<List<Integer>> targetTasks;
        private int numTasks;

        public NoneGrouper() {
            random = new Random();
        }

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = new ArrayList<>(targetTasks.size());
            for (Integer targetTask : targetTasks) {
                this.targetTasks.add(Collections.singletonList(targetTask));
            }
            this.numTasks = targetTasks.size();
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            return targetTasks.get(random.nextInt(numTasks));
        }
    }

    /**
     * All grouping: routes every tuple to every target task.
     */
    public static class AllGrouper implements CustomStreamGrouping {
        // Unmodifiable snapshot, so callers of chooseTasks cannot alter the grouper's state.
        private List<Integer> targetTasks;

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            this.targetTasks = Collections.unmodifiableList(new ArrayList<>(targetTasks));
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            return targetTasks;
        }
    }
}