Merge pull request #1408 from MabelYC/interfaceChange

SAMZA-2574: improve flexibility of SystemFactory interface
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemFactory.java b/samza-api/src/main/java/org/apache/samza/system/SystemFactory.java
index a1aae37..08c1b49 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemFactory.java
@@ -27,9 +27,56 @@
  * a particular system, as well as the accompanying {@link org.apache.samza.system.SystemAdmin}.
  */
 public interface SystemFactory {
+  @Deprecated
   SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry);
 
+  @Deprecated
   SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry);
 
+  @Deprecated
   SystemAdmin getAdmin(String systemName, Config config);
+
+  /**
+   * This function provides an extra input parameter to {@link #getConsumer}, which can be used to provide extra
+   * information for the consumer instance, e.g. ownership of client instance, to help better identify consumers in logs,
+   * threads and client instances etc., along with other relevant information like systemName.
+   *
+   * @param systemName The name of the system to create consumer for.
+   * @param config The config to create consumer with.
+   * @param registry MetricsRegistry to which to publish consumer specific metrics.
+   * @param consumerLabel a string to provide info the consumer instance.
+   * @return A SystemConsumer
+   */
+  default SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry, String consumerLabel) {
+    return getConsumer(systemName, config, registry);
+  }
+
+  /**
+   * This function provides an extra input parameter to {@link #getProducer}, which can be used to provide extra
+   * information for the producer instance, e.g. ownership of client instance, to help better identify producers in logs,
+   * threads and client instances etc., along with other relevant information like systemName.
+   *
+   * @param systemName The name of the system to create producer for.
+   * @param config The config to create producer with.
+   * @param registry MetricsRegistry to which to publish producer specific metrics.
+   * @param producerLabel a string to provide info the producer instance.
+   * @return A SystemProducer
+   */
+  default SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry, String producerLabel) {
+    return getProducer(systemName, config, registry);
+  }
+
+  /**
+   * This function provides an extra input parameter to {@link #getAdmin}, which can be used to provide extra
+   * information for the admin instance, e.g. ownership of client instance, to help better identify admins in logs,
+   * threads and client instances etc., along with other relevant information like systemName.
+   *
+   * @param systemName The name of the system to create admin for.
+   * @param config The config to create admin with.
+   * @param adminLabel a string to provide info the admin instance.
+   * @return A SystemAdmin
+   */
+  default SystemAdmin getAdmin(String systemName, Config config, String adminLabel) {
+    return getAdmin(systemName, config);
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
index 5af0b31..b72d231 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
@@ -88,9 +88,9 @@
     this.coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
     this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
     SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
-    this.systemProducer = systemFactory.getProducer(this.coordinatorSystemStream.getSystem(), config, metricsRegistry);
-    this.systemConsumer = systemFactory.getConsumer(this.coordinatorSystemStream.getSystem(), config, metricsRegistry);
-    this.systemAdmin = systemFactory.getAdmin(this.coordinatorSystemStream.getSystem(), config);
+    this.systemProducer = systemFactory.getProducer(this.coordinatorSystemStream.getSystem(), config, metricsRegistry, this.getClass().getSimpleName());
+    this.systemConsumer = systemFactory.getConsumer(this.coordinatorSystemStream.getSystem(), config, metricsRegistry, this.getClass().getSimpleName());
+    this.systemAdmin = systemFactory.getAdmin(this.coordinatorSystemStream.getSystem(), config, this.getClass().getSimpleName());
   }
 
   @VisibleForTesting
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
index ff28723..3cc00c3 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -70,8 +70,8 @@
   public CoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) {
     SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
     SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
-    SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
-    SystemConsumer systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem(), config, registry);
+    SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config, this.getClass().getSimpleName());
+    SystemConsumer systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem(), config, registry, this.getClass().getSimpleName());
 
     this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
     this.systemConsumer = systemConsumer;
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
index 555620c..61c0ed9 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -58,8 +58,8 @@
   public CoordinatorStreamSystemProducer(Config config, MetricsRegistry registry) {
     SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
     SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
-    SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
-    SystemProducer systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem(), config, registry);
+    SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config, this.getClass().getSimpleName());
+    SystemProducer systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem(), config, registry, this.getClass().getSimpleName());
     this.systemStream = coordinatorSystemStream;
     this.systemProducer = systemProducer;
     this.systemAdmin = systemAdmin;
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 724614b..0f7ff76 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -144,7 +144,8 @@
       }
       SystemFactory systemFactory = ReflectionUtil.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
       SystemProducer systemProducer =
