Added the ObservableInputStream, and the MessageDigestCalculatingInputStream.
git-svn-id: https://svn.apache.org/repos/asf/commons/proper/io/trunk@1750760 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index afafcb7..65a43f8 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -46,6 +46,11 @@
<body>
<!-- The release date is the date RC is cut -->
+ <release version="2.7" date="Not yet published">
+ <action dev="jochen" type="add">
+ Added the ObservableInputStream, and the MessageDigestCalculatingInputStream.
+ </action>
+ </release>
<release version="2.6" date="2016-MM-DD" description="New features and bug fixes.">
<action issue="IO-511" dev="britter" type="fix" due-to="Ahmet Celik">
After a few unit tests, a few newly created directories not cleaned completely.
diff --git a/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java b/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
new file mode 100644
index 0000000..b12e53a
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+
+/**
+ * This class is an example for using an {@link ObservableInputStream}. It
+ * creates its own {@link Observer}, which calculates a checksum using a
+ * MessageDigest, for example an MD5 sum.
+ * {@em Note}: Neither {@link ObservableInputStream}, nor {@link MessageDigest},
+ * are thread safe. So is {@link MessageDigestCalculatingInputStream}.
+ */
+public class MessageDigestCalculatingInputStream extends ObservableInputStream {
+ public static class MessageDigestMaintainingObserver extends Observer {
+ private final MessageDigest md;
+
+ public MessageDigestMaintainingObserver(MessageDigest pMd) {
+ md = pMd;
+ }
+
+ @Override
+ void data(int pByte) throws IOException {
+ md.update((byte) pByte);
+ }
+
+ @Override
+ void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+ md.update(pBuffer, pOffset, pLength);
+ }
+ }
+
+ private final MessageDigest messageDigest;
+
+ /** Creates a new instance, which calculates a signature on the given stream,
+ * using the given {@link MessageDigest}.
+ */
+ public MessageDigestCalculatingInputStream(InputStream pStream, MessageDigest pDigest) {
+ super(pStream);
+ messageDigest = pDigest;
+ add(new MessageDigestMaintainingObserver(pDigest));
+ }
+ /** Creates a new instance, which calculates a signature on the given stream,
+ * using a {@link MessageDigest} with the given algorithm.
+ */
+ public MessageDigestCalculatingInputStream(InputStream pStream, String pAlgorithm) throws NoSuchAlgorithmException {
+ this(pStream, MessageDigest.getInstance(pAlgorithm));
+ }
+ /** Creates a new instance, which calculates a signature on the given stream,
+ * using a {@link MessageDigest} with the "MD5" algorithm.
+ */
+ public MessageDigestCalculatingInputStream(InputStream pStream) throws NoSuchAlgorithmException {
+ this(pStream, MessageDigest.getInstance("MD5"));
+ }
+
+ /** Returns the {@link MessageDigest}, which is being used for generating the
+ * checksum.
+ * {@em Note}: The checksum will only reflect the data, which has been read so far.
+ * This is probably not, what you expect. Make sure, that the complete data has been
+ * read, if that is what you want. The easiest way to do so is by invoking
+ * {@link #consume()}.
+ */
+ public MessageDigest getMessageDigest() {
+ return messageDigest;
+ }
+}
diff --git a/src/main/java/org/apache/commons/io/input/ObservableInputStream.java b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
new file mode 100644
index 0000000..7d13472
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * The {@link ObservableInputStream} allows, that an InputStream may be consumed
+ * by other receivers, apart from the thread, which is reading it.
+ * The other consumers are implemented as instances of {@link Observer}. A
+ * typical application may be the generation of a {@link MessageDigest} on the
+ * fly.
+ * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe,
+ * as instances of InputStream usually aren't.
+ * If you must access the stream from multiple threads, then synchronization, locking,
+ * or a similar means must be used.
+ * @see MessageDigestCalculatingInputStream
+ */
+public class ObservableInputStream extends ProxyInputStream {
+ public static abstract class Observer {
+ /** Called to indicate, that {@link InputStream#read()} has been invoked
+ * on the {@link ObservableInputStream}, and will return a value.
+ * @param pByte The value, which is being returned. This will never be -1 (EOF),
+ * because, in that case, {link #finished()} will be invoked instead.
+ */
+ void data(int pByte) throws IOException {}
+ /** Called to indicate, that {@link InputStream#read(byte[])}, or
+ * {@link InputStream#read(byte[], int, int)} have been called, and are about to
+ * invoke data.
+ * @param pBuffer The byte array, which has been passed to the read call, and where
+ * data has been stored.
+ * @param pOffset The offset within the byte array, where data has been stored.
+ * @param pLength The number of bytes, which have been stored in the byte array.
+ */
+ void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {}
+ /** Called to indicate, that EOF has been seen on the underlying stream.
+ * This method may be called multiple times, if the reader keeps invoking
+ * either of the read methods, and they will consequently keep returning
+ * EOF.
+ */
+ void finished() throws IOException {}
+ /** Called to indicate, that the {@link ObservableInputStream} has been closed.
+ */
+ void closed() throws IOException {}
+ /**
+ * Called to indicate, that an error occurred on the underlying stream.
+ */
+ void error(IOException pException) throws IOException { throw pException; }
+ }
+
+ private final List<Observer> observers = new ArrayList<Observer>();
+
+ public ObservableInputStream(InputStream pProxy) {
+ super(pProxy);
+ }
+
+ public void add(Observer pObserver) {
+ observers.add(pObserver);
+ }
+
+ public void remove(Observer pObserver) {
+ observers.remove(pObserver);
+ }
+
+ public void removeAllObservers() {
+ observers.clear();
+ }
+
+ @Override
+ public int read() throws IOException {
+ int result = 0;
+ IOException ioe = null;
+ try {
+ result = super.read();
+ } catch (IOException pException) {
+ ioe = pException;
+ }
+ if (ioe != null) {
+ noteError(ioe);
+ } else if (result == -1) {
+ noteFinished();
+ } else {
+ noteDataByte(result);
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] pBuffer) throws IOException {
+ int result = 0;
+ IOException ioe = null;
+ try {
+ result = super.read(pBuffer);
+ } catch (IOException pException) {
+ ioe = pException;
+ }
+ if (ioe != null) {
+ noteError(ioe);
+ } else if (result == -1) {
+ noteFinished();
+ } else if (result > 0) {
+ noteDataBytes(pBuffer, 0, result);
+ }
+ return result;
+ }
+
+ @Override
+ public int read(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+ int result = 0;
+ IOException ioe = null;
+ try {
+ result = super.read(pBuffer, pOffset, pLength);
+ } catch (IOException pException) {
+ ioe = pException;
+ }
+ if (ioe != null) {
+ noteError(ioe);
+ } else if (result == -1) {
+ noteFinished();
+ } else if (result > 0) {
+ noteDataBytes(pBuffer, pOffset, result);
+ }
+ return result;
+ }
+
+ /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)}
+ * with the given arguments.
+ * @param pBuffer Passed to the observers.
+ * @param pOffset Passed to the observers.
+ * @param pLength Passed to the observers.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down.
+ */
+ protected void noteDataBytes(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.data(pBuffer, pOffset, pLength);
+ }
+ }
+
+ /** Notifies the observers by invoking {@link Observer#finished()}.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down.
+ */
+ protected void noteFinished() throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.finished();
+ }
+ }
+
+ /** Notifies the observers by invoking {@link Observer#data(int)}
+ * with the given arguments.
+ * @param pDataByte Passed to the observers.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down.
+ */
+ protected void noteDataByte(int pDataByte) throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.data(pDataByte);
+ }
+ }
+
+ /** Notifies the observers by invoking {@link Observer#error(IOException)}
+ * with the given argument.
+ * @param pException Passed to the observers.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down. This may be the same exception, which has been passed as an
+ * argument.
+ */
+ protected void noteError(IOException pException) throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.error(pException);
+ }
+ }
+
+ /** Notifies the observers by invoking {@link Observer#finished()}.
+ * @throws IOException Some observer has thrown an exception, which is being
+ * passed down.
+ */
+ protected void noteClosed() throws IOException {
+ for (Observer observer : getObservers()) {
+ observer.closed();
+ }
+ }
+
+ protected List<Observer> getObservers() {
+ return observers;
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOException ioe = null;
+ try {
+ super.close();
+ } catch (IOException e) {
+ ioe = e;
+ }
+ if (ioe == null) {
+ noteClosed();
+ } else {
+ noteError(ioe);
+ }
+ }
+
+ /** Reads all data from the underlying {@link InputStream}, while notifying the
+ * observers.
+ * @throws IOException The underlying {@link InputStream}, or either of the
+ * observers has thrown an exception.
+ */
+ public void consume() throws IOException {
+ final byte[] buffer = new byte[8192];
+ for (;;) {
+ final int res = read(buffer);
+ if (res == -1) {
+ return;
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/apache/commons/io/input/TeeInputStream.java b/src/main/java/org/apache/commons/io/input/TeeInputStream.java
index 3f2941d..6000824 100644
--- a/src/main/java/org/apache/commons/io/input/TeeInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/TeeInputStream.java
@@ -35,6 +35,7 @@
*
* @version $Id$
* @since 1.4
+ * @see ObservableInputStream
*/
public class TeeInputStream extends ProxyInputStream {
diff --git a/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java b/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
new file mode 100644
index 0000000..4162d21
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.security.MessageDigest;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class MessageDigestCalculatingInputStreamTest {
+ public static byte[] generateRandomByteStream(int pSize) {
+ final byte[] buffer = new byte[pSize];
+ final Random rnd = new Random();
+ rnd.nextBytes(buffer);
+ return buffer;
+ }
+
+ @Test
+ public void test() throws Exception {
+ for (int i = 256; i < 8192; i = i*2) {
+ final byte[] buffer = generateRandomByteStream(i);
+ final MessageDigest md5Sum = MessageDigest.getInstance("MD5");
+ final byte[] expect = md5Sum.digest(buffer);
+ final MessageDigestCalculatingInputStream md5InputStream = new MessageDigestCalculatingInputStream(new ByteArrayInputStream(buffer));
+ md5InputStream.consume();
+ final byte[] got = md5InputStream.getMessageDigest().digest();
+ assertArrayEquals(expect, got);
+ }
+ }
+
+}
diff --git a/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
new file mode 100644
index 0000000..d74af27
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.commons.io.input;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.input.ObservableInputStream;
+import org.apache.commons.io.input.ObservableInputStream.Observer;
+import org.junit.Test;
+
+public class ObservableInputStreamTest {
+ private static class LastByteKeepingObserver extends Observer {
+ private int lastByteSeen = -1;
+ private boolean finished;
+ private boolean closed;
+
+ @Override
+ void data(int pByte) throws IOException {
+ super.data(pByte);
+ lastByteSeen = pByte;
+ }
+
+ @Override
+ void finished() throws IOException {
+ super.finished();
+ finished = true;
+ }
+
+ @Override
+ void closed() throws IOException {
+ super.closed();
+ closed = true;
+ }
+ }
+ private static class LastBytesKeepingObserver extends Observer {
+ private byte[] buffer = null;
+ private int offset = -1;
+ private int length = -1;
+
+ @Override
+ void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {
+ super.data(pBuffer, pOffset, pLength);
+ buffer = pBuffer;
+ offset = pOffset;
+ length = pLength;
+ }
+ }
+
+ /** Tests, that {@link Observer#data(int)} is called.
+ */
+ @Test
+ public void testDataByteCalled() throws Exception {
+ final byte[] buffer = MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
+ final ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer));
+ final LastByteKeepingObserver lko = new LastByteKeepingObserver();
+ assertEquals(-1, lko.lastByteSeen);
+ ois.read();
+ assertEquals(-1, lko.lastByteSeen);
+ assertFalse(lko.finished);
+ assertFalse(lko.closed);
+ ois.add(lko);
+ for (int i = 1; i < buffer.length; i++) {
+ final int result = ois.read();
+ assertEquals((byte) result, buffer[i]);
+ assertEquals(result, lko.lastByteSeen);
+ assertFalse(lko.finished);
+ assertFalse(lko.closed);
+ }
+ final int result = ois.read();
+ assertEquals(-1, result);
+ assertTrue(lko.finished);
+ assertFalse(lko.closed);
+ ois.close();
+ assertTrue(lko.finished);
+ assertTrue(lko.closed);
+ }
+
+ /** Tests, that {@link Observer#data(byte[],int,int)} is called.
+ */
+ @Test
+ public void testDataBytesCalled() throws Exception {
+ final byte[] buffer = MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);
+ ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
+ final ObservableInputStream ois = new ObservableInputStream(bais);
+ final LastBytesKeepingObserver lko = new LastBytesKeepingObserver();
+ final byte[] readBuffer = new byte[23];
+ assertEquals(null, lko.buffer);
+ ois.read(readBuffer);
+ assertEquals(null, lko.buffer);
+ ois.add(lko);
+ for (;;) {
+ if (bais.available() >= 2048) {
+ final int result = ois.read(readBuffer);
+ if (result == -1) {
+ ois.close();
+ break;
+ } else {
+ assertEquals(readBuffer, lko.buffer);
+ assertEquals(0, lko.offset);
+ assertEquals(readBuffer.length, lko.length);
+ }
+ } else {
+ final int res = Math.min(11, bais.available());
+ final int result = ois.read(readBuffer, 1, 11);
+ if (result == -1) {
+ ois.close();
+ break;
+ } else {
+ assertEquals(readBuffer, lko.buffer);
+ assertEquals(1, lko.offset);
+ assertEquals(res, lko.length);
+ }
+ }
+ }
+ }
+
+}