| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
import static org.apache.pig.PigConfiguration.PIG_STREAMING_ENVIRONMENT;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import org.apache.pig.PigServer;
import org.apache.pig.PigToStream;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.PigStreaming;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.TupleFormat;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
| |
| public class TestStreaming { |
| |
    // Shared mini-cluster for every test in this class; shut down in oneTimeTearDown().
    private static final MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
    // Scratch directory created in oneTimeSetUp() and removed in oneTimeTearDown().
    private static File testDataDir;

    // Fresh server per test; created in setup(), released in tearDown().
    private PigServer pigServer;
| |
| @BeforeClass |
| public static void oneTimeSetUp() throws Exception { |
| // Create the test data directory if needed |
| testDataDir = new File(Util.getTestDirectory(TestStreaming.class)); |
| testDataDir.mkdirs(); |
| } |
| |
    @Before
    public void setup() throws ExecException {
        // A fresh PigServer per test keeps registered queries from leaking
        // between test cases.
        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
    }
| |
    @After
    public void tearDown() {
        // Drop the per-test server reference so each test starts clean.
        pigServer = null;
    }
| |
| @AfterClass |
| public static void oneTimeTearDown() throws Exception { |
| cluster.shutDown(); |
| Util.deleteDirectory(testDataDir); |
| } |
| |
| private TupleFactory tf = TupleFactory.getInstance(); |
| |
| private static final String simpleEchoStreamingCommand; |
| |
| static { |
| String quote = "'"; |
| if (Util.WINDOWS) { |
| quote= "\""; |
| } |
| |
| simpleEchoStreamingCommand = "perl -ne "+quote+"print $_"+quote; |
| } |
| |
| private Tuple[] setupExpectedResults(Object[] firstField, Object[] secondField) throws ExecException { |
| Assert.assertEquals(firstField.length, secondField.length); |
| |
| Tuple[] expectedResults = new Tuple[firstField.length]; |
| for (int i=0; i < expectedResults.length; ++i) { |
| expectedResults[i] = tf.newTuple(2); |
| expectedResults[i].set(0, firstField[i]); |
| expectedResults[i].set(1, secondField[i]); |
| } |
| |
| return expectedResults; |
| } |
| |
    @Test
    public void testSimpleMapSideStreaming() throws Exception {
        // Map-side-only pipeline: load, filter, then echo through the
        // streaming command twice.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3", "D,2",
                              "A,5", "B,5", "C,8", "A,8",
                              "D,8", "A,9"});

        // Expected results: the rows whose second column is > '3'.
        String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"};
        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
        // Run the same pipeline twice: once with a declared output schema on
        // the final stream operator (typed results), once without (bytearrays).
        boolean[] withTypes = {true, false};
        for (int i = 0; i < withTypes.length; i++) {
            Tuple[] expectedResults = null;
            if(withTypes[i] == true) {
                expectedResults =
                        setupExpectedResults(expectedFirstFields, expectedSecondFields);
            } else {
                expectedResults =
                        setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                             Util.toDataByteArrays(expectedSecondFields));
            }

            // Pig query to run
            pigServer.registerQuery("IP = load '"
                    + Util.generateURI(input.toString(),
                            pigServer.getPigContext()) + "' using "
                    + PigStorage.class.getName() + "(',');");
            // Note: $1 is compared against the chararray '3', not the int 3.
            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
            pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
                    simpleEchoStreamingCommand + "`;");
            if(withTypes[i] == true) {
                pigServer.registerQuery("OP = stream S1 through `" +
                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
            } else {
                pigServer.registerQuery("OP = stream S1 through `" +
                        simpleEchoStreamingCommand + "`;");
            }

            // Run the query and check the results
            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
        }
    }
