APEXCORE-704 Add supporting of programmatic logger appender

Implemented supporting of a programmatic logger appender that can be added to Apex Application Master and Containers and be configurable programmatically. The new programmatic appender can be defined in Java code or via a value of the new Apex attribute "LOGGER_APPENDER".

The syntax of the attribute value: {appender-names};{logger-properties}
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index a3b2f97..1d7b7b1 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -29,6 +29,8 @@
 
 import com.google.common.base.Throwables;
 
+import static com.datatorrent.api.StreamingApplication.APEX_PREFIX;
+
 /**
  * Attribute represents the attribute which can be set on various components in the system.
  *
@@ -88,6 +90,11 @@
     return "attr" + name.substring(name.lastIndexOf('.'));
   }
 
+  public String getLongName()
+  {
+    return APEX_PREFIX + getSimpleName().replaceAll("_",".").toLowerCase();
+  }
+
   public String getSimpleName()
   {
     return name.substring(name.lastIndexOf('.') + 1);
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 743f0f1..ff1a2d4 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -393,6 +393,10 @@
      */
     Attribute<String> CONTAINER_JVM_OPTIONS = new Attribute<>(String2String.getInstance());
     /**
+     * The options of dynamic apex logger appender
+     */
+    Attribute<String> LOGGER_APPENDER = new Attribute<>(String2String.getInstance());
+    /**
      * The amount of memory to be requested for the application master. Not used in local mode.
      * Default value is 1GB.
      */
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index 76c1407..dce648b 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -230,6 +230,11 @@
       }
     }
 
+    String loggerAppender = dag.getValue(LogicalPlan.LOGGER_APPENDER);
+    if (loggerAppender != null) {
+      vargs.add(String.format("-D%s=\"%s\"", LogicalPlan.LOGGER_APPENDER.getLongName(), loggerAppender));
+    }
+
     List<DAG.OperatorMeta> operatorMetaList = Lists.newArrayList();
     int bufferServerMemory = 0;
     for (PTOperator operator : sca.getContainer().getOperators()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index b280aad..22a1c63 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -576,6 +576,11 @@
         vargs.add("-Dlog4j.debug=true");
       }
 
+      String loggerAppender = dag.getValue(LogicalPlan.LOGGER_APPENDER);
+      if (loggerAppender != null) {
+        vargs.add(String.format("-D%s=\"%s\"", LogicalPlan.LOGGER_APPENDER.getLongName(), loggerAppender));
+      }
+
       String loggersLevel = conf.get(StramUtils.DT_LOGGERS_LEVEL);
       if (loggersLevel != null) {
         vargs.add(String.format("-D%s=%s", StramUtils.DT_LOGGERS_LEVEL, loggersLevel));
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 9a7b128..787f20b 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -124,6 +124,7 @@
 import com.datatorrent.stram.plan.logical.requests.SetStreamAttributeRequest;
 import com.datatorrent.stram.security.StramUserLogin;
 import com.datatorrent.stram.util.JSONSerializationProvider;
+import com.datatorrent.stram.util.LoggerUtil;
 import com.datatorrent.stram.util.SecurityUtils;
 import com.datatorrent.stram.util.VersionInfo;
 import com.datatorrent.stram.util.WebServicesClient;
@@ -4128,6 +4129,7 @@
 
   public static void main(final String[] args) throws Exception
   {
+    LoggerUtil.addAppenders();
     final ApexCli shell = new ApexCli();
     shell.preImpersonationInit(args);
     String hadoopUserName = System.getenv("HADOOP_USER_NAME");
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java b/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
index 1a14c90..014ba6e 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
@@ -26,6 +26,8 @@
 import org.apache.log4j.Appender;
 import org.apache.log4j.RollingFileAppender;
 
+import com.datatorrent.stram.util.LoggerUtil;
+
 /**
  * <p>StdOutErrLog class.</p>
  *
@@ -52,6 +54,7 @@
       logger.warn("found appender {} instead of RollingFileAppender", appender);
     }
 
+    LoggerUtil.addAppenders();
     System.setOut(createLoggingProxy(System.out));
     System.setErr(createLoggingProxy(System.err));
   }
diff --git a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
index ffe9c8c..14662e1 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
@@ -19,9 +19,15 @@
 package com.datatorrent.stram.util;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Method;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Pattern;
 
 import javax.annotation.Nonnull;
@@ -35,6 +41,7 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
 import org.apache.log4j.spi.DefaultRepositorySelector;
 import org.apache.log4j.spi.HierarchyEventListener;
 import org.apache.log4j.spi.LoggerFactory;
@@ -45,6 +52,8 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
+import static com.datatorrent.api.Context.DAGContext.LOGGER_APPENDER;
+
 /**
  * @since 3.5.0
  */
@@ -187,8 +196,6 @@
     }
   }
 
