blob: da7b28aae9a6d1b854f4f0d09b3c63c6f6f0c5c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.builtin;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.LoadStoreCaster;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.LogUtils;
import org.joda.time.DateTime;
/**
* This abstract class provides standard conversions between utf8 encoded data
* and pig data types. It is intended to be extended by load and store
* functions (such as {@link PigStorage}).
*/
public class Utf8StorageConverter implements LoadStoreCaster {
protected BagFactory mBagFactory = BagFactory.getInstance();
protected TupleFactory mTupleFactory = TupleFactory.getInstance();
protected final Log mLog = LogFactory.getLog(getClass());
private static final Integer mMaxInt = Integer.valueOf(Integer.MAX_VALUE);
private static final Integer mMinInt = Integer.valueOf(Integer.MIN_VALUE);
private static final Long mMaxLong = Long.valueOf(Long.MAX_VALUE);
private static final Long mMinLong = Long.valueOf(Long.MIN_VALUE);
private static final int BUFFER_SIZE = 1024;
public Utf8StorageConverter() {
}
private char findStartChar(char start) throws IOException{
switch (start) {
case ')': return '(';
case ']': return '[';
case '}': return '{';
default: throw new IOException("Unknown start character");
}
}
private DataBag consumeBag(PushbackInputStream in, ResourceFieldSchema fieldSchema) throws IOException {
if (fieldSchema==null) {
throw new IOException("Schema is null");
}
ResourceFieldSchema[] fss=fieldSchema.getSchema().getFields();
Tuple t;
int buf;
while ((buf=in.read())!='{') {
if (buf==-1) {
throw new IOException("Unexpect end of bag");
}
}
if (fss.length!=1)
throw new IOException("Only tuple is allowed inside bag schema");
ResourceFieldSchema fs = fss[0];
DataBag db = DefaultBagFactory.getInstance().newDefaultBag();
while (true) {
t = consumeTuple(in, fs);
if (t!=null)
db.add(t);
while ((buf=in.read())!='}'&&buf!=',') {
if (buf==-1) {
throw new IOException("Unexpect end of bag");
}
}
if (buf=='}')
break;
}
return db;
}
private Tuple consumeTuple(PushbackInputStream in, ResourceFieldSchema fieldSchema) throws IOException {
if (fieldSchema==null) {
throw new IOException("Schema is null");
}
int buf;
ByteArrayOutputStream mOut;
while ((buf=in.read())!='('||buf=='}') {
if (buf==-1) {
throw new IOException("Unexpect end of tuple");
}
if (buf=='}') {
in.unread(buf);
return null;
}
}
Tuple t = TupleFactory.getInstance().newTuple();
if (fieldSchema.getSchema()!=null && fieldSchema.getSchema().getFields().length!=0) {
ResourceFieldSchema[] fss = fieldSchema.getSchema().getFields();
// Interpret item inside tuple one by one based on the inner schema
for (int i=0;i<fss.length;i++) {
Object field;
ResourceFieldSchema fs = fss[i];
int delimit = ',';
if (i==fss.length-1)
delimit = ')';
if (DataType.isComplex(fs.getType())) {
field = consumeComplexType(in, fs);
while ((buf=in.read())!=delimit) {
if (buf==-1) {
throw new IOException("Unexpect end of tuple");
}
}
}
else {
mOut = new ByteArrayOutputStream(BUFFER_SIZE);
while ((buf=in.read())!=delimit) {
if (buf==-1) {
throw new IOException("Unexpect end of tuple");
}
if (buf==delimit)
break;
mOut.write(buf);
}
field = parseSimpleType(mOut.toByteArray(), fs);
}
t.append(field);
}
}
else {
// No inner schema, treat everything inside tuple as bytearray
Deque<Character> level = new LinkedList<Character>(); // keep track of nested tuple/bag/map. We do not interpret, save them as bytearray
mOut = new ByteArrayOutputStream(BUFFER_SIZE);
while (true) {
buf=in.read();
if (buf==-1) {
throw new IOException("Unexpect end of tuple");
}
if (buf=='['||buf=='{'||buf=='(') {
level.push((char)buf);
mOut.write(buf);
}
else if (buf==')' && level.isEmpty()) // End of tuple
{
DataByteArray value = new DataByteArray(mOut.toByteArray());
t.append(value);
break;
}
else if (buf==',' && level.isEmpty())
{
DataByteArray value = new DataByteArray(mOut.toByteArray());
t.append(value);
mOut.reset();
}
else if (buf==']' ||buf=='}'||buf==')')
{
if (level.peek()==findStartChar((char)buf))
level.pop();
else
throw new IOException("Malformed tuple");
mOut.write(buf);
}
else
mOut.write(buf);
}
}
return t;
}
private Map<String, Object> consumeMap(PushbackInputStream in, ResourceFieldSchema fieldSchema) throws IOException {
int buf;
boolean emptyMap = true;
while ((buf=in.read())!='[') {
if (buf==-1) {
throw new IOException("Unexpect end of map");
}
}
HashMap<String, Object> m = new HashMap<String, Object>();
ByteArrayOutputStream mOut = new ByteArrayOutputStream(BUFFER_SIZE);
while (true) {
// Read key (assume key can not contains special character such as #, (, [, {, }, ], )
while ((buf=in.read())!='#') {
// end of map
if (emptyMap && buf==']') {
return m;
}
if (buf==-1) {
throw new IOException("Unexpect end of map");
}
emptyMap = false;
mOut.write(buf);
}
String key = bytesToCharArray(mOut.toByteArray());
if (key.length()==0)
throw new IOException("Map key can not be null");
// Read value
mOut.reset();
Deque<Character> level = new LinkedList<Character>(); // keep track of nested tuple/bag/map. We do not interpret, save them as bytearray
while (true) {
buf=in.read();
if (buf==-1) {
throw new IOException("Unexpect end of map");
}
if (buf=='['||buf=='{'||buf=='(') {
level.push((char)buf);
}
else if (buf==']' && level.isEmpty()) // End of map
break;
else if (buf==']' ||buf=='}'||buf==')')
{
if (level.isEmpty())
throw new IOException("Malformed map");
if (level.peek()==findStartChar((char)buf))
level.pop();
} else if (buf==','&&level.isEmpty()) { // Current map item complete
break;
}
mOut.write(buf);
}
Object value = null;
if (fieldSchema!=null && fieldSchema.getSchema()!=null && mOut.size()>0) {
value = bytesToObject(mOut.toByteArray(), fieldSchema.getSchema().getFields()[0]);
} else if (mOut.size()>0) { // untyped map
value = new DataByteArray(mOut.toByteArray());
}
m.put(key, value);
mOut.reset();
if (buf==']')
break;
}
return m;
}
private Object bytesToObject(byte[] b, ResourceFieldSchema fs) throws IOException {
Object field;
if (DataType.isComplex(fs.getType())) {
ByteArrayInputStream bis = new ByteArrayInputStream(b);
PushbackInputStream in = new PushbackInputStream(bis);
field = consumeComplexType(in, fs);
}
else {
field = parseSimpleType(b, fs);
}
return field;
}
private Object consumeComplexType(PushbackInputStream in, ResourceFieldSchema complexFieldSchema) throws IOException {
Object field;
switch (complexFieldSchema.getType()) {
case DataType.BAG:
field = consumeBag(in, complexFieldSchema);
break;
case DataType.TUPLE:
field = consumeTuple(in, complexFieldSchema);
break;
case DataType.MAP:
field = consumeMap(in, complexFieldSchema);
break;
default:
throw new IOException("Unknown complex data type");
}
return field;
}
private Object parseSimpleType(byte[] b, ResourceFieldSchema simpleFieldSchema) throws IOException {
Object field;
switch (simpleFieldSchema.getType()) {
case DataType.INTEGER:
field = bytesToInteger(b);
break;
case DataType.LONG:
field = bytesToLong(b);
break;
case DataType.FLOAT:
field = bytesToFloat(b);
break;
case DataType.DOUBLE:
field = bytesToDouble(b);
break;
case DataType.CHARARRAY:
field = bytesToCharArray(b);
break;
case DataType.BYTEARRAY:
field = new DataByteArray(b);
break;
case DataType.BOOLEAN:
field = bytesToBoolean(b);
break;
case DataType.BIGINTEGER:
field = bytesToBigInteger(b);
break;
case DataType.BIGDECIMAL:
field = bytesToBigDecimal(b);
break;
case DataType.DATETIME:
field = bytesToDateTime(b);
break;
default:
throw new IOException("Unknown simple data type");
}
return field;
}
@Override
public DataBag bytesToBag(byte[] b, ResourceFieldSchema schema) throws IOException {
if(b == null)
return null;
DataBag db;
try {
ByteArrayInputStream bis = new ByteArrayInputStream(b);
PushbackInputStream in = new PushbackInputStream(bis);
db = consumeBag(in, schema);
} catch (IOException e) {
LogUtils.warn(this, "Unable to interpret value " + Arrays.toString(b) + " in field being " +
"converted to type bag, caught ParseException <" +
e.getMessage() + "> field discarded",
PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
return null;
}
return db;
}
@Override
public String bytesToCharArray(byte[] b) throws IOException {
if(b == null)
return null;
return new String(b, "UTF-8");
}
@Override
public Double bytesToDouble(byte[] b) {
if(b == null || b.length == 0) {
return null;
}
try {
return Double.valueOf(new String(b));
} catch (NumberFormatException nfe) {
LogUtils.warn(this, "Unable to interpret value " + Arrays.toString(b) + " in field being " +
"converted to double, caught NumberFormatException <" +
nfe.getMessage() + "> field discarded",
PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
return null;
}
}
@Override
public Float bytesToFloat(byte[] b) throws IOException {
if(b == null || b.length == 0) {
return null;
}
String s;
if (b.length > 0 && (b[b.length - 1] == 'F' || b[b.length - 1] == 'f')) {
s = new String(b, 0, b.length - 1);
} else {
s = new String(b);
}
try {
return Float.valueOf(s);
} catch (NumberFormatException nfe) {
LogUtils.warn(this, "Unable to interpret value " + Arrays.toString(b) + " in field being " +
"converted to float, caught NumberFormatException <" +
nfe.getMessage() + "> field discarded",
PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
return null;
}
}
@Override
public Boolean bytesToBoolean(byte[] b) throws IOException {
if(b == null)
return null;
String s = new String(b);
if (s.equalsIgnoreCase("true")) {
return Boolean.TRUE;
} else if (s.equalsIgnoreCase("false")) {
return Boolean.FALSE;
} else {
return null;
}
}
/**
* Sanity check of whether this number is a valid integer or long.
* @param number the number to check
* @return true if it doesn't contain any invalid characters, i.e. only contains digits and '-'
*/
private static boolean sanityCheckIntegerLong(String number){
for (int i=0; i < number.length(); i++){
if (number.charAt(i) >= '0' && number.charAt(i) <='9' || i == 0 && number.charAt(i) == '-'){
// valid one
}
else{
// contains invalid characters, must not be a integer or long.
return false;
}
}
return true;
}
@Override
public Integer bytesToInteger(byte[] b) throws IOException {
if(b == null || b.length == 0) {
return null;
}
String s = new String(b);
s = s.trim();
Integer ret = null;
// See PIG-2835. Using exception handling to check if it's a double is very expensive.
// So we write our sanity check.
if (sanityCheckIntegerLong(s)){
try {
ret = Integer.valueOf(s);
} catch (NumberFormatException nfe) {
}
}
if (ret == null){
// It's possible that this field can be interpreted as a double.
// Unfortunately Java doesn't handle this in Integer.valueOf. So
// we need to try to convert it to a double and if that works then
// go to an int.
try {
Double d = Double.valueOf(s);
// Need to check for an overflow error
if (Double.compare(d.doubleValue(), mMaxInt.doubleValue() + 1) >= 0 ||
Double.compare(d.doubleValue(), mMinInt.doubleValue() - 1) <= 0) {
LogUtils.warn(this, "Value " + d + " too large for integer",
PigWarning.TOO_LARGE_FOR_INT, mLog);
return null;
}
return Integer.valueOf(d.intValue());
} catch (NumberFormatException nfe2) {
LogUtils.warn(this, "Unable to interpret value " + Arrays.toString(b) + " in field being " +
"converted to int, caught NumberFormatException <" +
nfe2.getMessage() + "> field discarded",
PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
return null;
}
}
return ret;
}
@Override
public Long bytesToLong(byte[] b) throws IOException {
if (b == null || b.length == 0) {
return null;
}
String s = new String(b).trim();
if(s.endsWith("l") || s.endsWith("L")) {
s = s.substring(0, s.length()-1);
}
// See PIG-2835. Using exception handling to check if it's a double is very expensive.
// So we write our sanity check.
Long ret = null;
if (sanityCheckIntegerLong(s)) {
try {
ret = Long.valueOf(s);
} catch (NumberFormatException nfe) {
}
}
if (ret == null) {
// It's possible that this field can be interpreted as a double.
// Unfortunately Java doesn't handle this in Long.valueOf. So
// we need to try to convert it to a double and if that works then
// go to an long.
try {
Double d = Double.valueOf(s);
// Need to check for an overflow error
if (Double.compare(d.doubleValue(), mMaxLong.doubleValue() + 1) > 0 ||
Double.compare(d.doubleValue(), mMinLong.doubleValue() - 1) < 0) {
LogUtils.warn(this, "Value " + d + " too large for long",
PigWarning.TOO_LARGE_FOR_INT, mLog);
return null;
}
return Long.valueOf(d.longValue());
} catch (NumberFormatException nfe2) {
LogUtils.warn(this, "Unable to interpret value " + Arrays.toString(b) + " in field being " +
"converted to long, caught NumberFormatException <" +
nfe2.getMessage() + "> field discarded",
PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
return null;
}
}
return ret;
}
@Override
public DateTime bytesToDateTime(byte[] b) throws IOException {
if (b == null) {
return null;
}
try {
String dtStr = new String(b);
return ToDate.extractDateTime(dtStr);
} catch (IllegalArgumentException e) {
LogUtils.warn(this, "Unable to interpret value " + Arrays.toString(b) + " in field being " +
"converted to datetime, caught IllegalArgumentException <" +
e.getMessage() + "> field discarded",
PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
return null;
}
}
@Override
public Map<String, Object> bytesToMap(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
if(b == null)
return null;
Map<String, Object> map;
try {
ByteArrayInputStream bis = new ByteArrayInputStream(b);
PushbackInputStream in = new PushbackInputStream(bis);
map = consumeMap(in, fieldSchema);
}
catch (IOException e) {
LogUtils.warn(this, "Unable to interpret value " + Arrays.toString(b) + " in field being " +
"converted to type map, caught ParseException <" +
e.getMessage() + "> field discarded",
PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
return null;
}
return map;
}
@Override
public Tuple bytesToTuple(byte[] b, ResourceFieldSchema fieldSchema) throws IOException {
if(b == null)
return null;
Tuple t;
try {
ByteArrayInputStream bis = new ByteArrayInputStream(b);
PushbackInputStream in = new PushbackInputStream(bis);
t = consumeTuple(in, fieldSchema);
}
catch (IOException e) {
LogUtils.warn(this, "Unable to interpret value " + Arrays.toString(b) + " in field being " +
"converted to type tuple, caught ParseException <" +
e.getMessage() + "> field discarded",
PigWarning.FIELD_DISCARDED_TYPE_CONVERSION_FAILED, mLog);
return null;
}
return t;
}
@Override
public BigInteger bytesToBigInteger(byte[] b) throws IOException {
if (b == null || b.length == 0) {
return null;
}
return new BigInteger(new String(b));
}
@Override
public BigDecimal bytesToBigDecimal(byte[] b) throws IOException {
if (b == null || b.length == 0) {
return null;
}
return new BigDecimal(new String(b));
}
@Override
public byte[] toBytes(DataBag bag) throws IOException {
return bag.toString().getBytes();
}
@Override
public byte[] toBytes(String s) throws IOException {
return s.getBytes();
}
@Override
public byte[] toBytes(Double d) throws IOException {
return d.toString().getBytes();
}
@Override
public byte[] toBytes(Float f) throws IOException {
return f.toString().getBytes();
}
@Override
public byte[] toBytes(Integer i) throws IOException {
return i.toString().getBytes();
}
@Override
public byte[] toBytes(Long l) throws IOException {
return l.toString().getBytes();
}
@Override
public byte[] toBytes(Boolean b) throws IOException {
return b.toString().getBytes();
}
@Override
public byte[] toBytes(DateTime dt) throws IOException {
return dt.toString().getBytes();
}
@Override
public byte[] toBytes(Map<String, Object> m) throws IOException {
return DataType.mapToString(m).getBytes();
}
@Override
public byte[] toBytes(Tuple t) throws IOException {
return t.toString().getBytes();
}
@Override
public byte[] toBytes(DataByteArray a) throws IOException {
return a.get();
}
@Override
public byte[] toBytes(BigInteger bi) throws IOException {
return bi.toString().getBytes();
}
@Override
public byte[] toBytes(BigDecimal bd) throws IOException {
return bd.toString().getBytes();
}
}