| |
    @Test
    public void testSimpleMapSideStreamingWithOutputSchema()
    throws Exception {
        // Like testSimpleMapSideStreaming, but a filter on the streamed data's
        // declared schema (f1 > 6) runs AFTER the stream operator.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3", "D,2",
                              "A,5", "B,5", "C,8", "A,8",
                              "D,8", "A,9"});

        // Expected results: rows with second column > '3' and then > 6.
        Object[] expectedFirstFields = new String[] {"C", "A", "D", "A"};
        Object[] expectedSecondFields = new Integer[] {8, 8, 8, 9};

        // Two scenarios: typed output schema vs. untyped (bytearray) schema.
        boolean[] withTypes = {true, false};
        for (int i = 0; i < withTypes.length; i++) {
            Tuple[] expectedResults = null;
            if(withTypes[i] == true) {
                expectedResults =
                        setupExpectedResults(expectedFirstFields, expectedSecondFields);
            } else {
                expectedResults =
                        setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                             Util.toDataByteArrays(expectedSecondFields));
            }
            // Pig query to run
            pigServer.registerQuery("IP = load '"
                    + Util.generateURI(input.toString(),
                            pigServer.getPigContext()) + "' using "
                    + PigStorage.class.getName() + "(',');");
            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
            if(withTypes[i] == true) {
                pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
            } else {
                pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
                        simpleEchoStreamingCommand + "` as (f0, f1);");
            }
            // The filter references the schema declared on the stream operator.
            pigServer.registerQuery("OP = filter STREAMED_DATA by f1 > 6;");

            // Run the query and check the results
            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
        }
    }
| |
    @Test
    public void testSimpleReduceSideStreamingAfterFlatten()
    throws Exception {
        // Streaming placed after a group/flatten, so the stream operator runs
        // on the reduce side of the job.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3", "D,2",
                              "A,5", "B,5", "C,8", "A,8",
                              "D,8", "A,9"});

        // Expected results (order-insensitive; checked after sorting below).
        String[] expectedFirstFields = new String[] {"A", "A", "A", "B", "C", "D"};
        Integer[] expectedSecondFields = new Integer[] {5, 8, 9, 5, 8, 8};
        // Two scenarios: typed vs. untyped output of the final stream operator.
        boolean[] withTypes = {true, false};
        for (int i = 0; i < withTypes.length; i++) {
            Tuple[] expectedResults = null;
            if(withTypes[i] == true) {
                expectedResults =
                        setupExpectedResults(expectedFirstFields, expectedSecondFields);
            } else {
                expectedResults =
                        setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                             Util.toDataByteArrays(expectedSecondFields));
            }

            // Pig query to run
            pigServer.registerQuery("IP = load '"
                    + Util.generateURI(input.toString(),
                            pigServer.getPigContext()) + "' using "
                    + PigStorage.class.getName() + "(',');");
            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
            pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
            pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
                    "generate flatten($1);");
            pigServer.registerQuery("S1 = stream FLATTENED_GROUPED_DATA through `" +
                    simpleEchoStreamingCommand + "`;");
            if(withTypes[i] == true) {
                pigServer.registerQuery("OP = stream S1 through `" +
                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
            } else {
                pigServer.registerQuery("OP = stream S1 through `" +
                        simpleEchoStreamingCommand + "`;");
            }

            // Run the query and check the results; output order after the
            // group/flatten is not guaranteed, so compare after sorting.
            Util.checkQueryOutputsAfterSort(pigServer.openIterator("OP"), expectedResults);
        }
    }
