handle end of stream/watermark, add runloop test
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index e7e56b3..f2d07de 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -111,12 +111,27 @@
     return systemStreamPartition;
   }
 
-  // used for elasticity to determine which elastic task should handle this envelope
+  /**
+   * Returns the SSP of this envelope, qualified with a key bucket when elasticity is enabled.
+   * The key bucket is derived from the hash code of the key if non-null, else of the offset.
+   *
+   * Special messages such as watermarks belong to all key buckets of an SSP and may carry
+   *     neither a key nor an offset; key bucket 0 is returned for them because an SSP with a
+   *     key bucket is expected when elasticity is enabled. Routing such messages correctly
+   *     is the responsibility of the caller.
+   * @param elasticityFactor number of key buckets per SSP; a value of 1 or less means disabled
+   * @return the SSP without a key bucket if elasticity is not enabled,
+   *  otherwise the SSP with a key bucket in the range [0, elasticityFactor)
+   */
   public SystemStreamPartition getSystemStreamPartition(int elasticityFactor) {
     if (elasticityFactor <= 1) {
       return systemStreamPartition;
     }
     Object envelopeKeyorOffset = key != null ? key : offset;
+    if (envelopeKeyorOffset == null) {
+      return new SystemStreamPartition(systemStreamPartition, 0);
+    }
-    int keyBucket = Math.abs(envelopeKeyorOffset.hashCode()) % elasticityFactor;
+    // abs after the modulo: Math.abs(Integer.MIN_VALUE) is negative and would yield a negative bucket
+    int keyBucket = Math.abs(envelopeKeyorOffset.hashCode() % elasticityFactor);
     return new SystemStreamPartition(systemStreamPartition, keyBucket);
   }
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index 2389e2f..b70f463 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -21,6 +21,7 @@
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -34,6 +35,7 @@
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.MessageType;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.CoordinatorRequests;
@@ -242,7 +244,7 @@
     IncomingMessageEnvelope envelope = consumerMultiplexer.choose(false);
     if (envelope != null) {
       log.trace("Choose envelope ssp {} offset {} for processing",
-          envelope.getSystemStreamPartition(), envelope.getOffset());
+          envelope.getSystemStreamPartition(elasticityFactor), envelope.getOffset());
       containerMetrics.envelopes().inc();
     } else {
       log.trace("No envelope is available");
@@ -258,12 +260,10 @@
     if (!shutdownNow) {
       if (envelope != null) {
         PendingEnvelope pendingEnvelope = new PendingEnvelope(envelope);
-        SystemStreamPartition sspOfEnvelope = null;
         // when elasticity is enabled
         // the tasks actually consume a keyBucket of the ssp.
         // hence use the SSP with keybucket to find the worker(s) for the envelope
-        sspOfEnvelope = envelope.getSystemStreamPartition(elasticityFactor);
-        List<AsyncTaskWorker> listOfWorkersForEnvelope = sspToTaskWorkerMapping.get(sspOfEnvelope);
+        List<AsyncTaskWorker> listOfWorkersForEnvelope = getWorkersForEnvelope(envelope);
         if (listOfWorkersForEnvelope != null) {
           for (AsyncTaskWorker worker : listOfWorkersForEnvelope) {
             worker.state.insertEnvelope(pendingEnvelope);
@@ -273,7 +273,7 @@
           // this condition happens when a keyBucket of the SSP is being consumed but other keyBuckets are not consumed
           // if this update is not done for the SSP then the unprocessed envelopes from other keyBuckets
           // will make the consumerMultiplexer not poll as it sees envelopes available for consumption.
-          consumerMultiplexer.tryUpdate(envelope.getSystemStreamPartition());
+          consumerMultiplexer.tryUpdate(envelope.getSystemStreamPartition(elasticityFactor));
           log.trace("updating the system consumers for ssp keyBucket {} not processed by this runloop",
               envelope.getSystemStreamPartition(elasticityFactor));
         }
@@ -285,6 +285,40 @@
     }
   }
 
+  /**
+   * Finds the task workers that should receive the given envelope.
+   * Without elasticity, the lookup in sspToTaskWorkerMapping uses envelope.getSSP().
+   * With elasticity, sspToTaskWorkerMapping is keyed by SSPs that carry a keyBucket,
+   *       so the lookup uses envelope.getSSP(elasticityFactor).
+   *       End-of-stream and watermark envelopes are the exception: they go to all workers
+   *       of the ssp irrespective of keyBucket.
+   * @param envelope envelope chosen for processing
+   * @return workers for the envelope; the plain map lookups return whatever the map holds for the ssp,
+   *         the end-of-stream/watermark path returns a (possibly empty) list
+   */
+  private List<AsyncTaskWorker> getWorkersForEnvelope(IncomingMessageEnvelope envelope) {
+    if (elasticityFactor <= 1) {
+      return sspToTaskWorkerMapping.get(envelope.getSystemStreamPartition());
+    }
+
+    final SystemStreamPartition bucketedSsp = envelope.getSystemStreamPartition(elasticityFactor);
+    final MessageType type = MessageType.of(envelope.getMessage());
+    final boolean forAllKeyBuckets =
+        envelope.isEndOfStream() || MessageType.END_OF_STREAM == type || MessageType.WATERMARK == type;
+
+    if (!forAllKeyBuckets) {
+      // regular message: only the workers of the envelope's own keyBucket are interested
+      return sspToTaskWorkerMapping.get(bucketedSsp);
+    }
+    // the map keys carry a keyBucket, so compare only system stream and partition and ignore the keyBucket
+    return sspToTaskWorkerMapping.entrySet()
+        .stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(bucketedSsp.getSystemStream())
+            && entry.getKey().getPartition().equals(bucketedSsp.getPartition()))
+        .map(entry -> entry.getValue())
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+  }
 
   /**
    * Block the runloop thread if all tasks are busy. When a task worker finishes or window/commit completes,
@@ -505,7 +539,7 @@
      */
     private void process() {
       final IncomingMessageEnvelope envelope = state.fetchEnvelope();
-      log.trace("Process ssp {} offset {}", envelope.getSystemStreamPartition(), envelope.getOffset());
+      log.trace("Process ssp {} offset {}", envelope.getSystemStreamPartition(elasticityFactor), envelope.getOffset());
 
       final ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
       TaskCallbackFactory callbackFactory = new TaskCallbackFactory() {
@@ -744,7 +778,7 @@
         IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isEndOfStream()) {
-          SystemStreamPartition ssp = envelope.getSystemStreamPartition();
+          SystemStreamPartition ssp = envelope.getSystemStreamPartition(elasticityFactor);
           processingSspSet.remove(ssp);
           if (!hasIntermediateStreams) {
             pendingEnvelopeQueue.remove();
@@ -887,11 +921,11 @@
       int queueSize = pendingEnvelopeQueue.size();
       taskMetrics.pendingMessages().set(queueSize);
       log.trace("fetch envelope ssp {} offset {} to process.",
-          pendingEnvelope.envelope.getSystemStreamPartition(), pendingEnvelope.envelope.getOffset());
+          pendingEnvelope.envelope.getSystemStreamPartition(elasticityFactor), pendingEnvelope.envelope.getOffset());
       log.debug("Task {} pending envelopes count is {} after fetching.", taskName, queueSize);
 
       if (pendingEnvelope.markProcessed()) {
-        SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition();
+        SystemStreamPartition partition = pendingEnvelope.envelope.getSystemStreamPartition(elasticityFactor);
         consumerMultiplexer.tryUpdate(partition);
         log.debug("Update chooser for {}", partition);
       }
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index ac6bdd7..c413d70 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -175,7 +175,7 @@
       // If elasticity is enabled then the RunLoop gives SSP with keybucket
       // but the actual systemConsumer which consumes from the input does not know about KeyBucket.
       // hence, use an SSP without KeyBucket
-      consumer.register(new SystemStreamPartition(systemStreamPartition.getSystemStream, systemStreamPartition.getPartition), offset)
+      consumer.register(getSSPWithoutKeyBucket(systemStreamPartition), offset)
     }
 
     debug("Starting consumers.")
@@ -215,12 +215,11 @@
   }
 
 
-  def register(givenSystemStreamPartition: SystemStreamPartition, offset: String) {
+  def register(ssp: SystemStreamPartition, offset: String) {
     // If elasticity is enabled then the RunLoop gives SSP with keybucket
     // but the MessageChooser does not know about the KeyBucket
     // hence, use an SSP without KeyBucket
-    val systemStreamPartition = new SystemStreamPartition(givenSystemStreamPartition.getSystem,
-      givenSystemStreamPartition.getStream, givenSystemStreamPartition.getPartition)
+    val systemStreamPartition = getSSPWithoutKeyBucket(ssp)
     debug("Registering stream: %s, %s" format (systemStreamPartition, offset))
 
     if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(offset)) {
@@ -355,13 +354,14 @@
   }
 
   def tryUpdate(ssp: SystemStreamPartition) {
+    val systemStreamPartition = getSSPWithoutKeyBucket(ssp) // callers may pass an SSP with a keyBucket; the chooser works on SSPs without one
     var updated = false
     try {
-      updated = update(ssp)
+      updated = update(systemStreamPartition)
     } finally {
       if (!updated) {
         // if failed to update the chooser, add the ssp back into the emptySystemStreamPartitionBySystem map to ensure that we will poll for the next message
-        emptySystemStreamPartitionsBySystem.get(ssp.getSystem).add(ssp)
+        emptySystemStreamPartitionsBySystem.get(systemStreamPartition.getSystem).add(systemStreamPartition)
       }
     }
   }
@@ -409,6 +409,10 @@
 
     updated
   }
+
+  // Rebuilds the SSP from system, stream and partition only, i.e. without the keyBucket of the given SSP.
+  private def getSSPWithoutKeyBucket(sspWithKeyBucket: SystemStreamPartition): SystemStreamPartition =
+    new SystemStreamPartition(sspWithKeyBucket.getSystem, sspWithKeyBucket.getStream, sspWithKeyBucket.getPartition)
 }
 
 /**
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 90d4c33..b919737 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -165,6 +165,38 @@
   }
 
   @Test
+  public void testProcessElasticityEnabled() {
+    // the run loop below is created with elasticity factor 2; its only task consumes key bucket 0 of the ssp
+    SystemStreamPartition baseSsp = new SystemStreamPartition("testSystem", "testStream", p0);
+    SystemStreamPartition sspBucket0 = new SystemStreamPartition("testSystem", "testStream", p0, 0);
+    SystemStreamPartition sspBucket1 = new SystemStreamPartition("testSystem", "testStream", p0, 1);
+    TaskName bucket0TaskName = new TaskName(p0.toString() + " 0");
+    RunLoopTask bucket0Task = getMockRunLoopTask(bucket0TaskName, sspBucket0);
+
+    // spied envelopes: the test pins the first one to key bucket 0 and the second one to key bucket 1
+    IncomingMessageEnvelope envelopeBucket0 = spy(new IncomingMessageEnvelope(baseSsp, "0", "key0", "value0"));
+    IncomingMessageEnvelope envelopeBucket1 = spy(new IncomingMessageEnvelope(baseSsp, "1", "key0", "value0"));
+    when(envelopeBucket0.getSystemStreamPartition(2)).thenReturn(sspBucket0);
+    when(envelopeBucket1.getSystemStreamPartition(2)).thenReturn(sspBucket1);
+
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.choose(false))
+        .thenReturn(envelopeBucket0)
+        .thenReturn(envelopeBucket1)
+        .thenReturn(ssp0EndOfStream)
+        .thenReturn(null);
+
+    int maxMessagesInFlight = 1;
+    Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(bucket0TaskName, bucket0Task);
+    RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+        callbackTimeoutMs, maxThrottlingDelayMs, 0, containerMetrics, () -> 0L, false, 2);
+    runLoop.run();
+
+    verify(bucket0Task).process(eq(envelopeBucket0), any(), any());
+    verify(bucket0Task, never()).process(eq(envelopeBucket1), any(), any());
+  }
+
+  @Test
   public void testWindow() {
     SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);