blob: da855a120341be6f9995864d788cd592a996a3ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.ReadableCoordinator;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCallbackFactory;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.InOrder;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
public class TestRunLoop {
// Fixture values shared by all test methods. JUnit creates a fresh TestRunLoop instance per
// test, so the mutable ones (e.g. containerMetrics) start from scratch in every test.

// Passed as the executor argument of every RunLoop built in this class; deliberately null.
// NOTE(review): presumably makes RunLoop invoke tasks on the calling thread - confirm against RunLoop.
private final ExecutorService executor = null;
// Container-level metrics handed to RunLoop; tests read envelopes() and processes() from it.
private final SamzaContainerMetrics containerMetrics = new SamzaContainerMetrics("container", new MetricsRegistryMap());
// NOTE(review): -1 presumably disables time-based window/commit - confirm against RunLoop.
// testWindow shadows windowMs with a local positive value.
private final long windowMs = -1;
private final long commitMs = -1;
// Remaining timing arguments passed unchanged to every RunLoop constructor call below.
private final long callbackTimeoutMs = 0;
private final long maxThrottlingDelayMs = 0;
private final long maxIdleMs = 10;
// Two partitions; each task name is the string form of its partition.
private final Partition p0 = new Partition(0);
private final Partition p1 = new Partition(1);
private final TaskName taskName0 = new TaskName(p0.toString());
private final TaskName taskName1 = new TaskName(p1.toString());
// One SystemStreamPartition per partition, both on "testSystem"/"testStream".
private final SystemStreamPartition ssp0 = new SystemStreamPartition("testSystem", "testStream", p0);
private final SystemStreamPartition ssp1 = new SystemStreamPartition("testSystem", "testStream", p1);
// envelopeXY: X is the partition of the SSP, Y is the second constructor argument
// (read back through getOffset() in testProcessCallbacksCompletedOutOfOrder).
private final IncomingMessageEnvelope envelope00 = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0");
private final IncomingMessageEnvelope envelope11 = new IncomingMessageEnvelope(ssp1, "1", "key1", "value1");
private final IncomingMessageEnvelope envelope01 = new IncomingMessageEnvelope(ssp0, "1", "key0", "value0");
// End-of-stream marker envelopes, one per SSP.
private final IncomingMessageEnvelope ssp0EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp0);
private final IncomingMessageEnvelope ssp1EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1);
// JUnit rule that fails any test method running longer than 120 seconds.
@Rule
public Timeout maxTestDurationInSeconds = Timeout.seconds(120);
/**
 * Runs a loop hosting two tasks over one envelope per partition followed by an end-of-stream
 * envelope for each partition. Checks that each task was asked once to process its own
 * envelope and that the container counted four envelopes.
 */
@Test
public void testProcessMultipleTasks() {
  RunLoopTask firstTask = getMockRunLoopTask(taskName0, ssp0);
  RunLoopTask secondTask = getMockRunLoopTask(taskName1, ssp1);
  SystemConsumers consumers = mock(SystemConsumers.class);

  Map<TaskName, RunLoopTask> taskByName = new HashMap<>();
  taskByName.put(taskName0, firstTask);
  taskByName.put(taskName1, secondTask);

  final int maxMessagesInFlight = 1;
  RunLoop loop = new RunLoop(taskByName, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
      callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);

  // Consecutive stubbing: successive choose(false) calls hand out these values in order;
  // once exhausted, the last one (null) keeps being returned.
  when(consumers.choose(false))
      .thenReturn(envelope00, envelope11, ssp0EndOfStream, ssp1EndOfStream, null);

  loop.run();

  verify(firstTask, times(1)).process(eq(envelope00), any(), any());
  verify(secondTask, times(1)).process(eq(envelope11), any(), any());
  assertEquals(4L, containerMetrics.envelopes().getCount());
}
/**
 * Single-task ordering check: with maxMessagesInFlight set to 1, the two partition-0 envelopes
 * must reach process() in the same order the consumer returned them.
 */
