CRUNCH-242: Control the input/output conversion via the Source and Target interfaces
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
index 6ba5e06..83f509f 100644
--- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -21,6 +21,7 @@
import java.sql.Driver;
import org.apache.crunch.Source;
+import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
@@ -123,4 +124,8 @@
return ptype;
}
+ @Override
+ public Converter<?, ?, ?, ?> getConverter() {
+ return ptype.getConverter();
+ }
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
index b744c8f..b0a0449 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Source.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -19,6 +19,7 @@
import java.io.IOException;
+import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
@@ -35,6 +36,12 @@
PType<T> getType();
/**
+ * Returns the {@code Converter} used for mapping the inputs from this instance
+ * into {@code PCollection} or {@code PTable} values.
+ */
+ Converter<?, ?, ?, ?> getConverter();
+
+ /**
* Configure the given job to use this source as an input.
*
* @param job
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 48dc2cd..65ad67d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -18,6 +18,7 @@
package org.apache.crunch;
import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
@@ -80,6 +81,15 @@
boolean accept(OutputHandler handler, PType<?> ptype);
/**
+ * Returns the {@code Converter} to use for mapping from the output {@code PCollection}
+ * into the output values expected by this instance.
+ *
+ * @param ptype The {@code PType} of the data that is being written to this instance
+ * @return A valid {@code Converter} for the output represented by this instance
+ */
+ Converter<?, ?, ?, ?> getConverter(PType<?> ptype);
+
+ /**
* Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target}
* for the given {@code PType}, if possible. If it is not possible, return {@code null}.
*
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index 2d6d590..87d0a5b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -63,8 +63,7 @@
return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null);
}
- public static <S> DoNode createOutputNode(String name, PType<S> ptype) {
- Converter outputConverter = ptype.getConverter();
+ public static DoNode createOutputNode(String name, Converter outputConverter, PType<?> ptype) {
DoFn<?, ?> fn = ptype.getOutputMapFn();
return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null);
}
@@ -135,7 +134,7 @@
Converter inputConverter = null;
if (inputNode) {
if (nodeContext == NodeContext.MAP) {
- inputConverter = ptype.getConverter();
+ inputConverter = source.getConverter();
} else {
inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter();
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index da13611..c733323 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -36,6 +36,7 @@
import org.apache.crunch.impl.mr.run.CrunchReducer;
import org.apache.crunch.impl.mr.run.NodeContext;
import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.types.PType;
import org.apache.crunch.util.DistCache;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -149,8 +150,8 @@
DoNode node = null;
for (NodePath nodePath : targetsToNodePaths.get(target)) {
if (node == null) {
- PCollectionImpl<?> collect = nodePath.tail();
- node = DoNode.createOutputNode(target.toString(), collect.getPType());
+ PType<?> ptype = nodePath.tail().getPType();
+ node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
outputHandler.configureNode(node, target);
}
outputNodes.add(walkPath(nodePath.descendingIterator(), node));
@@ -163,8 +164,8 @@
DoNode node = null;
for (NodePath nodePath : mapSideNodePaths.get(target)) {
if (node == null) {
- PCollectionImpl<?> collect = nodePath.tail();
- node = DoNode.createOutputNode(target.toString(), collect.getPType());
+ PType<?> ptype = nodePath.tail().getPType();
+ node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
outputHandler.configureNode(node, target);
}
mapSideNodes.add(walkPath(nodePath.descendingIterator(), node));
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index b232abb..13645ba 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -31,6 +31,7 @@
import org.apache.crunch.io.FileReaderFactory;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.SourceTargetHelper;
+import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -84,6 +85,11 @@
}
@Override
+ public Converter<?, ?, ?, ?> getConverter() {
+ return ptype.getConverter();
+ }
+
+ @Override
public void configureSource(Job job, int inputId) throws IOException {
if (inputId == -1) {
for (Path path : paths) {
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 07c63df..cbd87e3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -86,6 +86,12 @@
return true;
}
+ @Override
+ public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
+ return ptype.getConverter();
+ }
+
+ @Override
public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
FileSystem srcFs = workingPath.getFileSystem(conf);
Path src = getSourcePattern(workingPath, index);
@@ -254,4 +260,5 @@
}
return exists;
}
+
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index 5dd4d69..68c9430 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -24,6 +24,7 @@
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
@@ -94,4 +95,14 @@
public long getLastModifiedAt(Configuration configuration) {
return source.getLastModifiedAt(configuration);
}
+
+ @Override
+ public Converter<?, ?, ?, ?> getConverter() {
+ return source.getConverter();
+ }
+
+ @Override
+ public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
+ return target.getConverter(ptype);
+ }
}
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
index 7963c83..0a30fa4 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
@@ -30,7 +30,7 @@
public void testBuild() {
final String pipelineName = "PipelineName";
final String nodeName = "outputNode";
- DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings());
+ DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings().getConverter(), Writables.strings());
JobNameBuilder jobNameBuilder = new JobNameBuilder(pipelineName);
jobNameBuilder.visit(Lists.newArrayList(doNode));
String jobName = jobNameBuilder.build();
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 6a5a124..2f5a160 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -30,6 +30,7 @@
import org.apache.crunch.impl.mr.run.CrunchMapper;
import org.apache.crunch.io.CrunchInputs;
import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
@@ -130,4 +131,9 @@
LOG.warn("Cannot determine last modified time for source: " + toString());
return -1;
}
+
+ @Override
+ public Converter<?, ?, ?, ?> getConverter() {
+ return PTYPE.getConverter();
+ }
}
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 83d62c8..69a260e 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -28,6 +28,7 @@
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.MapReduceTarget;
import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -120,4 +121,9 @@
LOG.info("HBaseTarget ignores checks for existing outputs...");
return false;
}
+
+ @Override
+ public Converter<?, ?, ?, ?> getConverter(final PType<?> ptype) {
+ return ptype.getConverter();
+ }
}