| |
    @Test
    public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception {
        // Streaming after a nested 'order by' inside a foreach: verifies that
        // the secondary sort order ($2, $3 within each group) survives the
        // downstream stream operators.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1,2,3", "B,2,4,5",
                              "C,3,1,2", "D,2,5,2",
                              "A,5,5,1", "B,5,7,4",
                              "C,8,9,2", "A,8,4,5",
                              "D,8,8,3", "A,9,2,5"}
        );

        // Expected results: all ten rows, grouped by first column and ordered
        // within each group by columns 3 and 4.
        String[] expectedFirstFields =
                new String[] {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
        Integer[] expectedSecondFields = new Integer[] {1, 9, 8, 5, 2, 5, 3, 8, 2, 8};
        Integer[] expectedThirdFields = new Integer[] {2, 2, 4, 5, 4, 7, 1, 9, 5, 8};
        Integer[] expectedFourthFields = new Integer[] {3, 5, 5, 1, 5, 4, 2, 2, 2, 3};
        // Four columns here, so build the tuples inline rather than via
        // the two-column setupExpectedResults helper.
        Tuple[] expectedResults = new Tuple[10];
        for (int i = 0; i < expectedResults.length; ++i) {
            expectedResults[i] = tf.newTuple(4);
            expectedResults[i].set(0, expectedFirstFields[i]);
            expectedResults[i].set(1, expectedSecondFields[i]);
            expectedResults[i].set(2, expectedThirdFields[i]);
            expectedResults[i].set(3, expectedFourthFields[i]);
        }

        // Pig query to run. Note S1/S2 stream the filtered data but are not
        // consumed; the checked output comes from the ORDERED_DATA branch.
        pigServer.registerQuery("IP = load '"
                + Util.generateURI(input.toString(),
                        pigServer.getPigContext()) + "' using "
                + PigStorage.class.getName() + "(',');");
        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
        pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
                simpleEchoStreamingCommand + "`;");
        pigServer.registerQuery("S2 = stream S1 through `" +
                simpleEchoStreamingCommand + "`;");
        pigServer.registerQuery("GROUPED_DATA = group IP by $0;");
        pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " +
                " D = order IP BY $2, $3;" +
                " generate flatten(D);" +
                "};");
        pigServer.registerQuery("S3 = stream ORDERED_DATA through `" +
                simpleEchoStreamingCommand + "`;");
        pigServer.registerQuery("OP = stream S3 through `" +
                simpleEchoStreamingCommand + "` as (f0:chararray, f1:int, f2:int, f3:int);");

        // Run the query and check the results
        Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
    }
| |
    @Test
    public void testInputShipSpecs() throws Exception {
        // Verifies streaming commands whose input is delivered through a named
        // 'input(...)' file spec, with the scripts shipped to the cluster.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3",
                              "D,2", "A,5", "B,5",
                              "C,8", "A,8", "D,8",
                              "A,9"});

        // Perl script: reads the file named by $ARGV[0] (the 'input' spec)
        // and echoes each line to stdout, with a copy on stderr.
        String[] script =
                new String[] {
                        "#!/usr/bin/perl",
                        "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
                        "while (<INFILE>) {",
                        " chomp $_;",
                        " print STDOUT \"$_\n\";",
                        " print STDERR \"STDERR: $_\n\";",
                        "}",
                };
        File command1 = Util.createInputFile("script", "pl", script);
        // Test relative path
        File command2 = new File(testDataDir, "testInputShipSpecs.pl");
        Util.writeToFile(command2, script);

        // Expected results
        String[] expectedFirstFields =
                new String[] {"A", "B", "C", "A", "D", "A"};
        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
        Tuple[] expectedResults =
                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                     Util.toDataByteArrays(expectedSecondFields));

        // Pig query to run: two chained streaming commands, each shipped and
        // fed via its named input file ('foo' / 'bar').
        pigServer.registerQuery(
                "define CMD1 `perl " + command1.getName() + " foo` " +
                "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
                "input('foo' using " + PigStreaming.class.getName() + "(',')) " +
                "output(stdout using " + PigStreaming.class.getName() + "(',')) " +
                "stderr();");
        pigServer.registerQuery(
                "define CMD2 `perl " + command2.getName() + " bar` " +
                "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
                "input('bar' using " + PigStreaming.class.getName() + "(',')) " +
                "output(stdout using " + PigStreaming.class.getName() + "(',')) " +
                "stderr();");
        pigServer.registerQuery("IP = load '"
                + Util.generateURI(input.toString(),
                        pigServer.getPigContext())
                + "' using PigStorage(',');");
        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
                "through CMD1;");
        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");

        // Store the result, then reload it to validate what was written.
        String output = "/pig/out";
        pigServer.deleteFile(output);
        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");

        pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
        Iterator<Tuple> iter = pigServer.openIterator("A");

        List<Tuple> outputs = new ArrayList<Tuple>();
        while (iter.hasNext()) {
            outputs.add(iter.next());
        }

        // Run the query and check the results
        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
    }
