FELIX-6177 : Use BlockingQueue instead of synchronized in UpdateThread. Apply patch from Alexey Markevich
git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1866501 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/configadmin/changelog.txt b/configadmin/changelog.txt
index 598e556..0db8827 100644
--- a/configadmin/changelog.txt
+++ b/configadmin/changelog.txt
@@ -1,5 +1,7 @@
Changes in 1.9.18
-----------------
+** Improvement
+ * [FELIX-6177] : Use BlockingQueue instead of synchronized in UpdateThread
** Bug
* [FELIX-6162] : ConfigurationManager crashes on shutdown if PersistenceManager not yet available
diff --git a/configadmin/src/main/java/org/apache/felix/cm/impl/UpdateThread.java b/configadmin/src/main/java/org/apache/felix/cm/impl/UpdateThread.java
index 5b43dfe..ebeaf7a 100644
--- a/configadmin/src/main/java/org/apache/felix/cm/impl/UpdateThread.java
+++ b/configadmin/src/main/java/org/apache/felix/cm/impl/UpdateThread.java
@@ -22,7 +22,8 @@
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
-import java.util.LinkedList;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
import org.osgi.service.log.LogService;
@@ -41,10 +42,10 @@
private final String workerBaseName;
// the queue of Runnable instances to be run
- private final LinkedList<Runnable> updateTasks;
+ private final BlockingDeque<Runnable> updateTasks = new LinkedBlockingDeque<>();
// the actual thread
- private Thread worker;
+ private volatile Thread worker;
// the access control context
private final AccessControlContext acc;
@@ -54,8 +55,6 @@
this.workerThreadGroup = tg;
this.workerBaseName = name;
this.acc = AccessController.getContext();
-
- this.updateTasks = new LinkedList<>();
}
@@ -67,52 +66,37 @@
@Override
public void run()
{
- for ( ;; )
+ try
{
Runnable task;
- synchronized ( updateTasks )
- {
- while ( updateTasks.isEmpty() )
- {
- try
- {
- updateTasks.wait();
- }
- catch ( InterruptedException ie )
- {
- // don't care
- }
- }
-
- task = updateTasks.removeFirst();
- }
-
// return if the task is this thread itself
- if ( task == this )
+ while ((task = updateTasks.take()) != this)
{
- return;
- }
+ // otherwise execute the task, log any issues
+ try
+ {
+ // set the thread name indicating the current task
+ Thread.currentThread().setName( workerBaseName + " (" + task + ")" );
- // otherwise execute the task, log any issues
- try
- {
- // set the thread name indicating the current task
- Thread.currentThread().setName( workerBaseName + " (" + task + ")" );
+ Log.logger.log( LogService.LOG_DEBUG, "Running task {0}", new Object[]
+ { task } );
- Log.logger.log( LogService.LOG_DEBUG, "Running task {0}", new Object[]
- { task } );
-
- run0(task);
+ run0(task);
+ }
+ catch ( Throwable t )
+ {
+ Log.logger.log( LogService.LOG_ERROR, "Unexpected problem executing task", t );
+ }
+ finally
+ {
+ // reset the thread name to "idle"
+ Thread.currentThread().setName( workerBaseName );
+ }
}
- catch ( Throwable t )
- {
- Log.logger.log( LogService.LOG_ERROR, "Unexpected problem executing task", t );
- }
- finally
- {
- // reset the thread name to "idle"
- Thread.currentThread().setName( workerBaseName );
- }
+ }
+ catch (InterruptedException e)
+ {
+ // don't care
}
}
@@ -177,7 +161,7 @@
Thread workerThread = this.worker;
this.worker = null;
- schedule( this );
+ updateTasks.offerFirst( this );
// wait for all updates to terminate (<= 10 seconds !)
try
@@ -194,7 +178,7 @@
Log.logger.log( LogService.LOG_ERROR,
"Worker thread {0} did not terminate within 5 seconds; trying to kill", new Object[]
{ workerBaseName } );
- workerThread.stop();
+ workerThread.interrupt();
}
}
}
@@ -203,16 +187,10 @@
// queue the given runnable to be run as soon as possible
void schedule( Runnable update )
{
- synchronized ( updateTasks )
- {
- Log.logger.log( LogService.LOG_DEBUG, "Scheduling task {0}", new Object[]
- { update } );
+ Log.logger.log( LogService.LOG_DEBUG, "Scheduling task {0}", new Object[]
+ { update } );
- // append to the task queue
- updateTasks.add( update );
-
- // notify the waiting thread
- updateTasks.notifyAll();
- }
+ // append to the task queue
+ updateTasks.offer( update );
}
}
diff --git a/configadmin/src/test/java/org/apache/felix/cm/impl/UpdateThreadTest.java b/configadmin/src/test/java/org/apache/felix/cm/impl/UpdateThreadTest.java
new file mode 100755
index 0000000..7802207
--- /dev/null
+++ b/configadmin/src/test/java/org/apache/felix/cm/impl/UpdateThreadTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.cm.impl;
+
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class UpdateThreadTest
+{
+ private static final int COUNT = 10;
+
+ @Test
+ public void testUpdateThread() throws Exception {
+ final UpdateThread updateThread = new UpdateThread(null, "name");
+ updateThread.start();
+ try {
+ final CountDownLatch counter = new CountDownLatch(COUNT);
+ for (int i = 0; i < COUNT; ++i) {
+ updateThread.schedule(new Runnable() {
+ @Override
+ public void run() {
+ counter.countDown();
+ }
+ });
+ }
+ assertTrue(counter.await(1L, TimeUnit.MINUTES));
+ } finally {
+ updateThread.terminate();
+ }
+ }
+}