Added the ObservableInputStream, and the MessageDigestCalculatingInputStream.


git-svn-id: https://svn.apache.org/repos/asf/commons/proper/io/trunk@1750760 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index afafcb7..65a43f8 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -46,6 +46,11 @@
 
   <body>
     <!-- The release date is the date RC is cut -->
+    <release version="2.7" date="Not yet published">
+      <action dev="jochen" type="add">
+        Added the ObservableInputStream, and the MessageDigestCalculatingInputStream.
+      </action>
+    </release>
     <release version="2.6" date="2016-MM-DD" description="New features and bug fixes.">
       <action issue="IO-511" dev="britter" type="fix" due-to="Ahmet Celik">
         After a few unit tests, a few newly created directories not cleaned completely.
diff --git a/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java b/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
new file mode 100644
index 0000000..b12e53a
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/MessageDigestCalculatingInputStream.java
@@ -0,0 +1,84 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ * 

+ *      http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package org.apache.commons.io.input;

+

+import java.io.IOException;

+import java.io.InputStream;

+import java.security.MessageDigest;

+import java.security.NoSuchAlgorithmException;

+

+

+/**

+ * This class is an example for using an {@link ObservableInputStream}. It

+ * creates its own {@link Observer}, which calculates a checksum using a

+ * MessageDigest, for example an MD5 sum.

+ * {@em Note}: Neither {@link ObservableInputStream}, nor {@link MessageDigest},

+ * are thread safe. So is {@link MessageDigestCalculatingInputStream}.

+ */

+public class MessageDigestCalculatingInputStream extends ObservableInputStream {

+    public static class MessageDigestMaintainingObserver extends Observer {

+        private final MessageDigest md;

+

+        public MessageDigestMaintainingObserver(MessageDigest pMd) {

+            md = pMd;

+        }

+

+        @Override

+        void data(int pByte) throws IOException {

+            md.update((byte) pByte);

+        }

+

+        @Override

+        void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {

+            md.update(pBuffer, pOffset, pLength);

+        }

+    }

+

+    private final MessageDigest messageDigest;

+

+    /** Creates a new instance, which calculates a signature on the given stream,

+     * using the given {@link MessageDigest}.

+     */

+    public MessageDigestCalculatingInputStream(InputStream pStream, MessageDigest pDigest) {

+        super(pStream);

+        messageDigest = pDigest;

+        add(new MessageDigestMaintainingObserver(pDigest));

+    }

+    /** Creates a new instance, which calculates a signature on the given stream,

+     * using a {@link MessageDigest} with the given algorithm.

+     */

+    public MessageDigestCalculatingInputStream(InputStream pStream, String pAlgorithm) throws NoSuchAlgorithmException {

+        this(pStream, MessageDigest.getInstance(pAlgorithm));

+    }

+    /** Creates a new instance, which calculates a signature on the given stream,

+     * using a {@link MessageDigest} with the "MD5" algorithm.

+     */

+    public MessageDigestCalculatingInputStream(InputStream pStream) throws NoSuchAlgorithmException {

+        this(pStream, MessageDigest.getInstance("MD5"));

+    }

+

+    /** Returns the {@link MessageDigest}, which is being used for generating the

+     * checksum.

+     * {@em Note}: The checksum will only reflect the data, which has been read so far.

+     * This is probably not, what you expect. Make sure, that the complete data has been

+     * read, if that is what you want. The easiest way to do so is by invoking

+     * {@link #consume()}.

+     */

+    public MessageDigest getMessageDigest() {

+        return messageDigest;

+    }

+}

diff --git a/src/main/java/org/apache/commons/io/input/ObservableInputStream.java b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
new file mode 100644
index 0000000..7d13472
--- /dev/null
+++ b/src/main/java/org/apache/commons/io/input/ObservableInputStream.java
@@ -0,0 +1,238 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ * 

+ *      http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package org.apache.commons.io.input;

+

+import java.io.IOException;

+import java.io.InputStream;

+import java.security.MessageDigest;

+import java.util.ArrayList;

+import java.util.List;

+

+

