/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
* NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package org.apache.pig.piggybank.storage;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.StorageUtil;
import com.google.common.base.Strings;
/**
* This UDF is useful for dynamically splitting the output data into multiple
* directories and files based on a user-specified key field in the output tuple.
*
* Sample usage: <code>
* A = LOAD 'mydata' USING PigStorage() as (a, b, c);
* STORE A INTO '/my/home/output' USING MultiStorage('/my/home/output','0', 'bz2', '\\t');
* </code>
*
* Parameter details:-
* <b>/my/home/output </b>(Required) : The DFS path where output directories
* and files will be created.
* <b>0 </b>(Required) : Index of the field whose values should be used to
* create directories and files (field 'a' in this case).
* <b>'bz2' </b>(Optional) : The compression type. Default is 'none'.
* Supported types are 'none', 'gz' and 'bz2'.
* <b>'\\t' </b>(Optional) : Output field separator.
*
* Let 'a1', 'a2' be the unique values of field 'a'. Then the output may look
* like this:
*
* /my/home/output/a1/a1-0000 /my/home/output/a1/a1-0001
* /my/home/output/a1/a1-0002 ... /my/home/output/a2/a2-0000
* /my/home/output/a2/a2-0001 /my/home/output/a2/a2-0002
*
* The suffix '0000*' is the task-id of the mapper/reducer task executing this
* store. If the user does a GROUP BY on the field followed by MultiStorage(),
* then all tuples for a particular group are guaranteed to go to exactly one
* reducer. So in the above case, for example, there will be only one file each
* under the 'a1' and 'a2' directories.
*
* If the output is compressed, then the sub-directories and the output files
* will have the compression extension. For example, in the above case, if bz2
* is used, one file will look like /my/home/output.bz2/a1.bz2/a1-0000.bz2
*
* The key field can also be a comma-separated list of indices, e.g. '0,1' - in
* this case the storage will be multi-level:
* /my/home/output/a1/b1/a1-b1-0000
* /my/home/output/a1/b2/a1-b2-0000
* There is also an option to leave the key values out of the stored records;
* see the isRemoveKeys parameter.
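*
* For example, the key columns can be left out of the stored records by
* passing 'true' as the fifth constructor argument:
* <code>
* STORE A INTO '/my/home/output' USING
*   MultiStorage('/my/home/output', '0', 'bz2', '\\t', 'true');
* </code>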
*/
public class MultiStorage extends StoreFunc {
private static final String KEYFIELD_DELIMETER = ",";
private Path outputPath; // User specified output Path
private final List<Integer> splitFieldIndices = new ArrayList<Integer>(); // Indices of the key fields
private String fieldDel; // delimiter of the output record.
private Compression comp; // Compression type of output data.
private boolean isRemoveKeys = false;
// Compression types supported by this store
enum Compression {
none, bz2, bz, gz;
};
public MultiStorage(String parentPathStr, String splitFieldIndex) {
this(parentPathStr, splitFieldIndex, "none");
}
public MultiStorage(String parentPathStr, String splitFieldIndex,
String compression) {
this(parentPathStr, splitFieldIndex, compression, "\\t");
}
public MultiStorage(String parentPathStr, String splitFieldIndex,
String compression, String fieldDel) {
this(parentPathStr, splitFieldIndex, compression, fieldDel, "false");
}
/**
* Constructor
*
* @param parentPathStr
* Parent output dir path (this is also specified in the store statement,
* so MultiStorage does not actually use this parameter. However, we don't
* want to change the constructor and break backward compatibility)
* @param splitFieldIndex
* key field index
* @param compression
* 'bz2', 'bz', 'gz' or 'none'
* @param fieldDel
* Output record field delimiter.
* @param isRemoveKeys
* If 'true', the key columns are removed from the result during write.
*/
public MultiStorage(String parentPathStr, String splitFieldIndex,
String compression, String fieldDel, String isRemoveKeys) {
this.isRemoveKeys = Boolean.parseBoolean(isRemoveKeys);
this.outputPath = new Path(parentPathStr);
String[] splitFieldIndices = splitFieldIndex.split(KEYFIELD_DELIMETER);
for (String splitFieldIndexString : splitFieldIndices){
this.splitFieldIndices.add(Integer.parseInt(splitFieldIndexString));
}
this.fieldDel = fieldDel;
try {
this.comp = (compression == null) ? Compression.none : Compression
.valueOf(compression.toLowerCase());
} catch (IllegalArgumentException e) {
System.err.println("Exception when converting compression string: "
+ compression + " to enum. No compression will be used");
this.comp = Compression.none;
}
}
//--------------------------------------------------------------------------
// Implementation of StoreFunc
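// Record writer obtained in prepareToWrite(); the key is the list of split
// field values and selects the sub-directory/file each tuple is written to.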
private RecordWriter<List<String>, Tuple> writer;
@Override
public void putNext(Tuple tuple) throws IOException {
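// Validate that every configured key field index exists in this tuple.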
for (int splitFieldIndex : this.splitFieldIndices) {
if (tuple.size() <= splitFieldIndex) {
throw new IOException("split field index:" + splitFieldIndex
+ " >= tuple size:" + tuple.size());
}
}
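// Collect the key field values; they determine the output sub-directory and file.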
List<String> fields = new ArrayList<String>();
for (int splitFieldIndex : this.splitFieldIndices){
try {
fields.add(String.valueOf(tuple.get(splitFieldIndex)));
} catch (ExecException exec) {
throw new IOException(exec);
}
}
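// Delegate the write to the record writer registered for this key.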
try {
writer.write(fields, tuple);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@SuppressWarnings("unchecked")
@Override
public OutputFormat getOutputFormat() throws IOException {
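// Pass the configured separator (and, if requested, the key indices to drop)
// to the output format that manages the per-key output files.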
MultiStorageOutputFormat format = new MultiStorageOutputFormat();
format.setKeyValueSeparator(fieldDel);
if (this.isRemoveKeys){
format.setSkipIndices(this.splitFieldIndices);
}
return format;
}
@SuppressWarnings("unchecked")
@Override
public void prepareToWrite(RecordWriter writer) throws IOException {
this.writer = writer;
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
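// Clear the default TextOutputFormat key/value separator, set the output
// path, and enable compression if it was requested.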
job.getConfiguration().set(MRConfiguration.TEXTOUTPUTFORMAT_SEPARATOR, "");
FileOutputFormat.setOutputPath(job, new Path(location));
if (comp == Compression.bz2 || comp == Compression.bz) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
} else if (comp == Compression.gz) {
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
}
}
@Override
public Boolean supportsParallelWriteToStoreLocation() {
return false;
}
//--------------------------------------------------------------------------
// Implementation of OutputFormat
public static class MultiStorageOutputFormat extends
TextOutputFormat<List<String>, Tuple> {
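// Field separator used when serializing tuples and, when isRemoveKeys is set
// on the store function, the key field indices to omit from the output.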
private String keyValueSeparator = "\\t";
private byte fieldDel = '\t';
private List<Integer> skipIndices = null;
@Override
public RecordWriter<List<String>, Tuple>
getRecordWriter(TaskAttemptContext context
) throws IOException, InterruptedException {
final TaskAttemptContext ctx = context;
return new RecordWriter<List<String>, Tuple>() {
private Map<List<String>, MyLineRecordWriter> storeMap =
new HashMap<List<String>, MyLineRecordWriter>();
private static final int BUFFER_SIZE = 1024;
private ByteArrayOutputStream mOut =
new ByteArrayOutputStream(BUFFER_SIZE);
@Override
public void write(List<String> key, Tuple val) throws IOException {
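// Serialize the tuple into the line buffer, skipping key fields if requested,
// then append the line to the file owned by this key.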
int sz = val.size();
for (int i = 0; i < sz; i++) {
Object field;
try {
field = val.get(i);
} catch (ExecException ee) {
throw ee;
}
boolean skipCurrentField = skipIndices != null && skipIndices.contains(i);
if (!skipCurrentField) {
StorageUtil.putField(mOut, field);
}
if (i != sz - 1 && !skipCurrentField) {
mOut.write(fieldDel);
}
}
getStore(key).write(null, new Text(mOut.toByteArray()));
mOut.reset();
}
@Override
public void close(TaskAttemptContext context) throws IOException {
for (MyLineRecordWriter out : storeMap.values()) {
out.close(context);
}
}
private MyLineRecordWriter getStore(List<String> fieldValues) throws IOException {
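// Lazily create one line writer per distinct key; each key writes to its own
// output file under its own sub-directory.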
MyLineRecordWriter store = storeMap.get(fieldValues);
if (store == null) {
DataOutputStream os = createOutputStream(fieldValues);
store = new MyLineRecordWriter(os, keyValueSeparator);
storeMap.put(fieldValues, store);
}
return store;
}
private DataOutputStream createOutputStream(List<String> fieldValues) throws IOException {
Configuration conf = ctx.getConfiguration();
TaskID taskId = ctx.getTaskAttemptID().getTaskID();
// Check whether compression is enabled, if so get the extension and add them to the path
boolean isCompressed = getCompressOutput(ctx);
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(ctx, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, ctx.getConfiguration());
extension = codec.getDefaultExtension();
}
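// Task ids are zero-padded to four digits so file names sort consistently
// (e.g. a1-0000, a1-0001).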
NumberFormat nf = NumberFormat.getInstance();
nf.setMinimumIntegerDigits(4);
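// Build the sub-directory path from the key values; '/' inside a value is
// replaced with '-' so it cannot introduce extra directory levels.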
StringBuffer pathStringBuffer = new StringBuffer();
for (String fieldValue : fieldValues){
String safeFieldValue = fieldValue.replaceAll("\\/","-");
pathStringBuffer.append(safeFieldValue);
pathStringBuffer.append("/");
}
pathStringBuffer.deleteCharAt(pathStringBuffer.length()-1);
String pathString = pathStringBuffer.toString();
String idString = pathString.replaceAll("\\/","-");
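// When compression is on, append the codec extension to every directory
// level as well as to the file name itself.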
if (!Strings.isNullOrEmpty(extension)){
pathString = pathString.replaceAll("\\/",extension+"\\/");
}
Path path = new Path(pathString+extension, idString + '-'
+ nf.format(taskId.getId())+extension);
Path workOutputPath = ((FileOutputCommitter)getOutputCommitter(ctx)).getWorkPath();
Path file = new Path(workOutputPath, path);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
if (isCompressed)
return new DataOutputStream(codec.createOutputStream(fileOut));
else
return fileOut;
}
};
}
public void setKeyValueSeparator(String sep) {
keyValueSeparator = sep;
fieldDel = StorageUtil.parseFieldDel(keyValueSeparator);
}
public void setSkipIndices(List<Integer> skipIndices) {
this.skipIndices = skipIndices;
}
//------------------------------------------------------------------------
//
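// Thin subclass that exposes TextOutputFormat's protected LineRecordWriter so
// a plain line writer can be created over each per-key output stream.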
protected static class MyLineRecordWriter
extends TextOutputFormat.LineRecordWriter<WritableComparable, Text> {
public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
super(out, keyValueSeparator);
}
}
}
}