;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements.  See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership.  The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License.  You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.

(ns backtype.storm.daemon.drpc
  (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
  (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
  (:import [org.apache.thrift.exception])
  (:import [org.apache.thrift.transport
            TNonblockingServerTransport TNonblockingServerSocket])
  (:import [backtype.storm.generated
            DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
            DRPCRequest DRPCExecutionException
            DistributedRPCInvocations DistributedRPCInvocations$Iface
            DistributedRPCInvocations$Processor])
  (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue
            ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
  (:import [backtype.storm.daemon Shutdownable])
  (:import [java.net InetAddress])
  (:use [backtype.storm bootstrap config log])
  (:gen-class))

;; Invokes the `bootstrap` macro made available by the
;; `(:use [backtype.storm bootstrap config log])` clause in the ns form.
;; NOTE(review): presumably this pulls the common Storm namespaces and classes
;; (e.g. async-loop, time-delta, read-storm-config used below) into this
;; namespace -- confirm against backtype.storm.bootstrap.
(bootstrap)

(defn timeout-check-secs
  "Returns the constant 5. The function that service-handler hands to
  async-loop for its timeout sweep returns this value at the end of each pass."
  []
  5)

(defn acquire-queue
  "Returns the queue stored under `function` in the map held by `queues-atom`.
  When the map has no truthy entry for `function`, a fresh
  ConcurrentLinkedQueue is registered under that key first."
  [queues-atom function]
  (let [ensure-queue (fn [queues]
                       ;; Keep the map untouched when a queue is already there.
                       (if (queues function)
                         queues
                         (assoc queues function (ConcurrentLinkedQueue.))))]
    (swap! queues-atom ensure-queue)
    (get (deref queues-atom) function)))

;; TODO: change this to use TimeCacheMap
(defn service-handler
  "Builds the DRPC service object. The returned value implements
  DistributedRPC$Iface (execute), DistributedRPCInvocations$Iface (result,
  failRequest, fetchRequest) and Shutdownable (shutdown).

  All state lives in atoms keyed by request id: the semaphore the caller of
  execute blocks on, the result, the start time, the function name and the
  DRPCRequest itself. Pending requests are additionally held in one
  ConcurrentLinkedQueue per function name (see acquire-queue).

  execute returns the result string, or throws DRPCExecutionException when the
  request was failed via failRequest or was released without a result
  (reported as a timeout)."
  []
  (let [conf (read-storm-config)
        ;; Source of request ids; wraps around at 1000000000 (see execute).
        ctr (atom 0)
        id->sem (atom {})
        id->result (atom {})
        id->start (atom {})
        id->function (atom {})
        id->request (atom {})
        request-queues (atom {})
        ;; Forgets everything recorded for a request id.
        cleanup (fn [id] (swap! id->sem dissoc id)
                  (swap! id->result dissoc id)
                  (swap! id->function dissoc id)
                  (swap! id->request dissoc id)
                  (swap! id->start dissoc id))
        ;; Background sweep over all started requests: any request older than
        ;; DRPC-REQUEST-TIMEOUT-SECS is pulled out of its function queue, its
        ;; waiting caller is released (with no result stored), and its
        ;; bookkeeping is dropped.
        ;; NOTE(review): the value returned by the loop fn is presumably the
        ;; number of seconds async-loop sleeps before the next pass -- confirm
        ;; against async-loop.
        clear-thread (async-loop
                       (fn []
                         (doseq [[id start] @id->start]
                           (when (> (time-delta start) (conf DRPC-REQUEST-TIMEOUT-SECS))
                             (when-let [sem (@id->sem id)]
                               (.remove (acquire-queue request-queues (@id->function id)) (@id->request id))
                               (log-warn "Timeout DRPC request id: " id " start at " start)
                               (.release sem))
                             (cleanup id)))
                         (timeout-check-secs)))]
    (reify DistributedRPC$Iface

      ;; Registers a new request, enqueues it for `function`, and blocks until
      ;; result, failRequest or the timeout sweep releases the semaphore.
      (^String execute
        [this ^String function ^String args]
        (log-debug "Received DRPC request for " function " " args " at " (System/currentTimeMillis))
        (let [id (str (swap! ctr (fn [v] (mod (inc v) 1000000000))))
              ^Semaphore sem (Semaphore. 0)
              req (DRPCRequest. args id)
              ^ConcurrentLinkedQueue queue (acquire-queue request-queues function)]
          ;; Record all bookkeeping before the request becomes visible to
          ;; fetchRequest via the queue.
          (swap! id->start assoc id (current-time-secs))
          (swap! id->sem assoc id sem)
          (swap! id->function assoc id function)
          (swap! id->request assoc id req)
          (.add queue req)
          (log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis))
          (.acquire sem)
          (log-debug "Acquired DRPC result for " function " " args " at " (System/currentTimeMillis))
          ;; Read the result before cleanup removes it from id->result.
          (let [result (@id->result id)]
            (cleanup id)
            (log-debug "Returning DRPC result for " function " " args " at " (System/currentTimeMillis))
            (if (instance? DRPCExecutionException result)
              (throw result)
              ;; Released without a stored result: the timeout sweep is the
              ;; only path above that does that.
              (if (nil? result)
                (throw (DRPCExecutionException. "Request timed out"))
                result)))))

      DistributedRPCInvocations$Iface

      ;; Stores the result for `id` and wakes the waiting execute call.
      ;; Ids with no registered semaphore are ignored.
      (^void result
        [this ^String id ^String result]
        (let [^Semaphore sem (@id->sem id)]
          (log-debug "Received result " result " for " id " at " (System/currentTimeMillis))
          (when sem
            (swap! id->result assoc id result)
            (.release sem))))

      ;; Stores a DRPCExecutionException as the result for `id` so that the
      ;; waiting execute call rethrows it. Unknown ids are ignored.
      (^void failRequest
        [this ^String id]
        (let [^Semaphore sem (@id->sem id)]
          (when sem
            (swap! id->result assoc id (DRPCExecutionException. "Request failed"))
            (.release sem))))

      ;; Hands out the next pending request for `func`, or a DRPCRequest built
      ;; from two empty strings when the queue is empty.
      (^DRPCRequest fetchRequest
        [this ^String func]
        (let [^ConcurrentLinkedQueue queue (acquire-queue request-queues func)
              ret (.poll queue)]
          (if ret
            (do (log-debug "Fetched request for " func " at " (System/currentTimeMillis))
              ret)
            (DRPCRequest. "" ""))))

      Shutdownable

      ;; Interrupts the timeout sweep thread.
      (shutdown
        [this]
        (.interrupt clear-thread)))))

(defn launch-server!
  "Reads the Storm config, builds one service handler, and exposes it through
  two THsHaServer instances: one on DRPC-PORT speaking DistributedRPC and one
  on DRPC-INVOCATIONS-PORT speaking DistributedRPCInvocations. Registers a
  shutdown hook that stops both servers, serves the invocations server from a
  future, and then calls serve on the request server from the calling thread."
  []
  (let [storm-conf (read-storm-config)
        pool-size (int (storm-conf DRPC-WORKER-THREADS))
        backlog (int (storm-conf DRPC-QUEUE-SIZE))
        handler (service-handler)

        ;; Requests and returns need to be on separate thread pools, since
        ;; calls to "execute" don't unblock until other thrift methods are
        ;; called. So if 64 threads are calling execute, the server won't
        ;; accept the result invocations that will unblock those threads.

        ;; The request-side socket is opened first, then the invocations one.
        request-args (THsHaServer$Args.
                       (TNonblockingServerSocket. (int (storm-conf DRPC-PORT))))
        request-pool (ThreadPoolExecutor.
                       pool-size pool-size 60 TimeUnit/SECONDS
                       (ArrayBlockingQueue. backlog))
        ;; NOTE(review): both workerThreads and an explicit executor service
        ;; are set here; check THsHaServer to see which one takes effect.
        request-server (THsHaServer.
                         (-> request-args
                             (.workerThreads 64)
                             (.executorService request-pool)
                             (.protocolFactory (TBinaryProtocol$Factory.))
                             (.processor (DistributedRPC$Processor. handler))))

        invocations-args (THsHaServer$Args.
                           (TNonblockingServerSocket.
                             (int (storm-conf DRPC-INVOCATIONS-PORT))))
        invocations-server (THsHaServer.
                             (-> invocations-args
                                 (.workerThreads 64)
                                 (.protocolFactory (TBinaryProtocol$Factory.))
                                 (.processor
                                   (DistributedRPCInvocations$Processor. handler))))
        stop-servers (fn []
                       (.stop request-server)
                       (.stop invocations-server))]
    (add-shutdown-hook-with-force-kill-in-1-sec stop-servers)
    (log-message "Starting Distributed RPC servers...")
    (future (.serve invocations-server))
    (.serve request-server)))

(defn -main
  "Entry point for the generated class (the ns form declares :gen-class):
  takes no arguments and delegates to launch-server!."
  []
  (launch-server!))
