blob: dc6bfbb93491b1a9cc4821f79b798261608666e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.elasticity.util;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
// #TODO: going to make this entire class parametrized.
public class TestElasticityUtils {
private static final TaskName TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0");
private static final TaskName ELASTIC_TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0_1_2");
private static final TaskName TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0]");
private static final TaskName ELASTIC_TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2");
@Test
public void testComputeLastProcessedOffsetsFromCheckpointMap() {
// Setup :
// there is one ssp = SystemStreamPartition [systemA, streamB, partition(0)] consumed by the job
// Note: Partition 0_1_2 means task consumes keyBucket 1 of partition 0 and has elasticityFactor 2.
// Before elasticity, job has one task with name "Partition 0"
// with elasticity factor 2, job has 2 tasks with names "Partition 0_0_2" and "Partition 0_1_2"
// Partition 0_0_2 consumes SSP[systemA, stream B, partition(0), keyBucket(0)]
// Partition 0_1_2 consumes SSP[systemA, stream B, partition(0), keyBucket(1)]
// with elasticity factor 4, job has 4 tasks with names "Partition 0_0_4", "Partition 0_1_4", "Partition 0_2_4" and "Partition 0_3_4"
// Partition 0_0_4 consumes SSP[systemA, stream B, partition(0), keyBucket(0)]
// Partition 0_1_4 consumes SSP[systemA, stream B, partition(0), keyBucket(1)]
// Partition 0_2_4 consumes SSP[systemA, stream B, partition(0), keyBucket(2)]
// Partition 0_3_4 consumes SSP[systemA, stream B, partition(0), keyBucket(3)]
//
// From the definition of keyBucket computation using elasticity factor in
// {@link IncomingMessageEnvelope.getSystemStresamPartition(elasticityFactor) as
// keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor
// messages processed by 0_0_4 and 0_2_4 will be the same as those processed by 0_0_2
// messages processed by 0_1_4 and 0_3_4 will be the same as those processed by 0_1_2
// messages processed by 0_0_2 and 0_1_2 will be the same as those processed by Partition 0 itself
TaskName taskName = new TaskName("Partition 0_0_2");
Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0);
SystemStreamPartition ssp2 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 2);
SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
// offsets ordering 1 < 2 < 3 < 4
Mockito.when(mockSystemAdmin.offsetComparator("1", "2")).thenReturn(-1);
Mockito.when(mockSystemAdmin.offsetComparator("2", "1")).thenReturn(1);
Mockito.when(mockSystemAdmin.offsetComparator("1", "3")).thenReturn(-1);
Mockito.when(mockSystemAdmin.offsetComparator("3", "1")).thenReturn(1);
Mockito.when(mockSystemAdmin.offsetComparator("1", "4")).thenReturn(-1);
Mockito.when(mockSystemAdmin.offsetComparator("4", "1")).thenReturn(1);
Mockito.when(mockSystemAdmin.offsetComparator("2", "3")).thenReturn(-1);
Mockito.when(mockSystemAdmin.offsetComparator("3", "2")).thenReturn(1);
Mockito.when(mockSystemAdmin.offsetComparator("2", "4")).thenReturn(-1);
Mockito.when(mockSystemAdmin.offsetComparator("4", "2")).thenReturn(1);
Mockito.when(mockSystemAdmin.offsetComparator("3", "4")).thenReturn(-1);
Mockito.when(mockSystemAdmin.offsetComparator("4", "3")).thenReturn(1);
SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class);
Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin);
// case 1: for task Partition 0_0_2: last deploy was with ef = 2 itself.
// hence "Partition 0_0_2" has the largest offset and that should be used for computing checkpoint for 0_0_2 now also
checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1"));
checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "4"));
checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "2"));
checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "3"));
Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
Assert.assertEquals("4", result.get(ssp0));
// case 2: for task Partition 0_0_2: last deploy was with ef =1
// hence "Partition 0" has the largest offset. Computing checkpint for 0_0_2 should use this largest offset
checkpointMap = new HashMap<>();
checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "4"));
checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "1"));
checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3"));
checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "2"));
result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
Assert.assertEquals("4", result.get(ssp0));
// case 3: for task partition 0_0_2: last deploy was with ef = 4
// hence checkpoints of Partition 0_0_4 and Partition 0_3_4 are relevant.
// since messages from both end up in 0_0_2 with ef=2, need to take min of their checkpointed offsets
checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1"));
checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "2"));
checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3"));
checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "4"));
result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
Assert.assertEquals("3", result.get(ssp0));
}
@Test
public void testComputeLastProcessedOffsetsWithEdgeCases() {
TaskName taskName = new TaskName("Partition 0_0_2");
Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0);
SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class);
Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin);
// case 1: empty checkpoint map
Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
Assert.assertTrue("if given checkpoint map is empty, return empty last processed offsets map", result.isEmpty());
// case 2: null checkpoints given for some ancestor tasks
checkpointMap.put(new TaskName("Partition 0"), null);
result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
Assert.assertTrue("if given checkpoint map has null checkpoint, return empty last processed offsets map", result.isEmpty());
// case 3: null checkpoints given for some descendant tasks
checkpointMap.put(new TaskName("Partition 0_0_4"), null);
result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
Assert.assertTrue("if given checkpoint map has null checkpoint, return empty last processed offsets map", result.isEmpty());
}
@Test
public void testTaskIsGroupByPartitionOrGroupBySSP() {
String msgPartition = "GroupByPartition task should start with Partition";
String msgSsp = "GroupBySystemStreamPartition task should start with SystemStreamPartition";
Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_PARTITION));
Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_PARTITION));
Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(
ELASTIC_TASKNAME_GROUP_BY_PARTITION));
Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_SSP));
Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_SSP));
Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
TaskName taskName = new TaskName("FooBar");
Assert.assertFalse(msgPartition, ElasticityUtils.isGroupByPartitionTask(taskName));
Assert.assertFalse(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(taskName));
}
@Test
public void testIsTaskNameElastic() {
Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_SSP));
Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_SSP));
Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_PARTITION));
Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
}
@Test
public void testGetElasticTaskNameParts() {
TaskNameComponents taskNameComponents = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_PARTITION);
Assert.assertEquals(taskNameComponents.partition, 0);
Assert.assertEquals(taskNameComponents.keyBucket, TaskNameComponents.DEFAULT_KEY_BUCKET);
Assert.assertEquals(taskNameComponents.elasticityFactor, TaskNameComponents.DEFAULT_ELASTICITY_FACTOR);
taskNameComponents = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_PARTITION);
Assert.assertEquals(taskNameComponents.partition, 0);
Assert.assertEquals(taskNameComponents.keyBucket, 1);
Assert.assertEquals(taskNameComponents.elasticityFactor, 2);
taskNameComponents = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_SSP);
Assert.assertEquals(taskNameComponents.system, "systemA");
Assert.assertEquals(taskNameComponents.stream, "streamB");
Assert.assertEquals(taskNameComponents.partition, 0);
Assert.assertEquals(taskNameComponents.keyBucket, TaskNameComponents.DEFAULT_KEY_BUCKET);
Assert.assertEquals(taskNameComponents.elasticityFactor, TaskNameComponents.DEFAULT_ELASTICITY_FACTOR);
taskNameComponents = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_SSP);
Assert.assertEquals(taskNameComponents.system, "systemA");
Assert.assertEquals(taskNameComponents.stream, "streamB");
Assert.assertEquals(taskNameComponents.partition, 0);
Assert.assertEquals(taskNameComponents.keyBucket, 1);
Assert.assertEquals(taskNameComponents.elasticityFactor, 2);
taskNameComponents = ElasticityUtils.getTaskNameParts(new TaskName("FooBar"));
Assert.assertEquals(taskNameComponents.partition, TaskNameComponents.INVALID_PARTITION);
}
@Test
public void testIsOtherTaskAncestorDescendantOfCurrentTask() {
TaskName task0 = new TaskName("Partition 0");
TaskName task1 = new TaskName("Partition 1");
TaskName task002 = new TaskName("Partition 0_0_2");
TaskName task012 = new TaskName("Partition 0_1_2");
TaskName task004 = new TaskName("Partition 0_0_4");
TaskName task014 = new TaskName("Partition 0_1_4");
TaskName task024 = new TaskName("Partition 0_2_4");
TaskName task034 = new TaskName("Partition 0_3_4");
TaskName sspTask0 = new TaskName("SystemStreamPartition [systemA, streamB, 0]");
TaskName sspTask002 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_2");
TaskName sspTask012 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2");
TaskName sspTask004 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_4");
TaskName sspTask014 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_4");
TaskName sspTask024 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 2]_4");
TaskName sspTask034 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 3]_4");
// Partition 0 is ancestor of all tasks Partition 0_0_2, 0_1_2, 0_0_4, 0_1_4, 0_2_4, 0_3_4 and itself
// and all these tasks are descendants of Partition 0 (except itself)
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task0));
Assert.assertFalse(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task1));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task012, task0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task014, task0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task034, task0));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task002));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task012));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task004));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task014));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task024));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task034));
// Partition 0_0_2 is ancestor of tasks Partition 0_0_4 and 0_2_4 and itself
// these tasks are descendants of 0_0_2
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task002));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task002));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task002));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task004));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task024));
// "SystemStreamPartition [systemA, streamB, 0]
// is ancestor of all tasks "SystemStreamPartition [systemA, streamB, 0, 0]_2, [systemA, streamB, 0, 1]_2 and the rest incl itself
// and all these tasks are descendants of Partition 0 (except itself)
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask0, sspTask0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask012, sspTask0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask014, sspTask0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask0));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask034, sspTask0));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask002));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask012));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask004));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask014));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask024));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask034));
// SystemStreamPartition [systemA, streamB, 0, 0]_2 is ancestor of
// tasks SystemStreamPartition [systemA, streamB, 0, 0]_4, SystemStreamPartition [systemA, streamB, 0, 2]_4 and itself
// similarly, these tasks are descendants of SystemStreamPartition [systemA, streamB, 0, 0]_2
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask002));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask002));
Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask002));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask004));
Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask024));
}
@Test
public void testGetAncestorAndDescendantCheckpoints() {
TaskName taskName = new TaskName("Partition 0_0_2");
Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
Checkpoint ansCheckpoint1 = buildCheckpointV2(ssp, "1");
Checkpoint ansCheckpoint2 = buildCheckpointV2(ssp, "2");
Checkpoint desCheckpoint1 = buildCheckpointV2(ssp, "3");
Checkpoint desCheckpoint2 = buildCheckpointV2(ssp, "4");
Checkpoint unrelCheckpoint = buildCheckpointV2(ssp, "5");
Set<Checkpoint> ansCheckpointSet = new HashSet<>(Arrays.asList(ansCheckpoint1, ansCheckpoint2));
Set<Checkpoint> desCheckpointSet = new HashSet<>(Arrays.asList(desCheckpoint1, desCheckpoint2));
checkpointMap.put(new TaskName("Partition 0"), ansCheckpoint1);
checkpointMap.put(new TaskName("Partition 0_0_2"), ansCheckpoint2);
checkpointMap.put(new TaskName("Partition 0_0_4"), desCheckpoint1);
checkpointMap.put(new TaskName("Partition 0_2_4"), desCheckpoint2);
checkpointMap.put(new TaskName("Partition 0_1_4"), unrelCheckpoint);
Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> result =
ElasticityUtils.getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
Set<Checkpoint> anscestorCheckpointSet = result.getLeft();
Set<Checkpoint> descendantCheckpointSetForEf4 = result.getRight().get(4);
Assert.assertTrue("should contain all ancestors' checkpoints",
anscestorCheckpointSet.containsAll(ansCheckpointSet));
Assert.assertFalse("should not contain a descendant checkpoint in anscetor list",
anscestorCheckpointSet.contains(desCheckpoint1));
Assert.assertFalse("should not contain an unrelated checkpoint in ancestor list",
anscestorCheckpointSet.contains(unrelCheckpoint));
Assert.assertTrue("should contain all descendants' checkpoints",
descendantCheckpointSetForEf4.containsAll(desCheckpointSet));
Assert.assertFalse("should not contain a anscetor checkpoint in descendant list",
descendantCheckpointSetForEf4.contains(ansCheckpoint1));
Assert.assertFalse("should not contain an unrelated checkpoint in descendant list",
descendantCheckpointSetForEf4.contains(unrelCheckpoint));
}
@Test
public void testGetOffsetForSSPInCheckpoint() {
String offset1 = "1111";
String offset2 = "2222";
// case 1: when looking for exact ssp
SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset1);
// case 2: checkpoint has ssp with key bucket but looking for the full ssp (same system stream and partition but without keybucket)
SystemStreamPartition sspWithKB = new SystemStreamPartition("systemA", "streamB", new Partition(0), 1);
checkpoint1 = buildCheckpointV2(sspWithKB, offset2);
Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset2);
// case 3: try getting offset for an ssp not present in the checkpoint -> should return null
SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(1));
Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp2), null);
}
@Test
public void testGetMaxMinOffsetForSSPInCheckpointSet() {
String offset1 = "1111";
String offset2 = "2222";
SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
Checkpoint checkpoint2 = buildCheckpointV2(ssp, offset2);
Set<Checkpoint> checkpointSet = new HashSet<>(Arrays.asList(checkpoint1, checkpoint2));
SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
// offset 1 < offset2
Mockito.when(mockSystemAdmin.offsetComparator(offset1, offset2)).thenReturn(-1);
Mockito.when(mockSystemAdmin.offsetComparator(offset2, offset1)).thenReturn(1);
// case 1: when exact ssp is in checkpoint set
Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin));
Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin));
// case 2: when looking for ssp with keyBucket 1 whereas checkpoint set only has full ssp (same system stream and partition but without keybucket)
SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, 1);
Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin));
Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin));
// case 3: when ssp not in checkpoint set -> should receive null for min and max offset
SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(0));
Assert.assertEquals(null, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin));
Assert.assertEquals(null, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin));
}
@Test
public void testWasElasticityEnabled() {
Checkpoint checkpoint1 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0)), "1");
Checkpoint checkpoint2 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(1)), "2");
Checkpoint checkpoint3 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0), 0), "3");
Checkpoint checkpoint4 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0), 1), "4");
// case 0: empty checkpoint map
Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(new HashMap<>()));
// case 1: no tasks with elasticity enabled in the checkpoint map
Map<TaskName, Checkpoint> checkpointMap1 = new HashMap<>();
checkpointMap1.put(new TaskName("Partition 0"), checkpoint1);
checkpointMap1.put(new TaskName("Partition 2"), checkpoint2);
Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap1));
// case 2: tasks with no elasticity and tasks with elasticity both present in the checkpoint map
Map<TaskName, Checkpoint> checkpointMap2 = new HashMap<>();
checkpointMap2.put(new TaskName("Partition 0"), checkpoint1);
checkpointMap2.put(new TaskName("Partition 2"), checkpoint2);
checkpointMap2.put(new TaskName("Partition 0_0_2"), checkpoint3);
Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap2));
// case 3: only tasks with elasticity present in the checkpoint map
Map<TaskName, Checkpoint> checkpointMap3 = new HashMap<>();
checkpointMap3.put(new TaskName("Partition 0_0_2"), checkpoint3);
checkpointMap3.put(new TaskName("Partition 0_1_2"), checkpoint4);
Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap3));
// case 4: repeat same checks with GroupBySSP grouper tasks
Map<TaskName, Checkpoint> checkpointMap4 = new HashMap<>();
checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0]"), checkpoint1);
checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 1]"), checkpoint2);
Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap4));
checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0, 0]_2"), checkpoint3);
checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0, 1]_2"), checkpoint4);
Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap4));
// case 5: repeat same checks with AllSspToSingleTask grouper tasks - no elasticity supported for this grouper
Map<TaskName, Checkpoint> checkpointMap5 = new HashMap<>();
checkpointMap5.put(new TaskName("Task-0"), checkpoint1);
checkpointMap5.put(new TaskName("Task-1"), checkpoint2);
Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap5));
}
private static CheckpointV2 buildCheckpointV2(SystemStreamPartition ssp, String offset) {
return new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, offset),
ImmutableMap.of("backend", ImmutableMap.of("store", "10")));
}
}