[SUREFIRE-1826] Improved performance of ThreadedStreamConsumer
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 7136834..1114948 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -20,41 +20,42 @@
*/
import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
import org.apache.maven.surefire.extensions.EventHandler;
-import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
+import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-import static java.lang.Thread.currentThread;
+import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
/**
- * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
+ * Knows how to reconstruct *all* the state transmitted over Channel by the forked process.
+ * <br>
+ * After applying the performance improvements with {@link QueueSynchronizer} the throughput becomes
+ * 6.33 mega messages per second
+ * (158 nano seconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest)
+ * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11.
*
* @author Kristian Rosenvold
*/
public final class ThreadedStreamConsumer
- implements EventHandler<Event>, Closeable
+ implements EventHandler<Event>, Closeable
{
+ private static final int QUEUE_MAX_ITEMS = 10_000;
private static final Event END_ITEM = new FinalEvent();
- private static final int ITEM_LIMIT_BEFORE_SLEEP = 10_000;
-
- private final BlockingQueue<Event> items = new ArrayBlockingQueue<>( ITEM_LIMIT_BEFORE_SLEEP );
-
+ private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM );
private final AtomicBoolean stop = new AtomicBoolean();
-
- private final Thread thread;
-
+ private final AtomicBoolean isAlive = new AtomicBoolean( true );
private final Pumper pumper;
final class Pumper
- implements Runnable
+ implements Runnable
{
private final EventHandler<Event> target;
@@ -79,15 +80,17 @@
@Override
public void run()
{
- while ( !ThreadedStreamConsumer.this.stop.get() || !ThreadedStreamConsumer.this.items.isEmpty() )
+ while ( !stop.get() || !synchronizer.isEmptyQueue() )
{
try
{
- Event item = ThreadedStreamConsumer.this.items.take();
+ Event item = synchronizer.awaitNext();
+
if ( shouldStopQueueing( item ) )
{
- return;
+ break;
}
+
target.handleEvent( item );
}
catch ( Throwable t )
@@ -95,6 +98,8 @@
errors.addException( t );
}
}
+
+ isAlive.set( false );
}
boolean hasErrors()
@@ -111,7 +116,7 @@
public ThreadedStreamConsumer( EventHandler<Event> target )
{
pumper = new Pumper( target );
- thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" );
+ Thread thread = newDaemonThread( pumper, "ThreadedStreamConsumer" );
thread.start();
}
@@ -122,37 +127,24 @@
{
return;
}
- else if ( !thread.isAlive() )
+ // Do NOT call Thread.isAlive() - slow.
+ // It makes worse performance from 790 millis to 1250 millis for 5 million messages.
+ else if ( !isAlive.get() )
{
- items.clear();
+ synchronizer.clearQueue();
return;
}
- try
- {
- items.put( event );
- }
- catch ( InterruptedException e )
- {
- currentThread().interrupt();
- throw new IllegalStateException( e );
- }
+ synchronizer.pushNext( event );
}
@Override
public void close()
- throws IOException
+ throws IOException
{
if ( stop.compareAndSet( false, true ) )
{
- try
- {
- items.put( END_ITEM );
- }
- catch ( InterruptedException e )
- {
- currentThread().interrupt();
- }
+ synchronizer.markStopped();
}
if ( pumper.hasErrors() )
@@ -167,7 +159,7 @@
* @param item element from <code>items</code>
* @return {@code true} if tail of the queue
*/
- private boolean shouldStopQueueing( Event item )
+ private static boolean shouldStopQueueing( Event item )
{
return item == END_ITEM;
}
@@ -224,4 +216,122 @@
return false;
}
}
+
+ /**
+ * This synchronization helper mostly avoids the locks.
+ * If the queue size has reached zero or {@code maxQueueSize} then the threads are locked (parked/unparked).
+ * The thread instance T1 is reader (see the class "Pumper") and T2 is the writer (see the method "handleEvent").
+ *
+ * @param <T> element type in the queue
+ */
+ static class QueueSynchronizer<T>
+ {
+ private final SyncT1 t1 = new SyncT1();
+ private final SyncT2 t2 = new SyncT2();
+ private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>();
+ private final AtomicInteger queueSize = new AtomicInteger();
+ private final int maxQueueSize;
+ private final T stopItemMarker;
+
+ QueueSynchronizer( int maxQueueSize, T stopItemMarker )
+ {
+ this.maxQueueSize = maxQueueSize;
+ this.stopItemMarker = stopItemMarker;
+ }
+
+ private class SyncT1 extends AbstractQueuedSynchronizer
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected int tryAcquireShared( int arg )
+ {
+ return queueSize.get() == 0 ? -1 : 1;
+ }
+
+ @Override
+ protected boolean tryReleaseShared( int arg )
+ {
+ return true;
+ }
+
+ void waitIfZero() throws InterruptedException
+ {
+ acquireSharedInterruptibly( 1 );
+ }
+
+ void release()
+ {
+ releaseShared( 0 );
+ }
+ }
+
+ private class SyncT2 extends AbstractQueuedSynchronizer
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected int tryAcquireShared( int arg )
+ {
+ return queueSize.get() < maxQueueSize ? 1 : -1;
+ }
+
+ @Override
+ protected boolean tryReleaseShared( int arg )
+ {
+ return true;
+ }
+
+ void awaitMax()
+ {
+ acquireShared( 1 );
+ }
+
+ void tryRelease()
+ {
+ if ( queueSize.get() == 0 )
+ {
+ releaseShared( 0 );
+ }
+ }
+ }
+
+ void markStopped()
+ {
+ addNext( stopItemMarker );
+ }
+
+ void pushNext( T t )
+ {
+ t2.awaitMax();
+ addNext( t );
+ }
+
+ T awaitNext() throws InterruptedException
+ {
+ t2.tryRelease();
+ t1.waitIfZero();
+ queueSize.decrementAndGet();
+ return queue.pollFirst();
+ }
+
+ boolean isEmptyQueue()
+ {
+ return queue.isEmpty();
+ }
+
+ void clearQueue()
+ {
+ queue.clear();
+ }
+
+ private void addNext( T t )
+ {
+ queue.addLast( t );
+ if ( queueSize.getAndIncrement() == 0 )
+ {
+ t1.release();
+ }
+ }
+ }
}
diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
index c5b8f22..729a072 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/report/ConsoleOutputFileReporter.java
@@ -30,7 +30,6 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicStampedReference;
-import java.util.concurrent.locks.ReentrantLock;
import static org.apache.maven.plugin.surefire.report.FileReporter.getReportFile;
import static org.apache.maven.surefire.api.util.internal.StringUtils.NL;
@@ -58,8 +57,6 @@
private final AtomicStampedReference<FilterOutputStream> fileOutputStream =
new AtomicStampedReference<>( null, OPEN );
- private final ReentrantLock lock = new ReentrantLock();
-
private volatile String reportEntryName;
public ConsoleOutputFileReporter( File reportsDirectory, String reportNameSuffix, boolean usePhrasedFileName,
@@ -73,17 +70,9 @@
}
@Override
- public void testSetStarting( TestSetReportEntry reportEntry )
+ public synchronized void testSetStarting( TestSetReportEntry reportEntry )
{
- lock.lock();
- try
- {
- closeNullReportFile( reportEntry );
- }
- finally
- {
- lock.unlock();
- }
+ closeNullReportFile( reportEntry );
}
@Override
@@ -92,24 +81,15 @@
}
@Override
- public void close()
+ public synchronized void close()
{
// The close() method is called in main Thread T2.
- lock.lock();
- try
- {
- closeReportFile();
- }
- finally
- {
- lock.unlock();
- }
+ closeReportFile();
}
@Override
- public void writeTestOutput( String output, boolean newLine, boolean stdout )
+ public synchronized void writeTestOutput( String output, boolean newLine, boolean stdout )
{
- lock.lock();
try
{
// This method is called in single thread T1 per fork JVM (see ThreadedStreamConsumer).
@@ -148,10 +128,6 @@
// todo use UncheckedIOException in Java 8
throw new RuntimeException( e );
}
- finally
- {
- lock.unlock();
- }
}
@SuppressWarnings( "checkstyle:emptyblock" )
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
new file mode 100644
index 0000000..a859c76
--- /dev/null
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
@@ -0,0 +1,151 @@
+package org.apache.maven.plugin.surefire.booterclient.output;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer.QueueSynchronizer;
+import org.apache.maven.surefire.api.event.Event;
+import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
+import org.apache.maven.surefire.extensions.EventHandler;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
+import static org.fest.assertions.Assertions.assertThat;
+
+/**
+ *
+ */
+@SuppressWarnings( "checkstyle:magicnumber" )
+public class ThreadedStreamConsumerTest
+{
+ @Test
+ public void testQueueSynchronizer() throws Exception
+ {
+ final CountDownLatch countDown = new CountDownLatch( 5_000_000 );
+ final QueueSynchronizer<Integer> sync = new QueueSynchronizer<>( 8 * 1024, null );
+
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ while ( true )
+ {
+ try
+ {
+ sync.awaitNext();
+ countDown.countDown();
+ }
+ catch ( InterruptedException e )
+ {
+ throw new IllegalStateException( e );
+ }
+ }
+ }
+ };
+ t.setDaemon( true );
+ t.start();
+
+ SECONDS.sleep( 1 );
+ System.gc();
+ SECONDS.sleep( 2 );
+
+ long t1 = System.currentTimeMillis();
+
+ for ( int i = 0; i < 5_000_000; i++ )
+ {
+ sync.pushNext( i );
+ }
+
+ assertThat( countDown.await( 3L, SECONDS ) )
+ .isTrue();
+
+ long t2 = System.currentTimeMillis();
+ System.out.println( ( t2 - t1 ) + " millis in testQueueSynchronizer()" );
+ }
+
+ @Test
+ public void testThreadedStreamConsumer() throws Exception
+ {
+ final CountDownLatch countDown = new CountDownLatch( 5_000_000 );
+ EventHandler<Event> handler = new EventHandler<Event>()
+ {
+ @Override
+ public void handleEvent( @Nonnull Event event )
+ {
+ countDown.countDown();
+ }
+ };
+
+ ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( handler );
+
+ SECONDS.sleep( 1 );
+ System.gc();
+ SECONDS.sleep( 2 );
+
+ long t1 = System.currentTimeMillis();
+
+ Event event = new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" );
+ for ( int i = 0; i < 5_000_000; i++ )
+ {
+ streamConsumer.handleEvent( event );
+ }
+
+ assertThat( countDown.await( 3L, SECONDS ) )
+ .isTrue();
+
+ long t2 = System.currentTimeMillis();
+ System.out.println( ( t2 - t1 ) + " millis in testThreadedStreamConsumer()" );
+
+ streamConsumer.close();
+ }
+
+ @Test
+ public void test3() throws Exception
+ {
+ final QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2, null );
+ sync.pushNext( "1" );
+ sync.pushNext( "2" );
+ String s1 = sync.awaitNext();
+ String s2 = sync.awaitNext();
+ assertThat( s1 ).isEqualTo( "1" );
+ assertThat( s2 ).isEqualTo( "2" );
+ FutureTask<Void> future = new FutureTask<>( new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ sync.awaitNext();
+ return null;
+ }
+ } );
+ Thread t = new Thread( future );
+ t.setDaemon( true );
+ t.start();
+ SECONDS.sleep( 3L );
+ assertThat( t.getState() )
+ .isEqualTo( Thread.State.WAITING );
+ }
+}
diff --git a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
index 36425ed..9770f8a 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
@@ -41,6 +41,7 @@
import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStreamBuilderTest;
import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestProvidingInputStreamTest;
import org.apache.maven.plugin.surefire.booterclient.output.ForkClientTest;
+import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumerTest;
import org.apache.maven.plugin.surefire.extensions.ConsoleOutputReporterTest;
import org.apache.maven.plugin.surefire.extensions.E2ETest;
import org.apache.maven.plugin.surefire.extensions.ForkedProcessEventNotifierTest;
@@ -112,6 +113,7 @@
suite.addTest( new JUnit4TestAdapter( ForkChannelTest.class ) );
suite.addTest( new JUnit4TestAdapter( StreamFeederTest.class ) );
suite.addTest( new JUnit4TestAdapter( E2ETest.class ) );
+ suite.addTest( new JUnit4TestAdapter( ThreadedStreamConsumerTest.class ) );
return suite;
}
}