Merge branch 'master' of github.com:metamx/druid
diff --git a/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java b/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java
index 24706a8..94ab2ea 100644
--- a/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java
+++ b/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java
@@ -8,7 +8,7 @@
  */
 public abstract class ConfigManagerConfig
 {
-  @Config("druid.indexer.configTable")
+  @Config("druid.database.configTable")
   public abstract String getConfigTable();
 
   @Config("druid.indexer.poll.duration")
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
index 32cb188..e69b0f8 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
@@ -80,7 +80,7 @@
     return config;
   }
 
-  public TaskActionClient getTaskActionClientFactory()
+  public TaskActionClient getTaskActionClient()
   {
     return taskActionClientFactory.create(task);
   }
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
index 67754ee..86fd2a7 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
@@ -77,7 +77,7 @@
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     // Strategy: Create an empty segment covering the interval to be deleted
-    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
     final Interval interval = this.getImplicitLockInterval().get();
     final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
     final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty);
@@ -104,7 +104,7 @@
         segment.getVersion()
     );
 
-    toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+    toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
 
     return TaskStatus.success(getId());
   }
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
index f0d09d5..6e28455 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
@@ -93,7 +93,7 @@
                                                        );
 
     // We should have a lock from before we started running
-    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
     log.info("Setting version to: %s", myLock.getVersion());
     configCopy.setVersion(myLock.getVersion());
 
@@ -124,7 +124,7 @@
       List<DataSegment> publishedSegments = job.getPublishedSegments();
 
       // Request segment pushes
-      toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
+      toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
 
       // Done
       return TaskStatus.success(getId());
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java
index 3dfe99a..47f72b1 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java
@@ -258,7 +258,7 @@
         }
     );
 
-    toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks));
+    toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
 
     return TaskStatus.success(getId());
   }
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
index 1790ddc..dd92888 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
@@ -100,7 +100,7 @@
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
     // We should have a lock from before we started running
-    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
 
     // We know this exists
     final Interval interval = getImplicitLockInterval().get();
@@ -190,7 +190,7 @@
     );
 
     // Request segment pushes
-    toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
+    toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
 
     // Done
     return TaskStatus.success(getId());
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
index d1bfc6d..35babcd 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
@@ -142,7 +142,7 @@
   @Override
   public TaskStatus preflight(TaskToolbox toolbox) throws Exception
   {
-    toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(toSubtasks()));
+    toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks()));
     return TaskStatus.success(getId());
   }
 
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
index e652ab6..f4476ff 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
@@ -72,7 +72,7 @@
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     // Confirm we have a lock (will throw if there isn't exactly one element)
-    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
 
     if(!myLock.getDataSource().equals(getDataSource())) {
       throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
@@ -84,7 +84,7 @@
 
     // List unused segments
     final List<DataSegment> unusedSegments = toolbox
-        .getTaskActionClientFactory()
+        .getTaskActionClient()
         .submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
 
     // Verify none of these segments have versions > lock version
@@ -107,7 +107,7 @@
     }
 
     // Remove metadata for these segments
-    toolbox.getTaskActionClientFactory().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
+    toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
 
     return TaskStatus.success(getId());
   }
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java
index e0b3dd6..4bda036 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java
@@ -20,15 +20,12 @@
 package com.metamx.druid.merger.common.task;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -36,7 +33,6 @@
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import com.metamx.common.ISE;
-import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.merger.common.TaskLock;
 import com.metamx.druid.merger.common.TaskStatus;
@@ -45,7 +41,6 @@
 import com.metamx.druid.merger.common.actions.SegmentInsertAction;
 import com.metamx.druid.shard.NoneShardSpec;
 import com.metamx.emitter.EmittingLogger;
-import com.metamx.emitter.service.AlertEvent;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import org.joda.time.DateTime;
@@ -115,7 +110,7 @@
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+    final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
     final ServiceEmitter emitter = toolbox.getEmitter();
     final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
     final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
@@ -166,7 +161,7 @@
       emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
       emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
 
-      toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+      toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
 
       return TaskStatus.success(getId());
     }
@@ -196,7 +191,7 @@
     };
 
     final Set<String> current = ImmutableSet.copyOf(
-        Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier)
+        Iterables.transform(toolbox.getTaskActionClient().submit(defaultListUsedAction()), toIdentifier)
     );
     final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));
 
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java
index c8c0e2c..cebebd2 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java
@@ -93,7 +93,7 @@
       return super.preflight(toolbox);
     }
 
-    final TaskActionClient taskClient = toolbox.getTaskActionClientFactory();
+    final TaskActionClient taskClient = toolbox.getTaskActionClient();
 
     List<DataSegment> segments = taskClient.submit(defaultListUsedAction());
 
@@ -176,7 +176,7 @@
       DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
       updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
 
-      toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
+      toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
     } else {
       log.info("Conversion failed.");
     }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
