Changes in preparation for pig 0.1.1 release

git-svn-id: https://svn.apache.org/repos/asf/hadoop/pig/branches/branch-0.1@720324 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index decb129..a7873c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,30 @@
 Pig Change Log
 
-Release 0.1.0 - Unreleased
+Trunk (unreleased changes)
+
+  INCOMPATIBLE CHANGES
+
+  IMPROVEMENTS 
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+Release 0.1.1 - Unreleased
+
+INCOMPATIBLE CHANGES
+
+NEW FEATURES
+
+IMPROVEMENTS
+
+PIG-253: integration with hadoop-18
+
+BUG FIXES
+
+PIG-342: Fix DistinctDataBag to recalculate size after it has spilled. (bdimcheff via gates)
+
+Release 0.1.0 - 2008-09-11
 
   INCOMPATIBLE CHANGES
 
@@ -342,3 +366,10 @@
 
     PIG-34: updated CHANGES.txt
 
+	PIG-472: Added RegExLoader to piggybank, an abstract loader class to parse
+	text files via regular expressions (spackest via gates)
+
+	PIG-473: Added CommonLogLoader, a subclass of RegExLoader to piggybank (spackest via gates)
+
+	PIG-474: Added MyRegexLoader, a subclass of RegExLoader, to piggybank (spackest via gates)
+
diff --git a/build.xml b/build.xml
index 5555249..269927b 100644
--- a/build.xml
+++ b/build.xml
@@ -25,7 +25,7 @@
     <!-- name and version properties -->
     <property name="name" value="pig" />
     <property name="Name" value="Pig" />
-    <property name="version" value="0.1.0-dev" />
+    <property name="version" value="0.2.0-dev" />
     <property name="final.name" value="${name}-${version}" />
 
     <!-- source properties -->
@@ -58,7 +58,7 @@
     <property name="build.javadoc" value="${build.docs}/api" />
     <property name="build.encoding" value="ISO-8859-1" />
     <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