-  private static FileAppender fileAppender;
-  private static boolean shouldFetchLogFileInfo;
   static {
     logger.debug("initializing LoggerUtil");
     initializeLogger();
@@ -198,8 +205,6 @@
   static void initializeLogger()
   {
     LogManager.setRepositorySelector(new DefaultRepositorySelector(new DelegatingLoggerRepository(LogManager.getLoggerRepository())), null);
-    fileAppender = getFileAppender();
-    shouldFetchLogFileInfo = shouldFetchLogFileInformation();
   }
 
   private static synchronized Level getLevelFor(String name)
@@ -300,7 +305,8 @@
    */
   public static LogFileInformation getLogFileInformation()
   {
-    if (shouldFetchLogFileInfo) {
+    FileAppender fileAppender = getFileAppender();
+    if (shouldFetchLogFileInformation(fileAppender)) {
       File logFile = new File(fileAppender.getFile());
       LogFileInformation logFileInfo = new LogFileInformation(fileAppender.getFile(), logFile.length());
       return logFileInfo;
@@ -331,9 +337,9 @@
    * we have single file Appender, the logging level of appender is set to level Error or above and immediateFlush is set to true.
    * In future we should be able to enhance this feature to support multiple file appenders.
    */
-  private static boolean shouldFetchLogFileInformation()
+  private static boolean shouldFetchLogFileInformation(FileAppender fileAppender)
   {
-    if (fileAppender != null && isErrorLevelEnable() && fileAppender.getImmediateFlush()) {
+    if (fileAppender != null && isErrorLevelEnable(fileAppender) && fileAppender.getImmediateFlush()) {
       return true;
     }
     logger.warn(
@@ -341,7 +347,7 @@
     return false;
   }
 
-  private static boolean isErrorLevelEnable()
+  private static boolean isErrorLevelEnable(FileAppender fileAppender)
   {
     if (fileAppender != null) {
       Level p = (Level)fileAppender.getThreshold();
@@ -355,4 +361,107 @@
     return false;
   }
 
+  /**
+   * Adds Logger Appender
+   * @param name Appender name
+   * @param properties Appender properties
+   * @return True if the appender has been added successfully
+   */
+  public static boolean addAppender(String name, Properties properties)
+  {
+    if (getAppendersNames().contains(name)) {
+      logger.warn("A logger appender with the name '{}' exists. Cannot add a new logger appender with the same name", name);
+    } else {
+      try {
+        Method method = PropertyConfigurator.class.getDeclaredMethod("parseAppender", Properties.class, String.class);
+        method.setAccessible(true);
+        Appender appender = (Appender)method.invoke(new PropertyConfigurator(), properties, name);
+        if (appender == null) {
+          logger.warn("Cannot add a new logger appender. Name: {}, Properties: {}", name, properties);
+        } else {
+          LogManager.getRootLogger().addAppender(appender);
+          return true;
+        }
+      } catch (Exception ex) {
+        logger.warn("Cannot add a new logger appender. Name: {}, Properties: {}", name, properties, ex);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Adds Logger Appenders
+   * @param names Names of appender
+   * @param args Args with properties
+   * @param propertySeparator Property separator
+   * @return True if all of the appenders have been added successfully
+   */
+  public static boolean addAppenders(String[] names, String args, String propertySeparator)
+  {
+    if (names == null || args == null || names.length == 0 || propertySeparator == null) {
+      throw new IllegalArgumentException("Incorrect appender parametrs");
+    }
+    boolean status = true;
+    try {
+      Properties properties = new Properties();
+      properties.load(new StringReader(args.replaceAll(propertySeparator, "\n")));
+      for (String name : names) {
+        if (!addAppender(name, properties)) {
+          status = false;
+        }
+      }
+    } catch (IOException ex) {
+      ;
+    }
+    return status;
+  }
+
+  /**
+   * Adds Default Logger Appenders
+   * Syntax of a value of the default appender parameters: {appender-names};{string-with-properties}
+   * Comma is a separator between appender names and properties
+   * @return True if all of the appenders have been added successfully
+   */
+  public static boolean addAppenders()
+  {
+    String appenderParameters = System.getProperty(LOGGER_APPENDER.getLongName());
+    if (appenderParameters != null) {
+      String[] splits = appenderParameters.split(";", 2);
+      if (splits.length != 2) {
+        return false;
+      }
+      return addAppenders(splits[0].split(","), splits[1], ",");
+    }
+    return false;
+  }
+
+  /**
+   * Removes Logger Appender
+   * @param name Appender name
+   * @return True if the appender has been removed successfully
+   */
+  public static boolean removeAppender(String name)
+  {
+    try {
+      LogManager.getRootLogger().removeAppender(name);
+    } catch (Exception ex) {
+      logger.error("Cannot remove the logger appender: {}", name, ex);
+      return false;
+    }
+    return false;
+  }
+
+  /**
+   * Returns a list names of the appenders
+   * @return Names of the appenders
+   */
+  public static List<String> getAppendersNames()
+  {
+    Enumeration enumeration = LogManager.getRootLogger().getAllAppenders();
+    List<String> names = new LinkedList<>();
+    while (enumeration.hasMoreElements()) {
+      names.add(((Appender)enumeration.nextElement()).getName());
+    }
+    return names;
+  }
 }
diff --git a/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java b/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
index 2ad3f4a..bc73ca8 100644
--- a/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
@@ -26,14 +26,20 @@
 import org.junit.runners.MethodSorters;
 import org.slf4j.LoggerFactory;
 
+import org.apache.log4j.Appender;
 import org.apache.log4j.Category;
+import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
 
 import com.google.common.collect.Maps;
 
+import com.datatorrent.api.Context;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -165,4 +171,66 @@
     }
     assertSame(log4jLogger.getEffectiveLevel(), parent.getLevel());
   }
+
+  @Test
+  public void testAppender()
+  {
+    logger.info("Running Appender Test");
+    String appenderName = "testAppender";
+    String appenderName1 = "testAppender1";
+    String args = "log4j.appender.testAppender=com.datatorrent.stram.util.LoggerUtilTest$TestAppender"
+        + ",log4j.appender.testAppender.layout=org.apache.log4j.PatternLayout"
+        + ",log4j.appender.testAppender.layout.ConversionPattern=%d %d{Z} [%t] %-5p (%F:%L) - %m%n"
+        + ",log4j.appender.testAppender1=org.apache.log4j.ConsoleAppender"
+        + ",log4j.appender.testAppender1.layout=org.apache.log4j.PatternLayout"
+        + ",log4j.appender.testAppender1.layout.ConversionPattern=%d %d{Z} [%t] %-5p (%F:%L) - %m%n";
+
+    assertTrue(LoggerUtil.addAppenders(new String[] {appenderName }, args, ","));
+    TestAppender appender = (TestAppender)LogManager.getRootLogger().getAppender(appenderName);
+
+    logger.info(args);
+    assertEquals(args, appender.lastMessage);
+    assertEquals(appender.level, Level.INFO);
+
+    logger.warn(appenderName1);
+    assertEquals(appenderName1, appender.lastMessage);
+    assertEquals(appender.level, Level.WARN);
+
+    // don't allow to add an appender with the same name
+    assertFalse(LoggerUtil.addAppenders(new String[] {appenderName }, args, ","));
+    logger.info("Test Appender is added: " + LoggerUtil.getAppendersNames());
+    testAndRemoveAppender(appenderName);
+    logger.info("Test Appender is removed: " + LoggerUtil.getAppendersNames());
+
+    System.setProperty(Context.DAGContext.LOGGER_APPENDER.getLongName(), appenderName + "," + appenderName1 + ";" + args);
+    assertTrue(LoggerUtil.addAppenders());
+    logger.info("Test Appenders are added: " + LoggerUtil.getAppendersNames());
+
+    testAndRemoveAppender(appenderName);
+    testAndRemoveAppender(appenderName1);
+    logger.info("Test Appenders are removed: " + LoggerUtil.getAppendersNames());
+  }
+
+  private static void testAndRemoveAppender(String name)
+  {
+    Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(name);
+    assertNotNull(appender);
+    assertTrue(LoggerUtil.getAppendersNames().contains(name));
+    LoggerUtil.removeAppender(name);
+    assertNull(org.apache.log4j.Logger.getRootLogger().getAppender(name));
+  }
+
+  public static class TestAppender extends ConsoleAppender
+  {
+    private String lastMessage = null;
+    private Level level;
+
+    @Override
+    public void append(LoggingEvent event)
+    {
+      lastMessage = event.getRenderedMessage();
+      level = event.getLevel();
+      super.append(event);
+    }
+  }
 }