[SUREFIRE-1826] Improved performance of ThreadedStreamConsumer
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 7136834..1114948 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -20,41 +20,42 @@
  */
 
 import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
 import org.apache.maven.surefire.extensions.EventHandler;
-import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
+import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
 
 import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
-import static java.lang.Thread.currentThread;
+import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
 
 /**
- * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
+ * Knows how to reconstruct *all* the state transmitted over Channel by the forked process.
+ * <br>
+ * After applying the performance improvements with {@link QueueSynchronizer} the throughput becomes
+ * 6.33 mega messages per second
+ * (158 nano seconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest)
+ * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11.
  *
  * @author Kristian Rosenvold
  */
 public final class ThreadedStreamConsumer
-        implements EventHandler<Event>, Closeable
+    implements EventHandler<Event>, Closeable
 {
+    private static final int QUEUE_MAX_ITEMS = 10_000;
     private static final Event END_ITEM = new FinalEvent();
 
-    private static final int ITEM_LIMIT_BEFORE_SLEEP = 10_000;
-
-    private final BlockingQueue<Event> items = new ArrayBlockingQueue<>( ITEM_LIMIT_BEFORE_SLEEP );
-
+    private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM );
     private final AtomicBoolean stop = new AtomicBoolean();
-
-    private final Thread thread;
-
+    private final AtomicBoolean isAlive = new AtomicBoolean( true );
     private final Pumper pumper;
 
     final class Pumper
-            implements Runnable
+        implements Runnable
     {
         private final EventHandler<Event> target;
 
@@ -79,15 +80,17 @@
         @Override
         public void run()
         {
-            while ( !ThreadedStreamConsumer.this.stop.get() || !ThreadedStreamConsumer.this.items.isEmpty() )
+            while ( !stop.get() || !synchronizer.isEmptyQueue() )
             {
                 try
                 {
-                    Event item = ThreadedStreamConsumer.this.items.take();
+                    Event item = synchronizer.awaitNext();
+
                     if ( shouldStopQueueing( item ) )
                     {
-                        return;
+                        break;
                     }
+
                     target.handleEvent( item );
                 }
                 catch ( Throwable t )
@@ -95,6 +98,8 @@
                     errors.addException( t );
                 }
             }
+
+            isAlive.set( false );
         }
 
         boolean hasErrors()
@@ -111,7 +116,7 @@
     public ThreadedStreamConsumer( EventHandler<Event> target )
     {
         pumper = new Pumper( target );
-        thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
+        Thread thread = newDaemonThread( pumper, "ThreadedStreamConsumer" );
         thread.start();
     }
 
@@ -122,37 +127,24 @@
         {
             return;
         }
-        else if ( !thread.isAlive() )
+        // Do NOT call Thread.isAlive() - slow.
+        // It makes worse performance from 790 millis to 1250 millis for 5 million messages.
+        else if ( !isAlive.get() )
         {
-            items.clear();
+            synchronizer.clearQueue();
             return;
         }
 
-        try
-        {
-            items.put( event );
-        }
-        catch ( InterruptedException e )
-        {
-            currentThread().interrupt();
-            throw new IllegalStateException( e );
-        }
+        synchronizer.pushNext( event );
     }
 
     @Override
     public void close()
-            throws IOException
+        throws IOException
     {
         if ( stop.compareAndSet( false, true ) )
         {
-            try
-            {
-                items.put( END_ITEM );
-            }
-            catch ( InterruptedException e )
-            {
-                currentThread().interrupt();
-            }
+            synchronizer.markStopped();
         }
 
         if ( pumper.hasErrors() )
@@ -167,7 +159,7 @@
      * @param item    element from <code>items</code>
      * @return {@code true} if tail of the queue
      */
-    private boolean shouldStopQueueing( Event item )
+    private static boolean shouldStopQueueing( Event item )
     {
         return item == END_ITEM;
     }
@@ -224,4 +216,122 @@
             return false;
         }
     }
