Merge pull request #1454 from MabelYC/fixDupLog

SAMZA-2614: Fix NPE at KafkaCheckpointManager.start in standby containers
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 04ae4e2..757e7ae 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -67,13 +67,13 @@
   val checkpointSsp: SystemStreamPartition = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
   val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory
 
-  var systemConsumer: SystemConsumer = _
-  var systemAdmin: SystemAdmin = _
+  val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
+  val systemAdmin =  systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
 
   var taskNames: Set[TaskName] = Set[TaskName]()
   var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = _
 
-  var producerRef: AtomicReference[SystemProducer] = _
+  val producerRef: AtomicReference[SystemProducer] = new AtomicReference[SystemProducer](getSystemProducer())
   val producerCreationLock: Object = new Object
 
   // if true, systemConsumer can be safely closed after the first call to readLastCheckpoint.
@@ -86,21 +86,20 @@
     *
     */
   override def createResources(): Unit = {
-    val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
-    Preconditions.checkNotNull(systemAdmin)
-
-    systemAdmin.start()
+    val createResourcesSystemAdmin =  systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName + "createResource")
+    Preconditions.checkNotNull(createResourcesSystemAdmin)
+    createResourcesSystemAdmin.start()
     try {
       info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
         s"partition count: ${checkpointSpec.getPartitionCount}")
-      systemAdmin.createStream(checkpointSpec)
+      createResourcesSystemAdmin.createStream(checkpointSpec)
 
       if (validateCheckpoint) {
         info(s"Validating checkpoint stream")
-        systemAdmin.validateStream(checkpointSpec)
+        createResourcesSystemAdmin.validateStream(checkpointSpec)
       }
     } finally {
-      systemAdmin.stop()
+      createResourcesSystemAdmin.stop()
     }
   }
 
@@ -124,10 +123,6 @@
     * @inheritdoc
     */
   override def register(taskName: TaskName) {
-    systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
-    systemAdmin =  systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
-    producerRef = new AtomicReference[SystemProducer](getSystemProducer())
-
     debug(s"Registering taskName: $taskName")
     producerRef.get().register(taskName.getTaskName)
     taskNames += taskName
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 35a74fd..7d6db64 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -84,7 +84,7 @@
     val checkPointManager = Mockito.spy(new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry))
     val newKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
 
-    Mockito.when(checkPointManager.getSystemProducer()).thenReturn(mockKafkaProducer).thenReturn(newKafkaProducer)
+    Mockito.doReturn(newKafkaProducer).when(checkPointManager).getSystemProducer()
 
     checkPointManager.register(taskName)
     checkPointManager.start