Merge pull request #1451 from Sanil15/SAMZA-2608
SAMZA-2608: Updating the RocksDb version for Samza 1.6 release
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7687eb2..7b0a08f 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -431,25 +431,38 @@
userDefinedProcessorLifecycleListener.afterStart();
}
+ private void closeAndRemoveProcessor() {
+ processors.forEach(sp -> {
+ if (sp.getLeft().equals(processor)) {
+ sp.getLeft().stop();
+ if (sp.getRight() != null) {
+ sp.getRight().close();
+ }
+ }
+ });
+ processors.removeIf(pair -> pair.getLeft().equals(processor));
+ }
@Override
public void afterStop() {
- processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+ closeAndRemoveProcessor();
// successful shutdown
handleProcessorShutdown(null);
}
@Override
public void afterFailure(Throwable t) {
- processors.removeIf(pair -> pair.getLeft().equals(processor));
+ // even on failure, stop this processor, close its coordinator metadata store (if any) and stop tracking it
+ closeAndRemoveProcessor();
// the processor stopped with failure, this is logging the first processor's failure as the cause of
// the whole application failure
if (failure.compareAndSet(null, t)) {
// shutdown the other processors
processors.forEach(sp -> {
- sp.getLeft().stop(); // Stop StreamProcessor
- sp.getRight().close(); // Close associated coordinator metadata store
+ sp.getLeft().stop(); // Stop StreamProcessor
+ if (sp.getRight() != null) {
+ sp.getRight().close(); // Close associated coordinator metadata store
+ }
});
}
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index 5eda128..1fb49a9 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -140,7 +140,10 @@
SystemConfig systemConfig = new SystemConfig(config);
storeNameSystemStreamMapping.forEach((storeName, systemStream) -> {
// Load system admin for this system.
- SystemAdmin systemAdmin = systemConfig.getSystemAdmin(systemStream.getSystem());
+ // A system without a configured factory must fall through to the null check below, not throw a bare NPE here.
+ SystemAdmin systemAdmin = java.util.Optional.ofNullable(systemConfig.getSystemFactories().get(systemStream.getSystem()))
+ .map(factory -> factory.getAdmin(systemStream.getSystem(), config, ChangelogStreamManager.class.getSimpleName()))
+ .orElse(null);
if (systemAdmin == null) {
throw new SamzaException(String.format(
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 6224a3e..b1e4206 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -46,7 +46,6 @@
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
@@ -166,7 +165,6 @@
}
// if diagnostics is enabled, create diagnostics stream if it doesnt exist
- SystemAdmins systemAdmins = new SystemAdmins(config, DiagnosticsUtil.class.getSimpleName());
String diagnosticsSystemStreamName = new MetricsConfig(config)
.getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)
.orElseThrow(() -> new ConfigException("Missing required config: " +
@@ -174,7 +172,9 @@
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)));
SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName);
- SystemAdmin diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem());
+ SystemConfig systemConfig = new SystemConfig(config);
+ SystemAdmin diagnosticsSysAdmin = systemConfig.getSystemFactories().get(diagnosticsSystemStream.getSystem())
+ .getAdmin(diagnosticsSystemStream.getSystem(), config, DiagnosticsUtil.class.getSimpleName());
StreamSpec diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream(),
diagnosticsSystemStream.getSystem(), new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID));
diff --git a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 93536fa..a5acdca 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -99,7 +99,9 @@
.getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
var systemStreams = Seq.empty[String]
- val systemAdmin = new SystemConfig(config).getSystemAdmin(systemName, this.getClass.getSimpleName)
+ val systemConfig = new SystemConfig(config)
+ val systemAdmin = systemConfig.getSystemFactories
+ .get(systemName).getAdmin(systemName, config, this.getClass.getSimpleName)
try {
systemAdmin.start()
systemStreams =
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 254f86d..b9469cb 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -54,11 +54,12 @@
* @param config to create coordinator stream.
*/
def createCoordinatorStream(config: Config): Unit = {
- val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
-
info("Creating coordinator stream")
val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
- val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+ // Label the admin with this utility's own class name, as the removed SystemAdmins call did, not DiagnosticsUtil.
+ val systemConfig = new SystemConfig(config)
+ val coordinatorSystemAdmin = systemConfig.getSystemFactories.get(coordinatorSystemStream.getSystem)
+ .getAdmin(coordinatorSystemStream.getSystem, config, this.getClass.getSimpleName)
coordinatorSystemAdmin.start()
CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
coordinatorSystemAdmin.stop()
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 344f082..324c5e2 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -62,6 +62,8 @@
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -75,6 +77,7 @@
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -135,7 +138,7 @@
runner.run(externalContext);
verify(metadataStore).init();
- verify(metadataStore, never()).close();
+ verify(metadataStore).close();
assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
}
@@ -171,7 +174,7 @@
runner.run();
verify(metadataStore).init();
- verify(metadataStore, never()).close();
+ verify(metadataStore).close();
assertEquals(ApplicationStatus.SuccessfulFinish, runner.status());
}
@@ -209,7 +212,7 @@
runner.waitForFinish();
verify(coordinatorStreamStore).init();
- verify(coordinatorStreamStore, never()).close();
+ verify(coordinatorStreamStore).close();
assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
}
@@ -311,10 +314,17 @@
return null;
}).when(sp).start();
- doAnswer(i -> {
- ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
- listener.afterStop();
- return null;
+ doAnswer(new Answer() {
+ private int count = 0;
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ if (++count == 1) {
+ ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+ listener.afterStop();
+ return null;
+ }
+ return null;
+ }
}).when(sp).stop();
ExternalContext externalContext = mock(ExternalContext.class);
@@ -326,7 +336,7 @@
runner.kill();
verify(coordinatorStreamStore).init();
- verify(coordinatorStreamStore).close();
+ verify(coordinatorStreamStore, atLeastOnce()).close();
assertEquals(runner.status(), ApplicationStatus.SuccessfulFinish);
}
@@ -353,10 +363,17 @@
return null;
}).when(sp).start();
- doAnswer(i -> {
- ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
- listener.afterStop();
- return null;
+ doAnswer(new Answer() {
+ private int count = 0;
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ if (++count == 1) {
+ ProcessorLifecycleListener listener = captor.getValue().createInstance(sp);
+ listener.afterStop();
+ return null;
+ }
+ return null;
+ }
}).when(sp).stop();
ExternalContext externalContext = mock(ExternalContext.class);
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index ce4ec0b..04ae4e2 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -67,13 +67,13 @@
val checkpointSsp: SystemStreamPartition = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory
- val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
- val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
+ var systemConsumer: SystemConsumer = _
+ var systemAdmin: SystemAdmin = _
var taskNames: Set[TaskName] = Set[TaskName]()
var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = _
- val producerRef: AtomicReference[SystemProducer] = new AtomicReference[SystemProducer](getSystemProducer())
+ var producerRef: AtomicReference[SystemProducer] = _
val producerCreationLock: Object = new Object
// if true, systemConsumer can be safely closed after the first call to readLastCheckpoint.
@@ -83,19 +83,24 @@
/**
* Create checkpoint stream prior to start.
+ *
*/
override def createResources(): Unit = {
+ val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
Preconditions.checkNotNull(systemAdmin)
systemAdmin.start()
+ try {
+ info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
+ s"partition count: ${checkpointSpec.getPartitionCount}")
+ systemAdmin.createStream(checkpointSpec)
- info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
- s"partition count: ${checkpointSpec.getPartitionCount}")
- systemAdmin.createStream(checkpointSpec)
-
- if (validateCheckpoint) {
- info(s"Validating checkpoint stream")
- systemAdmin.validateStream(checkpointSpec)
+ if (validateCheckpoint) {
+ info(s"Validating checkpoint stream")
+ systemAdmin.validateStream(checkpointSpec)
+ }
+ } finally {
+ systemAdmin.stop()
}
}
@@ -106,7 +111,8 @@
// register and start a producer for the checkpoint topic
info("Starting the checkpoint SystemProducer")
producerRef.get().start()
-
+ info("Starting the checkpoint SystemAdmin")
+ systemAdmin.start()
// register and start a consumer for the checkpoint topic
val oldestOffset = getOldestOffset(checkpointSsp)
info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
@@ -118,6 +124,10 @@
* @inheritdoc
*/
override def register(taskName: TaskName) {
+ systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
+ systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
+ producerRef = new AtomicReference[SystemProducer](getSystemProducer())
+
debug(s"Registering taskName: $taskName")
producerRef.get().register(taskName.getTaskName)
taskNames += taskName
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 7d6db64..35a74fd 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -84,7 +84,7 @@
val checkPointManager = Mockito.spy(new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry))
val newKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
- Mockito.doReturn(newKafkaProducer).when(checkPointManager).getSystemProducer()
+ Mockito.when(checkPointManager.getSystemProducer()).thenReturn(mockKafkaProducer).thenReturn(newKafkaProducer)
checkPointManager.register(taskName)
checkPointManager.start
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index 4a4ae7b..edcb159 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -288,7 +288,6 @@
checkpointManagerOption match {
case Some(checkpointManager) =>
checkpointManager.createResources()
- checkpointManager.stop()
case _ => throw new ConfigException("No checkpoint manager factory configured")
}