Merge branch 'S4-60' into piper
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
index 213001d..52f1800 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
@@ -11,7 +11,7 @@
@Parameter(names = "-help", description = "usage")
boolean help = false;
- @Parameter(names = "-s4ScriptPath", description = "path of the S4 script", hidden = true, required = true)
+ @Parameter(names = "-s4ScriptPath", description = "path of the S4 script", hidden = true, required = false)
String s4ScriptPath;
@Parameter(names = "-gradleOpts", variableArity = true, description = "gradle system properties (as in GRADLE_OPTS environment properties) passed to gradle scripts", required = false, converter = GradleOptsConverter.class)
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 733c7ad..4391a49 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -97,7 +97,7 @@
}
jc.parse(cliArgs);
} catch (Exception e) {
- JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
+ JCommander.getConsole().println("Cannot parse arguments: " + e.getClass() + " -> " + e.getMessage());
jc.usage();
System.exit(1);
}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
index 0842f07..668499a 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -1,19 +1,28 @@
package org.apache.s4.tools;
import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkServer;
import org.apache.commons.io.FileUtils;
+import org.apache.s4.comm.tools.TaskSetup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
public class ZKServer {
+ private static final String TEST_MODE_CLUSTER_CONF_2 = "c=testCluster2:flp=13000:nbTasks=1";
+ private static final String TEST_MODE_CLUSTER_CONF_1 = "c=testCluster1:flp=12000:nbTasks=1";
static Logger logger = LoggerFactory.getLogger(ZKServer.class);
public static void main(String[] args) {
@@ -39,6 +48,32 @@
ZkServer zkServer = new ZkServer(zkArgs.dataDir, zkArgs.logDir, defaultNameSpace, zkArgs.zkPort);
zkServer.start();
+
+ logger.info("Zookeeper server started on port [{}]", zkArgs.zkPort);
+
+ // now upload cluster configs if specified or if using test mode
+ List<ClusterConfig> clusterConfigs = zkArgs.clusterConfigs;
+ if (clusterConfigs == null) {
+ if (zkArgs.testMode) {
+ logger.info("Initializing test mode with default clusters configurations");
+ clusterConfigs = new ArrayList<ZKServer.ClusterConfig>() {
+ {
+ add(new ClusterConfig(TEST_MODE_CLUSTER_CONF_1));
+ add(new ClusterConfig(TEST_MODE_CLUSTER_CONF_2));
+ }
+ };
+ } else {
+ clusterConfigs = Collections.emptyList();
+ }
+ }
+ for (ClusterConfig config : clusterConfigs) {
+ TaskSetup taskSetup = new TaskSetup("localhost:" + zkArgs.zkPort);
+ taskSetup.clean(config.clusterName);
+ taskSetup.setup(config.clusterName, config.nbTasks, config.firstListeningPort);
+ logger.info("Defined S4 cluster [{}] with [{}] tasks with first listening port [{}]", new String[] {
+ config.clusterName, String.valueOf(config.nbTasks), String.valueOf(config.firstListeningPort) });
+ }
+
} catch (Exception e) {
logger.error("Cannot initialize zookeeper with specified configuration", e);
}
@@ -61,6 +96,44 @@
String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+ "zookeeper" + File.separator + "log").getAbsolutePath();
+ @Parameter(names = { "-t", "-testMode" }, description = "Launch Zookeeper instance and load a default cluster configuration for easy testing (2 clusters with following configs: {"
+ + TEST_MODE_CLUSTER_CONF_1 + "} and {" + TEST_MODE_CLUSTER_CONF_2 + "}")
+ boolean testMode = false;
+
+ @Parameter(names = "-clusters", description = "Inline clusters configuration, comma-separated list of curly-braces enclosed cluster definitions with format: {c=<cluster name>:flp=<first listening port for this cluster>:nbTasks=<number of tasks>} (Overrides default configuration for test mode)", converter = ClusterConfigsConverter.class)
+ List<ClusterConfig> clusterConfigs;
+
+ }
+
+ public static class ClusterConfigsConverter implements IStringConverter<ClusterConfig> {
+
+ @Override
+ public ClusterConfig convert(String arg) {
+ Pattern clusterConfigPattern = Pattern.compile("\\{(c=\\w+[:]flp=\\d+[:]nbTasks=\\d+)\\}");
+ logger.info("processing cluster configuration {}", arg);
+ Matcher configMatcher = clusterConfigPattern.matcher(arg);
+ if (!configMatcher.find()) {
+ throw new IllegalArgumentException("Cannot understand parameter " + arg);
+ }
+ String clusterConfigString = configMatcher.group(1);
+ return new ClusterConfig(clusterConfigString);
+ }
+ }
+
+ public static class ClusterConfig {
+
+ public ClusterConfig(String config) {
+ String[] split = config.split(":");
+ this.clusterName = split[0].split("=")[1];
+ this.firstListeningPort = Integer.valueOf(split[1].split("=")[1]);
+ this.nbTasks = Integer.valueOf(split[2].split("=")[1]);
+
+ }
+
+ String clusterName;
+ int firstListeningPort;
+ int nbTasks;
+
}
}