| |
    @Test
    public void testInputShipSpecsWithUDFDefine() throws Exception {
        // Same as testInputShipSpecs, but the PigStreaming serializer is
        // referenced through a UDF alias ('define PS ...') in the specs.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3",
                              "D,2", "A,5", "B,5",
                              "C,8", "A,8", "D,8",
                              "A,9"});

        // Perl script: echoes the file named by $ARGV[0] to stdout/stderr.
        String[] script =
                new String[] {
                        "#!/usr/bin/perl",
                        "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
                        "while (<INFILE>) {",
                        " chomp $_;",
                        " print STDOUT \"$_\n\";",
                        " print STDERR \"STDERR: $_\n\";",
                        "}",
                };
        File command1 = Util.createInputFile("script", "pl", script);
        File command2 = Util.createInputFile("script", "pl", script);

        // Expected results
        String[] expectedFirstFields =
                new String[] {"A", "B", "C", "A", "D", "A"};
        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
        Tuple[] expectedResults =
                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                     Util.toDataByteArrays(expectedSecondFields));

        // Pig query to run: alias the serializer once, use it in both commands.
        pigServer.registerQuery("define PS " + PigStreaming.class.getName() + "(',');");

        pigServer.registerQuery(
                "define CMD1 `perl " + command1.getName() + " foo` " +
                "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
                "input('foo' using PS )" +
                "output(stdout using PS ) " +
                "stderr();");
        pigServer.registerQuery(
                "define CMD2 `perl " + command2.getName() + " bar` " +
                "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
                "input('bar' using PS ) " +
                "output(stdout using PS ) " +
                "stderr();");
        pigServer.registerQuery("IP = load '"
                + Util.generateURI(input.toString(),
                        pigServer.getPigContext()) + "' using PigStorage(',');");
        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
                "through CMD1;");
        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");

        // Store the result, then reload it to validate what was written.
        String output = "/pig/out";
        pigServer.deleteFile(output);
        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");

        pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
        Iterator<Tuple> iter = pigServer.openIterator("A");

        List<Tuple> outputs = new ArrayList<Tuple>();
        while (iter.hasNext()) {
            outputs.add(iter.next());
        }

        // Run the query and check the results
        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
    }
| |
    @Test
    public void testInputCacheSpecs() throws Exception {
        // Like testInputShipSpecs, but the scripts are distributed through the
        // 'cache' spec (distributed cache) instead of 'ship'.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3",
                              "D,2", "A,5", "B,5",
                              "C,8", "A,8", "D,8",
                              "A,9"});

        // Perl script: echoes the file named by $ARGV[0] to stdout/stderr.
        String[] script =
                new String[] {
                        "#!/usr/bin/perl",
                        "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
                        "while (<INFILE>) {",
                        " chomp $_;",
                        " print STDOUT \"$_\n\";",
                        " print STDERR \"STDERR: $_\n\";",
                        "}",
                };
        // Copy the scripts to HDFS so the distributed cache can fetch them.
        File command1 = Util.createInputFile("script", "pl", script);
        File command2 = Util.createInputFile("script", "pl", script);
        String c1 = FileLocalizer.hadoopify(command1.toString(),
                pigServer.getPigContext());
        String c2 = FileLocalizer.hadoopify(command2.toString(),
                pigServer.getPigContext());

        // Expected results
        String[] expectedFirstFields =
                new String[] {"A", "B", "C", "A", "D", "A"};
        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
        Tuple[] expectedResults =
                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                     Util.toDataByteArrays(expectedSecondFields));

        // Pig query to run. The '#scriptN.pl' fragment names the cached file's
        // symlink in the task's working directory.
        pigServer.registerQuery(
                "define CMD1 `perl script1.pl foo` " +
                "cache ('" + c1 + "#script1.pl') " +
                "input('foo' using " + PigStreaming.class.getName() + "(',')) " +
                "stderr();");
        pigServer.registerQuery(
                "define CMD2 `perl script2.pl bar` " +
                "cache ('" + c2 + "#script2.pl') " +
                "input('bar' using " + PigStreaming.class.getName() + "(',')) " +
                "stderr();");
        pigServer.registerQuery("IP = load '"
                + Util.generateURI(input.toString(),
                        pigServer.getPigContext()) + "' using "
                + PigStorage.class.getName() + "(',');");
        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
                "through CMD1;");
        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");

        // Store the result, then reload it to validate what was written.
        String output = "/pig/out";
        pigServer.deleteFile(output);
        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");

        pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');");
        Iterator<Tuple> iter = pigServer.openIterator("A");

        List<Tuple> outputs = new ArrayList<Tuple>();
        while (iter.hasNext()) {
            outputs.add(iter.next());
        }

        // Run the query and check the results
        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
    }
