blob: 8d4282dfbdf7bf8f3e209a889d64a0e06981969e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static java.util.regex.Matcher.quoteReplacement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.apache.pig.ExecType;
import org.apache.pig.ExecTypeProvider;
import org.apache.pig.LoadCaster;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
import org.apache.pig.newplan.logical.optimizer.UidResetter;
import org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover;
import org.apache.pig.newplan.logical.visitor.SortInfoSetter;
import org.apache.pig.newplan.logical.visitor.StoreAliasSetter;
import org.apache.pig.parser.ParserException;
import org.apache.pig.parser.QueryParserDriver;
import org.apache.pig.tools.grunt.GruntParser;
import org.apache.pig.tools.pigstats.ScriptState;
import org.junit.Assert;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
public class Util {
// Factory instances obtained once at class load and reused by helpers in this class
// (e.g. createTuple, createBag, createBagOfOneColumn).
private static BagFactory mBagFactory = BagFactory.getInstance();
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
// Commonly-checked system state
// =================
// True when the "os.name" system property starts with "Windows".
public static final boolean WINDOWS /* borrowed from Path.WINDOWS, Shell.WINDOWS */
= System.getProperty("os.name").startsWith("Windows");
// Value of the "test.build.dir" system property, defaulting to "build/test" when unset.
public static final String TEST_DIR = System.getProperty("test.build.dir", "build/test");
// Helper Functions
// =================
/**
 * Stores every element of {@code input} into the field of {@code t} with the same index.
 *
 * @param t tuple to populate via {@code Tuple.set}
 * @param input values to store; each one is boxed as an {@link Integer}
 * @return the tuple {@code t} that was passed in
 * @throws ExecException if setting a field fails
 */
static public Tuple loadFlatTuple(Tuple t, int[] input) throws ExecException {
    for (int i = 0; i < input.length; i++) {
        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        t.set(i, Integer.valueOf(input[i]));
    }
    return t;
}
/**
 * Stores every string of {@code input} into the field of {@code t} with the same index.
 *
 * @param t tuple to populate via {@code Tuple.set}
 * @param input strings to store, in field order
 * @return the tuple {@code t} that was passed in
 * @throws ExecException if setting a field fails
 */
static public Tuple loadTuple(Tuple t, String[] input) throws ExecException {
    int fieldIndex = 0;
    for (String value : input) {
        t.set(fieldIndex, value);
        fieldIndex++;
    }
    return t;
}
/**
 * Stores every byte array of {@code input} into the field of {@code t} with the same index.
 *
 * @param t tuple to populate via {@code Tuple.set}
 * @param input byte arrays to store, in field order
 * @return the tuple {@code t} that was passed in
 * @throws ExecException if setting a field fails
 */
static public Tuple loadTuple(Tuple t, DataByteArray[] input) throws ExecException {
    int fieldIndex = 0;
    for (DataByteArray value : input) {
        t.set(fieldIndex, value);
        fieldIndex++;
    }
    return t;
}
/**
 * Wraps each int of {@code input} in its own single-field tuple, collects those tuples
 * in a new default bag and stores the bag in field 0 of {@code t}.
 *
 * @param t tuple whose field 0 receives the bag
 * @param input values to wrap (auto-boxed to {@link Integer})
 * @return the tuple {@code t} that was passed in
 * @throws ExecException if setting a field fails
 */
static public Tuple loadNestTuple(Tuple t, int[] input) throws ExecException {
    DataBag wrapped = BagFactory.getInstance().newDefaultBag();
    for (int value : input) {
        Tuple single = TupleFactory.getInstance().newTuple(1);
        single.set(0, value);
        wrapped.add(single);
    }
    t.set(0, wrapped);
    return t;
}
/**
 * Wraps each long of {@code input} in its own single-field tuple, collects those tuples
 * in a new default bag and stores the bag in field 0 of {@code t}.
 *
 * @param t tuple whose field 0 receives the bag
 * @param input values to wrap; each one is boxed as a {@link Long}
 * @return the tuple {@code t} that was passed in
 * @throws ExecException if setting a field fails
 */
static public Tuple loadNestTuple(Tuple t, long[] input) throws ExecException {
    DataBag bag = BagFactory.getInstance().newDefaultBag();
    for (int i = 0; i < input.length; i++) {
        Tuple f = TupleFactory.getInstance().newTuple(1);
        // Long.valueOf replaces the deprecated Long(long) constructor.
        f.set(0, Long.valueOf(input[i]));
        bag.add(f);
    }
    t.set(0, bag);
    return t;
}
/**
 * Generic variant intended for String, DataByteArray, Long, Integer and similar element
 * types: wraps each element of {@code input} in its own single-field tuple, collects those
 * tuples in a new default bag and stores the bag in field 0 of {@code t}.
 *
 * @param t tuple whose field 0 receives the bag
 * @param input elements to wrap
 * @return the tuple {@code t} that was passed in
 * @throws ExecException if setting a field fails
 */
static public <T> Tuple loadNestTuple(Tuple t, T[] input) throws ExecException {
    DataBag wrapped = BagFactory.getInstance().newDefaultBag();
    for (T element : input) {
        Tuple single = TupleFactory.getInstance().newTuple(1);
        single.set(0, element);
        wrapped.add(single);
    }
    t.set(0, wrapped);
    return t;
}
/**
 * Splits {@code input} into consecutive slices of at most {@code bagSize} elements and,
 * for each slice, builds a one-field tuple whose field 0 is a bag holding one
 * single-field tuple per element (see {@code loadNestTuple(Tuple, T[])}).
 *
 * @param input array of values to split
 * @param bagSize maximum number of elements per bag; must be positive
 *
 * @return one tuple per slice, in input order; the last bag may hold fewer than
 *         {@code bagSize} elements
 * @throws IllegalArgumentException if {@code bagSize} is zero or negative
 * @throws ExecException if a tuple field cannot be set
 */
static public <T> Tuple[] splitCreateBagOfTuples(T[] input, int bagSize)
        throws ExecException {
    // A step of zero would never advance the loop; a negative one yields an invalid range.
    if (bagSize <= 0) {
        throw new IllegalArgumentException("bagSize must be positive but was " + bagSize);
    }
    List<Tuple> result = new ArrayList<Tuple>();
    for (int from = 0; from < input.length; from += bagSize) {
        Tuple t = TupleFactory.getInstance().newTuple(1);
        int to = Math.min(from + bagSize, input.length);
        T[] array = Arrays.copyOfRange(input, from, to);
        result.add(loadNestTuple(t, array));
    }
    return result.toArray(new Tuple[0]);
}
/**
 * Appends every element of {@code b}, in order, to the end of {@code t}.
 *
 * @param t tuple to append to
 * @param b elements to append
 */
static public <T>void addToTuple(Tuple t, T[] b)
{
    for (T element : b) {
        t.append(element);
    }
}
/**
 * Builds a tuple from the given values by handing a freshly created list of them to
 * {@code TupleFactory.newTupleNoCopy}.
 *
 * @param args field values, in field order
 * @return the new tuple
 * @throws ExecException declared for callers; not thrown by the visible code itself
 */
static public Tuple buildTuple(Object... args) throws ExecException {
    TupleFactory factory = TupleFactory.getInstance();
    List<Object> fields = Lists.newArrayList(args);
    return factory.newTupleNoCopy(fields);
}
/**
 * Builds a tuple whose fields are the given values converted to {@link DataByteArray}
 * through {@code DataType.toBytes}.
 *
 * A {@code null} argument stays {@code null}; an argument whose conversion throws
 * {@link ExecException} also becomes {@code null}.
 *
 * @param args values to convert, in field order
 * @return the new tuple
 * @throws IOException declared for callers; conversion failures are mapped to null fields
 */
static public Tuple buildBinTuple(final Object... args) throws IOException {
    TupleFactory factory = TupleFactory.getInstance();
    Function<Object, DataByteArray> toByteArray = new Function<Object, DataByteArray>() {
        @Override
        public DataByteArray apply(Object o) {
            DataByteArray converted = null;
            if (o != null) {
                try {
                    converted = new DataByteArray(DataType.toBytes(o));
                } catch (ExecException e) {
                    // Unconvertible values are represented as null fields.
                    converted = null;
                }
            }
            return converted;
        }
    };
    List<DataByteArray> fields = Lists.transform(Lists.newArrayList(args), toByteArray);
    return factory.newTuple(fields);
}
/**
 * Creates a new tuple from the shared tuple factory and appends every element of
 * {@code s} to it, in order.
 *
 * @param s elements that become the tuple's fields
 * @return the new tuple
 */
static public <T>Tuple createTuple(T[] s)
{
    Tuple created = mTupleFactory.newTuple();
    for (T element : s) {
        created.append(element);
    }
    return created;
}
/**
 * Creates a new default bag and adds every tuple of {@code t} to it, in order.
 *
 * @param t tuples to add
 * @return the new bag
 */
static public DataBag createBag(Tuple[] t)
{
    DataBag bag = mBagFactory.newDefaultBag();
    for (Tuple tuple : t) {
        bag.add(tuple);
    }
    return bag;
}
/**
 * Creates a new default bag containing one single-field tuple per element of
 * {@code input}, in order.
 *
 * @param input values that each become field 0 of their own tuple
 * @return the new bag
 * @throws ExecException if setting a tuple field fails
 */
static public<T> DataBag createBagOfOneColumn(T[] input) throws ExecException {
    DataBag column = mBagFactory.newDefaultBag();
    for (T value : input) {
        Tuple row = mTupleFactory.newTuple(1);
        row.set(0, value);
        column.add(row);
    }
    return column;
}
/**
 * Builds a map from a flat array of alternating keys and values:
 * {@code contents[0]} maps to {@code contents[1]}, {@code contents[2]} to
 * {@code contents[3]}, and so on. A later duplicate key overwrites an earlier one.
 *
 * @param contents alternating key/value strings; must have an even length
 * @return a new mutable map holding the pairs
 * @throws IllegalArgumentException if {@code contents} has an odd number of elements
 */
static public Map<String, Object> createMap(String[] contents)
{
    // Reject a dangling key up front instead of failing mid-way with an index error.
    if (contents.length % 2 != 0) {
        throw new IllegalArgumentException(
                "Expected an even number of key/value strings but got " + contents.length);
    }
    Map<String, Object> m = new HashMap<String, Object>();
    for (int i = 0; i < contents.length; i += 2) {
        m.put(contents[i], contents[i + 1]);
    }
    return m;
}
/**
 * Converts each element of {@code input} to a {@link DataByteArray} holding the UTF-8
 * bytes of its {@code toString()} value. {@code null} elements stay {@code null}.
 *
 * @param input values to convert
 * @return a new array of the same length as {@code input}
 */
static public<T> DataByteArray[] toDataByteArrays(T[] input) {
    // Fixed charset so the bytes do not depend on the platform default encoding.
    final java.nio.charset.Charset utf8 = java.nio.charset.Charset.forName("UTF-8");
    DataByteArray[] dbas = new DataByteArray[input.length];
    for (int i = 0; i < input.length; i++) {
        dbas[i] = (input[i] == null)
                ? null
                : new DataByteArray(input[i].toString().getBytes(utf8));
    }
    return dbas;
}
/**
 * For each row of {@code input}, builds a flat tuple of the row's ints, puts it alone
 * in a new default bag and stores that bag in the field of {@code t} with the row's index.
 *
 * @param t tuple to populate
 * @param input rows of values; row {@code i} ends up in field {@code i}
 * @return the tuple {@code t} that was passed in
 * @throws ExecException if setting a field fails
 */
static public Tuple loadNestTuple(Tuple t, int[][] input) throws ExecException {
    int fieldIndex = 0;
    for (int[] row : input) {
        DataBag rowBag = BagFactory.getInstance().newDefaultBag();
        Tuple emptyRow = TupleFactory.getInstance().newTuple(row.length);
        rowBag.add(loadFlatTuple(emptyRow, row));
        t.set(fieldIndex, rowBag);
        fieldIndex++;
    }
    return t;
}
/**
 * For each row of {@code input}, builds a flat tuple of the row's strings, puts it alone
 * in a new default bag and stores that bag in the field of {@code t} with the row's index.
 *
 * @param t tuple to populate
 * @param input rows of strings; row {@code i} ends up in field {@code i}
 * @return the tuple {@code t} that was passed in
 * @throws ExecException if setting a field fails
 */
static public Tuple loadTuple(Tuple t, String[][] input) throws ExecException {
    int fieldIndex = 0;
    for (String[] row : input) {
        DataBag rowBag = BagFactory.getInstance().newDefaultBag();
        Tuple emptyRow = TupleFactory.getInstance().newTuple(row.length);
        rowBag.add(loadTuple(emptyRow, row));
        t.set(fieldIndex, rowBag);
        fieldIndex++;
    }
    return t;
}
/**
 * Helper that strips every colon from a path so that it can be consumed by hdfs.
 * Only the ':' characters are removed; the text around them is kept.
 *
 * @param origPath original path name
 * @return the path with all ':' characters removed
 */
static public String removeColon(String origPath)
{
    // Literal replacement; ':' has no special meaning as a regex either.
    String withoutColons = origPath.replace(":", "");
    return withoutColons;
}
/**
 * Helper that converts every "\r\n" sequence to "\n" for cross-platform string
 * matching with checked-in baselines. A lone "\r" is left untouched.
 *
 * @param origPath original string
 * @return the newline-standardized string
 */
static public String standardizeNewline(String origPath)
{
    // Literal replacement of CRLF pairs only.
    String unixNewlines = origPath.replace("\r\n", "\n");
    return unixNewlines;
}
/**
 * Helper to create a temporary local file holding the given input data for test cases.
 * The file is registered for deletion when the JVM exits.
 *
 * @param tmpFilenamePrefix file-name prefix
 * @param tmpFilenameSuffix file-name suffix
 * @param inputData input for test cases; each string in inputData[] is written
 * on one line
 * @return {@link File} handle to the created temporary file
 * @throws IOException if the file cannot be created or written
 */
static public File createInputFile(String tmpFilenamePrefix,
        String tmpFilenameSuffix,
        String[] inputData)
        throws IOException {
    final File tempInput = File.createTempFile(tmpFilenamePrefix, tmpFilenameSuffix);
    tempInput.deleteOnExit();
    writeToFile(tempInput, inputData);
    return tempInput;
}
/**
 * Writes the given input data to the local file {@code filename}, one string per line,
 * and registers the file for deletion when the JVM exits.
 *
 * @param filename path of the local file to write
 * @param inputData lines to write
 * @return {@link File} handle for {@code filename}
 * @throws IOException if the file cannot be written
 */
static public File createLocalInputFile(String filename, String[] inputData)
        throws IOException {
    final File localInput = new File(filename);
    localInput.deleteOnExit();
    writeToFile(localInput, inputData);
    return localInput;
}
/**
 * Writes each string of {@code inputData} to {@code f} as UTF-8, followed by a single
 * "\n" (never the platform line separator). Existing file content is replaced.
 *
 * @param f file to write
 * @param inputData lines to write
 * @throws IOException if the file cannot be opened or if writing fails
 */
public static void writeToFile(File f, String[] inputData) throws
        IOException {
    PrintWriter pw = new PrintWriter(new OutputStreamWriter(new
            FileOutputStream(f), "UTF-8"));
    try {
        for (int i = 0; i < inputData.length; i++) {
            pw.print(inputData[i]);
            pw.print("\n");
        }
        // PrintWriter swallows I/O errors; surface them instead of leaving a short file.
        if (pw.checkError()) {
            throw new IOException("Failed to write input data to " + f);
        }
    } finally {
        pw.close();
    }
}
/**
 * Helper to create a dfs file on the Minicluster DFS with given
 * input data for use in test cases. Delegates to the {@link FileSystem} overload using
 * the cluster's file system.
 *
 * @param miniCluster reference to the Minicluster where the file should be created
 * @param fileName pathname of the file to be created
 * @param inputData input for test cases; each string in inputData[] is written
 * on one line
 * @throws IOException if the file already exists or cannot be written
 */
static public void createInputFile(MiniGenericCluster miniCluster, String fileName,
        String[] inputData)
        throws IOException {
    createInputFile(miniCluster.getFileSystem(), fileName, inputData);
}
/**
 * Creates {@code fileName} on the given file system and writes each string of
 * {@code inputData} to it as UTF-8, followed by a single "\n".
 * On Windows, backslashes in {@code fileName} are first converted to forward slashes.
 *
 * @param fs file system to create the file on
 * @param fileName pathname of the file to be created
 * @param inputData lines to write
 * @throws IOException if the file already exists, cannot be created, or writing fails
 */
static public void createInputFile(FileSystem fs, String fileName,
        String[] inputData) throws IOException {
    if(Util.WINDOWS){
        fileName = fileName.replace('\\','/');
    }
    Path target = new Path(fileName);
    if(fs.exists(target)) {
        throw new IOException("File " + fileName + " already exists on the FileSystem");
    }
    FSDataOutputStream stream = fs.create(target);
    PrintWriter pw = new PrintWriter(new OutputStreamWriter(stream, "UTF-8"));
    try {
        for (int i=0; i<inputData.length; i++){
            pw.print(inputData[i]);
            pw.print("\n");
        }
        // PrintWriter swallows I/O errors; surface them instead of leaving a short file.
        if (pw.checkError()) {
            throw new IOException("Failed to write input data to " + fileName);
        }
    } finally {
        pw.close();
    }
}
/**
 * Reads all lines, decoded as UTF-8, from {@code fileName} on the given file system.
 * If the path is a directory, every entry directly inside it whose name does not start
 * with "_" is read, in the order returned by {@code listStatus}.
 * On Windows, backslashes in {@code fileName} are first converted to forward slashes.
 *
 * @param fs file system to read from
 * @param fileName file or directory to read
 * @return the lines that were read, without line terminators
 * @throws IOException if the path does not exist or reading fails
 */
static public String[] readOutput(FileSystem fs, String fileName) throws IOException {
    if(Util.WINDOWS){
        fileName = fileName.replace('\\','/');
    }
    Path path = new Path(fileName);
    if(!fs.exists(path)) {
        throw new IOException("Path " + fileName + " does not exist on the FileSystem");
    }
    FileStatus fileStatus = fs.getFileStatus(path);
    FileStatus[] files;
    if (fileStatus.isDir()) {
        files = fs.listStatus(path, new PathFilter() {
            @Override
            public boolean accept(Path p) {
                // Skip entries whose name starts with an underscore.
                return !p.getName().startsWith("_");
            }
        });
    } else {
        files = new FileStatus[] { fileStatus };
    }
    List<String> result = new ArrayList<String>();
    for (FileStatus f : files) {
        FSDataInputStream stream = fs.open(f.getPath());
        BufferedReader br = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
        try {
            String line;
            while ((line = br.readLine()) != null) {
                result.add(line);
            }
        } finally {
            // Release the stream even when reading fails part-way through.
            br.close();
        }
    }
    return result.toArray(new String[result.size()]);
}
/**
 * Helper to create a dfs file on the MiniCluster dfs. This returns an
 * output stream that test cases can use to write data; the caller is responsible for
 * closing it. On Windows, backslashes in {@code fileName} are first converted to
 * forward slashes.
 *
 * @param cluster
 *            reference to the MiniCluster where the file should be created
 * @param fileName
 *            pathname of the file to be created
 * @return OutputStream to write any data to the file created on the
 *         MiniCluster.
 * @throws IOException if the file already exists or cannot be created
 */
static public OutputStream createInputFile(MiniGenericCluster cluster,
        String fileName) throws IOException {
    FileSystem clusterFs = cluster.getFileSystem();
    if (Util.WINDOWS) {
        fileName = fileName.replace('\\', '/');
    }
    boolean alreadyThere = clusterFs.exists(new Path(fileName));
    if (alreadyThere) {
        throw new IOException("File " + fileName
                + " already exists on the minicluster");
    }
    return clusterFs.create(new Path(fileName));
}
/**
 * Helper to create an empty temp file on the local file system
 * which is registered for deletion when the JVM exits.
 *
 * @param prefix file-name prefix passed to {@link File#createTempFile(String, String)}
 * @param suffix file-name suffix passed to {@link File#createTempFile(String, String)}
 * @return File denoting a newly-created empty file
 * @throws IOException if the file cannot be created
 */
static public File createTempFileDelOnExit(String prefix, String suffix)
        throws IOException {
    final File created = File.createTempFile(prefix, suffix);
    created.deleteOnExit();
    return created;
}
/**
 * Helper to remove a dfs file from the minicluster DFS. The delete is recursive.
 * On Windows, backslashes in {@code fileName} are first converted to forward slashes.
 *
 * @param miniCluster reference to the Minicluster where the file should be deleted
 * @param fileName pathname of the file to be deleted
 * @throws IOException if the delete call fails
 */
static public void deleteFile(MiniGenericCluster miniCluster, String fileName)
        throws IOException {
    FileSystem clusterFs = miniCluster.getFileSystem();
    if (Util.WINDOWS) {
        fileName = fileName.replace('\\', '/');
    }
    Path target = new Path(fileName);
    clusterFs.delete(target, true);
}
/**
 * Deletes a dfs file from the MiniCluster DFS, ignoring any {@link IOException}
 * raised by the delete.
 *
 * @param miniCluster the MiniCluster where the file should be deleted
 * @param fileName the path of the file to be deleted
 */
public static void deleteQuietly(MiniGenericCluster miniCluster, String fileName) {
    try {
        deleteFile(miniCluster, fileName);
    } catch (IOException ignored) {
        // Best effort by design: a failed delete is deliberately not reported.
    }
}
/**
 * Recursively deletes {@code fileName} from the file system obtained from the
 * configuration built out of the given context's properties.
 * On Windows, backslashes in {@code fileName} are first converted to forward slashes.
 *
 * @param pigContext context whose properties select the file system
 * @param fileName pathname to delete
 * @throws IOException if the file system cannot be obtained or the delete call fails
 */
static public void deleteFile(PigContext pigContext, String fileName)
        throws IOException {
    FileSystem fs = FileSystem.get(
            ConfigurationUtil.toConfiguration(pigContext.getProperties()));
    if (Util.WINDOWS) {
        fileName = fileName.replace('\\', '/');
    }
    Path target = new Path(fileName);
    fs.delete(target, true);
}
/**
 * Tells whether {@code fileName} exists on the file system obtained from the
 * configuration built out of the given context's properties.
 * On Windows, backslashes in {@code fileName} are first converted to forward slashes.
 *
 * @param pigContext context whose properties select the file system
 * @param fileName pathname to look up
 * @return the result of {@code FileSystem.exists} for that path
 * @throws IOException if the file system cannot be obtained or queried
 */
static public boolean exists(PigContext pigContext, String fileName)
        throws IOException {
    FileSystem fs = FileSystem.get(
            ConfigurationUtil.toConfiguration(pigContext.getProperties()));
    if (Util.WINDOWS) {
        fileName = fileName.replace('\\', '/');
    }
    Path candidate = new Path(fileName);
    return fs.exists(candidate);
}
/**
 * Helper function to check if the result of a Pig Query is in line with
 * expected results. Wraps the array as a list and delegates to the list-based overload.
 *
 * @param actualResults Result of the executed Pig query
 * @param expectedResults Expected results Array to validate against
 */
static public void checkQueryOutputs(Iterator<Tuple> actualResults,
        Tuple[] expectedResults) {
    List<Tuple> expectedAsList = Arrays.asList(expectedResults);
    checkQueryOutputs(actualResults, expectedAsList);
}
/**
 * Helper function to check if the result of a Pig Query is in line with
 * expected results. Tuples are compared pairwise, in order, by their
 * {@code toString()} form, and the two sides must have the same number of rows.
 *
 * @param actualResults Result of the executed Pig query
 * @param expectedResults Expected results List to validate against
 */
static public void checkQueryOutputs(Iterator<Tuple> actualResults,
        List<Tuple> expectedResults) {
    for (Tuple expected : expectedResults) {
        // Report a short result as an assertion failure rather than NoSuchElementException.
        Assert.assertTrue("Actual result has less records than expected results",
                actualResults.hasNext());
        Tuple actual = actualResults.next();
        Assert.assertEquals(expected.toString(), actual.toString());
    }
    // Counting only the expected side can never detect surplus rows; check the iterator.
    Assert.assertFalse("Actual result has more records than expected results",
            actualResults.hasNext());
}
/**
 * Helper function to check if the result of a Pig Query is in line with
 * expected results. Tuples are compared pairwise, in order, by their
 * {@code toString()} form; both iterators must be exhausted together.
 *
 * @param actualResults Result of the executed Pig query
 * @param expectedResults Expected results to validate against
 * @param expectedRows expected number of rows, or {@code null} to skip the count check
 */
static public void checkQueryOutputs(Iterator<Tuple> actualResults,
        Iterator<Tuple> expectedResults, Integer expectedRows) {
    int matched = 0;
    for (; expectedResults.hasNext(); matched++) {
        Tuple wanted = expectedResults.next();
        Assert.assertTrue("Actual result has less records than expected results", actualResults.hasNext());
        Tuple got = actualResults.next();
        Assert.assertEquals(wanted.toString(), got.toString());
    }
    Assert.assertFalse("Actual result has more records than expected results", actualResults.hasNext());
    if (expectedRows == null) {
        return;
    }
    Assert.assertEquals(expectedRows.intValue(), matched);
}
/**
 * Helper function to check if the result of a Pig Query is in line with
 * expected results. The actual results are drained into a list and both lists are
 * sorted before comparison.
 *
 * @param actualResultsIt Result of the executed Pig query
 * @param expectedResList Expected results to validate against
 */
static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt,
        List<Tuple> expectedResList) {
    List<Tuple> drained = new ArrayList<Tuple>();
    for (; actualResultsIt.hasNext(); ) {
        drained.add(actualResultsIt.next());
    }
    checkQueryOutputsAfterSort(drained, expectedResList);
}
/**
 * Helper function to check if the result of Pig Query is in line with expected results.
 * It sorts actual and expected results before comparison.
 * The expected rows are given as a two-dimensional array in which each inner array
 * becomes one tuple, so tuples may have different sizes.
 * e.g. for [{{10, "will_join", 10, "will_join"}, {11, "will_not_join", null}, {null, 12, "will_not_join"}}]
 * the field sizes of the 3 tuples are [4,3,3]
 *
 * @param actualResultsIt Result of the executed Pig query
 * @param expectedTwoDimensionObjects represents a tuple list, in which the tuple can have variable size.
 */
