blob: b17c5421ed10e3ad7c69124bcd8c423bfaa5be99 [file] [log] [blame]
/*
 * Copyright (c) 2010, Rickard Öberg. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.text.MessageFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;
import org.apache.zest.functional.Function;
import org.apache.zest.functional.Specification;
/**
 * Utility class for I/O transforms.
 */
public class Transforms
* Filter items in a transfer by applying the given Specification to each item.
* @param specification The Specification defining the items to not filter away.
* @param output The Output instance to receive to result.
* @param <T> The item type
* @param <Receiver2ThrowableType> Exception type that might be thrown by the Receiver.
* @return And Output encapsulation the filter operation.
public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filter( final Specification<? super T> specification,
final Output<T, Receiver2ThrowableType> output
return new Output<T, Receiver2ThrowableType>()
public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender )
throws Receiver2ThrowableType, SenderThrowableType
output.receiveFrom( new Sender<T, SenderThrowableType>()
public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver )
throws ReceiverThrowableType, SenderThrowableType
sender.sendTo( new Receiver<T, ReceiverThrowableType>()
public void receive( T item )
throws ReceiverThrowableType
if( specification.satisfiedBy( item ) )
receiver.receive( item );
} );
} );
* Map items in a transfer from one type to another by applying the given function.
* @param function The transformation function to apply to the streaming items.
* @param output The output to receive the transformed items.
* @param <From> The type of the incoming items.
* @param <To> The type of the transformed items.
* @param <Receiver2ThrowableType> The exception type that the Receiver might throw.
* @return An Output instance that encapsulates the map transformation.
public static <From, To, Receiver2ThrowableType extends Throwable> Output<From, Receiver2ThrowableType> map( final Function<? super From, ? extends To> function,
final Output<To, Receiver2ThrowableType> output
return new Output<From, Receiver2ThrowableType>()
public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends From, SenderThrowableType> sender )
throws Receiver2ThrowableType, SenderThrowableType
output.receiveFrom( new Sender<To, SenderThrowableType>()
public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super To, ReceiverThrowableType> receiver )
throws ReceiverThrowableType, SenderThrowableType
sender.sendTo( new Receiver<From, ReceiverThrowableType>()
public void receive( From item )
throws ReceiverThrowableType
receiver.receive( item ) );
} );
} );
* Apply the given function to items in the transfer that match the given specification. Other items will pass
* through directly.
* @param specification The Specification defining which items should be transformed.
* @param function The transformation function.
* @param output The Output that will receive the resulting items.
* @param <T> The item type. Items can not be transformed to a new type.
* @param <Receiver2ThrowableType> The exception that the Receiver might throw.
* @return An Output instance that encapsulates the operation.
public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> filteredMap( final Specification<? super T> specification,
final Function<? super T, ? extends T> function,
final Output<T, Receiver2ThrowableType> output
return new Output<T, Receiver2ThrowableType>()
public <SenderThrowableType extends Throwable> void receiveFrom( final Sender<? extends T, SenderThrowableType> sender )
throws Receiver2ThrowableType, SenderThrowableType
output.receiveFrom( new Sender<T, SenderThrowableType>()
public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver )
throws ReceiverThrowableType, SenderThrowableType
sender.sendTo( new Receiver<T, ReceiverThrowableType>()
public void receive( T item )
throws ReceiverThrowableType
if( specification.satisfiedBy( item ) )
receiver.receive( item ) );
receiver.receive( item );
} );
} );
* Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on
* the sending side and a write-lock would be used on the receiving side. Inputs can use this as well to create a
* wrapper on the send side when transferTo is invoked.
* @param lock the lock to be used for transfers
* @param output output to be wrapped
* @param <T> The Item type
* @param <Receiver2ThrowableType> The Exception type that the Receiver might throw.
* @return Output wrapper that uses the given lock during transfers.
public static <T, Receiver2ThrowableType extends Throwable> Output<T, Receiver2ThrowableType> lock( final Lock lock,
final Output<T, Receiver2ThrowableType> output
return new Output<T, Receiver2ThrowableType>()
public <SenderThrowableType extends Throwable> void receiveFrom( Sender<? extends T, SenderThrowableType> sender )
throws Receiver2ThrowableType, SenderThrowableType
* Fix for this bug:
while( true )
//noinspection StatementWithEmptyBody
while( !lock.tryLock( 1000, TimeUnit.MILLISECONDS ) )
// On timeout, try again
break; // Finally got a lock
catch( InterruptedException e )
// Try again
output.receiveFrom( sender );
* Wrapper for Outputs that uses a lock whenever a transfer is instantiated. Typically a read-lock would be used on the sending side and a write-lock
* would be used on the receiving side.
* @param lock the lock to be used for transfers
* @param input input to be wrapped
* @param <T> The item type.
* @param <SenderThrowableType> The Exception type that the Sender might throw.
* @return Input wrapper that uses the given lock during transfers.
public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> lock( final Lock lock,
final Input<T, SenderThrowableType> input
return new Input<T, SenderThrowableType>()
public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output )
throws SenderThrowableType, ReceiverThrowableType
* Fix for this bug:
while( true )
//noinspection StatementWithEmptyBody
while( !( lock.tryLock() || lock.tryLock( 1000, TimeUnit.MILLISECONDS ) ) )
// On timeout, try again
break; // Finally got a lock
catch( InterruptedException e )
// Try again
input.transferTo( output );
* Count the number of items in the transfer.
* @param <T>
// START SNIPPET: counter
public static class Counter<T>
implements Function<T, T>
private volatile long count = 0;
public long count()
return count;
public T map( T t )
return t;
// END SNIPPET: counter
* Convert strings to bytes using the given CharSet
@SuppressWarnings( "UnusedDeclaration" )
public static class String2Bytes
implements Function<String, byte[]>
private Charset charSet;
public String2Bytes( Charset charSet )
this.charSet = charSet;
public byte[] map( String s )
return s.getBytes( charSet );
* Convert ByteBuffers to Strings using the given CharSet
public static class ByteBuffer2String
implements Function<ByteBuffer, String>
private Charset charSet;
public ByteBuffer2String( Charset charSet )
this.charSet = charSet;
public String map( ByteBuffer buffer )
return new String( buffer.array(), charSet );
* Convert objects to Strings using .toString()
@SuppressWarnings( "UnusedDeclaration" )
public static class ObjectToString
implements Function<Object, String>
public String map( Object o )
return o.toString();
* Log the toString() representation of transferred items to the given log. The string is first formatted using MessageFormat
* with the given format.
* @param <T>
public static class Log<T>
implements Function<T, T>
private Logger logger;
private MessageFormat format;
public Log( Logger logger, String format )
this.logger = logger;
this.format = new MessageFormat( format );
public T map( T item )
{ format.format( new String[]{ item.toString() } ) );
return item;
* Track progress of transfer by emitting a log message in given intervals.
* If logger or format is null, then you need to override the logProgress to do something
* @param <T> type of items to be transferred
// START SNIPPET: progress
public static class ProgressLog<T>
implements Function<T, T>
private Counter<T> counter;
private Log<String> log;
private final long interval;
public ProgressLog( Logger logger, String format, long interval )
this.interval = interval;
if( logger != null && format != null )
log = new Log<>( logger, format );
counter = new Counter<>();
public ProgressLog( long interval )
this.interval = interval;
counter = new Counter<>();
public T map( T t )
{ t );
if( counter.count % interval == 0 )
return t;
// Override this to do something other than logging the progress
protected void logProgress()
if( log != null )
{ counter.count + "" );
// END SNIPPET: progress