| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache license, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the license for the specific language governing permissions and |
| * limitations under the license. |
| */ |
| package org.apache.logging.log4j.flume.appender; |
| |
| import java.io.Serializable; |
| import java.util.Locale; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.logging.log4j.core.Appender; |
| import org.apache.logging.log4j.core.Filter; |
| import org.apache.logging.log4j.core.Layout; |
| import org.apache.logging.log4j.core.LogEvent; |
| import org.apache.logging.log4j.core.appender.AbstractAppender; |
| import org.apache.logging.log4j.core.config.Property; |
| import org.apache.logging.log4j.plugins.Plugin; |
| import org.apache.logging.log4j.plugins.PluginAliases; |
| import org.apache.logging.log4j.plugins.PluginAttribute; |
| import org.apache.logging.log4j.plugins.PluginElement; |
| import org.apache.logging.log4j.plugins.PluginFactory; |
| import org.apache.logging.log4j.core.layout.Rfc5424Layout; |
| import org.apache.logging.log4j.core.net.Facility; |
| import org.apache.logging.log4j.core.util.Booleans; |
| import org.apache.logging.log4j.core.util.Integers; |
| import org.apache.logging.log4j.util.Timer; |
| |
| /** |
| * An Appender that uses the Avro protocol to route events to Flume. |
| */ |
| @Plugin(name = "Flume", category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true) |
| public final class FlumeAppender extends AbstractAppender implements FlumeEventFactory { |
| |
| private static final String[] EXCLUDED_PACKAGES = {"org.apache.flume", "org.apache.avro"}; |
| private static final int DEFAULT_MAX_DELAY = 60000; |
| |
| private static final int DEFAULT_LOCK_TIMEOUT_RETRY_COUNT = 5; |
| |
| private final AbstractFlumeManager manager; |
| |
| private final String mdcIncludes; |
| private final String mdcExcludes; |
| private final String mdcRequired; |
| |
| private final String eventPrefix; |
| |
| private final String mdcPrefix; |
| |
| private final boolean compressBody; |
| |
| private final FlumeEventFactory factory; |
| |
| private final Timer timer = new Timer("FlumeEvent", 5000); |
| private volatile long count; |
| |
| /** |
| * Which Manager will be used by the appender instance. |
| */ |
| private enum ManagerType { |
| AVRO, EMBEDDED, PERSISTENT; |
| |
| public static ManagerType getType(final String type) { |
| return valueOf(type.toUpperCase(Locale.US)); |
| } |
| } |
| |
| private FlumeAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, |
| final boolean ignoreExceptions, final String includes, final String excludes, final String required, |
| final String mdcPrefix, final String eventPrefix, final boolean compress, final FlumeEventFactory factory, |
| final Property[] properties, final AbstractFlumeManager manager) { |
| super(name, filter, layout, ignoreExceptions, properties); |
| this.manager = manager; |
| this.mdcIncludes = includes; |
| this.mdcExcludes = excludes; |
| this.mdcRequired = required; |
| this.eventPrefix = eventPrefix; |
| this.mdcPrefix = mdcPrefix; |
| this.compressBody = compress; |
| this.factory = factory == null ? this : factory; |
| } |
| |
| /** |
| * Publish the event. |
| * @param event The LogEvent. |
| */ |
| @Override |
| public void append(final LogEvent event) { |
| final String name = event.getLoggerName(); |
| if (name != null) { |
| for (final String pkg : EXCLUDED_PACKAGES) { |
| if (name.startsWith(pkg)) { |
| return; |
| } |
| } |
| } |
| timer.startOrResume(); |
| final FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, |
| eventPrefix, compressBody); |
| flumeEvent.setBody(getLayout().toByteArray(flumeEvent)); |
| if (update()) { |
| String msg = timer.stop(); |
| LOGGER.debug(msg); |
| } else { |
| timer.pause(); |
| } |
| manager.send(flumeEvent); |
| } |
| |
| private synchronized boolean update() { |
| if (++count == 5000) { |
| count = 0; |
| return true; |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean stop(final long timeout, final TimeUnit timeUnit) { |
| setStopping(); |
| boolean stopped = super.stop(timeout, timeUnit, false); |
| stopped &= manager.stop(timeout, timeUnit); |
| setStopped(); |
| return stopped; |
| } |
| |
| /** |
| * Create a Flume event. |
| * @param event The Log4j LogEvent. |
| * @param includes comma separated list of mdc elements to include. |
| * @param excludes comma separated list of mdc elements to exclude. |
| * @param required comma separated list of mdc elements that must be present with a value. |
| * @param mdcPrefix The prefix to add to MDC key names. |
| * @param eventPrefix The prefix to add to event fields. |
| * @param compress If true the body will be compressed. |
| * @return A Flume Event. |
| */ |
| @Override |
| public FlumeEvent createEvent(final LogEvent event, final String includes, final String excludes, |
| final String required, final String mdcPrefix, final String eventPrefix, |
| final boolean compress) { |
| return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix, |
| eventPrefix, compressBody); |
| } |
| |
| /** |
| * Create a Flume Avro Appender. |
| * @param agents An array of Agents. |
| * @param properties Properties to pass to the embedded agent. |
| * @param embedded true if the embedded agent manager should be used. otherwise the Avro manager will be used. |
| * <b>Note: </b><i>The embedded attribute is deprecated in favor of specifying the type attribute.</i> |
| * @param type Avro (default), Embedded, or Persistent. |
| * @param dataDir The directory where the Flume FileChannel should write its data. |
| * @param connectionTimeoutMillis The amount of time in milliseconds to wait before a connection times out. Minimum is |
| * 1000. |
| * @param requestTimeoutMillis The amount of time in milliseconds to wait before a request times out. Minimum is 1000. |
| * @param agentRetries The number of times to retry an agent before failing to the next agent. |
| * @param maxDelayMillis The maximum number of milliseconds to wait for a complete batch. |
| * @param name The name of the Appender. |
| * @param ignore If {@code "true"} (default) exceptions encountered when appending events are logged; otherwise |
| * they are propagated to the caller. |
| * @param excludes A comma separated list of MDC elements to exclude. |
| * @param includes A comma separated list of MDC elements to include. |
| * @param required A comma separated list of MDC elements that are required. |
| * @param mdcPrefix The prefix to add to MDC key names. |
| * @param eventPrefix The prefix to add to event key names. |
| * @param compressBody If true the event body will be compressed. |
| * @param batchSize Number of events to include in a batch. Defaults to 1. |
| * @param lockTimeoutRetries Times to retry a lock timeout when writing to Berkeley DB. |
| * @param factory The factory to use to create Flume events. |
| * @param layout The layout to format the event. |
| * @param filter A Filter to filter events. |
| * |
| * @return A Flume Avro Appender. |
| */ |
| @PluginFactory |
| public static FlumeAppender createAppender(@PluginElement final Agent[] agents, |
| @PluginElement final Property[] properties, |
| @PluginAttribute final String hosts, |
| @PluginAttribute final String embedded, |
| @PluginAttribute final String type, |
| @PluginAttribute final String dataDir, |
| @PluginAliases("connectTimeout") |
| @PluginAttribute("connectTimeoutMillis") final String connectionTimeoutMillis, |
| @PluginAliases("requestTimeout") |
| @PluginAttribute final String requestTimeoutMillis, |
| @PluginAttribute final String agentRetries, |
| @PluginAliases("maxDelay") // deprecated |
| @PluginAttribute final String maxDelayMillis, |
| @PluginAttribute final String name, |
| @PluginAttribute("ignoreExceptions") final String ignore, |
| @PluginAttribute("mdcExcludes") final String excludes, |
| @PluginAttribute("mdcIncludes") final String includes, |
| @PluginAttribute("mdcRequired") final String required, |
| @PluginAttribute final String mdcPrefix, |
| @PluginAttribute final String eventPrefix, |
| @PluginAttribute("compress") final String compressBody, |
| @PluginAttribute final String batchSize, |
| @PluginAttribute final String lockTimeoutRetries, |
| @PluginElement final FlumeEventFactory factory, |
| @PluginElement Layout<? extends Serializable> layout, |
| @PluginElement final Filter filter) { |
| |
| final boolean embed = embedded != null ? Boolean.parseBoolean(embedded) : |
| (agents == null || agents.length == 0 || hosts == null || hosts.isEmpty()) && properties != null && properties.length > 0; |
| final boolean ignoreExceptions = Booleans.parseBoolean(ignore, true); |
| final boolean compress = Booleans.parseBoolean(compressBody, true); |
| ManagerType managerType; |
| if (type != null) { |
| if (embed && embedded != null) { |
| try { |
| managerType = ManagerType.getType(type); |
| LOGGER.warn("Embedded and type attributes are mutually exclusive. Using type " + type); |
| } catch (final Exception ex) { |
| LOGGER.warn("Embedded and type attributes are mutually exclusive and type " + type + |
| " is invalid."); |
| managerType = ManagerType.EMBEDDED; |
| } |
| } else { |
| try { |
| managerType = ManagerType.getType(type); |
| } catch (final Exception ex) { |
| LOGGER.warn("Type " + type + " is invalid."); |
| managerType = ManagerType.EMBEDDED; |
| } |
| } |
| } else if (embed) { |
| managerType = ManagerType.EMBEDDED; |
| } else { |
| managerType = ManagerType.AVRO; |
| } |
| |
| final int batchCount = Integers.parseInt(batchSize, 1); |
| final int connectTimeoutMillis = Integers.parseInt(connectionTimeoutMillis, 0); |
| final int reqTimeoutMillis = Integers.parseInt(requestTimeoutMillis, 0); |
| final int retries = Integers.parseInt(agentRetries, 0); |
| final int lockTimeoutRetryCount = Integers.parseInt(lockTimeoutRetries, DEFAULT_LOCK_TIMEOUT_RETRY_COUNT); |
| final int delayMillis = Integers.parseInt(maxDelayMillis, DEFAULT_MAX_DELAY); |
| |
| if (layout == null) { |
| final int enterpriseNumber = Rfc5424Layout.DEFAULT_ENTERPRISE_NUMBER; |
| layout = Rfc5424Layout.createLayout(Facility.LOCAL0, null, enterpriseNumber, true, Rfc5424Layout.DEFAULT_MDCID, |
| mdcPrefix, eventPrefix, false, null, null, null, excludes, includes, required, null, false, null, |
| null); |
| } |
| |
| if (name == null) { |
| LOGGER.error("No name provided for Appender"); |
| return null; |
| } |
| |
| AbstractFlumeManager manager; |
| |
| switch (managerType) { |
| case EMBEDDED: |
| manager = FlumeEmbeddedManager.getManager(name, agents, properties, batchCount, dataDir); |
| break; |
| case AVRO: |
| manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis); |
| break; |
| case PERSISTENT: |
| manager = FlumePersistentManager.getManager(name, getAgents(agents, hosts), properties, batchCount, retries, |
| connectTimeoutMillis, reqTimeoutMillis, delayMillis, lockTimeoutRetryCount, dataDir); |
| break; |
| default: |
| LOGGER.debug("No manager type specified. Defaulting to AVRO"); |
| manager = FlumeAvroManager.getManager(name, getAgents(agents, hosts), batchCount, delayMillis, retries, connectTimeoutMillis, reqTimeoutMillis); |
| } |
| |
| if (manager == null) { |
| return null; |
| } |
| |
| return new FlumeAppender(name, filter, layout, ignoreExceptions, includes, |
| excludes, required, mdcPrefix, eventPrefix, compress, factory, Property.EMPTY_ARRAY, manager); |
| } |
| |
| private static Agent[] getAgents(Agent[] agents, final String hosts) { |
| if (agents == null || agents.length == 0) { |
| if (hosts != null && !hosts.isEmpty()) { |
| LOGGER.debug("Parsing agents from hosts parameter"); |
| final String[] hostports = hosts.split(","); |
| agents = new Agent[hostports.length]; |
| for(int i = 0; i < hostports.length; ++i) { |
| final String[] h = hostports[i].split(":"); |
| agents[i] = Agent.createAgent(h[0], h.length > 1 ? h[1] : null); |
| } |
| } else { |
| LOGGER.debug("No agents provided, using defaults"); |
| agents = new Agent[] {Agent.createAgent(null, null)}; |
| } |
| } |
| |
| LOGGER.debug("Using agents {}", agents); |
| return agents; |
| } |
| } |