| |
    @Test
    public void testOutputShipSpecs() throws Exception {
        // Verifies multiple named 'output(...)' file specs: the script writes
        // its real output to 'foo' and constant "A,10" rows to 'bar'; the test
        // checks only the 'bar' side channel.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3",
                              "D,2", "A,5", "B,5",
                              "C,8", "A,8", "D,8",
                              "A,9"});

        // Perl script: reads stdin, writes each line to $ARGV[0] (foo) and a
        // constant "A,10" line per input record to $ARGV[1] (bar).
        String[] script =
                new String[] {
                        "#!/usr/bin/perl",
                        "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
                        "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
                        "while (<STDIN>) {",
                        " print OUTFILE \"$_\n\";",
                        " print STDERR \"STDERR: $_\n\";",
                        " print OUTFILE2 \"A,10\n\";",
                        "}",
                };
        File command = Util.createInputFile("script", "pl", script);

        // Expected results: one "A,10" row per filtered input record (6 rows).
        String[] expectedFirstFields =
                new String[] {"A", "A", "A", "A", "A", "A"};
        Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
        Tuple[] expectedResults =
                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                     Util.toDataByteArrays(expectedSecondFields));

        // Pig query to run
        pigServer.registerQuery(
                "define CMD `perl " + command.getName() + " foo bar` " +
                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
                "output('foo' using " + PigStreaming.class.getName() + "(','), " +
                "'bar' using " + PigStreaming.class.getName() + "(',')) " +
                "stderr();");
        pigServer.registerQuery("IP = load '"
                + Util.generateURI(input.toString(),
                        pigServer.getPigContext()) + "' using "
                + PigStorage.class.getName() + "(',');");
        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");

        String output = "/pig/out";
        pigServer.deleteFile(output);
        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");

        // The secondary output spec is materialized under <output>/bar.
        pigServer.registerQuery("A = load '" + output + "/bar" + "' using PigStorage(',');");
        Iterator<Tuple> iter = pigServer.openIterator("A");

        List<Tuple> outputs = new ArrayList<Tuple>();
        while (iter.hasNext()) {
            outputs.add(iter.next());
        }

        // Run the query and check the results
        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
    }
| |
    @Test
    public void testOutputShipSpecsWithUDFDefine() throws Exception {
        // Same as testOutputShipSpecs, but the serializer in the output specs
        // is referenced through a UDF alias ('define PS ...').
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3",
                              "D,2", "A,5", "B,5",
                              "C,8", "A,8", "D,8",
                              "A,9"});

        // Perl script: copies stdin to $ARGV[0] and writes constant "A,10"
        // rows to $ARGV[1].
        String[] script =
                new String[] {
                        "#!/usr/bin/perl",
                        "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
                        "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
                        "while (<STDIN>) {",
                        " print OUTFILE \"$_\n\";",
                        " print STDERR \"STDERR: $_\n\";",
                        " print OUTFILE2 \"A,10\n\";",
                        "}",
                };
        File command = Util.createInputFile("script", "pl", script);

        // Expected results: one "A,10" row per filtered input record.
        String[] expectedFirstFields =
                new String[] {"A", "A", "A", "A", "A", "A"};
        Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
        Tuple[] expectedResults =
                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                     Util.toDataByteArrays(expectedSecondFields));

        // Pig query to run
        pigServer.registerQuery(
                "define PS " + PigStreaming.class.getName() + "(',');");

        pigServer.registerQuery(
                "define CMD `perl " + command.getName() + " foo bar` " +
                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
                "output('foo' using PS, " +
                "'bar' using PS) " +
                "stderr();");
        pigServer.registerQuery("IP = load '"
                + Util.generateURI(input.toString(),
                        pigServer.getPigContext()) + "' using PigStorage(',');");
        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");

        String output = "/pig/out";
        pigServer.deleteFile(output);
        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");

        // Check the secondary output materialized under <output>/bar.
        pigServer.registerQuery("A = load '" + output + "/bar" + "' using PigStorage(',');");
        Iterator<Tuple> iter = pigServer.openIterator("A");

        List<Tuple> outputs = new ArrayList<Tuple>();
        while (iter.hasNext()) {
            outputs.add(iter.next());
        }

        // Run the query and check the results
        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
    }
