/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.pig.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.util.JavaCompilerHelper;
import org.apache.pig.test.junit.OrderedJUnit4Runner;
import org.apache.pig.test.junit.OrderedJUnit4Runner.TestOrder;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

| //Need to run testImportList first due to TEZ-1802 |
| @RunWith(OrderedJUnit4Runner.class) |
| @TestOrder({ |
| "testImportList", |
| "testScriptFiles", |
| "testSetProperties_way_num01", |
| "testSetProperties_way_num02", |
| "testSetProperties_way_num03", |
| "testHadoopExceptionCreation" |
| }) |
| public class TestPigContext { |
| private static final String TMP_DIR_PROP = "/tmp/hadoop-hadoop"; |
| private static final String FS_NAME = "file:///"; |
| private static final String JOB_TRACKER = "local"; |
| |
| private static PigContext pigContext; |
| private static Properties properties; |
| private static MiniGenericCluster cluster; |
| |
| private File input; |
| |
| @BeforeClass |
| public static void oneTimeSetup() { |
| cluster = MiniGenericCluster.buildCluster(); |
| properties = cluster.getProperties(); |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| Util.resetStateForExecModeSwitch(); |
| pigContext = new PigContext(Util.getLocalTestMode(), getProperties()); |
| input = File.createTempFile("PigContextTest-", ".txt"); |
| } |
| |
| @AfterClass |
| public static void oneTimeTearDown() throws Exception { |
| cluster.shutDown(); |
| } |
| |
| /** |
| * Passing an already configured pigContext in PigServer constructor. |
| */ |
| @Test |
| public void testSetProperties_way_num01() throws Exception { |
| PigServer pigServer = new PigServer(pigContext); |
| registerAndStore(pigServer); |
| |
| check_asserts(pigServer); |
| } |
| |
| /** |
| * Setting properties through PigServer constructor directly. |
| */ |
| @Test |
| public void testSetProperties_way_num02() throws Exception { |
| PigServer pigServer = new PigServer(Util.getLocalTestMode(), getProperties()); |
| registerAndStore(pigServer); |
| |
| check_asserts(pigServer); |
| } |
| |
| /** |
| * using connect() method. |
| */ |
| @Test |
| public void testSetProperties_way_num03() throws Exception { |
| pigContext.connect(); |
| PigServer pigServer = new PigServer(pigContext); |
| registerAndStore(pigServer); |
| |
| check_asserts(pigServer); |
| } |
| |
| @Test |
| public void testHadoopExceptionCreation() throws Exception { |
| Object object = PigContext |
| .instantiateFuncFromSpec("org.apache.hadoop.mapred.FileAlreadyExistsException"); |
| assertTrue(object instanceof FileAlreadyExistsException); |
| } |
| |
| @Test |
| // See PIG-832 |
| public void testImportList() throws Exception { |
| |
| String FILE_SEPARATOR = System.getProperty("file.separator"); |
| File tmpDir = File.createTempFile("test", ""); |
| tmpDir.delete(); |
| tmpDir.mkdir(); |
| |
| File udf1Dir = new File(tmpDir.getAbsolutePath() + FILE_SEPARATOR + "com" + FILE_SEPARATOR |
| + "xxx" + FILE_SEPARATOR + "udf1"); |
| udf1Dir.mkdirs(); |
| File udf2Dir = new File(tmpDir.getAbsolutePath() + FILE_SEPARATOR + "com" + FILE_SEPARATOR |
| + "xxx" + FILE_SEPARATOR + "udf2"); |
| udf2Dir.mkdirs(); |
| |
| String udf1Src = new String("package com.xxx.udf1;\n" + |
| "import java.io.IOException;\n" + |
| "import org.apache.pig.EvalFunc;\n" + |
| "import org.apache.pig.data.Tuple;\n" + |
| "public class TestUDF1 extends EvalFunc<Integer>{\n" + |
| "public Integer exec(Tuple input) throws IOException {\n" + |
| "return 1;}\n" + |
| "}"); |
| |
| String udf2Src = new String("package com.xxx.udf2;\n" + |
| "import org.apache.pig.builtin.PigStorage;\n" + |
| "public class TestUDF2 extends PigStorage { }\n"); |
| |
| // compile |
| JavaCompilerHelper javaCompilerHelper = new JavaCompilerHelper(); |
| javaCompilerHelper.compile(tmpDir.getAbsolutePath(), |
| new JavaCompilerHelper.JavaSourceFromString("com.xxx.udf1.TestUDF1", udf1Src), |
| new JavaCompilerHelper.JavaSourceFromString("com.xxx.udf2.TestUDF2", udf2Src)); |
| |
| // generate jar file |
| String jarName = "TestUDFJar.jar"; |
| String jarFile = tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName; |
| int status = Util.executeJavaCommand("jar -cf " + jarFile + |
| " -C " + tmpDir.getAbsolutePath() + " " + "com"); |
| assertEquals(0, status); |
| Util.resetStateForExecModeSwitch(); |
| PigContext localPigContext = new PigContext(cluster.getExecType(), properties); |
| |
| // register jar using properties |
| localPigContext.getProperties().setProperty("pig.additional.jars", jarFile); |
| PigServer pigServer = new PigServer(localPigContext); |
| |
| PigContext.initializeImportList("com.xxx.udf1:com.xxx.udf2."); |
| ArrayList<String> importList = PigContext.getPackageImportList(); |
| assertEquals(6, importList.size()); |
| assertEquals("", importList.get(0)); |
| assertEquals("com.xxx.udf1.", importList.get(1)); |
| assertEquals("com.xxx.udf2.", importList.get(2)); |
| assertEquals("java.lang.", importList.get(3)); |
| assertEquals("org.apache.pig.builtin.", importList.get(4)); |
| assertEquals("org.apache.pig.impl.builtin.", importList.get(5)); |
| |
| Object udf = PigContext.instantiateFuncFromSpec("TestUDF1"); |
| assertTrue(udf.getClass().toString().endsWith("com.xxx.udf1.TestUDF1")); |
| |
| int LOOP_COUNT = 40; |
| File tmpFile = File.createTempFile("test", "txt"); |
| tmpFile.delete(); // don't actually want the file, just the filename |
| String clusterTmpPath = Util.removeColon(tmpFile.getCanonicalPath()); |
| |
| String localInput[] = new String[LOOP_COUNT]; |
| Random r = new Random(1); |
| int rand; |
| for(int i = 0; i < LOOP_COUNT; i++) { |
| rand = r.nextInt(100); |
| localInput[i] = Integer.toString(rand); |
| } |
| Util.createInputFile(cluster, clusterTmpPath, localInput); |
| |
| FileLocalizer.deleteTempFiles(); |
| pigServer.registerQuery("A = LOAD '" + Util.encodeEscape(clusterTmpPath) |
| + "' using TestUDF2() AS (num:chararray);"); |
| pigServer.registerQuery("B = foreach A generate TestUDF1(num);"); |
| Iterator<Tuple> iter = pigServer.openIterator("B"); |
| assertTrue("No Output received", iter.hasNext()); |
| while (iter.hasNext()) { |
| Tuple t = iter.next(); |
| assertTrue(t.get(0) instanceof Integer); |
| assertEquals(Integer.valueOf(1), (Integer)t.get(0)); |
| } |
| Util.deleteFile(cluster, clusterTmpPath); |
| Util.deleteDirectory(tmpDir); |
| } |
| |
| // See PIG-1824 |
| @SuppressWarnings("deprecation") |
| @Test |
| public void testScriptFiles() throws Exception { |
| PigContext pc = new PigContext(Util.getLocalTestMode(), getProperties()); |
| final int n = pc.scriptFiles.size(); |
| pc.addScriptFile("test/path-1824"); |
| assertEquals("test" + File.separator + "path-1824", pc.getScriptFiles().get("test/path-1824").toString()); |
| assertEquals("script files should not be populated", n, pc.scriptFiles.size()); |
| |
| pc.addScriptFile("path-1824", "test/path-1824"); |
| assertEquals("test" + File.separator + "path-1824", pc.getScriptFiles().get("path-1824").toString()); |
| assertEquals("script files should not be populated", n, pc.scriptFiles.size()); |
| |
| // last add wins when using an alias |
| pc.addScriptFile("path-1824", "test/some/other/path-1824"); |
| assertEquals("test" + File.separator + "some" + File.separator + "other" |
| + File.separator + "path-1824", pc.getScriptFiles().get("path-1824").toString()); |
| assertEquals("script files should not be populated", n, pc.scriptFiles.size()); |
| |
| // clean up |
| pc.getScriptFiles().remove("path-1824"); |
| pc.getScriptFiles().remove("test/path-1824"); |
| } |
| |
| private static Properties getProperties() { |
| Properties props = new Properties(); |
| props.put(MRConfiguration.JOB_TRACKER, JOB_TRACKER); |
| props.put("fs.default.name", FS_NAME); |
| props.put("hadoop.tmp.dir", TMP_DIR_PROP); |
| return props; |
| } |
| |
| private List<String> getCommands() { |
| List<String> commands = new ArrayList<String>(); |
| commands.add("my_input = LOAD '" + Util.encodeEscape(input.getAbsolutePath()) |
| + "' USING PigStorage();"); |
| commands.add("words = FOREACH my_input GENERATE FLATTEN(TOKENIZE($0));"); |
| commands.add("grouped = GROUP words BY $0;"); |
| commands.add("counts = FOREACH grouped GENERATE group, COUNT(words);"); |
| return commands; |
| } |
| |
| private void registerAndStore(PigServer pigServer) throws IOException { |
| // pigServer.debugOn(); |
| List<String> commands = getCommands(); |
| for (final String command : commands) { |
| pigServer.registerQuery(command); |
| } |
| String outFileName = Util.removeColon(input.getAbsolutePath() + ".out"); |
| pigServer.store("counts", outFileName); |
| Util.deleteFile(cluster, outFileName); |
| } |
| |
| private void check_asserts(PigServer pigServer) { |
| assertEquals(JOB_TRACKER, |
| pigServer.getPigContext().getProperties().getProperty(MRConfiguration.JOB_TRACKER)); |
| assertEquals(FS_NAME, |
| pigServer.getPigContext().getProperties().getProperty(FileSystem.FS_DEFAULT_NAME_KEY)); |
| assertEquals(TMP_DIR_PROP, |
| pigServer.getPigContext().getProperties().getProperty("hadoop.tmp.dir")); |
| } |
| } |