blob: 1f628b9a797a794d9de7bbcb6dabfc533ed0bb9b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.grouping;
import com.google.common.collect.Lists;
import org.apache.storm.task.WorkerTopologyContext;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class ShuffleGroupingTest {
/**
* Tests that we round robbin correctly using ShuffleGrouping implementation.
* */
@Test
public void testShuffleGrouping() {
final int numTasks = 6;
final ShuffleGrouping grouper = new ShuffleGrouping();
// Task Id not used, so just pick a static value
final int inputTaskId = 100;
// Define our taskIds
final List<Integer> availableTaskIds = Lists.newArrayList();
for (int i = 0; i < numTasks; i++) {
availableTaskIds.add(i);
}
WorkerTopologyContext context = mock(WorkerTopologyContext.class);
grouper.prepare(context, null, availableTaskIds);
// Keep track of how many times we see each taskId
int[] taskCounts = new int[numTasks];
for (int i = 1; i <= 30000; i++) {
List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList());
// Validate a single task id return
assertNotNull("Not null taskId list returned", taskIds);
assertEquals("Single task Id returned", 1, taskIds.size());
int taskId = taskIds.get(0);
assertTrue("TaskId should exist", taskId >= 0 && taskId < numTasks);
taskCounts[taskId]++;
}
for (int i = 0; i < numTasks; i++) {
assertEquals("Distribution should be even for all nodes", 5000, taskCounts[i]);
}
}
/**
* Tests that we round robbin correctly with multiple threads using ShuffleGrouping implementation.
*/
@Test
public void testShuffleGroupMultiThreaded() throws InterruptedException, ExecutionException {
final int numTasks = 6;
final int groupingExecutionsPerThread = 30000;
final int numThreads = 10;
final CustomStreamGrouping grouper = new ShuffleGrouping();
// Task Id not used, so just pick a static value
final int inputTaskId = 100;
// Define our taskIds - the test expects these to be incrementing by one up from zero
final List<Integer> availableTaskIds = Lists.newArrayList();
for (int i = 0; i < numTasks; i++) {
availableTaskIds.add(i);
}
final WorkerTopologyContext context = mock(WorkerTopologyContext.class);
// Call prepare with our available taskIds
grouper.prepare(context, null, availableTaskIds);
List<Callable<int[]>> threadTasks = Lists.newArrayList();
for (int x=0; x < numThreads; x++) {
Callable<int[]> threadTask = new Callable<int[]>() {
@Override
public int[] call() throws Exception {
int[] taskCounts = new int[availableTaskIds.size()];
for (int i = 1; i <= groupingExecutionsPerThread; i++) {
List<Integer> taskIds = grouper.chooseTasks(inputTaskId, Lists.newArrayList());
// Validate a single task id return
assertNotNull("Not null taskId list returned", taskIds);
assertEquals("Single task Id returned", 1, taskIds.size());
int taskId = taskIds.get(0);
assertTrue("TaskId should exist", taskId >= 0 && taskId < availableTaskIds.size());
taskCounts[taskId]++;
}
return taskCounts;
}
};
// Add to our collection.
threadTasks.add(threadTask);
}
ExecutorService executor = Executors.newFixedThreadPool(threadTasks.size());
List<Future<int[]>> taskResults = executor.invokeAll(threadTasks);
// Wait for all tasks to complete
int[] taskIdTotals = new int[numTasks];
for (Future taskResult: taskResults) {
while (!taskResult.isDone()) {
Thread.sleep(1000);
}
int[] taskDistributions = (int[]) taskResult.get();
for (int i = 0; i < taskDistributions.length; i++) {
taskIdTotals[i] += taskDistributions[i];
}
}
for (int i = 0; i < numTasks; i++) {
assertEquals(numThreads * groupingExecutionsPerThread / numTasks, taskIdTotals[i]);
}
}
}