SAMZA-2577 : Adding support for Async-Logger in Log4j2 Stream Appender (#1411)

diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index 9d3e477..660638c 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -30,8 +30,6 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.Filter;
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
@@ -88,14 +86,16 @@
   private SystemStream systemStream = null;
   private SystemProducer systemProducer = null;
   private String key = null;
+  private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
   private String containerName = null;
   private int partitionCount = 0;
   private boolean isApplicationMaster;
   private Serde<LogEvent> serde = null;
-  private Logger log = LogManager.getLogger(StreamAppender.class);
+
   private Thread transferThread;
   private Config config = null;
   private String streamName = null;
+  private final boolean usingAsyncLogger;
 
   /**
    * used to detect if this thread is called recursively
@@ -107,9 +107,11 @@
   protected StreamAppenderMetrics metrics;
   protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
 
-  protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, String streamName) {
+  protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
+      boolean usingAsyncLogger, String streamName) {
     super(name, filter, layout, ignoreExceptions);
     this.streamName = streamName;
+    this.usingAsyncLogger = usingAsyncLogger;
   }
 
   @Override
@@ -123,6 +125,13 @@
           ". This is used as the key for the log appender, so can't proceed.");
     }
     key = containerName; // use the container name as the key for the logs
+    try {
+      // Serialize the key once, since we will use it for every event.
+      keyBytes = key.getBytes("UTF-8");
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException(
+          String.format("Container name: %s could not be encoded to bytes. %s cannot proceed.", key, getName()), e);
+    }
 
     // StreamAppender has to wait until the JobCoordinator is up when the log is in the AM
     if (isApplicationMaster) {
@@ -134,7 +143,7 @@
   }
 
   /**
-   * Getter for the StreamName parameter. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
+   * Getter for the StreamName parameter. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
    * Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
    * @return The configured stream name.
    */
@@ -153,7 +162,7 @@
   }
 
   /**
-   * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
+   * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
    * Example: {@literal <param name="PartitionCount" value="4"/>}
    * @return The configured partition count of the StreamAppender stream. If not set, returns {@link JobConfig#getContainerCount()}.
    */
@@ -165,7 +174,7 @@
   }
 
   /**
-   * Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
+   * Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
    * Example: {@literal <param name="PartitionCount" value="4"/>}
    * @param partitionCount Configurable partition count.
    */
@@ -180,8 +189,9 @@
       @PluginElement("Filter") final Filter filter,
       @PluginElement("Layout") Layout layout,
       @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+      @PluginAttribute(value = "usingAsyncLogger", defaultBoolean = false) final boolean usingAsyncLogger,
       @PluginAttribute("streamName") String streamName) {
-    return new StreamAppender(name, filter, layout, ignoreExceptions, streamName);
+    return new StreamAppender(name, filter, layout, ignoreExceptions, usingAsyncLogger, streamName);
   }
 
   @Override
@@ -195,35 +205,11 @@
             setupSystem();
             systemInitialized = true;
           } else {
-            log.trace("Waiting for the JobCoordinator to be instantiated...");
+            System.out.println("Waiting for the JobCoordinator to be instantiated...");
           }
         } else {
-          // Serialize the event before adding to the queue to leverage the caller thread
-          // and ensure that the transferThread can keep up.
-          if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
-            // Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
-            // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
-            // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
-            // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
-
-            // Scenario:
-            // T1: holds L1 and is waiting for L2
-            // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
-
-            // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
-            // so dropping events in the StreamAppender is our best recourse.
-
-            // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
-            int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
-            log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
-                queueTimeoutS,
-                systemStream.toString(),
-                messagesDropped));
-
-            // Emit a metric which can be monitored to ensure it doesn't happen often.
-            metrics.logMessagesDropped.inc(messagesDropped);
-          }
-          metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
+          // handle event based on if async or sync logger is being used
+          handleEvent(event);
         }
       } catch (Exception e) {
         if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here.
@@ -239,6 +225,46 @@
     }
   }
 
+  /**
+   * If async-Logger is enabled, the log-event is sent directly to the systemProducer. Else, the event is serialized
+   * and added to a bounded blocking queue, before returning to the "synchronous" caller.
+   * @param event the log event to append
+   * @throws InterruptedException
+   */
+  private void handleEvent(LogEvent event) throws InterruptedException {
+    if (usingAsyncLogger) {
+      sendEventToSystemProducer(encodeLogEventToBytes(event));
+      return;
+    }
+
+    // Serialize the event before adding to the queue to leverage the caller thread
+    // and ensure that the transferThread can keep up.
+    if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
+      // Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
+      // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
+      // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
+      // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
+
+      // Scenario:
+      // T1: holds L1 and is waiting for L2
+      // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
+
+      // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
+      // so dropping events in the StreamAppender is our best recourse.
+
+      // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
+      int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
+      System.err.println(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
+          queueTimeoutS,
+          systemStream.toString(),
+          messagesDropped));
+
+      // Emit a metric which can be monitored to ensure it doesn't happen often.
+      metrics.logMessagesDropped.inc(messagesDropped);
+    }
+    metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
+  }
+
   protected byte[] encodeLogEventToBytes(LogEvent event) {
     return serde.toBytes(subLog(event));
   }
