/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.test.utils.CloseAwareFSDataInputStream;
import org.apache.pig.test.utils.CloseAwareOutputStream;
import org.apache.tools.bzip2r.CBZip2InputStream;
import org.apache.tools.bzip2r.CBZip2OutputStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
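/**
* End-to-end tests for reading and writing bzip2-compressed data in Pig.
* Each test is run twice: once with pig.bzip.use.hadoop.inputformat=false
* (Pig's own Bzip2TextInputFormat) and once with it set to true (Hadoop's
* text input format, see PIG-3251).
*/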
@RunWith(Parameterized.class)
public class TestBZip {
private static Properties properties;
private static MiniGenericCluster cluster;
@Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{ false },
{ true }
});
}
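// Pin the input format choice for this parameterized run. Note that this
// mutates the shared cluster properties, so every PigServer built from
// them picks up the setting.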
public TestBZip(Boolean useBzipFromHadoop) {
properties = cluster.getProperties();
properties.setProperty("pig.bzip.use.hadoop.inputformat", useBzipFromHadoop.toString());
}
@Rule
public TemporaryFolder folder = new TemporaryFolder();
@BeforeClass
public static void oneTimeSetUp() throws Exception {
cluster = MiniGenericCluster.buildCluster();
properties = cluster.getProperties();
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
/**
* Tests the end-to-end writing and reading of a BZip file.
*/
@Test
public void testBzipInPig() throws Exception {
PigServer pig = new PigServer(cluster.getExecType(), properties);
File in = folder.newFile("junit-in.bz2");
File out = folder.newFile("junit-out.bz2");
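// newFile() creates an empty file; delete it so the store below can
// create the output at this path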
out.delete();
String clusterOutput = Util.removeColon(out.getAbsolutePath());
CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in));
for (int i = 1; i < 100; i++) {
StringBuffer sb = new StringBuffer();
sb.append(i).append("\n").append(-i).append("\n");
byte bytes[] = sb.toString().getBytes();
cos.write(bytes);
}
cos.close();
pig.registerQuery("AA = load '"
+ Util.generateURI(in.getAbsolutePath(), pig.getPigContext())
+ "';");
pig.registerQuery("A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);");
pig.registerQuery("store A into '" + Util.encodeEscape(clusterOutput) + "';");
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
FileStatus[] outputFiles = fs.listStatus(new Path(clusterOutput),
Util.getSuccessMarkerPathFilter());
FSDataInputStream is = fs.open(outputFiles[0].getPath());
CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());
// Just a sanity check, to make sure it was a bzip file; we
// will do the value verification later
assertEquals(100, cis.read(new byte[100]));
cis.close();
pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutput) + "';");
Iterator<Tuple> i = pig.openIterator("B");
HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
while (i.hasNext()) {
Integer val = DataType.toInteger(i.next().get(0));
map.put(val, val);
}
assertEquals(99, map.keySet().size());
for (int j = 1; j < 100; j++) {
assertEquals(Integer.valueOf(j), map.get(j));
}
}
/**
* Tests the end-to-end writing and reading of a BZip file using absolute path with a trailing /.
*/
@Test
public void testBzipInPig2() throws Exception {
PigServer pig = new PigServer(cluster.getExecType(), properties);
File in = folder.newFile("junit-in.bz2");
File out = folder.newFile("junit-out.bz2");
out.delete();
String clusterOutput = Util.removeColon(out.getAbsolutePath());
CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in));
for (int i = 1; i < 100; i++) {
StringBuffer sb = new StringBuffer();
sb.append(i).append("\n").append(-i).append("\n");
byte bytes[] = sb.toString().getBytes();
cos.write(bytes);
}
cos.close();
pig.registerQuery("AA = load '"
+ Util.generateURI(in.getAbsolutePath(), pig.getPigContext())
+ "';");
pig.registerQuery("A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);");
pig.registerQuery("store A into '" + Util.encodeEscape(clusterOutput) + "/';");
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
FileStatus[] outputFiles = fs.listStatus(new Path(clusterOutput),
Util.getSuccessMarkerPathFilter());
FSDataInputStream is = fs.open(outputFiles[0].getPath());
CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());
// Just a sanity check, to make sure it was a bzip file; we
// will do the value verification later
assertEquals(100, cis.read(new byte[100]));
cis.close();
pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutput) + "';");
Iterator<Tuple> i = pig.openIterator("B");
HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
while (i.hasNext()) {
Integer val = DataType.toInteger(i.next().get(0));
map.put(val, val);
}
assertEquals(99, map.keySet().size());
for (int j = 1; j < 100; j++) {
assertEquals(Integer.valueOf(j), map.get(j));
}
}
// See PIG-2391: loading bzip input and storing it through an intermediate
// .bz file should succeed
@Test
public void testBz2() throws Exception {
String[] inputData = new String[] {
"1\t2\r3\t4", // '\r' case - this will be split into two tuples
"5\t6\r", // '\r\n' case
"7\t8", // '\n' case
"9\t10\r" // '\r\n' at the end of file
};
// bzip compressed input
File in = folder.newFile("junit-in.bz2");
String compressedInputFileName = in.getAbsolutePath();
String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
try {
CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in));
for (int i = 0; i < inputData.length; i++) {
StringBuffer sb = new StringBuffer();
sb.append(inputData[i]).append("\n");
byte bytes[] = sb.toString().getBytes();
cos.write(bytes);
}
cos.close();
Util.copyFromLocalToCluster(cluster, compressedInputFileName,
clusterCompressedFilePath);
PigServer pig = new PigServer(cluster.getExecType(), properties);
// pig script to read compressed input
String script ="a = load '" + Util.encodeEscape(clusterCompressedFilePath) +"';";
pig.registerQuery(script);
pig.registerQuery("store a into 'intermediate.bz';");
pig.registerQuery("b = load 'intermediate.bz';");
Iterator<Tuple> it2 = pig.openIterator("b");
while (it2.hasNext()) {
it2.next();
}
} finally {
Util.deleteFile(cluster, "intermediate.bz");
Util.deleteFile(cluster, "final.bz");
}
}
/**
* Tests that '\n', '\r' and '\r\n' are treated as record delimiters when
* using bzip, just like they are when using uncompressed text.
*/
@Test
public void testRecordDelims() throws Exception {
String[] inputData = new String[] {
"1\t2\r3\t4", // '\r' case - this will be split into two tuples
"5\t6\r", // '\r\n' case
"7\t8", // '\n' case
"9\t10\r" // '\r\n' at the end of file
};
// bzip compressed input
File in = folder.newFile("junit-in.bz2");
String compressedInputFileName = in.getAbsolutePath();
String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
Util.createInputFile(cluster, unCompressedInputFileName, inputData);
try {
CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in));
for (int i = 0; i < inputData.length; i++) {
StringBuffer sb = new StringBuffer();
sb.append(inputData[i]).append("\n");
byte bytes[] = sb.toString().getBytes();
cos.write(bytes);
}
cos.close();
Util.copyFromLocalToCluster(cluster, compressedInputFileName,
clusterCompressedFilePath);
// pig script to read uncompressed input
String script = "a = load '" + unCompressedInputFileName +"';";
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery(script);
Iterator<Tuple> it1 = pig.openIterator("a");
// pig script to read compressed input
script = "a = load '" + Util.encodeEscape(clusterCompressedFilePath) +"';";
pig.registerQuery(script);
Iterator<Tuple> it2 = pig.openIterator("a");
while(it1.hasNext()) {
Tuple t1 = it1.next();
Tuple t2 = it2.next();
assertEquals(t1, t2);
}
assertFalse(it2.hasNext());
} finally {
Util.deleteFile(cluster, unCompressedInputFileName);
Util.deleteFile(cluster, clusterCompressedFilePath);
}
}
/**
* Tests the end-to-end writing and reading of an empty BZip file.
*/
@Test
public void testEmptyBzipInPig() throws Exception {
PigServer pig = new PigServer(cluster.getExecType(), properties);
File in = folder.newFile("junit-in.tmp");
File out = folder.newFile("junit-out.bz2");
out.delete();
String clusterOutputFilePath = Util.removeColon(out.getAbsolutePath());
FileOutputStream fos = new FileOutputStream(in);
fos.write("55\n".getBytes());
fos.close();
System.out.println(in.getAbsolutePath());
pig.registerQuery("AA = load '"
+ Util.generateURI(in.getAbsolutePath(), pig.getPigContext())
+ "';");
pig.registerQuery("A=foreach (group (filter AA by $0 < '0') all) generate flatten($1);");
pig.registerQuery("store A into '" + Util.encodeEscape(clusterOutputFilePath) + "';");
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
FileStatus[] outputFiles = fs.listStatus(new Path(clusterOutputFilePath),
Util.getSuccessMarkerPathFilter());
FSDataInputStream is = fs.open(outputFiles[0].getPath());
CBZip2InputStream cis = new CBZip2InputStream(is, -1, out.length());
// Just a sanity check, to make sure it was a bzip file; since the filter
// matches no records, the compressed output should contain no data
assertEquals(-1, cis.read(new byte[100]));
cis.close();
pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutputFilePath) + "';");
pig.openIterator("B");
}
/**
* Tests the writing and reading of an empty BZip file.
*/
@Test
public void testEmptyBzip() throws Exception {
File tmp = folder.newFile("junit.tmp");
CBZip2OutputStream cos = new CBZip2OutputStream(new FileOutputStream(
tmp));
cos.close();
assertTrue(tmp.length() > 0);
FileSystem fs = FileSystem.getLocal(new Configuration(false));
CBZip2InputStream cis = new CBZip2InputStream(
fs.open(new Path(tmp.getAbsolutePath())), -1, tmp.length());
assertEquals(-1, cis.read(new byte[100]));
cis.close();
}
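/**
* Tests that closing a CBZip2OutputStream or CBZip2InputStream also closes
* the underlying stream it wraps.
*/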
@Test
public void testInnerStreamGetsClosed() throws Exception {
File tmp = folder.newFile("junit.tmp");
CloseAwareOutputStream out = new CloseAwareOutputStream(new FileOutputStream(tmp));
CBZip2OutputStream cos = new CBZip2OutputStream(out);
assertFalse(out.isClosed());
cos.close();
assertTrue(out.isClosed());
FileSystem fs = FileSystem.getLocal(new Configuration(false));
Path path = new Path(tmp.getAbsolutePath());
CloseAwareFSDataInputStream in = new CloseAwareFSDataInputStream(fs.open(path));
CBZip2InputStream cis = new CBZip2InputStream(in, -1, tmp.length());
assertFalse(in.isClosed());
cis.close();
assertTrue(in.isClosed());
}
/**
* Tests the case where a bzip block ends exactly at the end of the {@link InputSplit},
* with the block header ending a few bits into the last byte of the current
* InputSplit. This case resulted in dropped records in the Pig 0.6 release.
* This test also verifies that bzip files a couple of dirs deep can be read by
* specifying the top-level dir.
*/
@Test
public void testBlockHeaderEndingAtSplitNotByteAligned() throws IOException {
// the actual input file is at
// test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
// In this test we will load test/org/apache/pig/test/data/bzipdir1.bz2 to also
// test that the BZip2TextInputFormat can read subdirs recursively
String inputFileName =
"test/org/apache/pig/test/data/bzipdir1.bz2";
Long expectedCount = 74999L; // number of lines in above file
// the first block in the above file exactly ends a few bits into the
// byte at position 136500
int splitSize = 136500;
try {
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
testCount(inputFileName, expectedCount, splitSize, "TextLoader()");
} finally {
Util.deleteFile(cluster, inputFileName);
}
}
/**
* Tests the case where a bzip block ends exactly at the end of the input
* split (byte aligned with the last byte) and the last byte is a carriage
* return.
*/
@Test
public void testBlockHeaderEndingWithCR() throws IOException {
String inputFileName =
"test/org/apache/pig/test/data/blockEndingInCR.txt.bz2";
// number of lines in the above file (the value is 1 more than bzcat | wc -l
// since there is a '\r' which is also treated as a record delimiter)
Long expectedCount = 82094L;
// the first block in the above file exactly ends at the byte at
// position 136498 and the last byte is a carriage return ('\r')
try {
int splitSize = 136498;
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
} finally {
Util.deleteFile(cluster, inputFileName);
}
}
/**
* Tests the case where a bzip block ends exactly at the end of the input
* split and is followed by more data, which resulted in overcounting
* (record duplication) in Pig 0.6.
*
*/
@Test
public void testBlockHeaderEndingAtSplitOverCounting() throws IOException {
String inputFileName =
"test/org/apache/pig/test/data/blockHeaderEndsAt136500.txt.bz2";
Long expectedCount = 1041046L; // number of lines in above file
// the first block in the above file exactly ends a few bits into the
// byte at position 136500
int splitSize = 136500;
try {
Util.copyFromLocalToCluster(cluster, inputFileName, inputFileName);
testCount(inputFileName, expectedCount, splitSize, "PigStorage()");
} finally {
Util.deleteFile(cluster, inputFileName);
}
}
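/**
* Loads the given bzip file with a forced max split size, verifies that the
* input was actually split (more than one part file in the output), and
* checks that the total record count matches expectedCount.
*/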
private void testCount(String inputFileName, Long expectedCount,
int splitSize, String loadFuncSpec) throws IOException {
String outputFile = "/tmp/bz-output";
// simple load-store script to verify that the bzip input is getting
// split
String scriptToTestSplitting = "a = load '" +inputFileName + "' using " +
loadFuncSpec + "; store a into '" + outputFile + "';";
String script = "a = load '" + inputFileName + "';" +
"b = group a all;" +
"c = foreach b generate COUNT_STAR(a);";
Properties props = new Properties();
for (Entry<Object, Object> entry : properties.entrySet()) {
props.put(entry.getKey(), entry.getValue());
}
props.setProperty(MRConfiguration.MAX_SPLIT_SIZE, Integer.toString(splitSize));
props.setProperty("pig.noSplitCombination", "true");
PigServer pig = new PigServer(cluster.getExecType(), props);
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(props));
fs.delete(new Path(outputFile), true);
Util.registerMultiLineQuery(pig, scriptToTestSplitting);
// verify that > 1 maps were launched due to splitting of the bzip input
FileStatus[] files = fs.listStatus(new Path(outputFile));
int numPartFiles = 0;
for (FileStatus fileStatus : files) {
if(fileStatus.getPath().getName().startsWith("part")) {
numPartFiles++;
}
}
assertTrue(numPartFiles > 1);
// verify record count to verify we read bzip data correctly
Util.registerMultiLineQuery(pig, script);
Iterator<Tuple> it = pig.openIterator("c");
Long result = (Long) it.next().get(0);
assertEquals(expectedCount, result);
}
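/**
* Tests storing the same relation into both a .bz2 path and an
* uncompressed path within a single multi-query batch.
*/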
@Test
public void testBzipStoreInMultiQuery() throws Exception {
String[] inputData = new String[] {
"1\t2\r3\t4"
};
try {
String inputFileName = "input.txt";
Util.createInputFile(cluster, inputFileName, inputData);
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.setBatchOn();
pig.registerQuery("a = load '" + inputFileName + "';");
pig.registerQuery("store a into 'output.bz2';");
pig.registerQuery("store a into 'output';");
pig.executeBatch();
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
FileStatus[] outputFiles = fs.listStatus(new Path("output"),
Util.getSuccessMarkerPathFilter());
assertTrue(outputFiles[0].getLen() > 0);
outputFiles = fs.listStatus(new Path("output.bz2"),
Util.getSuccessMarkerPathFilter());
assertTrue(outputFiles[0].getLen() > 0);
} finally {
Util.deleteFile(cluster, "input.txt");
Util.deleteFile(cluster, "output.bz2");
Util.deleteFile(cluster, "output");
}
}
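/**
* Same as testBzipStoreInMultiQuery, but with compression also enabled
* through the output.compression.* properties.
*/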
@Test
public void testBzipStoreInMultiQuery2() throws Exception {
String[] inputData = new String[] {
"1\t2\r3\t4"
};
String inputFileName = "input2.txt";
Util.createInputFile(cluster, inputFileName, inputData);
try {
PigServer pig = new PigServer(cluster.getExecType(), properties);
PigContext pigContext = pig.getPigContext();
pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
pig.setBatchOn();
pig.registerQuery("a = load '" + inputFileName + "';");
pig.registerQuery("store a into 'output2.bz2';");
pig.registerQuery("store a into 'output2';");
pig.executeBatch();
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
FileStatus[] outputFiles = fs.listStatus(new Path("output2"),
Util.getSuccessMarkerPathFilter());
assertTrue(outputFiles[0].getLen() > 0);
outputFiles = fs.listStatus(new Path("output2.bz2"),
Util.getSuccessMarkerPathFilter());
assertTrue(outputFiles[0].getLen() > 0);
} finally {
Util.deleteFile(cluster,"input2.txt");
Util.deleteFile(cluster,"output2.bz2");
Util.deleteFile(cluster,"output2");
}
}
/**
* Tests that Pig's Bzip2TextInputFormat throws an IOException when the input
* files to be loaded are actually the result of concatenating 2 or more bz2
* files; it should not silently ignore part of the input data. When Hadoop's
* TextInputFormat is used (PIG-3251), it should successfully read the
* concatenated bzip file to the end.
*/
@Test
public void testBZ2Concatenation() throws Exception {
String[] inputData1 = new String[] {
"1\ta",
"2\taa"
};
String[] inputData2 = new String[] {
"1\tb",
"2\tbb"
};
String[] inputDataMerged = new String[] {
"1\ta",
"2\taa",
"1\tb",
"2\tbb"
};
// bzip compressed input file1
File in1 = folder.newFile("junit-in1.bz2");
String compressedInputFileName1 = in1.getAbsolutePath();
// file2
File in2 = folder.newFile("junit-in2.bz2");
String compressedInputFileName2 = in2.getAbsolutePath();
String unCompressedInputFileName = "testBZ2Concatenation-uncomp.txt";
Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged);
try {
CBZip2OutputStream cos =
new CBZip2OutputStream(new FileOutputStream(in1));
for (int i = 0; i < inputData1.length; i++) {
StringBuffer sb = new StringBuffer();
sb.append(inputData1[i]).append("\n");
byte bytes[] = sb.toString().getBytes();
cos.write(bytes);
}
cos.close();
CBZip2OutputStream cos2 =
new CBZip2OutputStream(new FileOutputStream(in2));
for (int i = 0; i < inputData2.length; i++) {
StringBuffer sb = new StringBuffer();
sb.append(inputData2[i]).append("\n");
byte bytes[] = sb.toString().getBytes();
cos2.write(bytes);
}
cos2.close();
// append file2 onto file1 to form a concatenated bz2 file
catInto(compressedInputFileName2, compressedInputFileName1);
Util.copyFromLocalToCluster(cluster, compressedInputFileName1,
compressedInputFileName1);
// pig script to read uncompressed input
String script = "a = load '" + Util.encodeEscape(unCompressedInputFileName) +"';";
PigServer pig = new PigServer(cluster.getExecType(), properties);
pig.registerQuery(script);
Iterator<Tuple> it1 = pig.openIterator("a");
// pig script to read compressed concatenated input
script = "a = load '" + Util.encodeEscape(compressedInputFileName1) +"';";
pig.registerQuery(script);
try {
Iterator<Tuple> it2 = pig.openIterator("a");
while(it1.hasNext()) {
Tuple t1 = it1.next();
Tuple t2 = it2.next();
assertEquals(t1, t2);
}
assertFalse(it2.hasNext());
// When pig.bzip.use.hadoop.inputformat=true, it should successfully read the concatenated bzip file
assertEquals("IOException should be thrown when pig's own Bzip2TextInputFormat is used",
properties.getProperty("pig.bzip.use.hadoop.inputformat"),
"true");
} catch (IOException e) {
assertEquals("IOException should only be thrown when pig's own Bzip2TextInputFormat is used",
properties.getProperty("pig.bzip.use.hadoop.inputformat"),
"false");
}
} finally {
Util.deleteFile(cluster, unCompressedInputFileName);
}
}
/*
* Appends the contents of the src file to the dest file
*/
private void catInto(String src, String dest) throws IOException {
FileOutputStream out = new FileOutputStream(new File(dest) , true);
FileInputStream in = new FileInputStream(new File(src));
byte[] buffer = new byte[4096];
int bytesread;
while ((bytesread = in.read(buffer)) != -1) {
out.write(buffer,0, bytesread);
}
in.close();
out.close();
}
// See PIG-1714: enable bzip output compression via 'set' commands in the script
@Test
public void testBzipStoreInMultiQuery3() throws Exception {
String[] inputData = new String[] {
"1\t2\r3\t4"
};
String inputFileName = "input3.txt";
Util.createInputFile(cluster, inputFileName, inputData);
String inputScript = "set mapred.output.compress true\n" +
"set mapreduce.output.fileoutputformat.compress true\n" +
"set mapred.output.compression.codec org.apache.hadoop.io.compress.BZip2Codec\n" +
"set mapreduce.output.fileoutputformat.compress.codec org.apache.hadoop.io.compress.BZip2Codec\n" +
"a = load '" + inputFileName + "';\n" +
"store a into 'output3.bz2';\n" +
"store a into 'output3';";
String inputScriptName = "script3.txt";
PrintWriter pw = new PrintWriter(new FileWriter(inputScriptName));
pw.println(inputScript);
pw.close();
try {
PigServer pig = new PigServer(cluster.getExecType(), properties);
FileInputStream fis = new FileInputStream(inputScriptName);
pig.registerScript(fis);
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
pig.getPigContext().getProperties()));
FileStatus[] outputFiles = fs.listStatus(new Path("output3"),
Util.getSuccessMarkerPathFilter());
assertTrue(outputFiles[0].getLen() > 0);
outputFiles = fs.listStatus(new Path("output3.bz2"),
Util.getSuccessMarkerPathFilter());
assertTrue(outputFiles[0].getLen() > 0);
} finally {
Util.deleteFile(cluster, "input3.txt");
Util.deleteFile(cluster, "output3.bz2");
Util.deleteFile(cluster, "output3");
}
}
}