SAMZA-720: Fix BootstrappingChooser hanging issue. Backport to 0.9.1
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index dd500b9..1cd8e06 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
@@ -91,12 +91,21 @@
     .toSet
 
   /**
+   * Every SystemStreamPartition passed to register; start uses this set to drop lagging partitions that were never registered.
+   */
+  var registeredSystemStreamPartitions = Set[SystemStreamPartition]()
+
+  /**
    * The number of lagging partitions that the underlying wrapped chooser has
    * been updated with, grouped by SystemStream.
    */
   var updatedSystemStreams = Map[SystemStream, Int]()
 
   def start = {
+    // remove the systemStreamPartitions not registered.
+    laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_))
+    systemStreamLagCounts = laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case (systemStream, ssps) => systemStream -> ssps.size}
+
     debug("Starting bootstrapping chooser with bootstrap metadata: %s" format bootstrapStreamMetadata)
     info("Got lagging partition counts for bootstrap streams: %s" format systemStreamLagCounts)
     metrics.setLaggingSystemStreams(() => laggingSystemStreamPartitions.size)
@@ -118,6 +127,8 @@
     checkOffset(systemStreamPartition, offset, OffsetType.UPCOMING)
 
     wrapped.register(systemStreamPartition, offset)
+
+    registeredSystemStreamPartitions += systemStreamPartition
   }
 
   def update(envelope: IncomingMessageEnvelope) {
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
index 3c2693c..2e0180d 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
@@ -180,6 +180,24 @@
     assertNull(chooser.choose)
     // Fin.
   }
+
+  @Test
+  def testChooserRegisteredCorrectSsps {
+    val mock = new MockMessageChooser
+    val metadata1 = getMetadata(envelope1, "123")
+    val metadata2 = getMetadata(envelope2, "321")
+    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2))
+
+    chooser.register(envelope1.getSystemStreamPartition, "1")
+    chooser.register(envelope2.getSystemStreamPartition, "1")
+    chooser.start
+
+    // after start, only the two registered SSPs should be lagging, with a lag count of 1 for each of their system streams
+    val expectedLaggingSsps = Set(envelope1.getSystemStreamPartition, envelope2.getSystemStreamPartition)
+    assertEquals(expectedLaggingSsps, chooser.laggingSystemStreamPartitions)
+    val expectedSystemStreamLagCounts = Map(envelope1.getSystemStreamPartition.getSystemStream -> 1, envelope2.getSystemStreamPartition.getSystemStream -> 1)
+    assertEquals(expectedSystemStreamLagCounts, chooser.systemStreamLagCounts)
+  }
 }
 
 object TestBootstrappingChooser {