blob: 9b8be48eac6f9a74379e1fdf8c4d41cecc6dc4d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test.utils;
import java.io.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.Map.Entry;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
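/**
* Static helper methods shared by unit tests: comparison of bags,
* tuples, maps, byte arrays and streams, creation of temporary
* input files, and normalization of strings whose element order varies.
*/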
public class TestHelper {
// Not read by any method visible in this class.
// NOTE(review): presumably a progress-display interval (in tuples) for callers - confirm before relying on it.
public static int dispAfterNumTuples = 1000;
/**
 * Tells whether a bag holds a tuple that matches the given one.
 * An element matches when its {@code compareTo} reports 0 against {@code t}
 * or when {@link #tupleEquals(Tuple, Tuple)} accepts the pair.
 *
 * @param db bag to search
 * @param t tuple to look for
 * @return true if at least one element of {@code db} matches {@code t}
 */
public static boolean bagContains(DataBag db, Tuple t) {
    for (Tuple tuple : db) {
        if (tuple.compareTo(t) == 0 || tupleEquals(tuple, t)) {
            return true;
        }
    }
    return false;
}
/**
 * Compares two bags as multisets, ignoring element order.
 * Two tuples are considered the same when {@code compareTo} returns 0.
 * Every tuple of {@code db2} must be paired with a distinct tuple of
 * {@code db1}, so duplicate tuples have to occur equally often in both bags.
 *
 * @param db1 first bag
 * @param db2 second bag
 * @return true if both bags have the same size and the same tuples with the
 *         same multiplicities
 */
public static boolean compareBags(DataBag db1, DataBag db2) {
    if (db1.size() != db2.size()) {
        return false;
    }
    // matched[k] is set once the k-th tuple of db1 (in iteration order) has
    // been paired; this assumes db1 iterates in the same order on every pass.
    boolean[] matched = new boolean[(int) db1.size()];
    for (Tuple candidate : db2) {
        boolean paired = false;
        int idx = 0;
        for (Tuple reference : db1) {
            if (idx >= matched.length) {
                break;
            }
            if (!matched[idx] && candidate.compareTo(reference) == 0) {
                // consume this db1 tuple so it cannot satisfy a second db2 tuple
                matched[idx] = true;
                paired = true;
                break;
            }
            idx++;
        }
        if (!paired) {
            return false;
        }
    }
    return true;
}
/**
 * Compares two cogroup results whose tuples hold a group key in field 0
 * and a bag in field 1. The bags are indexed by key and, for every key of
 * the first result, the bag found under the same key in the second result
 * is compared with {@link #compareBags(DataBag, DataBag)}.
 *
 * @param db1 first cogroup output
 * @param db2 second cogroup output
 * @return false if the sizes differ, a key of db1 has no bag in db2, or a
 *         pair of bags does not compare equal; true otherwise
 * @throws ExecException if a tuple field cannot be read
 */
public static boolean compareCogroupOutputs(DataBag db1, DataBag db2) throws ExecException {
    if (db1.size() != db2.size()) {
        return false;
    }

    Map<Object, DataBag> groupsOfFirst = new HashMap<Object, DataBag>();
    Iterator<Tuple> rowsOfFirst = db1.iterator();
    while (rowsOfFirst.hasNext()) {
        Tuple row = rowsOfFirst.next();
        groupsOfFirst.put(row.get(0), (DataBag) row.get(1));
    }

    Map<Object, DataBag> groupsOfSecond = new HashMap<Object, DataBag>();
    Iterator<Tuple> rowsOfSecond = db2.iterator();
    while (rowsOfSecond.hasNext()) {
        Tuple row = rowsOfSecond.next();
        groupsOfSecond.put(row.get(0), (DataBag) row.get(1));
    }

    for (Object groupKey : groupsOfFirst.keySet()) {
        DataBag counterpart = groupsOfSecond.get(groupKey);
        if (counterpart == null) {
            return false;
        }
        if (!compareBags(counterpart, groupsOfFirst.get(groupKey))) {
            return false;
        }
    }
    return true;
}
/**
 * Builds a new bag that keeps only one field of every tuple.
 *
 * @param db2 bag to project
 * @param i index of the field to keep
 * @return a new default bag holding one single-field tuple per input tuple
 * @throws ExecException if the field cannot be read from a tuple
 */
public static DataBag projectBag(DataBag db2, int i) throws ExecException {
    final DataBag projected = DefaultBagFactory.getInstance().newDefaultBag();
    final Iterator<Tuple> rows = db2.iterator();
    while (rows.hasNext()) {
        final Object field = rows.next().get(i);
        final Tuple single = new DefaultTuple();
        single.append(field);
        projected.add(single);
    }
    return projected;
}
/**
 * Builds a new bag that keeps the selected fields of every tuple, in the
 * order given by {@code fields}.
 *
 * @param db2 bag to project
 * @param fields indexes of the fields to keep
 * @return a new default bag holding one projected tuple per input tuple
 * @throws ExecException if a field cannot be read from a tuple
 */
public static DataBag projectBag(DataBag db2, int[] fields) throws ExecException {
    final DataBag projected = DefaultBagFactory.getInstance().newDefaultBag();
    final Iterator<Tuple> rows = db2.iterator();
    while (rows.hasNext()) {
        final Tuple source = rows.next();
        final Tuple narrowed = new DefaultTuple();
        for (int k = 0; k < fields.length; k++) {
            narrowed.append(source.get(fields[k]));
        }
        projected.add(narrowed);
    }
    return projected;
}
/**
 * Compares the complete contents of two streams byte by byte.
 * Only the bytes actually read are compared, and a stream that ends early
 * is reported as smaller, so extra data on either side is detected.
 * Bytes are compared as signed values. Neither stream is closed.
 *
 * @param exp stream with the expected content
 * @param act stream with the actual content
 * @return 0 if both streams hold the same bytes, a positive value if
 *         {@code exp} is greater or longer, a negative value if it is
 *         smaller or shorter
 * @throws IOException if reading either stream fails
 */
public static int compareInputStreams(InputStream exp, InputStream act) throws IOException {
    byte[] bExp = new byte[4096];
    byte[] bAct = new byte[4096];
    while (true) {
        int nExp = fillBuffer(exp, bExp);
        int nAct = fillBuffer(act, bAct);
        int common = Math.min(nExp, nAct);
        for (int i = 0; i < common; i++) {
            if (bExp[i] != bAct[i]) {
                return bExp[i] > bAct[i] ? 1 : -1;
            }
        }
        if (nExp != nAct) {
            // one stream ended inside this chunk while the other still had data
            return nExp > nAct ? 1 : -1;
        }
        if (nExp < bExp.length) {
            // a short (possibly empty) chunk means both streams are exhausted
            return 0;
        }
    }
}

/** Reads until the buffer is full or the stream ends; returns the number of bytes stored. */
private static int fillBuffer(InputStream in, byte[] buf) throws IOException {
    int total = 0;
    while (total < buf.length) {
        int n = in.read(buf, total, buf.length - total);
        if (n < 0) {
            break;
        }
        total += n;
    }
    return total;
}
/**
 * Orders two byte arrays: first by length (the longer array is greater),
 * then, for equal lengths, by the first differing byte compared as a
 * signed value.
 *
 * @param b1 left array
 * @param b2 right array
 * @return 1 if b1 is greater, -1 if it is smaller, 0 if both are equal
 */
public static int compareByteArray(byte[] b1, byte[] b2) {
    final int leftLength = b1.length;
    final int rightLength = b2.length;
    if (leftLength != rightLength) {
        return leftLength > rightLength ? 1 : -1;
    }
    int pos = 0;
    while (pos < leftLength) {
        final byte left = b1[pos];
        final byte right = b2[pos];
        if (left != right) {
            return left > right ? 1 : -1;
        }
        pos++;
    }
    return 0;
}
/*public static boolean areFilesSame(FileSpec expLocal, FileSpec actHadoop, PigContext pc, int dispAftNumTuples) throws ExecException, IOException{
Random r = new Random();
POLoad ldExp = new POLoad(new OperatorKey("", r.nextLong()));
ldExp.setPc(pc);
ldExp.setLFile(expLocal);
POLoad ldAct = new POLoad(new OperatorKey("", r.nextLong()));
ldAct.setPc(pc);
ldAct.setLFile(actHadoop);
Tuple t = null;
int numActTuples = -1;
boolean matches = true;
for(Result resAct=ldAct.getNext(t);resAct.returnStatus!=POStatus.STATUS_EOP;resAct=ldAct.getNext(t)){
Tuple tupAct = (Tuple)resAct.result;
++numActTuples;
boolean found = false;
for(Result resExp=ldExp.getNext(t);resExp.returnStatus!=POStatus.STATUS_EOP;resExp=ldExp.getNext(t)){
Tuple tupExp = (Tuple)resExp.result;
if(tupAct.compareTo(tupExp)==0){
found = true;
ldExp.tearDown();
break;
}
}
if(!found){
matches = false;
break;
}
if(numActTuples%dispAftNumTuples ==0)
System.out.println(numActTuples/dispAftNumTuples);
}
int numExpTuples = -1;
while(ldExp.getNext(t).returnStatus!=POStatus.STATUS_EOP)
++numExpTuples;
return (matches && numActTuples==numExpTuples);
}*/
/**
 * Loads two files through {@code POLoad}, trims every field of every tuple
 * and checks that both files yield the same number of tuples and that the
 * resulting bags compare equal via {@link #compareBags(DataBag, DataBag)}.
 * The actual file is read completely before the expected one.
 *
 * @param expLocal file holding the expected tuples
 * @param actHadoop file holding the actual tuples
 * @param pc context handed to both loaders
 * @return true if the tuple counts match and the trimmed bags compare equal
 * @throws ExecException if loading tuples fails
 * @throws IOException if reading a file fails
 */
public static boolean areFilesSame(FileSpec expLocal, FileSpec actHadoop, PigContext pc) throws ExecException, IOException {
    final Random keySource = new Random();

    final POLoad expectedLoader = new POLoad(new OperatorKey("", keySource.nextLong()));
    expectedLoader.setPc(pc);
    expectedLoader.setLFile(expLocal);

    final POLoad actualLoader = new POLoad(new OperatorKey("", keySource.nextLong()));
    actualLoader.setPc(pc);
    actualLoader.setLFile(actHadoop);

    // Drain the actual file first, trimming each tuple on the way in.
    final DataBag actualBag = DefaultBagFactory.getInstance().newDefaultBag();
    int actualCount = 0;
    for (Result next = actualLoader.getNextTuple();
            next.returnStatus != POStatus.STATUS_EOP;
            next = actualLoader.getNextTuple()) {
        actualCount++;
        actualBag.add(trimTuple((Tuple) next.result));
    }

    // Then drain the expected file the same way.
    final DataBag expectedBag = DefaultBagFactory.getInstance().newDefaultBag();
    int expectedCount = 0;
    for (Result next = expectedLoader.getNextTuple();
            next.returnStatus != POStatus.STATUS_EOP;
            next = expectedLoader.getNextTuple()) {
        expectedCount++;
        expectedBag.add(trimTuple((Tuple) next.result));
    }

    return actualCount == expectedCount && compareBags(expectedBag, actualBag);
}
/**
 * Copies a tuple, replacing each field by a byte array built from the
 * trimmed string form of the original field. Every field is cast to
 * {@code DataByteArray}, so other field types are rejected by the cast.
 *
 * @param t tuple whose fields are all DataByteArray values
 * @return a new tuple with whitespace-trimmed fields in the same order
 */
private static Tuple trimTuple(Tuple t) {
    final Tuple trimmed = TupleFactory.getInstance().newTuple();
    final Iterator<Object> fields = t.getAll().iterator();
    while (fields.hasNext()) {
        final DataByteArray raw = (DataByteArray) fields.next();
        final String text = raw.toString().trim();
        trimmed.append(new DataByteArray(text.getBytes()));
    }
    return trimmed;
}
/**
 * Writes a dataset to a new temporary file, one record per line with the
 * fields separated by tabs.
 * The number of columns written for every record is taken from the first
 * record, so each record must have at least that many fields. An empty
 * dataset produces an empty file.
 *
 * @param data records to write, indexed as {@code data[record][field]}
 * @return the temporary file that was written
 * @throws IOException if the file cannot be created or written
 */
public static File createTempFile(String[][] data) throws IOException {
    File fp1 = File.createTempFile("test", "txt");
    PrintStream ps = new PrintStream(new FileOutputStream(fp1));
    try {
        for (int i = 0; i < data.length; i++) {
            // Building up string for each line
            StringBuilder sb = new StringBuilder();
            for (int j = 0; j < data[0].length; j++) {
                if (j != 0) {
                    sb.append("\t");
                }
                sb.append(data[i][j]);
            }
            // Write the line to file
            ps.println(sb.toString());
        }
        // PrintStream records write failures instead of throwing; surface them
        if (ps.checkError()) {
            throw new IOException("Failed to write test data to " + fp1);
        }
    } finally {
        // release the file handle even when building a line throws
        ps.close();
    }
    return fp1;
}
/**
 * A quick way to check map equality when the converted map holds its
 * values as {@code DataByteArray}: every expected value's
 * {@code toString()} must equal the string decoded from the bytes stored
 * under the same key in the converted map.
 * Two null maps are equal; a null map never equals a non-null one. A key
 * missing from the converted map, or a null value on only one side, makes
 * the maps unequal.
 *
 * @param expectedMap map holding the expected values
 * @param convertedMap map whose non-null values are DataByteArray instances
 * @return true if both maps are null, or have the same size and matching
 *         values for every expected key
 */
public static boolean mapEquals(Map<String, Object> expectedMap, Map<String, Object> convertedMap) {
    if (expectedMap == null || convertedMap == null) {
        // equal only when both are null
        return expectedMap == convertedMap;
    }
    if (expectedMap.size() != convertedMap.size()) {
        return false;
    }
    for (Entry<String, Object> expected : expectedMap.entrySet()) {
        String key = expected.getKey();
        Object expectedValue = expected.getValue();
        Object v = convertedMap.get(key);
        if (expectedValue == null || v == null) {
            // null matches only an explicit null stored under the same key
            if (expectedValue == null && v == null && convertedMap.containsKey(key)) {
                continue;
            }
            return false;
        }
        String convertedValue = new String(((DataByteArray) v).get());
        if (!expectedValue.toString().equals(convertedValue)) {
            return false;
        }
    }
    return true;
}
/**
 * Compares two tuples field by field.
 * Map fields are compared with {@link #mapEquals(Map, Map)}, nested tuples
 * recursively, bags with {@link #bagEquals(DataBag, DataBag)} and all other
 * fields with {@code equals}. A field must have the same kind on both
 * sides. Two null tuples are equal; a null tuple never equals a non-null
 * one.
 *
 * @param expectedTuple tuple holding the expected fields
 * @param convertedTuple tuple to check
 * @return true if both tuples are null, or have the same size and matching
 *         fields; false otherwise, including when a field cannot be read
 */
@SuppressWarnings("unchecked")
public static boolean tupleEquals(Tuple expectedTuple, Tuple convertedTuple) {
    if (expectedTuple == null || convertedTuple == null) {
        // equal only when both are null
        return expectedTuple == convertedTuple;
    }
    if (expectedTuple.size() != convertedTuple.size()) {
        return false;
    }
    for (int i = 0; i < expectedTuple.size(); ++i) {
        Object e;
        Object c;
        try {
            e = expectedTuple.get(i);
            c = convertedTuple.get(i);
        } catch (Exception e1) {
            // a field that cannot be read is treated as a mismatch
            return false;
        }
        if (e instanceof Map) {
            if (!(c instanceof Map)
                    || !mapEquals((Map<String, Object>) e, (Map<String, Object>) c)) {
                return false;
            }
        } else if (e instanceof Tuple) {
            if (!(c instanceof Tuple) || !tupleEquals((Tuple) e, (Tuple) c)) {
                return false;
            }
        } else if (e instanceof DataBag) {
            if (!(c instanceof DataBag) || !bagEquals((DataBag) e, (DataBag) c)) {
                return false;
            }
        } else if (e == null) {
            if (c != null) {
                return false;
            }
        } else if (c == null || !e.equals(c)) {
            return false;
        }
    }
    return true;
}
/**
 * Compares two bags position by position: the n-th tuple produced by one
 * bag's iterator must match the n-th tuple of the other according to
 * {@link #tupleEquals(Tuple, Tuple)}. The comparison therefore depends on
 * iteration order. Two null bags are equal; a null bag never equals a
 * non-null one.
 *
 * @param expectedBag bag holding the expected tuples
 * @param convertedBag bag to check
 * @return true if both bags are null, or have the same size and pairwise
 *         matching tuples
 */
public static boolean bagEquals(DataBag expectedBag, DataBag convertedBag) {
    if (expectedBag == null || convertedBag == null) {
        // equal only when both are null
        return expectedBag == convertedBag;
    }
    if (expectedBag.size() != convertedBag.size()) {
        return false;
    }
    Iterator<Tuple> expectedBagIterator = expectedBag.iterator();
    Iterator<Tuple> convertedBagIterator = convertedBag.iterator();
    while (expectedBagIterator.hasNext()) {
        if (!convertedBagIterator.hasNext()) {
            // converted bag ran out of tuples before the expected one
            return false;
        }
        Tuple expectedBagTuple = expectedBagIterator.next();
        Tuple convertedBagTuple = convertedBagIterator.next();
        if (!tupleEquals(expectedBagTuple, convertedBagTuple)) {
            return false;
        }
    }
    return true;
}
/**
 * Normalizes the order of a delimited list embedded in a string.
 * The first match of "regex" in "target" is located and its first capturing
 * group is split on "split" (itself a regular expression). The pieces are
 * sorted in natural String order and re-joined with ", ". Every occurrence
 * of the captured text in "target" is then replaced by the sorted form.
 * When "regex" does not match, "target" is returned unchanged.
 */
public static String sortString(String regex, String target, String split) {
    final Matcher located = Pattern.compile(regex).matcher(target);
    if (!located.find()) {
        return target;
    }
    final String captured = located.group(1);
    final String[] pieces = captured.split(split);
    Arrays.sort(pieces);
    final StringBuilder rejoined = new StringBuilder();
    String glue = "";
    for (String piece : pieces) {
        rejoined.append(glue).append(piece);
        glue = ", ";
    }
    return target.replace(captured, rejoined.toString());
}
/**
 * Sorts the UDF list of every "MapReduce(n,...) - -m:" line of a golden
 * plan. Each line of the input is normalized with sortString and the
 * normalized text replaces every occurrence of that line in the plan.
 */
public static String sortUDFs(String goldenString) {
    final String udfListRegex = "MapReduce\\([0-9]*\\,(.*)\\) - -[0-9]*\\:";
    String normalized = goldenString;
    // the lines come from the untouched input; replacements accumulate in 'normalized'
    for (String line : goldenString.split("\n")) {
        normalized = normalized.replace(line, sortString(udfListRegex, line, ","));
    }
    return normalized;
}
/**
 * Sorts the bracketed, ", "-separated sub-field list of every log message.
 * The list is modified in place and the same instance is returned.
 */
public static List<String> sortSubFields(List<String> logMessages) {
    final String bracketedList = "\\[(.*)\\]";
    int position = 0;
    while (position < logMessages.size()) {
        final String message = logMessages.get(position);
        logMessages.set(position, sortString(bracketedList, message, ", "));
        position++;
    }
    return logMessages;
}
/**
 * Sorts the elements of every delimited list found in a text.
 * Each shortest span that starts with {@code delimiter1} and ends with
 * {@code delimiter2} is split on {@code separator}, its elements are sorted
 * in natural String order and re-joined with {@code separator}. Every
 * occurrence of the unsorted span in the text is then replaced by the
 * sorted one.
 * The delimiters are matched literally and may be of any length; the
 * elements themselves are written back unchanged.
 *
 * @param text text containing zero or more delimited lists
 * @param delimiter1 literal opening delimiter
 * @param delimiter2 literal closing delimiter
 * @param separator regular expression used to split the elements; also
 *        written literally between the sorted elements
 * @return the text with every delimited list sorted
 */
public static String sortStringList(String text, String delimiter1, String delimiter2, String separator){
    // quote the delimiters so regex metacharacters and multi-character delimiters are matched literally
    Pattern pattern = Pattern.compile(
            "(" + Pattern.quote(delimiter1) + ".*?" + Pattern.quote(delimiter2) + ")");
    Matcher matcher = pattern.matcher(text);
    String sortedString = text;
    while (matcher.find()) {
        // split and sort the list
        String unsorted = matcher.group(1);
        String value = unsorted.substring(delimiter1.length(), unsorted.length() - delimiter2.length());
        String[] sortedList = value.split(separator);
        Arrays.sort(sortedList);
        // join the elements directly so their own content is never rewritten
        StringBuilder sorted = new StringBuilder(delimiter1);
        for (int k = 0; k < sortedList.length; k++) {
            if (k > 0) {
                sorted.append(separator);
            }
            sorted.append(sortedList[k]);
        }
        sorted.append(delimiter2);
        // replace the unsorted string with the sorted one
        sortedString = sortedString.replace(unsorted, sorted.toString());
    }
    return sortedString;
}
}