static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt,
        Object[][] expectedTwoDimensionObjects) {
    List<Tuple> expectedTuples = new ArrayList<Tuple>();
    for (Object[] row : expectedTwoDimensionObjects) {
        Tuple rowTuple = TupleFactory.getInstance().newTuple();
        for (Object field : row) {
            rowTuple.append(field);
        }
        expectedTuples.add(rowTuple);
    }
    checkQueryOutputsAfterSort(actualResultsIt, expectedTuples);
}
/**
 * Sorts both lists in place (actual first, then expected) and asserts that they are equal.
 * Note that the caller's lists are reordered as a side effect.
 *
 * @param actualResList actual tuples; sorted in place
 * @param expectedResList expected tuples; sorted in place
 */
static public void checkQueryOutputsAfterSort(
        List<Tuple> actualResList, List<Tuple> expectedResList) {
    Collections.sort(actualResList);
    Collections.sort(expectedResList);
    final String failurePrefix = "Comparing actual and expected results. ";
    Assert.assertEquals(failurePrefix, expectedResList, actualResList);
}
/**
 * Checks that {@code subStr} occurs in {@code str}; calls org.junit.Assert.fail if it
 * does not.
 *
 * @param str string to search in
 * @param subStr text that must be contained in {@code str}
 */
static public void checkStrContainsSubStr(String str, String subStr){
    if (str.contains(subStr)) {
        return;
    }
    fail("String '"+ subStr + "' is not a substring of '" + str + "'");
}
/**
 * Registers {@code query} on a local-mode PigServer, explains {@code alias} to standard
 * output and checks that this raises a {@link FrontendException} whose Pig error message
 * contains {@code expectedErr}. Fails if no such exception is thrown.
 *
 * @param query script to register
 * @param alias alias to explain
 * @param expectedErr text expected inside the error message
 * @throws IOException if registering or explaining fails with another I/O error
 */
