Merge branch 'S4-60' into piper
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
index 213001d..52f1800 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4ArgsBase.java
@@ -11,7 +11,7 @@
     @Parameter(names = "-help", description = "usage")
     boolean help = false;
 
-    @Parameter(names = "-s4ScriptPath", description = "path of the S4 script", hidden = true, required = true)
+    @Parameter(names = "-s4ScriptPath", description = "path of the S4 script", hidden = true, required = false)
     String s4ScriptPath;
 
     @Parameter(names = "-gradleOpts", variableArity = true, description = "gradle system properties (as in GRADLE_OPTS environment properties) passed to gradle scripts", required = false, converter = GradleOptsConverter.class)
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 733c7ad..4391a49 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -97,7 +97,7 @@
             }
             jc.parse(cliArgs);
         } catch (Exception e) {
-            JCommander.getConsole().println("Cannot parse arguments: " + e.getMessage());
+            JCommander.getConsole().println("Cannot parse arguments: " + e.getClass() + " -> " + e.getMessage());
             jc.usage();
             System.exit(1);
         }
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
index 0842f07..668499a 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -1,19 +1,28 @@
 package org.apache.s4.tools;
 
 import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.commons.io.FileUtils;
+import org.apache.s4.comm.tools.TaskSetup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
 public class ZKServer {
 
+    private static final String TEST_MODE_CLUSTER_CONF_2 = "c=testCluster2:flp=13000:nbTasks=1";
+    private static final String TEST_MODE_CLUSTER_CONF_1 = "c=testCluster1:flp=12000:nbTasks=1";
     static Logger logger = LoggerFactory.getLogger(ZKServer.class);
 
     public static void main(String[] args) {
@@ -39,6 +48,32 @@
 
             ZkServer zkServer = new ZkServer(zkArgs.dataDir, zkArgs.logDir, defaultNameSpace, zkArgs.zkPort);
             zkServer.start();
+
+            logger.info("Zookeeper server started on port [{}]", zkArgs.zkPort);
+
+            // now upload cluster configs if specified or if using test mode
+            List<ClusterConfig> clusterConfigs = zkArgs.clusterConfigs;
+            if (clusterConfigs == null) {
+                if (zkArgs.testMode) {
+                    logger.info("Initializing test mode with default clusters configurations");
+                    clusterConfigs = new ArrayList<ZKServer.ClusterConfig>() {
+                        {
+                            add(new ClusterConfig(TEST_MODE_CLUSTER_CONF_1));
+                            add(new ClusterConfig(TEST_MODE_CLUSTER_CONF_2));
+                        }
+                    };
+                } else {
+                    clusterConfigs = Collections.emptyList();
+                }
+            }
+            for (ClusterConfig config : clusterConfigs) {
+                TaskSetup taskSetup = new TaskSetup("localhost:" + zkArgs.zkPort);
+                taskSetup.clean(config.clusterName);
+                taskSetup.setup(config.clusterName, config.nbTasks, config.firstListeningPort);
+                logger.info("Defined S4 cluster [{}] with [{}] tasks with first listening port [{}]", new String[] {
+                        config.clusterName, String.valueOf(config.nbTasks), String.valueOf(config.firstListeningPort) });
+            }
+
         } catch (Exception e) {
             logger.error("Cannot initialize zookeeper with specified configuration", e);
         }
@@ -61,6 +96,44 @@
         String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
                 + "zookeeper" + File.separator + "log").getAbsolutePath();
 
+        /** When set and no -clusters option is given, main() defines the two default test clusters once Zookeeper has started. */
+        @Parameter(names = { "-t", "-testMode" }, description = "Launch Zookeeper instance and load a default cluster configuration for easy testing (2 clusters with following configs: {" + TEST_MODE_CLUSTER_CONF_1 + "} and {" + TEST_MODE_CLUSTER_CONF_2 + "})")
+        boolean testMode = false;
+
+        @Parameter(names = "-clusters", description = "Inline clusters configuration, comma-separated list of curly-braces enclosed cluster definitions with format: {c=<cluster name>:flp=<first listening port for this cluster>:nbTasks=<number of tasks>} (Overrides default configuration for test mode)", converter = ClusterConfigsConverter.class)
+        List<ClusterConfig> clusterConfigs;
+
+    }
+
+    /** JCommander converter turning one "{c=<name>:flp=<port>:nbTasks=<n>}" token into a ClusterConfig; rejects anything else with IllegalArgumentException. */
+    public static class ClusterConfigsConverter implements IStringConverter<ClusterConfig> {
+        // Compiled once and shared: a Pattern is immutable, so rebuilding it on every convert() call was wasted work.
+        private static final Pattern CLUSTER_CONFIG_PATTERN = Pattern.compile("\\{(c=\\w+[:]flp=\\d+[:]nbTasks=\\d+)\\}");
+        @Override
+        public ClusterConfig convert(String arg) {
+            logger.info("processing cluster configuration {}", arg);
+            Matcher configMatcher = CLUSTER_CONFIG_PATTERN.matcher(arg);
+            if (!configMatcher.find()) { // find(), not matches(): text surrounding the braces is tolerated
+                throw new IllegalArgumentException("Cannot understand parameter " + arg);
+            }
+            return new ClusterConfig(configMatcher.group(1)); // group 1 is the definition without its braces
+        }
+    }
+
+    /** A single S4 cluster definition read from a "c=<name>:flp=<first listening port>:nbTasks=<count>" string. */
+    public static class ClusterConfig {
+        String clusterName;
+        int firstListeningPort;
+        int nbTasks;
+
+        /** Builds the definition from {@code config}, whose three fields are separated by ':'. */
+        public ClusterConfig(String config) {
+            // Fields are taken by position; each value is the piece right after the first '=', key names are not checked.
+            String[] fields = config.split(":");
+            clusterName = fields[0].split("=")[1];
+            firstListeningPort = Integer.parseInt(fields[1].split("=")[1]);
+            nbTasks = Integer.parseInt(fields[2].split("=")[1]);
+        }
     }
 
 }