Merge pull request #1430 from MabelYC/addLabel
SAMZA-2574 : improve flexibility of SystemFactory interface
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 1cd345e..3ea7aa9 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -43,6 +43,14 @@
default void stop() {};
/**
+ * Reports whether this SystemAdmin has been stopped.
+ * @return {@code true} if stopped; this default implementation always returns {@code false}
+ */
+ default boolean isStopped() {
+ return false;
+ }
+
+ /**
* Fetches the offsets for the messages immediately after the supplied offsets
* for a group of SystemStreamPartitions.
*
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 6295ed6..63ee3c7 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -194,7 +194,7 @@
this.hasDurableStores = new StorageConfig(config).hasDurableStores();
this.state = new SamzaApplicationState(jobModelManager);
// The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
- this.systemAdmins = new SystemAdmins(config);
+ this.systemAdmins = new SystemAdmins(config, this.getClass().getSimpleName());
this.partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
Set<SystemStream> inputSystemStreams = JobModelUtil.getSystemStreams(jobModelManager.jobModel());
diff --git a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
index a2da35d..9972f9d 100644
--- a/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java
@@ -20,6 +20,7 @@
package org.apache.samza.config;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -53,6 +54,8 @@
static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming";
static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest";
+ private Map<String, SystemAdmin> systemAdminMap = new HashMap<>();
+
public SystemConfig(Config config) {
super(config);
}
@@ -88,12 +91,19 @@
*
* @return map of system name to {@link SystemAdmin}
*/
+ public Map<String, SystemAdmin> getSystemAdmins(String adminLabel) {
+ if (systemAdminMap.isEmpty()) { // NOTE(review): cache is not keyed by adminLabel; once populated, later calls get the first label's admins
+ systemAdminMap = getSystemFactories().entrySet()
+ .stream()
+ .collect(Collectors.toMap(Entry::getKey,
+ systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
+ .getAdmin(systemNameToFactoryEntry.getKey(), this, adminLabel)));
+ }
+ return systemAdminMap; // NOTE(review): hands out the cached map instance itself, not a copy
+ }
+
public Map<String, SystemAdmin> getSystemAdmins() {
- return getSystemFactories().entrySet()
- .stream()
- .collect(Collectors.toMap(Entry::getKey,
- systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue()
- .getAdmin(systemNameToFactoryEntry.getKey(), this)));
+ return getSystemAdmins(""); // empty admin label; unlike the removed lines, the result now comes from the instance-level cache
}
/**
@@ -102,8 +112,15 @@
* @param systemName System name
* @return SystemAdmin of the system if it exists, otherwise null.
*/
+ public SystemAdmin getSystemAdmin(String systemName, String adminLabel) {
+ // Single lookup: a cached admin that reports itself stopped invalidates the whole cache,
+ // so the call below rebuilds every admin using adminLabel.
+ SystemAdmin cachedAdmin = systemAdminMap.get(systemName);
+ if (cachedAdmin != null && cachedAdmin.isStopped()) {
+ // Swap in a fresh map instead of clearing in place: the old map instance was handed out
+ // by getSystemAdmins(...), and emptying it would also empty it for whoever still holds it.
+ systemAdminMap = new HashMap<>();
+ }
+ // Returns null when no admin exists for systemName.
+ return getSystemAdmins(adminLabel).get(systemName);
+ }
+
public SystemAdmin getSystemAdmin(String systemName) {
- return getSystemAdmins().get(systemName);
+ return getSystemAdmin(systemName, ""); // delegates to the labeled overload with an empty admin label
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index f75a8dc..a1d417a 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -48,7 +48,7 @@
private final SystemAdmins systemAdmins;
public StreamManager(Config config) {
- this(new SystemAdmins(config));
+ this(new SystemAdmins(config, StreamManager.class.getSimpleName())); // the class's simple name is passed as the admin label
}
@VisibleForTesting
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 37b2c0b..7687eb2 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -338,7 +338,7 @@
return false;
}
SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
- SystemAdmins systemAdmins = new SystemAdmins(config);
+ SystemAdmins systemAdmins = new SystemAdmins(config, this.getClass().getSimpleName());
systemAdmins.start();
try {
SystemAdmin coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 9fb9975..cc51732 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -120,7 +120,7 @@
@Override
public JobModel getJobModel() {
- SystemAdmins systemAdmins = new SystemAdmins(config);
+ SystemAdmins systemAdmins = new SystemAdmins(config, this.getClass().getSimpleName());
StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance());
systemAdmins.start();
try {
diff --git a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
index be15869..2ca81ca 100644
--- a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
+++ b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
@@ -38,6 +38,11 @@
this.systemAdminMap = systemConfig.getSystemAdmins();
}
+ public SystemAdmins(Config config, String adminLabel) { // labeled variant of the config-only constructor
+ SystemConfig systemConfig = new SystemConfig(config);
+ this.systemAdminMap = systemConfig.getSystemAdmins(adminLabel); // adminLabel is forwarded to SystemConfig#getSystemAdmins(String)
+ }
+
/**
* Creates a new instance of {@link SystemAdmins} with an empty admin mapping.
* @return New empty instance of {@link SystemAdmins}
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 0f7ff76..6224a3e 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -166,7 +166,7 @@
}
// if diagnostics is enabled, create diagnostics stream if it doesnt exist
- SystemAdmins systemAdmins = new SystemAdmins(config);
+ SystemAdmins systemAdmins = new SystemAdmins(config, DiagnosticsUtil.class.getSimpleName());
String diagnosticsSystemStreamName = new MetricsConfig(config)
.getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)
.orElseThrow(() -> new ConfigException("Missing required config: " +
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index e03ce8b..1489f67 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -146,7 +146,7 @@
stop();
});
this.barrier = new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
- systemAdmins = new SystemAdmins(config);
+ systemAdmins = new SystemAdmins(config, this.getClass().getSimpleName());
streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
LocationIdProviderFactory locationIdProviderFactory =
ReflectionUtil.getObj(new JobConfig(config).getLocationIdProviderFactory(), LocationIdProviderFactory.class);
diff --git a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index cae59d6..93536fa 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -99,7 +99,7 @@
.getOrElse(throw new SamzaException("No system defined in config for rewriter %s." format rewriterName))
var systemStreams = Seq.empty[String]
- val systemAdmin = new SystemConfig(config).getSystemAdmin(systemName)
+ val systemAdmin = new SystemConfig(config).getSystemAdmin(systemName, this.getClass.getSimpleName)
try {
systemAdmin.start()
systemStreams =
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index a5ed3dc..28c3135 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -215,7 +215,7 @@
}).toMap
info("Got system factories: %s" format systemFactories.keys)
- val systemAdmins = new SystemAdmins(config)
+ val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
info("Got system admins: %s" format systemAdmins.getSystemNames)
val streamMetadataCache = new StreamMetadataCache(systemAdmins)
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 5a3f7b3..7c0e747 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -87,7 +87,7 @@
val taskAssignmentManager = new TaskAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskContainerMapping.TYPE), new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskModeMapping.TYPE))
val taskPartitionAssignmentManager = new TaskPartitionAssignmentManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetTaskPartitionMapping.TYPE))
- val systemAdmins = new SystemAdmins(config)
+ val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
try {
systemAdmins.start()
val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 37d1393..254f86d 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -54,7 +54,7 @@
* @param config to create coordinator stream.
*/
def createCoordinatorStream(config: Config): Unit = {
- val systemAdmins = new SystemAdmins(config)
+ val systemAdmins = new SystemAdmins(config, this.getClass.getSimpleName)
info("Creating coordinator stream")
val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
index a60752c..5e95df5 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
@@ -201,6 +201,11 @@
}
}
+ @Override
+ public boolean isStopped() {
+ return stopped.get(); // NOTE(review): 'stopped' is declared outside this hunk — presumably an AtomicBoolean set by stop(); confirm
+ }
+
/**
* Note! This method does not populate SystemStreamMetadata for each stream with real data.
* Thus, this method should ONLY be used to get number of partitions for each stream.