CRUNCH-670: Make AvroPathPerKeyTarget work with the Spark Runtime.
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index a3ecbb8..9f76b45 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -92,6 +92,11 @@
@Override
public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
FileSystem srcFs = workingPath.getFileSystem(conf);
+ if (index == -1) {
+ // The Spark runtime passes -1 as the index; map it to 0 so that the base
+ // path built below resolves to the "out0" directory this target expects.
+ index = 0;
+ }
Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
if (!srcFs.exists(base)) {
LOG.warn("Nothing to copy from {}", base);
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
index 852483d..087e935 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetPathPerKeyOutputFormat.java
@@ -42,7 +42,7 @@
public RecordWriter<AvroWrapper<Pair<Utf8, T>>, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
Configuration conf = taskAttemptContext.getConfiguration();
- Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "part"));
+ Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "out0"));
return new AvroParquetFilePerKeyRecordWriter<T>(basePath,
getUniqueFile(taskAttemptContext, "part", ".parquet"), conf);
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
index da84fc0..6563707 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroPathPerKeyOutputFormat.java
@@ -44,7 +44,7 @@
public RecordWriter<AvroWrapper<Pair<Utf8, T>>, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
Configuration conf = taskAttemptContext.getConfiguration();
- Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "part"));
+ Path basePath = new Path(getOutputPath(taskAttemptContext), conf.get("mapreduce.output.basename", "out0"));
return new AvroFilePerKeyRecordWriter<T>(basePath, getUniqueFile(taskAttemptContext, "part", ".avro"), conf);
}