-    <property name="hadoop.jarfile" value="hadoop17.jar" />
+    <property name="hadoop.jarfile" value="hadoop18.jar" />
 
     <!-- distribution properties -->
     <property name="staging.dir" value="${build.dir}/staging"/>
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MyRegExLoader.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MyRegExLoader.java
new file mode 100644
index 0000000..3bf8f25
--- /dev/null
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MyRegExLoader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage;
+
+import java.util.regex.Pattern;
+
+/*
+ * MyRegExLoader extends RegExLoader, allowing regular expressions to be passed by argument through pig latin
+ * via a line like
+ * 
+ * A = LOAD 'file:test.txt' USING org.apache.pig.piggybank.storage.MyRegExLoader('(\\d+)!+(\\w+)~+(\\w+)');
+ * 
+ * which would parse lines like
+ * 
+ * 1!!!one~i 2!!two~~ii 3!three~~~iii
+ * 
+ * into arrays like
+ * 
+ * {1, "one", "i"}, {2, "two", "ii"}, {3, "three", "iii"}
+ */
+
+public class MyRegExLoader extends RegExLoader {
+    Pattern pattern = null;
+
+    public MyRegExLoader(String pattern) {
+        this.pattern = Pattern.compile(pattern);
+    }
+
+    @Override
+    public Pattern getPattern() {
+        return pattern;
+    }
+}
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java
new file mode 100644
index 0000000..87bca2d
--- /dev/null
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+/**
+ * RegExLoader is an abstract class used to parse logs based on a regular expression.
+ * 
+ * There is a single abstract method, getPattern which needs to return a Pattern. Each group will be returned
+ * as a different DataAtom.
+ * 
+ * Look to org.apache.pig.piggybank.storage.apachelog.CommonLogLoader for example usage.
+ */
+
+public abstract class RegExLoader implements ReversibleLoadStoreFunc {
+    protected BufferedPositionedInputStream in = null;
+    long end = Long.MAX_VALUE;
+    private byte recordDel = (byte) '\n';
+    private String fieldDel = "\t";
+    final private static Charset utf8 = Charset.forName("UTF8");
+    OutputStream os;
+
+    abstract public Pattern getPattern();
+
+    public RegExLoader() {
+    }
+
+    public Tuple getNext() throws IOException {
+        if (in == null || in.getPosition() > end) {
+            return null;
+        }
+
+        Pattern pattern = getPattern();
+        Matcher matcher = pattern.matcher("");
+
+        String line;
+        if ((line = in.readLine(utf8, recordDel)) != null) {
+            if (line.length() > 0 && line.charAt(line.length() - 1) == '\r')
+                line = line.substring(0, line.length() - 1);
+
+            matcher.reset(line);
+            if (matcher.find()) {
+                ArrayList<Datum> list = new ArrayList<Datum>();
+
+                for (int i = 1; i <= matcher.groupCount(); i++) {
+                    list.add(new DataAtom(matcher.group(i)));
+                }
+                return new Tuple(list);
+            }
+        }
+        return null;
+    }
+
+    public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
+        this.in = in;
+        this.end = end;
+
+        // Since we are not block aligned we throw away the first
+        // record and count on a different instance to read it
+        if (offset != 0) {
+            getNext();
+        }
+    }
+
+    public void bindTo(OutputStream os) throws IOException {
+        this.os = os;
+    }
+
+    public void putNext(Tuple f) throws IOException {
+        os.write((f.toDelimitedString(this.fieldDel) + (char) this.recordDel).getBytes("utf8"));
+    }
+
+    public void finish() throws IOException {
+    }
+}
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/apachelog/CommonLogLoader.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/apachelog/CommonLogLoader.java
new file mode 100644
index 0000000..963ecbd
--- /dev/null
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/apachelog/CommonLogLoader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage.apachelog;
+
+import java.util.regex.Pattern;
+
+import org.apache.pig.piggybank.storage.RegExLoader;
+
+/**
+ * CommonLogLoader is used to load logs based on Apache's common log format, based on a format like
+ * 
+ * LogFormat "%h %l %u %t \"%r\" %>s %b" common
+ * 
+ * The log filename ends up being access_log from a line like
+ * 
+ * CustomLog logs/access_log common
+ * 
+ * Example:
+ * 
+ * raw = LOAD 'access_log' USING org.apache.pig.piggybank.storage.apachelog.CommonLogLoader AS (remoteAddr,
+ * remoteLogname, user, time, method, uri, proto, bytes);
+ * 
+ */
+
+public class CommonLogLoader extends RegExLoader {
+    // 81.19.151.110 - - [04/Oct/2008:13:28:23 -0600] "GET / HTTP/1.0" 200 156
+    private final static Pattern commonLogPattern = Pattern
+        .compile("^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+.(\\S+\\s+\\S+).\\s+.(\\S+)\\s+(\\S+)\\s+(\\S+.\\S+).\\s+(\\S+)\\s+(\\S+)$");
+
+    public Pattern getPattern() {
+        return commonLogPattern;
+    }
+}
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java
new file mode 100644
index 0000000..4dd27d5
--- /dev/null
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.piggybank.storage.apachelog.CommonLogLoader;
+import org.junit.Test;
+
+public class TestCommonLogLoader extends TestCase {
+    public static ArrayList<String[]> data = new ArrayList<String[]>();
+    static {
+        data.add(new String[] { "1.2.3.4", "-", "-", "[01/Jan/2008:23:27:45 -0600]", "\"GET /zero.html HTTP/1.0\"", "200", "100" });
+        data.add(new String[] { "2.3.4.5", "-", "-", "[02/Feb/2008:23:27:48 -0600]", "\"GET /one.js HTTP/1.1\"", "201", "101" });
+        data.add(new String[] { "3.4.5.6", "-", "-", "[03/Mar/2008:23:27:48 -0600]", "\"GET /two.xml HTTP/1.2\"", "202", "102" });
+    }
+
+    public static ArrayList<String[]> EXPECTED = new ArrayList<String[]>();
+    static {
+
+        for (int i = 0; i < data.size(); i++) {
+            ArrayList<String> thisExpected = new ArrayList<String>();
+            for (int j = 0; j <= 2; j++) {
+                thisExpected.add(data.get(i)[j]);
+            }
+            String temp = data.get(i)[3];
+            temp = temp.replace("[", "");
+            temp = temp.replace("]", "");
+            thisExpected.add(temp);
+
+            temp = data.get(i)[4];
+
+            for (String thisOne : data.get(i)[4].split(" ")) {
+                thisOne = thisOne.replace("\"", "");
+                thisExpected.add(thisOne);
+            }
+            for (int j = 5; j <= 6; j++) {
+                thisExpected.add(data.get(i)[j]);
+            }
+
+            String[] toAdd = new String[0];
+            toAdd = (String[]) (thisExpected.toArray(toAdd));
+            EXPECTED.add(toAdd);
+        }
+    }
+
+    @Test
+    public void testInstantiation() {
+        CommonLogLoader commonLogLoader = new CommonLogLoader();
+        assertNotNull(commonLogLoader);
+    }
+
+    @Test
+    public void testLoadFromBindTo() throws Exception {
+        String filename = TestHelper.createTempFile(data, " ");
+        CommonLogLoader commonLogLoader = new CommonLogLoader();
+        PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+        InputStream inputStream = FileLocalizer.open(filename, pigContext);
+        commonLogLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE);
+
+        int tupleCount = 0;
+
+        while (true) {
+            Tuple tuple = commonLogLoader.getNext();
+            if (tuple == null)
+                break;
+            else {
+                TestHelper.examineTuple(EXPECTED, tuple, tupleCount);
+                tupleCount++;
+            }
+        }
+        assertEquals(data.size(), tupleCount);
+    }
+
+    public void testLoadFromPigServer() throws Exception {
+        String filename = TestHelper.createTempFile(data, " ");
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        filename = filename.replace("\\", "\\\\");
+        pig.registerQuery("A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.apachelog.CommonLogLoader();");
+        Iterator<?> it = pig.openIterator("A");
+
+        int tupleCount = 0;
+
+        while (it.hasNext()) {
+            Tuple tuple = (Tuple) it.next();
+            if (tuple == null)
+                break;
+            else {
+                TestHelper.examineTuple(EXPECTED, tuple, tupleCount);
+                tupleCount++;
+            }
+        }
+        assertEquals(data.size(), tupleCount);
+    }
+}
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHelper.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHelper.java
new file mode 100644
index 0000000..4fd00f2
--- /dev/null
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+public class TestHelper extends TestCase {
+    @Test
+    public void testTest() {
+        assertTrue(true);
+    }
+
+
+    public static ArrayList<String[]> getExpected(ArrayList<String[]> data, Pattern pattern) {
+        ArrayList<String[]> expected = new ArrayList<String[]>();
+        for (int i = 0; i < data.size(); i++) {
+            String string = data.get(i)[0];
+            Matcher matcher = pattern.matcher(string);
+            matcher.groupCount();
+            matcher.find();
+            String[] toAdd = new String[] { matcher.group(1), matcher.group(2), matcher.group(3) };
+            expected.add(toAdd);
+        }
+
+        return expected;
+    }
+
+    private static String join(String delimiter, String[] strings) {
+        String string = strings[0];
+        for (int i = 1; i < strings.length; i++) {
+            string += delimiter + strings[i];
+        }
+        return string;
+    }
+
+    public static void examineTuple(ArrayList<String[]> expectedData, Tuple tuple, int tupleCount) {
+        for (int i = 0; i < tuple.arity(); i++) {
+            DataAtom dataAtom = tuple.getAtomField(i);
+            String expected = expectedData.get(tupleCount)[i];
+            String actual = dataAtom.toString();
+            assertEquals(expected, actual);
+        }
+    }
+
+    public static String createTempFile(ArrayList<String[]> myData, String delimiter) throws Exception {
+        File tmpFile = File.createTempFile("test", ".txt");
+        if (tmpFile.exists()) {
+            tmpFile.delete();
+        }
+        PrintWriter pw = new PrintWriter(tmpFile);
+        for (int i = 0; i < myData.size(); i++) {
+            pw.println(join(delimiter, myData.get(i)));
+        }
+        pw.close();
+        tmpFile.deleteOnExit();
+        return tmpFile.getAbsolutePath();
+    }
+}
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMyRegExLoader.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMyRegExLoader.java
new file mode 100644
index 0000000..2cdf3b8
--- /dev/null
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMyRegExLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.data.Tuple;
+
+public class TestMyRegExLoader extends TestCase {
+    private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
+    private final static Pattern pattern = Pattern.compile(patternString);
+    public static ArrayList<String[]> data = new ArrayList<String[]>();
+    static {
+        data.add(new String[] { "1!!!one~i" });
+        data.add(new String[] { "2!!two~~ii" });
+        data.add(new String[] { "3!three~~~iii" });
+    }
+
+    public void testLoadMyRegExFromPigServer() throws Exception {
+        ArrayList<String[]> expected = TestHelper.getExpected(data, pattern);
+        String filename = TestHelper.createTempFile(data, "");
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        filename = filename.replace("\\", "\\\\");
+        patternString = patternString.replace("\\", "\\\\");
+        String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.MyRegExLoader('" + patternString + "');";
+        pig.registerQuery(query);
+        Iterator<?> it = pig.openIterator("A");
+
+        int tupleCount = 0;
+
+        while (it.hasNext()) {
+            Tuple tuple = (Tuple) it.next();
+            if (tuple == null)
+                break;
+            else {
+                TestHelper.examineTuple(expected, tuple, tupleCount);
+                tupleCount++;
+            }
+        }
+        assertEquals(data.size(), tupleCount);
+    }
+}
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java
new file mode 100644
index 0000000..996cf4a
--- /dev/null
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.piggybank.storage.RegExLoader;
+import org.junit.Test;
+
+public class TestRegExLoader extends TestCase {
+    private static String patternString = "(\\w+),(\\w+);(\\w+)";
+    private final static Pattern pattern = Pattern.compile(patternString);
+
+    class DummyRegExLoader extends RegExLoader {
+        @Override
+        public Pattern getPattern() {
+            return Pattern.compile(patternString);
+        }
+    }
+
+    public static ArrayList<String[]> data = new ArrayList<String[]>();
+    static {
+        data.add(new String[] { "1,one;i" });
+        data.add(new String[] { "2,two;ii" });
+        data.add(new String[] { "3,three;iii" });
+    }
+
+    @Test
+    public void testLoadFromBindTo() throws Exception {
+        String filename = TestHelper.createTempFile(data, " ");
+        DummyRegExLoader dummyRegExLoader = new DummyRegExLoader();
+        PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+        InputStream inputStream = FileLocalizer.open(filename, pigContext);
+        dummyRegExLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE);
+
+        ArrayList<String[]> expected = TestHelper.getExpected(data, pattern);
+        int tupleCount = 0;
+
+        while (true) {
+            Tuple tuple = dummyRegExLoader.getNext();
+            if (tuple == null)
+                break;
+            else {
+                TestHelper.examineTuple(expected, tuple, tupleCount);
+                tupleCount++;
+            }
+        }
+        assertEquals(data.size(), tupleCount);
+    }
+}
diff --git a/lib/hadoop17.jar b/lib/hadoop17.jar
deleted file mode 100644
index f21f740..0000000
--- a/lib/hadoop17.jar
+++ /dev/null
Binary files differ
diff --git a/lib/hadoop18.jar b/lib/hadoop18.jar
new file mode 100644
index 0000000..e884a76
--- /dev/null
+++ b/lib/hadoop18.jar
Binary files differ
diff --git a/src/org/apache/pig/ComparisonFunc.java b/src/org/apache/pig/ComparisonFunc.java
index ad90881..b982da3 100644
--- a/src/org/apache/pig/ComparisonFunc.java
+++ b/src/org/apache/pig/ComparisonFunc.java
@@ -26,7 +26,7 @@
 

 public abstract class ComparisonFunc extends WritableComparator {

     public ComparisonFunc() {

-        super(Tuple.class);

+        super(Tuple.class, true);

     }

 

     public int compare(WritableComparable a, WritableComparable b) {

diff --git a/src/org/apache/pig/backend/hadoop/datastorage/HFile.java b/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
index 05a4b2e..8b26897 100644
--- a/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
+++ b/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
@@ -75,6 +75,6 @@
     

     public SeekableInputStream sopen() throws IOException {

         return new HSeekableInputStream(fs.getHFS().open(path),

-                                        fs.getHFS().getContentLength(path));

+                                        fs.getHFS(). getContentSummary(path).getLength());

     }

 }

