Merge branch 'master' of github.com:linkedin/datafu
diff --git a/README.md b/README.md
index ece78b1..3bfeafc 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,11 @@
 # DataFu
 
-DataFu is a collection of user-defined functions for working with large-scale data in Hadoop and Pig. This library was born out of the need for a stable, well-tested library of UDFs for data mining and statistics. It is used at LinkedIn in many of our off-line workflows for data derived products like "People You May Know" and "Skills & Endorsements". It contains functions for:
+[DataFu](http://data.linkedin.com/opensource/datafu) is a collection of user-defined functions for working with large-scale data in Hadoop and Pig. This library was born out of the need for a stable, well-tested library of UDFs for data mining and statistics. It is used at LinkedIn in many of our off-line workflows for data derived products like "People You May Know" and "Skills & Endorsements". It contains functions for:
 
 * PageRank
 * Quantiles (median), variance, etc.
 * Sessionization
+* Variance
 * Convenience bag functions (e.g., set operations, enumerating bags, etc)
 * Convenience utility functions (e.g., assertions, easier writing of
 EvalFuncs)
@@ -12,15 +13,13 @@
 
 Each function is unit tested and code coverage is being tracked for the entire library.  It has been tested against Pig 0.10.
 
-[http://data.linkedin.com/opensource/datafu](http://data.linkedin.com/opensource/datafu)
-
 ## What can you do with it?
 
 Here's a taste of what you can do in Pig.
 
 ### Statistics
   
-Compute the [median](http://en.wikipedia.org/wiki/Median):
+Compute the [median](http://en.wikipedia.org/wiki/Median) with the [Median UDF](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/stats/Median.html):
 
     define Median datafu.pig.stats.StreamingMedian();
 
@@ -31,7 +30,7 @@
     -- produces median of 3
     medians = FOREACH grouped GENERATE Median(sorted.val);
   
-Similarly, compute any arbitrary [quantiles](http://en.wikipedia.org/wiki/Quantile):
+Similarly, compute any arbitrary [quantiles](http://en.wikipedia.org/wiki/Quantile) with [StreamingQuantile](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/stats/StreamingQuantile.html):
 
     define Quantile datafu.pig.stats.StreamingQuantile('0.0','0.5','1.0');
 
@@ -42,7 +41,7 @@
     -- produces: (1,5.5,10)
     quantiles = FOREACH grouped GENERATE Quantile(sorted.val);
 
-Or how about the [variance](http://en.wikipedia.org/wiki/Variance):
+Or how about the [variance](http://en.wikipedia.org/wiki/Variance) using [VAR](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/stats/VAR.html):
 
     define VAR datafu.pig.stats.VAR();
 
@@ -55,7 +54,7 @@
  
 ### Set Operations
 
-Treat sorted bags as sets and compute their intersection:
+Treat sorted bags as sets and compute their intersection with [SetIntersect](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/bags/sets/SetIntersect.html):
 
     define SetIntersect datafu.pig.bags.sets.SetIntersect();
   
@@ -69,7 +68,7 @@
       GENERATE SetIntersect(sorted_b1,sorted_b2);
     }
       
-Compute the set union:
+Compute the set union with [SetUnion](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/bags/sets/SetUnion.html):
 
     define SetUnion datafu.pig.bags.sets.SetUnion();
 
@@ -85,7 +84,7 @@
 
 ### Bag operations
 
-Concatenate two or more bags:
+Concatenate two or more bags with [BagConcat](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/bags/BagConcat.html):
 
     define BagConcat datafu.pig.bags.BagConcat();
 
@@ -95,7 +94,7 @@
     -- ({(1),(2),(3),(4),(5),(6),(7)})
     output = FOREACH input GENERATE BagConcat(B1,B2,B3);
 
-Append a tuple to a bag:
+Append a tuple to a bag with [AppendToBag](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/bags/AppendToBag.html):
 
     define AppendToBag datafu.pig.bags.AppendToBag();
 
@@ -107,7 +106,7 @@
 
 ### PageRank
 
-Run PageRank on a large number of independent graphs:
+Run PageRank on a large number of independent graphs through the [PageRank UDF](http://linkedin.github.com/datafu/docs/javadoc/datafu/pig/linkanalysis/PageRank.html):
 
     define PageRank datafu.pig.linkanalysis.PageRank('dangling_nodes','true');
 
@@ -136,18 +135,18 @@
 
 If you are using Ivy:
 
-    <dependency org="com.linkedin.datafu" name="datafu" rev="0.0.4"/>
+    <dependency org="com.linkedin.datafu" name="datafu" rev="0.0.6"/>
     
 If you are using Maven:
 
     <dependency>
       <groupId>com.linkedin.datafu</groupId>
       <artifactId>datafu</artifactId>
-      <version>0.0.4</version>
+      <version>0.0.6</version>
     </dependency>
-    
-Or you can download one of the packages from the [downloads](https://github.com/linkedin/datafu/downloads) section.    
 
+Or [download](https://github.com/linkedin/datafu/archive/master.zip) the code.
+    
 ## Working with the source code
 
 Here are some common tasks when working with the source code.
diff --git a/build.xml b/build.xml
index 6598c87..d731a1f 100644
--- a/build.xml
+++ b/build.xml
@@ -91,7 +91,7 @@
              uri="antlib:org.apache.ivy.ant" classpathref="ivy.lib.path"/>
   </target>
   
-  <target name="ivy-resolve" depends="ivy-taskdef" description="retreive dependencies with ivy">
+  <target name="ivy-resolve" depends="ivy-taskdef" description="retrieve dependencies with ivy">
     <ivy:retrieve/>
   </target>
   
diff --git a/ivy/pom-template.xml b/ivy/pom-template.xml
index ec4a059..d7cc079 100644
--- a/ivy/pom-template.xml
+++ b/ivy/pom-template.xml
@@ -6,7 +6,7 @@
   <name>DataFu</name>
   <version>@version</version>
   <description>A collection of user-defined functions for working with large-scale data in Hadoop and Pig.</description>
-  <url>http://sna-projects.com/datafu/</url>
+  <url>http://data.linkedin.com/opensource/datafu</url>
   <licenses>
     <license>
       <name>The Apache Software License, Version 2.0</name>
diff --git a/src/java/datafu/pig/bags/sets/SetIntersect.java b/src/java/datafu/pig/bags/sets/SetIntersect.java
index e091608..13321a4 100644
--- a/src/java/datafu/pig/bags/sets/SetIntersect.java
+++ b/src/java/datafu/pig/bags/sets/SetIntersect.java
@@ -1,19 +1,19 @@
 /*
  * Copyright 2010 LinkedIn, Inc
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
  * the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  * License for the specific language governing permissions and limitations under
  * the License.
  */
- 
+
 package datafu.pig.bags.sets;
 
 import java.io.IOException;
@@ -32,7 +32,7 @@
  * <pre>
  * {@code
  * define SetIntersect datafu.pig.bags.sets.SetIntersect();
- * 
+ *
  * -- input:
  * -- ({(1,10),(2,20),(3,30),(4,40)},{(2,20),(4,40),(8,80)})
  * input = LOAD 'input' AS (B1:bag{T:tuple(val1:int,val2:int)},B2:bag{T:tuple(val1:int,val2:int)});
@@ -50,9 +50,8 @@
 public class SetIntersect extends SetOperationsBase
 {
   private static final BagFactory bagFactory = BagFactory.getInstance();
-  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
 
-  class pair implements Comparable<pair>
+  static class pair implements Comparable<pair>
   {
     final Iterator<Tuple> it;
     Tuple data;
@@ -62,7 +61,7 @@
       this.it = it;
       this.data = it.next();
     }
-    
+
     @Override
     public int compareTo(pair o)
     {
@@ -78,8 +77,9 @@
       Object o = input.get(i);
       if (!(o instanceof DataBag))
         throw new RuntimeException("parameters must be databags");
-      DataBag inputBag = (DataBag) o;
-      pq.add(new pair(inputBag.iterator()));
+      Iterator<Tuple> inputIterator= ((DataBag) o).iterator();
+      if(inputIterator.hasNext())
+        pq.add(new pair(inputIterator));
     }
     return pq;
   }
@@ -93,20 +93,22 @@
     }
     return true;
   }
-  
+
   @Override
   public DataBag exec(Tuple input) throws IOException
   {
     DataBag outputBag = bagFactory.newDefaultBag();
     PriorityQueue<pair> pq = load_bags(input);
+    if(pq.size() != input.size())
+      return outputBag; // one or more input bags were empty
     Tuple last_data = null;
-    
+
     while (true) {
       if (pq.peek().data.compareTo(last_data) != 0 && all_equal(pq)) {
         last_data = pq.peek().data;
         outputBag.add(last_data);
       }
-            
+
       pair p = pq.poll();
       if (!p.it.hasNext())
         break;
diff --git a/src/java/datafu/pig/bags/sets/SetUnion.java b/src/java/datafu/pig/bags/sets/SetUnion.java
index 2143423..293eba7 100644
--- a/src/java/datafu/pig/bags/sets/SetUnion.java
+++ b/src/java/datafu/pig/bags/sets/SetUnion.java
@@ -51,8 +51,7 @@
   @Override
   public DataBag exec(Tuple input) throws IOException
   {
-    Set<Object> seen = new HashSet<Object>();
-    DataBag outputBag = bagFactory.newDefaultBag();
+    DataBag outputBag = bagFactory.newDistinctBag();
 
     try {
       for (int i=0; i < input.size(); i++) {
@@ -62,10 +61,7 @@
 
         DataBag inputBag = (DataBag) o;
         for (Tuple elem : inputBag) {
-          if (!seen.contains(elem)) {
-            outputBag.add(elem);
-            seen.add(elem);
-          }
+          outputBag.add(elem);
         }
       }
 
diff --git a/src/java/datafu/pig/util/SimpleEvalFunc.java b/src/java/datafu/pig/util/SimpleEvalFunc.java
index 4a3be05..8e1437b 100644
--- a/src/java/datafu/pig/util/SimpleEvalFunc.java
+++ b/src/java/datafu/pig/util/SimpleEvalFunc.java
@@ -21,7 +21,11 @@
 import java.lang.reflect.Type;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
 
 /**
   Uses reflection to makes writing simple wrapper Pig UDFs easier.
@@ -173,5 +177,48 @@
         throw new IOException(String.format("%s: caught exception processing input.", _method_signature()), e);
     }
   }
+
+  /**
+   * Override outputSchema so we can verify the input schema at Pig compile time, instead of runtime.
+   *
+   * @param inputSchema input schema
+   * @return call to super.outputSchema in case schema was defined elsewhere
+   * @throws IllegalArgumentException if the schema is null, has the wrong number of fields, has a
+   *         field type that differs from the parameter type, or inspecting it raises a FrontendException
+   */
+  @Override
+  public Schema outputSchema(Schema inputSchema)
+  {
+    if (inputSchema == null) {
+      throw new IllegalArgumentException(String.format("%s: null schema passed to %s", _method_signature(), getClass().getName()));
+    }
+
+    // check correct number of arguments
+    Class<?>[] parameterTypes = m.getParameterTypes();
+    if (inputSchema.size() != parameterTypes.length) {
+      throw new IllegalArgumentException(String.format("%s: got %d arguments, expected %d.",
+                                                       _method_signature(), inputSchema.size(), parameterTypes.length));
+    }
+
+    // check type for each argument
+    for (int i=0; i < parameterTypes.length; i++) {
+      try {
+        byte inputType = inputSchema.getField(i).type;
+        byte parameterType = DataType.findType(parameterTypes[i]);
+        if (inputType != parameterType) {
+          throw new IllegalArgumentException(String.format("%s: argument type mismatch [#%d]; expected %s, got %s",
+                                                           _method_signature(), i+1,
+                                                           DataType.findTypeName(parameterType), DataType.findTypeName(inputType)));
+        }
+      }
+      catch (FrontendException fe) {
+        // the trailing %s is required: String.format silently ignores arguments without a placeholder
+        throw new IllegalArgumentException(String.format("%s: Problem with input schema: %s", _method_signature(), inputSchema), fe);
+      }
+    }
+
+    // delegate to super to determine the actual outputSchema (if specified)
+    return super.outputSchema(inputSchema);
+  }
 }
 
diff --git a/test/pig/datafu/test/pig/bags/sets/SetTests.java b/test/pig/datafu/test/pig/bags/sets/SetTests.java
index b28dfd2..32e24d5 100644
--- a/test/pig/datafu/test/pig/bags/sets/SetTests.java
+++ b/test/pig/datafu/test/pig/bags/sets/SetTests.java
@@ -1,8 +1,16 @@
 package datafu.test.pig.bags.sets;
 
+import java.util.Arrays;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.pigunit.PigTest;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import datafu.pig.bags.sets.SetIntersect;
 import datafu.test.pig.PigTests;
 
 public class SetTests extends PigTests
@@ -25,6 +33,16 @@
     test.assertOutput("data",input,"data2",output);
   }
   
+  /** Calls SetIntersect directly on two empty bags and checks that the returned bag is empty. */
+  @Test
+  public void testIntersectWithNullTuples() throws Exception {
+     DataBag one = BagFactory.getInstance().newDefaultBag();
+     DataBag two = BagFactory.getInstance().newDefaultBag();
+     Tuple input = TupleFactory.getInstance().newTuple(Arrays.asList(one, two));
+     DataBag output = new SetIntersect().exec(input);
+     Assert.assertEquals(output.size(), 0); // TestNG argument order is (actual, expected)
+  }
+
   @Test(expectedExceptions=org.apache.pig.impl.logicalLayer.FrontendException.class)
   public void setIntersectOutOfOrderTest() throws Exception
   {
@@ -48,9 +66,9 @@
     };
     
     String[] output = {
-        "({(1,10),(1,20),(1,30),(1,40),(1,50),(1,60),(1,80),(1,1),(1,25),(1,70)})"
+        "({(1,1),(1,10),(1,20),(1,25),(1,30),(1,40),(1,50),(1,60),(1,70),(1,80)})"
       };
     
-    test.assertOutput("data",input,"data2",output);
+    test.assertOutput("data", input, "data2", output);
   }
 }
diff --git a/test/pig/datafu/test/pig/bags/sets/setUnionTest.pig b/test/pig/datafu/test/pig/bags/sets/setUnionTest.pig
index a5e1c4d..1b0e23b 100644
--- a/test/pig/datafu/test/pig/bags/sets/setUnionTest.pig
+++ b/test/pig/datafu/test/pig/bags/sets/setUnionTest.pig
@@ -6,7 +6,11 @@
 
 dump data
 
-data2 = FOREACH data GENERATE SetUnion(B1,B2);
+data2 = FOREACH data GENERATE SetUnion(B1,B2) AS C;
+data2 = FOREACH data2 {
+  C = ORDER C BY val1 ASC, val2 ASC;
+  generate C;
+}
 
 dump data2