@@ -275,12 +301,12 @@
 
   @Override
   public void stop() {
-    log.info(String.format("Shutting down the %s...", getName()));
+    System.out.println(String.format("Shutting down the %s...", getName()));
     transferThread.interrupt();
     try {
       transferThread.join();
     } catch (InterruptedException e) {
-      log.error("Interrupted while waiting for transfer thread to finish.", e);
+      System.err.println("Interrupted while waiting for transfer thread to finish." + e);
       Thread.currentThread().interrupt();
     }
 
@@ -382,50 +408,41 @@
     systemProducer.register(SOURCE);
     systemProducer.start();
 
-    log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
+    System.out.println(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
         + " in " + systemName + ". Logs are partitioned by " + key);
 
     startTransferThread();
   }
 
   private void startTransferThread() {
-
-    try {
-      // Serialize the key once, since we will use it for every event.
-      final byte[] keyBytes = key.getBytes("UTF-8");
-
-      Runnable transferFromQueueToSystem = () -> {
-        while (!Thread.currentThread().isInterrupted()) {
-          try {
-            byte[] serializedLogEvent = logQueue.take();
-
-            metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
-            metrics.logMessagesCountSent.inc();
-
-            OutgoingMessageEnvelope outgoingMessageEnvelope =
-                new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent);
-            systemProducer.send(SOURCE, outgoingMessageEnvelope);
-
-          } catch (InterruptedException e) {
-            // Preserve the interrupted status for the loop condition.
-            Thread.currentThread().interrupt();
-          } catch (Throwable t) {
-            metrics.logMessagesErrors.inc();
-            log.error("Error sending " + getName() + " event to SystemProducer", t);
-          }
+    Runnable transferFromQueueToSystem = () -> {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          sendEventToSystemProducer(logQueue.take());
+        } catch (InterruptedException e) {
+          // Preserve the interrupted status for the loop condition.
+          Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+          metrics.logMessagesErrors.inc();
+          System.err.println("Error sending " + getName() + " event to SystemProducer " + t);
         }
-      };
+      }
+    };
 
-      transferThread = new Thread(transferFromQueueToSystem);
-      transferThread.setDaemon(true);
-      transferThread.setName("Samza " + getName() + " Producer " + transferThread.getName());
-      transferThread.start();
+    transferThread = new Thread(transferFromQueueToSystem);
+    transferThread.setDaemon(true);
+    transferThread.setName("Samza " + getName() + " Producer " + transferThread.getName());
+    transferThread.start();
+  }
 
-    } catch (UnsupportedEncodingException e) {
-      throw new SamzaException(String.format(
-          "Container name: %s could not be encoded to bytes. %s cannot proceed.", key, getName()),
-          e);
-    }
+  /**
+   * Helper method to send a serialized log-event to the systemProducer, and increment respective methods.
+   * @param serializedLogEvent
+   */
+  private void sendEventToSystemProducer(byte[] serializedLogEvent) {
+    metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
+    metrics.logMessagesCountSent.inc();
+    systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent));
   }
 
   protected String getStreamName(String jobName, String jobId) {
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
index 07df10c..ce6f081 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
@@ -41,8 +41,8 @@
 class MockSystemProducerAppender extends StreamAppender {
   private static Config config;
 
-  protected MockSystemProducerAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, Config config, String streamName) {
-    super(name, filter, layout, ignoreExceptions, streamName);
+  protected MockSystemProducerAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, final boolean usingAsyncLogger, Config config, String streamName) {
+    super(name, filter, layout, ignoreExceptions, usingAsyncLogger, streamName);
   }
 
   @PluginFactory
@@ -51,6 +51,7 @@
       @PluginElement("Filter") final Filter filter,
       @PluginElement("Layout") Layout<? extends Serializable> layout,
       @PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+      @PluginAttribute(value = "usingAsyncLogger", defaultBoolean = false) final boolean usingAsyncLogger,
       @PluginElement("Config") final Config testConfig,
       @PluginAttribute("streamName") String streamName) {
     if (testConfig == null) {
@@ -58,7 +59,7 @@
     } else {
       config = testConfig;
     }
-    return new MockSystemProducerAppender(name, filter, layout, ignoreExceptions, config, streamName);
+    return new MockSystemProducerAppender(name, filter, layout, ignoreExceptions, usingAsyncLogger, config, streamName);
   }
 
   @Override
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 1680a84..2cfd59b 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -19,21 +19,28 @@
 
 package org.apache.samza.logging.log4j2;
 
-import static org.junit.Assert.*;
-
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.core.Logger;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.apache.logging.log4j.core.config.ConfigurationSource;
+import org.apache.logging.log4j.core.config.Order;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.api.RootLoggerComponentBuilder;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
 import org.apache.logging.log4j.core.layout.PatternLayout;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerde;