diff --git a/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java b/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
index 53d3a50..9e06a62 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
@@ -45,7 +45,6 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -80,7 +79,6 @@
     
     protected DataStorage ds;
     
-    protected JobSubmissionProtocol jobTracker;
     protected JobClient jobClient;
 
     // key: the operator key from the logical plan that originated the physical plan
@@ -101,7 +99,6 @@
         this.ds = null;
         
         // to be set in the init method
-        this.jobTracker = null;
         this.jobClient = null;
     }
     
@@ -185,16 +182,6 @@
             
         if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
                 log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION));
-                if (!LOCAL.equalsIgnoreCase(cluster)) {
-                try {
-                    jobTracker = (JobSubmissionProtocol) RPC.getProxy(
-                            JobSubmissionProtocol.class,
-                            JobSubmissionProtocol.versionID, JobTracker
-                                    .getAddress(configuration), configuration);
-                } catch (IOException e) {
-                    throw new ExecException("Failed to crate job tracker", e);
-                }
-            }
         }
 
         try {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
index de446ed..f1fc85f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
@@ -175,6 +175,9 @@
             }
             if (pom.toCombine != null) {
                 conf.set("pig.combineFunc", ObjectSerializer.serialize(pom.toCombine));
+                // this is to make sure that combiner is only called once
+                // since we can't handle zero combine passes or more than one
+                conf.setCombineOnceOnly(true);
             }
             if (pom.groupFuncs != null) {
                 conf.set("pig.groupFuncs", ObjectSerializer.serialize(pom.groupFuncs));
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
index e51647d..10eded4 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
@@ -70,7 +70,10 @@
                 }
             }
 