@Test
public void testProcessInOrder() {
  SystemConsumers consumers = mock(SystemConsumers.class);
  // Two data envelopes, then end of stream; null for every later call.
  when(consumers.choose(false)).thenReturn(envelope00, envelope01, ssp0EndOfStream, null);

  RunLoopTask onlyTask = getMockRunLoopTask(taskName0, ssp0);
  final int maxMessagesInFlight = 1;
  Map<TaskName, RunLoopTask> taskByName = ImmutableMap.of(taskName0, onlyTask);
  RunLoop loop = new RunLoop(taskByName, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
      callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);

  loop.run();

  InOrder callOrder = inOrder(onlyTask);
  callOrder.verify(onlyTask).process(eq(envelope00), any(), any());
  callOrder.verify(onlyTask).process(eq(envelope01), any(), any());
}
/**
 * Exercises callbacks that complete in the reverse of dispatch order.
 *
 * <p>The callback for {@code envelope00} is completed on a helper thread, and only after the
 * callback for {@code envelope01} has completed (gated by a latch). Before completing, the
 * helper thread also requests a commit and a shutdown scoped to the current task. The test
 * then checks that process() was still invoked in dispatch order, that the offset manager was
 * updated with {@code envelope00}'s offset, and that two process calls were counted.
 *
 * <p>The helper executor is shut down in a {@code finally} block so its worker thread is not
 * leaked, even when an assertion fails while the worker is still parked on the latch.
 */
@Test
public void testProcessCallbacksCompletedOutOfOrder() {
  int maxMessagesInFlight = 2;
  ExecutorService taskExecutor = Executors.newFixedThreadPool(1);
  try {
    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
    OffsetManager offsetManager = mock(OffsetManager.class);
    RunLoopTask task0 = getMockRunLoopTask(taskName0, ssp0);
    when(task0.offsetManager()).thenReturn(offsetManager);

    // Opened by the second message once its own callback has completed.
    CountDownLatch firstMessageBarrier = new CountDownLatch(1);

    // First message: defer completion to the helper thread until the barrier opens.
    doAnswer(invocation -> {
      ReadableCoordinator coordinator = invocation.getArgumentAt(1, ReadableCoordinator.class);
      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
      TaskCallback callback = callbackFactory.createCallback();
      taskExecutor.submit(() -> {
        firstMessageBarrier.await();
        coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
        coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
        callback.complete();
        return null;
      });
      return null;
    }).when(task0).process(eq(envelope00), any(), any());

    // Second message: the first must still be in flight with no callback completed yet;
    // complete immediately, then release the first message.
    doAnswer(invocation -> {
      assertEquals(1, task0.metrics().messagesInFlight().getValue());
      assertEquals(0, task0.metrics().asyncCallbackCompleted().getCount());
      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
      TaskCallback callback = callbackFactory.createCallback();
      callback.complete();
      firstMessageBarrier.countDown();
      return null;
    }).when(task0).process(eq(envelope01), any(), any());

    Map<TaskName, RunLoopTask> tasks = new HashMap<>();
    tasks.put(taskName0, task0);
    RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
        callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
    when(consumerMultiplexer.choose(false)).thenReturn(envelope00).thenReturn(envelope01).thenReturn(null);
    runLoop.run();

    InOrder inOrder = inOrder(task0);
    inOrder.verify(task0).process(eq(envelope00), any(), any());
    inOrder.verify(task0).process(eq(envelope01), any(), any());
    verify(offsetManager).update(eq(taskName0), eq(ssp0), eq(envelope00.getOffset()));
    assertEquals(2L, containerMetrics.processes().getCount());
  } finally {
    // Release the pool thread; interrupts the worker if it is still blocked on the latch.
    taskExecutor.shutdownNow();
  }
}
/**
 * Runs a windowable task with a 1 ms window interval and no incoming messages. The stubbed
 * window() requests a current-task shutdown on its fourth invocation, and the test verifies
 * window() was called exactly four times.
 */
