blob: 1c4944e886e51cbc8fbd4bb492c55adaa7bd0163 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
* NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package org.apache.pig.piggybank.test.storage;
import static org.apache.pig.ExecType.LOCAL;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.test.Util;
import com.google.common.collect.Sets;
import org.junit.Assert;
/**
 * Tests that {@code org.apache.pig.piggybank.storage.MultiStorage} writes compressed, per-key
 * output when a compression type ({@code bz2} or {@code gz}) is passed to it, both for a single
 * key column and for multiple key columns (nested output folders).
 */
public class TestMultiStorageCompression extends TestCase {

    /** Input rows; each row is one comma-separated line of the form "key,letter,number". */
    public static final ArrayList<String[]> data = new ArrayList<String[]>();

    static {
        data.add(new String[] { "f1,a,123" });
        data.add(new String[] { "f2,b,234" });
        data.add(new String[] { "f3,c,345" });
        data.add(new String[] { "f4,d,567" });
    }

    public void testMultiStorageShouldSupportBz2() throws Exception {
        runSingleKeyCompressionTest("bz2");
    }

    public void testMultiStorageShouldSupportGz() throws Exception {
        runSingleKeyCompressionTest("gz");
    }

    public void testMultiStorageShouldSupportMultiLevelAndGz() throws Exception {
        String type = "gz";
        String outputDir = "output001.multi." + type;
        String outputPath = tmpPath(outputDir);
        List<String> filesToDelete = new ArrayList<String>();
        filesToDelete.add(outputPath);
        try {
            // Split on column 1 first, then column 0 -> <letter>.gz/<key>.gz/<part file>.
            runQuery(outputPath, "1,0", type);
            Collection<File> fileList = FileUtils.listFiles(new File(outputPath), null, true);
            Set<String> expectedPaths = Sets.newHashSet("output001.multi.gz/a.gz/f1.gz/a-f1-0,000.gz",
                    "output001.multi.gz/b.gz/f2.gz/b-f2-0,000.gz",
                    "output001.multi.gz/c.gz/f3.gz/c-f3-0,000.gz",
                    "output001.multi.gz/d.gz/f4.gz/d-f4-0,000.gz");
            for (File file : fileList) {
                String absolutePath = file.getAbsolutePath();
                // Expected paths use '/', so normalize the platform separator (e.g. '\' on Windows).
                String foundPath = absolutePath.substring(absolutePath.indexOf(outputDir))
                        .replace(File.separatorChar, '/');
                expectedPaths.remove(foundPath);
            }
            Assert.assertTrue("Missing output files: " + expectedPaths, expectedPaths.isEmpty());
        } finally {
            cleanUpDirs(filesToDelete);
        }
    }

    /**
     * Stores the test data split on column 0 using the given compression type, verifies the
     * per-key output, and always removes the output directory afterwards.
     *
     * @param type compression type / file extension, {@code "bz2"} or {@code "gz"}
     */
    private void runSingleKeyCompressionTest(String type) throws Exception {
        String outputPath = tmpPath("output001." + type);
        List<String> filesToDelete = new ArrayList<String>();
        filesToDelete.add(outputPath);
        try {
            runQuery(outputPath, "0", type);
            verifyResults(type, outputPath);
        } finally {
            cleanUpDirs(filesToDelete);
        }
    }

    /** Returns {@code name} resolved against the JVM temporary directory. */
    private static String tmpPath(String name) {
        return System.getProperty("java.io.tmpdir") + File.separator + name;
    }

    /**
     * Recursively deletes the given directories in reverse order of registration, so that
     * nested directories registered after their parents are removed first. The caller's list
     * is not modified.
     */
    private void cleanUpDirs(List<String> filesToDelete) throws IOException {
        List<String> ordered = new ArrayList<String>(filesToDelete);
        Collections.reverse(ordered);
        for (String path : ordered) {
            FileUtils.deleteDirectory(new File(path));
        }
    }