+
+    /**
+     * This synchronization helper mostly avoids the locks.
+     * If the queue size has reached zero or {@code maxQueueSize} then the threads are locked (parked/unparked).
+     * The thread instance T1 is reader (see the class "Pumper") and T2 is the writer (see the method "handleEvent").
+     *
+     * @param <T> element type in the queue
+     */
+    static class QueueSynchronizer<T>
+    {
+        private final SyncT1 t1 = new SyncT1();
+        private final SyncT2 t2 = new SyncT2();
+        private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
+        private final AtomicInteger queueSize = new AtomicInteger();
+        private final int maxQueueSize;
+        private final T stopItemMarker;
+
+        QueueSynchronizer( int maxQueueSize, T stopItemMarker )
+        {
+            this.maxQueueSize = maxQueueSize;
+            this.stopItemMarker = stopItemMarker;
+        }
+
+        private class SyncT1 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() == 0 ? -1 : 1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void waitIfZero() throws InterruptedException
+            {
+                acquireSharedInterruptibly( 1 );
+            }
+
+            void release()
+            {
+                releaseShared( 0 );
+            }
+        }
+
+        private class SyncT2 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() < maxQueueSize ? 1 : -1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void awaitMax()
+            {
+                acquireShared( 1 );
+            }
+
+            void tryRelease()
+            {
+                if ( queueSize.get() == 0 )
+                {
+                    releaseShared( 0 );
+                }
+            }
+        }
+
+        void markStopped()
+        {
+            addNext( stopItemMarker );
+        }
+
+        void pushNext( T t )
+        {
+            t2.awaitMax();
+            addNext( t );
+        }
+
+        T awaitNext() throws InterruptedException
+        {
+            t2.tryRelease();
+            t1.waitIfZero();
+            queueSize.decrementAndGet();
+            return queue.pollFirst();
+        }
+
+        boolean isEmptyQueue()
+        {
+            return queue.isEmpty();
+        }
+
+        void clearQueue()
+        {
+            queue.clear();
+        }
+
+        private void addNext( T t )
+        {
+            queue.addLast( t );
+            if ( queueSize.getAndIncrement() == 0 )
+            {
+                t1.release();
+            }
+        }
+    }
 }
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
index c5b8f22..729a072 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
@@ -30,7 +30,6 @@
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.maven.plugin.surefire.report.FileReporter.getReportFile;
 import static org.apache.maven.surefire.api.util.internal.StringUtils.NL;
@@ -58,8 +57,6 @@
     private final AtomicStampedReference<FilterOutputStream> fileOutputStream =
             new AtomicStampedReference<>( null, OPEN );
 
-    private final ReentrantLock lock = new ReentrantLock();
-
     private volatile String reportEntryName;
 
     public ConsoleOutputFileReporter( File reportsDirectory, String reportNameSuffix, boolean usePhrasedFileName,
@@ -73,17 +70,9 @@
     }
 
     @Override
-    public void testSetStarting( TestSetReportEntry reportEntry )
+    public synchronized void testSetStarting( TestSetReportEntry reportEntry )
     {
-        lock.lock();
-        try
-        {
-            closeNullReportFile( reportEntry );
-        }
-        finally
-        {
-            lock.unlock();
-        }
+        closeNullReportFile( reportEntry );
     }
 
     @Override
@@ -92,24 +81,15 @@
     }
 
     @Override
-    public void close()
+    public synchronized void close()
     {
         // The close() method is called in main Thread T2.
-        lock.lock();
-        try
-        {
-            closeReportFile();
-        }
-        finally
-        {
-            lock.unlock();
-        }
+        closeReportFile();
     }
 
     @Override
