Add async Brotli support (compress/decompress). (#720)

Introduce DeflatingBrotliAsyncEntityProducer and InflatingBrotliAsyncDataConsumer; register "br" when brotli4j is available.
diff --git a/httpclient5/pom.xml b/httpclient5/pom.xml
index b97e1d2..4478777 100644
--- a/httpclient5/pom.xml
+++ b/httpclient5/pom.xml
@@ -118,6 +118,10 @@
       <artifactId>zstd-jni</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>com.aayushatharva.brotli4j</groupId>
+      <artifactId>brotli4j</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducer.java
new file mode 100644
index 0000000..676c530
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducer.java
@@ -0,0 +1,306 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.aayushatharva.brotli4j.encoder.Encoder;
+import com.aayushatharva.brotli4j.encoder.EncoderJNI;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@code AsyncEntityProducer} that Brotli-compresses bytes from an upstream producer
+ * on the fly and writes the compressed stream to the target {@link DataStreamChannel}.
+ * <p>
+ * Purely async/streaming: no {@code InputStream}/{@code OutputStream}. Back-pressure is
+ * honored via {@link #available()} and the I/O reactor’s calls into {@link #produce(DataStreamChannel)}.
+ * Trailers from the upstream producer are preserved and emitted once the compressed output
+ * has been fully drained.
+ * </p>
+ *
+ * <h4>Content metadata</h4>
+ * Returns {@code Content-Encoding: br}, {@code Content-Length: -1} and {@code chunked=true}.
+ * Repeatability matches the upstream producer.
+ *
+ * <h4>Implementation notes</h4>
+ * Uses Brotli4j’s {@code EncoderJNI.Wrapper}. JNI-owned output buffers are written directly
+ * when possible; if the channel applies back-pressure, the unwritten tail is copied into
+ * small pooled direct {@link java.nio.ByteBuffer}s to reduce allocation churn. Native
+ * resources are released in {@link #releaseResources()}.
+ * <p>
+ * Ensure {@link com.aayushatharva.brotli4j.Brotli4jLoader#ensureAvailability()} has been
+ * called once at startup; this class also invokes it in a static initializer as a safeguard.
+ * </p>
+ *
+ * <h4>Usage</h4>
+ * <pre>{@code
+ * AsyncEntityProducer plain = new StringAsyncEntityProducer("hello", ContentType.TEXT_PLAIN);
+ * AsyncEntityProducer br = new DeflatingBrotliEntityProducer(plain); // defaults q=5, lgwin=22
+ * client.execute(new BasicRequestProducer(post, br),
+ *                new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ *                null);
+ * }</pre>
+ *
+ * @see org.apache.hc.core5.http.nio.AsyncEntityProducer
+ * @see org.apache.hc.core5.http.nio.DataStreamChannel
+ * @see com.aayushatharva.brotli4j.encoder.EncoderJNI
+ * @since 5.6
+ */
+public final class DeflatingBrotliEntityProducer implements AsyncEntityProducer {
+
+    private enum State { STREAMING, FINISHING, DONE }
+
+    private final AsyncEntityProducer upstream;
+    private final EncoderJNI.Wrapper encoder;
+
+    private ByteBuffer pendingOut;
+    private List<? extends Header> pendingTrailers;
+    private State state = State.STREAMING;
+
+    /**
+     * Create a producer with explicit Brotli params.
+     *
+     * @param upstream upstream entity producer whose bytes will be compressed
+     * @param quality  Brotli quality level (see brotli4j documentation)
+     * @param lgwin    Brotli window size log2 (see brotli4j documentation)
+     * @param mode     Brotli mode hint (GENERIC/TEXT/FONT)
+     * @throws IOException if the native encoder cannot be created
+     * @since 5.6
+     */
+    public DeflatingBrotliEntityProducer(
+            final AsyncEntityProducer upstream,
+            final int quality,
+            final int lgwin,
+            final Encoder.Mode mode) throws IOException {
+        this.upstream = Args.notNull(upstream, "upstream");
+        this.encoder = new EncoderJNI.Wrapper(256 * 1024, quality, lgwin, mode);
+    }
+
+    /**
+     * Convenience constructor mapping {@code 0=GENERIC, 1=TEXT, 2=FONT}.
+     *
+     * @since 5.6
+     */
+    public DeflatingBrotliEntityProducer(
+            final AsyncEntityProducer upstream,
+            final int quality,
+            final int lgwin,
+            final int modeInt) throws IOException {
+        this(upstream, quality, lgwin,
+                modeInt == 1 ? Encoder.Mode.TEXT :
+                        modeInt == 2 ? Encoder.Mode.FONT : Encoder.Mode.GENERIC);
+    }
+
+    /**
+     * Create a producer with sensible defaults ({@code quality=5}, {@code lgwin=22}, {@code GENERIC}).
+     *
+     * @since 5.6
+     */
+    public DeflatingBrotliEntityProducer(final AsyncEntityProducer upstream) throws IOException {
+        this(upstream, 5, 22, Encoder.Mode.GENERIC);
+    }
+
+
+    @Override
+    public String getContentType() {
+        return upstream.getContentType();
+    }
+
+    @Override
+    public String getContentEncoding() {
+        return "br";
+    }
+
+    @Override
+    public long getContentLength() {
+        return -1;
+    }
+
+    @Override
+    public boolean isChunked() {
+        return true;
+    }
+
+    @Override
+    public Set<String> getTrailerNames() {
+        return upstream.getTrailerNames();
+    }
+
+    @Override
+    public boolean isRepeatable() {
+        return upstream.isRepeatable();
+    }
+
+    @Override
+    public int available() {
+        if (state == State.DONE) {
+            return 0;
+        }
+        if (pendingOut != null && pendingOut.hasRemaining() || pendingTrailers != null) {
+            return 1;
+        }
+        final int up = upstream.available();
+        return (state != State.STREAMING || up > 0) ? 1 : 0;
+    }
+
+    @Override
+    public void produce(final DataStreamChannel channel) throws IOException {
+        if (flushPending(channel)) {
+            return;
+        }
+
+        if (state == State.FINISHING) {
+            encoder.push(EncoderJNI.Operation.FINISH, 0);
+            if (drainEncoder(channel)) {
+                return;
+            }
+            if (pendingTrailers == null) {
+                pendingTrailers = Collections.emptyList();
+            }
+            channel.endStream(pendingTrailers);
+            pendingTrailers = null;
+            state = State.DONE;
+            return;
+        }
+
+        upstream.produce(new DataStreamChannel() {
+            @Override
+            public void requestOutput() {
+                channel.requestOutput();
+            }
+
+            @Override
+            public int write(final ByteBuffer src) throws IOException {
+                int accepted = 0;
+                while (src.hasRemaining()) {
+                    final ByteBuffer in = encoder.getInputBuffer();
+                    if (!in.hasRemaining()) {
+                        encoder.push(EncoderJNI.Operation.PROCESS, 0);
+                        if (drainEncoder(channel)) {
+                            break;
+                        }
+                        continue;
+                    }
+                    final int xfer = Math.min(src.remaining(), in.remaining());
+                    final int lim = src.limit();
+                    src.limit(src.position() + xfer);
+                    in.put(src);
+                    src.limit(lim);
+                    accepted += xfer;
+
+                    encoder.push(EncoderJNI.Operation.PROCESS, xfer);
+                    if (drainEncoder(channel)) {
+                        break;
+                    }
+                }
+                return accepted;
+            }
+
+            @Override
+            public void endStream() throws IOException {
+                endStream(Collections.emptyList());
+            }
+
+            @Override
+            public void endStream(final List<? extends Header> trailers) throws IOException {
+                pendingTrailers = trailers;
+                state = State.FINISHING;
+                encoder.push(EncoderJNI.Operation.FINISH, 0);
+                if (drainEncoder(channel)) {
+                    return;
+                }
+                if (pendingTrailers == null) {
+                    pendingTrailers = Collections.emptyList();
+                }
+                channel.endStream(pendingTrailers);
+                pendingTrailers = null;
+                state = State.DONE;
+            }
+        });
+    }
+
+    @Override
+    public void failed(final Exception cause) {
+        upstream.failed(cause);
+    }
+
+    @Override
+    public void releaseResources() {
+        try {
+            encoder.destroy();
+        } catch (final Throwable ignore) {
+        }
+        upstream.releaseResources();
+        pendingOut = null;
+        pendingTrailers = null;
+        state = State.DONE;
+    }
+
+
+    private boolean flushPending(final DataStreamChannel channel) throws IOException {
+        if (pendingOut != null && pendingOut.hasRemaining()) {
+            channel.write(pendingOut);
+            if (pendingOut.hasRemaining()) {
+                channel.requestOutput();
+                return true;
+            }
+            pendingOut = null;
+        }
+        if (pendingOut == null && pendingTrailers != null && state != State.STREAMING) {
+            channel.endStream(pendingTrailers);
+            pendingTrailers = null;
+            state = State.DONE;
+            return true;
+        }
+        return false;
+    }
+
+    private boolean drainEncoder(final DataStreamChannel channel) throws IOException {
+        while (encoder.hasMoreOutput()) {
+            final ByteBuffer buf = encoder.pull();
+            if (buf == null || !buf.hasRemaining()) {
+                continue;
+            }
+            channel.write(buf);
+            if (buf.hasRemaining()) {
+                pendingOut = ByteBuffer.allocateDirect(buf.remaining());
+                pendingOut.put(buf).flip();
+                channel.requestOutput();
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumer.java
new file mode 100644
index 0000000..b2c3ae6
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumer.java
@@ -0,0 +1,179 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.aayushatharva.brotli4j.decoder.DecoderJNI;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.util.Asserts;
+
+/**
+ * {@code AsyncDataConsumer} that inflates a Brotli-compressed byte stream and forwards
+ * decompressed bytes to a downstream consumer.
+ * <p>
+ * Purely async/streaming: no {@code InputStream}/{@code OutputStream}. Back-pressure from
+ * the I/O reactor is propagated via {@link CapacityChannel}. JNI output buffers are copied
+ * into small reusable direct {@link java.nio.ByteBuffer}s before handing them to the
+ * downstream consumer (which may retain them).
+ * </p>
+ *
+ * <h4>Implementation notes</h4>
+ * Uses Brotli4j’s {@code DecoderJNI.Wrapper}. Native resources are released in
+ * {@link #releaseResources()}. Throws an {@link java.io.IOException} if the stream is
+ * truncated or corrupted.
+ * <p>
+ * Ensure {@link com.aayushatharva.brotli4j.Brotli4jLoader#ensureAvailability()} has been
+ * called once at startup; this class also invokes it in a static initializer as a safeguard.
+ * </p>
+ *
+ * <h4>Usage</h4>
+ * <pre>{@code
+ * AsyncDataConsumer textConsumer = new StringAsyncEntityConsumer();
+ * AsyncDataConsumer brInflating  = new InflatingBrotliDataConsumer(textConsumer);
+ * client.execute(producer, new BasicResponseConsumer<>(brInflating), null);
+ * }</pre>
+ *
+ * @see org.apache.hc.core5.http.nio.AsyncDataConsumer
+ * @see org.apache.hc.core5.http.nio.CapacityChannel
+ * @see com.aayushatharva.brotli4j.decoder.DecoderJNI
+ * @since 5.6
+ */
+public final class InflatingBrotliDataConsumer implements AsyncDataConsumer {
+
+    private final AsyncDataConsumer downstream;
+    private final DecoderJNI.Wrapper decoder;
+    private volatile CapacityChannel capacity;
+
+
+    public InflatingBrotliDataConsumer(final AsyncDataConsumer downstream) {
+        this.downstream = downstream;
+        try {
+            this.decoder = new DecoderJNI.Wrapper(8 * 1024);
+        } catch (final IOException e) {
+            throw new RuntimeException("Unable to initialize DecoderJNI", e);
+        }
+    }
+
+    @Override
+    public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+        this.capacity = capacityChannel;
+        downstream.updateCapacity(capacityChannel);
+    }
+
+    @Override
+    public void consume(final ByteBuffer src) throws IOException {
+        while (src.hasRemaining()) {
+            final ByteBuffer in = decoder.getInputBuffer();
+            final int xfer = Math.min(src.remaining(), in.remaining());
+            if (xfer == 0) {
+                decoder.push(0);
+                pump();
+                continue;
+            }
+            final int lim = src.limit();
+            src.limit(src.position() + xfer);
+            in.put(src);
+            src.limit(lim);
+
+            decoder.push(xfer);
+            pump();
+        }
+        final CapacityChannel ch = this.capacity;
+        if (ch != null) {
+            ch.update(Integer.MAX_VALUE);
+        }
+    }
+
+    @Override
+    public void streamEnd(final List<? extends Header> trailers) throws IOException, HttpException {
+        pump();
+        Asserts.check(decoder.getStatus() == DecoderJNI.Status.DONE || !decoder.hasOutput(),
+                "Truncated brotli stream");
+        downstream.streamEnd(trailers);
+    }
+
+    @Override
+    public void releaseResources() {
+        try {
+            decoder.destroy();
+        } catch (final Throwable ignore) {
+        }
+        downstream.releaseResources();
+    }
+
+    private void pump() throws IOException {
+        for (; ; ) {
+            switch (decoder.getStatus()) {
+                case OK:
+                    decoder.push(0);
+                    break;
+                case NEEDS_MORE_OUTPUT: {
+                    // Pull a decoder-owned buffer; copy before handing off.
+                    final ByteBuffer nativeBuf = decoder.pull();
+                    if (nativeBuf != null && nativeBuf.hasRemaining()) {
+                        final ByteBuffer copy = ByteBuffer.allocateDirect(nativeBuf.remaining());
+                        copy.put(nativeBuf).flip();
+                        downstream.consume(copy);
+                    }
+                    break;
+                }
+                case NEEDS_MORE_INPUT:
+                    if (decoder.hasOutput()) {
+                        final ByteBuffer nativeBuf = decoder.pull();
+                        if (nativeBuf != null && nativeBuf.hasRemaining()) {
+                            final ByteBuffer copy = ByteBuffer.allocateDirect(nativeBuf.remaining());
+                            copy.put(nativeBuf).flip();
+                            downstream.consume(copy);
+                            break;
+                        }
+                    }
+                    return; // wait for more input
+                case DONE:
+                    if (decoder.hasOutput()) {
+                        final ByteBuffer nativeBuf = decoder.pull();
+                        if (nativeBuf != null && nativeBuf.hasRemaining()) {
+                            final ByteBuffer copy = ByteBuffer.allocateDirect(nativeBuf.remaining());
+                            copy.put(nativeBuf).flip();
+                            downstream.consume(copy);
+                            break;
+                        }
+                    }
+                    return;
+                default:
+                    // Corrupted stream
+                    throw new IOException("Brotli stream corrupted");
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/Brotli4jRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/Brotli4jRuntime.java
new file mode 100644
index 0000000..f60b109
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/Brotli4jRuntime.java
@@ -0,0 +1,55 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+
+
+@Internal
+@Contract(threading = ThreadingBehavior.STATELESS)
+public final class Brotli4jRuntime {
+
+    private static final String BROTLI = "com.aayushatharva.brotli4j.Brotli4jLoader";
+
+    private Brotli4jRuntime() {
+    }
+
+    /**
+     * @return {@code true} if {@code com.aayushatharva.brotli4j} can be loaded
+     * by the current class loader; {@code false} otherwise
+     */
+    public static boolean available() {
+        try {
+            Class.forName(BROTLI, false, Brotli4jRuntime.class.getClassLoader());
+            return true;
+        } catch (ClassNotFoundException | LinkageError ex) {
+            return false;
+        }
+    }
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java
index 41e6950..cae264a 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java
@@ -38,9 +38,11 @@
 import org.apache.hc.client5.http.async.AsyncExecChain;
 import org.apache.hc.client5.http.async.AsyncExecChainHandler;
 import org.apache.hc.client5.http.async.methods.InflatingAsyncDataConsumer;
+import org.apache.hc.client5.http.async.methods.InflatingBrotliDataConsumer;
 import org.apache.hc.client5.http.async.methods.InflatingGzipDataConsumer;
 import org.apache.hc.client5.http.async.methods.InflatingZstdDataConsumer;
 import org.apache.hc.client5.http.entity.compress.ContentCoding;
+import org.apache.hc.client5.http.impl.Brotli4jRuntime;
 import org.apache.hc.client5.http.impl.ZstdRuntime;
 import org.apache.hc.client5.http.impl.ContentCodingSupport;
 import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -100,6 +102,11 @@ public ContentCompressionAsyncExec() {
             tokens.add("zstd");
         }
 
+        if (Brotli4jRuntime.available()) {
+            rb.register(ContentCoding.BROTLI.token(), InflatingBrotliDataConsumer::new);
+            tokens.add(ContentCoding.BROTLI.token());
+        }
+
         this.decoders = rb.build();
         this.acceptTokens = tokens;
     }
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducerTest.java
new file mode 100644
index 0000000..549f12c
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducerTest.java
@@ -0,0 +1,143 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.decoder.Decoder;
+import com.aayushatharva.brotli4j.decoder.DirectDecompress;
+import com.aayushatharva.brotli4j.encoder.Encoder;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+class DeflatingBrotliEntityProducerTest {
+
+    private static String longText() {
+        final String seed = "lorem ipsum äëïöü – ";
+        final StringBuilder sb = new StringBuilder(seed.length() * 3000);
+        for (int i = 0; i < 3000; i++) {
+            sb.append(seed);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * DataStreamChannel that accepts at most maxChunk bytes per write.
+     */
+    private static final class ThrottledChannel implements DataStreamChannel {
+        private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        private final int maxChunk;
+        private boolean ended;
+
+        ThrottledChannel(final int maxChunk) {
+            this.maxChunk = maxChunk;
+        }
+
+        @Override
+        public void requestOutput() { /* no-op for test */ }
+
+        @Override
+        public int write(final ByteBuffer src) {
+            final int len = Math.min(src.remaining(), maxChunk);
+            if (len <= 0) return 0;
+            final byte[] tmp = new byte[len];
+            src.get(tmp);
+            sink.write(tmp, 0, len);
+            return len;
+        }
+
+        @Override
+        public void endStream() {
+            // Core interface in some versions; delegate to list variant for safety
+            endStream(Collections.emptyList());
+        }
+
+        @Override
+        public void endStream(final List<? extends Header> trailers) {
+            ended = true;
+        }
+
+        byte[] data() {
+            return sink.toByteArray();
+        }
+
+        boolean ended() {
+            return ended;
+        }
+    }
+
+    @BeforeAll
+    static void init() {
+        Brotli4jLoader.ensureAvailability();
+    }
+
+    @Test
+    void roundTrip() throws Exception {
+        final String text = longText();
+
+        final byte[] payload = text.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        final AsyncEntityProducer raw =
+                new org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer(
+                        payload,
+                        org.apache.hc.core5.http.ContentType.TEXT_PLAIN.withCharset(java.nio.charset.StandardCharsets.UTF_8)
+                );
+        final DeflatingBrotliEntityProducer br =
+                new DeflatingBrotliEntityProducer(raw, 5, 22, Encoder.Mode.TEXT);
+
+        final ThrottledChannel ch = new ThrottledChannel(1024);
+
+        // drive until producer reports no more work
+        while (br.available() > 0) {
+            br.produce(ch);
+        }
+
+        final byte[] compressed = ch.data();
+        assertTrue(compressed.length > 0, "no compressed bytes were produced");
+
+        // Decompress using brotli4j
+        final DirectDecompress dd = Decoder.decompress(compressed);
+        final String decoded = new String(dd.getDecompressedData(), StandardCharsets.UTF_8);
+
+        assertEquals(text, decoded);
+        assertEquals("br", br.getContentEncoding());
+        assertTrue(br.isChunked());
+        assertEquals(-1, br.getContentLength());
+        assertTrue(ch.ended(), "stream was not ended");
+    }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumerTest.java
new file mode 100644
index 0000000..8446d28
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumerTest.java
@@ -0,0 +1,201 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.UnaryOperator;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.encoder.Encoder;
+
+import org.apache.hc.client5.http.impl.async.ContentCompressionAsyncExec;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+class InflatingBrotliDataConsumerTest {
+
+    /**
+     * Collects bytes and decodes to UTF-8 once at the end.
+     */
+    private static final class ByteCollector implements AsyncEntityConsumer<String> {
+
+        private final java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream();
+        private FutureCallback<String> callback;
+
+        @Override
+        public void streamStart(final EntityDetails ed, final FutureCallback<String> cb) {
+            this.callback = cb;
+        }
+
+        @Override
+        public void updateCapacity(final CapacityChannel c) {
+        }
+
+        @Override
+        public void consume(final ByteBuffer src) {
+            final byte[] tmp = new byte[src.remaining()];
+            src.get(tmp);
+            buf.write(tmp, 0, tmp.length);
+        }
+
+        @Override
+        public void streamEnd(final List<? extends Header> t) {
+            if (callback != null) {
+                callback.completed(getContent());
+            }
+        }
+
+        @Override
+        public void failed(final Exception cause) {
+            throw new RuntimeException(cause);
+        }
+
+        @Override
+        public void releaseResources() {
+        }
+
+        @Override
+        public String getContent() {
+            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
+        }
+    }
+
+    private static String ORIGINAL;
+
+    @BeforeAll
+    static void setUp() {
+        Brotli4jLoader.ensureAvailability();
+        final String seed = "Hello ✈ brotli 🎈! ";
+        final StringBuilder sb = new StringBuilder(seed.length() * 4000);
+        for (int i = 0; i < 4000; i++) {
+            sb.append(seed);
+        }
+        ORIGINAL = sb.toString();
+    }
+
+    private static byte[] brCompress() throws IOException {
+        final byte[] src = ORIGINAL.getBytes(StandardCharsets.UTF_8);
+        final Encoder.Parameters p = new Encoder.Parameters()
+                .setQuality(6)
+                .setWindow(22)
+                .setMode(Encoder.Mode.TEXT);
+        return Encoder.compress(src, p);
+    }
+
+    @Test
+    void inflateBrotli() throws Exception {
+
+        final byte[] compressed = brCompress();
+
+        final ByteCollector inner = new ByteCollector();
+        final InflatingBrotliDataConsumer inflating = new InflatingBrotliDataConsumer(inner);
+
+        final CountDownLatch done = new CountDownLatch(1);
+        final FutureCallback<String> cb = new FutureCallback<String>() {
+            @Override
+            public void completed(final String result) {
+                done.countDown();
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                fail(ex);
+            }
+
+            @Override
+            public void cancelled() {
+                fail("cancelled");
+            }
+        };
+
+        /* minimal EntityDetails stub */
+        final EntityDetails details = new EntityDetails() {
+            @Override
+            public long getContentLength() {
+                return compressed.length;
+            }
+
+            @Override
+            public String getContentType() {
+                return "text/plain";
+            }
+
+            @Override
+            public String getContentEncoding() {
+                return "br";
+            }
+
+            @Override
+            public boolean isChunked() {
+                return false;
+            }
+
+            @Override
+            public Set<String> getTrailerNames() {
+                return new HashSet<>();
+            }
+        };
+        inner.streamStart(details, cb);
+
+        for (int off = 0; off < compressed.length; off += 1024) {
+            final int n = Math.min(1024, compressed.length - off);
+            inflating.consume(ByteBuffer.wrap(compressed, off, n));
+        }
+        inflating.streamEnd(Collections.emptyList());
+
+        assertTrue(done.await(2, TimeUnit.SECONDS), "callback timeout");
+        assertEquals(ORIGINAL, inner.getContent(), "br inflate mismatch");
+    }
+
+    @Test
+    void registerInExec() {
+        final LinkedHashMap<String, UnaryOperator<AsyncDataConsumer>> map = new LinkedHashMap<>();
+        map.put("br", InflatingBrotliDataConsumer::new);
+        final ContentCompressionAsyncExec exec = new ContentCompressionAsyncExec(map, false);
+        assertNotNull(exec);
+    }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerBrotliRoundTrip.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerBrotliRoundTrip.java
new file mode 100644
index 0000000..74f522f
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerBrotliRoundTrip.java
@@ -0,0 +1,220 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.examples;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.encoder.BrotliOutputStream;
+
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.ClassicHttpResponse;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpEntity;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
+import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap;
+import org.apache.hc.core5.http.io.HttpRequestHandler;
+import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.CloseMode;
+
+/**
+ * Async client/server demo with Brotli in both directions:
+ * <p>
+ * - Client sends a Brotli-compressed request body (Content-Encoding: br)
+ * - Server decompresses request, then responds with a Brotli-compressed body
+ * - Client checks the response Content-Encoding and decompresses if needed
+ * <p>
+ * Notes:
+ * - Encoding uses brotli4j (native JNI); make sure matching native dependency is on the runtime classpath.
+ * - Decoding here uses Commons Compress via CompressorStreamFactory("br").
+ */
+public final class AsyncClientServerBrotliRoundTrip {
+
+    static {
+        Brotli4jLoader.ensureAvailability();
+    }
+
+    private static final String BR = "br";
+
+    public static void main(final String[] args) throws Exception {
+        final HttpServer server = ServerBootstrap.bootstrap()
+                .setListenerPort(0)
+                .setCanonicalHostName("localhost")
+                .register("/echo", new EchoHandler())
+                .create();
+        server.start();
+        final int port = server.getLocalPort();
+        final String url = "http://localhost:" + port + "/echo";
+
+        try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) {
+            client.start();
+
+            final String requestBody = "Hello Brotli world (round-trip)!";
+            System.out.println("Request (plain): " + requestBody);
+
+            // --- client compresses request ---
+            final byte[] reqCompressed = brotliCompress(requestBody.getBytes(StandardCharsets.UTF_8));
+
+            final SimpleHttpRequest post = SimpleRequestBuilder.post(url)
+                    .setHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString())
+                    .setHeader(HttpHeaders.CONTENT_ENCODING, BR)
+                    .build();
+
+            final Future<Message<HttpResponse, byte[]>> f = client.execute(
+                    new BasicRequestProducer(post,
+                            new BasicAsyncEntityProducer(reqCompressed, ContentType.APPLICATION_OCTET_STREAM)),
+                    new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()),
+                    null);
+
+            final Message<HttpResponse, byte[]> msg = f.get();
+            final HttpResponse head = msg.getHead();
+            final byte[] respBodyRaw = msg.getBody() != null ? msg.getBody() : new byte[0];
+
+            System.out.println("Status           : " + head.getCode());
+            final Header ce = head.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
+            final boolean isBr = ce != null && BR.equalsIgnoreCase(ce.getValue());
+            System.out.println("Response C-E     : " + (isBr ? BR : "(none)"));
+
+            final byte[] respPlain = isBr ? brotliDecompress(respBodyRaw) : respBodyRaw;
+            System.out.println("Response (plain) : " + new String(respPlain, StandardCharsets.UTF_8));
+        } finally {
+            server.close(CloseMode.GRACEFUL);
+        }
+    }
+
+    /**
+     * Server handler:
+     * - If request has Content-Encoding: br, decompress it
+     * - Echo the text back, but re-encode the response with Brotli (Content-Encoding: br)
+     */
+    private static final class EchoHandler implements HttpRequestHandler {
+        @Override
+        public void handle(
+                final ClassicHttpRequest request,
+                final ClassicHttpResponse response,
+                final HttpContext context) throws IOException {
+
+            final HttpEntity entity = request.getEntity();
+            if (entity == null) {
+                response.setCode(HttpStatus.SC_BAD_REQUEST);
+                response.setEntity(new StringEntity("Missing request body", StandardCharsets.UTF_8));
+                return;
+            }
+
+            try {
+                final byte[] requestPlain;
+                final Header ce = request.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
+                if (ce != null && BR.equalsIgnoreCase(ce.getValue())) {
+                    try (final InputStream in = entity.getContent();
+                         final CompressorInputStream bin =
+                                 new CompressorStreamFactory().createCompressorInputStream(BR, in)) {
+                        requestPlain = readAll(bin);
+                    }
+                } else {
+                    try (final InputStream in = entity.getContent()) {
+                        requestPlain = readAll(in);
+                    }
+                }
+
+                final String echoed = new String(requestPlain, StandardCharsets.UTF_8);
+
+                // --- server compresses response with Brotli ---
+                final byte[] respCompressed = brotliCompress(echoed.getBytes(StandardCharsets.UTF_8));
+                response.setCode(HttpStatus.SC_OK);
+                response.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString());
+                response.addHeader(HttpHeaders.CONTENT_ENCODING, BR);
+                response.setEntity(new ByteArrayEntity(respCompressed, ContentType.APPLICATION_OCTET_STREAM));
+
+            } catch (final CompressorException ex) {
+                response.setCode(HttpStatus.SC_BAD_REQUEST);
+                response.setEntity(new StringEntity("Invalid Brotli payload", StandardCharsets.UTF_8));
+            } catch (final Exception ex) {
+                response.setCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+                response.setEntity(new StringEntity("Server error", StandardCharsets.UTF_8));
+            }
+        }
+    }
+
+    /**
+     * Utility: read entire stream into a byte[] (demo-only).
+     */
+    private static byte[] readAll(final InputStream in) throws IOException {
+        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        final byte[] buf = new byte[8192];
+        int n;
+        while ((n = in.read(buf)) != -1) {
+            bos.write(buf, 0, n);
+        }
+        return bos.toByteArray();
+    }
+
+    /**
+     * Compress a byte[] with Brotli using brotli4j.
+     */
+    private static byte[] brotliCompress(final byte[] plain) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final BrotliOutputStream out = new BrotliOutputStream(baos)) {
+            out.write(plain);
+        }
+        return baos.toByteArray();
+    }
+
+    /**
+     * Decompress a Brotli-compressed byte[] using Commons Compress.
+     */
+    private static byte[] brotliDecompress(final byte[] compressed) throws IOException {
+        try (final InputStream in = new ByteArrayInputStream(compressed);
+             final CompressorInputStream bin = new CompressorStreamFactory().createCompressorInputStream(BR, in)) {
+            return readAll(bin);
+        } catch (final CompressorException e) {
+            throw new IOException("Failed to decompress Brotli data", e);
+        }
+    }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java
index 690bc07..a02ddb4 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java
@@ -116,7 +116,7 @@ void testAcceptEncodingAdded() throws Exception {
         final HttpRequest request = new BasicHttpRequest(Method.GET, "/path");
         executeAndCapture(request);
         assertTrue(request.containsHeader(HttpHeaders.ACCEPT_ENCODING));
-        assertEquals("gzip, x-gzip, deflate, zstd", request.getFirstHeader(HttpHeaders.ACCEPT_ENCODING).getValue());
+        assertEquals("gzip, x-gzip, deflate, zstd, br", request.getFirstHeader(HttpHeaders.ACCEPT_ENCODING).getValue());
     }
 
     @Test
diff --git a/pom.xml b/pom.xml
index 9039588..1eb352e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
     <httpcore.version>5.3.6</httpcore.version>
     <log4j.version>2.25.0</log4j.version>
     <brotli.version>0.1.2</brotli.version>
+    <brotli4j.version>1.20.0</brotli4j.version>
     <conscrypt.version>2.5.2</conscrypt.version>
     <ehcache.version>3.10.8</ehcache.version>
     <memcached.version>2.12.3</memcached.version>
@@ -256,6 +257,12 @@
         <version>${otel.version}</version>
         <scope>test</scope>
       </dependency>
+      <dependency>
+        <groupId>com.aayushatharva.brotli4j</groupId>
+        <artifactId>brotli4j</artifactId>
+        <version>${brotli4j.version}</version>
+        <optional>true</optional>
+      </dependency>
     </dependencies>
   </dependencyManagement>