Merge branch 'master' of github.com:metamx/druid
diff --git a/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java b/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java
index 24706a8..94ab2ea 100644
--- a/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java
+++ b/common/src/main/java/com/metamx/druid/config/ConfigManagerConfig.java
@@ -8,7 +8,7 @@
*/
public abstract class ConfigManagerConfig
{
- @Config("druid.indexer.configTable")
+ @Config("druid.database.configTable")
public abstract String getConfigTable();
@Config("druid.indexer.poll.duration")
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
index 32cb188..e69b0f8 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java
@@ -80,7 +80,7 @@
return config;
}
- public TaskActionClient getTaskActionClientFactory()
+ public TaskActionClient getTaskActionClient()
{
return taskActionClientFactory.create(task);
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
index 67754ee..86fd2a7 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java
@@ -77,7 +77,7 @@
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Strategy: Create an empty segment covering the interval to be deleted
- final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+ final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final Interval interval = this.getImplicitLockInterval().get();
final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]);
final IndexableAdapter emptyAdapter = new IncrementalIndexAdapter(interval, empty);
@@ -104,7 +104,7 @@
segment.getVersion()
);
- toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+ toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
return TaskStatus.success(getId());
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
index f0d09d5..6e28455 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java
@@ -93,7 +93,7 @@
);
// We should have a lock from before we started running
- final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+ final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
log.info("Setting version to: %s", myLock.getVersion());
configCopy.setVersion(myLock.getVersion());
@@ -124,7 +124,7 @@
List<DataSegment> publishedSegments = job.getPublishedSegments();
// Request segment pushes
- toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
+ toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(publishedSegments)));
// Done
return TaskStatus.success(getId());
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java
index 3dfe99a..47f72b1 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java
@@ -258,7 +258,7 @@
}
);
- toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks));
+ toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
return TaskStatus.success(getId());
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
index 1790ddc..dd92888 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java
@@ -100,7 +100,7 @@
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
// We should have a lock from before we started running
- final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+ final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
// We know this exists
final Interval interval = getImplicitLockInterval().get();
@@ -190,7 +190,7 @@
);
// Request segment pushes
- toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
+ toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(pushedSegments)));
// Done
return TaskStatus.success(getId());
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
index d1bfc6d..35babcd 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java
@@ -142,7 +142,7 @@
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
{
- toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(toSubtasks()));
+ toolbox.getTaskActionClient().submit(new SpawnTasksAction(toSubtasks()));
return TaskStatus.success(getId());
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
index e652ab6..f4476ff 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
@@ -72,7 +72,7 @@
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
// Confirm we have a lock (will throw if there isn't exactly one element)
- final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+ final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
if(!myLock.getDataSource().equals(getDataSource())) {
throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
@@ -84,7 +84,7 @@
// List unused segments
final List<DataSegment> unusedSegments = toolbox
- .getTaskActionClientFactory()
+ .getTaskActionClient()
.submit(new SegmentListUnusedAction(myLock.getDataSource(), myLock.getInterval()));
// Verify none of these segments have versions > lock version
@@ -107,7 +107,7 @@
}
// Remove metadata for these segments
- toolbox.getTaskActionClientFactory().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
+ toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.copyOf(unusedSegments)));
return TaskStatus.success(getId());
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java
index e0b3dd6..4bda036 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java
@@ -20,15 +20,12 @@
package com.metamx.druid.merger.common.task;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -36,7 +33,6 @@
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import com.metamx.common.ISE;
-import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskLock;
import com.metamx.druid.merger.common.TaskStatus;
@@ -45,7 +41,6 @@
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.emitter.EmittingLogger;
-import com.metamx.emitter.service.AlertEvent;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.DateTime;
@@ -115,7 +110,7 @@
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
- final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+ final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final ServiceEmitter emitter = toolbox.getEmitter();
final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
@@ -166,7 +161,7 @@
emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
- toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
+ toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(uploadedSegment)));
return TaskStatus.success(getId());
}
@@ -196,7 +191,7 @@
};
final Set<String> current = ImmutableSet.copyOf(
- Iterables.transform(toolbox.getTaskActionClientFactory().submit(defaultListUsedAction()), toIdentifier)
+ Iterables.transform(toolbox.getTaskActionClient().submit(defaultListUsedAction()), toIdentifier)
);
final Set<String> requested = ImmutableSet.copyOf(Iterables.transform(segments, toIdentifier));
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java
index c8c0e2c..cebebd2 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java
@@ -93,7 +93,7 @@
return super.preflight(toolbox);
}
- final TaskActionClient taskClient = toolbox.getTaskActionClientFactory();
+ final TaskActionClient taskClient = toolbox.getTaskActionClient();
List<DataSegment> segments = taskClient.submit(defaultListUsedAction());
@@ -176,7 +176,7 @@
DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
- toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
+ toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
} else {
log.info("Conversion failed.");
}
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
index 5452acac..a83f071 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
@@ -243,14 +243,22 @@
public void init() throws Exception
{
scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
+ initializeDB();
final ConfigManagerConfig managerConfig = configFactory.build(ConfigManagerConfig.class);
DbConnector.createConfigTable(dbi, managerConfig.getConfigTable());
- JacksonConfigManager configManager = new JacksonConfigManager(new ConfigManager(dbi, managerConfig), jsonMapper);
+ JacksonConfigManager configManager =
+ new JacksonConfigManager(
+ lifecycle.addManagedInstance(
+ new ConfigManager(
+ dbi,
+ managerConfig
+ )
+ ), jsonMapper
+ );
initializeEmitter();
initializeMonitors();
- initializeDB();
initializeIndexerCoordinatorConfig();
initializeTaskConfig();
initializeS3Service();
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
index b5afa1d..21adae8 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java
@@ -23,7 +23,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
@@ -35,8 +34,6 @@
import com.metamx.druid.merger.coordinator.TaskMasterLifecycle;
import com.metamx.druid.merger.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
-import com.metamx.druid.merger.coordinator.scaling.AutoScalingData;
-import com.metamx.druid.merger.coordinator.scaling.ScalingStats;
import com.metamx.druid.merger.coordinator.setup.WorkerSetupData;
import com.metamx.emitter.service.ServiceEmitter;
@@ -188,7 +185,7 @@
public <T> Response doAction(final TaskActionHolder<T> holder)
{
final T ret = taskMasterLifecycle.getTaskToolbox(holder.getTask())
- .getTaskActionClientFactory()
+ .getTaskActionClient()
.submit(holder.getAction());
final Map<String, Object> retMap = Maps.newHashMap();
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
index 4fac2c5..c943697 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java
@@ -66,7 +66,6 @@
import com.metamx.emitter.service.ServiceEventBuilder;
import org.apache.commons.io.FileUtils;
import org.easymock.EasyMock;
-import org.jets3t.service.ServiceException;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
@@ -283,8 +282,8 @@
// Sort of similar to what realtime tasks do:
// Acquire lock for first interval
- final Optional<TaskLock> lock1 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval1));
- final List<TaskLock> locks1 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
+ final Optional<TaskLock> lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1));
+ final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertTrue("lock1 present", lock1.isPresent());
@@ -292,8 +291,8 @@
Assert.assertEquals("locks1", ImmutableList.of(lock1.get()), locks1);
// Acquire lock for second interval
- final Optional<TaskLock> lock2 = toolbox.getTaskActionClientFactory().submit(new LockAcquireAction(interval2));
- final List<TaskLock> locks2 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
+ final Optional<TaskLock> lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2));
+ final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertTrue("lock2 present", lock2.isPresent());
@@ -301,7 +300,7 @@
Assert.assertEquals("locks2", ImmutableList.of(lock1.get(), lock2.get()), locks2);
// Push first segment
- toolbox.getTaskActionClientFactory()
+ toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
@@ -315,14 +314,14 @@
);
// Release first lock
- toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval1));
- final List<TaskLock> locks3 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
+ toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1));
+ final List<TaskLock> locks3 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks3", ImmutableList.of(lock2.get()), locks3);
// Push second segment
- toolbox.getTaskActionClientFactory()
+ toolbox.getTaskActionClient()
.submit(
new SegmentInsertAction(
ImmutableSet.of(
@@ -336,8 +335,8 @@
);
// Release second lock
- toolbox.getTaskActionClientFactory().submit(new LockReleaseAction(interval2));
- final List<TaskLock> locks4 = toolbox.getTaskActionClientFactory().submit(new LockListAction());
+ toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2));
+ final List<TaskLock> locks4 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
Assert.assertEquals("locks4", ImmutableList.<TaskLock>of(), locks4);
@@ -370,7 +369,7 @@
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(
- toolbox.getTaskActionClientFactory()
+ toolbox.getTaskActionClient()
.submit(new LockListAction())
);
@@ -380,7 +379,7 @@
.version(myLock.getVersion())
.build();
- toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+ toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId());
}
};
@@ -406,7 +405,7 @@
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
- final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+ final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final DataSegment segment = DataSegment.builder()
.dataSource("ds")
@@ -414,7 +413,7 @@
.version(myLock.getVersion())
.build();
- toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+ toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId());
}
};
@@ -440,7 +439,7 @@
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
- final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClientFactory().submit(new LockListAction()));
+ final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
final DataSegment segment = DataSegment.builder()
.dataSource("ds")
@@ -448,7 +447,7 @@
.version(myLock.getVersion() + "1!!!1!!")
.build();
- toolbox.getTaskActionClientFactory().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
+ toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
return TaskStatus.success(getId());
}
};
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
index dfb0d95..939dc9b 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
@@ -375,7 +375,7 @@
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
- toolbox.getTaskActionClientFactory().submit(new SpawnTasksAction(nextTasks));
+ toolbox.getTaskActionClient().submit(new SpawnTasksAction(nextTasks));
return TaskStatus.success(id);
}
};