blob: c2aaf477ae92b0c00ce0817146a9a987ba7f463d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.io.input;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
/**
* The {@link ObservableInputStream} allows, that an InputStream may be consumed
* by other receivers, apart from the thread, which is reading it.
* The other consumers are implemented as instances of {@link Observer}. A
* typical application may be the generation of a {@link java.security.MessageDigest} on the
* fly.
* {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe,
* as instances of InputStream usually aren't.
* If you must access the stream from multiple threads, then synchronization, locking,
* or a similar means must be used.
* @see MessageDigestCalculatingInputStream
*/
public class ObservableInputStream extends ProxyInputStream {
/**
* Abstracts observer callback for {@code ObservableInputStream}s.
*/
public static abstract class Observer {
/** Called to indicate, that {@link InputStream#read()} has been invoked
* on the {@link ObservableInputStream}, and will return a value.
* @param pByte The value, which is being returned. This will never be -1 (EOF),
* because, in that case, {@link #finished()} will be invoked instead.
* @throws IOException if an i/o-error occurs
*/
public void data(final int pByte) throws IOException {
// noop
}
/** Called to indicate, that {@link InputStream#read(byte[])}, or
* {@link InputStream#read(byte[], int, int)} have been called, and are about to
* invoke data.
* @param pBuffer The byte array, which has been passed to the read call, and where
* data has been stored.
* @param pOffset The offset within the byte array, where data has been stored.
* @param pLength The number of bytes, which have been stored in the byte array.
* @throws IOException if an i/o-error occurs
*/
public void data(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
// noop
}
/** Called to indicate, that EOF has been seen on the underlying stream.
* This method may be called multiple times, if the reader keeps invoking
* either of the read methods, and they will consequently keep returning
* EOF.
* @throws IOException if an i/o-error occurs
*/
public void finished() throws IOException {
// noop
}
/** Called to indicate, that the {@link ObservableInputStream} has been closed.
* @throws IOException if an i/o-error occurs
*/
public void closed() throws IOException {
// noop
}
/**
* Called to indicate, that an error occurred on the underlying stream.
* @param pException the exception to throw
* @throws IOException if an i/o-error occurs
*/
public void error(final IOException pException) throws IOException { throw pException; }
}
private final List<Observer> observers = new ArrayList<>();
/**
* Creates a new ObservableInputStream for the given InputStream.
* @param pProxy the input stream to proxy
*/
public ObservableInputStream(final InputStream pProxy) {
super(pProxy);
}
/**
* Adds an Observer.
* @param pObserver the observer to add
*/
public void add(final Observer pObserver) {
observers.add(pObserver);
}
/**
* Removes an Observer.
* @param pObserver the observer to remove
*/
public void remove(final Observer pObserver) {
observers.remove(pObserver);
}
/**
* Removes all Observers.
*/
public void removeAllObservers() {
observers.clear();
}
@Override
public int read() throws IOException {
int result = 0;
IOException ioe = null;
try {
result = super.read();
} catch (final IOException pException) {
ioe = pException;
}
if (ioe != null) {
noteError(ioe);
} else if (result == -1) {
noteFinished();
} else {
noteDataByte(result);
}
return result;
}
@Override
public int read(final byte[] pBuffer) throws IOException {
int result = 0;
IOException ioe = null;
try {
result = super.read(pBuffer);
} catch (final IOException pException) {
ioe = pException;
}
if (ioe != null) {
noteError(ioe);
} else if (result == -1) {
noteFinished();
} else if (result > 0) {
noteDataBytes(pBuffer, 0, result);
}
return result;
}
@Override
public int read(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
int result = 0;
IOException ioe = null;
try {
result = super.read(pBuffer, pOffset, pLength);
} catch (final IOException pException) {
ioe = pException;
}
if (ioe != null) {
noteError(ioe);
} else if (result == -1) {
noteFinished();
} else if (result > 0) {
noteDataBytes(pBuffer, pOffset, result);
}
return result;
}
/** Notifies the observers by invoking {@link Observer#data(byte[],int,int)}
* with the given arguments.
* @param pBuffer Passed to the observers.
* @param pOffset Passed to the observers.
* @param pLength Passed to the observers.
* @throws IOException Some observer has thrown an exception, which is being
* passed down.
*/
protected void noteDataBytes(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
for (final Observer observer : getObservers()) {
observer.data(pBuffer, pOffset, pLength);
}
}
/** Notifies the observers by invoking {@link Observer#finished()}.
* @throws IOException Some observer has thrown an exception, which is being
* passed down.
*/
protected void noteFinished() throws IOException {
for (final Observer observer : getObservers()) {
observer.finished();
}
}
/** Notifies the observers by invoking {@link Observer#data(int)}
* with the given arguments.
* @param pDataByte Passed to the observers.
* @throws IOException Some observer has thrown an exception, which is being
* passed down.
*/
protected void noteDataByte(final int pDataByte) throws IOException {
for (final Observer observer : getObservers()) {
observer.data(pDataByte);
}
}
/** Notifies the observers by invoking {@link Observer#error(IOException)}
* with the given argument.
* @param pException Passed to the observers.
* @throws IOException Some observer has thrown an exception, which is being
* passed down. This may be the same exception, which has been passed as an
* argument.
*/
protected void noteError(final IOException pException) throws IOException {
for (final Observer observer : getObservers()) {
observer.error(pException);
}
}
/** Notifies the observers by invoking {@link Observer#finished()}.
* @throws IOException Some observer has thrown an exception, which is being
* passed down.
*/
protected void noteClosed() throws IOException {
for (final Observer observer : getObservers()) {
observer.closed();
}
}
/** Gets all currently registered observers.
* @return a list of the currently registered observers
*/
protected List<Observer> getObservers() {
return observers;
}
@Override
public void close() throws IOException {
IOException ioe = null;
try {
super.close();
} catch (final IOException e) {
ioe = e;
}
if (ioe == null) {
noteClosed();
} else {
noteError(ioe);
}
}
/** Reads all data from the underlying {@link InputStream}, while notifying the
* observers.
* @throws IOException The underlying {@link InputStream}, or either of the
* observers has thrown an exception.
*/
public void consume() throws IOException {
final byte[] buffer = new byte[IOUtils.DEFAULT_BUFFER_SIZE];
for (;;) {
final int res = read(buffer);
if (res == -1) {
return;
}
}
}
}