Merge pull request #712 from metamx/indexing-extra-classpaths
Allow indexing tasks to specify extra classpaths.
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
index f03c552..959bb54 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
@@ -108,6 +108,12 @@
}
@Override
+ public String getClasspathPrefix()
+ {
+ return null;
+ }
+
+ @Override
public String toString()
{
return Objects.toStringHelper(this)
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 69e5c66..09f850e 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -79,6 +79,8 @@
private final HadoopIngestionSpec spec;
@JsonIgnore
private final List<String> hadoopDependencyCoordinates;
+ @JsonIgnore
+ private final String classpathPrefix;
/**
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
@@ -96,7 +98,8 @@
@JsonProperty("spec") HadoopIngestionSpec spec,
@JsonProperty("config") HadoopIngestionSpec config, // backwards compat
@JsonProperty("hadoopCoordinates") String hadoopCoordinates,
- @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
+ @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
+ @JsonProperty("classpathPrefix") String classpathPrefix
)
{
super(
@@ -123,6 +126,8 @@
// Will be defaulted to something at runtime, based on taskConfig.
this.hadoopDependencyCoordinates = null;
}
+
+ this.classpathPrefix = classpathPrefix;
}
@Override
@@ -159,6 +164,13 @@
return hadoopDependencyCoordinates;
}
+ @JsonProperty
+ @Override
+ public String getClasspathPrefix()
+ {
+ return classpathPrefix;
+ }
+
@SuppressWarnings("unchecked")
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index f939516..f554d96 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -99,6 +99,12 @@
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
+ * Returns an extra classpath that should be prepended to the default classpath when running this task. If no
+ * extra classpath should be prepended, this should return null or the empty string.
+ */
+ public String getClasspathPrefix();
+
+ /**
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
* coordinator. If this method throws an exception, the task should be considered a failure.
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
index 0c740fb..36aa925 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
@@ -161,10 +161,19 @@
final List<String> command = Lists.newArrayList();
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
+ final String taskClasspath;
+ if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
+ taskClasspath = Joiner.on(File.pathSeparator).join(
+ task.getClasspathPrefix(),
+ config.getClasspath()
+ );
+ } else {
+ taskClasspath = config.getClasspath();
+ }
command.add(config.getJavaCommand());
command.add("-cp");
- command.add(config.getClasspath());
+ command.add(taskClasspath);
Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index fc7f13e..d565aa5 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -427,7 +427,8 @@
null
),
null,
- null
+ null,
+ "blah"
);
final String json = jsonMapper.writeValueAsString(task);
@@ -442,5 +443,7 @@
task.getSpec().getTuningConfig().getJobProperties(),
task2.getSpec().getTuningConfig().getJobProperties()
);
+ Assert.assertEquals("blah", task.getClasspathPrefix());
+ Assert.assertEquals("blah", task2.getClasspathPrefix());
}
}