blob: bba635e5b24e2d866c4575b056a18ba81bb49a41 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.storm.grouping;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.Fields;
/**
 * A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping), but it can send
 * Tuples from a given partition to multiple downstream tasks.
 *
 * <p>Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a subset of those tasks. Each
 * key is assigned a subset of tasks. Each tuple is then sent to one task from that subset.
 *
 * <p>Notes:
 * <ul>
 * <li>the default {@code TargetSelector} ensures each task gets as close to a balanced number of Tuples as possible;</li>
 * <li>the default {@code AssignmentCreator} hashes the key and produces an assignment of two tasks.</li>
 * </ul>
 */
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
private static final long serialVersionUID = -1672360572274911808L;
private List<Integer> targetTasks;
private Fields fields = null;
private Fields outFields = null;
private AssignmentCreator assignmentCreator;
private TargetSelector targetSelector;
public PartialKeyGrouping() {
public PartialKeyGrouping(Fields fields) {
this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
this(fields, assignmentCreator, new BalancedTargetSelector());
public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
this.fields = fields;
this.assignmentCreator = assignmentCreator;
this.targetSelector = targetSelector;
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
if (this.fields != null) {
this.outFields = context.getComponentOutputFields(stream);
public List<Integer> chooseTasks(int taskId, List<Object> values) {
List<Integer> boltIds = new ArrayList<>(1);
if (values.size() > 0) {
final byte[] rawKeyBytes = getKeyBytes(values);
final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
return boltIds;
* Extract the key from the input Tuple.
private byte[] getKeyBytes(List<Object> values) {
byte[] raw;
if (fields != null) {
List<Object> selectedFields =, values);
ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
for (Object o : selectedFields) {
if (o instanceof List) {
out.putInt(Arrays.deepHashCode(((List) o).toArray()));
} else if (o instanceof Object[]) {
out.putInt(Arrays.deepHashCode((Object[]) o));
} else if (o instanceof byte[]) {
out.putInt(Arrays.hashCode((byte[]) o));
} else if (o instanceof short[]) {
out.putInt(Arrays.hashCode((short[]) o));
} else if (o instanceof int[]) {
out.putInt(Arrays.hashCode((int[]) o));
} else if (o instanceof long[]) {
out.putInt(Arrays.hashCode((long[]) o));
} else if (o instanceof char[]) {
out.putInt(Arrays.hashCode((char[]) o));
} else if (o instanceof float[]) {
out.putInt(Arrays.hashCode((float[]) o));
} else if (o instanceof double[]) {
out.putInt(Arrays.hashCode((double[]) o));
} else if (o instanceof boolean[]) {
out.putInt(Arrays.hashCode((boolean[]) o));
} else if (o != null) {
} else {
raw = out.array();
} else {
raw = values.get(0).toString().getBytes(); // assume key is the first field
return raw;
* Helper Classes
* This interface is responsible for choosing a subset of the target tasks to use for a given key.
* NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on multiple Storm Workers, thus
* each of them needs to come up with the same assignment for a given key.
public interface AssignmentCreator extends Serializable {
int[] createAssignment(List<Integer> targetTasks, byte[] key);
* This interface chooses one element from a task assignment to send a specific Tuple to.
public interface TargetSelector extends Serializable {
Integer chooseTask(int[] assignedTasks);
/*========== Implementations ==========*/
* This implementation of AssignmentCreator chooses two arbitrary tasks.
public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
* Creates a two task assignment by selecting random tasks.
public int[] createAssignment(List<Integer> tasks, byte[] key) {
// It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
final long seedForRandom = Arrays.hashCode(key);
final Random random = new Random(seedForRandom);
final int choice1 = random.nextInt(tasks.size());
int choice2 = random.nextInt(tasks.size());
// ensure that choice1 and choice2 are not the same task
choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
return new int[]{ tasks.get(choice1), tasks.get(choice2) };
* A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
* overall from this instance of the grouping.
public static class BalancedTargetSelector implements TargetSelector {
private Map<Integer, Long> targetTaskStats = Maps.newHashMap();
* Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
public Integer chooseTask(int[] assignedTasks) {
Integer taskIdWithMinLoad = null;
Long minTaskLoad = Long.MAX_VALUE;
for (Integer currentTaskId : assignedTasks) {
final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
if (currentTaskLoad < minTaskLoad) {
minTaskLoad = currentTaskLoad;
taskIdWithMinLoad = currentTaskId;
targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
return taskIdWithMinLoad;