blob: 3938c4af0c154f66e5b67e59bf6b387f86805015 [file] [log] [blame]
package org.apache.maven.surefire.api.util.internal;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousByteChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;
/**
* Converts {@link OutputStream}, {@link java.io.PrintStream}, {@link InputStream} to the Java {@link
* java.nio.channels.Channel}.
* <br>
* We do not use the Java's utility class {@link java.nio.channels.Channels} because the utility closes the stream as
* soon as the particular Thread is interrupted. If the frameworks (Zookeeper, Netty) interrupts the thread, the
* communication channels become closed and the JVM hangs. Therefore we developed internal utility which is safe for the
* Surefire.
*
* @since 3.0.0-M5
*/
public final class Channels
{
private static final int BUFFER_SIZE = 64 * 1024;
private Channels()
{
throw new IllegalStateException( "no instantiable constructor" );
}
public static WritableByteChannel newChannel( @Nonnull OutputStream out )
{
return newChannel( out, 0 );
}
public static WritableBufferedByteChannel newBufferedChannel( @Nonnull OutputStream out )
{
return newChannel( out, BUFFER_SIZE );
}
public static ReadableByteChannel newChannel( @Nonnull final InputStream is )
{
return newChannel( is, 0 );
}
public static ReadableByteChannel newBufferedChannel( @Nonnull final InputStream is )
{
return newChannel( is, BUFFER_SIZE );
}
public static OutputStream newOutputStream( final AsynchronousByteChannel channel )
{
return new OutputStream()
{
@Override
public synchronized void write( byte[] b, int off, int len ) throws IOException
{
if ( off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0 )
{
throw new IndexOutOfBoundsException(
"b.length = " + b.length + ", off = " + off + ", len = " + len );
}
else if ( len > 0 )
{
ByteBuffer bb = ByteBuffer.wrap( b, off, len );
while ( bb.hasRemaining() )
{
try
{
channel.write( bb ).get();
}
catch ( ExecutionException e )
{
Throwable t = e.getCause();
throw t instanceof IOException
? (IOException) t
: new IOException( ( t == null ? e : t ).getLocalizedMessage(), t );
}
catch ( Exception e )
{
throw new IOException( e.getLocalizedMessage(), e );
}
}
}
}
@Override
public void write( int b ) throws IOException
{
write( new byte[] {(byte) b} );
}
@Override
public synchronized void close() throws IOException
{
if ( channel.isOpen() )
{
try
{
channel.close();
}
catch ( ClosedChannelException e )
{
// closed channel anyway
}
}
}
};
}
public static InputStream newInputStream( final AsynchronousByteChannel channel )
{
return new InputStream()
{
@Override
public synchronized int read( byte[] b, int off, int len ) throws IOException
{
if ( off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0 )
{
throw new IndexOutOfBoundsException(
"b.length = " + b.length + ", off = " + off + ", len = " + len );
}
else if ( len == 0 )
{
return 0;
}
ByteBuffer bb = ByteBuffer.wrap( b, off, len );
try
{
return channel.read( bb ).get();
}
catch ( ExecutionException e )
{
Throwable t = e.getCause();
throw t instanceof IOException
? (IOException) t
: new IOException( ( t == null ? e : t ).getLocalizedMessage(), t );
}
catch ( Exception e )
{
throw new IOException( e.getLocalizedMessage(), e );
}
}
@Override
public int read() throws IOException
{
int count;
byte[] b = new byte[1];
do
{
count = read( b, 0, 1 );
}
while ( count == 0 );
return count == -1 ? -1 : b[0];
}
@Override
public synchronized void close() throws IOException
{
if ( channel.isOpen() )
{
try
{
channel.close();
}
catch ( ClosedChannelException e )
{
// closed channel anyway
}
}
}
};
}
private static ReadableByteChannel newChannel( @Nonnull InputStream is, @Nonnegative int bufferSize )
{
requireNonNull( is, "the stream should not be null" );
final InputStream bis = bufferSize == 0 ? is : new BufferedInputStream( is, bufferSize );
return new AbstractNoninterruptibleReadableChannel()
{
@Override
protected int readImpl( ByteBuffer src ) throws IOException
{
int count = bis.read( src.array(), src.arrayOffset() + src.position(), src.remaining() );
if ( count > 0 )
{
src.position( count + src.position() );
}
return count;
}
@Override
protected void closeImpl() throws IOException
{
bis.close();
}
};
}
private static WritableBufferedByteChannel newChannel( @Nonnull OutputStream out,
@Nonnegative final int bufferSize )
{
requireNonNull( out, "the stream should not be null" );
final OutputStream bos = bufferSize == 0 ? out : new BufferedOutputStream( out, bufferSize );
return new AbstractNoninterruptibleWritableChannel()
{
private final AtomicLong bytesCounter = new AtomicLong();
@Override
public long countBufferOverflows()
{
return bufferSize == 0 ? 0 : max( bytesCounter.get() - 1, 0 ) / bufferSize;
}
@Override
protected void writeImpl( ByteBuffer src ) throws IOException
{
int count = src.remaining();
bos.write( src.array(), src.arrayOffset() + src.position(), count );
bytesCounter.getAndAdd( count );
}
@Override
protected void closeImpl() throws IOException
{
bos.close();
}
@Override
protected void flushImpl() throws IOException
{
bos.flush();
}
};
}
}