+/**

+ * The {@link ObservableInputStream} allows, that an InputStream may be consumed

+ * by other receivers, apart from the thread, which is reading it.

+ * The other consumers are implemented as instances of {@link Observer}. A

+ * typical application may be the generation of a {@link MessageDigest} on the

+ * fly.

+ * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe,

+ * as instances of InputStream usually aren't.

+ * If you must access the stream from multiple threads, then synchronization, locking,

+ * or a similar means must be used.

+ * @see MessageDigestCalculatingInputStream

+ */

+public class ObservableInputStream extends ProxyInputStream {

+    public static abstract class Observer {

+        /** Called to indicate, that {@link InputStream#read()} has been invoked

+         * on the {@link ObservableInputStream}, and will return a value.

+         * @param pByte The value, which is being returned. This will never be -1 (EOF),

+         *    because, in that case, {link #finished()} will be invoked instead.

+         */

+        void data(int pByte) throws IOException {}

+        /** Called to indicate, that {@link InputStream#read(byte[])}, or

+         * {@link InputStream#read(byte[], int, int)} have been called, and are about to

+         * invoke data.

+         * @param pBuffer The byte array, which has been passed to the read call, and where

+         *   data has been stored.

+         * @param pOffset The offset within the byte array, where data has been stored.

+         * @param pLength The number of bytes, which have been stored in the byte array.

+         */

+        void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {}

+        /** Called to indicate, that EOF has been seen on the underlying stream.

+         * This method may be called multiple times, if the reader keeps invoking

+         * either of the read methods, and they will consequently keep returning

+         * EOF.

+         */

+        void finished() throws IOException {}

+        /** Called to indicate, that the {@link ObservableInputStream} has been closed.

+         */

+        void closed() throws IOException {}

+        /**

+         * Called to indicate, that an error occurred on the underlying stream.

+         */

+        void error(IOException pException) throws IOException { throw pException; }

+    }

+

+    private final List<Observer> observers = new ArrayList<Observer>();

+    

+    public ObservableInputStream(InputStream pProxy) {

+        super(pProxy);

+    }

+

+    public void add(Observer pObserver) {

+        observers.add(pObserver);

+    }

+

+    public void remove(Observer pObserver) {

+        observers.remove(pObserver);

+    }

+

+    public void removeAllObservers() {

+        observers.clear();

+    }

+

+    @Override

+    public int read() throws IOException {

+        int result = 0;

+        IOException ioe = null;

+        try {

+            result = super.read();

+        } catch (IOException pException) {

+            ioe = pException;

+        }

+        if (ioe != null) {

+            noteError(ioe);

+        } else if (result == -1) {

+            noteFinished();

+        } else {

+            noteDataByte(result);

+        }

+        return result;

+    }

+

+    @Override

+    public int read(byte[] pBuffer) throws IOException {

+        int result = 0;

+        IOException ioe = null;

+        try {

+            result = super.read(pBuffer);

+        } catch (IOException pException) {

+            ioe = pException;

+        }

+        if (ioe != null) {

+            noteError(ioe);

+        } else if (result == -1) {

+            noteFinished();

+        } else if (result > 0) {

+            noteDataBytes(pBuffer, 0, result);

+        }

+        return result;

+    }

+

+    @Override

+    public int read(byte[] pBuffer, int pOffset, int pLength) throws IOException {

+        int result = 0;

+        IOException ioe = null;

+        try {

+            result = super.read(pBuffer, pOffset, pLength);

+        } catch (IOException pException) {

+            ioe = pException;

+        }

+        if (ioe != null) {

+            noteError(ioe);

+        } else if (result == -1) {

+            noteFinished();

+        } else if (result > 0) {

+            noteDataBytes(pBuffer, pOffset, result);

+        }

+        return result;

+    }

+

+    /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)}

+     * with the given arguments.

+     * @param pBuffer Passed to the observers.

+     * @param pOffset Passed to the observers.

+     * @param pLength Passed to the observers.

+     * @throws IOException Some observer has thrown an exception, which is being

+     *   passed down.

+     */

+    protected void noteDataBytes(byte[] pBuffer, int pOffset, int pLength) throws IOException {

+        for (Observer observer : getObservers()) {

+            observer.data(pBuffer, pOffset, pLength);

+        }

+    }

+

+    /** Notifies the observers by invoking {@link Observer#finished()}.

+     * @throws IOException Some observer has thrown an exception, which is being

+     *   passed down.

+     */

