| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileReader; |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Properties; |
| import java.util.Random; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.JobStatus.State; |
| import org.apache.hadoop.mapreduce.OutputCommitter; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; |
| import org.apache.pig.EvalFunc; |
| import org.apache.pig.ExecType; |
| import org.apache.pig.PigException; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.ResourceSchema; |
| import org.apache.pig.ResourceSchema.ResourceFieldSchema; |
| import org.apache.pig.ResourceStatistics; |
| import org.apache.pig.StoreMetadata; |
| import org.apache.pig.backend.datastorage.DataStorage; |
| import org.apache.pig.backend.executionengine.ExecException; |
| import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; |
| import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; |
| import org.apache.pig.builtin.BinStorage; |
| import org.apache.pig.builtin.PigStorage; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.DataByteArray; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.data.DefaultBagFactory; |
| import org.apache.pig.data.DefaultTuple; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.io.FileLocalizer; |
| import org.apache.pig.impl.logicalLayer.FrontendException; |
| import org.apache.pig.impl.logicalLayer.schema.Schema; |
| import org.apache.pig.impl.plan.PlanValidationException; |
| import org.apache.pig.impl.util.Utils; |
| import org.apache.pig.newplan.Operator; |
| import org.apache.pig.newplan.logical.relational.LOStore; |
| import org.apache.pig.newplan.logical.relational.LogicalPlan; |
| import org.apache.pig.newplan.logical.rules.InputOutputFileValidator; |
| import org.apache.pig.parser.ParserException; |
| import org.apache.pig.parser.QueryParserDriver; |
| import org.apache.pig.test.utils.GenRandomData; |
| import org.apache.pig.test.utils.TestHelper; |
| import org.joda.time.DateTimeZone; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
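| /** |
| * Tests for the store side of Pig: STORE statement validation, output |
| * verification for PigStorage and BinStorage, OutputCommitter lifecycle |
| * behaviour, cleanup on failure, and creation of the _SUCCESS marker file. |
| */ |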
| @RunWith(JUnit4.class) |
| public class TestStore { |
| POStore st; |
| DataBag inpDB; |
| static MiniCluster cluster = MiniCluster.buildCluster(); |
| PigContext pc; |
| POProject proj; |
| PigServer pig; |
| |
| String inputFileName; |
| String outputFileName; |
| |
| private static final String DUMMY_STORE_CLASS_NAME |
| = "org.apache.pig.test.TestStore\\$DummyStore"; |
| |
| private static final String FAIL_UDF_NAME |
| = "org.apache.pig.test.TestStore\\$FailUDF"; |
| private static final String MAP_MAX_ATTEMPTS = "mapred.map.max.attempts"; |
| private static final String TESTDIR = "/tmp/" + TestStore.class.getSimpleName(); |
| |
| @Before |
| public void setUp() throws Exception { |
| pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); |
| pc = pig.getPigContext(); |
| inputFileName = TESTDIR + "/TestStore-" + new Random().nextLong() + ".txt"; |
| outputFileName = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; |
| |
| DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.UTC.getOffset(null))); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| Util.deleteDirectory(new File(TESTDIR)); |
| Util.deleteFile(cluster, TESTDIR); |
| } |
| |
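| /** |
| * Stores the given bag on the cluster via a load/store script using |
| * PigStorage, then copies the resulting part-m-00000 file back to the |
| * local outputFileName for verification. |
| */ |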
| private void storeAndCopyLocally(DataBag inpDB) throws Exception { |
| setUpInputFileOnCluster(inpDB); |
| String script = "a = load '" + inputFileName + "'; " + |
| "store a into '" + outputFileName + "' using PigStorage('\t');" + |
| "fs -ls " + TESTDIR; |
| pig.setBatchOn(); |
| Util.registerMultiLineQuery(pig, script); |
| pig.executeBatch(); |
| Util.copyFromClusterToLocal(cluster, outputFileName + "/part-m-00000", outputFileName); |
| } |
| |
| @AfterClass |
| public static void oneTimeTearDown() throws Exception { |
| cluster.shutDown(); |
| } |
| |
| @Test |
| public void testValidation() throws Exception { |
| String outputFileName = "test-output.txt"; |
| try { |
| String query = "a = load '" + inputFileName + "' as (c:chararray, " + |
| "i:int,d:double);" + |
| "store a into '" + outputFileName + "' using " + "PigStorage();"; |
| LogicalPlan lp = Util.buildLp(pig, query); |
| new InputOutputFileValidator(lp, pig.getPigContext()).validate(); |
| } catch (PlanValidationException e){ |
| // Since output file is not present, validation should pass |
| // and not throw this exception. |
| fail("Store validation test failed."); |
| } finally { |
| Util.deleteFile(pig.getPigContext(), outputFileName); |
| } |
| } |
| |
| @Test |
| public void testValidationFailure() throws Exception { |
| String[] input = new String[] { "some data" }; |
| String outputFileName = "test-output.txt"; |
| boolean sawException = false; |
| try { |
| Util.createInputFile(pig.getPigContext(),outputFileName, input); |
| String query = "a = load '" + inputFileName + "' as (c:chararray, " + |
| "i:int,d:double);" + |
| "store a into '" + outputFileName + "' using PigStorage();"; |
| LogicalPlan lp = Util.buildLp(pig, query); |
| new InputOutputFileValidator(lp, pig.getPigContext()).validate(); |
| } catch (FrontendException pve){ |
| // Since output file is present, validation should fail |
| // and throw this exception |
| assertEquals(6000, pve.getErrorCode()); |
| assertEquals(PigException.REMOTE_ENVIRONMENT, pve.getErrorSource()); |
| assertTrue(pve.getCause() instanceof IOException); |
| sawException = true; |
| } finally { |
| assertTrue(sawException); |
| Util.deleteFile(pig.getPigContext(), outputFileName); |
| } |
| } |
| |
| @Test |
| public void testStore() throws Exception { |
| inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100); |
| storeAndCopyLocally(inpDB); |
| |
| int size = 0; |
| BufferedReader br = new BufferedReader(new FileReader(outputFileName)); |
| for(String line=br.readLine();line!=null;line=br.readLine()){ |
| String[] flds = line.split("\t",-1); |
| Tuple t = new DefaultTuple(); |
| t.append(flds[0].compareTo("")!=0 ? flds[0] : null); |
| t.append(flds[1].compareTo("")!=0 ? Integer.parseInt(flds[1]) : null); |
| |
| System.err.println("Simple data: "); |
| System.err.println(line); |
| System.err.println("t: "); |
| System.err.println(t); |
| assertTrue(TestHelper.bagContains(inpDB, t)); |
| ++size; |
| } |
| br.close(); |
| assertEquals(inpDB.size(), size); |
| } |
| |
| /** |
| * Writes the given bag to the cluster as a tab-delimited text file at |
| * inputFileName, one tuple per line. |
| * @param inpD bag of tuples to write |
| * @throws IOException |
| */ |
| private void setUpInputFileOnCluster(DataBag inpD) throws IOException { |
| String[] data = new String[(int) inpD.size()]; |
| int i = 0; |
| for (Tuple tuple : inpD) { |
| data[i] = toDelimitedString(tuple, "\t"); |
| i++; |
| } |
| Util.createInputFile(cluster, inputFileName, data); |
| } |
| |
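| /** |
| * Serializes a tuple into one delimited line the way PigStorage would: |
| * null fields become empty strings, maps are rendered via |
| * DataType.mapToString(), and all other fields via toString(). |
| */ |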
| @SuppressWarnings("unchecked") |
| private String toDelimitedString(Tuple t, String delim) throws ExecException { |
| StringBuilder buf = new StringBuilder(); |
| for (int i = 0; i < t.size(); i++) { |
| Object field = t.get(i); |
| if(field == null) { |
| buf.append(""); |
| } else { |
| if(field instanceof Map) { |
| Map<String, Object> m = (Map<String, Object>)field; |
| buf.append(DataType.mapToString(m)); |
| } else { |
| buf.append(field.toString()); |
| } |
| } |
| |
| if (i != t.size() - 1) |
| buf.append(delim); |
| } |
| return buf.toString(); |
| } |
| |
| @Test |
| public void testStoreComplexData() throws Exception { |
| inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100); |
| storeAndCopyLocally(inpDB); |
| PigStorage ps = new PigStorage("\t"); |
| int size = 0; |
| BufferedReader br = new BufferedReader(new FileReader(outputFileName)); |
| for(String line=br.readLine();line!=null;line=br.readLine()){ |
| String[] flds = line.split("\t",-1); |
| Tuple t = new DefaultTuple(); |
| |
| ResourceFieldSchema bagfs = GenRandomData.getSmallTupDataBagFieldSchema(); |
| ResourceFieldSchema tuplefs = GenRandomData.getSmallTupleFieldSchema(); |
| |
| t.append(flds[0].compareTo("")!=0 ? ps.getLoadCaster().bytesToBag(flds[0].getBytes(), bagfs) : null); |
| t.append(flds[1].compareTo("")!=0 ? new DataByteArray(flds[1].getBytes()) : null); |
| t.append(flds[2].compareTo("")!=0 ? ps.getLoadCaster().bytesToCharArray(flds[2].getBytes()) : null); |
| t.append(flds[3].compareTo("")!=0 ? ps.getLoadCaster().bytesToDouble(flds[3].getBytes()) : null); |
| t.append(flds[4].compareTo("")!=0 ? ps.getLoadCaster().bytesToFloat(flds[4].getBytes()) : null); |
| t.append(flds[5].compareTo("")!=0 ? ps.getLoadCaster().bytesToInteger(flds[5].getBytes()) : null); |
| t.append(flds[6].compareTo("")!=0 ? ps.getLoadCaster().bytesToLong(flds[6].getBytes()) : null); |
| t.append(flds[7].compareTo("")!=0 ? ps.getLoadCaster().bytesToMap(flds[7].getBytes()) : null); |
| t.append(flds[8].compareTo("")!=0 ? ps.getLoadCaster().bytesToTuple(flds[8].getBytes(), tuplefs) : null); |
| t.append(flds[9].compareTo("")!=0 ? ps.getLoadCaster().bytesToBoolean(flds[9].getBytes()) : null); |
| t.append(flds[10].compareTo("")!=0 ? ps.getLoadCaster().bytesToDateTime(flds[10].getBytes()) : null); |
| assertTrue(TestHelper.bagContains(inpDB, t)); |
| ++size; |
| } |
| br.close(); |
| assertEquals(inpDB.size(), size); |
| } |
| |
| @Test |
| public void testStoreComplexDataWithNull() throws Exception { |
| Tuple inputTuple = GenRandomData.genRandSmallBagTextTupleWithNulls(new Random(), 10, 100); |
| inpDB = DefaultBagFactory.getInstance().newDefaultBag(); |
| inpDB.add(inputTuple); |
| storeAndCopyLocally(inpDB); |
| PigStorage ps = new PigStorage("\t"); |
| int size = 0; |
| BufferedReader br = new BufferedReader(new FileReader(outputFileName)); |
| for(String line=br.readLine();line!=null;line=br.readLine()){ |
| System.err.println("Complex data: "); |
| System.err.println(line); |
| String[] flds = line.split("\t",-1); |
| Tuple t = new DefaultTuple(); |
| |
| ResourceFieldSchema stringfs = new ResourceFieldSchema(); |
| stringfs.setType(DataType.CHARARRAY); |
| ResourceFieldSchema intfs = new ResourceFieldSchema(); |
| intfs.setType(DataType.INTEGER); |
| |
| ResourceSchema tupleSchema = new ResourceSchema(); |
| tupleSchema.setFields(new ResourceFieldSchema[]{stringfs, intfs}); |
| ResourceFieldSchema tuplefs = new ResourceFieldSchema(); |
| tuplefs.setSchema(tupleSchema); |
| tuplefs.setType(DataType.TUPLE); |
| |
| ResourceSchema bagSchema = new ResourceSchema(); |
| bagSchema.setFields(new ResourceFieldSchema[]{tuplefs}); |
| ResourceFieldSchema bagfs = new ResourceFieldSchema(); |
| bagfs.setSchema(bagSchema); |
| bagfs.setType(DataType.BAG); |
| |
| t.append(flds[0].compareTo("")!=0 ? ps.getLoadCaster().bytesToBag(flds[0].getBytes(), bagfs) : null); |
| t.append(flds[1].compareTo("")!=0 ? new DataByteArray(flds[1].getBytes()) : null); |
| t.append(flds[2].compareTo("")!=0 ? ps.getLoadCaster().bytesToCharArray(flds[2].getBytes()) : null); |
| t.append(flds[3].compareTo("")!=0 ? ps.getLoadCaster().bytesToDouble(flds[3].getBytes()) : null); |
| t.append(flds[4].compareTo("")!=0 ? ps.getLoadCaster().bytesToFloat(flds[4].getBytes()) : null); |
| t.append(flds[5].compareTo("")!=0 ? ps.getLoadCaster().bytesToInteger(flds[5].getBytes()) : null); |
| t.append(flds[6].compareTo("")!=0 ? ps.getLoadCaster().bytesToLong(flds[6].getBytes()) : null); |
| t.append(flds[7].compareTo("")!=0 ? ps.getLoadCaster().bytesToMap(flds[7].getBytes()) : null); |
| t.append(flds[8].compareTo("")!=0 ? ps.getLoadCaster().bytesToTuple(flds[8].getBytes(), tuplefs) : null); |
| t.append(flds[9].compareTo("")!=0 ? ps.getLoadCaster().bytesToBoolean(flds[9].getBytes()) : null); |
| t.append(flds[10].compareTo("")!=0 ? ps.getLoadCaster().bytesToDateTime(flds[10].getBytes()) : null); |
| t.append(flds[11].compareTo("")!=0 ? ps.getLoadCaster().bytesToCharArray(flds[11].getBytes()) : null); |
| assertTrue(TestHelper.tupleEquals(inputTuple, t)); |
| ++size; |
| } |
| br.close(); |
| assertEquals(inpDB.size(), size); |
| } |
| |
| @Test |
| public void testBinStorageGetSchema() throws IOException, ParserException { |
| String[] input = new String[] { "hello\t1\t10.1", "bye\t2\t20.2" }; |
| String inputFileName = "testGetSchema-input.txt"; |
| String outputFileName = "testGetSchema-output.txt"; |
| try { |
| Util.createInputFile(pig.getPigContext(), |
| inputFileName, input); |
| String query = "a = load '" + inputFileName + "' as (c:chararray, " + |
| "i:int,d:double);store a into '" + outputFileName + "' using " + |
| "BinStorage();"; |
| pig.setBatchOn(); |
| Util.registerMultiLineQuery(pig, query); |
| pig.executeBatch(); |
| ResourceSchema rs = new BinStorage().getSchema(outputFileName, |
| new Job(ConfigurationUtil.toConfiguration(pig.getPigContext(). |
| getProperties()))); |
| Schema expectedSchema = Utils.getSchemaFromString( |
| "c:chararray,i:int,d:double"); |
| Assert.assertTrue("Checking binstorage getSchema output", Schema.equals( |
| expectedSchema, Schema.getPigSchema(rs), true, true)); |
| } finally { |
| Util.deleteFile(pig.getPigContext(), inputFileName); |
| Util.deleteFile(pig.getPigContext(), outputFileName); |
| } |
| } |
| |
| @Test |
| public void testStoreRemoteRel() throws Exception { |
| checkStorePath("test","/tmp/test"); |
| } |
| |
| @Test |
| public void testStoreRemoteAbs() throws Exception { |
| checkStorePath("/tmp/test","/tmp/test"); |
| } |
| |
| @Test |
| public void testStoreRemoteRelScheme() throws Exception { |
| checkStorePath("test","/tmp/test"); |
| } |
| |
| @Test |
| public void testStoreRemoteAbsScheme() throws Exception { |
| checkStorePath("hdfs:/tmp/test","hdfs:/tmp/test"); |
| } |
| |
| @Test |
| public void testStoreRemoteAbsAuth() throws Exception { |
| checkStorePath("hdfs://localhost:9000/test","/test"); |
| } |
| |
| @Test |
| public void testStoreRemoteNormalize() throws Exception { |
| checkStorePath("/tmp/foo/../././","/tmp/foo/.././."); |
| } |
| |
| @Test |
| public void testSetStoreSchema() throws Exception { |
| PigServer ps = null; |
| Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); |
| filesToVerify.put(outputFileName + "_storeSchema_test", Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED, Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED, Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED, Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED, Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED, Boolean.FALSE); |
| try { |
| ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL}; |
| String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; |
| |
| String script = "a = load '"+ inputFileName + "' as (a0:chararray, a1:chararray);" + |
| "store a into '" + outputFileName + "' using " + |
| DUMMY_STORE_CLASS_NAME + "();"; |
| |
| for (ExecType execType : modes) { |
| FileLocalizer.setInitialized(false); |
| if(execType == ExecType.MAPREDUCE) { |
| ps = new PigServer(ExecType.MAPREDUCE, |
| cluster.getProperties()); |
| } else { |
| Properties props = new Properties(); |
| props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); |
| ps = new PigServer(ExecType.LOCAL, props); |
| if (Util.isHadoop1_x()) { |
| // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce |
| // OutputCommitter) is fixed only in 0.23.1 |
| filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED, Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED, Boolean.FALSE); |
| } |
| } |
| ps.setBatchOn(); |
| Util.deleteFile(ps.getPigContext(), TESTDIR); |
| Util.createInputFile(ps.getPigContext(), |
| inputFileName, inputData); |
| Util.registerMultiLineQuery(ps, script); |
| ps.executeBatch(); |
| for (Entry<String, Boolean> entry : filesToVerify.entrySet()) { |
| String condition = entry.getValue() ? "" : "not "; |
| assertEquals("Checking if file " + entry.getKey() + |
| " does " + condition + "exist in " + execType + |
| " mode", (boolean) entry.getValue(), |
| Util.exists(ps.getPigContext(), entry.getKey())); |
| } |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail("Exception encountered - hence failing: " + e); |
| } |
| } |
| |
| @Test |
| public void testCleanupOnFailure() throws Exception { |
| PigServer ps = null; |
| String cleanupSuccessFile = outputFileName + "_cleanupOnFailure_succeeded"; |
| String cleanupFailFile = outputFileName + "_cleanupOnFailure_failed"; |
| try { |
| ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE}; |
| String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; |
| |
| String script = "a = load '"+ inputFileName + "';" + |
| "store a into '" + outputFileName + "' using " + |
| DUMMY_STORE_CLASS_NAME + "('true');"; |
| |
| for (ExecType execType : modes) { |
| if(execType == ExecType.MAPREDUCE) { |
| ps = new PigServer(ExecType.MAPREDUCE, |
| cluster.getProperties()); |
| } else { |
| Properties props = new Properties(); |
| props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); |
| ps = new PigServer(ExecType.LOCAL, props); |
| } |
| Util.deleteFile(ps.getPigContext(), TESTDIR); |
| ps.setBatchOn(); |
| Util.createInputFile(ps.getPigContext(), |
| inputFileName, inputData); |
| Util.registerMultiLineQuery(ps, script); |
| ps.executeBatch(); |
| assertEquals( |
| "Checking that the file indicating that cleanupOnFailure failed " + |
| "does not exist in " + execType + " mode", false, |
| Util.exists(ps.getPigContext(), cleanupFailFile)); |
| assertEquals( |
| "Checking that the file indicating that cleanupOnFailure was " + |
| "successfully called exists in " + execType + " mode", true, |
| Util.exists(ps.getPigContext(), cleanupSuccessFile)); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail("Exception encountered - hence failing: " + e); |
| } |
| } |
| |
| |
| @Test |
| public void testCleanupOnFailureMultiStore() throws Exception { |
| PigServer ps = null; |
| String outputFileName1 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; |
| String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt"; |
| |
| Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>(); |
| filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE); |
| filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE); |
| filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE); |
| filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE); |
| filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE); |
| |
| try { |
| ExecType[] modes = new ExecType[] { ExecType.MAPREDUCE, ExecType.LOCAL}; |
| String[] inputData = new String[]{"hello\tworld", "bye\tworld"}; |
| |
| // the first store fails in putNext() while the second one would |
| // succeed on its own; the expected outcome is that both stores are |
| // treated as failed and cleaned up |
| String script = "a = load '"+ inputFileName + "';" + |
| "store a into '" + outputFileName1 + "' using " + |
| DUMMY_STORE_CLASS_NAME + "('true', '1');" + |
| "store a into '" + outputFileName2 + "' using " + |
| DUMMY_STORE_CLASS_NAME + "('false', '2');"; |
| |
| for (ExecType execType : modes) { |
| FileLocalizer.setInitialized(false); |
| if(execType == ExecType.MAPREDUCE) { |
| ps = new PigServer(ExecType.MAPREDUCE, |
| cluster.getProperties()); |
| } else { |
| Properties props = new Properties(); |
| props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); |
| ps = new PigServer(ExecType.LOCAL, props); |
| // LocalJobRunner does not call abortTask |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE); |
| if (Util.isHadoop1_x()) { |
| // MAPREDUCE-1447/3563 (LocalJobRunner does not call methods of mapreduce |
| // OutputCommitter) is fixed only in 0.23.1 |
| filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE); |
| filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE); |
| } |
| } |
| Util.deleteFile(ps.getPigContext(), TESTDIR); |
| ps.setBatchOn(); |
| Util.createInputFile(ps.getPigContext(), |
| inputFileName, inputData); |
| Util.registerMultiLineQuery(ps, script); |
| ps.executeBatch(); |
| for (Entry<String, Boolean> entry : filesToVerify.entrySet()) { |
| String condition = entry.getValue() ? "" : "not "; |
| assertEquals("Checking if file " + entry.getKey() + |
| " does " + condition + "exist in " + execType + |
| " mode", (boolean) entry.getValue(), |
| Util.exists(ps.getPigContext(), entry.getKey())); |
| } |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| Assert.fail("Exception encountered - hence failing: " + e); |
| } |
| } |
| |
| // Test that the "_SUCCESS" file is created when the |
| // "mapreduce.fileoutputcommitter.marksuccessfuljobs" property is set to true. |
| // The test covers the multi-store and single-store cases in local and |
| // mapreduce mode, and also checks that the "_SUCCESS" file is NOT created |
| // when the property is not set to true. |
| @Test |
| public void testSuccessFileCreation1() throws Exception { |
| PigServer ps = null; |
| |
| try { |
| ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE}; |
| String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; |
| |
| String multiStoreScript = "a = load '"+ inputFileName + "';" + |
| "b = filter a by $0 == 'hello';" + |
| "c = filter a by $0 == 'hi';" + |
| "d = filter a by $0 == 'bye';" + |
| "store b into '" + outputFileName + "_1';" + |
| "store c into '" + outputFileName + "_2';" + |
| "store d into '" + outputFileName + "_3';"; |
| |
| String singleStoreScript = "a = load '"+ inputFileName + "';" + |
| "store a into '" + outputFileName + "_1';" ; |
| |
| for (ExecType execType : modes) { |
| for(boolean isPropertySet: new boolean[] { true, false}) { |
| for(boolean isMultiStore: new boolean[] { true, false}) { |
| String script = (isMultiStore ? multiStoreScript : |
| singleStoreScript); |
| // since we will be switching between map red and local modes |
| // we will need to make sure filelocalizer is reset before each |
| // run. |
| FileLocalizer.setInitialized(false); |
| if(execType == ExecType.MAPREDUCE) { |
| ps = new PigServer(ExecType.MAPREDUCE, |
| cluster.getProperties()); |
| } else { |
| Properties props = new Properties(); |
| props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); |
| ps = new PigServer(ExecType.LOCAL, props); |
| } |
| ps.getPigContext().getProperties().setProperty( |
| MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, |
| Boolean.toString(isPropertySet)); |
| Util.deleteFile(ps.getPigContext(), TESTDIR); |
| ps.setBatchOn(); |
| Util.createInputFile(ps.getPigContext(), |
| inputFileName, inputData); |
| Util.registerMultiLineQuery(ps, script); |
| ps.executeBatch(); |
| for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { |
| String sucFile = outputFileName + "_" + i + "/" + |
| MapReduceLauncher.SUCCEEDED_FILE_NAME; |
| assertEquals("Checking if _SUCCESS file exists in " + |
| execType + " mode", isPropertySet, |
| Util.exists(ps.getPigContext(), sucFile)); |
| } |
| } |
| } |
| } |
| } finally { |
| if (ps != null) { |
| Util.deleteFile(ps.getPigContext(), TESTDIR); |
| } |
| } |
| } |
| |
| // Test that the "_SUCCESS" file is NOT created when the job fails, even |
| // when "mapreduce.fileoutputcommitter.marksuccessfuljobs" is set to true. |
| // The test covers the multi-store and single-store cases in local and |
| // mapreduce mode, with the property both set and unset. |
| @Test |
| public void testSuccessFileCreation2() throws Exception { |
| PigServer ps = null; |
| try { |
| ExecType[] modes = new ExecType[] { ExecType.LOCAL, ExecType.MAPREDUCE}; |
| String[] inputData = new String[]{"hello\tworld", "hi\tworld", "bye\tworld"}; |
| String multiStoreScript = "a = load '"+ inputFileName + "';" + |
| "b = filter a by $0 == 'hello';" + |
| "b = foreach b generate " + FAIL_UDF_NAME + "($0);" + |
| "c = filter a by $0 == 'hi';" + |
| "d = filter a by $0 == 'bye';" + |
| "store b into '" + outputFileName + "_1';" + |
| "store c into '" + outputFileName + "_2';" + |
| "store d into '" + outputFileName + "_3';"; |
| |
| String singleStoreScript = "a = load '"+ inputFileName + "';" + |
| "b = foreach a generate " + FAIL_UDF_NAME + "($0);" + |
| "store b into '" + outputFileName + "_1';" ; |
| |
| for (ExecType execType : modes) { |
| for(boolean isPropertySet: new boolean[] { true, false}) { |
| for(boolean isMultiStore: new boolean[] { true, false}) { |
| String script = (isMultiStore ? multiStoreScript : |
| singleStoreScript); |
| // since we will be switching between map red and local modes |
| // we will need to make sure filelocalizer is reset before each |
| // run. |
| FileLocalizer.setInitialized(false); |
| if(execType == ExecType.MAPREDUCE) { |
| // since the job is guaranteed to fail, let's set |
| // number of retries to 1. |
| Properties props = cluster.getProperties(); |
| props.setProperty(MAP_MAX_ATTEMPTS, "1"); |
| ps = new PigServer(ExecType.MAPREDUCE, props); |
| } else { |
| Properties props = new Properties(); |
| props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); |
| // since the job is guaranteed to fail, let's set |
| // number of retries to 1. |
| props.setProperty(MAP_MAX_ATTEMPTS, "1"); |
| ps = new PigServer(ExecType.LOCAL, props); |
| } |
| ps.getPigContext().getProperties().setProperty( |
| MapReduceLauncher.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, |
| Boolean.toString(isPropertySet)); |
| Util.deleteFile(ps.getPigContext(), TESTDIR); |
| ps.setBatchOn(); |
| Util.createInputFile(ps.getPigContext(), |
| inputFileName, inputData); |
| Util.registerMultiLineQuery(ps, script); |
| try { |
| ps.executeBatch(); |
| } catch(IOException ioe) { |
| if(!ioe.getMessage().equals("FailUDFException")) { |
| // an unexpected exception |
| throw ioe; |
| } |
| } |
| for(int i = 1; i <= (isMultiStore ? 3 : 1); i++) { |
| String sucFile = outputFileName + "_" + i + "/" + |
| MapReduceLauncher.SUCCEEDED_FILE_NAME; |
| assertEquals("Checking if _SUCCESS file exists in " + |
| execType + " mode", false, |
| Util.exists(ps.getPigContext(), sucFile)); |
| } |
| } |
| } |
| } |
| } finally { |
| if (ps != null) { |
| Util.deleteFile(ps.getPigContext(), TESTDIR); |
| } |
| } |
| } |
| |
| // A UDF which always throws an Exception so that the job can fail |
| public static class FailUDF extends EvalFunc<String> { |
| |
| @Override |
| public String exec(Tuple input) throws IOException { |
| throw new IOException("FailUDFException"); |
| } |
| |
| } |
| |
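| /** |
| * A PigStorage subclass used to observe the storer lifecycle: it can be |
| * configured to fail in putNext(), plants marker files from storeSchema() |
| * and cleanupOnFailure(), and returns a DummyOutputFormat so that every |
| * OutputCommitter callback is recorded. |
| */ |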
| public static class DummyStore extends PigStorage implements StoreMetadata { |
| |
| private boolean failInPutNext = false; |
| |
| private String outputFileSuffix = ""; |
| |
| public DummyStore(String failInPutNextStr) { |
| failInPutNext = Boolean.parseBoolean(failInPutNextStr); |
| } |
| |
| public DummyStore(String failInPutNextStr, String outputFileSuffix) { |
| failInPutNext = Boolean.parseBoolean(failInPutNextStr); |
| this.outputFileSuffix = outputFileSuffix; |
| } |
| |
| public DummyStore() { |
| } |
| |
| |
| @Override |
| public void putNext(Tuple t) throws IOException { |
| if(failInPutNext) { |
| throw new IOException("Failing in putNext"); |
| } |
| super.putNext(t); |
| } |
| |
| @SuppressWarnings("rawtypes") |
| @Override |
| public OutputFormat getOutputFormat() { |
| return new DummyOutputFormat(outputFileSuffix); |
| } |
| |
| @Override |
| public void storeSchema(ResourceSchema schema, String location, |
| Job job) throws IOException { |
| FileSystem fs = FileSystem.get(job.getConfiguration()); |
| // verify that output is available prior to storeSchema call |
| Path resultPath = new Path(location, "part-m-00000"); |
| if (!fs.exists(resultPath)) { |
| FileStatus[] listing = fs.listStatus(new Path(location)); |
| for (FileStatus fstat : listing) { |
| System.err.println(fstat.getPath()); |
| } |
| // throwing here (instead of creating the marker file below) |
| // causes the test to fail |
| throw new IOException(resultPath + " not available in storeSchema"); |
| } |
| |
| // create a file to test that this method got called - if it gets called |
| // multiple times, the create will throw an Exception |
| fs.create( |
| new Path(location + "_storeSchema_test"), |
| false); |
| } |
| |
| @Override |
| public void cleanupOnFailure(String location, Job job) |
| throws IOException { |
| super.cleanupOnFailure(location, job); |
| |
| // check that the output file location is not present |
| Configuration conf = job.getConfiguration(); |
| FileSystem fs = FileSystem.get(conf); |
| if(fs.exists(new Path(location))) { |
| // create a file to indicate that the cleanup did not happen |
| fs.create(new Path(location + "_cleanupOnFailure_failed" + |
| outputFileSuffix), false); |
| } |
| // create a file to test that this method got called successfully |
| // if it gets called multiple times, the create will throw an Exception |
| fs.create( |
| new Path(location + "_cleanupOnFailure_succeeded" + |
| outputFileSuffix), false); |
| } |
| |
| @Override |
| public void storeStatistics(ResourceStatistics stats, String location, |
| Job job) throws IOException { |
| // no-op: statistics are not exercised by these tests |
| } |
| } |
| |
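| /** |
| * Parses a single-store script and asserts that the LOStore's resolved |
| * file path (with any hdfs://host:port authority stripped) matches the |
| * expected value. |
| */ |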
| private void checkStorePath(String orig, String expected) throws Exception { |
| checkStorePath(orig, expected, false); |
| } |
| |
| private void checkStorePath(String orig, String expected, boolean isTmp) throws Exception { |
| pc.getProperties().setProperty("opt.multiquery",""+true); |
| |
| DataStorage dfs = pc.getDfs(); |
| dfs.setActiveContainer(dfs.asContainer("/tmp")); |
| Map<String, String> fileNameMap = new HashMap<String, String>(); |
| |
| QueryParserDriver builder = new QueryParserDriver(pc, "Test-Store", fileNameMap); |
| |
| String query = "a = load 'foo';" + "store a into '"+orig+"';"; |
| LogicalPlan lp = builder.parse(query); |
| |
| Assert.assertTrue(lp.size()>1); |
| Operator op = lp.getSinks().get(0); |
| |
| Assert.assertTrue(op instanceof LOStore); |
| LOStore store = (LOStore)op; |
| |
| String p = store.getFileSpec().getFileName(); |
| p = p.replaceAll("hdfs://[0-9a-zA-Z:\\.]*/","/"); |
| |
| if (isTmp) { |
| Assert.assertTrue(p.matches("/tmp.*")); |
| } else { |
| Assert.assertEquals(expected, p); |
| } |
| } |
| |
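| /** |
| * A PigTextOutputFormat that swaps in DummyOutputCommitter so the tests |
| * can detect which committer callbacks were invoked. |
| */ |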
| static class DummyOutputFormat extends PigTextOutputFormat { |
| |
| private String outputFileSuffix; |
| |
| public DummyOutputFormat(String outputFileSuffix) { |
| super((byte) '\t'); |
| this.outputFileSuffix = outputFileSuffix; |
| } |
| |
| @Override |
| public synchronized OutputCommitter getOutputCommitter( |
| TaskAttemptContext context) throws IOException { |
| return new DummyOutputCommitter(outputFileSuffix, |
| super.getOutputCommitter(context)); |
| } |
| |
| @Override |
| public Path getDefaultWorkFile(TaskAttemptContext context, |
| String extension) throws IOException { |
| FileOutputCommitter committer = |
| (FileOutputCommitter) super.getOutputCommitter(context); |
| return new Path(committer.getWorkPath(), getUniqueFile(context, |
| "part", extension)); |
| } |
| |
| } |
| |
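| /** |
| * An OutputCommitter that delegates to the real committer and drops a |
| * marker file under /tmp/TestStore for each lifecycle callback, letting |
| * the tests assert which callbacks ran. |
| */ |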
| static class DummyOutputCommitter extends OutputCommitter { |
| |
| static final String FILE_SETUPJOB_CALLED = "/tmp/TestStore/_setupJob_called"; |
| static final String FILE_SETUPTASK_CALLED = "/tmp/TestStore/_setupTask_called"; |
| static final String FILE_COMMITTASK_CALLED = "/tmp/TestStore/_commitTask_called"; |
| static final String FILE_ABORTTASK_CALLED = "/tmp/TestStore/_abortTask_called"; |
| static final String FILE_CLEANUPJOB_CALLED = "/tmp/TestStore/_cleanupJob_called"; |
| static final String FILE_COMMITJOB_CALLED = "/tmp/TestStore/_commitJob_called"; |
| static final String FILE_ABORTJOB_CALLED = "/tmp/TestStore/_abortJob_called"; |
| |
| private String outputFileSuffix; |
| private OutputCommitter baseCommitter; |
| |
| public DummyOutputCommitter(String outputFileSuffix, |
| OutputCommitter baseCommitter) throws IOException { |
| this.outputFileSuffix = outputFileSuffix; |
| this.baseCommitter = baseCommitter; |
| } |
| |
| @Override |
| public void setupJob(JobContext jobContext) throws IOException { |
| baseCommitter.setupJob(jobContext); |
| createFile(jobContext, FILE_SETUPJOB_CALLED + outputFileSuffix); |
| } |
| |
| @Override |
| public void setupTask(TaskAttemptContext taskContext) |
| throws IOException { |
| baseCommitter.setupTask(taskContext); |
| createFile(taskContext, FILE_SETUPTASK_CALLED + outputFileSuffix); |
| } |
| |
| @Override |
| public boolean needsTaskCommit(TaskAttemptContext taskContext) |
| throws IOException { |
| return true; |
| } |
| |
| @Override |
| public void commitTask(TaskAttemptContext taskContext) |
| throws IOException { |
| baseCommitter.commitTask(taskContext); |
| createFile(taskContext, FILE_COMMITTASK_CALLED + outputFileSuffix); |
| } |
| |
| @Override |
| public void abortTask(TaskAttemptContext taskContext) |
| throws IOException { |
| baseCommitter.abortTask(taskContext); |
| createFile(taskContext, FILE_ABORTTASK_CALLED + outputFileSuffix); |
| } |
| |
| @Override |
| public void cleanupJob(JobContext jobContext) throws IOException { |
| baseCommitter.cleanupJob(jobContext); |
| createFile(jobContext, FILE_CLEANUPJOB_CALLED + outputFileSuffix); |
| } |
| |
| @Override |
| public void commitJob(JobContext jobContext) throws IOException { |
| baseCommitter.commitJob(jobContext); |
| createFile(jobContext, FILE_COMMITJOB_CALLED + outputFileSuffix); |
| } |
| |
| @Override |
| public void abortJob(JobContext jobContext, State state) |
| throws IOException { |
| baseCommitter.abortJob(jobContext, state); |
| createFile(jobContext, FILE_ABORTJOB_CALLED + outputFileSuffix); |
| } |
| |
| public void createFile(JobContext jobContext, String fileName) |
| throws IOException { |
| Configuration conf = jobContext.getConfiguration(); |
| FileSystem fs = FileSystem.get(conf); |
| fs.mkdirs(new Path(fileName).getParent()); |
| FSDataOutputStream out = fs.create(new Path(fileName), true); |
| out.close(); |
| } |
| |
| } |
| } |