| /* |
| * Copyright (c) 2010, Rickard Öberg. All Rights Reserved. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| package org.qi4j.io; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.Reader; |
| import java.net.URL; |
| import java.net.URLConnection; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.FileChannel; |
| import java.util.Scanner; |
| import java.util.zip.GZIPInputStream; |
| import org.qi4j.functional.Visitor; |
| |
| /** |
| * Common inputs |
| */ |
| public class Inputs |
| { |
| // START SNIPPET: method |
| |
| /** |
| * Read lines from a String. |
| * |
| * @param source lines |
| * |
| * @return Input that provides lines from the string as strings |
| */ |
| public static Input<String, RuntimeException> text( final String source ) |
| // END SNIPPET: method |
| { |
| return new Input<String, RuntimeException>() |
| { |
| @Override |
| public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) |
| throws RuntimeException, ReceiverThrowableType |
| { |
| |
| output.receiveFrom( new Sender<String, RuntimeException>() |
| { |
| @Override |
| public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) |
| throws Receiver2ThrowableType, RuntimeException |
| { |
| Scanner scanner = new Scanner( source ); |
| while( scanner.hasNextLine() ) |
| { |
| receiver.receive( scanner.nextLine() ); |
| } |
| } |
| } ); |
| } |
| }; |
| } |
| |
| // START SNIPPET: method |
| |
| /** |
| * Read lines from a Reader. |
| * |
| * @param source lines |
| * |
| * @return Input that provides lines from the string as strings |
| */ |
| public static Input<String, RuntimeException> text( final Reader source ) |
| // END SNIPPET: method |
| { |
| return new Input<String, RuntimeException>() |
| { |
| @Override |
| public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) |
| throws RuntimeException, ReceiverThrowableType |
| { |
| |
| output.receiveFrom( new Sender<String, RuntimeException>() |
| { |
| @Override |
| public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) |
| throws Receiver2ThrowableType, RuntimeException |
| { |
| Scanner scanner = new Scanner( source ); |
| while( scanner.hasNextLine() ) |
| { |
| receiver.receive( scanner.nextLine() ); |
| } |
| } |
| } ); |
| } |
| }; |
| } |
| |
| // START SNIPPET: method |
| |
| /** |
| * Read lines from a UTF-8 encoded textfile. |
| * |
| * If the filename ends with .gz, then the data is automatically unzipped when read. |
| * |
| * @param source textfile with lines separated by \n character |
| * |
| * @return Input that provides lines from the textfiles as strings |
| */ |
| public static Input<String, IOException> text( final File source ) |
| // END SNIPPET: method |
| { |
| return text( source, "UTF-8" ); |
| } |
| |
| // START SNIPPET: method |
| |
| /** |
| * Read lines from a textfile with the given encoding. |
| * |
| * If the filename ends with .gz, then the data is automatically unzipped when read. |
| * |
| * @param source textfile with lines separated by \n character |
| * @param encoding encoding of file, e.g. "UTF-8" |
| * |
| * @return Input that provides lines from the textfiles as strings |
| */ |
| public static Input<String, IOException> text( final File source, final String encoding ) |
| // END SNIPPET: method |
| { |
| return new Input<String, IOException>() |
| { |
| @Override |
| public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) |
| throws IOException, ReceiverThrowableType |
| { |
| InputStream stream = new FileInputStream( source ); |
| |
| // If file is gzipped, unzip it automatically |
| if( source.getName().endsWith( ".gz" ) ) |
| { |
| stream = new GZIPInputStream( stream ); |
| } |
| |
| try (BufferedReader reader = new BufferedReader( new InputStreamReader( stream, encoding ) )) |
| { |
| output.receiveFrom( new Sender<String, IOException>() |
| { |
| @Override |
| public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) |
| throws Receiver2ThrowableType, IOException |
| { |
| String line; |
| while( ( line = reader.readLine() ) != null ) |
| { |
| receiver.receive( line ); |
| } |
| } |
| } ); |
| } |
| } |
| }; |
| } |
| |
| // START SNIPPET: method |
| |
| /** |
| * Read lines from a textfile at a given URL. |
| * |
| * If the content support gzip encoding, then the data is automatically unzipped when read. |
| * |
| * The charset in the content-type of the URL will be used for parsing. Default is UTF-8. |
| * |
| * @param source textfile with lines separated by \n character |
| * |
| * @return Input that provides lines from the textfiles as strings |
| */ |
| public static Input<String, IOException> text( final URL source ) |
| // END SNIPPET: method |
| { |
| return new Input<String, IOException>() |
| { |
| @Override |
| public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super String, ReceiverThrowableType> output ) |
| throws IOException, ReceiverThrowableType |
| { |
| URLConnection urlConnection = source.openConnection(); |
| urlConnection.setRequestProperty( "Accept-Encoding", "gzip" ); |
| InputStream stream = urlConnection.getInputStream(); |
| |
| // If file is gzipped, unzip it automatically |
| if( "gzip".equals( urlConnection.getContentEncoding() ) ) |
| { |
| stream = new GZIPInputStream( stream ); |
| } |
| |
| // Figure out charset given content-type |
| String contentType = urlConnection.getContentType(); |
| String charSet = "UTF-8"; |
| if( contentType.contains( "charset=" ) ) |
| { |
| charSet = contentType.substring( contentType.indexOf( "charset=" ) + "charset=".length() ); |
| } |
| |
| try (BufferedReader reader = new BufferedReader( new InputStreamReader( stream, charSet ) )) |
| { |
| output.receiveFrom( new Sender<String, IOException>() |
| { |
| @Override |
| public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super String, Receiver2ThrowableType> receiver ) |
| throws Receiver2ThrowableType, IOException |
| { |
| String line; |
| while( ( line = reader.readLine() ) != null ) |
| { |
| receiver.receive( line ); |
| } |
| } |
| } ); |
| } |
| } |
| }; |
| } |
| |
| // START SNIPPET: method |
| |
| /** |
| * Read a file using ByteBuffer of a given size. Useful for transferring raw data. |
| * |
| * @param source The file to be read. |
| * @param bufferSize The size of the byte array. |
| * |
| * @return An Input instance to be applied to streaming operations. |
| */ |
| public static Input<ByteBuffer, IOException> byteBuffer( final File source, final int bufferSize ) |
| // END SNIPPET: method |
| { |
| return new Input<ByteBuffer, IOException>() |
| { |
| @Override |
| public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output ) |
| throws IOException, ReceiverThrowableType |
| { |
| final FileInputStream stream = new FileInputStream( source ); |
| final FileChannel fci = stream.getChannel(); |
| |
| final ByteBuffer buffer = ByteBuffer.allocate( bufferSize ); |
| |
| try |
| { |
| output.receiveFrom( new Sender<ByteBuffer, IOException>() |
| { |
| @Override |
| public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver ) |
| throws Receiver2ThrowableType, IOException |
| { |
| while( fci.read( buffer ) != -1 ) |
| { |
| buffer.flip(); |
| receiver.receive( buffer ); |
| buffer.clear(); |
| } |
| } |
| } ); |
| } |
| finally |
| { |
| stream.close(); |
| } |
| } |
| }; |
| } |
| |
| // START SNIPPET: method |
| |
| /** |
| * Read an inputstream using ByteBuffer of a given size. |
| * |
| * @param source The InputStream to be read. |
| * @param bufferSize The size of the byte array. |
| * |
| * @return An Input instance to be applied to streaming operations. |
| */ |
| public static Input<ByteBuffer, IOException> byteBuffer( final InputStream source, final int bufferSize ) |
| // END SNIPPET: method |
| { |
| return new Input<ByteBuffer, IOException>() |
| { |
| @Override |
| public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output ) |
| throws IOException, ReceiverThrowableType |
| { |
| try |
| { |
| output.receiveFrom( new Sender<ByteBuffer, IOException>() |
| { |
| @Override |
| public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver ) |
| throws Receiver2ThrowableType, IOException |
| { |
| byte[] buffer = new byte[ bufferSize ]; |
| |
| int len; |
| while( ( len = source.read( buffer ) ) != -1 ) |
| { |
| ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, 0, len ); |
| receiver.receive( byteBuffer ); |
| } |
| } |
| } ); |
| } |
| finally |
| { |
| source.close(); |
| } |
| } |
| }; |
| } |
| |
| // START SNIPPET: method |
| |
| /** |
| * Combine many Input into one single Input. When a transfer is initiated from it all items from all inputs will be transferred |
| * to the given Output. |
| * |
| * @param inputs An Iterable of Input instances to be combined. |
| * @param <T> The item type of the Input |
| * @param <SenderThrowableType> The Throwable that might be thrown by the Inputs. |
| * |
| * @return A combined Input, allowing for easy aggregation of many Input sources. |
| */ |
| public static <T, SenderThrowableType extends Throwable> Input<T, SenderThrowableType> combine( final Iterable<Input<T, SenderThrowableType>> inputs ) |
| // END SNIPPET: method |
| { |
| return new Input<T, SenderThrowableType>() |
| { |
| @Override |
| public <Receiver2ThrowableType extends Throwable> void transferTo( Output<? super T, Receiver2ThrowableType> output ) |
| throws SenderThrowableType, Receiver2ThrowableType |
| { |
| output.receiveFrom( new Sender<T, SenderThrowableType>() |
| { |
| @Override |
| public <ReceiverThrowableType extends Throwable> void sendTo( final Receiver<? super T, ReceiverThrowableType> receiver ) |
| throws ReceiverThrowableType, SenderThrowableType |
| { |
| for( Input<T, SenderThrowableType> input : inputs ) |
| { |
| input.transferTo( new Output<T, ReceiverThrowableType>() |
| { |
| @Override |
| public <Sender2ThrowableType extends Throwable> void receiveFrom( Sender<? extends T, Sender2ThrowableType> sender ) |
| throws ReceiverThrowableType, Sender2ThrowableType |
| { |
| sender.sendTo( new Receiver<T, ReceiverThrowableType>() |
| { |
| @Override |
| public void receive( T item ) |
| throws ReceiverThrowableType |
| { |
| receiver.receive( item ); |
| } |
| } ); |
| } |
| } ); |
| } |
| } |
| } ); |
| } |
| }; |
| } |
| |
| // START SNIPPET: method |
| |
| /** |
| * Create an Input that takes its items from the given Iterable. |
| * |
| * @param iterable The Iterable to be used as an Input. |
| * @param <T> The item type of the Input |
| * |
| * @return An Input instance that is backed by the Iterable. |
| */ |
| public static <T> Input<T, RuntimeException> iterable( final Iterable<T> iterable ) |
| // END SNIPPET: method |
| { |
| return new Input<T, RuntimeException>() |
| { |
| @Override |
| public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super T, ReceiverThrowableType> output ) |
| throws RuntimeException, ReceiverThrowableType |
| { |
| output.receiveFrom( new Sender<T, RuntimeException>() |
| { |
| @Override |
| public <Receiver2ThrowableType extends Throwable> void sendTo( Receiver<? super T, Receiver2ThrowableType> receiver ) |
| throws Receiver2ThrowableType, RuntimeException |
| { |
| for( T item : iterable ) |
| { |
| receiver.receive( item ); |
| } |
| } |
| } ); |
| } |
| }; |
| } |
| |
| // START SNIPPET: method |
| |
| /** |
| * Create an Input that allows a Visitor to write to an OutputStream. The stream is a BufferedOutputStream, so when enough |
| * data has been gathered it will send this in chunks of the given size to the Output it is transferred to. The Visitor does not have to call |
| * close() on the OutputStream, but should ensure that any wrapper streams or writers are flushed so that all data is sent. |
| * |
| * @param outputVisitor The OutputStream Visitor that will be backing the Input ByteBuffer. |
| * @param bufferSize The buffering size. |
| * |
| * @return An Input instance of ByteBuffer, that is backed by an Visitor to an OutputStream. |
| */ |
| public static Input<ByteBuffer, IOException> output( final Visitor<OutputStream, IOException> outputVisitor, |
| final int bufferSize |
| ) |
| // END SNIPPET: method |
| { |
| return new Input<ByteBuffer, IOException>() |
| { |
| @Override |
| public <ReceiverThrowableType extends Throwable> void transferTo( Output<? super ByteBuffer, ReceiverThrowableType> output ) |
| throws IOException, ReceiverThrowableType |
| { |
| output.receiveFrom( new Sender<ByteBuffer, IOException>() |
| { |
| @Override |
| @SuppressWarnings( "unchecked" ) |
| public <Receiver2ThrowableType extends Throwable> void sendTo( final Receiver<? super ByteBuffer, Receiver2ThrowableType> receiver ) |
| throws Receiver2ThrowableType, IOException |
| { |
| try (OutputStream out = new BufferedOutputStream( new OutputStream() |
| { |
| @Override |
| public void write( int b ) |
| throws IOException |
| { |
| // Ignore |
| } |
| |
| @SuppressWarnings( "NullableProblems" ) |
| @Override |
| public void write( byte[] b, int off, int len ) |
| throws IOException |
| { |
| try |
| { |
| ByteBuffer byteBuffer = ByteBuffer.wrap( b, 0, len ); |
| receiver.receive( byteBuffer ); |
| } |
| catch( Throwable ex ) |
| { |
| throw new IOException( ex ); |
| } |
| } |
| }, bufferSize )) |
| { |
| outputVisitor.visit( out ); |
| } |
| catch( IOException ex ) |
| { |
| throw (Receiver2ThrowableType) ex.getCause(); |
| } |
| } |
| } ); |
| } |
| }; |
| } |
| |
| private Inputs() |
| { |
| } |
| } |