| /************************************************************** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| *************************************************************/ |
| |
| |
| |
| package com.sun.star.lib.uno.environments.remote; |
| |
| import com.sun.star.lang.DisposedException; |
| |
| /** |
| * The <code>JobQueue</code> implements a queue for jobs. |
| * For every jobs thread id exists a job queue which is registered |
| * at the <code>ThreadPool</code>. |
| * A JobQueue is splitted in a sync job queue and an async job queue. |
| * The sync job queue is the registerd queue, it delegates async jobs |
| * (put by <code>putjob</code>) into the async queue, which is only |
| * known by the sync queue. |
| * <p> |
| * @version $Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $ |
| * @author Kay Ramme |
| * @see com.sun.star.lib.uno.environments.remote.ThreadPool |
| * @see com.sun.star.lib.uno.environments.remote.Job |
| * @see com.sun.star.lib.uno.environments.remote.ThreadId |
| * @since UDK1.0 |
| */ |
| public class JobQueue { |
| /** |
| * When set to true, enables various debugging output. |
| */ |
| private static final boolean DEBUG = false; |
| |
| protected Job _head; // the head of the job list |
| protected Job _tail; // the tail of the job list |
| |
| protected ThreadId _threadId; // the thread id of the queue |
| protected int _ref_count = 0; // the stack deepness |
| protected boolean _createThread; // create a worker thread, if needed |
| protected boolean _createThread_now; // create a worker thread, if needed |
| protected Thread _worker_thread; // the thread that does the jobs |
| |
| protected Object _disposeId; // the active dispose id |
| protected Object _doDispose = null; |
| protected Throwable _throwable; |
| |
| protected JobQueue _async_jobQueue; // chaining job qeueus for asyncs |
| protected JobQueue _sync_jobQueue; // chaining job qeueus for syncs |
| |
| protected boolean _active = false; |
| |
| protected JavaThreadPoolFactory _javaThreadPoolFactory; |
| |
| /** |
| * A thread for dispatching jobs |
| */ |
| class JobDispatcher extends Thread { |
| Object _disposeId; |
| |
| JobDispatcher(Object disposeId) { |
| if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId); |
| |
| _disposeId = disposeId; |
| } |
| |
| ThreadId getThreadId() { |
| return _threadId; |
| } |
| |
| public void run() { |
| if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread()); |
| |
| try { |
| enter(2000, _disposeId); |
| } |
| catch(Throwable throwable) { |
| if(_head != null || _active) { // there was a job in progress, so give a stack |
| System.err.println(getClass().getName() + " - exception occurred:" + throwable); |
| throwable.printStackTrace(System.err); |
| } |
| } |
| finally { |
| release(); |
| } |
| |
| if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId); |
| |
| // try { |
| // Object object = new Object(); |
| // synchronized(object) { |
| // object.wait(); |
| // } |
| // } |
| // catch(InterruptedException interruptedException) { |
| // } |
| } |
| } |
| |
| |
| /** |
| * Constructs a async job queue with the given thread id |
| * which belongs to the given sync job queue. |
| * <p> |
| * @param threadId the thread id |
| * @param sync_jobQueue the sync queue this async queue belongs to |
| * @see com.sun.star.lib.uno.environments.remote.ThreadID |
| */ |
| JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) { |
| _javaThreadPoolFactory = javaThreadPoolFactory; |
| _threadId = ThreadId.createFresh(); |
| |
| _sync_jobQueue = javaThreadPoolFactory.getJobQueue(threadId); |
| if(_sync_jobQueue == null) { |
| _sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true); |
| _sync_jobQueue.acquire(); |
| } |
| |
| _sync_jobQueue._async_jobQueue = this; |
| |
| _createThread = true; |
| _createThread_now = true; |
| |
| acquire(); |
| |
| if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId); |
| } |
| |
| /** |
| * Constructs a sync job queue with the given thread id and the given thread. |
| * <p> |
| * @param threadId the thread id |
| * @param createThread if true, the queue creates a worker thread if needed |
| * @see com.sun.star.lib.uno.environments.remote.ThreadID |
| */ |
| JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){ |
| _javaThreadPoolFactory = javaThreadPoolFactory; |
| _threadId = threadId; |
| _createThread = createThread; |
| _createThread_now = createThread; |
| |
| if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId + " " + createThread); |
| } |
| |
| /** |
| * Gives the thread id of this queue |
| * <p> |
| * @return the thread id |
| * @see com.sun.star.lib.uno.environments.remote.ThreadID |
| */ |
| ThreadId getThreadId() { |
| return _threadId; |
| } |
| |
| synchronized void acquire() { |
| // add only synchronous queues . |
| if(_ref_count <= 0 && _sync_jobQueue == null ) |
| _javaThreadPoolFactory.addJobQueue(this); |
| |
| ++ _ref_count; |
| } |
| |
| synchronized void release() { |
| -- _ref_count; |
| |
| if(_ref_count <= 0) { |
| // only synchronous queues needs to be removed . |
| if( _sync_jobQueue == null ) |
| _javaThreadPoolFactory.removeJobQueue(this); |
| |
| |
| if(_sync_jobQueue != null) { |
| _sync_jobQueue._async_jobQueue = null; |
| _sync_jobQueue.release(); |
| } |
| } |
| } |
| |
| /** |
| * Removes a job from the queue. |
| * <p> |
| * @return a job or null if timed out |
| * @param waitTime the maximum amount of time to wait for a job |
| */ |
| private Job removeJob(int waitTime) { |
| if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId); |
| |
| Job job = null; |
| synchronized (this) { |
| // wait max. waitTime time for a job to enter the queue |
| boolean waited = false; |
| while(_head == null && (waitTime == 0 || !waited)) { |
| if(_doDispose == _disposeId) { |
| _doDispose = null; |
| throw (DisposedException) |
| new DisposedException().initCause(_throwable); |
| } |
| |
| // notify sync queues |
| notifyAll(); |
| |
| try { |
| // wait for new job |
| wait(waitTime); |
| } |
| catch(InterruptedException interruptedException) { |
| throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); |
| } |
| |
| // signal that we have already waited once |
| waited = true; |
| } |
| |
| |
| if(_head != null) { |
| Job current = _head; |
| _head = _head._next; |
| |
| if(_head == null) |
| _tail = null; |
| |
| job = current; |
| _active = true; |
| } |
| } |
| |
| // always wait for asynchron jobqueue to be finished ! |
| if(job != null && _async_jobQueue != null) { |
| synchronized(_async_jobQueue) { |
| // wait for async queue to be empty and last job to be done |
| while(_async_jobQueue._active || _async_jobQueue._head != null) { |
| if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " + _async_jobQueue._worker_thread); |
| |
| if(_doDispose == _disposeId) { |
| _doDispose = null; |
| throw (DisposedException) |
| new DisposedException().initCause(_throwable); |
| } |
| |
| try { |
| _async_jobQueue.wait(); |
| } |
| catch(InterruptedException interruptedException) { |
| throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); |
| } |
| } |
| } |
| } |
| |
| return job; |
| } |
| |
| /** |
| * Puts a job into the queue. |
| * <p> |
| * @param job the job |
| * @param disposeId a dispose id |
| */ |
| synchronized void putJob(Job job, Object disposeId) { |
| if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job); |
| |
| if(_tail != null) |
| _tail._next = job; |
| else |
| _head = job; |
| |
| _tail = job; |
| |
| if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one |
| |
| acquire(); |
| |
| _createThread_now = false; |
| new JobDispatcher(disposeId).start(); |
| } |
| |
| // always notify possible waiters |
| notifyAll(); |
| } |
| |
| /** |
| * Enters the job queue. |
| * <p> |
| * @return the result of the final job (reply) |
| * @param disposeId a dispose id |
| */ |
| Object enter(Object disposeId) throws Throwable { |
| return enter(0, disposeId); // wait infinitly |
| } |
| |
| /** |
| * Enters the job queue. |
| * <p> |
| * @return the result of the final job (reply) |
| * @param waitTime the maximum amount of time to wait for a job (0 means wait infinitly) |
| * @param disposeId a dispose id |
| */ |
| Object enter(int waitTime, Object disposeId) throws Throwable { |
| if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId); |
| |
| boolean quit = false; |
| |
| Object hold_disposeId = _disposeId; |
| _disposeId = disposeId; |
| |
| Object result = null; |
| |
| Thread hold_worker_thread = _worker_thread; |
| _worker_thread = Thread.currentThread(); |
| |
| while(!quit) { |
| Job job = null; |
| |
| try { |
| job = removeJob(waitTime); |
| |
| if(job != null) { |
| try { |
| result = job.execute(); |
| } |
| finally { |
| _active = false; |
| } |
| |
| if (!job.isRequest()) { |
| job.dispose(); |
| |
| quit = true; |
| } |
| |
| job = null; |
| } |
| else |
| quit = true; |
| |
| |
| } |
| finally { // ensure that this queue becomes disposed, if necessary |
| if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result); |
| |
| synchronized(this) { |
| if(job != null || (quit && _head == null)) { |
| _worker_thread = hold_worker_thread; |
| |
| _createThread_now = true; |
| |
| _disposeId = hold_disposeId; |
| |
| if(_sync_jobQueue != null) |
| notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting) |
| } |
| else |
| quit = false; |
| |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| /** |
| * If the given disposeId is registered, |
| * interrups the worker thread. |
| * <p> |
| * @param disposeId the dispose id |
| */ |
| synchronized void dispose(Object disposeId, Throwable throwable) { |
| if(_sync_jobQueue == null) { // dispose only sync queues |
| _doDispose = disposeId; |
| _throwable = throwable; |
| |
| // get thread out of wait and let it throw the throwable |
| if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread"); |
| |
| notifyAll(); |
| } |
| } |
| } |
| |