index 5452acac..a83f071 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
@@ -243,14 +243,22 @@
   public void init() throws Exception
   {
     scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
+    initializeDB();
 
     final ConfigManagerConfig managerConfig = configFactory.build(ConfigManagerConfig.class);
     DbConnector.createConfigTable(dbi, managerConfig.getConfigTable());
-    JacksonConfigManager configManager = new JacksonConfigManager(new ConfigManager(dbi, managerConfig), jsonMapper);
+    JacksonConfigManager configManager =
+        new JacksonConfigManager(
+            lifecycle.addManagedInstance(
+                new ConfigManager(
+                    dbi,
+                    managerConfig
+                )
+            ), jsonMapper
+        );
 
     initializeEmitter();
     initializeMonitors();
-    initializeDB();
     initializeIndexerCoordinatorConfig();
     initializeTaskConfig();
     initializeS3Service();
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
index b5afa1d..21adae8 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
@@ -23,7 +23,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
@@ -35,8 +34,6 @@
 import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
 import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
 import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
-import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
-import com.metamx.druid.merger.coordinator.scaling.ScalingStats;
 import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
 import com.metamx.emitter.service.ServiceEmitter;
 
@@ -188,7 +185,7 @@
   public <T> Response doAction(final TaskActionHolder<T> holder)
   {
     final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask())
-                                     .getTaskActionClientFactory()
+                                     .getTaskActionClient()
                                      .submit(holder.getAction());
 
     final Map<String, Object> retMap = Maps.newHashMap();
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
index 4fac2c5..c943697 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
@@ -66,7 +66,6 @@
 import com.metamx.emitter.service.ServiceEventBuilder;
 import org.apache.commons.io.FileUtils;
 import org.easymock.EasyMock;
-import org.jets3t.service.ServiceException;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.After;
@@ -283,8 +282,8 @@
         // Sort of similar to what realtime tasks do:
 
         // Acquire lock for first interval
-        final Optional<TaskLock> lock1 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval1));
-        final List<TaskLock> locks1 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
+        final Optional<TaskLock> lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1));
+        final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction());
 
         // (Confirm lock sanity)
         Assert.assertTrue("lock1 present", lock1.isPresent());
@@ -292,8 +291,8 @@
         Assert.assertEquals("locks1", ImmutableList.of(lock1.get()), locks1);
 
         // Acquire lock for second interval
-        final Optional<TaskLock> lock2 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval2));
-        final List<TaskLock> locks2 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
+        final Optional<TaskLock> lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2));
+        final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction());
 
         // (Confirm lock sanity)
         Assert.assertTrue("lock2 present", lock2.isPresent());
@@ -301,7 +300,7 @@
         Assert.assertEquals("locks2", ImmutableList.of(lock1.get(), lock2.get()), locks2);
 
         // Push first segment
-        toolbox.getTaskActionClientFactory()
+        toolbox.getTaskActionClient()
                .submit(
                    new SegmentInsertAction(
                        ImmutableSet.of(
@@ -315,14 +314,14 @@
                );
 
         // Release first lock
-        toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval1));
-        final List<TaskLock> locks3 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
+        toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
+        final List<TaskLock> locks3 = toolbox.getTaskActionClient().submit(new LockListAction());
 
         // (Confirm lock sanity)
         Assert.assertEquals("locks3", ImmutableList.of(lock2.get()), locks3);
 
         // Push second segment
-        toolbox.getTaskActionClientFactory()
+        toolbox.getTaskActionClient()
                .submit(
                    new SegmentInsertAction(
                        ImmutableSet.of(
@@ -336,8 +335,8 @@
                );
 
         // Release second lock
-        toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval2));
-        final List<TaskLock> locks4 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
+        toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));
+        final List<TaskLock> locks4 = toolbox.getTaskActionClient().submit(new LockListAction());
 
         // (Confirm lock sanity)
         Assert.assertEquals("locks4", ImmutableList.<TaskLock>of(), locks4);
@@ -370,7 +369,7 @@
       public TaskStatus run(TaskToolbox toolbox) throws Exception
       {
         final TaskLock myLock = Iterables.getOnlyElement(
-            toolbox.getTaskActionClientFactory()
+            toolbox.getTaskActionClient()
                    .submit(new LockListAction())
         );
 
@@ -380,7 +379,7 @@
                                                .version(myLock.getVersion())
                                                .build();
 
-        toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+        toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
         return TaskStatus.success(getId());
       }
     };
@@ -406,7 +405,7 @@
       @Override
       public TaskStatus run(TaskToolbox toolbox) throws Exception
       {
-        final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+        final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
 
         final DataSegment segment = DataSegment.builder()
                                                .dataSource("ds")
@@ -414,7 +413,7 @@
                                                .version(myLock.getVersion())
                                                .build();
 
-        toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+        toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
         return TaskStatus.success(getId());
       }
     };
@@ -440,7 +439,7 @@
       @Override
       public TaskStatus run(TaskToolbox toolbox) throws Exception
       {
-        final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+        final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
 
         final DataSegment segment = DataSegment.builder()
                                                .dataSource("ds")
@@ -448,7 +447,7 @@
                                                .version(myLock.getVersion() + "1!!!1!!")
                                                .build();
 
-        toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+        toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
         return TaskStatus.success(getId());
       }
     };
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
index dfb0d95..939dc9b 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
@@ -375,7 +375,7 @@
       @Override
       public TaskStatus run(TaskToolbox toolbox) throws Exception
       {
-        toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks));
+        toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
         return TaskStatus.success(id);
       }
     };