| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.logging.log4j; |
| |
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.Log4jSystemConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.HttpUtil;
import org.apache.samza.util.ReflectionUtil;
| |
/**
 * StreamAppender is a log4j appender that sends logs to the system which is
 * specified by the user in the Samza config. It can send to any system as long
 * as the system is defined appropriately in the config.
 */
public class StreamAppender extends AppenderSkeleton {

  /** System property (set via java opts) identifying the container this appender runs in. */
  private static final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
  /** Substring of the container name that marks the job coordinator (AM) process. */
  private static final String JOB_COORDINATOR_TAG = "samza-job-coordinator";
  /** Source name under which this appender registers with the SystemProducer. */
  private static final String SOURCE = "log4j-log";

  // Hidden config for now. Will move to appropriate Config class when ready to.
  private static final String CREATE_STREAM_ENABLED = "task.log4j.create.stream.enabled";

  protected static final int DEFAULT_QUEUE_SIZE = 100;
  private static final long DEFAULT_QUEUE_TIMEOUT_S = 2; // Arbitrary choice

  // Whether setupSystem() has completed. volatile: append() checks it and, in the AM
  // case, flips it lazily once the JobCoordinator is available.
  protected static volatile boolean systemInitialized = false;

  private Config config = null;
  private SystemStream systemStream = null;
  private SystemProducer systemProducer = null;
  private String key = null; // container name; used as the partition key for every log message
  private String streamName = null;
  private int partitionCount = 0;
  private boolean isApplicationMaster = false;
  private Serde<LoggingEvent> serde = null;
  private Logger log = Logger.getLogger(StreamAppender.class);
  protected StreamAppenderMetrics metrics;

  // Events are serialized on the caller thread and handed off through this bounded
  // queue to transferThread, which performs the actual sends.
  private final BlockingQueue<byte[]> logQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);
  protected long queueTimeoutS = DEFAULT_QUEUE_TIMEOUT_S;

  // Daemon thread that drains logQueue into the SystemProducer; created in startTransferThread().
  private Thread transferThread;

  /**
   * used to detect if this thread is called recursively
   */
  private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
| |
| /** |
| * Getter for the StreamName parameter. See also {@link #activateOptions()} for when this is called. |
| * Example: {@literal <param name="StreamName" value="ExampleStreamName"/>} |
| * @return The configured stream name. |
| */ |
| public String getStreamName() { |
| return this.streamName; |
| } |
| |
  /**
   * Setter for the StreamName parameter. See also {@link #activateOptions()} for when this is called.
   * Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
   * @param streamName The configured stream name.
   */
  public void setStreamName(String streamName) {
    this.streamName = streamName;
  }
| |
| /** |
| * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #activateOptions()} for when this is called. |
| * Example: {@literal <param name="PartitionCount" value="4"/>} |
| * @return The configured partition count of the StreamAppender stream. If not set, returns {@link JobConfig#getContainerCount()}. |
| */ |
| public int getPartitionCount() { |
| if (partitionCount > 0) { |
| return partitionCount; |
| } |
| return new JobConfig(getConfig()).getContainerCount(); |
| } |
| |
  /**
   * Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #activateOptions()} for when this is called.
   * Example: {@literal <param name="PartitionCount" value="4"/>}
   * @param partitionCount Configurable partition count.
   */
  public void setPartitionCount(int partitionCount) {
    this.partitionCount = partitionCount;
  }
