blob: 815626f27a76d9548642578fbefe9ebe1bb347c6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package groovy
import groovy.test.GroovyTestCase
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.LinkedBlockingQueue
import java.lang.ref.WeakReference
class ActorTest extends GroovyTestCase {
void testSync () {
new FibCalculator().calcFibSync(15)
}
void testAsync () {
ReentrantLock.metaClass {
withLock { Closure c ->
lock()
try {
c.call ()
}
finally {
unlock()
}
}
}
WorkerThread.metaClass.mixin ReentrantLock
Thread.metaClass.mixin WorkerThread
WorkerThread.startPool(15)
Thread thread = Thread.currentThread()
thread.name = "Main App Thread"
thread.registerWorker ()
new FibCalculator().calcFib(18)
Object.metaClass = null
ReentrantLock.metaClass = null
WorkerThread.metaClass = null
}
}
class WorkerThread {
static {
Object.metaClass {
longOperation{ Closure c ->
new LongOperation (c)
}
send{ Closure c ->
new LongOperation (c).send ()
}
post{ Closure c ->
new LongOperation (c).post ()
}
}
}
def list = new LinkedList ()
private static final ReentrantLock globalLock = new ReentrantLock()
private static final LinkedBlockingQueue globalQueue = new LinkedBlockingQueue ()
private static final ArrayList allWorkers = new ArrayList ()
private static final Random r = new Random();
WorkerThread () {
}
void registerWorker() {
globalLock.withLock {
WeakReference ref = new WeakReference(delegate)
for (int i = 0; i != allWorkers.size(); ++i) {
WeakReference or = allWorkers[i]
if (or == null || or.get() == null) {
allWorkers[i] = ref
return ref
}
}
allWorkers << ref
}
}
void schedule (Closure action) {
withLock {
list.addFirst action
}
}
void post (Closure action) {
globalQueue.offer action
}
Object[] send (List actions) {
int len = actions.size()
AtomicInteger counter = new AtomicInteger()
def results = new Object[len]
actions.eachWithIndex {Closure action, int index ->
schedule {
Object res = action.call()
results[index] = res
counter.incrementAndGet()
}
}
while (counter.get() != len) {
execute()
}
return results
}
void execute () {
Closure action = nextTask ()
if (action)
action ()
}
Closure nextTask() {
Closure action = withLock {
return list.isEmpty() ? null : (Closure)list.removeFirst()
}
if (action != null)
return action
action = globalQueue.poll()
if (action != null)
return action
globalLock.withLock {
int len = allWorkers.size()
if (len == 1)
return null
int from = r.nextInt(len)
for (int i = from+1; i != from; ++i) {
if (i == len) {
i = 0
if (from == 0) {
return null
}
}
def worker = allWorkers[i]
if (worker) {
worker = worker.get ()
if (!worker) {
allWorkers [i] = null
continue
}
action = worker.withLock {
if (!worker.list.isEmpty()) {
worker.list.removeLast ()
}
}
if (action)
return action
}
}
return null
}
}
static void startPool(int count) {
for (int i = 0; i != count; ++i) {
def n = i
Thread.start {
Thread thread = Thread.currentThread()
thread.registerWorker ()
thread.name = "WorkerPool-" + n
try {
while(true) {
thread.execute ()
}
}
finally {
return
}
}
}
}
}
class FibCalculator {
def calcFib (value) {
def a = calcFibImpl (value)
String calc = Thread.currentThread().name
post {
println "fib(${value})=$a $calc ${Thread.currentThread().name}"
}
Thread.sleep(1)
a
}
def calcFibImpl (value) {
if (value <= 0)
0
else {
if (value <= 2) {
1
}
else
longOperation {
calcFib (value-1)
}
.and {
calcFib (value-2)
}
.send { Object [] it ->
it [0] + it [1]
}
}
}
def calcFibSync (value) {
def a = calcFibSyncImpl (value)
String calc = Thread.currentThread().name
println "fib(${value})=$a $calc ${Thread.currentThread().name}"
Thread.sleep(1)
a
}
def calcFibSyncImpl (value) {
if (value <= 0)
0
else {
if (value <= 2) {
1
}
else
calcFibSync(value-1) + calcFibSync(value-2)
}
}
}
class LongOperation {
def actions = []
LongOperation (Closure action) {
actions << action
}
LongOperation and (Closure c) {
actions << c
this
}
def send (Closure op) {
op.call (send())
}
Object[] send () {
Thread.currentThread().send (actions)
}
void post () {
actions.each { Closure action ->
Thread.currentThread().post(action)
}
}
}