Merge pull request #712 from metamx/indexing-extra-classpaths

Allow indexing tasks to specify extra classpaths.
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
index f03c552..959bb54 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
@@ -108,6 +108,12 @@
   }
 
   @Override
+  public String getClasspathPrefix()
+  {
+    return null; // null means no extra classpath is prepended to the default classpath for this task
+  }
+
+  @Override
   public String toString()
   {
     return Objects.toStringHelper(this)
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 69e5c66..09f850e 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -79,6 +79,8 @@
   private final HadoopIngestionSpec spec;
   @JsonIgnore
   private final List<String> hadoopDependencyCoordinates;
+  @JsonIgnore
+  private final String classpathPrefix;
 
   /**
    * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
@@ -96,7 +98,8 @@
       @JsonProperty("spec") HadoopIngestionSpec spec,
       @JsonProperty("config") HadoopIngestionSpec config, // backwards compat
       @JsonProperty("hadoopCoordinates") String hadoopCoordinates,
-      @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates
+      @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
+      @JsonProperty("classpathPrefix") String classpathPrefix
   )
   {
     super(
@@ -123,6 +126,8 @@
       // Will be defaulted to something at runtime, based on taskConfig.
       this.hadoopDependencyCoordinates = null;
     }
+
+    this.classpathPrefix = classpathPrefix;
   }
 
   @Override
@@ -159,6 +164,13 @@
     return hadoopDependencyCoordinates;
   }
 
+  @JsonProperty
+  @Override
+  public String getClasspathPrefix()
+  {
+    return classpathPrefix; // the constructor's "classpathPrefix" argument, stored as given (no defaulting)
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index f939516..f554d96 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -99,6 +99,12 @@
   public <T> QueryRunner<T> getQueryRunner(Query<T> query);
 
   /**
+   * Returns an extra classpath that should be prepended to the default classpath when running this task.
+   * @return the classpath prefix; null or the empty string both mean that nothing should be prepended
+   */
+  public String getClasspathPrefix();
+
+  /**
    * Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
    * actions must be idempotent, since this method may be executed multiple times. This typically runs on the
    * coordinator. If this method throws an exception, the task should be considered a failure.
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
index 0c740fb..36aa925 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java
@@ -161,10 +161,19 @@
 
                               final List<String> command = Lists.newArrayList();
                               final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
+                              final String taskClasspath;
+                              if (task.getClasspathPrefix() == null || task.getClasspathPrefix().isEmpty()) {
+                                // No prefix requested by the task: use the configured classpath unchanged.
+                                taskClasspath = config.getClasspath();
+                              } else {
+                                // The task's entries go first, followed by the configured classpath.
+                                final Joiner pathJoiner = Joiner.on(File.pathSeparator);
+                                taskClasspath = pathJoiner.join(task.getClasspathPrefix(), config.getClasspath());
+                              }
 
                               command.add(config.getJavaCommand());
                               command.add("-cp");
-                              command.add(config.getClasspath());
+                              command.add(taskClasspath);
 
                               Iterables.addAll(command, whiteSpaceSplitter.split(config.getJavaOpts()));
 
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index fc7f13e..d565aa5 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -427,7 +427,8 @@
             null
         ),
         null,
-        null
+        null,
+        "blah"
     );
 
     final String json = jsonMapper.writeValueAsString(task);
@@ -442,5 +443,7 @@
         task.getSpec().getTuningConfig().getJobProperties(),
         task2.getSpec().getTuningConfig().getJobProperties()
     );
+    Assert.assertEquals("blah", task.getClasspathPrefix());
+    Assert.assertEquals("blah", task2.getClasspathPrefix());
   }
 }