Merge pull request #1444 from druid-io/logging-improvement

Separate bootstrap threads from loading threads on historical startup
diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index acfb441..a5c67b5 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -44,6 +44,9 @@
   @JsonProperty("numLoadingThreads")
   private int numLoadingThreads = 1;
 
+  @JsonProperty("numBootstrapThreads")
+  private Integer numBootstrapThreads = null;
+
   @JsonProperty
   private File infoDir = null;
 
@@ -72,6 +75,10 @@
     return numLoadingThreads;
   }
 
+  public int getNumBootstrapThreads() {
+    return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
+  }
+
   public File getInfoDir()
   {
     if (infoDir == null) {
diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
index 5405bbb..c4c3f9e 100644
--- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
@@ -53,7 +53,6 @@
 
   private volatile PathChildrenCache loadQueueCache;
   private volatile boolean started = false;
-  private final ListeningExecutorService loadingExec;
 
   public BaseZkCoordinator(
       ObjectMapper jsonMapper,
@@ -68,12 +67,6 @@
     this.config = config;
     this.me = me;
     this.curator = curator;
-    this.loadingExec = MoreExecutors.listeningDecorator(
-        Executors.newFixedThreadPool(
-            config.getNumLoadingThreads(),
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
-        )
-    );
   }
 
   @LifecycleStart
@@ -95,7 +88,10 @@
           loadQueueLocation,
           true,
           true,
-          loadingExec
+          Executors.newFixedThreadPool(
+              config.getNumLoadingThreads(),
+              new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
+          )
       );
 
       try {
@@ -217,9 +213,4 @@
   public abstract void loadLocalCache();
 
   public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
-
-  public ListeningExecutorService getLoadingExecutor()
-  {
-    return loadingExec;
-  }
 }
diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
index 3804db0..0a9523a 100644
--- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
@@ -20,12 +20,13 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.metamx.common.ISE;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.emitter.EmittingLogger;
+import io.druid.concurrent.Execs;
 import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.server.initialization.ZkPathsConfig;
@@ -34,13 +35,18 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  */
@@ -86,8 +92,10 @@
     }
 
     List<DataSegment> cachedSegments = Lists.newArrayList();
-    for (File file : baseDir.listFiles()) {
-      log.info("Loading segment cache file [%s]", file);
+    File[] segmentsToLoad = baseDir.listFiles();
+    for (int i = 0; i < segmentsToLoad.length; i++) {
+      File file = segmentsToLoad[i];
+      log.info("Loading segment cache file [%d/%d][%s].", i, segmentsToLoad.length, file);
       try {
         DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
         if (serverManager.isSegmentCached(segment)) {
@@ -179,69 +187,71 @@
     }
   }
 
-  public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback)
+  private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
   {
-    try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
-            new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
+    ExecutorService loadingExecutor = null;
+    try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
+             new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
+
       backgroundSegmentAnnouncer.startAnnouncing();
 
-      final List<ListenableFuture> segmentLoading = Lists.newArrayList();
+      loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s");
 
+      final int numSegments = segments.size();
+      final CountDownLatch latch = new CountDownLatch(numSegments);
+      final AtomicInteger counter = new AtomicInteger(0);
+      final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
       for (final DataSegment segment : segments) {
-        segmentLoading.add(
-            getLoadingExecutor().submit(
-                new Callable<Void>()
-                {
-                  @Override
-                  public Void call() throws SegmentLoadingException
-                  {
+        loadingExecutor.submit(
+            new Runnable() {
+              @Override
+              public void run() {
+                try {
+                  log.info("Loading segment[%d/%d][%s]", counter.getAndIncrement(), numSegments, segment.getIdentifier());
+                  final boolean loaded = loadSegment(segment, callback);
+                  if (loaded) {
                     try {
-                      log.info("Loading segment %s", segment.getIdentifier());
-                      final boolean loaded = loadSegment(segment, callback);
-                      if (loaded) {
-                        try {
-                          backgroundSegmentAnnouncer.announceSegment(segment);
-                        }
-                        catch (InterruptedException e) {
-                          Thread.currentThread().interrupt();
-                          throw new SegmentLoadingException(e, "Loading Interrupted");
-                        }
-                      }
-                      return null;
-                    } catch(SegmentLoadingException e) {
-                      log.error(e, "[%s] failed to load", segment.getIdentifier());
-                      throw e;
+                      backgroundSegmentAnnouncer.announceSegment(segment);
+                    } catch (InterruptedException e) {
+                      Thread.currentThread().interrupt();
+                      throw new SegmentLoadingException(e, "Loading Interrupted");
                     }
                   }
+                } catch (SegmentLoadingException e) {
+                  log.error(e, "[%s] failed to load", segment.getIdentifier());
+                  failedSegments.add(segment);
+                } finally {
+                  latch.countDown();
                 }
-            )
+              }
+            }
         );
       }
 
-      int failed = 0;
-      for(ListenableFuture future : segmentLoading) {
-        try {
-          future.get();
-        } catch(InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new SegmentLoadingException(e, "Loading Interrupted");
-        } catch(ExecutionException e) {
-          failed++;
+      try{
+        latch.await();
+
+        if(failedSegments.size() > 0) {
+          log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
+              .addData("failedSegments", failedSegments).emit();
         }
-      }
-      if(failed > 0) {
-        throw new SegmentLoadingException("%,d errors seen while loading segments", failed);
+      } catch(InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.makeAlert(e, "LoadingInterrupted").emit();
       }
 
       backgroundSegmentAnnouncer.finishAnnouncing();
     }
     catch (SegmentLoadingException e) {
-      log.makeAlert(e, "Failed to load segments")
-         .addData("segments", segments)
-         .emit();
+      log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
+          .addData("numSegments", segments.size())
+          .emit();
     }
     finally {
       callback.execute();
+      if (loadingExecutor != null) {
+        loadingExecutor.shutdownNow();
+      }
     }
   }