| ;; Copyright (c) Rich Hickey and contributors. All rights reserved. |
| ;; The use and distribution terms for this software are covered by the |
| ;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) |
| ;; which can be found in the file epl-v10.html at the root of this distribution. |
| ;; By using this software in any fashion, you are agreeing to be bound by |
| ;; the terms of this license. |
| ;; You must not remove this notice, or any other, from this software. |
| |
| (ns cljs.core.async.impl.channels |
| (:require [cljs.core.async.impl.protocols :as impl] |
| [cljs.core.async.impl.dispatch :as dispatch] |
| [cljs.core.async.impl.buffers :as buffers])) |
| |
| |
| |
(defn box
  "Wraps `val` in a reified object implementing cljs.core/IDeref.
  Dereferencing the returned object yields `val` unchanged."
  [val]
  (reify
    cljs.core/IDeref
    (-deref [_this] val)))
| |
;; Pairs a put `handler` with the `val` it wants to put. Instances are what
;; ManyToManyChannel enqueues on its `puts` queue for pending puts.
(deftype PutBox [handler val])
| |
(defn put-active?
  "Returns the result of impl/active? applied to the handler stored in
  `box` (read through its `.-handler` field, as on a PutBox)."
  [box]
  (-> box .-handler impl/active?))
| |
;; Threshold for the channel's dirty-puts / dirty-takes counters: once a
;; counter exceeds this value the corresponding queue is cleaned up and the
;; counter is reset to 0.
(def ^:const MAX_DIRTY 64)
| |
(defprotocol MMC
  "Protocol for channel operations beyond the impl port/channel protocols."
  (abort [this]
    "Releases pending puts and closes the channel (see ManyToManyChannel)."))
