| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.commons.io.input; |
| |
| import static org.apache.commons.io.IOUtils.EOF; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import org.apache.commons.io.IOUtils; |
| |
| /** |
| * The {@link ObservableInputStream} allows, that an InputStream may be consumed by other receivers, apart from the |
| * thread, which is reading it. The other consumers are implemented as instances of {@link Observer}. A typical |
| * application may be the generation of a {@link java.security.MessageDigest} on the fly. {@code Note}: The |
| * {@link ObservableInputStream} is <em>not</em> thread safe, as instances of InputStream usually aren't. If you must |
| * access the stream from multiple threads, then synchronization, locking, or a similar means must be used. |
| * |
| * @see MessageDigestCalculatingInputStream |
| */ |
| public class ObservableInputStream extends ProxyInputStream { |
| |
| /** |
| * Abstracts observer callback for {@code ObservableInputStream}s. |
| */ |
| public static abstract class Observer { |
| |
| /** |
| * Called to indicate that the {@link ObservableInputStream} has been closed. |
| * |
| * @throws IOException if an I/O error occurs. |
| */ |
| @SuppressWarnings("unused") // Possibly thrown from subclasses. |
| public void closed() throws IOException { |
| // noop |
| } |
| |
| /** |
| * Called to indicate that {@link InputStream#read(byte[])}, or {@link InputStream#read(byte[], int, int)} have |
| * been called, and are about to invoke data. |
| * |
| * @param buffer The byte array, which has been passed to the read call, and where data has been stored. |
| * @param offset The offset within the byte array, where data has been stored. |
| * @param length The number of bytes, which have been stored in the byte array. |
| * @throws IOException if an I/O error occurs. |
| */ |
| @SuppressWarnings("unused") // Possibly thrown from subclasses. |
| public void data(final byte[] buffer, final int offset, final int length) throws IOException { |
| // noop |
| } |
| |
| /** |
| * Called to indicate, that {@link InputStream#read()} has been invoked on the {@link ObservableInputStream}, |
| * and will return a value. |
| * |
| * @param value The value, which is being returned. This will never be -1 (EOF), because, in that case, |
| * {@link #finished()} will be invoked instead. |
| * @throws IOException if an I/O error occurs. |
| */ |
| @SuppressWarnings("unused") // Possibly thrown from subclasses. |
| public void data(final int value) throws IOException { |
| // noop |
| } |
| |
| /** |
| * Called to indicate that an error occurred on the underlying stream. |
| * |
| * @param exception the exception to throw |
| * @throws IOException if an I/O error occurs. |
| */ |
| public void error(final IOException exception) throws IOException { |
| throw exception; |
| } |
| |
| /** |
| * Called to indicate that EOF has been seen on the underlying stream. This method may be called multiple times, |
| * if the reader keeps invoking either of the read methods, and they will consequently keep returning EOF. |
| * |
| * @throws IOException if an I/O error occurs. |
| */ |
| @SuppressWarnings("unused") // Possibly thrown from subclasses. |
| public void finished() throws IOException { |
| // noop |
| } |
| } |
| |
| private final List<Observer> observers; |
| |
| /** |
| * Creates a new ObservableInputStream for the given InputStream. |
| * |
| * @param inputStream the input stream to proxy. |
| */ |
| public ObservableInputStream(final InputStream inputStream) { |
| this(inputStream, new ArrayList<>()); |
| } |
| |
| /** |
| * Creates a new ObservableInputStream for the given InputStream. |
| * |
| * @param inputStream the input stream to proxy. |
| * @param observers List of callbacks. |
| */ |
| private ObservableInputStream(final InputStream inputStream, final List<Observer> observers) { |
| super(inputStream); |
| this.observers = observers; |
| } |
| |
| /** |
| * Creates a new ObservableInputStream for the given InputStream. |
| * |
| * @param inputStream the input stream to proxy. |
| * @param observers List of callbacks. |
| * @since 2.9.0 |
| */ |
| public ObservableInputStream(final InputStream inputStream, final Observer... observers) { |
| this(inputStream, Arrays.asList(observers)); |
| } |
| |
| /** |
| * Adds an Observer. |
| * |
| * @param observer the observer to add |
| */ |
| public void add(final Observer observer) { |
| observers.add(observer); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| IOException ioe = null; |
| try { |
| super.close(); |
| } catch (final IOException e) { |
| ioe = e; |
| } |
| if (ioe == null) { |
| noteClosed(); |
| } else { |
| noteError(ioe); |
| } |
| } |
| |
| /** |
| * Reads all data from the underlying {@link InputStream}, while notifying the observers. |
| * |
| * @throws IOException The underlying {@link InputStream}, or either of the observers has thrown an exception. |
| */ |
| public void consume() throws IOException { |
| final byte[] buffer = IOUtils.byteArray(); |
| while (read(buffer) != EOF) { |
| // empty |
| } |
| } |
| |
| /** |
| * Gets all currently registered observers. |
| * |
| * @return a list of the currently registered observers |
| * @since 2.9.0 |
| */ |
| public List<Observer> getObservers() { |
| return observers; |
| } |
| |
| /** |
| * Notifies the observers by invoking {@link Observer#finished()}. |
| * |
| * @throws IOException Some observer has thrown an exception, which is being passed down. |
| */ |
| protected void noteClosed() throws IOException { |
| for (final Observer observer : getObservers()) { |
| observer.closed(); |
| } |
| } |
| |
| /** |
| * Notifies the observers by invoking {@link Observer#data(int)} with the given arguments. |
| * |
| * @param value Passed to the observers. |
| * @throws IOException Some observer has thrown an exception, which is being passed down. |
| */ |
| protected void noteDataByte(final int value) throws IOException { |
| for (final Observer observer : getObservers()) { |
| observer.data(value); |
| } |
| } |
| |
| /** |
| * Notifies the observers by invoking {@link Observer#data(byte[],int,int)} with the given arguments. |
| * |
| * @param buffer Passed to the observers. |
| * @param offset Passed to the observers. |
| * @param length Passed to the observers. |
| * @throws IOException Some observer has thrown an exception, which is being passed down. |
| */ |
| protected void noteDataBytes(final byte[] buffer, final int offset, final int length) throws IOException { |
| for (final Observer observer : getObservers()) { |
| observer.data(buffer, offset, length); |
| } |
| } |
| |
| /** |
| * Notifies the observers by invoking {@link Observer#error(IOException)} with the given argument. |
| * |
| * @param exception Passed to the observers. |
| * @throws IOException Some observer has thrown an exception, which is being passed down. This may be the same |
| * exception, which has been passed as an argument. |
| */ |
| protected void noteError(final IOException exception) throws IOException { |
| for (final Observer observer : getObservers()) { |
| observer.error(exception); |
| } |
| } |
| |
| /** |
| * Notifies the observers by invoking {@link Observer#finished()}. |
| * |
| * @throws IOException Some observer has thrown an exception, which is being passed down. |
| */ |
| protected void noteFinished() throws IOException { |
| for (final Observer observer : getObservers()) { |
| observer.finished(); |
| } |
| } |
| |
| private void notify(final byte[] buffer, final int offset, final int result, final IOException ioe) throws IOException { |
| if (ioe != null) { |
| noteError(ioe); |
| throw ioe; |
| } |
| if (result == EOF) { |
| noteFinished(); |
| } else if (result > 0) { |
| noteDataBytes(buffer, offset, result); |
| } |
| } |
| |
| @Override |
| public int read() throws IOException { |
| int result = 0; |
| IOException ioe = null; |
| try { |
| result = super.read(); |
| } catch (final IOException ex) { |
| ioe = ex; |
| } |
| if (ioe != null) { |
| noteError(ioe); |
| throw ioe; |
| } |
| if (result == EOF) { |
| noteFinished(); |
| } else { |
| noteDataByte(result); |
| } |
| return result; |
| } |
| |
| @Override |
| public int read(final byte[] buffer) throws IOException { |
| int result = 0; |
| IOException ioe = null; |
| try { |
| result = super.read(buffer); |
| } catch (final IOException ex) { |
| ioe = ex; |
| } |
| notify(buffer, 0, result, ioe); |
| return result; |
| } |
| |
| @Override |
| public int read(final byte[] buffer, final int offset, final int length) throws IOException { |
| int result = 0; |
| IOException ioe = null; |
| try { |
| result = super.read(buffer, offset, length); |
| } catch (final IOException ex) { |
| ioe = ex; |
| } |
| notify(buffer, offset, result, ioe); |
| return result; |
| } |
| |
| /** |
| * Removes an Observer. |
| * |
| * @param observer the observer to remove |
| */ |
| public void remove(final Observer observer) { |
| observers.remove(observer); |
| } |
| |
| /** |
| * Removes all Observers. |
| */ |
| public void removeAllObservers() { |
| observers.clear(); |
| } |
| |
| } |