    /**
     * Verifies that {@code outputPath} contains one {@code <key>.<type>} folder per key, and
     * that every non-hidden file inside decompresses to a single tab-separated record with
     * three fields whose first field is that key.
     */
    private void verifyResults(String type,
            String outputPath) throws IOException, FileNotFoundException {
        String[] entries = new File(outputPath).list();
        assertNotNull("Output directory was not created: " + outputPath, entries);
        List<String> indexFolders = Arrays.asList(entries);
        String suffix = "." + type;

        // Assert whether all keys are present.
        for (String key : new String[] { "f1", "f2", "f3", "f4" }) {
            assertTrue("Missing output folder " + key + suffix + " in " + indexFolders,
                    indexFolders.contains(key + suffix));
        }

        // Sort so the folders are checked in a deterministic order.
        Collections.sort(indexFolders);
        CompressionCodec codec = createCodec(type);
        for (String indexFolder : indexFolders) {
            // Skip the job success marker and its checksum file.
            if (indexFolder.startsWith("._SUCCESS") || indexFolder.startsWith("_SUCCESS")) {
                continue;
            }
            String topFolder = outputPath + File.separator + indexFolder;
            String[] partFiles = new File(topFolder).list();
            assertNotNull("Expected a directory: " + topFolder, partFiles);
            // Folder names are "<key>.<type>"; every record inside must start with <key>.
            String expectedKey = indexFolder.endsWith(suffix)
                    ? indexFolder.substring(0, indexFolder.length() - suffix.length())
                    : indexFolder;
            for (String partFile : partFiles) {
                // Skip hidden files (e.g. checksum files) starting with '.'.
                if (partFile.startsWith(".")) {
                    continue;
                }
                String content = readCompressed(codec, topFolder + File.separator + partFile);
                // Assert for the number of fields and keys.
                String[] fields = content.split("\\t");
                assertEquals(3, fields.length);
                assertEquals(expectedKey, fields[0]);
            }
        }
    }

    /**
     * Creates the Hadoop codec matching the given file extension.
     *
     * @throws IllegalArgumentException if {@code type} is neither {@code "bz2"} nor {@code "gz"}
     */
    private static CompressionCodec createCodec(String type) {
        CompressionCodec codec;
        if ("bz2".equals(type)) {
            codec = new BZip2Codec();
        } else if ("gz".equals(type)) {
            codec = new GzipCodec();
        } else {
            throw new IllegalArgumentException("Unsupported compression type: " + type);
        }
        if (codec instanceof Configurable) {
            ((Configurable) codec).setConf(new Configuration());
        }
        return codec;
    }

    /**
     * Decompresses {@code file} with {@code codec} and returns its bytes as chars (one char per
     * byte, which is sufficient for the ASCII test data). Both streams are always closed.
     */
    private static String readCompressed(CompressionCodec codec, String file) throws IOException {
        FileInputStream rawIn = new FileInputStream(file);
        try {
            CompressionInputStream in = codec.createInputStream(rawIn);
            try {
                StringBuilder sb = new StringBuilder();
                int b;
                while ((b = in.read()) != -1) {
                    sb.append((char) b);
                }
                return sb.toString();
            } finally {
                in.close();
            }
        } finally {
            // Closing twice is harmless; this covers createInputStream throwing.
            rawIn.close();
        }
    }

    /**
     * Loads {@link #data} and stores it with MultiStorage into {@code outputPath}, splitting on
     * {@code keyColIndices} (comma-separated column indices) and compressing with
     * {@code compressionType}, using a tab as field delimiter.
     */
    private void runQuery(String outputPath, String keyColIndices, String compressionType)
            throws Exception, ExecException, IOException, FrontendException {
        // Create a data file.
        String filename = TestHelper.createTempFile(data, "");
        // NOTE(review): the input path is escaped here and again by Util.encodeEscape, while the
        // output path is only escaped once; confirm this is intended for Windows paths.
        filename = filename.replace("\\", "\\\\");
        String query = "A = LOAD '" + Util.encodeEscape(filename)
                + "' USING PigStorage(',') as (a,b,c);";
        String query2 = "STORE A INTO '" + Util.encodeEscape(outputPath)
                + "' USING org.apache.pig.piggybank.storage.MultiStorage" + "('"
                + Util.encodeEscape(outputPath) + "','" + keyColIndices + "', '" + compressionType
                + "', '\\t');";
        // Run Pig.
        PigServer pig = new PigServer(LOCAL);
        try {
            pig.setBatchOn();
            pig.registerQuery(query);
            pig.registerQuery(query2);
            pig.executeBatch();
        } finally {
            pig.shutdown();
        }
    }
}