-    public void writeTestOutput( String output, boolean newLine, boolean stdout )
+    public synchronized void writeTestOutput( String output, boolean newLine, boolean stdout )
     {
-        lock.lock();
         try
         {
             // This method is called in single thread T1 per fork JVM (see ThreadedStreamConsumer).
@@ -148,10 +128,6 @@
             // todo use UncheckedIOException in Java 8
             throw new RuntimeException( e );
         }
-        finally
-        {
-            lock.unlock();
-        }
     }
 
     @SuppressWarnings( "checkstyle:emptyblock" )
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
new file mode 100644
index 0000000..a859c76
--- /dev/null
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
@@ -0,0 +1,151 @@
+package org.apache.maven.plugin.surefire.booterclient.output;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer.QueueSynchronizer;
+import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
+import org.apache.maven.surefire.extensions.EventHandler;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
+import static org.fest.assertions.Assertions.assertThat;
+
+/**
+ *
+ */
+@SuppressWarnings( "checkstyle:magicnumber" )
+public class ThreadedStreamConsumerTest
+{
+    @Test
+    public void testQueueSynchronizer() throws Exception
+    {
+        final CountDownLatch countDown = new CountDownLatch( 5_000_000 );
+        final QueueSynchronizer<Integer> sync = new QueueSynchronizer<>(  8 * 1024, null );
+
+        Thread t = new Thread()
+        {
+            @Override
+            public void run()
+            {
+                while ( true )
+                {
+                    try
+                    {
+                        sync.awaitNext();
+                        countDown.countDown();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        throw new IllegalStateException( e );
+                    }
+                }
+            }
+        };
+        t.setDaemon( true );
+        t.start();
+
+        SECONDS.sleep( 1 );
+        System.gc();
+        SECONDS.sleep( 2 );
+
+        long t1 = System.currentTimeMillis();
+
+        for ( int i = 0; i < 5_000_000; i++ )
+        {
+            sync.pushNext( i );
+        }
+
+        assertThat( countDown.await( 3L, SECONDS ) )
+            .isTrue();
+
+        long t2 = System.currentTimeMillis();
+        System.out.println( ( t2 - t1 ) + " millis in testQueueSynchronizer()" );
+    }
+
+    @Test
+    public void testThreadedStreamConsumer() throws Exception
+    {
+        final CountDownLatch countDown = new CountDownLatch( 5_000_000 );
+        EventHandler<Event> handler = new EventHandler<Event>()
+        {
+            @Override
+            public void handleEvent( @Nonnull Event event )
+            {
+                countDown.countDown();
+            }
+        };
+
+        ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( handler );
+
+        SECONDS.sleep( 1 );
+        System.gc();
+        SECONDS.sleep( 2 );
+
+        long t1 = System.currentTimeMillis();
+
+        Event event = new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" );
+        for ( int i = 0; i < 5_000_000; i++ )
+        {
+            streamConsumer.handleEvent( event );
+        }
+
+        assertThat( countDown.await( 3L, SECONDS ) )
+            .isTrue();
+
+        long t2 = System.currentTimeMillis();
+        System.out.println( ( t2 - t1 ) + " millis in testThreadedStreamConsumer()" );
+
+        streamConsumer.close();
+    }
+
+    @Test
+    public void test3() throws Exception
+    {
+        final QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2, null );
+        sync.pushNext( "1" );
+        sync.pushNext( "2" );
+        String s1 = sync.awaitNext();
+        String s2 = sync.awaitNext();
+        assertThat( s1 ).isEqualTo( "1" );
+        assertThat( s2 ).isEqualTo( "2" );
+        FutureTask<Void> future = new FutureTask<>( new Callable<Void>()
+        {
+            @Override
+            public Void call() throws Exception
+            {
+                sync.awaitNext();
+                return null;
+            }
+        } );
+        Thread t = new Thread( future );
+        t.setDaemon( true );
+        t.start();
+        SECONDS.sleep( 3L );
+        assertThat( t.getState() )
+            .isEqualTo( Thread.State.WAITING );
+    }
+}
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
index 36425ed..9770f8a 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
@@ -41,6 +41,7 @@
 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStreamBuilderTest;
 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestProvidingInputStreamTest;
 import org.apache.maven.plugin.surefire.booterclient.output.ForkClientTest;
+import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumerTest;
 import org.apache.maven.plugin.surefire.extensions.ConsoleOutputReporterTest;
 import org.apache.maven.plugin.surefire.extensions.E2ETest;
 import org.apache.maven.plugin.surefire.extensions.ForkedProcessEventNotifierTest;
@@ -112,6 +113,7 @@
         suite.addTest( new JUnit4TestAdapter( ForkChannelTest.class ) );
         suite.addTest( new JUnit4TestAdapter( StreamFeederTest.class ) );
         suite.addTest( new JUnit4TestAdapter( E2ETest.class ) );
+        suite.addTest( new JUnit4TestAdapter( ThreadedStreamConsumerTest.class ) );
         return suite;
     }
 }