Merge branch 'master' of github.com:linkedin/datafu
diff --git a/README.md b/README.md
index ece78b1..3bfeafc 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,11 @@
# DataFu
-DataFu is a collection of user-defined functions for working with large-scale data in Hadoop and Pig. This library was born out of the need for a stable, well-tested library of UDFs for data mining and statistics. It is used at LinkedIn in many of our off-line workflows for data derived products like "People You May Know" and "Skills & Endorsements". It contains functions for:
+[DataFu](http://data.linkedin.com/opensource/datafu) is a collection of user-defined functions for working with large-scale data in Hadoop and Pig. This library was born out of the need for a stable, well-tested library of UDFs for data mining and statistics. It is used at LinkedIn in many of our off-line workflows for data derived products like "People You May Know" and "Skills & Endorsements". It contains functions for:
* PageRank
* Quantiles (median), variance, etc.
* Sessionization
+* Variance
* Convenience bag functions (e.g., set operations, enumerating bags, etc)
* Convenience utility functions (e.g., assertions, easier writing of
EvalFuncs)
@@ -12,15 +13,13 @@
Each function is unit tested and code coverage is being tracked for the entire library. It has been tested against Pig 0.10.
-[http://data.linkedin.com/opensource/datafu](http://data.linkedin.com/opensource/datafu)
-
## What can you do with it?
Here's a taste of what you can do in Pig.
### Statistics
-Compute the [median](http://en.wikipedia.org/wiki/Median):
+Compute the [median](http://en.wikipedia.org/wiki/Median) with the [StreamingMedian UDF](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/stats/StreamingMedian.html):
define Median datafu.pig.stats.StreamingMedian();
@@ -31,7 +30,7 @@
-- produces median of 3
medians = FOREACH grouped GENERATE Median(sorted.val);
-Similarly, compute any arbitrary [quantiles](http://en.wikipedia.org/wiki/Quantile):
+Similarly, compute any arbitrary [quantiles](http://en.wikipedia.org/wiki/Quantile) with [StreamingQuantile](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/stats/StreamingQuantile.html):
define Quantile datafu.pig.stats.StreamingQuantile('0.0','0.5','1.0');
@@ -42,7 +41,7 @@
-- produces: (1,5.5,10)
quantiles = FOREACH grouped GENERATE Quantile(sorted.val);
-Or how about the [variance](http://en.wikipedia.org/wiki/Variance):
+Or how about the [variance](http://en.wikipedia.org/wiki/Variance) using [VAR](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/stats/VAR.html):
define VAR datafu.pig.stats.VAR();
@@ -55,7 +54,7 @@
### Set Operations
-Treat sorted bags as sets and compute their intersection:
+Treat sorted bags as sets and compute their intersection with [SetIntersect](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/bags/sets/SetIntersect.html):
define SetIntersect datafu.pig.bags.sets.SetIntersect();
@@ -69,7 +68,7 @@
GENERATE SetIntersect(sorted_b1,sorted_b2);
}
-Compute the set union:
+Compute the set union with [SetUnion](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/bags/sets/SetUnion.html):
define SetUnion datafu.pig.bags.sets.SetUnion();
@@ -85,7 +84,7 @@
### Bag operations
-Concatenate two or more bags:
+Concatenate two or more bags with [BagConcat](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/bags/BagConcat.html):
define BagConcat datafu.pig.bags.BagConcat();
@@ -95,7 +94,7 @@
-- ({(1),(2),(3),(4),(5),(6),(7)})
output = FOREACH input GENERATE BagConcat(B1,B2,B3);
-Append a tuple to a bag:
+Append a tuple to a bag with [AppendToBag](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/bags/AppendToBag.html):
define AppendToBag datafu.pig.bags.AppendToBag();
@@ -107,7 +106,7 @@
### PageRank
-Run PageRank on a large number of independent graphs:
+Run PageRank on a large number of independent graphs with the [PageRank UDF](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/linkanalysis/PageRank.html):
define PageRank datafu.pig.linkanalysis.PageRank('dangling_nodes','true');
@@ -136,18 +135,18 @@
If you are using Ivy:
- <dependency org="com.linkedin.datafu" name="datafu" rev="0.0.4"/>
+ <dependency org="com.linkedin.datafu" name="datafu" rev="0.0.6"/>
If you are using Maven:
<dependency>
<groupId>com.linkedin.datafu</groupId>
<artifactId>datafu</artifactId>
- <version>0.0.4</version>
+ <version>0.0.6</version>
</dependency>
-
-Or you can download one of the packages from the [downloads](https://github.com/linkedin/datafu/downloads) section.
+Or [download](https://github.com/linkedin/datafu/archive/master.zip) the code.
+
## Working with the source code
Here are some common tasks when working with the source code.
diff --git a/build.xml b/build.xml
index 6598c87..d731a1f 100644
--- a/build.xml
+++ b/build.xml
@@ -91,7 +91,7 @@
uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/>
</target>
- <target name="ivy-resolve" depends="ivy-taskdef" description="retreive dependencies with ivy">
+ <target name="ivy-resolve" depends="ivy-taskdef" description="retrieve dependencies with ivy">
<ivy:retrieve/>
</target>
diff --git a/ivy/pom-template.xml b/ivy/pom-template.xml
index ec4a059..d7cc079 100644
--- a/ivy/pom-template.xml
+++ b/ivy/pom-template.xml
@@ -6,7 +6,7 @@
<name>DataFu</name>
<version>@version</version>
<description>A collection of user-defined functions for working with large-scale data in Hadoop and Pig.</description>
- <url>http://sna-projects.com/datafu/</url>
+ <url>http://data.linkedin.com/opensource/datafu</url>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
diff --git a/src/java/datafu/pig/bags/sets/SetIntersect.java b/src/java/datafu/pig/bags/sets/SetIntersect.java
index e091608..13321a4 100644
--- a/src/java/datafu/pig/bags/sets/SetIntersect.java
+++ b/src/java/datafu/pig/bags/sets/SetIntersect.java
@@ -1,19 +1,19 @@
/*
* Copyright 2010 LinkedIn, Inc
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
-
+
package datafu.pig.bags.sets;
import java.io.IOException;
@@ -32,7 +32,7 @@
* <pre>
* {@code
* define SetIntersect datafu.pig.bags.sets.SetIntersect();
- *
+ *
* -- input:
* -- ({(1,10),(2,20),(3,30),(4,40)},{(2,20),(4,40),(8,80)})
* input = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
@@ -50,9 +50,8 @@
public class SetIntersect extends SetOperationsBase
{
private static final BagFactory bagFactory = BagFactory.getInstance();
- private static final TupleFactory tupleFactory = TupleFactory.getInstance();
- class pair implements Comparable<pair>
+ static class pair implements Comparable<pair>
{
final Iterator<Tuple> it;
Tuple data;
@@ -62,7 +61,7 @@
this.it = it;
this.data = it.next();
}
-
+
@Override
public int compareTo(pair o)
{
@@ -78,8 +77,9 @@
Object o = input.get(i);
if (!(o instanceof DataBag))
throw new RuntimeException("parameters must be databags");
- DataBag inputBag = (DataBag) o;
- pq.add(new pair(inputBag.iterator()));
+ Iterator<Tuple> inputIterator= ((DataBag) o).iterator();
+ if(inputIterator.hasNext())
+ pq.add(new pair(inputIterator));
}
return pq;
}
@@ -93,20 +93,22 @@
}
return true;
}
-
+
@Override
public DataBag exec(Tuple input) throws IOException
{
DataBag outputBag = bagFactory.newDefaultBag();
PriorityQueue<pair> pq = load_bags(input);
+ if(pq.size() != input.size())
+ return outputBag; // one or more input bags were empty
Tuple last_data = null;
-
+
while (true) {
if (pq.peek().data.compareTo(last_data) != 0 && all_equal(pq)) {
last_data = pq.peek().data;
outputBag.add(last_data);
}
-
+
pair p = pq.poll();
if (!p.it.hasNext())
break;
diff --git a/src/java/datafu/pig/bags/sets/SetUnion.java b/src/java/datafu/pig/bags/sets/SetUnion.java
index 2143423..293eba7 100644
--- a/src/java/datafu/pig/bags/sets/SetUnion.java
+++ b/src/java/datafu/pig/bags/sets/SetUnion.java
@@ -51,8 +51,7 @@
@Override
public DataBag exec(Tuple input) throws IOException
{
- Set<Object> seen = new HashSet<Object>();
- DataBag outputBag = bagFactory.newDefaultBag();
+ DataBag outputBag = bagFactory.newDistinctBag();
try {
for (int i=0; i < input.size(); i++) {
@@ -62,10 +61,7 @@
DataBag inputBag = (DataBag) o;
for (Tuple elem : inputBag) {
- if (!seen.contains(elem)) {
- outputBag.add(elem);
- seen.add(elem);
- }
+ outputBag.add(elem);
}
}
diff --git a/src/java/datafu/pig/util/SimpleEvalFunc.java b/src/java/datafu/pig/util/SimpleEvalFunc.java
index 4a3be05..8e1437b 100644
--- a/src/java/datafu/pig/util/SimpleEvalFunc.java
+++ b/src/java/datafu/pig/util/SimpleEvalFunc.java
@@ -21,7 +21,11 @@
import java.lang.reflect.Type;
import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
/**
Uses reflection to makes writing simple wrapper Pig UDFs easier.
@@ -173,5 +177,48 @@
throw new IOException(String.format("%s: caught exception processing input.", _method_signature()), e);
}
}
+
+ /**
+ * Override outputSchema so we can verify the input schema at Pig compile time, instead of runtime
+ * @param inputSchema input schema
+ * @return call to super.outputSchema in case schema was defined elsewhere
+ */
+ @Override
+ public Schema outputSchema(Schema inputSchema)
+ {
+ if (inputSchema == null) {
+ throw new IllegalArgumentException(String.format("%s: null schema passed to %s", _method_signature(), getClass().getName()));
+ }
+
+ // check correct number of arguments
+ Class<?> parameterTypes[] = m.getParameterTypes();
+ if (inputSchema.size() != parameterTypes.length) {
+ throw new IllegalArgumentException(String.format("%s: got %d arguments, expected %d.",
+ _method_signature(),
+ inputSchema.size(),
+ parameterTypes.length));
+ }
+
+ // check type for each argument
+ for (int i=0; i < parameterTypes.length; i++) {
+ try {
+ byte inputType = inputSchema.getField(i).type;
+ byte parameterType = DataType.findType(parameterTypes[i]);
+ if (inputType != parameterType) {
+ throw new IllegalArgumentException(String.format("%s: argument type mismatch [#%d]; expected %s, got %s",
+ _method_signature(),
+ i+1,
+ DataType.findTypeName(parameterType),
+ DataType.findTypeName(inputType)));
+ }
+ }
+ catch (FrontendException fe) {
+ throw new IllegalArgumentException(String.format("%s: Problem with input schema: %s", _method_signature(), inputSchema), fe);
+ }
+ }
+
+ // delegate to super to determine the actual outputSchema (if specified)
+ return super.outputSchema(inputSchema);
+ }
}
diff --git a/test/pig/datafu/test/pig/bags/sets/SetTests.java b/test/pig/datafu/test/pig/bags/sets/SetTests.java
index b28dfd2..32e24d5 100644
--- a/test/pig/datafu/test/pig/bags/sets/SetTests.java
+++ b/test/pig/datafu/test/pig/bags/sets/SetTests.java
@@ -1,8 +1,16 @@
package datafu.test.pig.bags.sets;
+import java.util.Arrays;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.pigunit.PigTest;
+import org.testng.Assert;
import org.testng.annotations.Test;
+import datafu.pig.bags.sets.SetIntersect;
import datafu.test.pig.PigTests;
public class SetTests extends PigTests
@@ -25,6 +33,16 @@
test.assertOutput("data",input,"data2",output);
}
+ @Test
+ public void testIntersectWithNullTuples() throws Exception {
+ DataBag one = BagFactory.getInstance().newDefaultBag();
+ DataBag two = BagFactory.getInstance().newDefaultBag();
+
+ Tuple input = TupleFactory.getInstance().newTuple(Arrays.asList(one, two));
+ DataBag output = new SetIntersect().exec(input);
+ Assert.assertEquals(output.size(), 0); // TestNG order: (actual, expected)
+ }
+
@Test(expectedExceptions=org.apache.pig.impl.logicalLayer.FrontendException.class)
public void setIntersectOutOfOrderTest() throws Exception
{
@@ -48,9 +66,9 @@
};
String[] output = {
- "({(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80),(1,1),(1,25),(1,70)})"
+ "({(1,1),(1,10),(1,20),(1,25),(1,30),(1,40),(1,50),(1,60),(1,70),(1,80)})"
};
- test.assertOutput("data",input,"data2",output);
+ test.assertOutput("data", input, "data2", output);
}
}
diff --git a/test/pig/datafu/test/pig/bags/sets/setUnionTest.pig b/test/pig/datafu/test/pig/bags/sets/setUnionTest.pig
index a5e1c4d..1b0e23b 100644
--- a/test/pig/datafu/test/pig/bags/sets/setUnionTest.pig
+++ b/test/pig/datafu/test/pig/bags/sets/setUnionTest.pig
@@ -6,7 +6,11 @@
dump data
-data2 = FOREACH data GENERATE SetUnion(B1,B2);
+data2 = FOREACH data GENERATE SetUnion(B1,B2) AS C;
+data2 = FOREACH data2 {
+ C = ORDER C BY val1 ASC, val2 ASC;
+ generate C;
+}
dump data2