HTTPCLIENT-2423: Propagate trace context in observation interceptors
Expose outbound requests as Micrometer sender carriers for header injection.
diff --git a/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/HttpClientObservationContext.java b/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/HttpClientObservationContext.java
new file mode 100644
index 0000000..35b746f
--- /dev/null
+++ b/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/HttpClientObservationContext.java
@@ -0,0 +1,46 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+package org.apache.hc.client5.http.observation.impl;
+
+import io.micrometer.observation.transport.RequestReplySenderContext;
+import org.apache.hc.core5.http.HttpRequest;
+import org.apache.hc.core5.http.HttpResponse;
+
+final class HttpClientObservationContext extends RequestReplySenderContext<HttpRequest, HttpResponse> {
+    // Hands setHeader to the superclass as the header setter and makes the outbound request the carrier.
+    HttpClientObservationContext(final HttpRequest request) {
+        super(HttpClientObservationContext::setHeader);
+        setCarrier(request);
+    }
+    // Writes one header onto the carrier via HttpRequest#setHeader; does nothing when the carrier is null.
+    private static void setHeader(final HttpRequest request, final String name, final String value) {
+        if (request != null) {
+            request.setHeader(name, value);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/ObservationAsyncExecInterceptor.java b/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/ObservationAsyncExecInterceptor.java
index d291868..d4f442a 100644
--- a/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/ObservationAsyncExecInterceptor.java
+++ b/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/ObservationAsyncExecInterceptor.java
@@ -40,6 +40,7 @@
 import org.apache.hc.core5.http.HttpResponse;
 import org.apache.hc.core5.http.nio.AsyncDataConsumer;
 import org.apache.hc.core5.http.nio.AsyncEntityProducer;
+import org.apache.hc.core5.util.Args;
 
 /**
  * Asynchronous execution interceptor that emits Micrometer {@link Observation}s
@@ -58,8 +59,8 @@ public final class ObservationAsyncExecInterceptor implements AsyncExecChainHand
 
     public ObservationAsyncExecInterceptor(final ObservationRegistry registry,
                                            final ObservingOptions opts) {
-        this.registry = registry;
-        this.opts = opts;
+        this.registry = Args.notNull(registry, "observationRegistry");
+        this.opts = opts != null ? opts : ObservingOptions.DEFAULT;
     }
 
     @Override
@@ -74,17 +75,21 @@ public void execute(final HttpRequest request,
             return;
         }
 
+        final HttpClientObservationContext observationContext = new HttpClientObservationContext(request);
+
         final Observation observation = Observation
-                .createNotStarted("http.client.request", registry)
+                .createNotStarted("http.client.request", () -> observationContext, registry)
                 .contextualName(request.getMethod() + " " + request.getRequestUri())
                 .lowCardinalityKeyValue("http.method", request.getMethod())
                 .lowCardinalityKeyValue("net.peer.name", scope.route.getTargetHost().getHostName())
                 .start();
 
         final AsyncExecCallback wrappedCallback = new AsyncExecCallback() {
+
             @Override
             public AsyncDataConsumer handleResponse(final HttpResponse response,
                                                     final EntityDetails entityDetails) throws HttpException, IOException {
+                observationContext.setResponse(response);
                 observation.lowCardinalityKeyValue("http.status_code", Integer.toString(response.getCode()));
                 return asyncExecCallback.handleResponse(response, entityDetails);
             }
@@ -106,8 +111,16 @@ public void failed(final Exception cause) {
                 observation.stop();
                 asyncExecCallback.failed(cause);
             }
+
         };
 
-        chain.proceed(request, entityProducer, scope, wrappedCallback);
+        try {
+            chain.proceed(request, entityProducer, scope, wrappedCallback);
+        } catch (final IOException | HttpException | RuntimeException | Error ex) {
+            observation.error(ex);
+            observation.stop();
+            throw ex;
+        }
     }
-}
+
+}
\ No newline at end of file
diff --git a/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/ObservationClassicExecInterceptor.java b/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/ObservationClassicExecInterceptor.java
index 230a8fc..9f8f494 100644
--- a/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/ObservationClassicExecInterceptor.java
+++ b/httpclient5-observation/src/main/java/org/apache/hc/client5/http/observation/impl/ObservationClassicExecInterceptor.java
@@ -74,8 +74,10 @@ public ClassicHttpResponse execute(final ClassicHttpRequest request,
         final String uriForName = safeUriForName(request);
         final String peer = scope.route.getTargetHost().getHostName();
 
+        final HttpClientObservationContext observationContext = new HttpClientObservationContext(request);
+
         final Observation obs = Observation
-                .createNotStarted("http.client.request", registry)
+                .createNotStarted("http.client.request", () -> observationContext, registry)
                 .contextualName(method + " " + uriForName)
                 .lowCardinalityKeyValue("http.method", method)
                 .lowCardinalityKeyValue("net.peer.name", peer)
@@ -91,6 +93,7 @@ public ClassicHttpResponse execute(final ClassicHttpRequest request,
             throw t;
         } finally {
             if (response != null) {
+                observationContext.setResponse(response);
                 obs.lowCardinalityKeyValue("http.status_code", Integer.toString(response.getCode()));
             }
             if (opts.tagLevel == ObservingOptions.TagLevel.EXTENDED) {
@@ -111,4 +114,5 @@ private static String safeUriForName(final ClassicHttpRequest req) {
             return req.getRequestUri();
         }
     }
-}
+
+}
\ No newline at end of file
diff --git a/httpclient5-observation/src/test/java/org/apache/hc/client5/http/observation/impl/ObservationAsyncExecInterceptorTest.java b/httpclient5-observation/src/test/java/org/apache/hc/client5/http/observation/impl/ObservationAsyncExecInterceptorTest.java
index 72a5b20..5b5a897 100644
--- a/httpclient5-observation/src/test/java/org/apache/hc/client5/http/observation/impl/ObservationAsyncExecInterceptorTest.java
+++ b/httpclient5-observation/src/test/java/org/apache/hc/client5/http/observation/impl/ObservationAsyncExecInterceptorTest.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import io.micrometer.observation.Observation;
 import io.micrometer.observation.ObservationRegistry;
@@ -44,6 +45,7 @@
 import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
 import org.apache.hc.client5.http.observation.ObservingOptions;
 import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
@@ -57,6 +59,13 @@ class ObservationAsyncExecInterceptorTest {
 
     private HttpServer server;
 
+    private static final String TRACE_PARENT = "traceparent";
+
+    private static final String TRACE_PARENT_VALUE =
+            "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
+
+    private static final String STALE_TRACE_PARENT_VALUE = "stale";
+
     private static final class CountingHandler
             implements io.micrometer.observation.ObservationHandler<Observation.Context> {
         final AtomicInteger starts = new AtomicInteger();
@@ -132,4 +141,65 @@ void emitsObservationAroundAsyncCall() throws Exception {
         assertEquals(1, h.starts.get());
         assertEquals(1, h.stops.get());
     }
+
+    @Test
+    void propagatesTraceContextAroundAsyncCall() throws Exception {
+        final AtomicReference<String> receivedTraceParent = new AtomicReference<>();
+        // The server handler stores the first traceparent value it receives (null when the header is absent).
+        server = ServerBootstrap.bootstrap()
+                .setLocalAddress(InetAddress.getLoopbackAddress())
+                .setListenerPort(0)
+                .register("localhost", "/get", (request, response, context) -> {
+                    final Header traceParent = request.getFirstHeader(TRACE_PARENT);
+                    receivedTraceParent.set(traceParent != null ? traceParent.getValue() : null);
+                    response.setCode(HttpStatus.SC_OK);
+                    response.setEntity(new StringEntity("{\"ok\":true}", ContentType.APPLICATION_JSON));
+                })
+                .create();
+        server.start();
+
+        final ObservationRegistry reg = ObservationRegistry.create();
+        reg.observationConfig().observationHandler(new TracePropagationHandler());
+
+        final ObservingOptions opts = ObservingOptions.builder()
+                .metrics(EnumSet.noneOf(ObservingOptions.MetricSet.class))
+                .build();
+
+        final HttpAsyncClientBuilder b = HttpAsyncClients.custom();
+        b.addExecInterceptorFirst("span", new ObservationAsyncExecInterceptor(reg, opts));
+
+        final HttpHost target = new HttpHost("http", "localhost", server.getLocalPort());
+
+        try (final CloseableHttpAsyncClient c = b.build()) {
+            c.start();
+            // The request already carries a stale traceparent when it is handed to the client.
+            final SimpleHttpRequest request = SimpleRequestBuilder.get()
+                    .setHttpHost(target)
+                    .setPath("/get")
+                    .build();
+            request.setHeader(TRACE_PARENT, STALE_TRACE_PARENT_VALUE);
+
+            final Future<SimpleHttpResponse> future = c.execute(request, null);
+            final SimpleHttpResponse response = future.get(10, TimeUnit.SECONDS);
+
+            assertEquals(HttpStatus.SC_OK, response.getCode());
+        }
+        // The value written by TracePropagationHandler, not the stale one, must be what the server saw.
+        assertEquals(TRACE_PARENT_VALUE, receivedTraceParent.get());
+    }
+
+    private static final class TracePropagationHandler implements io.micrometer.observation.ObservationHandler<Observation.Context> {
+        // Test handler: accepts only HttpClientObservationContext instances.
+        @Override
+        public boolean supportsContext(final Observation.Context context) {
+            return context instanceof HttpClientObservationContext;
+        }
+        // Writes TRACE_PARENT_VALUE onto the context's carrier through the context's own setter.
+        @Override
+        public void onStart(final Observation.Context context) {
+            final HttpClientObservationContext senderContext = (HttpClientObservationContext) context;
+            senderContext.getSetter().set(senderContext.getCarrier(), TRACE_PARENT, TRACE_PARENT_VALUE);
+        }
+
+    }
 }
diff --git a/httpclient5-observation/src/test/java/org/apache/hc/client5/http/observation/impl/ObservationClassicExecInterceptorTest.java b/httpclient5-observation/src/test/java/org/apache/hc/client5/http/observation/impl/ObservationClassicExecInterceptorTest.java
index a69cb2b..d1c9bcd 100644
--- a/httpclient5-observation/src/test/java/org/apache/hc/client5/http/observation/impl/ObservationClassicExecInterceptorTest.java
+++ b/httpclient5-observation/src/test/java/org/apache/hc/client5/http/observation/impl/ObservationClassicExecInterceptorTest.java
@@ -31,6 +31,7 @@
 import java.net.InetAddress;
 import java.util.EnumSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import io.micrometer.observation.Observation;
 import io.micrometer.observation.ObservationRegistry;
@@ -38,8 +39,10 @@
 import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
 import org.apache.hc.client5.http.impl.classic.HttpClients;
 import org.apache.hc.client5.http.observation.ObservingOptions;
+import org.apache.hc.core5.http.ClassicHttpRequest;
 import org.apache.hc.core5.http.ClassicHttpResponse;
 import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
 import org.apache.hc.core5.http.HttpHost;
 import org.apache.hc.core5.http.HttpStatus;
 import org.apache.hc.core5.http.impl.bootstrap.HttpServer;
@@ -54,6 +57,13 @@ class ObservationClassicExecInterceptorTest {
 
     private HttpServer server;
 
+    private static final String TRACE_PARENT = "traceparent";
+
+    private static final String TRACE_PARENT_VALUE =
+            "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
+
+    private static final String STALE_TRACE_PARENT_VALUE = "stale";
+
     private static final class CountingHandler implements io.micrometer.observation.ObservationHandler<Observation.Context> {
         final AtomicInteger starts = new AtomicInteger();
         final AtomicInteger stops = new AtomicInteger();
@@ -124,4 +134,59 @@ void emitsObservationAroundClassicCall() throws Exception {
         assertEquals(1, h.starts.get(), "observation should start once");
         assertEquals(1, h.stops.get(), "observation should stop once");
     }
+
+    @Test
+    void propagatesTraceContextAroundClassicCall() throws Exception {
+        final AtomicReference<String> receivedTraceParent = new AtomicReference<>();
+        // The server handler stores the first traceparent value it receives (null when the header is absent).
+        server = ServerBootstrap.bootstrap()
+                .setLocalAddress(InetAddress.getLoopbackAddress())
+                .setListenerPort(0)
+                .register("localhost", "/get", (request, response, context) -> {
+                    final Header traceParent = request.getFirstHeader(TRACE_PARENT);
+                    receivedTraceParent.set(traceParent != null ? traceParent.getValue() : null);
+                    response.setCode(HttpStatus.SC_OK);
+                    response.setEntity(new StringEntity("{\"ok\":true}", ContentType.APPLICATION_JSON));
+                })
+                .create();
+        server.start();
+
+        final ObservationRegistry reg = ObservationRegistry.create();
+        reg.observationConfig().observationHandler(new TracePropagationHandler());
+
+        final ObservingOptions opts = ObservingOptions.builder()
+                .metrics(EnumSet.noneOf(ObservingOptions.MetricSet.class))
+                .build();
+
+        final HttpClientBuilder b = HttpClients.custom();
+        b.addExecInterceptorFirst("span", new ObservationClassicExecInterceptor(reg, opts));
+
+        final HttpHost target = new HttpHost("http", "localhost", server.getLocalPort());
+
+        try (final CloseableHttpClient c = b.build()) {
+            final ClassicHttpRequest request = ClassicRequestBuilder.get("/get").build();
+            request.setHeader(TRACE_PARENT, STALE_TRACE_PARENT_VALUE);
+            // try-with-resources closes the response even if the status assertion fails.
+            try (final ClassicHttpResponse response = c.executeOpen(target, request, null)) {
+                assertEquals(HttpStatus.SC_OK, response.getCode());
+            }
+        }
+        // The value written by TracePropagationHandler, not the stale one, must be what the server saw.
+        assertEquals(TRACE_PARENT_VALUE, receivedTraceParent.get());
+    }
+
+    private static final class TracePropagationHandler implements io.micrometer.observation.ObservationHandler<Observation.Context> {
+        // Test handler: accepts only HttpClientObservationContext instances.
+        @Override
+        public boolean supportsContext(final Observation.Context context) {
+            return context instanceof HttpClientObservationContext;
+        }
+        // Writes TRACE_PARENT_VALUE onto the context's carrier through the context's own setter.
+        @Override
+        public void onStart(final Observation.Context context) {
+            final HttpClientObservationContext senderContext = (HttpClientObservationContext) context;
+            senderContext.getSetter().set(senderContext.getCarrier(), TRACE_PARENT, TRACE_PARENT_VALUE);
+        }
+
+    }
 }