@Test
public void testWindow() {
  final int maxMessagesInFlight = 1;
  // Local interval used instead of the class-level windowMs field.
  final long windowIntervalMs = 1;
  final AtomicInteger windowCalls = new AtomicInteger(0);

  SystemConsumers consumers = mock(SystemConsumers.class);
  RunLoopTask windowedTask = getMockRunLoopTask(taskName0, ssp0);
  when(windowedTask.isWindowableTask()).thenReturn(true);

  // Count each window() call; the fourth one asks the coordinator to shut the task down.
  doAnswer(invocation -> {
    if (windowCalls.incrementAndGet() == 4) {
      ReadableCoordinator coordinator = invocation.getArgumentAt(0, ReadableCoordinator.class);
      coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
    }
    return null;
  }).when(windowedTask).window(any());

  Map<TaskName, RunLoopTask> taskByName = new HashMap<>();
  taskByName.put(taskName0, windowedTask);
  RunLoop loop = new RunLoop(taskByName, executor, consumers, maxMessagesInFlight, windowIntervalMs, commitMs,
      callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);

  // The consumer never has anything to deliver.
  when(consumers.choose(false)).thenReturn(null);

  loop.run();

  verify(windowedTask, times(4)).window(any());
}
/**
 * While processing {@code envelope00}, task0 requests a commit scoped to the current task and
 * a shutdown of all tasks in the container. Verifies both tasks processed one message, task0
 * committed, and task1 never committed.
 */
@Test
public void testCommitSingleTask() {
  SystemConsumers consumers = mock(SystemConsumers.class);

  RunLoopTask committingTask = getMockRunLoopTask(taskName0, ssp0);
  doAnswer(invocation -> {
    TaskCallback callback = invocation.getArgumentAt(2, TaskCallbackFactory.class).createCallback();
    ReadableCoordinator coordinator = invocation.getArgumentAt(1, ReadableCoordinator.class);
    coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
    coordinator.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
    callback.complete();
    return null;
  }).when(committingTask).process(eq(envelope00), any(), any());
  RunLoopTask otherTask = getMockRunLoopTask(taskName1, ssp1);

  Map<TaskName, RunLoopTask> taskByName = new HashMap<>();
  taskByName.put(this.taskName0, committingTask);
  taskByName.put(taskName1, otherTask);

  final int maxMessagesInFlight = 1;
  RunLoop loop = new RunLoop(taskByName, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
      callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);

  // The trailing null message is there so that task0 finishes processing and its commit is invoked.
  when(consumers.choose(false)).thenReturn(envelope00, envelope11, null);

  loop.run();

  verify(committingTask, times(1)).process(any(), any(), any());
  verify(otherTask, times(1)).process(any(), any(), any());
  verify(committingTask, times(1)).commit();
  verify(otherTask, never()).commit();
}
/**
 * While processing {@code envelope00}, task0 requests both a commit and a shutdown scoped to
 * all tasks in the container. Verifies both tasks processed one message and both committed.
 */
