Merge pull request #1454 from MabelYC/fixDupLog
SAMZA-2614:Fix NPE at KafkaCheckpointManager.start in standby containers
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 04ae4e2..757e7ae 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -67,13 +67,13 @@
val checkpointSsp: SystemStreamPartition = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory
- var systemConsumer: SystemConsumer = _
- var systemAdmin: SystemAdmin = _
+ val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
+ val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
var taskNames: Set[TaskName] = Set[TaskName]()
var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = _
- var producerRef: AtomicReference[SystemProducer] = _
+ val producerRef: AtomicReference[SystemProducer] = new AtomicReference[SystemProducer](getSystemProducer())
val producerCreationLock: Object = new Object
// if true, systemConsumer can be safely closed after the first call to readLastCheckpoint.
@@ -86,21 +86,20 @@
*
*/
override def createResources(): Unit = {
- val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
- Preconditions.checkNotNull(systemAdmin)
-
- systemAdmin.start()
+ val createResourcesSystemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName + "createResource")
+ Preconditions.checkNotNull(createResourcesSystemAdmin)
+ createResourcesSystemAdmin.start()
try {
info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
s"partition count: ${checkpointSpec.getPartitionCount}")
- systemAdmin.createStream(checkpointSpec)
+ createResourcesSystemAdmin.createStream(checkpointSpec)
if (validateCheckpoint) {
info(s"Validating checkpoint stream")
- systemAdmin.validateStream(checkpointSpec)
+ createResourcesSystemAdmin.validateStream(checkpointSpec)
}
} finally {
- systemAdmin.stop()
+ createResourcesSystemAdmin.stop()
}
}
@@ -124,10 +123,6 @@
* @inheritdoc
*/
override def register(taskName: TaskName) {
- systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
- systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
- producerRef = new AtomicReference[SystemProducer](getSystemProducer())
-
debug(s"Registering taskName: $taskName")
producerRef.get().register(taskName.getTaskName)
taskNames += taskName
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 35a74fd..7d6db64 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -84,7 +84,7 @@
val checkPointManager = Mockito.spy(new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry))
val newKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
- Mockito.when(checkPointManager.getSystemProducer()).thenReturn(mockKafkaProducer).thenReturn(newKafkaProducer)
+ Mockito.doReturn(newKafkaProducer).when(checkPointManager).getSystemProducer()
checkPointManager.register(taskName)
checkPointManager.start