blob: a5d004035ca29b9ad801fb9995f7f70309e66d99 [file] [log] [blame]
/*
* Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.qi4j.io;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;
import org.qi4j.functional.Function;
import org.qi4j.functional.Specification;
/**
* Utility class for I/O transforms
*/
public class Transforms
{
/**
* Filter items in a transfer by applying the given Specification to each item.
*
* @param specification The Specification defining the items to not filter away.
* @param output The Output instance to receive to result.
* @param <T> The item type
* @param <Receiver2ThrowableType> Exception type that might be thrown by the Receiver.
*
* @return And Output encapsulation the filter operation.
*/
public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filter( final Specification<? super T> specification,
final Output<T, Receiver2ThrowableType> output
)
{
return new Output<T, Receiver2ThrowableType>()
{
@Override
public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender )
throws Receiver2ThrowableType, SenderThrowableType
{
output.receiveFrom( new Sender<T, SenderThrowableType>()
{
@Override
public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver )
throws ReceiverThrowableType, SenderThrowableType
{
sender.sendTo( new Receiver<T, ReceiverThrowableType>()
{
@Override
public void receive( T item )
throws ReceiverThrowableType
{
if( specification.satisfiedBy( item ) )
{
receiver.receive( item );
}
}
} );
}
} );
}
};
}
/**
* Map items in a transfer from one type to another by applying the given function.
*
* @param function The transformation function to apply to the streaming items.
* @param output The output to receive the transformed items.
* @param <From> The type of the incoming items.
* @param <To> The type of the transformed items.
* @param <Receiver2ThrowableType> The exception type that the Receiver might throw.
*
* @return An Output instance that encapsulates the map transformation.
*/
public static <From, To, Receiver2ThrowableType extends Throwable> Output<From, Receiver2ThrowableType> map( final Function<? super From, ? extends To> function,
final Output<To, Receiver2ThrowableType> output
)
{
return new Output<From, Receiver2ThrowableType>()
{
@Override
public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends From, SenderThrowableType> sender )
throws Receiver2ThrowableType, SenderThrowableType
{
output.receiveFrom( new Sender<To, SenderThrowableType>()
{
@Override
public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super To, ReceiverThrowableType> receiver )
throws ReceiverThrowableType, SenderThrowableType
{
sender.sendTo( new Receiver<From, ReceiverThrowableType>()
{
@Override
public void receive( From item )
throws ReceiverThrowableType
{
receiver.receive( function.map( item ) );
}
} );
}
} );
}
};
}
/**
* Apply the given function to items in the transfer that match the given specification. Other items will pass
* through directly.
*
* @param specification The Specification defining which items should be transformed.
* @param function The transformation function.
* @param output The Output that will receive the resulting items.
* @param <T> The item type. Items can not be transformed to a new type.
* @param <Receiver2ThrowableType> The exception that the Receiver might throw.
*
* @return An Output instance that encapsulates the operation.
*/
public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filteredMap( final Specification<? super T> specification,
final Function<? super T, ? extends T> function,
final Output<T, Receiver2ThrowableType> output
)
{
return new Output<T, Receiver2ThrowableType>()
{
@Override
public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender )
throws Receiver2ThrowableType, SenderThrowableType
{
output.receiveFrom( new Sender<T, SenderThrowableType>()
{
@Override
public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver )
throws ReceiverThrowableType, SenderThrowableType
{
sender.sendTo( new Receiver<T, ReceiverThrowableType>()
{
@Override
public void receive( T item )
throws ReceiverThrowableType
{
if( specification.satisfiedBy( item ) )
{
receiver.receive( function.map( item ) );
}
else
{
receiver.receive( item );
}
}
} );
}
} );
}
};
}
/**
* Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on
* the sending side and a write-lock would be used on the receiving side. Inputs can use this as well to create a
* wrapper on the send side when transferTo is invoked.
*
* @param lock the lock to be used for transfers
* @param output output to be wrapped
* @param <T> The Item type
* @param <Receiver2ThrowableType> The Exception type that the Receiver might throw.
*
* @return Output wrapper that uses the given lock during transfers.
*/
public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> lock( final Lock lock,
final Output<T, Receiver2ThrowableType> output
)
{
return new Output<T, Receiver2ThrowableType>()
{
@Override
public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender )
throws Receiver2ThrowableType, SenderThrowableType
{
/**
* Fix for this bug:
* http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370
*/
while( true )
{
try
{
//noinspection StatementWithEmptyBody
while( !lock.tryLock( 1000, TimeUnit.MILLISECONDS ) )
{
// On timeout, try again
}
break; // Finally got a lock
}
catch( InterruptedException e )
{
// Try again
}
}
try
{
output.receiveFrom( sender );
}
finally
{
lock.unlock();
}
}
};
}
/**
* Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on the sending side and a write-lock
* would be used on the receiving side.
*
* @param lock the lock to be used for transfers
* @param input input to be wrapped
* @param <T> The item type.
* @param <SenderThrowableType> The Exception type that the Sender might throw.
*
* @return Input wrapper that uses the given lock during transfers.
*/
public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> lock( final Lock lock,
final Input<T, SenderThrowableType> input
)
{
return new Input<T, SenderThrowableType>()
{
@Override
public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output )
throws SenderThrowableType, ReceiverThrowableType
{
/**
* Fix for this bug:
* http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6822370
*/
while( true )
{
try
{
//noinspection StatementWithEmptyBody
while( !( lock.tryLock() || lock.tryLock( 1000, TimeUnit.MILLISECONDS ) ) )
{
// On timeout, try again
}
break; // Finally got a lock
}
catch( InterruptedException e )
{
// Try again
}
}
try
{
input.transferTo( output );
}
finally
{
lock.unlock();
}
}
};
}
/**
* Count the number of items in the transfer.
*
* @param <T>
*/
// START SNIPPET: counter
public static class Counter<T>
implements Function<T, T>
{
private volatile long count = 0;
public long count()
{
return count;
}
@Override
public T map( T t )
{
count++;
return t;
}
}
// END SNIPPET: counter
/**
* Convert strings to bytes using the given CharSet
*/
@SuppressWarnings( "UnusedDeclaration" )
public static class String2Bytes
implements Function<String, byte[]>
{
private Charset charSet;
public String2Bytes( Charset charSet )
{
this.charSet = charSet;
}
@Override
public byte[] map( String s )
{
return s.getBytes( charSet );
}
}
/**
* Convert ByteBuffers to Strings using the given CharSet
*/
public static class ByteBuffer2String
implements Function<ByteBuffer, String>
{
private Charset charSet;
public ByteBuffer2String( Charset charSet )
{
this.charSet = charSet;
}
@Override
public String map( ByteBuffer buffer )
{
return new String( buffer.array(), charSet );
}
}
/**
* Convert objects to Strings using .toString()
*/
@SuppressWarnings( "UnusedDeclaration" )
public static class ObjectToString
implements Function<Object, String>
{
@Override
public String map( Object o )
{
return o.toString();
}
}
/**
* Log the toString() representation of transferred items to the given log. The string is first formatted using MessageFormat
* with the given format.
*
* @param <T>
*/
public static class Log<T>
implements Function<T, T>
{
private Logger logger;
private MessageFormat format;
public Log( Logger logger, String format )
{
this.logger = logger;
this.format = new MessageFormat( format );
}
@Override
public T map( T item )
{
logger.info( format.format( new String[]{ item.toString() } ) );
return item;
}
}
/**
* Track progress of transfer by emitting a log message in given intervals.
*
* If logger or format is null, then you need to override the logProgress to do something
*
* @param <T> type of items to be transferred
*/
// START SNIPPET: progress
public static class ProgressLog<T>
implements Function<T, T>
{
private Counter<T> counter;
private Log<String> log;
private final long interval;
public ProgressLog( Logger logger, String format, long interval )
{
this.interval = interval;
if( logger != null && format != null )
{
log = new Log<>( logger, format );
}
counter = new Counter<>();
}
public ProgressLog( long interval )
{
this.interval = interval;
counter = new Counter<>();
}
@Override
public T map( T t )
{
counter.map( t );
if( counter.count % interval == 0 )
{
logProgress();
}
return t;
}
// Override this to do something other than logging the progress
protected void logProgress()
{
if( log != null )
{
log.map( counter.count + "" );
}
}
}
// END SNIPPET: progress
}