| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package groovy |
| |
| import groovy.test.GroovyTestCase |
| |
| import java.util.concurrent.atomic.AtomicInteger |
| import java.util.concurrent.locks.ReentrantLock |
| import java.util.concurrent.LinkedBlockingQueue |
| import java.lang.ref.WeakReference |
| |
| class ActorTest extends GroovyTestCase { |
| void testSync () { |
| new FibCalculator().calcFibSync(15) |
| } |
| |
| void testAsync () { |
| ReentrantLock.metaClass { |
| withLock { Closure c -> |
| lock() |
| try { |
| c.call () |
| } |
| finally { |
| unlock() |
| } |
| } |
| } |
| |
| WorkerThread.metaClass.mixin ReentrantLock |
| Thread.metaClass.mixin WorkerThread |
| |
| WorkerThread.startPool(15) |
| |
| Thread thread = Thread.currentThread() |
| thread.name = "Main App Thread" |
| thread.registerWorker () |
| new FibCalculator().calcFib(18) |
| |
| Object.metaClass = null |
| ReentrantLock.metaClass = null |
| WorkerThread.metaClass = null |
| } |
| } |
| |
| class WorkerThread { |
| static { |
| Object.metaClass { |
| longOperation{ Closure c -> |
| new LongOperation (c) |
| } |
| |
| send{ Closure c -> |
| new LongOperation (c).send () |
| } |
| |
| post{ Closure c -> |
| new LongOperation (c).post () |
| } |
| } |
| } |
| |
| def list = new LinkedList () |
| |
| private static final ReentrantLock globalLock = new ReentrantLock() |
| private static final LinkedBlockingQueue globalQueue = new LinkedBlockingQueue () |
| private static final ArrayList allWorkers = new ArrayList () |
| private static final Random r = new Random(); |
| |
| WorkerThread () { |
| } |
| |
| void registerWorker() { |
| globalLock.withLock { |
| WeakReference ref = new WeakReference(delegate) |
| for (int i = 0; i != allWorkers.size(); ++i) { |
| WeakReference or = allWorkers[i] |
| if (or == null || or.get() == null) { |
| allWorkers[i] = ref |
| return ref |
| } |
| } |
| |
| allWorkers << ref |
| } |
| } |
| |
| void schedule (Closure action) { |
| withLock { |
| list.addFirst action |
| } |
| } |
| |
| void post (Closure action) { |
| globalQueue.offer action |
| } |
| |
| Object[] send (List actions) { |
| int len = actions.size() |
| AtomicInteger counter = new AtomicInteger() |
| def results = new Object[len] |
| actions.eachWithIndex {Closure action, int index -> |
| schedule { |
| Object res = action.call() |
| results[index] = res |
| counter.incrementAndGet() |
| } |
| } |
| |
| while (counter.get() != len) { |
| execute() |
| } |
| return results |
| } |
| |
| void execute () { |
| Closure action = nextTask () |
| |
| if (action) |
| action () |
| } |
| |
| Closure nextTask() { |
| Closure action = withLock { |
| return list.isEmpty() ? null : (Closure)list.removeFirst() |
| } |
| |
| if (action != null) |
| return action |
| |
| action = globalQueue.poll() |
| if (action != null) |
| return action |
| |
| globalLock.withLock { |
| int len = allWorkers.size() |
| if (len == 1) |
| return null |
| |
| int from = r.nextInt(len) |
| for (int i = from+1; i != from; ++i) { |
| if (i == len) { |
| i = 0 |
| if (from == 0) { |
| return null |
| } |
| } |
| |
| def worker = allWorkers[i] |
| if (worker) { |
| worker = worker.get () |
| if (!worker) { |
| allWorkers [i] = null |
| continue |
| } |
| |
| action = worker.withLock { |
| if (!worker.list.isEmpty()) { |
| worker.list.removeLast () |
| } |
| } |
| |
| if (action) |
| return action |
| } |
| } |
| |
| return null |
| } |
| } |
| |
| static void startPool(int count) { |
| for (int i = 0; i != count; ++i) { |
| def n = i |
| Thread.start { |
| Thread thread = Thread.currentThread() |
| thread.registerWorker () |
| thread.name = "WorkerPool-" + n |
| try { |
| while(true) { |
| thread.execute () |
| } |
| } |
| finally { |
| return |
| } |
| } |
| } |
| } |
| } |
| |
| class FibCalculator { |
| def calcFib (value) { |
| def a = calcFibImpl (value) |
| String calc = Thread.currentThread().name |
| post { |
| println "fib(${value})=$a $calc ${Thread.currentThread().name}" |
| } |
| Thread.sleep(1) |
| a |
| } |
| |
| def calcFibImpl (value) { |
| if (value <= 0) |
| 0 |
| else { |
| if (value <= 2) { |
| 1 |
| } |
| else |
| longOperation { |
| calcFib (value-1) |
| } |
| .and { |
| calcFib (value-2) |
| } |
| .send { Object [] it -> |
| it [0] + it [1] |
| } |
| } |
| } |
| |
| def calcFibSync (value) { |
| def a = calcFibSyncImpl (value) |
| String calc = Thread.currentThread().name |
| println "fib(${value})=$a $calc ${Thread.currentThread().name}" |
| Thread.sleep(1) |
| a |
| } |
| |
| |
| def calcFibSyncImpl (value) { |
| if (value <= 0) |
| 0 |
| else { |
| if (value <= 2) { |
| 1 |
| } |
| else |
| calcFibSync(value-1) + calcFibSync(value-2) |
| } |
| } |
| } |
| |
| class LongOperation { |
| def actions = [] |
| |
| LongOperation (Closure action) { |
| actions << action |
| } |
| |
| LongOperation and (Closure c) { |
| actions << c |
| this |
| } |
| |
| def send (Closure op) { |
| op.call (send()) |
| } |
| |
| Object[] send () { |
| Thread.currentThread().send (actions) |
| } |
| |
| void post () { |
| actions.each { Closure action -> |
| Thread.currentThread().post(action) |
| } |
| } |
| } |