CRUNCH-242: Control the input/output conversion via the Source and Target interfaces
diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
index 6ba5e06..83f509f 100644
--- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
+++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java
@@ -21,6 +21,7 @@
 import java.sql.Driver;
 
 import org.apache.crunch.Source;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
 import org.apache.hadoop.conf.Configuration;
@@ -123,4 +124,8 @@
     return ptype;
   }
 
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return ptype.getConverter(); // delegate to the converter of this source's ptype field
+  }
 }
diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java
index b744c8f..b0a0449 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Source.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Source.java
@@ -19,6 +19,7 @@
 
 import java.io.IOException;
 
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
@@ -35,6 +36,12 @@
   PType<T> getType();
 
   /**
+   * Returns the {@code Converter} used for mapping the inputs read from this
+   * {@code Source} into {@code PCollection} or {@code PTable} values.
+   */
+  Converter<?, ?, ?, ?> getConverter();
+  
+  /**
    * Configure the given job to use this source as an input.
    * 
    * @param job
diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java
index 48dc2cd..65ad67d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Target.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Target.java
@@ -18,6 +18,7 @@
 package org.apache.crunch;
 
 import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 
@@ -80,6 +81,15 @@
   boolean accept(OutputHandler handler, PType<?> ptype);
 
   /**
+   * Returns the {@code Converter} to use for mapping from the output {@code PCollection}
+   * into the output values expected by this {@code Target}.
+   *
+   * @param ptype The {@code PType} of the data that is being written to this {@code Target}
+   * @return A valid {@code Converter} for the output represented by this {@code Target}
+   */
+  Converter<?, ?, ?, ?> getConverter(PType<?> ptype);
+  
+  /**
    * Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target}
    * for the given {@code PType}, if possible. If it is not possible, return {@code null}.
    * 
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
index 2d6d590..87d0a5b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java
@@ -63,8 +63,7 @@
     return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null);
   }
 
-  public static <S> DoNode createOutputNode(String name, PType<S> ptype) {
-    Converter outputConverter = ptype.getConverter();
+  public static DoNode createOutputNode(String name, Converter outputConverter, PType<?> ptype) { // caller supplies the converter; NOTE(review): raw Converter type
     DoFn<?, ?> fn = ptype.getOutputMapFn();
     return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null);
   }
@@ -135,7 +134,7 @@
     Converter inputConverter = null;
     if (inputNode) {
       if (nodeContext == NodeContext.MAP) {
-        inputConverter = ptype.getConverter();
+        inputConverter = source.getConverter();
       } else {
         inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter();
       }
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index da13611..c733323 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -36,6 +36,7 @@
 import org.apache.crunch.impl.mr.run.CrunchReducer;
 import org.apache.crunch.impl.mr.run.NodeContext;
 import org.apache.crunch.impl.mr.run.RTNode;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -149,8 +150,8 @@
       DoNode node = null;
       for (NodePath nodePath : targetsToNodePaths.get(target)) {
         if (node == null) {
-          PCollectionImpl<?> collect = nodePath.tail();
-          node = DoNode.createOutputNode(target.toString(), collect.getPType());
+          PType<?> ptype = nodePath.tail().getPType();
+          node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
           outputHandler.configureNode(node, target);
         }
         outputNodes.add(walkPath(nodePath.descendingIterator(), node));
@@ -163,8 +164,8 @@
         DoNode node = null;
         for (NodePath nodePath : mapSideNodePaths.get(target)) {
           if (node == null) {
-            PCollectionImpl<?> collect = nodePath.tail();
-            node = DoNode.createOutputNode(target.toString(), collect.getPType());
+            PType<?> ptype = nodePath.tail().getPType();
+            node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype);
             outputHandler.configureNode(node, target);
           }
           mapSideNodes.add(walkPath(nodePath.descendingIterator(), node));
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index b232abb..13645ba 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -31,6 +31,7 @@
 import org.apache.crunch.io.FileReaderFactory;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.SourceTargetHelper;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -84,6 +85,11 @@
   }
   
   @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return ptype.getConverter(); // delegate to the converter of this source's ptype field
+  }
+  
+  @Override
   public void configureSource(Job job, int inputId) throws IOException {
     if (inputId == -1) {
       for (Path path : paths) {
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 07c63df..cbd87e3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -86,6 +86,12 @@
     return true;
   }
 
+  @Override
+  public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) { // NOTE(review): no Converter import hunk shown for this file; confirm it is already imported
+    return ptype.getConverter(); // use the converter of the PType being written
+  }
+
+  @Override
   public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
     FileSystem srcFs = workingPath.getFileSystem(conf);
     Path src = getSourcePattern(workingPath, index);
@@ -254,4 +260,5 @@
     }
     return exists;
   }
+
 }
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index 5dd4d69..68c9430 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -24,6 +24,7 @@
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
@@ -94,4 +95,14 @@
   public long getLastModifiedAt(Configuration configuration) {
     return source.getLastModifiedAt(configuration);
   }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return source.getConverter(); // input-side converter: forwarded to the source field
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
+    return target.getConverter(ptype); // output-side converter: forwarded to the target field
+  }
 }
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
index 7963c83..0a30fa4 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java
@@ -30,7 +30,7 @@
   public void testBuild() {
     final String pipelineName = "PipelineName";
     final String nodeName = "outputNode";
-    DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings());
+    DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings().getConverter(), Writables.strings());
     JobNameBuilder jobNameBuilder = new JobNameBuilder(pipelineName);
     jobNameBuilder.visit(Lists.newArrayList(doNode));
     String jobName = jobNameBuilder.build();
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 6a5a124..2f5a160 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -30,6 +30,7 @@
 import org.apache.crunch.impl.mr.run.CrunchMapper;
 import org.apache.crunch.io.CrunchInputs;
 import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
@@ -130,4 +131,9 @@
     LOG.warn("Cannot determine last modified time for source: " + toString());
     return -1;
   }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return PTYPE.getConverter(); // delegate to PTYPE, which is declared outside this hunk
+  }
 }
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 83d62c8..69a260e 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -28,6 +28,7 @@
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.MapReduceTarget;
 import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -120,4 +121,9 @@
     LOG.info("HBaseTarget ignores checks for existing outputs...");
     return false;
   }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter(final PType<?> ptype) {
+    return ptype.getConverter(); // use the converter of the PType being written
+  }
 }