-          systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap());
+          systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap(),
+              DiagnosticsUtil.class.getSimpleName());
 
       DiagnosticsManager diagnosticsManager =
           new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), containerMemoryMb, containerNumCores,
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 83ce3a1..a5ed3dc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -228,7 +228,7 @@
         val systemFactory = systemFactories(systemName)
 
         try {
-          (systemName, systemFactory.getConsumer(systemName, config, samzaContainerMetrics.registry))
+          (systemName, systemFactory.getConsumer(systemName, config, samzaContainerMetrics.registry, this.getClass.getSimpleName))
         } catch {
           case e: Exception =>
             error("Failed to create a consumer for %s, so skipping." format systemName, e)
@@ -244,7 +244,7 @@
       .map {
         case (systemName, systemFactory) =>
           try {
-            (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry))
+            (systemName, systemFactory.getProducer(systemName, config, samzaContainerMetrics.registry, this.getClass.getSimpleName))
           } catch {
             case e: Exception =>
               error("Failed to create a producer for %s, so skipping." format systemName, e)
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
index f817c47..d17dac1 100644
--- a/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/util/TestDiagnosticsUtil.java
@@ -66,7 +66,7 @@
     MetricsReporterFactory metricsReporterFactory = mock(MetricsReporterFactory.class);
     MetricsSnapshotReporter mockReporter = mock(MetricsSnapshotReporter.class);
 
-    when(systemFactory.getProducer(anyString(), any(Config.class), any(MetricsRegistry.class))).thenReturn(mockProducer);
+    when(systemFactory.getProducer(anyString(), any(Config.class), any(MetricsRegistry.class), anyString())).thenReturn(mockProducer);
     when(metricsReporterFactory.getMetricsReporter(anyString(), anyString(), any(Config.class))).thenReturn(
         mockReporter);
     PowerMockito.mockStatic(ReflectionUtil.class);
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 1c3531f..ce4ec0b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -67,8 +67,8 @@
   val checkpointSsp: SystemStreamPartition = new SystemStreamPartition(checkpointSystem, checkpointTopic, new Partition(0))
   val expectedGrouperFactory: String = new JobConfig(config).getSystemStreamPartitionGrouperFactory
 
-  val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry)
-  val systemAdmin = systemFactory.getAdmin(checkpointSystem, config)
+  val systemConsumer = systemFactory.getConsumer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
+  val systemAdmin = systemFactory.getAdmin(checkpointSystem, config, this.getClass.getSimpleName)
 
   var taskNames: Set[TaskName] = Set[TaskName]()
   var taskNamesToCheckpoints: Map[TaskName, Checkpoint] = _
@@ -237,7 +237,7 @@
 
   @VisibleForTesting
   def getSystemProducer(): SystemProducer = {
-    systemFactory.getProducer(checkpointSystem, config, metricsRegistry)
+    systemFactory.getProducer(checkpointSystem, config, metricsRegistry, this.getClass.getSimpleName)
   }
 
   /**
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
index 1280e79..f32040a 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManagerJava.java
@@ -247,9 +247,9 @@
 
   private SystemFactory newFactory(SystemProducer producer, SystemConsumer consumer, SystemAdmin admin) {
     SystemFactory factory = mock(SystemFactory.class);
-    when(factory.getProducer(anyString(), any(Config.class), any(MetricsRegistry.class))).thenReturn(producer);
-    when(factory.getConsumer(anyString(), any(Config.class), any(MetricsRegistry.class))).thenReturn(consumer);
-    when(factory.getAdmin(anyString(), any(Config.class))).thenReturn(admin);
+    when(factory.getProducer(anyString(), any(Config.class), any(MetricsRegistry.class), anyString())).thenReturn(producer);
+    when(factory.getConsumer(anyString(), any(Config.class), any(MetricsRegistry.class), anyString())).thenReturn(consumer);
+    when(factory.getAdmin(anyString(), any(Config.class), anyString())).thenReturn(admin);
     return factory;
   }
 
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index c885454..e3d9771 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -316,7 +316,7 @@
       systemAdmin.stop();
     }
 
-    systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry);
+    systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry, this.getClass().getSimpleName());
     systemStream = new SystemStream(systemName, streamName);
     systemProducer.register(SOURCE);
     systemProducer.start();
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index 660638c..bfba754 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -403,7 +403,7 @@
 
     setupStream(systemFactory, systemName);
 
-    systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry);
+    systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry, this.getClass().getSimpleName());
     systemStream = new SystemStream(systemName, streamName);
     systemProducer.register(SOURCE);
     systemProducer.start();