JSON receiver for ECS log messages (#9)

diff --git a/pom.xml b/pom.xml
index 4e2c490..537cd5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -456,6 +456,11 @@
       <version>1.18.0</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>com.owlike</groupId>
+      <artifactId>genson</artifactId>
+      <version>1.6</version>
+    </dependency>
   </dependencies>
 
   <reporting>
diff --git a/src/main/java/org/apache/log4j/chainsaw/ReceiverConfigurationPanel.java b/src/main/java/org/apache/log4j/chainsaw/ReceiverConfigurationPanel.java
index 42e0409..dbddddc 100644
--- a/src/main/java/org/apache/log4j/chainsaw/ReceiverConfigurationPanel.java
+++ b/src/main/java/org/apache/log4j/chainsaw/ReceiverConfigurationPanel.java
@@ -38,6 +38,7 @@
 import java.net.URL;
 import java.util.List;
 import java.util.Locale;
+import org.apache.log4j.net.JsonReceiver;
 
 
 /**
@@ -290,6 +291,7 @@
 
         networkReceiverClassNameComboBoxModel = new DefaultComboBoxModel<>();
         networkReceiverClassNameComboBoxModel.addElement(UDPReceiver.class.getName());
+        networkReceiverClassNameComboBoxModel.addElement(JsonReceiver.class.getName());
 
         networkReceiverClassNameComboBox = new JComboBox<>(networkReceiverClassNameComboBoxModel);
 
diff --git a/src/main/java/org/apache/log4j/chainsaw/receivers/ReceiversPanel.java b/src/main/java/org/apache/log4j/chainsaw/receivers/ReceiversPanel.java
index 8764cd1..1212f2c 100644
--- a/src/main/java/org/apache/log4j/chainsaw/receivers/ReceiversPanel.java
+++ b/src/main/java/org/apache/log4j/chainsaw/receivers/ReceiversPanel.java
@@ -50,6 +50,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import org.apache.log4j.net.JsonReceiver;
 
 
 /**
diff --git a/src/main/java/org/apache/log4j/net/ECSLogEvent.java b/src/main/java/org/apache/log4j/net/ECSLogEvent.java
new file mode 100644
index 0000000..87d41e6
--- /dev/null
+++ b/src/main/java/org/apache/log4j/net/ECSLogEvent.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.log4j.net;
+
+import com.owlike.genson.annotation.JsonProperty;
+import java.time.ZonedDateTime;
+import java.util.Hashtable;
+import java.util.List;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LocationInfo;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.log4j.spi.ThrowableInformation;
+
+/**
+ * Represents a LogEvent as from a ECS(ElasticSearch) event.
+ */
+public class ECSLogEvent {
+    @JsonProperty("@timestamp")
+    public String timestamp;
+    @JsonProperty("log.level")
+    public String level;
+    public String message;
+    @JsonProperty("process.thread.name")
+    public String thread_name;
+    @JsonProperty("log.logger")
+    public String logger;
+    public List<String> tags;
+
+    LoggingEvent toLoggingEvent(){
+        Logger logger;
+        long timeStamp;
+        Level level;
+        String ndc = null;
+        String[] exception = null;
+        String className = null;
+        String methodName = null;
+        String fileName = null;
+        String lineNumber = null;
+        Hashtable properties = null;
+
+        logger = Logger.getLogger(this.logger);
+        timeStamp = ZonedDateTime.parse(this.timestamp).toInstant().toEpochMilli();
+        level = Level.toLevel(this.level);
+
+        LocationInfo info;
+        if ((fileName != null)
+            || (className != null)
+            || (methodName != null)
+            || (lineNumber != null)) {
+            info = new LocationInfo(fileName, className, methodName, lineNumber);
+        } else {
+            info = LocationInfo.NA_LOCATION_INFO;
+        }
+        ThrowableInformation throwableInfo = null;
+        if (exception != null) {
+            throwableInfo = new ThrowableInformation(exception);
+        }
+
+        LoggingEvent loggingEvent = new LoggingEvent(null,
+                logger, timeStamp, level, message,
+                this.thread_name,
+                throwableInfo,
+                ndc,
+                info,
+                properties);
+
+        return loggingEvent;
+    }
+}
diff --git a/src/main/java/org/apache/log4j/net/JsonReceiver.java b/src/main/java/org/apache/log4j/net/JsonReceiver.java
new file mode 100644
index 0000000..2561ee0
--- /dev/null
+++ b/src/main/java/org/apache/log4j/net/JsonReceiver.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.log4j.net;
+
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.List;
+import java.util.Vector;
+import static org.apache.log4j.net.XMLSocketReceiver.ZONE;
+import org.apache.log4j.plugins.Pauseable;
+import org.apache.log4j.plugins.Plugin;
+import org.apache.log4j.plugins.Receiver;
+import org.apache.log4j.spi.LoggerRepository;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * The JsonReceiver class receives log events over a TCP socket(as JSON) and
+ * turns those into log events.
+ *
+ * @author Robert Middleton
+ */
+public class JsonReceiver extends Receiver implements Runnable, PortBased, Pauseable {
+    private boolean m_paused;
+    //default to log4j xml decoder
+    protected String m_decoder = "org.apache.log4j.xml.XMLDecoder";
+    private ServerSocket m_serverSocket;
+    private List<Socket> m_socketList = new Vector<>();
+    private Thread m_rxThread;
+    public static final int DEFAULT_PORT = 4449;
+    protected int m_port = DEFAULT_PORT;
+    private boolean m_advertiseViaMulticastDNS;
+    private ZeroConfSupport m_zeroConf;
+
+    /**
+     * The MulticastDNS zone advertised by an XMLSocketReceiver
+     */
+    public static final String ZONE = "_log4j_json_tcpaccept_receiver.local.";
+
+    public JsonReceiver() {
+    }
+
+    public JsonReceiver(int _port) {
+        m_port = _port;
+    }
+
+    public JsonReceiver(int _port, LoggerRepository _repository) {
+        m_port = _port;
+        repository = _repository;
+    }
+
+    @Override
+    public void shutdown() {
+        // mark this as no longer running
+        active = false;
+
+        if (m_rxThread != null) {
+            m_rxThread.interrupt();
+            m_rxThread = null;
+        }
+        doShutdown();
+    }
+
+    /**
+     * Does the actual shutting down by closing the server socket
+     * and any connected sockets that have been created.
+     */
+    private synchronized void doShutdown() {
+        active = false;
+
+        getLogger().debug("{} doShutdown called", getName());
+
+        // close the server socket
+        closeServerSocket();
+
+        // close all of the accepted sockets
+        closeAllAcceptedSockets();
+
+        if (m_advertiseViaMulticastDNS) {
+            m_zeroConf.unadvertise();
+        }
+    }
+
+    /**
+     * Closes the server socket, if created.
+     */
+    private void closeServerSocket() {
+        getLogger().debug("{} closing server socket", getName());
+
+        try {
+            if (m_serverSocket != null) {
+                m_serverSocket.close();
+            }
+        } catch (Exception e) {
+            // ignore for now
+        }
+
+        m_serverSocket = null;
+    }
+
+    /**
+     * Closes all the connected sockets in the List.
+     */
+    private synchronized void closeAllAcceptedSockets() {
+        for (Socket sock : m_socketList) {
+            try {
+                sock.close();
+            } catch (Exception e) {
+                // ignore for now
+            }
+        }
+
+        // clear member variables
+        m_socketList.clear();
+    }
+
+    @Override
+    public void activateOptions() {
+        if (!isActive()) {
+            m_rxThread = new Thread(this);
+            m_rxThread.setDaemon(true);
+            m_rxThread.start();
+
+            if (m_advertiseViaMulticastDNS) {
+                m_zeroConf = new ZeroConfSupport(ZONE, m_port, getName());
+                m_zeroConf.advertise();
+            }
+
+            active = true;
+        }
+    }
+
+    @Override
+    public void run() {
+        /**
+         * Ensure we start fresh.
+         */
+        getLogger().debug("performing socket cleanup prior to entering loop for {}", name);
+        closeServerSocket();
+        closeAllAcceptedSockets();
+        getLogger().debug("socket cleanup complete for {}", name);
+        active = true;
+
+        // start the server socket
+        try {
+            m_serverSocket = new ServerSocket(m_port);
+        } catch (Exception e) {
+            getLogger().error(
+                "error starting JsonReceiver (" + this.getName()
+                    + "), receiver did not start", e);
+            active = false;
+            doShutdown();
+
+            return;
+        }
+
+        Socket socket = null;
+
+        try {
+            getLogger().debug("in run-about to enter while isactiveloop");
+
+            active = true;
+
+            while (!m_rxThread.isInterrupted()) {
+                // if we have a socket, start watching it
+                if (socket != null) {
+                    getLogger().debug("socket not null - creating and starting socketnode");
+                    m_socketList.add(socket);
+
+                    JsonSocketNode node = new JsonSocketNode(socket, this);
+                    node.setLoggerRepository(this.repository);
+                    new Thread(node).start();
+                }
+
+                getLogger().debug("waiting to accept socket");
+
+                // wait for a socket to open, then loop to start it
+                socket = m_serverSocket.accept();
+                getLogger().debug("accepted socket");
+            }
+
+            // socket not watched because we a no longer running
+            // so close it now.
+            if (socket != null) {
+                socket.close();
+            }
+        } catch (Exception e) {
+            getLogger().warn(
+                "socket server disconnected, stopping");
+        }
+    }
+
+    @Override
+    public int getPort() {
+        return m_port;
+    }
+
+    @Override
+    public void setPaused(boolean paused) {
+        m_paused = paused;
+    }
+
+    @Override
+    public boolean isPaused() {
+        return m_paused;
+    }
+
+    public boolean isEquivalent(Plugin testPlugin) {
+        if ((testPlugin != null) && testPlugin instanceof JsonReceiver) {
+            JsonReceiver sReceiver = (JsonReceiver) testPlugin;
+
+            return (m_port == sReceiver.getPort() && super.isEquivalent(testPlugin));
+        }
+
+        return false;
+    }
+
+    public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
+        m_advertiseViaMulticastDNS = advertiseViaMulticastDNS;
+    }
+
+    public boolean isAdvertiseViaMulticastDNS() {
+        return m_advertiseViaMulticastDNS;
+    }
+
+    @Override
+    public void doPost(LoggingEvent event) {
+        if (!isPaused()) {
+            super.doPost(event);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/log4j/net/JsonSocketNode.java b/src/main/java/org/apache/log4j/net/JsonSocketNode.java
new file mode 100644
index 0000000..240f8dd
--- /dev/null
+++ b/src/main/java/org/apache/log4j/net/JsonSocketNode.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.log4j.net;
+
+import com.owlike.genson.Genson;
+import com.owlike.genson.GensonBuilder;
+import com.owlike.genson.stream.ObjectReader;
+import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.Constants;
+import org.apache.log4j.plugins.Receiver;
+import org.apache.log4j.spi.ComponentBase;
+import org.apache.log4j.spi.Decoder;
+import org.apache.log4j.spi.LoggerRepository;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * Read {@link LoggingEvent} objects sent from a remote client using JSON over
+ * Sockets (TCP). These logging events are logged according to local
+ * policy, as if they were generated locally.
+ */
+public class JsonSocketNode extends ComponentBase implements Runnable {
+    Socket m_socket;
+    Receiver m_receiver;
+    SocketNodeEventListener m_listener;
+    private List<Byte> m_jsonBuffer;
+
+    /**
+     * Constructor for socket and logger repository.
+     */
+    public JsonSocketNode(
+        Socket socket, LoggerRepository hierarchy) {
+        this.repository = hierarchy;
+
+        this.m_socket = socket;
+    }
+
+    /**
+     * Constructor for socket and reciever.
+     */
+    public JsonSocketNode(Socket socket, Receiver receiver) {
+        this.m_socket = socket;
+        this.m_receiver = receiver;
+    }
+
+    /**
+     * Set the event listener on this node.
+     */
+    public void setListener(SocketNodeEventListener _listener) {
+        m_listener = _listener;
+    }
+
+    public void run() {
+        Logger remoteLogger;
+        Exception listenerException = null;
+        InputStream is;
+
+        if ((this.m_receiver == null) ) {
+            listenerException =
+                new Exception(
+                    "No receiver provided.  Cannot process JSON socket events");
+            getLogger().error(
+                "Exception constructing JSON Socket Receiver", listenerException);
+        }
+
+        m_jsonBuffer = new ArrayList<>( 8192 );
+
+        try {
+            is = m_socket.getInputStream();
+        } catch (Exception e) {
+            is = null;
+            listenerException = e;
+            getLogger().error("Exception opening InputStream to " + m_socket, e);
+        }
+
+        if (is != null) {
+            String hostName = m_socket.getInetAddress().getHostName();
+            String remoteInfo = hostName + ":" + m_socket.getPort();
+            Genson genson = new GensonBuilder()
+                    .useDateAsTimestamp(true)
+                    .create();
+
+            try {
+                //read data from the socket.
+                // Once we have a full JSON message, parse it
+                while (true) {
+                    getLogger().debug( "About to deserialize values" );
+                    Iterator<ECSLogEvent> iter = genson.deserializeValues(is, ECSLogEvent.class);
+                    // Because the socket can be closed, if we don't have anything parsed
+                    // assume that the socket is closed.
+                    if( !iter.hasNext() ) break;
+                    while( iter.hasNext() ){
+                        ECSLogEvent evt = iter.next();
+                        LoggingEvent e = evt.toLoggingEvent();
+                        e.setProperty(Constants.HOSTNAME_KEY, hostName);
+
+                        // store the known remote info in an event property
+                        e.setProperty("log4j.remoteSourceInfo", remoteInfo);
+
+                        // if configured with a receiver, tell it to post the event
+                        if (m_receiver != null) {
+                            m_receiver.doPost(e);
+
+                            // else post it via the hierarchy
+                        } else {
+                            // get a logger from the hierarchy. The name of the logger
+                            // is taken to be the name contained in the event.
+                            remoteLogger = repository.getLogger(e.getLoggerName());
+
+                            //event.logger = remoteLogger;
+                            // apply the logger-level filter
+                            if (
+                                e.getLevel().isGreaterOrEqual(
+                                    remoteLogger.getEffectiveLevel())) {
+                                // finally log the event as if was generated locally
+                                remoteLogger.callAppenders(e);
+                            }
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                getLogger().error("Unexpected exception. Closing connection.", e);
+                listenerException = e;
+            }
+        }
+
+        // close the socket
+        try {
+            if (is != null) {
+                is.close();
+            }
+        } catch (Exception e) {
+            //logger.info("Could not close connection.", e);
+        }
+
+        // send event to listener, if configured
+        if (m_listener != null) {
+            m_listener.socketClosedEvent(listenerException);
+        }
+    }
+}
diff --git a/src/main/resources/org/apache/log4j/chainsaw/receivers/known.receivers b/src/main/resources/org/apache/log4j/chainsaw/receivers/known.receivers
index bc13efb..ee0b203 100644
--- a/src/main/resources/org/apache/log4j/chainsaw/receivers/known.receivers
+++ b/src/main/resources/org/apache/log4j/chainsaw/receivers/known.receivers
@@ -25,3 +25,4 @@
 org.apache.log4j.varia.LogFilePatternReceiver
 org.apache.log4j.xml.LogFileXMLReceiver
 org.apache.log4j.chainsaw.vfs.VFSLogFilePatternReceiver
+org.apache.log4j.net.JsonReceiver
diff --git a/src/test/java/org/apache/log4j/chainsaw/receivers/ReceiversHelperTest.java b/src/test/java/org/apache/log4j/chainsaw/receivers/ReceiversHelperTest.java
index 82dc7e6..0fd244b 100644
--- a/src/test/java/org/apache/log4j/chainsaw/receivers/ReceiversHelperTest.java
+++ b/src/test/java/org/apache/log4j/chainsaw/receivers/ReceiversHelperTest.java
@@ -46,7 +46,8 @@
 
         Class[] expectedList =
             new Class[] {
-                MulticastReceiver.class, 
+                MulticastReceiver.class,
+                JsonReceiver.class,
                 UDPReceiver.class,
                 XMLSocketReceiver.class,
                 LogFilePatternReceiver.class,