HTRACE-358. Provide convenience wrapper around ScheduledExecutorService (Mike Drob via Colin P. McCabe)
diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java
new file mode 100644
index 0000000..e783561
--- /dev/null
+++ b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.htrace.core;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A convenience wrapper around a {@link ScheduledExecutorService} for
+ * automatically propagating trace scopes to executable tasks.
+ * <p>
+ * Recurring tasks will use independent scopes per execution, but will all be
+ * tied to the same parent scope (if any).
+ */
+public class ScheduledTraceExecutorService extends TraceExecutorService
+    implements ScheduledExecutorService {
+  final ScheduledExecutorService impl;
+
+  ScheduledTraceExecutorService(Tracer tracer, String scopeName,
+      ScheduledExecutorService impl) {
+    super(tracer, scopeName, impl);
+    this.impl = impl;
+  }
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay,
+      TimeUnit unit) {
+    return impl.schedule(wrap(command), delay, unit);
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
+      TimeUnit unit) {
+    return impl.schedule(wrap(callable), delay, unit);
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+      long initialDelay, long period, TimeUnit unit) {
+    return impl.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+      long initialDelay, long delay, TimeUnit unit) {
+    return impl.scheduleWithFixedDelay(wrap(command), initialDelay, delay,
+        unit);
+  }
+
+}
diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java
index 81e31ea..592f354 100644
--- a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java
+++ b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java
@@ -26,6 +26,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+/**
+ * A convenience wrapper around an {@link ExecutorService} for automatically
+ * propagating trace scopes to executable tasks.
+ */
 public class TraceExecutorService implements ExecutorService {
   private final Tracer tracer;
   private final String scopeName;
@@ -40,7 +44,7 @@
 
   @Override
   public void execute(Runnable command) {
-    impl.execute(tracer.wrap(command, scopeName));
+    impl.execute(wrap(command));
   }
 
   @Override
@@ -71,24 +75,38 @@
 
   @Override
   public <T> Future<T> submit(Callable<T> task) {
-    return impl.submit(tracer.wrap(task, scopeName));
+    return impl.submit(wrap(task));
   }
 
   @Override
   public <T> Future<T> submit(Runnable task, T result) {
-    return impl.submit(tracer.wrap(task, scopeName), result);
+    return impl.submit(wrap(task), result);
   }
 
   @Override
   public Future<?> submit(Runnable task) {
-    return impl.submit(tracer.wrap(task, scopeName));
+    return impl.submit(wrap(task));
+  }
+
+  /*
+   * Intended for internal use only.
+   */
+  Runnable wrap(Runnable runnable) {
+    return tracer.wrap(runnable, scopeName);
+  }
+
+  /*
+   * Intended for internal use only.
+   */
+  <V> Callable<V> wrap(Callable<V> callable) {
+    return tracer.wrap(callable, scopeName);
   }
 
   private <T> Collection<? extends Callable<T>> wrapCollection(
       Collection<? extends Callable<T>> tasks) {
     List<Callable<T>> result = new ArrayList<Callable<T>>();
     for (Callable<T> task : tasks) {
-      result.add(tracer.wrap(task, scopeName));
+      result.add(wrap(task));
     }
     return result;
   }
diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java
index f78e0a0..0ca4d1d 100644
--- a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java
+++ b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -474,6 +475,16 @@
     return new TraceExecutorService(this, scopeName, impl);
   }
 
+  public ScheduledTraceExecutorService newTraceExecutorService(
+      ScheduledExecutorService impl) {
+    return newTraceExecutorService(impl, null);
+  }
+
+  public ScheduledTraceExecutorService newTraceExecutorService(
+      ScheduledExecutorService impl, String scopeName) {
+    return new ScheduledTraceExecutorService(this, scopeName, impl);
+  }
+
   public TracerPool getTracerPool() {
     if (tracerPool == null) {
       throwClientError(toString() + " is closed.");
diff --git a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java
index dbdd27c..1bd18f7 100644
--- a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java
+++ b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java
@@ -17,15 +17,23 @@
 package org.apache.htrace.core;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.htrace.core.Tracer.Builder;
 import org.junit.Test;
 
 public class TestTraceExecutor {
@@ -88,6 +96,64 @@
     }
   }
 
+  @Test
+  public void testScheduledExecutor() throws Exception {
+    final int TASK_COUNT = 3;
+    final int DELAY = 500;
+
+    HTraceConfiguration conf = HTraceConfiguration.fromKeyValuePairs(
+        Tracer.SAMPLER_CLASSES_KEY, AlwaysSampler.class.getName());
+
+    ScheduledExecutorService ses = null;
+    Builder builder = new Tracer.Builder("TestTraceExecutor").conf(conf);
+    try (Tracer tracer = builder.build()) {
+      final ThreadFactory tf = new NamingThreadFactory();
+      ses = Executors.newScheduledThreadPool(TASK_COUNT, tf);
+      ses = tracer.newTraceExecutorService(ses);
+
+      final CountDownLatch startLatch = new CountDownLatch(TASK_COUNT);
+      final CountDownLatch continueLatch = new CountDownLatch(1);
+      Callable<String> task = new Callable<String>() {
+        @Override
+        public String call() throws InterruptedException {
+          startLatch.countDown();
+          // Prevent any task from exiting until every task has started
+          assertTrue(continueLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS));
+          // Annotate on the presumed child trace
+          Tracer.getCurrentSpan().addTimelineAnnotation(
+              Thread.currentThread().getName());
+          return Tracer.getCurrentSpan().getDescription();
+        }
+      };
+
+      try (TraceScope scope = tracer.newScope("TestRunnable")) {
+        Collection<Future<String>> futures = new ArrayList<>();
+
+        for (int i = 0; i < TASK_COUNT; i++) {
+          futures.add(ses.schedule(task, DELAY, TimeUnit.MILLISECONDS));
+        }
+
+        // Wait for all tasks to start
+        assertTrue(startLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS));
+        continueLatch.countDown();
+        // Collect the expected results
+        Collection<String> results = new HashSet<>();
+        for (Future<String> future : futures) {
+          results.add(future.get(WAIT_TIME_SECONDS, TimeUnit.SECONDS));
+        }
+
+        assertTrue("Timeline Annotations should have gone to child traces.",
+            Tracer.getCurrentSpan().getTimelineAnnotations().isEmpty());
+        assertEquals("Duplicated child span descriptions.", TASK_COUNT,
+            results.size());
+      }
+    } finally {
+      if (ses != null) {
+        ses.shutdown();
+      }
+    }
+  }
+
   /*
    * Inspired by org.apache.solr.util.DefaultSolrThreadFactory
    */