blob: f6fb4d986bfef19ee01476400b7c7f2520b1a42a [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.samza.logging.log4j2;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.SimpleMessage;
import org.apache.logging.log4j.util.StringMap;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.Log4jSystemConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.logging.log4j2.serializers.LoggingEventJsonSerdeFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.HttpUtil;
import org.apache.samza.util.ReflectionUtil;
@Plugin(name = "Stream", category = "Core", elementType = "appender", printObject = true)
public class StreamAppender extends AbstractAppender {
private static final String JAVA_OPTS_CONTAINER_NAME = "";
private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
private static final String SOURCE = "log4j-log";
// Hidden config for now. Will move to appropriate Config class when ready to.
private static final String CREATE_STREAM_ENABLED = "";
protected static final int DEFAULT_QUEUE_SIZE = 100;
private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Abitrary choice
protected static volatile boolean systemInitialized = false;
private Config config = null;
private SystemStream systemStream = null;
private SystemProducer systemProducer = null;
private String key = null;
private String streamName = null;
private int partitionCount = 0;
private boolean isApplicationMaster;
private Serde<LogEvent> serde = null;
private Logger log = LogManager.getLogger(StreamAppender.class);
protected StreamAppenderMetrics metrics;
private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;
private Thread transferThread;
protected StreamAppender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, String streamName) {
super(name, filter, layout, ignoreExceptions);
this.streamName = streamName;
public void start() {
String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
if (containerName != null) {
isApplicationMaster = containerName.contains(JOB_COORDINATOR_TAG);
} else {
throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME +
". This is used as the key for the log appender, so can't proceed.");
key = containerName; // use the container name as the key for the logs
// StreamAppender has to wait until the JobCoordinator is up when the log is in the AM
if (isApplicationMaster) {
systemInitialized = false;
} else {
systemInitialized = true;
* used to detect if this thread is called recursively
private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
* Getter for the StreamName parameter. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
* Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
* @return The configured stream name.
public String getStreamName() {
return this.streamName;
* Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
* Example: {@literal <param name="PartitionCount" value="4"/>}
* @return The configured partition count of the StreamAppender stream. If not set, returns {@link JobConfig#getContainerCount()}.
public int getPartitionCount() {
if (partitionCount > 0) {
return partitionCount;
return new JobConfig(getConfig()).getContainerCount();
* Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #createAppender(String, Filter, Layout, boolean, String)} for when this is called.
* Example: {@literal <param name="PartitionCount" value="4"/>}
* @param partitionCount Configurable partition count.
public void setPartitionCount(int partitionCount) {
this.partitionCount = partitionCount;
public static StreamAppender createAppender(
@PluginAttribute("name") final String name,
@PluginElement("Filter") final Filter filter,
@PluginElement("Layout") Layout layout,
@PluginAttribute(value = "ignoreExceptions", defaultBoolean = true) final boolean ignoreExceptions,
@PluginAttribute("streamName") String streamName) {
return new StreamAppender(name, filter, layout, ignoreExceptions, streamName);
public void append(LogEvent event) {
if (!recursiveCall.get()) {
try {
if (!systemInitialized) {
if (JobModelManager.currentJobModelManager() != null) {
// JobCoordinator has been instantiated
systemInitialized = true;
} else {
log.trace("Waiting for the JobCoordinator to be instantiated...");
} else {
// Serialize the event before adding to the queue to leverage the caller thread
// and ensure that the transferThread can keep up.
if (!logQueue.offer(serde.toBytes(subLog(event)), queueTimeoutS, TimeUnit.SECONDS)) {
// Do NOT retry adding system to the queue. Dropping the event allows us to alleviate the unlikely
// possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
// which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
// of those two code paths can cause a deadlock. Dropping the event allows us to proceed.
// Scenario:
// T1: holds L1 and is waiting for L2
// T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1
// This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
// so dropping events in the StreamAppender is our best recourse.
// Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
// Emit a metric which can be monitored to ensure it doesn't happen often.;
metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
} catch (Exception e) {
System.err.println("[StreamAppender] Error sending log message:");
} finally {
} else if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here.;
private Message subAppend(LogEvent event) {
if (getLayout() == null) {
return new SimpleMessage(event.getMessage().getFormattedMessage());
} else {
Object obj = getLayout().toSerializable(event);
if (obj instanceof Message) {
return new SimpleMessage(((Message) obj).getFormattedMessage());
} else if (obj instanceof LogEvent) {
return new SimpleMessage(((LogEvent) obj).getMessage().getFormattedMessage());
} else {
return new SimpleMessage(obj.toString());
private LogEvent subLog(LogEvent event) {
return Log4jLogEvent.newBuilder()
.setContextData((StringMap) event.getContextData())
public void stop() {"Shutting down the StreamAppender...");
try {
} catch (InterruptedException e) {
log.error("Interrupted while waiting for transfer thread to finish.", e);
if (systemProducer != null) {
* force the system producer to flush the messages
private void flushSystemProducer() {
if (systemProducer != null) {
* get the config for the AM or containers based on the containers' names.
* @return Config the config of this container
protected Config getConfig() {
Config config;
try {
if (isApplicationMaster) {
config = JobModelManager.currentJobModelManager().jobModel().getConfig();
} else {
String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
String response = URL(url), 30000, new ExponentialSleepStrategy());
config = SamzaObjectMapper.getObjectMapper().readValue(response, JobModel.class).getConfig();
} catch (IOException e) {
throw new SamzaException("can not read the config", e);
// Make system producer drop producer errors for StreamAppender
config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true"));
return config;
protected void setupSystem() {
config = getConfig();
Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
if (streamName == null) {
streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
// TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
MetricsRegistry metricsRegistry = new MetricsRegistryMap();
metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);
String systemName = log4jSystemConfig.getSystemName();
String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName)
.orElseThrow(() -> new SamzaException(
"Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"));
SystemFactory systemFactory = ReflectionUtil.getObj(systemFactoryName, SystemFactory.class);
setSerde(log4jSystemConfig, systemName, streamName);
if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
// Explicitly create stream appender stream with the partition count the same as the number of containers.
System.out.println("[StreamAppender] creating stream " + streamName + " with partition count " + getPartitionCount());
StreamSpec streamSpec =
StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
// SystemAdmin only needed for stream creation here.
SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry);
systemStream = new SystemStream(systemName, streamName);
systemProducer.start(); + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
+ " in " + systemName + ". Logs are partitioned by " + key);
private void startTransferThread() {
try {
// Serialize the key once, since we will use it for every event.
final byte[] keyBytes = key.getBytes("UTF-8");
Runnable transferFromQueueToSystem = () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
byte[] serializedLogEvent = logQueue.take();
OutgoingMessageEnvelope outgoingMessageEnvelope =
new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent);
systemProducer.send(SOURCE, outgoingMessageEnvelope);
} catch (InterruptedException e) {
// Preserve the interrupted status for the loop condition.
} catch (Throwable t) {
log.error("Error sending StreamAppender event to SystemProducer", t);
transferThread = new Thread(transferFromQueueToSystem);
transferThread.setName("Samza StreamAppender Producer " + transferThread.getName());
} catch (UnsupportedEncodingException e) {
throw new SamzaException(String.format(
"Container name: %s could not be encoded to bytes. StreamAppender cannot proceed.", key),
protected static String getStreamName(String jobName, String jobId) {
if (jobName == null) {
throw new SamzaException("job name is null. Please specify");
if (jobId == null) {
jobId = "1";
String streamName = "__samza_" + jobName + "_" + jobId + "_logs";
return streamName.replace("-", "_");
* set the serde for this appender. It looks for the stream serde first, then system serde.
* If still can not get the serde, throws exceptions.
* @param log4jSystemConfig log4jSystemConfig for this appender
* @param systemName name of the system
* @param streamName name of the stream
private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) {
String serdeClass = LoggingEventJsonSerdeFactory.class.getCanonicalName();
String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName);
if (serdeName != null) {
serdeClass = log4jSystemConfig.getSerdeClass(serdeName);
if (serdeClass != null) {
SerdeFactory<LogEvent> serdeFactory = ReflectionUtil.getObj(serdeClass, SerdeFactory.class);
serde = serdeFactory.getSerde(systemName, config);
} else {
String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS, serdeName);
throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " +
serdeKey + " property");
* Returns the serde that is being used for the stream appender.
* @return The Serde&lt;LoggingEvent&gt; that the appender is using.
public Serde<LogEvent> getSerde() {
return serde;