Merge branch 'STORM-2010'
diff --git a/bin/storm.py b/bin/storm.py
index 0da2ae1..c49ab09 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -338,7 +338,7 @@
     get  PATH - Get the heartbeat data at PATH
     """
     exec_storm_class(
-        "org.apache.storm.command.heartbeats",
+        "org.apache.storm.command.Heartbeats",
         args=args,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
diff --git a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
deleted file mode 100644
index 625cff7..0000000
--- a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
+++ /dev/null
@@ -1,54 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.heartbeats
-  (:require [org.apache.storm
-             [config :refer :all]
-             [log :refer :all]
-             [util :refer :all]
-             [converter :refer :all]]
-            [clojure.string :as string])
-  (:import [org.apache.storm.generated ClusterWorkerHeartbeat]
-           [org.apache.storm.utils Utils ConfigUtils]
-           [org.apache.storm.cluster ZKStateStorage ClusterStateContext ClusterUtils]
-           [org.apache.storm.stats StatsUtil])
-  (:gen-class))
-
-(defn -main [command path & args]
-  (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-        cluster (ClusterUtils/mkStateStorage conf conf nil (ClusterStateContext.))]
-    (println "Command: [" command "]")
-    (condp = command
-      "list"
-      (let [message (clojure.string/join " \n" (.get_worker_hb_children cluster path false))]
-        (log-message "list " path ":\n"
-                     message "\n"))
-      "get"
-      (log-message 
-       (if-let [hb (.get_worker_hb cluster path false)]
-         (StatsUtil/convertZkWorkerHb
-          (Utils/deserialize
-           hb
-           ClusterWorkerHeartbeat))
-         "Nothing"))
-      
-      (log-message "Usage: heartbeats [list|get] path"))
-    
-    (try
-      (.close cluster)
-      (catch Exception e
-        (log-message "Caught exception: " e " on close."))))
-  (System/exit 0))
-         
diff --git a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
new file mode 100644
index 0000000..2a149cb
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import com.google.common.base.Joiner;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Command-line tool for inspecting worker heartbeats: {@code heartbeats [list|get] path}.
+ *
+ * <p>Builds a state storage client from the Storm configuration, runs the requested
+ * sub-command against the given path, logs the result and terminates the JVM.
+ */
+public class Heartbeats {
+    private static final Logger LOG = LoggerFactory.getLogger(Heartbeats.class);
+
+    /**
+     * Runs a single heartbeat sub-command.
+     *
+     * @param args {@code args[0]} is the command ("list" or "get"), {@code args[1]} is the path;
+     *             any other command only logs a usage message
+     * @throws IllegalArgumentException if fewer than two arguments are supplied
+     * @throws Exception if reading the configuration, creating the state storage or
+     *                   executing the command fails
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length < 2) {
+            throw new IllegalArgumentException("Command and path arguments must be provided.");
+        }
+
+        String command = args[0];
+        String path = args[1];
+
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, null, new ClusterStateContext());
+
+        LOG.info("Command: [{}]", command);
+
+        // Close the state storage even when the command throws; previously a failing
+        // command skipped both close() and System.exit(), leaving the client open.
+        try {
+            switch (command) {
+                case "list":
+                    handleListCommand(cluster, path);
+                    break;
+
+                case "get":
+                    handleGetCommand(cluster, path);
+                    break;
+
+                default:
+                    LOG.info("Usage: heartbeats [list|get] path");
+            }
+        } finally {
+            try {
+                cluster.close();
+            } catch (Exception e) {
+                // Best effort: a failure to close must not mask the command's outcome.
+                LOG.info("Caught exception: {} on close.", e);
+            }
+        }
+
+        // force process to be terminated
+        System.exit(0);
+    }
+
+    /** Logs the children of {@code path} as reported by the state storage, one per line. */
+    private static void handleListCommand(IStateStorage cluster, String path) {
+        String message = Joiner.on("\n").join(cluster.get_worker_hb_children(path, false));
+        LOG.info("list {}:\n{}\n", path, message);
+    }
+
+    /**
+     * Logs the worker heartbeat stored at {@code path} as a JSON string, or a
+     * "No Heartbeats found" message when the storage returns no data.
+     */
+    private static void handleGetCommand(IStateStorage cluster, String path) {
+        String message;
+        byte[] hb = cluster.get_worker_hb(path, false);
+        if (hb != null) {
+            Map<String, Object> heartbeatMap = StatsUtil.convertZkWorkerHb(Utils.deserialize(hb, ClusterWorkerHeartbeat.class));
+            message = JSONValue.toJSONString(heartbeatMap);
+        } else {
+            message = "No Heartbeats found";
+        }
+        LOG.info(message);
+    }
+}