| |
;; Channel implementation backing `chan`.
;;
;; Fields:
;;   takes       - queue of pending take handlers (popped with .pop, enqueued
;;                 with .unbounded-unshift, pruned with .cleanup)
;;   dirty-takes - mutable counter; when it exceeds MAX_DIRTY the takes queue
;;                 is cleaned of inactive handlers and the counter reset
;;   puts        - queue of pending PutBox instances, handled like `takes`
;;   dirty-puts  - mutable counter, the puts-side analogue of dirty-takes
;;   buf         - optional buffer; may be nil (every use is guarded)
;;   closed      - mutable flag set by close!
;;   add!        - fn of [buf] and [buf val] used to add to `buf`; a `reduced`
;;                 return value from the 2-arity causes the channel to abort
(deftype ManyToManyChannel [takes ^:mutable dirty-takes puts ^:mutable dirty-puts ^not-native buf ^:mutable closed add!]
  MMC
  ;; Drains the puts queue, committing every still-active put handler and
  ;; dispatching its callback with `true`, then discards anything left in the
  ;; queue and closes the channel.
  (abort [this]
    (loop []
      (let [putter (.pop puts)]
        (when-not (nil? putter)
          (let [^not-native put-handler (.-handler putter)]
            (when ^boolean (impl/active? put-handler)
              (let [put-cb (impl/commit put-handler)]
                (dispatch/run #(put-cb true))))
            ;; Always keep draining: stopping after the first active putter
            ;; would leave later active putters to be dropped by the cleanup
            ;; below without ever being committed or called back.
            (recur)))))
    (.cleanup puts (constantly false))
    (impl/close! this))

  impl/WritePort
  ;; Attempts to put `val` using `handler`.
  ;; Returns:
  ;;   (box (not closed)) when the channel is closed or the handler is inactive
  ;;   (box true)         when the value went into `buf` or straight to a taker
  ;;   nil                when no buffer space or taker was available; the put
  ;;                      is enqueued only if the handler is blockable
  ;; Asserts that `val` is not nil and that the puts queue stays below
  ;; impl/MAX-QUEUE-SIZE.
  (put! [this val ^not-native handler]
    (assert (not (nil? val)) "Can't put nil on a channel")
    ;; bug in CLJS compiler boolean inference - David
    (let [^boolean closed closed]
      (if (or closed (not ^boolean (impl/active? handler)))
        (box (not closed))
        (if (and buf (not (impl/full? buf)))
          ;; Buffer has room: add the value, then match buffered values with
          ;; pending active takers.
          (do
            (impl/commit handler)
            (let [done? (reduced? (add! buf val))
                  take-cbs (loop [takers []]
                             (if (and (pos? (.-length takes)) (pos? (count buf)))
                               (let [^not-native taker (.pop takes)]
                                 (if ^boolean (impl/active? taker)
                                   (let [ret (impl/commit taker)
                                         val (impl/remove! buf)]
                                     (recur (conj takers (fn [] (ret val)))))
                                   (recur takers)))
                               takers))]
              ;; add! returned a reduced value: shut the channel down.
              (when done? (abort this))
              (when (seq take-cbs)
                (doseq [f take-cbs]
                  (dispatch/run f)))
              (box true)))
          ;; No usable buffer space: look for the first active pending taker.
          (let [taker (loop []
                        (let [^not-native taker (.pop takes)]
                          (when taker
                            (if (impl/active? taker)
                              taker
                              (recur)))))]
            (if taker
              ;; Hand the value directly to the taker.
              (let [take-cb (impl/commit taker)]
                (impl/commit handler)
                (dispatch/run (fn [] (take-cb val)))
                (box true))
              ;; Nobody to receive the value: park the put.
              (do
                (if (> dirty-puts MAX_DIRTY)
                  (do (set! dirty-puts 0)
                      (.cleanup puts put-active?))
                  (set! dirty-puts (inc dirty-puts)))
                (when (impl/blockable? handler)
                  (assert (< (.-length puts) impl/MAX-QUEUE-SIZE)
                          (str "No more than " impl/MAX-QUEUE-SIZE
                               " pending puts are allowed on a single channel."
                               " Consider using a windowed buffer."))
                  (.unbounded-unshift puts (PutBox. handler val)))
                nil)))))))

  impl/ReadPort
  ;; Attempts to take a value using `handler`.
  ;; Returns:
  ;;   nil       when the handler is inactive, cannot be committed, or the
  ;;             take had to be parked (enqueued only if handler is blockable)
  ;;   (box val) when a value was available from `buf` or a pending putter
  ;;   (box nil) when the channel is closed and nothing is left to deliver
  ;; Asserts that the takes queue stays below impl/MAX-QUEUE-SIZE.
  (take! [this ^not-native handler]
    (if (not ^boolean (impl/active? handler))
      nil
      (if (and (not (nil? buf)) (pos? (count buf)))
        ;; Buffer has a value: take it, then refill the buffer from pending
        ;; putters until it is full, add! signals reduced, or puts run out.
        (do
          (if-let [take-cb (impl/commit handler)]
            (let [val (impl/remove! buf)
                  [done? cbs] (when (pos? (.-length puts))
                                (loop [cbs []]
                                  (let [putter (.pop puts)
                                        ^not-native put-handler (.-handler putter)
                                        val (.-val putter)
                                        cb (and ^boolean (impl/active? put-handler) (impl/commit put-handler))
                                        cbs (if cb (conj cbs cb) cbs)
                                        done? (when cb (reduced? (add! buf val)))]
                                    (if (and (not done?) (not (impl/full? buf)) (pos? (.-length puts)))
                                      (recur cbs)
                                      [done? cbs]))))]
              (when done?
                (abort this))
              (doseq [cb cbs]
                (dispatch/run #(cb true)))
              (box val))))
        ;; Nothing buffered: look for the first active pending putter.
        (let [putter (loop []
                       (let [putter (.pop puts)]
                         (when putter
                           (if ^boolean (impl/active? (.-handler putter))
                             putter
                             (recur)))))]
          (if putter
            ;; Direct hand-off from the putter to this taker.
            (let [put-cb (impl/commit (.-handler putter))]
              (impl/commit handler)
              (dispatch/run #(put-cb true))
              (box (.-val putter)))
            (if closed
              (do
                ;; 1-arity add! call on the buffer before the final read.
                (when buf (add! buf))
                (if (and (impl/active? handler) (impl/commit handler))
                  (let [has-val (and buf (pos? (count buf)))]
                    (let [val (when has-val (impl/remove! buf))]
                      (box val)))
                  nil))
              ;; Open channel with nothing available: park the take.
              (do
                (if (> dirty-takes MAX_DIRTY)
                  (do (set! dirty-takes 0)
                      (.cleanup takes impl/active?))
                  (set! dirty-takes (inc dirty-takes)))
                (when (impl/blockable? handler)
                  (assert (< (.-length takes) impl/MAX-QUEUE-SIZE)
                          (str "No more than " impl/MAX-QUEUE-SIZE
                               " pending takes are allowed on a single channel."))
                  (.unbounded-unshift takes handler))
                nil)))))))

  impl/Channel
  (closed? [_] closed)
  ;; Marks the channel closed (no-op if already closed). Calls the 1-arity
  ;; add! when there is a buffer and no pending puts, then drains the takes
  ;; queue: each active taker is committed and dispatched with a buffered
  ;; value if one remains, otherwise nil. Finally calls impl/close-buf! on the
  ;; buffer. Always returns nil.
  (close! [this]
    (if ^boolean closed
      nil
      (do (set! closed true)
          (when (and buf (zero? (.-length puts)))
            (add! buf))
          (loop []
            (let [^not-native taker (.pop takes)]
              (when-not (nil? taker)
                (when ^boolean (impl/active? taker)
                  (let [take-cb (impl/commit taker)
                        val (when (and buf (pos? (count buf))) (impl/remove! buf))]
                    (dispatch/run (fn [] (take-cb val)))))
                (recur))))
          (when buf (impl/close-buf! buf))
          nil))))
| |
(defn- ex-handler
  "Default exception handler: logs `ex` to the JS console and returns nil."
  [ex]
  (js/console.log ex)
  nil)
| |
(defn- handle
  "Passes the thrown value `t` to `exh`, or to ex-handler when `exh` is
  falsy. A nil result leaves `buf` as is and returns it; any other result is
  added to `buf` via impl/add!, whose return value is returned."
  [buf exh t]
  (let [handler     (or exh ex-handler)
        replacement (handler t)]
    (if (some? replacement)
      (impl/add! buf replacement)
      buf)))
| |
(defn chan
  "Builds a ManyToManyChannel around `buf`.

  buf   - buffer handed to the channel as-is
  xform - optional; when truthy it is applied to impl/add! and the result is
          used as the add function, otherwise impl/add! is used directly
  exh   - optional exception handler, forwarded to `handle` whenever the add
          function throws

  The takes and puts queues are each created with (buffers/ring-buffer 32);
  both dirty counters start at 0 and the channel starts open."
  ([buf] (chan buf nil))
  ([buf xform] (chan buf xform nil))
  ([buf xform exh]
   (let [takes   (buffers/ring-buffer 32)
         puts    (buffers/ring-buffer 32)
         step    (if xform (xform impl/add!) impl/add!)
         ;; Wrap both arities so anything thrown by the add function is
         ;; routed through `handle` instead of propagating.
         guarded (fn
                   ([b]
                    (try
                      (step b)
                      (catch :default t
                        (handle b exh t))))
                   ([b v]
                    (try
                      (step b v)
                      (catch :default t
                        (handle b exh t)))))]
     (ManyToManyChannel. takes 0 puts 0 buf false guarded))))