| |
    @Test
    public void testInputOutputSpecs() throws Exception {
        // Combines a named input spec ('foo') with two output specs ('bar',
        // 'foobar'); the test validates the 'foobar' side output.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3",
                              "D,2", "A,5", "B,5",
                              "C,8", "A,8", "D,8",
                              "A,9"});

        // Perl script: reads $ARGV[0] and duplicates every line into both
        // $ARGV[1] and $ARGV[2], with a copy on stderr.
        String[] script =
                new String[] {
                        "#!/usr/bin/perl",
                        "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
                        "open(OUTFILE, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
                        "open(OUTFILE2, \">\", $ARGV[2]) or die \"Can't open \".$ARGV[2].\"!: $!\";",
                        "while (<INFILE>) {",
                        " chomp $_;",
                        " print OUTFILE \"$_\n\";",
                        " print STDERR \"STDERR: $_\n\";",
                        " print OUTFILE2 \"$_\n\";",
                        "}",
                };
        File command = Util.createInputFile("script", "pl", script);

        // Expected results: the filtered input rows, echoed unchanged.
        String[] expectedFirstFields =
                new String[] {"A", "B", "C", "A", "D", "A"};
        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
        Tuple[] expectedResults =
                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                     Util.toDataByteArrays(expectedSecondFields));
        // Pig query to run. 'bar' has no explicit serializer, 'foobar' uses
        // PigStreaming with a comma delimiter.
        pigServer.registerQuery(
                "define CMD `perl " + command.getName() + " foo bar foobar` " +
                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
                "input('foo' using " + PigStreaming.class.getName() + "(',')) " +
                "output('bar', " +
                "'foobar' using " + PigStreaming.class.getName() + "(',')) " +
                "stderr();");
        pigServer.registerQuery("IP = load '"
                + Util.generateURI(input.toString(),
                        pigServer.getPigContext()) + "' using "
                + PigStorage.class.getName() + "(',');");
        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");

        String output = "/pig/out";
        pigServer.deleteFile(output);
        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");

        // Check the secondary output materialized under <output>/foobar.
        pigServer.registerQuery("A = load '" + output + "/foobar" + "' using PigStorage(',');");
        Iterator<Tuple> iter = pigServer.openIterator("A");

        List<Tuple> outputs = new ArrayList<Tuple>();
        while (iter.hasNext()) {
            outputs.add(iter.next());
        }

        // Run the query and check the results
        Util.checkQueryOutputs(outputs.iterator(), expectedResults);

        // Cleanup
        pigServer.deleteFile(output);
    }