static public void checkExceptionMessage(String query, String alias, String expectedErr)
        throws IOException {
    PigServer localServer = new PigServer(ExecType.LOCAL);
    try {
        Util.registerMultiLineQuery(localServer, query);
        localServer.explain(alias, System.out);
    } catch (FrontendException e) {
        // Expected path: verify the message and stop here.
        checkMessageInException(e, expectedErr);
        return;
    }
    fail("No exception thrown. Exception is expected.");
}
/**
 * Extracts the {@link PigException} from {@code e} via {@code LogUtils.getPigException}
 * and checks that its message contains {@code expectedErr}.
 *
 * @param e exception to inspect
 * @param expectedErr text expected inside the message
 */
public static void checkMessageInException(FrontendException e,
        String expectedErr) {
    PigException rootPigException = LogUtils.getPigException(e);
    checkErrorMessageContainsExpected(rootPigException.getMessage(), expectedErr);
}
/**
 * Fails the test unless {@code message} contains {@code expectedMessage}.
 *
 * @param message actual error message
 * @param expectedMessage text that must occur in {@code message}
 */
public static void checkErrorMessageContainsExpected(String message, String expectedMessage){
    if (message.contains(expectedMessage)) {
        return;
    }
    fail("Expected error message containing '"
            + expectedMessage + "' but got '" + message + "'");
}
/**
 * Builds the grunt command that creates the parent directory of {@code fileName}.
 *
 * @param fileName path whose parent directory should exist
 * @return {@code "fs -mkdir -p <parent>\n"}, or an empty string when the path has no
 *         parent or the parent's name is empty
 */
