Add support for Flume.
git-svn-id: https://svn.apache.org/repos/asf/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers@1156861 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/log4j2-core/pom.xml b/log4j2-core/pom.xml
index e9e9066..309d725 100644
--- a/log4j2-core/pom.xml
+++ b/log4j2-core/pom.xml
@@ -75,6 +75,18 @@
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.cloudera</groupId>
+ <artifactId>flume-core</artifactId>
+ <version>0.9.4-cdh3u1</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/Agent.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/Agent.java
new file mode 100644
index 0000000..2eef735
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/Agent.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttr;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.internal.StatusLogger;
+
+/**
+ *
+ */
+@Plugin(name="Agent",type="Core",printObject=true)
+public class Agent {
+
+ private final String host;
+
+ private final int port;
+
+ private static final String DEFAULT_HOST = "localhost";
+
+ private static final int DEFAULT_PORT = 35853;
+
+ private static Logger logger = StatusLogger.getLogger();
+
+ private Agent(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+
+ @PluginFactory
+ public static Agent createAgent(@PluginAttr("host") String host,
+ @PluginAttr("port") String port) {
+ if (host == null) {
+ host = DEFAULT_HOST;
+ }
+
+ int portNum;
+ if (port != null) {
+ try {
+ portNum = Integer.parseInt(port);
+ } catch (Exception ex) {
+ logger.error("Error parsing port number " + port, ex);
+ return null;
+ }
+ } else {
+ portNum = DEFAULT_PORT;
+ }
+ return new Agent(host, portNum);
+ }
+}
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java
new file mode 100644
index 0000000..70004a0
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AppenderBase;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttr;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.filter.Filters;
+import org.apache.logging.log4j.core.layout.RFC5424Layout;
+
+import java.net.InetAddress;
+
+/**
+ *
+ */
+@Plugin(name="Flume",type="Core",elementType="appender",printObject=true)
+public class FlumeAvroAppender extends AppenderBase {
+
+ private FlumeAvroManager manager;
+
+ private final String mdcIncludes;
+ private final String mdcExcludes;
+ private final String mdcRequired;
+
+ private final String eventPrefix;
+
+ private final String mdcPrefix;
+
+ private final boolean compressBody;
+
+ private final String hostname;
+
+ private FlumeAvroAppender(String name, Filters filters, Layout layout, boolean handleException,
+ String hostname, String includes, String excludes, String required, String mdcPrefix,
+ String eventPrefix, boolean compress, FlumeAvroManager manager) {
+ super(name, filters, layout, handleException);
+ this.manager = manager;
+ this.mdcIncludes = includes;
+ this.mdcExcludes = excludes;
+ this.mdcRequired = required;
+ this.eventPrefix = eventPrefix;
+ this.mdcPrefix = mdcPrefix;
+ this.compressBody = compress;
+ this.hostname = hostname;
+ }
+
+ public void append(LogEvent event) {
+
+ FlumeEvent flumeEvent = new FlumeEvent(event, hostname, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
+ eventPrefix, compressBody);
+ flumeEvent.setBody(getLayout().format(flumeEvent));
+ manager.send(flumeEvent);
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ manager.release();
+ }
+
+ @PluginFactory
+ public static FlumeAvroAppender createAppender(@PluginAttr("agents") Agent[] agents,
+ @PluginAttr("reconnectionDelay") String delay,
+ @PluginAttr("agentRetries") String agentRetries,
+ @PluginAttr("name") String name,
+ @PluginAttr("suppressExceptions") String suppress,
+ @PluginAttr("mdcExcludes") String excludes,
+ @PluginAttr("mdcIncludes") String includes,
+ @PluginAttr("mdcRequired") String required,
+ @PluginAttr("mdcPrefix") String mdcPrefix,
+ @PluginAttr("eventPrefix") String eventPrefix,
+ @PluginAttr("compress") String compressBody,
+ @PluginElement("layout") Layout layout,
+ @PluginElement("filters") Filters filters) {
+
+ String hostname;
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ } catch (Exception ex) {
+ logger.error("Unable to determine local hostname", ex);
+ return null;
+ }
+ if (agents == null || agents.length == 0) {
+ logger.debug("No agents provided, using defaults");
+ agents = new Agent[] {Agent.createAgent(null, null)};
+ }
+
+ boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
+ boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
+
+ int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
+ int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
+
+ if (layout == null) {
+ layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes, includes,
+ required, null);
+ }
+
+ if (name == null) {
+ logger.error("No name provided for Appender");
+ return null;
+ }
+
+ FlumeAvroManager manager = FlumeAvroManager.getManager(agents, reconnectDelay, retries);
+ if (manager == null) {
+ return null;
+ }
+ return new FlumeAvroAppender(name, filters, layout, handleExceptions, hostname, includes,
+ excludes, required, mdcPrefix, eventPrefix, compress, manager);
+ }
+}
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroManager.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroManager.java
new file mode 100644
index 0000000..98d93c3
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroManager.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import com.cloudera.flume.handlers.avro.AvroFlumeEvent;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+
+import com.cloudera.flume.handlers.avro.FlumeEventAvroServer;
+import com.cloudera.flume.handlers.avro.AvroEventConvertUtil;
+import org.apache.logging.log4j.internal.StatusLogger;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ *
+ */
+public class FlumeAvroManager extends AbstractManager {
+
+ private final int reconnectionDelay;
+
+ private FlumeEventAvroServer client;
+
+ private final Agent[] agents;
+
+ private final int retries;
+
+ private static final int DEFAULT_RECONNECTS = 3;
+
+ private int current = 0;
+
+ /**
+ The default reconnection delay (500 milliseconds or .5 seconds).
+ */
+ public static final int DEFAULT_RECONNECTION_DELAY = 500;
+
+ private static ManagerFactory factory = new AvroManagerFactory();
+
+ private static Logger logger = StatusLogger.getLogger();
+
+ public static FlumeAvroManager getManager(Agent[] agents, int delay, int retries) {
+ if (agents == null || agents.length == 0) {
+ throw new IllegalArgumentException("At least one agent is required");
+ }
+ if (delay == 0) {
+ delay = DEFAULT_RECONNECTION_DELAY;
+ }
+ if (retries == 0) {
+ retries = DEFAULT_RECONNECTS;
+ }
+ StringBuilder sb = new StringBuilder("FlumeAvro[");
+ boolean first = true;
+ for (Agent agent : agents) {
+ if (!first) {
+ sb.append(",");
+ }
+ sb.append(agent.getHost()).append(":").append(agent.getPort());
+ first = false;
+ }
+ sb.append("]");
+ return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(agents, delay, retries));
+ }
+
+
+ public FlumeAvroManager(String name, Agent[] agents, int delay, int retries) {
+ super(name);
+ this.agents = agents;
+ this.client = connect(agents);
+ this.reconnectionDelay = delay;
+ this.retries = retries;
+ }
+
+ protected synchronized void send(FlumeEvent event) {
+ AvroFlumeEvent avroEvent = AvroEventConvertUtil.toAvroEvent(event);
+ int i = 0;
+
+ String msg = "Error writing to " + getName();
+
+ do {
+ try {
+ client.append(avroEvent);
+ return;
+ } catch (Exception ex) {
+ if (i == retries - 1) {
+ msg = "Error writing to " + getName() + " at " + agents[0].getHost() + ":" + agents[0].getPort();
+ logger.warn(msg, ex);
+ break;
+ }
+ sleep();
+ }
+ } while (++i < retries);
+
+ for (int index = 0; index < agents.length; ++index) {
+ if (index == current) {
+ continue;
+ }
+ Agent agent = agents[index];
+ i = 0;
+ do {
+ try {
+
+ FlumeEventAvroServer c = connect(agent.getHost(), agent.getPort());
+ c.append(avroEvent);
+ client = c;
+ current = i;
+ return;
+ } catch (Exception ex) {
+ if (i == retries - 1) {
+ String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
+ agent.getPort();
+ logger.warn(warnMsg, ex);
+ break;
+ }
+ sleep();
+ }
+ } while (++i < retries);
+ }
+
+ throw new AppenderRuntimeException(msg);
+
+ }
+
+ private void sleep() {
+ try {
+ Thread.sleep(reconnectionDelay);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * There is a very good chance that this will always return the first agent even if it isn't available.
+ * @param agents The list of agents to choose from
+ * @return The FlumeEventAvroServer.
+ */
+ private FlumeEventAvroServer connect(Agent[] agents) {
+ int i = 0;
+ for (Agent agent : agents) {
+ FlumeEventAvroServer server = connect(agent.getHost(), agent.getPort());
+ if (server != null) {
+ current = i;
+ return server;
+ }
+ ++i;
+ }
+ throw new AppenderRuntimeException("Unable to connect to any agents");
+ }
+
+ public void releaseSub() {
+ }
+
+ private FlumeEventAvroServer connect(String hostname, int port) {
+ URL url;
+
+ try {
+ url = new URL("http", hostname, port, "/");
+ } catch (MalformedURLException ex) {
+ logger.error("Unable to create a URL for hostname " + hostname + " at port " + port, ex);
+ return null;
+ }
+
+ try {
+ return SpecificRequestor.getClient(FlumeEventAvroServer.class, new HttpTransceiver(url));
+ } catch (IOException ioe) {
+ logger.error("Unable to create Avro client");
+ return null;
+ }
+ }
+
+ private static class FactoryData {
+ Agent[] agents;
+ int delay;
+ int retries;
+
+ public FactoryData(Agent[] agents, int delay, int retries) {
+ this.agents = agents;
+ this.delay = delay;
+ this.retries = retries;
+ }
+ }
+
+ private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
+
+ public FlumeAvroManager createManager(String name, FactoryData data) {
+ try {
+
+ return new FlumeAvroManager(name, data.agents, data.delay, data.retries);
+ } catch (Exception ex) {
+ logger.error("Could not create FlumeAvroManager", ex);
+ }
+ return null;
+ }
+ }
+
+}
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java
new file mode 100644
index 0000000..97c8a7a
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import com.cloudera.flume.core.EventBaseImpl;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Marker;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggingException;
+import org.apache.logging.log4j.message.Message;
+import org.apache.logging.log4j.message.StructuredDataId;
+import org.apache.logging.log4j.message.StructuredDataMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ *
+ */
+class FlumeEvent extends EventBaseImpl implements LogEvent {
+
+ private final LogEvent event;
+
+ private byte[] body;
+
+ private final String hostname;
+
+ private final Map<String, Object> ctx = new HashMap<String, Object>();
+
+ private static final String DEFAULT_MDC_PREFIX = "mdc:";
+
+ private static final String DEFAULT_EVENT_PREFIX = "";
+
+ private static final String EVENT_TYPE = "EventType";
+
+ private static final String EVENT_ID = "EventId";
+
+ private static final String GUID = "guid";
+
+ private final boolean compress;
+
+ public FlumeEvent(LogEvent event, String hostname, String includes, String excludes, String required,
+ String mdcPrefix, String eventPrefix, boolean compress) {
+ this.event = event;
+ this.hostname = hostname;
+ this.compress = compress;
+ if (mdcPrefix == null) {
+ mdcPrefix = DEFAULT_MDC_PREFIX;
+ }
+ if (eventPrefix == null) {
+ eventPrefix = DEFAULT_EVENT_PREFIX;
+ }
+ this.fields = new HashMap<String, byte[]>();
+ Map<String, Object> mdc = event.getContextMap();
+ if (includes != null) {
+ String[] array = includes.split(",");
+ if (array.length > 0) {
+ for (String str : array) {
+ if (mdc.containsKey(str)) {
+ ctx.put(str, mdc.get(str));
+ }
+ }
+ }
+ } else if (excludes != null) {
+ String[] array = excludes.split(",");
+ if (array.length > 0) {
+ List<String> list = Arrays.asList(array);
+ for (Map.Entry<String, Object> entry : mdc.entrySet()) {
+ if (!list.contains(entry.getKey())) {
+ ctx.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+
+ if (required != null) {
+ String[] array = required.split(",");
+ if (array.length > 0) {
+ for (String str : array) {
+ if (!mdc.containsKey(str)) {
+ throw new LoggingException("Required key " + str + " is missing from the MDC");
+ }
+ }
+ }
+ }
+ if (event.getMessage() instanceof StructuredDataMessage) {
+ StructuredDataMessage msg = (StructuredDataMessage) event.getMessage();
+ fields.put(eventPrefix + EVENT_TYPE, msg.getType().getBytes());
+ StructuredDataId id = msg.getId();
+ fields.put(eventPrefix + EVENT_ID, id.getName().getBytes());
+ Map<String, String> data = msg.getData();
+ for (Map.Entry<String, String> entry : data.entrySet()) {
+ fields.put(eventPrefix + entry.getKey(), entry.getValue().getBytes());
+ }
+ }
+
+ for (Map.Entry<String, Object> entry : ctx.entrySet()) {
+ fields.put(mdcPrefix + entry.getKey(), entry.getValue().toString().getBytes());
+ }
+
+ fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString().getBytes());
+ }
+
+ public void setBody(byte[] body) {
+ if (body == null || body.length == 0) {
+ this.body = new byte[0];
+ return;
+ }
+ if (compress) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ GZIPOutputStream os = new GZIPOutputStream(baos);
+ os.write(body);
+ os.close();
+ } catch (IOException ioe) {
+ throw new LoggingException("Unable to compress message", ioe);
+ }
+ this.body = baos.toByteArray();
+ } else {
+ this.body = body;
+ }
+ }
+
+ @Override
+ public byte[] getBody() {
+ return this.body;
+ }
+
+ @Override
+ public Priority getPriority() {
+ switch (event.getLevel()) {
+ case INFO:
+ return Priority.INFO;
+ case ERROR:
+ return Priority.ERROR;
+ case DEBUG:
+ return Priority.DEBUG;
+ case WARN:
+ return Priority.WARN;
+ case TRACE:
+ return Priority.TRACE;
+ case FATAL:
+ return Priority.FATAL;
+ }
+ return Priority.INFO;
+ }
+
+ @Override
+ public long getTimestamp() {
+ return event.getMillis();
+ }
+
+ @Override
+ public long getNanos() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public String getHost() {
+ return hostname;
+ }
+
+ public Level getLevel() {
+ return event.getLevel();
+ }
+
+ public String getLoggerName() {
+ return event.getLoggerName();
+ }
+
+ public StackTraceElement getSource() {
+ return event.getSource();
+ }
+
+ public Message getMessage() {
+ return event.getMessage();
+ }
+
+ public Marker getMarker() {
+ return event.getMarker();
+ }
+
+ public String getThreadName() {
+ return event.getThreadName();
+ }
+
+ public long getMillis() {
+ return event.getMillis();
+ }
+
+ public Throwable getThrown() {
+ return event.getThrown();
+ }
+
+ public Map<String, Object> getContextMap() {
+ return ctx;
+ }
+
+ public Stack<String> getContextStack() {
+ return event.getContextStack();
+ }
+}
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/UUIDUtil.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/UUIDUtil.java
new file mode 100644
index 0000000..7c78b64
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/UUIDUtil.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class UUIDUtil
+{
+ private static AtomicInteger count = new AtomicInteger(0);
+
+ private static final long VERSION = 0x9000L;
+
+ private static final byte VARIANT = (byte)0xC0;
+
+ private static long least;
+
+ static
+ {
+ byte[] mac = null;
+ try
+ {
+ InetAddress address = InetAddress.getLocalHost();
+
+ try {
+ NetworkInterface ni = NetworkInterface.getByInetAddress(address);
+ if (ni != null) {
+ Method method = ni.getClass().getMethod("getHardwareAddress");
+ mac = (byte[]) method.invoke(ni);
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ // Ignore exception
+ }
+ if (mac == null || mac.length == 0) {
+ mac = address.getAddress();
+ }
+ }
+ catch (UnknownHostException e) {
+ // Ignore exception
+ }
+ if (mac == null || mac.length == 0) {
+ Random randomGenerator = new SecureRandom();
+ mac = new byte[6];
+ randomGenerator.nextBytes(mac);
+ }
+ int length = mac.length >= 6 ? 6 : mac.length;
+ int index = mac.length >= 6 ? mac.length - 6 : 0;
+ byte[] node = new byte[8];
+ node[0] = VARIANT;
+ node[1] = 0;
+ for (int i=2; i < 8 ; ++i) {
+ node[i] = 0;
+ }
+ System.arraycopy(mac, index, node, index + 2, length);
+ ByteBuffer buf = ByteBuffer.wrap(node);
+ least = buf.getLong();
+ }
+
+ private static String toHexString(byte[] bytes) {
+ char[] hexArray = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+ char[] hexChars = new char[bytes.length * 2];
+ int v;
+
+ for (int j = 0; j < bytes.length; j++) {
+ v = bytes[j] & 0xFF;
+ hexChars[j*2] = hexArray[v/16];
+ hexChars[j*2 + 1] = hexArray[v%16];
+ }
+ return new String(hexChars);
+ }
+
+ /* This class cannot be instantiated */
+ private UUIDUtil() {
+ }
+
+ /**
+ * Convert a UUID to a String with no dashes.
+ * @param uuid The UUID.
+ * @return The String version of the UUID with the '-' characters removed.
+ */
+ public static String getUUIDString(UUID uuid)
+ {
+ return StringUtils.replaceChars(uuid.toString(), "-", "");
+ }
+
+ /**
+ * Generates universally unique identifiers (UUIDs).
+ * UUID combines enough of the system information to make it unique across
+ * space and time. UUID string is composed of following fields:
+ * <ol>
+ * <li>Digits 1-12 are the lower 48 bits of the <code>System.currentTimeMillis()</code> call.
+ * This makes the UUID unique down to the millisecond for about 8,925 years.</li>
+ * <li>Digit 13 is the version (with a value of 9).</li>
+ * <li>Digits 14-16 are a sequence number that is incremented each time a UUID is generated.</li>
+ * <li>Digit 17 is the variant (with a value of 0xC)</li>
+ * <li>Digit 18 is zero.</li>
+ * <li>Digits 19-32 represent the system the application is running on.
+ * </ol>
+ *
+ * @return universally unique identifiers (UUID)
+ */
+ public static UUID getTimeBasedUUID()
+ {
+ int timeHi = count.incrementAndGet() & 0xfff;
+ long most = (System.currentTimeMillis() << 24) | VERSION | timeHi;
+
+ return new UUID(most, least);
+ }
+}
+
diff --git a/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java b/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java
new file mode 100644
index 0000000..5dcd71f
--- /dev/null
+++ b/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.handlers.avro.AvroEventSource;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.zip.GZIPInputStream;
+
+/**
+ *
+ */
+public class FlumeAvroAppenderTest {
+
+ private LoggerContext ctx = (LoggerContext) LogManager.getContext();
+
+ private static final String LOGBACK_CONF = "logback.configurationFile";
+ private static final String LOGBACK_CONFIG = "logback-flume.xml";
+
+ private static final int testServerPort = 12345;
+ private static final int testEventCount = 100;
+
+ private AvroEventSource eventSource;
+ private Logger avroLogger;
+
+ @BeforeClass
+ public static void setupClass() {
+ System.setProperty(LOGBACK_CONF, LOGBACK_CONFIG);
+ }
+
+ @AfterClass
+ public static void cleanupClass() {
+ System.clearProperty(LOGBACK_CONF);
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ eventSource = new AvroEventSource(testServerPort);
+ avroLogger = (Logger) LogManager.getLogger("avrologger");
+ /*
+ * Clear out all other appenders associated with this logger to ensure we're
+ * only hitting the Avro appender.
+ */
+ removeAppenders(avroLogger);
+ eventSource.open();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ removeAppenders(avroLogger);
+ eventSource.close();
+ }
+
+ @Test
+ public void testLog4jAvroAppender() throws InterruptedException, IOException {
+ Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))};
+ FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
+ null, null, null, null, "true", null, null);
+ avroAppender.start();
+ avroLogger.addAppender(avroAppender);
+ avroLogger.setLevel(Level.ALL);
+
+ Assert.assertNotNull(avroLogger);
+
+ int loggedCount = 0;
+ int receivedCount = 0;
+
+ for (int i = 0; i < testEventCount; i++) {
+ avroLogger.info("test i:" + i);
+ loggedCount++;
+ }
+
+ /*
+ * We perform this in another thread so we can put a time SLA on it by using
+ * Future#get(). Internally, the AvroEventSource uses a BlockingQueue.
+ */
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Callable<Event> callable = new Callable<Event>() {
+
+ public Event call() throws Exception {
+ return eventSource.next();
+ }
+ };
+
+ for (int i = 0; i < loggedCount; i++) {
+ try {
+ Future<Event> future = executor.submit(callable);
+
+ /*
+ * We must receive events in less than 1 second. This should be more
+ * than enough as all events should be held in AvroEventSource's
+ * BlockingQueue.
+ */
+ Event event = future.get(1, TimeUnit.SECONDS);
+
+ Assert.assertNotNull(event);
+ Assert.assertNotNull(event.getBody());
+ String body = getBody(event);
+ Assert.assertTrue(body.endsWith("test i:" + i));
+
+ receivedCount++;
+ } catch (ExecutionException e) {
+ Assert.fail("Flume failed to handle an event: " + e.getMessage());
+ break;
+ } catch (TimeoutException e) {
+ Assert
+ .fail("Flume failed to handle an event within the given time SLA: "
+ + e.getMessage());
+ break;
+ } catch (InterruptedException e) {
+ Assert
+ .fail("Flume source executor thread was interrupted. We count this as a failure.");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ executor.shutdown();
+
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ throw new IllegalStateException(
+ "Executor is refusing to shutdown cleanly");
+ }
+
+ Assert.assertEquals(loggedCount, receivedCount);
+ }
+
+ @Test
+ public void testConnectionRefused() {
+ Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(44000))};
+ FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
+ null, null, null, null, "true", null, null);
+ avroAppender.start();
+ avroLogger.addAppender(avroAppender);
+ avroLogger.setLevel(Level.ALL);
+
+ boolean caughtException = false;
+
+ try {
+ avroLogger.info("message 1");
+ } catch (Throwable t) {
+ //logger.debug("Logging to a non-existant server failed (as expected)", t);
+
+ caughtException = true;
+ }
+
+ Assert.assertTrue(caughtException);
+ }
+
+ @Test
+ public void testReconnect() throws IOException {
+ Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))};
+ FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "500", "10", "avro", "false", null,
+ null, null, null, null, "true", null, null);
+ avroAppender.start();
+ avroLogger.addAppender(avroAppender);
+ avroLogger.setLevel(Level.ALL);
+ avroLogger.info("message 1");
+
+ Event event = eventSource.next();
+
+ Assert.assertNotNull(event);
+ String body = getBody(event);
+ Assert.assertTrue(body.endsWith("message 1"));
+
+ eventSource.close();
+
+ Callable<Void> logCallable = new Callable<Void>() {
+
+ public Void call() throws Exception {
+ avroLogger.info("message 2");
+ return null;
+ }
+ };
+
+ ExecutorService logExecutor = Executors.newSingleThreadExecutor();
+
+ boolean caughtException = false;
+
+ try {
+ logExecutor.submit(logCallable);
+
+ Thread.sleep(1500);
+
+ eventSource.open();
+
+ logExecutor.shutdown();
+
+ if (!logExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ throw new IllegalStateException(
+ "Log executor is refusing to shutdown cleanly");
+ }
+ } catch (Throwable t) {
+ System.err.println("Failed to reestablish a connection and log to an avroSource");
+
+ caughtException = true;
+ }
+
+ Assert.assertFalse(caughtException);
+
+ event = eventSource.next();
+
+ Assert.assertNotNull(event);
+ body = getBody(event);
+ Assert.assertTrue(body.endsWith("message 2"));
+
+ caughtException = false;
+
+ try {
+ avroLogger.info("message 3");
+ } catch (Throwable t) {
+ System.err.println("Logging to a closed server failed (not expected)");
+
+ caughtException = true;
+ }
+
+ Assert.assertFalse(caughtException);
+
+ event = eventSource.next();
+
+ Assert.assertNotNull(event);
+ body = getBody(event);
+ Assert.assertTrue(body.endsWith("message 3"));
+ }
+
+
+ private void removeAppenders(Logger logger) {
+ Map<String,Appender> map = logger.getAppenders();
+ for (Map.Entry<String, Appender> entry : map.entrySet()) {
+ Appender app = entry.getValue();
+ avroLogger.removeAppender(app);
+ app.stop();
+ }
+ }
+
+ private Appender getAppender(Logger logger, String name) {
+ Map<String,Appender> map = logger.getAppenders();
+ return map.get(name);
+ }
+
+ private String getBody(Event event) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+ int n = 0;
+ while (-1 != (n = is.read())) {
+ baos.write(n);
+ }
+ return new String(baos.toByteArray());
+
+ }
+}
diff --git a/log4j2-core/src/test/resources/logback-flume.xml b/log4j2-core/src/test/resources/logback-flume.xml
new file mode 100644
index 0000000..0042db3
--- /dev/null
+++ b/log4j2-core/src/test/resources/logback-flume.xml
@@ -0,0 +1,11 @@
+<configuration>
+ <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <Pattern>%d{ISO8601} %5p [%t] %c{0} %X{transactionId} - %m%n</Pattern>
+ </encoder>
+ </appender>
+
+ <root level="error">
+ <appender-ref ref="Console" />
+ </root>
+</configuration>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index f61ba78..724d017 100644
--- a/pom.xml
+++ b/pom.xml
@@ -220,6 +220,12 @@
</plugin>
</plugins>
</reporting>
+ <repositories>
+ <repository>
+ <id>cloudera</id>
+ <url>https://repository.cloudera.com/content/repositories/releases/</url>
+ </repository>
+ </repositories>
<distributionManagement>
<site>
<id>apache.website</id>