Changes in preparation to pig 0.1.1 release
git-svn-id: https://svn.apache.org/repos/asf/hadoop/pig/branches/branch-0.1@720324 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index decb129..a7873c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,30 @@
Pig Change Log
-Release 0.1.0 - Unreleased
+Trunk (unreleased changes)
+
+ INCOMPATIBLE CHANGES
+
+ IMPROVEMENTS
+
+ OPTIMIZATIONS
+
+ BUG FIXES
+
+Release 0.1.1 - Unreleased
+
+INCOMPATIBLE CHANGES
+
+NEW FEATURES
+
+IMPROVEMENTS
+
+PIG-253: integration with hadoop-18
+
+BUG FIXES
+
+PIG-342: Fix DistinctDataBag to recalculate size after it has spilled. (bdimcheff via gates)
+
Release 0.1.0 - 2008-09-11
INCOMPATIBLE CHANGES
@@ -342,3 +366,10 @@
PIG-34: updated CHANGES.txt
+ PIG-472: Added RegExLoader to piggybank, an abstract loader class to parse
+ text files via regular expressions (spackest via gates)
+
+ PIG-473: Added CommonLogLoader, a subclass of RegExLoader to piggybank (spackest via gates)
+
+ PIG-474: Added MyRegexLoader, a subclass of RegExLoader, to piggybank (spackest via gates)
+
diff --git a/build.xml b/build.xml
index 5555249..269927b 100644
--- a/build.xml
+++ b/build.xml
@@ -25,7 +25,7 @@
<!-- name and version properties -->
<property name="name" value="pig" />
<property name="Name" value="Pig" />
- <property name="version" value="0.1.0-dev" />
+ <property name="version" value="0.2.0-dev" />
<property name="final.name" value="${name}-${version}" />
<!-- source properties -->
@@ -58,7 +58,7 @@
<property name="build.javadoc" value="${build.docs}/api" />
<property name="build.encoding" value="ISO-8859-1" />
<!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
- <property name="hadoop.jarfile" value="hadoop17.jar" />
+ <property name="hadoop.jarfile" value="hadoop18.jar" />
<!-- distribution properties -->
<property name="staging.dir" value="${build.dir}/staging"/>
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MyRegExLoader.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MyRegExLoader.java
new file mode 100644
index 0000000..3bf8f25
--- /dev/null
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MyRegExLoader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage;
+
+import java.util.regex.Pattern;
+
+/*
+ * MyRegExLoader extends RegExLoader, allowing regular expressions to be passed by argument through pig latin
+ * via a line like
+ *
+ * A = LOAD 'file:test.txt' USING org.apache.pig.piggybank.storage.MyRegExLoader('(\\d+)!+(\\w+)~+(\\w+)');
+ *
+ * which would parse lines like
+ *
+ * 1!!!one~i 2!!two~~ii 3!three~~~iii
+ *
+ * into arrays like
+ *
+ * {1, "one", "i"}, {2, "two", "ii"}, {3, "three", "iii"}
+ */
+
+public class MyRegExLoader extends RegExLoader {
+ Pattern pattern = null;
+
+ public MyRegExLoader(String pattern) {
+ this.pattern = Pattern.compile(pattern);
+ }
+
+ @Override
+ public Pattern getPattern() {
+ return pattern;
+ }
+}
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java
new file mode 100644
index 0000000..87bca2d
--- /dev/null
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+/**
+ * RegExLoader is an abstract class used to parse logs based on a regular expression.
+ *
+ * There is a single abstract method, getPattern which needs to return a Pattern. Each group will be returned
+ * as a different DataAtom.
+ *
+ * Look to org.apache.pig.piggybank.storage.apachelog.CommonLogLoader for example usage.
+ */
+
+public abstract class RegExLoader implements ReversibleLoadStoreFunc {
+ protected BufferedPositionedInputStream in = null;
+ long end = Long.MAX_VALUE;
+ private byte recordDel = (byte) '\n';
+ private String fieldDel = "\t";
+ final private static Charset utf8 = Charset.forName("UTF8");
+ OutputStream os;
+
+ abstract public Pattern getPattern();
+
+ public RegExLoader() {
+ }
+
+ public Tuple getNext() throws IOException {
+ if (in == null || in.getPosition() > end) {
+ return null;
+ }
+
+ Pattern pattern = getPattern();
+ Matcher matcher = pattern.matcher("");
+
+ String line;
+ if ((line = in.readLine(utf8, recordDel)) != null) {
+ if (line.length() > 0 && line.charAt(line.length() - 1) == '\r')
+ line = line.substring(0, line.length() - 1);
+
+ matcher.reset(line);
+ if (matcher.find()) {
+ ArrayList<Datum> list = new ArrayList<Datum>();
+
+ for (int i = 1; i <= matcher.groupCount(); i++) {
+ list.add(new DataAtom(matcher.group(i)));
+ }
+ return new Tuple(list);
+ }
+ }
+ return null;
+ }
+
+ public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
+ this.in = in;
+ this.end = end;
+
+ // Since we are not block aligned we throw away the first
+ // record and count on a different instance to read it
+ if (offset != 0) {
+ getNext();
+ }
+ }
+
+ public void bindTo(OutputStream os) throws IOException {
+ this.os = os;
+ }
+
+ public void putNext(Tuple f) throws IOException {
+ os.write((f.toDelimitedString(this.fieldDel) + (char) this.recordDel).getBytes("utf8"));
+ }
+
+ public void finish() throws IOException {
+ }
+}
diff --git a/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/apachelog/CommonLogLoader.java b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/apachelog/CommonLogLoader.java
new file mode 100644
index 0000000..963ecbd
--- /dev/null
+++ b/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/apachelog/CommonLogLoader.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage.apachelog;
+
+import java.util.regex.Pattern;
+
+import org.apache.pig.piggybank.storage.RegExLoader;
+
+/**
+ * CommonLogLoader is used to load logs based on Apache's common log format, based on a format like
+ *
+ * LogFormat "%h %l %u %t \"%r\" %>s %b" common
+ *
+ * The log filename ends up being access_log from a line like
+ *
+ * CustomLog logs/access_log common
+ *
+ * Example:
+ *
+ * raw = LOAD 'access_log' USING org.apache.pig.piggybank.storage.apachelog.CommonLogLoader AS (remoteAddr,
+ * remoteLogname, user, time, method, uri, proto, bytes);
+ *
+ */
+
+public class CommonLogLoader extends RegExLoader {
+ // 81.19.151.110 - - [04/Oct/2008:13:28:23 -0600] "GET / HTTP/1.0" 200 156
+ private final static Pattern commonLogPattern = Pattern
+ .compile("^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+.(\\S+\\s+\\S+).\\s+.(\\S+)\\s+(\\S+)\\s+(\\S+.\\S+).\\s+(\\S+)\\s+(\\S+)$");
+
+ public Pattern getPattern() {
+ return commonLogPattern;
+ }
+}
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java
new file mode 100644
index 0000000..4dd27d5
--- /dev/null
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.piggybank.storage.apachelog.CommonLogLoader;
+import org.junit.Test;
+
+public class TestCommonLogLoader extends TestCase {
+ public static ArrayList<String[]> data = new ArrayList<String[]>();
+ static {
+ data.add(new String[] { "1.2.3.4", "-", "-", "[01/Jan/2008:23:27:45 -0600]", "\"GET /zero.html HTTP/1.0\"", "200", "100" });
+ data.add(new String[] { "2.3.4.5", "-", "-", "[02/Feb/2008:23:27:48 -0600]", "\"GET /one.js HTTP/1.1\"", "201", "101" });
+ data.add(new String[] { "3.4.5.6", "-", "-", "[03/Mar/2008:23:27:48 -0600]", "\"GET /two.xml HTTP/1.2\"", "202", "102" });
+ }
+
+ public static ArrayList<String[]> EXPECTED = new ArrayList<String[]>();
+ static {
+
+ for (int i = 0; i < data.size(); i++) {
+ ArrayList<String> thisExpected = new ArrayList<String>();
+ for (int j = 0; j <= 2; j++) {
+ thisExpected.add(data.get(i)[j]);
+ }
+ String temp = data.get(i)[3];
+ temp = temp.replace("[", "");
+ temp = temp.replace("]", "");
+ thisExpected.add(temp);
+
+ temp = data.get(i)[4];
+
+ for (String thisOne : data.get(i)[4].split(" ")) {
+ thisOne = thisOne.replace("\"", "");
+ thisExpected.add(thisOne);
+ }
+ for (int j = 5; j <= 6; j++) {
+ thisExpected.add(data.get(i)[j]);
+ }
+
+ String[] toAdd = new String[0];
+ toAdd = (String[]) (thisExpected.toArray(toAdd));
+ EXPECTED.add(toAdd);
+ }
+ }
+
+ @Test
+ public void testInstantiation() {
+ CommonLogLoader commonLogLoader = new CommonLogLoader();
+ assertNotNull(commonLogLoader);
+ }
+
+ @Test
+ public void testLoadFromBindTo() throws Exception {
+ String filename = TestHelper.createTempFile(data, " ");
+ CommonLogLoader commonLogLoader = new CommonLogLoader();
+ PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+ InputStream inputStream = FileLocalizer.open(filename, pigContext);
+ commonLogLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE);
+
+ int tupleCount = 0;
+
+ while (true) {
+ Tuple tuple = commonLogLoader.getNext();
+ if (tuple == null)
+ break;
+ else {
+ TestHelper.examineTuple(EXPECTED, tuple, tupleCount);
+ tupleCount++;
+ }
+ }
+ assertEquals(data.size(), tupleCount);
+ }
+
+ public void testLoadFromPigServer() throws Exception {
+ String filename = TestHelper.createTempFile(data, " ");
+ PigServer pig = new PigServer(ExecType.LOCAL);
+ filename = filename.replace("\\", "\\\\");
+ pig.registerQuery("A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.apachelog.CommonLogLoader();");
+ Iterator<?> it = pig.openIterator("A");
+
+ int tupleCount = 0;
+
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ TestHelper.examineTuple(EXPECTED, tuple, tupleCount);
+ tupleCount++;
+ }
+ }
+ assertEquals(data.size(), tupleCount);
+ }
+}
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHelper.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHelper.java
new file mode 100644
index 0000000..4fd00f2
--- /dev/null
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+public class TestHelper extends TestCase {
+ @Test
+ public void testTest() {
+ assertTrue(true);
+ }
+
+
+ public static ArrayList<String[]> getExpected(ArrayList<String[]> data, Pattern pattern) {
+ ArrayList<String[]> expected = new ArrayList<String[]>();
+ for (int i = 0; i < data.size(); i++) {
+ String string = data.get(i)[0];
+ Matcher matcher = pattern.matcher(string);
+ matcher.groupCount();
+ matcher.find();
+ String[] toAdd = new String[] { matcher.group(1), matcher.group(2), matcher.group(3) };
+ expected.add(toAdd);
+ }
+
+ return expected;
+ }
+
+ private static String join(String delimiter, String[] strings) {
+ String string = strings[0];
+ for (int i = 1; i < strings.length; i++) {
+ string += delimiter + strings[i];
+ }
+ return string;
+ }
+
+ public static void examineTuple(ArrayList<String[]> expectedData, Tuple tuple, int tupleCount) {
+ for (int i = 0; i < tuple.arity(); i++) {
+ DataAtom dataAtom = tuple.getAtomField(i);
+ String expected = expectedData.get(tupleCount)[i];
+ String actual = dataAtom.toString();
+ assertEquals(expected, actual);
+ }
+ }
+
+ public static String createTempFile(ArrayList<String[]> myData, String delimiter) throws Exception {
+ File tmpFile = File.createTempFile("test", ".txt");
+ if (tmpFile.exists()) {
+ tmpFile.delete();
+ }
+ PrintWriter pw = new PrintWriter(tmpFile);
+ for (int i = 0; i < myData.size(); i++) {
+ pw.println(join(delimiter, myData.get(i)));
+ }
+ pw.close();
+ tmpFile.deleteOnExit();
+ return tmpFile.getAbsolutePath();
+ }
+}
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMyRegExLoader.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMyRegExLoader.java
new file mode 100644
index 0000000..2cdf3b8
--- /dev/null
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMyRegExLoader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.data.Tuple;
+
+public class TestMyRegExLoader extends TestCase {
+ private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
+ private final static Pattern pattern = Pattern.compile(patternString);
+ public static ArrayList<String[]> data = new ArrayList<String[]>();
+ static {
+ data.add(new String[] { "1!!!one~i" });
+ data.add(new String[] { "2!!two~~ii" });
+ data.add(new String[] { "3!three~~~iii" });
+ }
+
+ public void testLoadMyRegExFromPigServer() throws Exception {
+ ArrayList<String[]> expected = TestHelper.getExpected(data, pattern);
+ String filename = TestHelper.createTempFile(data, "");
+ PigServer pig = new PigServer(ExecType.LOCAL);
+ filename = filename.replace("\\", "\\\\");
+ patternString = patternString.replace("\\", "\\\\");
+ String query = "A = LOAD 'file:" + filename + "' USING org.apache.pig.piggybank.storage.MyRegExLoader('" + patternString + "');";
+ pig.registerQuery(query);
+ Iterator<?> it = pig.openIterator("A");
+
+ int tupleCount = 0;
+
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ TestHelper.examineTuple(expected, tuple, tupleCount);
+ tupleCount++;
+ }
+ }
+ assertEquals(data.size(), tupleCount);
+ }
+}
diff --git a/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java
new file mode 100644
index 0000000..996cf4a
--- /dev/null
+++ b/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.piggybank.storage.RegExLoader;
+import org.junit.Test;
+
+public class TestRegExLoader extends TestCase {
+ private static String patternString = "(\\w+),(\\w+);(\\w+)";
+ private final static Pattern pattern = Pattern.compile(patternString);
+
+ class DummyRegExLoader extends RegExLoader {
+ @Override
+ public Pattern getPattern() {
+ return Pattern.compile(patternString);
+ }
+ }
+
+ public static ArrayList<String[]> data = new ArrayList<String[]>();
+ static {
+ data.add(new String[] { "1,one;i" });
+ data.add(new String[] { "2,two;ii" });
+ data.add(new String[] { "3,three;iii" });
+ }
+
+ @Test
+ public void testLoadFromBindTo() throws Exception {
+ String filename = TestHelper.createTempFile(data, " ");
+ DummyRegExLoader dummyRegExLoader = new DummyRegExLoader();
+ PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+ InputStream inputStream = FileLocalizer.open(filename, pigContext);
+ dummyRegExLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE);
+
+ ArrayList<String[]> expected = TestHelper.getExpected(data, pattern);
+ int tupleCount = 0;
+
+ while (true) {
+ Tuple tuple = dummyRegExLoader.getNext();
+ if (tuple == null)
+ break;
+ else {
+ TestHelper.examineTuple(expected, tuple, tupleCount);
+ tupleCount++;
+ }
+ }
+ assertEquals(data.size(), tupleCount);
+ }
+}
diff --git a/lib/hadoop17.jar b/lib/hadoop17.jar
deleted file mode 100644
index f21f740..0000000
--- a/lib/hadoop17.jar
+++ /dev/null
Binary files differ
diff --git a/lib/hadoop18.jar b/lib/hadoop18.jar
new file mode 100644
index 0000000..e884a76
--- /dev/null
+++ b/lib/hadoop18.jar
Binary files differ
diff --git a/src/org/apache/pig/ComparisonFunc.java b/src/org/apache/pig/ComparisonFunc.java
index ad90881..b982da3 100644
--- a/src/org/apache/pig/ComparisonFunc.java
+++ b/src/org/apache/pig/ComparisonFunc.java
@@ -26,7 +26,7 @@
public abstract class ComparisonFunc extends WritableComparator {
public ComparisonFunc() {
- super(Tuple.class);
+ super(Tuple.class, true);
}
public int compare(WritableComparable a, WritableComparable b) {
diff --git a/src/org/apache/pig/backend/hadoop/datastorage/HFile.java b/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
index 05a4b2e..8b26897 100644
--- a/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
+++ b/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
@@ -75,6 +75,6 @@
public SeekableInputStream sopen() throws IOException {
return new HSeekableInputStream(fs.getHFS().open(path),
- fs.getHFS().getContentLength(path));
+ fs.getHFS(). getContentSummary(path).getLength());
}
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java b/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
index 53d3a50..9e06a62 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
@@ -45,7 +45,6 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
@@ -80,7 +79,6 @@
protected DataStorage ds;
- protected JobSubmissionProtocol jobTracker;
protected JobClient jobClient;
// key: the operator key from the logical plan that originated the physical plan
@@ -101,7 +99,6 @@
this.ds = null;
// to be set in the init method
- this.jobTracker = null;
this.jobClient = null;
}
@@ -185,16 +182,6 @@
if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION));
- if (!LOCAL.equalsIgnoreCase(cluster)) {
- try {
- jobTracker = (JobSubmissionProtocol) RPC.getProxy(
- JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, JobTracker
- .getAddress(configuration), configuration);
- } catch (IOException e) {
- throw new ExecException("Failed to crate job tracker", e);
- }
- }
}
try {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
index de446ed..f1fc85f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
@@ -175,6 +175,9 @@
}
if (pom.toCombine != null) {
conf.set("pig.combineFunc", ObjectSerializer.serialize(pom.toCombine));
+ // this is to make sure that combiner is only called once
+ // since we can't handle no combine or multiple combines
+ conf.setCombineOnceOnly(true);
}
if (pom.groupFuncs != null) {
conf.set("pig.groupFuncs", ObjectSerializer.serialize(pom.groupFuncs));
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
index e51647d..10eded4 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
@@ -70,7 +70,10 @@
}
}
- index = PigInputFormat.getActiveSplit().getIndex();
+ if (PigInputFormat.getActiveSplit() == null) {
+ } else {
+ index = PigInputFormat.getActiveSplit().getIndex();
+ }
Datum groupName = key.getField(0);
finalout.group = key;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
index 687b028..fb56915 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
@@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -90,11 +91,12 @@
Set<String> locations = new HashSet<String>();
for (String loc : wrapped.getLocations()) {
Path path = new Path(loc);
- String hints[][] = fs.getFileCacheHints(path, 0, fs.getFileStatus(
- path).getLen());
- for (int i = 0; i < hints.length; i++) {
- for (int j = 0; j < hints[i].length; j++) {
- locations.add(hints[i][j]);
+ BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, fs.getFileStatus(
+ path).getLen());
+ for (int i = 0; i < blocks.length; i++) {
+ String[] hosts = blocks[i].getHosts();
+ for (int j = 0; j < hosts.length; j++){
+ locations.add(hosts[j]);
}
}
}
diff --git a/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java b/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
index a5acc1c..918ac87 100644
--- a/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
+++ b/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
import org.apache.pig.impl.eval.collector.DataCollector;
@@ -169,10 +170,7 @@
*/
private boolean writeErrorToHDFS(int limit, String taskId) {
if (command.getPersistStderr()) {
- // These are hard-coded begin/end offsets a Hadoop *taskid*
- int beginIndex = 25, endIndex = 31;
-
- int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+ int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
return tipId < command.getLogFilesLimit();
}
return false;
@@ -249,4 +247,4 @@
}
}
-
\ No newline at end of file
+
diff --git a/src/org/apache/pig/data/DistinctDataBag.java b/src/org/apache/pig/data/DistinctDataBag.java
index fc3b2bc..65761da 100644
--- a/src/org/apache/pig/data/DistinctDataBag.java
+++ b/src/org/apache/pig/data/DistinctDataBag.java
@@ -67,6 +67,28 @@
return true;
}
+
+ public long size() {
+ if (mSpillFiles != null && mSpillFiles.size() > 0){
+ //We need to recalculate size to guarantee a count of unique
+ //entries including those on disk
+ Iterator<Tuple> iter = iterator();
+ int newSize = 0;
+ while (iter.hasNext()) {
+ newSize++;
+ iter.next();
+ }
+
+ synchronized(mContents) {
+ //we don't want adds to change our numbers
+ //the lock may need to cover more of the method
+ mSize = newSize;
+ }
+ }
+ return mSize;
+ }
+
+
@Override
public Iterator<Tuple> iterator() {
return new DistinctDataBagIterator();
@@ -84,7 +106,6 @@
@Override
public void addAll(DataBag b) {
synchronized (mContents) {
- mSize += b.size();
Iterator<Tuple> i = b.iterator();
while (i.hasNext()) {
if (mContents.add(i.next())) {
diff --git a/test/org/apache/pig/test/TestDataBag.java b/test/org/apache/pig/test/TestDataBag.java
index b11d44f..e9e7fb4 100644
--- a/test/org/apache/pig/test/TestDataBag.java
+++ b/test/org/apache/pig/test/TestDataBag.java
@@ -555,6 +555,8 @@
}
mgr.forceSpill();
}
+
+ assertEquals("Size of distinct data bag is incorrect", b.size(), rightAnswer.size());
// Read tuples back, hopefully they come out in the same order.
Iterator<Tuple> bIter = b.iterator();