blob: 6c49a50c5882c5a7c8741e787d710d9b93548610 [file] [log] [blame]
package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.maven.surefire.api.booter.Command;
import org.apache.maven.surefire.api.booter.Shutdown;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.maven.surefire.api.booter.Command.BYE_ACK;
import static org.apache.maven.surefire.api.booter.Command.NOOP;
import static org.apache.maven.surefire.api.booter.Command.SKIP_SINCE_NEXT_TEST;
import static org.apache.maven.surefire.api.booter.Command.toShutdown;
/**
* Dispatches commands without tests.
*
* @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
* @since 2.19
*/
public final class TestLessInputStream
extends DefaultCommandReader
{
private final Semaphore barrier = new Semaphore( 0 );
private final AtomicBoolean closed = new AtomicBoolean();
private final Queue<Command> immediateCommands = new ConcurrentLinkedQueue<>();
private final TestLessInputStreamBuilder builder;
private Iterator<Command> cachableCommands;
private TestLessInputStream( TestLessInputStreamBuilder builder )
{
this.builder = builder;
}
@Override
public void provideNewTest()
{
}
@Override
public void skipSinceNextTest()
{
if ( canContinue() )
{
immediateCommands.add( SKIP_SINCE_NEXT_TEST );
barrier.release();
}
}
@Override
public void shutdown( Shutdown shutdownType )
{
if ( canContinue() )
{
immediateCommands.add( toShutdown( shutdownType ) );
barrier.release();
}
}
@Override
public void noop()
{
if ( canContinue() )
{
immediateCommands.add( NOOP );
barrier.release();
}
}
@Override
public void acknowledgeByeEventReceived()
{
if ( canContinue() )
{
immediateCommands.add( BYE_ACK );
barrier.release();
}
}
@Override
public boolean isClosed()
{
return closed.get();
}
@Override
protected Command nextCommand()
{
Command cmd = immediateCommands.poll();
if ( cmd == null )
{
if ( cachableCommands == null )
{
cachableCommands = builder.getIterableCachable().iterator();
}
cmd = cachableCommands.next();
}
return cmd;
}
@Override
protected void beforeNextCommand()
throws IOException
{
awaitNextCommand();
}
@Override
public void close()
{
if ( closed.compareAndSet( false, true ) )
{
barrier.drainPermits();
barrier.release();
}
}
/**
* For testing purposes only.
*
* @return permits used internally by {@link #beforeNextCommand()}
*/
int availablePermits()
{
return barrier.availablePermits();
}
private void awaitNextCommand()
throws IOException
{
try
{
barrier.acquire();
}
catch ( InterruptedException e )
{
throw new IOException( e.getLocalizedMessage() );
}
}
/**
* Builds {@link TestLessInputStream streams}, registers cachable commands
* and provides accessible API to dispatch immediate commands to all atomically
* alive streams.
*/
public static final class TestLessInputStreamBuilder
{
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Queue<TestLessInputStream> aliveStreams = new ConcurrentLinkedQueue<>();
private final ImmediateCommands immediateCommands = new ImmediateCommands();
private final CachableCommands cachableCommands = new CachableCommands();
private final Node head = new Node( null );
private final Iterable<Command> iterableCachable;
public TestLessInputStreamBuilder()
{
iterableCachable = new Iterable<Command>()
{
@Override
public Iterator<Command> iterator()
{
return new CIt();
}
};
}
public TestLessInputStream build()
{
Lock lock = rwLock.writeLock();
lock.lock();
try
{
TestLessInputStream is = new TestLessInputStream( this );
aliveStreams.offer( is );
return is;
}
finally
{
lock.unlock();
}
}
public void removeStream( TestLessInputStream is )
{
Lock lock = rwLock.writeLock();
lock.lock();
try
{
aliveStreams.remove( is );
}
finally
{
lock.unlock();
}
}
/**
* Only {@link NotifiableTestStream#noop()} and {@link NotifiableTestStream#shutdown(Shutdown)} are supported.
* Another methods throw {@link UnsupportedOperationException}.
*
* @return commands which are immediately transmitted once to all alive forked JVMs, not cached. As opposite
* to cached commands, the immediate commands disappear and cannot be seen by any fork initiated after
* the command has dispatched.
*/
public NotifiableTestStream getImmediateCommands()
{
return immediateCommands;
}
/**
* Cached commands are sent to all alive or future alive forks. These are termination commands which are not
* reversible and therefore only {@link NotifiableTestStream#shutdown(Shutdown)} and
* {@link NotifiableTestStream#skipSinceNextTest()} are supported.
* Another methods throw {@link UnsupportedOperationException}.
*
* @return commands which are cached for currently alive or future forks.
*/
public NotifiableTestStream getCachableCommands()
{
return cachableCommands;
}
/**
* The iterator is not thread safe.
*/
Iterable<Command> getIterableCachable()
{
return iterableCachable;
}
@SuppressWarnings( "checkstyle:innerassignment" )
private boolean addTailNodeIfAbsent( Command command )
{
Node newTail = new Node( command );
Node currentTail = head;
do
{
for ( Node successor; ( successor = currentTail.next.get() ) != null; )
{
currentTail = successor;
if ( command.equals( currentTail.command ) )
{
return false;
}
}
} while ( !currentTail.next.compareAndSet( null, newTail ) );
return true;
}
private static Node nextCachedNode( Node current )
{
return current.next.get();
}
private final class CIt
implements Iterator<Command>
{
private Node node = TestLessInputStreamBuilder.this.head;
@Override
public boolean hasNext()
{
return examineNext( false ) != null;
}
@Override
public Command next()
{
Command command = examineNext( true );
if ( command == null )
{
throw new NoSuchElementException();
}
return command;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
private Command examineNext( boolean store )
{
Node next = nextCachedNode( node );
if ( store && next != null )
{
node = next;
}
return next == null ? null : next.command;
}
}
/**
* Event is called just now for all alive streams and command is not persisted.
*/
private final class ImmediateCommands
implements NotifiableTestStream
{
@Override
public void provideNewTest()
{
throw new UnsupportedOperationException();
}
@Override
public void skipSinceNextTest()
{
throw new UnsupportedOperationException();
}
@Override
public void shutdown( Shutdown shutdownType )
{
Lock lock = rwLock.readLock();
lock.lock();
try
{
for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
{
aliveStream.shutdown( shutdownType );
}
}
finally
{
lock.unlock();
}
}
@Override
public void noop()
{
Lock lock = rwLock.readLock();
lock.lock();
try
{
for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
{
aliveStream.noop();
}
}
finally
{
lock.unlock();
}
}
@Override
public void acknowledgeByeEventReceived()
{
throw new UnsupportedOperationException();
}
}
/**
* Event is persisted.
*/
private final class CachableCommands
implements NotifiableTestStream
{
@Override
public void provideNewTest()
{
throw new UnsupportedOperationException();
}
@Override
public void skipSinceNextTest()
{
Lock lock = rwLock.readLock();
lock.lock();
try
{
if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( SKIP_SINCE_NEXT_TEST ) )
{
release();
}
}
finally
{
lock.unlock();
}
}
@Override
public void shutdown( Shutdown shutdownType )
{
Lock lock = rwLock.readLock();
lock.lock();
try
{
if ( TestLessInputStreamBuilder.this.addTailNodeIfAbsent( toShutdown( shutdownType ) ) )
{
release();
}
}
finally
{
lock.unlock();
}
}
@Override
public void noop()
{
throw new UnsupportedOperationException();
}
@Override
public void acknowledgeByeEventReceived()
{
throw new UnsupportedOperationException();
}
private void release()
{
for ( TestLessInputStream aliveStream : TestLessInputStreamBuilder.this.aliveStreams )
{
aliveStream.barrier.release();
}
}
}
private static class Node
{
private final AtomicReference<Node> next = new AtomicReference<>();
private final Command command;
Node( Command command )
{
this.command = command;
}
}
}
}