BoundedInputStream can count its bytes without wrapping a
CountingInputStream

- Deprecate CountingInputStream in favor of BoundedInputStream
- Add a BoundedInputStream builder
diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index fb3ee4a..3251c1b 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -130,6 +130,8 @@
       <action dev="ggregory" type="add"                due-to="Gary Gregory">Add AbstractStreamBuilder.getReader().</action>
       <action dev="ggregory" type="add"                due-to="Gary Gregory">Add Maven property project.build.outputTimestamp for build reproducibility.</action>
       <action dev="ggregory" type="add"                due-to="Gary Gregory">Add ProxyInputStream.unwrap().</action>
+      <action dev="ggregory" type="add"                due-to="Gary Gregory">Add a running count and builder to BoundedInputStream.</action>
+      <action dev="ggregory" type="add"                due-to="Gary Gregory">Add a builder to CountingInputStream.</action>
       <!--  UPDATE -->
       <action dev="ggregory" type="update"             due-to="Gary Gregory">Bump commons.bytebuddy.version from 1.14.10 to 1.14.11 #534.</action>
       <action dev="ggregory" type="update"             due-to="Gary Gregory">Bump org.apache.commons:commons-parent from 65 to 66.</action>
diff --git a/src/main/java/org/apache/commons/io/input/BoundedInputStream.java b/src/main/java/org/apache/commons/io/input/BoundedInputStream.java
index 7175dac..91e56f7 100644
--- a/src/main/java/org/apache/commons/io/input/BoundedInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/BoundedInputStream.java
@@ -24,28 +24,153 @@
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.build.AbstractStreamBuilder;
 
+//@formatter:off
 /**
- * Reads bytes up to a maximum length, if its count goes above that, it stops.
+ * Reads bytes up to a maximum count and stops once reached.
  * <p>
- * This is useful to wrap {@code ServletInputStream}s. The {@code ServletInputStream} will block if you try to read content from it that isn't there, because it
- * doesn't know whether the content hasn't arrived yet or whether the content has finished. So, one of these, initialized with the {@code Content-Length} sent
- * in the {@code ServletInputStream}'s header, will stop it blocking, providing it's been sent with a correct content length.
+ * To build an instance, see {@link AbstractBuilder}.
  * </p>
  * <p>
- * To build an instance, use {@link Builder}.
+ * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
  * </p>
- *
+ * <p>
+ * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
+ * </p>
+ * <h2>Using a ServletInputStream</h2>
+ * <p>
+ * A {@code ServletInputStream} can block if you try to read content that isn't there
+ * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
+ * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
+ * length in the first place.
+ * </p>
+ * <h2>Using NIO</h2>
+ * <pre>{@code
+ * BoundedInputStream s = BoundedInputStream.builder()
+ *   .setPath(Paths.get("MyFile.xml"))
+ *   .setMaxCount(1024)
+ *   .setPropagateClose(false)
+ *   .get();
+ * }
+ * </pre>
+ * <h2>Using IO</h2>
+ * <pre>{@code
+ * BoundedInputStream s = BoundedInputStream.builder()
+ *   .setFile(new File("MyFile.xml"))
+ *   .setMaxCount(1024)
+ *   .setPropagateClose(false)
+ *   .get();
+ * }
+ * </pre>
+ * <h2>Counting Bytes</h2>
+ * <p>You can set the running count when building, which is most useful when starting from another stream:
+ * <pre>{@code
+ * InputStream in = ...;
+ * BoundedInputStream s = BoundedInputStream.builder()
+ *   .setInputStream(in)
+ *   .setCount(12)
+ *   .setMaxCount(1024)
+ *   .setPropagateClose(false)
+ *   .get();
+ * }
+ * </pre>
  * @see Builder
  * @since 2.0
  */
