[SUREFIRE-1827] The console output is not flushed
diff --git a/surefire-api/pom.xml b/surefire-api/pom.xml
index ff62005..158c88a 100644
--- a/surefire-api/pom.xml
+++ b/surefire-api/pom.xml
@@ -52,6 +52,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <artifactId>powermock-reflect</artifactId>
+ <groupId>org.powermock</groupId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleWritableChannel.java b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleWritableChannel.java
index fe998f3..c2641e5 100644
--- a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleWritableChannel.java
+++ b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/AbstractNoninterruptibleWritableChannel.java
@@ -67,17 +67,12 @@
src.flip();
}
- int countWrittenBytes = 0;
-
- if ( src.hasRemaining() )
+ int countWrittenBytes = src.remaining();
+ writeImpl( src );
+ src.position( src.limit() );
+ if ( flush )
{
- countWrittenBytes = src.remaining();
- writeImpl( src );
- src.position( src.limit() );
- if ( flush )
- {
- flushImpl();
- }
+ flushImpl();
}
return countWrittenBytes;
}
diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/Channels.java b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/Channels.java
index 7bd4efc..3938c4a 100644
--- a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/Channels.java
+++ b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/Channels.java
@@ -32,7 +32,9 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;
/**
@@ -227,17 +229,28 @@
};
}
- private static WritableBufferedByteChannel newChannel( @Nonnull OutputStream out, @Nonnegative int bufferSize )
+ private static WritableBufferedByteChannel newChannel( @Nonnull OutputStream out,
+ @Nonnegative final int bufferSize )
{
requireNonNull( out, "the stream should not be null" );
final OutputStream bos = bufferSize == 0 ? out : new BufferedOutputStream( out, bufferSize );
return new AbstractNoninterruptibleWritableChannel()
{
+ private final AtomicLong bytesCounter = new AtomicLong();
+
+ @Override
+ public long countBufferOverflows()
+ {
+ return bufferSize == 0 ? 0 : max( bytesCounter.get() - 1, 0 ) / bufferSize;
+ }
+
@Override
protected void writeImpl( ByteBuffer src ) throws IOException
{
- bos.write( src.array(), src.arrayOffset() + src.position(), src.remaining() );
+ int count = src.remaining();
+ bos.write( src.array(), src.arrayOffset() + src.position(), count );
+ bytesCounter.getAndAdd( count );
}
@Override
diff --git a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/WritableBufferedByteChannel.java b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/WritableBufferedByteChannel.java
index 42c0d08..ea86a82 100644
--- a/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/WritableBufferedByteChannel.java
+++ b/surefire-api/src/main/java/org/apache/maven/surefire/api/util/internal/WritableBufferedByteChannel.java
@@ -29,8 +29,10 @@
* and the channel is flushed after the buffer has overflew.
* <br>
* The method {@link #write(ByteBuffer)} flushes every written message.
+ * You can flush the channel by {@link #write(ByteBuffer) writing} the zero length of {@link ByteBuffer}.
*/
public interface WritableBufferedByteChannel extends WritableByteChannel
{
void writeBuffered( ByteBuffer src ) throws IOException;
+ long countBufferOverflows();
}
diff --git a/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsReaderTest.java b/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsReaderTest.java
index 42ca2c7..025cfb5 100644
--- a/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsReaderTest.java
+++ b/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsReaderTest.java
@@ -28,11 +28,13 @@
import org.mockito.stubbing.Answer;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.ClosedChannelException;
@@ -54,6 +56,7 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import static org.powermock.reflect.Whitebox.invokeMethod;
/**
* The tests for {@link Channels#newChannel(InputStream)} and {@link Channels#newBufferedChannel(InputStream)}.
@@ -69,6 +72,36 @@
.build();
@Test
+ public void shouldOverflowBuffer() throws Exception
+ {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ WritableBufferedByteChannel channel = invokeMethod( Channels.class, "newChannel",
+ new Class[] {OutputStream.class, int.class}, new Object[] {out, 8} );
+
+ assertThat( channel.countBufferOverflows() )
+ .isEqualTo( 0 );
+
+ channel.write( ByteBuffer.wrap( new byte[] {1, 2, 3} ) );
+
+ assertThat( channel.countBufferOverflows() )
+ .isEqualTo( 0 );
+
+ channel.write( ByteBuffer.wrap( new byte[] {4, 5, 6, 7, 8} ) );
+
+ assertThat( channel.countBufferOverflows() )
+ .isEqualTo( 0 );
+
+ channel.write( ByteBuffer.wrap( new byte[] {9} ) );
+
+ assertThat( channel.countBufferOverflows() )
+ .isEqualTo( 1 );
+
+ assertThat( out.toByteArray() )
+ .isEqualTo( new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9} );
+ }
+
+ @Test
public void exactBufferSize() throws Exception
{
ByteArrayInputStream is = new ByteArrayInputStream( new byte[] {1, 2, 3} );
diff --git a/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsWriterTest.java b/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsWriterTest.java
index 4befc24..24a09f3 100644
--- a/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsWriterTest.java
+++ b/surefire-api/src/test/java/org/apache/maven/surefire/api/util/internal/ChannelsWriterTest.java
@@ -137,6 +137,48 @@
}
@Test
+ public void shouldFlushWhenEmptyBuffer() throws Exception
+ {
+ final boolean[] flushed = {false};
+ ByteArrayOutputStream out = new ByteArrayOutputStream()
+ {
+ @Override
+ public void flush() throws IOException
+ {
+ flushed[0] = true;
+ super.flush();
+ }
+ };
+ WritableByteChannel channel = Channels.newChannel( out );
+ ByteBuffer bb = ByteBuffer.allocate( 0 );
+ int countWritten = channel.write( bb );
+ assertThat( countWritten )
+ .isEqualTo( 0 );
+ assertThat( flushed[0] )
+ .isTrue();
+ }
+
+ @Test
+ public void shouldFlushWhenEmptyBufferOnBufferedWrites() throws Exception
+ {
+ final boolean[] flushed = {false};
+ ByteArrayOutputStream out = new ByteArrayOutputStream()
+ {
+ @Override
+ public void flush() throws IOException
+ {
+ flushed[0] = true;
+ super.flush();
+ }
+ };
+ WritableBufferedByteChannel channel = Channels.newBufferedChannel( out );
+ ByteBuffer bb = ByteBuffer.allocate( 0 );
+ channel.writeBuffered( bb );
+ assertThat( flushed[0] )
+ .isFalse();
+ }
+
+ @Test
public void bufferedChannel() throws Exception
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -151,11 +193,6 @@
assertThat( out.toByteArray() )
.isEmpty();
- channel.write( ByteBuffer.allocate( 0 ) );
-
- assertThat( out.toByteArray() )
- .isEmpty();
-
channel.write( ByteBuffer.wrap( new byte[] {4} ) );
assertThat( out.toByteArray() )
diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/AbstractMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/AbstractMasterProcessChannelProcessorFactory.java
new file mode 100644
index 0000000..63407f2
--- /dev/null
+++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/AbstractMasterProcessChannelProcessorFactory.java
@@ -0,0 +1,106 @@
+package org.apache.maven.surefire.booter.spi;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
+import org.apache.maven.surefire.spi.MasterProcessChannelProcessorFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.AccessControlException;
+import java.security.PrivilegedAction;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.security.AccessController.doPrivileged;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThreadFactory;
+
+/**
+ * Default implementation of {@link MasterProcessChannelProcessorFactory}.
+ */
+public abstract class AbstractMasterProcessChannelProcessorFactory
+ implements MasterProcessChannelProcessorFactory
+{
+ private static final String STREAM_FLUSHER = "surefire-forkedjvm-stream-flusher";
+ private final ScheduledExecutorService flusher;
+
+ public AbstractMasterProcessChannelProcessorFactory()
+ {
+ flusher = newScheduledThreadPool( 1, newDaemonThreadFactory( STREAM_FLUSHER ) );
+ }
+
+ protected void schedulePeriodicFlusher( int delayInMillis, final WritableBufferedByteChannel channel )
+ {
+ final AtomicLong bufferOverflows = new AtomicLong();
+ flusher.scheduleWithFixedDelay( new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ long currentBufferOverflows = channel.countBufferOverflows();
+ // optimization: flush the Channel only if the buffer has not overflew after last period of time
+ if ( bufferOverflows.get() == currentBufferOverflows )
+ {
+ try
+ {
+ channel.write( ByteBuffer.allocate( 0 ) );
+ }
+ catch ( Exception e )
+ {
+ // cannot do anything about this I/O issue
+ }
+ }
+ else
+ {
+ bufferOverflows.set( currentBufferOverflows );
+ }
+ }
+ }, 0L, delayInMillis, MILLISECONDS );
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ try
+ {
+ doPrivileged( new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ flusher.shutdown();
+ // Do NOT call awaitTermination() due to this would unnecessarily prolong teardown
+ // time of the JVM. It is not a critical situation when the JXM exits this daemon
+ // thread because the interrupted flusher does not break any business function.
+ // All business data is already safely flushed by test events, then by sending BYE
+ // event at the exit time and finally by flushEventChannelOnExit() in ForkedBooter.
+ return null;
+ }
+ }
+ );
+ }
+ catch ( AccessControlException e )
+ {
+ // ignore
+ }
+ }
+}
diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java
index 1344f3d..6e28764 100644
--- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java
+++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelProcessorFactory.java
@@ -21,7 +21,7 @@
import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
-import org.apache.maven.surefire.spi.MasterProcessChannelProcessorFactory;
+import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -36,8 +36,10 @@
* @since 3.0.0-M5
*/
public class LegacyMasterProcessChannelProcessorFactory
- implements MasterProcessChannelProcessorFactory
+ extends AbstractMasterProcessChannelProcessorFactory
{
+ private static final int FLUSH_PERIOD_MILLIS = 100;
+
@Override
public boolean canUse( String channelConfig )
{
@@ -62,11 +64,8 @@
@Override
public MasterProcessChannelEncoder createEncoder()
{
- return new LegacyMasterProcessChannelEncoder( newBufferedChannel( System.out ) );
- }
-
- @Override
- public void close()
- {
+ WritableBufferedByteChannel channel = newBufferedChannel( System.out );
+ schedulePeriodicFlusher( FLUSH_PERIOD_MILLIS, channel );
+ return new LegacyMasterProcessChannelEncoder( channel );
}
}
diff --git a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
index 9efff25..0bebeb4 100644
--- a/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
+++ b/surefire-booter/src/main/java/org/apache/maven/surefire/booter/spi/SurefireMasterProcessChannelProcessorFactory.java
@@ -21,7 +21,7 @@
import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
-import org.apache.maven.surefire.spi.MasterProcessChannelProcessorFactory;
+import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -53,8 +53,9 @@
* @since 3.0.0-M5
*/
public class SurefireMasterProcessChannelProcessorFactory
- implements MasterProcessChannelProcessorFactory
+ extends AbstractMasterProcessChannelProcessorFactory
{
+ private static final int FLUSH_PERIOD_MILLIS = 100;
private volatile AsynchronousSocketChannel clientSocketChannel;
@Override
@@ -104,12 +105,15 @@
@Override
public MasterProcessChannelEncoder createEncoder()
{
- return new LegacyMasterProcessChannelEncoder( newBufferedChannel( newOutputStream( clientSocketChannel ) ) );
+ WritableBufferedByteChannel channel = newBufferedChannel( newOutputStream( clientSocketChannel ) );
+ schedulePeriodicFlusher( FLUSH_PERIOD_MILLIS, channel );
+ return new LegacyMasterProcessChannelEncoder( channel );
}
@Override
public void close() throws IOException
{
+ super.close();
if ( clientSocketChannel != null && clientSocketChannel.isOpen() )
{
clientSocketChannel.close();
diff --git a/surefire-booter/src/test/java/org/apache/maven/surefire/booter/ForkedBooterMockTest.java b/surefire-booter/src/test/java/org/apache/maven/surefire/booter/ForkedBooterMockTest.java
index 1982f7e..c1140c8 100644
--- a/surefire-booter/src/test/java/org/apache/maven/surefire/booter/ForkedBooterMockTest.java
+++ b/surefire-booter/src/test/java/org/apache/maven/surefire/booter/ForkedBooterMockTest.java
@@ -22,6 +22,8 @@
import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
import org.apache.maven.surefire.api.booter.MasterProcessChannelEncoder;
import org.apache.maven.surefire.api.report.StackTraceWriter;
+import org.apache.maven.surefire.api.util.internal.WritableBufferedByteChannel;
+import org.apache.maven.surefire.booter.spi.AbstractMasterProcessChannelProcessorFactory;
import org.apache.maven.surefire.booter.spi.LegacyMasterProcessChannelDecoder;
import org.apache.maven.surefire.booter.spi.LegacyMasterProcessChannelEncoder;
import org.apache.maven.surefire.booter.spi.LegacyMasterProcessChannelProcessorFactory;
@@ -42,6 +44,7 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
@@ -55,6 +58,7 @@
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.maven.surefire.api.util.internal.Channels.newBufferedChannel;
import static org.fest.assertions.Assertions.assertThat;
import static org.fest.assertions.Fail.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -296,6 +300,72 @@
}
@Test
+ @SuppressWarnings( "checkstyle:magicnumber" )
+ public void shouldScheduleFlushes() throws Exception
+ {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ class Factory extends AbstractMasterProcessChannelProcessorFactory
+ {
+ @Override
+ public boolean canUse( String channelConfig )
+ {
+ return false;
+ }
+
+ @Override
+ public void connect( String channelConfig )
+ {
+ }
+
+ @Override
+ public MasterProcessChannelDecoder createDecoder()
+ {
+ return null;
+ }
+
+ @Override
+ public MasterProcessChannelEncoder createEncoder()
+ {
+ return null;
+ }
+
+ public void runScheduler() throws InterruptedException
+ {
+ final WritableBufferedByteChannel channel = newBufferedChannel( out );
+ schedulePeriodicFlusher( 100, channel );
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ for ( int i = 0; i < 10; i++ )
+ {
+ try
+ {
+ channel.write( ByteBuffer.wrap( new byte[] {1} ) );
+ Thread.sleep( 75 );
+ }
+ catch ( Exception e )
+ {
+ //
+ }
+ }
+ }
+ };
+ t.setDaemon( true );
+ t.start();
+ t.join( 5000L );
+ }
+ }
+
+ Factory factory = new Factory();
+ factory.runScheduler();
+ factory.close();
+ assertThat( out.size() ).isPositive();
+ assertThat( out.size() ).isLessThanOrEqualTo( 10 );
+ }
+
+ @Test
public void shouldLookupSurefireDecoderFactory() throws Exception
{
mockStatic( ForkedBooter.class );
diff --git a/surefire-booter/src/test/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoderTest.java b/surefire-booter/src/test/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoderTest.java
index 7f72ba2..0262f3e 100644
--- a/surefire-booter/src/test/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoderTest.java
+++ b/surefire-booter/src/test/java/org/apache/maven/surefire/booter/spi/LegacyMasterProcessChannelEncoderTest.java
@@ -1418,6 +1418,12 @@
}
@Override
+ public long countBufferOverflows()
+ {
+ return 0;
+ }
+
+ @Override
public int write( ByteBuffer src ) throws IOException
{
this.src = src;