static private String getFSMkDirCommand(String fileName) {
    Path parentDir = new Path(fileName).getParent();
    // getParent() yields null for a root path; there is nothing to create in that case.
    if (parentDir == null || parentDir.getName().isEmpty()) {
        return "";
    }
    return "fs -mkdir -p " + parentDir + "\n";
}
/**
 * Utility method to copy a file from the local filesystem to the dfs on
 * the minicluster for testing in mapreduce mode. The copy is done by running an
 * "fs -put" grunt command, preceded by an "fs -mkdir -p" for the target's parent
 * directory when there is one.
 *
 * @param cluster a reference to the minicluster
 * @param localFileName the pathname of local file
 * @param fileNameOnCluster the name with which the file should be created on the minicluster
 * @throws IOException if the script fails; parse errors are wrapped in an IOException
 */
static public void copyFromLocalToCluster(MiniGenericCluster cluster,
        String localFileName, String fileNameOnCluster) throws IOException {
    if (Util.WINDOWS) {
        // Local names containing ':' get backslashes; all others get forward slashes.
        boolean hasColon = localFileName.contains(":");
        localFileName = hasColon
                ? localFileName.replace('/', '\\')
                : localFileName.replace('\\', '/');
        fileNameOnCluster = fileNameOnCluster.replace('\\', '/');
    }
    PigServer server = new PigServer(cluster.getExecType(), cluster.getProperties());
    String mkdirPart = getFSMkDirCommand(fileNameOnCluster);
    String script = mkdirPart + "fs -put " + localFileName + " " + fileNameOnCluster;
    GruntParser grunt = new GruntParser(new StringReader(script), server);
    grunt.setInteractive(false);
    try {
        grunt.parseStopOnError();
    } catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
        throw new IOException(e);
    }
}
/**
 * Copies one local file to another local path using {@code FileUtils.copyFile}.
 *
 * @param fromLocalFileName path of the file to copy
 * @param toLocalFileName path of the copy to create
 * @throws IOException if the copy fails
 */
static public void copyFromLocalToLocal(String fromLocalFileName,
        String toLocalFileName) throws IOException {
    File source = new File(fromLocalFileName);
    File destination = new File(toLocalFileName);
    FileUtils.copyFile(source, destination);
}
/**
 * Copies a single file from the cluster file system to the local file system, line by
 * line. Missing local parent directories are created. On Windows, backslashes in both
 * names are first converted to forward slashes.
 *
 * The cluster path is validated before anything is created locally, so a failed
 * validation no longer leaves an empty local file behind.
 *
 * @param cluster cluster whose properties select the file system
 * @param fileNameOnCluster file to copy; must exist and must not be a directory
 * @param localFileName local destination path
 * @throws IOException if the source is missing, is a directory, or the copy fails
 */
static public void copyFromClusterToLocal(MiniGenericCluster cluster,
        String fileNameOnCluster, String localFileName) throws IOException {
    if(Util.WINDOWS){
        fileNameOnCluster = fileNameOnCluster.replace('\\','/');
        localFileName = localFileName.replace('\\','/');
    }
    FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
            cluster.getProperties()));
    Path clusterPath = new Path(fileNameOnCluster);
    if(!fs.exists(clusterPath)) {
        throw new IOException("File " + fileNameOnCluster + " does not exists on the minicluster");
    }
    FileStatus fst = fs.getFileStatus(clusterPath);
    if(fst.isDir()) {
        throw new IOException("Only files from cluster can be copied locally," +
                " " + fileNameOnCluster + " is a directory");
    }
    // getParentFile() is null for a bare file name; nothing to create in that case.
    File parent = new File(localFileName).getParentFile();
    if (parent != null && !parent.exists()) {
        parent.mkdirs();
    }
    FSDataInputStream stream = fs.open(clusterPath);
    BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
    try {
        PrintWriter writer = new PrintWriter(new FileWriter(localFileName));
        try {
            String line = null;
            while( (line = reader.readLine()) != null) {
                writer.println(line);
            }
        } finally {
            writer.close();
        }
    } finally {
        // Close the cluster stream even if opening or writing the local file fails.
        reader.close();
    }
}
/**
 * Prints the expected tuples followed by the actual tuples to standard output, each
 * section framed by a heading and an end marker. The actual-results iterator is
 * consumed in the process.
 *
 * @param actualResults actual tuples to print
 * @param expectedResults expected tuples to print
 */
static public void printQueryOutput(Iterator<Tuple> actualResults,
        Tuple[] expectedResults) {
    final PrintStream console = System.out;
    console.println("Expected :");
    for (int i = 0; i < expectedResults.length; i++) {
        console.println(expectedResults[i].toString());
    }
    console.println("---End----");
    console.println("Actual :");
    for (; actualResults.hasNext(); ) {
        console.println(actualResults.next().toString());
    }
    console.println("---End----");
}
/**
 * Helper method that doubles every backslash in a string, i.e. replaces each "\"
 * with "\\". This is useful to fix file path strings on Windows, where "\" is the
 * path separator.
 *
 * @param str Any string
 * @return The resulting string
 */
public static String encodeEscape(String str) {
    // Literal (non-regex) replacement: one backslash becomes two.
    final String singleBackslash = "\\";
    final String doubledBackslash = "\\\\";
    return str.replace(singleBackslash, doubledBackslash);
}
/**
 * Turns a file name into the form expected by the context's execution type.
 * For MAPREDUCE, or an exec type named "TEZ" or "SPARK", the name is passed through
 * {@code FileLocalizer.hadoopify}; for a local exec type it is returned unchanged.
 * On Windows, backslashes are first converted to forward slashes.
 *
 * @param filename file name to convert
 * @param context context supplying the execution type
 * @return the converted name
 * @throws IOException if {@code hadoopify} fails
 * @throws IllegalStateException for any other execution type
 */
public static String generateURI(String filename, PigContext context)
        throws IOException {
    if (Util.WINDOWS) {
        filename = filename.replace('\\', '/');
    }
    ExecType execType = context.getExecType();
    boolean runsOnCluster = execType == ExecType.MAPREDUCE
            || execType.name().equals("TEZ")
            || execType.name().equals("SPARK");
    if (runsOnCluster) {
        return FileLocalizer.hadoopify(filename, context);
    }
    if (execType.isLocal()) {
        return filename;
    }
    throw new IllegalStateException("ExecType: " + execType);
}
/**
 * Parses a Pig constant given as text, using a {@link QueryParserDriver} built on a
 * fresh {@link PigContext}, the scope "util" and an empty file-name map.
 *
 * @param pigConstantAsString textual Pig constant
 * @return the value produced by {@code parseConstant}
 * @throws ParserException if the text cannot be parsed
 */
public static Object getPigConstant(String pigConstantAsString) throws ParserException {
    PigContext freshContext = new PigContext();
    HashMap<String, String> noFileNames = new HashMap<String, String>();
    QueryParserDriver driver = new QueryParserDriver(freshContext, "util", noFileNames);
    return driver.parseConstant(pigConstantAsString);
}
/**
 * Parses each string as a Pig constant and casts the result to a {@link Tuple}.
 *
 * @param tupleConstants textual tuple constants, one per resulting tuple
 * @return the parsed tuples, in input order
 * @throws ParserException if a string cannot be parsed
 */
