[S4-110] Formatting code to make the diff look better
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
index c9cfcb7..cf2cd40 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/HelixBasedCommModule.java
@@ -33,7 +33,7 @@
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.name.Names;
 
-public class HelixBasedCommModule extends AbstractModule{
+public class HelixBasedCommModule extends AbstractModule {
 
     private static Logger logger = LoggerFactory.getLogger(DefaultCommModule.class);
     InputStream commConfigInputStream;
@@ -74,7 +74,8 @@
 
         // a node holds a single partition assignment
         // ==> Assignment and Cluster are singletons so they can be shared between comm layer and app.
-        bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(TaskStateModelFactory.class);
+        bind(StateModelFactory.class).annotatedWith(Names.named("s4.task.statemodelfactory")).to(
+                TaskStateModelFactory.class);
         bind(Assignment.class).to(AssignmentFromHelix.class);
         bind(Cluster.class).to(ClusterFromHelix.class);
 
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
index 5d79859..4c89aab 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/RemoteEmitterFactory.java
@@ -24,7 +24,7 @@
 /**
  * Used for creating RemoteEmitter instances depending on the cluster configuration. Follows the "assisted injection"
  * pattern from Guice 3.
- *
+ * 
  */
 public interface RemoteEmitterFactory {
 
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
index 9eebd06..ea35bd6 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/S4StateModel.java
@@ -10,52 +10,45 @@
 import org.apache.helix.participant.statemachine.Transition;
 
 @StateModelInfo(states = { "LEADER,STANDBY" }, initialState = "OFFLINE")
-public class S4StateModel extends StateModel
-{
-  private static Logger logger = LoggerFactory.getLogger(S4StateModel.class);
+public class S4StateModel extends StateModel {
+    private static Logger logger = LoggerFactory.getLogger(S4StateModel.class);
 
-  private final String streamName;
-  private final String partitionId;
+    private final String streamName;
+    private final String partitionId;
 
-  public S4StateModel(String partitionName)
-  {
-    String[] parts = partitionName.split("_");
-    this.streamName = parts[0];
-    this.partitionId = parts[1];
-  }
+    public S4StateModel(String partitionName) {
+        String[] parts = partitionName.split("_");
+        this.streamName = parts[0];
+        this.partitionId = parts[1];
+    }
 
-  @Transition(from = "OFFLINE", to = "STANDBY")
-  public void becomeLeaderFromOffline(Message msg, NotificationContext context)
-  {
-    logger.info("Transitioning from " + msg.getFromState() + " to "
-        + msg.getToState() + "for " + msg.getPartitionName());
-  }
+    @Transition(from = "OFFLINE", to = "STANDBY")
+    public void becomeLeaderFromOffline(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
 
-  @Transition(from = "STANDBY", to = "LEADER")
-  public void becomeLeaderFromStandby(Message msg, NotificationContext context)
-  {
-    logger.info("Transitioning from " + msg.getFromState() + " to "
-        + msg.getToState() + "for " + msg.getPartitionName());
-  }
+    @Transition(from = "STANDBY", to = "LEADER")
+    public void becomeLeaderFromStandby(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
 
-  @Transition(from = "LEADER", to = "STANDBY")
-  public void becomeStandbyFromLeader(Message msg, NotificationContext context)
-  {
-    logger.info("Transitioning from " + msg.getFromState() + " to "
-        + msg.getToState() + "for " + msg.getPartitionName());
-  }
+    @Transition(from = "LEADER", to = "STANDBY")
+    public void becomeStandbyFromLeader(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
 
-  @Transition(from = "STANDBY", to = "OFFLINE")
-  public void becomeOfflineFromStandby(Message msg, NotificationContext context)
-  {
-    logger.info("Transitioning from " + msg.getFromState() + " to "
-        + msg.getToState() + "for " + msg.getPartitionName());
-  }
-  
-  @Transition(from = "OFFLINE", to = "DROPPED")
-  public void dropPartition(Message msg, NotificationContext context)
-  {
-    logger.info("Dropping partition" + msg.getPartitionName());
-  }
+    @Transition(from = "STANDBY", to = "OFFLINE")
+    public void becomeOfflineFromStandby(Message msg, NotificationContext context) {
+        logger.info("Transitioning from " + msg.getFromState() + " to " + msg.getToState() + "for "
+                + msg.getPartitionName());
+    }
+
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void dropPartition(Message msg, NotificationContext context) {
+        logger.info("Dropping partition" + msg.getPartitionName());
+    }
 
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
index eef64ea..3b885a8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/helix/TaskStateModelFactory.java
@@ -2,8 +2,7 @@
 
 import org.apache.helix.participant.statemachine.StateModelFactory;
 
-
-public class TaskStateModelFactory extends StateModelFactory<S4StateModel>{
+public class TaskStateModelFactory extends StateModelFactory<S4StateModel> {
 
     @Override
     public S4StateModel createNewStateModel(String partitionName) {
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index a0e5002..120eee3 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -65,8 +65,7 @@
  */
 
 public class TCPEmitter implements Emitter, ClusterChangeListener {
-    private static final Logger logger = LoggerFactory
-            .getLogger(TCPEmitter.class);
+    private static final Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
 
     private final int nettyTimeout;
 
@@ -87,24 +86,21 @@
     private final Lock lock;
 
     @Inject
-    SerializerDeserializer serDeser= new KryoSerDeser();
+    SerializerDeserializer serDeser = new KryoSerDeser();
 
     @Inject
-    public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout)
-            throws InterruptedException {
+    public TCPEmitter(Cluster topology, @Named("s4.comm.timeout") int timeout) throws InterruptedException {
         this.nettyTimeout = timeout;
         this.topology = topology;
         this.lock = new ReentrantLock();
 
         // Initialize data structures
-        //int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
+        // int clusterSize = this.topology.getPhysicalCluster().getNodes().size();
         // TODO cluster can grow in size
-        nodeChannelMap = Maps.synchronizedBiMap(HashBiMap
-                .<InstanceConfig, Channel> create());
+        nodeChannelMap = Maps.synchronizedBiMap(HashBiMap.<InstanceConfig, Channel> create());
 
         // Initialize netty related structures
-        ChannelFactory factory = new NioClientSocketChannelFactory(
-                Executors.newCachedThreadPool(),
+        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                 Executors.newCachedThreadPool());
         bootstrap = new ClientBootstrap(factory);
         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -137,19 +133,16 @@
         }
 
         try {
-            ChannelFuture connectFuture = this.bootstrap
-                    .connect(new InetSocketAddress(config.getHostName(),
-                            Integer.parseInt(config.getPort())));
+            ChannelFuture connectFuture = this.bootstrap.connect(new InetSocketAddress(config.getHostName(), Integer
+                    .parseInt(config.getPort())));
             connectFuture.await();
             if (connectFuture.isSuccess()) {
                 channels.add(connectFuture.getChannel());
-                nodeChannelMap
-                        .forcePut(config, connectFuture.getChannel());
+                nodeChannelMap.forcePut(config, connectFuture.getChannel());
                 return true;
             }
         } catch (InterruptedException ie) {
-            logger.error(String.format("Interrupted while connecting to %s:%d",
-                    config.getHostName(), config.getPort()));
+            logger.error(String.format("Interrupted while connecting to %s:%d", config.getHostName(), config.getPort()));
             Thread.currentThread().interrupt();
         }
         return false;
@@ -158,8 +151,7 @@
     private void sendMessage(String streamName, int partitionId, byte[] message) {
         ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
         buffer.writeBytes(message);
-        InstanceConfig config = topology
-                .getDestination(streamName, partitionId);
+        InstanceConfig config = topology.getDestination(streamName, partitionId);
         if (!nodeChannelMap.containsKey(config)) {
             if (!connectTo(config)) {
                 // Couldn't connect, discard message
@@ -176,8 +168,7 @@
 
     @Override
     public boolean send(int partitionId, EventMessage message) {
-        sendMessage(message.getStreamName(), partitionId,
-                serDeser.serialize(message));
+        sendMessage(message.getStreamName(), partitionId, serDeser.serialize(message));
         return true;
     }
 
@@ -188,8 +179,7 @@
         }
         c.close().addListener(new ChannelFutureListener() {
             @Override
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
+            public void operationComplete(ChannelFuture future) throws Exception {
                 if (future.isSuccess())
                     channels.remove(future.getChannel());
                 else
@@ -208,18 +198,18 @@
         }
     }
 
-    //@Override
+    // @Override
     public int getPartitionCount() {
         return topology.getPhysicalCluster().getPartitionCount();
     }
+
     public int getPartitionCount(String streamName) {
-    return topology.getPhysicalCluster().getPartitionCount();
-  }
+        return topology.getPhysicalCluster().getPartitionCount();
+    }
 
     class ExceptionHandler extends SimpleChannelUpstreamHandler {
         @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-                throws Exception {
+        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
             Throwable t = e.getCause();
             if (t instanceof ClosedChannelException) {
                 nodeChannelMap.inverse().remove(e.getChannel());
@@ -248,10 +238,8 @@
                 try {
                     // TODO handle possible cluster reconfiguration between send
                     // and failure callback
-                    logger.warn(
-                            "Failed to send message to node {} (according to current cluster information)",
-                            topology.getPhysicalCluster().getNodes()
-                                    .get(partitionId));
+                    logger.warn("Failed to send message to node {} (according to current cluster information)",
+                            topology.getPhysicalCluster().getNodes().get(partitionId));
                 } catch (IndexOutOfBoundsException ignored) {
                     // cluster was changed
                 }
@@ -262,6 +250,6 @@
 
     @Override
     public void onChange() {
-        
+
     }
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
index 952306a..dfe98bf 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tools/TaskSetup.java
@@ -34,100 +34,80 @@
  * Used for defining and dimensioning logical clusters in Zookeeper.
  * 
  */
-public class TaskSetup
-{
+public class TaskSetup {
 
-  private ZkClient zkclient;
-  private boolean isHelixEnabled = true;
-  private HelixAdmin helixAdmin;
-  org.apache.helix.manager.zk.ZkClient helixZkClient;
+    private ZkClient zkclient;
+    private boolean isHelixEnabled = true;
+    private HelixAdmin helixAdmin;
+    org.apache.helix.manager.zk.ZkClient helixZkClient;
 
-  public TaskSetup(String zookeeperAddress)
-  {
-    if (isHelixEnabled)
-    {
-      helixAdmin = new ZKHelixAdmin(zookeeperAddress);
-    } else
-    {
-      zkclient = new ZkClient(zookeeperAddress);
-      zkclient.setZkSerializer(new ZNRecordSerializer());
-      if (!zkclient.waitUntilConnected(10, TimeUnit.SECONDS))
-      {
-        throw new RuntimeException(
-            "Could not connect to ZooKeeper after 10 seconds.");
-      }
-    }
-  }
-
-  public void clean(String clusterName)
-  {
-    if (isHelixEnabled)
-    {
-      helixAdmin.dropCluster(clusterName);
-    } else
-    {
-      zkclient.deleteRecursive("/s4/clusters/" + clusterName);
-    }
-  }
-
-  public void setup(String cluster, int tasks, int initialPort)
-  {
-    if (isHelixEnabled)
-    {
-      helixAdmin.addCluster(cluster, false);
-      StateModelDefinition onlineofflinemodel = new StateModelDefinition(
-          new StateModelConfigGenerator().generateConfigForOnlineOffline());
-      StateModelDefinition leaderstandbymodel = new StateModelDefinition(
-          new StateModelConfigGenerator().generateConfigForLeaderStandby());
-
-      helixAdmin.addStateModelDef(cluster, "OnlineOffline", onlineofflinemodel);
-      helixAdmin.addStateModelDef(cluster, "LeaderStandby", leaderstandbymodel);
-      
-      for (int i = 0; i < tasks; i++)
-      {
-        InstanceConfig instanceConfig = new InstanceConfig("localhost_"
-            + initialPort);
-        instanceConfig.setHostName("localhost");
-        instanceConfig.setPort("" + initialPort);
-        helixAdmin.addInstance(cluster, instanceConfig);
-        initialPort = initialPort + 1;
-      }
-
-      return;
-    }
-    try
-    {
-      zkclient.createPersistent("/s4/streams", true);
-    } catch (ZkException ignored)
-    {
-      // ignore if exists
+    public TaskSetup(String zookeeperAddress) {
+        if (isHelixEnabled) {
+            helixAdmin = new ZKHelixAdmin(zookeeperAddress);
+        } else {
+            zkclient = new ZkClient(zookeeperAddress);
+            zkclient.setZkSerializer(new ZNRecordSerializer());
+            if (!zkclient.waitUntilConnected(10, TimeUnit.SECONDS)) {
+                throw new RuntimeException("Could not connect to ZooKeeper after 10 seconds.");
+            }
+        }
     }
 
-    zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
-    zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
-    zkclient.createPersistent("/s4/clusters/" + cluster + "/app", true);
-    for (int i = 0; i < tasks; i++)
-    {
-      String taskId = "Task-" + i;
-      ZNRecord record = new ZNRecord(taskId);
-      record.putSimpleField("taskId", taskId);
-      record.putSimpleField("port", String.valueOf(initialPort + i));
-      record.putSimpleField("partition", String.valueOf(i));
-      record.putSimpleField("cluster", cluster);
-      zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks/" + taskId,
-          record);
+    public void clean(String clusterName) {
+        if (isHelixEnabled) {
+            helixAdmin.dropCluster(clusterName);
+        } else {
+            zkclient.deleteRecursive("/s4/clusters/" + clusterName);
+        }
     }
-  }
 
-  public void disconnect()
-  {
-    if (isHelixEnabled)
-    {
-      helixZkClient.close();
-    } else
-    {
-      zkclient.close();
+    public void setup(String cluster, int tasks, int initialPort) {
+        if (isHelixEnabled) {
+            helixAdmin.addCluster(cluster, false);
+            StateModelDefinition onlineofflinemodel = new StateModelDefinition(
+                    new StateModelConfigGenerator().generateConfigForOnlineOffline());
+            StateModelDefinition leaderstandbymodel = new StateModelDefinition(
+                    new StateModelConfigGenerator().generateConfigForLeaderStandby());
+
+            helixAdmin.addStateModelDef(cluster, "OnlineOffline", onlineofflinemodel);
+            helixAdmin.addStateModelDef(cluster, "LeaderStandby", leaderstandbymodel);
+
+            for (int i = 0; i < tasks; i++) {
+                InstanceConfig instanceConfig = new InstanceConfig("localhost_" + initialPort);
+                instanceConfig.setHostName("localhost");
+                instanceConfig.setPort("" + initialPort);
+                helixAdmin.addInstance(cluster, instanceConfig);
+                initialPort = initialPort + 1;
+            }
+
+            return;
+        }
+        try {
+            zkclient.createPersistent("/s4/streams", true);
+        } catch (ZkException ignored) {
+            // ignore if exists
+        }
+
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks", true);
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/process", true);
+        zkclient.createPersistent("/s4/clusters/" + cluster + "/app", true);
+        for (int i = 0; i < tasks; i++) {
+            String taskId = "Task-" + i;
+            ZNRecord record = new ZNRecord(taskId);
+            record.putSimpleField("taskId", taskId);
+            record.putSimpleField("port", String.valueOf(initialPort + i));
+            record.putSimpleField("partition", String.valueOf(i));
+            record.putSimpleField("cluster", cluster);
+            zkclient.createPersistent("/s4/clusters/" + cluster + "/tasks/" + taskId, record);
+        }
     }
-  }
+
+    public void disconnect() {
+        if (isHelixEnabled) {
+            helixZkClient.close();
+        } else {
+            zkclient.close();
+        }
+    }
 
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
index 7e0b4ac..243faac 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -35,135 +35,109 @@
 import com.google.inject.name.Named;
 
 @Singleton
-public class AssignmentFromHelix implements Assignment
-{
-  private static final Logger logger = LoggerFactory
-      .getLogger(AssignmentFromHelix.class);
+public class AssignmentFromHelix implements Assignment {
+    private static final Logger logger = LoggerFactory.getLogger(AssignmentFromHelix.class);
 
-  private String clusterName;
-  private final String zookeeperAddress;
-  private String machineId;
-  private HelixManager zkHelixManager;
-  
-  private HelixDataAccessor helixDataAccessor;
-  AtomicReference<ClusterNode> clusterNodeRef;
-  private final Lock lock;
-  private final AtomicBoolean currentlyOwningTask;
-  private final Condition taskAcquired;
+    private String clusterName;
+    private final String zookeeperAddress;
+    private String machineId;
+    private HelixManager zkHelixManager;
 
-  private final StateModelFactory<? extends StateModel> taskStateModelFactory;
-  //private final StateModelFactory<? extends StateModel> appStateModelFactory;
+    private HelixDataAccessor helixDataAccessor;
+    AtomicReference<ClusterNode> clusterNodeRef;
+    private final Lock lock;
+    private final AtomicBoolean currentlyOwningTask;
+    private final Condition taskAcquired;
 
-  @Inject
-  public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
-                             @Named("s4.instance.name") String instanceName,
-                             @Named("s4.cluster.zk_address") String zookeeperAddress 
-                             ) throws Exception
-  {
-    this.taskStateModelFactory = new TaskStateModelFactory();
-//    this.appStateModelFactory = appStateModelFactory;
-    this.clusterName = clusterName;
-    this.zookeeperAddress = zookeeperAddress;
-    machineId = "localhost";
-    lock = new ReentrantLock();
-    ZkClient zkClient = new ZkClient(zookeeperAddress);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
-    zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
-    BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(
-        zkClient);
-    helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
-    clusterNodeRef = new AtomicReference<ClusterNode>();
-    taskAcquired = lock.newCondition();
-    currentlyOwningTask = new AtomicBoolean(true);
-    try
-    {
-      machineId = InetAddress.getLocalHost().getCanonicalHostName();
-    } catch (UnknownHostException e)
-    {
-      logger.warn("Unable to get hostname", e);
-      machineId = "UNKNOWN";
-    }
-    ClusterNode node = new ClusterNode(-1,
-        Integer.parseInt(instanceName.split("_")[1]), machineId,
-        instanceName);
-    clusterNodeRef.set(node);
-    currentlyOwningTask.set(true);
-  }
+    private final StateModelFactory<? extends StateModel> taskStateModelFactory;
 
-  @Inject
-  public void init()
-  {
-    //joinCluster();
-  }
+    // private final StateModelFactory<? extends StateModel> appStateModelFactory;
 
-  @Override
-  public ClusterNode assignClusterNode()
-  {
-    lock.lock();
-    try
-    {
-      while (!currentlyOwningTask.get())
-      {
-        taskAcquired.awaitUninterruptibly();
-      }
-    } catch (Exception e)
-    {
-      logger.error("Exception while waiting to join the cluster");
-      return null;
-    } finally
-    {
-      lock.unlock();
-    }
-    return clusterNodeRef.get();
-  }
-
-  public void joinClusterOld()
-  {
-    lock.lock();
-    try
-    {
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      do
-      {
-        List<InstanceConfig> instances = helixDataAccessor
-            .getChildValues(keyBuilder.instanceConfigs());
-        List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
-            .liveInstances());
-        for (InstanceConfig instanceConfig : instances)
-        {
-          String instanceName = instanceConfig.getInstanceName();
-          if (!liveInstances.contains(instanceName))
-          {
-            zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName,
-                instanceName, InstanceType.PARTICIPANT, zookeeperAddress);
-            zkHelixManager.getStateMachineEngine().registerStateModelFactory(
-                "LeaderStandby", taskStateModelFactory);
-          
-            zkHelixManager.connect();
-            ClusterNode node = new ClusterNode(-1,
-                Integer.parseInt(instanceConfig.getPort()), machineId,
-                instanceName);
-            clusterNodeRef.set(node);
-            currentlyOwningTask.set(true);
-            taskAcquired.signalAll();
-            break;
-          }
+    @Inject
+    public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress)
+            throws Exception {
+        this.taskStateModelFactory = new TaskStateModelFactory();
+        // this.appStateModelFactory = appStateModelFactory;
+        this.clusterName = clusterName;
+        this.zookeeperAddress = zookeeperAddress;
+        machineId = "localhost";
+        lock = new ReentrantLock();
+        ZkClient zkClient = new ZkClient(zookeeperAddress);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
+        BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+        helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+        clusterNodeRef = new AtomicReference<ClusterNode>();
+        taskAcquired = lock.newCondition();
+        currentlyOwningTask = new AtomicBoolean(true);
+        try {
+            machineId = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            logger.warn("Unable to get hostname", e);
+            machineId = "UNKNOWN";
         }
-        if (instances.size() == liveInstances.size())
-        {
-          System.out
-              .println("No more nodes can join the cluster. Will wait for some node to die.");
-          Thread.sleep(100000);
-        }
-      } while (!currentlyOwningTask.get());
-      System.out.println("Joined the cluster:"+ clusterName +" as "+ clusterNodeRef.get().getTaskId());
-    } catch (Exception e)
-    {
-      e.printStackTrace();
-    } finally
-    {
-      lock.unlock();
+        ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
+        clusterNodeRef.set(node);
+        currentlyOwningTask.set(true);
     }
-  }
+
+    @Inject
+    public void init() {
+        // joinCluster();
+    }
+
+    @Override
+    public ClusterNode assignClusterNode() {
+        lock.lock();
+        try {
+            while (!currentlyOwningTask.get()) {
+                taskAcquired.awaitUninterruptibly();
+            }
+        } catch (Exception e) {
+            logger.error("Exception while waiting to join the cluster");
+            return null;
+        } finally {
+            lock.unlock();
+        }
+        return clusterNodeRef.get();
+    }
+
+    public void joinClusterOld() {
+        lock.lock();
+        try {
+            Builder keyBuilder = helixDataAccessor.keyBuilder();
+            do {
+                List<InstanceConfig> instances = helixDataAccessor.getChildValues(keyBuilder.instanceConfigs());
+                List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+                for (InstanceConfig instanceConfig : instances) {
+                    String instanceName = instanceConfig.getInstanceName();
+                    if (!liveInstances.contains(instanceName)) {
+                        zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+                                InstanceType.PARTICIPANT, zookeeperAddress);
+                        zkHelixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
+                                taskStateModelFactory);
+
+                        zkHelixManager.connect();
+                        ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceConfig.getPort()), machineId,
+                                instanceName);
+                        clusterNodeRef.set(node);
+                        currentlyOwningTask.set(true);
+                        taskAcquired.signalAll();
+                        break;
+                    }
+                }
+                if (instances.size() == liveInstances.size()) {
+                    System.out.println("No more nodes can join the cluster. Will wait for some node to die.");
+                    Thread.sleep(100000);
+                }
+            } while (!currentlyOwningTask.get());
+            System.out.println("Joined the cluster:" + clusterName + " as " + clusterNodeRef.get().getTaskId());
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            lock.unlock();
+        }
+    }
 
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 0b81854..c482624 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -44,7 +44,7 @@
 
 /**
  * Handles partition assignment through Zookeeper.
- *
+ * 
  */
 @Singleton
 public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 3226fab..19e0628 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -46,172 +46,145 @@
 import org.apache.helix.spectator.RoutingTableProvider;
 
 /**
- * Represents a logical cluster definition fetched from Zookeeper. Notifies
- * listeners of runtime changes in the configuration.
+ * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
+ * configuration.
  * 
  */
-public class ClusterFromHelix extends RoutingTableProvider implements Cluster
-{
+public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
 
-  private static Logger logger = LoggerFactory
-      .getLogger(ClusterFromHelix.class);
+    private static Logger logger = LoggerFactory.getLogger(ClusterFromHelix.class);
 
-  private final String clusterName;
-  private final AtomicReference<PhysicalCluster> clusterRef;
-  private final List<ClusterChangeListener> listeners;
-  private final Lock lock;
-  private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
+    private final String clusterName;
+    private final AtomicReference<PhysicalCluster> clusterRef;
+    private final List<ClusterChangeListener> listeners;
+    private final Lock lock;
+    private final AtomicReference<Map<String, Integer>> partitionCountMapRef;
 
-  /**
-   * only the local topology
-   */
-  @Inject
-  public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
-      @Named("s4.cluster.zk_address") String zookeeperAddress,
-      @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-      @Named("s4.cluster.zk_connection_timeout") int connectionTimeout)
-      throws Exception
-  {
-    this.clusterName = clusterName;
-    Map<String, Integer> map = Collections.emptyMap();
-    partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
-    this.clusterRef = new AtomicReference<PhysicalCluster>();
-    this.listeners = new ArrayList<ClusterChangeListener>();
-    lock = new ReentrantLock();
+    /**
+     * only the local topology
+     */
+    @Inject
+    public ClusterFromHelix(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        this.clusterName = clusterName;
+        Map<String, Integer> map = Collections.emptyMap();
+        partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+        this.clusterRef = new AtomicReference<PhysicalCluster>();
+        this.listeners = new ArrayList<ClusterChangeListener>();
+        lock = new ReentrantLock();
 
-  }
+    }
 
-  /**
-   * any topology
-   */
-  public ClusterFromHelix(String clusterName, ZkClient zkClient,
-      String machineId)
-  {
-    this.clusterName = clusterName;
-    Map<String, Integer> map = Collections.emptyMap();
-    partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
-    this.clusterRef = new AtomicReference<PhysicalCluster>();
-    this.listeners = new ArrayList<ClusterChangeListener>();
-    lock = new ReentrantLock();
+    /**
+     * any topology
+     */
+    public ClusterFromHelix(String clusterName, ZkClient zkClient, String machineId) {
+        this.clusterName = clusterName;
+        Map<String, Integer> map = Collections.emptyMap();
+        partitionCountMapRef = new AtomicReference<Map<String, Integer>>(map);
+        this.clusterRef = new AtomicReference<PhysicalCluster>();
+        this.listeners = new ArrayList<ClusterChangeListener>();
+        lock = new ReentrantLock();
 
-  }
+    }
 
-  @Override
-  public void onExternalViewChange(List<ExternalView> externalViewList,
-      NotificationContext changeContext)
-  {
-    lock.lock();
-    try
-    {
-      logger.info("Start:Processing change in cluster topology");
-      super.onExternalViewChange(externalViewList, changeContext);
-      HelixManager manager = changeContext.getManager();
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      ConfigAccessor configAccessor = manager.getConfigAccessor();
-      ConfigScopeBuilder builder = new ConfigScopeBuilder();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      List<String> resources = helixDataAccessor.getChildNames(keyBuilder
-          .idealStates());
-      Map<String,Integer> map = new HashMap<String, Integer>();
-      for (String resource : resources)
-      {
-        String resourceType = configAccessor.get(
-            builder.forCluster(clusterName).forResource(resource)
-                .build(), "type");
-        if("Task".equalsIgnoreCase(resourceType)){
-          String streamName = configAccessor.get(
-              builder.forCluster(clusterName).forResource(resource)
-                  .build(), "streamName");
-          IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
-          map.put(streamName, idealstate.getNumPartitions());
+    @Override
+    public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) {
+        lock.lock();
+        try {
+            logger.info("Start:Processing change in cluster topology");
+            super.onExternalViewChange(externalViewList, changeContext);
+            HelixManager manager = changeContext.getManager();
+            HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+            ConfigAccessor configAccessor = manager.getConfigAccessor();
+            ConfigScopeBuilder builder = new ConfigScopeBuilder();
+            Builder keyBuilder = helixDataAccessor.keyBuilder();
+            List<String> resources = helixDataAccessor.getChildNames(keyBuilder.idealStates());
+            Map<String, Integer> map = new HashMap<String, Integer>();
+            for (String resource : resources) {
+                String resourceType = configAccessor.get(builder.forCluster(clusterName).forResource(resource).build(),
+                        "type");
+                if ("Task".equalsIgnoreCase(resourceType)) {
+                    String streamName = configAccessor.get(builder.forCluster(clusterName).forResource(resource)
+                            .build(), "streamName");
+                    IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(resource));
+                    map.put(streamName, idealstate.getNumPartitions());
+                }
+            }
+            partitionCountMapRef.set(map);
+            for (ClusterChangeListener listener : listeners) {
+                listener.onChange();
+            }
+            logger.info("End:Processing change in cluster topology");
+
+        } catch (Exception e) {
+            logger.error("", e);
+        } finally {
+            lock.unlock();
         }
-      }
-      partitionCountMapRef.set(map);
-      for (ClusterChangeListener listener : listeners)
-      {
-        listener.onChange();
-      }
-      logger.info("End:Processing change in cluster topology");
-
-    } catch (Exception e)
-    {
-      logger.error("", e);
-    } finally
-    {
-      lock.unlock();
     }
-  }
 
-  @Override
-  public PhysicalCluster getPhysicalCluster()
-  {
-    return clusterRef.get();
-  }
-
-  @Override
-  public void addListener(ClusterChangeListener listener)
-  {
-    logger.info("Adding topology change listener:" + listener);
-    listeners.add(listener);
-  }
-
-  @Override
-  public void removeListener(ClusterChangeListener listener)
-  {
-    logger.info("Removing topology change listener:" + listener);
-    listeners.remove(listener);
-  }
-
-  @Override
-  public int hashCode()
-  {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result
-        + ((clusterName == null) ? 0 : clusterName.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    ClusterFromHelix other = (ClusterFromHelix) obj;
-    if (clusterName == null)
-    {
-      if (other.clusterName != null)
-        return false;
-    } else if (!clusterName.equals(other.clusterName))
-      return false;
-    return true;
-  }
-
-  @Override
-  public InstanceConfig getDestination(String streamName, int partitionId)
-  {
-    List<InstanceConfig> instances = getInstances(streamName, streamName + "_"
-        + partitionId, "LEADER");
-    if (instances.size() == 1)
-    {
-      return instances.get(0);
-    } else
-    {
-      return null;
+    @Override
+    public PhysicalCluster getPhysicalCluster() {
+        return clusterRef.get();
     }
-  }
-  
-  @Override
-  public Integer getPartitionCount(String streamName){
-    Integer numPartitions = partitionCountMapRef.get().get(streamName);
-    if(numPartitions==null){
-      return -1;
+
+    @Override
+    public void addListener(ClusterChangeListener listener) {
+        logger.info("Adding topology change listener:" + listener);
+        listeners.add(listener);
     }
-    return numPartitions;
-  }
+
+    @Override
+    public void removeListener(ClusterChangeListener listener) {
+        logger.info("Removing topology change listener:" + listener);
+        listeners.remove(listener);
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        ClusterFromHelix other = (ClusterFromHelix) obj;
+        if (clusterName == null) {
+            if (other.clusterName != null)
+                return false;
+        } else if (!clusterName.equals(other.clusterName))
+            return false;
+        return true;
+    }
+
+    @Override
+    public InstanceConfig getDestination(String streamName, int partitionId) {
+        List<InstanceConfig> instances = getInstances(streamName, streamName + "_" + partitionId, "LEADER");
+        if (instances.size() == 1) {
+            return instances.get(0);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public Integer getPartitionCount(String streamName) {
+        Integer numPartitions = partitionCountMapRef.get().get(streamName);
+        if (numPartitions == null) {
+            return -1;
+        }
+        return numPartitions;
+    }
 
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index f6d4df9..3e4567a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -42,7 +42,7 @@
 /**
  * Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
  * configuration.
- *
+ * 
  */
 public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
 
@@ -216,15 +216,13 @@
     }
 
     @Override
-    public InstanceConfig getDestination(String streamName, int partitionId)
-    {
-      return null;
+    public InstanceConfig getDestination(String streamName, int partitionId) {
+        return null;
     }
 
     @Override
-    public Integer getPartitionCount(String streamName)
-    {
-      return null;
+    public Integer getPartitionCount(String streamName) {
+        return null;
     }
 
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
index e622107..1e117c8 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterNode.java
@@ -20,7 +20,7 @@
 
 /**
  * Represents an node.
- *
+ * 
  */
 public class ClusterNode {
     private int partition;
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
index 217d4e7..4e5351b 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromHelix.java
@@ -55,11 +55,8 @@
         this.clusterName = clusterName;
         this.connectionTimeout = connectionTimeout;
 
-
     }
 
-  
-
     public Cluster getCluster(String clusterName) {
         return clusters.get(clusterName);
     }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
index ce53c1a..cbf209e 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/PhysicalCluster.java
@@ -99,8 +99,9 @@
     }
 
     public int getPartitionCount(String stream) {
-      return numPartitions;
-  }
+        return numPartitions;
+    }
+
     /**
      * @param node
      */
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index 7720bbf..7b75ff4 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -107,7 +107,9 @@
 
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.comm.topology.RemoteStreamsManager#getConsumers(java.lang.String)
      */
     @Override
@@ -186,8 +188,11 @@
         streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.s4.comm.topology.RemoteStreamsManager#addOutputStream(java.lang.String, java.lang.String, java.lang.String)
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.comm.topology.RemoteStreamsManager#addOutputStream(java.lang.String, java.lang.String,
+     * java.lang.String)
      */
     @Override
     public void addOutputStream(String appId, String clusterName, String streamName) {
@@ -219,7 +224,9 @@
         zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
     }
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.comm.topology.RemoteStreamsManager#addInputStream(int, java.lang.String, java.lang.String)
      */
     @Override
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
index 94c95b6..3eb6d6a 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManager.java
@@ -2,22 +2,19 @@
 
 import java.util.Set;
 
-public interface RemoteStreamsManager
-{
+public interface RemoteStreamsManager {
 
-  public abstract Set<StreamConsumer> getConsumers(String streamName);
+    public abstract Set<StreamConsumer> getConsumers(String streamName);
 
-  public abstract void addOutputStream(String appId, String clusterName,
-      String streamName);
+    public abstract void addOutputStream(String appId, String clusterName, String streamName);
 
-  /**
-   * Publishes interest in a stream from an application.
-   * 
-   * @param appId
-   * @param clusterName
-   * @param streamName
-   */
-  public abstract void addInputStream(int appId, String clusterName,
-      String streamName);
+    /**
+     * Publishes interest in a stream from an application.
+     * 
+     * @param appId
+     * @param clusterName
+     * @param streamName
+     */
+    public abstract void addInputStream(int appId, String clusterName, String streamName);
 
-}
\ No newline at end of file
+}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
index 957318d..845e649 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreamsManagerImpl.java
@@ -2,26 +2,21 @@
 
 import java.util.Set;
 
-public class RemoteStreamsManagerImpl implements RemoteStreamsManager
-{
+public class RemoteStreamsManagerImpl implements RemoteStreamsManager {
 
-  @Override
-  public Set<StreamConsumer> getConsumers(String streamName)
-  {
-    return null;
-  }
+    @Override
+    public Set<StreamConsumer> getConsumers(String streamName) {
+        return null;
+    }
 
-  @Override
-  public void addOutputStream(String appId, String clusterName,
-      String streamName)
-  {
-    
-  }
+    @Override
+    public void addOutputStream(String appId, String clusterName, String streamName) {
 
-  @Override
-  public void addInputStream(int appId, String clusterName, String streamName)
-  {
-    
-  }
+    }
+
+    @Override
+    public void addInputStream(int appId, String clusterName, String streamName) {
+
+    }
 
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
index b55f469..9858aef 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZNRecordSerializer.java
@@ -24,12 +24,12 @@
 import com.google.common.base.Preconditions;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+
 /**
  * 
  * Utility to serialize/deserialize data in ZK. <br/>
- * Using Json format and Gson library. 
- * TODO: Explore other libraries like jackson much richer features.
- * Gson needs no-arg constructor to work with without additional work
+ * Using Json format and Gson library. TODO: Explore other libraries like jackson much richer features. Gson needs
+ * no-arg constructor to work with without additional work
  */
 public class ZNRecordSerializer implements ZkSerializer {
 
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
index e5a8689..cab6c0f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkClient.java
@@ -25,72 +25,70 @@
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.data.Stat;
+
 /**
  * 
  * Overwriting the ZKclient since the org.I0Itec.zkclient.ZkClient does not expose some important methods
  */
 public class ZkClient extends org.I0Itec.zkclient.ZkClient {
 
-	public ZkClient(IZkConnection connection, int connectionTimeout,
-			ZkSerializer zkSerializer) {
-		super(connection, connectionTimeout, zkSerializer);
-	}
+    public ZkClient(IZkConnection connection, int connectionTimeout, ZkSerializer zkSerializer) {
+        super(connection, connectionTimeout, zkSerializer);
+    }
 
-	public ZkClient(IZkConnection connection, int connectionTimeout) {
-		super(connection, connectionTimeout);
-	}
+    public ZkClient(IZkConnection connection, int connectionTimeout) {
+        super(connection, connectionTimeout);
+    }
 
-	public ZkClient(IZkConnection connection) {
-		super(connection);
-	}
+    public ZkClient(IZkConnection connection) {
+        super(connection);
+    }
 
-	public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
-		super(zkServers, sessionTimeout, connectionTimeout);
-	}
+    public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
+        super(zkServers, sessionTimeout, connectionTimeout);
+    }
 
-	public ZkClient(String zkServers, int connectionTimeout) {
-		super(zkServers, connectionTimeout);
-	}
+    public ZkClient(String zkServers, int connectionTimeout) {
+        super(zkServers, connectionTimeout);
+    }
 
-	public ZkClient(String serverstring) {
-		super(serverstring);
-	}
+    public ZkClient(String serverstring) {
+        super(serverstring);
+    }
 
-	public IZkConnection getConnection() {
-		return _connection;
-	}
-	
-	public long getSessionId(){
-		return ((ZkConnection)_connection).getZookeeper().getSessionId();
-	}
+    public IZkConnection getConnection() {
+        return _connection;
+    }
 
-	public Stat getStat(final String path) {
-		Stat stat = retryUntilConnected(new Callable<Stat>() {
+    public long getSessionId() {
+        return ((ZkConnection) _connection).getZookeeper().getSessionId();
+    }
 
-			@Override
-			public Stat call() throws Exception {
-				Stat stat = ((ZkConnection) _connection).getZookeeper().exists(
-						path, false);
-				return stat;
-			}
-		});
+    public Stat getStat(final String path) {
+        Stat stat = retryUntilConnected(new Callable<Stat>() {
 
-		return stat;
-	}
+            @Override
+            public Stat call() throws Exception {
+                Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false);
+                return stat;
+            }
+        });
 
-	@SuppressWarnings("unchecked")
-	@Override
-	public <T extends Object> T readData(String path,
-			boolean returnNullIfPathNotExists) {
-		T data = null;
-		try {
-			data = (T) readData(path, null);
-		} catch (ZkNoNodeException e) {
-			if (!returnNullIfPathNotExists) {
-				throw e;
-			}
-		}
-		return data;
-	}
+        return stat;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists) {
+        T data = null;
+        try {
+            data = (T) readData(path, null);
+        } catch (ZkNoNodeException e) {
+            if (!returnNullIfPathNotExists) {
+                throw e;
+            }
+        }
+        return data;
+    }
 
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
index fb18dd6..a326535 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java
@@ -39,7 +39,7 @@
 
 /**
  * UDP based emitter.
- *
+ * 
  */
 public class UDPEmitter implements Emitter, ClusterChangeListener {
     private DatagramSocket socket;
@@ -103,11 +103,11 @@
         return true;
     }
 
-   // @Override
+    // @Override
     public int getPartitionCount() {
         return topology.getPhysicalCluster().getPartitionCount();
     }
-    
+
     @Override
     public void onChange() {
         refreshCluster();
@@ -131,8 +131,7 @@
     }
 
     @Override
-    public int getPartitionCount(String stream)
-    {
-      return topology.getPhysicalCluster().getPartitionCount(stream);      
+    public int getPartitionCount(String stream) {
+        return topology.getPhysicalCluster().getPartitionCount(stream);
     }
 }
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
index 050a6ec..da015d7 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
@@ -25,7 +25,7 @@
 
 /**
  * UDP-based emitter for sending events to remote clusters.
- *
+ * 
  */
 public class UDPRemoteEmitter extends UDPEmitter {
 
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index 97ec9af..1c07870 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -83,14 +83,14 @@
         /* Use Kryo to serialize events. */
         bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 
-        bind(StateModelFactory.class).annotatedWith(Names.named("s4.app.statemodelfactory")).to(AppStateModelFactory.class);
+        bind(StateModelFactory.class).annotatedWith(Names.named("s4.app.statemodelfactory")).to(
+                AppStateModelFactory.class);
 
         bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class);
-        
-        bind(RemoteSendersManager.class).to(RemoteSendersManagerImpl.class);
-        
-        bind(RemoteStreamsManager.class).to(RemoteStreamsManagerImpl.class);
 
+        bind(RemoteSendersManager.class).to(RemoteSendersManagerImpl.class);
+
+        bind(RemoteStreamsManager.class).to(RemoteStreamsManagerImpl.class);
 
         bind(S4RLoaderFactory.class);
 
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 810b8f4..9ae356b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -155,11 +155,12 @@
                 }
                 combinedModule = Modules.override(combinedModule).with(new ParametersInjectionModule(namedParameters));
             }
-            
+
             injector = Guice.createInjector(combinedModule);
-            //start a HelixController to manage the cluster
+            // start a HelixController to manage the cluster
             String controllerName = Inet4Address.getLocalHost().getCanonicalHostName() + UUID.randomUUID().toString();
-            HelixControllerMain.startHelixController(mainArgs.zkConnectionString, mainArgs.clusterName, controllerName, HelixControllerMain.STANDALONE);
+            HelixControllerMain.startHelixController(mainArgs.zkConnectionString, mainArgs.clusterName, controllerName,
+                    HelixControllerMain.STANDALONE);
 
             if (mainArgs.appClass != null) {
                 logger.info("Starting S4 node with single application from class [{}]", mainArgs.appClass);
@@ -190,7 +191,7 @@
 
         @Parameter(names = { "-c", "-cluster" }, description = "cluster name", required = true)
         String clusterName = null;
-        
+
         @Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
         String instanceName = null;
 
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index 6bc52e3..c8da41a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -24,7 +24,7 @@
 
 /**
  * Sends events to a remote cluster.
- *
+ * 
  */
 public class RemoteSender {
 
@@ -41,7 +41,8 @@
     public void send(String hashKey, EventMessage eventMessage) {
         if (hashKey == null) {
             // round robin by default
-            emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount(eventMessage.getStreamName())), eventMessage);
+            emitter.send(Math.abs(targetPartition++ % emitter.getPartitionCount(eventMessage.getStreamName())),
+                    eventMessage);
         } else {
             int partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(eventMessage.getStreamName()));
             emitter.send(partition, eventMessage);
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
index a43aa7d..e8ed8e0 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
@@ -38,7 +38,7 @@
 /**
  * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
  * the event.
- *
+ * 
  */
 public class RemoteSenders implements RemoteSendersManager {
 
@@ -61,7 +61,9 @@
 
     Map<String, RemoteSender> sendersByTopology = new HashMap<String, RemoteSender>();
 
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     * 
      * @see org.apache.s4.core.RemoteSendersManager#send(java.lang.String, org.apache.s4.base.Event)
      */
     @Override
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
index 3e0e462..0f1f9fa 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManager.java
@@ -2,9 +2,8 @@
 
 import org.apache.s4.base.Event;
 
-public interface RemoteSendersManager
-{
+public interface RemoteSendersManager {
 
-  public abstract void send(String hashKey, Event event);
+    public abstract void send(String hashKey, Event event);
 
-}
\ No newline at end of file
+}
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
index 13f8ac8..ed2ce4a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSendersManagerImpl.java
@@ -2,13 +2,11 @@
 
 import org.apache.s4.base.Event;
 
-public class RemoteSendersManagerImpl implements RemoteSendersManager
-{
+public class RemoteSendersManagerImpl implements RemoteSendersManager {
 
-  @Override
-  public void send(String hashKey, Event event)
-  {
-    
-  }
+    @Override
+    public void send(String hashKey, Event event) {
+
+    }
 
 }
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index 38f71a8..d47ee0b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -23,90 +23,69 @@
 import com.google.common.io.Files;
 
 @StateModelInfo(states = { "ONLINE,OFFLINE" }, initialState = "OFFLINE")
-public class AppStateModel extends StateModel
-{
-  private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
-  private final String appName;
-  private final Server server;
+public class AppStateModel extends StateModel {
+    private static Logger logger = LoggerFactory.getLogger(AppStateModel.class);
+    private final String appName;
+    private final Server server;
 
-  public AppStateModel(Server server, String appName)
-  {
-    this.server = server;
-    this.appName = appName;
-  }
-
-  @Transition(from = "OFFLINE", to = "ONLINE")
-  public void deploy(Message message, NotificationContext context) throws Exception
-  {
-    HelixManager manager = context.getManager();
-    ConfigAccessor configAccessor = manager.getConfigAccessor();
-    ConfigScopeBuilder builder = new ConfigScopeBuilder();
-    ConfigScope scope = builder.forCluster(manager.getClusterName()).forResource(appName).build();
-    String uriString = configAccessor.get(scope,
-        DistributedDeploymentManager.S4R_URI);
-    String clusterName = manager.getClusterName();
-    try
-    {
-      URI uri = new URI(uriString);
-      // fetch application
-      File localS4RFileCopy;
-      try
-      {
-        localS4RFileCopy = File.createTempFile("tmp", "s4r");
-      } catch (IOException e1)
-      {
-        logger
-            .error(
-                "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
-                appName, e1.getClass().getName() + "->" + e1.getMessage());
-        throw new DeploymentFailedException("Cannot deploy application ["
-            + appName + "]", e1);
-      }
-      localS4RFileCopy.deleteOnExit();
-      try
-      {
-        if (ByteStreams.copy(DistributedDeploymentManager.fetchS4App(uri),
-            Files.newOutputStreamSupplier(localS4RFileCopy)) == 0)
-        {
-          throw new DeploymentFailedException("Cannot copy archive from ["
-              + uri.toString() + "] to [" + localS4RFileCopy.getAbsolutePath()
-              + "] (nothing was copied)");
-        }
-      } catch (IOException e)
-      {
-        throw new DeploymentFailedException("Cannot deploy application ["
-            + appName + "] from URI [" + uri.toString() + "] ", e);
-      }
-      // install locally
-      App loaded = server.loadApp(localS4RFileCopy, appName);
-      if (loaded != null)
-      {
-        logger.info("Successfully installed application {}", appName);
-        // TODO sync with other nodes? (e.g. wait for other apps deployed before
-        // starting?
-        server.startApp(loaded, appName, clusterName);
-      } else
-      {
-        throw new DeploymentFailedException("Cannot deploy application ["
-            + appName + "] from URI [" + uri.toString()
-            + "] : cannot start application");
-      }
-
-    } catch (URISyntaxException e)
-    {
-      logger
-          .error(
-              "Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ",
-              new String[] { appName, uriString, e.getMessage() });
-      throw new DeploymentFailedException("Cannot deploy application ["
-          + appName + "]", e);
+    public AppStateModel(Server server, String appName) {
+        this.server = server;
+        this.appName = appName;
     }
-  }
 
-  @Transition(from = "OFFLINE", to = "ONLINE")
-  public void undeploy(Message message, NotificationContext context) throws Exception
-  {
-    //todo
-  }
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void deploy(Message message, NotificationContext context) throws Exception {
+        HelixManager manager = context.getManager();
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(manager.getClusterName()).forResource(appName).build();
+        String uriString = configAccessor.get(scope, DistributedDeploymentManager.S4R_URI);
+        String clusterName = manager.getClusterName();
+        try {
+            URI uri = new URI(uriString);
+            // fetch application
+            File localS4RFileCopy;
+            try {
+                localS4RFileCopy = File.createTempFile("tmp", "s4r");
+            } catch (IOException e1) {
+                logger.error(
+                        "Cannot deploy app [{}] because a local copy of the S4R file could not be initialized due to [{}]",
+                        appName, e1.getClass().getName() + "->" + e1.getMessage());
+                throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e1);
+            }
+            localS4RFileCopy.deleteOnExit();
+            try {
+                if (ByteStreams.copy(DistributedDeploymentManager.fetchS4App(uri),
+                        Files.newOutputStreamSupplier(localS4RFileCopy)) == 0) {
+                    throw new DeploymentFailedException("Cannot copy archive from [" + uri.toString() + "] to ["
+                            + localS4RFileCopy.getAbsolutePath() + "] (nothing was copied)");
+                }
+            } catch (IOException e) {
+                throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
+                        + uri.toString() + "] ", e);
+            }
+            // install locally
+            App loaded = server.loadApp(localS4RFileCopy, appName);
+            if (loaded != null) {
+                logger.info("Successfully installed application {}", appName);
+                // TODO sync with other nodes? (e.g. wait for other apps deployed before
+                // starting?
+                server.startApp(loaded, appName, clusterName);
+            } else {
+                throw new DeploymentFailedException("Cannot deploy application [" + appName + "] from URI ["
+                        + uri.toString() + "] : cannot start application");
+            }
+
+        } catch (URISyntaxException e) {
+            logger.error("Cannot deploy app {} : invalid uri for fetching s4r archive {} : {} ", new String[] {
+                    appName, uriString, e.getMessage() });
+            throw new DeploymentFailedException("Cannot deploy application [" + appName + "]", e);
+        }
+    }
+
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void undeploy(Message message, NotificationContext context) throws Exception {
+        // todo
+    }
 
 }
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
index 3af4531..ef6ae58 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModelFactory.java
@@ -5,20 +5,20 @@
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+
 @Singleton
-public class AppStateModelFactory extends StateModelFactory<AppStateModel>
-{
-  private final Server server;
-  
-  @Inject
-  public AppStateModelFactory(Server server){
-    this.server = server;
-    
-  }
-  @Override
-  public AppStateModel createNewStateModel(String partitionName)
-  {
-    return new AppStateModel(server,partitionName);
-  }
+public class AppStateModelFactory extends StateModelFactory<AppStateModel> {
+    private final Server server;
+
+    @Inject
+    public AppStateModelFactory(Server server) {
+        this.server = server;
+
+    }
+
+    @Override
+    public AppStateModel createNewStateModel(String partitionName) {
+        return new AppStateModel(server, partitionName);
+    }
 
 }
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
index ed32085..5dcfa76 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
@@ -5,39 +5,24 @@
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
-public class HelixBasedDeploymentManager implements DeploymentManager
-{
-  private final Server server;
-  boolean deployed = false;
-  private final String clusterName;
+public class HelixBasedDeploymentManager implements DeploymentManager {
+    private final Server server;
+    boolean deployed = false;
+    private final String clusterName;
 
-  @Inject
-  public HelixBasedDeploymentManager(
-      @Named("s4.cluster.name") String clusterName,
-      @Named("s4.cluster.zk_address") String zookeeperAddress,
-      @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
-      @Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
-      Server server)
-  {
-    this.clusterName = clusterName;
-    this.server = server;
+    @Inject
+    public HelixBasedDeploymentManager(@Named("s4.cluster.name") String clusterName,
+            @Named("s4.cluster.zk_address") String zookeeperAddress,
+            @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
+            @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, Server server) {
+        this.clusterName = clusterName;
+        this.server = server;
 
-  }
+    }
 
-  @Override
-  public void start()
-  {
-   /* File s4r = new File(
-        "/Users/kgopalak/Documents/projects/s4/incubator-s4-helix/myApp/build/libs/myApp.s4r");
-    String appName = "myApp";
-    try
-    {
-      App loaded = server.loadApp(s4r, "myApp");
-      server.startApp(loaded, appName, clusterName);
-    } catch (Exception e)
-    {
-      e.printStackTrace();
-    }*/
-  }
+    @Override
+    public void start() {
+        
+    }
 
 }
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
index 5bab316..6dfc82a 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/AddNodes.java
@@ -38,7 +38,7 @@
         Tools.parseArgs(clusterArgs, args);
         try {
 
-            logger.info("Adding new nodes [{}] to cluster [{}] node(s)",  clusterArgs.nbNodes, clusterArgs.clusterName);
+            logger.info("Adding new nodes [{}] to cluster [{}] node(s)", clusterArgs.nbNodes, clusterArgs.clusterName);
             HelixAdmin helixAdmin = new ZKHelixAdmin(clusterArgs.zkConnectionString);
             int initialPort = clusterArgs.firstListeningPort;
             if (clusterArgs.nbNodes > 0) {
@@ -74,14 +74,14 @@
 
         @Parameter(names = "-nodes", description = "Host names of the nodes", required = false)
         String nodes = "";
-        
+
         @Parameter(names = "-zk", description = "Zookeeper connection string")
         String zkConnectionString = "localhost:2181";
 
         @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
         int firstListeningPort = -1;
-        
-        @Parameter(names = {"-ng","-nodeGroup"}, description = "Assign the nodes to one or more groups. This will be useful when you create task", required=false)
+
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Assign the nodes to one or more groups. This will be useful when you create task", required = false)
         String nodeGroup = "default";
     }
 
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
index 38fbbe8..190c8c0 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/CreateCluster.java
@@ -88,10 +88,10 @@
 
         @Parameter(names = { "-flp", "-firstListeningPort" }, description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
         int firstListeningPort = -1;
-        
+
         @Parameter(names = { "-ng", "-nodeGroup" }, description = "Name of the App", required = false, arity = 1)
         String nodeGroup = "default";
-        
+
     }
 
 }
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
index ff1cb88..83a7088 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DeployApp.java
@@ -47,14 +47,14 @@
         for (String instanceName : instancesInCluster) {
             InstanceConfig instanceConfig = admin.getInstanceConfig(deployArgs.clusterName, instanceName);
             String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
-            if(nodeGroup.equals(deployArgs.nodeGroup)){
+            if (nodeGroup.equals(deployArgs.nodeGroup)) {
                 instancesInGroup.add(instanceName);
             }
         }
-        for(String instanceName:instancesInGroup){
+        for (String instanceName : instancesInGroup) {
             is.setPartitionState(deployArgs.appName, instanceName, "ONLINE");
         }
-        
+
         admin.setResourceIdealState(deployArgs.clusterName, deployArgs.appName, is);
     }
 
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
index b54187f..ce41ecd 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/GenericEventAdapter.java
@@ -19,64 +19,53 @@
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
-public class GenericEventAdapter
-{
+public class GenericEventAdapter {
 
-  public static void main(String[] args)
-  {
-    AdapterArgs adapterArgs = new AdapterArgs();
+    public static void main(String[] args) {
+        AdapterArgs adapterArgs = new AdapterArgs();
 
-    Tools.parseArgs(adapterArgs, args);
-    try
-    {
-      String instanceName = "adapter";
-      HelixManager manager = HelixManagerFactory.getZKHelixManager(
-          adapterArgs.clusterName, instanceName, InstanceType.SPECTATOR,
-          adapterArgs.zkConnectionString);
-      ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName,
-          adapterArgs.zkConnectionString, 30, 60);
-      manager.connect();
-      manager.addExternalViewChangeListener(cluster);
+        Tools.parseArgs(adapterArgs, args);
+        try {
+            String instanceName = "adapter";
+            HelixManager manager = HelixManagerFactory.getZKHelixManager(adapterArgs.clusterName, instanceName,
+                    InstanceType.SPECTATOR, adapterArgs.zkConnectionString);
+            ClusterFromHelix cluster = new ClusterFromHelix(adapterArgs.clusterName, adapterArgs.zkConnectionString,
+                    30, 60);
+            manager.connect();
+            manager.addExternalViewChangeListener(cluster);
 
-      HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-      Builder keyBuilder = helixDataAccessor.keyBuilder();
-      IdealState idealstate = helixDataAccessor.getProperty(keyBuilder
-          .idealStates(adapterArgs.streamName));
-      TCPEmitter emitter = new TCPEmitter(cluster, 1000);
-      while (true)
-      {
-        int partitionId = ((int) (Math.random() * 1000))
-            % idealstate.getNumPartitions();
-        Event event = new Event();
-        event.put("name", String.class,
-            "Hello world to partition:" + partitionId);
-        KryoSerDeser serializer = new KryoSerDeser();
-        EventMessage message = new EventMessage("-1", adapterArgs.streamName,
-            serializer.serialize(event));
-        System.out.println("Sending event to partition:"+partitionId);
-        emitter.send(partitionId, message);
-        Thread.sleep(1000);
-      }
-    } catch (Exception e)
-    {
-      e.printStackTrace();
+            HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+            Builder keyBuilder = helixDataAccessor.keyBuilder();
+            IdealState idealstate = helixDataAccessor.getProperty(keyBuilder.idealStates(adapterArgs.streamName));
+            TCPEmitter emitter = new TCPEmitter(cluster, 1000);
+            while (true) {
+                int partitionId = ((int) (Math.random() * 1000)) % idealstate.getNumPartitions();
+                Event event = new Event();
+                event.put("name", String.class, "Hello world to partition:" + partitionId);
+                KryoSerDeser serializer = new KryoSerDeser();
+                EventMessage message = new EventMessage("-1", adapterArgs.streamName, serializer.serialize(event));
+                System.out.println("Sending event to partition:" + partitionId);
+                emitter.send(partitionId, message);
+                Thread.sleep(1000);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
     }
 
-  }
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+    static class AdapterArgs extends S4ArgsBase {
 
-  @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
-  static class AdapterArgs extends S4ArgsBase
-  {
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
 
-    @Parameter(names = "-zk", description = "ZooKeeper connection string")
-    String zkConnectionString = "localhost:2181";
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+        String clusterName;
 
-    @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
-    String clusterName;
+        @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the event will be sent to", required = true)
+        String streamName;
 
-    @Parameter(names = { "-s", "-streamName" }, description = "Stream Name where the event will be sent to", required = true)
-    String streamName;
-
-  }
+    }
 
 }
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
index 1b8214f..13b88ca 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/RebalanceTask.java
@@ -17,46 +17,43 @@
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
-public class RebalanceTask extends S4ArgsBase
-{
-  public static void main(String[] args)
-  {
-    RebalanceTaskArgs taskArgs = new RebalanceTaskArgs();
+public class RebalanceTask extends S4ArgsBase {
+    public static void main(String[] args) {
+        RebalanceTaskArgs taskArgs = new RebalanceTaskArgs();
 
-    Tools.parseArgs(taskArgs, args);
+        Tools.parseArgs(taskArgs, args);
 
-    HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
-    // This does the assignment of partition to nodes. It uses a modified
-    // version of consistent hashing to distribute active partitions and standbys
-    // equally among nodes.
-    IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName, taskArgs.taskId);
-    List<String> instancesInGroup = new ArrayList<String>();
-    List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
-    for (String instanceName : instancesInCluster) {
-        InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
-        String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
-        if (nodeGroup.equals(taskArgs.nodeGroup)) {
-            instancesInGroup.add(instanceName);
+        HelixAdmin admin = new ZKHelixAdmin(taskArgs.zkConnectionString);
+        // This does the assignment of partition to nodes. It uses a modified
+        // version of consistent hashing to distribute active partitions and standbys
+        // equally among nodes.
+        IdealState currentAssignment = admin.getResourceIdealState(taskArgs.clusterName, taskArgs.taskId);
+        List<String> instancesInGroup = new ArrayList<String>();
+        List<String> instancesInCluster = admin.getInstancesInCluster(taskArgs.clusterName);
+        for (String instanceName : instancesInCluster) {
+            InstanceConfig instanceConfig = admin.getInstanceConfig(taskArgs.clusterName, instanceName);
+            String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+            if (nodeGroup.equals(taskArgs.nodeGroup)) {
+                instancesInGroup.add(instanceName);
+            }
         }
+        admin.rebalance(taskArgs.clusterName, currentAssignment, instancesInGroup);
     }
-    admin.rebalance(taskArgs.clusterName,currentAssignment, instancesInGroup);
-  }
 
-  @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
-  static class RebalanceTaskArgs extends S4ArgsBase
-  {
+    @Parameters(commandNames = "newStreamProcessor", separators = "=", commandDescription = "Create a new stream processor")
+    static class RebalanceTaskArgs extends S4ArgsBase {
 
-    @Parameter(names = "-zk", description = "ZooKeeper connection string")
-    String zkConnectionString = "localhost:2181";
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
 
-    @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
-    String clusterName;
+        @Parameter(names = { "-c", "-cluster" }, description = "Logical name of the S4 cluster", required = true)
+        String clusterName;
 
-    @Parameter(names={"-id","-taskId"},description = "id of the task that produces/consumes a stream", required = true, arity = 1)
-    String taskId;
+        @Parameter(names = { "-id", "-taskId" }, description = "id of the task that produces/consumes a stream", required = true, arity = 1)
+        String taskId;
 
-    @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = true, arity = 1)
-    String nodeGroup = "default";
+        @Parameter(names = { "-ng", "-nodeGroup" }, description = "Node group name where the task needs to be run", required = true, arity = 1)
+        String nodeGroup = "default";
 
-  }
+    }
 }
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
index bd4224e..2be58c1 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/S4Status.java
@@ -54,521 +54,413 @@
 import com.beust.jcommander.Parameters;
 import com.google.common.collect.Maps;
 
-public class S4Status extends S4ArgsBase
-{
-  static Logger logger = LoggerFactory.getLogger(S4Status.class);
+public class S4Status extends S4ArgsBase {
+    static Logger logger = LoggerFactory.getLogger(S4Status.class);
 
-  private static String NONE = "--";
+    private static String NONE = "--";
 
-  public static void main(String[] args)
-  {
+    public static void main(String[] args) {
 
-    StatusArgs statusArgs = new StatusArgs();
-    Tools.parseArgs(statusArgs, args);
+        StatusArgs statusArgs = new StatusArgs();
+        Tools.parseArgs(statusArgs, args);
 
-    try
-    {
-      if (statusArgs.clusters.size() > 0)
-      {
-        for (String cluster : statusArgs.clusters)
-        {
-          HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster,
-              "ADMIN", InstanceType.ADMINISTRATOR,
-              statusArgs.zkConnectionString);
-          manager.connect();
-          ConfigAccessor configAccessor = manager.getConfigAccessor();
-          ConfigScopeBuilder builder = new ConfigScopeBuilder();
-          printClusterInfo(manager, cluster);
-          List<String> resourcesInCluster = manager.getClusterManagmentTool()
-              .getResourcesInCluster(cluster);
+        try {
+            if (statusArgs.clusters.size() > 0) {
+                for (String cluster : statusArgs.clusters) {
+                    HelixManager manager = HelixManagerFactory.getZKHelixManager(cluster, "ADMIN",
+                            InstanceType.ADMINISTRATOR, statusArgs.zkConnectionString);
+                    manager.connect();
+                    ConfigAccessor configAccessor = manager.getConfigAccessor();
+                    ConfigScopeBuilder builder = new ConfigScopeBuilder();
+                    printClusterInfo(manager, cluster);
+                    List<String> resourcesInCluster = manager.getClusterManagmentTool().getResourcesInCluster(cluster);
 
-          List<String> apps = new ArrayList<String>();
-          List<String> tasks = new ArrayList<String>();
+                    List<String> apps = new ArrayList<String>();
+                    List<String> tasks = new ArrayList<String>();
 
-          for (String resource : resourcesInCluster)
-          {
-            ConfigScope scope = builder.forCluster(cluster)
-                .forResource(resource).build();
-            String resourceType = configAccessor.get(scope, "type");
-            if ("App".equals(resourceType))
-            {
-              apps.add(resource);
-            } else if ("Task".equals(resourceType))
-            {
-              tasks.add(resource);
+                    for (String resource : resourcesInCluster) {
+                        ConfigScope scope = builder.forCluster(cluster).forResource(resource).build();
+                        String resourceType = configAccessor.get(scope, "type");
+                        if ("App".equals(resourceType)) {
+                            apps.add(resource);
+                        } else if ("Task".equals(resourceType)) {
+                            tasks.add(resource);
+                        }
+                    }
+                    if (statusArgs.apps == null && statusArgs.streams == null) {
+                        statusArgs.apps = apps;
+                        statusArgs.streams = tasks;
+                    }
+                    for (String app : statusArgs.apps) {
+                        if (resourcesInCluster.contains(app)) {
+                            printAppInfo(manager, cluster, app);
+                        }
+                    }
+
+                    if (statusArgs.streams != null && statusArgs.streams.size() > 0) {
+                        for (String stream : statusArgs.streams) {
+                            if (resourcesInCluster.contains(stream)) {
+                                printStreamInfo(manager, cluster, stream);
+                            }
+                        }
+                    }
+                    manager.disconnect();
+                }
             }
-          }
-          if (statusArgs.apps == null && statusArgs.streams == null)
-          {
-            statusArgs.apps = apps;
-            statusArgs.streams = tasks;
-          }
-          for (String app : statusArgs.apps)
-          {
-            if (resourcesInCluster.contains(app))
-            {
-              printAppInfo(manager, cluster, app);
+        } catch (Exception e) {
+            logger.error("Cannot get the status of S4", e);
+        }
+
+    }
+
+    private static void printStreamInfo(HelixManager manager, String cluster, String taskId) {
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
+        String streamName = configAccessor.get(scope, "streamName");
+        String taskType = configAccessor.get(scope, "taskType");
+
+        System.out.println("Task Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20), inMiddle("Cluster", 20),
+                inMiddle("Description", 90));
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20), inMiddle(cluster, 20),
+                inMiddle(streamName + " " + taskType, 90));
+        System.out.println(generateEdge(130));
+        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = helixDataAccessor.keyBuilder();
+        IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(taskId));
+        ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(taskId));
+        List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+        System.out.format("%-50s%-100s%n", inMiddle("Partition", 50), inMiddle("State", 20));
+        System.out.println(generateEdge(130));
+        for (String partition : assignment.getPartitionSet()) {
+            Map<String, String> stateMap = view.getStateMap(partition);
+            StringBuilder sb = new StringBuilder();
+            String delim = "";
+            for (String instance : stateMap.keySet()) {
+                sb.append(delim);
+                String state = stateMap.get(instance);
+                if (liveInstances.contains(instance)) {
+                    sb.append(instance).append(":").append(state);
+                } else {
+                    sb.append(instance).append(":").append("OFFLINE");
+                }
+                delim = ", ";
             }
-          }
+            System.out.format("%-50s%-10s%n", inMiddle(partition, 50), inMiddle(sb.toString(), 100));
+        }
+        System.out.println(generateEdge(130));
+    }
 
-          if (statusArgs.streams != null && statusArgs.streams.size() > 0)
-          {
-            for (String stream : statusArgs.streams)
-            {
-              if (resourcesInCluster.contains(stream))
-              {
-                printStreamInfo(manager, cluster, stream);
-              }
+    private static void printAppInfo(HelixManager manager, String cluster, String app) {
+        ConfigAccessor configAccessor = manager.getConfigAccessor();
+        ConfigScopeBuilder builder = new ConfigScopeBuilder();
+        ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
+        String uri = configAccessor.get(scope, "s4r_uri");
+
+        System.out.println("App Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20), inMiddle(cluster, 20), inMiddle(uri, 90));
+        System.out.println(generateEdge(130));
+        HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = helixDataAccessor.keyBuilder();
+        IdealState assignment = helixDataAccessor.getProperty(keyBuilder.idealStates(app));
+        ExternalView view = helixDataAccessor.getProperty(keyBuilder.externalView(app));
+        List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
+        Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
+        Map<String, String> appStateMap = view.getStateMap(app);
+        System.out.format("%-50s%-20s%n", inMiddle("Node id", 50), inMiddle("DEPLOYED", 20));
+        System.out.println(generateEdge(130));
+        for (String instance : assignmentMap.keySet()) {
+            String state = appStateMap.get(instance);
+            System.out.format("%-50s%-10s%n", inMiddle(instance, 50),
+                    inMiddle((("ONLINE".equals(state) && liveInstances.contains(instance)) ? "Y" : "N"), 20));
+        }
+
+        System.out.println(generateEdge(130));
+
+    }
+
+    private static void printClusterInfo(HelixManager manager, String cluster) {
+        HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = dataAccessor.keyBuilder();
+        List<String> instances = dataAccessor.getChildNames(keyBuilder.instanceConfigs());
+        List<String> liveInstances = dataAccessor.getChildNames(keyBuilder.liveInstances());
+        if (liveInstances == null) {
+            liveInstances = Collections.emptyList();
+        }
+        System.out.println("Cluster Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
+        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20), inMiddle("Nodes", 20),
+                inMiddle("Active", 10), generateEdge(80));
+        System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ", inMiddle("Node id", 10), inMiddle("Host", 50),
+                inMiddle("Port", 8), inMiddle("Active", 10));
+        System.out.println(generateEdge(130));
+
+        System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20), inMiddle("" + instances.size(), 8),
+                inMiddle("" + liveInstances.size(), 8));
+        boolean first = true;
+
+        for (String instance : instances) {
+            InstanceConfig config = dataAccessor.getProperty(keyBuilder.instanceConfig(instance));
+            // System.out.println(config);
+            if (first) {
+                first = false;
+            } else {
+                System.out.format("%n%-50s", " ");
             }
-          }
-          manager.disconnect();
+            System.out.format("%-10s%-46s%-10s%-10s", inMiddle("" + config.getId(), 10),
+                    inMiddle(config.getHostName(), 50), inMiddle(config.getPort() + "", 10),
+                    inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y" : "N", 10));
         }
-      }
-    } catch (Exception e)
-    {
-      logger.error("Cannot get the status of S4", e);
+
+        System.out.println();
     }
 
-  }
+    @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
+    static class StatusArgs extends S4ArgsBase {
 
-  private static void printStreamInfo(HelixManager manager, String cluster,
-      String taskId)
-  {
-    ConfigAccessor configAccessor = manager.getConfigAccessor();
-    ConfigScopeBuilder builder = new ConfigScopeBuilder();
-    ConfigScope scope = builder.forCluster(cluster).forResource(taskId).build();
-    String streamName = configAccessor.get(scope, "streamName");
-    String taskType = configAccessor.get(scope, "taskType");
+        @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
+        List<String> apps;
 
-    System.out.println("Task Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle("Task Id", 20),
-        inMiddle("Cluster", 20), inMiddle("Description", 90));
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle(taskId, 20),
-        inMiddle(cluster, 20), inMiddle(streamName + " " + taskType, 90));
-    System.out.println(generateEdge(130));
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    IdealState assignment = helixDataAccessor.getProperty(keyBuilder
-        .idealStates(taskId));
-    ExternalView view = helixDataAccessor.getProperty(keyBuilder
-        .externalView(taskId));
-    List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
-        .liveInstances());
-    System.out.format("%-50s%-100s%n", inMiddle("Partition", 50),
-        inMiddle("State", 20));
-    System.out.println(generateEdge(130));
-    for (String partition : assignment.getPartitionSet())
-    {
-      Map<String, String> stateMap = view.getStateMap(partition);
-      StringBuilder sb = new StringBuilder();
-      String delim="";
-      for (String instance : stateMap.keySet())
-      {
-        sb.append(delim);
-        String state = stateMap.get(instance);
-        if (liveInstances.contains(instance))
-        {
-          sb.append(instance).append(":").append(state);
-        } else
-        {
-          sb.append(instance).append(":").append("OFFLINE");
+        @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = true)
+        List<String> clusters;
+
+        @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
+        List<String> streams;
+
+        @Parameter(names = "-zk", description = "ZooKeeper connection string")
+        String zkConnectionString = "localhost:2181";
+
+        @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
+        int timeout = 10000;
+    }
+
+    private static void showAppsStatus(List<Cluster> clusters) {
+        System.out.println("App Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20), inMiddle("Cluster", 20), inMiddle("URI", 90));
+        System.out.println(generateEdge(130));
+        for (Cluster cluster : clusters) {
+            if (!NONE.equals(cluster.app.name)) {
+                System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
+                        inMiddle(cluster.app.cluster, 20), cluster.app.uri);
+            }
         }
-       delim=", ";
-      }
-      System.out.format("%-50s%-10s%n", inMiddle(partition, 50),
-          inMiddle(sb.toString(), 100));
-    }
-    System.out.println(generateEdge(130));
-  }
+        System.out.println(generateEdge(130));
 
-  private static void printAppInfo(HelixManager manager, String cluster,
-      String app)
-  {
-    ConfigAccessor configAccessor = manager.getConfigAccessor();
-    ConfigScopeBuilder builder = new ConfigScopeBuilder();
-    ConfigScope scope = builder.forCluster(cluster).forResource(app).build();
-    String uri = configAccessor.get(scope, "s4r_uri");
-
-    System.out.println("App Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20),
-        inMiddle("Cluster", 20), inMiddle("URI", 90));
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle(app, 20),
-        inMiddle(cluster, 20), inMiddle(uri, 90));
-    System.out.println(generateEdge(130));
-    HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = helixDataAccessor.keyBuilder();
-    IdealState assignment = helixDataAccessor.getProperty(keyBuilder
-        .idealStates(app));
-    ExternalView view = helixDataAccessor.getProperty(keyBuilder
-        .externalView(app));
-    List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder
-        .liveInstances());
-    Map<String, String> assignmentMap = assignment.getInstanceStateMap(app);
-    Map<String, String> appStateMap = view.getStateMap(app);
-    System.out.format("%-50s%-20s%n", inMiddle("Node id", 50),
-        inMiddle("DEPLOYED", 20));
-    System.out.println(generateEdge(130));
-    for (String instance : assignmentMap.keySet())
-    {
-      String state = appStateMap.get(instance);
-      System.out
-          .format(
-              "%-50s%-10s%n",
-              inMiddle(instance, 50),
-              inMiddle((("ONLINE".equals(state) && liveInstances
-                  .contains(instance)) ? "Y" : "N"), 20));
     }
 
-    System.out.println(generateEdge(130));
+    private static void showClustersStatus(List<Cluster> clusters) {
+        System.out.println("Cluster Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
+        System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20), inMiddle("App", 20), inMiddle("Tasks", 10),
+                generateEdge(80));
+        System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ", inMiddle("Number", 8), inMiddle("Task id", 10),
+                inMiddle("Host", 50), inMiddle("Port", 8));
+        System.out.println(generateEdge(130));
 
-  }
-
-  private static void printClusterInfo(HelixManager manager, String cluster)
-  {
-    HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
-    Builder keyBuilder = dataAccessor.keyBuilder();
-    List<String> instances = dataAccessor.getChildNames(keyBuilder
-        .instanceConfigs());
-    List<String> liveInstances = dataAccessor.getChildNames(keyBuilder
-        .liveInstances());
-    if (liveInstances == null)
-    {
-      liveInstances = Collections.emptyList();
-    }
-    System.out.println("Cluster Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-50s%-80s%n", " ", inMiddle("Nodes", 80));
-    System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Cluster Name", 20),
-        inMiddle("Nodes", 20), inMiddle("Active", 10), generateEdge(80));
-    System.out.format("%-54s%-10s%-50s%-8s%-8s%n", " ",
-        inMiddle("Node id", 10), inMiddle("Host", 50), inMiddle("Port", 8),
-        inMiddle("Active", 10));
-    System.out.println(generateEdge(130));
-
-    System.out.format("%-20s%-20s%-10s", inMiddle(cluster, 20),
-        inMiddle("" + instances.size(), 8),
-        inMiddle("" + liveInstances.size(), 8));
-    boolean first = true;
-
-    for (String instance : instances)
-    {
-      InstanceConfig config = dataAccessor.getProperty(keyBuilder
-          .instanceConfig(instance));
-      // System.out.println(config);
-      if (first)
-      {
-        first = false;
-      } else
-      {
-        System.out.format("%n%-50s", " ");
-      }
-      System.out
-          .format(
-              "%-10s%-46s%-10s%-10s",
-              inMiddle("" + config.getId(), 10),
-              inMiddle(config.getHostName(), 50),
-              inMiddle(config.getPort() + "", 10),
-              inMiddle(liveInstances.contains(config.getInstanceName()) ? "Y"
-                  : "N", 10));
-    }
-
-    System.out.println();
-  }
-
-  @Parameters(commandNames = "s4 status", commandDescription = "Show status of S4", separators = "=")
-  static class StatusArgs extends S4ArgsBase
-  {
-
-    @Parameter(names = { "-app" }, description = "Only show status of specified S4 application(s)", required = false)
-    List<String> apps;
-
-    @Parameter(names = { "-c", "-cluster" }, description = "Only show status of specified S4 cluster(s)", required = true)
-    List<String> clusters;
-
-    @Parameter(names = { "-s", "-stream" }, description = "Only show status of specified published stream(s)", required = false)
-    List<String> streams;
-
-    @Parameter(names = "-zk", description = "ZooKeeper connection string")
-    String zkConnectionString = "localhost:2181";
-
-    @Parameter(names = "-timeout", description = "Connection timeout to Zookeeper, in ms")
-    int timeout = 10000;
-  }
-
-  private static void showAppsStatus(List<Cluster> clusters)
-  {
-    System.out.println("App Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-20s%-90s%n", inMiddle("Name", 20),
-        inMiddle("Cluster", 20), inMiddle("URI", 90));
-    System.out.println(generateEdge(130));
-    for (Cluster cluster : clusters)
-    {
-      if (!NONE.equals(cluster.app.name))
-      {
-        System.out.format("%-20s%-20s%-90s%n", inMiddle(cluster.app.name, 20),
-            inMiddle(cluster.app.cluster, 20), cluster.app.uri);
-      }
-    }
-    System.out.println(generateEdge(130));
-
-  }
-
-  private static void showClustersStatus(List<Cluster> clusters)
-  {
-    System.out.println("Cluster Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-50s%-80s%n", " ", inMiddle("Active nodes", 80));
-    System.out.format("%-20s%-20s%-10s%s%n", inMiddle("Name", 20),
-        inMiddle("App", 20), inMiddle("Tasks", 10), generateEdge(80));
-    System.out.format("%-50s%-10s%-10s%-50s%-10s%n", " ",
-        inMiddle("Number", 8), inMiddle("Task id", 10), inMiddle("Host", 50),
-        inMiddle("Port", 8));
-    System.out.println(generateEdge(130));
-
-    for (Cluster cluster : clusters)
-    {
-      System.out.format("%-20s%-20s%-10s%-10s",
-          inMiddle(cluster.clusterName, 20), inMiddle(cluster.app.name, 20),
-          inMiddle("" + cluster.taskNumber, 8),
-          inMiddle("" + cluster.nodes.size(), 8));
-      boolean first = true;
-      for (ClusterNode node : cluster.nodes)
-      {
-        if (first)
-        {
-          first = false;
-        } else
-        {
-          System.out.format("%n%-60s", " ");
+        for (Cluster cluster : clusters) {
+            System.out.format("%-20s%-20s%-10s%-10s", inMiddle(cluster.clusterName, 20),
+                    inMiddle(cluster.app.name, 20), inMiddle("" + cluster.taskNumber, 8),
+                    inMiddle("" + cluster.nodes.size(), 8));
+            boolean first = true;
+            for (ClusterNode node : cluster.nodes) {
+                if (first) {
+                    first = false;
+                } else {
+                    System.out.format("%n%-60s", " ");
+                }
+                System.out.format("%-10s%-50s%-10s", inMiddle("" + node.getTaskId(), 10),
+                        inMiddle(node.getMachineName(), 50), inMiddle(node.getPort() + "", 10));
+            }
+            System.out.println();
         }
-        System.out.format("%-10s%-50s%-10s",
-            inMiddle("" + node.getTaskId(), 10),
-            inMiddle(node.getMachineName(), 50),
-            inMiddle(node.getPort() + "", 10));
-      }
-      System.out.println();
+        System.out.println(generateEdge(130));
     }
-    System.out.println(generateEdge(130));
-  }
 
-  private static void showStreamsStatus(List<Stream> streams)
-  {
-    System.out.println("Stream Status");
-    System.out.println(generateEdge(130));
-    System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20),
-        inMiddle("Producers", 55), inMiddle("Consumers", 55));
-    System.out.println(generateEdge(130));
+    private static void showStreamsStatus(List<Stream> streams) {
+        System.out.println("Stream Status");
+        System.out.println(generateEdge(130));
+        System.out.format("%-20s%-55s%-55s%n", inMiddle("Name", 20), inMiddle("Producers", 55),
+                inMiddle("Consumers", 55));
+        System.out.println(generateEdge(130));
 
-    for (Stream stream : streams)
-    {
-      System.out
-          .format(
-              "%-20s%-55s%-55s%n",
-              inMiddle(stream.streamName, 20),
-              inMiddle(getFormatString(stream.producers, stream.clusterAppMap),
-                  55),
-              inMiddle(getFormatString(stream.consumers, stream.clusterAppMap),
-                  55));
-    }
-    System.out.println(generateEdge(130));
-
-  }
-
-  private static String inMiddle(String content, int width)
-  {
-    int i = (width - content.length()) / 2;
-    return String.format("%" + i + "s%s", " ", content);
-  }
-
-  private static String generateEdge(int length)
-  {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < length; i++)
-    {
-      sb.append("-");
-    }
-    return sb.toString();
-  }
-
-  /**
-   * show as cluster1(app1), cluster2(app2)
-   * 
-   * @param clusters
-   *          cluster list
-   * @param clusterAppMap
-   *          <cluster,app>
-   * @return
-   */
-  private static String getFormatString(Collection<String> clusters,
-      Map<String, String> clusterAppMap)
-  {
-    if (clusters == null || clusters.size() == 0)
-    {
-      return NONE;
-    } else
-    {
-      // show as: cluster1(app1), cluster2(app2)
-      StringBuilder sb = new StringBuilder();
-      for (String cluster : clusters)
-      {
-        String app = clusterAppMap.get(cluster);
-        sb.append(cluster);
-        if (!NONE.equals(app))
-        {
-          sb.append("(").append(app).append(")");
+        for (Stream stream : streams) {
+            System.out.format("%-20s%-55s%-55s%n", inMiddle(stream.streamName, 20),
+                    inMiddle(getFormatString(stream.producers, stream.clusterAppMap), 55),
+                    inMiddle(getFormatString(stream.consumers, stream.clusterAppMap), 55));
         }
-        sb.append(" ");
-      }
-      return sb.toString();
-    }
-  }
+        System.out.println(generateEdge(130));
 
-  static class Stream
-  {
-
-    private final ZkClient zkClient;
-    private final String consumerPath;
-    private final String producerPath;
-
-    String streamName;
-    Set<String> producers = new HashSet<String>();// cluster name
-    Set<String> consumers = new HashSet<String>();// cluster name
-
-    Map<String, String> clusterAppMap = Maps.newHashMap();
-
-    public Stream(String streamName, ZkClient zkClient) throws Exception
-    {
-      this.streamName = streamName;
-      this.zkClient = zkClient;
-      this.consumerPath = "/s4/streams/" + streamName + "/consumers";
-      this.producerPath = "/s4/streams/" + streamName + "/producers";
-      readStreamFromZk();
     }
 
-    private void readStreamFromZk() throws Exception
-    {
-      List<String> consumerNodes = zkClient.getChildren(consumerPath);
-      for (String node : consumerNodes)
-      {
-        ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
-        consumers.add(consumer.getSimpleField("clusterName"));
-      }
-
-      List<String> producerNodes = zkClient.getChildren(producerPath);
-      for (String node : producerNodes)
-      {
-        ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
-        producers.add(consumer.getSimpleField("clusterName"));
-      }
-
-      getAppNames();
+    private static String inMiddle(String content, int width) {
+        int i = (width - content.length()) / 2;
+        return String.format("%" + i + "s%s", " ", content);
     }
 
-    private void getAppNames()
-    {
-      Set<String> clusters = new HashSet<String>(consumers);
-      clusters.addAll(producers);
-      for (String cluster : clusters)
-      {
-        clusterAppMap.put(cluster, getApp(cluster, zkClient));
-      }
-    }
-
-    public boolean containsCluster(String cluster)
-    {
-      if (producers.contains(cluster) || consumers.contains(cluster))
-      {
-        return true;
-      }
-      return false;
-    }
-
-    private static String getApp(String clusterName, ZkClient zkClient)
-    {
-      String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
-      if (zkClient.exists(appPath))
-      {
-        ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName
-            + "/app/s4App");
-        return appRecord.getSimpleField("name");
-      }
-      return NONE;
-    }
-  }
-
-  static class App
-  {
-    private String name = NONE;
-    private String cluster;
-    private String uri = NONE;
-  }
-
-  static class Cluster
-  {
-    private final ZkClient zkClient;
-    private final String taskPath;
-    private final String processPath;
-    private final String appPath;
-
-    String clusterName;
-    int taskNumber;
-    App app;
-
-    List<ClusterNode> nodes = new ArrayList<ClusterNode>();
-
-    public Cluster(String clusterName, ZkClient zkClient) throws Exception
-    {
-      this.clusterName = clusterName;
-      this.zkClient = zkClient;
-      this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
-      this.processPath = "/s4/clusters/" + clusterName + "/process";
-      this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
-      readClusterFromZk();
-    }
-
-    public void readClusterFromZk() throws Exception
-    {
-      List<String> processes;
-      List<String> tasks;
-
-      tasks = zkClient.getChildren(taskPath);
-      processes = zkClient.getChildren(processPath);
-
-      taskNumber = tasks.size();
-
-      for (int i = 0; i < processes.size(); i++)
-      {
-        ZNRecord process = zkClient.readData(
-            processPath + "/" + processes.get(i), true);
-        if (process != null)
-        {
-          int partition = Integer.parseInt(process.getSimpleField("partition"));
-          String host = process.getSimpleField("host");
-          int port = Integer.parseInt(process.getSimpleField("port"));
-          String taskId = process.getSimpleField("taskId");
-          ClusterNode node = new ClusterNode(partition, port, host, taskId);
-          nodes.add(node);
+    private static String generateEdge(int length) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            sb.append("-");
         }
-      }
-
-      app = new App();
-      app.cluster = clusterName;
-      try
-      {
-        ZNRecord appRecord = zkClient.readData(appPath);
-        app.name = appRecord.getSimpleField("name");
-        app.uri = appRecord.getSimpleField("s4r_uri");
-      } catch (ZkNoNodeException e)
-      {
-        logger.warn(appPath + " doesn't exist");
-      }
+        return sb.toString();
     }
 
-  }
+    /**
+     * show as cluster1(app1), cluster2(app2)
+     * 
+     * @param clusters
+     *            cluster list
+     * @param clusterAppMap
+     *            <cluster,app>
+     * @return
+     */
+    private static String getFormatString(Collection<String> clusters, Map<String, String> clusterAppMap) {
+        if (clusters == null || clusters.size() == 0) {
+            return NONE;
+        } else {
+            // show as: cluster1(app1), cluster2(app2)
+            StringBuilder sb = new StringBuilder();
+            for (String cluster : clusters) {
+                String app = clusterAppMap.get(cluster);
+                sb.append(cluster);
+                if (!NONE.equals(app)) {
+                    sb.append("(").append(app).append(")");
+                }
+                sb.append(" ");
+            }
+            return sb.toString();
+        }
+    }
+
+    static class Stream {
+
+        private final ZkClient zkClient;
+        private final String consumerPath;
+        private final String producerPath;
+
+        String streamName;
+        Set<String> producers = new HashSet<String>();// cluster name
+        Set<String> consumers = new HashSet<String>();// cluster name
+
+        Map<String, String> clusterAppMap = Maps.newHashMap();
+
+        public Stream(String streamName, ZkClient zkClient) throws Exception {
+            this.streamName = streamName;
+            this.zkClient = zkClient;
+            this.consumerPath = "/s4/streams/" + streamName + "/consumers";
+            this.producerPath = "/s4/streams/" + streamName + "/producers";
+            readStreamFromZk();
+        }
+
+        private void readStreamFromZk() throws Exception {
+            List<String> consumerNodes = zkClient.getChildren(consumerPath);
+            for (String node : consumerNodes) {
+                ZNRecord consumer = zkClient.readData(consumerPath + "/" + node, true);
+                consumers.add(consumer.getSimpleField("clusterName"));
+            }
+
+            List<String> producerNodes = zkClient.getChildren(producerPath);
+            for (String node : producerNodes) {
+                ZNRecord consumer = zkClient.readData(producerPath + "/" + node, true);
+                producers.add(consumer.getSimpleField("clusterName"));
+            }
+
+            getAppNames();
+        }
+
+        private void getAppNames() {
+            Set<String> clusters = new HashSet<String>(consumers);
+            clusters.addAll(producers);
+            for (String cluster : clusters) {
+                clusterAppMap.put(cluster, getApp(cluster, zkClient));
+            }
+        }
+
+        public boolean containsCluster(String cluster) {
+            if (producers.contains(cluster) || consumers.contains(cluster)) {
+                return true;
+            }
+            return false;
+        }
+
+        private static String getApp(String clusterName, ZkClient zkClient) {
+            String appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+            if (zkClient.exists(appPath)) {
+                ZNRecord appRecord = zkClient.readData("/s4/clusters/" + clusterName + "/app/s4App");
+                return appRecord.getSimpleField("name");
+            }
+            return NONE;
+        }
+    }
+
+    static class App {
+        private String name = NONE;
+        private String cluster;
+        private String uri = NONE;
+    }
+
+    static class Cluster {
+        private final ZkClient zkClient;
+        private final String taskPath;
+        private final String processPath;
+        private final String appPath;
+
+        String clusterName;
+        int taskNumber;
+        App app;
+
+        List<ClusterNode> nodes = new ArrayList<ClusterNode>();
+
+        public Cluster(String clusterName, ZkClient zkClient) throws Exception {
+            this.clusterName = clusterName;
+            this.zkClient = zkClient;
+            this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
+            this.processPath = "/s4/clusters/" + clusterName + "/process";
+            this.appPath = "/s4/clusters/" + clusterName + "/app/s4App";
+            readClusterFromZk();
+        }
+
+        public void readClusterFromZk() throws Exception {
+            List<String> processes;
+            List<String> tasks;
+
+            tasks = zkClient.getChildren(taskPath);
+            processes = zkClient.getChildren(processPath);
+
+            taskNumber = tasks.size();
+
+            for (int i = 0; i < processes.size(); i++) {
+                ZNRecord process = zkClient.readData(processPath + "/" + processes.get(i), true);
+                if (process != null) {
+                    int partition = Integer.parseInt(process.getSimpleField("partition"));
+                    String host = process.getSimpleField("host");
+                    int port = Integer.parseInt(process.getSimpleField("port"));
+                    String taskId = process.getSimpleField("taskId");
+                    ClusterNode node = new ClusterNode(partition, port, host, taskId);
+                    nodes.add(node);
+                }
+            }
+
+            app = new App();
+            app.cluster = clusterName;
+            try {
+                ZNRecord appRecord = zkClient.readData(appPath);
+                app.name = appRecord.getSimpleField("name");
+                app.uri = appRecord.getSimpleField("s4r_uri");
+            } catch (ZkNoNodeException e) {
+                logger.warn(appPath + " doesn't exist");
+            }
+        }
+
+    }
 
 }
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
index 824aa03..ab0ba3e 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Status.java
@@ -219,7 +219,7 @@
 
     /**
      * show as cluster1(app1), cluster2(app2)
-     *
+     * 
      * @param clusters
      *            cluster list
      * @param clusterAppMap
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
index 192869a..3940c6d 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -38,20 +38,11 @@
 
     static Logger logger = LoggerFactory.getLogger(Tools.class);
 
-    enum Task { 
-        deployApp(DeployApp.class),
-        deploy(Deploy.class), 
-        node(Main.class), 
-        addNodes(AddNodes.class), 
-        zkServer(ZKServer.class), 
-        newCluster(CreateCluster.class), 
-        adapter(null), 
-        genericAdapter(GenericEventAdapter.class),
-        newApp(CreateApp.class), 
-        s4r(Package.class), 
-        status(S4Status.class),
-        createTask(CreateTask.class), 
-        rebalanceTask(RebalanceTask.class);
+    enum Task {
+        deployApp(DeployApp.class), deploy(Deploy.class), node(Main.class), addNodes(AddNodes.class), zkServer(
+                ZKServer.class), newCluster(CreateCluster.class), adapter(null), genericAdapter(
+                GenericEventAdapter.class), newApp(CreateApp.class), s4r(Package.class), status(S4Status.class), createTask(
+                CreateTask.class), rebalanceTask(RebalanceTask.class);
 
         Class<?> target;