Merge pull request #1451 from Sanil15/SAMZA-2608

SAMZA-2608: Updating the RocksDb version for Samza 1.6 release
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7687eb2..7b0a08f 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -431,25 +431,38 @@
       userDefinedProcessorLifecycleListener.afterStart();
     }
 
+    private void closeAndRemoveProcessor() {
+      processors.forEach(sp -> {
+        if (sp.getLeft().equals(processor)) {
+          sp.getLeft().stop();
+          if (sp.getRight() != null) {
+            sp.getRight().close();
+          }
+        }
+      });
+      processors.removeIf(pair -> pair.getLeft().equals(processor));
+    }
     @Override
     public void afterStop() {
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+      closeAndRemoveProcessor();
       // successful shutdown
       handleProcessorShutdown(null);
     }
 
     @Override
     public void afterFailure(Throwable t) {
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
+      // Close the associated coordinator metadata store even though the processor failed
+      closeAndRemoveProcessor();
 
       // the processor stopped with failure, this is logging the first processor's failure as the cause of
       // the whole application failure
       if (failure.compareAndSet(null, t)) {
         // shutdown the other processors
         processors.forEach(sp -> {
-          sp.getLeft().stop();    // Stop StreamProcessor
-          sp.getRight().close();  // Close associated coordinator metadata store
+          sp.getLeft().stop();     // Stop StreamProcessor
+          if (sp.getRight() != null) {
+            sp.getRight().close(); // Close associated coordinator metadata store
+          }
         });
       }
 
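The LocalApplicationRunner change routes both afterStop() and afterFailure() through closeAndRemoveProcessor(), so the coordinator metadata store paired with a processor is closed on the success path as well as the failure path. A minimal, self-contained sketch of that pattern (the Processor, Store, and ProcessorRegistry names below are illustrative stand-ins, not Samza's StreamProcessor/MetadataStore API):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ProcessorRegistry {
  // Illustrative stand-ins for Samza's StreamProcessor and coordinator MetadataStore.
  interface Processor { void stop(); }
  interface Store { void close(); }

  // Each entry pairs a processor with its (possibly null) coordinator metadata store.
  static final class Entry {
    final Processor processor;
    final Store store;
    Entry(Processor processor, Store store) { this.processor = processor; this.store = store; }
  }

  private final List<Entry> processors = new CopyOnWriteArrayList<>();

  void add(Processor processor, Store store) {
    processors.add(new Entry(processor, store));
  }

  // Mirrors closeAndRemoveProcessor(): stop the matching processor, close its store
  // (on the success path and the failure path alike), then drop it from the list.
  void closeAndRemove(Processor target) {
    processors.forEach(entry -> {
      if (entry.processor.equals(target)) {
        entry.processor.stop();
        if (entry.store != null) {
          entry.store.close();
        }
      }
    });
    processors.removeIf(entry -> entry.processor.equals(target));
  }
}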
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index 5eda128..1fb49a9 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -140,7 +140,10 @@
     SystemConfig systemConfig = new SystemConfig(config);
     storeNameSystemStreamMapping.forEach((storeName, systemStream) -> {
       // Load system admin for this system.
-      SystemAdmin systemAdmin = systemConfig.getSystemAdmin(systemStream.getSystem());
+      SystemAdmin systemAdmin = systemConfig
+          .getSystemFactories()
+          .get(systemStream.getSystem())
+          .getAdmin(systemStream.getSystem(), config, ChangelogStreamManager.class.getSimpleName());
 
       if (systemAdmin == null) {
         throw new SamzaException(String.format(
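The admin lookup now goes through SystemConfig.getSystemFactories(); the same pattern appears in DiagnosticsUtil, RegExTopicGenerator, and CoordinatorStreamUtil below. Because Map.get() returns null when no factory is configured for the system, the factory reference itself deserves a null check before getAdmin() is called (the existing systemAdmin == null check fires too late to catch that case). A hedged sketch of a defensive helper, assuming the same SystemConfig/SystemFactory APIs used in this patch; the SystemAdminResolver class and its error message are hypothetical:

import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;

// Hypothetical helper (not part of this patch): resolves a SystemAdmin the same way the
// patch does, but fails with a descriptive error when the system has no configured factory.
public final class SystemAdminResolver {
  private SystemAdminResolver() { }

  public static SystemAdmin getAdmin(Config config, String systemName, String adminLabel) {
    SystemFactory factory = new SystemConfig(config).getSystemFactories().get(systemName);
    if (factory == null) {
      // Map.get() returns null for an unknown system; surface that instead of a bare NPE.
      throw new SamzaException(String.format("No SystemFactory configured for system: %s", systemName));
    }
    return factory.getAdmin(systemName, config, adminLabel);
  }
}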
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 6224a3e..b1e4206 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -46,7 +46,6 @@
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
@@ -166,7 +165,6 @@
     }
     // if diagnostics is enabled, create diagnostics stream if it doesnt exist
 
-    SystemAdmins systemAdmins = new SystemAdmins(config, DiagnosticsUtil.class.getSimpleName());
     String diagnosticsSystemStreamName = new MetricsConfig(config)
         .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)
         .orElseThrow(() -> new ConfigException("Missing required config: " +
@@ -174,7 +172,9 @@
                 MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)));
 
     SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName);
-    SystemAdmin diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem());
+    SystemConfig systemConfig = new SystemConfig(config);
+    SystemAdmin diagnosticsSysAdmin = systemConfig.getSystemFactories().get(diagnosticsSystemStream.getSystem())
+        .getAdmin(diagnosticsSystemStream.getSystem(), config, DiagnosticsUtil.class.getSimpleName());
     StreamSpec diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream(),
         diagnosticsSystemStream.getSystem(), new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID));
 
diff --git a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 93536fa..a5acdca 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -99,7 +99,9 @@
       .getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
 
     var systemStreams = Seq.empty[String]
-    val systemAdmin = new SystemConfig(config).getSystemAdmin(systemName, this.getClass.getSimpleName)
+    val systemConfig = new SystemConfig(config)
+    val systemAdmin = systemConfig.getSystemFactories
+      .get(systemName).getAdmin(systemName, config, this.getClass.getSimpleName)
     try {
       systemAdmin.start()
       systemStreams =
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 254f86d..b9469cb 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -54,11 +54,12 @@
    * @param config to create coordinator stream.
    */
   def createCoordinatorStream(config: Config): Unit = {
-    val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
-
     info("Creating coordinator stream")
     val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
-    val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+    val systemConfig = new SystemConfig(config)
+    val coordinatorSystemAdmin = systemConfig.getSystemFactories.get(coordinatorSystemStream.getSystem)
+      .getAdmin(coordinatorSystemStream.getSystem, config, this.getClass.getSimpleName)
+
     coordinatorSystemAdmin.start()
     CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
     coordinatorSystemAdmin.stop()
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 344f082..324c5e2 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -62,6 +62,8 @@
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -75,6 +77,7 @@
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -135,7 +138,7 @@
     runner.run(externalContext);
 
     verify(metadataStore).init();
-    verify(metadataStore, never()).close();
+    verify(metadataStore).close();
 
     assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
   }
@@ -171,7 +174,7 @@
     runner.run();
 
     verify(metadataStore).init();
-    verify(metadataStore, never()).close();
+    verify(metadataStore).close();
 
     assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
   }
@@ -209,7 +212,7 @@
     runner.waitForFinish();
 
     verify(coordinatorStreamStore).init();
-    verify(coordinatorStreamStore, never()).close();
+    verify(coordinatorStreamStore).close();
 
     assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
   }
@@ -311,10 +314,17 @@
       return null;
     }).when(sp).start();
 
-    doAnswer(i -> {
-      ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
-      listener.afterStop();
-      return null;
+    doAnswer(new Answer() {
+      private int count = 0;
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        // Forward only the first stop() call to the lifecycle listener
+        if (++count == 1) {
+          ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+          listener.afterStop();
+        }
+        return null;
+      }
     }).when(sp).stop();
 
     ExternalContext externalContext = mock(ExternalContext.class);
@@ -326,7 +336,7 @@
     runner.kill();
 
     verify(coordinatorStreamStore).init();
-    verify(coordinatorStreamStore).close();
+    verify(coordinatorStreamStore, atLeastOnce()).close();
 
     assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
   }
@@ -353,10 +363,17 @@
       return null;
     }).when(sp).start();
 
-    doAnswer(i -> {
-      ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
-      listener.afterStop();
-      return null;
+    doAnswer(new Answer() {
+      private int count = 0;
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        // Forward only the first stop() call to the lifecycle listener
+        if (++count == 1) {
+          ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+          listener.afterStop();
+        }
+        return null;
+      }
     }).when(sp).stop();
 
     ExternalContext externalContext = mock(ExternalContext.class);
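The counting Answer above only forwards the first stop() call to the lifecycle listener and ignores later calls. Mockito's consecutive stubbing can express the same intent more compactly; a possible alternative, reusing the test's existing captor and sp mocks (a sketch, not a required change):

doAnswer(invocation -> {
  ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
  listener.afterStop();
  return null;
}).doNothing().when(sp).stop();  // first call runs the Answer, later calls do nothing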
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index ce4ec0b..04ae4e2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -67,13 +67,13 @@
   val checkpointSsp: SystemStreamPartition = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
   val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory
 
-  val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
-  val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
+  var systemConsumer: SystemConsumer = _
+  var systemAdmin: SystemAdmin = _
 
   var taskNames: Set[TaskName] = Set[TaskName]()
   var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = _
 
-  val producerRef: AtomicReference[SystemProducer] = new AtomicReference[SystemProducer](getSystemProducer())
+  var producerRef: AtomicReference[SystemProducer] = _
   val producerCreationLock: Object = new Object
 
   // if true, systemConsumer can be safely closed after the first call to readLastCheckpoint.
@@ -83,19 +83,24 @@
 
   /**
     * Create checkpoint stream prior to start.
+    *
     */
   override def createResources(): Unit = {
+    val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
     Preconditions.checkNotNull(systemAdmin)
 
     systemAdmin.start()
+    try {
+      info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
+        s"partition count: ${checkpointSpec.getPartitionCount}")
+      systemAdmin.createStream(checkpointSpec)
 
-    info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
-      s"partition count: ${checkpointSpec.getPartitionCount}")
-    systemAdmin.createStream(checkpointSpec)
-
-    if (validateCheckpoint) {
-      info(s"Validating checkpoint stream")
-      systemAdmin.validateStream(checkpointSpec)
+      if (validateCheckpoint) {
+        info(s"Validating checkpoint stream")
+        systemAdmin.validateStream(checkpointSpec)
+      }
+    } finally {
+      systemAdmin.stop()
     }
   }
 
@@ -106,7 +111,8 @@
     // register and start a producer for the checkpoint topic
     info("Starting the checkpoint SystemProducer")
     producerRef.get().start()
-
+    info("Starting the checkpoint SystemAdmin")
+    systemAdmin.start()
     // register and start a consumer for the checkpoint topic
     val oldestOffset = getOldestOffset(checkpointSsp)
     info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
@@ -118,6 +124,10 @@
     * @inheritdoc
     */
   override def register(taskName: TaskName) {
+    systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
+    systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
+    producerRef = new AtomicReference[SystemProducer](getSystemProducer())
+
     debug(s"Registering taskName: $taskName")
     producerRef.get().register(taskName.getTaskName)
     taskNames += taskName
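The reworked createResources() starts a short-lived admin just for stream creation and stops it in a finally block, so a failure in createStream or validateStream can no longer leak a running admin. The same acquire/use/release shape, as a small self-contained Java sketch (the Admin interface and CheckpointStreamBootstrap class are illustrative stand-ins; Samza's real SystemAdmin takes a StreamSpec rather than a plain stream name):

// Illustrative stand-in for Samza's SystemAdmin.
interface Admin {
  void start();
  void stop();
  void createStream(String streamName);
  void validateStream(String streamName);
}

final class CheckpointStreamBootstrap {
  private CheckpointStreamBootstrap() { }

  // Acquire/use/release: the admin started for stream creation is always stopped,
  // even when createStream or validateStream throws.
  static void createResources(Admin admin, String streamName, boolean validate) {
    admin.start();
    try {
      admin.createStream(streamName);
      if (validate) {
        admin.validateStream(streamName);
      }
    } finally {
      admin.stop();
    }
  }
}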
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 7d6db64..35a74fd 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -84,7 +84,7 @@
     val checkPointManager = Mockito.spy(new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry))
     val newKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
 
-    Mockito.doReturn(newKafkaProducer).when(checkPointManager).getSystemProducer()
+    Mockito.when(checkPointManager.getSystemProducer()).thenReturn(mockKafkaProducer).thenReturn(newKafkaProducer)
 
     checkPointManager.register(taskName)
     checkPointManager.start
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 4a4ae7b..edcb159 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -288,7 +288,6 @@
     checkpointManagerOption match {
       case Some(checkpointManager) =>
         checkpointManager.createResources()
-        checkpointManager.stop()
       case _ => throw new ConfigException("No checkpoint manager factory configured")
     }