@@ -43,6 +50,8 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 public class TestStreamAppender {
 
   static Logger log = (Logger) LogManager.getLogger(TestStreamAppender.class);
@@ -59,7 +68,8 @@
   public void testDefaultSerde() {
     System.setProperty("samza.container.name", "samza-container-1");
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     assertNotNull(systemProducerAppender.getSerde());
     assertEquals(LoggingEventJsonSerde.class, systemProducerAppender.getSerde().getClass());
@@ -76,7 +86,7 @@
     map.put("systems.mock.streams.__samza_log4jTest_1_logs.samza.msg.serde", "log4j-string");
     map.put("task.log4j.system", "mock");
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, new MapConfig(map), null);
+    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, new MapConfig(map), null);
     systemProducerAppender.start();
     assertNotNull(systemProducerAppender.getSerde());
     assertEquals(LoggingEventStringSerde.class, systemProducerAppender.getSerde().getClass());
@@ -86,7 +96,7 @@
   public void testDefaultStreamName() {
     System.setProperty("samza.container.name", "samza-container-1");
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
     Assert.assertEquals("__samza_log4jTest_1_logs", systemProducerAppender.getStreamName());
@@ -96,7 +106,7 @@
   public void testCustomStreamName() {
     System.setProperty("samza.container.name", "samza-container-1");
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, "test-stream-name");
+    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, "test-stream-name");
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
     Assert.assertEquals("test-stream-name", systemProducerAppender.getStreamName());
@@ -107,7 +117,8 @@
     System.setProperty("samza.container.name", "samza-container-1");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
 
     log.addAppender(systemProducerAppender);
@@ -118,11 +129,58 @@
   }
 
   @Test
+  public void testSystemProducerAppenderInContainerWithAsyncLogger() throws InterruptedException {
+    System.setProperty("samza.container.name", "samza-container-1");
+    // Enabling async logger on log4j2 programmatically
+    ConfigurationFactory.setConfigurationFactory(new AsyncLoggerConfigurationFactory());
+
+    PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, true, null, null);
+    systemProducerAppender.start();
+    log.addAppender(systemProducerAppender);
+    log.setLevel(Level.INFO);
+    List<String> messages = Lists.newArrayList("testing1", "testing2");
+    logAndVerifyMessages(messages);
+    systemProducerAppender.stop();
+  }
+
+
+  @Plugin(name = "AsyncLoggerConfigurationFactory", category = ConfigurationFactory.CATEGORY)
+  @Order(50)
+  public static class AsyncLoggerConfigurationFactory extends ConfigurationFactory {
+
+    private static Configuration createConfiguration(final String name, ConfigurationBuilder<BuiltConfiguration> builder) {
+      builder.setConfigurationName(name);
+      RootLoggerComponentBuilder rootLoggerBuilder = builder.newAsyncRootLogger(Level.INFO);
+      builder.add(rootLoggerBuilder);
+      return builder.build();
+    }
+
+    @Override
+    public Configuration getConfiguration(final LoggerContext loggerContext, final ConfigurationSource source) {
+      return getConfiguration(loggerContext, source.toString(), null);
+    }
+
+    @Override
+    public Configuration getConfiguration(final LoggerContext loggerContext, final String name, final URI configLocation) {
+      ConfigurationBuilder<BuiltConfiguration> builder = newConfigurationBuilder();
+      return createConfiguration(name, builder);
+    }
+
+    @Override
+    protected String[] getSupportedTypes() {
+      return new String[]{"*"};
+    }
+  }
+
+  @Test
   public void testSystemProducerAppenderInAM() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-job-coordinator");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
     log.setLevel(Level.INFO);
@@ -142,7 +200,8 @@
     System.setProperty("samza.container.name", "samza-container-1");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
 
@@ -161,7 +220,8 @@
         "task.log4j.system", "mock"));
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, mapConfig, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, mapConfig, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
 
@@ -171,7 +231,8 @@
   @Test
   public void testDefaultPartitionCount() {
     System.setProperty("samza.container.name", "samza-container-1");
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, null, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, null, false, false, null, null);
     Assert.assertEquals(1, systemProducerAppender.getPartitionCount()); // job.container.count defaults to 1
 
     Map<String, String> map = new HashMap<>();
@@ -180,10 +241,12 @@
     map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
     map.put("task.log4j.system", "mock");
     map.put("job.container.count", "4");
-    systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, null, false, new MapConfig(map), null);
+    systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, null, false, false, new MapConfig(map), null);
     Assert.assertEquals(4, systemProducerAppender.getPartitionCount());
 
-    systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, null, false, null, null);
+    systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, null, false, false, null, null);
     systemProducerAppender.setPartitionCount(8);
     Assert.assertEquals(8, systemProducerAppender.getPartitionCount());
   }
@@ -193,7 +256,8 @@
     System.setProperty("samza.container.name", "samza-container-1");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);
     log.setLevel(Level.INFO);
@@ -223,7 +287,8 @@
     System.setProperty("samza.container.name", "samza-container-1");
 
     PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
-    MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+    MockSystemProducerAppender systemProducerAppender =
+        MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
     systemProducerAppender.queueTimeoutS = 1;
     systemProducerAppender.start();
     log.addAppender(systemProducerAppender);