// blob: 604abc1c2747f9458e87f0d65d7bf0a1149b3d4c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.tracing.zipkin;
import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.servicecomb.tracing.zipkin.ZipkinTracingAdviser.CALL_PATH;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import java.lang.reflect.Array;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.servicecomb.tracing.zipkin.ZipkinTracingAdviser.ThrowableSupplier;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Test;
import brave.Span;
import brave.Tracer.SpanInScope;
import brave.Tracing;
import brave.propagation.StrictScopeDecorator;
import brave.propagation.ThreadLocalCurrentTraceContext;
import org.junit.jupiter.api.Assertions;
/**
 * Tests for {@link ZipkinTracingAdviser}.
 *
 * <p>Every span reported by the {@link Tracing} instance under test is captured in
 * {@link #traces}, keyed by trace id, so each test can inspect what was reported after
 * calling {@code tracingAdviser.invoke(...)}.
 */
public class ZipkinTracingAdviserTest {
  /** Number of concurrent tasks (and barrier parties) used by {@link #startsNewChildSpan()}. */
  private static final int nThreads = 10;

  private final String spanName = "some span";

  /** Call path handed to the adviser; the tests expect it back as a tag value on the span. */
  private final String path = this.getClass().getCanonicalName();

  private final String expected = "supplied";

  private final RuntimeException error = new RuntimeException("oops");

  private final ThrowableSupplier<String> supplier = () -> expected;

  /** Reported spans grouped by trace id, each queue in the order the spans were reported. */
  private final Map<String, Queue<zipkin2.Span>> traces = new ConcurrentHashMap<>();

  private final Tracing tracing = Tracing.newBuilder()
      .currentTraceContext(
          ThreadLocalCurrentTraceContext.newBuilder().addScopeDecorator(StrictScopeDecorator.create()).build())
      // Collect every reported span under its trace id instead of sending it anywhere.
      .spanReporter(e -> traces.computeIfAbsent(e.traceId(), id -> new ConcurrentLinkedDeque<>()).add(e))
      .build();

  private final ZipkinTracingAdviser tracingAdviser = new ZipkinTracingAdviser(tracing.tracer());

  /** Closes the {@link Tracing} instance created for the test. */
  @After
  public void tearDown() throws Exception {
    tracing.close();
  }

  /**
   * With no span in scope, {@code invoke} returns the supplier's value and a span named
   * {@code spanName} is reported carrying the call path as a tag value.
   */
  @Test
  public void startsNewRootSpan() throws Throwable {
    String result = tracingAdviser.invoke(spanName, path, supplier);
    MatcherAssert.assertThat(result, is(expected));

    await().atMost(2, SECONDS).until(() -> !traces.isEmpty());

    zipkin2.Span span = traces.values().iterator().next().poll();
    MatcherAssert.assertThat(span.name(), is(spanName));
    MatcherAssert.assertThat(tracedValues(span), contains(this.getClass().getCanonicalName()));
  }

  /**
   * When the supplier throws, the exception propagates out of {@code invoke} and the reported
   * span carries both the call path and a description of the error as tag values.
   */
  @Test
  public void includesExceptionInTags() throws Throwable {
    try {
      tracingAdviser.invoke(spanName, path, () -> {
        throw error;
      });
      expectFailing(RuntimeException.class);
    } catch (RuntimeException ignored) {
      // Expected: the supplier's exception is rethrown; the assertions below check the span.
    }

    await().atMost(2, SECONDS).until(() -> !traces.isEmpty());

    zipkin2.Span span = traces.values().iterator().next().poll();
    Assertions.assertEquals(spanName, span.name());
    MatcherAssert.assertThat(tracedValues(span),
        containsInAnyOrder(this.getClass().getCanonicalName(), "RuntimeException: oops"));
  }

  /**
   * Runs {@code nThreads} tasks concurrently, each starting its own trace, putting that span in
   * scope and calling {@code invoke}. Every trace must then contain a span named
   * {@code spanName} whose parent is the span the task started.
   */
  @SuppressWarnings({"unused", "try"})
  @Test
  public void startsNewChildSpan() {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(nThreads);
    CompletableFuture<?>[] futures = (CompletableFuture<?>[]) Array.newInstance(CompletableFuture.class, nThreads);

    // One pool shared by all tasks; it needs nThreads workers so that every task can reach the
    // barrier at the same time. It is always shut down so no worker threads outlive the test.
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    try {
      for (int i = 0; i < nThreads; i++) {
        futures[i] = CompletableFuture.runAsync(() -> {
          Span currentSpan = tracing.tracer().newTrace().start();

          waitTillAllAreReady(cyclicBarrier);

          try (SpanInScope spanInScope = tracing.tracer().withSpanInScope(currentSpan)) {
            MatcherAssert.assertThat(tracingAdviser.invoke(spanName, path, supplier), is(expected));
          } catch (Throwable throwable) {
            // Keep the original throwable as the cause so its stack trace is not lost.
            Assertions.fail(throwable.getMessage(), throwable);
          } finally {
            currentSpan.finish();
          }
        }, executor);
      }

      CompletableFuture.allOf(futures).join();
    } finally {
      // Also interrupts tasks still parked on the barrier if something failed early.
      executor.shutdownNow();
    }

    MatcherAssert.assertThat(traces.size(), is(nThreads));

    for (Queue<zipkin2.Span> queue : traces.values()) {
      // The span created by invoke is reported first, the span started by the task second.
      zipkin2.Span child = queue.poll();
      Assertions.assertNotNull(child, "child span was not reported");
      MatcherAssert.assertThat(child.name(), is(spanName));

      zipkin2.Span parent = queue.poll();
      Assertions.assertNotNull(parent, "parent span was not reported");
      MatcherAssert.assertThat(child.parentId(), is(parent.id()));
      MatcherAssert.assertThat(child.traceId(), is(parent.traceId()));
      MatcherAssert.assertThat(tracedValues(child), contains(this.getClass().getCanonicalName()));
    }
  }

  /**
   * Blocks until all parties have arrived at the barrier.
   *
   * @param cyclicBarrier barrier shared by the concurrently running tasks
   * @throws RuntimeException wrapping the cause if the wait is interrupted or the barrier breaks
   */
  private void waitTillAllAreReady(CyclicBarrier cyclicBarrier) {
    try {
      cyclicBarrier.await();
    } catch (InterruptedException e) {
      // Restore the interrupt flag for whoever owns this thread before propagating.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (BrokenBarrierException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Extracts the distinct, non-null values of the {@code CALL_PATH} and {@code "error"} tags.
   *
   * @param span reported span to inspect
   * @return the matching tag values
   */
  private List<String> tracedValues(zipkin2.Span span) {
    return span.tags().entrySet().stream()
        .filter(tag -> CALL_PATH.equals(tag.getKey()) || "error".equals(tag.getKey()))
        .filter(tag -> tag.getValue() != null)
        .map(Map.Entry::getValue)
        .distinct()
        .collect(Collectors.toList());
  }
}