SAMZA-2577 : Adding support for Async-Logger in Log4j2 Stream Appender (#1411)
diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index 9d3e477..660638c 100644
--- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -30,8 +30,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
@@ -88,14 +86,16 @@
private SystemStream systemStream = null;
private SystemProducer systemProducer = null;
private String key = null;
+ private byte[] keyBytes; // Serialize the key once, since we will use it for every event.
private String containerName = null;
private int partitionCount = 0;
private boolean isApplicationMaster;
private Serde<LogEvent> serde = null;
- private Logger log = LogManager.getLogger(StreamAppender.class);
+
private Thread transferThread;
private Config config = null;
private String streamName = null;
+ private final boolean usingAsyncLogger;
/**
* used to detect if this thread is called recursively
@@ -107,9 +107,11 @@
protected StreamAppenderMetrics metrics;
protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
- protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, String streamName) {
+ protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions,
+ boolean usingAsyncLogger, String streamName) {
super(name, filter, layout, ignoreExceptions);
this.streamName = streamName;
+ this.usingAsyncLogger = usingAsyncLogger;
}
@Override
@@ -123,6 +125,13 @@
". This is used as the key for the log appender, so can't proceed.");
}
key = containerName; // use the container name as the key for the logs
+ try {
+ // Serialize the key once, since we will use it for every event.
+ keyBytes = key.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new SamzaException(
+ String.format("Container name: %s could not be encoded to bytes. %s cannot proceed.", key, getName()), e);
+ }
// StreamAppender has to wait until the JobCoordinator is up when the log is in the AM
if (isApplicationMaster) {
@@ -134,7 +143,7 @@
}
/**
- * Getter for the StreamName parameter. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
+ * Getter for the StreamName parameter. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
* Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
* @return The configured stream name.
*/
@@ -153,7 +162,7 @@
}
/**
- * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
+ * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
* Example: {@literal <param name="PartitionCount" value="4"/>}
* @return The configured partition count of the StreamAppender stream. If not set, returns {@link JobConfig#getContainerCount()}.
*/
@@ -165,7 +174,7 @@
}
/**
- * Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
+ * Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, boolean, String)} for when this is called.
* Example: {@literal <param name="PartitionCount" value="4"/>}
* @param partitionCount Configurable partition count.
*/
@@ -180,8 +189,9 @@
@PluginElement("Filter") final Filter filter,
@PluginElement("Layout") Layout layout,
@PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+ @PluginAttribute(value = "usingAsyncLogger", defaultBoolean = false) final boolean usingAsyncLogger,
@PluginAttribute("streamName") String streamName) {
- return new StreamAppender(name, filter, layout, ignoreExceptions, streamName);
+ return new StreamAppender(name, filter, layout, ignoreExceptions, usingAsyncLogger, streamName);
}
@Override
@@ -195,35 +205,11 @@
setupSystem();
systemInitialized = true;
} else {
- log.trace("Waiting for the JobCoordinator to be instantiated...");
+ System.out.println("Waiting for the JobCoordinator to be instantiated...");
}
} else {
- // Serialize the event before adding to the queue to leverage the caller thread
- // and ensure that the transferThread can keep up.
- if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
- // Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
- // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
- // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
- // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
-
- // Scenario:
- // T1: holds L1 and is waiting for L2
- // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
-
- // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
- // so dropping events in the StreamAppender is our best recourse.
-
- // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
- int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
- log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
- queueTimeoutS,
- systemStream.toString(),
- messagesDropped));
-
- // Emit a metric which can be monitored to ensure it doesn't happen often.
- metrics.logMessagesDropped.inc(messagesDropped);
- }
- metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
+ // handle event based on if async or sync logger is being used
+ handleEvent(event);
}
} catch (Exception e) {
if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here.
@@ -239,6 +225,46 @@
}
}
+ /**
+ * If async-Logger is enabled, the log-event is sent directly to the systemProducer. Else, the event is serialized
+ * and added to a bounded blocking queue, before returning to the "synchronous" caller.
+ * @param event the log event to append
+ * @throws InterruptedException
+ */
+ private void handleEvent(LogEvent event) throws InterruptedException {
+ if (usingAsyncLogger) {
+ sendEventToSystemProducer(encodeLogEventToBytes(event));
+ return;
+ }
+
+ // Serialize the event before adding to the queue to leverage the caller thread
+ // and ensure that the transferThread can keep up.
+ if (!logQueue.offer(encodeLogEventToBytes(event), queueTimeoutS, TimeUnit.SECONDS)) {
+ // Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
+ // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
+ // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
+ // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
+
+ // Scenario:
+ // T1: holds L1 and is waiting for L2
+ // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
+
+ // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
+ // so dropping events in the StreamAppender is our best recourse.
+
+ // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
+ int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
+ System.err.println(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
+ queueTimeoutS,
+ systemStream.toString(),
+ messagesDropped));
+
+ // Emit a metric which can be monitored to ensure it doesn't happen often.
+ metrics.logMessagesDropped.inc(messagesDropped);
+ }
+ metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
+ }
+
protected byte[] encodeLogEventToBytes(LogEvent event) {
return serde.toBytes(subLog(event));
}
@@ -275,12 +301,12 @@
@Override
public void stop() {
- log.info(String.format("Shutting down the %s...", getName()));
+ System.out.println(String.format("Shutting down the %s...", getName()));
transferThread.interrupt();
try {
transferThread.join();
} catch (InterruptedException e) {
- log.error("Interrupted while waiting for transfer thread to finish.", e);
+ System.err.println("Interrupted while waiting for transfer thread to finish." + e);
Thread.currentThread().interrupt();
}
@@ -382,50 +408,41 @@
systemProducer.register(SOURCE);
systemProducer.start();
- log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
+ System.out.println(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
+ " in " + systemName + ". Logs are partitioned by " + key);
startTransferThread();
}
private void startTransferThread() {
-
- try {
- // Serialize the key once, since we will use it for every event.
- final byte[] keyBytes = key.getBytes("UTF-8");
-
- Runnable transferFromQueueToSystem = () -> {
- while (!Thread.currentThread().isInterrupted()) {
- try {
- byte[] serializedLogEvent = logQueue.take();
-
- metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
- metrics.logMessagesCountSent.inc();
-
- OutgoingMessageEnvelope outgoingMessageEnvelope =
- new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent);
- systemProducer.send(SOURCE, outgoingMessageEnvelope);
-
- } catch (InterruptedException e) {
- // Preserve the interrupted status for the loop condition.
- Thread.currentThread().interrupt();
- } catch (Throwable t) {
- metrics.logMessagesErrors.inc();
- log.error("Error sending " + getName() + " event to SystemProducer", t);
- }
+ Runnable transferFromQueueToSystem = () -> {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ sendEventToSystemProducer(logQueue.take());
+ } catch (InterruptedException e) {
+ // Preserve the interrupted status for the loop condition.
+ Thread.currentThread().interrupt();
+ } catch (Throwable t) {
+ metrics.logMessagesErrors.inc();
+ System.err.println("Error sending " + getName() + " event to SystemProducer " + t);
}
- };
+ }
+ };
- transferThread = new Thread(transferFromQueueToSystem);
- transferThread.setDaemon(true);
- transferThread.setName("Samza " + getName() + " Producer " + transferThread.getName());
- transferThread.start();
+ transferThread = new Thread(transferFromQueueToSystem);
+ transferThread.setDaemon(true);
+ transferThread.setName("Samza " + getName() + " Producer " + transferThread.getName());
+ transferThread.start();
+ }
- } catch (UnsupportedEncodingException e) {
- throw new SamzaException(String.format(
- "Container name: %s could not be encoded to bytes. %s cannot proceed.", key, getName()),
- e);
- }
+ /**
+ * Helper method to send a serialized log-event to the systemProducer, and increment respective methods.
+ * @param serializedLogEvent
+ */
+ private void sendEventToSystemProducer(byte[] serializedLogEvent) {
+ metrics.logMessagesBytesSent.inc(serializedLogEvent.length);
+ metrics.logMessagesCountSent.inc();
+ systemProducer.send(SOURCE, new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent));
}
protected String getStreamName(String jobName, String jobId) {
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
index 07df10c..ce6f081 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/MockSystemProducerAppender.java
@@ -41,8 +41,8 @@
class MockSystemProducerAppender extends StreamAppender {
private static Config config;
- protected MockSystemProducerAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, Config config, String streamName) {
- super(name, filter, layout, ignoreExceptions, streamName);
+ protected MockSystemProducerAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, final boolean usingAsyncLogger, Config config, String streamName) {
+ super(name, filter, layout, ignoreExceptions, usingAsyncLogger, streamName);
}
@PluginFactory
@@ -51,6 +51,7 @@
@PluginElement("Filter") final Filter filter,
@PluginElement("Layout") Layout<? extends Serializable> layout,
@PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
+ @PluginAttribute(value = "usingAsyncLogger", defaultBoolean = false) final boolean usingAsyncLogger,
@PluginElement("Config") final Config testConfig,
@PluginAttribute("streamName") String streamName) {
if (testConfig == null) {
@@ -58,7 +59,7 @@
} else {
config = testConfig;
}
- return new MockSystemProducerAppender(name, filter, layout, ignoreExceptions, config, streamName);
+ return new MockSystemProducerAppender(name, filter, layout, ignoreExceptions, usingAsyncLogger, config, streamName);
}
@Override
diff --git a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
index 1680a84..2cfd59b 100644
--- a/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
+++ b/samza-log4j2/src/test/java/org/apache/samza/logging/log4j2/TestStreamAppender.java
@@ -19,21 +19,28 @@
package org.apache.samza.logging.log4j2;
-import static org.junit.Assert.*;
-
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.apache.logging.log4j.core.config.ConfigurationSource;
+import org.apache.logging.log4j.core.config.Order;
+import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
+import org.apache.logging.log4j.core.config.builder.api.RootLoggerComponentBuilder;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.apache.samza.config.MapConfig;
import org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerde;
@@ -43,6 +50,8 @@
import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.*;
+
public class TestStreamAppender {
static Logger log = (Logger) LogManager.getLogger(TestStreamAppender.class);
@@ -59,7 +68,8 @@
public void testDefaultSerde() {
System.setProperty("samza.container.name", "samza-container-1");
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+ MockSystemProducerAppender systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
systemProducerAppender.start();
assertNotNull(systemProducerAppender.getSerde());
assertEquals(LoggingEventJsonSerde.class, systemProducerAppender.getSerde().getClass());
@@ -76,7 +86,7 @@
map.put("systems.mock.streams.__samza_log4jTest_1_logs.samza.msg.serde", "log4j-string");
map.put("task.log4j.system", "mock");
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, new MapConfig(map), null);
+ MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, new MapConfig(map), null);
systemProducerAppender.start();
assertNotNull(systemProducerAppender.getSerde());
assertEquals(LoggingEventStringSerde.class, systemProducerAppender.getSerde().getClass());
@@ -86,7 +96,7 @@
public void testDefaultStreamName() {
System.setProperty("samza.container.name", "samza-container-1");
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+ MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
systemProducerAppender.start();
log.addAppender(systemProducerAppender);
Assert.assertEquals("__samza_log4jTest_1_logs", systemProducerAppender.getStreamName());
@@ -96,7 +106,7 @@
public void testCustomStreamName() {
System.setProperty("samza.container.name", "samza-container-1");
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, "test-stream-name");
+ MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, "test-stream-name");
systemProducerAppender.start();
log.addAppender(systemProducerAppender);
Assert.assertEquals("test-stream-name", systemProducerAppender.getStreamName());
@@ -107,7 +117,8 @@
System.setProperty("samza.container.name", "samza-container-1");
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+ MockSystemProducerAppender systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
systemProducerAppender.start();
log.addAppender(systemProducerAppender);
@@ -118,11 +129,58 @@
}
@Test
+ public void testSystemProducerAppenderInContainerWithAsyncLogger() throws InterruptedException {
+ System.setProperty("samza.container.name", "samza-container-1");
+ // Enabling async logger on log4j2 programmatically
+ ConfigurationFactory.setConfigurationFactory(new AsyncLoggerConfigurationFactory());
+
+ PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
+ MockSystemProducerAppender systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, layout, false, true, null, null);
+ systemProducerAppender.start();
+ log.addAppender(systemProducerAppender);
+ log.setLevel(Level.INFO);
+ List<String> messages = Lists.newArrayList("testing1", "testing2");
+ logAndVerifyMessages(messages);
+ systemProducerAppender.stop();
+ }
+
+
+ @Plugin(name = "AsyncLoggerConfigurationFactory", category = ConfigurationFactory.CATEGORY)
+ @Order(50)
+ public static class AsyncLoggerConfigurationFactory extends ConfigurationFactory {
+
+ private static Configuration createConfiguration(final String name, ConfigurationBuilder<BuiltConfiguration> builder) {
+ builder.setConfigurationName(name);
+ RootLoggerComponentBuilder rootLoggerBuilder = builder.newAsyncRootLogger(Level.INFO);
+ builder.add(rootLoggerBuilder);
+ return builder.build();
+ }
+
+ @Override
+ public Configuration getConfiguration(final LoggerContext loggerContext, final ConfigurationSource source) {
+ return getConfiguration(loggerContext, source.toString(), null);
+ }
+
+ @Override
+ public Configuration getConfiguration(final LoggerContext loggerContext, final String name, final URI configLocation) {
+ ConfigurationBuilder<BuiltConfiguration> builder = newConfigurationBuilder();
+ return createConfiguration(name, builder);
+ }
+
+ @Override
+ protected String[] getSupportedTypes() {
+ return new String[]{"*"};
+ }
+ }
+
+ @Test
public void testSystemProducerAppenderInAM() throws InterruptedException {
System.setProperty("samza.container.name", "samza-job-coordinator");
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+ MockSystemProducerAppender systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
systemProducerAppender.start();
log.addAppender(systemProducerAppender);
log.setLevel(Level.INFO);
@@ -142,7 +200,8 @@
System.setProperty("samza.container.name", "samza-container-1");
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+ MockSystemProducerAppender systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
systemProducerAppender.start();
log.addAppender(systemProducerAppender);
@@ -161,7 +220,8 @@
"task.log4j.system", "mock"));
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, mapConfig, null);
+ MockSystemProducerAppender systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, layout, false, false, mapConfig, null);
systemProducerAppender.start();
log.addAppender(systemProducerAppender);
@@ -171,7 +231,8 @@
@Test
public void testDefaultPartitionCount() {
System.setProperty("samza.container.name", "samza-container-1");
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, null, false, null, null);
+ MockSystemProducerAppender systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, null, false, false, null, null);
Assert.assertEquals(1, systemProducerAppender.getPartitionCount()); // job.container.count defaults to 1
Map<String, String> map = new HashMap<>();
@@ -180,10 +241,12 @@
map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
map.put("task.log4j.system", "mock");
map.put("job.container.count", "4");
- systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, null, false, new MapConfig(map), null);
+ systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, null, false, false, new MapConfig(map), null);
Assert.assertEquals(4, systemProducerAppender.getPartitionCount());
- systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, null, false, null, null);
+ systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, null, false, false, null, null);
systemProducerAppender.setPartitionCount(8);
Assert.assertEquals(8, systemProducerAppender.getPartitionCount());
}
@@ -193,7 +256,8 @@
System.setProperty("samza.container.name", "samza-container-1");
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+ MockSystemProducerAppender systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
systemProducerAppender.start();
log.addAppender(systemProducerAppender);
log.setLevel(Level.INFO);
@@ -223,7 +287,8 @@
System.setProperty("samza.container.name", "samza-container-1");
PatternLayout layout = PatternLayout.newBuilder().withPattern("%m").build();
- MockSystemProducerAppender systemProducerAppender = MockSystemProducerAppender.createAppender("testName", null, layout, false, null, null);
+ MockSystemProducerAppender systemProducerAppender =
+ MockSystemProducerAppender.createAppender("testName", null, layout, false, false, null, null);
systemProducerAppender.queueTimeoutS = 1;
systemProducerAppender.start();
log.addAppender(systemProducerAppender);