STORM-1285 port backtype.storm.command.get-errors to java
* port get-errors to GetErrors
diff --git a/bin/storm.py b/bin/storm.py
index 74a7825..a0c9717 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -456,7 +456,7 @@
print_usage(command="get_errors")
sys.exit(2)
exec_storm_class(
- "org.apache.storm.command.get_errors",
+ "org.apache.storm.command.GetErrors",
args=args,
jvmtype="-client",
extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")])
diff --git a/storm-core/src/clj/org/apache/storm/command/get_errors.clj b/storm-core/src/clj/org/apache/storm/command/get_errors.clj
deleted file mode 100644
index 4f83a86..0000000
--- a/storm-core/src/clj/org/apache/storm/command/get_errors.clj
+++ /dev/null
@@ -1,55 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.get-errors
- (:use [clojure.tools.cli :only [cli]])
- (:use [org.apache.storm log])
- (:use [org.apache.storm.internal thrift])
- (:use [org.apache.storm util])
- (:require [org.apache.storm.daemon
- [nimbus :as nimbus]
- [common :as common]])
- (:import [org.apache.storm.generated GetInfoOptions NumErrorsChoice
- TopologySummary ErrorInfo]
- [org.json.simple JSONValue])
- (:gen-class))
-
-(defn get-topology-id [name topologies]
- (let [topology (first (filter #(= (.get_name %1) name) topologies))]
- (when (not-nil? topology) (.get_id topology))))
-
-(defn get-component-errors
- [topology-errors]
- (apply hash-map (remove nil?
- (flatten (for [[comp-name comp-errors] topology-errors]
- (let [latest-error (when (not (empty? comp-errors)) (first comp-errors))]
- (if latest-error [comp-name (.get_error ^ErrorInfo latest-error)])))))))
-
-(defn -main [name]
- (with-configured-nimbus-connection nimbus
- (let [opts (doto (GetInfoOptions.)
- (.set_num_err_choice NumErrorsChoice/ONE))
- cluster-info (.getClusterInfo nimbus)
- topologies (.get_topologies cluster-info)
- topo-id (get-topology-id name topologies)
- topo-info (when (not-nil? topo-id) (.getTopologyInfoWithOpts nimbus topo-id opts))]
- (if (or (nil? topo-id) (nil? topo-info))
- (println (JSONValue/toJSONString {"Failure" (str "No topologies running with name " name)}))
- (let [topology-name (.get_name topo-info)
- topology-errors (.get_errors topo-info)]
- (println (JSONValue/toJSONString
- (hash-map
- "Topology Name" topology-name
- "Comp-Errors" (get-component-errors topology-errors)))))))))
diff --git a/storm-core/src/jvm/org/apache/storm/command/GetErrors.java b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
new file mode 100644
index 0000000..1f90673
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/GetErrors.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import org.apache.storm.generated.ErrorInfo;
+import org.apache.storm.generated.GetInfoOptions;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.NumErrorsChoice;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GetErrors {
+ public static void main(String[] args) throws Exception {
+ if (args.length == 0) {
+ throw new IllegalArgumentException("Topology name must be provided.");
+ }
+
+ final String name = args[0];
+
+ NimbusClient.withConfiguredClient(new NimbusClient.WithNimbus() {
+ @Override
+ public void run(Nimbus.Client client) throws Exception {
+ GetInfoOptions opts = new GetInfoOptions();
+ opts.set_num_err_choice(NumErrorsChoice.ONE);
+ String topologyId = Utils.getTopologyId(name, client);
+
+ TopologyInfo topologyInfo = null;
+ if (topologyId != null) {
+ topologyInfo = client.getTopologyInfoWithOpts(topologyId, opts);
+ }
+
+ Map<String, Object> outputMap = new HashMap<>();
+ if (topologyId == null || topologyInfo == null) {
+ outputMap.put("Failure", "No topologies running with name " + name);
+ } else {
+ String topologyName = topologyInfo.get_name();
+ Map<String, List<ErrorInfo>> topologyErrors = topologyInfo.get_errors();
+ outputMap.put("Topology Name", topologyName);
+ outputMap.put("Comp-Errors", getComponentErrors(topologyErrors));
+ }
+ System.out.println(JSONValue.toJSONString(outputMap));
+ }
+
+ private Map<String, String> getComponentErrors(Map<String, List<ErrorInfo>> topologyErrors) {
+ Map<String, String> componentErrorMap = new HashMap<>();
+ for (Map.Entry<String, List<ErrorInfo>> compNameToCompErrors : topologyErrors.entrySet()) {
+ String compName = compNameToCompErrors.getKey();
+ List<ErrorInfo> compErrors = compNameToCompErrors.getValue();
+ if (compErrors != null && !compErrors.isEmpty()) {
+ ErrorInfo latestError = compErrors.get(0);
+ componentErrorMap.put(compName, latestError.get_error());
+ }
+ }
+
+ return componentErrorMap;
+ }
+ });
+ }
+}