blob: 40744fb480a42b112a517600ec065c846356999d [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.daemon.drpc
(:import [backtype.storm.security.auth AuthUtils ThriftServer ThriftConnectionType ReqContext])
(:import [backtype.storm.security.auth.authorizer DRPCAuthorizerBase])
(:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
DistributedRPCInvocations$Processor])
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
(:import [backtype.storm.daemon Shutdownable])
(:import [java.net InetAddress])
(:import [backtype.storm.generated AuthorizationException])
(:use [backtype.storm config log util])
(:use [backtype.storm.daemon common])
(:use [backtype.storm.ui helpers])
(:use compojure.core)
(:use ring.middleware.reload)
(:require [compojure.handler :as handler])
(:require [metrics.meters :refer [defmeter mark!]])
(:gen-class))
(defmeter drpc:num-execute-http-requests)
(defmeter drpc:num-execute-calls)
(defmeter drpc:num-result-calls)
(defmeter drpc:num-failRequest-calls)
(defmeter drpc:num-fetchRequest-calls)
(defmeter drpc:num-shutdown-calls)
(defn timeout-check-secs [] 5)
(defn acquire-queue [queues-atom function]
(swap! queues-atom
(fn [amap]
(if-not (amap function)
(assoc amap function (ConcurrentLinkedQueue.))
amap)))
(@queues-atom function))
(defn check-authorization
([aclHandler mapping operation context]
(if (not-nil? context)
(log-thrift-access (.requestID context) (.remoteAddress context) (.principal context) operation))
(if aclHandler
(let [context (or context (ReqContext/context))]
(if-not (.permit aclHandler context operation mapping)
(let [principal (.principal context)
user (if principal (.getName principal) "unknown")]
(throw (AuthorizationException.
(str "DRPC request '" operation "' for '"
user "' user is not authorized"))))))))
([aclHandler mapping operation]
(check-authorization aclHandler mapping operation (ReqContext/context))))
;; TODO: change this to use TimeCacheMap
(defn service-handler [conf]
(let [drpc-acl-handler (mk-authorization-handler (conf DRPC-AUTHORIZER) conf)
ctr (atom 0)
id->sem (atom {})
id->result (atom {})
id->start (atom {})
id->function (atom {})
id->request (atom {})
request-queues (atom {})
cleanup (fn [id] (swap! id->sem dissoc id)
(swap! id->result dissoc id)
(swap! id->function dissoc id)
(swap! id->request dissoc id)
(swap! id->start dissoc id))
my-ip (.getHostAddress (InetAddress/getLocalHost))
clear-thread (async-loop
(fn []
(doseq [[id start] @id->start]
(when (> (time-delta start) (conf DRPC-REQUEST-TIMEOUT-SECS))
(when-let [sem (@id->sem id)]
(.remove (acquire-queue request-queues (@id->function id)) (@id->request id))
(log-warn "Timeout DRPC request id: " id " start at " start)
(.release sem))
(cleanup id)))
(timeout-check-secs)))]
(reify DistributedRPC$Iface
(^String execute
[this ^String function ^String args]
(mark! drpc:num-execute-calls)
(log-debug "Received DRPC request for " function " (" args ") at " (System/currentTimeMillis))
(check-authorization drpc-acl-handler
{DRPCAuthorizerBase/FUNCTION_NAME function}
"execute")
(let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
^Semaphore sem (Semaphore. 0)
req (DRPCRequest. args id)
^ConcurrentLinkedQueue queue (acquire-queue request-queues function)]
(swap! id->start assoc id (current-time-secs))
(swap! id->sem assoc id sem)
(swap! id->function assoc id function)
(swap! id->request assoc id req)
(.add queue req)
(log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis))
(.acquire sem)
(log-debug "Acquired DRPC result for " function " " args " at " (System/currentTimeMillis))
(let [result (@id->result id)]
(cleanup id)
(log-debug "Returning DRPC result for " function " " args " at " (System/currentTimeMillis))
(if (instance? DRPCExecutionException result)
(throw result)
(if (nil? result)
(throw (DRPCExecutionException. "Request timed out"))
result)))))
DistributedRPCInvocations$Iface
(^void result
[this ^String id ^String result]
(mark! drpc:num-result-calls)
(when-let [func (@id->function id)]
(check-authorization drpc-acl-handler
{DRPCAuthorizerBase/FUNCTION_NAME func}
"result")
(let [^Semaphore sem (@id->sem id)]
(log-debug "Received result " result " for " id " at " (System/currentTimeMillis))
(when sem
(swap! id->result assoc id result)
(.release sem)
))))
(^void failRequest
[this ^String id]
(mark! drpc:num-failRequest-calls)
(when-let [func (@id->function id)]
(check-authorization drpc-acl-handler
{DRPCAuthorizerBase/FUNCTION_NAME func}
"failRequest")
(let [^Semaphore sem (@id->sem id)]
(when sem
(swap! id->result assoc id (DRPCExecutionException. "Request failed"))
(.release sem)))))
(^DRPCRequest fetchRequest
[this ^String func]
(mark! drpc:num-fetchRequest-calls)
(check-authorization drpc-acl-handler
{DRPCAuthorizerBase/FUNCTION_NAME func}
"fetchRequest")
(let [^ConcurrentLinkedQueue queue (acquire-queue request-queues func)
ret (.poll queue)]
(if ret
(do (log-debug "Fetched request for " func " at " (System/currentTimeMillis))
ret)
(DRPCRequest. "" ""))))
Shutdownable
(shutdown
[this]
(mark! drpc:num-shutdown-calls)
(.interrupt clear-thread)))))
(defn handle-request [handler]
(fn [request]
(handler request)))
(defn populate-context!
"Populate the Storm RequestContext from an servlet-request. This should be called in each handler"
[http-creds-handler servlet-request]
(when http-creds-handler
(.populateContext http-creds-handler (ReqContext/context) servlet-request)))
(defn webapp [handler http-creds-handler]
(mark! drpc:num-execute-http-requests)
(->
(routes
(POST "/drpc/:func" [:as {:keys [body servlet-request]} func & m]
(let [args (slurp body)]
(populate-context! http-creds-handler servlet-request)
(.execute handler func args)))
(POST "/drpc/:func/" [:as {:keys [body servlet-request]} func & m]
(let [args (slurp body)]
(populate-context! http-creds-handler servlet-request)
(.execute handler func args)))
(GET "/drpc/:func/:args" [:as {:keys [servlet-request]} func args & m]
(populate-context! http-creds-handler servlet-request)
(.execute handler func args))
(GET "/drpc/:func/" [:as {:keys [servlet-request]} func & m]
(populate-context! http-creds-handler servlet-request)
(.execute handler func ""))
(GET "/drpc/:func" [:as {:keys [servlet-request]} func & m]
(populate-context! http-creds-handler servlet-request)
(.execute handler func "")))
(wrap-reload '[backtype.storm.daemon.drpc])
handle-request))
(defn launch-server!
([]
(let [conf (read-storm-config)
worker-threads (int (conf DRPC-WORKER-THREADS))
queue-size (int (conf DRPC-QUEUE-SIZE))
drpc-http-port (int (conf DRPC-HTTP-PORT))
drpc-port (int (conf DRPC-PORT))
drpc-service-handler (service-handler conf)
;; requests and returns need to be on separate thread pools, since calls to
;; "execute" don't unblock until other thrift methods are called. So if
;; 64 threads are calling execute, the server won't accept the result
;; invocations that will unblock those threads
handler-server (when (> drpc-port 0)
(ThriftServer. conf
(DistributedRPC$Processor. drpc-service-handler)
ThriftConnectionType/DRPC))
invoke-server (ThriftServer. conf
(DistributedRPCInvocations$Processor. drpc-service-handler)
ThriftConnectionType/DRPC_INVOCATIONS)
http-creds-handler (AuthUtils/GetDrpcHttpCredentialsPlugin conf)]
(add-shutdown-hook-with-force-kill-in-1-sec (fn []
(if handler-server (.stop handler-server))
(.stop invoke-server)))
(log-message "Starting Distributed RPC servers...")
(future (.serve invoke-server))
(when (> drpc-http-port 0)
(let [app (-> (webapp drpc-service-handler http-creds-handler)
requests-middleware)
filter-class (conf DRPC-HTTP-FILTER)
filter-params (conf DRPC-HTTP-FILTER-PARAMS)
filters-confs [{:filter-class filter-class
:filter-params filter-params}]
https-port (int (conf DRPC-HTTPS-PORT))
https-ks-path (conf DRPC-HTTPS-KEYSTORE-PATH)
https-ks-password (conf DRPC-HTTPS-KEYSTORE-PASSWORD)
https-ks-type (conf DRPC-HTTPS-KEYSTORE-TYPE)
https-key-password (conf DRPC-HTTPS-KEY-PASSWORD)
https-ts-path (conf DRPC-HTTPS-TRUSTSTORE-PATH)
https-ts-password (conf DRPC-HTTPS-TRUSTSTORE-PASSWORD)
https-ts-type (conf DRPC-HTTPS-TRUSTSTORE-TYPE)
https-want-client-auth (conf DRPC-HTTPS-WANT-CLIENT-AUTH)
https-need-client-auth (conf DRPC-HTTPS-NEED-CLIENT-AUTH)]
(storm-run-jetty
{:port drpc-http-port
:configurator (fn [server]
(config-ssl server
https-port
https-ks-path
https-ks-password
https-ks-type
https-key-password
https-ts-path
https-ts-password
https-ts-type
https-need-client-auth
https-want-client-auth)
(config-filter server app filters-confs))})))
(start-metrics-reporters)
(when handler-server
(.serve handler-server)))))
(defn -main []
(setup-default-uncaught-exception-handler)
(launch-server!))