/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.hadoop.fs.Path;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;

import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
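
/**
 * Verifies the record and byte counters that Pig reports through
 * {@link PigStats}, {@link JobStats}, {@link InputStats} and
 * {@link OutputStats} for map-only, map-reduce, combiner, multi-query,
 * multi-input, and split/union plans run against a mini cluster.
 */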
@RunWith(JUnit4.class)
public class TestCounters {
String file = "input.txt";
static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
final int MAX = 100*1000;
Random r = new Random();

@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
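
/**
 * Map-only plan (load + filter + foreach): map input/output record counters
 * and HDFS bytes written should match the generated data exactly.
 */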
@Test
public void testMapOnly() throws IOException, ExecException {
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
for(int i = 0; i < MAX; i++) {
int t = r.nextInt(100);
pw.println(t);
if(t > 50) count++;
}
pw.close();
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
ExecJob job = pigServer.store("c", "output_map_only");
PigStats pigStats = job.getStatistics();
// count the number of bytes in the output file by reading it back
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
"output_map_only", pigServer.getPigContext()), pigServer
.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output_map_only"), true);
System.out.println("============================================");
System.out.println("Test case Map Only");
System.out.println("============================================");
JobGraph jg = pigStats.getJobGraph();
Iterator<JobStats> iter = jg.iterator();
while (iter.hasNext()) {
JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
assertEquals(count, js.getMapOutputRecords());
assertEquals(0, js.getReduceInputRecords());
assertEquals(0, js.getReduceOutputRecords());
System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
assertEquals(filesize, js.getHdfsBytesWritten());
}
}
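
/**
 * Same map-only plan as {@link #testMapOnly()}, but stored with BinStorage;
 * the bytes-written counter is checked against the BinStorage output size.
 */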
@Test
public void testMapOnlyBinStorage() throws IOException, ExecException {
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
for(int i = 0; i < MAX; i++) {
int t = r.nextInt(100);
pw.println(t);
if(t > 50) count++;
}
pw.close();
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
PigStats pigStats = job.getStatistics();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
"output_map_only", pigServer.getPigContext()),
pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output_map_only"), true);
System.out.println("============================================");
System.out.println("Test case Map Only");
System.out.println("============================================");
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
assertEquals(count, js.getMapOutputRecords());
assertEquals(0, js.getReduceInputRecords());
assertEquals(0, js.getReduceOutputRecords());
}
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
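
/**
 * Single map-reduce job without a combiner (the foreach generates only the
 * group key): reduce input records should equal the map output records, and
 * reduce output records should equal the number of distinct keys.
 */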
@Test
public void testMapReduceOnly() throws IOException, ExecException {
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
int[] nos = new int[10]; // Java zero-initializes array elements
for(int i = 0; i < MAX; i++) {
int index = r.nextInt(10);
int value = r.nextInt(100);
nos[index] += value;
pw.println(index + "\t" + value);
}
pw.close();
for(int i = 0; i < 10; i++) {
if(nos[i] > 0) count++;
}
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
ExecJob job = pigServer.store("c", "output");
PigStats pigStats = job.getStatistics();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapReduce");
System.out.println("============================================");
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
assertEquals(MAX, js.getMapOutputRecords());
System.out.println("Reduce input records : " + js.getReduceInputRecords());
assertEquals(MAX, js.getReduceInputRecords());
System.out.println("Reduce output records : " + js.getReduceOutputRecords());
assertEquals(count, js.getReduceOutputRecords());
}
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
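
/**
 * Same map-reduce plan as {@link #testMapReduceOnly()}, but stored with
 * BinStorage.
 */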
@Test
public void testMapReduceOnlyBinStorage() throws IOException, ExecException {
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
int[] nos = new int[10]; // Java zero-initializes array elements
for(int i = 0; i < MAX; i++) {
int index = r.nextInt(10);
int value = r.nextInt(100);
nos[index] += value;
pw.println(index + "\t" + value);
}
pw.close();
for(int i = 0; i < 10; i++) {
if(nos[i] > 0) count++;
}
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
ExecJob job = pigServer.store("c", "output", "BinStorage");
PigStats pigStats = job.getStatistics();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapReduce");
System.out.println("============================================");
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
assertEquals(MAX, js.getMapOutputRecords());
System.out.println("Reduce input records : " + js.getReduceInputRecords());
assertEquals(MAX, js.getReduceInputRecords());
System.out.println("Reduce output records : " + js.getReduceOutputRecords());
assertEquals(count, js.getReduceOutputRecords());
}
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
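
/**
 * Map-combine-reduce job (group + SUM enables the combiner): reduce input
 * records should already be collapsed to one record per distinct key.
 */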
@Test
public void testMapCombineReduce() throws IOException, ExecException {
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
int[] nos = new int[10]; // Java zero-initializes array elements
for(int i = 0; i < MAX; i++) {
int index = r.nextInt(10);
int value = r.nextInt(100);
nos[index] += value;
pw.println(index + "\t" + value);
}
pw.close();
for(int i = 0; i < 10; i++) {
if(nos[i] > 0) count++;
}
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
ExecJob job = pigServer.store("c", "output");
PigStats pigStats = job.getStatistics();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
assertEquals(MAX, js.getMapOutputRecords());
System.out.println("Reduce input records : " + js.getReduceInputRecords());
assertEquals(count, js.getReduceInputRecords());
System.out.println("Reduce output records : " + js.getReduceOutputRecords());
assertEquals(count, js.getReduceOutputRecords());
}
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
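
/**
 * Same map-combine-reduce plan as {@link #testMapCombineReduce()}, but
 * stored with BinStorage.
 */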
@Test
public void testMapCombineReduceBinStorage() throws IOException, ExecException {
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
int[] nos = new int[10]; // Java zero-initializes array elements
for(int i = 0; i < MAX; i++) {
int index = r.nextInt(10);
int value = r.nextInt(100);
nos[index] += value;
pw.println(index + "\t" + value);
}
pw.close();
for(int i = 0; i < 10; i++) {
if(nos[i] > 0) count++;
}
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
ExecJob job = pigServer.store("c", "output", "BinStorage");
PigStats pigStats = job.getStatistics();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
assertEquals(MAX, js.getMapOutputRecords());
System.out.println("Reduce input records : " + js.getReduceInputRecords());
assertEquals(count, js.getReduceInputRecords());
System.out.println("Reduce output records : " + js.getReduceOutputRecords());
assertEquals(count, js.getReduceOutputRecords());
}
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
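
/**
 * Plan that compiles to multiple MR jobs (order + group + SUM): counters are
 * asserted on the final job, taken from the sink of the job graph.
 */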
@Test
public void testMultipleMRJobs() throws IOException, ExecException {
Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job",
Util.isMapredExecType(cluster.getExecType()));
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
int[] nos = new int[10]; // Java zero-initializes array elements
for(int i = 0; i < MAX; i++) {
int index = r.nextInt(10);
int value = r.nextInt(100);
nos[index] += value;
pw.println(index + "\t" + value);
}
pw.close();
for(int i = 0; i < 10; i++) {
if(nos[i] > 0) count++;
}
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = order a by $0;");
pigServer.registerQuery("c = group b by $0;");
pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
ExecJob job = pigServer.store("d", "output");
PigStats pigStats = job.getStatistics();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MultipleMRJobs");
System.out.println("============================================");
JobGraph jp = pigStats.getJobGraph();
JobStats js = (JobStats)jp.getSinks().get(0);
System.out.println("Job id: " + js.getName());
System.out.println(jp.toString());
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
assertEquals(MAX, js.getMapOutputRecords());
System.out.println("Reduce input records : " + js.getReduceInputRecords());
assertEquals(count, js.getReduceInputRecords());
System.out.println("Reduce output records : " + js.getReduceOutputRecords());
assertEquals(count, js.getReduceOutputRecords());
System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
assertEquals(filesize, js.getHdfsBytesWritten());
}
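
/**
 * Multi-query map-only plan with two stores: the multi-store counters on the
 * single job should add up to the total number of input records.
 */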
@Test
public void testMapOnlyMultiQueryStores() throws Exception {
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
for(int i = 0; i < MAX; i++) {
int t = r.nextInt(100);
pw.println(t);
}
pw.close();
PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = filter a by $0 <= 50;");
pigServer.registerQuery("store b into '/tmp/outout1';");
pigServer.registerQuery("store c into '/tmp/outout2';");
List<ExecJob> jobs = pigServer.executeBatch();
PigStats stats = jobs.get(0).getStatistics();
assertEquals(2, stats.getOutputLocations().size());
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
for (Long val : entry.values()) {
counter += val;
}
assertEquals(MAX, counter);
}
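
/**
 * Multi-query map-reduce plan with two stores: the multi-store counters
 * should add up to the number of distinct group keys across both branches.
 */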
@Test
public void testMultiQueryStores() throws Exception {
int[] nums = new int[100];
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
for(int i = 0; i < MAX; i++) {
int t = r.nextInt(100);
pw.println(t);
nums[t]++;
}
pw.close();
int groups = 0;
for (int i : nums) {
if (i > 0) groups++;
}
PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 >= 50;");
pigServer.registerQuery("c = group b by $0;");
pigServer.registerQuery("d = foreach c generate group;");
pigServer.registerQuery("e = filter a by $0 < 50;");
pigServer.registerQuery("f = group e by $0;");
pigServer.registerQuery("g = foreach f generate group;");
pigServer.registerQuery("store d into '/tmp/outout1';");
pigServer.registerQuery("store g into '/tmp/outout2';");
List<ExecJob> jobs = pigServer.executeBatch();
PigStats stats = jobs.get(0).getStatistics();
assertEquals(2, stats.getOutputLocations().size());
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
for (Long val : entry.values()) {
counter += val;
}
assertEquals(groups, counter);
}
/*
* IMPORTANT NOTE:
* COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
* SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
*/
// @Test
// public void testLocal() throws IOException, ExecException {
// int count = 0;
// //PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
// File file = File.createTempFile("data", ".txt");
// PrintWriter pw = new PrintWriter(new FileOutputStream(file));
// int [] nos = new int[10];
// for(int i = 0; i < 10; i++)
// nos[i] = 0;
//
// for(int i = 0; i < MAX; i++) {
// int index = r.nextInt(10);
// int value = r.nextInt(100);
// nos[index] += value;
// pw.println(index + "\t" + value);
// }
// pw.close();
//
// for(int i = 0; i < 10; i++)
// if(nos[i] > 0)
// count ++;
//
// File out = File.createTempFile("output", ".txt");
// out.delete();
// PigServer pigServer = new PigServer("local");
// // FileLocalizer is initialized before using HDFS by previous tests
// FileLocalizer.setInitialized(false);
// pigServer.registerQuery("a = load '" + Util.encodeEscape(file.toString()) + "';");
// pigServer.registerQuery("b = order a by $0;");
// pigServer.registerQuery("c = group b by $0;");
// pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
// PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs());
// long filesize = 0;
// while(is.read() != -1) filesize++;
//
// is.close();
// out.delete();
//
// //Map<String, Map<String, String>> stats = pigStats.getPigStats();
//
// assertEquals(10, pigStats.getRecordsWritten());
// assertEquals(110, pigStats.getBytesWritten());
//
// }
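
/*
 * Input-counter tests: each variant below delegates to testInputCounters()
 * with a different join strategy and checks the per-input record counters.
 */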
@Test
public void testJoinInputCounters() throws Exception {
testInputCounters("join");
}
@Test
public void testCogroupInputCounters() throws Exception {
testInputCounters("cogroup");
}
@Test
public void testSkewedInputCounters() throws Exception {
testInputCounters("skewed");
}
@Test
public void testSelfJoinInputCounters() throws Exception {
testInputCounters("self-join");
}
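
// Shared state for the input-counter tests: the two input files are created
// once, and 'count' records how many lines were written to the second file.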
private static boolean multiInputCreated = false;
private static int count = 0;
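
/**
 * Loads the two shared inputs, runs the requested variant ('join',
 * 'cogroup', 'skewed', or a self-join), and asserts the record count
 * reported for each input, including the sampler input of a skewed join.
 */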
private void testInputCounters(String keyword) throws Exception {
String file1 = "multi-input1.txt";
String file2 = "multi-input2.txt";
String output = keyword;
if (keyword.equals("self-join")) {
file2 = file1;
keyword = "join";
}
final int MAX_NUM_RECORDS = 100;
if (!multiInputCreated) {
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1));
for (int i = 0; i < MAX_NUM_RECORDS; i++) {
int t = r.nextInt(100);
pw.println(t);
}
pw.close();
PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2));
for (int i = 0; i < MAX_NUM_RECORDS; i++) {
int t = r.nextInt(100);
if (t > 50) {
count++;
pw2.println(t);
}
}
pw2.close();
multiInputCreated = true;
}
PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file1 + "';");
pigServer.registerQuery("b = load '" + file2 + "';");
if (keyword.equals("join") || keyword.equals("cogroup")) {
pigServer.registerQuery("c = " + keyword + " a by $0, b by $0;");
} else if (keyword.equals("skewed")) {
pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';");
}
ExecJob job = pigServer.store("c", output + "_output");
PigStats stats = job.getStatistics();
assertTrue(stats.isSuccessful());
List<InputStats> inputs = stats.getInputStats();
if (keyword.equals("join") || keyword.equals("cogroup")) {
assertEquals(2, inputs.size());
} else if (keyword.equals("skewed")) {
assertEquals(3, inputs.size());
}
for (InputStats input : inputs) {
if (file1.equals(input.getName()) && input.getInputType() == InputStats.INPUT_TYPE.regular) {
assertEquals(MAX_NUM_RECORDS, input.getNumberRecords());
} else if (file2.equals(input.getName())){
assertEquals(count, input.getNumberRecords());
} else {
assertEquals(InputStats.INPUT_TYPE.sampler, input.getInputType());
}
}
}
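
/**
 * Split + union plans: the single output should report all 10 records and
 * 20 bytes (one digit plus a newline per record), however the split
 * branches are recombined.
 */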
@Test
public void testSplitUnionOutputCounters() throws Exception {
PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input"));
for (int i = 0; i < 10; i++) {
pw.println(i);
}
pw.close();
String query =
"a = load 'splitunion-input';" +
"split a into b if $0 < 5, c otherwise;" +
"d = union b, c;";
pigServer.registerQuery(query);
ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage");
PigStats stats1 = job.getStatistics();
query =
"a = load 'splitunion-input';" +
"split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" +
"e = distinct d;" +
"f = union b, c, e;";
pigServer.registerQuery(query);
job = pigServer.store("f", "splitunion-output-1", "PigStorage");
PigStats stats2 = job.getStatistics();
PigStats[] pigStats = new PigStats[]{stats1, stats2};
for (int i = 0; i < 2; i++) {
PigStats stats = pigStats[i];
assertTrue(stats.isSuccessful());
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(1, outputs.size());
OutputStats output = outputs.get(0);
assertEquals("splitunion-output-" + i, output.getName());
assertEquals(10, output.getNumberRecords());
assertEquals(20, output.getBytes());
}
}
}