create a transpose udf and add tests
diff --git a/src/java/datafu/pig/util/Transpose.java b/src/java/datafu/pig/util/Transpose.java
new file mode 100644
index 0000000..34ce29d
--- /dev/null
+++ b/src/java/datafu/pig/util/Transpose.java
@@ -0,0 +1,124 @@
+package datafu.pig.util;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+/**
+ * Performs a transpose on a tuple, resulting in a bag of key, value fields where
+ * the key is the column name and the value is the value of that column in the tuple.
+ * Every transposed field must have the same type.
+ *
+ * <p>
+ * Example:
+ * <pre>
+ * {@code
+ *
+ * define Transpose datafu.pig.util.Transpose();
+ *
+ * -- input: 1,10,11,12
+ * input = LOAD 'input' AS (id:int,val1:int,val2:int,val3:int);
+ *
+ * -- produces: 1,{("val1",10),("val2",11),("val3",12)}
+ * output = FOREACH input GENERATE id, Transpose(val1 .. val3);
+ *
+ * }
+ * </pre>
+ * </p>
+ *
+ * @author "William Vaughan <wvaughan@linkedin.com>"
+ *
+ */
+public class Transpose extends AliasableEvalFunc<DataBag>
+{
+  /** Instance-property key under which the common input field type is stored. */
+  private static final String TRANSPOSE_TYPE = "TRANSPOSE_TYPE";
+
+  /**
+   * Builds the output schema: a bag of (key:chararray, value) tuples whose value
+   * column has the single type shared by every input field. That type is also
+   * recorded in the instance properties.
+   *
+   * @param input schema of the fields passed to the UDF
+   * @return schema describing the bag of (key, value) tuples
+   * @throws RuntimeException if the input schema is missing or empty, if the input
+   *         fields do not all share one type, or if the schema cannot be built
+   */
+  @Override
+  public Schema getOutputSchema(Schema input)
+  {
+    // Without at least one typed field there is no type for the "value" column;
+    // fail with a clear message instead of a NullPointerException.
+    if (input == null || input.getFields().isEmpty()) {
+      throw new RuntimeException("Expected at least one input field to transpose.");
+    }
+
+    try
+    {
+      // require that every field in the input has the same type as the first one
+      final byte type = input.getFields().get(0).type;
+      for (FieldSchema fieldSchema : input.getFields()) {
+        if (fieldSchema.type != type) {
+          throw new RuntimeException(
+              String.format("Expected all input types to match. Got both %s and %s.",
+                            DataType.findTypeName(type),
+                            DataType.findTypeName(fieldSchema.type)));
+        }
+      }
+      getInstanceProperties().put(TRANSPOSE_TYPE, type);
+
+      Schema outputTupleSchema = new Schema();
+      outputTupleSchema.add(new Schema.FieldSchema("key", DataType.CHARARRAY));
+      outputTupleSchema.add(new Schema.FieldSchema("value", type));
+      return new Schema(new Schema.FieldSchema(
+          getSchemaName(this.getClass().getName().toLowerCase(), input),
+          outputTupleSchema,
+          DataType.BAG));
+    }
+    catch (FrontendException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Emits one (alias, value) tuple per field of the input tuple, added in field order.
+   *
+   * @param input tuple holding the fields to transpose
+   * @return bag with one (key, value) tuple per input field, or {@code null} when
+   *         {@code input} is {@code null}
+   * @throws IOException propagated from the underlying tuple operations
+   */
+  @Override
+  public DataBag exec(Tuple input) throws IOException
+  {
+    if (input == null) {
+      return null;
+    }
+
+    // initialize a reverse mapping (field position -> alias)
+    Map<Integer, String> positionToAlias = new HashMap<Integer, String>();
+    for (Map.Entry<String, Integer> entry : getFieldAliases().entrySet()) {
+      positionToAlias.put(entry.getValue(), entry.getKey());
+    }
+
+    TupleFactory tupleFactory = TupleFactory.getInstance();
+    DataBag output = BagFactory.getInstance().newDefaultBag();
+    for (int i = 0; i < input.size(); i++) {
+      Tuple tuple = tupleFactory.newTuple();
+      tuple.append(positionToAlias.get(i));
+      tuple.append(input.get(i));
+      output.add(tuple);
+    }
+    return output;
+  }
+
+}
diff --git a/test/pig/datafu/test/pig/util/TransposeTest.java b/test/pig/datafu/test/pig/util/TransposeTest.java
new file mode 100644
index 0000000..ef0848e
--- /dev/null
+++ b/test/pig/datafu/test/pig/util/TransposeTest.java
@@ -0,0 +1,104 @@
+package datafu.test.pig.util;
+
+import java.util.List;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import datafu.test.pig.PigTests;
+
+/**
+ * PigUnit tests for the {@code datafu.pig.util.Transpose} UDF.
+ */
+public class TransposeTest extends PigTests
+{
+  // The doc comment below is the Pig script that @Multiline injects into the field;
+  // its text must not be reworded.
+  /**
+  register $JAR_PATH
+
+  define Transpose datafu.pig.util.Transpose();
+
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:INT);
+
+  data2 = FOREACH data GENERATE testcase, Transpose(val1 .. val3) as transposed;
+
+  describe data2;
+
+  data3 = FOREACH data2 GENERATE testcase, transposed;
+
+  STORE data3 INTO 'output';
+   */
+  @Multiline private static String transposeTest;
+
+  /**
+   * Transposes three INT columns and checks that every output bag holds the
+   * (alias, value) pairs for val1, val2 and val3 in column order.
+   */
+  @Test
+  public void transposeTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(transposeTest);
+    writeLinesToFile("input", "1,10,11,12",
+                              "2,20,21,22");
+    test.runScript();
+
+    List<Tuple> output = getLinesForAlias(test, "data3");
+    // one output row per input line; keeps the loop below from passing vacuously
+    Assert.assertEquals(output.size(), 2);
+    for (Tuple tuple : output) {
+      int testCase = (Integer)tuple.get(0);
+      DataBag bag = (DataBag)tuple.get(1);
+      Assert.assertEquals(bag.size(), 3);
+      int i=0;
+      for (Tuple t : bag) {
+        // input rows are laid out as n, n*10, n*10+1, n*10+2
+        String expectedKey = String.format("val%d",i+1);
+        Assert.assertEquals((String)t.get(0), expectedKey);
+        int actualValue = (Integer)t.get(1);
+        Assert.assertEquals(actualValue, testCase*10+i);
+        i++;
+      }
+    }
+  }
+
+  // Same script as above except that val3 is declared DOUBLE, so the transposed
+  // columns no longer share one type.
+  /**
+  register $JAR_PATH
+
+  define Transpose datafu.pig.util.Transpose();
+
+  data = LOAD 'input' using PigStorage(',') AS (testcase:INT,val1:INT,val2:INT,val3:DOUBLE);
+
+  data2 = FOREACH data GENERATE testcase, Transpose(val1 .. val3) as transposed;
+
+  describe data2;
+
+  data3 = FOREACH data2 GENERATE testcase, transposed;
+
+  STORE data3 INTO 'output';
+   */
+  @Multiline private static String transposeBadTypeTest;
+
+  /**
+   * Mixing INT and DOUBLE columns must be rejected with a RuntimeException.
+   */
+  @Test(expectedExceptions={RuntimeException.class})
+  public void transposeBadTypeTest() throws Exception
+  {
+    PigTest test = createPigTestFromString(transposeBadTypeTest);
+    writeLinesToFile("input", "1,10,11,12.0",
+                              "2,20,21,22.0");
+    test.runScript();
+
+    // Evaluate data3 in case the failure only surfaces on execution. Deliberately no
+    // per-tuple casts here: a ClassCastException from an (Integer) cast on the DOUBLE
+    // column would also satisfy expectedExceptions and hide a missing type check.
+    getLinesForAlias(test, "data3");
+  }
+}