handle end of stream/watermark, add runloop test
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index e7e56b3..f2d07de 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -111,12 +111,27 @@
return systemStreamPartition;
}
- // used for elasticity to determine which elastic task should handle this envelope
+ /**
+ * fetches the SSP (with keybucket) for the envelope based on elasticity factor.
+ * keyBucket is determined by the key if non-null else by the offset
+ *
+ * Special case messages like Watermark msgs which should belong to all keybuckets of an ssp
+ * however, they might have both key and offset as null, hence keybucket cant be determined
+ * but choosing keybucket = 0 as ssp with keybucket is expected with elasticity enabled
+ * the correct handling of special case messages is the responsibility of the caller
+ * @param elasticityFactor
+ * @return
+ * SSP without keybucket if elasticity is not enabled
+ * SSP with keybucket otherwise
+ */
public SystemStreamPartition getSystemStreamPartition(int elasticityFactor) {
if (elasticityFactor <= 1) {
return systemStreamPartition;
}
Object envelopeKeyorOffset = key != null ? key : offset;
+ if (envelopeKeyorOffset == null) {
+ return new SystemStreamPartition(systemStreamPartition, 0);
+ }
int keyBucket = Math.abs(envelopeKeyorOffset.hashCode()) % elasticityFactor;
return new SystemStreamPartition(systemStreamPartition, keyBucket);
}
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index 2389e2f..b70f463 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -21,6 +21,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -34,6 +35,7 @@
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.MessageType;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.CoordinatorRequests;
@@ -242,7 +244,7 @@
IncomingMessageEnvelope envelope = consumerMultiplexer.choose(false);
if (envelope != null) {
log.trace("Choose envelope ssp {} offset {} for processing",
- envelope.getSystemStreamPartition(), envelope.getOffset());
+ envelope.getSystemStreamPartition(elasticityFactor), envelope.getOffset());
containerMetrics.envelopes().inc();
} else {
log.trace("No envelope is available");
@@ -258,12 +260,10 @@
if (!shutdownNow) {
if (envelope != null) {
PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
- SystemStreamPartition sspOfEnvelope = null;
// when elasticity is enabled
// the tasks actually consume a keyBucket of the ssp.
// hence use the SSP with keybucket to find the worker(s) for the envelope
- sspOfEnvelope = envelope.getSystemStreamPartition(elasticityFactor);
- List<AsyncTaskWorker> listOfWorkersForEnvelope = sspToTaskWorkerMapping.get(sspOfEnvelope);
+ List<AsyncTaskWorker> listOfWorkersForEnvelope = getWorkersForEnvelope(envelope);
if (listOfWorkersForEnvelope != null) {
for (AsyncTaskWorker worker : listOfWorkersForEnvelope) {
worker.state.insertEnvelope(pendingEnvelope);
@@ -273,7 +273,7 @@
// this condition happens when a keyBucket of the SSP is being consumed but other keyBuckets are not consumed
// if this update is not done for the SSP then the unprocessed envelopes from other keyBuckets
// will make the consumerMultiplexer not poll as it sees envelopes available for consumption.
- consumerMultiplexer.tryUpdate(envelope.getSystemStreamPartition());
+ consumerMultiplexer.tryUpdate(envelope.getSystemStreamPartition(elasticityFactor));
log.trace("updating the system consumers for ssp keyBucket {} not processed by this runloop",
envelope.getSystemStreamPartition(elasticityFactor));
}
@@ -285,6 +285,40 @@
}
}
+ /**
+ * when elasticity is not enabled, fetch the workers from sspToTaskWorkerMapping using envelope.getSSP()
+ * when elasticity is enabled,
+ * sspToTaskWorkerMapping has workers for a SSP which has keyBucket
+ * hence need to use envelop.getSSP(elasticityFactor)
+ * Additionally, when envelope is EnofStream, it needs to be sent to all works for the ssp irrespective of keyBucket
+ * @param envelope
+ * @return list of workers for the envelope
+ */
+ private List<AsyncTaskWorker> getWorkersForEnvelope(IncomingMessageEnvelope envelope) {
+ if (elasticityFactor <= 1) {
+ return sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition());
+ }
+
+ final SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition(elasticityFactor);
+ List<AsyncTaskWorker> listOfWorkersForEnvelope = null;
+
+ // if envelope is end of stream or watermark, it needs to be routed to all tasks consuming the ssp irresp of keybucket
+ MessageType messageType = MessageType.of(envelope.getMessage());
+ if (envelope.isEndOfStream() || MessageType.END_OF_STREAM == messageType || MessageType.WATERMARK == messageType) {
+
+ //sspToTaskWorkerMapping has ssps with keybucket so extract and check only system, stream and partition and ignore the keybucket
+ listOfWorkersForEnvelope = sspToTaskWorkerMapping.entrySet()
+ .stream()
+ .filter(sspToTask -> sspToTask.getKey().getSystemStream().equals(sspOfEnvelope.getSystemStream())
+ && sspToTask.getKey().getPartition().equals(sspOfEnvelope.getPartition()))
+ .map(sspToWorker -> sspToWorker.getValue())
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ } else {
+ listOfWorkersForEnvelope = sspToTaskWorkerMapping.get(sspOfEnvelope);
+ }
+ return listOfWorkersForEnvelope;
+ }
/**
* Block the runloop thread if all tasks are busy. When a task worker finishes or window/commit completes,
@@ -505,7 +539,7 @@
*/
private void process() {
final IncomingMessageEnvelope envelope = state.fetchEnvelope();
- log.trace("Process ssp {} offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
+ log.trace("Process ssp {} offset {}", envelope.getSystemStreamPartition(elasticityFactor), envelope.getOffset());
final ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
TaskCallbackFactory callbackFactory = new TaskCallbackFactory() {
@@ -744,7 +778,7 @@
IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
if (envelope.isEndOfStream()) {
- SystemStreamPartition ssp = envelope.getSystemStreamPartition();
+ SystemStreamPartition ssp = envelope.getSystemStreamPartition(elasticityFactor);
processingSspSet.remove(ssp);
if (!hasIntermediateStreams) {
pendingEnvelopeQueue.remove();
@@ -887,11 +921,11 @@
int queueSize = pendingEnvelopeQueue.size();
taskMetrics.pendingMessages().set(queueSize);
log.trace("fetch envelope ssp {} offset {} to process.",
- pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset());
+ pendingEnvelope.envelope.getSystemStreamPartition(elasticityFactor), pendingEnvelope.envelope.getOffset());
log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize);
if (pendingEnvelope.markProcessed()) {
- SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition();
+ SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition(elasticityFactor);
consumerMultiplexer.tryUpdate(partition);
log.debug("Update chooser for {}", partition);
}
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index ac6bdd7..c413d70 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -175,7 +175,7 @@
// If elasticity is enabled then the RunLoop gives SSP with keybucket
// but the actual systemConsumer which consumes from the input does not know about KeyBucket.
// hence, use an SSP without KeyBucket
- consumer.register(new SystemStreamPartition(systemStreamPartition.getSystemStream, systemStreamPartition.getPartition), offset)
+ consumer.register(getSSPWithoutKeyBucket(systemStreamPartition), offset)
}
debug("Starting consumers.")
@@ -215,12 +215,11 @@
}
- def register(givenSystemStreamPartition: SystemStreamPartition, offset: String) {
+ def register(ssp: SystemStreamPartition, offset: String) {
// If elasticity is enabled then the RunLoop gives SSP with keybucket
// but the MessageChooser does not know about the KeyBucket
// hence, use an SSP without KeyBucket
- val systemStreamPartition = new SystemStreamPartition(givenSystemStreamPartition.getSystem,
- givenSystemStreamPartition.getStream, givenSystemStreamPartition.getPartition)
+ val systemStreamPartition = getSSPWithoutKeyBucket(ssp)
debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(offset)) {
@@ -355,13 +354,14 @@
}
def tryUpdate(ssp: SystemStreamPartition) {
+ val systemStreamPartition = getSSPWithoutKeyBucket(ssp)
var updated = false
try {
- updated = update(ssp)
+ updated = update(systemStreamPartition)
} finally {
if (!updated) {
// if failed to update the chooser, add the ssp back into the emptySystemStreamPartitionBySystem map to ensure that we will poll for the next message
- emptySystemStreamPartitionsBySystem.get(ssp.getSystem).add(ssp)
+ emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition)
}
}
}
@@ -409,6 +409,10 @@
updated
}
+
+ private def getSSPWithoutKeyBucket(sspWithKeyBucket: SystemStreamPartition): SystemStreamPartition = {
+ new SystemStreamPartition(sspWithKeyBucket.getSystem, sspWithKeyBucket.getStream, sspWithKeyBucket.getPartition)
+ }
}
/**
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 90d4c33..b919737 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -165,6 +165,38 @@
}
@Test
+ public void testProcessElasticityEnabled() {
+
+ TaskName taskName0 = new TaskName(p0.toString() + " 0");
+ SystemStreamPartition ssp = new SystemStreamPartition("testSystem", "testStream", p0);
+ SystemStreamPartition ssp0 = new SystemStreamPartition("testSystem", "testStream", p0, 0);
+ SystemStreamPartition ssp1 = new SystemStreamPartition("testSystem", "testStream", p0, 1);
+
+ // have a single task in the run loop that processes ssp0 -> 0th keybucket of ssp
+ RunLoopTask task0 = getMockRunLoopTask(taskName0, ssp0);
+
+ // create two IME such that one of their ssp keybucket maps to ssp0 and the other one maps to ssp1
+ // task in the runloop should process only the first ime (aka the one whose ssp keybucket is ssp0)
+ IncomingMessageEnvelope envelope00 = spy(new IncomingMessageEnvelope(ssp, "0", "key0", "value0"));
+ IncomingMessageEnvelope envelope01 = spy(new IncomingMessageEnvelope(ssp, "1", "key0", "value0"));
+ when(envelope00.getSystemStreamPartition(2)).thenReturn(ssp0);
+ when(envelope01.getSystemStreamPartition(2)).thenReturn(ssp1);
+
+
+ SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+ when(consumerMultiplexer.choose(false)).thenReturn(envelope00).thenReturn(envelope01).thenReturn(ssp0EndOfStream).thenReturn(null);
+
+ Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);
+ int maxMessagesInFlight = 1;
+ RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+ callbackTimeoutMs, maxThrottlingDelayMs, 0, containerMetrics, () -> 0L, false, 2);
+ runLoop.run();
+
+ verify(task0).process(eq(envelope00), any(), any());
+ verify(task0, never()).process(eq(envelope01), any(), any());
+ }
+
+ @Test
public void testWindow() {
SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);