| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.test; |
| |
| import java.io.File; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| |
| import org.apache.pig.PigRunner; |
| import org.apache.pig.PigServer; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.tools.pigstats.PigStats; |
| import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| |
| /** |
| * This class tests whether the number of reducers is correctly set for queries |
| * involving a sample job. Note the sample job should use the actual #reducers |
| * of the following order-by or skew-join job to generate the partition file, |
| * and the challenge is that whether we can get the actual #reducer of the |
| * order-by/skew-join job in advance. |
| */ |
| @RunWith(JUnit4.class) |
| public class TestNumberOfReducers { |
| |
| private static final String LOCAL_INPUT_FILE = "test/org/apache/pig/test/data/passwd"; |
| private static final String HDFS_INPUT_FILE = "passwd"; |
| private static final String OUTPUT_FILE = "output"; |
| private static final String PIG_FILE = "test.pig"; |
| |
| static PigContext pc; |
| static PigServer pigServer; |
| private static MiniGenericCluster cluster; |
| |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| cluster = MiniGenericCluster.buildCluster(); |
| |
| Util.copyFromLocalToCluster(cluster, LOCAL_INPUT_FILE, HDFS_INPUT_FILE); |
| } |
| |
| @AfterClass |
| public static void oneTimeTearDown() throws Exception { |
| cluster.shutDown(); |
| } |
| |
| /** |
| * Specifying bytes_per_reducer, default_parallel and parallel, verifying |
| * whether it uses the actual #reducers as actual_parallel. |
| * |
| * Note the input file size is 555B. |
| * |
| */ |
| private void verifyOrderBy(int bytes_per_reducer, int default_parallel, |
| int parallel, int actual_parallel) throws IOException { |
| PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); |
| |
| // the file is 555B |
| w.println("SET pig.exec.reducers.bytes.per.reducer " + bytes_per_reducer + ";"); |
| |
| w.println("SET default_parallel " + default_parallel + ";"); |
| w.println("A = load '" + HDFS_INPUT_FILE + "';"); |
| w.print("B = order A by $0 "); |
| if (parallel > 0) |
| w.print("parallel " + parallel); |
| w.println(";"); |
| w.println("store B into '" + OUTPUT_FILE + "';"); |
| w.close(); |
| |
| doTest(bytes_per_reducer, default_parallel, parallel, actual_parallel); |
| } |
| |
| /** |
| * Specifying bytes_per_reducer, default_parallel and parallel, verifying |
| * whether it uses the actual #reducers as actual_parallel. |
| * |
| * Note the input file size is 555B. |
| * |
| */ |
| private void verifySkewJoin(int bytes_per_reducer, int default_parallel, |
| int parallel, int actual_parallel) throws IOException { |
| PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); |
| |
| // the file is 555B |
| w.println("SET pig.exec.reducers.bytes.per.reducer " + bytes_per_reducer + ";"); |
| w.println("SET default_parallel " + default_parallel + ";"); |
| w.println("A1 = load '" + HDFS_INPUT_FILE + "';"); |
| w.println("A2 = load '" + HDFS_INPUT_FILE + "';"); |
| w.print("B = join A1 by $0, A2 by $0 using 'skewed' "); |
| if (parallel > 0) |
| w.print("parallel " + parallel); |
| w.println(";"); |
| w.println("store B into '" + OUTPUT_FILE + "';"); |
| w.close(); |
| |
| doTest(bytes_per_reducer, default_parallel, parallel, actual_parallel); |
| } |
| |
| private void doTest(double bytes_per_reducer, int default_parallel, |
| int parallel, int actual_parallel) throws IOException { |
| try { |
| String[] args = { PIG_FILE }; |
| PigStats stats = PigRunner.run(args, null); |
| |
| assertTrue(stats.isSuccessful()); |
| |
| // get the skew-join job stat |
| MRJobStats js = (MRJobStats) stats.getJobGraph().getSinks().get(0); |
| assertEquals(actual_parallel, js.getNumberReduces()); |
| |
| // estimation should only kick in if parallel and default_parallel are not set |
| long estimatedReducers = -1; |
| if (parallel < 1 && default_parallel < 1) { |
| double fileSize = (double)(new File("test/org/apache/pig/test/data/passwd").length()); |
| int inputFiles = js.getInputs().size(); |
| estimatedReducers = Math.min((long)Math.ceil(fileSize/(double)bytes_per_reducer) * inputFiles, 999); |
| } |
| |
| Util.assertParallelValues(default_parallel, parallel, estimatedReducers, |
| actual_parallel, js.getInputs().get(0).getConf()); |
| |
| } catch (Exception e) { |
| assertNull("Exception thrown during verifySkewJoin", e); |
| } finally { |
| new File(PIG_FILE).delete(); |
| Util.deleteFile(cluster, OUTPUT_FILE); |
| } |
| } |
| |
| /** |
| * For 550B file and 300B per reducer, it estimates 2 #reducers. |
| * @throws Exception |
| */ |
| @Test |
| public void testOrderByEstimate() throws Exception{ |
| verifyOrderBy(300, -1, -1, 2); |
| } |
| |
| /** |
| * For two 550B files and 600B per reducer, it estimates 2 #reducers. |
| * @throws Exception |
| */ |
| @Test |
| public void testSkewJoinEstimate() throws Exception{ |
| verifySkewJoin(600, -1, -1, 2); |
| } |
| |
| |
| /** |
| * Pig will estimate 2 reducers but we specify parallel 1, so it should use |
| * 1 reducer. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testOrderByEstimate2Parallel1() throws Exception { |
| verifyOrderBy(300, -1, 1, 1); |
| } |
| |
| /** |
| * Pig will estimate 2 reducers but we specify parallel 1, so it should use |
| * 1 reducer. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testSkewJoinEstimate2Parallel1() throws Exception { |
| verifySkewJoin(600, -1, 1, 1); |
| } |
| |
| /** |
| * Pig will estimate 2 reducers but we specify default parallel 1, so it |
| * should use 1 reducer. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testOrderByEstimate2Default1() throws Exception { |
| verifyOrderBy(300, 1, -1, 1); |
| } |
| |
| /** |
| * Pig will estimate 2 reducers but we specify default parallel 1, so it |
| * should use 1 reducer. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testSkewJoinEstimate2Default1() throws Exception { |
| verifySkewJoin(600, 1, -1, 1); |
| } |
| |
| /** |
| * Pig will estimate 2 reducers but we specify parallel 4, so it should use |
| * 4 reducer. |
| * |
| * TODO we need verify that the sample job generates 4 partitions instead of |
| * 2. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testOrderByEstimate2Parallel4() throws Exception { |
| verifyOrderBy(300, -1, 4, 4); |
| } |
| |
| /** |
| * Pig will estimate 2 reducers but we specify parallel 4, so it should use |
| * 4 reducer. |
| * |
| * TODO we need verify that the sample job generates 4 partitions instead of |
| * 2. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testSkewJoinEstimate2Parallel4() throws Exception { |
| verifySkewJoin(600, -1, 4, 4); |
| } |
| |
| /** |
| * Pig will estimate 6 reducers but we specify default parallel 2, so it |
| * should use 2 reducer. Also, the sampler should generate 2 partitions, |
| * instead of 6, otherwise the next job will fail. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testOrderByEstimate6Default2() throws Exception { |
| verifyOrderBy(100, 2, -1, 2); |
| } |
| |
| /** |
| * Pig will estimate 6 reducers but we specify default parallel 2, so it |
| * should use 2 reducer. Also, the sampler should generate 2 partitions, |
| * instead of 6, otherwise the next job will fail. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testSkewJoinEstimate6Default2() throws Exception { |
| verifySkewJoin(200, 2, -1, 2); |
| } |
| } |