+    protected void noteFinished() throws IOException {

+        for (Observer observer : getObservers()) {

+            observer.finished();

+        }

+    }

+

+    /** Notifies the observers by invoking {@link Observer#data(int)}

+     * with the given arguments.

+     * @param pDataByte Passed to the observers.

+     * @throws IOException Some observer has thrown an exception, which is being

+     *   passed down.

+     */

+    protected void noteDataByte(int pDataByte) throws IOException {

+        for (Observer observer : getObservers()) {

+            observer.data(pDataByte);

+        }

+    }

+

+    /** Notifies the observers by invoking {@link Observer#error(IOException)}

+     * with the given argument.

+     * @param pException Passed to the observers.

+     * @throws IOException Some observer has thrown an exception, which is being

+     *   passed down. This may be the same exception, which has been passed as an

+     *   argument.

+     */

+    protected void noteError(IOException pException) throws IOException {

+        for (Observer observer : getObservers()) {

+            observer.error(pException);

+        }

+    }

+

+    /** Notifies the observers by invoking {@link Observer#finished()}.

+     * @throws IOException Some observer has thrown an exception, which is being

+     *   passed down.

+     */

+    protected void noteClosed() throws IOException {

+        for (Observer observer : getObservers()) {

+            observer.closed();

+        }

+    }

+

+    protected List<Observer> getObservers() {

+        return observers;

+    }

+

+    @Override

+    public void close() throws IOException {

+        IOException ioe = null;

+        try {

+            super.close();

+        } catch (IOException e) {

+            ioe = e;

+        }

+        if (ioe == null) {

+            noteClosed();

+        } else {

+            noteError(ioe);

+        }

+    }

+

+    /** Reads all data from the underlying {@link InputStream}, while notifying the

+     * observers.

+     * @throws IOException The underlying {@link InputStream}, or either of the

+     *   observers has thrown an exception.

+     */

+    public void consume() throws IOException {

+        final byte[] buffer = new byte[8192];

+        for (;;) {

+            final int res = read(buffer);

+            if (res == -1) {

+                return;

+            }

+        }

+    }

+    

+}

diff --git a/src/main/java/org/apache/commons/io/input/TeeInputStream.java b/src/main/java/org/apache/commons/io/input/TeeInputStream.java
index 3f2941d..6000824 100644
--- a/src/main/java/org/apache/commons/io/input/TeeInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/TeeInputStream.java
@@ -35,6 +35,7 @@
  *
  * @version $Id$
  * @since 1.4