public static List<Tuple> getTuplesFromConstantTupleStrings(String[] tupleConstants) throws ParserException {
    List<Tuple> parsed = new ArrayList<Tuple>(tupleConstants.length);
    for (String constant : tupleConstants) {
        Tuple tuple = (Tuple) getPigConstant(constant);
        parsed.add(tuple);
    }
    return parsed;
}
/**
 * Parses a list of strings into a list of tuples and then converts the String fields of
 * those tuples into DataByteArray, in place.
 *
 * @param tupleConstants textual tuple constants, one per resulting tuple
 * @return the parsed and converted tuples, in input order
 * @throws ParserException if a string cannot be parsed
 * @throws ExecException if a tuple field cannot be read or replaced
 */
public static List<Tuple> getTuplesFromConstantTupleStringAsByteArray(String[] tupleConstants)
        throws ParserException, ExecException {
    List<Tuple> parsed = getTuplesFromConstantTupleStrings(tupleConstants);
    Iterator<Tuple> cursor = parsed.iterator();
    while (cursor.hasNext()) {
        convertStringToDataByteArray(cursor.next());
    }
    return parsed;
}
/**
 * Converts String fields of {@code t} to DataByteArray objects, in place. Nested tuples
 * and the tuples inside bags are converted recursively; null fields and fields of any
 * other type are left as they are.
 *
 * @param t tuple to convert; {@code null} is ignored
 * @throws ExecException if a field cannot be read or replaced
 */
private static void convertStringToDataByteArray(Tuple t) throws ExecException {
    if (t == null) {
        return;
    }
    for (int i = 0; i < t.size(); i++) {
        Object col = t.get(i);
        // instanceof is false for null, so null fields fall through untouched.
        if (col instanceof String) {
            t.set(i, new DataByteArray((String) col));
        } else if (col instanceof Tuple) {
            convertStringToDataByteArray((Tuple) col);
        } else if (col instanceof DataBag) {
            Iterator<Tuple> it = ((DataBag) col).iterator();
            while (it.hasNext()) {
                convertStringToDataByteArray(it.next());
            }
        }
    }
}
/**
 * Writes {@code data} to a new temporary file by delegating to
 * {@code createFile(String, String[])} with no file path.
 *
 * @param data lines to write
 * @return the created file
 * @throws Exception if the file cannot be created or written
 */
public static File createFile(String[] data) throws Exception{
    final String noExplicitPath = null;
    return createFile(noExplicitPath, data);
}
/**
 * Writes {@code data}, one element per line, to {@code filePath}, or to a new temporary
 * file when {@code filePath} is null or empty. A missing parent directory is created
 * first and the file is registered for deletion when the JVM exits.
 *
 * @param filePath destination path, or null/empty for a temporary file
 * @param data lines to write
 * @return the file that was written
 * @throws Exception if the file cannot be created or opened
 */
public static File createFile(String filePath, String[] data) throws Exception {
    boolean useTempFile = (null == filePath || filePath.isEmpty());
    File target = useTempFile ? File.createTempFile("tmp", "") : new File(filePath);
    String parentPath = target.getParent();
    if (parentPath != null) {
        File parentDir = new File(parentPath);
        if (!parentDir.exists()) {
            parentDir.mkdirs();
        }
    }
    target.deleteOnExit();
    PrintWriter out = new PrintWriter(target);
    for (String line : data) {
        out.println(line);
    }
    out.close();
    return target;
}
/**
 * Runs a fixed sequence of visitors and the logical plan optimizer on the given plan,
 * modifying it in place.
 *
 * @param lp logical plan to process
 * @return the same plan instance {@code lp}
 * @throws FrontendException if a visitor or the optimizer fails
 */
public static LogicalPlan optimizeNewLP(
        LogicalPlan lp)
        throws FrontendException{
    // Preparation visitors; the order is the same as it has always been.
    new DanglingNestedNodeRemover(lp).visit();
    new UidResetter(lp).visit();
    new SchemaResetter(lp, true /*disable duplicate uid check*/).visit();
    new StoreAliasSetter(lp).visit();
    // run optimizer
    new org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(lp, 100, null).optimize();
    new SortInfoSetter(lp).visit();
    return lp;
}
/**
 * Builds a physical plan from a logical plan by running a
 * {@link LogToPhyTranslationVisitor} configured with the given context.
 *
 * @param lp logical plan to translate
 * @param pc PigContext
 * @return physical plan
 * @throws Exception if the translation fails
 */
public static PhysicalPlan buildPhysicalPlanFromNewLP(
        LogicalPlan lp, PigContext pc)
        throws Exception {
    LogToPhyTranslationVisitor translator = new LogToPhyTranslationVisitor(lp);
    translator.setPigContext(pc);
    translator.visit();
    PhysicalPlan translated = translator.getPhysicalPlan();
    return translated;
}
/**
 * Compiles a physical plan into a map-reduce plan with {@link MRCompiler}: compile,
 * aggregate scalars files, connect soft links, then return the compiler's plan.
 *
 * @param pp physical plan to compile
 * @param pc PigContext
 * @return the map-reduce plan
 * @throws Exception if compilation fails
 */
public static MROperPlan buildMRPlan(PhysicalPlan pp, PigContext pc) throws Exception{
    MRCompiler compiler = new MRCompiler(pp, pc);
    compiler.compile();
    compiler.aggregateScalarsFiles();
    compiler.connectSoftLink();
    MROperPlan compiled = compiler.getMRPlan();
    return compiled;
}
/**
 * Checks the plan with {@code MapRedUtil.checkLeafIsStore} and then compiles it with a
 * new {@link MapReduceLauncher}.
 *
 * @param pp physical plan to compile
 * @param pc PigContext
 * @return the map-reduce plan returned by the launcher
 * @throws Exception if the check or the compilation fails
 */
public static MROperPlan buildMRPlanWithOptimizer(PhysicalPlan pp, PigContext pc) throws Exception {
    MapRedUtil.checkLeafIsStore(pp, pc);
    return new MapReduceLauncher().compile(pp, pc);
}
/**
 * Builds a map-reduce plan for a query: parse, run {@code optimizeNewLP}, translate to a
 * physical plan and compile with {@code buildMRPlanWithOptimizer}.
 *
 * @param query Pig query text
 * @param pc PigContext
 * @return the map-reduce plan
 * @throws Exception if any stage fails
 */
public static MROperPlan buildMRPlan(String query, PigContext pc) throws Exception {
    LogicalPlan logical = Util.parse(query, pc);
    Util.optimizeNewLP(logical);
    PhysicalPlan physical = Util.buildPhysicalPlanFromNewLP(logical, pc);
    return Util.buildMRPlanWithOptimizer(physical, pc);
}
/**
 * Writes {@code query} to a temporary script file and registers that script with the
 * given server. The script file is scheduled for deletion when the JVM exits.
 *
 * @param pigServer server to register the script with
 * @param query script text, possibly spanning several lines
 * @throws IOException if the script file cannot be created or registering fails
 */
public static void registerMultiLineQuery(PigServer pigServer, String query) throws IOException {
    File f = File.createTempFile("tmp", "");
    // Avoid piling up script files in the temp directory across test runs.
    f.deleteOnExit();
    PrintWriter pw = new PrintWriter(f);
    try {
        pw.println(query);
    } finally {
        pw.close();
    }
    pigServer.registerScript(f.getCanonicalPath());
}
/**
 * Runs the command through {@code executeJavaCommandAndReturnInfo} and returns only its
 * exit code.
 *
 * @param cmd command line to run
 * @return exit code of the process
 * @throws Exception if the process cannot be started or waited for
 */
public static int executeJavaCommand(String cmd) throws Exception {
    ProcessReturnInfo outcome = executeJavaCommandAndReturnInfo(cmd);
    return outcome.exitCode;
}
/**
 * Reads an input stream line by line, normally on its own thread, and collects the
 * lines joined by "\n" in {@link #getMessage()}.
 *
 * Callers that use {@link #start()} should wait for {@code thread} to finish before
 * relying on {@link #getMessage()} being complete.
 */
public static class ReadStream implements Runnable {
    InputStream is;
    Thread thread;
    String message = "";
    /**
     * @param is stream to drain; it is closed when {@link #run()} finishes
     */
    public ReadStream(InputStream is) {
        this.is = is;
    }
    /** Starts a new thread that executes {@link #run()}. */
    public void start () {
        thread = new Thread (this);
        thread.start ();
    }
    /**
     * Reads the stream to its end, appending each line to the message. Failures are
     * printed to standard error and end the reading; the stream is closed in all cases.
     */
    @Override
    public void run () {
        try {
            InputStreamReader isr = new InputStreamReader (is);
            BufferedReader br = new BufferedReader (isr);
            while (true) {
                String s = br.readLine ();
                if (s == null) break;
                // A separator is added only once the message already has content.
                if (!message.isEmpty()) {
                    message += "\n";
                }
                message += s;
            }
        } catch (Exception ex) {
            ex.printStackTrace ();
        } finally {
            // Close on the failure path too, so the stream is never leaked.
            try {
                is.close ();
            } catch (IOException ex) {
                ex.printStackTrace ();
            }
        }
    }
    /**
     * @return the text collected so far
     */
    public String getMessage() {
        return message;
    }
}
/**
 * Runs a command and captures its exit code, standard output and standard error.
 * When the JAVA_HOME environment variable is set, the command is prefixed with
 * {@code <JAVA_HOME>/bin/} (using the platform file separator).
 *
 * @param cmd command line to run
 * @return exit code and captured output of the process
 * @throws Exception if the process cannot be started, or waiting is interrupted
 */
