Merge branch 'APEXCORE-726' of github.com:PramodSSImmaneni/apex-core
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index a3b2f97..1d7b7b1 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -29,6 +29,8 @@
 
 import com.google.common.base.Throwables;
 
+import static com.datatorrent.api.StreamingApplication.APEX_PREFIX;
+
 /**
  * Attribute represents the attribute which can be set on various components in the system.
  *
@@ -88,6 +90,11 @@
     return "attr" + name.substring(name.lastIndexOf('.'));
   }
 
+  public String getLongName()
+  {
+    return APEX_PREFIX + getSimpleName().replaceAll("_",".").toLowerCase();
+  }
+
   public String getSimpleName()
   {
     return name.substring(name.lastIndexOf('.') + 1);
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 743f0f1..ff1a2d4 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -393,6 +393,10 @@
      */
     Attribute<String> CONTAINER_JVM_OPTIONS = new Attribute<>(String2String.getInstance());
     /**
+     * The options of dynamic apex logger appender
+     */
+    Attribute<String> LOGGER_APPENDER = new Attribute<>(String2String.getInstance());
+    /**
      * The amount of memory to be requested for the application master. Not used in local mode.
      * Default value is 1GB.
      */
diff --git a/docs/application_packages.md b/docs/application_packages.md
index 74886fc..95a0f27 100644
--- a/docs/application_packages.md
+++ b/docs/application_packages.md
@@ -12,7 +12,7 @@
 You will need have the following installed:
 
 1. Apache Maven 3.0 or later (for assembling the App Package)
-2. Apache Apex 3.2.0 or later (for launching the App Package in your cluster)
+2. Apache Apex 3.6.0 or later (for launching the App Package in your cluster)
 
 ## Creating Your First Apex App Package
 
@@ -24,11 +24,13 @@
 First, change to the directory where you put your projects, and create
 an Apex application project using Maven by running the following
 command.  Replace "com.example", "myapp" and "1.0-SNAPSHOT" with the
-appropriate values (make sure this is all on one line):
+appropriate values (make sure this is all on one line). You can also
+replace "RELEASE" with a specific Apex version number (like "3.6.0")
+if you don't want to use the most recent release:
 
     $ mvn archetype:generate \
      -DarchetypeGroupId=org.apache.apex \
-     -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.4.0 \
+     -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=RELEASE \
      -DgroupId=com.example -Dpackage=com.example.myapp -DartifactId=myapp \
      -Dversion=1.0-SNAPSHOT
 
@@ -84,7 +86,7 @@
 
 Group ID: org.apache.apex
 Artifact ID: apex-app-archetype
-Version: 3.4.0 (or any later version)
+Version: 3.6.0 (or any later version)
 
 ## Writing Your Own App Package
 
@@ -96,7 +98,7 @@
 Under the project, you can add project dependencies in pom.xml, or do it
 through your IDE.  Here’s the section that describes the dependencies in
 the default pom.xml:
-```
+```xml
   <dependencies>
     <!-- add your dependencies here -->
     <dependency>
@@ -173,7 +175,7 @@
 properties and application specific properties. They are all specified
 as name value pairs, in XML format, like the following.
 
-```
+```xml
 <?xml version="1.0"?>
 <configuration>
   <property>
@@ -196,7 +198,7 @@
 specifies the name of the attribute. Below is an example snippet setting
 the streaming windows size of the application to be 1000 milliseconds.
 
-```
+```xml
   <property>
      <name>apex.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
      <value>1000</value>
@@ -223,7 +225,7 @@
 shown below. It specifies the number of streaming windows for one
 application window of an operator named “input” to be 10
 
-```
+```xml
 <property>
   <name>apex.operator.input.attr.APPLICATION_WINDOW_COUNT</name>
   <value>10</value>
@@ -247,7 +249,7 @@
 this is specified below. It specifies the property “hostname” of the
 redis server for a “redis” output operator.
 