| |
    @Test
    public void testSimpleMapSideStreamingWithUnixPipes()
    throws Exception {
        // The streaming command itself is a Unix pipeline (echo | echo); the
        // whole input should pass through unchanged.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3", "D,2",
                              "A,5", "B,5", "C,8", "A,8",
                              "D,8", "A,9"});

        // Expected results: all ten input rows, unfiltered.
        String[] expectedFirstFields =
                new String[] {"A", "B", "C", "D", "A", "B", "C", "A", "D", "A"};
        Integer[] expectedSecondFields = new Integer[] {1, 2, 3, 2, 5, 5, 8, 8, 8, 9};
        // Two scenarios: typed output schema vs. untyped (bytearray) results.
        boolean[] withTypes = {true, false};
        for (int i = 0; i < withTypes.length; i++) {
            Tuple[] expectedResults = null;
            if(withTypes[i] == true) {
                expectedResults =
                        setupExpectedResults(expectedFirstFields, expectedSecondFields);
            } else {
                expectedResults =
                        setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                             Util.toDataByteArrays(expectedSecondFields));
            }

            // Pig query to run
            pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand +
                    " | " + simpleEchoStreamingCommand + "`;");
            pigServer.registerQuery("IP = load '"
                    + Util.generateURI(input.toString(),
                            pigServer.getPigContext()) + "' using "
                    + PigStorage.class.getName() + "(',');");
            if(withTypes[i] == true) {
                pigServer.registerQuery("OP = stream IP through CMD as (f0:chararray, f1:int);");
            } else {
                pigServer.registerQuery("OP = stream IP through CMD;");
            }

            // Run the query and check the results
            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
        }
    }
| |
    @Test
    public void testNegativeLoadStoreOptimization()
    throws Exception {
        // NOTE(review): CMD is defined with a custom PigStreamDump input spec
        // — presumably to keep the load/store optimization from kicking in, as
        // the test name suggests — but the stream operators below use the
        // backticked command directly and never reference CMD. Confirm whether
        // the define alone is enough to exercise the intended negative case.
        File input = Util.createInputFile("tmp", "",
                new String[] {"A,1", "B,2", "C,3", "D,2",
                              "A,5", "B,5", "C,8", "A,8",
                              "D,8", "A,9"});

        // Expected results: rows with second column > '3'.
        String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"};
        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
        // Two scenarios: typed output schema vs. untyped (bytearray) results.
        boolean[] withTypes = {true, false};
        for (int i = 0; i < withTypes.length; i++) {
            Tuple[] expectedResults = null;
            if(withTypes[i] == true) {
                expectedResults =
                        setupExpectedResults(expectedFirstFields, expectedSecondFields);
            } else {
                expectedResults =
                        setupExpectedResults(Util.toDataByteArrays(expectedFirstFields),
                                             Util.toDataByteArrays(expectedSecondFields));
            }

            // Pig query to run
            pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand +
                    "` input(stdin using " + PigStreamDump.class.getName() + ");");
            pigServer.registerQuery("IP = load '"
                    + Util.generateURI(input.toString(),
                            pigServer.getPigContext()) + "' using "
                    + PigStorage.class.getName() + "(',');");
            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
            if(withTypes[i] == true) {
                pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
            } else {
                pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
                        simpleEchoStreamingCommand + "`;");
            }

            // Run the query and check the results
            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
        }
    }
