address comments
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
index 039f597..1050662 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
@@ -55,7 +55,6 @@
public void createResources() {
if (checkpointManager != null) {
checkpointManager.createResources();
- checkpointManager.stop();
}
createChangelogStreams();
}
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 1da1f2e..7b0a08f 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -431,10 +431,20 @@
userDefinedProcessorLifecycleListener.afterStart();
}
+ private void closeAndRemoveProcessor() {
+ processors.forEach(sp -> {
+ if (sp.getLeft().equals(processor)) {
+ sp.getLeft().stop();
+ if (sp.getRight() != null) {
+ sp.getRight().close();
+ }
+ }
+ });
+ processors.removeIf(pair -> pair.getLeft().equals(processor));
+ }
@Override
public void afterStop() {
- processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+ closeAndRemoveProcessor();
// successful shutdown
handleProcessorShutdown(null);
}
@@ -442,14 +452,7 @@
@Override
public void afterFailure(Throwable t) {
// we need to close associated coordinator metadata store, although the processor failed
- processors.forEach(sp -> {
- if (sp.getLeft().equals(processor)) {
- if (sp.getRight() != null) {
- sp.getRight().close();
- }
- }
- });
- processors.removeIf(pair -> pair.getLeft().equals(processor));
+ closeAndRemoveProcessor();
// the processor stopped with failure, this is logging the first processor's failure as the cause of
// the whole application failure
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 344f082..54468ee 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -62,6 +62,8 @@
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -75,13 +77,8 @@
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
+
@RunWith(PowerMockRunner.class)
@PrepareForTest({LocalJobPlanner.class, LocalApplicationRunner.class, ZkMetadataStoreFactory.class})
@@ -135,7 +132,7 @@
runner.run(externalContext);
verify(metadataStore).init();
- verify(metadataStore, never()).close();
+ verify(metadataStore).close();
assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
}
@@ -171,7 +168,7 @@
runner.run();
verify(metadataStore).init();
- verify(metadataStore, never()).close();
+ verify(metadataStore).close();
assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
}
@@ -209,7 +206,7 @@
runner.waitForFinish();
verify(coordinatorStreamStore).init();
- verify(coordinatorStreamStore, never()).close();
+ verify(coordinatorStreamStore).close();
assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
}
@@ -311,10 +308,17 @@
return null;
}).when(sp).start();
- doAnswer(i -> {
- ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
- listener.afterStop();
- return null;
+ doAnswer(new Answer() {
+ private int count = 0;
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ if (++count == 1) {
+ ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+ listener.afterStop();
+ return null;
+ }
+ return null;
+ }
}).when(sp).stop();
ExternalContext externalContext = mock(ExternalContext.class);
@@ -326,7 +330,7 @@
runner.kill();
verify(coordinatorStreamStore).init();
- verify(coordinatorStreamStore).close();
+ verify(coordinatorStreamStore, atLeastOnce()).close();
assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
}
@@ -353,10 +357,17 @@
return null;
}).when(sp).start();
- doAnswer(i -> {
- ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
- listener.afterStop();
- return null;
+ doAnswer(new Answer() {
+ private int count = 0;
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ if (++count == 1) {
+ ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+ listener.afterStop();
+ return null;
+ }
+ return null;
+ }
}).when(sp).stop();
ExternalContext externalContext = mock(ExternalContext.class);
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index b1bcc4d..a458281 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -67,13 +67,13 @@
val checkpointSsp: SystemStreamPartition = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory
- val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
- val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
+ var systemConsumer: SystemConsumer = _
+ var systemAdmin: SystemAdmin = _
var taskNames: Set[TaskName] = Set[TaskName]()
var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = _
- val producerRef: AtomicReference[SystemProducer] = new AtomicReference[SystemProducer](getSystemProducer())
+ var producerRef: AtomicReference[SystemProducer] = _
val producerCreationLock: Object = new Object
// if true, systemConsumer can be safely closed after the first call to readLastCheckpoint.
@@ -86,17 +86,21 @@
* Need to close KafkaCheckPointManager after createResources
*/
override def createResources(): Unit = {
+ val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
Preconditions.checkNotNull(systemAdmin)
systemAdmin.start()
+ try {
+ info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
+ s"partition count: ${checkpointSpec.getPartitionCount}")
+ systemAdmin.createStream(checkpointSpec)
- info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
- s"partition count: ${checkpointSpec.getPartitionCount}")
- systemAdmin.createStream(checkpointSpec)
-
- if (validateCheckpoint) {
- info(s"Validating checkpoint stream")
- systemAdmin.validateStream(checkpointSpec)
+ if (validateCheckpoint) {
+ info(s"Validating checkpoint stream")
+ systemAdmin.validateStream(checkpointSpec)
+ }
+ } finally {
+ systemAdmin.stop()
}
}
@@ -120,6 +124,10 @@
* @inheritdoc
*/
override def register(taskName: TaskName) {
+ systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
+ systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
+ producerRef = new AtomicReference[SystemProducer](getSystemProducer())
+
debug(s"Registering taskName: $taskName")
producerRef.get().register(taskName.getTaskName)
taskNames += taskName
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 7d6db64..35a74fd 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -84,7 +84,7 @@
val checkPointManager = Mockito.spy(new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry))
val newKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
- Mockito.doReturn(newKafkaProducer).when(checkPointManager).getSystemProducer()
+ Mockito.when(checkPointManager.getSystemProducer()).thenReturn(mockKafkaProducer).thenReturn(newKafkaProducer)
checkPointManager.register(taskName)
checkPointManager.start
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 4a4ae7b..edcb159 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -288,7 +288,6 @@
checkpointManagerOption match {
case Some(checkpointManager) =>
checkpointManager.createResources()
- checkpointManager.stop()
case _ => throw new ConfigException("No checkpoint manager factory configured")
}