-```
+```xml
   <property>
     <name>apex.operator.redis.prop.host</name>
     <value>127.0.0.1</value>
@@ -263,6 +265,17 @@
 value is passed as an argument. In the above example the method setHost
 will be called on the “redis” operator with “127.0.0.1” as the argument.
 
+Properties that are collection types can also be configured, based on the beanutils
+syntax. For example, the connection properties of the JDBC store can be accessed
+like this:
+
+```xml
+  <property>
+    <name>apex.operator.jdbc.prop.store.connectionProperties(user)</name>
+    <value>your-user-name</value>
+  </property>
+```
+
 ### Port attributes
 Port attributes are used to specify the platform behavior for input and
 output ports. They can be specified using the parameter ```apex.operator.<operator-name>.inputport.<port-name>.attr.<attribute>```
@@ -274,7 +287,7 @@
 capacity for an input port named “input” of an operator named “range” to
 be 4k.
 
-```
+```xml
 <property>
   <name>apex.operator.range.inputport.input.attr.QUEUE_CAPACITY</name>
   <value>4000</value>
@@ -306,7 +319,7 @@
 “stream1” to container local indicating that the operators the stream is
 connecting be run in the same container.
 
-```
+```xml
   <property>
     <name>apex.stream.stream1.prop.locality</name>
     <value>CONTAINER_LOCAL</value>
@@ -337,7 +350,7 @@
 specify a group for applications, operators, ports or streams. For
 example, to specify an attribute for all ports of an operator it can be
 done as follows
-```
+```xml
 <property>
   <name>apex.operator.range.port.*.attr.QUEUE_CAPACITY</name>
   <value>4000</value>
@@ -357,7 +370,7 @@
 src/main/resources/META-INF/properties.xml under the App Package
 project. The properties.xml may look like:
 
-```
+```xml
 <?xml version="1.0"?>
 <configuration>
   <property>
@@ -433,9 +446,9 @@
 In a Apex App Package project, the pom.xml file contains a
 section that looks like:
 
-```
+```xml
 <properties>
-  <apex.version>3.4.0</apex.version>
+  <apex.core.version>3.6.0</apex.core.version>
   <apex.apppackage.classpath\>lib*.jar</apex.apppackage.classpath>
 </properties>
 ```
@@ -535,7 +548,7 @@
 
 ```
 $ mvn archetype:generate -DarchetypeGroupId=org.apache.apex \