| |
| /** |
| * Additional configurations needed before logging to stream. Called once in the container before the first log event is sent. |
| */ |
| @Override |
| public void activateOptions() { |
| String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME); |
| if (containerName != null) { |
| isApplicationMaster = containerName.contains(JOB_COORDINATOR_TAG); |
| } else { |
| throw new SamzaException("Got null container name from system property: " + JAVA_OPTS_CONTAINER_NAME + |
| ". This is used as the key for the log appender, so can't proceed."); |
| } |
| key = containerName; // use the container name as the key for the logs |
| |
| // StreamAppender has to wait until the JobCoordinator is up when the log is in the AM |
| if (isApplicationMaster) { |
| systemInitialized = false; |
| } else { |
| setupSystem(); |
| systemInitialized = true; |
| } |
| } |
| |
  /**
   * Serializes the log event on the caller thread and enqueues it for asynchronous
   * delivery by the transfer thread. If the system is not yet initialized (AM case),
   * initialization is retried lazily once the JobCoordinator is available; events
   * arriving before that are silently skipped. Events are dropped rather than blocked
   * on indefinitely, to avoid deadlock (see SAMZA-1537 notes below).
   *
   * @param event the log4j event to ship to the log stream
   */
  @Override
  public void append(LoggingEvent event) {
    if (!recursiveCall.get()) {
      try {
        recursiveCall.set(true);
        if (!systemInitialized) {
          if (JobModelManager.currentJobModelManager() != null) {
            // JobCoordinator has been instantiated
            setupSystem();
            systemInitialized = true;
          } else {
            log.trace("Waiting for the JobCoordinator to be instantiated...");
          }
        } else {
          // Serialize the event before adding to the queue to leverage the caller thread
          // and ensure that the transferThread can keep up.
          if (!logQueue.offer(serde.toBytes(subLog(event)), queueTimeoutS, TimeUnit.SECONDS)) {
            // Do NOT retry adding to the queue. Dropping the event allows us to alleviate the unlikely
            // possibility of a deadlock, which can arise due to a circular dependency between the SystemProducer
            // which is used for StreamAppender and the log, which uses StreamAppender. Any locks held in the callstack
            // of those two code paths can cause a deadlock. Dropping the event allows us to proceed.

            // Scenario:
            // T1: holds L1 and is waiting for L2
            // T2: holds L2 and is waiting to produce to BQ1 which is drained by T3 (SystemProducer) which is waiting for L1

            // This has happened due to locks in Kafka and log4j (see SAMZA-1537), which are both out of our control,
            // so dropping events in the StreamAppender is our best recourse.

            // Drain the queue instead of dropping one message just to reduce the frequency of warn logs above.
            int messagesDropped = logQueue.drainTo(new ArrayList<>()) + 1; // +1 because of the current log event
            log.warn(String.format("Exceeded timeout %ss while trying to log to %s. Dropping %d log messages.",
                queueTimeoutS,
                systemStream.toString(),
                messagesDropped));

            // Emit a metric which can be monitored to ensure it doesn't happen often.
            metrics.logMessagesDropped.inc(messagesDropped);
          }
          metrics.bufferFillPct.set(Math.round(100f * logQueue.size() / DEFAULT_QUEUE_SIZE));
        }
      } catch (Exception e) {
        // Can't report this through the logger: recursiveCall is still true here, so a
        // log call would re-enter append() and be counted as recursive. Use stderr instead.
        System.err.println("[StreamAppender] Error sending log message:");
        e.printStackTrace();
      } finally {
        recursiveCall.set(false);
      }
    } else if (metrics != null) { // setupSystem() may not have been invoked yet so metrics can be null here.
      metrics.recursiveCalls.inc();
    }
  }
| |
| private String subAppend(LoggingEvent event) { |
| if (this.layout == null) { |
| return event.getRenderedMessage(); |
| } else { |
| return this.layout.format(event).trim(); |
| } |
| } |
| |
| private LoggingEvent subLog(LoggingEvent event) { |
| return new LoggingEvent(event.getFQNOfLoggerClass(), event.getLogger(), event.getTimeStamp(), |
| event.getLevel(), subAppend(event), event.getThreadName(), event.getThrowableInformation(), |
| event.getNDC(), event.getLocationInformation(), event.getProperties()); |
| } |
| |
| @Override |
| public void close() { |
| log.info("Shutting down the StreamAppender..."); |
| if (!this.closed) { |
| this.closed = true; |
| transferThread.interrupt(); |
| try { |
| transferThread.join(); |
| } catch (InterruptedException e) { |
| log.error("Interrupted while waiting for transfer thread to finish.", e); |
| Thread.currentThread().interrupt(); |
| } |
| |
| flushSystemProducer(); |
| if (systemProducer != null) { |
| systemProducer.stop(); |
| } |
| } |
| } |
| |
  /**
   * A layout is optional for this appender: when none is configured, the event's
   * rendered message is sent as-is (see {@code subAppend}).
   */
  @Override
  public boolean requiresLayout() {
    return false;
  }
