GERONIMO-6701 basic zipkin http sender
diff --git a/geronimo-opentracing-common/src/main/java/org/apache/geronimo/microprofile/opentracing/common/microprofile/zipkin/ZipkinHttp.java b/geronimo-opentracing-common/src/main/java/org/apache/geronimo/microprofile/opentracing/common/microprofile/zipkin/ZipkinHttp.java
new file mode 100644
index 0000000..2301e68
--- /dev/null
+++ b/geronimo-opentracing-common/src/main/java/org/apache/geronimo/microprofile/opentracing/common/microprofile/zipkin/ZipkinHttp.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.microprofile.opentracing.common.microprofile.zipkin;
+
+import static java.util.Collections.singletonList;
+import static java.util.Optional.ofNullable;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static javax.ws.rs.client.Entity.entity;
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+import javax.json.bind.Jsonb;
+import javax.json.bind.JsonbBuilder;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+
+import org.apache.geronimo.microprofile.opentracing.common.config.GeronimoOpenTracingConfig;
+import org.apache.geronimo.microprofile.opentracing.common.spi.Listener;
+import org.eclipse.microprofile.opentracing.ClientTracingRegistrar;
+
+// experimental
+public class ZipkinHttp implements Listener<ZipkinSpan> {
+
+ private GeronimoOpenTracingConfig config;
+
+ private Jsonb jsonb;
+
+ private BlockingQueue<ZipkinSpan> spans;
+ private Client client;
+ private String collector;
+ private ScheduledExecutorService executor;
+ private ScheduledFuture<?> scheduledTask;
+ private int maxSpansPerBulk;
+ private int maxSpansIteration;
+
+ public void setConfig(final GeronimoOpenTracingConfig config) {
+ this.config = config;
+ }
+
+ public void setJsonb(final Jsonb jsonb) {
+ this.jsonb = jsonb;
+ }
+
+ public void init() {
+ if (jsonb == null) {
+ jsonb = JsonbBuilder.create();
+ }
+ final int capacity = Integer.parseInt(
+ config.read("span.converter.zipkin.http.bufferSize", "1000000"));
+ maxSpansPerBulk = Integer.parseInt(
+ config.read("span.converter.zipkin.http.maxSpansPerBulk", "250"));
+ maxSpansIteration = Integer.parseInt(
+ config.read("span.converter.zipkin.http.maxSpansIteration", "-1"));
+ collector = config.read("span.converter.zipkin.http.collector", null);
+ if (collector == null) {
+ return;
+ }
+
+ final long delay = Long.parseLong(
+ config.read("span.converter.zipkin.http.bulkSendInterval", "60000"));
+ if (delay < 0) {
+ logger().severe("No span.converter.zipkin.http.bulkSendInterval configured, skipping");
+ collector = null; // to skip anything
+ return;
+ }
+ if (delay > 0) {
+ executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ final Thread thread = new Thread(r, getClass().getName() + "-executor");
+ thread.setPriority(Thread.NORM_PRIORITY);
+ thread.setDaemon(false);
+ return thread;
+ }
+ });
+ scheduledTask = executor.scheduleAtFixedRate(this::onEmit, delay, delay, MILLISECONDS);
+ spans = new ArrayBlockingQueue<>(capacity);
+ } else { // == 0 => immediate send
+ spans = null;
+ }
+ final ClientBuilder clientBuilder = ClientBuilder.newBuilder()
+ .connectTimeout(Long.parseLong(config.read("span.converter.zipkin.http.connectTimeout", "30000")), MILLISECONDS)
+ .readTimeout(Long.parseLong(config.read("span.converter.zipkin.http.readTimeout", "30000")), MILLISECONDS);
+ ofNullable(config.read("span.converter.zipkin.http.providers", null))
+ .ifPresent(providers -> Stream.of(providers.split(","))
+ .map(String::trim)
+ .map(it -> {
+ try {
+ return Thread.currentThread().getContextClassLoader().loadClass(it)
+ .getConstructor().newInstance();
+ } catch (final Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ })
+ .forEach(clientBuilder::register));
+ if (Boolean.parseBoolean(config.read("span.converter.zipkin.http.selfTrace", "false"))) {
+ ClientTracingRegistrar.configure(clientBuilder);
+ }
+ client = clientBuilder.build();
+ logger().severe("Zipkin http sender configured");
+ }
+
+ private Logger logger() {
+ return Logger.getLogger("org.apache.geronimo.opentracing.zipkin.http");
+ }
+
+ public Jsonb getJsonb() {
+ return jsonb;
+ }
+
+ public void destroy() {
+ try {
+ jsonb.close();
+ } catch (final Exception e) {
+ // no-op
+ }
+ scheduledTask.cancel(true);
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(1, MINUTES);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void onEvent(final ZipkinSpan zipkinSpan) {
+ if (collector != null) {
+ if (spans == null) {
+ doSend(singletonList(zipkinSpan));
+ } else {
+ spans.add(zipkinSpan);
+ }
+ }
+ }
+
+ private void onEmit() {
+ final int size = this.spans.size();
+ final List<ZipkinSpan> copy = new ArrayList<>(size <= 0 ? maxSpansPerBulk : Math.min(size, maxSpansPerBulk));
+ int toSend = maxSpansIteration <= 0 ? size : Math.min(size, maxSpansIteration);
+ while (toSend > 0) {
+ this.spans.drainTo(copy, Math.min(toSend, maxSpansPerBulk));
+ if (copy.isEmpty()) {
+ break;
+ }
+ doSend(copy);
+ toSend -= copy.size();
+ copy.clear();
+
+ }
+ }
+
+ private void doSend(final List<ZipkinSpan> copy) {
+ final Response result = client.target(collector)
+ .request(APPLICATION_JSON_TYPE)
+ .post(entity(copy, APPLICATION_JSON_TYPE));
+ if (result.getStatus() >= 300) {
+ // todo: better handling but at least log them to not loose them completely or explode in memory
+ throw new IllegalStateException("Can't send to zipkin: " + copy);
+ }
+ }
+}
diff --git a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/cdi/OpenTracingExtension.java b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/cdi/OpenTracingExtension.java
index dfd4e1d..e9ffe8a 100644
--- a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/cdi/OpenTracingExtension.java
+++ b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/cdi/OpenTracingExtension.java
@@ -41,6 +41,7 @@
import org.apache.geronimo.microprofile.opentracing.common.microprofile.client.OpenTracingClientResponseFilter;
import org.apache.geronimo.microprofile.opentracing.common.microprofile.thread.OpenTracingExecutorService;
import org.apache.geronimo.microprofile.opentracing.microprofile.zipkin.CdiZipkinConverter;
+import org.apache.geronimo.microprofile.opentracing.microprofile.zipkin.CdiZipkinHttp;
import org.apache.geronimo.microprofile.opentracing.microprofile.zipkin.CdiZipkinLogger;
import org.eclipse.microprofile.opentracing.Traced;
@@ -51,12 +52,12 @@
private GeronimoOpenTracingConfig config;
private boolean useZipkin;
- private boolean useZipkinLogger;
+ private String zipkinSender;
void onStart(@Observes final BeforeBeanDiscovery beforeBeanDiscovery) {
config = GeronimoOpenTracingConfig.create();
useZipkin = Boolean.parseBoolean(config.read("span.converter.zipkin.active", "true"));
- useZipkinLogger = useZipkin && Boolean.parseBoolean(config.read("span.converter.zipkin.logger.active", "true"));
+ zipkinSender = config.read("span.converter.zipkin.sender", "logger");
}
void vetoDefaultConfigIfScanned(@Observes final ProcessAnnotatedType<GeronimoOpenTracingConfig> config) {
@@ -101,11 +102,17 @@
}
void zipkinLoggerToggle(@Observes final ProcessAnnotatedType<CdiZipkinLogger> onZipkinLogger) {
- if (!useZipkinLogger) {
+ if (!"logger".equalsIgnoreCase(zipkinSender) || !Boolean.parseBoolean(config.read("span.converter.zipkin.logger.active", "true"))) {
onZipkinLogger.veto();
}
}
+ void zipkinHttpToggle(@Observes final ProcessAnnotatedType<CdiZipkinHttp> onZipkinHttp) {
+ if (!"http".equalsIgnoreCase(zipkinSender) || !Boolean.parseBoolean(config.read("span.converter.zipkin.http.active", "true"))) {
+ onZipkinHttp.veto();
+ }
+ }
+
<T> void removeTracedFromJaxRsEndpoints(@Observes @WithAnnotations(Traced.class) final ProcessAnnotatedType<T> pat) {
if (isJaxRs(pat.getAnnotatedType())) { // we have filters with more accurate timing
final AnnotatedTypeConfigurator<T> configurator = pat.configureAnnotatedType();
diff --git a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinHttp.java b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinHttp.java
new file mode 100644
index 0000000..37c7807
--- /dev/null
+++ b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinHttp.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.microprofile.opentracing.microprofile.zipkin;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
+import javax.inject.Inject;
+import javax.json.bind.Jsonb;
+import javax.json.bind.JsonbBuilder;
+
+import org.apache.geronimo.microprofile.opentracing.common.config.GeronimoOpenTracingConfig;
+import org.apache.geronimo.microprofile.opentracing.common.microprofile.zipkin.ZipkinHttp;
+import org.apache.geronimo.microprofile.opentracing.common.microprofile.zipkin.ZipkinSpan;
+
+@ApplicationScoped
+public class CdiZipkinHttp extends ZipkinHttp {
+
+ @Inject
+ private GeronimoOpenTracingConfig config;
+
+ private Jsonb jsonb;
+
+ @PostConstruct
+ public void init() {
+ setConfig(config);
+ jsonb = JsonbBuilder.create();
+ super.init();
+ }
+
+ @PreDestroy
+ public void destroy() {
+ super.destroy();
+ }
+
+ public void onZipkinSpan(@Observes final ZipkinSpan zipkinSpan) {
+ super.onEvent(zipkinSpan);
+ }
+}
diff --git a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinLogger.java b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinLogger.java
index d5eed2a..a34f214 100644
--- a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinLogger.java
+++ b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinLogger.java
@@ -37,8 +37,6 @@
private Jsonb jsonb;
- private boolean wrapAsList;
-
@PostConstruct
public void init() {
setConfig(config);