modifies to close the clients according to lifecycle
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
index 1050662..039f597 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
@@ -55,6 +55,7 @@
public void createResources() {
if (checkpointManager != null) {
checkpointManager.createResources();
+ checkpointManager.stop();
}
createChangelogStreams();
}
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7687eb2..cc24345 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -441,15 +441,24 @@
@Override
public void afterFailure(Throwable t) {
- processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+ // we need to close associated coordinator metadata store, although the processor failed
+ processors.forEach(sp -> {
+ if (sp.getLeft().equals(processor)) {
+ if (sp.getRight() != null) {
+ sp.getRight().close();
+ }
+ processors.remove(sp);
+ }
+ });
// the processor stopped with failure, this is logging the first processor's failure as the cause of
// the whole application failure
if (failure.compareAndSet(null, t)) {
// shutdown the other processors
processors.forEach(sp -> {
- sp.getLeft().stop(); // Stop StreamProcessor
- sp.getRight().close(); // Close associated coordinator metadata store
+ sp.getLeft().stop(); // Stop StreamProcessor
+ if (sp.getRight() != null) {
+ sp.getRight().close(); // Close associated coordinator metadata store
+ }
});
}
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 6224a3e..d0e705f 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -166,7 +166,6 @@
}
// if diagnostics is enabled, create diagnostics stream if it doesnt exist
- SystemAdmins systemAdmins = new SystemAdmins(config, DiagnosticsUtil.class.getSimpleName());
String diagnosticsSystemStreamName = new MetricsConfig(config)
.getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)
.orElseThrow(() -> new ConfigException("Missing required config: " +
@@ -174,7 +173,9 @@
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)));
SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName);
- SystemAdmin diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem());
+ SystemConfig systemConfig = new SystemConfig(config);
+ SystemAdmin diagnosticsSysAdmin = systemConfig.getSystemFactories().get(diagnosticsSystemStream.getSystem())
+ .getAdmin(diagnosticsSystemStream.getSystem(), config, DiagnosticsUtil.class.getSimpleName());
StreamSpec diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream(),
diagnosticsSystemStream.getSystem(), new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID));
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 254f86d..b9469cb 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -54,11 +54,12 @@
* @param config to create coordinator stream.
*/
def createCoordinatorStream(config: Config): Unit = {
- val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
-
info("Creating coordinator stream")
val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
- val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+ val systemConfig = new SystemConfig(config)
+ val coordinatorSystemAdmin = systemConfig.getSystemFactories.get(coordinatorSystemStream.getSystem)
+ .getAdmin(coordinatorSystemStream.getSystem, config, classOf[DiagnosticsUtil].getSimpleName)
+
coordinatorSystemAdmin.start()
CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
coordinatorSystemAdmin.stop()
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 0218caa..6145fd5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -83,6 +83,7 @@
/**
* Create checkpoint stream prior to start.
+ * Need to close KafkaCheckPointManager after createResources
*/
override def createResources(): Unit = {
Preconditions.checkNotNull(systemAdmin)
@@ -109,7 +110,8 @@
// register and start a producer for the checkpoint topic
info("Starting the checkpoint SystemProducer")
producerRef.get().start()
-
+ info("Starting the checkpoint SystemAdmin")
+ systemAdmin.start()
// register and start a consumer for the checkpoint topic
val oldestOffset = getOldestOffset(checkpointSsp)
info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")