SAMOA-29: Fix storm local execution mode duration
diff --git a/bin/samoa-storm.properties b/bin/samoa-storm.properties
index cf46080..423b8b2 100644
--- a/bin/samoa-storm.properties
+++ b/bin/samoa-storm.properties
@@ -33,3 +33,5 @@
# possible values: any integer greater than 0
samoa.storm.numworker=4
+# samoa.storm.local.mode.execution.duration corresponds to the execution duration of the local topology in seconds.
+samoa.storm.local.mode.execution.duration=100
diff --git a/samoa-storm/pom.xml b/samoa-storm/pom.xml
index 2cb1566..fd4029c 100644
--- a/samoa-storm/pom.xml
+++ b/samoa-storm/pom.xml
@@ -121,5 +121,13 @@
</configuration>
</plugin>
</plugins>
+ <resources>
+ <resource>
+ <directory>${project.basedir}/../bin</directory>
+ <includes>
+ <include>*storm.properties</include>
+ </includes>
+ </resource>
+ </resources>
</build>
</project>
diff --git a/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java b/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java
index f2b9c0c..a31fa58 100644
--- a/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java
+++ b/samoa-storm/src/main/java/org/apache/samoa/LocalStormDoTask.java
@@ -27,6 +27,7 @@
import org.apache.samoa.topology.impl.StormTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.configuration.Configuration;
import backtype.storm.Config;
import backtype.storm.utils.Utils;
@@ -40,6 +41,8 @@
public class LocalStormDoTask {
private static final Logger logger = LoggerFactory.getLogger(LocalStormDoTask.class);
+ private static final String EXECUTION_DURATION_KEY ="samoa.storm.local.mode.execution.duration";
+ private static final String SAMOA_STORM_PROPERTY_FILE_LOC ="samoa-storm.properties";
/**
* The main method.
@@ -69,7 +72,10 @@
backtype.storm.LocalCluster cluster = new backtype.storm.LocalCluster();
cluster.submitTopology(topologyName, conf, stormTopo.getStormBuilder().createTopology());
- backtype.storm.utils.Utils.sleep(600 * 1000);
+ // Read local mode execution duration from property file
+ Configuration stormConfig = StormSamoaUtils.getPropertyConfig(LocalStormDoTask.SAMOA_STORM_PROPERTY_FILE_LOC);
+ long executionDuration= stormConfig.getLong(LocalStormDoTask.EXECUTION_DURATION_KEY);
+ backtype.storm.utils.Utils.sleep(executionDuration * 1000);
cluster.killTopology(topologyName);
cluster.shutdown();
diff --git a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java
index 86a5578..7f7e578 100644
--- a/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java
+++ b/samoa-storm/src/main/java/org/apache/samoa/topology/impl/StormSamoaUtils.java
@@ -33,6 +33,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
/**
* Utility class for samoa-storm project. It is used by StormDoTask to process its arguments.
*
@@ -106,4 +110,21 @@
}
return task;
}
+
+ public static Configuration getPropertyConfig(String configPropertyPath){
+ Configuration config = null;
+ try {
+ config = new PropertiesConfiguration(configPropertyPath);
+ if (null == config || config.isEmpty()) {
+ logger.error("Configuration is null or empty at file = {}",configPropertyPath);
+ throw new RuntimeException("Configuration is null or empty : " + configPropertyPath);
+ }
+ }
+ catch(ConfigurationException configurationException)
+ {
+ logger.error("ConfigurationException while reading property file = {}",configurationException);
+ throw new RuntimeException("ConfigurationException while reading property file : " + configPropertyPath);
+ }
+ return config;
+ }
}