Merge branch 'STORM-2010'
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8c623e5..7b9e910 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2009: port org.apache.storm.blobstore.clj and org.apache.storm.command.blobstore.clj to Java
* STORM-1876: Option to build storm-kafka and storm-kafka-client with different kafka client version
* STORM-2000: Package storm-opentsdb as part of external dir in installation
* STORM-1962: support python 3 and 2 in multilang
@@ -162,6 +163,10 @@
* STORM-1720: Support GEO in storm-redis
* STORM-1868: Modify TridentKafkaWordCount to run in distributed mode
+## 1.0.3
+ * STORM-1594: org.apache.storm.tuple.Fields can throw NPE if given invalid field in selector
+ * STORM-1995: downloadChunk in nimbus.clj should close the input stream
+
## 1.0.2
* STORM-1976: Remove cleanup-corrupt-topologies!
* STORM-1977: Restore logic: give up leadership when elected as leader but doesn't have one or more of topology codes on local
diff --git a/bin/storm.py b/bin/storm.py
index 1668d70..c49ab09 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -326,7 +326,7 @@
storm blobstore create mytopo:data.tgz -f data.tgz -a u:alice:rwa,u:bob:rw,o::r
"""
exec_storm_class(
- "org.apache.storm.command.blobstore",
+ "org.apache.storm.command.Blobstore",
args=args,
jvmtype="-client",
extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
diff --git a/storm-core/src/clj/org/apache/storm/blobstore.clj b/storm-core/src/clj/org/apache/storm/blobstore.clj
deleted file mode 100644
index 92fb44f..0000000
--- a/storm-core/src/clj/org/apache/storm/blobstore.clj
+++ /dev/null
@@ -1,28 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.blobstore
- (:import [org.apache.storm.utils Utils ConfigUtils])
- (:import [org.apache.storm.blobstore ClientBlobStore])
- (:use [org.apache.storm config util]))
-
-(defmacro with-configured-blob-client
- [client-sym & body]
- `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))
- ^ClientBlobStore ~client-sym (Utils/getClientBlobStore conf#)]
- (try
- ~@body
- (finally (.shutdown ~client-sym)))))
diff --git a/storm-core/src/clj/org/apache/storm/command/blobstore.clj b/storm-core/src/clj/org/apache/storm/command/blobstore.clj
deleted file mode 100644
index 924f825..0000000
--- a/storm-core/src/clj/org/apache/storm/command/blobstore.clj
+++ /dev/null
@@ -1,163 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.blobstore
- (:import [java.io InputStream OutputStream]
- [org.apache.storm.generated SettableBlobMeta AccessControl AuthorizationException
- KeyNotFoundException]
- [org.apache.storm.blobstore BlobStoreAclHandler]
- [org.apache.storm.utils Utils])
- (:use [org.apache.storm config]
- [clojure.string :only [split]]
- [clojure.tools.cli :only [cli]]
- [clojure.java.io :only [copy input-stream output-stream]]
- [org.apache.storm blobstore log])
- (:gen-class))
-
-(defn update-blob-from-stream
- "Update a blob in the blob store from an InputStream"
- [key ^InputStream in]
- (with-configured-blob-client blobstore
- (let [out (.updateBlob blobstore key)]
- (try
- (copy in out)
- (.close out)
- (catch Exception e
- (log-message e)
- (.cancel out)
- (throw e))))))
-
-(defn create-blob-from-stream
- "Create a blob in the blob store from an InputStream"
- [key ^InputStream in ^SettableBlobMeta meta]
- (with-configured-blob-client blobstore
- (let [out (.createBlob blobstore key meta)]
- (try
- (copy in out)
- (.close out)
- (catch Exception e
- (.cancel out)
- (throw e))))))
-
-(defn read-blob
- "Read a blob in the blob store and write to an OutputStream"
- [key ^OutputStream out]
- (with-configured-blob-client blobstore
- (with-open [in (.getBlob blobstore key)]
- (copy in out))))
-
-(defn as-access-control
- "Convert a parameter to an AccessControl object"
- [param]
- (BlobStoreAclHandler/parseAccessControl (str param)))
-
-(defn as-acl
- [param]
- (map as-access-control (split param #",")))
-
-(defn access-control-str
- [^AccessControl acl]
- (BlobStoreAclHandler/accessControlToString acl))
-
-(defn read-cli [args]
- (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
- (if file
- (with-open [f (output-stream file)]
- (read-blob key f))
- (read-blob key System/out))))
-
-(defn update-cli [args]
- (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
- (if file
- (with-open [f (input-stream file)]
- (update-blob-from-stream key f))
- (update-blob-from-stream key System/in))
- (log-message "Successfully updated " key)))
-
-(defn create-cli [args]
- (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil]
- ["-a" "--acl" :default [] :parse-fn as-acl]
- ["-r" "--replication-factor" :default -1 :parse-fn #(Integer/parseInt %)])
- meta (doto (SettableBlobMeta. acl)
- (.set_replication_factor replication-factor))]
- (Utils/validateKeyName key)
- (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
- (if file
- (with-open [f (input-stream file)]
- (create-blob-from-stream key f meta))
- (create-blob-from-stream key System/in meta))
- (log-message "Successfully created " key)))
-
-(defn delete-cli [args]
- (with-configured-blob-client blobstore
- (doseq [key args]
- (.deleteBlob blobstore key)
- (log-message "deleted " key))))
-
-(defn list-cli [args]
- (with-configured-blob-client blobstore
- (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)]
- (doseq [key keys]
- (try
- (let [meta (.getBlobMeta blobstore key)
- version (.get_version meta)
- acl (.get_acl (.get_settable meta))]
- (log-message key " " version " " (pr-str (map access-control-str acl))))
- (catch AuthorizationException ae
- (if-not (empty? args) (log-error "ACCESS DENIED to key: " key)))
- (catch KeyNotFoundException knf
- (if-not (empty? args) (log-error key " NOT FOUND"))))))))
-
-(defn set-acl-cli [args]
- (let [[{set-acl :set} [key] _]
- (cli args ["-s" "--set" :default [] :parse-fn as-acl])]
- (with-configured-blob-client blobstore
- (let [meta (.getBlobMeta blobstore key)
- acl (.get_acl (.get_settable meta))
- new-acl (if set-acl set-acl acl)
- new-meta (SettableBlobMeta. new-acl)]
- (log-message "Setting ACL for " key " to " (pr-str (map access-control-str new-acl)))
- (.setBlobMeta blobstore key new-meta)))))
-
-(defn rep-cli [args]
- (let [sub-command (first args)
- new-args (rest args)]
- (with-configured-blob-client blobstore
- (condp = sub-command
- "--read" (let [key (first new-args)
- blob-replication (.getBlobReplication blobstore key)]
- (log-message "Current replication factor " blob-replication)
- blob-replication)
- "--update" (let [[{replication-factor :replication-factor} [key] _]
- (cli new-args ["-r" "--replication-factor" :parse-fn #(Integer/parseInt %)])]
- (if (nil? replication-factor)
- (throw (RuntimeException. (str "Please set the replication factor")))
- (let [blob-replication (.updateBlobReplication blobstore key replication-factor)]
- (log-message "Replication factor is set to " blob-replication)
- blob-replication)))
- :else (throw (RuntimeException. (str sub-command " is not a supported blobstore command")))))))
-
-(defn -main [& args]
- (let [command (first args)
- new-args (rest args)]
- (condp = command
- "cat" (read-cli new-args)
- "create" (create-cli new-args)
- "update" (update-cli new-args)
- "delete" (delete-cli new-args)
- "list" (list-cli new-args)
- "set-acl" (set-acl-cli new-args)
- "replication" (rep-cli new-args)
- :else (throw (RuntimeException. (str command " is not a supported blobstore command"))))))
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 93507ff..24d5398 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1781,16 +1781,16 @@
(.mark nimbus:num-downloadChunk-calls)
(check-authorization! nimbus nil nil "fileDownload")
(let [downloaders (:downloaders nimbus)
- ^BufferFileInputStream is (.get downloaders id)]
+ ^BufferInputStream is (.get downloaders id)]
(when-not is
(throw (RuntimeException.
- "Could not find input stream for that id")))
+ (str "Could not find input stream for id " id)))) ; one String arg; ("msg" id) would select the (String, Throwable) ctor
(let [ret (.read is)]
(.put downloaders id is)
(when (empty? ret)
+ (.close is)
(.remove downloaders id))
- (ByteBuffer/wrap ret)
- )))
+ (ByteBuffer/wrap ret))))
(^String getNimbusConf [this]
(.mark nimbus:num-getNimbusConf-calls)
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
index 61e48b3..2cb1cce 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
@@ -23,7 +23,9 @@
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.KeyAlreadyExistsException;
import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
import java.util.Iterator;
import java.util.Map;
@@ -50,6 +52,21 @@
public abstract class ClientBlobStore implements Shutdownable {
protected Map conf;
+ /**
+ * Action to run against a configured client; it is the argument type of
+ * {@link ClientBlobStore#withConfiguredClient}. It may throw any exception.
+ */
+ public interface WithBlobstore {
+ void run(ClientBlobStore blobStore) throws Exception;
+ }
+
+    /**
+     * Runs {@code withBlobstore} against a client built from the Storm configuration
+     * and shuts that client down afterwards, whether or not the action succeeded.
+     *
+     * @param withBlobstore the action to run with the configured client
+     * @throws Exception whatever the action throws
+     */
+    public static void withConfiguredClient(WithBlobstore withBlobstore) throws Exception {
+        final ClientBlobStore client = Utils.getClientBlobStore(ConfigUtils.readStormConfig());
+        try {
+            withBlobstore.run(client);
+        } finally {
+            // Release the client on both the normal and the exceptional path.
+            client.shutdown();
+        }
+    }
+
/**
* Sets up the client API by parsing the configs.
* @param conf The storm conf containing the config details.
diff --git a/storm-core/src/jvm/org/apache/storm/command/Blobstore.java b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
new file mode 100644
index 0000000..a5b01fc
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command-line entry point behind {@code storm blobstore}. The first argument
+ * selects the sub-command ({@code cat}, {@code create}, {@code update},
+ * {@code delete}, {@code list}, {@code set-acl} or {@code replication}) and the
+ * remaining arguments are handed to it.
+ */
+public class Blobstore {
+    private static final Logger LOG = LoggerFactory.getLogger(Blobstore.class);
+
+    /**
+     * Dispatches to the sub-command named by {@code args[0]}.
+     *
+     * @param args sub-command name followed by that sub-command's own arguments
+     * @throws IllegalArgumentException if {@code args} is empty
+     * @throws RuntimeException if the sub-command is not one of the supported names
+     * @throws Exception anything the selected sub-command throws
+     */
+    public static void main(String[] args) throws Exception {
+        if (args.length == 0) {
+            throw new IllegalArgumentException("You should provide command.");
+        }
+
+        String command = args[0];
+        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
+
+        switch (command) {
+            case "cat":
+                readCli(newArgs);
+                break;
+
+            case "create":
+                createCli(newArgs);
+                break;
+
+            case "update":
+                updateCli(newArgs);
+                break;
+
+            case "delete":
+                deleteCli(newArgs);
+                break;
+
+            case "list":
+                listCli(newArgs);
+                break;
+
+            case "set-acl":
+                setAclCli(newArgs);
+                break;
+
+            case "replication":
+                replicationCli(newArgs);
+                break;
+
+            default:
+                throw new RuntimeException("" + command + " is not a supported blobstore command");
+        }
+    }
+
+    /**
+     * Returns {@code key} unchanged after checking that one was supplied, so a
+     * forgotten key is reported as a usage error before any client call is made.
+     *
+     * @param key the parsed key argument, possibly {@code null} or empty
+     * @param command the command name to mention in the error message
+     * @return {@code key}
+     * @throws IllegalArgumentException if {@code key} is {@code null} or empty
+     */
+    private static String requireKey(String key, String command) {
+        if (StringUtils.isEmpty(key)) {
+            throw new IllegalArgumentException(command + " needs key as parameter.");
+        }
+        return key;
+    }
+
+    /**
+     * {@code cat [-f file] key}: writes the blob stored under {@code key} to
+     * {@code file}, or to standard output when no file is given.
+     */
+    private static void readCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("f", "file", null, CLI.AS_STRING)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+        // Validate before opening the output file so a usage error does not truncate it.
+        final String key = requireKey((String) cl.get("key"), "cat");
+        final String file = (String) cl.get("f");
+
+        if (StringUtils.isNotEmpty(file)) {
+            try (BufferedOutputStream f = new BufferedOutputStream(new FileOutputStream(file))) {
+                BlobStoreSupport.readBlob(key, f);
+            }
+        } else {
+            BlobStoreSupport.readBlob(key, System.out);
+        }
+    }
+
+    /**
+     * {@code create [-f file] [-a acl] [-r replication-factor] key}: creates a blob
+     * from {@code file}, or from standard input when no file is given.
+     */
+    private static void createCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("f", "file", null, CLI.AS_STRING)
+                .opt("a", "acl", Collections.emptyList(), new AsAclParser())
+                .opt("r", "replication-factor", -1, CLI.AS_INT)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final String key = requireKey((String) cl.get("key"), "create");
+        final String file = (String) cl.get("f");
+        // "a" is either the empty-list default or the list built by AsAclParser.
+        @SuppressWarnings("unchecked")
+        final List<AccessControl> acl = (List<AccessControl>) cl.get("a");
+        final Integer replicationFactor = (Integer) cl.get("r");
+
+        SettableBlobMeta meta = new SettableBlobMeta(acl);
+        meta.set_replication_factor(replicationFactor);
+
+        Utils.validateKeyName(key);
+
+        LOG.info("Creating {} with ACL {}", key, generateAccessControlsInfo(acl));
+
+        if (StringUtils.isNotEmpty(file)) {
+            try (BufferedInputStream f = new BufferedInputStream(new FileInputStream(file))) {
+                BlobStoreSupport.createBlobFromStream(key, f, meta);
+            }
+        } else {
+            BlobStoreSupport.createBlobFromStream(key, System.in, meta);
+        }
+
+        LOG.info("Successfully created {}", key);
+    }
+
+    /**
+     * {@code update [-f file] key}: replaces the blob's content with {@code file},
+     * or with standard input when no file is given.
+     */
+    private static void updateCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("f", "file", null, CLI.AS_STRING)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final String key = requireKey((String) cl.get("key"), "update");
+        final String file = (String) cl.get("f");
+
+        if (StringUtils.isNotEmpty(file)) {
+            try (BufferedInputStream f = new BufferedInputStream(new FileInputStream(file))) {
+                BlobStoreSupport.updateBlobFromStream(key, f);
+            }
+        } else {
+            BlobStoreSupport.updateBlobFromStream(key, System.in);
+        }
+
+        LOG.info("Successfully updated {}", key);
+    }
+
+    /** {@code delete key...}: deletes every named blob, in the order given. */
+    private static void deleteCli(final String[] args) throws Exception {
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                for (String key : args) {
+                    blobStore.deleteBlob(key);
+
+                    LOG.info("deleted {}", key);
+                }
+            }
+        });
+    }
+
+    /**
+     * {@code list [key...]}: logs version and ACL of each named blob, or of every
+     * key the store lists when none are named. Authorization and missing-key
+     * failures are logged only for explicitly named keys and skipped otherwise.
+     */
+    private static void listCli(final String[] args) throws Exception {
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                Iterator<String> keys;
+                boolean isArgsEmpty = (args == null || args.length == 0);
+                if (isArgsEmpty) {
+                    keys = blobStore.listKeys();
+                } else {
+                    keys = Arrays.asList(args).iterator();
+                }
+
+                while (keys.hasNext()) {
+                    String key = keys.next();
+
+                    try {
+                        ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+                        long version = meta.get_version();
+                        List<AccessControl> acl = meta.get_settable().get_acl();
+
+                        LOG.info("{} {} {}", key, version, generateAccessControlsInfo(acl));
+                    } catch (AuthorizationException ae) {
+                        if (!isArgsEmpty) {
+                            LOG.error("ACCESS DENIED to key: {}", key);
+                        }
+                    } catch (KeyNotFoundException knf) {
+                        if (!isArgsEmpty) {
+                            LOG.error("{} NOT FOUND", key);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * {@code set-acl [-s acl] key}: replaces the blob's ACL with the given one.
+     * When {@code -s} is absent or empty the blob's current ACL is written back.
+     */
+    private static void setAclCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("s", "set", Collections.emptyList(), new AsAclParser())
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final String key = requireKey((String) cl.get("key"), "set-acl");
+        // "s" is either the empty-list default or the list built by AsAclParser.
+        @SuppressWarnings("unchecked")
+        final List<AccessControl> setAcl = (List<AccessControl>) cl.get("s");
+
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+                List<AccessControl> acl = meta.get_settable().get_acl();
+                List<AccessControl> newAcl;
+                if (setAcl != null && !setAcl.isEmpty()) {
+                    newAcl = setAcl;
+                } else {
+                    newAcl = acl;
+                }
+
+                SettableBlobMeta newMeta = new SettableBlobMeta(newAcl);
+                LOG.info("Setting ACL for {} to {}", key, generateAccessControlsInfo(newAcl));
+                blobStore.setBlobMeta(key, newMeta);
+            }
+        });
+    }
+
+    /**
+     * {@code replication --read key} logs the blob's current replication factor;
+     * {@code replication --update -r replication-factor key} changes it.
+     */
+    private static void replicationCli(String[] args) throws Exception {
+        if (args.length == 0) {
+            throw new IllegalArgumentException("replication command needs at least subcommand as parameter.");
+        }
+        final String subCommand = args[0];
+        final String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
+
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                switch (subCommand) {
+                    case "--read":
+                        if (newArgs.length == 0) {
+                            throw new IllegalArgumentException("replication --read needs key as parameter.");
+                        }
+
+                        String key = newArgs[0];
+                        int blobReplication = blobStore.getBlobReplication(key);
+                        LOG.info("Current replication factor {}", blobReplication);
+                        break;
+
+                    case "--update":
+                        updateReplicationFactor(blobStore, newArgs);
+                        break;
+
+                    default:
+                        throw new RuntimeException("" + subCommand + " is not a supported blobstore command");
+                }
+            }
+        });
+    }
+
+    /**
+     * Parses {@code -r replication-factor key} and applies the new factor to the blob.
+     *
+     * @throws RuntimeException if no replication factor was given
+     * @throws IllegalArgumentException if no key was given
+     */
+    private static void updateReplicationFactor(ClientBlobStore blobStore, String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("r", "replication-factor", null, CLI.AS_INT)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final Integer replicationFactor = (Integer) cl.get("r");
+        if (replicationFactor == null) {
+            throw new RuntimeException("Please set the replication factor");
+        }
+        final String key = requireKey((String) cl.get("key"), "replication --update");
+
+        int blobReplication = blobStore.updateBlobReplication(key, replicationFactor);
+        LOG.info("Replication factor is set to {}", blobReplication);
+    }
+
+    /** Stream-level helpers; each one runs inside its own configured client. */
+    private static final class BlobStoreSupport {
+        /** Copies the blob stored under {@code key} into {@code os}; {@code os} is left open. */
+        static void readBlob(final String key, final OutputStream os) throws Exception {
+            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+                @Override
+                public void run(ClientBlobStore blobStore) throws Exception {
+                    try (InputStreamWithMeta is = blobStore.getBlob(key)) {
+                        IOUtils.copy(is, os);
+                    }
+                }
+            });
+        }
+
+        /** Creates the blob {@code key} with {@code meta}, filling it from {@code is}. */
+        static void createBlobFromStream(final String key, final InputStream is, final SettableBlobMeta meta) throws Exception {
+            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+                @Override
+                public void run(ClientBlobStore blobStore) throws Exception {
+                    AtomicOutputStream os = blobStore.createBlob(key, meta);
+                    copyInputStreamToBlobOutputStream(is, os);
+                }
+            });
+        }
+
+        /** Replaces the content of the blob {@code key} with the bytes read from {@code is}. */
+        static void updateBlobFromStream(final String key, final InputStream is) throws Exception {
+            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+                @Override
+                public void run(ClientBlobStore blobStore) throws Exception {
+                    AtomicOutputStream os = blobStore.updateBlob(key);
+                    copyInputStreamToBlobOutputStream(is, os);
+                }
+            });
+        }
+
+        /**
+         * Copies {@code is} into {@code os} and closes {@code os}. If the copy or
+         * the close fails, {@code os} is cancelled and the failure is rethrown.
+         */
+        static void copyInputStreamToBlobOutputStream(InputStream is, AtomicOutputStream os) throws IOException {
+            try {
+                IOUtils.copy(is, os);
+                os.close();
+            } catch (Exception e) {
+                try {
+                    os.cancel();
+                } catch (Exception cancelFailure) {
+                    // Keep the original failure as the primary one; a failing cancel must not hide it.
+                    e.addSuppressed(cancelFailure);
+                }
+                throw e;
+            }
+        }
+    }
+
+    /** Renders each entry of {@code acl} with {@link BlobStoreAclHandler#accessControlToString}. */
+    private static List<String> generateAccessControlsInfo(List<AccessControl> acl) {
+        List<String> accessControlStrings = new ArrayList<>(acl.size());
+        for (AccessControl ac : acl) {
+            accessControlStrings.add(BlobStoreAclHandler.accessControlToString(ac));
+        }
+        return accessControlStrings;
+    }
+
+    /** Option parser that turns a comma-separated ACL string into a list of {@link AccessControl}. */
+    private static final class AsAclParser implements CLI.Parse {
+        @Override
+        public Object parse(String value) {
+            List<AccessControl> accessControls = new ArrayList<>();
+            for (String part : value.split(",")) {
+                accessControls.add(asAccessControl(part));
+            }
+
+            return accessControls;
+        }
+
+        private AccessControl asAccessControl(String param) {
+            return BlobStoreAclHandler.parseAccessControl(param);
+        }
+    }
+}
diff --git a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
index 1c6bd5c..840b2d3 100644
--- a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
+++ b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
@@ -59,8 +59,8 @@
*/
public List<Object> select(Fields selector, List<Object> tuple) {
List<Object> ret = new ArrayList<>(selector.size());
- for(String s: selector) {
- ret.add(tuple.get(_index.get(s)));
+ for (String s : selector) {
+ // NOTE(review): goes through fieldIndex() instead of a raw _index lookup; presumably fieldIndex
+ // reports an unknown field name itself rather than letting a null index fail here (STORM-1594) - confirm.
+ ret.add(tuple.get(fieldIndex(s)));
}
return ret;
}