HTRACE-358. Provide convenience wrapper around ScheduledExecutorService (Mike Drob via Colin P. McCabe)
diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java
new file mode 100644
index 0000000..e783561
--- /dev/null
+++ b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.core;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A convenience wrapper around a {@link ScheduledExecutorService} for
+ * automatically propagating trace scopes to executable tasks.
+ * <p>
+ * Recurring tasks will use independent scopes per execution, but will all be
+ * tied to the same parent scope (if any).
+ */
+public class ScheduledTraceExecutorService extends TraceExecutorService
+ implements ScheduledExecutorService {
+ final ScheduledExecutorService impl;
+
+ ScheduledTraceExecutorService(Tracer tracer, String scopeName,
+ ScheduledExecutorService impl) {
+ super(tracer, scopeName, impl);
+ this.impl = impl;
+ }
+
+ @Override
+ public ScheduledFuture<?> schedule(Runnable command, long delay,
+ TimeUnit unit) {
+ return impl.schedule(wrap(command), delay, unit);
+ }
+
+ @Override
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
+ TimeUnit unit) {
+ return impl.schedule(wrap(callable), delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+ long initialDelay, long period, TimeUnit unit) {
+ return impl.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+ long initialDelay, long delay, TimeUnit unit) {
+ return impl.scheduleWithFixedDelay(wrap(command), initialDelay, delay,
+ unit);
+ }
+
+}
diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java
index 81e31ea..592f354 100644
--- a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java
+++ b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java
@@ -26,6 +26,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+/**
+ * A convenience wrapper around an {@link ExecutorService} for automatically
+ * propagating trace scopes to executable tasks.
+ */
public class TraceExecutorService implements ExecutorService {
private final Tracer tracer;
private final String scopeName;
@@ -40,7 +44,7 @@
@Override
public void execute(Runnable command) {
- impl.execute(tracer.wrap(command, scopeName));
+ impl.execute(wrap(command));
}
@Override
@@ -71,24 +75,38 @@
@Override
public <T> Future<T> submit(Callable<T> task) {
- return impl.submit(tracer.wrap(task, scopeName));
+ return impl.submit(wrap(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
- return impl.submit(tracer.wrap(task, scopeName), result);
+ return impl.submit(wrap(task), result);
}
@Override
public Future<?> submit(Runnable task) {
- return impl.submit(tracer.wrap(task, scopeName));
+ return impl.submit(wrap(task));
+ }
+
+ /*
+ * Intended for internal use only.
+ */
+ Runnable wrap(Runnable runnable) {
+ return tracer.wrap(runnable, scopeName);
+ }
+
+ /*
+ * Intended for internal use only.
+ */
+ <V> Callable<V> wrap(Callable<V> callable) {
+ return tracer.wrap(callable, scopeName);
}
private <T> Collection<? extends Callable<T>> wrapCollection(
Collection<? extends Callable<T>> tasks) {
List<Callable<T>> result = new ArrayList<Callable<T>>();
for (Callable<T> task : tasks) {
- result.add(tracer.wrap(task, scopeName));
+ result.add(wrap(task));
}
return result;
}
diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java
index f78e0a0..0ca4d1d 100644
--- a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java
+++ b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -474,6 +475,16 @@
return new TraceExecutorService(this, scopeName, impl);
}
+ public ScheduledTraceExecutorService newTraceExecutorService(
+ ScheduledExecutorService impl) {
+ return newTraceExecutorService(impl, null);
+ }
+
+ public ScheduledTraceExecutorService newTraceExecutorService(
+ ScheduledExecutorService impl, String scopeName) {
+ return new ScheduledTraceExecutorService(this, scopeName, impl);
+ }
+
public TracerPool getTracerPool() {
if (tracerPool == null) {
throwClientError(toString() + " is closed.");
diff --git a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java
index dbdd27c..1bd18f7 100644
--- a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java
+++ b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java
@@ -17,15 +17,23 @@
package org.apache.htrace.core;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.htrace.core.Tracer.Builder;
import org.junit.Test;
public class TestTraceExecutor {
@@ -88,6 +96,64 @@
}
}
+ @Test
+ public void testScheduledExecutor() throws Exception {
+ final int TASK_COUNT = 3;
+ final int DELAY = 500;
+
+ HTraceConfiguration conf = HTraceConfiguration.fromKeyValuePairs(
+ Tracer.SAMPLER_CLASSES_KEY, AlwaysSampler.class.getName());
+
+ ScheduledExecutorService ses = null;
+ Builder builder = new Tracer.Builder("TestTraceExecutor").conf(conf);
+ try (Tracer tracer = builder.build()) {
+ final ThreadFactory tf = new NamingThreadFactory();
+ ses = Executors.newScheduledThreadPool(TASK_COUNT, tf);
+ ses = tracer.newTraceExecutorService(ses);
+
+ final CountDownLatch startLatch = new CountDownLatch(TASK_COUNT);
+ final CountDownLatch continueLatch = new CountDownLatch(1);
+ Callable<String> task = new Callable<String>() {
+ @Override
+ public String call() throws InterruptedException {
+ startLatch.countDown();
+ // Prevent any task from exiting until every task has started
+ assertTrue(continueLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS));
+ // Annotate on the presumed child trace
+ Tracer.getCurrentSpan().addTimelineAnnotation(
+ Thread.currentThread().getName());
+ return Tracer.getCurrentSpan().getDescription();
+ }
+ };
+
+ try (TraceScope scope = tracer.newScope("TestRunnable")) {
+ Collection<Future<String>> futures = new ArrayList<>();
+
+ for (int i = 0; i < TASK_COUNT; i++) {
+ futures.add(ses.schedule(task, DELAY, TimeUnit.MILLISECONDS));
+ }
+
+ // Wait for all tasks to start
+ assertTrue(startLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS));
+ continueLatch.countDown();
+ // Collect the expected results
+ Collection<String> results = new HashSet<>();
+ for (Future<String> future : futures) {
+ results.add(future.get(WAIT_TIME_SECONDS, TimeUnit.SECONDS));
+ }
+
+ assertTrue("Timeline Annotations should have gone to child traces.",
+ Tracer.getCurrentSpan().getTimelineAnnotations().isEmpty());
+ assertEquals("Duplicated child span descriptions.", TASK_COUNT,
+ results.size());
+ }
+ } finally {
+ if (ses != null) {
+ ses.shutdown();
+ }
+ }
+ }
+
/*
* Inspired by org.apache.solr.util.DefaultSolrThreadFactory
*/