public static ProcessReturnInfo executeJavaCommandAndReturnInfo(String cmd)
        throws Exception {
    String javaHome = System.getenv("JAVA_HOME");
    if(javaHome != null) {
        String fileSeparator = System.getProperty("file.separator");
        cmd = javaHome + fileSeparator + "bin" + fileSeparator + cmd;
    }
    Process cmdProc = Runtime.getRuntime().exec(cmd);
    ProcessReturnInfo pri = new ProcessReturnInfo();
    ReadStream stdoutStream = new ReadStream(cmdProc.getInputStream ());
    ReadStream stderrStream = new ReadStream(cmdProc.getErrorStream ());
    stdoutStream.start();
    stderrStream.start();
    cmdProc.waitFor();
    // The process may exit before the readers have drained its output; wait for them
    // so the captured text is complete.
    stdoutStream.thread.join();
    stderrStream.thread.join();
    pri.exitCode = cmdProc.exitValue();
    pri.stdoutContents = stdoutStream.getMessage();
    pri.stderrContents = stderrStream.getMessage();
    return pri;
}
/**
 * Plain holder for the outcome of an external process: its exit code and the text
 * captured from its standard output and standard error.
 */
public static class ProcessReturnInfo {
    public int exitCode;
    public String stderrContents;
    public String stdoutContents;
    /**
     * @return a one-line summary of the form
     *         {@code [Exit code: N, stdout: <...>, stderr: <...>]}
     */
    @Override
    public String toString() {
        // The closing bracket was previously missing from the summary.
        return "[Exit code: " + exitCode + ", stdout: <" + stdoutContents + ">, " +
                "stderr: <" + stderrContents + ">]";
    }
}
/**
 * Deletes {@code path} and, if it is a directory, everything below it.
 * A regular file is simply deleted; a missing path yields {@code false}.
 *
 * @param path file or directory to delete
 * @return the result of the final {@link File#delete()} on {@code path}
 */
static public boolean deleteDirectory(File path) {
    // listFiles() returns null when path is not a directory (or cannot be listed).
    File[] children = path.listFiles();
    if (children != null) {
        for (File child : children) {
            if (child.isDirectory()) {
                deleteDirectory(child);
            } else {
                child.delete();
            }
        }
    }
    return path.delete();
}
/**
 * Creates {@code fileName} with the given lines on the file system obtained from the
 * configuration built out of the context's properties; delegates to the
 * {@link FileSystem} overload.
 *
 * @param pigContext context whose properties select the file system
 * @param fileName pathname of the file to be created
 * @param input lines to write
 * @throws IOException if the file already exists or cannot be written
 */
public static void createInputFile(PigContext pigContext,
        String fileName, String[] input) throws IOException {
    FileSystem fs = FileSystem.get(
            ConfigurationUtil.toConfiguration(pigContext.getProperties()));
    createInputFile(fs, fileName, input);
}
/**
 * Reads a file from the file system described by the PigContext's
 * properties, delegating to the FileSystem-based overload.
 *
 * @param pigContext context whose properties select the file system
 * @param fileName name of the file to read
 * @return whatever the FileSystem-based overload returns for that file
 * @throws IOException if the file system cannot be obtained or read
 */
public static String[] readOutput(PigContext pigContext,
        String fileName) throws IOException {
    FileSystem fs = FileSystem.get(
            ConfigurationUtil.toConfiguration(pigContext.getProperties()));
    return readOutput(fs, fileName);
}
/**
 * Renders the logical plan with a {@code LogicalPlanPrinter} and writes the
 * result to standard error.
 *
 * @param logicalPlan plan to print
 * @throws Exception if visiting the plan fails
 */
public static void printPlan(LogicalPlan logicalPlan ) throws Exception {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    new LogicalPlanPrinter(logicalPlan, new PrintStream(buffer)).visit();
    String rendered = buffer.toString();
    System.err.println(rendered);
}
/**
 * Writes the plan's "text" explain output to standard error.
 *
 * @param physicalPlan plan to print
 * @throws Exception if explaining the plan fails
 */
public static void printPlan(PhysicalPlan physicalPlan) throws Exception {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    PrintStream sink = new PrintStream(buffer);
    physicalPlan.explain(sink, "text", true);
    String rendered = buffer.toString();
    System.err.println(rendered);
}
/**
 * Reads a local text file and turns every line into a tuple of strings.
 *
 * @param file path of the file to read
 * @param delimiter regular expression passed to {@link String#split(String)}
 *        to break each line into fields
 * @return one tuple per line, in file order
 * @throws IOException if the file cannot be opened or read
 */
public static List<Tuple> readFile2TupleList(String file, String delimiter) throws IOException{
    List<Tuple> tuples = new ArrayList<Tuple>();
    BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
    try {
        String line;
        while ((line = reader.readLine()) != null) {
            String[] tokens = line.split(delimiter);
            tuples.add(TupleFactory.getInstance().newTuple(Arrays.asList(tokens)));
        }
    } finally {
        // Release the file handle even when reading or tuple creation fails.
        reader.close();
    }
    return tuples;
}
/**
 * Deletes the existing log file (if any) for the class, then redirects the
 * class's logger to a freshly created temporary file with the log level set
 * to DEBUG. All appenders previously attached to that logger are removed.
 *
 * @param clazz class whose logger is reconfigured
 * @param logFile current log file; may be null
 * @return the new log file
 * @throws Exception if the temporary file or the appender cannot be created
 */
public static File resetLog(Class<?> clazz, File logFile) throws Exception {
    if (logFile != null) {
        logFile.delete();
    }

    Logger classLogger = Logger.getLogger(clazz);
    classLogger.removeAllAppenders();
    classLogger.setLevel(Level.DEBUG);

    File freshLog = File.createTempFile("log", "");
    classLogger.addAppender(
            new FileAppender(new SimpleLayout(), freshLog.toString(), false, false, 0));
    return freshLog;
}
/**
 * Checks whether the log file contains (or does not contain) each of the
 * given messages, failing the test on the first mismatch.
 *
 * @param logFile log file to inspect
 * @param messages messages to look for
 * @param expected if true, every message must be present in the log file;
 *        otherwise none of them may be present
 */
public static void checkLogFileMessage(File logFile, String[] messages, boolean expected) {
    String logMessage;
    try {
        BufferedReader reader = new BufferedReader(new FileReader(logFile));
        try {
            StringBuilder contents = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                contents.append(line).append("\n");
            }
            logMessage = contents.toString();
        } finally {
            // Always release the file handle, even if reading fails.
            reader.close();
        }
    } catch (IOException e) {
        fail("caught exception while checking log message :" + e);
        return; // not reached when fail() throws; satisfies definite assignment
    }

    for (String message : messages) {
        boolean present = logMessage.contains(message);
        if (expected && !present) {
            fail("The message " + message + " is not present in " +
                    "log file contents: " + logMessage);
        }
        if (!expected && present) {
            fail("The message " + message + " is present in " +
                    "log file contents: " + logMessage);
        }
    }
}
/**
 * Registers the query with the server in batch mode and returns the logical
 * plan produced by the server's own {@code buildLp} method, which is invoked
 * reflectively.
 *
 * @param pigServer server to register the query with
 * @param query Pig Latin query text
 * @return the logical plan built by the server
 * @throws Exception if registration or the reflective call fails
 */
public static LogicalPlan buildLp(PigServer pigServer, String query)
        throws Exception {
    pigServer.setBatchOn();
    pigServer.registerQuery(query);

    java.lang.reflect.Method planBuilder =
            pigServer.getClass().getDeclaredMethod("buildLp");
    planBuilder.setAccessible(true);
    Object plan = planBuilder.invoke(pigServer);
    return (LogicalPlan) plan;
}
/**
 * Builds the logical plan for the query, optimizes it, and compiles it into
 * a physical plan with the server's {@code HExecutionEngine}.
 *
 * @param pigServer server to register the query with
 * @param query Pig Latin query text
 * @return the compiled physical plan
 * @throws Exception if building, optimizing or compiling the plan fails
 */
public static PhysicalPlan buildPp(PigServer pigServer, String query)
        throws Exception {
    LogicalPlan logical = buildLp(pigServer, query);
    logical.optimize(pigServer.getPigContext());

    HExecutionEngine engine =
            (HExecutionEngine) pigServer.getPigContext().getExecutionEngine();
    return engine.compile(logical, pigServer.getPigContext().getProperties());
}
/**
 * Parses the query with a {@code QueryParserDriver} (scope "test", empty
 * file-name map) and validates the resulting plan.
 *
 * @param query Pig Latin query text
 * @param pc context used for parsing and validation
 * @return the validated logical plan
 * @throws FrontendException if parsing or validation fails
 */
public static LogicalPlan parse(String query, PigContext pc) throws FrontendException {
    QueryParserDriver driver =
            new QueryParserDriver(pc, "test", new HashMap<String, String>());
    org.apache.pig.newplan.logical.relational.LogicalPlan plan = driver.parse(query);
    plan.validate(pc, "test", false);
    return plan;
}
/**
 * Parses and validates the query. As written, this performs exactly the same
 * steps as {@code parse}: a {@code QueryParserDriver} with scope "test" and
 * an empty file-name map, followed by plan validation.
 *
 * @param query Pig Latin query text
 * @param pc context used for parsing and validation
 * @return the validated logical plan
 * @throws FrontendException if parsing or validation fails
 */
