Add support for Flume.

git-svn-id: https://svn.apache.org/repos/asf/logging/log4j/branches/BRANCH_2_0_EXPERIMENTAL/rgoers@1156861 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/log4j2-core/pom.xml b/log4j2-core/pom.xml
index e9e9066..309d725 100644
--- a/log4j2-core/pom.xml
+++ b/log4j2-core/pom.xml
@@ -75,6 +75,18 @@
       <artifactId>logback-core</artifactId>
       <scope>test</scope>
     </dependency>
+     <dependency>
+      <groupId>com.cloudera</groupId>
+      <artifactId>flume-core</artifactId>
+      <version>0.9.4-cdh3u1</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/Agent.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/Agent.java
new file mode 100644
index 0000000..2eef735
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/Agent.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttr;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.internal.StatusLogger;
+
+/**
+ *
+ */
+@Plugin(name="Agent",type="Core",printObject=true)
+public class Agent {
+
+    private final String host;
+
+    private final int port;
+
+    private static final String DEFAULT_HOST = "localhost";
+
+    private static final int DEFAULT_PORT = 35853;
+
+    private static Logger logger = StatusLogger.getLogger();
+
+    private Agent(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+
+    @PluginFactory
+    public static Agent createAgent(@PluginAttr("host") String host,
+                                    @PluginAttr("port") String port) {
+        if (host == null) {
+            host = DEFAULT_HOST;
+        }
+
+        int portNum;
+        if (port != null) {
+            try {
+                portNum = Integer.parseInt(port);
+            } catch (Exception ex) {
+                logger.error("Error parsing port number " + port, ex);
+                return null;
+            }
+        } else {
+            portNum = DEFAULT_PORT;
+        }
+        return new Agent(host, portNum);
+    }
+}
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java
new file mode 100644
index 0000000..70004a0
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppender.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import org.apache.logging.log4j.core.Layout;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.AppenderBase;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttr;
+import org.apache.logging.log4j.core.config.plugins.PluginElement;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.filter.Filters;
+import org.apache.logging.log4j.core.layout.RFC5424Layout;
+
+import java.net.InetAddress;
+
+/**
+ *
+ */
+@Plugin(name="Flume",type="Core",elementType="appender",printObject=true)
+public class FlumeAvroAppender extends AppenderBase {
+
+    private FlumeAvroManager manager;
+
+    private final String mdcIncludes;
+    private final String mdcExcludes;
+    private final String mdcRequired;
+
+    private final String eventPrefix;
+
+    private final String mdcPrefix;
+
+    private final boolean compressBody;
+
+    private final String hostname;
+
+    private FlumeAvroAppender(String name, Filters filters, Layout layout, boolean handleException,
+                              String hostname, String includes, String excludes, String required, String mdcPrefix,
+                              String eventPrefix, boolean compress, FlumeAvroManager manager) {
+        super(name, filters, layout, handleException);
+        this.manager = manager;
+        this.mdcIncludes = includes;
+        this.mdcExcludes = excludes;
+        this.mdcRequired = required;
+        this.eventPrefix = eventPrefix;
+        this.mdcPrefix = mdcPrefix;
+        this.compressBody = compress;
+        this.hostname = hostname;
+    }
+
+    public void append(LogEvent event) {
+
+        FlumeEvent flumeEvent = new FlumeEvent(event, hostname, mdcIncludes, mdcExcludes, mdcRequired, mdcPrefix,
+            eventPrefix, compressBody);
+        flumeEvent.setBody(getLayout().format(flumeEvent));
+        manager.send(flumeEvent);
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+        manager.release();
+    }
+
+    @PluginFactory
+    public static FlumeAvroAppender createAppender(@PluginAttr("agents") Agent[] agents,
+                                                   @PluginAttr("reconnectionDelay") String delay,
+                                                   @PluginAttr("agentRetries") String agentRetries,
+                                                   @PluginAttr("name") String name,
+                                                   @PluginAttr("suppressExceptions") String suppress,
+                                                   @PluginAttr("mdcExcludes") String excludes,
+                                                   @PluginAttr("mdcIncludes") String includes,
+                                                   @PluginAttr("mdcRequired") String required,
+                                                   @PluginAttr("mdcPrefix") String mdcPrefix,
+                                                   @PluginAttr("eventPrefix") String eventPrefix,
+                                                   @PluginAttr("compress") String compressBody,
+                                                   @PluginElement("layout") Layout layout,
+                                                   @PluginElement("filters") Filters filters) {
+
+        String hostname;
+        try {
+            hostname = InetAddress.getLocalHost().getHostName();
+        } catch (Exception ex) {
+            logger.error("Unable to determine local hostname", ex);
+            return null;
+        }
+        if (agents == null || agents.length == 0) {
+            logger.debug("No agents provided, using defaults");
+            agents = new Agent[] {Agent.createAgent(null, null)};
+        }
+
+        boolean handleExceptions = suppress == null ? true : Boolean.valueOf(suppress);
+        boolean compress = compressBody == null ? true : Boolean.valueOf(compressBody);
+
+        int reconnectDelay = delay == null ? 0 : Integer.parseInt(delay);
+        int retries = agentRetries == null ? 0 : Integer.parseInt(agentRetries);
+
+        if (layout == null) {
+            layout = RFC5424Layout.createLayout(null, null, null, "True", null, null, null, null, excludes, includes,
+                required, null);
+        }
+
+        if (name == null) {
+            logger.error("No name provided for Appender");
+            return null;
+        }
+
+        FlumeAvroManager manager = FlumeAvroManager.getManager(agents, reconnectDelay, retries);
+        if (manager == null) {
+            return null;
+        }
+        return new FlumeAvroAppender(name, filters, layout,  handleExceptions, hostname, includes,
+            excludes, required, mdcPrefix, eventPrefix, compress, manager);
+    }
+}
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroManager.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroManager.java
new file mode 100644
index 0000000..98d93c3
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroManager.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import com.cloudera.flume.handlers.avro.AvroFlumeEvent;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.appender.AbstractManager;
+import org.apache.logging.log4j.core.appender.AppenderRuntimeException;
+import org.apache.logging.log4j.core.appender.ManagerFactory;
+
+import com.cloudera.flume.handlers.avro.FlumeEventAvroServer;
+import com.cloudera.flume.handlers.avro.AvroEventConvertUtil;
+import org.apache.logging.log4j.internal.StatusLogger;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ *
+ */
+public class FlumeAvroManager extends AbstractManager {
+
+    private final int reconnectionDelay;
+
+    private FlumeEventAvroServer client;
+
+    private final Agent[] agents;
+
+    private final int retries;
+
+    private static final int DEFAULT_RECONNECTS = 3;
+
+    private int current = 0;
+
+    /**
+      The default reconnection delay (500 milliseconds or .5 seconds).
+     */
+    public static final int DEFAULT_RECONNECTION_DELAY   = 500;
+
+    private static ManagerFactory factory = new AvroManagerFactory();
+
+    private static Logger logger = StatusLogger.getLogger();
+
+    public static FlumeAvroManager getManager(Agent[] agents, int delay, int retries) {
+        if (agents == null || agents.length == 0) {
+            throw new IllegalArgumentException("At least one agent is required");
+        }
+        if (delay == 0) {
+            delay = DEFAULT_RECONNECTION_DELAY;
+        }
+        if (retries == 0) {
+            retries = DEFAULT_RECONNECTS;
+        }
+        StringBuilder sb = new StringBuilder("FlumeAvro[");
+        boolean first = true;
+        for (Agent agent : agents) {
+            if (!first) {
+                sb.append(",");
+            }
+            sb.append(agent.getHost()).append(":").append(agent.getPort());
+            first = false;
+        }
+        sb.append("]");
+        return (FlumeAvroManager) getManager(sb.toString(), factory, new FactoryData(agents, delay, retries));
+    }
+
+
+    public FlumeAvroManager(String name, Agent[] agents, int delay, int retries) {
+        super(name);
+        this.agents = agents;
+        this.client = connect(agents);
+        this.reconnectionDelay = delay;
+        this.retries = retries;
+    }
+
+    protected synchronized void send(FlumeEvent event)  {
+        AvroFlumeEvent avroEvent = AvroEventConvertUtil.toAvroEvent(event);
+        int i = 0;
+
+        String msg = "Error writing to " + getName();
+
+        do {
+            try {
+                client.append(avroEvent);
+                return;
+            } catch (Exception ex) {
+                if (i == retries - 1) {
+                    msg = "Error writing to " + getName() + " at " + agents[0].getHost() + ":" + agents[0].getPort();
+                    logger.warn(msg, ex);
+                    break;
+                }
+                sleep();
+            }
+        } while (++i < retries);
+
+        for (int index = 0; index < agents.length; ++index) {
+            if (index == current) {
+                continue;
+            }
+            Agent agent = agents[index];
+            i = 0;
+            do {
+                try {
+
+                    FlumeEventAvroServer c = connect(agent.getHost(), agent.getPort());
+                    c.append(avroEvent);
+                    client = c;
+                    current = i;
+                    return;
+                } catch (Exception ex) {
+                    if (i == retries - 1) {
+                        String warnMsg = "Error writing to " + getName() + " at " + agent.getHost() + ":" +
+                            agent.getPort();
+                        logger.warn(warnMsg, ex);
+                        break;
+                    }
+                    sleep();
+                }
+            } while (++i < retries);
+        }
+
+        throw new AppenderRuntimeException(msg);
+
+    }
+
+    private void sleep() {
+        try {
+            Thread.sleep(reconnectionDelay);
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * There is a very good chance that this will always return the first agent even if it isn't available.
+     * @param agents The list of agents to choose from
+     * @return The FlumeEventAvroServer.
+     */
+    private FlumeEventAvroServer connect(Agent[] agents) {
+        int i = 0;
+        for (Agent agent : agents) {
+            FlumeEventAvroServer server = connect(agent.getHost(), agent.getPort());
+            if (server != null) {
+                current = i;
+                return server;
+            }
+            ++i;
+        }
+        throw new AppenderRuntimeException("Unable to connect to any agents");
+    }
+
+    public void releaseSub() {
+    }
+
+    private FlumeEventAvroServer connect(String hostname, int port) {
+        URL url;
+
+        try {
+            url = new URL("http", hostname, port, "/");
+        } catch (MalformedURLException ex) {
+            logger.error("Unable to create a URL for hostname " + hostname + " at port " + port, ex);
+            return null;
+        }
+
+        try {
+            return SpecificRequestor.getClient(FlumeEventAvroServer.class, new HttpTransceiver(url));
+        } catch (IOException ioe) {
+            logger.error("Unable to create Avro client");
+            return null;
+        }
+    }
+
+    private static class FactoryData {
+        Agent[] agents;
+        int delay;
+        int retries;
+
+        public FactoryData(Agent[] agents, int delay, int retries) {
+            this.agents = agents;
+            this.delay = delay;
+            this.retries = retries;
+        }
+    }
+
+    private static class AvroManagerFactory implements ManagerFactory<FlumeAvroManager, FactoryData> {
+
+        public FlumeAvroManager createManager(String name, FactoryData data) {
+            try {
+
+                return new FlumeAvroManager(name, data.agents, data.delay, data.retries);
+            } catch (Exception ex) {
+                logger.error("Could not create FlumeAvroManager", ex);
+            }
+            return null;
+        }
+    }
+
+}
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java
new file mode 100644
index 0000000..97c8a7a
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/FlumeEvent.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import com.cloudera.flume.core.EventBaseImpl;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Marker;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggingException;
+import org.apache.logging.log4j.message.Message;
+import org.apache.logging.log4j.message.StructuredDataId;
+import org.apache.logging.log4j.message.StructuredDataMessage;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.zip.GZIPOutputStream;
+
+/**
+ *
+ */
+class FlumeEvent extends EventBaseImpl implements LogEvent {
+
+    private final LogEvent event;
+
+    private byte[] body;
+
+    private final String hostname;
+
+    private final Map<String, Object> ctx = new HashMap<String, Object>();
+
+    private static final String DEFAULT_MDC_PREFIX = "mdc:";
+
+    private static final String DEFAULT_EVENT_PREFIX = "";
+
+    private static final String EVENT_TYPE = "EventType";
+
+    private static final String EVENT_ID = "EventId";
+
+    private static final String GUID = "guid";
+
+    private final boolean compress;
+
+    public FlumeEvent(LogEvent event, String hostname, String includes, String excludes, String required,
+                      String mdcPrefix, String eventPrefix, boolean compress) {
+        this.event = event;
+        this.hostname = hostname;
+        this.compress = compress;
+        if (mdcPrefix == null) {
+            mdcPrefix = DEFAULT_MDC_PREFIX;
+        }
+        if (eventPrefix == null) {
+            eventPrefix = DEFAULT_EVENT_PREFIX;
+        }
+        this.fields = new HashMap<String, byte[]>();
+        Map<String, Object> mdc = event.getContextMap();
+        if (includes != null) {
+            String[] array = includes.split(",");
+            if (array.length > 0) {
+                for (String str : array) {
+                    if (mdc.containsKey(str)) {
+                        ctx.put(str, mdc.get(str));
+                    }
+                }
+            }
+        } else if (excludes != null) {
+            String[] array = excludes.split(",");
+            if (array.length > 0) {
+                List<String> list = Arrays.asList(array);
+                for (Map.Entry<String, Object> entry : mdc.entrySet()) {
+                    if (!list.contains(entry.getKey())) {
+                        ctx.put(entry.getKey(), entry.getValue());
+                    }
+                }
+            }
+        }
+
+        if (required != null) {
+            String[] array = required.split(",");
+            if (array.length > 0) {
+                for (String str : array) {
+                    if (!mdc.containsKey(str)) {
+                        throw new LoggingException("Required key " + str + " is missing from the MDC");
+                    }
+                }
+            }
+        }
+        if (event.getMessage() instanceof StructuredDataMessage) {
+            StructuredDataMessage msg = (StructuredDataMessage) event.getMessage();
+            fields.put(eventPrefix + EVENT_TYPE, msg.getType().getBytes());
+            StructuredDataId id = msg.getId();
+            fields.put(eventPrefix + EVENT_ID, id.getName().getBytes());
+            Map<String, String> data = msg.getData();
+            for (Map.Entry<String, String> entry : data.entrySet()) {
+                fields.put(eventPrefix + entry.getKey(), entry.getValue().getBytes());
+            }
+        }
+
+        for (Map.Entry<String, Object> entry : ctx.entrySet()) {
+            fields.put(mdcPrefix + entry.getKey(), entry.getValue().toString().getBytes());
+        }
+
+        fields.put(GUID, UUIDUtil.getTimeBasedUUID().toString().getBytes());
+    }
+
+    public void setBody(byte[] body) {
+        if (body == null || body.length == 0) {
+            this.body = new byte[0];
+            return;
+        }
+        if (compress) {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try {
+                GZIPOutputStream os = new GZIPOutputStream(baos);
+                os.write(body);
+                os.close();
+            } catch (IOException ioe) {
+                throw new LoggingException("Unable to compress message", ioe);
+            }
+            this.body = baos.toByteArray();
+        } else {
+            this.body = body;
+        }
+    }
+
+    @Override
+    public byte[] getBody() {
+        return this.body;
+    }
+
+    @Override
+    public Priority getPriority() {
+        switch (event.getLevel()) {
+            case INFO:
+                return Priority.INFO;
+            case ERROR:
+                return Priority.ERROR;
+            case DEBUG:
+                return Priority.DEBUG;
+            case WARN:
+                return Priority.WARN;
+            case TRACE:
+                return Priority.TRACE;
+            case FATAL:
+                return Priority.FATAL;
+        }
+        return Priority.INFO;
+    }
+
+    @Override
+    public long getTimestamp() {
+        return event.getMillis();
+    }
+
+    @Override
+    public long getNanos() {
+        return System.nanoTime();
+    }
+
+    @Override
+    public String getHost() {
+        return hostname;
+    }
+
+    public Level getLevel() {
+        return event.getLevel();
+    }
+
+    public String getLoggerName() {
+        return event.getLoggerName();
+    }
+
+    public StackTraceElement getSource() {
+        return event.getSource();
+    }
+
+    public Message getMessage() {
+        return event.getMessage();
+    }
+
+    public Marker getMarker() {
+        return event.getMarker();
+    }
+
+    public String getThreadName() {
+        return event.getThreadName();
+    }
+
+    public long getMillis() {
+        return event.getMillis();
+    }
+
+    public Throwable getThrown() {
+        return event.getThrown();
+    }
+
+    public Map<String, Object> getContextMap() {
+        return ctx;
+    }
+
+    public Stack<String> getContextStack() {
+        return event.getContextStack();
+    }
+}
diff --git a/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/UUIDUtil.java b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/UUIDUtil.java
new file mode 100644
index 0000000..7c78b64
--- /dev/null
+++ b/log4j2-core/src/main/java/org/apache/logging/log4j/core/appender/flume/UUIDUtil.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class UUIDUtil
+{
+    private static AtomicInteger count = new AtomicInteger(0);
+
+    private static final long VERSION = 0x9000L;
+
+    private static final byte VARIANT = (byte)0xC0;
+
+    private static long least;
+
+    static
+    {
+        byte[] mac = null;
+        try
+        {
+            InetAddress address = InetAddress.getLocalHost();
+
+            try {
+                NetworkInterface ni = NetworkInterface.getByInetAddress(address);
+                if (ni != null) {
+                    Method method = ni.getClass().getMethod("getHardwareAddress");
+                    mac = (byte[]) method.invoke(ni);
+                }
+            } catch (Exception ex) {
+                ex.printStackTrace();
+                // Ignore exception
+            }
+            if (mac == null || mac.length == 0) {
+                mac = address.getAddress();
+            }
+        }
+        catch (UnknownHostException e) {
+            // Ignore exception
+        }
+        if (mac == null || mac.length == 0) {
+            Random randomGenerator = new SecureRandom();
+            mac = new byte[6];
+            randomGenerator.nextBytes(mac);
+        }
+        int length = mac.length >= 6 ? 6 : mac.length;
+        int index = mac.length >= 6 ? mac.length - 6 : 0;
+        byte[] node = new byte[8];
+        node[0] = VARIANT;
+        node[1] = 0;
+        for (int i=2; i < 8 ; ++i) {
+            node[i] = 0;
+        }
+        System.arraycopy(mac, index, node, index + 2, length);
+        ByteBuffer buf = ByteBuffer.wrap(node);
+        least = buf.getLong();
+    }
+
+    private static String toHexString(byte[] bytes) {
+        char[] hexArray = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+        char[] hexChars = new char[bytes.length * 2];
+        int v;
+
+        for (int j = 0; j < bytes.length; j++) {
+            v = bytes[j] & 0xFF;
+            hexChars[j*2] = hexArray[v/16];
+            hexChars[j*2 + 1] = hexArray[v%16];
+        }
+        return new String(hexChars);
+    }
+
+    /* This class cannot be instantiated */
+    private UUIDUtil() {
+    }
+
+    /**
+     * Convert a UUID to a String with no dashes.
+     * @param uuid The UUID.
+     * @return The String version of the UUID with the '-' characters removed.
+     */
+    public static String getUUIDString(UUID uuid)
+    {
+        return StringUtils.replaceChars(uuid.toString(), "-", "");
+    }
+
+    /**
+     * Generates universally unique identifiers (UUIDs).
+     * UUID combines enough of the system information to make it unique across
+     * space and time. UUID string is composed of following fields:
+     * <ol>
+     * <li>Digits 1-12 are the lower 48 bits of the <code>System.currentTimeMillis()</code> call.
+     * This makes the UUID unique down to the millisecond for about 8,925 years.</li>
+     * <li>Digit 13 is the version (with a value of 9).</li>
+     * <li>Digits 14-16 are a sequence number that is incremented each time a UUID is generated.</li>
+     * <li>Digit 17 is the variant (with a value of 0xC)</li>
+     * <li>Digit 18 is zero.</li>
+     * <li>Digits 19-32 represent the system the application is running on.
+     * </ol>
+     *
+     * @return universally unique identifiers (UUID)
+     */
+    public static UUID getTimeBasedUUID()
+    {
+        int timeHi = count.incrementAndGet() & 0xfff;
+        long most = (System.currentTimeMillis() << 24) | VERSION | timeHi;
+
+        return new UUID(most, least);
+    }
+}
+
diff --git a/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java b/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java
new file mode 100644
index 0000000..5dcd71f
--- /dev/null
+++ b/log4j2-core/src/test/java/org/apache/logging/log4j/core/appender/flume/FlumeAvroAppenderTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache license, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the license for the specific language governing permissions and
+ * limitations under the license.
+ */
+package org.apache.logging.log4j.core.appender.flume;
+
+import com.cloudera.flume.core.Event;
+import com.cloudera.flume.handlers.avro.AvroEventSource;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.zip.GZIPInputStream;
+
+/**
+ *
+ */
+public class FlumeAvroAppenderTest {
+
+    private LoggerContext ctx = (LoggerContext) LogManager.getContext();
+
+    private static final String LOGBACK_CONF = "logback.configurationFile";
+    private static final String LOGBACK_CONFIG = "logback-flume.xml";
+
+    private static final int testServerPort = 12345;
+    private static final int testEventCount = 100;
+
+    private AvroEventSource eventSource;
+    private Logger avroLogger;
+
+    @BeforeClass
+    public static void setupClass() {
+        System.setProperty(LOGBACK_CONF, LOGBACK_CONFIG);
+    }
+
+    @AfterClass
+    public static void cleanupClass() {
+        System.clearProperty(LOGBACK_CONF);
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        eventSource = new AvroEventSource(testServerPort);
+        avroLogger = (Logger) LogManager.getLogger("avrologger");
+        /*
+        * Clear out all other appenders associated with this logger to ensure we're
+        * only hitting the Avro appender.
+        */
+        removeAppenders(avroLogger);
+        eventSource.open();
+    }
+
+    @After
+    public void teardown() throws IOException {
+        removeAppenders(avroLogger);
+        eventSource.close();
+    }
+
+    @Test
+    public void testLog4jAvroAppender() throws InterruptedException, IOException {
+        Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))};
+        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
+            null, null, null, null, "true", null, null);
+        avroAppender.start();
+        avroLogger.addAppender(avroAppender);
+        avroLogger.setLevel(Level.ALL);
+
+        Assert.assertNotNull(avroLogger);
+
+        int loggedCount = 0;
+        int receivedCount = 0;
+
+        for (int i = 0; i < testEventCount; i++) {
+            avroLogger.info("test i:" + i);
+            loggedCount++;
+        }
+
+        /*
+        * We perform this in another thread so we can put a time SLA on it by using
+        * Future#get(). Internally, the AvroEventSource uses a BlockingQueue.
+        */
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Callable<Event> callable = new Callable<Event>() {
+
+            public Event call() throws Exception {
+                return eventSource.next();
+            }
+        };
+
+        for (int i = 0; i < loggedCount; i++) {
+            try {
+                Future<Event> future = executor.submit(callable);
+
+                /*
+                * We must receive events in less than 1 second. This should be more
+                * than enough as all events should be held in AvroEventSource's
+                * BlockingQueue.
+                */
+                Event event = future.get(1, TimeUnit.SECONDS);
+
+                Assert.assertNotNull(event);
+                Assert.assertNotNull(event.getBody());
+                String body = getBody(event);
+                Assert.assertTrue(body.endsWith("test i:" + i));
+
+                receivedCount++;
+            } catch (ExecutionException e) {
+                Assert.fail("Flume failed to handle an event: " + e.getMessage());
+                break;
+            } catch (TimeoutException e) {
+                Assert
+                    .fail("Flume failed to handle an event within the given time SLA: "
+                        + e.getMessage());
+                break;
+            } catch (InterruptedException e) {
+                Assert
+                    .fail("Flume source executor thread was interrupted. We count this as a failure.");
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+        executor.shutdown();
+
+        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+            throw new IllegalStateException(
+                "Executor is refusing to shutdown cleanly");
+        }
+
+        Assert.assertEquals(loggedCount, receivedCount);
+    }
+
+    @Test
+    public void testConnectionRefused() {
+        Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(44000))};
+        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "100", "3", "avro", "false", null,
+            null, null, null, null, "true", null, null);
+        avroAppender.start();
+        avroLogger.addAppender(avroAppender);
+        avroLogger.setLevel(Level.ALL);
+
+        boolean caughtException = false;
+
+        try {
+            avroLogger.info("message 1");
+        } catch (Throwable t) {
+            //logger.debug("Logging to a non-existant server failed (as expected)", t);
+
+            caughtException = true;
+        }
+
+        Assert.assertTrue(caughtException);
+    }
+
+    @Test
+    public void testReconnect() throws IOException {
+        Agent[] agents = new Agent[] {Agent.createAgent("localhost", Integer.toString(testServerPort))};
+        FlumeAvroAppender avroAppender = FlumeAvroAppender.createAppender(agents, "500", "10", "avro", "false", null,
+            null, null, null, null, "true", null, null);
+        avroAppender.start();
+        avroLogger.addAppender(avroAppender);
+        avroLogger.setLevel(Level.ALL);
+        avroLogger.info("message 1");
+
+        Event event = eventSource.next();
+
+        Assert.assertNotNull(event);
+        String body = getBody(event);
+        Assert.assertTrue(body.endsWith("message 1"));
+
+        eventSource.close();
+
+        Callable<Void> logCallable = new Callable<Void>() {
+
+            public Void call() throws Exception {
+                avroLogger.info("message 2");
+                return null;
+            }
+        };
+
+        ExecutorService logExecutor = Executors.newSingleThreadExecutor();
+
+        boolean caughtException = false;
+
+        try {
+            logExecutor.submit(logCallable);
+
+            Thread.sleep(1500);
+
+            eventSource.open();
+
+            logExecutor.shutdown();
+
+            if (!logExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+                throw new IllegalStateException(
+                    "Log executor is refusing to shutdown cleanly");
+            }
+        } catch (Throwable t) {
+            System.err.println("Failed to reestablish a connection and log to an avroSource");
+
+            caughtException = true;
+        }
+
+        Assert.assertFalse(caughtException);
+
+        event = eventSource.next();
+
+        Assert.assertNotNull(event);
+        body = getBody(event);
+        Assert.assertTrue(body.endsWith("message 2"));
+
+        caughtException = false;
+
+        try {
+            avroLogger.info("message 3");
+        } catch (Throwable t) {
+            System.err.println("Logging to a closed server failed (not expected)");
+
+            caughtException = true;
+        }
+
+        Assert.assertFalse(caughtException);
+
+        event = eventSource.next();
+
+        Assert.assertNotNull(event);
+        body = getBody(event);
+        Assert.assertTrue(body.endsWith("message 3"));
+    }
+
+
+    private void removeAppenders(Logger logger) {
+        Map<String,Appender> map = logger.getAppenders();
+        for (Map.Entry<String, Appender> entry : map.entrySet()) {
+            Appender app = entry.getValue();
+            avroLogger.removeAppender(app);
+            app.stop();
+        }
+    }
+
+    private Appender getAppender(Logger logger, String name) {
+        Map<String,Appender> map = logger.getAppenders();
+        return map.get(name);
+    }
+
+    private String getBody(Event event) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
+            int n = 0;
+            while (-1 != (n = is.read())) {
+                baos.write(n);
+            }
+            return new String(baos.toByteArray());
+
+    }
+}
diff --git a/log4j2-core/src/test/resources/logback-flume.xml b/log4j2-core/src/test/resources/logback-flume.xml
new file mode 100644
index 0000000..0042db3
--- /dev/null
+++ b/log4j2-core/src/test/resources/logback-flume.xml
@@ -0,0 +1,11 @@
+<configuration>
+ <appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
+   <encoder>
+     <Pattern>%d{ISO8601} %5p [%t] %c{0} %X{transactionId} - %m%n</Pattern>
+   </encoder>
+ </appender>
+
+ <root level="error">
+   <appender-ref ref="Console" />
+ </root>
+</configuration>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index f61ba78..724d017 100644
--- a/pom.xml
+++ b/pom.xml
@@ -220,6 +220,12 @@
       </plugin>
     </plugins>
   </reporting>
+  <repositories>
+    <repository>
+      <id>cloudera</id>
+      <url>https://repository.cloudera.com/content/repositories/releases/</url>
+    </repository>
+  </repositories>
   <distributionManagement>
     <site>
       <id>apache.website</id>