APEXCORE-704 Add supporting of programmatic logger appender
Implemented supporting of a programmatic logger appender that can be added to Apex Application Master and Containers and be configurable programmatically. The new programmatic appender can be defined in Java code or via a value of the new Apex attribute "LOGGER_APPENDER".
The syntax of the attribute value: {appender-names};{logger-properties}
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index a3b2f97..1d7b7b1 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -29,6 +29,8 @@
import com.google.common.base.Throwables;
+import static com.datatorrent.api.StreamingApplication.APEX_PREFIX;
+
/**
* Attribute represents the attribute which can be set on various components in the system.
*
@@ -88,6 +90,11 @@
return "attr" + name.substring(name.lastIndexOf('.'));
}
+ public String getLongName()
+ {
+ return APEX_PREFIX + getSimpleName().replaceAll("_",".").toLowerCase();
+ }
+
public String getSimpleName()
{
return name.substring(name.lastIndexOf('.') + 1);
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 743f0f1..ff1a2d4 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -393,6 +393,10 @@
*/
Attribute<String> CONTAINER_JVM_OPTIONS = new Attribute<>(String2String.getInstance());
/**
+ * The options of dynamic apex logger appender
+ */
+ Attribute<String> LOGGER_APPENDER = new Attribute<>(String2String.getInstance());
+ /**
* The amount of memory to be requested for the application master. Not used in local mode.
* Default value is 1GB.
*/
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index 76c1407..dce648b 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -230,6 +230,11 @@
}
}
+ String loggerAppender = dag.getValue(LogicalPlan.LOGGER_APPENDER);
+ if (loggerAppender != null) {
+ vargs.add(String.format("-D%s=\"%s\"", LogicalPlan.LOGGER_APPENDER.getLongName(), loggerAppender));
+ }
+
List<DAG.OperatorMeta> operatorMetaList = Lists.newArrayList();
int bufferServerMemory = 0;
for (PTOperator operator : sca.getContainer().getOperators()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index b280aad..22a1c63 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -576,6 +576,11 @@
vargs.add("-Dlog4j.debug=true");
}
+ String loggerAppender = dag.getValue(LogicalPlan.LOGGER_APPENDER);
+ if (loggerAppender != null) {
+ vargs.add(String.format("-D%s=\"%s\"", LogicalPlan.LOGGER_APPENDER.getLongName(), loggerAppender));
+ }
+
String loggersLevel = conf.get(StramUtils.DT_LOGGERS_LEVEL);
if (loggersLevel != null) {
vargs.add(String.format("-D%s=%s", StramUtils.DT_LOGGERS_LEVEL, loggersLevel));
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 9a7b128..787f20b 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -124,6 +124,7 @@
import com.datatorrent.stram.plan.logical.requests.SetStreamAttributeRequest;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.util.JSONSerializationProvider;
+import com.datatorrent.stram.util.LoggerUtil;
import com.datatorrent.stram.util.SecurityUtils;
import com.datatorrent.stram.util.VersionInfo;
import com.datatorrent.stram.util.WebServicesClient;
@@ -4128,6 +4129,7 @@
public static void main(final String[] args) throws Exception
{
+ LoggerUtil.addAppenders();
final ApexCli shell = new ApexCli();
shell.preImpersonationInit(args);
String hadoopUserName = System.getenv("HADOOP_USER_NAME");
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java b/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
index 1a14c90..014ba6e 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
@@ -26,6 +26,8 @@
import org.apache.log4j.Appender;
import org.apache.log4j.RollingFileAppender;
+import com.datatorrent.stram.util.LoggerUtil;
+
/**
* <p>StdOutErrLog class.</p>
*
@@ -52,6 +54,7 @@
logger.warn("found appender {} instead of RollingFileAppender", appender);
}
+ LoggerUtil.addAppenders();
System.setOut(createLoggingProxy(System.out));
System.setErr(createLoggingProxy(System.err));
}
diff --git a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
index ffe9c8c..14662e1 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
@@ -19,9 +19,15 @@
package com.datatorrent.stram.util;
import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
@@ -35,6 +41,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.spi.DefaultRepositorySelector;
import org.apache.log4j.spi.HierarchyEventListener;
import org.apache.log4j.spi.LoggerFactory;
@@ -45,6 +52,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import static com.datatorrent.api.Context.DAGContext.LOGGER_APPENDER;
+
/**
* @since 3.5.0
*/
@@ -187,8 +196,6 @@
}
}
- private static FileAppender fileAppender;
- private static boolean shouldFetchLogFileInfo;
static {
logger.debug("initializing LoggerUtil");
initializeLogger();
@@ -198,8 +205,6 @@
static void initializeLogger()
{
LogManager.setRepositorySelector(new DefaultRepositorySelector(new DelegatingLoggerRepository(LogManager.getLoggerRepository())), null);
- fileAppender = getFileAppender();
- shouldFetchLogFileInfo = shouldFetchLogFileInformation();
}
private static synchronized Level getLevelFor(String name)
@@ -300,7 +305,8 @@
*/
public static LogFileInformation getLogFileInformation()
{
- if (shouldFetchLogFileInfo) {
+ FileAppender fileAppender = getFileAppender();
+ if (shouldFetchLogFileInformation(fileAppender)) {
File logFile = new File(fileAppender.getFile());
LogFileInformation logFileInfo = new LogFileInformation(fileAppender.getFile(), logFile.length());
return logFileInfo;
@@ -331,9 +337,9 @@
* we have single file Appender, the logging level of appender is set to level Error or above and immediateFlush is set to true.
* In future we should be able to enhance this feature to support multiple file appenders.
*/
- private static boolean shouldFetchLogFileInformation()
+ private static boolean shouldFetchLogFileInformation(FileAppender fileAppender)
{
- if (fileAppender != null && isErrorLevelEnable() && fileAppender.getImmediateFlush()) {
+ if (fileAppender != null && isErrorLevelEnable(fileAppender) && fileAppender.getImmediateFlush()) {
return true;
}
logger.warn(
@@ -341,7 +347,7 @@
return false;
}
- private static boolean isErrorLevelEnable()
+ private static boolean isErrorLevelEnable(FileAppender fileAppender)
{
if (fileAppender != null) {
Level p = (Level)fileAppender.getThreshold();
@@ -355,4 +361,107 @@
return false;
}
+ /**
+ * Adds Logger Appender
+ * @param name Appender name
+ * @param properties Appender properties
+ * @return True if the appender has been added successfully
+ */
+ public static boolean addAppender(String name, Properties properties)
+ {
+ if (getAppendersNames().contains(name)) {
+ logger.warn("A logger appender with the name '{}' exists. Cannot add a new logger appender with the same name", name);
+ } else {
+ try {
+ Method method = PropertyConfigurator.class.getDeclaredMethod("parseAppender", Properties.class, String.class);
+ method.setAccessible(true);
+ Appender appender = (Appender)method.invoke(new PropertyConfigurator(), properties, name);
+ if (appender == null) {
+ logger.warn("Cannot add a new logger appender. Name: {}, Properties: {}", name, properties);
+ } else {
+ LogManager.getRootLogger().addAppender(appender);
+ return true;
+ }
+ } catch (Exception ex) {
+ logger.warn("Cannot add a new logger appender. Name: {}, Properties: {}", name, properties, ex);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Adds Logger Appenders
+ * @param names Names of appender
+ * @param args Args with properties
+ * @param propertySeparator Property separator
+ * @return True if all of the appenders have been added successfully
+ */
+ public static boolean addAppenders(String[] names, String args, String propertySeparator)
+ {
+ if (names == null || args == null || names.length == 0 || propertySeparator == null) {
+ throw new IllegalArgumentException("Incorrect appender parametrs");
+ }
+ boolean status = true;
+ try {
+ Properties properties = new Properties();
+ properties.load(new StringReader(args.replaceAll(propertySeparator, "\n")));
+ for (String name : names) {
+ if (!addAppender(name, properties)) {
+ status = false;
+ }
+ }
+ } catch (IOException ex) {
+ ;
+ }
+ return status;
+ }
+
+ /**
+ * Adds Default Logger Appenders
+ * Syntax of a value of the default appender parameters: {appender-names};{string-with-properties}
+ * Comma is a separator between appender names and properties
+ * @return True if all of the appenders have been added successfully
+ */
+ public static boolean addAppenders()
+ {
+ String appenderParameters = System.getProperty(LOGGER_APPENDER.getLongName());
+ if (appenderParameters != null) {
+ String[] splits = appenderParameters.split(";", 2);
+ if (splits.length != 2) {
+ return false;
+ }
+ return addAppenders(splits[0].split(","), splits[1], ",");
+ }
+ return false;
+ }
+
+ /**
+ * Removes Logger Appender
+ * @param name Appender name
+ * @return True if the appender has been removed successfully
+ */
+ public static boolean removeAppender(String name)
+ {
+ try {
+ LogManager.getRootLogger().removeAppender(name);
+ } catch (Exception ex) {
+ logger.error("Cannot remove the logger appender: {}", name, ex);
+ return false;
+ }
+ return false;
+ }
+
+ /**
+ * Returns a list names of the appenders
+ * @return Names of the appenders
+ */
+ public static List<String> getAppendersNames()
+ {
+ Enumeration enumeration = LogManager.getRootLogger().getAllAppenders();
+ List<String> names = new LinkedList<>();
+ while (enumeration.hasMoreElements()) {
+ names.add(((Appender)enumeration.nextElement()).getName());
+ }
+ return names;
+ }
}
diff --git a/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java b/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
index 2ad3f4a..bc73ca8 100644
--- a/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
@@ -26,14 +26,20 @@
import org.junit.runners.MethodSorters;
import org.slf4j.LoggerFactory;
+import org.apache.log4j.Appender;
import org.apache.log4j.Category;
+import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -165,4 +171,66 @@
}
assertSame(log4jLogger.getEffectiveLevel(), parent.getLevel());
}
+
+ @Test
+ public void testAppender()
+ {
+ logger.info("Running Appender Test");
+ String appenderName = "testAppender";
+ String appenderName1 = "testAppender1";
+ String args = "log4j.appender.testAppender=com.datatorrent.stram.util.LoggerUtilTest$TestAppender"
+ + ",log4j.appender.testAppender.layout=org.apache.log4j.PatternLayout"
+ + ",log4j.appender.testAppender.layout.ConversionPattern=%d %d{Z} [%t] %-5p (%F:%L) - %m%n"
+ + ",log4j.appender.testAppender1=org.apache.log4j.ConsoleAppender"
+ + ",log4j.appender.testAppender1.layout=org.apache.log4j.PatternLayout"
+ + ",log4j.appender.testAppender1.layout.ConversionPattern=%d %d{Z} [%t] %-5p (%F:%L) - %m%n";
+
+ assertTrue(LoggerUtil.addAppenders(new String[] {appenderName }, args, ","));
+ TestAppender appender = (TestAppender)LogManager.getRootLogger().getAppender(appenderName);
+
+ logger.info(args);
+ assertEquals(args, appender.lastMessage);
+ assertEquals(appender.level, Level.INFO);
+
+ logger.warn(appenderName1);
+ assertEquals(appenderName1, appender.lastMessage);
+ assertEquals(appender.level, Level.WARN);
+
+ // don't allow to add an appender with the same name
+ assertFalse(LoggerUtil.addAppenders(new String[] {appenderName }, args, ","));
+ logger.info("Test Appender is added: " + LoggerUtil.getAppendersNames());
+ testAndRemoveAppender(appenderName);
+ logger.info("Test Appender is removed: " + LoggerUtil.getAppendersNames());
+
+ System.setProperty(Context.DAGContext.LOGGER_APPENDER.getLongName(), appenderName + "," + appenderName1 + ";" + args);
+ assertTrue(LoggerUtil.addAppenders());
+ logger.info("Test Appenders are added: " + LoggerUtil.getAppendersNames());
+
+ testAndRemoveAppender(appenderName);
+ testAndRemoveAppender(appenderName1);
+ logger.info("Test Appenders are removed: " + LoggerUtil.getAppendersNames());
+ }
+
+ private static void testAndRemoveAppender(String name)
+ {
+ Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(name);
+ assertNotNull(appender);
+ assertTrue(LoggerUtil.getAppendersNames().contains(name));
+ LoggerUtil.removeAppender(name);
+ assertNull(org.apache.log4j.Logger.getRootLogger().getAppender(name));
+ }
+
+ public static class TestAppender extends ConsoleAppender
+ {
+ private String lastMessage = null;
+ private Level level;
+
+ @Override
+ public void append(LoggingEvent event)
+ {
+ lastMessage = event.getRenderedMessage();
+ level = event.getLevel();
+ super.append(event);
+ }
+ }
}