Add async Brotli support (compress/decompress). (#720)
Introduce DeflatingBrotliAsyncEntityProducer and InflatingBrotliAsyncDataConsumer; register "br" when brotli4j is available.
diff --git a/httpclient5/pom.xml b/httpclient5/pom.xml
index b97e1d2..4478777 100644
--- a/httpclient5/pom.xml
+++ b/httpclient5/pom.xml
@@ -118,6 +118,10 @@
<artifactId>zstd-jni</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>com.aayushatharva.brotli4j</groupId>
+ <artifactId>brotli4j</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducer.java
new file mode 100644
index 0000000..676c530
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducer.java
@@ -0,0 +1,306 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.aayushatharva.brotli4j.encoder.Encoder;
+import com.aayushatharva.brotli4j.encoder.EncoderJNI;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.apache.hc.core5.util.Args;
+
+/**
+ * {@code AsyncEntityProducer} that Brotli-compresses bytes from an upstream producer
+ * on the fly and writes the compressed stream to the target {@link DataStreamChannel}.
+ * <p>
+ * Purely async/streaming: no {@code InputStream}/{@code OutputStream}. Back-pressure is
+ * honored via {@link #available()} and the I/O reactor’s calls into {@link #produce(DataStreamChannel)}.
+ * Trailers from the upstream producer are preserved and emitted once the compressed output
+ * has been fully drained.
+ * </p>
+ *
+ * <h4>Content metadata</h4>
+ * Returns {@code Content-Encoding: br}, {@code Content-Length: -1} and {@code chunked=true}.
+ * Repeatability matches the upstream producer.
+ *
+ * <h4>Implementation notes</h4>
+ * Uses Brotli4j’s {@code EncoderJNI.Wrapper}. JNI-owned output buffers are written directly
+ * when possible; if the channel applies back-pressure, the unwritten tail is copied into
+ * small pooled direct {@link java.nio.ByteBuffer}s to reduce allocation churn. Native
+ * resources are released in {@link #releaseResources()}.
+ * <p>
+ * Ensure {@link com.aayushatharva.brotli4j.Brotli4jLoader#ensureAvailability()} has been
+ * called once at startup; this class also invokes it in a static initializer as a safeguard.
+ * </p>
+ *
+ * <h4>Usage</h4>
+ * <pre>{@code
+ * AsyncEntityProducer plain = new StringAsyncEntityProducer("hello", ContentType.TEXT_PLAIN);
+ * AsyncEntityProducer br = new DeflatingBrotliEntityProducer(plain); // defaults q=5, lgwin=22
+ * client.execute(new BasicRequestProducer(post, br),
+ * new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ * null);
+ * }</pre>
+ *
+ * @see org.apache.hc.core5.http.nio.AsyncEntityProducer
+ * @see org.apache.hc.core5.http.nio.DataStreamChannel
+ * @see com.aayushatharva.brotli4j.encoder.EncoderJNI
+ * @since 5.6
+ */
+public final class DeflatingBrotliEntityProducer implements AsyncEntityProducer {
+
+ private enum State { STREAMING, FINISHING, DONE }
+
+ private final AsyncEntityProducer upstream;
+ private final EncoderJNI.Wrapper encoder;
+
+ private ByteBuffer pendingOut;
+ private List<? extends Header> pendingTrailers;
+ private State state = State.STREAMING;
+
+ /**
+ * Create a producer with explicit Brotli params.
+ *
+ * @param upstream upstream entity producer whose bytes will be compressed
+ * @param quality Brotli quality level (see brotli4j documentation)
+ * @param lgwin Brotli window size log2 (see brotli4j documentation)
+ * @param mode Brotli mode hint (GENERIC/TEXT/FONT)
+ * @throws IOException if the native encoder cannot be created
+ * @since 5.6
+ */
+ public DeflatingBrotliEntityProducer(
+ final AsyncEntityProducer upstream,
+ final int quality,
+ final int lgwin,
+ final Encoder.Mode mode) throws IOException {
+ this.upstream = Args.notNull(upstream, "upstream");
+ this.encoder = new EncoderJNI.Wrapper(256 * 1024, quality, lgwin, mode);
+ }
+
+ /**
+ * Convenience constructor mapping {@code 0=GENERIC, 1=TEXT, 2=FONT}.
+ *
+ * @since 5.6
+ */
+ public DeflatingBrotliEntityProducer(
+ final AsyncEntityProducer upstream,
+ final int quality,
+ final int lgwin,
+ final int modeInt) throws IOException {
+ this(upstream, quality, lgwin,
+ modeInt == 1 ? Encoder.Mode.TEXT :
+ modeInt == 2 ? Encoder.Mode.FONT : Encoder.Mode.GENERIC);
+ }
+
+ /**
+ * Create a producer with sensible defaults ({@code quality=5}, {@code lgwin=22}, {@code GENERIC}).
+ *
+ * @since 5.6
+ */
+ public DeflatingBrotliEntityProducer(final AsyncEntityProducer upstream) throws IOException {
+ this(upstream, 5, 22, Encoder.Mode.GENERIC);
+ }
+
+
+ @Override
+ public String getContentType() {
+ return upstream.getContentType();
+ }
+
+ @Override
+ public String getContentEncoding() {
+ return "br";
+ }
+
+ @Override
+ public long getContentLength() {
+ return -1;
+ }
+
+ @Override
+ public boolean isChunked() {
+ return true;
+ }
+
+ @Override
+ public Set<String> getTrailerNames() {
+ return upstream.getTrailerNames();
+ }
+
+ @Override
+ public boolean isRepeatable() {
+ return upstream.isRepeatable();
+ }
+
+ @Override
+ public int available() {
+ if (state == State.DONE) {
+ return 0;
+ }
+ if (pendingOut != null && pendingOut.hasRemaining() || pendingTrailers != null) {
+ return 1;
+ }
+ final int up = upstream.available();
+ return (state != State.STREAMING || up > 0) ? 1 : 0;
+ }
+
+ @Override
+ public void produce(final DataStreamChannel channel) throws IOException {
+ if (flushPending(channel)) {
+ return;
+ }
+
+ if (state == State.FINISHING) {
+ encoder.push(EncoderJNI.Operation.FINISH, 0);
+ if (drainEncoder(channel)) {
+ return;
+ }
+ if (pendingTrailers == null) {
+ pendingTrailers = Collections.emptyList();
+ }
+ channel.endStream(pendingTrailers);
+ pendingTrailers = null;
+ state = State.DONE;
+ return;
+ }
+
+ upstream.produce(new DataStreamChannel() {
+ @Override
+ public void requestOutput() {
+ channel.requestOutput();
+ }
+
+ @Override
+ public int write(final ByteBuffer src) throws IOException {
+ int accepted = 0;
+ while (src.hasRemaining()) {
+ final ByteBuffer in = encoder.getInputBuffer();
+ if (!in.hasRemaining()) {
+ encoder.push(EncoderJNI.Operation.PROCESS, 0);
+ if (drainEncoder(channel)) {
+ break;
+ }
+ continue;
+ }
+ final int xfer = Math.min(src.remaining(), in.remaining());
+ final int lim = src.limit();
+ src.limit(src.position() + xfer);
+ in.put(src);
+ src.limit(lim);
+ accepted += xfer;
+
+ encoder.push(EncoderJNI.Operation.PROCESS, xfer);
+ if (drainEncoder(channel)) {
+ break;
+ }
+ }
+ return accepted;
+ }
+
+ @Override
+ public void endStream() throws IOException {
+ endStream(Collections.emptyList());
+ }
+
+ @Override
+ public void endStream(final List<? extends Header> trailers) throws IOException {
+ pendingTrailers = trailers;
+ state = State.FINISHING;
+ encoder.push(EncoderJNI.Operation.FINISH, 0);
+ if (drainEncoder(channel)) {
+ return;
+ }
+ if (pendingTrailers == null) {
+ pendingTrailers = Collections.emptyList();
+ }
+ channel.endStream(pendingTrailers);
+ pendingTrailers = null;
+ state = State.DONE;
+ }
+ });
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ upstream.failed(cause);
+ }
+
+ @Override
+ public void releaseResources() {
+ try {
+ encoder.destroy();
+ } catch (final Throwable ignore) {
+ }
+ upstream.releaseResources();
+ pendingOut = null;
+ pendingTrailers = null;
+ state = State.DONE;
+ }
+
+
+ private boolean flushPending(final DataStreamChannel channel) throws IOException {
+ if (pendingOut != null && pendingOut.hasRemaining()) {
+ channel.write(pendingOut);
+ if (pendingOut.hasRemaining()) {
+ channel.requestOutput();
+ return true;
+ }
+ pendingOut = null;
+ }
+ if (pendingOut == null && pendingTrailers != null && state != State.STREAMING) {
+ channel.endStream(pendingTrailers);
+ pendingTrailers = null;
+ state = State.DONE;
+ return true;
+ }
+ return false;
+ }
+
+ private boolean drainEncoder(final DataStreamChannel channel) throws IOException {
+ while (encoder.hasMoreOutput()) {
+ final ByteBuffer buf = encoder.pull();
+ if (buf == null || !buf.hasRemaining()) {
+ continue;
+ }
+ channel.write(buf);
+ if (buf.hasRemaining()) {
+ pendingOut = ByteBuffer.allocateDirect(buf.remaining());
+ pendingOut.put(buf).flip();
+ channel.requestOutput();
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumer.java b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumer.java
new file mode 100644
index 0000000..b2c3ae6
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumer.java
@@ -0,0 +1,179 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.aayushatharva.brotli4j.decoder.DecoderJNI;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.apache.hc.core5.util.Asserts;
+
+/**
+ * {@code AsyncDataConsumer} that inflates a Brotli-compressed byte stream and forwards
+ * decompressed bytes to a downstream consumer.
+ * <p>
+ * Purely async/streaming: no {@code InputStream}/{@code OutputStream}. Back-pressure from
+ * the I/O reactor is propagated via {@link CapacityChannel}. JNI output buffers are copied
+ * into small reusable direct {@link java.nio.ByteBuffer}s before handing them to the
+ * downstream consumer (which may retain them).
+ * </p>
+ *
+ * <h4>Implementation notes</h4>
+ * Uses Brotli4j’s {@code DecoderJNI.Wrapper}. Native resources are released in
+ * {@link #releaseResources()}. Throws an {@link java.io.IOException} if the stream is
+ * truncated or corrupted.
+ * <p>
+ * Ensure {@link com.aayushatharva.brotli4j.Brotli4jLoader#ensureAvailability()} has been
+ * called once at startup; this class also invokes it in a static initializer as a safeguard.
+ * </p>
+ *
+ * <h4>Usage</h4>
+ * <pre>{@code
+ * AsyncDataConsumer textConsumer = new StringAsyncEntityConsumer();
+ * AsyncDataConsumer brInflating = new InflatingBrotliDataConsumer(textConsumer);
+ * client.execute(producer, new BasicResponseConsumer<>(brInflating), null);
+ * }</pre>
+ *
+ * @see org.apache.hc.core5.http.nio.AsyncDataConsumer
+ * @see org.apache.hc.core5.http.nio.CapacityChannel
+ * @see com.aayushatharva.brotli4j.decoder.DecoderJNI
+ * @since 5.6
+ */
+public final class InflatingBrotliDataConsumer implements AsyncDataConsumer {
+
+ private final AsyncDataConsumer downstream;
+ private final DecoderJNI.Wrapper decoder;
+ private volatile CapacityChannel capacity;
+
+
+ public InflatingBrotliDataConsumer(final AsyncDataConsumer downstream) {
+ this.downstream = downstream;
+ try {
+ this.decoder = new DecoderJNI.Wrapper(8 * 1024);
+ } catch (final IOException e) {
+ throw new RuntimeException("Unable to initialize DecoderJNI", e);
+ }
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
+ this.capacity = capacityChannel;
+ downstream.updateCapacity(capacityChannel);
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) throws IOException {
+ while (src.hasRemaining()) {
+ final ByteBuffer in = decoder.getInputBuffer();
+ final int xfer = Math.min(src.remaining(), in.remaining());
+ if (xfer == 0) {
+ decoder.push(0);
+ pump();
+ continue;
+ }
+ final int lim = src.limit();
+ src.limit(src.position() + xfer);
+ in.put(src);
+ src.limit(lim);
+
+ decoder.push(xfer);
+ pump();
+ }
+ final CapacityChannel ch = this.capacity;
+ if (ch != null) {
+ ch.update(Integer.MAX_VALUE);
+ }
+ }
+
+ @Override
+ public void streamEnd(final List<? extends Header> trailers) throws IOException, HttpException {
+ pump();
+ Asserts.check(decoder.getStatus() == DecoderJNI.Status.DONE || !decoder.hasOutput(),
+ "Truncated brotli stream");
+ downstream.streamEnd(trailers);
+ }
+
+ @Override
+ public void releaseResources() {
+ try {
+ decoder.destroy();
+ } catch (final Throwable ignore) {
+ }
+ downstream.releaseResources();
+ }
+
+ private void pump() throws IOException {
+ for (; ; ) {
+ switch (decoder.getStatus()) {
+ case OK:
+ decoder.push(0);
+ break;
+ case NEEDS_MORE_OUTPUT: {
+ // Pull a decoder-owned buffer; copy before handing off.
+ final ByteBuffer nativeBuf = decoder.pull();
+ if (nativeBuf != null && nativeBuf.hasRemaining()) {
+ final ByteBuffer copy = ByteBuffer.allocateDirect(nativeBuf.remaining());
+ copy.put(nativeBuf).flip();
+ downstream.consume(copy);
+ }
+ break;
+ }
+ case NEEDS_MORE_INPUT:
+ if (decoder.hasOutput()) {
+ final ByteBuffer nativeBuf = decoder.pull();
+ if (nativeBuf != null && nativeBuf.hasRemaining()) {
+ final ByteBuffer copy = ByteBuffer.allocateDirect(nativeBuf.remaining());
+ copy.put(nativeBuf).flip();
+ downstream.consume(copy);
+ break;
+ }
+ }
+ return; // wait for more input
+ case DONE:
+ if (decoder.hasOutput()) {
+ final ByteBuffer nativeBuf = decoder.pull();
+ if (nativeBuf != null && nativeBuf.hasRemaining()) {
+ final ByteBuffer copy = ByteBuffer.allocateDirect(nativeBuf.remaining());
+ copy.put(nativeBuf).flip();
+ downstream.consume(copy);
+ break;
+ }
+ }
+ return;
+ default:
+ // Corrupted stream
+ throw new IOException("Brotli stream corrupted");
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/Brotli4jRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/Brotli4jRuntime.java
new file mode 100644
index 0000000..f60b109
--- /dev/null
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/Brotli4jRuntime.java
@@ -0,0 +1,55 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.impl;
+
+import org.apache.hc.core5.annotation.Contract;
+import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.annotation.ThreadingBehavior;
+
+
+@Internal
+@Contract(threading = ThreadingBehavior.STATELESS)
+public final class Brotli4jRuntime {
+
+ private static final String BROTLI = "com.aayushatharva.brotli4j.Brotli4jLoader";
+
+ private Brotli4jRuntime() {
+ }
+
+ /**
+ * @return {@code true} if {@code com.aayushatharva.brotli4j} can be loaded
+ * by the current class loader; {@code false} otherwise
+ */
+ public static boolean available() {
+ try {
+ Class.forName(BROTLI, false, Brotli4jRuntime.class.getClassLoader());
+ return true;
+ } catch (ClassNotFoundException | LinkageError ex) {
+ return false;
+ }
+ }
+}
diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java
index 41e6950..cae264a 100644
--- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java
+++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/ContentCompressionAsyncExec.java
@@ -38,9 +38,11 @@
import org.apache.hc.client5.http.async.AsyncExecChain;
import org.apache.hc.client5.http.async.AsyncExecChainHandler;
import org.apache.hc.client5.http.async.methods.InflatingAsyncDataConsumer;
+import org.apache.hc.client5.http.async.methods.InflatingBrotliDataConsumer;
import org.apache.hc.client5.http.async.methods.InflatingGzipDataConsumer;
import org.apache.hc.client5.http.async.methods.InflatingZstdDataConsumer;
import org.apache.hc.client5.http.entity.compress.ContentCoding;
+import org.apache.hc.client5.http.impl.Brotli4jRuntime;
import org.apache.hc.client5.http.impl.ZstdRuntime;
import org.apache.hc.client5.http.impl.ContentCodingSupport;
import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -100,6 +102,11 @@ public ContentCompressionAsyncExec() {
tokens.add("zstd");
}
+ if (Brotli4jRuntime.available()) {
+ rb.register(ContentCoding.BROTLI.token(), InflatingBrotliDataConsumer::new);
+ tokens.add(ContentCoding.BROTLI.token());
+ }
+
this.decoders = rb.build();
this.acceptTokens = tokens;
}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducerTest.java
new file mode 100644
index 0000000..549f12c
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/DeflatingBrotliEntityProducerTest.java
@@ -0,0 +1,143 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.decoder.Decoder;
+import com.aayushatharva.brotli4j.decoder.DirectDecompress;
+import com.aayushatharva.brotli4j.encoder.Encoder;
+
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.http.nio.DataStreamChannel;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+class DeflatingBrotliEntityProducerTest {
+
+ private static String longText() {
+ final String seed = "lorem ipsum äëïöü – ";
+ final StringBuilder sb = new StringBuilder(seed.length() * 3000);
+ for (int i = 0; i < 3000; i++) {
+ sb.append(seed);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * DataStreamChannel that accepts at most maxChunk bytes per write.
+ */
+ private static final class ThrottledChannel implements DataStreamChannel {
+ private final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+ private final int maxChunk;
+ private boolean ended;
+
+ ThrottledChannel(final int maxChunk) {
+ this.maxChunk = maxChunk;
+ }
+
+ @Override
+ public void requestOutput() { /* no-op for test */ }
+
+ @Override
+ public int write(final ByteBuffer src) {
+ final int len = Math.min(src.remaining(), maxChunk);
+ if (len <= 0) return 0;
+ final byte[] tmp = new byte[len];
+ src.get(tmp);
+ sink.write(tmp, 0, len);
+ return len;
+ }
+
+ @Override
+ public void endStream() {
+ // Core interface in some versions; delegate to list variant for safety
+ endStream(Collections.emptyList());
+ }
+
+ @Override
+ public void endStream(final List<? extends Header> trailers) {
+ ended = true;
+ }
+
+ byte[] data() {
+ return sink.toByteArray();
+ }
+
+ boolean ended() {
+ return ended;
+ }
+ }
+
+ @BeforeAll
+ static void init() {
+ Brotli4jLoader.ensureAvailability();
+ }
+
+ @Test
+ void roundTrip() throws Exception {
+ final String text = longText();
+
+ final byte[] payload = text.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ final AsyncEntityProducer raw =
+ new org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer(
+ payload,
+ org.apache.hc.core5.http.ContentType.TEXT_PLAIN.withCharset(java.nio.charset.StandardCharsets.UTF_8)
+ );
+ final DeflatingBrotliEntityProducer br =
+ new DeflatingBrotliEntityProducer(raw, 5, 22, Encoder.Mode.TEXT);
+
+ final ThrottledChannel ch = new ThrottledChannel(1024);
+
+ // drive until producer reports no more work
+ while (br.available() > 0) {
+ br.produce(ch);
+ }
+
+ final byte[] compressed = ch.data();
+ assertTrue(compressed.length > 0, "no compressed bytes were produced");
+
+ // Decompress using brotli4j
+ final DirectDecompress dd = Decoder.decompress(compressed);
+ final String decoded = new String(dd.getDecompressedData(), StandardCharsets.UTF_8);
+
+ assertEquals(text, decoded);
+ assertEquals("br", br.getContentEncoding());
+ assertTrue(br.isChunked());
+ assertEquals(-1, br.getContentLength());
+ assertTrue(ch.ended(), "stream was not ended");
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumerTest.java b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumerTest.java
new file mode 100644
index 0000000..8446d28
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/async/methods/InflatingBrotliDataConsumerTest.java
@@ -0,0 +1,201 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.async.methods;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.UnaryOperator;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.encoder.Encoder;
+
+import org.apache.hc.client5.http.impl.async.ContentCompressionAsyncExec;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.EntityDetails;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.nio.AsyncDataConsumer;
+import org.apache.hc.core5.http.nio.AsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.CapacityChannel;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+class InflatingBrotliDataConsumerTest {
+
+ /**
+ * Collects bytes and decodes to UTF-8 once at the end.
+ */
+ private static final class ByteCollector implements AsyncEntityConsumer<String> {
+
+ private final java.io.ByteArrayOutputStream buf = new java.io.ByteArrayOutputStream();
+ private FutureCallback<String> callback;
+
+ @Override
+ public void streamStart(final EntityDetails ed, final FutureCallback<String> cb) {
+ this.callback = cb;
+ }
+
+ @Override
+ public void updateCapacity(final CapacityChannel c) {
+ }
+
+ @Override
+ public void consume(final ByteBuffer src) {
+ final byte[] tmp = new byte[src.remaining()];
+ src.get(tmp);
+ buf.write(tmp, 0, tmp.length);
+ }
+
+ @Override
+ public void streamEnd(final List<? extends Header> t) {
+ if (callback != null) {
+ callback.completed(getContent());
+ }
+ }
+
+ @Override
+ public void failed(final Exception cause) {
+ throw new RuntimeException(cause);
+ }
+
+ @Override
+ public void releaseResources() {
+ }
+
+ @Override
+ public String getContent() {
+ return new String(buf.toByteArray(), StandardCharsets.UTF_8);
+ }
+ }
+
+ private static String ORIGINAL;
+
+ @BeforeAll
+ static void setUp() {
+ Brotli4jLoader.ensureAvailability();
+ final String seed = "Hello ✈ brotli 🎈! ";
+ final StringBuilder sb = new StringBuilder(seed.length() * 4000);
+ for (int i = 0; i < 4000; i++) {
+ sb.append(seed);
+ }
+ ORIGINAL = sb.toString();
+ }
+
+ private static byte[] brCompress() throws IOException {
+ final byte[] src = ORIGINAL.getBytes(StandardCharsets.UTF_8);
+ final Encoder.Parameters p = new Encoder.Parameters()
+ .setQuality(6)
+ .setWindow(22)
+ .setMode(Encoder.Mode.TEXT);
+ return Encoder.compress(src, p);
+ }
+
+ @Test
+ void inflateBrotli() throws Exception {
+
+ final byte[] compressed = brCompress();
+
+ final ByteCollector inner = new ByteCollector();
+ final InflatingBrotliDataConsumer inflating = new InflatingBrotliDataConsumer(inner);
+
+ final CountDownLatch done = new CountDownLatch(1);
+ final FutureCallback<String> cb = new FutureCallback<String>() {
+ @Override
+ public void completed(final String result) {
+ done.countDown();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ fail(ex);
+ }
+
+ @Override
+ public void cancelled() {
+ fail("cancelled");
+ }
+ };
+
+ /* minimal EntityDetails stub */
+ final EntityDetails details = new EntityDetails() {
+ @Override
+ public long getContentLength() {
+ return compressed.length;
+ }
+
+ @Override
+ public String getContentType() {
+ return "text/plain";
+ }
+
+ @Override
+ public String getContentEncoding() {
+ return "br";
+ }
+
+ @Override
+ public boolean isChunked() {
+ return false;
+ }
+
+ @Override
+ public Set<String> getTrailerNames() {
+ return new HashSet<>();
+ }
+ };
+ inner.streamStart(details, cb);
+
+ for (int off = 0; off < compressed.length; off += 1024) {
+ final int n = Math.min(1024, compressed.length - off);
+ inflating.consume(ByteBuffer.wrap(compressed, off, n));
+ }
+ inflating.streamEnd(Collections.emptyList());
+
+ assertTrue(done.await(2, TimeUnit.SECONDS), "callback timeout");
+ assertEquals(ORIGINAL, inner.getContent(), "br inflate mismatch");
+ }
+
+ @Test
+ void registerInExec() {
+ final LinkedHashMap<String, UnaryOperator<AsyncDataConsumer>> map = new LinkedHashMap<>();
+ map.put("br", InflatingBrotliDataConsumer::new);
+ final ContentCompressionAsyncExec exec = new ContentCompressionAsyncExec(map, false);
+ assertNotNull(exec);
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerBrotliRoundTrip.java b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerBrotliRoundTrip.java
new file mode 100644
index 0000000..74f522f
--- /dev/null
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/examples/AsyncClientServerBrotliRoundTrip.java
@@ -0,0 +1,220 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.examples;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Future;
+
+import com.aayushatharva.brotli4j.Brotli4jLoader;
+import com.aayushatharva.brotli4j.encoder.BrotliOutputStream;
+
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
+import org.apache.hc.core5.http.ClassicHttpRequest;
+import org.apache.hc.core5.http.ClassicHttpResponse;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpEntity;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Message;
+import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
+import org.apache.hc.core5.http.impl.bootstrap.ServerBootstrap;
+import org.apache.hc.core5.http.io.HttpRequestHandler;
+import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
+import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityProducer;
+import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
+import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.io.CloseMode;
+
+/**
+ * Async client/server demo with Brotli in both directions:
+ * <p>
+ * - Client sends a Brotli-compressed request body (Content-Encoding: br)
+ * - Server decompresses request, then responds with a Brotli-compressed body
+ * - Client checks the response Content-Encoding and decompresses if needed
+ * <p>
+ * Notes:
+ * - Encoding uses brotli4j (native JNI); make sure matching native dependency is on the runtime classpath.
+ * - Decoding here uses Commons Compress via CompressorStreamFactory("br").
+ */
+public final class AsyncClientServerBrotliRoundTrip {
+
+ static {
+ Brotli4jLoader.ensureAvailability();
+ }
+
+ private static final String BR = "br";
+
+ public static void main(final String[] args) throws Exception {
+ final HttpServer server = ServerBootstrap.bootstrap()
+ .setListenerPort(0)
+ .setCanonicalHostName("localhost")
+ .register("/echo", new EchoHandler())
+ .create();
+ server.start();
+ final int port = server.getLocalPort();
+ final String url = "http://localhost:" + port + "/echo";
+
+ try (final CloseableHttpAsyncClient client = HttpAsyncClients.createDefault()) {
+ client.start();
+
+ final String requestBody = "Hello Brotli world (round-trip)!";
+ System.out.println("Request (plain): " + requestBody);
+
+ // --- client compresses request ---
+ final byte[] reqCompressed = brotliCompress(requestBody.getBytes(StandardCharsets.UTF_8));
+
+ final SimpleHttpRequest post = SimpleRequestBuilder.post(url)
+ .setHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString())
+ .setHeader(HttpHeaders.CONTENT_ENCODING, BR)
+ .build();
+
+ final Future<Message<HttpResponse, byte[]>> f = client.execute(
+ new BasicRequestProducer(post,
+ new BasicAsyncEntityProducer(reqCompressed, ContentType.APPLICATION_OCTET_STREAM)),
+ new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()),
+ null);
+
+ final Message<HttpResponse, byte[]> msg = f.get();
+ final HttpResponse head = msg.getHead();
+ final byte[] respBodyRaw = msg.getBody() != null ? msg.getBody() : new byte[0];
+
+ System.out.println("Status : " + head.getCode());
+ final Header ce = head.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
+ final boolean isBr = ce != null && BR.equalsIgnoreCase(ce.getValue());
+ System.out.println("Response C-E : " + (isBr ? BR : "(none)"));
+
+ final byte[] respPlain = isBr ? brotliDecompress(respBodyRaw) : respBodyRaw;
+ System.out.println("Response (plain) : " + new String(respPlain, StandardCharsets.UTF_8));
+ } finally {
+ server.close(CloseMode.GRACEFUL);
+ }
+ }
+
+ /**
+ * Server handler:
+ * - If request has Content-Encoding: br, decompress it
+ * - Echo the text back, but re-encode the response with Brotli (Content-Encoding: br)
+ */
+ private static final class EchoHandler implements HttpRequestHandler {
+ @Override
+ public void handle(
+ final ClassicHttpRequest request,
+ final ClassicHttpResponse response,
+ final HttpContext context) throws IOException {
+
+ final HttpEntity entity = request.getEntity();
+ if (entity == null) {
+ response.setCode(HttpStatus.SC_BAD_REQUEST);
+ response.setEntity(new StringEntity("Missing request body", StandardCharsets.UTF_8));
+ return;
+ }
+
+ try {
+ final byte[] requestPlain;
+ final Header ce = request.getFirstHeader(HttpHeaders.CONTENT_ENCODING);
+ if (ce != null && BR.equalsIgnoreCase(ce.getValue())) {
+ try (final InputStream in = entity.getContent();
+ final CompressorInputStream bin =
+ new CompressorStreamFactory().createCompressorInputStream(BR, in)) {
+ requestPlain = readAll(bin);
+ }
+ } else {
+ try (final InputStream in = entity.getContent()) {
+ requestPlain = readAll(in);
+ }
+ }
+
+ final String echoed = new String(requestPlain, StandardCharsets.UTF_8);
+
+ // --- server compresses response with Brotli ---
+ final byte[] respCompressed = brotliCompress(echoed.getBytes(StandardCharsets.UTF_8));
+ response.setCode(HttpStatus.SC_OK);
+ response.addHeader(HttpHeaders.CONTENT_TYPE, ContentType.TEXT_PLAIN.toString());
+ response.addHeader(HttpHeaders.CONTENT_ENCODING, BR);
+ response.setEntity(new ByteArrayEntity(respCompressed, ContentType.APPLICATION_OCTET_STREAM));
+
+ } catch (final CompressorException ex) {
+ response.setCode(HttpStatus.SC_BAD_REQUEST);
+ response.setEntity(new StringEntity("Invalid Brotli payload", StandardCharsets.UTF_8));
+ } catch (final Exception ex) {
+ response.setCode(HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ response.setEntity(new StringEntity("Server error", StandardCharsets.UTF_8));
+ }
+ }
+ }
+
+ /**
+ * Utility: read entire stream into a byte[] (demo-only).
+ */
+ private static byte[] readAll(final InputStream in) throws IOException {
+ final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ final byte[] buf = new byte[8192];
+ int n;
+ while ((n = in.read(buf)) != -1) {
+ bos.write(buf, 0, n);
+ }
+ return bos.toByteArray();
+ }
+
+ /**
+ * Compress a byte[] with Brotli using brotli4j.
+ */
+ private static byte[] brotliCompress(final byte[] plain) throws IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (final BrotliOutputStream out = new BrotliOutputStream(baos)) {
+ out.write(plain);
+ }
+ return baos.toByteArray();
+ }
+
+ /**
+ * Decompress a Brotli-compressed byte[] using Commons Compress.
+ */
+ private static byte[] brotliDecompress(final byte[] compressed) throws IOException {
+ try (final InputStream in = new ByteArrayInputStream(compressed);
+ final CompressorInputStream bin = new CompressorStreamFactory().createCompressorInputStream(BR, in)) {
+ return readAll(bin);
+ } catch (final CompressorException e) {
+ throw new IOException("Failed to decompress Brotli data", e);
+ }
+ }
+}
diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java
index 690bc07..a02ddb4 100644
--- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java
+++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/async/TestContentCompressionAsyncExec.java
@@ -116,7 +116,7 @@ void testAcceptEncodingAdded() throws Exception {
final HttpRequest request = new BasicHttpRequest(Method.GET, "/path");
executeAndCapture(request);
assertTrue(request.containsHeader(HttpHeaders.ACCEPT_ENCODING));
- assertEquals("gzip, x-gzip, deflate, zstd", request.getFirstHeader(HttpHeaders.ACCEPT_ENCODING).getValue());
+ assertEquals("gzip, x-gzip, deflate, zstd, br", request.getFirstHeader(HttpHeaders.ACCEPT_ENCODING).getValue());
}
@Test
diff --git a/pom.xml b/pom.xml
index 9039588..1eb352e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
<httpcore.version>5.3.6</httpcore.version>
<log4j.version>2.25.0</log4j.version>
<brotli.version>0.1.2</brotli.version>
+ <brotli4j.version>1.20.0</brotli4j.version>
<conscrypt.version>2.5.2</conscrypt.version>
<ehcache.version>3.10.8</ehcache.version>
<memcached.version>2.12.3</memcached.version>
@@ -256,6 +257,12 @@
<version>${otel.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.aayushatharva.brotli4j</groupId>
+ <artifactId>brotli4j</artifactId>
+ <version>${brotli4j.version}</version>
+ <optional>true</optional>
+ </dependency>
</dependencies>
</dependencyManagement>