public static LogicalPlan parseAndPreprocess(String query, PigContext pc) throws FrontendException {
    final String scope = "test";
    Map<String, String> noFileNames = new HashMap<String, String>();
    org.apache.pig.newplan.logical.relational.LogicalPlan parsed =
            new QueryParserDriver(pc, scope, noFileNames).parse(query);
    parsed.validate(pc, scope, false);
    return parsed;
}
/**
 * Replaces with null every alias in the given schema whose name starts with
 * "NullAlias" (compared case-insensitively), recursing into nested schemas.
 *
 * @param sch schema to rewrite in place; a null schema is ignored
 */
public static void schemaReplaceNullAlias(Schema sch){
    if (sch == null) {
        return;
    }
    for (FieldSchema fs : sch.getFields()) {
        // Lower-case with a fixed locale so the prefix check does not depend on
        // the JVM's default locale (some locales map 'I' to a dotless i).
        if (fs.alias != null
                && fs.alias.toLowerCase(java.util.Locale.ROOT).startsWith("nullalias")) {
            fs.alias = null;
        }
        schemaReplaceNullAlias(fs.schema);
    }
}
/**
 * Array-based convenience overload: copies the expected tuples into a list
 * and delegates to the list-based {@code checkQueryOutputsAfterSort}.
 *
 * @param actualResultsIt result of the executed Pig query
 * @param expectedResArray expected tuples
 */
static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt,
        Tuple[] expectedResArray) {
    List<Tuple> expected = new ArrayList<Tuple>(Arrays.asList(expectedResArray));
    checkQueryOutputsAfterSort(actualResultsIt, expected);
}
/**
 * Replaces, in place, every bag-typed field of the tuple with a sorted bag
 * holding the same tuples, applying the same conversion recursively to the
 * tuples inside each bag. Fields that cannot be read or written are skipped.
 *
 * @param t tuple to rewrite in place
 */
static public void convertBagToSortedBag(Tuple t) {
    for (int pos = 0; pos < t.size(); pos++) {
        Object field;
        try {
            field = t.get(pos);
        } catch (ExecException e) {
            // shall not happen
            continue;
        }
        if (!(field instanceof DataBag)) {
            continue;
        }

        Iterator<Tuple> members = ((DataBag) field).iterator();
        DataBag sorted = DefaultBagFactory.getInstance().newSortedBag(null);
        while (members.hasNext()) {
            Tuple member = members.next();
            sorted.add(member);
            convertBagToSortedBag(member);
        }

        try {
            t.set(pos, sorted);
        } catch (ExecException e) {
            // shall not happen
        }
    }
}
/**
 * Schema-string convenience overload: parses the schema string and delegates
 * to the {@code LogicalSchema}-based overload.
 *
 * @param actualResultsIt result of the executed Pig query
 * @param expectedResArray expected results in string form
 * @param schemaString schema of the expected results, as a schema string
 * @throws IOException if the schema or the expected results cannot be parsed
 */
static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
        String[] expectedResArray, String schemaString) throws IOException {
    checkQueryOutputsAfterSortRecursive(
            actualResultsIt,
            expectedResArray,
            org.apache.pig.impl.util.Utils.parseSchema(schemaString));
}
/**
 * Helper function to check if the result of a Pig query is in line with the
 * expected results. Bags inside both actual and expected tuples are replaced
 * by sorted bags, and both result lists are sorted before comparison, so
 * ordering differences do not cause a failure.
 *
 * @param actualResultsIt result of the executed Pig query
 * @param expectedResArray expected results in string form, one tuple per entry
 * @param schema schema used to convert each expected string into a tuple
 * @throws IOException if an expected string cannot be converted
 */
static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
        String[] expectedResArray, LogicalSchema schema) throws IOException {
    LogicalFieldSchema fs = new LogicalFieldSchema("tuple", schema, DataType.TUPLE);
    ResourceFieldSchema rfs = new ResourceFieldSchema(fs);
    LoadCaster caster = new Utf8StorageConverter();

    List<Tuple> actualResList = new ArrayList<Tuple>();
    while (actualResultsIt.hasNext()) {
        actualResList.add(actualResultsIt.next());
    }

    List<Tuple> expectedResList = new ArrayList<Tuple>();
    for (String str : expectedResArray) {
        // Encode explicitly as UTF-8 for the UTF-8 converter instead of
        // relying on the platform default charset.
        Tuple newTuple = caster.bytesToTuple(str.getBytes("UTF-8"), rfs);
        expectedResList.add(newTuple);
    }

    for (Tuple t : actualResList) {
        convertBagToSortedBag(t);
    }
    for (Tuple t : expectedResList) {
        convertBagToSortedBag(t);
    }

    Collections.sort(actualResList);
    Collections.sort(expectedResList);
    Assert.assertEquals("Comparing actual and expected results. ",
            expectedResList, actualResList);
}
/**
 * Reads a whole text file into a string. Every line is terminated with a
 * single "\n" in the result, regardless of the line terminator used in the
 * file and including the last line.
 *
 * @param file file to read
 * @return the file contents with normalized line endings; empty for an empty file
 * @throws IOException if the file cannot be opened or read
 */
public static String readFile(File file) throws IOException {
    BufferedReader reader = new BufferedReader(new FileReader(file));
    try {
        StringBuilder result = new StringBuilder();
        String line;
        while ((line = reader.readLine()) != null) {
            result.append(line).append("\n");
        }
        return result.toString();
    } finally {
        // Release the file handle even when reading fails.
        reader.close();
    }
}
/**
 * Removes the signature from a serialized plan, so that changing the way the
 * unique signature is generated does not break tests comparing plans.
 *
 * @param plan the plan to canonicalize
 * @return the plan with every signature replaced by an empty string
 */
public static String removeSignature(String plan) {
    java.util.regex.Pattern signature =
            java.util.regex.Pattern.compile("','','[^']*','scope','true'\\)\\)");
    String blanked = "','','','scope','true'))";
    return signature.matcher(plan).replaceAll(blanked);
}
/**
 * @return false when the Hadoop version string is exactly "0.20.2",
 *         true for any other version
 */
public static boolean isHadoop203plus() {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    return !hadoopVersion.matches("\\b0\\.20\\.2\\b");
}
/**
 * @return true when the Hadoop version string is "0.20.205." followed by at
 *         least one more character
 */
public static boolean isHadoop205() {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    return hadoopVersion.matches("\\b0\\.20\\.205\\..+");
}
/**
 * @return true when the Hadoop version string matches the 1.x pattern
 *         used below (a leading "1", a dot, then at least one character)
 */
public static boolean isHadoop1_x() {
    String hadoopVersion = org.apache.hadoop.util.VersionInfo.getVersion();
    return hadoopVersion.matches("\\b1\\.*\\..+");
}
/**
 * When requested, converts the bags inside every tuple to sorted bags and
 * then sorts the list itself; otherwise leaves the list untouched.
 *
 * @param actualResList tuples to sort in place
 * @param toSort whether sorting should be performed
 */
public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean toSort){
    if (!toSort) {
        return;
    }
    for (Tuple tuple : actualResList) {
        Util.convertBagToSortedBag(tuple);
    }
    Collections.sort(actualResList);
}
/**
 * Compares actual and expected results, either after sorting or in the
 * order given, by dispatching to the matching helper.
 *
 * @param actualResults result of the executed Pig query
 * @param expectedResults expected tuples
 * @param checkAfterSort if true, compare after sorting; otherwise compare as is
 */
public static void checkQueryOutputs(Iterator<Tuple> actualResults, List<Tuple> expectedResults, boolean checkAfterSort) {
    if (!checkAfterSort) {
        checkQueryOutputs(actualResults, expectedResults);
        return;
    }
    checkQueryOutputsAfterSort(actualResults, expectedResults);
}
/**
 * Compares actual results against expected results given in string form,
 * either after sorting or in the order given, by dispatching to the
 * matching helper.
 *
 * @param actualResultsIt result of the executed Pig query
 * @param expectedResArray expected results in string form
 * @param schema schema used to convert the expected strings into tuples
 * @param checkAfterSort if true, compare after sorting; otherwise compare as is
 * @throws IOException if an expected string cannot be converted
 */
static public void checkQueryOutputs(Iterator<Tuple> actualResultsIt,
        String[] expectedResArray, LogicalSchema schema, boolean
        checkAfterSort) throws IOException {
    if (!checkAfterSort) {
        checkQueryOutputs(actualResultsIt, expectedResArray, schema);
        return;
    }
    checkQueryOutputsAfterSortRecursive(actualResultsIt, expectedResArray, schema);
}
/**
 * Checks that the result of a Pig query equals the expected results, in the
 * same order. Bags inside both actual and expected tuples are replaced by
 * sorted bags before comparison; the result lists themselves are not sorted.
 *
 * @param actualResultsIt result of the executed Pig query
 * @param expectedResArray expected results in string form, one tuple per entry
 * @param schema schema used to convert each expected string into a tuple
 * @throws IOException if an expected string cannot be converted
 */
