blob: 49cb5c6f5a188005f38733185852672998a211c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.impl.util;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.PigStreaming;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.streaming.StreamingDelimiters;
import org.joda.time.DateTime;
import com.google.common.base.Charsets;
/**
* This util class provides methods that are shared by storage class
* {@link PigStorage} and streaming class {@link PigStreaming}
*
*/
public final class StorageUtil {
private static Map<Byte, byte[]> TYPE_INDICATOR;
static {
TYPE_INDICATOR = new HashMap<Byte, byte[]>();
TYPE_INDICATOR.put(DataType.BOOLEAN, new byte[] {'B'});
TYPE_INDICATOR.put(DataType.INTEGER, new byte[] {'I'});
TYPE_INDICATOR.put(DataType.LONG, new byte[] {'L'});
TYPE_INDICATOR.put(DataType.FLOAT, new byte[] {'F'});
TYPE_INDICATOR.put(DataType.DOUBLE, new byte[] {'D'});
TYPE_INDICATOR.put(DataType.BYTEARRAY, new byte[] {'A'});
TYPE_INDICATOR.put(DataType.CHARARRAY, new byte[] {'C'});
TYPE_INDICATOR.put(DataType.DATETIME, new byte[] {'T'});
TYPE_INDICATOR.put(DataType.BIGINTEGER, new byte[] {'N'});
TYPE_INDICATOR.put(DataType.BIGDECIMAL, new byte[] {'E'});
}
public static final StreamingDelimiters DEFAULT_DELIMITERS = new StreamingDelimiters();
/**
* Transform a <code>String</code> into a byte representing the
* field delimiter.
*
* @param delimiter a string that may be in single-quoted form
* @return the field delimiter in byte form
*/
public static byte parseFieldDel(String delimiter) {
if (delimiter == null) {
throw new IllegalArgumentException("Null delimiter");
}
delimiter = parseSingleQuotedString(delimiter);
if (delimiter.length() > 1 && delimiter.charAt(0) != '\\') {
throw new IllegalArgumentException("Delimeter must be a " +
"single character " + delimiter);
}
byte fieldDel = '\t';
if (delimiter.length() == 1) {
fieldDel = (byte)delimiter.charAt(0);
} else if (delimiter.charAt(0) == '\\') {
switch (delimiter.charAt(1)) {
case 't':
fieldDel = (byte)'\t';
break;
case 'x':
fieldDel =
Integer.valueOf(delimiter.substring(2), 16).byteValue();
break;
case 'u':
fieldDel =
Integer.valueOf(delimiter.substring(2)).byteValue();
break;
default:
throw new IllegalArgumentException("Unknown delimiter " +
delimiter);
}
}
return fieldDel;
}
public static void putField(OutputStream out, Object field) throws IOException {
putField(out, field, DEFAULT_DELIMITERS, false);
}
public static void putField(OutputStream out, Object field, boolean includeTypeInformation) throws IOException {
putField(out, field, DEFAULT_DELIMITERS, includeTypeInformation);
}
/**
* Serialize an object to an {@link OutputStream} in the
* field-delimited form.
*
* @param out an OutputStream object
* @param field an object to be serialized
* @throws IOException if serialization fails.
*/
@SuppressWarnings("unchecked")
public static void putField(OutputStream out, Object field, StreamingDelimiters delims, boolean includeTypeInformation)
throws IOException {
switch (DataType.findType(field)) {
case DataType.NULL:
out.write(delims.getNull());
break;
case DataType.BOOLEAN:
writeField(out, ((Boolean)field).toString().getBytes(Charset.defaultCharset()),
DataType.BOOLEAN, includeTypeInformation);
break;
case DataType.INTEGER:
writeField(out, ((Integer)field).toString().getBytes(Charset.defaultCharset()),
DataType.INTEGER, includeTypeInformation);
break;
case DataType.LONG:
writeField(out, ((Long)field).toString().getBytes(Charset.defaultCharset()),
DataType.LONG, includeTypeInformation);
break;
case DataType.FLOAT:
writeField(out, ((Float)field).toString().getBytes(Charset.defaultCharset()),
DataType.FLOAT, includeTypeInformation);
break;
case DataType.DOUBLE:
writeField(out, ((Double)field).toString().getBytes(Charset.defaultCharset()),
DataType.DOUBLE, includeTypeInformation);
break;
case DataType.BIGINTEGER:
writeField(out, ((BigInteger)field).toString().getBytes(Charset.defaultCharset()),
DataType.BIGINTEGER, includeTypeInformation);
break;
case DataType.BIGDECIMAL:
writeField(out, ((BigDecimal)field).toString().getBytes(Charset.defaultCharset()),
DataType.BIGDECIMAL, includeTypeInformation);
break;
case DataType.DATETIME:
writeField(out, ((DateTime)field).toString().getBytes(Charset.defaultCharset()),
DataType.DATETIME, includeTypeInformation);
break;
case DataType.BYTEARRAY:
writeField(out, ((DataByteArray)field).get(),
DataType.BYTEARRAY, includeTypeInformation);
break;
case DataType.CHARARRAY:
writeField(out, ((String)field).getBytes(Charsets.UTF_8),
DataType.CHARARRAY, includeTypeInformation);
break;
case DataType.MAP:
boolean mapHasNext = false;
Map<String, Object> m = (Map<String, Object>)field;
out.write(delims.getMapBegin());
for(Map.Entry<String, Object> e: m.entrySet()) {
if(mapHasNext) {
out.write(delims.getFieldDelim());
} else {
mapHasNext = true;
}
putField(out, e.getKey(), delims, includeTypeInformation);
out.write(delims.getMapKeyDelim());
putField(out, e.getValue(), delims, includeTypeInformation);
}
out.write(delims.getMapEnd());
break;
case DataType.TUPLE:
boolean tupleHasNext = false;
Tuple t = (Tuple)field;
out.write(delims.getTupleBegin());
for(int i = 0; i < t.size(); ++i) {
if(tupleHasNext) {
out.write(delims.getFieldDelim());
} else {
tupleHasNext = true;
}
try {
putField(out, t.get(i), delims, includeTypeInformation);
} catch (ExecException ee) {
throw ee;
}
}
out.write(delims.getTupleEnd());
break;
case DataType.BAG:
boolean bagHasNext = false;
out.write(delims.getBagBegin());
Iterator<Tuple> tupleIter = ((DataBag)field).iterator();
while(tupleIter.hasNext()) {
if(bagHasNext) {
out.write(delims.getFieldDelim());
} else {
bagHasNext = true;
}
putField(out, (Object)tupleIter.next(), delims, includeTypeInformation);
}
out.write(delims.getBagEnd());
break;
default: {
int errCode = 2108;
String msg = "Could not determine data type of field: " + field;
throw new ExecException(msg, errCode, PigException.BUG);
}
}
}
private static void writeField(OutputStream out, byte[] bytes, byte dataType,
boolean includeTypeInformation) throws IOException {
if (includeTypeInformation) {
out.write(TYPE_INDICATOR.get(dataType));
}
out.write(bytes);
}
/**
* Transform a line of <code>Text</code> to a <code>Tuple</code>
*
* @param val a line of text
* @param fieldDel the field delimiter
* @return tuple constructed from the text
*/
public static Tuple textToTuple(Text val, byte fieldDel) {
return bytesToTuple(val.getBytes(), 0, val.getLength(), fieldDel);
}
/**
* Transform bytes from a byte array up to the specified length to a <code>Tuple</code>
*
* @param buf the byte array
* @param length number of bytes to consume from the byte array
* @param fieldDel the field delimiter
* @return tuple constructed from the bytes
*/
public static Tuple bytesToTuple(byte[] buf, int offset, int length, byte fieldDel) {
int start = offset;
ArrayList<Object> protoTuple = new ArrayList<Object>();
for (int i = offset; i < length; i++) {
if (buf[i] == fieldDel) {
readField(protoTuple, buf, start, i);
start = i + 1;
}
}
// pick up the last field
if (start <= length) {
readField(protoTuple, buf, start, length);
}
return TupleFactory.getInstance().newTupleNoCopy(protoTuple);
}
//---------------------------------------------------------------
// private methods
private static void readField(ArrayList<Object> protoTuple,
byte[] buf, int start, int end) {
if (start == end) {
// NULL value
protoTuple.add(null);
} else {
protoTuple.add(new DataByteArray(buf, start, end));
}
}
private static String parseSingleQuotedString(String delimiter) {
int startIndex = 0;
int endIndex;
while (startIndex < delimiter.length()
&& delimiter.charAt(startIndex++) != '\'')
;
endIndex = startIndex;
while (endIndex < delimiter.length()
&& delimiter.charAt(endIndex) != '\'') {
if (delimiter.charAt(endIndex) == '\\') {
endIndex++;
}
endIndex++;
}
return (endIndex < delimiter.length()) ?
delimiter.substring(startIndex, endIndex) : delimiter;
}
}