| |
| @Test |
| public void testNegativeMultipleInput() throws IOException { |
| // Perl script |
| String[] script = |
| new String[] { |
| "#!/usr/bin/perl", |
| "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";", |
| "while (<INFILE>) {", |
| " chomp $_;", |
| " print STDOUT \"$_\n\";", |
| " print STDERR \"STDERR: $_\n\";", |
| "}", |
| }; |
| File command1 = Util.createInputFile("script", "pl", script); |
| String query = |
| "define CMD1 `perl " + command1.getName() + " foo` " + |
| "ship ('" + Util.encodeEscape(command1.toString()) + "') " + |
| "input('foo' using " + PigStreaming.class.getName() + "(',')) " + |
| "output(stdout using " + PigStreaming.class.getName() + "(',')) " + |
| "input('foo' using " + PigStreaming.class.getName() + "(',')) " + |
| "stderr();"; |
| |
| try { |
| pigServer.registerQuery( query ); |
| } catch(FrontendException ex) { |
| String expectedMsg = "pig script failed to validate: Duplicated command option"; |
| System.out.println( ex.getMessage() ); |
| Assert.assertTrue( ex.getMessage().contains( expectedMsg ) ); |
| return; |
| } |
| |
| Assert.fail( "Testcase is supposed to fail." ); |
| } |
| |
| @Test |
| public void testStreamingStderrLogsShouldNotBePersistedByDefault() throws Exception { |
| |
| Util.createInputFile(cluster, "mydummyinput.txt", new String[] { "dummy"}); |
| |
| PigServer pig = new PigServer(cluster.getExecType(),cluster.getProperties()); |
| pig.setBatchOn(); |
| |
| pig.registerQuery("define mycmd `echo dummy` ;"); |
| pig.registerQuery("A = load 'mydummyinput.txt' as (f1:chararray);"); |
| pig.registerQuery("B = stream A through mycmd;"); |
| pig.registerQuery("store B into 'output_dir_001' ;"); |
| pig.executeBatch(); |
| |
| Assert.assertTrue(Util.exists(pig.getPigContext(), "output_dir_001")); |
| Assert.assertFalse(Util.exists(pig.getPigContext(), "output_dir_001/_logs/mycmd")); |
| |
| } |
| |
| /** |
| * PIG-2973: Verify that JobConf is added to environment even when input to |
| * the streaming binary is asynchronous (i.e. it is from a file). |
| */ |
| @Test |
| public void testAddJobConfToEnvironmentWithASynchInput() throws Exception { |
| File input = Util.createInputFile("tmp", "", new String[] {"A"}); |
| |
| // Generate a random number that will be passed via an environment |
| // variable to the streaming process |
| Random rand = new Random(); |
| final int ENV_VAR_VALUE = rand.nextInt(); |
| final String ENV_VAR_NAME = "MY_RANDOM_NUMBER"; |
| |
| // Perl script |
| String[] script = |
| new String[] { |
| "#!/usr/bin/perl", |
| "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";", |
| "while (<INFILE>) {", |
| " chomp $_;", |
| // Append the value of the environment variable to the line |
| " print STDOUT \"$_,$ENV{'" + ENV_VAR_NAME + "'}\n\";", |
| " print STDERR \"STDERR: $_\n\";", |
| "}", |
| }; |
| File command = Util.createInputFile("script", "pl", script); |
| |
| // Expected results |
| String[] expectedFirstFields = new String[] {"A"}; |
| Integer[] expectedSecondFields = new Integer[] {ENV_VAR_VALUE}; |
| Tuple[] expectedResults = |
| setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), |
| Util.toDataByteArrays(expectedSecondFields)); |
| |
| // Set a property and pass it via environment variable to the streaming process |
| pigServer.getPigContext().getProperties() |
| .setProperty(PIG_STREAMING_ENVIRONMENT, ENV_VAR_NAME); |
| pigServer.getPigContext().getProperties() |
| .setProperty(ENV_VAR_NAME, Integer.toString(ENV_VAR_VALUE)); |
| |
| // Pig query to run |
| pigServer.registerQuery( |
| "define CMD `perl " + command.getName() + " foo` " + |
| "ship ('" + Util.encodeEscape(command.toString()) + "') " + |
| "input('foo' using " + PigStreaming.class.getName() + "()) " + |
| "output(stdout using " + PigStreaming.class.getName() + "(',')) " + |
| "stderr();"); |
| pigServer.registerQuery("IP = load '" |
| + Util.generateURI(input.toString(), |
| pigServer.getPigContext()) |
| + "' using PigStorage();"); |
| pigServer.registerQuery("STREAMED_DATA = stream IP through CMD;"); |
| |
| String output = "/pig/out"; |
| pigServer.deleteFile(output); |
| pigServer.store("STREAMED_DATA", output, PigStorage.class.getName() + "(',')"); |
| |
| pigServer.registerQuery("A = load '" + output + "' using PigStorage(',');"); |
| Iterator<Tuple> iter = pigServer.openIterator("A"); |
| |
| List<Tuple> outputs = new ArrayList<Tuple>(); |
| while (iter.hasNext()) { |
| outputs.add(iter.next()); |
| } |
| |
| // Run the query and check the results |
| Util.checkQueryOutputs(outputs.iterator(), expectedResults); |
| } |
| |
| public static class PigStreamDump implements PigToStream { |
| |
| public static final String recordDelimiter = "\n"; |
| |
| @Override |
| public byte[] serialize(Tuple t) throws IOException { |
| return (TupleFormat.format(t) + recordDelimiter).getBytes(); |
| } |
| |
| } |
| } |