-  -DarchetypeArtifactId=apex-conf-archetype -DarchetypeVersion=3.4.0 \
+  -DarchetypeArtifactId=apex-conf-archetype -DarchetypeVersion=RELEASE \
   -DgroupId=com.example -Dpackage=com.example.myconfig -DartifactId=myconfig \
   -Dversion=1.0-SNAPSHOT
 ```
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index b90cdab..598cdba 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -273,6 +273,8 @@
     vargs.add(String.format("-D%scid=%s", StreamingApplication.DT_PREFIX, jvmID));
     vargs.add("-Dhadoop.root.logger=" + (dag.isDebug() ? "DEBUG" : "INFO") + ",RFA");
     vargs.add("-Dhadoop.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+    StramClientUtils.addAttributeToArgs(LogicalPlan.APPLICATION_NAME, dag, vargs);
+    StramClientUtils.addAttributeToArgs(LogicalPlan.LOGGER_APPENDER, dag, vargs);
 
     String loggersLevel = System.getProperty(StramUtils.DT_LOGGERS_LEVEL);
     if (loggersLevel != null) {
@@ -353,5 +355,4 @@
       throw new RuntimeException("Error generating delegation token", e);
     }
   }
-
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index f7d3da9..c5017eb 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -580,6 +580,8 @@
       vargs.add("-Dhadoop.root.logger=" + (dag.isDebug() ? "DEBUG" : "INFO") + ",RFA");
       vargs.add("-Dhadoop.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
       vargs.add(String.format("-D%s=%s", StreamingContainer.PROP_APP_PATH, dag.assertAppPath()));
+      StramClientUtils.addAttributeToArgs(LogicalPlan.APPLICATION_NAME, dag, vargs);
+      StramClientUtils.addAttributeToArgs(LogicalPlan.LOGGER_APPENDER, dag, vargs);
       if (dag.isDebug()) {
         vargs.add("-Dlog4j.debug=true");
       }
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 9a7b128..787f20b 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -124,6 +124,7 @@
 import com.datatorrent.stram.plan.logical.requests.SetStreamAttributeRequest;
 import com.datatorrent.stram.security.StramUserLogin;
 import com.datatorrent.stram.util.JSONSerializationProvider;
+import com.datatorrent.stram.util.LoggerUtil;
 import com.datatorrent.stram.util.SecurityUtils;
 import com.datatorrent.stram.util.VersionInfo;
 import com.datatorrent.stram.util.WebServicesClient;
@@ -4128,6 +4129,7 @@
 
   public static void main(final String[] args) throws Exception
   {
+    LoggerUtil.addAppenders();
     final ApexCli shell = new ApexCli();
     shell.preImpersonationInit(args);
     String hadoopUserName = System.getenv("HADOOP_USER_NAME");
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index 15adab4..d9032e5 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -80,6 +80,8 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.stram.StramClient;
 import com.datatorrent.stram.StramUtils;
@@ -870,4 +872,12 @@
     return appInfo;
   }
 
+  public static void addAttributeToArgs(Attribute<String> attribute, Context context, List<CharSequence> vargs)
+  {
+    String value = context.getValue(attribute);
+    if (value != null) {
+      vargs.add(String.format("-D%s=$'%s'", attribute.getLongName(),
+          value.replace("\\", "\\\\\\\\").replaceAll("['\"$]", "\\\\$0")));
+    }
+  }
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java b/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
index 1a14c90..014ba6e 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
@@ -26,6 +26,8 @@
 import org.apache.log4j.Appender;
 import org.apache.log4j.RollingFileAppender;
 
+import com.datatorrent.stram.util.LoggerUtil;
+
 /**
  * <p>StdOutErrLog class.</p>
  *
@@ -52,6 +54,7 @@
       logger.warn("found appender {} instead of RollingFileAppender", appender);
     }
 
+    LoggerUtil.addAppenders();
     System.setOut(createLoggingProxy(System.out));
     System.setErr(createLoggingProxy(System.err));
   }
diff --git a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
index ffe9c8c..14662e1 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
@@ -19,9 +19,15 @@
 package com.datatorrent.stram.util;
 
 import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Method;
 import java.util.Enumeration;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Pattern;
 
 import javax.annotation.Nonnull;
@@ -35,6 +41,7 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
 import org.apache.log4j.spi.DefaultRepositorySelector;
 import org.apache.log4j.spi.HierarchyEventListener;
 import org.apache.log4j.spi.LoggerFactory;
@@ -45,6 +52,8 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
+import static com.datatorrent.api.Context.DAGContext.LOGGER_APPENDER;
+
 /**
  * @since 3.5.0
  */
@@ -187,8 +196,6 @@
     }
   }
 
-  private static FileAppender fileAppender;
-  private static boolean shouldFetchLogFileInfo;
   static {
     logger.debug("initializing LoggerUtil");
     initializeLogger();
@@ -198,8 +205,6 @@
   static void initializeLogger()
   {
     LogManager.setRepositorySelector(new DefaultRepositorySelector(new DelegatingLoggerRepository(LogManager.getLoggerRepository())), null);
-    fileAppender = getFileAppender();
-    shouldFetchLogFileInfo = shouldFetchLogFileInformation();
   }
 
   private static synchronized Level getLevelFor(String name)
@@ -300,7 +305,8 @@
    */
   public static LogFileInformation getLogFileInformation()
   {
-    if (shouldFetchLogFileInfo) {
+    FileAppender fileAppender = getFileAppender();
+    if (shouldFetchLogFileInformation(fileAppender)) {
       File logFile = new File(fileAppender.getFile());
       LogFileInformation logFileInfo = new LogFileInformation(fileAppender.getFile(), logFile.length());
       return logFileInfo;
@@ -331,9 +337,9 @@
    * we have single file Appender, the logging level of appender is set to level Error or above and immediateFlush is set to true.
    * In future we should be able to enhance this feature to support multiple file appenders.
    */
-  private static boolean shouldFetchLogFileInformation()
+  private static boolean shouldFetchLogFileInformation(FileAppender fileAppender)
   {
-    if (fileAppender != null && isErrorLevelEnable() && fileAppender.getImmediateFlush()) {
+    if (fileAppender != null && isErrorLevelEnable(fileAppender) && fileAppender.getImmediateFlush()) {
       return true;
     }
     logger.warn(
@@ -341,7 +347,7 @@
     return false;
   }
 
-  private static boolean isErrorLevelEnable()
+  private static boolean isErrorLevelEnable(FileAppender fileAppender)
   {
     if (fileAppender != null) {
       Level p = (Level)fileAppender.getThreshold();
@@ -355,4 +361,107 @@
     return false;
   }
 
+  /**
+   * Adds Logger Appender
+   * @param name Appender name
+   * @param properties Appender properties
+   * @return True if the appender has been added successfully
+   */
+  public static boolean addAppender(String name, Properties properties)
+  {
+    if (getAppendersNames().contains(name)) {
+      logger.warn("A logger appender with the name '{}' exists. Cannot add a new logger appender with the same name", name);
+    } else {
+      try {
+        Method method = PropertyConfigurator.class.getDeclaredMethod("parseAppender", Properties.class, String.class);
+        method.setAccessible(true);
+        Appender appender = (Appender)method.invoke(new PropertyConfigurator(), properties, name);
+        if (appender == null) {
+          logger.warn("Cannot add a new logger appender. Name: {}, Properties: {}", name, properties);
+        } else {
+          LogManager.getRootLogger().addAppender(appender);
+          return true;
+        }
+      } catch (Exception ex) {
+        logger.warn("Cannot add a new logger appender. Name: {}, Properties: {}", name, properties, ex);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Adds Logger Appenders
+   * @param names Names of appender
+   * @param args Args with properties
+   * @param propertySeparator Property separator
+   * @return True if all of the appenders have been added successfully
+   */
+  public static boolean addAppenders(String[] names, String args, String propertySeparator)
+  {
+    if (names == null || args == null || names.length == 0 || propertySeparator == null) {
+      throw new IllegalArgumentException("Incorrect appender parametrs");
+    }
+    boolean status = true;
+    try {
+      Properties properties = new Properties();
+      properties.load(new StringReader(args.replaceAll(propertySeparator, "\n")));
+      for (String name : names) {
+        if (!addAppender(name, properties)) {
+          status = false;
+        }
+      }
+    } catch (IOException ex) {
+      ;
+    }
+    return status;
+  }
+
+  /**
+   * Adds Default Logger Appenders
+   * Syntax of a value of the default appender parameters: {appender-names};{string-with-properties}
+   * Comma is a separator between appender names and properties
+   * @return True if all of the appenders have been added successfully
+   */
+  public static boolean addAppenders()
+  {
+    String appenderParameters = System.getProperty(LOGGER_APPENDER.getLongName());
+    if (appenderParameters != null) {
+      String[] splits = appenderParameters.split(";", 2);
+      if (splits.length != 2) {
+        return false;
+      }
+      return addAppenders(splits[0].split(","), splits[1], ",");
+    }
+    return false;
+  }
+
+  /**
+   * Removes Logger Appender
+   * @param name Appender name
+   * @return True if the appender has been removed successfully
+   */
+  public static boolean removeAppender(String name)
+  {
+    try {
+      LogManager.getRootLogger().removeAppender(name);
+    } catch (Exception ex) {
+      logger.error("Cannot remove the logger appender: {}", name, ex);
+      return false;
+    }
+    return false;
+  }
+
+  /**
+   * Returns a list names of the appenders
+   * @return Names of the appenders
+   */
+  public static List<String> getAppendersNames()
+  {
+    Enumeration enumeration = LogManager.getRootLogger().getAllAppenders();
+    List<String> names = new LinkedList<>();
+    while (enumeration.hasMoreElements()) {
+      names.add(((Appender)enumeration.nextElement()).getName());
+    }
+    return names;
+  }
 }
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
index 5e468f5..a4aca46 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
@@ -170,12 +170,10 @@
   @Override
   public void dispatch(Event event)
   {
-    if (!plugins.isEmpty()) {
-      if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
-        clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
-      } else if (event instanceof DAGExecutionEvent) {
-        dispatchExecutionEvent((DAGExecutionEvent)event);
-      }
+    if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
+      clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
+    } else if (!plugins.isEmpty() && (event instanceof DAGExecutionEvent)) {
+      dispatchExecutionEvent((DAGExecutionEvent)event);
     }
   }
 }
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index ca85f5d..455604b 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -603,4 +603,36 @@
 
   }
 
+  private static String APP_NAME = "$test\\\"'";
+
+  @Test
+  public void testAddAttributeToArgs() throws Exception
+  {
+    LogicalPlan dag = new LogicalPlan();
+    dag.setAttribute(LogicalPlan.APPLICATION_NAME, APP_NAME);
+    AddAttributeToArgsOperator operator = dag.addOperator("test", AddAttributeToArgsOperator.class);
+    dag.getContextAttributes(operator).put(OperatorContext.RECOVERY_ATTEMPTS, 0);
+
+    StramClient client = new StramClient(conf, dag);
+    if (StringUtils.isBlank(System.getenv("JAVA_HOME"))) {
+      client.javaCmd = "java";
+    }
+    try {
+      client.start();
+      client.startApplication();
+      Assert.assertTrue(client.monitorApplication());
+    } finally {
+      client.stop();
+    }
+  }
+
+  public static class AddAttributeToArgsOperator extends BaseOperator implements InputOperator
+  {
+    @Override
+    public void emitTuples()
+    {
+      throw APP_NAME.equals(System.getProperty(LogicalPlan.APPLICATION_NAME.getLongName()))
+          ? new ShutdownException() : new RuntimeException();
+    }
+  }
 }
diff --git a/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java b/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
index 2ad3f4a..bc73ca8 100644
--- a/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
@@ -26,14 +26,20 @@
 import org.junit.runners.MethodSorters;
 import org.slf4j.LoggerFactory;
 
+import org.apache.log4j.Appender;
 import org.apache.log4j.Category;
+import org.apache.log4j.ConsoleAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
 
 import com.google.common.collect.Maps;
 
+import com.datatorrent.api.Context;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -165,4 +171,66 @@
     }
     assertSame(log4jLogger.getEffectiveLevel(), parent.getLevel());
   }
+
+  @Test
+  public void testAppender()
+  {
+    logger.info("Running Appender Test");
+    String appenderName = "testAppender";
+    String appenderName1 = "testAppender1";
+    String args = "log4j.appender.testAppender=com.datatorrent.stram.util.LoggerUtilTest$TestAppender"
+        + ",log4j.appender.testAppender.layout=org.apache.log4j.PatternLayout"
+        + ",log4j.appender.testAppender.layout.ConversionPattern=%d %d{Z} [%t] %-5p (%F:%L) - %m%n"
+        + ",log4j.appender.testAppender1=org.apache.log4j.ConsoleAppender"
+        + ",log4j.appender.testAppender1.layout=org.apache.log4j.PatternLayout"
+        + ",log4j.appender.testAppender1.layout.ConversionPattern=%d %d{Z} [%t] %-5p (%F:%L) - %m%n";
+
+    assertTrue(LoggerUtil.addAppenders(new String[] {appenderName }, args, ","));
+    TestAppender appender = (TestAppender)LogManager.getRootLogger().getAppender(appenderName);
+
+    logger.info(args);
+    assertEquals(args, appender.lastMessage);
+    assertEquals(appender.level, Level.INFO);
+
+    logger.warn(appenderName1);
+    assertEquals(appenderName1, appender.lastMessage);
+    assertEquals(appender.level, Level.WARN);
+
+    // don't allow to add an appender with the same name
+    assertFalse(LoggerUtil.addAppenders(new String[] {appenderName }, args, ","));
+    logger.info("Test Appender is added: " + LoggerUtil.getAppendersNames());
+    testAndRemoveAppender(appenderName);
+    logger.info("Test Appender is removed: " + LoggerUtil.getAppendersNames());
+
+    System.setProperty(Context.DAGContext.LOGGER_APPENDER.getLongName(), appenderName + "," + appenderName1 + ";" + args);
+    assertTrue(LoggerUtil.addAppenders());
+    logger.info("Test Appenders are added: " + LoggerUtil.getAppendersNames());
+
+    testAndRemoveAppender(appenderName);
+    testAndRemoveAppender(appenderName1);
+    logger.info("Test Appenders are removed: " + LoggerUtil.getAppendersNames());
+  }
+
+  private static void testAndRemoveAppender(String name)
+  {
+    Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(name);
+    assertNotNull(appender);
+    assertTrue(LoggerUtil.getAppendersNames().contains(name));
+    LoggerUtil.removeAppender(name);
+    assertNull(org.apache.log4j.Logger.getRootLogger().getAppender(name));
+  }
+
+  public static class TestAppender extends ConsoleAppender
+  {
+    private String lastMessage = null;
+    private Level level;
+
+    @Override
+    public void append(LoggingEvent event)
+    {
+      lastMessage = event.getRenderedMessage();
+      level = event.getLevel();
+      super.append(event);
+    }
+  }
 }
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
index 833d69f..654a4ce 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -27,6 +27,8 @@
 import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
 import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
 
+import com.datatorrent.api.DAG;
+
 import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT;
 import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT;
 import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT;
@@ -39,10 +41,13 @@
   private int heartbeatCount = 0;
   private int commitCount = 0;
   CountDownLatch latch = new CountDownLatch(3);
+  private Context context;
 
   @Override
   public void setup(DAGExecutionPlugin.Context context)
   {
+    this.context = context;
+
     context.register(STRAM_EVENT, new EventHandler<DAGExecutionEvent.StramExecutionEvent>()
     {
       @Override
@@ -102,4 +107,9 @@
   {
     latch.await(timeout, TimeUnit.SECONDS);
   }
+
+  public DAG getLogicalPlan()
+  {
+    return context.getDAG();
+  }
 }
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
index 140dc65..34589b0 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -33,6 +33,7 @@
 import com.datatorrent.api.Attribute;
 import com.datatorrent.stram.api.StramEvent;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.support.StramTestSupport;
 
 public class PluginTests
@@ -93,12 +94,14 @@
     }));
     pluginManager.dispatch(new DAGExecutionEvent.CommitExecutionEvent(1234));
     pluginManager.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()));
+    LogicalPlan plan = new LogicalPlan();
+    pluginManager.dispatch(new ApexPluginDispatcher.DAGChangeEvent(plan));
     debugPlugin.waitForEventDelivery(10);
     pluginManager.stop();
 
     Assert.assertEquals(1, debugPlugin.getEventCount());
     Assert.assertEquals(1, debugPlugin.getHeartbeatCount());
     Assert.assertEquals(1, debugPlugin.getCommitCount());
+    Assert.assertEquals(plan, debugPlugin.getLogicalPlan());
   }
-
 }