Merge branch 'APEXCORE-726' of github.com:PramodSSImmaneni/apex-core
diff --git a/api/src/main/java/com/datatorrent/api/Attribute.java b/api/src/main/java/com/datatorrent/api/Attribute.java
index a3b2f97..1d7b7b1 100644
--- a/api/src/main/java/com/datatorrent/api/Attribute.java
+++ b/api/src/main/java/com/datatorrent/api/Attribute.java
@@ -29,6 +29,8 @@
import com.google.common.base.Throwables;
+import static com.datatorrent.api.StreamingApplication.APEX_PREFIX;
+
/**
* Attribute represents the attribute which can be set on various components in the system.
*
@@ -88,6 +90,11 @@
return "attr" + name.substring(name.lastIndexOf('.'));
}
+  /**
+   * Builds the long, property-style name of this attribute: {@code APEX_PREFIX} followed by the
+   * simple name with every underscore replaced by a dot and all letters converted to lower case.
+   *
+   * @return the long name derived from {@link #getSimpleName()}
+   */
+  public String getLongName()
+  {
+    // Locale.ROOT keeps the result independent of the JVM default locale; with the default-locale
+    // overload a Turkish locale would lower-case 'I' to a dotless i and change the name.
+    return APEX_PREFIX + getSimpleName().replace('_', '.').toLowerCase(java.util.Locale.ROOT);
+  }
+
public String getSimpleName()
{
return name.substring(name.lastIndexOf('.') + 1);
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 743f0f1..ff1a2d4 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -393,6 +393,10 @@
*/
Attribute<String> CONTAINER_JVM_OPTIONS = new Attribute<>(String2String.getInstance());
/**
+ * Options for logger appenders that are added dynamically, in the form {appender-names};{appender-properties}
+ */
+ Attribute<String> LOGGER_APPENDER = new Attribute<>(String2String.getInstance());
+ /**
* The amount of memory to be requested for the application master. Not used in local mode.
* Default value is 1GB.
*/
diff --git a/docs/application_packages.md b/docs/application_packages.md
index 74886fc..95a0f27 100644
--- a/docs/application_packages.md
+++ b/docs/application_packages.md
@@ -12,7 +12,7 @@
You will need to have the following installed:
1. Apache Maven 3.0 or later (for assembling the App Package)
-2. Apache Apex 3.2.0 or later (for launching the App Package in your cluster)
+2. Apache Apex 3.6.0 or later (for launching the App Package in your cluster)
## Creating Your First Apex App Package
@@ -24,11 +24,13 @@
First, change to the directory where you put your projects, and create
an Apex application project using Maven by running the following
command. Replace "com.example", "myapp" and "1.0-SNAPSHOT" with the
-appropriate values (make sure this is all on one line):
+appropriate values (make sure this is all on one line). You can also
+replace "RELEASE" with a specific Apex version number (like "3.6.0")
+if you don't want to use the most recent release:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.apex \
- -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.4.0 \
+ -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=RELEASE \
-DgroupId=com.example -Dpackage=com.example.myapp -DartifactId=myapp \
-Dversion=1.0-SNAPSHOT
@@ -84,7 +86,7 @@
Group ID: org.apache.apex
Artifact ID: apex-app-archetype
-Version: 3.4.0 (or any later version)
+Version: 3.6.0 (or any later version)
## Writing Your Own App Package
@@ -96,7 +98,7 @@
Under the project, you can add project dependencies in pom.xml, or do it
through your IDE. Here’s the section that describes the dependencies in
the default pom.xml:
-```
+```xml
<dependencies>
<!-- add your dependencies here -->
<dependency>
@@ -173,7 +175,7 @@
properties and application specific properties. They are all specified
as name value pairs, in XML format, like the following.
-```
+```xml
<?xml version="1.0"?>
<configuration>
<property>
@@ -196,7 +198,7 @@
specifies the name of the attribute. Below is an example snippet setting
the streaming windows size of the application to be 1000 milliseconds.
-```
+```xml
<property>
<name>apex.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
<value>1000</value>
@@ -223,7 +225,7 @@
shown below. It specifies the number of streaming windows for one
application window of an operator named “input” to be 10
-```
+```xml
<property>
<name>apex.operator.input.attr.APPLICATION_WINDOW_COUNT</name>
<value>10</value>
@@ -247,7 +249,7 @@
this is specified below. It specifies the property “hostname” of the
redis server for a “redis” output operator.
-```
+```xml
<property>
<name>apex.operator.redis.prop.host</name>
<value>127.0.0.1</value>
@@ -263,6 +265,17 @@
value is passed as an argument. In the above example the method setHost
will be called on the “redis” operator with “127.0.0.1” as the argument.
+Properties that are collection types can also be configured, based on the beanutils
+syntax. For example, the connection properties of the JDBC store can be accessed
+like this:
+
+```xml
+ <property>
+ <name>apex.operator.jdbc.prop.store.connectionProperties(user)</name>
+ <value>your-user-name</value>
+ </property>
+```
+
### Port attributes
Port attributes are used to specify the platform behavior for input and
output ports. They can be specified using the parameter ```apex.operator.<operator-name>.inputport.<port-name>.attr.<attribute>```
@@ -274,7 +287,7 @@
capacity for an input port named “input” of an operator named “range” to
be 4k.
-```
+```xml
<property>
<name>apex.operator.range.inputport.input.attr.QUEUE_CAPACITY</name>
<value>4000</value>
@@ -306,7 +319,7 @@
“stream1” to container local, indicating that the operators the stream
connects should run in the same container.
-```
+```xml
<property>
<name>apex.stream.stream1.prop.locality</name>
<value>CONTAINER_LOCAL</value>
@@ -337,7 +350,7 @@
specify a group for applications, operators, ports or streams. For
example, to specify an attribute for all ports of an operator it can be
done as follows
-```
+```xml
<property>
<name>apex.operator.range.port.*.attr.QUEUE_CAPACITY</name>
<value>4000</value>
@@ -357,7 +370,7 @@
src/main/resources/META-INF/properties.xml under the App Package
project. The properties.xml may look like:
-```
+```xml
<?xml version="1.0"?>
<configuration>
<property>
@@ -433,9 +446,9 @@
In an Apex App Package project, the pom.xml file contains a
section that looks like:
-```
+```xml
<properties>
- <apex.version>3.4.0</apex.version>
+ <apex.core.version>3.6.0</apex.core.version>
<apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
</properties>
```
@@ -535,7 +548,7 @@
```
$ mvn archetype:generate -DarchetypeGroupId=org.apache.apex \
- -DarchetypeArtifactId=apex-conf-archetype -DarchetypeVersion=3.4.0 \
+ -DarchetypeArtifactId=apex-conf-archetype -DarchetypeVersion=RELEASE \
-DgroupId=com.example -Dpackage=com.example.myconfig -DartifactId=myconfig \
-Dversion=1.0-SNAPSHOT
```
diff --git a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
index b90cdab..598cdba 100644
--- a/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
+++ b/engine/src/main/java/com/datatorrent/stram/LaunchContainerRunnable.java
@@ -273,6 +273,8 @@
vargs.add(String.format("-D%scid=%s", StreamingApplication.DT_PREFIX, jvmID));
vargs.add("-Dhadoop.root.logger=" + (dag.isDebug() ? "DEBUG" : "INFO") + ",RFA");
vargs.add("-Dhadoop.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+ StramClientUtils.addAttributeToArgs(LogicalPlan.APPLICATION_NAME, dag, vargs);
+ StramClientUtils.addAttributeToArgs(LogicalPlan.LOGGER_APPENDER, dag, vargs);
String loggersLevel = System.getProperty(StramUtils.DT_LOGGERS_LEVEL);
if (loggersLevel != null) {
@@ -353,5 +355,4 @@
throw new RuntimeException("Error generating delegation token", e);
}
}
-
}
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index f7d3da9..c5017eb 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -580,6 +580,8 @@
vargs.add("-Dhadoop.root.logger=" + (dag.isDebug() ? "DEBUG" : "INFO") + ",RFA");
vargs.add("-Dhadoop.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
vargs.add(String.format("-D%s=%s", StreamingContainer.PROP_APP_PATH, dag.assertAppPath()));
+ StramClientUtils.addAttributeToArgs(LogicalPlan.APPLICATION_NAME, dag, vargs);
+ StramClientUtils.addAttributeToArgs(LogicalPlan.LOGGER_APPENDER, dag, vargs);
if (dag.isDebug()) {
vargs.add("-Dlog4j.debug=true");
}
diff --git a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
index 9a7b128..787f20b 100644
--- a/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
+++ b/engine/src/main/java/com/datatorrent/stram/cli/ApexCli.java
@@ -124,6 +124,7 @@
import com.datatorrent.stram.plan.logical.requests.SetStreamAttributeRequest;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.util.JSONSerializationProvider;
+import com.datatorrent.stram.util.LoggerUtil;
import com.datatorrent.stram.util.SecurityUtils;
import com.datatorrent.stram.util.VersionInfo;
import com.datatorrent.stram.util.WebServicesClient;
@@ -4128,6 +4129,7 @@
public static void main(final String[] args) throws Exception
{
+ LoggerUtil.addAppenders();
final ApexCli shell = new ApexCli();
shell.preImpersonationInit(args);
String hadoopUserName = System.getenv("HADOOP_USER_NAME");
diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
index 15adab4..d9032e5 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java
@@ -80,6 +80,8 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.StramClient;
import com.datatorrent.stram.StramUtils;
@@ -870,4 +872,12 @@
return appInfo;
}
+  /**
+   * Appends a {@code -D<long-name>=$'<escaped-value>'} argument for the given attribute to
+   * {@code vargs}. Nothing is appended when the context has no value for the attribute.
+   *
+   * @param attribute attribute whose long name becomes the system property name
+   * @param context context the attribute value is read from
+   * @param vargs argument list that receives the new entry
+   */
+  public static void addAttributeToArgs(Attribute<String> attribute, Context context, List<CharSequence> vargs)
+  {
+    final String value = context.getValue(attribute);
+    if (value == null) {
+      return;
+    }
+    // every backslash in the value is expanded to four backslashes
+    final String backslashesEscaped = value.replace("\\", "\\\\\\\\");
+    // each single quote, double quote and dollar sign gets a backslash in front of it
+    final String escaped = backslashesEscaped.replaceAll("['\"$]", "\\\\$0");
+    vargs.add(String.format("-D%s=$'%s'", attribute.getLongName(), escaped));
+  }
}
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java b/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
index 1a14c90..014ba6e 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/StdOutErrLog.java
@@ -26,6 +26,8 @@
import org.apache.log4j.Appender;
import org.apache.log4j.RollingFileAppender;
+import com.datatorrent.stram.util.LoggerUtil;
+
/**
* <p>StdOutErrLog class.</p>
*
@@ -52,6 +54,7 @@
logger.warn("found appender {} instead of RollingFileAppender", appender);
}
+ LoggerUtil.addAppenders();
System.setOut(createLoggingProxy(System.out));
System.setErr(createLoggingProxy(System.err));
}
diff --git a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
index ffe9c8c..14662e1 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/LoggerUtil.java
@@ -19,9 +19,15 @@
package com.datatorrent.stram.util;
import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
@@ -35,6 +41,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.spi.DefaultRepositorySelector;
import org.apache.log4j.spi.HierarchyEventListener;
import org.apache.log4j.spi.LoggerFactory;
@@ -45,6 +52,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import static com.datatorrent.api.Context.DAGContext.LOGGER_APPENDER;
+
/**
* @since 3.5.0
*/
@@ -187,8 +196,6 @@
}
}
- private static FileAppender fileAppender;
- private static boolean shouldFetchLogFileInfo;
static {
logger.debug("initializing LoggerUtil");
initializeLogger();
@@ -198,8 +205,6 @@
static void initializeLogger()
{
LogManager.setRepositorySelector(new DefaultRepositorySelector(new DelegatingLoggerRepository(LogManager.getLoggerRepository())), null);
- fileAppender = getFileAppender();
- shouldFetchLogFileInfo = shouldFetchLogFileInformation();
}
private static synchronized Level getLevelFor(String name)
@@ -300,7 +305,8 @@
*/
public static LogFileInformation getLogFileInformation()
{
- if (shouldFetchLogFileInfo) {
+ FileAppender fileAppender = getFileAppender();
+ if (shouldFetchLogFileInformation(fileAppender)) {
File logFile = new File(fileAppender.getFile());
LogFileInformation logFileInfo = new LogFileInformation(fileAppender.getFile(), logFile.length());
return logFileInfo;
@@ -331,9 +337,9 @@
* we have single file Appender, the logging level of appender is set to level Error or above and immediateFlush is set to true.
* In future we should be able to enhance this feature to support multiple file appenders.
*/
- private static boolean shouldFetchLogFileInformation()
+ private static boolean shouldFetchLogFileInformation(FileAppender fileAppender)
{
- if (fileAppender != null && isErrorLevelEnable() && fileAppender.getImmediateFlush()) {
+ if (fileAppender != null && isErrorLevelEnable(fileAppender) && fileAppender.getImmediateFlush()) {
return true;
}
logger.warn(
@@ -341,7 +347,7 @@
return false;
}
- private static boolean isErrorLevelEnable()
+ private static boolean isErrorLevelEnable(FileAppender fileAppender)
{
if (fileAppender != null) {
Level p = (Level)fileAppender.getThreshold();
@@ -355,4 +361,107 @@
return false;
}
+  /**
+   * Creates a logger appender from the given properties and attaches it to the root logger.
+   * Nothing is added when the root logger already has an appender with the same name, when
+   * no appender is produced for the name, or when the creation fails with an exception;
+   * each of these cases is logged as a warning.
+   * @param name Appender name
+   * @param properties Appender properties
+   * @return True only if the appender has been created and added to the root logger
+   */
+  public static boolean addAppender(String name, Properties properties)
+  {
+    if (getAppendersNames().contains(name)) {
+      logger.warn("A logger appender with the name '{}' exists. Cannot add a new logger appender with the same name", name);
+      return false;
+    }
+    try {
+      // parseAppender is looked up as a declared method and made accessible before the reflective call
+      Method parseAppender = PropertyConfigurator.class.getDeclaredMethod("parseAppender", Properties.class, String.class);
+      parseAppender.setAccessible(true);
+      Appender created = (Appender)parseAppender.invoke(new PropertyConfigurator(), properties, name);
+      if (created != null) {
+        LogManager.getRootLogger().addAppender(created);
+        return true;
+      }
+      logger.warn("Cannot add a new logger appender. Name: {}, Properties: {}", name, properties);
+    } catch (Exception ex) {
+      logger.warn("Cannot add a new logger appender. Name: {}, Properties: {}", name, properties, ex);
+    }
+    return false;
+  }
+
+  /**
+   * Adds Logger Appenders
+   * @param names Names of the appenders to add
+   * @param args Appender properties in {@link Properties} syntax, joined by the property separator
+   * @param propertySeparator Property separator; it is applied to args as a regular expression
+   * @return True if the properties could be read and all of the appenders have been added successfully
+   * @throws IllegalArgumentException if an argument is null or no appender name is given
+   */
+  public static boolean addAppenders(String[] names, String args, String propertySeparator)
+  {
+    if (names == null || args == null || names.length == 0 || propertySeparator == null) {
+      throw new IllegalArgumentException("Incorrect appender parameters");
+    }
+    Properties properties = new Properties();
+    try {
+      // Properties.load expects one property per line, so separators become line breaks
+      properties.load(new StringReader(args.replaceAll(propertySeparator, "\n")));
+    } catch (IOException ex) {
+      // report the failure instead of silently claiming success with empty properties
+      logger.warn("Cannot read the logger appender properties: {}", args, ex);
+      return false;
+    }
+    boolean status = true;
+    for (String name : names) {
+      // keep going after a failure so that the remaining appenders are still attempted
+      if (!addAppender(name, properties)) {
+        status = false;
+      }
+    }
+    return status;
+  }
+
+  /**
+   * Adds the appenders described by the system property named by LOGGER_APPENDER.getLongName().
+   * Syntax of the property value: {appender-names};{string-with-properties}
+   * The first semicolon separates the appender names from the properties;
+   * comma is the separator inside the list of names and inside the properties.
+   * @return True if all of the appenders have been added successfully;
+   *         false if the property is not set or its value contains no semicolon
+   */
+  public static boolean addAppenders()
+  {
+    final String parameters = System.getProperty(LOGGER_APPENDER.getLongName());
+    if (parameters == null) {
+      return false;
+    }
+    final String[] namesAndProperties = parameters.split(";", 2);
+    if (namesAndProperties.length != 2) {
+      return false;
+    }
+    final String[] names = namesAndProperties[0].split(",");
+    return addAppenders(names, namesAndProperties[1], ",");
+  }
+
+  /**
+   * Removes Logger Appender from the root logger
+   * @param name Appender name
+   * @return True if the removal completed without an exception, false otherwise
+   */
+  public static boolean removeAppender(String name)
+  {
+    try {
+      LogManager.getRootLogger().removeAppender(name);
+    } catch (Exception ex) {
+      logger.error("Cannot remove the logger appender: {}", name, ex);
+      return false;
+    }
+    // reaching this point means the removal did not throw
+    return true;
+  }
+
+  /**
+   * Collects the names of all appenders attached to the root logger
+   * @return Names of the appenders, in the order the root logger enumerates them
+   */
+  public static List<String> getAppendersNames()
+  {
+    List<String> appenderNames = new LinkedList<>();
+    for (Enumeration appenders = LogManager.getRootLogger().getAllAppenders(); appenders.hasMoreElements(); ) {
+      Appender appender = (Appender)appenders.nextElement();
+      appenderNames.add(appender.getName());
+    }
+    return appenderNames;
+  }
}
diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
index 5e468f5..a4aca46 100644
--- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
+++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java
@@ -170,12 +170,10 @@
@Override
public void dispatch(Event event)
{
- if (!plugins.isEmpty()) {
- if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
- clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag);
- } else if (event instanceof DAGExecutionEvent) {
- dispatchExecutionEvent((DAGExecutionEvent)event);
- }
+ if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) { // handled even when no plugin is registered
+ clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag); // keep a private copy of the DAG carried by the event
+ } else if (!plugins.isEmpty() && (event instanceof DAGExecutionEvent)) { // execution events are forwarded only when plugins exist
+ dispatchExecutionEvent((DAGExecutionEvent)event);
}
}
}
diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
index ca85f5d..455604b 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java
@@ -603,4 +603,36 @@
}
+ private static String APP_NAME = "$test\\\"'";
+
+ @Test
+ public void testAddAttributeToArgs() throws Exception
+ {
+ LogicalPlan dag = new LogicalPlan();
+ dag.setAttribute(LogicalPlan.APPLICATION_NAME, APP_NAME);
+ AddAttributeToArgsOperator operator = dag.addOperator("test", AddAttributeToArgsOperator.class);
+ dag.getContextAttributes(operator).put(OperatorContext.RECOVERY_ATTEMPTS, 0);
+
+ StramClient client = new StramClient(conf, dag);
+ if (StringUtils.isBlank(System.getenv("JAVA_HOME"))) {
+ client.javaCmd = "java";
+ }
+ try {
+ client.start();
+ client.startApplication();
+ Assert.assertTrue(client.monitorApplication());
+ } finally {
+ client.stop();
+ }
+ }
+
+  /**
+   * Input operator that compares the system property named by the long name of
+   * APPLICATION_NAME with APP_NAME: it throws ShutdownException when they are equal
+   * and RuntimeException otherwise.
+   */
+  public static class AddAttributeToArgsOperator extends BaseOperator implements InputOperator
+  {
+    @Override
+    public void emitTuples()
+    {
+      String propertyName = LogicalPlan.APPLICATION_NAME.getLongName();
+      if (APP_NAME.equals(System.getProperty(propertyName))) {
+        throw new ShutdownException();
+      }
+      throw new RuntimeException();
+    }
+  }
}
diff --git a/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java b/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
index 2ad3f4a..bc73ca8 100644
--- a/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/util/LoggerUtilTest.java
@@ -26,14 +26,20 @@
import org.junit.runners.MethodSorters;
import org.slf4j.LoggerFactory;
+import org.apache.log4j.Appender;
import org.apache.log4j.Category;
+import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
+import org.apache.log4j.spi.LoggingEvent;
import com.google.common.collect.Maps;
+import com.datatorrent.api.Context;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -165,4 +171,66 @@
}
assertSame(log4jLogger.getEffectiveLevel(), parent.getLevel());
}
+
+  /**
+   * Adds a test appender through explicit arguments, checks that it receives log events and
+   * that a second appender with the same name is rejected, then adds two appenders through
+   * the LOGGER_APPENDER system property. Every appender is removed again, and the system
+   * property is restored to its previous state when the test ends.
+   */
+  @Test
+  public void testAppender()
+  {
+    logger.info("Running Appender Test");
+    String appenderName = "testAppender";
+    String appenderName1 = "testAppender1";
+    String args = "log4j.appender.testAppender=com.datatorrent.stram.util.LoggerUtilTest$TestAppender"
+        + ",log4j.appender.testAppender.layout=org.apache.log4j.PatternLayout"
+        + ",log4j.appender.testAppender.layout.ConversionPattern=%d %d{Z} [%t] %-5p (%F:%L) - %m%n"
+        + ",log4j.appender.testAppender1=org.apache.log4j.ConsoleAppender"
+        + ",log4j.appender.testAppender1.layout=org.apache.log4j.PatternLayout"
+        + ",log4j.appender.testAppender1.layout.ConversionPattern=%d %d{Z} [%t] %-5p (%F:%L) - %m%n";
+
+    assertTrue(LoggerUtil.addAppenders(new String[] {appenderName }, args, ","));
+    TestAppender appender = (TestAppender)LogManager.getRootLogger().getAppender(appenderName);
+
+    logger.info(args);
+    assertEquals(args, appender.lastMessage);
+    // expected value first, so that a failure message reads correctly
+    assertEquals(Level.INFO, appender.level);
+
+    logger.warn(appenderName1);
+    assertEquals(appenderName1, appender.lastMessage);
+    assertEquals(Level.WARN, appender.level);
+
+    // don't allow to add an appender with the same name
+    assertFalse(LoggerUtil.addAppenders(new String[] {appenderName }, args, ","));
+    logger.info("Test Appender is added: " + LoggerUtil.getAppendersNames());
+    testAndRemoveAppender(appenderName);
+    logger.info("Test Appender is removed: " + LoggerUtil.getAppendersNames());
+
+    String propertyName = Context.DAGContext.LOGGER_APPENDER.getLongName();
+    String previousValue = System.getProperty(propertyName);
+    System.setProperty(propertyName, appenderName + "," + appenderName1 + ";" + args);
+    try {
+      assertTrue(LoggerUtil.addAppenders());
+      logger.info("Test Appenders are added: " + LoggerUtil.getAppendersNames());
+
+      testAndRemoveAppender(appenderName);
+      testAndRemoveAppender(appenderName1);
+      logger.info("Test Appenders are removed: " + LoggerUtil.getAppendersNames());
+    } finally {
+      // do not leak the appender configuration to code that runs later in the same JVM
+      if (previousValue == null) {
+        System.clearProperty(propertyName);
+      } else {
+        System.setProperty(propertyName, previousValue);
+      }
+    }
+  }
+
+  /**
+   * Asserts that the root logger has an appender with the given name, removes it through
+   * LoggerUtil and asserts that the root logger no longer returns it.
+   */
+  private static void testAndRemoveAppender(String name)
+  {
+    assertNotNull(org.apache.log4j.Logger.getRootLogger().getAppender(name));
+    assertTrue(LoggerUtil.getAppendersNames().contains(name));
+    LoggerUtil.removeAppender(name);
+    Appender afterRemoval = org.apache.log4j.Logger.getRootLogger().getAppender(name);
+    assertNull(afterRemoval);
+  }
+
+  /**
+   * Console appender that remembers the rendered message and the level of the last event
+   * passed to it before delegating to the console appender.
+   */
+  public static class TestAppender extends ConsoleAppender
+  {
+    private Level level;
+    private String lastMessage;
+
+    @Override
+    public void append(LoggingEvent event)
+    {
+      level = event.getLevel();
+      lastMessage = event.getRenderedMessage();
+      super.append(event);
+    }
+  }
}
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
index 833d69f..654a4ce 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java
@@ -27,6 +27,8 @@
import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
+import com.datatorrent.api.DAG;
+
import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT;
import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT;
import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT;
@@ -39,10 +41,13 @@
private int heartbeatCount = 0;
private int commitCount = 0;
CountDownLatch latch = new CountDownLatch(3);
+ private Context context;
@Override
public void setup(DAGExecutionPlugin.Context context)
{
+ this.context = context;
+
context.register(STRAM_EVENT, new EventHandler<DAGExecutionEvent.StramExecutionEvent>()
{
@Override
@@ -102,4 +107,9 @@
{
latch.await(timeout, TimeUnit.SECONDS);
}
+
+  /**
+   * Returns the DAG exposed by the context that was passed to setup.
+   */
+  public DAG getLogicalPlan()
+  {
+    return this.context.getDAG();
+  }
}
diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
index 140dc65..34589b0 100644
--- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
+++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java
@@ -33,6 +33,7 @@
import com.datatorrent.api.Attribute;
import com.datatorrent.stram.api.StramEvent;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
public class PluginTests
@@ -93,12 +94,14 @@
}));
pluginManager.dispatch(new DAGExecutionEvent.CommitExecutionEvent(1234));
pluginManager.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(new StreamingContainerUmbilicalProtocol.ContainerHeartbeat()));
+ LogicalPlan plan = new LogicalPlan();
+ pluginManager.dispatch(new ApexPluginDispatcher.DAGChangeEvent(plan));
debugPlugin.waitForEventDelivery(10);
pluginManager.stop();
Assert.assertEquals(1, debugPlugin.getEventCount());
Assert.assertEquals(1, debugPlugin.getHeartbeatCount());
Assert.assertEquals(1, debugPlugin.getCommitCount());
+ Assert.assertEquals(plan, debugPlugin.getLogicalPlan());
}
-
}