blob: d61494c0f883d683490002ef55d393212604c22c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.grouping;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.shade.com.google.common.collect.Maps;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.Fields;
/**
* A variation on FieldGrouping. This grouping operates on a partitioning of the incoming tuples (like a FieldGrouping),
* but it can send Tuples from a given partition to multiple downstream tasks.
*
* <p>Given a total pool of target tasks, this grouping will always send Tuples with a given key to one member of a
* subset of those tasks. Each key is assigned a subset of tasks. Each tuple is then sent to one task from that subset.
*
* <p>Notes: - the default TaskSelector ensures each task gets as close to a balanced number of Tuples as possible - the
* default AssignmentCreator hashes the key and produces an assignment of two tasks
*/
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
private static final long serialVersionUID = -1672360572274911808L;
private List<Integer> targetTasks;
private Fields fields = null;
private Fields outFields = null;
private AssignmentCreator assignmentCreator;
private TargetSelector targetSelector;
public PartialKeyGrouping() {
this(null);
}
public PartialKeyGrouping(Fields fields) {
this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
}
public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
this(fields, assignmentCreator, new BalancedTargetSelector());
}
public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
this.fields = fields;
this.assignmentCreator = assignmentCreator;
this.targetSelector = targetSelector;
}
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
this.targetTasks = targetTasks;
if (this.fields != null) {
this.outFields = context.getComponentOutputFields(stream);
}
}
@Override
public List<Integer> chooseTasks(int taskId, List<Object> values) {
List<Integer> boltIds = new ArrayList<>(1);
if (values.size() > 0) {
final byte[] rawKeyBytes = getKeyBytes(values);
final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
boltIds.add(selectedTask);
}
return boltIds;
}
/**
* Extract the key from the input Tuple.
*/
private byte[] getKeyBytes(List<Object> values) {
byte[] raw;
if (fields != null) {
List<Object> selectedFields = outFields.select(fields, values);
ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
for (Object o : selectedFields) {
if (o instanceof List) {
out.putInt(Arrays.deepHashCode(((List) o).toArray()));
} else if (o instanceof Object[]) {
out.putInt(Arrays.deepHashCode((Object[]) o));
} else if (o instanceof byte[]) {
out.putInt(Arrays.hashCode((byte[]) o));
} else if (o instanceof short[]) {
out.putInt(Arrays.hashCode((short[]) o));
} else if (o instanceof int[]) {
out.putInt(Arrays.hashCode((int[]) o));
} else if (o instanceof long[]) {
out.putInt(Arrays.hashCode((long[]) o));
} else if (o instanceof char[]) {
out.putInt(Arrays.hashCode((char[]) o));
} else if (o instanceof float[]) {
out.putInt(Arrays.hashCode((float[]) o));
} else if (o instanceof double[]) {
out.putInt(Arrays.hashCode((double[]) o));
} else if (o instanceof boolean[]) {
out.putInt(Arrays.hashCode((boolean[]) o));
} else if (o != null) {
out.putInt(o.hashCode());
} else {
out.putInt(0);
}
}
raw = out.array();
} else {
raw = values.get(0).toString().getBytes(); // assume key is the first field
}
return raw;
}
/*==================================================
* Helper Classes
*==================================================*/
/**
* This interface is responsible for choosing a subset of the target tasks to use for a given key.
*
* <p>NOTE: whatever scheme you use to create the assignment should be deterministic. This may be executed on
* multiple Storm Workers, thus each of them needs to come up with the same assignment for a given key.
*/
public interface AssignmentCreator extends Serializable {
int[] createAssignment(List<Integer> targetTasks, byte[] key);
}
/**
* This interface chooses one element from a task assignment to send a specific Tuple to.
*/
public interface TargetSelector extends Serializable {
Integer chooseTask(int[] assignedTasks);
}
/*========== Implementations ==========*/
/**
* This implementation of AssignmentCreator chooses two arbitrary tasks.
*/
public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
/**
* Creates a two task assignment by selecting random tasks.
*/
@Override
public int[] createAssignment(List<Integer> tasks, byte[] key) {
// It is necessary that this produce a deterministic assignment based on the key, so seed the Random from the key
final long seedForRandom = Arrays.hashCode(key);
final Random random = new Random(seedForRandom);
final int choice1 = random.nextInt(tasks.size());
int choice2 = random.nextInt(tasks.size());
// ensure that choice1 and choice2 are not the same task
choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
return new int[]{ tasks.get(choice1), tasks.get(choice2) };
}
}
/**
* A basic implementation of target selection. This strategy chooses the task within the assignment that has received the fewest Tuples
* overall from this instance of the grouping.
*/
public static class BalancedTargetSelector implements TargetSelector {
private Map<Integer, Long> targetTaskStats = Maps.newHashMap();
/**
* Chooses one of the incoming tasks and selects the one that has been selected the fewest times so far.
*/
@Override
public Integer chooseTask(int[] assignedTasks) {
Integer taskIdWithMinLoad = null;
Long minTaskLoad = Long.MAX_VALUE;
for (Integer currentTaskId : assignedTasks) {
final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
if (currentTaskLoad < minTaskLoad) {
minTaskLoad = currentTaskLoad;
taskIdWithMinLoad = currentTaskId;
}
}
targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
return taskIdWithMinLoad;
}
}
}