-            index = PigInputFormat.getActiveSplit().getIndex();
+            if (PigInputFormat.getActiveSplit() == null) {
+            } else {
+                index = PigInputFormat.getActiveSplit().getIndex();
+            }
 
             Datum groupName = key.getField(0);
             finalout.group = key;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
index 687b028..fb56915 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
@@ -31,6 +31,7 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -90,11 +91,12 @@
         Set<String> locations = new HashSet<String>();
         for (String loc : wrapped.getLocations()) {
             Path path = new Path(loc);
-            String hints[][] = fs.getFileCacheHints(path, 0, fs.getFileStatus(
-                    path).getLen());
-            for (int i = 0; i < hints.length; i++) {
-                for (int j = 0; j < hints[i].length; j++) {
-                    locations.add(hints[i][j]);
+            BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, fs.getFileStatus(
+                                            path).getLen());
+            for (int i = 0; i < blocks.length; i++) {
+                String[] hosts = blocks[i].getHosts();
+                for (int j = 0; j < hosts.length; j++){
+                    locations.add(hosts[j]);
                 }
             }
         }
diff --git a/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java b/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
index a5acc1c..918ac87 100644
--- a/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
+++ b/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
 import org.apache.pig.impl.eval.collector.DataCollector;
@@ -169,10 +170,7 @@
      */
     private boolean writeErrorToHDFS(int limit, String taskId) {
         if (command.getPersistStderr()) {
-            // These are hard-coded begin/end offsets a Hadoop *taskid*
-            int beginIndex = 25, endIndex = 31;   
-
-            int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+            int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
             return tipId < command.getLogFilesLimit();
         }
         return false;
@@ -249,4 +247,4 @@
     }
 }
 
