blob: 546a47904d883267c2c3b56e88ae6f0209e6e079 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.data;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.io.Writable;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
/**
* This class was used to handle reading and writing of intermediate
* results of data types; that functionality now lives in {@link BinInterSedes}.
* This class can also be used for storing permanent results; it is used
* by BinStorage through the DefaultTuple class.
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
public class DataReaderWriter {
    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
    private static final BagFactory mBagFactory = BagFactory.getInstance();

    /**
     * Strings whose UTF-8 encoding is shorter than this many bytes are written as
     * CHARARRAY with a 2-byte length prefix; longer ones are written as BIGCHARARRAY
     * with a 4-byte length prefix.
     */
    static final int UNSIGNED_SHORT_MAX = 65535;

    /** Name of the charset used for every string encode/decode in this class. */
    public static final String UTF8 = "UTF-8";

    /** Milliseconds per minute; DATETIME zone offsets are serialized in whole minutes. */
    private static final int ONE_MINUTE = 60000;

    /**
     * Reads a 32-bit length prefix and rejects negative values.
     *
     * @param in   stream to read from
     * @param what description of the structure being sized, used in the error message
     * @return the size, always {@code >= 0}
     * @throws IOException if the stream cannot be read or the size is negative
     */
    private static int readSize(DataInput in, String what) throws IOException {
        int size = in.readInt();
        if (size < 0) {
            // A negative length can only come from a corrupt or mis-aligned stream.
            throw new IOException("Invalid size " + size + " for " + what);
        }
        return size;
    }

    /**
     * Reads exactly {@code size} bytes.
     *
     * @throws IOException if the stream ends before {@code size} bytes are read
     */
    private static byte[] readBytes(DataInput in, int size) throws IOException {
        byte[] buf = new byte[size];
        in.readFully(buf);
        return buf;
    }

    /**
     * Reads a tuple: a 4-byte field count followed by that many type-tagged datums.
     *
     * @param in stream positioned just after the TUPLE type tag
     * @return the tuple; empty if the stored count is 0
     * @throws IOException if the count is negative or the stream cannot be read
     */
    public static Tuple bytesToTuple(DataInput in) throws IOException {
        // Don't use Tuple.readFields: it requires creating a tuple with no size and
        // then appending fields. Allocating the tuple at its final size up front and
        // filling in the slots is more efficient.
        int sz = readSize(in, "a tuple");
        Tuple t = mTupleFactory.newTuple(sz);
        for (int i = 0; i < sz; i++) {
            t.set(i, readDatum(in));
        }
        return t;
    }

    /**
     * Reads a bag: an 8-byte tuple count followed by that many type-tagged tuples.
     *
     * @param in stream positioned just after the BAG type tag
     * @return a new default bag holding the tuples read
     * @throws IOException if the count is negative or the stream cannot be read
     */
    public static DataBag bytesToBag(DataInput in) throws IOException {
        DataBag bag = mBagFactory.newDefaultBag();
        long size = in.readLong();
        if (size < 0) {
            throw new IOException("Invalid size " + size + " for a bag");
        }
        for (long i = 0; i < size; i++) {
            bag.add((Tuple) readDatum(in));
        }
        return bag;
    }

    /**
     * Reads a map: a 4-byte entry count followed by that many (key, value) datum
     * pairs. Keys are cast to {@code String}.
     *
     * @param in stream positioned just after the MAP type tag
     * @return a new {@link HashMap} with the entries read
     * @throws IOException if the count is negative or the stream cannot be read
     */
    public static Map<String, Object> bytesToMap(DataInput in) throws IOException {
        int size = readSize(in, "a map");
        Map<String, Object> m = new HashMap<String, Object>(size);
        for (int i = 0; i < size; i++) {
            // The key precedes its value in the stream, so read it first.
            String key = (String) readDatum(in);
            m.put(key, readDatum(in));
        }
        return m;
    }

    /**
     * Reads an internal map: a 4-byte entry count followed by that many
     * (key, value) datum pairs. Keys may be of any datum type.
     *
     * @param in stream positioned just after the INTERNALMAP type tag
     * @return a new {@link InternalMap} with the entries read
     * @throws IOException if the count is negative or the stream cannot be read
     */
    public static InternalMap bytesToInternalMap(DataInput in) throws IOException {
        int size = readSize(in, "an internal map");
        InternalMap m = new InternalMap(size);
        for (int i = 0; i < size; i++) {
            Object key = readDatum(in);
            m.put(key, readDatum(in));
        }
        return m;
    }

    /**
     * Reads a string with an unsigned 16-bit byte-length prefix, decoded as UTF-8.
     *
     * @param in stream positioned just after the CHARARRAY type tag
     * @return the decoded string
     * @throws IOException if the stream cannot be read
     */
    public static String bytesToCharArray(DataInput in) throws IOException {
        int size = in.readUnsignedShort();
        return new String(readBytes(in, size), UTF8);
    }

    /**
     * Reads a string with a 32-bit byte-length prefix, decoded as UTF-8.
     *
     * @param in stream positioned just after the BIGCHARARRAY type tag
     * @return the decoded string
     * @throws IOException if the length is negative or the stream cannot be read
     */
    public static String bytesToBigCharArray(DataInput in) throws IOException {
        int size = readSize(in, "a chararray");
        return new String(readBytes(in, size), UTF8);
    }

    /**
     * Reads a generic {@link Writable}: a datum holding the class name, followed by
     * whatever that class's {@code readFields} consumes. The class needs a public
     * no-argument constructor.
     *
     * @param in stream positioned just after the GENERIC_WRITABLECOMPARABLE type tag
     * @return the populated writable
     * @throws IOException if the class is missing, is not a {@link Writable}, cannot
     *                     be instantiated, or the stream cannot be read
     */
    public static Writable bytesToWritable(DataInput in) throws IOException {
        String className = (String) readDatum(in);
        if (className == null) {
            throw new IOException("Missing class name while attempting to de-serialize a Writable");
        }
        Class<?> objClass;
        try {
            // Load without initializing: the class name comes from the stream, so no
            // static initializer should run until the class is known to be a Writable.
            // This uses the same loader that Class.forName(String) would use here.
            objClass = Class.forName(className, false, DataReaderWriter.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IOException("Could not find class " + className +
                    ", while attempting to de-serialize it ", e);
        }
        if (!Writable.class.isAssignableFrom(objClass)) {
            throw new IOException("Class " + className + " is not a " +
                    Writable.class.getName() + ", refusing to de-serialize it");
        }
        Writable writable;
        try {
            writable = (Writable) objClass.newInstance();
        } catch (Exception e) {
            String msg = "Could not create instance of class " + className +
                    ", while attempting to de-serialize it. (no default constructor ?)";
            throw new IOException(msg, e);
        }
        // Let the object populate its own fields from the stream.
        writable.readFields(in);
        return writable;
    }

    /**
     * Reads a one-byte type tag and then the datum it describes.
     *
     * @param in stream to read from
     * @return the datum, or {@code null} for a NULL tag
     * @throws IOException if the stream cannot be read or is malformed
     */
    public static Object readDatum(DataInput in) throws IOException, ExecException {
        byte b = in.readByte();
        return readDatum(in, b);
    }

    /**
     * Reads the datum described by an already-consumed type tag.
     *
     * @param in   stream positioned just after the type tag
     * @param type the {@link DataType} tag
     * @return the datum, or {@code null} for {@link DataType#NULL}
     * @throws IOException      if the stream cannot be read or a length is negative
     * @throws RuntimeException if {@code type} is not a recognized tag
     */
    public static Object readDatum(DataInput in, byte type) throws IOException, ExecException {
        switch (type) {
        case DataType.TUPLE:
            return bytesToTuple(in);
        case DataType.BAG:
            return bytesToBag(in);
        case DataType.MAP:
            return bytesToMap(in);
        case DataType.INTERNALMAP:
            return bytesToInternalMap(in);
        case DataType.INTEGER:
            return Integer.valueOf(in.readInt());
        case DataType.LONG:
            return Long.valueOf(in.readLong());
        case DataType.FLOAT:
            return Float.valueOf(in.readFloat());
        case DataType.DOUBLE:
            return Double.valueOf(in.readDouble());
        case DataType.BIGINTEGER:
            // Two's-complement big-endian bytes, as produced by BigInteger.toByteArray().
            return new BigInteger(readBytes(in, readSize(in, "a biginteger")));
        case DataType.BIGDECIMAL:
            // Stored as the UTF-8 bytes of BigDecimal.toString().
            return new BigDecimal(new String(readBytes(in, readSize(in, "a bigdecimal")), UTF8));
        case DataType.BOOLEAN:
            return Boolean.valueOf(in.readBoolean());
        case DataType.BYTE:
            return Byte.valueOf(in.readByte());
        case DataType.DATETIME: {
            // Instant in millis, then the zone offset in whole minutes. The result
            // carries a fixed-offset zone, not the writer's original named zone.
            long millis = in.readLong();
            int offsetMillis = in.readShort() * ONE_MINUTE;
            return new DateTime(millis, DateTimeZone.forOffsetMillis(offsetMillis));
        }
        case DataType.BYTEARRAY:
            return new DataByteArray(readBytes(in, readSize(in, "a bytearray")));
        case DataType.BIGCHARARRAY:
            return bytesToBigCharArray(in);
        case DataType.CHARARRAY:
            return bytesToCharArray(in);
        case DataType.GENERIC_WRITABLECOMPARABLE:
            return bytesToWritable(in);
        case DataType.NULL:
            return null;
        default:
            throw new RuntimeException("Unexpected data type " + type +
                    " found in stream.");
        }
    }

    /**
     * Writes {@code val} preceded by its one-byte {@link DataType} tag, in the
     * format understood by {@link #readDatum(DataInput)}.
     *
     * @param out destination stream
     * @param val value to write; may be {@code null}
     * @throws IOException      if writing fails
     * @throws RuntimeException if the value's type has no serialization here
     */
    @SuppressWarnings("unchecked")
    public static void writeDatum(
            DataOutput out,
            Object val) throws IOException {
        // Determine the data type; it is written as a tag ahead of the payload.
        byte type = DataType.findType(val);
        switch (type) {
        case DataType.TUPLE: {
            Tuple t = (Tuple) val;
            out.writeByte(DataType.TUPLE);
            int sz = t.size();
            out.writeInt(sz);
            for (int i = 0; i < sz; i++) {
                writeDatum(out, t.get(i));
            }
            break;
        }
        case DataType.BAG: {
            DataBag bag = (DataBag) val;
            out.writeByte(DataType.BAG);
            out.writeLong(bag.size());
            Iterator<Tuple> it = bag.iterator();
            while (it.hasNext()) {
                writeDatum(out, it.next());
            }
            break;
        }
        case DataType.MAP: {
            out.writeByte(DataType.MAP);
            Map<String, Object> m = (Map<String, Object>) val;
            out.writeInt(m.size());
            for (Map.Entry<String, Object> entry : m.entrySet()) {
                writeDatum(out, entry.getKey());
                writeDatum(out, entry.getValue());
            }
            break;
        }
        case DataType.INTERNALMAP: {
            out.writeByte(DataType.INTERNALMAP);
            Map<Object, Object> m = (Map<Object, Object>) val;
            out.writeInt(m.size());
            for (Map.Entry<Object, Object> entry : m.entrySet()) {
                writeDatum(out, entry.getKey());
                writeDatum(out, entry.getValue());
            }
            break;
        }
        case DataType.INTEGER:
            out.writeByte(DataType.INTEGER);
            out.writeInt((Integer) val);
            break;
        case DataType.LONG:
            out.writeByte(DataType.LONG);
            out.writeLong((Long) val);
            break;
        case DataType.FLOAT:
            out.writeByte(DataType.FLOAT);
            out.writeFloat((Float) val);
            break;
        case DataType.DOUBLE:
            out.writeByte(DataType.DOUBLE);
            out.writeDouble((Double) val);
            break;
        case DataType.BOOLEAN:
            out.writeByte(DataType.BOOLEAN);
            out.writeBoolean((Boolean) val);
            break;
        case DataType.BYTE:
            out.writeByte(DataType.BYTE);
            out.writeByte((Byte) val);
            break;
        case DataType.DATETIME: {
            DateTime dt = (DateTime) val;
            out.writeByte(DataType.DATETIME);
            out.writeLong(dt.getMillis());
            // Offset in effect at this instant, in whole minutes (integer division
            // drops any sub-minute remainder); the reader multiplies by ONE_MINUTE.
            out.writeShort(dt.getZone().getOffset(dt) / ONE_MINUTE);
            break;
        }
        case DataType.BYTEARRAY: {
            out.writeByte(DataType.BYTEARRAY);
            DataByteArray bytes = (DataByteArray) val;
            out.writeInt(bytes.size());
            out.write(bytes.mData);
            break;
        }
        case DataType.BIGINTEGER: {
            out.writeByte(DataType.BIGINTEGER);
            byte[] bytes = ((BigInteger) val).toByteArray();
            out.writeInt(bytes.length);
            out.write(bytes);
            break;
        }
        case DataType.BIGDECIMAL: {
            out.writeByte(DataType.BIGDECIMAL);
            byte[] bt = ((BigDecimal) val).toString().getBytes(UTF8);
            out.writeInt(bt.length);
            out.write(bt);
            break;
        }
        case DataType.CHARARRAY: {
            byte[] utfBytes = ((String) val).getBytes(UTF8);
            int length = utfBytes.length;
            if (length < UNSIGNED_SHORT_MAX) {
                // Short form: 2-byte length prefix.
                out.writeByte(DataType.CHARARRAY);
                out.writeShort(length);
            } else {
                // Long form: 4-byte length prefix.
                out.writeByte(DataType.BIGCHARARRAY);
                out.writeInt(length);
            }
            out.write(utfBytes);
            break;
        }
        case DataType.GENERIC_WRITABLECOMPARABLE: {
            out.writeByte(DataType.GENERIC_WRITABLECOMPARABLE);
            // Store the class name so the reader knows which class to instantiate.
            writeDatum(out, val.getClass().getName());
            Writable writable = (Writable) val;
            writable.write(out);
            break;
        }
        case DataType.NULL:
            out.writeByte(DataType.NULL);
            break;
        default:
            throw new RuntimeException("Unexpected data type " + type +
                    " found in stream.");
        }
    }
}