JSON receiver for ECS log messages (#9)
diff --git a/pom.xml b/pom.xml
index 4e2c490..537cd5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -456,6 +456,11 @@
<version>1.18.0</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.owlike</groupId>
+ <artifactId>genson</artifactId>
+ <version>1.6</version>
+ </dependency>
</dependencies>
<reporting>
diff --git a/src/main/java/org/apache/log4j/chainsaw/ReceiverConfigurationPanel.java b/src/main/java/org/apache/log4j/chainsaw/ReceiverConfigurationPanel.java
index 42e0409..dbddddc 100644
--- a/src/main/java/org/apache/log4j/chainsaw/ReceiverConfigurationPanel.java
+++ b/src/main/java/org/apache/log4j/chainsaw/ReceiverConfigurationPanel.java
@@ -38,6 +38,7 @@
import java.net.URL;
import java.util.List;
import java.util.Locale;
+import org.apache.log4j.net.JsonReceiver;
/**
@@ -290,6 +291,7 @@
networkReceiverClassNameComboBoxModel = new DefaultComboBoxModel<>();
networkReceiverClassNameComboBoxModel.addElement(UDPReceiver.class.getName());
+ networkReceiverClassNameComboBoxModel.addElement(JsonReceiver.class.getName());
networkReceiverClassNameComboBox = new JComboBox<>(networkReceiverClassNameComboBoxModel);
diff --git a/src/main/java/org/apache/log4j/chainsaw/receivers/ReceiversPanel.java b/src/main/java/org/apache/log4j/chainsaw/receivers/ReceiversPanel.java
index 8764cd1..1212f2c 100644
--- a/src/main/java/org/apache/log4j/chainsaw/receivers/ReceiversPanel.java
+++ b/src/main/java/org/apache/log4j/chainsaw/receivers/ReceiversPanel.java
@@ -50,6 +50,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import org.apache.log4j.net.JsonReceiver;
/**
diff --git a/src/main/java/org/apache/log4j/net/ECSLogEvent.java b/src/main/java/org/apache/log4j/net/ECSLogEvent.java
new file mode 100644
index 0000000..87d41e6
--- /dev/null
+++ b/src/main/java/org/apache/log4j/net/ECSLogEvent.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.log4j.net;
+
+import com.owlike.genson.annotation.JsonProperty;
+import java.time.ZonedDateTime;
+import java.util.Hashtable;
+import java.util.List;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LocationInfo;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.log4j.spi.ThrowableInformation;
+
+/**
+ * Represents a LogEvent as from a ECS(ElasticSearch) event.
+ */
+public class ECSLogEvent {
+ @JsonProperty("@timestamp")
+ public String timestamp;
+ @JsonProperty("log.level")
+ public String level;
+ public String message;
+ @JsonProperty("process.thread.name")
+ public String thread_name;
+ @JsonProperty("log.logger")
+ public String logger;
+ public List<String> tags;
+
+ LoggingEvent toLoggingEvent(){
+ Logger logger;
+ long timeStamp;
+ Level level;
+ String ndc = null;
+ String[] exception = null;
+ String className = null;
+ String methodName = null;
+ String fileName = null;
+ String lineNumber = null;
+ Hashtable properties = null;
+
+ logger = Logger.getLogger(this.logger);
+ timeStamp = ZonedDateTime.parse(this.timestamp).toInstant().toEpochMilli();
+ level = Level.toLevel(this.level);
+
+ LocationInfo info;
+ if ((fileName != null)
+ || (className != null)
+ || (methodName != null)
+ || (lineNumber != null)) {
+ info = new LocationInfo(fileName, className, methodName, lineNumber);
+ } else {
+ info = LocationInfo.NA_LOCATION_INFO;
+ }
+ ThrowableInformation throwableInfo = null;
+ if (exception != null) {
+ throwableInfo = new ThrowableInformation(exception);
+ }
+
+ LoggingEvent loggingEvent = new LoggingEvent(null,
+ logger, timeStamp, level, message,
+ this.thread_name,
+ throwableInfo,
+ ndc,
+ info,
+ properties);
+
+ return loggingEvent;
+ }
+}
diff --git a/src/main/java/org/apache/log4j/net/JsonReceiver.java b/src/main/java/org/apache/log4j/net/JsonReceiver.java
new file mode 100644
index 0000000..2561ee0
--- /dev/null
+++ b/src/main/java/org/apache/log4j/net/JsonReceiver.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.log4j.net;
+
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.List;
+import java.util.Vector;
+import static org.apache.log4j.net.XMLSocketReceiver.ZONE;
+import org.apache.log4j.plugins.Pauseable;
+import org.apache.log4j.plugins.Plugin;
+import org.apache.log4j.plugins.Receiver;
+import org.apache.log4j.spi.LoggerRepository;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * The JsonReceiver class receives log events over a TCP socket(as JSON) and
+ * turns those into log events.
+ *
+ * @author Robert Middleton
+ */
+public class JsonReceiver extends Receiver implements Runnable, PortBased, Pauseable {
+ private boolean m_paused;
+ //default to log4j xml decoder
+ protected String m_decoder = "org.apache.log4j.xml.XMLDecoder";
+ private ServerSocket m_serverSocket;
+ private List<Socket> m_socketList = new Vector<>();
+ private Thread m_rxThread;
+ public static final int DEFAULT_PORT = 4449;
+ protected int m_port = DEFAULT_PORT;
+ private boolean m_advertiseViaMulticastDNS;
+ private ZeroConfSupport m_zeroConf;
+
+ /**
+ * The MulticastDNS zone advertised by an XMLSocketReceiver
+ */
+ public static final String ZONE = "_log4j_json_tcpaccept_receiver.local.";
+
+ public JsonReceiver() {
+ }
+
+ public JsonReceiver(int _port) {
+ m_port = _port;
+ }
+
+ public JsonReceiver(int _port, LoggerRepository _repository) {
+ m_port = _port;
+ repository = _repository;
+ }
+
+ @Override
+ public void shutdown() {
+ // mark this as no longer running
+ active = false;
+
+ if (m_rxThread != null) {
+ m_rxThread.interrupt();
+ m_rxThread = null;
+ }
+ doShutdown();
+ }
+
+ /**
+ * Does the actual shutting down by closing the server socket
+ * and any connected sockets that have been created.
+ */
+ private synchronized void doShutdown() {
+ active = false;
+
+ getLogger().debug("{} doShutdown called", getName());
+
+ // close the server socket
+ closeServerSocket();
+
+ // close all of the accepted sockets
+ closeAllAcceptedSockets();
+
+ if (m_advertiseViaMulticastDNS) {
+ m_zeroConf.unadvertise();
+ }
+ }
+
+ /**
+ * Closes the server socket, if created.
+ */
+ private void closeServerSocket() {
+ getLogger().debug("{} closing server socket", getName());
+
+ try {
+ if (m_serverSocket != null) {
+ m_serverSocket.close();
+ }
+ } catch (Exception e) {
+ // ignore for now
+ }
+
+ m_serverSocket = null;
+ }
+
+ /**
+ * Closes all the connected sockets in the List.
+ */
+ private synchronized void closeAllAcceptedSockets() {
+ for (Socket sock : m_socketList) {
+ try {
+ sock.close();
+ } catch (Exception e) {
+ // ignore for now
+ }
+ }
+
+ // clear member variables
+ m_socketList.clear();
+ }
+
+ @Override
+ public void activateOptions() {
+ if (!isActive()) {
+ m_rxThread = new Thread(this);
+ m_rxThread.setDaemon(true);
+ m_rxThread.start();
+
+ if (m_advertiseViaMulticastDNS) {
+ m_zeroConf = new ZeroConfSupport(ZONE, m_port, getName());
+ m_zeroConf.advertise();
+ }
+
+ active = true;
+ }
+ }
+
+ @Override
+ public void run() {
+ /**
+ * Ensure we start fresh.
+ */
+ getLogger().debug("performing socket cleanup prior to entering loop for {}", name);
+ closeServerSocket();
+ closeAllAcceptedSockets();
+ getLogger().debug("socket cleanup complete for {}", name);
+ active = true;
+
+ // start the server socket
+ try {
+ m_serverSocket = new ServerSocket(m_port);
+ } catch (Exception e) {
+ getLogger().error(
+ "error starting JsonReceiver (" + this.getName()
+ + "), receiver did not start", e);
+ active = false;
+ doShutdown();
+
+ return;
+ }
+
+ Socket socket = null;
+
+ try {
+ getLogger().debug("in run-about to enter while isactiveloop");
+
+ active = true;
+
+ while (!m_rxThread.isInterrupted()) {
+ // if we have a socket, start watching it
+ if (socket != null) {
+ getLogger().debug("socket not null - creating and starting socketnode");
+ m_socketList.add(socket);
+
+ JsonSocketNode node = new JsonSocketNode(socket, this);
+ node.setLoggerRepository(this.repository);
+ new Thread(node).start();
+ }
+
+ getLogger().debug("waiting to accept socket");
+
+ // wait for a socket to open, then loop to start it
+ socket = m_serverSocket.accept();
+ getLogger().debug("accepted socket");
+ }
+
+ // socket not watched because we a no longer running
+ // so close it now.
+ if (socket != null) {
+ socket.close();
+ }
+ } catch (Exception e) {
+ getLogger().warn(
+ "socket server disconnected, stopping");
+ }
+ }
+
+ @Override
+ public int getPort() {
+ return m_port;
+ }
+
+ @Override
+ public void setPaused(boolean paused) {
+ m_paused = paused;
+ }
+
+ @Override
+ public boolean isPaused() {
+ return m_paused;
+ }
+
+ public boolean isEquivalent(Plugin testPlugin) {
+ if ((testPlugin != null) && testPlugin instanceof JsonReceiver) {
+ JsonReceiver sReceiver = (JsonReceiver) testPlugin;
+
+ return (m_port == sReceiver.getPort() && super.isEquivalent(testPlugin));
+ }
+
+ return false;
+ }
+
+ public void setAdvertiseViaMulticastDNS(boolean advertiseViaMulticastDNS) {
+ m_advertiseViaMulticastDNS = advertiseViaMulticastDNS;
+ }
+
+ public boolean isAdvertiseViaMulticastDNS() {
+ return m_advertiseViaMulticastDNS;
+ }
+
+ @Override
+ public void doPost(LoggingEvent event) {
+ if (!isPaused()) {
+ super.doPost(event);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/log4j/net/JsonSocketNode.java b/src/main/java/org/apache/log4j/net/JsonSocketNode.java
new file mode 100644
index 0000000..240f8dd
--- /dev/null
+++ b/src/main/java/org/apache/log4j/net/JsonSocketNode.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.log4j.net;
+
+import com.owlike.genson.Genson;
+import com.owlike.genson.GensonBuilder;
+import com.owlike.genson.stream.ObjectReader;
+import org.apache.log4j.Logger;
+import org.apache.log4j.helpers.Constants;
+import org.apache.log4j.plugins.Receiver;
+import org.apache.log4j.spi.ComponentBase;
+import org.apache.log4j.spi.Decoder;
+import org.apache.log4j.spi.LoggerRepository;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * Read {@link LoggingEvent} objects sent from a remote client using JSON over
+ * Sockets (TCP). These logging events are logged according to local
+ * policy, as if they were generated locally.
+ */
+public class JsonSocketNode extends ComponentBase implements Runnable {
+ Socket m_socket;
+ Receiver m_receiver;
+ SocketNodeEventListener m_listener;
+ private List<Byte> m_jsonBuffer;
+
+ /**
+ * Constructor for socket and logger repository.
+ */
+ public JsonSocketNode(
+ Socket socket, LoggerRepository hierarchy) {
+ this.repository = hierarchy;
+
+ this.m_socket = socket;
+ }
+
+ /**
+ * Constructor for socket and reciever.
+ */
+ public JsonSocketNode(Socket socket, Receiver receiver) {
+ this.m_socket = socket;
+ this.m_receiver = receiver;
+ }
+
+ /**
+ * Set the event listener on this node.
+ */
+ public void setListener(SocketNodeEventListener _listener) {
+ m_listener = _listener;
+ }
+
+ public void run() {
+ Logger remoteLogger;
+ Exception listenerException = null;
+ InputStream is;
+
+ if ((this.m_receiver == null) ) {
+ listenerException =
+ new Exception(
+ "No receiver provided. Cannot process JSON socket events");
+ getLogger().error(
+ "Exception constructing JSON Socket Receiver", listenerException);
+ }
+
+ m_jsonBuffer = new ArrayList<>( 8192 );
+
+ try {
+ is = m_socket.getInputStream();
+ } catch (Exception e) {
+ is = null;
+ listenerException = e;
+ getLogger().error("Exception opening InputStream to " + m_socket, e);
+ }
+
+ if (is != null) {
+ String hostName = m_socket.getInetAddress().getHostName();
+ String remoteInfo = hostName + ":" + m_socket.getPort();
+ Genson genson = new GensonBuilder()
+ .useDateAsTimestamp(true)
+ .create();
+
+ try {
+ //read data from the socket.
+ // Once we have a full JSON message, parse it
+ while (true) {
+ getLogger().debug( "About to deserialize values" );
+ Iterator<ECSLogEvent> iter = genson.deserializeValues(is, ECSLogEvent.class);
+ // Because the socket can be closed, if we don't have anything parsed
+ // assume that the socket is closed.
+ if( !iter.hasNext() ) break;
+ while( iter.hasNext() ){
+ ECSLogEvent evt = iter.next();
+ LoggingEvent e = evt.toLoggingEvent();
+ e.setProperty(Constants.HOSTNAME_KEY, hostName);
+
+ // store the known remote info in an event property
+ e.setProperty("log4j.remoteSourceInfo", remoteInfo);
+
+ // if configured with a receiver, tell it to post the event
+ if (m_receiver != null) {
+ m_receiver.doPost(e);
+
+ // else post it via the hierarchy
+ } else {
+ // get a logger from the hierarchy. The name of the logger
+ // is taken to be the name contained in the event.
+ remoteLogger = repository.getLogger(e.getLoggerName());
+
+ //event.logger = remoteLogger;
+ // apply the logger-level filter
+ if (
+ e.getLevel().isGreaterOrEqual(
+ remoteLogger.getEffectiveLevel())) {
+ // finally log the event as if was generated locally
+ remoteLogger.callAppenders(e);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ getLogger().error("Unexpected exception. Closing connection.", e);
+ listenerException = e;
+ }
+ }
+
+ // close the socket
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (Exception e) {
+ //logger.info("Could not close connection.", e);
+ }
+
+ // send event to listener, if configured
+ if (m_listener != null) {
+ m_listener.socketClosedEvent(listenerException);
+ }
+ }
+}
diff --git a/src/main/resources/org/apache/log4j/chainsaw/receivers/known.receivers b/src/main/resources/org/apache/log4j/chainsaw/receivers/known.receivers
index bc13efb..ee0b203 100644
--- a/src/main/resources/org/apache/log4j/chainsaw/receivers/known.receivers
+++ b/src/main/resources/org/apache/log4j/chainsaw/receivers/known.receivers
@@ -25,3 +25,4 @@
org.apache.log4j.varia.LogFilePatternReceiver
org.apache.log4j.xml.LogFileXMLReceiver
org.apache.log4j.chainsaw.vfs.VFSLogFilePatternReceiver
+org.apache.log4j.net.JsonReceiver
diff --git a/src/test/java/org/apache/log4j/chainsaw/receivers/ReceiversHelperTest.java b/src/test/java/org/apache/log4j/chainsaw/receivers/ReceiversHelperTest.java
index 82dc7e6..0fd244b 100644
--- a/src/test/java/org/apache/log4j/chainsaw/receivers/ReceiversHelperTest.java
+++ b/src/test/java/org/apache/log4j/chainsaw/receivers/ReceiversHelperTest.java
@@ -46,7 +46,8 @@
Class[] expectedList =
new Class[] {
- MulticastReceiver.class,
+ MulticastReceiver.class,
+ JsonReceiver.class,
UDPReceiver.class,
XMLSocketReceiver.class,
LogFilePatternReceiver.class,