PIG-5357: BagFactory interface should support creating a distinct bag from a set (jtolar via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1843210 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index a59fbef..d95e0c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-5357: BagFactory interface should support creating a distinct bag from a set (jtolar via rohini)
+
 PIG-5354: Show fieldname and a line number for casting errors (knoguchi)
 
 PIG-5342: Add setting to turn off bloom join combiner (satishsaley via rohini)
diff --git a/src/org/apache/pig/data/BagFactory.java b/src/org/apache/pig/data/BagFactory.java
index b2a24e7..6f3c5a5 100644
--- a/src/org/apache/pig/data/BagFactory.java
+++ b/src/org/apache/pig/data/BagFactory.java
@@ -23,6 +23,7 @@
 import java.net.URLClassLoader;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -127,6 +128,29 @@
     public abstract DataBag newDistinctBag();
 
     /**
+     * Get a distinct data bag initialized from a set of tuples.  Distinct
+     * bags guarantee that when an iterator is opened on the bag, no two
+     * tuples returned from the iterator will be equal.
+     * <p>
+     * This base implementation obtains a bag from {@link #newDistinctBag()}
+     * and adds every tuple of the set to it.
+     * @param tuples distinct set of tuples used to initialize the bag.
+     * If null, no tuples are added.
+     * @return distinct data bag
+     */
+    public DataBag newDistinctBag(Set<Tuple> tuples) {
+        DataBag bag = newDistinctBag();
+        // A null set is treated as "nothing to add", in line with the
+        // null handling of the DefaultBagFactory override.
+        if (tuples != null) {
+            for (Tuple t : tuples) {
+                bag.add(t);
+            }
+        }
+        return bag;
+    }
+
+    /**
      * Construct a new BagFactory
      */
     protected BagFactory() {
diff --git a/src/org/apache/pig/data/DefaultBagFactory.java b/src/org/apache/pig/data/DefaultBagFactory.java
index e160a05..fdbcceb 100644
--- a/src/org/apache/pig/data/DefaultBagFactory.java
+++ b/src/org/apache/pig/data/DefaultBagFactory.java
@@ -19,6 +19,7 @@
 
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Default implementation of BagFactory.
@@ -76,6 +77,21 @@
         return b;
     }
 
+    /**
+     * Build a distinct data bag pre-populated from a set.
+     * The set is handed directly to the {@link DistinctDataBag}
+     * constructor rather than being re-added tuple by tuple.
+     * @param tuples Distinct set of tuples used to initialize the bag.
+     * If null, an empty bag is returned.
+     * @return the newly created distinct bag
+     */
+    @Override
+    public DataBag newDistinctBag(Set<Tuple> tuples) {
+        return (tuples == null)
+                ? newDistinctBag()
+                : new DistinctDataBag(tuples);
+    }
+
     DefaultBagFactory() {
         super();
     }
diff --git a/src/org/apache/pig/data/DistinctDataBag.java b/src/org/apache/pig/data/DistinctDataBag.java
index 36284ea..1fa980b 100644
--- a/src/org/apache/pig/data/DistinctDataBag.java
+++ b/src/org/apache/pig/data/DistinctDataBag.java
@@ -32,6 +32,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
+import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
@@ -63,6 +64,13 @@
         mContents = new HashSet<Tuple>();
     }
 
+    /** Create a distinct bag that uses {@code tuples} itself as its contents. */
+    public DistinctDataBag(Set<Tuple> tuples) {
+        mContents = tuples;
+        mSize = tuples.size();
+        markSpillableIfNecessary();
+    }
+
     @Override
     public boolean isSorted() {
         return false;
@@ -227,7 +235,7 @@
         DistinctDataBagIterator() {
             // If this is the first read, we need to sort the data.
             synchronized (mContents) {
-                if (mContents instanceof HashSet) {
+                if (mContents instanceof Set) {
                     preMerge();
                     // We're the first reader, we need to sort the data.
                     // This is in case it gets dumped under us.