blob: 5434da70c3996118a8ca058a01a4ce6e0f812f41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache license, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the license for the specific language governing permissions and
* limitations under the license.
*/
package org.apache.logging.log4j.flume.appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AppenderBase;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttr;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.RFC5424Layout;
import java.net.InetAddress;
/**
* An Appender that uses the Avro protocol to route events to Flume.
*/
@Plugin(name = "Flume", type = "Core", elementType = "appender", printObject = true)
public final class FlumeAvroAppender extends AppenderBase implements FlumeEventFactory {
private FlumeAvroManager manager;
private final String mdcIncludes;
private final String mdcExcludes;
private final String mdcRequired;
private final String eventPrefix;
private final String mdcPrefix;
private final boolean compressBody;
private final String hostname;
private final int reconnectDelay;
private final int retries;
private final FlumeEventFactory factory;
private FlumeAvroAppender(String name, Filter filter, Layout layout, boolean handleException,
String hostname, String includes, String excludes, String required, String mdcPrefix,
String eventPrefix, boolean compress, int delay, int retries,
FlumeEventFactory factory, FlumeAvroManager manager) {
super(name, filter, layout, handleException);
this.manager = manager;
this.mdcIncludes = includes;
this.mdcExcludes = excludes;
this.mdcRequired = required;
this.eventPrefix = eventPrefix;
this.mdcPrefix = mdcPrefix;
this.compressBody = compress;
this.hostname = hostname;
this.reconnectDelay = delay;
this.retries = retries;
this.factory = factory == null ? this : factory;
}
/**
* Publish the event.
* @param event The LogEvent.
*/
public void append(LogEvent event) {
FlumeEvent flumeEvent = factory.createEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
eventPrefix, compressBody);
flumeEvent.setBody(getLayout().format(flumeEvent));
manager.send(flumeEvent, reconnectDelay, retries);
}
@Override
public void stop() {
super.stop();
manager.release();
}
/**
* Create a Flume event.
* @param event The Log4j LogEvent.
* @param includes comma separated list of mdc elements to include.
* @param excludes comma separated list of mdc elements to exclude.
* @param required comma separated list of mdc elements that must be present with a value.
* @param mdcPrefix The prefix to add to MDC key names.
* @param eventPrefix The prefix to add to event fields.
* @param compress If true the body will be compressed.
* @return A Flume Event.
*/
public FlumeEvent createEvent(LogEvent event, String includes, String excludes, String required,
String mdcPrefix, String eventPrefix, boolean compress) {
return new FlumeEvent(event, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
eventPrefix, compressBody);
}
/**
* Create a Flume Avro Appender.
* @param agents An array of Agents.
* @param delay The amount of time in milliseconds to wait between retries.
* @param agentRetries The number of times to retry an agent before failing to the next agent.
* @param name The name of the Appender.
* @param suppress If true exceptions will be handled in the appender.
* @param excludes A comma separated list of MDC elements to exclude.
* @param includes A comma separated list of MDC elements to include.
* @param required A comma separated list of MDC elements that are required.
* @param mdcPrefix The prefix to add to MDC key names.
* @param eventPrefix The prefix to add to event key names.
* @param compressBody If true the event body will be compressed.
* @param batchSize Number of events to include in a batch. Defaults to 1.
* @param factory The factory to use to create Flume events.
* @param layout The layout to format the event.
* @param filter A Filter to filter events.
* @return A Flume Avro Appender.
*/
@PluginFactory
public static FlumeAvroAppender createAppender(@PluginElement("agents") Agent[] agents,
@PluginAttr("reconnectionDelay") String delay,
@PluginAttr("agentRetries") String agentRetries,
@PluginAttr("name") String name,
@PluginAttr("suppressExceptions") String suppress,
@PluginAttr("mdcExcludes") String excludes,
@PluginAttr("mdcIncludes") String includes,
@PluginAttr("mdcRequired") String required,
@PluginAttr("mdcPrefix") String mdcPrefix,
@PluginAttr("eventPrefix") String eventPrefix,
@PluginAttr("compress") String compressBody,
@PluginAttr("batchSize") String batchSize,
@PluginElement("flumeEventFactory") FlumeEventFactory factory,
@PluginElement("layout") Layout layout,
@PluginElement("filters") Filter filter) {
String hostname;
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (Exception ex) {
LOGGER.error("Unable to determine local hostname", ex);
return null;
}
if (agents == null || agents.length == 0) {
LOGGER.debug("No agents provided, using defaults");
agents = new Agent[] {Agent.createAgent(null, null)};
}
boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
int batchCount = batchSize == null ? 1 : Integer.parseInt(batchSize);
int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
if (layout == null) {
layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes,
includes, required, null, null);
}
if (name == null) {
LOGGER.error("No name provided for Appender");
return null;
}
FlumeAvroManager manager = FlumeAvroManager.getManager(agents, batchCount);
if (manager == null) {
return null;
}
return new FlumeAvroAppender(name, filter, layout, handleExceptions, hostname, includes,
excludes, required, mdcPrefix, eventPrefix, compress, reconnectDelay, retries, factory, manager);
}
}