Merge branch 'STORM-2010'
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8c623e5..7b9e910 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2009: port org.apache.storm.blobstore.clj and org.apache.storm.command.blobstore.clj to Java
  * STORM-1876: Option to build storm-kafka and storm-kafka-client with different kafka client version
  * STORM-2000: Package storm-opentsdb as part of external dir in installation
  * STORM-1962: support python 3 and 2 in multilang
@@ -162,6 +163,10 @@
  * STORM-1720: Support GEO in storm-redis
  * STORM-1868: Modify TridentKafkaWordCount to run in distributed mode
 
+## 1.0.3
+ * STORM-1594: org.apache.storm.tuple.Fields can throw NPE if given invalid field in selector
+ * STORM-1995: downloadChunk in nimbus.clj should close the input stream
+
 ## 1.0.2
  * STORM-1976: Remove cleanup-corrupt-topologies!
  * STORM-1977: Restore logic: give up leadership when elected as leader but doesn't have one or more of topology codes on local
diff --git a/bin/storm.py b/bin/storm.py
index 1668d70..c49ab09 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -326,7 +326,7 @@
     storm blobstore create mytopo:data.tgz -f data.tgz -a u:alice:rwa,u:bob:rw,o::r
     """
     exec_storm_class(
-        "org.apache.storm.command.blobstore",
+        "org.apache.storm.command.Blobstore",
         args=args,
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
diff --git a/storm-core/src/clj/org/apache/storm/blobstore.clj b/storm-core/src/clj/org/apache/storm/blobstore.clj
deleted file mode 100644
index 92fb44f..0000000
--- a/storm-core/src/clj/org/apache/storm/blobstore.clj
+++ /dev/null
@@ -1,28 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.blobstore
-  (:import [org.apache.storm.utils Utils ConfigUtils])
-  (:import [org.apache.storm.blobstore ClientBlobStore])
-  (:use [org.apache.storm config util]))
-
-(defmacro with-configured-blob-client
-  [client-sym & body]
-  `(let [conf# (clojurify-structure (ConfigUtils/readStormConfig))
-         ^ClientBlobStore ~client-sym (Utils/getClientBlobStore conf#)]
-     (try
-       ~@body
-       (finally (.shutdown ~client-sym)))))
diff --git a/storm-core/src/clj/org/apache/storm/command/blobstore.clj b/storm-core/src/clj/org/apache/storm/command/blobstore.clj
deleted file mode 100644
index 924f825..0000000
--- a/storm-core/src/clj/org/apache/storm/command/blobstore.clj
+++ /dev/null
@@ -1,163 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.blobstore
-  (:import [java.io InputStream OutputStream]
-           [org.apache.storm.generated SettableBlobMeta AccessControl AuthorizationException
-            KeyNotFoundException]
-           [org.apache.storm.blobstore BlobStoreAclHandler]
-           [org.apache.storm.utils Utils])
-  (:use [org.apache.storm config]
-        [clojure.string :only [split]]
-        [clojure.tools.cli :only [cli]]
-        [clojure.java.io :only [copy input-stream output-stream]]
-        [org.apache.storm blobstore log])
-  (:gen-class))
-
-(defn update-blob-from-stream
-  "Update a blob in the blob store from an InputStream"
-  [key ^InputStream in]
-  (with-configured-blob-client blobstore
-    (let [out (.updateBlob blobstore key)]
-      (try 
-        (copy in out)
-        (.close out)
-        (catch Exception e
-          (log-message e)
-          (.cancel out)
-          (throw e))))))
-
-(defn create-blob-from-stream
-  "Create a blob in the blob store from an InputStream"
-  [key ^InputStream in ^SettableBlobMeta meta]
-  (with-configured-blob-client blobstore
-    (let [out (.createBlob blobstore key meta)]
-      (try 
-        (copy in out)
-        (.close out)
-        (catch Exception e
-          (.cancel out)
-          (throw e))))))
-
-(defn read-blob
-  "Read a blob in the blob store and write to an OutputStream"
-  [key ^OutputStream out]
-  (with-configured-blob-client blobstore
-    (with-open [in (.getBlob blobstore key)]
-      (copy in out))))
-
-(defn as-access-control
-  "Convert a parameter to an AccessControl object"
-  [param]
-  (BlobStoreAclHandler/parseAccessControl (str param)))
-
-(defn as-acl
-  [param]
-  (map as-access-control (split param #",")))
-
-(defn access-control-str
-  [^AccessControl acl]
-  (BlobStoreAclHandler/accessControlToString acl))
-
-(defn read-cli [args]
-  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
-    (if file
-      (with-open [f (output-stream file)]
-        (read-blob key f))
-      (read-blob key System/out))))
-
-(defn update-cli [args]
-  (let [[{file :file} [key] _] (cli args ["-f" "--file" :default nil])]
-    (if file
-      (with-open [f (input-stream file)]
-        (update-blob-from-stream key f))
-      (update-blob-from-stream key System/in))
-    (log-message "Successfully updated " key)))
-
-(defn create-cli [args]
-  (let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil]
-                                                  ["-a" "--acl" :default [] :parse-fn as-acl]
-                                                  ["-r" "--replication-factor" :default -1 :parse-fn #(Integer/parseInt %)])
-        meta (doto (SettableBlobMeta. acl)
-                   (.set_replication_factor replication-factor))]
-    (Utils/validateKeyName key)
-    (log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
-    (if file
-      (with-open [f (input-stream file)]
-        (create-blob-from-stream key f meta))
-      (create-blob-from-stream key System/in meta))
-    (log-message "Successfully created " key)))
-
-(defn delete-cli [args]
-  (with-configured-blob-client blobstore
-    (doseq [key args]
-      (.deleteBlob blobstore key)
-      (log-message "deleted " key))))
-
-(defn list-cli [args]
-  (with-configured-blob-client blobstore
-    (let [keys (if (empty? args) (iterator-seq (.listKeys blobstore)) args)]
-      (doseq [key keys]
-        (try
-          (let [meta (.getBlobMeta blobstore key)
-                version (.get_version meta)
-                acl (.get_acl (.get_settable meta))]
-            (log-message key " " version " " (pr-str (map access-control-str acl))))
-          (catch AuthorizationException ae
-            (if-not (empty? args) (log-error "ACCESS DENIED to key: " key)))
-          (catch KeyNotFoundException knf
-            (if-not (empty? args) (log-error key " NOT FOUND"))))))))
-
-(defn set-acl-cli [args]
-  (let [[{set-acl :set} [key] _]
-           (cli args ["-s" "--set" :default [] :parse-fn as-acl])]
-    (with-configured-blob-client blobstore
-      (let [meta (.getBlobMeta blobstore key)
-            acl (.get_acl (.get_settable meta))
-            new-acl (if set-acl set-acl acl)
-            new-meta (SettableBlobMeta. new-acl)]
-        (log-message "Setting ACL for " key " to " (pr-str (map access-control-str new-acl)))
-        (.setBlobMeta blobstore key new-meta)))))
-
-(defn rep-cli [args]
-  (let [sub-command (first args)
-        new-args (rest args)]
-    (with-configured-blob-client blobstore
-      (condp = sub-command
-      "--read" (let [key (first new-args)
-                     blob-replication (.getBlobReplication blobstore key)]
-                 (log-message "Current replication factor " blob-replication)
-                 blob-replication)
-      "--update" (let [[{replication-factor :replication-factor} [key] _]
-                        (cli new-args ["-r" "--replication-factor" :parse-fn #(Integer/parseInt %)])]
-                   (if (nil? replication-factor)
-                     (throw (RuntimeException. (str "Please set the replication factor")))
-                     (let [blob-replication (.updateBlobReplication blobstore key replication-factor)]
-                       (log-message "Replication factor is set to " blob-replication)
-                       blob-replication)))
-      :else (throw (RuntimeException. (str sub-command " is not a supported blobstore command")))))))
-
-(defn -main [& args]
-  (let [command (first args)
-        new-args (rest args)]
-    (condp = command
-      "cat" (read-cli new-args)
-      "create" (create-cli new-args)
-      "update" (update-cli new-args)
-      "delete" (delete-cli new-args)
-      "list" (list-cli new-args)
-      "set-acl" (set-acl-cli new-args)
-      "replication" (rep-cli new-args)
-      :else (throw (RuntimeException. (str command " is not a supported blobstore command"))))))
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 93507ff..24d5398 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1781,16 +1781,16 @@
         (.mark nimbus:num-downloadChunk-calls)
         (check-authorization! nimbus nil nil "fileDownload")
         (let [downloaders (:downloaders nimbus)
-              ^BufferFileInputStream is (.get downloaders id)]
+              ^BufferInputStream is (.get downloaders id)]
           (when-not is
             (throw (RuntimeException.
-                    "Could not find input stream for that id")))
+                    "Could not find input stream for id " id)))
           (let [ret (.read is)]
             (.put downloaders id is)
             (when (empty? ret)
+              (.close is)
               (.remove downloaders id))
-            (ByteBuffer/wrap ret)
-            )))
+            (ByteBuffer/wrap ret))))
 
       (^String getNimbusConf [this]
         (.mark nimbus:num-getNimbusConf-calls)
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
index 61e48b3..2cb1cce 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/ClientBlobStore.java
@@ -23,7 +23,9 @@
 import org.apache.storm.generated.SettableBlobMeta;
 import org.apache.storm.generated.KeyAlreadyExistsException;
 import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
 
 import java.util.Iterator;
 import java.util.Map;
@@ -50,6 +52,21 @@
 public abstract class ClientBlobStore implements Shutdownable {
     protected Map conf;
 
+    public interface WithBlobstore {
+        void run(ClientBlobStore blobStore) throws Exception;
+    }
+
+    public static void withConfiguredClient(WithBlobstore withBlobstore) throws Exception {
+        Map<String, Object> conf = ConfigUtils.readStormConfig();
+        ClientBlobStore blobStore = Utils.getClientBlobStore(conf);
+
+        try {
+            withBlobstore.run(blobStore);
+        } finally {
+            blobStore.shutdown();
+        }
+    }
+
     /**
      * Sets up the client API by parsing the configs.
      * @param conf The storm conf containing the config details.
diff --git a/storm-core/src/jvm/org/apache/storm/command/Blobstore.java b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
new file mode 100644
index 0000000..a5b01fc
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Blobstore.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.blobstore.AtomicOutputStream;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.InputStreamWithMeta;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class Blobstore {
+    private static final Logger LOG = LoggerFactory.getLogger(Blobstore.class);
+
+    public static void main(String[] args) throws Exception {
+        if (args.length == 0) {
+            throw new IllegalArgumentException("You should provide command.");
+        }
+
+        String command = args[0];
+        String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
+
+        switch (command) {
+            case "cat":
+                readCli(newArgs);
+                break;
+
+            case "create":
+                createCli(newArgs);
+                break;
+
+            case "update":
+                updateCli(newArgs);
+                break;
+
+            case "delete":
+                deleteCli(newArgs);
+                break;
+
+            case "list":
+                listCli(newArgs);
+                break;
+
+            case "set-acl":
+                setAclCli(newArgs);
+                break;
+
+            case "replication":
+                replicationCli(newArgs);
+                break;
+
+            default:
+                throw new RuntimeException("" + command + " is not a supported blobstore command");
+        }
+    }
+
+    private static void readCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("f", "file", null, CLI.AS_STRING)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+        final String key = (String) cl.get("key");
+        final String file = (String) cl.get("f");
+
+        if (StringUtils.isNotEmpty(file)) {
+            try (BufferedOutputStream f = new BufferedOutputStream(new FileOutputStream(file))) {
+                BlobStoreSupport.readBlob(key, f);
+            }
+        } else {
+            BlobStoreSupport.readBlob(key, System.out);
+        }
+    }
+
+    private static void createCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("f", "file", null, CLI.AS_STRING)
+                .opt("a", "acl", Collections.emptyList(), new AsAclParser())
+                .opt("r", "replication-factor", -1, CLI.AS_INT)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final String key = (String) cl.get("key");
+        final String file = (String) cl.get("f");
+        final List<AccessControl> acl = (List<AccessControl>) cl.get("a");
+        final Integer replicationFactor = (Integer) cl.get("r");
+
+        SettableBlobMeta meta = new SettableBlobMeta(acl);
+        meta.set_replication_factor(replicationFactor);
+
+        Utils.validateKeyName(key);
+
+        LOG.info("Creating {} with ACL {}", key, generateAccessControlsInfo(acl));
+
+        if (StringUtils.isNotEmpty(file)) {
+            try (BufferedInputStream f = new BufferedInputStream(new FileInputStream(file))) {
+                BlobStoreSupport.createBlobFromStream(key, f, meta);
+            }
+        } else {
+            BlobStoreSupport.createBlobFromStream(key, System.in, meta);
+        }
+
+        LOG.info("Successfully created {}", key);
+    }
+
+    private static void updateCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("f", "file", null, CLI.AS_STRING)
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final String key = (String) cl.get("key");
+        final String file = (String) cl.get("f");
+
+        if (StringUtils.isNotEmpty(file)) {
+            try (BufferedInputStream f = new BufferedInputStream(new FileInputStream(file))) {
+                BlobStoreSupport.updateBlobFromStream(key, f);
+            }
+        } else {
+            BlobStoreSupport.updateBlobFromStream(key, System.in);
+        }
+
+        LOG.info("Successfully updated {}", key);
+    }
+
+    private static void deleteCli(final String[] args) throws Exception {
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                for (String key : args) {
+                    blobStore.deleteBlob(key);
+
+                    LOG.info("deleted {}", key);
+                }
+            }
+        });
+    }
+
+    private static void listCli(final String[] args) throws Exception {
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                Iterator<String> keys;
+                boolean isArgsEmpty = (args == null || args.length == 0);
+                if (isArgsEmpty) {
+                    keys = blobStore.listKeys();
+                } else {
+                    keys = Arrays.asList(args).iterator();
+                }
+
+                while (keys.hasNext()) {
+                    String key = keys.next();
+
+                    try {
+                        ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+                        long version = meta.get_version();
+                        List<AccessControl> acl = meta.get_settable().get_acl();
+
+                        LOG.info("{} {} {}", key, version, generateAccessControlsInfo(acl));
+                    } catch (AuthorizationException ae) {
+                        if (!isArgsEmpty) {
+                            LOG.error("ACCESS DENIED to key: {}", key);
+                        }
+                    } catch (KeyNotFoundException knf) {
+                        if (!isArgsEmpty) {
+                            LOG.error("{} NOT FOUND", key);
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    private static void setAclCli(String[] args) throws Exception {
+        Map<String, Object> cl = CLI.opt("s", "set", Collections.emptyList(), new AsAclParser())
+                .arg("key", CLI.FIRST_WINS)
+                .parse(args);
+
+        final String key = (String) cl.get("key");
+        final List<AccessControl> setAcl = (List<AccessControl>) cl.get("s");
+
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                ReadableBlobMeta meta = blobStore.getBlobMeta(key);
+                List<AccessControl> acl = meta.get_settable().get_acl();
+                List<AccessControl> newAcl;
+                if (setAcl != null && !setAcl.isEmpty()) {
+                    newAcl = setAcl;
+                } else {
+                    newAcl = acl;
+                }
+
+                SettableBlobMeta newMeta = new SettableBlobMeta(newAcl);
+                LOG.info("Setting ACL for {} to {}", key, generateAccessControlsInfo(newAcl));
+                blobStore.setBlobMeta(key, newMeta);
+            }
+        });
+    }
+
+    private static void replicationCli(String[] args) throws Exception {
+        if (args.length == 0) {
+            throw new IllegalArgumentException("replication command needs at least subcommand as parameter.");
+        }
+        final String subCommand = args[0];
+        final String[] newArgs = Arrays.copyOfRange(args, 1, args.length);
+
+        ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+            @Override
+            public void run(ClientBlobStore blobStore) throws Exception {
+                switch (subCommand) {
+                    case "--read":
+                        if (newArgs.length == 0) {
+                            throw new IllegalArgumentException("replication --read needs key as parameter.");
+                        }
+
+                        String key = newArgs[0];
+                        int blobReplication = blobStore.getBlobReplication(key);
+                        LOG.info("Current replication factor {}", blobReplication);
+                        break;
+
+                    case "--update":
+                        updateReplicationFactor(blobStore, newArgs);
+                        break;
+
+                    default:
+                        throw new RuntimeException("" + subCommand + " is not a supported blobstore command");
+                }
+            }
+
+            private void updateReplicationFactor(ClientBlobStore blobStore, String[] args) throws Exception {
+                Map<String, Object> cl = CLI.opt("r", "replication-factor", null, CLI.AS_INT)
+                        .arg("key", CLI.FIRST_WINS)
+                        .parse(args);
+
+                final String key = (String) cl.get("key");
+                final Integer replicationFactor = (Integer) cl.get("r");
+
+                if (replicationFactor == null) {
+                    throw new RuntimeException("Please set the replication factor");
+                }
+
+                int blobReplication = blobStore.updateBlobReplication(key, replicationFactor);
+                LOG.info("Replication factor is set to {}", blobReplication);
+            }
+        });
+    }
+
+    private static final class BlobStoreSupport {
+        static void readBlob(final String key, final OutputStream os) throws Exception {
+            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+                @Override
+                public void run(ClientBlobStore blobStore) throws Exception {
+                    try (InputStreamWithMeta is = blobStore.getBlob(key)) {
+                        IOUtils.copy(is, os);
+                    }
+                }
+            });
+        }
+
+        static void createBlobFromStream(final String key, final InputStream is, final SettableBlobMeta meta) throws Exception {
+            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+                @Override
+                public void run(ClientBlobStore blobStore) throws Exception {
+                    AtomicOutputStream os = blobStore.createBlob(key, meta);
+                    copyInputStreamToBlobOutputStream(is, os);
+                }
+            });
+        }
+
+        static void updateBlobFromStream(final String key, final InputStream is) throws Exception {
+            ClientBlobStore.withConfiguredClient(new ClientBlobStore.WithBlobstore() {
+                @Override
+                public void run(ClientBlobStore blobStore) throws Exception {
+                    AtomicOutputStream os = blobStore.updateBlob(key);
+                    copyInputStreamToBlobOutputStream(is, os);
+                }
+            });
+        }
+
+        static void copyInputStreamToBlobOutputStream(InputStream is, AtomicOutputStream os) throws IOException {
+            try {
+                IOUtils.copy(is, os);
+                os.close();
+            } catch (Exception e) {
+                os.cancel();
+                throw e;
+            }
+        }
+    }
+
+    private static List<String> generateAccessControlsInfo(List<AccessControl> acl) {
+        List<String> accessControlStrings = new ArrayList<>();
+        for (AccessControl ac : acl) {
+            accessControlStrings.add(BlobStoreAclHandler.accessControlToString(ac));
+        }
+        return accessControlStrings;
+    }
+
+    private static final class AsAclParser implements CLI.Parse {
+        @Override
+        public Object parse(String value) {
+            List<AccessControl> accessControls = new ArrayList<>();
+            for (String part : value.split(",")) {
+                accessControls.add(asAccessControl(part));
+            }
+
+            return accessControls;
+        }
+
+        private AccessControl asAccessControl(String param) {
+            return BlobStoreAclHandler.parseAccessControl(param);
+        }
+    }
+}
diff --git a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
index 1c6bd5c..840b2d3 100644
--- a/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
+++ b/storm-core/src/jvm/org/apache/storm/tuple/Fields.java
@@ -59,8 +59,8 @@
      */
     public List<Object> select(Fields selector, List<Object> tuple) {
         List<Object> ret = new ArrayList<>(selector.size());
-        for(String s: selector) {
-            ret.add(tuple.get(_index.get(s)));
+        for (String s : selector) {
+            ret.add(tuple.get(fieldIndex(s))); 
         }
         return ret;
     }