| |
| /** |
| * force the system producer to flush the messages |
| */ |
| public void flushSystemProducer() { |
| if (systemProducer != null) { |
| systemProducer.flush(SOURCE); |
| } |
| } |
| |
| /** |
| * get the config for the AM or containers based on the containers' names. |
| * |
| * @return Config the config of this container |
| */ |
| protected Config getConfig() { |
| Config config; |
| |
| try { |
| if (isApplicationMaster) { |
| config = JobModelManager.currentJobModelManager().jobModel().getConfig(); |
| } else { |
| String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL); |
| String response = HttpUtil.read(new URL(url), 30000, new ExponentialSleepStrategy()); |
| config = SamzaObjectMapper.getObjectMapper().readValue(response, JobModel.class).getConfig(); |
| } |
| } catch (IOException e) { |
| throw new SamzaException("can not read the config", e); |
| } |
| // Make system producer drop producer errors for StreamAppender |
| config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS, "true")); |
| |
| return config; |
| } |
| |
  /**
   * Initializes everything needed to ship logs: resolves the target stream name,
   * creates metrics, instantiates the system factory and serde, optionally creates
   * the log stream, then starts the producer and the queue-draining transfer thread.
   * Called once per process, either from activateOptions() (containers) or lazily
   * from append() once the JobCoordinator is up (AM).
   */
  protected void setupSystem() {
    config = getConfig();
    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);

    // Derive the default stream name from job name/id unless one was configured.
    if (streamName == null) {
      streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId());
    }

    // TODO we need the ACTUAL metrics registry, or the metrics won't get reported by the metric reporters!
    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
    metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry);

    String systemName = log4jSystemConfig.getSystemName();
    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName)
        .orElseThrow(() -> new SamzaException(
            "Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"));
    SystemFactory systemFactory = ReflectionUtil.getObj(systemFactoryName, SystemFactory.class);

    setSerde(log4jSystemConfig, systemName, streamName);

    if (config.getBoolean(CREATE_STREAM_ENABLED, false)) {
      // Explicitly create stream appender stream with the partition count the same as the number of containers.
      System.out.println("[StreamAppender] creating stream " + streamName + " with partition count " + getPartitionCount());
      StreamSpec streamSpec = StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());

      // SystemAdmin only needed for stream creation here.
      SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
      systemAdmin.start();
      systemAdmin.createStream(streamSpec);
      systemAdmin.stop();
    }

    // Stream creation (above) happens before the producer starts sending to it.
    systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry);
    systemStream = new SystemStream(systemName, streamName);
    systemProducer.register(SOURCE);
    systemProducer.start();

    log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName
        + " in " + systemName + ". Logs are partitioned by " + key);

    // Start draining the queue only after the producer is ready to accept sends.
    startTransferThread();
  }
| |
| private void startTransferThread() { |
| |
| try { |
| // Serialize the key once, since we will use it for every event. |
| final byte[] keyBytes = key.getBytes("UTF-8"); |
| |
| Runnable transferFromQueueToSystem = () -> { |
| while (!Thread.currentThread().isInterrupted()) { |
| try { |
| byte[] serializedLogEvent = logQueue.take(); |
| |
| OutgoingMessageEnvelope outgoingMessageEnvelope = |
| new OutgoingMessageEnvelope(systemStream, keyBytes, serializedLogEvent); |
| systemProducer.send(SOURCE, outgoingMessageEnvelope); |
| |
| } catch (InterruptedException e) { |
| // Preserve the interrupted status for the loop condition. |
| Thread.currentThread().interrupt(); |
| } catch (Throwable t) { |
| log.error("Error sending StreamAppender event to SystemProducer", t); |
| } |
| } |
| }; |
| |
| transferThread = new Thread(transferFromQueueToSystem); |
| transferThread.setDaemon(true); |
| transferThread.setName("Samza StreamAppender Producer " + transferThread.getName()); |
| transferThread.start(); |
| |
| } catch (UnsupportedEncodingException e) { |
| throw new SamzaException(String.format( |
| "Container name: %s could not be encoded to bytes. StreamAppender cannot proceed.", key), |
| e); |
| } |
| } |
| |
| protected static String getStreamName(String jobName, String jobId) { |
| if (jobName == null) { |
| throw new SamzaException("job name is null. Please specify job.name"); |
| } |
| if (jobId == null) { |
| jobId = "1"; |
| } |
| String streamName = "__samza_" + jobName + "_" + jobId + "_logs"; |
| return streamName.replace("-", "_"); |
| } |
| |
| /** |
| * set the serde for this appender. It looks for the stream serde first, then system serde. |
| * If still can not get the serde, throws exceptions. |
| * |
| * @param log4jSystemConfig log4jSystemConfig for this appender |
| * @param systemName name of the system |
| * @param streamName name of the stream |
| */ |
| private void setSerde(Log4jSystemConfig log4jSystemConfig, String systemName, String streamName) { |
| String serdeClass = LoggingEventJsonSerdeFactory.class.getCanonicalName(); |
| String serdeName = log4jSystemConfig.getStreamSerdeName(systemName, streamName); |
| |
| if (serdeName != null) { |
| serdeClass = log4jSystemConfig.getSerdeClass(serdeName); |
| } |
| |
| if (serdeClass != null) { |
| SerdeFactory<LoggingEvent> serdeFactory = ReflectionUtil.getObj(serdeClass, SerdeFactory.class); |
| serde = serdeFactory.getSerde(systemName, config); |
| } else { |
| String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS, serdeName); |
| throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " + |
| serdeKey + " property"); |
| } |
| } |
| |
  /**
   * Returns the serde that is being used for the stream appender.
   *
   * @return The {@code Serde<LoggingEvent>} that the appender is using.
   */
  public Serde<LoggingEvent> getSerde() {
    return serde;
  }
| } |