STORM-3026: Upgrade ZK instance for security
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index ceabd59..e24b879 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -47,6 +47,8 @@
storm.nimbus.retry.times: 5
storm.nimbus.retry.interval.millis: 2000
storm.nimbus.retry.intervalceiling.millis: 60000
+storm.nimbus.zookeeper.acls.check: true
+storm.nimbus.zookeeper.acls.fixup: true
storm.auth.simple-white-list.users: []
storm.auth.simple-acl.users: []
storm.auth.simple-acl.users.commands: []
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
index 810b3c3..f1a0412 100644
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster.clj
@@ -30,13 +30,22 @@
(:require [org.apache.storm [zookeeper :as zk]])
(:require [org.apache.storm.daemon [common :as common]]))
-(defn mk-topo-only-acls
- [topo-conf]
+(defn mk-topo-acls
+ [topo-conf type]
(let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)]
(when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
[(first ZooDefs$Ids/CREATOR_ALL_ACL)
- (ACL. ZooDefs$Perms/READ (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))])))
-
+ (ACL. type (Id. "digest" (DigestAuthenticationProvider/generateDigest payload)))])))
+
+(defn mk-topo-read-write-acls
+ [topo-conf]
+ (mk-topo-acls topo-conf ZooDefs$Perms/ALL))
+
+(defn mk-topo-read-only-acls
+ [topo-conf]
+ [topo-conf]
+ (mk-topo-acls topo-conf ZooDefs$Perms/READ))
+
(defnk mk-distributed-cluster-state
[conf :auth-conf nil :acls nil :context (ClusterStateContext.)]
(let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE)
@@ -68,26 +77,27 @@
(executor-beats [this storm-id executor->node+port])
(supervisors [this callback])
(supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
- (setup-heartbeats! [this storm-id])
+ (setup-heartbeats! [this storm-id topo-conf])
(teardown-heartbeats! [this storm-id])
(teardown-topology-errors! [this storm-id])
(heartbeat-storms [this])
(error-topologies [this])
(backpressure-topologies [this])
- (set-topology-log-config! [this storm-id log-config])
+ (set-topology-log-config! [this storm-id log-config topo-conf])
(topology-log-config [this storm-id cb])
(worker-heartbeat! [this storm-id node port info])
(remove-worker-heartbeat! [this storm-id node port])
(supervisor-heartbeat! [this supervisor-id info])
(worker-backpressure! [this storm-id node port info])
(topology-backpressure [this storm-id callback])
- (setup-backpressure! [this storm-id])
+ (setup-backpressure! [this storm-id topo-conf])
(remove-backpressure! [this storm-id])
(remove-worker-backpressure! [this storm-id node port])
- (activate-storm! [this storm-id storm-base])
+ (activate-storm! [this storm-id storm-base topo-conf])
(update-storm! [this storm-id new-elems])
(remove-storm-base! [this storm-id])
- (set-assignment! [this storm-id info])
+ (setup-errors! [this storm-id topo-conf])
+ (set-assignment! [this storm-id info topo-conf])
;; sets up information related to key consisting of nimbus
;; host:port and version info of the blob
(setup-blobstore! [this key nimbusInfo versionInfo])
@@ -416,8 +426,9 @@
(maybe-deserialize (.get_data cluster-state (log-config-path storm-id) (not-nil? cb)) LogConfig))
(set-topology-log-config!
- [this storm-id log-config]
- (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls))
+ [this storm-id log-config topo-conf]
+ (.mkdirs cluster-state LOGCONFIG-SUBTREE acls)
+ (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) (mk-topo-read-only-acls topo-conf)))
(set-worker-profile-request
[this storm-id profile-request]
@@ -472,8 +483,9 @@
(.delete_worker_hb cluster-state (workerbeat-path storm-id node port)))
(setup-heartbeats!
- [this storm-id]
- (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls))
+ [this storm-id topo-conf]
+ (.mkdirs cluster-state WORKERBEATS-SUBTREE acls)
+ (.mkdirs cluster-state (workerbeat-storm-root storm-id) (mk-topo-read-write-acls topo-conf)))
(teardown-heartbeats!
[this storm-id]
@@ -506,8 +518,9 @@
(> (count children) 0)))
(setup-backpressure!
- [this storm-id]
- (.mkdirs cluster-state (backpressure-storm-root storm-id) acls))
+ [this storm-id topo-conf]
+ (.mkdirs cluster-state BACKPRESSURE-SUBTREE acls)
+ (.mkdirs cluster-state (backpressure-storm-root storm-id) (mk-topo-read-write-acls topo-conf)))
(remove-backpressure!
[this storm-id]
@@ -533,9 +546,10 @@
(.set_ephemeral_node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls)))
(activate-storm!
- [this storm-id storm-base]
+ [this storm-id storm-base topo-conf]
(let [thrift-storm-base (thriftify-storm-base storm-base)]
- (.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls)))
+ (.mkdirs cluster-state STORMS-SUBTREE acls)
+ (.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) (mk-topo-read-only-acls topo-conf))))
(update-storm!
[this storm-id new-elems]
@@ -562,9 +576,10 @@
(.delete_node cluster-state (storm-path storm-id)))
(set-assignment!
- [this storm-id info]
+ [this storm-id info topo-conf]
+ (.mkdirs cluster-state ASSIGNMENTS-SUBTREE acls)
(let [thrift-assignment (thriftify-assignment info)]
- (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))
+ (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) (mk-topo-read-only-acls topo-conf))))
(remove-blobstore-key!
[this blob-key]
@@ -585,9 +600,10 @@
(set-credentials!
[this storm-id creds topo-conf]
- (let [topo-acls (mk-topo-only-acls topo-conf)
+ (let [topo-acls (mk-topo-read-only-acls topo-conf)
path (credentials-path storm-id)
thriftified-creds (thriftify-credentials creds)]
+ (.mkdirs cluster-state CREDENTIALS-SUBTREE acls)
(.set_data cluster-state path (Utils/serialize thriftified-creds) topo-acls)))
(credentials
@@ -596,6 +612,11 @@
(swap! credentials-callback assoc storm-id callback))
(clojurify-crdentials (maybe-deserialize (.get_data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials)))
+ (setup-errors!
+ [this storm-id topo-conf]
+ (.mkdirs cluster-state ERRORS-SUBTREE acls)
+ (.mkdirs cluster-state (error-storm-root storm-id) (mk-topo-read-write-acls topo-conf)))
+
(report-error
[this storm-id component-id node port error]
(let [path (error-path storm-id component-id)
diff --git a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
index fdef972..d2915eb 100644
--- a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
+++ b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
@@ -28,7 +28,7 @@
(ZKStateStorage. conf, auth-conf, acls, context))
(defn -mkState [this conf auth-conf acls context]
- (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
+ (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) acls :auth-conf auth-conf)]
(zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
(.close zk))
(let [callbacks (atom {})
@@ -36,6 +36,7 @@
zk-writer (zk/mk-client conf
(conf STORM-ZOOKEEPER-SERVERS)
(conf STORM-ZOOKEEPER-PORT)
+ acls
:auth-conf auth-conf
:root (conf STORM-ZOOKEEPER-ROOT)
:watcher (fn [state type path]
@@ -50,6 +51,7 @@
(zk/mk-client conf
(conf STORM-ZOOKEEPER-SERVERS)
(conf STORM-ZOOKEEPER-PORT)
+ acls
:auth-conf auth-conf
:root (conf STORM-ZOOKEEPER-ROOT)
:watcher (fn [state type path]
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 887ab3b..1bb3d10 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -15,20 +15,18 @@
;; limitations under the License.
(ns org.apache.storm.command.shell-submission
(:import [org.apache.storm StormSubmitter])
- (:use [org.apache.storm thrift util config log zookeeper])
+ (:use [org.apache.storm util config log])
(:require [clojure.string :as str])
+ (:import [org.apache.storm.utils ConfigUtils NimbusClient])
(:gen-class))
-
(defn -main [^String tmpjarpath & args]
- (let [conf (read-storm-config)
- ; since this is not a purpose to add to leader lock queue, passing nil as blob-store is ok
- zk-leader-elector (zk-leader-elector conf nil)
- leader-nimbus (.getLeader zk-leader-elector)
- host (.getHost leader-nimbus)
- port (.getPort leader-nimbus)
- no-op (.close zk-leader-elector)
- jarpath (StormSubmitter/submitJar conf tmpjarpath)
- args (concat args [host port jarpath])]
- (exec-command! (str/join " " args))
- ))
+ (let [conf (clojurify-structure (ConfigUtils/readStormConfig))]
+ (with-open [client (NimbusClient/getConfiguredClient conf)]
+ (let [c (.getClient client)
+ ns (.getLeader c)
+ host (.get_host ns)
+ port (.get_port ns)
+ jarpath (StormSubmitter/submitJar conf tmpjarpath)
+ args (concat args [host port jarpath])]
+ (exec-command! (str/join " " args))))))
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 6eaf643..779fdd3 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -54,6 +54,7 @@
[converter :as converter]
[stats :as stats]])
(:require [clojure.set :as set])
+ (:import [org.apache.storm.zookeeper AclEnforcement])
(:import [org.apache.storm.daemon.common StormBase Assignment])
(:use [org.apache.storm.daemon common])
(:use [org.apache.storm config])
@@ -124,6 +125,16 @@
scheduler
))
+(def NIMBUS-ZK-ACLS ZooDefs$Ids/CREATOR_ALL_ACL)
+
+(defn mk-zk-client [conf]
+ (let [zk-servers (conf STORM-ZOOKEEPER-SERVERS)
+ zk-port (conf STORM-ZOOKEEPER-PORT)
+ zk-root (conf STORM-ZOOKEEPER-ROOT)]
+ (if (and zk-servers zk-port)
+ (mk-client conf zk-servers zk-port (if (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil) :root zk-root
+ :auth-conf conf))))
+
(defmulti blob-sync cluster-mode)
(defnk is-leader [nimbus :throw-exception true]
@@ -133,10 +144,6 @@
(let [leader-address (.getLeader leader-elector)]
(throw (RuntimeException. (str "not a leader, current leader is " leader-address))))))))
-(def NIMBUS-ZK-ACLS
- [(first ZooDefs$Ids/CREATOR_ALL_ACL)
- (ACL. (bit-or ZooDefs$Perms/READ ZooDefs$Perms/CREATE) ZooDefs$Ids/ANYONE_ID_UNSAFE)])
-
(defn mk-blob-cache-map
"Constructs a TimeCacheMap instance with a blob store timeout whose
expiration callback invokes cancel on the value held by an expired entry when
@@ -196,7 +203,7 @@
(exit-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
- :leader-elector (zk-leader-elector conf blob-store)
+ :leader-elector (zk-leader-elector conf blob-store (if (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil))
:id->sched-status (atom {})
:node-id->resources (atom {}) ;;resources of supervisors
:id->resources (atom {}) ;;resources of topologies
@@ -454,7 +461,7 @@
(defn- get-version-for-key [key nimbus-host-port-info conf]
(let [version (KeySequenceNumber. key nimbus-host-port-info)]
- (.getKeySequenceNumber version conf)))
+ (.getKeySequenceNumber version conf (if (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS nil))))
(defn get-key-seq-from-blob-store [blob-store]
(let [key-iter (.listKeys blob-store)]
@@ -933,7 +940,7 @@
td (.get tds tid)
assignment (if (and (not (:owner assignment)) (not (nil? td)))
(let [new-assignment (fixup-assignment assignment td)]
- (.set-assignment! storm-cluster-state tid new-assignment)
+ (.set-assignment! storm-cluster-state tid new-assignment (.getConf td))
new-assignment)
assignment)]
{tid assignment}))))
@@ -987,7 +994,7 @@
(log-debug "Assignment for " topology-id " hasn't changed")
(do
(log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
- (.set-assignment! storm-cluster-state topology-id assignment)
+ (.set-assignment! storm-cluster-state topology-id assignment (.getConf topology-details))
)))
(->> new-assignments
(map (fn [[topology-id assignment]]
@@ -1025,7 +1032,8 @@
nil
nil
{}
- principal))
+ principal)
+ storm-conf)
(notify-topology-action-listener nimbus storm-name "activate")))
(defn storm-active? [storm-cluster-state storm-name]
@@ -1561,7 +1569,7 @@
(log-message "uploadedJar " uploadedJarLocation)
(setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology)
(wait-for-desired-code-replication nimbus total-storm-conf storm-id)
- (.setup-heartbeats! storm-cluster-state storm-id)
+ (.setup-heartbeats! storm-cluster-state storm-id total-storm-conf)
(if (total-storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)
(.setup-backpressure! storm-cluster-state storm-id))
(notify-topology-action-listener nimbus storm-name "submitTopology")
@@ -1710,7 +1718,7 @@
(.containsKey named-loggers logger-name))
(.remove named-loggers logger-name))))))
(log-message "Setting log config for " storm-name ":" merged-log-config)
- (.set-topology-log-config! storm-cluster-state id merged-log-config)))
+ (.set-topology-log-config! storm-cluster-state id merged-log-config topology-conf)))
(uploadNewCredentials [this storm-name credentials]
(mark! nimbus:num-uploadNewCredentials-calls)
@@ -2269,8 +2277,12 @@
(defn -launch [nimbus]
(let [conf (merge
(read-storm-config)
- (read-yaml-config "storm-cluster-auth.yaml" false))]
- (launch-server! conf nimbus)))
+ (read-yaml-config "storm-cluster-auth.yaml" false))
+ fixup-acl (conf STORM-NIMBUS-ZOOKEEPER-ACLS-FIXUP)
+ check-acl (or fixup-acl (conf STORM-NIMBUS-ZOOKEEPER-ACLS-CHECK))]
+ (when check-acl
+ (AclEnforcement/verifyAcls conf fixup-acl))
+ (launch-server! conf nimbus)))
(defn standalone-nimbus []
(reify INimbus
diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj b/storm-core/src/clj/org/apache/storm/zookeeper.clj
index 2b5da55..5633f04 100644
--- a/storm-core/src/clj/org/apache/storm/zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj
@@ -53,11 +53,11 @@
(log-message "Zookeeper state update: " state type path))
(defnk mk-client
- [conf servers port
+ [conf servers port default-acl
:root ""
:watcher default-watcher
:auth-conf nil]
- (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
+ (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)) default-acl)]
(.. fk
(getCuratorListenable)
(addListener
@@ -273,9 +273,9 @@
(defn zk-leader-elector
"Zookeeper Implementation of ILeaderElector."
- [conf blob-store]
+ [conf blob-store default-acl]
(let [servers (conf STORM-ZOOKEEPER-SERVERS)
- zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)
+ zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) default-acl :auth-conf conf)
leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
id (.toHostPortString (NimbusInfo/fromConf conf))
leader-latch (atom (LeaderLatch. zk leader-lock-path id))
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java
index b626495..547de78 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -53,6 +53,21 @@
private static final long serialVersionUID = -1550278723792864455L;
/**
+ * In nimbus on startup check if all of the zookeeper ACLs are correct before starting. If not
+ * don't start nimbus.
+ */
+ @isBoolean
+ public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK = "storm.nimbus.zookeeper.acls.check";
+
+ /**
+ * In nimbus on startup check if all of the zookeeper ACLs are correct before starting. If not do
+ * your best to fix them before nimbus starts, if it cannot fix them nimbus will not start.
+ * This overrides any value set for storm.nimbus.zookeeper.acls.check.
+ */
+ @isBoolean
+ public static final String STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP = "storm.nimbus.zookeeper.acls.fixup";
+
+ /**
* This is part of a temporary workaround to a ZK bug, it is the 'scheme:acl' for
* the user Nimbus and Supervisors use to authenticate with ZK.
*/
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index 8a17d36..fec74b7 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -31,6 +31,7 @@
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,11 +48,15 @@
private static final String BLOBSTORE_SUBTREE="/blobstore";
private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class);
- public static CuratorFramework createZKClient(Map conf) {
+ public static String getBlobStoreSubtree() {
+ return BLOBSTORE_SUBTREE;
+ }
+
+ public static CuratorFramework createZKClient(Map conf, List<ACL> defaultAcls) {
List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf);
- CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port, (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo);
+ CuratorFramework zkClient = Utils.newCurator(conf, zkServers, port, (String) conf.get(Config.STORM_ZOOKEEPER_ROOT), zkAuthInfo, defaultAcls);
zkClient.start();
return zkClient;
}
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
index 3321bcf..e3c2ae6 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
@@ -20,6 +20,7 @@
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +73,7 @@
public synchronized void syncBlobs() {
try {
LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet());
- zkClient = BlobStoreUtils.createZKClient(conf);
+ zkClient = BlobStoreUtils.createZKClient(conf, ZooDefs.Ids.CREATOR_ALL_ACL);
deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet());
updateKeySetForBlobStore(getBlobStoreKeySet());
Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
index adbd4c4..c175a97 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
@@ -25,6 +25,7 @@
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,9 +133,9 @@
this.nimbusInfo = nimbusInfo;
}
- public synchronized int getKeySequenceNumber(Map conf) throws KeyNotFoundException {
+ public synchronized int getKeySequenceNumber(Map conf, List<ACL> defaultAcls) throws KeyNotFoundException {
TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
- CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
+ CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf, defaultAcls);
try {
// Key has not been created yet and it is the first time it is being created
if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index e85d9e0..281cfbf 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -28,6 +28,8 @@
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +83,11 @@
public void prepare(Map conf, String overrideBase, NimbusInfo nimbusInfo) {
this.conf = conf;
this.nimbusInfo = nimbusInfo;
- zkClient = BlobStoreUtils.createZKClient(conf);
+ List<ACL> acl = null;
+ if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+ acl = ZooDefs.Ids.CREATOR_ALL_ACL;
+ }
+ zkClient = BlobStoreUtils.createZKClient(conf, acl);
if (overrideBase == null) {
overrideBase = ConfigUtils.absoluteStormBlobStoreDir(conf);
}
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 96c177b..11ab035 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -43,7 +43,6 @@
public static final String ZK_SEPERATOR = "/";
public static final String ASSIGNMENTS_ROOT = "assignments";
- public static final String CODE_ROOT = "code";
public static final String STORMS_ROOT = "storms";
public static final String SUPERVISORS_ROOT = "supervisors";
public static final String WORKERBEATS_ROOT = "workerbeats";
@@ -91,15 +90,38 @@
_instance = INSTANCE;
}
- public static List<ACL> mkTopoOnlyAcls(Map topoConf) throws NoSuchAlgorithmException {
+ /**
+ * Get ZK ACLs for a topology to have read/write access.
+ * @param topoConf the topology config.
+ * @return the ACLs.
+ */
+ public static List<ACL> mkTopoReadWriteAcls(Map<String, Object> topoConf) {
+ return mkTopoAcls(topoConf, ZooDefs.Perms.ALL);
+ }
+
+ /**
+ * Get ZK ACLs for a topology to have read only access.
+ * @param topoConf the topology config.
+ * @return the ACLs.
+ */
+ public static List<ACL> mkTopoReadOnlyAcls(Map<String, Object> topoConf) {
+ return mkTopoAcls(topoConf, ZooDefs.Perms.READ);
+ }
+
+ private static List<ACL> mkTopoAcls(Map<String, Object> topoConf, int perms) {
List<ACL> aclList = null;
String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
if (Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
aclList = new ArrayList<>();
ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0);
aclList.add(acl1);
- ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(payload)));
- aclList.add(acl2);
+ try {
+ ACL acl2 = new ACL(perms, new Id("digest", DigestAuthenticationProvider.generateDigest(payload)));
+ aclList.add(acl2);
+ } catch (NoSuchAlgorithmException e) {
+ //Should only happen on a badly configured system
+ throw new RuntimeException(e);
+ }
}
return aclList;
}
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index a6f07ed..c93a6b2 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -59,7 +59,7 @@
public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
- public void setupHeatbeats(String stormId);
+ public void setupHeatbeats(String stormId, Map<String, Object> topoConf);
public void teardownHeartbeats(String stormId);
@@ -71,7 +71,7 @@
public List<String> backpressureTopologies();
- public void setTopologyLogConfig(String stormId, LogConfig logConfig);
+ public void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String, Object> topoConf);
public LogConfig topologyLogConfig(String stormId, Runnable cb);
@@ -85,19 +85,19 @@
public boolean topologyBackpressure(String stormId, Runnable callback);
- public void setupBackpressure(String stormId);
+ public void setupBackpressure(String stormId, Map<String, Object> topoConf);
public void removeBackpressure(String stormId);
public void removeWorkerBackpressure(String stormId, String node, Long port);
- public void activateStorm(String stormId, StormBase stormBase);
+ public void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf);
public void updateStorm(String stormId, StormBase newElems);
public void removeStormBase(String stormId);
- public void setAssignment(String stormId, Assignment info);
+ public void setAssignment(String stormId, Assignment info, Map<String, Object> topoConf);
public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo);
@@ -113,11 +113,13 @@
public void reportError(String stormId, String componentId, String node, Long port, Throwable error);
+ void setupErrors(String stormId, Map<String, Object> topoConf);
+
public List<ErrorInfo> errors(String stormId, String componentId);
public ErrorInfo lastError(String stormId, String componentId);
- public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException;
+ public void setCredentials(String stormId, Credentials creds, Map topoConf);
public Credentials credentials(String stormId, Runnable callback);
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index f9952cf..2d26c2f 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -33,7 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -243,7 +242,6 @@
public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
-
}
@Override
@@ -338,8 +336,9 @@
}
@Override
- public void setupHeatbeats(String stormId) {
- stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
+ public void setupHeatbeats(String stormId, Map<String, Object> topoConf) {
+ stateStorage.mkdirs(ClusterUtils.WORKERBEATS_SUBTREE, acls);
+ stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), ClusterUtils.mkTopoReadWriteAcls(topoConf));
}
@Override
@@ -386,8 +385,9 @@
}
@Override
- public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
- stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls);
+ public void setTopologyLogConfig(String stormId, LogConfig logConfig, Map<String, Object> topoConf) {
+ stateStorage.mkdirs(ClusterUtils.LOGCONFIG_SUBTREE, acls);
+ stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
}
@Override
@@ -467,8 +467,9 @@
}
@Override
- public void setupBackpressure(String stormId) {
- stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
+ public void setupBackpressure(String stormId, Map<String, Object> topoConf) {
+ stateStorage.mkdirs(ClusterUtils.BACKPRESSURE_SUBTREE, acls);
+ stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), ClusterUtils.mkTopoReadWriteAcls(topoConf));
}
@Override
@@ -495,9 +496,10 @@
}
@Override
- public void activateStorm(String stormId, StormBase stormBase) {
+ public void activateStorm(String stormId, StormBase stormBase, Map<String, Object> topoConf) {
String path = ClusterUtils.stormPath(stormId);
- stateStorage.set_data(path, Utils.serialize(stormBase), acls);
+ stateStorage.mkdirs(ClusterUtils.STORMS_SUBTREE, acls);
+ stateStorage.set_data(path, Utils.serialize(stormBase), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
}
/**
@@ -589,8 +591,9 @@
}
@Override
- public void setAssignment(String stormId, Assignment info) {
- stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls);
+ public void setAssignment(String stormId, Assignment info, Map<String, Object> topoConf) {
+ stateStorage.mkdirs(ClusterUtils.ASSIGNMENTS_SUBTREE, acls);
+ stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), ClusterUtils.mkTopoReadOnlyAcls(topoConf));
}
@Override
@@ -639,6 +642,12 @@
}
@Override
+ public void setupErrors(String stormId, Map<String, Object> topoConf) {
+ stateStorage.mkdirs(ClusterUtils.ERRORS_SUBTREE, acls);
+ stateStorage.mkdirs(ClusterUtils.errorStormRoot(stormId), ClusterUtils.mkTopoReadWriteAcls(topoConf));
+ }
+
+ @Override
public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
String path = ClusterUtils.errorPath(stormId, componentId);
@@ -708,11 +717,10 @@
}
@Override
- public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
- List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
+ public void setCredentials(String stormId, Credentials creds, Map topoConf) {
+ List<ACL> aclList = ClusterUtils.mkTopoReadOnlyAcls(topoConf);
String path = ClusterUtils.credentialsPath(stormId);
stateStorage.set_data(path, Utils.serialize(creds), aclList);
-
}
@Override
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index e337b1f..af15dd9 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -80,15 +80,15 @@
this.isNimbus = true;
// just mkdir STORM_ZOOKEEPER_ROOT dir
- CuratorFramework zkTemp = mkZk();
+ CuratorFramework zkTemp = mkZk(acls);
String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
Zookeeper.mkdirs(zkTemp, rootPath, acls);
zkTemp.close();
active = new AtomicBoolean(true);
- zkWriter = mkZk(new ZkWatcherCallBack());
+ zkWriter = mkZk(acls, new ZkWatcherCallBack());
if (isNimbus) {
- zkReader = mkZk(new ZkWatcherCallBack());
+ zkReader = mkZk(acls, new ZkWatcherCallBack());
} else {
zkReader = zkWriter;
}
@@ -96,15 +96,15 @@
}
@SuppressWarnings("unchecked")
- private CuratorFramework mkZk() throws IOException {
+ private CuratorFramework mkZk(List<ACL> acls) throws IOException {
return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "",
- new DefaultWatcherCallBack(), authConf);
+ new DefaultWatcherCallBack(), authConf, acls);
}
@SuppressWarnings("unchecked")
- private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
+ private CuratorFramework mkZk(List<ACL> acls, WatcherCallBack watcher) throws NumberFormatException, IOException {
return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
- String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf);
+ String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf, acls);
}
@Override
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index ef4b54d..31d1130 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -312,9 +312,6 @@
}
static List<ACL> supervisorZkAcls() {
- final List<ACL> acls = new ArrayList<>();
- acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
- acls.add(new ACL((ZooDefs.Perms.READ ^ ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
- return acls;
+ return ZooDefs.Ids.CREATOR_ALL_ACL;
}
}
diff --git a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
index bec5f66..ba86c93 100644
--- a/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
+++ b/storm-core/src/jvm/org/apache/storm/transactional/state/TransactionalState.java
@@ -66,7 +66,7 @@
List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
- CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth);
+ CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth, null);
_zkAcls = Utils.getWorkerACL(conf);
try {
TransactionalState.createNode(initter, transactionalRoot, null, null, null);
@@ -78,7 +78,7 @@
}
initter.close();
- _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth);
+ _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth, null);
_ser = new KryoValuesSerializer(conf);
_des = new KryoValuesDeserializer(conf);
} catch (Exception e) {
diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index b5546c9..503943d 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -65,7 +65,7 @@
List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
ZookeeperAuthInfo auth = new ZookeeperAuthInfo(conf);
- CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth);
+ CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port, auth, null);
_zkAcls = Utils.getWorkerACL(conf);
try {
TransactionalState.createNode(initter, transactionalRoot, null, null, null);
@@ -77,7 +77,7 @@
}
initter.close();
- _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth);
+ _curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth, null);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 7032f71..a14d56f 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -32,6 +32,7 @@
import org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient;
import org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider;
import org.apache.curator.ensemble.exhibitor.Exhibitors;
+import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.storm.Config;
@@ -1077,15 +1078,16 @@
return false;
}
- public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
- return newCurator(conf, servers, port, root, null);
+ public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, List<ACL> defaultAcl) {
+ return newCurator(conf, servers, port, root, null, defaultAcl);
}
- public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
- return newCurator(conf, servers, port, "", auth);
+ public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth, List<ACL> defaultAcl) {
+ return newCurator(conf, servers, port, "", auth, defaultAcl);
}
- public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
+ public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth,
+ final List<ACL> defaultAcl) {
List<String> serverPorts = new ArrayList<String>();
for (String zkServer : servers) {
serverPorts.add(zkServer + ":" + Utils.getInt(port));
@@ -1094,6 +1096,19 @@
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
setupBuilder(builder, zkStr, conf, auth);
+ if (defaultAcl != null) {
+ builder.aclProvider(new ACLProvider() {
+ @Override
+ public List<ACL> getDefaultAcl() {
+ return defaultAcl;
+ }
+
+ @Override
+ public List<ACL> getAclForPath(String s) {
+ return null;
+ }
+ });
+ }
return builder.build();
}
@@ -1140,14 +1155,14 @@
setupBuilder(builder, zkStr, conf, auth);
}
- public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
- CuratorFramework ret = newCurator(conf, servers, port, root, auth);
+ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth, List<ACL> defaultAcl) {
+ CuratorFramework ret = newCurator(conf, servers, port, root, auth, defaultAcl);
ret.start();
return ret;
}
- public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
- CuratorFramework ret = newCurator(conf, servers, port, auth);
+ public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth, List<ACL> defaultAcl) {
+ CuratorFramework ret = newCurator(conf, servers, port, auth, defaultAcl);
ret.start();
return ret;
}
@@ -1223,23 +1238,39 @@
&& !((String)conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
}
-
- public static List<ACL> getWorkerACL(Map conf) {
- //This is a work around to an issue with ZK where a sasl super user is not super unless there is an open SASL ACL so we are trying to give the correct perms
- if (!isZkAuthenticationConfiguredTopology(conf)) {
- return null;
+ public static Id parseZkId(String id, String configName) {
+ String[] split = id.split(":", 2);
+ if (split.length != 2) {
+ throw new IllegalArgumentException(configName + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
}
+ return new Id(split[0], split[1]);
+ }
+
+ /**
+ * Get the ACL for nimbus/supervisor. The Super User ACL. This assumes that security is enabled.
+ * @param conf the config to get the super User ACL from
+ * @return the super user ACL.
+ */
+ public static ACL getSuperUserAcl(Map<String, Object> conf) {
String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
if (stormZKUser == null) {
throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
}
- String[] split = stormZKUser.split(":", 2);
- if (split.length != 2) {
- throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+ return new ACL(ZooDefs.Perms.ALL, parseZkId(stormZKUser, Config.STORM_ZOOKEEPER_SUPERACL));
+ }
+
+ /**
+ * Get the ZK ACLs that a worker should use when writing to ZK.
+ * @param conf the config for the topology.
+ * @return the ACLs
+ */
+ public static List<ACL> getWorkerACL(Map<String, Object> conf) {
+ if (!isZkAuthenticationConfiguredTopology(conf)) {
+ return null;
}
ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
- ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
- return ret;
+ ret.add(getSuperUserAcl(conf));
+ return ret;
}
/**
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java b/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java
new file mode 100644
index 0000000..b39db99
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/AclEnforcement.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.zookeeper;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.security.auth.Subject;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.security.auth.NimbusPrincipal;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is code intended to enforce ZK ACLs.
+ */
+public class AclEnforcement {
+ private static final Logger LOG = LoggerFactory.getLogger(AclEnforcement.class);
+
+ /**
+ * Verify the ZK ACLs are correct and optionally fix them if needed.
+ * @param conf the cluster config.
+ * @param fixUp true if we want to fix the ACLs else false.
+ * @throws Exception on any error.
+ */
+ public static void verifyAcls(Map<String, Object> conf, final boolean fixUp) throws Exception {
+ if (!Utils.isZkAuthenticationConfiguredStormServer(conf)) {
+ LOG.info("SECURITY IS DISABLED NO FURTHER CHECKS...");
+ //There is no security so we are done.
+ return;
+ }
+ ACL superUserAcl = Utils.getSuperUserAcl(conf);
+ List<ACL> superAcl = new ArrayList<>(1);
+ superAcl.add(superUserAcl);
+
+ List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+ int port = Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_PORT));
+ String stormRoot = (String) conf.get(Config.STORM_ZOOKEEPER_ROOT);
+
+ try (CuratorFramework zk = Zookeeper.mkClient(conf, zkServers, port, "",
+ new DefaultWatcherCallBack(), conf, superAcl)) {
+ if (zk.checkExists().forPath(stormRoot) != null) {
+ //First off we want to verify that ROOT is good
+ verifyAclStrict(zk, superAcl, stormRoot, fixUp);
+ } else {
+ LOG.warn("{} does not exist no need to check any more...", stormRoot);
+ return;
+ }
+ }
+
+ // Now that the root is fine we can start to look at the other paths under it.
+ try (CuratorFramework zk = Zookeeper.mkClient(conf, zkServers, port, stormRoot,
+ new DefaultWatcherCallBack(), conf, superAcl)) {
+ //Next verify that the blob store is correct before we start it up.
+ if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_SUBTREE) != null) {
+ verifyAclStrictRecursive(zk, superAcl, ClusterUtils.BLOBSTORE_SUBTREE, fixUp);
+ }
+
+ if (zk.checkExists().forPath(ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE) != null) {
+ verifyAclStrict(zk, superAcl, ClusterUtils.BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE, fixUp);
+ }
+
+ //The blobstore is good, now lets get the list of all topo Ids
+ Set<String> topoIds = new HashSet<>();
+ if (zk.checkExists().forPath(ClusterUtils.STORMS_SUBTREE) != null) {
+ topoIds.addAll(zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE));
+ }
+
+ Map<String, Id> topoToZkCreds = new HashMap<>();
+ //Now lets get the creds for the topos so we can verify those as well.
+ BlobStore bs = Utils.getNimbusBlobStore(conf, NimbusInfo.fromConf(conf));
+ try {
+ Subject nimbusSubject = new Subject();
+ nimbusSubject.getPrincipals().add(new NimbusPrincipal());
+ for (String topoId: topoIds) {
+ try {
+ String blobKey = topoId + "-stormconf.ser";
+ Map<String, Object> topoConf = Utils.fromCompressedJsonConf(bs.readBlob(blobKey, nimbusSubject));
+ String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+ try {
+ topoToZkCreds.put(topoId, new Id("digest", DigestAuthenticationProvider.generateDigest(payload)));
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException(e);
+ }
+ } catch (KeyNotFoundException knf) {
+ LOG.debug("topo removed {}", topoId, knf);
+ }
+ }
+ } finally {
+ if (bs != null) {
+ bs.shutdown();
+ }
+ }
+
+ verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, ClusterUtils.STORMS_SUBTREE, topoToZkCreds, fixUp);
+ verifyParentWithReadOnlyTopoChildren(zk, superUserAcl, ClusterUtils.ASSIGNMENTS_SUBTREE, topoToZkCreds, fixUp);
+ //There is a race on credentials where they can be leaked in some versions of storm.
+ verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.CREDENTIALS_SUBTREE, topoToZkCreds, fixUp);
+ //There is a race on logconfig where they can be leaked in some versions of storm.
+ verifyParentWithReadOnlyTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.LOGCONFIG_SUBTREE, topoToZkCreds, fixUp);
+ //There is a race on backpressure too...
+ verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.BACKPRESSURE_SUBTREE, topoToZkCreds, fixUp);
+
+ if (zk.checkExists().forPath(ClusterUtils.ERRORS_SUBTREE) != null) {
+ //errors is a bit special because in older versions of storm the worker created the parent directories lazily
+ // because of this it means we need to auto create at least the topo-id directory for all running topos.
+ for (String topoId : topoToZkCreds.keySet()) {
+ String path = ClusterUtils.errorStormRoot(topoId);
+ if (zk.checkExists().forPath(path) == null) {
+ LOG.warn("Creating missing errors location {}", path);
+ zk.create().withACL(getTopoReadWrite(path, topoId, topoToZkCreds, superUserAcl, fixUp)).forPath(path);
+ }
+ }
+ }
+ //Error should not be leaked according to the code, but they are not important enough to fail the build if
+ // for some odd reason they are leaked.
+ verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.ERRORS_SUBTREE, topoToZkCreds, fixUp);
+
+ if (zk.checkExists().forPath(ClusterUtils.NIMBUSES_SUBTREE) != null) {
+ verifyAclStrictRecursive(zk, superAcl, ClusterUtils.NIMBUSES_SUBTREE, fixUp);
+ }
+
+ if (zk.checkExists().forPath("/leader-lock") != null) {
+ verifyAclStrictRecursive(zk, superAcl, "/leader-lock", fixUp);
+ }
+
+ if (zk.checkExists().forPath(ClusterUtils.PROFILERCONFIG_SUBTREE) != null) {
+ verifyAclStrictRecursive(zk, superAcl, ClusterUtils.PROFILERCONFIG_SUBTREE, fixUp);
+ }
+
+ if (zk.checkExists().forPath(ClusterUtils.SUPERVISORS_SUBTREE) != null) {
+ verifyAclStrictRecursive(zk, superAcl, ClusterUtils.SUPERVISORS_SUBTREE, fixUp);
+ }
+
+ // When moving to pacemaker workerbeats can be leaked too...
+ verifyParentWithReadWriteTopoChildrenDeleteDead(zk, superUserAcl, ClusterUtils.WORKERBEATS_SUBTREE, topoToZkCreds, fixUp);
+ }
+ }
+
+ private static List<ACL> getTopoAcl(String path, String topoId, Map<String, Id> topoToZkCreds, ACL superAcl, boolean fixUp, int perms) {
+ Id id = topoToZkCreds.get(topoId);
+ if (id == null) {
+ String error = "Could not find credentials for topology " + topoId + " at path " + path + ".";
+ if (fixUp) {
+ error += " Don't know how to fix this automatically. Please add needed ACLs, or delete the path.";
+ }
+ throw new IllegalStateException(error);
+ }
+ List<ACL> ret = new ArrayList<>(2);
+ ret.add(superAcl);
+ ret.add(new ACL(perms, id));
+ return ret;
+ }
+
+ private static List<ACL> getTopoReadWrite(String path, String topoId, Map<String, Id> topoToZkCreds, ACL superAcl, boolean fixUp) {
+ return getTopoAcl(path, topoId, topoToZkCreds, superAcl, fixUp, ZooDefs.Perms.ALL);
+ }
+
+ private static void verifyParentWithTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, String path,
+ Map<String, Id> topoToZkCreds, boolean fixUp, int perms) throws Exception {
+ if (zk.checkExists().forPath(path) != null) {
+ verifyAclStrict(zk, Arrays.asList(superUserAcl), path, fixUp);
+ Set<String> possiblyBadIds = new HashSet<>();
+ for (String topoId : zk.getChildren().forPath(path)) {
+ String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId;
+ if (!topoToZkCreds.containsKey(topoId)) {
+ //Save it to try again later...
+ possiblyBadIds.add(topoId);
+ } else {
+ List<ACL> rwAcl = getTopoAcl(path, topoId, topoToZkCreds, superUserAcl, fixUp, perms);
+ verifyAclStrictRecursive(zk, rwAcl, childPath, fixUp);
+ }
+ }
+
+ if (!possiblyBadIds.isEmpty()) {
+ //Lets reread the children in STORMS as the source of truth and see if a new one was created in the background
+ possiblyBadIds.removeAll(zk.getChildren().forPath(ClusterUtils.STORMS_SUBTREE));
+ for (String topoId: possiblyBadIds) {
+ //Now we know for sure that this is a bad id
+ String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId;
+ zk.delete().deletingChildrenIfNeeded().forPath(childPath);
+ }
+ }
+ }
+ }
+
+ private static void verifyParentWithReadOnlyTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, String path,
+ Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception {
+ verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.READ);
+ }
+
+ private static void verifyParentWithReadWriteTopoChildrenDeleteDead(CuratorFramework zk, ACL superUserAcl, String path,
+ Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception {
+ verifyParentWithTopoChildrenDeleteDead(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.ALL);
+ }
+
+ private static void verifyParentWithTopoChildren(CuratorFramework zk, ACL superUserAcl, String path,
+ Map<String, Id> topoToZkCreds, boolean fixUp, int perms) throws Exception {
+ if (zk.checkExists().forPath(path) != null) {
+ verifyAclStrict(zk, Arrays.asList(superUserAcl), path, fixUp);
+ for (String topoId : zk.getChildren().forPath(path)) {
+ String childPath = path + ClusterUtils.ZK_SEPERATOR + topoId;
+ List<ACL> rwAcl = getTopoAcl(path, topoId, topoToZkCreds, superUserAcl, fixUp, perms);
+ verifyAclStrictRecursive(zk, rwAcl, childPath, fixUp);
+ }
+ }
+ }
+
+ private static void verifyParentWithReadOnlyTopoChildren(CuratorFramework zk, ACL superUserAcl, String path,
+ Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception {
+ verifyParentWithTopoChildren(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.READ);
+ }
+
+ private static void verifyParentWithReadWriteTopoChildren(CuratorFramework zk, ACL superUserAcl, String path,
+ Map<String, Id> topoToZkCreds, boolean fixUp) throws Exception {
+ verifyParentWithTopoChildren(zk, superUserAcl, path, topoToZkCreds, fixUp, ZooDefs.Perms.ALL);
+ }
+
+ private static void verifyAclStrictRecursive(CuratorFramework zk, List<ACL> strictAcl, String path, boolean fixUp) throws Exception {
+ verifyAclStrict(zk, strictAcl, path, fixUp);
+ for (String child: zk.getChildren().forPath(path)) {
+ String newPath = path + ClusterUtils.ZK_SEPERATOR + child;
+ verifyAclStrictRecursive(zk, strictAcl, newPath, fixUp);
+ }
+ }
+
+ private static void verifyAclStrict(CuratorFramework zk, List<ACL> strictAcl, String path, boolean fixUp) throws Exception {
+ try {
+ List<ACL> foundAcl = zk.getACL().forPath(path);
+ if (!equivalent(foundAcl, strictAcl)) {
+ if (fixUp) {
+ LOG.warn("{} expected to have ACL {}, but has {}. Fixing...", path, strictAcl, foundAcl);
+ zk.setACL().withACL(strictAcl).forPath(path);
+ } else {
+ throw new IllegalStateException(path + " did not have the correct ACL found " + foundAcl + " expected " + strictAcl);
+ }
+ }
+ } catch (KeeperException.NoNodeException ne) {
+ LOG.debug("{} removed in the middle of checking it", ne);
+ }
+ }
+
+ private static boolean equivalent(List<ACL> a, List<ACL> b) {
+ if (a.size() == b.size()) {
+ for (ACL aAcl: a) {
+ if (!b.contains(aAcl)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public static void main(String [] args) throws Exception {
+ Map<String, Object> conf = Utils.readStormConfig();
+ boolean fixUp = false;
+ for (String arg: args) {
+ String a = arg.toLowerCase();
+ if ("-f".equals(a) || "--fixup".equals(a)) {
+ fixUp = true;
+ } else {
+ throw new IllegalArgumentException("Unsupported argument " + arg + " only -f or --fixup is supported.");
+ }
+ }
+ verifyAcls(conf, fixUp);
+ }
+}
\ No newline at end of file
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index 0580f41..92f08fa 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -36,6 +36,7 @@
import org.apache.storm.callback.WatcherCallBack;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
@@ -44,6 +45,7 @@
import org.apache.storm.utils.ZookeeperAuthInfo;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
@@ -89,28 +91,28 @@
_instance = INSTANCE;
}
- public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root) {
- return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack());
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, List<ACL> defaultAcl) {
+ return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), null, defaultAcl);
}
- public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, Map authConf) {
- return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf);
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, Map authConf, List<ACL> defaultAcl) {
+ return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf, defaultAcl);
}
- public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, Map authConf) {
- return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf);
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, Map authConf, List<ACL> defaultAcl) {
+ return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf, defaultAcl);
}
- public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
- return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf);
+ public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf, List<ACL> defaultAcl) {
+ return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf, defaultAcl);
}
- public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf, List<ACL> defaultAcl) {
CuratorFramework fk;
if (authConf != null) {
- fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf));
+ fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf), defaultAcl);
} else {
- fk = Utils.newCurator(conf, servers, port, root);
+ fk = Utils.newCurator(conf, servers, port, root, defaultAcl);
}
fk.getCuratorListenable().addListener(new CuratorListener() {
@@ -131,8 +133,9 @@
*
* @return
*/
- public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
- return mkClientImpl(conf, servers, port, root, watcher, null);
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher,
+ List<ACL> defaultAcl) {
+ return mkClientImpl(conf, servers, port, root, watcher, null,defaultAcl);
}
public static String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls) {
@@ -372,14 +375,14 @@
};
}
- public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
- return _instance.zkLeaderElectorImpl(conf, blobStore);
+ public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore, List<ACL> defaultAcl) throws UnknownHostException {
+ return _instance.zkLeaderElectorImpl(conf, blobStore, defaultAcl);
}
- protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
+ protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore, List<ACL> defaultAcl) throws UnknownHostException {
List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
- CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
+ CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf, defaultAcl);
String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
String id = NimbusInfo.fromConf(conf).toHostPortString();
AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index 55b686e..77c371d 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -180,21 +180,21 @@
base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil {} "")
base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil {} "")]
(is (= [] (.assignments state nil)))
- (.set-assignment! state "storm1" assignment1)
+ (.set-assignment! state "storm1" assignment1 {})
(is (= assignment1 (.assignment-info state "storm1" nil)))
(is (= nil (.assignment-info state "storm3" nil)))
- (.set-assignment! state "storm1" assignment2)
- (.set-assignment! state "storm3" assignment1)
+ (.set-assignment! state "storm1" assignment2 {})
+ (.set-assignment! state "storm3" assignment1 {})
(is (= #{"storm1" "storm3"} (set (.assignments state nil))))
(is (= assignment2 (.assignment-info state "storm1" nil)))
(is (= assignment1 (.assignment-info state "storm3" nil)))
(is (= [] (.active-storms state)))
- (.activate-storm! state "storm1" base1)
+ (.activate-storm! state "storm1" base1 {})
(is (= ["storm1"] (.active-storms state)))
(is (= base1 (.storm-base state "storm1" nil)))
(is (= nil (.storm-base state "storm2" nil)))
- (.activate-storm! state "storm2" base2)
+ (.activate-storm! state "storm2" base2 {})
(is (= base1 (.storm-base state "storm1" nil)))
(is (= base2 (.storm-base state "storm2" nil)))
(is (= #{"storm1" "storm2"} (set (.active-storms state))))
diff --git a/storm-core/test/clj/org/apache/storm/utils_test.clj b/storm-core/test/clj/org/apache/storm/utils_test.clj
index b59967a..0d1d67d 100644
--- a/storm-core/test/clj/org/apache/storm/utils_test.clj
+++ b/storm-core/test/clj/org/apache/storm/utils_test.clj
@@ -32,7 +32,7 @@
Config/STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING expected_ceiling})
servers ["bogus_server"]
arbitrary_port 42
- curator (Utils/newCurator conf servers arbitrary_port nil)
+ curator (Utils/newCurator conf servers arbitrary_port nil nil)
retry (-> curator .getZookeeperClient .getRetryPolicy)
]
(is (.isAssignableFrom ExponentialBackoffRetry (.getClass retry)))