blob: e24f24ca43b36f3a33551fe5b5cf3c98b64f4322 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.parser.ParserException;
import org.apache.pig.parser.QueryParserDriver;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.TestHelper;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
public class TestLoad {
PigContext pc;
PigServer[] servers;
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private static final String WORKING_DIR = "/tmp/test" + java.util.UUID.randomUUID();
@Before
public void setUp() throws Exception {
FileLocalizer.deleteTempFiles();
servers = new PigServer[] {
new PigServer(cluster.getExecType(), cluster.getProperties()),
new PigServer(Util.getLocalTestMode(), new Properties())
};
}
@Test
public void testGetNextTuple() throws IOException {
pc = servers[0].getPigContext();
String curDir = System.getProperty("user.dir");
String inpDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/InputFiles/";
// copy passwd file to cluster and set that as the input location for the load
Util.copyFromLocalToCluster(cluster, inpDir + "passwd", "passwd");
FileSpec inpFSpec = new FileSpec("passwd", new FuncSpec(PigStorage.class.getName(), new String[]{":"}));
POLoad ld = GenPhyOp.topLoadOp();
ld.setLFile(inpFSpec);
ld.setPc(pc);
DataBag inpDB = DefaultBagFactory.getInstance().newDefaultBag();
BufferedReader br = new BufferedReader(new FileReader("test/org/apache/pig/test/data/InputFiles/passwd"));
for(String line = br.readLine();line!=null;line=br.readLine()){
String[] flds = line.split(":",-1);
Tuple t = new DefaultTuple();
for (String fld : flds) {
t.append((fld.compareTo("")!=0 ? new DataByteArray(fld.getBytes()) : null));
}
inpDB.add(t);
}
Tuple t=null;
int size = 0;
for(Result res = ld.getNextTuple();res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNextTuple()){
assertEquals(true, TestHelper.bagContains(inpDB, (Tuple)res.result));
++size;
}
assertEquals(true, size==inpDB.size());
}
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
@Test
public void testLoadRemoteRel() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
checkLoadPath("test", WORKING_DIR + "/test");
}
}
@Test
public void testLoadRemoteAbs() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
boolean noConversionExpected = true;
checkLoadPath(WORKING_DIR + "/test", WORKING_DIR + "/test", noConversionExpected);
}
}
@Test
public void testLoadRemoteRelScheme() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
checkLoadPath("test", WORKING_DIR + "/test");
}
}
@Test
public void testLoadRemoteAbsScheme() throws Exception {
pc = servers[0].getPigContext();
boolean noConversionExpected = true;
checkLoadPath("hdfs:" + WORKING_DIR + "/test","hdfs:" + WORKING_DIR + "/test", noConversionExpected);
// check if a location 'hdfs:<abs path>' can actually be read using PigStorage
String[] inputFileNames = new String[] {
WORKING_DIR + "/TestLoad-testLoadRemoteAbsSchema-input.txt"};
testLoadingMultipleFiles(inputFileNames, "hdfs:" + inputFileNames[0]);
}
@Test
public void testLoadRemoteAbsAuth() throws Exception {
pc = servers[0].getPigContext();
checkLoadPath(cluster.getFileSystem().getUri()+"/test","/test");
}
@Test
public void testLoadRemoteNormalize() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
boolean noConversionExpected = true;
checkLoadPath(WORKING_DIR + "/foo/../././", WORKING_DIR + "/foo/.././.", noConversionExpected);
}
}
@Test
public void testGlobChars() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
checkLoadPath("t?s*", WORKING_DIR + "/t?s*");
}
}
@Test
public void testCommaSeparatedString() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
checkLoadPath("usr/pig/a,b", WORKING_DIR + "/usr/pig/a,"+ WORKING_DIR + "/b");
}
}
@Test
public void testCommaSeparatedString2() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
checkLoadPath("t?s*,test", WORKING_DIR + "/t?s*,"+ WORKING_DIR + "/test");
}
}
@SuppressWarnings("unchecked")
@Test
public void testCommaSeparatedString3() throws Exception {
PigServer pig = servers[0];
pc = pig.getPigContext();
boolean noConversionExpected = true;
checkLoadPath("hdfs:"+ WORKING_DIR + "/test,hdfs:" + WORKING_DIR + "/test2,hdfs:" + WORKING_DIR + "/test3",
"hdfs:" + WORKING_DIR + "/test,hdfs:" + WORKING_DIR + "/test2,hdfs:" + WORKING_DIR + "/test3", noConversionExpected );
// check if a location 'hdfs:<abs path>,hdfs:<abs path>' can actually be
// read using PigStorage
String[] inputFileNames = new String[] {
WORKING_DIR + "/TestLoad-testCommaSeparatedString3-input1.txt",
WORKING_DIR + "/TestLoad-testCommaSeparatedString3-input2.txt"};
String inputString = "hdfs:" + inputFileNames[0] + ",hdfs:" +
inputFileNames[1];
testLoadingMultipleFiles(inputFileNames, inputString);
}
@Test
public void testCommaSeparatedString4() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
checkLoadPath("usr/pig/{a,c},usr/pig/b", WORKING_DIR + "/usr/pig/{a,c}," + WORKING_DIR + "/usr/pig/b");
}
}
@Test
public void testCommaSeparatedString5() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
checkLoadPath("/usr/pig/{a,c},b", "/usr/pig/{a,c}," + WORKING_DIR + "/b");
}
// check if a location '<abs path>,<relative path>' can actually be
// read using PigStorage
String loadLocationString = WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input{1,2}.txt," +
"TestLoad-testCommaSeparatedStringMixed-input3.txt"; // current working dir is set to WORKING_DIR in checkLoadPath()
String[] inputFileNames = new String[] {
WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input1.txt",
WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input2.txt",
WORKING_DIR + "/TestLoad-testCommaSeparatedStringMixed-input3.txt",};
pc = servers[0].getPigContext(); // test in map reduce mode
testLoadingMultipleFiles(inputFileNames, loadLocationString);
}
@Test
public void testCommaSeparatedString6() throws Exception {
for (PigServer pig : servers) {
pc = pig.getPigContext();
checkLoadPath("usr/pig/{a,c},/usr/pig/b", WORKING_DIR + "/usr/pig/{a,c},/usr/pig/b");
}
}
@Test
public void testNonDfsLocation() throws Exception {
String nonDfsUrl = "har:///user/foo/f.har";
String query = "a = load '" + nonDfsUrl + "' using PigStorage('\t','-noschema');" +
"store a into 'pigoutput';";
LogicalPlan lp = Util.buildLp(servers[1], query);
LOLoad load = (LOLoad) lp.getSources().get(0);
nonDfsUrl = nonDfsUrl.replaceFirst("/$", "");
assertEquals(nonDfsUrl, load.getFileSpec().getFileName());
}
@SuppressWarnings("unchecked")
private void testLoadingMultipleFiles(String[] inputFileNames,
String loadLocationString) throws IOException, ParserException {
String[][] inputStrings = new String[][] {
new String[] { "hello\tworld"},
new String[] { "bye\tnow"},
new String[] { "all\tgood"}
};
List<Tuple> expected = Arrays.asList(new Tuple[] {
(Tuple) Util.getPigConstant("('hello', 'world')"),
(Tuple) Util.getPigConstant("('bye', 'now')"),
(Tuple) Util.getPigConstant("('all', 'good')")});
List<Tuple> expectedBasedOnNumberOfInputs = new ArrayList<Tuple>();
for(int i = 0; i < inputFileNames.length; i++) {
Util.createInputFile(pc, inputFileNames[i], inputStrings[i]);
expectedBasedOnNumberOfInputs.add(expected.get(i));
}
try {
servers[0].registerQuery(" a = load '" + loadLocationString + "' as " +
"(s1:chararray, s2:chararray);");
Iterator<Tuple> it = servers[0].openIterator("a");
List<Tuple> actual = new ArrayList<Tuple>();
while(it.hasNext()) {
actual.add(it.next());
}
Collections.sort(expectedBasedOnNumberOfInputs);
Collections.sort(actual);
assertEquals(expectedBasedOnNumberOfInputs, actual);
} finally {
for(int i = 0; i < inputFileNames.length; i++) {
Util.deleteFile(pc, inputFileNames[i]);
}
}
}
private void checkLoadPath(String orig, String expected) throws Exception {
checkLoadPath(orig, expected, false);
}
private void checkLoadPath(String orig, String expected,
boolean noConversionExpected) throws Exception {
boolean[] multiquery = {true, false};
for (boolean b : multiquery) {
pc.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "" + b);
DataStorage dfs = pc.getDfs();
dfs.setActiveContainer(dfs.asContainer(WORKING_DIR));
Map<String, String> fileNameMap = new HashMap<String, String>();
QueryParserDriver builder = new QueryParserDriver(pc, "Test-Load", fileNameMap);
String query = "a = load '"+orig+"';";
LogicalPlan lp = builder.parse(query);
assertTrue(lp.size()>0);
Operator op = lp.getSources().get(0);
assertTrue(op instanceof LOLoad);
LOLoad load = (LOLoad)op;
String p = load.getFileSpec().getFileName();
System.err.println("DEBUG: p:" + p + " expected:" + expected +", exectype:" + pc.getExecType());
if(noConversionExpected) {
assertEquals(expected, p);
} else {
String protocol = pc.getExecType() == cluster.getExecType() ? "hdfs" : "file";
// regex : A word character, i.e. [a-zA-Z_0-9] or '-' followed by ':' then any characters
String regex = "[\\-\\w:\\.]";
assertTrue(p.matches(".*" + protocol + "://" + regex + "*.*"));
assertEquals(expected, p.replaceAll(protocol + "://" + regex + "*/", "/"));
}
}
}
}