CRUNCH-635: Output path per key for Text target
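
Adds a TextPathPerKeyTarget (backed by TextPathPerKeyOutputFormat) that
writes the values of a PTable<String, T> to text files, placing the output
for each key under a separate sub-directory of the target path. Keys may
contain "/" to create nested sub-directories, and the target can be wrapped
with Compress.gzip(...) for compressed output.

A minimal usage sketch (construction of the input table and the output Path
is elided):

    PTable<String, String> table = ...;
    table.groupByKey().write(new TextPathPerKeyTarget(outputPath));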

Signed-off-by: Josh Wills <jwills@apache.org>
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/text/TextPathPerKeyIT.java b/crunch-core/src/it/java/org/apache/crunch/io/text/TextPathPerKeyIT.java
new file mode 100644
index 0000000..6cfb8ed
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/io/text/TextPathPerKeyIT.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.FilterFns;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.Compress;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
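+/**
+ * Integration tests for {@link TextPathPerKeyTarget}, covering map-reduce and map-only jobs,
+ * nested key sub-directories, compressed output, and the case where there is nothing to write.
+ */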
+public class TextPathPerKeyIT extends CrunchTestSupport implements Serializable {
+    @Test
+    public void testOutputFilePerKey() throws Exception {
+        Pipeline p = new MRPipeline(this.getClass(), tempDir.getDefaultConfiguration());
+        Path outDir = tempDir.getPath("out");
+        PTable<String, String> pTable = p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+                .parallelDo(new MapFn<String, Pair<String, String>>() {
+                    @Override
+                    public Pair<String, String> map(String input) {
+                        String[] parts = input.split("\t");
+                        return Pair.of(parts[0], parts[1]);
+                    }
+                }, Writables.tableOf(Writables.strings(), Writables.strings()));
+
+        pTable.groupByKey()
+                .write(new TextPathPerKeyTarget(outDir));
+        p.done();
+
+        Set<String> names = Sets.newHashSet();
+        FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+        for (FileStatus fstat : fs.listStatus(outDir)) {
+            names.add(fstat.getPath().getName());
+        }
+        assertEquals(ImmutableSet.of("A", "B", "_SUCCESS"), names);
+
+        FileStatus[] aStat = fs.listStatus(new Path(outDir, "A"));
+        assertEquals(1, aStat.length);
+        assertEquals("part-r-00000", aStat[0].getPath().getName());
+
+        FileStatus[] bStat = fs.listStatus(new Path(outDir, "B"));
+        assertEquals(1, bStat.length);
+        assertEquals("part-r-00000", bStat[0].getPath().getName());
+    }
+
+    @Test
+    public void testOutputFilePerKeyMapOnlyJob() throws Exception {
+        Pipeline p = new MRPipeline(this.getClass(), tempDir.getDefaultConfiguration());
+        Path outDir = tempDir.getPath("out");
+        PTable<String, String> pTable = p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+                .parallelDo(new MapFn<String, Pair<String, String>>() {
+                    @Override
+                    public Pair<String, String> map(String input) {
+                        String[] parts = input.split("\t");
+                        return Pair.of(parts[0], parts[1]);
+                    }
+                }, Writables.tableOf(Writables.strings(), Writables.strings()));
+
+        pTable.write(new TextPathPerKeyTarget(outDir));
+        p.done();
+
+        Set<String> names = Sets.newHashSet();
+        FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+        for (FileStatus fstat : fs.listStatus(outDir)) {
+            names.add(fstat.getPath().getName());
+        }
+        assertEquals(ImmutableSet.of("A", "B", "_SUCCESS"), names);
+
+        FileStatus[] aStat = fs.listStatus(new Path(outDir, "A"));
+        assertEquals(1, aStat.length);
+        assertEquals("part-m-00000", aStat[0].getPath().getName());
+
+        FileStatus[] bStat = fs.listStatus(new Path(outDir, "B"));
+        assertEquals(1, bStat.length);
+        assertEquals("part-m-00000", bStat[0].getPath().getName());
+    }
+
+    @Test
+    public void testOutputFilePerKeyWithNestedSubFoldersAndCompression() throws Exception {
+        Pipeline p = new MRPipeline(this.getClass(), tempDir.getDefaultConfiguration());
+        Path outDir = tempDir.getPath("out");
+        PTable<String, String> pTable = p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+                .parallelDo(new MapFn<String, Pair<String, String>>() {
+                    @Override
+                    public Pair<String, String> map(String input) {
+                        String[] parts = input.split("\t");
+                        return Pair.of(parts[0] + "/dir", parts[1]);
+                    }
+                }, Writables.tableOf(Writables.strings(), Writables.strings()));
+
+        pTable.groupByKey()
+                .write(new TextPathPerKeyTarget(outDir));
+
+        Path outDir2 = tempDir.getPath("out2");
+
+        pTable.groupByKey()
+                .write(Compress.gzip(new TextPathPerKeyTarget(outDir2)));
+        p.done();
+
+        assertSubDirs(outDir, "");
+        assertSubDirs(outDir2, ".gz");
+    }
+
+    private void assertSubDirs(Path outDir, String extension) throws IOException {
+        Set<String> names = Sets.newHashSet();
+        FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+        for (FileStatus fstat : fs.listStatus(outDir)) {
+            names.add(fstat.getPath().getName());
+        }
+        assertEquals(ImmutableSet.of("A", "B", "_SUCCESS"), names);
+
+        FileStatus[] aStat = fs.listStatus(new Path(outDir, "A"));
+        assertEquals(1, aStat.length);
+        assertEquals("dir", aStat[0].getPath().getName());
+        FileStatus[] aDirStat = fs.listStatus(aStat[0].getPath());
+        assertEquals("part-r-00000" + extension, aDirStat[0].getPath().getName());
+
+        FileStatus[] bStat = fs.listStatus(new Path(outDir, "B"));
+        assertEquals(1, bStat.length);
+        assertEquals("dir", bStat[0].getPath().getName());
+        FileStatus[] bDirStat = fs.listStatus(bStat[0].getPath());
+        assertEquals("part-r-00000" + extension, bDirStat[0].getPath().getName());
+    }
+
+    @Test
+    public void testOutputFilePerKey_NothingToOutput() throws Exception {
+        Pipeline p = new MRPipeline(this.getClass(), tempDir.getDefaultConfiguration());
+        Path outDir = tempDir.getPath("out");
+
+        p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+                .parallelDo(new MapFn<String, Pair<String, String>>() {
+                    @Override
+                    public Pair<String, String> map(String input) {
+                        String[] parts = input.split("\t");
+                        return Pair.of(parts[0], parts[1]);
+                    }
+                }, Writables.tableOf(Writables.strings(), Writables.strings()))
+                .filter(FilterFns.<Pair<String, String>>REJECT_ALL())
+                .groupByKey()
+                .write(new TextPathPerKeyTarget(outDir));
+        p.done();
+
+        FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+        assertFalse(fs.exists(outDir));
+    }
+
+    @Test
+    public void testOutputFilePerKey_Directories() throws Exception {
+        Pipeline p = new MRPipeline(this.getClass(), tempDir.getDefaultConfiguration());
+        Path outDir = tempDir.getPath("out");
+        p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+                .parallelDo(new MapFn<String, Pair<String, String>>() {
+                    @Override
+                    public Pair<String, String> map(String input) {
+                        String[] parts = input.split("\t");
+                        return Pair.of(parts[0] + "/child", parts[1]);
+                    }
+                }, Writables.tableOf(Writables.strings(), Writables.strings()))
+                .groupByKey()
+                .write(new TextPathPerKeyTarget(outDir));
+        p.done();
+
+        Set<String> names = Sets.newHashSet();
+        FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+        for (FileStatus fstat : fs.listStatus(outDir)) {
+            names.add(fstat.getPath().getName());
+        }
+        assertEquals(ImmutableSet.of("A", "B", "_SUCCESS"), names);
+
+        Path aParent = new Path(outDir, "A");
+        FileStatus[] aParentStat = fs.listStatus(aParent);
+        assertEquals(1, aParentStat.length);
+        assertEquals("child", aParentStat[0].getPath().getName());
+        FileStatus[] aChildStat = fs.listStatus(new Path(aParent, "child"));
+        assertEquals(1, aChildStat.length);
+        assertEquals("part-r-00000", aChildStat[0].getPath().getName());
+
+        Path bParent = new Path(outDir, "B");
+        FileStatus[] bParentStat = fs.listStatus(bParent);
+        assertEquals(1, bParentStat.length);
+        assertEquals("child", bParentStat[0].getPath().getName());
+        FileStatus[] bChildStat = fs.listStatus(new Path(bParent, "child"));
+        assertEquals(1, bChildStat.length);
+        assertEquals("part-r-00000", bChildStat[0].getPath().getName());
+    }
+}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyOutputFormat.java
new file mode 100644
index 0000000..67579e4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyOutputFormat.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
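+/**
+ * A {@link TextOutputFormat} variant that writes each value to a file in a sub-directory of the
+ * output path named after its key. Only the value text is written, and keys containing "/"
+ * characters produce nested sub-directories.
+ */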
+public class TextPathPerKeyOutputFormat<V> extends TextOutputFormat<Text, V> {
+    @Override
+    public RecordWriter<Text, V> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
+        Configuration conf = taskAttemptContext.getConfiguration();
+
+        FileOutputCommitter outputCommitter = (FileOutputCommitter) getOutputCommitter(taskAttemptContext);
+        Path basePath = new Path(outputCommitter.getWorkPath(), conf.get("mapreduce.output.basename", "part"));
+
+        boolean isCompressed = FileOutputFormat.getCompressOutput(taskAttemptContext);
+        CompressionCodec codec = null;
+        String extension = "";
+
+        if (isCompressed) {
+            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(taskAttemptContext, GzipCodec.class);
+            codec = ReflectionUtils.newInstance(codecClass, conf);
+            extension = codec.getDefaultExtension();
+        }
+
+        return new TextPathPerKeyRecordWriter<>(basePath, getUniqueFile(taskAttemptContext, "part", extension),
+                isCompressed, codec, taskAttemptContext);
+    }
+
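+    /**
+     * Writes each value to a file under {@code basePath/<key>}, opening a new writer whenever the
+     * incoming key changes and appending if a file for that key already exists in this task's output.
+     */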
+    private class TextPathPerKeyRecordWriter<V> extends RecordWriter<Text, V> {
+
+        private final Path basePath;
+        private final String uniqueFileName;
+        private final Configuration conf;
+        private String currentKey;
+        private RecordWriter<V, NullWritable> currentWriter;
+        private CompressionCodec compressionCodec;
+        private boolean isCompressed;
+        private TaskAttemptContext taskAttemptContext;
+
+        public TextPathPerKeyRecordWriter(Path basePath, String uniqueFileName, boolean isCompressed,
+                                          CompressionCodec codec, TaskAttemptContext context) {
+            this.basePath = basePath;
+            this.uniqueFileName = uniqueFileName;
+            this.conf = context.getConfiguration();
+            this.isCompressed = isCompressed;
+            this.compressionCodec = codec;
+            this.taskAttemptContext = context;
+        }
+
+        @Override
+        public void write(Text record, V n) throws IOException, InterruptedException {
+            String key = record.toString();
+            if (!key.equals(currentKey)) {
+                if (currentWriter != null) {
+                    currentWriter.close(taskAttemptContext);
+                }
+
+                currentKey = key;
+                Path dir = new Path(basePath, key);
+                FileSystem fs = dir.getFileSystem(conf);
+                if (!fs.exists(dir)) {
+                    fs.mkdirs(dir);
+                }
+
+                Path filePath = new Path(dir, uniqueFileName);
+
+                DataOutputStream dataOutputStream;
+
+                if (fs.exists(filePath)) {
+                    dataOutputStream = fs.append(filePath);
+                } else {
+                    dataOutputStream = fs.create(filePath);
+                }
+
+                if (isCompressed && compressionCodec != null) {
+                    dataOutputStream = new DataOutputStream(compressionCodec.createOutputStream(dataOutputStream));
+                }
+
+                String keyValueSeparator = conf.get(SEPERATOR, "\t");
+                currentWriter = new LineRecordWriter<>(dataOutputStream, keyValueSeparator);
+            }
+
+            currentWriter.write(n, NullWritable.get());
+        }
+
+        @Override
+        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+            if (currentWriter != null) {
+                currentWriter.close(taskAttemptContext);
+                currentKey = null;
+                currentWriter = null;
+            }
+        }
+    }
+}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyTarget.java
new file mode 100644
index 0000000..574651c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextPathPerKeyTarget.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.text;
+
+import com.google.common.collect.Maps;
+import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
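+/**
+ * A {@link Target} that writes the values of a {@code PTable<String, T>} (writable type family)
+ * to text files, placing the output for each key under a separate sub-directory of the target
+ * path. Keys that contain "/" characters create nested sub-directories.
+ */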
+public class TextPathPerKeyTarget extends FileTargetImpl {
+
+    private Map<String, String> extraConf = Maps.newHashMap();
+
+    private static final Logger LOG = LoggerFactory.getLogger(TextPathPerKeyTarget.class);
+
+    public TextPathPerKeyTarget(String path) {
+        this(new Path(path));
+    }
+
+    public TextPathPerKeyTarget(Path path) {
+        this(path, SequentialFileNamingScheme.getInstance());
+    }
+
+    public TextPathPerKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
+        super(path, TextPathPerKeyOutputFormat.class, fileNamingScheme);
+    }
+
+    @Override
+    public boolean accept(OutputHandler handler, PType<?> ptype) {
+        if (ptype instanceof PTableType && ptype.getFamily() == WritableTypeFamily.getInstance()) {
+            if (String.class.equals(((PTableType) ptype).getKeyType().getTypeClass())) {
+                handler.configure(this, ptype);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Target outputConf(String key, String value) {
+        extraConf.put(key, value);
+        return this;
+    }
+
+    @Override
+    public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+        FormatBundle bundle = FormatBundle.forOutput(TextPathPerKeyOutputFormat.class);
+        for (Map.Entry<String, String> e : extraConf.entrySet()) {
+            bundle.set(e.getKey(), e.getValue());
+        }
+
+        Converter converter = ((PTableType) ptype).getValueType().getConverter();
+        Class valueClass = converter.getValueClass();
+        configureForMapReduce(job, valueClass, NullWritable.class, bundle, outputPath, name);
+    }
+
+    @Override
+    public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
+        FileSystem srcFs = workingPath.getFileSystem(conf);
+        Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
+        if (!srcFs.exists(base)) {
+            LOG.warn("Nothing to copy from {}", base);
+            return;
+        }
+
+        FileSystem dstFs = path.getFileSystem(conf);
+        if (!dstFs.exists(path)) {
+            dstFs.mkdirs(path);
+        }
+
+        boolean sameFs = isCompatible(srcFs, path);
+        move(conf, base, srcFs, path, dstFs, sameFs);
+        dstFs.create(getSuccessIndicator(), true).close();
+    }
+
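+    /**
+     * Recursively moves the per-key directories (and any nested sub-directories) from the job's
+     * temporary output into the target path, renaming when source and destination share a
+     * FileSystem and copying otherwise.
+     */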
+    private void move(Configuration conf, Path srcBase, FileSystem srcFs, Path dstBase, FileSystem dstFs, boolean sameFs)
+            throws IOException {
+        Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(srcBase));
+        if (!dstFs.exists(dstBase)) {
+            dstFs.mkdirs(dstBase);
+        }
+        for (Path key : keys) {
+            Path[] srcs = FileUtil.stat2Paths(srcFs.listStatus(key), key);
+            Path targetPath = new Path(dstBase, key.getName());
+            dstFs.mkdirs(targetPath);
+            for (Path s : srcs) {
+                if (srcFs.isDirectory(s)) {
+                    move(conf, key, srcFs, targetPath, dstFs, sameFs);
+                } else {
+                    Path d = getDestFile(conf, s, targetPath, s.getName().contains("-m-"));
+                    if (sameFs) {
+                        srcFs.rename(s, d);
+                    } else {
+                        FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "TextFilePerKey(" + path + ")";
+    }
+}