@Test
public void testCommitAllTasks() {
  SystemConsumers consumers = mock(SystemConsumers.class);

  RunLoopTask requestingTask = getMockRunLoopTask(taskName0, ssp0);
  doAnswer(invocation -> {
    TaskCallback callback = invocation.getArgumentAt(2, TaskCallbackFactory.class).createCallback();
    ReadableCoordinator coordinator = invocation.getArgumentAt(1, ReadableCoordinator.class);
    coordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
    coordinator.shutdown(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
    callback.complete();
    return null;
  }).when(requestingTask).process(eq(envelope00), any(), any());
  RunLoopTask otherTask = getMockRunLoopTask(taskName1, ssp1);

  Map<TaskName, RunLoopTask> taskByName = new HashMap<>();
  taskByName.put(this.taskName0, requestingTask);
  taskByName.put(taskName1, otherTask);

  final int maxMessagesInFlight = 1;
  RunLoop loop = new RunLoop(taskByName, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
      callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);

  // The trailing null message is there so that task0 finishes processing and the commit is invoked.
  when(consumers.choose(false)).thenReturn(envelope00, envelope11, null);

  loop.run();

  verify(requestingTask, times(1)).process(any(), any(), any());
  verify(otherTask, times(1)).process(any(), any(), any());
  verify(requestingTask, times(1)).commit();
  verify(otherTask, times(1)).commit();
}
/**
 * Each of the two tasks requests a shutdown scoped to the current task while processing its
 * own envelope. Verifies each task processed one message and that the container counted two
 * envelopes and two process calls.
 */
@Test
public void testShutdownOnConsensus() {
  final int maxMessagesInFlight = 1;
  SystemConsumers consumers = mock(SystemConsumers.class);

  RunLoopTask firstTask = getMockRunLoopTask(taskName0, ssp0);
  doAnswer(invocation -> {
    TaskCallback callback = invocation.getArgumentAt(2, TaskCallbackFactory.class).createCallback();
    ReadableCoordinator coordinator = invocation.getArgumentAt(1, ReadableCoordinator.class);
    coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
    callback.complete();
    return null;
  }).when(firstTask).process(eq(envelope00), any(), any());

  RunLoopTask secondTask = getMockRunLoopTask(taskName1, ssp1);
  doAnswer(invocation -> {
    TaskCallback callback = invocation.getArgumentAt(2, TaskCallbackFactory.class).createCallback();
    ReadableCoordinator coordinator = invocation.getArgumentAt(1, ReadableCoordinator.class);
    coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
    callback.complete();
    return null;
  }).when(secondTask).process(eq(envelope11), any(), any());

  Map<TaskName, RunLoopTask> taskByName = new HashMap<>();
  taskByName.put(taskName0, firstTask);
  taskByName.put(taskName1, secondTask);
  RunLoop loop = new RunLoop(taskByName, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
      callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);

  // Consensus is reached once the second envelope (envelope11) has been processed.
  when(consumers.choose(false)).thenReturn(envelope00, envelope11, null);

  loop.run();

  verify(firstTask, times(1)).process(any(), any(), any());
  verify(secondTask, times(1)).process(any(), any(), any());
  assertEquals(2L, containerMetrics.envelopes().getCount());
  assertEquals(2L, containerMetrics.processes().getCount());
}
/**
 * Delivers one data envelope and one end-of-stream envelope per partition to a two-task run
 * loop. Verifies each task got its process() call and its endOfStream() call, and that four
 * envelopes were counted.
 */
@Test
public void testEndOfStreamWithMultipleTasks() {
  RunLoopTask firstTask = getMockRunLoopTask(taskName0, ssp0);
  RunLoopTask secondTask = getMockRunLoopTask(taskName1, ssp1);
  SystemConsumers consumers = mock(SystemConsumers.class);

  Map<TaskName, RunLoopTask> taskByName = new HashMap<>();
  taskByName.put(taskName0, firstTask);
  taskByName.put(taskName1, secondTask);

  final int maxMessagesInFlight = 1;
  RunLoop loop = new RunLoop(taskByName, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
      callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);

  // Data for both partitions first, then both end-of-stream markers, then null forever.
  when(consumers.choose(false))
      .thenReturn(envelope00, envelope11, ssp0EndOfStream, ssp1EndOfStream, null);

  loop.run();

  verify(firstTask, times(1)).process(eq(envelope00), any(), any());
  verify(firstTask, times(1)).endOfStream(any());
  verify(secondTask, times(1)).process(eq(envelope11), any(), any());
  verify(secondTask, times(1)).endOfStream(any());
  assertEquals(4L, containerMetrics.envelopes().getCount());
}
/**
 * Checks that endOfStream() is not delivered to the task while a message is still in flight.
 *
 * <p>The callback for {@code envelope00} is completed on a helper thread once a latch of
 * count 2 has opened. The latch is counted down once by the second message and once by the
 * consumer stub on the first choose() call after the end-of-stream envelope was handed out.
 * The stubbed endOfStream() asserts that no message is in flight and that both callbacks have
 * completed by the time it is called.
 *
 * <p>The helper executor is shut down in a {@code finally} block so its worker thread is not
 * leaked, even when the test fails while the worker is still parked on the latch.
 */
@Test
public void testEndOfStreamWaitsForInFlightMessages() {
  int maxMessagesInFlight = 2;
  ExecutorService taskExecutor = Executors.newFixedThreadPool(1);
  try {
    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
    OffsetManager offsetManager = mock(OffsetManager.class);
    RunLoopTask task0 = getMockRunLoopTask(taskName0, ssp0);
    when(task0.offsetManager()).thenReturn(offsetManager);

    // Needs two count-downs: one from the second message, one from the consumer stub below.
    CountDownLatch firstMessageBarrier = new CountDownLatch(2);

    // First message: complete on the helper thread only after the barrier opens.
    doAnswer(invocation -> {
      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
      TaskCallback callback = callbackFactory.createCallback();
      taskExecutor.submit(() -> {
        firstMessageBarrier.await();
        callback.complete();
        return null;
      });
      return null;
    }).when(task0).process(eq(envelope00), any(), any());

    // Second message: completes immediately while the first is still in flight.
    doAnswer(invocation -> {
      assertEquals(1, task0.metrics().messagesInFlight().getValue());
      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
      TaskCallback callback = callbackFactory.createCallback();
      callback.complete();
      firstMessageBarrier.countDown();
      return null;
    }).when(task0).process(eq(envelope01), any(), any());

    // endOfStream() must only be called once nothing is in flight any more.
    doAnswer(invocation -> {
      assertEquals(0, task0.metrics().messagesInFlight().getValue());
      assertEquals(2, task0.metrics().asyncCallbackCompleted().getCount());
      return null;
    }).when(task0).endOfStream(any());

    Map<TaskName, RunLoopTask> tasks = new HashMap<>();
    tasks.put(taskName0, task0);
    RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
        callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
    when(consumerMultiplexer.choose(false)).thenReturn(envelope00).thenReturn(envelope01).thenReturn(ssp0EndOfStream)
        .thenAnswer(invocation -> {
          // this ensures that the end of stream message has passed through run loop BEFORE the last remaining
          // in flight message completes
          firstMessageBarrier.countDown();
          return null;
        });
    runLoop.run();

    verify(task0).endOfStream(any());
  } finally {
    // Release the pool thread; interrupts the worker if it is still blocked on the latch.
    taskExecutor.shutdownNow();
  }
}
/**
 * The stubbed endOfStream() requests a commit scoped to the current task. Verifies that the
 * task's commit() is invoked after endOfStream().
 */
@Test
public void testEndOfStreamCommitBehavior() {
  RunLoopTask onlyTask = getMockRunLoopTask(taskName0, ssp0);
  SystemConsumers consumers = mock(SystemConsumers.class);

  // Ask for a commit from inside the end-of-stream handler.
  doAnswer(invocation -> {
    invocation.getArgumentAt(0, ReadableCoordinator.class)
        .commit(TaskCoordinator.RequestScope.CURRENT_TASK);
    return null;
  }).when(onlyTask).endOfStream(any());

  Map<TaskName, RunLoopTask> taskByName = new HashMap<>();
  taskByName.put(taskName0, onlyTask);

  final int maxMessagesInFlight = 1;
  RunLoop loop = new RunLoop(taskByName, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
      callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);

  when(consumers.choose(false)).thenReturn(envelope00, ssp0EndOfStream, null);

  loop.run();

  InOrder callOrder = inOrder(onlyTask);
  callOrder.verify(onlyTask).endOfStream(any());
  callOrder.verify(onlyTask).commit();
}
/**
 * Checks that commit() can run while another message is still in flight when the RunLoop is
 * built with its last constructor argument set to {@code true} (the async-commit switch,
 * going by the test name).
 *
 * <p>Both messages complete on helper threads. The second message first releases the first,
 * which requests a current-task commit and completes. The second message then stays blocked
 * until the stubbed commit() runs, and only afterwards requests shutdown and completes. The
 * stubbed commit() asserts that exactly one callback has completed and one message is still
 * in flight at commit time.
 *
 * <p>The helper executor is shut down in a {@code finally} block so its worker threads are
 * not leaked, even when the test fails while a worker is still parked on a latch.
 */
@Test
public void testCommitWithMessageInFlightWhenAsyncCommitIsEnabled() {
  int maxMessagesInFlight = 2;
  ExecutorService taskExecutor = Executors.newFixedThreadPool(2);
  try {
    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
    OffsetManager offsetManager = mock(OffsetManager.class);
    RunLoopTask task0 = getMockRunLoopTask(taskName0, ssp0);
    when(task0.offsetManager()).thenReturn(offsetManager);

    // Opened by the second message's worker; gates the first message's commit request.
    CountDownLatch firstMessageBarrier = new CountDownLatch(1);
    doAnswer(invocation -> {
      ReadableCoordinator coordinator = invocation.getArgumentAt(1, ReadableCoordinator.class);
      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
      TaskCallback callback = callbackFactory.createCallback();
      taskExecutor.submit(() -> {
        firstMessageBarrier.await();
        coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
        callback.complete();
        return null;
      });
      return null;
    }).when(task0).process(eq(envelope00), any(), any());

    // Opened by the stubbed commit(); keeps the second message in flight until then.
    CountDownLatch secondMessageBarrier = new CountDownLatch(1);
    doAnswer(invocation -> {
      ReadableCoordinator coordinator = invocation.getArgumentAt(1, ReadableCoordinator.class);
      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
      TaskCallback callback = callbackFactory.createCallback();
      taskExecutor.submit(() -> {
        // let the first message proceed to ask for a commit
        firstMessageBarrier.countDown();
        // block this message until commit is executed
        secondMessageBarrier.await();
        coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
        callback.complete();
        return null;
      });
      return null;
    }).when(task0).process(eq(envelope01), any(), any());

    // At commit time the first callback is done and the second message is still in flight.
    doAnswer(invocation -> {
      assertEquals(1, task0.metrics().asyncCallbackCompleted().getCount());
      assertEquals(1, task0.metrics().messagesInFlight().getValue());
      secondMessageBarrier.countDown();
      return null;
    }).when(task0).commit();

    Map<TaskName, RunLoopTask> tasks = new HashMap<>();
    tasks.put(taskName0, task0);
    RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
        callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, true);
    when(consumerMultiplexer.choose(false)).thenReturn(envelope00).thenReturn(envelope01).thenReturn(null);
    runLoop.run();

    InOrder inOrder = inOrder(task0);
    inOrder.verify(task0).process(eq(envelope00), any(), any());
    inOrder.verify(task0).process(eq(envelope01), any(), any());
    inOrder.verify(task0).commit();
  } finally {
    // Release the pool threads; interrupts any worker still blocked on a latch.
    taskExecutor.shutdownNow();
  }
}
/**
 * The stubbed process() reports a failure through its callback. The test expects
 * {@code runLoop.run()} to throw a {@link SamzaException}.
 */
