FELIX-6177 :  Use BlockingQueue instead of synchronized in UpdateThread. Apply patch from Alexey Markevich

git-svn-id: https://svn.apache.org/repos/asf/felix/trunk@1866501 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/configadmin/changelog.txt b/configadmin/changelog.txt
index 598e556..0db8827 100644
--- a/configadmin/changelog.txt
+++ b/configadmin/changelog.txt
@@ -1,5 +1,7 @@
 Changes in 1.9.18
 -----------------
+** Improvement
+    * [FELIX-6177] : Use BlockingQueue instead of synchronized in UpdateThread
 ** Bug
     * [FELIX-6162] : ConfigurationManager crashes on shutdown if PersistenceManager not yet available
 
diff --git a/configadmin/src/main/java/org/apache/felix/cm/impl/UpdateThread.java b/configadmin/src/main/java/org/apache/felix/cm/impl/UpdateThread.java
index 5b43dfe..ebeaf7a 100644
--- a/configadmin/src/main/java/org/apache/felix/cm/impl/UpdateThread.java
+++ b/configadmin/src/main/java/org/apache/felix/cm/impl/UpdateThread.java
@@ -22,7 +22,8 @@
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
-import java.util.LinkedList;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
 
 import org.osgi.service.log.LogService;
 
@@ -41,10 +42,10 @@
     private final String workerBaseName;
 
     // the queue of Runnable instances  to be run
-    private final LinkedList<Runnable> updateTasks;
+    private final BlockingDeque<Runnable> updateTasks = new LinkedBlockingDeque<>();
 
     // the actual thread
-    private Thread worker;
+    private volatile Thread worker;
 
     // the access control context
     private final AccessControlContext acc;
@@ -54,8 +55,6 @@
         this.workerThreadGroup = tg;
         this.workerBaseName = name;
         this.acc = AccessController.getContext();
-
-        this.updateTasks = new LinkedList<>();
     }
 
 
@@ -67,52 +66,37 @@
     @Override
     public void run()
     {
-        for ( ;; )
+        try
         {
             Runnable task;
-            synchronized ( updateTasks )
-            {
-                while ( updateTasks.isEmpty() )
-                {
-                    try
-                    {
-                        updateTasks.wait();
-                    }
-                    catch ( InterruptedException ie )
-                    {
-                        // don't care
-                    }
-                }
-
-                task = updateTasks.removeFirst();
-            }
-
             // return if the task is this thread itself
-            if ( task == this )
+            while ((task = updateTasks.take()) != this)
             {
-                return;
-            }
+                // otherwise execute the task, log any issues
+                try
+                {
+                    // set the thread name indicating the current task
+                    Thread.currentThread().setName( workerBaseName + " (" + task + ")" );
 
-            // otherwise execute the task, log any issues
-            try
-            {
-                // set the thread name indicating the current task
-                Thread.currentThread().setName( workerBaseName + " (" + task + ")" );
+                    Log.logger.log( LogService.LOG_DEBUG, "Running task {0}", new Object[]
+                        { task } );
 
-                Log.logger.log( LogService.LOG_DEBUG, "Running task {0}", new Object[]
-                    { task } );
-
-                run0(task);
+                    run0(task);
+                }
+                catch ( Throwable t )
+                {
+                    Log.logger.log( LogService.LOG_ERROR, "Unexpected problem executing task", t );
+                }
+                finally
+                {
+                    // reset the thread name to "idle"
+                    Thread.currentThread().setName( workerBaseName );
+                }
             }
-            catch ( Throwable t )
-            {
-                Log.logger.log( LogService.LOG_ERROR, "Unexpected problem executing task", t );
-            }
-            finally
-            {
-                // reset the thread name to "idle"
-                Thread.currentThread().setName( workerBaseName );
-            }
+        }
+        catch (InterruptedException e)
+        {
+            // don't care
         }
     }
 
@@ -177,7 +161,7 @@
             Thread workerThread = this.worker;
             this.worker = null;
 
-            schedule( this );
+            updateTasks.offerFirst( this );
 
             // wait for all updates to terminate (<= 10 seconds !)
             try
@@ -194,7 +178,7 @@
                 Log.logger.log( LogService.LOG_ERROR,
                     "Worker thread {0} did not terminate within 5 seconds; trying to kill", new Object[]
                         { workerBaseName } );
-                workerThread.stop();
+                workerThread.interrupt();
             }
         }
     }
@@ -203,16 +187,10 @@
     // queue the given runnable to be run as soon as possible
     void schedule( Runnable update )
     {
-        synchronized ( updateTasks )
-        {
-            Log.logger.log( LogService.LOG_DEBUG, "Scheduling task {0}", new Object[]
-                { update } );
+        Log.logger.log( LogService.LOG_DEBUG, "Scheduling task {0}", new Object[]
+            { update } );
 
-            // append to the task queue
-            updateTasks.add( update );
-
-            // notify the waiting thread
-            updateTasks.notifyAll();
-        }
+        // append to the task queue
+        updateTasks.offer( update );
     }
 }
diff --git a/configadmin/src/test/java/org/apache/felix/cm/impl/UpdateThreadTest.java b/configadmin/src/test/java/org/apache/felix/cm/impl/UpdateThreadTest.java
new file mode 100755
index 0000000..7802207
--- /dev/null
+++ b/configadmin/src/test/java/org/apache/felix/cm/impl/UpdateThreadTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.felix.cm.impl;
+
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class UpdateThreadTest
+{
+    private static final int COUNT = 10;
+
+    @Test
+    public void testUpdateThread() throws Exception {
+        final UpdateThread updateThread = new UpdateThread(null, "name");
+        updateThread.start();
+        try {
+            final CountDownLatch counter = new CountDownLatch(COUNT);
+            for (int i = 0; i < COUNT; ++i) {
+                updateThread.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        counter.countDown();
+                    }
+                });
+            }
+            assertTrue(counter.await(1L, TimeUnit.MINUTES));
+        } finally {
+            updateThread.terminate();
+        }
+    }
+}