GERONIMO-6701 basic zipkin http sender
diff --git a/geronimo-opentracing-common/src/main/java/org/apache/geronimo/microprofile/opentracing/common/microprofile/zipkin/ZipkinHttp.java b/geronimo-opentracing-common/src/main/java/org/apache/geronimo/microprofile/opentracing/common/microprofile/zipkin/ZipkinHttp.java
new file mode 100644
index 0000000..2301e68
--- /dev/null
+++ b/geronimo-opentracing-common/src/main/java/org/apache/geronimo/microprofile/opentracing/common/microprofile/zipkin/ZipkinHttp.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.microprofile.opentracing.common.microprofile.zipkin;
+
+import static java.util.Collections.singletonList;
+import static java.util.Optional.ofNullable;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static javax.ws.rs.client.Entity.entity;
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON_TYPE;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+import javax.json.bind.Jsonb;
+import javax.json.bind.JsonbBuilder;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+
+import org.apache.geronimo.microprofile.opentracing.common.config.GeronimoOpenTracingConfig;
+import org.apache.geronimo.microprofile.opentracing.common.spi.Listener;
+import org.eclipse.microprofile.opentracing.ClientTracingRegistrar;
+
+// experimental
+public class ZipkinHttp implements Listener<ZipkinSpan> {
+
+    private GeronimoOpenTracingConfig config;
+
+    private Jsonb jsonb;
+
+    private BlockingQueue<ZipkinSpan> spans;
+    private Client client;
+    private String collector;
+    private ScheduledExecutorService executor;
+    private ScheduledFuture<?> scheduledTask;
+    private int maxSpansPerBulk;
+    private int maxSpansIteration;
+
+    public void setConfig(final GeronimoOpenTracingConfig config) {
+        this.config = config;
+    }
+
+    public void setJsonb(final Jsonb jsonb) {
+        this.jsonb = jsonb;
+    }
+
+    public void init() {
+        if (jsonb == null) {
+            jsonb = JsonbBuilder.create();
+        }
+        final int capacity = Integer.parseInt(
+                config.read("span.converter.zipkin.http.bufferSize", "1000000"));
+        maxSpansPerBulk = Integer.parseInt(
+                config.read("span.converter.zipkin.http.maxSpansPerBulk", "250"));
+        maxSpansIteration = Integer.parseInt(
+                config.read("span.converter.zipkin.http.maxSpansIteration", "-1"));
+        collector = config.read("span.converter.zipkin.http.collector", null);
+        if (collector == null) {
+            return;
+        }
+
+        final long delay = Long.parseLong(
+                config.read("span.converter.zipkin.http.bulkSendInterval", "60000"));
+        if (delay < 0) {
+            logger().severe("No span.converter.zipkin.http.bulkSendInterval configured, skipping");
+            collector = null; // to skip anything
+            return;
+        }
+        if (delay > 0) {
+            executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+                @Override
+                public Thread newThread(Runnable r) {
+                    final Thread thread = new Thread(r, getClass().getName() + "-executor");
+                    thread.setPriority(Thread.NORM_PRIORITY);
+                    thread.setDaemon(false);
+                    return thread;
+                }
+            });
+            scheduledTask = executor.scheduleAtFixedRate(this::onEmit, delay, delay, MILLISECONDS);
+            spans = new ArrayBlockingQueue<>(capacity);
+        } else { // == 0 => immediate send
+            spans = null;
+        }
+        final ClientBuilder clientBuilder = ClientBuilder.newBuilder()
+                .connectTimeout(Long.parseLong(config.read("span.converter.zipkin.http.connectTimeout", "30000")), MILLISECONDS)
+                .readTimeout(Long.parseLong(config.read("span.converter.zipkin.http.readTimeout", "30000")), MILLISECONDS);
+        ofNullable(config.read("span.converter.zipkin.http.providers", null))
+                .ifPresent(providers -> Stream.of(providers.split(","))
+                    .map(String::trim)
+                    .map(it -> {
+                        try {
+                            return Thread.currentThread().getContextClassLoader().loadClass(it)
+                                    .getConstructor().newInstance();
+                        } catch (final Exception e) {
+                            throw new IllegalArgumentException(e);
+                        }
+                    })
+                    .forEach(clientBuilder::register));
+        if (Boolean.parseBoolean(config.read("span.converter.zipkin.http.selfTrace", "false"))) {
+            ClientTracingRegistrar.configure(clientBuilder);
+        }
+        client = clientBuilder.build();
+        logger().severe("Zipkin http sender configured");
+    }
+
+    private Logger logger() {
+        return Logger.getLogger("org.apache.geronimo.opentracing.zipkin.http");
+    }
+
+    public Jsonb getJsonb() {
+        return jsonb;
+    }
+
+    public void destroy() {
+        try {
+            jsonb.close();
+        } catch (final Exception e) {
+            // no-op
+        }
+        scheduledTask.cancel(true);
+        executor.shutdownNow();
+        try {
+            executor.awaitTermination(1, MINUTES);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void onEvent(final ZipkinSpan zipkinSpan) {
+        if (collector != null) {
+            if (spans == null) {
+                doSend(singletonList(zipkinSpan));
+            } else {
+                spans.add(zipkinSpan);
+            }
+        }
+    }
+
+    private void onEmit() {
+        final int size = this.spans.size();
+        final List<ZipkinSpan> copy = new ArrayList<>(size <= 0 ? maxSpansPerBulk : Math.min(size, maxSpansPerBulk));
+        int toSend = maxSpansIteration <= 0 ? size : Math.min(size, maxSpansIteration);
+        while (toSend > 0) {
+            this.spans.drainTo(copy, Math.min(toSend, maxSpansPerBulk));
+            if (copy.isEmpty()) {
+                break;
+            }
+            doSend(copy);
+            toSend -= copy.size();
+            copy.clear();
+
+        }
+    }
+
+    private void doSend(final List<ZipkinSpan> copy) {
+        final Response result = client.target(collector)
+                .request(APPLICATION_JSON_TYPE)
+                .post(entity(copy, APPLICATION_JSON_TYPE));
+        if (result.getStatus() >= 300) {
+            // todo: better handling but at least log them to not loose them completely or explode in memory
+            throw new IllegalStateException("Can't send to zipkin: " + copy);
+        }
+    }
+}
diff --git a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/cdi/OpenTracingExtension.java b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/cdi/OpenTracingExtension.java
index dfd4e1d..e9ffe8a 100644
--- a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/cdi/OpenTracingExtension.java
+++ b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/cdi/OpenTracingExtension.java
@@ -41,6 +41,7 @@
 import org.apache.geronimo.microprofile.opentracing.common.microprofile.client.OpenTracingClientResponseFilter;
 import org.apache.geronimo.microprofile.opentracing.common.microprofile.thread.OpenTracingExecutorService;
 import org.apache.geronimo.microprofile.opentracing.microprofile.zipkin.CdiZipkinConverter;
+import org.apache.geronimo.microprofile.opentracing.microprofile.zipkin.CdiZipkinHttp;
 import org.apache.geronimo.microprofile.opentracing.microprofile.zipkin.CdiZipkinLogger;
 import org.eclipse.microprofile.opentracing.Traced;
 
@@ -51,12 +52,12 @@
     private GeronimoOpenTracingConfig config;
 
     private boolean useZipkin;
-    private boolean useZipkinLogger;
+    private String zipkinSender;
 
     void onStart(@Observes final BeforeBeanDiscovery beforeBeanDiscovery) {
         config = GeronimoOpenTracingConfig.create();
         useZipkin = Boolean.parseBoolean(config.read("span.converter.zipkin.active", "true"));
-        useZipkinLogger = useZipkin && Boolean.parseBoolean(config.read("span.converter.zipkin.logger.active", "true"));
+        zipkinSender = config.read("span.converter.zipkin.sender", "logger");
     }
 
     void vetoDefaultConfigIfScanned(@Observes final ProcessAnnotatedType<GeronimoOpenTracingConfig> config) {
@@ -101,11 +102,17 @@
     }
 
     void zipkinLoggerToggle(@Observes final ProcessAnnotatedType<CdiZipkinLogger> onZipkinLogger) {
-        if (!useZipkinLogger) {
+        if (!"logger".equalsIgnoreCase(zipkinSender) || !Boolean.parseBoolean(config.read("span.converter.zipkin.logger.active", "true"))) {
             onZipkinLogger.veto();
         }
     }
 
+    void zipkinHttpToggle(@Observes final ProcessAnnotatedType<CdiZipkinHttp> onZipkinHttp) {
+        if (!"http".equalsIgnoreCase(zipkinSender) || !Boolean.parseBoolean(config.read("span.converter.zipkin.http.active", "true"))) {
+            onZipkinHttp.veto();
+        }
+    }
+
     <T> void removeTracedFromJaxRsEndpoints(@Observes @WithAnnotations(Traced.class) final ProcessAnnotatedType<T> pat) {
         if (isJaxRs(pat.getAnnotatedType())) { // we have filters with more accurate timing
             final AnnotatedTypeConfigurator<T> configurator = pat.configureAnnotatedType();
diff --git a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinHttp.java b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinHttp.java
new file mode 100644
index 0000000..37c7807
--- /dev/null
+++ b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinHttp.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.microprofile.opentracing.microprofile.zipkin;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
+import javax.inject.Inject;
+import javax.json.bind.Jsonb;
+import javax.json.bind.JsonbBuilder;
+
+import org.apache.geronimo.microprofile.opentracing.common.config.GeronimoOpenTracingConfig;
+import org.apache.geronimo.microprofile.opentracing.common.microprofile.zipkin.ZipkinHttp;
+import org.apache.geronimo.microprofile.opentracing.common.microprofile.zipkin.ZipkinSpan;
+
+@ApplicationScoped
+public class CdiZipkinHttp extends ZipkinHttp {
+
+    @Inject
+    private GeronimoOpenTracingConfig config;
+
+    private Jsonb jsonb;
+
+    @PostConstruct
+    public void init() {
+        setConfig(config);
+        jsonb = JsonbBuilder.create();
+        super.init();
+    }
+
+    @PreDestroy
+    public void destroy() {
+        super.destroy();
+    }
+
+    public void onZipkinSpan(@Observes final ZipkinSpan zipkinSpan) {
+        super.onEvent(zipkinSpan);
+    }
+}
diff --git a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinLogger.java b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinLogger.java
index d5eed2a..a34f214 100644
--- a/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinLogger.java
+++ b/geronimo-opentracing/src/main/java/org/apache/geronimo/microprofile/opentracing/microprofile/zipkin/CdiZipkinLogger.java
@@ -37,8 +37,6 @@
 
     private Jsonb jsonb;
 
-    private boolean wrapAsList;
-
     @PostConstruct
     public void init() {
         setConfig(config);