@Test(expected = SamzaException.class)
public void testExceptionIsPropagated() {
  RunLoopTask failingTask = getMockRunLoopTask(taskName0, ssp0);
  SystemConsumers consumers = mock(SystemConsumers.class);

  // Fail the callback instead of completing it.
  doAnswer(invocation -> {
    TaskCallback callback = invocation.getArgumentAt(2, TaskCallbackFactory.class).createCallback();
    callback.failure(new Exception("Intentional failure"));
    return null;
  }).when(failingTask).process(eq(envelope00), any(), any());

  final int maxMessagesInFlight = 1;
  Map<TaskName, RunLoopTask> taskByName = ImmutableMap.of(taskName0, failingTask);
  RunLoop loop = new RunLoop(taskByName, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
      callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);

  when(consumers.choose(false)).thenReturn(envelope00, ssp0EndOfStream, null);

  loop.run();
}
/**
 * Creates a Mockito mock of {@link RunLoopTask} with its identity and metrics accessors stubbed.
 *
 * @param taskName value the mock returns from {@code taskName()}
 * @param ssp0 the single partition the mock returns (as a singleton set) from
 *     {@code systemStreamPartitions()}
 * @return the stubbed mock; {@code metrics()} always returns the same real
 *     {@link TaskInstanceMetrics} instance named "test"
 */
private RunLoopTask getMockRunLoopTask(TaskName taskName, SystemStreamPartition ssp0) {
  TaskInstanceMetrics taskMetrics = new TaskInstanceMetrics("test", new MetricsRegistryMap());
  RunLoopTask mockTask = mock(RunLoopTask.class);
  when(mockTask.taskName()).thenReturn(taskName);
  when(mockTask.metrics()).thenReturn(taskMetrics);
  when(mockTask.systemStreamPartitions()).thenReturn(Collections.singleton(ssp0));
  return mockTask;
}
}