+ * @see ObservableInputStream 
  */
 public class TeeInputStream extends ProxyInputStream {
 
diff --git a/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java b/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
new file mode 100644
index 0000000..4162d21
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/MessageDigestCalculatingInputStreamTest.java
@@ -0,0 +1,48 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ * 

+ *      http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package org.apache.commons.io.input;

+

+import static org.junit.Assert.*;

+

+import java.io.ByteArrayInputStream;

+import java.security.MessageDigest;

+import java.util.Random;

+

+import org.junit.Test;

+

+public class MessageDigestCalculatingInputStreamTest {

+    public static byte[] generateRandomByteStream(int pSize) {

+        final byte[] buffer = new byte[pSize];

+        final Random rnd = new Random();

+        rnd.nextBytes(buffer);

+        return buffer;

+    }

+

+    @Test

+    public void test() throws Exception {

+        for (int i = 256;  i < 8192;  i = i*2) {

+            final byte[] buffer = generateRandomByteStream(i);

+            final MessageDigest md5Sum = MessageDigest.getInstance("MD5");

+            final byte[] expect = md5Sum.digest(buffer);

+            final MessageDigestCalculatingInputStream md5InputStream = new MessageDigestCalculatingInputStream(new ByteArrayInputStream(buffer));

+            md5InputStream.consume();

+            final byte[] got = md5InputStream.getMessageDigest().digest();

+            assertArrayEquals(expect, got);

+        }

+    }

+

+}

diff --git a/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java b/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
new file mode 100644
index 0000000..d74af27
--- /dev/null
+++ b/src/test/java/org/apache/commons/io/input/ObservableInputStreamTest.java
@@ -0,0 +1,134 @@
+/*

+ * Licensed to the Apache Software Foundation (ASF) under one or more

+ * contributor license agreements.  See the NOTICE file distributed with

+ * this work for additional information regarding copyright ownership.

+ * The ASF licenses this file to You under the Apache License, Version 2.0

+ * (the "License"); you may not use this file except in compliance with

+ * the License.  You may obtain a copy of the License at

+ * 

+ *      http://www.apache.org/licenses/LICENSE-2.0

+ * 

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+package org.apache.commons.io.input;

+

+import static org.junit.Assert.*;

+

+import java.io.ByteArrayInputStream;

+import java.io.IOException;

+

+import org.apache.commons.io.input.ObservableInputStream;

+import org.apache.commons.io.input.ObservableInputStream.Observer;

+import org.junit.Test;

+

+public class ObservableInputStreamTest {

+    private static class LastByteKeepingObserver extends Observer {

+        private int lastByteSeen = -1;

+        private boolean finished;

+        private boolean closed;

+

+        @Override

+        void data(int pByte) throws IOException {

+            super.data(pByte);

+            lastByteSeen = pByte;

+        }

+        

+        @Override

+        void finished() throws IOException {

+            super.finished();

+            finished = true;

+        }

+        

+        @Override

+        void closed() throws IOException {

+            super.closed();

+            closed = true;

+        }

+    }

+    private static class LastBytesKeepingObserver extends Observer {

+        private byte[] buffer = null;

+        private int offset = -1;

+        private int length = -1;

+

+        @Override

+        void data(byte[] pBuffer, int pOffset, int pLength) throws IOException {

+            super.data(pBuffer, pOffset, pLength);

+            buffer = pBuffer;

+            offset = pOffset;

+            length = pLength;

+        }

+    }

+

+    /** Tests, that {@link Observer#data(int)} is called.

+     */

+    @Test

+    public void testDataByteCalled() throws Exception {

+        final byte[] buffer = MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);

+        final ObservableInputStream ois = new ObservableInputStream(new ByteArrayInputStream(buffer));

+        final LastByteKeepingObserver lko = new LastByteKeepingObserver();

+        assertEquals(-1, lko.lastByteSeen);

+        ois.read();

+        assertEquals(-1, lko.lastByteSeen);

+        assertFalse(lko.finished);

+        assertFalse(lko.closed);

+        ois.add(lko);

+        for (int i = 1;  i < buffer.length;  i++) {

+            final int result = ois.read();

+            assertEquals((byte) result, buffer[i]);

+            assertEquals(result, lko.lastByteSeen);

+            assertFalse(lko.finished);

+            assertFalse(lko.closed);

+        }

+        final int result = ois.read();

+        assertEquals(-1, result);

+        assertTrue(lko.finished);

+        assertFalse(lko.closed);

+        ois.close();

+        assertTrue(lko.finished);

+        assertTrue(lko.closed);

+    }

+

+    /** Tests, that {@link Observer#data(byte[],int,int)} is called.

+     */

+    @Test

+    public void testDataBytesCalled() throws Exception {

+        final byte[] buffer = MessageDigestCalculatingInputStreamTest.generateRandomByteStream(4096);

+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer);

+        final ObservableInputStream ois = new ObservableInputStream(bais);

+        final LastBytesKeepingObserver lko = new LastBytesKeepingObserver();

+        final byte[] readBuffer = new byte[23];

+        assertEquals(null, lko.buffer);

+        ois.read(readBuffer);

+        assertEquals(null, lko.buffer);

+        ois.add(lko);

+        for (;;) {

+            if (bais.available() >= 2048) {

+                final int result = ois.read(readBuffer);

+                if (result == -1) {

+                    ois.close();

+                    break;

+                } else {

+                    assertEquals(readBuffer, lko.buffer);

+                    assertEquals(0, lko.offset);

+                    assertEquals(readBuffer.length, lko.length);

+                }

+            } else {

+                final int res = Math.min(11, bais.available());

+                final int result = ois.read(readBuffer, 1, 11);

+                if (result == -1) {

+                    ois.close();

+                    break;

+                } else {

+                    assertEquals(readBuffer, lko.buffer);

+                    assertEquals(1, lko.offset);

+                    assertEquals(res, lko.length);

+                }

+            }

+        }

+    }

+

+}