-    
\ No newline at end of file
+    
diff --git a/src/org/apache/pig/data/DistinctDataBag.java b/src/org/apache/pig/data/DistinctDataBag.java
index fc3b2bc..65761da 100644
--- a/src/org/apache/pig/data/DistinctDataBag.java
+++ b/src/org/apache/pig/data/DistinctDataBag.java
@@ -67,6 +67,28 @@
         return true;
     }
     
+    
+    public long size() {
+        if (mSpillFiles != null && mSpillFiles.size() > 0){
+            //We need to recalculate size to guarantee a count of unique 
+            //entries including those on disk
+            Iterator<Tuple> iter = iterator();
+            int newSize = 0;
+            while (iter.hasNext()) {
+                newSize++;
+                iter.next();
+            }
+            
+            synchronized(mContents) {
+                //we don't want adds to change our numbers
+                //the lock may need to cover more of the method
+                mSize = newSize;
+            }
+        }
+        return mSize;
+    }
+    
+    
     @Override
     public Iterator<Tuple> iterator() {
         return new DistinctDataBagIterator();
@@ -84,7 +106,6 @@
     @Override
     public void addAll(DataBag b) {
         synchronized (mContents) {
-            mSize += b.size();
             Iterator<Tuple> i = b.iterator();
             while (i.hasNext()) {
                 if (mContents.add(i.next())) {
diff --git a/test/org/apache/pig/test/TestDataBag.java b/test/org/apache/pig/test/TestDataBag.java
index b11d44f..e9e7fb4 100644
--- a/test/org/apache/pig/test/TestDataBag.java
+++ b/test/org/apache/pig/test/TestDataBag.java
@@ -555,6 +555,8 @@
             }
             mgr.forceSpill();
         }
+        
+        assertEquals("Size of distinct data bag is incorrect", b.size(), rightAnswer.size());
 
         // Read tuples back, hopefully they come out in the same order.
         Iterator<Tuple> bIter = b.iterator();