static void checkQueryOutputs(Iterator<Tuple> actualResultsIt,
        String[] expectedResArray, LogicalSchema schema) throws IOException {
    LogicalFieldSchema fs = new LogicalFieldSchema("tuple", schema, DataType.TUPLE);
    ResourceFieldSchema rfs = new ResourceFieldSchema(fs);
    LoadCaster caster = new Utf8StorageConverter();

    List<Tuple> actualResList = new ArrayList<Tuple>();
    while (actualResultsIt.hasNext()) {
        actualResList.add(actualResultsIt.next());
    }

    List<Tuple> expectedResList = new ArrayList<Tuple>();
    for (String str : expectedResArray) {
        // Encode explicitly as UTF-8 for the UTF-8 converter instead of
        // relying on the platform default charset.
        Tuple newTuple = caster.bytesToTuple(str.getBytes("UTF-8"), rfs);
        expectedResList.add(newTuple);
    }

    for (Tuple t : actualResList) {
        convertBagToSortedBag(t);
    }
    for (Tuple t : expectedResList) {
        convertBagToSortedBag(t);
    }

    Assert.assertEquals("Comparing actual and expected results. ",
            expectedResList, actualResList);
}
/**
 * Asserts the four reducer-parallelism values recorded in the configuration:
 * default, requested, estimated, and the runtime reduce-task count, checked
 * in that order.
 *
 * @param defaultParallel expected "pig.info.reducers.default.parallel"
 * @param requestedParallel expected "pig.info.reducers.requested.parallel"
 * @param estimatedParallel expected "pig.info.reducers.estimated.parallel"
 * @param runtimeParallel expected value of {@code MRConfiguration.REDUCE_TASKS}
 * @param conf configuration to inspect
 */
public static void assertParallelValues(long defaultParallel,
        long requestedParallel,
        long estimatedParallel,
        long runtimeParallel,
        Configuration conf) {
    final String[] keys = {
        "pig.info.reducers.default.parallel",
        "pig.info.reducers.requested.parallel",
        "pig.info.reducers.estimated.parallel",
        MRConfiguration.REDUCE_TASKS
    };
    final long[] expectedValues = {
        defaultParallel, requestedParallel, estimatedParallel, runtimeParallel
    };
    for (int i = 0; i < keys.length; i++) {
        assertConfLong(conf, keys[i], expectedValues[i]);
    }
}
/**
 * Asserts that a long-valued configuration parameter has the expected value;
 * a missing parameter is read as -1.
 *
 * @param conf configuration to inspect
 * @param param name of the parameter
 * @param expected expected value
 */
public static void assertConfLong(Configuration conf, String param, long expected) {
    String failureMessage = "Unexpected value found in configs for " + param;
    long actual = conf.getLong(param, -1);
    assertEquals(failureMessage, expected, actual);
}
/**
 * Returns a PathFilter that rejects paths whose file name starts with an
 * underscore and accepts everything else.
 *
 * @return PathFilter
 */
public static PathFilter getSuccessMarkerPathFilter() {
    PathFilter skipUnderscoreNames = new PathFilter() {
        @Override
        public boolean accept(Path p) {
            boolean isMarker = p.getName().startsWith("_");
            return !isMarker;
        }
    };
    return skipUnderscoreNames;
}
/**
 * Asserts that an exception has exactly the expected class and message.
 *
 * @param expected
 *            Exception class that is expected to be thrown
 * @param found
 *            Exception that occurred in the test
 * @param message
 *            expected String to verify against
 */
public static void assertExceptionAndMessage(Class<?> expected,
        Exception found, String message) {
    assertEquals(expected, found.getClass());
    // Expected value first, actual second, so a failure reports them the right way round.
    assertEquals(message, found.getMessage());
}
/**
 * Resets ThreadLocal or static state that PigServer depends on. Call this
 * when a test suite has test cases that switch between LOCAL and
 * MAPREDUCE/TEZ execution modes.
 */
public static void resetStateForExecModeSwitch() {
    // Mark FileLocalizer as not initialized.
    FileLocalizer.setInitialized(false);
    // For tez testing, we want to avoid TezResourceManager/LocalResource reuse
    // (when switching between local and mapreduce/tez)
    TezResourceManager.dropInstance();
    // TODO: once we have Tez local mode, we can get rid of this. For now,
    // if we run this test suite in Tez mode and there are some tests
    // in LOCAL mode, we need to set ScriptState to
    // null to force ScriptState gets initialized every time.
    ScriptState.start(null);
}
/**
 * @param execType execution type to test
 * @return true if the execution type is {@code ExecType.MAPREDUCE}
 */
public static boolean isMapredExecType(ExecType execType) {
    final boolean isMapred = (ExecType.MAPREDUCE == execType);
    return isMapred;
}
/**
 * @param execType execution type to test
 * @return true if the lower-cased name of the execution type starts with "tez"
 */
public static boolean isTezExecType(ExecType execType) {
    String typeName = execType.name().toLowerCase();
    return typeName.startsWith("tez");
}
/**
 * @param execType execution type to test
 * @return true if the lower-cased name of the execution type starts with "spark"
 */
public static boolean isSparkExecType(ExecType execType) {
    String typeName = execType.name().toLowerCase();
    return typeName.startsWith("spark");
}
/**
 * Finds the single pig jar in the current working directory whose name
 * matches {@code pig.*h<N>.jar} and does not contain "all". {@code N} is
 * "1" when the "hadoopversion" system property equals "20", and "2"
 * otherwise (including when the property is not set).
 *
 * @return the file name of the matching jar
 * @throws RuntimeException if the directory cannot be listed or the number
 *         of matching jars is not exactly one
 */
public static String findPigJarName() {
    // Constant-first equals: an unset property selects "2" instead of throwing an NPE.
    final String suffix = "20".equals(System.getProperty("hadoopversion")) ? "1" : "2";
    // Compile once rather than once per directory entry.
    final java.util.regex.Pattern jarPattern =
            java.util.regex.Pattern.compile("pig.*h" + suffix + "\\.jar");
    File baseDir = new File(".");
    String[] jarNames = baseDir.list(new FilenameFilter() {
        @Override
        public boolean accept(File dir, String name) {
            return jarPattern.matcher(name).matches() && !name.contains("all");
        }
    });
    if (jarNames == null || jarNames.length != 1) {
        throw new RuntimeException("Cannot find pig.jar");
    }
    return jarNames[0];
}
/**
 * Picks the local execution type for tests from the "test.exec.type" system
 * property: "tez" selects "tez_local", "spark" selects "spark_local", and
 * anything else (or an unset property) selects "local".
 *
 * @return the execution type resolved by {@code ExecTypeProvider}
 * @throws Exception if the execution type cannot be resolved
 */
public static ExecType getLocalTestMode() throws Exception {
    String requested = System.getProperty("test.exec.type");
    String localMode = "local";
    if ("tez".equals(requested)) {
        localMode = "tez_local";
    } else if ("spark".equals(requested)) {
        localMode = "spark_local";
    }
    return ExecTypeProvider.fromString(localMode);
}
/**
 * Creates one named {@code WriterAppender} over the given writer and attaches
 * it to the logger of every given class.
 *
 * @param appenderName name given to the appender
 * @param writer destination of the log output
 * @param clazzes classes whose loggers receive the appender
 */
public static void createLogAppender(String appenderName, Writer writer, Class...clazzes) {
    PatternLayout layout = new PatternLayout("%d [%t] %-5p %c %x - %m%n");
    WriterAppender shared = new WriterAppender(layout, writer);
    shared.setName(appenderName);
    for (int i = 0; i < clazzes.length; i++) {
        Logger.getLogger(clazzes[i]).addAppender(shared);
    }
}
/**
 * Closes and detaches the named appender from the logger of every given
 * class. Loggers that have no appender with that name are left alone.
 *
 * @param appenderName name of the appender to remove
 * @param clazzes classes whose loggers are cleaned up
 */
public static void removeLogAppender(String appenderName, Class...clazzes) {
    for (Class clazz : clazzes) {
        Logger logger = Logger.getLogger(clazz);
        Appender appender = logger.getAppender(appenderName);
        // No appender with that name on this logger: nothing to close.
        if (appender != null) {
            appender.close();
        }
        logger.removeAppender(appenderName);
    }
}
/**
 * Lists the entries under the given path whose name starts with "part-" and
 * returns the first one in the listing.
 *
 * @param path directory to list, on the file system identified by its URI
 * @return path of the first listed "part-" entry
 * @throws Exception if the listing fails or contains no "part-" entry
 */
public static Path getFirstPartFile(Path path) throws Exception {
    PathFilter partFilesOnly = new PathFilter() {
        @Override
        public boolean accept(Path candidate) {
            return candidate.getName().startsWith("part-");
        }
    };
    FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
    FileStatus[] parts = fs.listStatus(path, partFilesOnly);
    return parts[0].getPath();
}
/**
 * Lists the entries of a local directory whose name starts with "part-" and
 * returns the first one in the listing.
 *
 * @param dir local directory to list
 * @return the first listed "part-" entry
 * @throws Exception if the directory cannot be listed or has no "part-" entry
 */
public static File getFirstPartFile(File dir) throws Exception {
    FilenameFilter partFilesOnly = new FilenameFilter() {
        @Override
        public boolean accept(File parent, String name) {
            return name.startsWith("part-");
        }
    };
    File[] parts = dir.listFiles(partFilesOnly);
    return parts[0];
}
/**
 * Builds the per-class test data directory:
 * {@code TEST_DIR/testdata/<simple class name>}.
 *
 * @param testClass test class the directory belongs to
 * @return the directory path as a string
 */
@SuppressWarnings("rawtypes")
public static String getTestDirectory(Class testClass) {
    final String sep = Path.SEPARATOR;
    return TEST_DIR + sep + "testdata" + sep + testClass.getSimpleName();
}
}