Fix Index hadoop failing with index.zip is not a valid DFS filename (#11316)
* Fix bug
* Simplify class loading
* Fix example configs for integration tests
* Small classloader cleanup
Co-authored-by: jon-wei <jon.wei@imply.io>
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
index c680d27..7e0ab4c 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java
@@ -476,8 +476,8 @@
return new DataSegmentAndIndexZipFilePath(
finalSegment,
- tmpPath.toUri().getPath(),
- finalIndexZipFilePath.toUri().getPath()
+ tmpPath.toUri().toString(),
+ finalIndexZipFilePath.toUri().toString()
);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 37ffb4c..a163905 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -450,16 +450,11 @@
List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths = buildSegmentsStatus.getDataSegmentAndIndexZipFilePaths();
if (dataSegmentAndIndexZipFilePaths != null) {
indexGeneratorJobSuccess = true;
- try {
- Thread.currentThread().setContextClassLoader(oldLoader);
- renameSegmentIndexFilesJob(
- toolbox.getJsonMapper().writeValueAsString(indexerSchema),
- toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
- );
- }
- finally {
- Thread.currentThread().setContextClassLoader(loader);
- }
+ renameSegmentIndexFilesJob(
+ toolbox.getJsonMapper().writeValueAsString(indexerSchema),
+ toolbox.getJsonMapper().writeValueAsString(dataSegmentAndIndexZipFilePaths)
+ );
+
ArrayList<DataSegment> segments = new ArrayList<>(dataSegmentAndIndexZipFilePaths.stream()
.map(
DataSegmentAndIndexZipFilePath::getSegment)
@@ -545,22 +540,20 @@
}
}
+ /**
+ * Must be called only when the hadoopy classloader is the current classloader
+ */
private void renameSegmentIndexFilesJob(
String hadoopIngestionSpecStr,
String dataSegmentAndIndexZipFilePathListStr
)
{
- final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
+ final ClassLoader loader = Thread.currentThread().getContextClassLoader();
try {
- ClassLoader loader = HadoopTask.buildClassLoader(
- getHadoopDependencyCoordinates(),
- taskConfig.getDefaultHadoopCoordinates()
+ final Class<?> clazz = loader.loadClass(
+ "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner"
);
-
- Object renameSegmentIndexFilesRunner = getForeignClassloaderObject(
- "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner",
- loader
- );
+ Object renameSegmentIndexFilesRunner = clazz.newInstance();
String[] renameSegmentIndexFilesJobInput = new String[]{
hadoopIngestionSpecStr,
@@ -573,7 +566,6 @@
renameSegmentIndexFilesJobInput.getClass()
);
- Thread.currentThread().setContextClassLoader(loader);
renameSegmentIndexFiles.invoke(
renameSegmentIndexFilesRunner,
new Object[]{renameSegmentIndexFilesJobInput}
@@ -582,9 +574,6 @@
catch (Exception e) {
throw new RuntimeException(e);
}
- finally {
- Thread.currentThread().setContextClassLoader(oldLoader);
- }
}
private void indexerGeneratorCleanupJob(
diff --git a/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_hdfs b/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_hdfs
index cd70973..b16500a 100644
--- a/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_hdfs
+++ b/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_hdfs
@@ -31,4 +31,4 @@
druid_extensions_loadList=["druid-s3-extensions","druid-hdfs-storage"]
-druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
+druid_indexer_task_defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
diff --git a/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_s3 b/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_s3
index 4ad6896..60dd856 100644
--- a/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_s3
+++ b/integration-tests/docker/environment-configs/override-examples/hadoop/s3_to_s3
@@ -32,4 +32,4 @@
druid_extensions_loadList=["druid-s3-extensions","druid-hdfs-storage"]
-druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]
+druid_indexer_task_defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.8.5", "org.apache.hadoop:hadoop-aws:2.8.5"]