CRUNCH-635: Output path per key for Text target
Signed-off-by: Josh Wills <jwills@apache.org>
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/TextPathPerKeyIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/TextPathPerKeyIT.java
new file mode 100644
index 0000000..6cfb8ed
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/text/TextPathPerKeyIT.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.Compress;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Integration tests for {@link TextPathPerKeyTarget}. Each test runs an {@link MRPipeline} over
+ * {@code docs.txt} and checks the directory layout produced beneath the output path.
+ */
+public class TextPathPerKeyIT extends CrunchTestSupport implements Serializable {
+
+  /** Entries that every test with non-empty output expects directly beneath the output directory. */
+  private static final Set<String> TOP_LEVEL_NAMES = ImmutableSet.of("A", "B", "_SUCCESS");
+
+  /**
+   * Splits a tab-separated line and emits (first field + key suffix, second field). Declared as a
+   * static nested class so that serializing the function does not also capture the test instance.
+   */
+  private static class SplitOnTabFn extends MapFn<String, Pair<String, String>> {
+    private final String keySuffix;
+
+    SplitOnTabFn(String keySuffix) {
+      this.keySuffix = keySuffix;
+    }
+
+    @Override
+    public Pair<String, String> map(String input) {
+      String[] fields = input.split("\t");
+      return Pair.of(fields[0] + keySuffix, fields[1]);
+    }
+  }
+
+  @Test
+  public void testOutputFilePerKey() throws Exception {
+    Pipeline p = newPipeline();
+    Path outDir = tempDir.getPath("out");
+
+    readDocs(p, "").groupByKey()
+        .write(new TextPathPerKeyTarget(outDir));
+    p.done();
+
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    assertEquals(TOP_LEVEL_NAMES, childNames(fs, outDir));
+    assertOnlyChild(fs, new Path(outDir, "A"), "part-r-00000");
+    assertOnlyChild(fs, new Path(outDir, "B"), "part-r-00000");
+  }
+
+  @Test
+  public void testOutputFilePerKeyMapOnlyJob() throws Exception {
+    Pipeline p = newPipeline();
+    Path outDir = tempDir.getPath("out");
+
+    // No groupByKey here: this test expects map-side part files (part-m-*).
+    readDocs(p, "").write(new TextPathPerKeyTarget(outDir));
+    p.done();
+
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    assertEquals(TOP_LEVEL_NAMES, childNames(fs, outDir));
+    assertOnlyChild(fs, new Path(outDir, "A"), "part-m-00000");
+    assertOnlyChild(fs, new Path(outDir, "B"), "part-m-00000");
+  }
+
+  @Test
+  public void testOutputFilePerKeyWithNestedSubFoldersAndCompression() throws Exception {
+    Pipeline p = newPipeline();
+    Path outDir = tempDir.getPath("out");
+    Path outDir2 = tempDir.getPath("out2");
+    PTable<String, String> pTable = readDocs(p, "/dir");
+
+    pTable.groupByKey()
+        .write(new TextPathPerKeyTarget(outDir));
+    pTable.groupByKey()
+        .write(Compress.gzip(new TextPathPerKeyTarget(outDir2)));
+    p.done();
+
+    assertSubDirs(outDir, "");
+    assertSubDirs(outDir2, ".gz");
+  }
+
+  /**
+   * Asserts that {@code outDir} holds A/dir and B/dir, each containing exactly one file named
+   * {@code part-r-00000} followed by {@code extension}.
+   */
+  private void assertSubDirs(Path outDir, String extension) throws IOException {
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    assertEquals(TOP_LEVEL_NAMES, childNames(fs, outDir));
+    for (String key : new String[] {"A", "B"}) {
+      Path dir = assertOnlyChild(fs, new Path(outDir, key), "dir");
+      // Also checks the entry count, so an empty or over-full directory fails as an assertion.
+      assertOnlyChild(fs, dir, "part-r-00000" + extension);
+    }
+  }
+
+  @Test
+  public void testOutputFilePerKey_NothingToOutput() throws Exception {
+    Pipeline p = newPipeline();
+    Path outDir = tempDir.getPath("out");
+
+    readDocs(p, "")
+        .filter(FilterFns.<Pair<String, String>>REJECT_ALL())
+        .groupByKey()
+        .write(new TextPathPerKeyTarget(outDir));
+    p.done();
+
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    assertFalse(fs.exists(outDir));
+  }
+
+  @Test
+  public void testOutputFilePerKey_Directories() throws Exception {
+    Pipeline p = newPipeline();
+    Path outDir = tempDir.getPath("out");
+
+    readDocs(p, "/child").groupByKey()
+        .write(new TextPathPerKeyTarget(outDir));
+    p.done();
+
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    assertEquals(TOP_LEVEL_NAMES, childNames(fs, outDir));
+    for (String key : new String[] {"A", "B"}) {
+      Path child = assertOnlyChild(fs, new Path(outDir, key), "child");
+      assertOnlyChild(fs, child, "part-r-00000");
+    }
+  }
+
+  /** Creates an MRPipeline for this test class using the temporary directory's default configuration. */
+  private Pipeline newPipeline() {
+    return new MRPipeline(this.getClass(), tempDir.getDefaultConfiguration());
+  }
+
+  /**
+   * Reads {@code docs.txt} as text and maps every line to a (key, value) table entry, appending
+   * {@code keySuffix} to each key.
+   */
+  private PTable<String, String> readDocs(Pipeline pipeline, String keySuffix) throws IOException {
+    return pipeline.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+        .parallelDo(new SplitOnTabFn(keySuffix), Writables.tableOf(Writables.strings(), Writables.strings()));
+  }
+
+  /** Returns the names of the entries directly beneath {@code dir}. */
+  private static Set<String> childNames(FileSystem fs, Path dir) throws IOException {
+    Set<String> names = Sets.newHashSet();
+    for (FileStatus fstat : fs.listStatus(dir)) {
+      names.add(fstat.getPath().getName());
+    }
+    return names;
+  }
+
+  /**
+   * Asserts that {@code dir} has exactly one entry and that it is named {@code expectedName}.
+   *
+   * @return the path of that single entry
+   */
+  private static Path assertOnlyChild(FileSystem fs, Path dir, String expectedName) throws IOException {
+    FileStatus[] children = fs.listStatus(dir);
+    assertEquals(1, children.length);
+    assertEquals(expectedName, children[0].getPath().getName());
+    return children[0].getPath();
+  }
+}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyOutputFormat.java
new file mode 100644
index 0000000..67579e4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyOutputFormat.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A {@link TextOutputFormat} that writes each value as a line of text into a file located in a
+ * directory named after the record's key, beneath the task attempt's work path.
+ *
+ * @param <V> the type of the values written as text
+ */
+public class TextPathPerKeyOutputFormat<V> extends TextOutputFormat<Text, V> {
+
+  /**
+   * Creates a writer that routes every value to {@code <work path>/<output basename>/<key>/<unique file>},
+   * compressing the files when output compression is enabled for the job.
+   */
+  @Override
+  public RecordWriter<Text, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
+    Configuration conf = taskAttemptContext.getConfiguration();
+
+    FileOutputCommitter outputCommitter = (FileOutputCommitter) getOutputCommitter(taskAttemptContext);
+    Path basePath = new Path(outputCommitter.getWorkPath(), conf.get("mapreduce.output.basename", "part"));
+
+    boolean isCompressed = FileOutputFormat.getCompressOutput(taskAttemptContext);
+    CompressionCodec codec = null;
+    String extension = "";
+
+    if (isCompressed) {
+      // Gzip is the fallback when compression is on but no codec has been configured.
+      Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(taskAttemptContext, GzipCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+      extension = codec.getDefaultExtension();
+    }
+
+    return new TextPathPerKeyRecordWriter<V>(basePath, getUniqueFile(taskAttemptContext, "part", extension),
+        isCompressed, codec, taskAttemptContext);
+  }
+
+  /**
+   * Writes values to one file per key, keeping a single file open at a time: whenever the key
+   * differs from the previous record's key, the current file is closed and the new key's file is opened.
+   *
+   * <p>The class is static and has its own type parameter because it needs no state from the
+   * enclosing output format.
+   *
+   * @param <T> the type of the values written as text
+   */
+  private static class TextPathPerKeyRecordWriter<T> extends RecordWriter<Text, T> {
+
+    private final Path basePath;
+    private final String uniqueFileName;
+    private final Configuration conf;
+    private final CompressionCodec compressionCodec;
+    private final boolean isCompressed;
+    private final TaskAttemptContext taskAttemptContext;
+    private final String keyValueSeparator;
+    private String currentKey;
+    private RecordWriter<T, NullWritable> currentWriter;
+
+    public TextPathPerKeyRecordWriter(Path basePath, String uniqueFileName, boolean isCompressed,
+                                      CompressionCodec codec, TaskAttemptContext context) {
+      this.basePath = basePath;
+      this.uniqueFileName = uniqueFileName;
+      this.conf = context.getConfiguration();
+      this.isCompressed = isCompressed;
+      this.compressionCodec = codec;
+      this.taskAttemptContext = context;
+      // Looked up once here instead of on every key change.
+      this.keyValueSeparator = conf.get(SEPERATOR, "\t");
+    }
+
+    /**
+     * Appends {@code n} to the file belonging to the key held in {@code record}, switching files first
+     * when the key differs from the previous call's key.
+     */
+    @Override
+    public void write(Text record, T n) throws IOException, InterruptedException {
+      String key = record.toString();
+      if (!key.equals(currentKey)) {
+        // Close and forget the previous writer before opening the next one, so that a failed open
+        // can never leave an already-closed writer registered for the new key.
+        close(taskAttemptContext);
+        currentWriter = openWriter(key);
+        currentKey = key;
+      }
+
+      currentWriter.write(n, NullWritable.get());
+    }
+
+    /**
+     * Closes the file of the current key, if one is open, and forgets the current key.
+     */
+    @Override
+    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+      if (currentWriter == null) {
+        return;
+      }
+      try {
+        currentWriter.close(taskAttemptContext);
+      } finally {
+        // Reset even when close fails so that the stale writer is never reused or closed twice.
+        currentKey = null;
+        currentWriter = null;
+      }
+    }
+
+    /**
+     * Opens the output file for {@code key}, creating the key's directory when it is missing and
+     * wrapping the stream with the compression codec when compression is enabled.
+     *
+     * <p>NOTE(review): a key that shows up again after a different key re-opens its file through
+     * {@link FileSystem#append(Path)}; confirm that every target file system supports append.
+     */
+    private RecordWriter<T, NullWritable> openWriter(String key) throws IOException {
+      Path dir = new Path(basePath, key);
+      FileSystem fs = dir.getFileSystem(conf);
+      if (!fs.exists(dir)) {
+        fs.mkdirs(dir);
+      }
+
+      Path filePath = new Path(dir, uniqueFileName);
+      DataOutputStream rawStream = fs.exists(filePath) ? fs.append(filePath) : fs.create(filePath);
+      if (!isCompressed || compressionCodec == null) {
+        return new LineRecordWriter<T, NullWritable>(rawStream, keyValueSeparator);
+      }
+
+      try {
+        DataOutputStream compressed = new DataOutputStream(compressionCodec.createOutputStream(rawStream));
+        return new LineRecordWriter<T, NullWritable>(compressed, keyValueSeparator);
+      } catch (IOException | RuntimeException e) {
+        // The file is already open at this point; release it instead of leaking the handle.
+        try {
+          rawStream.close();
+        } catch (IOException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+        throw e;
+      }
+    }
+  }
+}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyTarget.java
new file mode 100644
index 0000000..574651c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyTarget.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import com.google.common.collect.Maps;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * A file target for Writable-based tables with {@code String} keys. Output is produced through
+ * {@link TextPathPerKeyOutputFormat} and then moved beneath the target path, preserving the per-key
+ * directory structure found in the job's working directory.
+ */
+public class TextPathPerKeyTarget extends FileTargetImpl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TextPathPerKeyTarget.class);
+
+  /** Extra configuration entries that are copied into the output format bundle. */
+  private final Map<String, String> extraConf = Maps.newHashMap();
+
+  public TextPathPerKeyTarget(String path) {
+    this(new Path(path));
+  }
+
+  public TextPathPerKeyTarget(Path path) {
+    this(path, SequentialFileNamingScheme.getInstance());
+  }
+
+  public TextPathPerKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, TextPathPerKeyOutputFormat.class, fileNamingScheme);
+  }
+
+  /**
+   * Accepts only table types of the Writable type family whose key type class is {@code String}.
+   *
+   * @param handler the handler that is configured with this target when the type is accepted
+   * @param ptype the type of the collection to be written
+   * @return {@code true} if the handler was configured, {@code false} if the type is not supported
+   */
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (!(ptype instanceof PTableType) || ptype.getFamily() != WritableTypeFamily.getInstance()) {
+      return false;
+    }
+    if (!String.class.equals(((PTableType<?, ?>) ptype).getKeyType().getTypeClass())) {
+      return false;
+    }
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  /**
+   * Records a configuration entry to apply to the output format.
+   *
+   * @return this target
+   */
+  @Override
+  public Target outputConf(String key, String value) {
+    extraConf.put(key, value);
+    return this;
+  }
+
+  /**
+   * Configures the job to write through {@link TextPathPerKeyOutputFormat}, passing the value
+   * converter's value class and {@link NullWritable} as the two output classes.
+   *
+   * @param ptype the table type being written; must be a {@link PTableType}
+   */
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    FormatBundle bundle = FormatBundle.forOutput(TextPathPerKeyOutputFormat.class);
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      bundle.set(e.getKey(), e.getValue());
+    }
+
+    Converter converter = ((PTableType<?, ?>) ptype).getValueType().getConverter();
+    Class<?> valueClass = converter.getValueClass();
+    configureForMapReduce(job, valueClass, NullWritable.class, bundle, outputPath, name);
+  }
+
+  /**
+   * Moves the output written for this target from the job's working directory to the target path
+   * and then creates the success indicator. When the working directory holds nothing for the given
+   * output index, a warning is logged and the target path is left untouched.
+   *
+   * @param conf the configuration used to resolve the file systems
+   * @param workingPath the working directory to move outputs from
+   * @param index the output index that, appended to the multi-output prefix, names this target's directory
+   * @throws IOException if a file system operation fails
+   */
+  @Override
+  public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
+    FileSystem srcFs = workingPath.getFileSystem(conf);
+    Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
+    if (!srcFs.exists(base)) {
+      LOG.warn("Nothing to copy from {}", base);
+      return;
+    }
+
+    FileSystem dstFs = path.getFileSystem(conf);
+    if (!dstFs.exists(path)) {
+      dstFs.mkdirs(path);
+    }
+
+    boolean sameFs = isCompatible(srcFs, path);
+    move(conf, base, srcFs, path, dstFs, sameFs);
+    dstFs.create(getSuccessIndicator(), true).close();
+  }
+
+  /**
+   * Recursively transfers the directory tree beneath {@code srcBase} to {@code dstBase}. Directories
+   * are recreated at the destination; each file is renamed when both sides share a file system and
+   * copied through {@link FileUtil#copy} otherwise, under the name chosen by {@code getDestFile}.
+   *
+   * <p>Every directory is listed exactly once and each entry is handled according to its own type,
+   * so a key directory that holds both part files and nested key directories (for example with the
+   * keys {@code "A"} and {@code "A/B"}) is transferred correctly in whatever order it is listed.
+   */
+  private void move(Configuration conf, Path srcBase, FileSystem srcFs, Path dstBase, FileSystem dstFs, boolean sameFs)
+      throws IOException {
+    if (!dstFs.exists(dstBase)) {
+      dstFs.mkdirs(dstBase);
+    }
+    for (Path src : FileUtil.stat2Paths(srcFs.listStatus(srcBase))) {
+      if (srcFs.isDirectory(src)) {
+        move(conf, src, srcFs, new Path(dstBase, src.getName()), dstFs, sameFs);
+        continue;
+      }
+
+      // NOTE(review): "-m-" in the file name is taken to mark map-side output; confirm against getDestFile.
+      Path dst = getDestFile(conf, src, dstBase, src.getName().contains("-m-"));
+      if (!sameFs) {
+        FileUtil.copy(srcFs, src, dstFs, dst, true, true, conf);
+      } else if (!srcFs.rename(src, dst)) {
+        // rename reports failure through its return value; surface it instead of dropping it silently.
+        LOG.warn("Failed to rename {} to {}", src, dst);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "TextFilePerKey(" + path + ")";
+  }
+}