+//@formatter:on
 public class BoundedInputStream extends ProxyInputStream {
 
-    // TODO For 3.0, extend CountingInputStream. Or, add a max feature to CountingInputStream.
+    /**
+     * For subclassing builders from {@link BoundedInputStream} subclassses.
+     *
+     * @param <T> The subclass.
+     */
+    static abstract class AbstractBuilder<T extends AbstractBuilder<T>> extends AbstractStreamBuilder<BoundedInputStream, T> {
+
+        /** The current count of bytes counted. */
+        private long count;
+
+        /** The max count of bytes to read. */
+        private long maxCount = EOF;
+
+        /** Flag if {@link #close()} should be propagated, {@code true} by default. */
+        private boolean propagateClose = true;
+
+        long getCount() {
+            return count;
+        }
+
+        long getMaxCount() {
+            return maxCount;
+        }
+
+        boolean isPropagateClose() {
+            return propagateClose;
+        }
+
+        /**
+         * Sets the current number of bytes counted.
+         * <p>
+         * Useful when building from another stream to carry forward a read count.
+         * </p>
+         * <p>
+         * Default is {@code 0}, negative means 0.
+         * </p>
+         *
+         * @param count The current number of bytes counted.
+         * @return this.
+         */
+        public T setCount(final long count) {
+            this.count = Math.max(0, count);
+            return asThis();
+        }
+
+        /**
+         * Sets the maximum number of bytes to return.
+         * <p>
+         * Default is {@value IOUtils#EOF}, negative means unbound.
+         * </p>
+         *
+         * @param maxCount The maximum number of bytes to return.
+         * @return this.
+         */
+        public T setMaxCount(final long maxCount) {
+            this.maxCount = Math.max(EOF, maxCount);
+            return asThis();
+        }
+
+        /**
+         * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
+         * <p>
+         * Default is {@code true}.
+         * </p>
+         *
+         * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
+         *                       it does not.
+         * @return this.
+         */
+        public T setPropagateClose(final boolean propagateClose) {
+            this.propagateClose = propagateClose;
+            return asThis();
+        }
+
+    }
 
     //@formatter:off
     /**
      * Builds a new {@link BoundedInputStream}.
-     *
+     * <p>
+     * By default, a {@link BoundedInputStream} is <em>unbound</em>; so make sure to call {@link AbstractBuilder#setMaxCount(long)}.
+     * </p>
+     * <p>
+     * You can find out how many bytes this stream has seen so far by calling {@link BoundedInputStream#getCount()}. This value reflects bytes read and skipped.
+     * </p>
+     * <h2>Using a ServletInputStream</h2>
+     * <p>
+     * A {@code ServletInputStream} can block if you try to read content that isn't there
+     * because it doesn't know whether the content hasn't arrived yet or whether the content has finished. Initialize an {@link BoundedInputStream} with the
+     * {@code Content-Length} sent in the {@code ServletInputStream}'s header, this stop it from blocking, providing it's been sent with a correct content
+     * length in the first place.
+     * </p>
      * <h2>Using NIO</h2>
      * <pre>{@code
      * BoundedInputStream s = BoundedInputStream.builder()
@@ -64,18 +189,24 @@
      *   .get();
      * }
      * </pre>
+     * <h2>Counting Bytes</h2>
+     * <p>You can set the running count when building, which is most useful when starting from another stream:
+     * <pre>{@code
+     * InputStream in = ...;
+     * BoundedInputStream s = BoundedInputStream.builder()
+     *   .setInputStream(in)
+     *   .setCount(12)
+     *   .setMaxCount(1024)
+     *   .setPropagateClose(false)
+     *   .get();
+     * }
+     * </pre>
      *
      * @see #get()
      * @since 2.16.0
      */
     //@formatter:on
-    public static class Builder extends AbstractStreamBuilder<BoundedInputStream, Builder> {
-
-        /** The max count of bytes to read. */
-        private long maxCount = EOF;
-
-        /** Flag if close should be propagated. */
-        private boolean propagateClose = true;
+    public static class Builder extends AbstractBuilder<Builder> {
 
         /**
          * Builds a new {@link BoundedInputStream}.
@@ -100,50 +231,24 @@
         @SuppressWarnings("resource")
         @Override
         public BoundedInputStream get() throws IOException {
-            return new BoundedInputStream(getInputStream(), maxCount, propagateClose);
-        }
-
-        /**
-         * Sets the maximum number of bytes to return.
-         * <p>
-         * Default is {@value IOUtils#EOF}.
-         * </p>
-         *
-         * @param maxCount The maximum number of bytes to return.
-         * @return this.
-         */
-        public Builder setMaxCount(final long maxCount) {
-            this.maxCount = maxCount;
-            return this;
-        }
-
-        /**
-         * Sets whether the {@link #close()} method should propagate to the underling {@link InputStream}.
-         * <p>
-         * Default is true.
-         * </p>
-         *
-         * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if
-         *                       it does not.
-         * @return this.
-         */
-        public Builder setPropagateClose(final boolean propagateClose) {
-            this.propagateClose = propagateClose;
-            return this;
+            return new BoundedInputStream(getInputStream(), getCount(), getMaxCount(), isPropagateClose());
         }
 
     }
 
     /**
-     * Constructs a new {@link Builder}.
+     * Constructs a new {@link AbstractBuilder}.
      *
-     * @return a new {@link Builder}.
+     * @return a new {@link AbstractBuilder}.
      * @since 2.16.0
      */
     public static Builder builder() {
         return new Builder();
     }
 
+    /** The current count of bytes counted. */
+    private long count;
+
     /** The max count of bytes to read. */
     private final long maxCount;
 
@@ -158,7 +263,7 @@
      * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is unlimited.
      *
      * @param in The wrapped input stream.
-     * @deprecated Use {@link Builder#get()}.
+     * @deprecated Use {@link AbstractBuilder#get()}.
      */
     @Deprecated
     public BoundedInputStream(final InputStream in) {
@@ -169,34 +274,50 @@
      * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
      *
      * @param inputStream The wrapped input stream.
-     * @param maxCount   The maximum number of bytes to return.
-     * @deprecated Use {@link Builder#get()}.
+     * @param maxCount    The maximum number of bytes to return.
+     * @deprecated Use {@link AbstractBuilder#get()}.
      */
     @Deprecated
     public BoundedInputStream(final InputStream inputStream, final long maxCount) {
         // Some badly designed methods - e.g. the Servlet API - overload length
         // such that "-1" means stream finished
-        this(inputStream, maxCount, true);
+        this(inputStream, 0, maxCount, true);
     }
 
     /**
      * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it to a certain size.
      *
      * @param inputStream    The wrapped input stream.
-     * @param maxCount      The maximum number of bytes to return.
+     * @param count          The current number of bytes read.
+     * @param maxCount       The maximum number of bytes to return.
      * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
      *                       does not.
      */
-    @SuppressWarnings("resource") // Caller closes.
-    private BoundedInputStream(final InputStream inputStream, final long maxCount, final boolean propagateClose) {
+    BoundedInputStream(final InputStream inputStream, final long count, final long maxCount, final boolean propagateClose) {
         // Some badly designed methods - e.g. the Servlet API - overload length
         // such that "-1" means stream finished
-        super(new CountingInputStream(inputStream));
+        // Can't throw because we start from an InputStream.
+        super(inputStream);
+        this.count = count;
         this.maxCount = maxCount;
         this.propagateClose = propagateClose;
     }
 
     /**
+     * Adds the number of read bytes to the count.
+     *
+     * @param n number of bytes read, or -1 if no more bytes are available
+     * @throws IOException Not thrown here but subclasses may throw.
+     * @since 2.0
+     */
+    @Override
+    protected synchronized void afterRead(final int n) throws IOException {
+        if (n != EOF) {
+            count += n;
+        }
+    }
+
+    /**
      * {@inheritDoc}
      */
     @Override
@@ -226,13 +347,8 @@
      * @return The count of bytes read.
      * @since 2.12.0
      */
-    @SuppressWarnings("resource") // no allocation
     public long getCount() {
-        return getCountingInputStream().getByteCount();
-    }
-
-    private CountingInputStream getCountingInputStream() {
-        return (CountingInputStream) in;
+        return count;
     }
 
     /**
@@ -264,7 +380,7 @@
      * @since 2.16.0
      */
     public long getRemaining() {
-        return getMaxCount() - getCount();
+        return Math.max(0, getMaxCount() - getCount());
     }
 
     private boolean isMaxCount() {
@@ -373,7 +489,7 @@
      *
      * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code close()} method of the underlying stream or {@code false} if it
      *                       does not.
-     * @deprecated Use {@link Builder#setPropagateClose(boolean)}.
+     * @deprecated Use {@link AbstractBuilder#setPropagateClose(boolean)}.
      */
     @Deprecated
     public void setPropagateClose(final boolean propagateClose) {
@@ -389,7 +505,9 @@
      */
     @Override
     public long skip(final long n) throws IOException {
-        return super.skip(toReadLen(n));
+        final long skip = super.skip(toReadLen(n));
+        count += skip;
+        return skip;
     }
 
     private long toReadLen(final long len) {
diff --git a/src/main/java/org/apache/commons/io/input/CountingInputStream.java b/src/main/java/org/apache/commons/io/input/CountingInputStream.java
index 4c3eccd..661d79d 100644
--- a/src/main/java/org/apache/commons/io/input/CountingInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/CountingInputStream.java
@@ -28,7 +28,9 @@
  * A typical use case would be during debugging, to ensure that data is being
  * read as expected.
  * </p>
+ * @deprecated Use {@link BoundedInputStream} (unbounded by default).
  */
+@Deprecated
 public class CountingInputStream extends ProxyInputStream {
 
     /** The count of bytes that have passed. */
diff --git a/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java b/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java
index 5ae071f..aa9b3fa 100644
--- a/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/input/BoundedInputStreamTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.commons.io.input;
 
+import static org.apache.commons.io.IOUtils.EOF;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -27,6 +28,8 @@
 
 import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Tests for {@link BoundedInputStream}.
@@ -47,6 +50,92 @@
     }
 
     @SuppressWarnings("deprecation")
+    @ParameterizedTest
+    @ValueSource(longs = { -100, -1, 0, 1, 2, 4, 8, 16, 32, 64 })
+    public void testCounts(final long startCount) throws Exception {
+
+        final byte[] helloWorld = "Hello World".getBytes(StandardCharsets.UTF_8);
+        final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
+        final long actualStart = startCount < 0 ? 0 : startCount;
+
+        // limit = length
+        try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setCount(startCount)
+                .setMaxCount(helloWorld.length).get()) {
+            assertEquals(helloWorld.length, bounded.getMaxCount());
+            assertEquals(helloWorld.length, bounded.getMaxLength());
+            assertEquals(actualStart, bounded.getCount());
+            assertEquals(Math.max(0, bounded.getMaxCount() - actualStart), bounded.getRemaining());
+            assertEquals(Math.max(0, bounded.getMaxLength() - actualStart), bounded.getRemaining());
+            int readCount = 0;
+            for (int i = 0; i < helloWorld.length; i++) {
+                final byte expectedCh = bounded.getRemaining() > 0 ? helloWorld[i] : EOF;
+                final int actualCh = bounded.read();
+                assertEquals(expectedCh, actualCh, "limit = length byte[" + i + "]");
+                if (actualCh != EOF) {
+                    readCount++;
+                }
+                assertEquals(helloWorld.length, bounded.getMaxCount());
+                assertEquals(helloWorld.length, bounded.getMaxLength());
+                assertEquals(actualStart + readCount, bounded.getCount(), "i=" + i);
+                assertEquals(Math.max(0, bounded.getMaxCount() - (readCount + actualStart)), bounded.getRemaining());
+                assertEquals(Math.max(0, bounded.getMaxLength() - (readCount + actualStart)), bounded.getRemaining());
+            }
+            assertEquals(-1, bounded.read(), "limit = length end");
+            assertEquals(helloWorld.length, bounded.getMaxLength());
+            assertEquals(readCount + actualStart, bounded.getCount());
+            assertEquals(0, bounded.getRemaining());
+            assertEquals(0, bounded.available());
+        }
+        // limit > length
+        final int maxCountP1 = helloWorld.length + 1;
+        try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setCount(startCount)
+                .setMaxCount(maxCountP1).get()) {
+            assertEquals(maxCountP1, bounded.getMaxLength());
+            assertEquals(actualStart, bounded.getCount());
+            assertEquals(Math.max(0, bounded.getMaxCount() - actualStart), bounded.getRemaining());
+            assertEquals(Math.max(0, bounded.getMaxLength() - actualStart), bounded.getRemaining());
+            int readCount = 0;
+            for (int i = 0; i < helloWorld.length; i++) {
+                final byte expectedCh = bounded.getRemaining() > 0 ? helloWorld[i] : EOF;
+                final int actualCh = bounded.read();
+                assertEquals(expectedCh, actualCh, "limit = length byte[" + i + "]");
+                if (actualCh != EOF) {
+                    readCount++;
+                }
+                assertEquals(maxCountP1, bounded.getMaxCount());
+                assertEquals(maxCountP1, bounded.getMaxLength());
+                assertEquals(actualStart + readCount, bounded.getCount(), "i=" + i);
+                assertEquals(Math.max(0, bounded.getMaxCount() - (readCount + actualStart)), bounded.getRemaining());
+                assertEquals(Math.max(0, bounded.getMaxLength() - (readCount + actualStart)), bounded.getRemaining());
+            }
+            assertEquals(-1, bounded.read(), "limit > length end");
+            assertEquals(0, bounded.available());
+            assertEquals(maxCountP1, bounded.getMaxLength());
+            assertEquals(readCount + actualStart, bounded.getCount());
+            assertEquals(Math.max(0, maxCountP1 - bounded.getCount()), bounded.getRemaining());
+        }
+        // limit < length
+        try (BoundedInputStream bounded = new BoundedInputStream(new ByteArrayInputStream(helloWorld), hello.length)) {
+            assertEquals(hello.length, bounded.getMaxLength());
+            assertEquals(0, bounded.getCount());
+            assertEquals(bounded.getMaxLength(), bounded.getRemaining());
+            int readCount = 0;
+            for (int i = 0; i < hello.length; i++) {
+                assertEquals(hello[i], bounded.read(), "limit < length byte[" + i + "]");
+                readCount++;
+                assertEquals(hello.length, bounded.getMaxLength());
+                assertEquals(readCount, bounded.getCount());
+                assertEquals(bounded.getMaxLength() - readCount, bounded.getRemaining());
+            }
+            assertEquals(-1, bounded.read(), "limit < length end");
+            assertEquals(0, bounded.available());
+            assertEquals(hello.length, bounded.getMaxLength());
+            assertEquals(readCount, bounded.getCount());
+            assertEquals(bounded.getMaxLength() - readCount, bounded.getRemaining());
+        }
+    }
+
+    @SuppressWarnings("deprecation")
     @Test
     public void testOnMaxLength() throws Exception {
         final byte[] helloWorld = "Hello World".getBytes(StandardCharsets.UTF_8);
@@ -77,6 +166,7 @@
             assertEquals(bounded.getMaxLength() - readCount, bounded.getRemaining());
         }
         assertEquals(-1, bounded.read(), "limit = length end");
+        assertEquals(0, bounded.available());
         assertEquals(helloWorld.length, bounded.getMaxLength());
         assertEquals(readCount, bounded.getCount());
         assertEquals(bounded.getMaxLength() - readCount, bounded.getRemaining());
@@ -103,6 +193,7 @@
             assertEquals(readCount, bounded.getCount());
             assertEquals(bounded.getMaxLength() - readCount, bounded.getRemaining());
         }
+        assertEquals(0, bounded.available());
         assertEquals(-1, bounded.read(), "limit > length end");
         assertEquals(length2, bounded.getMaxLength());
         assertEquals(readCount, bounded.getCount());
@@ -150,17 +241,17 @@
             compare("limit = 0", IOUtils.EMPTY_BYTE_ARRAY, IOUtils.toByteArray(bounded));
         }
 
-        try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setMaxCount(helloWorld.length)
-                .get()) {
+        try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld))
+                .setMaxCount(helloWorld.length).get()) {
             compare("limit = length", helloWorld, IOUtils.toByteArray(bounded));
         }
 
-        try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setMaxCount(helloWorld.length + 1)
-                .get()) {
+        try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld))
+                .setMaxCount(helloWorld.length + 1).get()) {
             compare("limit > length", helloWorld, IOUtils.toByteArray(bounded));
         }
-        try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld)).setMaxCount(helloWorld.length - 6)
-                .get()) {
+        try (BoundedInputStream bounded = BoundedInputStream.builder().setInputStream(new ByteArrayInputStream(helloWorld))
+                .setMaxCount(helloWorld.length - 6).get()) {
             compare("limit < length", hello, IOUtils.toByteArray(bounded));
         }
     }