blob: 8b9e453dfb5bb096ef3ff5e8f19bf4a3fc84a6a7 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.security.auth.nimbus-auth-test
(:use [clojure test])
(:import [java.nio ByteBuffer])
(:import [java.util Optional])
(:import [org.apache.storm LocalCluster$Builder DaemonConfig Config])
(:import [org.apache.storm.blobstore BlobStore])
(:import [org.apache.storm.daemon.nimbus TopoCache])
(:import [org.apache.storm.generated NotAliveException StormBase])
(:import [org.apache.storm.security.auth ThriftServer ThriftClient
ReqContext ThriftConnectionType])
(:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Processor
AuthorizationException SubmitOptions TopologyInitialStatus KillOptions])
(:import [org.apache.storm.utils ConfigUtils NimbusClient Utils])
(:import [org.apache.storm.cluster IStormClusterState])
(:import [org.mockito Mockito Matchers])
(:use [org.apache.storm util config daemon-config log])
(:require [conjure.core])
(:use [conjure core]))
;; 3 seconds in milliseconds
;; This is plenty of time for a thrift client to respond.
(def nimbus-timeout (Integer. (* 3 1000)))
(defn to-conf [nimbus-port login-cfg aznClass transportPluginClass]
(let [conf {NIMBUS-AUTHORIZER aznClass
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
NIMBUS-THRIFT-PORT nimbus-port
STORM-THRIFT-TRANSPORT-PLUGIN transportPluginClass }
conf (if login-cfg (merge conf {"java.security.auth.login.config" login-cfg}) conf)]
conf))
(defmacro with-test-cluster [[cluster-sym & args] & body]
`(let [conf# (to-conf ~@args)]
(with-open [~cluster-sym (.build (doto (LocalCluster$Builder. )
(.withNimbusDaemon)
(.withDaemonConf conf#)
(.withSupervisors 0)
(.withPortsPerSupervisor 0)))]
~@body)))
(deftest Simple-authentication-test
(with-test-cluster [cluster 0 nil nil "org.apache.storm.security.auth.SimpleTransportPlugin"]
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
nimbus_client (.getClient client)]
(testing "(Positive authorization) Simple protocol w/o authentication/authorization enforcement"
(is (thrown-cause? NotAliveException
(.activate nimbus_client "topo-name"))))
(.close client))))
(deftest test-noop-authorization-w-simple-transport
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)
topo-name "topo-name"]
(.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/empty))
(with-open [cluster (.build
(doto (LocalCluster$Builder.)
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withNimbusDaemon)
(.withDaemonConf
{NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
NIMBUS-THRIFT-PORT 0
STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
nimbus_client (.getClient client)]
(testing "(Positive authorization) Authorization plugin should accept client request"
(is (thrown-cause? NotAliveException
(.activate nimbus_client topo-name))))
(.close client)))))
(deftest test-deny-authorization-w-simple-transport
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)
topo-name "topo-name"
topo-id "topo-name-1"]
(.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/of topo-id))
(.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
(with-open [cluster (.build
(doto (LocalCluster$Builder.)
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withNimbusDaemon)
(.withDaemonConf
{NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
NIMBUS-THRIFT-PORT 0
STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"})))]
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.SimpleTransportPlugin"
Config/NIMBUS_THRIFT_PORT (.getThriftServerPort cluster)
STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
nimbus_client (.getClient client)
topologyInitialStatus (TopologyInitialStatus/findByValue 2)
submitOptions (SubmitOptions. topologyInitialStatus)]
(is (thrown-cause? AuthorizationException (.submitTopology nimbus_client topo-name nil nil nil)))
(is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client topo-name nil nil nil submitOptions)))
(is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
(is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
(is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
(is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
(is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
(is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
(is (thrown-cause? AuthorizationException (.killTopology nimbus_client topo-name)))
(is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client topo-name (KillOptions.))))
(is (thrown-cause? AuthorizationException (.activate nimbus_client topo-name)))
(is (thrown-cause? AuthorizationException (.deactivate nimbus_client topo-name)))
(is (thrown-cause? AuthorizationException (.rebalance nimbus_client topo-name nil)))
(is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client topo-id)))
(is (thrown-cause? AuthorizationException (.getTopology nimbus_client topo-id)))
(is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client topo-id)))
(is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client topo-id)))
(.close client)))))
(deftest test-noop-authorization-w-sasl-digest
(with-test-cluster [cluster 0
"test/clj/org/apache/storm/security/auth/jaas_digest.conf"
"org.apache.storm.security.auth.authorizer.NoopAuthorizer"
"org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"]
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
"java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
Config/NIMBUS_THRIFT_PORT (.getThriftServerPort cluster)
STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
nimbus_client (.getClient client)]
(testing "(Positive authorization) Authorization plugin should accept client request"
(is (thrown-cause? NotAliveException
(.activate nimbus_client "topo-name"))))
(.close client))))
(deftest test-deny-authorization-w-sasl-digest
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
tc (Mockito/mock TopoCache)
topo-name "topo-name"
topo-id "topo-name-1"]
(.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/of topo-id))
(.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
(with-open [cluster (.build
(doto (LocalCluster$Builder.)
(.withClusterState cluster-state)
(.withBlobStore blob-store)
(.withTopoCache tc)
(.withNimbusDaemon)
(.withDaemonConf
{NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
SUPERVISOR-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
NIMBUS-THRIFT-PORT 0
"java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"})))]
(let [storm-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{STORM-THRIFT-TRANSPORT-PLUGIN "org.apache.storm.security.auth.digest.DigestSaslTransportPlugin"
"java.security.auth.login.config" "test/clj/org/apache/storm/security/auth/jaas_digest.conf"
Config/NIMBUS_THRIFT_PORT (.getThriftServerPort cluster)
STORM-NIMBUS-RETRY-TIMES 0})
client (NimbusClient. storm-conf "localhost" (.getThriftServerPort cluster) nimbus-timeout)
nimbus_client (.getClient client)
topologyInitialStatus (TopologyInitialStatus/findByValue 2)
submitOptions (SubmitOptions. topologyInitialStatus)]
(is (thrown-cause? AuthorizationException (.submitTopology nimbus_client topo-name nil nil nil)))
(is (thrown-cause? AuthorizationException (.submitTopologyWithOpts nimbus_client topo-name nil nil nil submitOptions)))
(is (thrown-cause? AuthorizationException (.beginFileUpload nimbus_client)))
(is (thrown-cause? AuthorizationException (.uploadChunk nimbus_client nil nil)))
(is (thrown-cause? AuthorizationException (.finishFileUpload nimbus_client nil)))
(is (thrown-cause? AuthorizationException (.downloadChunk nimbus_client nil)))
(is (thrown-cause? AuthorizationException (.getNimbusConf nimbus_client)))
(is (thrown-cause? AuthorizationException (.getClusterInfo nimbus_client)))
(is (thrown-cause? AuthorizationException (.killTopology nimbus_client topo-name)))
(is (thrown-cause? AuthorizationException (.killTopologyWithOpts nimbus_client topo-name (KillOptions.))))
(is (thrown-cause? AuthorizationException (.activate nimbus_client topo-name)))
(is (thrown-cause? AuthorizationException (.deactivate nimbus_client topo-name)))
(is (thrown-cause? AuthorizationException (.rebalance nimbus_client topo-name nil)))
(is (thrown-cause? AuthorizationException (.getTopologyConf nimbus_client topo-id)))
(is (thrown-cause? AuthorizationException (.getTopology nimbus_client topo-id)))
(is (thrown-cause? AuthorizationException (.getUserTopology nimbus_client topo-id)))
(is (thrown-cause? AuthorizationException (.getTopologyInfo nimbus_client topo-id)))
(.close client)))))