blob: e0acb9fdcde6092a6872a0886d91f14af8eb31bd [file] [log] [blame]
package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.maven.surefire.api.booter.Command;
import org.apache.maven.surefire.booter.spi.LegacyMasterProcessChannelDecoder;
import org.apache.maven.plugin.surefire.extensions.StreamFeeder;
import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.lang.Thread.State;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import static java.nio.channels.Channels.newChannel;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertTrue;
/**
* Asserts that this stream properly reads bytes from queue.
*
* @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
* @since 2.19
*/
public class TestProvidingInputStreamTest
{
private static final int WAIT_LOOPS = 100;
@Test
public void closedStreamShouldReturnNullAsEndOfStream()
throws IOException
{
Queue<String> commands = new ArrayDeque<>();
TestProvidingInputStream is = new TestProvidingInputStream( commands );
is.close();
assertThat( is.readNextCommand(), is( nullValue() ) );
}
@Test
public void emptyStreamShouldWaitUntilClosed()
throws Exception
{
Queue<String> commands = new ArrayDeque<>();
final TestProvidingInputStream is = new TestProvidingInputStream( commands );
final Thread streamThread = Thread.currentThread();
FutureTask<State> futureTask = new FutureTask<>( new Callable<State>()
{
@Override
public State call()
{
sleep( 1000L );
State state = streamThread.getState();
is.close();
return state;
}
} );
Thread assertionThread = new Thread( futureTask );
assertionThread.start();
assertThat( is.readNextCommand(), is( nullValue() ) );
State state = futureTask.get();
assertThat( state, is( State.WAITING ) );
}
@Test
public void finishedTestsetShouldNotBlock()
throws IOException
{
Queue<String> commands = new ArrayDeque<>();
final TestProvidingInputStream is = new TestProvidingInputStream( commands );
is.testSetFinished();
new Thread( new Runnable()
{
@Override
public void run()
{
is.provideNewTest();
}
} ).start();
Command cmd = is.readNextCommand();
assertThat( cmd.getData(), is( nullValue() ) );
String stream = new String( StreamFeeder.encode( cmd.getCommandType() ), US_ASCII );
cmd = is.readNextCommand();
assertThat( cmd.getData(), is( nullValue() ) );
stream += new String( StreamFeeder.encode( cmd.getCommandType() ), US_ASCII );
assertThat( stream,
is( ":maven-surefire-command:testset-finished::maven-surefire-command:testset-finished:" ) );
boolean emptyStream = isInputStreamEmpty( is );
is.close();
assertTrue( emptyStream );
assertThat( is.readNextCommand(), is( nullValue() ) );
}
@Test
public void shouldReadTest()
throws IOException
{
Queue<String> commands = new ArrayDeque<>();
commands.add( "Test" );
final TestProvidingInputStream is = new TestProvidingInputStream( commands );
new Thread( new Runnable()
{
@Override
public void run()
{
is.provideNewTest();
}
} ).start();
Command cmd = is.readNextCommand();
assertThat( cmd.getData(), is( "Test" ) );
is.close();
}
@Test
public void shouldDecodeTwoCommands()
throws IOException
{
final TestProvidingInputStream pluginIs = new TestProvidingInputStream( new ConcurrentLinkedQueue<String>() );
InputStream is = new InputStream()
{
private byte[] buffer;
private int idx;
@Override
public int read() throws IOException
{
if ( buffer == null )
{
idx = 0;
Command cmd = pluginIs.readNextCommand();
buffer = cmd == null ? null : StreamFeeder.encode( cmd.getCommandType() );
}
if ( buffer != null )
{
byte b = buffer[idx++];
if ( idx == buffer.length )
{
buffer = null;
idx = 0;
}
return b;
}
throw new IOException();
}
};
MasterProcessChannelDecoder decoder = new LegacyMasterProcessChannelDecoder( newChannel( is ) );
pluginIs.acknowledgeByeEventReceived();
pluginIs.noop();
Command bye = decoder.decode();
assertThat( bye, is( notNullValue() ) );
assertThat( bye.getCommandType(), is( BYE_ACK ) );
Command noop = decoder.decode();
assertThat( noop, is( notNullValue() ) );
assertThat( noop.getCommandType(), is( NOOP ) );
}
private static void sleep( long millis )
{
try
{
TimeUnit.MILLISECONDS.sleep( millis );
}
catch ( InterruptedException e )
{
// do nothing
}
}
/**
* Waiting (max of 20 seconds)
* @param is examined stream
* @return {@code true} if the {@link InputStream#read()} is waiting for a new byte.
*/
private static boolean isInputStreamEmpty( final TestProvidingInputStream is )
{
Thread t = new Thread( new Runnable()
{
@Override
public void run()
{
try
{
is.readNextCommand();
}
catch ( IOException e )
{
Throwable cause = e.getCause();
Throwable err = cause == null ? e : cause;
if ( !( err instanceof InterruptedException ) )
{
System.err.println( err.toString() );
}
}
}
} );
t.start();
State state;
int loops = 0;
do
{
sleep( 100L );
state = t.getState();
}
while ( state == State.NEW && loops++ < WAIT_LOOPS );
t.interrupt();
return state == State.WAITING || state == State.TIMED_WAITING;
}
}