blob: c76e9b8c797c05b4ccee9fed072d4bb2131f0c2d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
* NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package org.apache.pig.piggybank.test.storage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestMultiStorage {
private static final String INPUT_FILE = "MultiStorageInput.txt";
private PigServer pigServer;
private PigServer pigServerLocal;
private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
public TestMultiStorage() throws Exception {
pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServerLocal = new PigServer(Util.getLocalTestMode());
}
public static final PathFilter hiddenPathFilter = new PathFilter() {
public boolean accept(Path p) {
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
};
private void createFile() throws IOException {
PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
w.println("100\tapple\taaa1");
w.println("200\torange\tbbb1");
w.println("300\tstrawberry\tccc1");
w.println("101\tapple\taaa2");
w.println("201\torange\tbbb2");
w.println("301\tstrawberry\tccc2");
w.println("102\tapple\taaa3");
w.println("202\torange\tbbb3");
w.println("302\tstrawberry\tccc3");
w.close();
Util.deleteFile(cluster, INPUT_FILE);
Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
}
@Before
public void setUp() throws Exception {
createFile();
FileSystem fs = FileSystem.getLocal(new Configuration());
Path localOut = new Path("local-out");
if (fs.exists(localOut)) {
fs.delete(localOut, true);
}
}
@After
public void tearDown() throws Exception {
new File(INPUT_FILE).delete();
Util.deleteFile(cluster, INPUT_FILE);
}
@AfterClass
public static void shutdown() {
cluster.shutDown();
}
enum Mode {
local, cluster
}
@Test
public void testMultiStorage() throws IOException {
final String LOAD = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);";
final String MULTI_STORE_CLUSTER = "STORE A INTO 'mr-out' USING "
+ "org.apache.pig.piggybank.storage.MultiStorage('mr-out', '1');";
final String MULTI_STORE_LOCAL = "STORE A INTO 'local-out' USING "
+ "org.apache.pig.piggybank.storage.MultiStorage('local-out', '1');";
System.out.print("Testing in LOCAL mode: ...");
testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
System.out.println("Succeeded!");
System.out.print("Testing in CLUSTER mode: ...");
testMultiStorage( Mode.cluster, "mr-out", LOAD, MULTI_STORE_CLUSTER);
System.out.println("Succeeded!");
}
@Test
public void testOutputStats() throws IOException {
FileSystem fs = cluster.getFileSystem();
pigServer.setBatchOn();
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
pigServer.registerQuery("B = FILTER A BY name == 'apple';");
pigServer.registerQuery("STORE A INTO 'out1' USING org.apache.pig.piggybank.storage.MultiStorage('out1', '1');"); //153 bytes
pigServer.registerQuery("STORE B INTO 'out2' USING org.apache.pig.piggybank.storage.MultiStorage('out2', '1');"); // 45 bytes
ExecJob job = pigServer.executeBatch().get(0);
PigStats stats = job.getStatistics();
PigStats.JobGraph jobGraph = stats.getJobGraph();
JobStats jobStats = (JobStats) jobGraph.getSinks().get(0);
Map<String, Long> multiStoreCounters = jobStats.getMultiStoreCounters();
List<OutputStats> outputStats = SimplePigStats.get().getOutputStats();
OutputStats outputStats1 = "out1".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
OutputStats outputStats2 = "out2".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
assertEquals(153 + 45, stats.getBytesWritten());
assertEquals(2, outputStats.size()); // 2 split conditions
assertEquals(153, outputStats1.getBytes());
assertEquals(45, outputStats2.getBytes());
assertEquals(9, outputStats1.getRecords());
assertEquals(3, outputStats2.getRecords());
assertEquals(3L, multiStoreCounters.get("Output records in _1_out2").longValue());
assertEquals(9L, multiStoreCounters.get("Output records in _0_out1").longValue());
fs.delete(new Path("out1"), true);
fs.delete(new Path("out2"), true);
}
/**
* The actual method that run the test in local or cluster mode.
*/
private void testMultiStorage( Mode mode, String outPath,
String... queries) throws IOException {
PigServer pigServer = (Mode.local == mode) ? this.pigServerLocal : this.pigServer;
pigServer.setBatchOn();
for (String query : queries) {
pigServer.registerQuery(query);
}
pigServer.executeBatch();
verifyResults(mode, outPath);
}
/**
* Test if records are split into directories corresponding to split field
* values
*/
private void verifyResults(Mode mode, String outPath) throws IOException {
FileSystem fs = (Mode.local == mode ? FileSystem
.getLocal(new Configuration()) : cluster.getFileSystem());
Path output = new Path(outPath);
assertTrue("Output dir does not exists!", fs.exists(output)
&& fs.getFileStatus(output).isDir());
Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
assertTrue("Split field dirs not found!", paths != null);
for (Path path : paths) {
String splitField = path.getName();
Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
assertTrue("No files found for path: " + path.toUri().getPath(),
files != null);
for (Path filePath : files) {
assertTrue("This shouldn't be a directory", fs.isFile(filePath));
BufferedReader reader = new BufferedReader(new InputStreamReader(fs
.open(filePath)));
String line = "";
int count = 0;
while ((line = reader.readLine()) != null) {
String[] fields = line.split("\\t");
assertEquals(fields.length, 3);
assertEquals("Unexpected field value in the output record",
splitField, fields[1]);
count++;
System.out.println("field: " + fields[1]);
}
reader.close();
assertEquals(count, 3);
}
}
}
}