Merge pull request #1444 from druid-io/logging-improvement
Separate bootstrap threads from loading threads on historical startup
diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index acfb441..a5c67b5 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -44,6 +44,9 @@
@JsonProperty("numLoadingThreads")
private int numLoadingThreads = 1;
+ @JsonProperty("numBootstrapThreads")
+ private Integer numBootstrapThreads = null;
+
@JsonProperty
private File infoDir = null;
@@ -72,6 +75,10 @@
return numLoadingThreads;
}
+ public int getNumBootstrapThreads() {
+ return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
+ }
+
public File getInfoDir()
{
if (infoDir == null) {
diff --git a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
index 5405bbb..c4c3f9e 100644
--- a/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/BaseZkCoordinator.java
@@ -53,7 +53,6 @@
private volatile PathChildrenCache loadQueueCache;
private volatile boolean started = false;
- private final ListeningExecutorService loadingExec;
public BaseZkCoordinator(
ObjectMapper jsonMapper,
@@ -68,12 +67,6 @@
this.config = config;
this.me = me;
this.curator = curator;
- this.loadingExec = MoreExecutors.listeningDecorator(
- Executors.newFixedThreadPool(
- config.getNumLoadingThreads(),
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
- )
- );
}
@LifecycleStart
@@ -95,7 +88,10 @@
loadQueueLocation,
true,
true,
- loadingExec
+ Executors.newFixedThreadPool(
+ config.getNumLoadingThreads(),
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ZkCoordinator-%s").build()
+ )
);
try {
@@ -217,9 +213,4 @@
public abstract void loadLocalCache();
public abstract DataSegmentChangeHandler getDataSegmentChangeHandler();
-
- public ListeningExecutorService getLoadingExecutor()
- {
- return loadingExec;
- }
}
diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
index 3804db0..0a9523a 100644
--- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
@@ -20,12 +20,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.emitter.EmittingLogger;
+import io.druid.concurrent.Execs;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.server.initialization.ZkPathsConfig;
@@ -34,13 +35,18 @@
import java.io.File;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@@ -86,8 +92,10 @@
}
List<DataSegment> cachedSegments = Lists.newArrayList();
- for (File file : baseDir.listFiles()) {
- log.info("Loading segment cache file [%s]", file);
+ File[] segmentsToLoad = baseDir.listFiles();
+ for (int i = 0; i < segmentsToLoad.length; i++) {
+ File file = segmentsToLoad[i];
log.info("Loading segment cache file [%d/%d][%s].", i + 1, segmentsToLoad.length, file);
try {
DataSegment segment = jsonMapper.readValue(file, DataSegment.class);
if (serverManager.isSegmentCached(segment)) {
@@ -179,69 +187,71 @@
}
}
- public void addSegments(Iterable<DataSegment> segments, final DataSegmentChangeCallback callback)
+ private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
{
- try(final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
- new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
+ ExecutorService loadingExecutor = null;
+ try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
+ new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
+
backgroundSegmentAnnouncer.startAnnouncing();
- final List<ListenableFuture> segmentLoading = Lists.newArrayList();
+ loadingExecutor = Execs.multiThreaded(config.getNumBootstrapThreads(), "ZkCoordinator-loading-%s");
+ final int numSegments = segments.size();
+ final CountDownLatch latch = new CountDownLatch(numSegments);
+ final AtomicInteger counter = new AtomicInteger(0);
+ final CopyOnWriteArrayList<DataSegment> failedSegments = new CopyOnWriteArrayList<>();
for (final DataSegment segment : segments) {
- segmentLoading.add(
- getLoadingExecutor().submit(
- new Callable<Void>()
- {
- @Override
- public Void call() throws SegmentLoadingException
- {
+ loadingExecutor.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ log.info("Loading segment[%d/%d][%s]", counter.incrementAndGet(), numSegments, segment.getIdentifier());
+ final boolean loaded = loadSegment(segment, callback);
+ if (loaded) {
try {
- log.info("Loading segment %s", segment.getIdentifier());
- final boolean loaded = loadSegment(segment, callback);
- if (loaded) {
- try {
- backgroundSegmentAnnouncer.announceSegment(segment);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SegmentLoadingException(e, "Loading Interrupted");
- }
- }
- return null;
- } catch(SegmentLoadingException e) {
- log.error(e, "[%s] failed to load", segment.getIdentifier());
- throw e;
+ backgroundSegmentAnnouncer.announceSegment(segment);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SegmentLoadingException(e, "Loading Interrupted");
}
}
+ } catch (SegmentLoadingException e) {
+ log.error(e, "[%s] failed to load", segment.getIdentifier());
+ failedSegments.add(segment);
+ } finally {
+ latch.countDown();
}
- )
+ }
+ }
);
}
- int failed = 0;
- for(ListenableFuture future : segmentLoading) {
- try {
- future.get();
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SegmentLoadingException(e, "Loading Interrupted");
- } catch(ExecutionException e) {
- failed++;
+ try{
+ latch.await();
+
+ if(failedSegments.size() > 0) {
+ log.makeAlert("%,d errors seen while loading segments", failedSegments.size())
+ .addData("failedSegments", failedSegments).emit();
}
- }
- if(failed > 0) {
- throw new SegmentLoadingException("%,d errors seen while loading segments", failed);
+ } catch(InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.makeAlert(e, "LoadingInterrupted").emit();
}
backgroundSegmentAnnouncer.finishAnnouncing();
}
catch (SegmentLoadingException e) {
- log.makeAlert(e, "Failed to load segments")
- .addData("segments", segments)
- .emit();
+ log.makeAlert(e, "Failed to load segments -- likely problem with announcing.")
+ .addData("numSegments", segments.size())
+ .emit();
}
finally {
callback.execute();
+ if (loadingExecutor != null) {
+ loadingExecutor.shutdownNow();
+ }
}
}