address comments
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
index 039f597..1050662 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
@@ -55,7 +55,6 @@
   public void createResources() {
     if (checkpointManager != null) {
       checkpointManager.createResources();
-      checkpointManager.stop();
     }
     createChangelogStreams();
   }
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 1da1f2e..7b0a08f 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -431,10 +431,20 @@
       userDefinedProcessorLifecycleListener.afterStart();
     }
 
+    private void closeAndRemoveProcessor() {
+      processors.forEach(sp -> {
+        if (sp.getLeft().equals(processor)) {
+          sp.getLeft().stop();
+          if (sp.getRight() != null) {
+            sp.getRight().close();
+          }
+        }
+      });
+      processors.removeIf(pair -> pair.getLeft().equals(processor));
+    }
     @Override
     public void afterStop() {
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+      closeAndRemoveProcessor();
       // successful shutdown
       handleProcessorShutdown(null);
     }
@@ -442,14 +452,7 @@
     @Override
     public void afterFailure(Throwable t) {
       // we need to close associated coordinator metadata store, although the processor failed
-      processors.forEach(sp -> {
-        if (sp.getLeft().equals(processor)) {
-          if (sp.getRight() != null) {
-            sp.getRight().close();
-          }
-        }
-      });
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
+      closeAndRemoveProcessor();
 
       // the processor stopped with failure, this is logging the first processor's failure as the cause of
       // the whole application failure
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 344f082..54468ee 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -62,6 +62,8 @@
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -75,13 +77,8 @@
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
+
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({LocalJobPlanner.class, LocalApplicationRunner.class, ZkMetadataStoreFactory.class})
@@ -135,7 +132,7 @@
     runner.run(externalContext);
 
     verify(metadataStore).init();
-    verify(metadataStore, never()).close();
+    verify(metadataStore).close();
 
     assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
   }
@@ -171,7 +168,7 @@
     runner.run();
 
     verify(metadataStore).init();
-    verify(metadataStore, never()).close();
+    verify(metadataStore).close();
 
     assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
   }
@@ -209,7 +206,7 @@
     runner.waitForFinish();
 
     verify(coordinatorStreamStore).init();
-    verify(coordinatorStreamStore, never()).close();
+    verify(coordinatorStreamStore).close();
 
     assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
   }
@@ -311,10 +308,17 @@
       return null;
     }).when(sp).start();
 
-    doAnswer(i -> {
-      ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
-      listener.afterStop();
-      return null;
+    doAnswer(new Answer() {
+      private int count = 0;
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (++count == 1) {
+          ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+          listener.afterStop();
+          return null;
+        }
+        return null;
+      }
     }).when(sp).stop();
 
     ExternalContext externalContext = mock(ExternalContext.class);
@@ -326,7 +330,7 @@
     runner.kill();
 
     verify(coordinatorStreamStore).init();
-    verify(coordinatorStreamStore).close();
+    verify(coordinatorStreamStore, atLeastOnce()).close();
 
     assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
   }
@@ -353,10 +357,17 @@
       return null;
     }).when(sp).start();
 
-    doAnswer(i -> {
-      ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
-      listener.afterStop();
-      return null;
+    doAnswer(new Answer() {
+      private int count = 0;
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (++count == 1) {
+          ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+          listener.afterStop();
+          return null;
+        }
+        return null;
+      }
     }).when(sp).stop();
 
     ExternalContext externalContext = mock(ExternalContext.class);
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index b1bcc4d..a458281 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -67,13 +67,13 @@
   val checkpointSsp: SystemStreamPartition = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
   val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory
 
-  val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
-  val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
+  var systemConsumer: SystemConsumer = _
+  var systemAdmin: SystemAdmin = _
 
   var taskNames: Set[TaskName] = Set[TaskName]()
   var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = _
 
-  val producerRef: AtomicReference[SystemProducer] = new AtomicReference[SystemProducer](getSystemProducer())
+  var producerRef: AtomicReference[SystemProducer] = _
   val producerCreationLock: Object = new Object
 
   // if true, systemConsumer can be safely closed after the first call to readLastCheckpoint.
@@ -86,17 +86,21 @@
     * Need to close KafkaCheckPointManager after createResources
     */
   override def createResources(): Unit = {
+    val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
     Preconditions.checkNotNull(systemAdmin)
 
     systemAdmin.start()
+    try {
+      info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
+        s"partition count: ${checkpointSpec.getPartitionCount}")
+      systemAdmin.createStream(checkpointSpec)
 
-    info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
-      s"partition count: ${checkpointSpec.getPartitionCount}")
-    systemAdmin.createStream(checkpointSpec)
-
-    if (validateCheckpoint) {
-      info(s"Validating checkpoint stream")
-      systemAdmin.validateStream(checkpointSpec)
+      if (validateCheckpoint) {
+        info(s"Validating checkpoint stream")
+        systemAdmin.validateStream(checkpointSpec)
+      }
+    } finally {
+      systemAdmin.stop()
     }
   }
 
@@ -120,6 +124,10 @@
     * @inheritdoc
     */
   override def register(taskName: TaskName) {
+    systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
+    systemAdmin =  systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
+    producerRef = new AtomicReference[SystemProducer](getSystemProducer())
+
     debug(s"Registering taskName: $taskName")
     producerRef.get().register(taskName.getTaskName)
     taskNames += taskName
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 7d6db64..35a74fd 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -84,7 +84,7 @@
     val checkPointManager = Mockito.spy(new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry))
     val newKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
 
-    Mockito.doReturn(newKafkaProducer).when(checkPointManager).getSystemProducer()
+    Mockito.when(checkPointManager.getSystemProducer()).thenReturn(mockKafkaProducer).thenReturn(newKafkaProducer)
 
     checkPointManager.register(taskName)
     checkPointManager.start
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 4a4ae7b..edcb159 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -288,7 +288,6 @@
     checkpointManagerOption match {
       case Some(checkpointManager) =>
         checkpointManager.createResources()
-        checkpointManager.stop()
       case _ => throw new ConfigException("No checkpoint manager factory configured")
     }