| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package edu.uci.ics.hivesterix.serde.lazy; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hive.serde.Constants; |
| import org.apache.hadoop.hive.serde2.ByteStream; |
| import org.apache.hadoop.hive.serde2.ByteStream.Output; |
| import org.apache.hadoop.hive.serde2.SerDe; |
| import org.apache.hadoop.hive.serde2.SerDeException; |
| import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; |
| import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.StructField; |
| import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; |
| import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; |
| import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; |
| import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| |
| /** |
| * The LazySerDe class combines the lazy property of LazySimpleSerDe class and |
| * the binary property of BinarySortable class. Lazy means a field is not |
| * deserialized until required. Binary means a field is serialized in binary |
| * compact format. |
| */ |
| public class LazySerDe implements SerDe { |
| |
| public static final Log LOG = LogFactory.getLog(LazySerDe.class.getName()); |
| |
| public LazySerDe() { |
| } |
| |
| List<String> columnNames; |
| List<TypeInfo> columnTypes; |
| |
| TypeInfo rowTypeInfo; |
| ObjectInspector cachedObjectInspector; |
| |
| // The object for storing row data |
| LazyColumnar cachedLazyStruct; |
| |
| /** |
| * Initialize the SerDe with configuration and table information. |
| */ |
| @Override |
| public void initialize(Configuration conf, Properties tbl) throws SerDeException { |
| // Get column names and types |
| String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS); |
| String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES); |
| if (columnNameProperty.length() == 0) { |
| columnNames = new ArrayList<String>(); |
| } else { |
| columnNames = Arrays.asList(columnNameProperty.split(",")); |
| } |
| if (columnTypeProperty.length() == 0) { |
| columnTypes = new ArrayList<TypeInfo>(); |
| } else { |
| columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); |
| } |
| assert (columnNames.size() == columnTypes.size()); |
| // Create row related objects |
| rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); |
| // Create the object inspector and the lazy binary struct object |
| cachedObjectInspector = LazyUtils.getLazyObjectInspectorFromTypeInfo(rowTypeInfo, true); |
| cachedLazyStruct = (LazyColumnar) LazyFactory.createLazyObject(cachedObjectInspector); |
| // output debug info |
| LOG.debug("LazySerDe initialized with: columnNames=" + columnNames + " columnTypes=" + columnTypes); |
| } |
| |
| /** |
| * Returns the ObjectInspector for the row. |
| */ |
| @Override |
| public ObjectInspector getObjectInspector() throws SerDeException { |
| return cachedObjectInspector; |
| } |
| |
| /** |
| * Returns the Writable Class after serialization. |
| */ |
| @Override |
| public Class<? extends Writable> getSerializedClass() { |
| return BytesWritable.class; |
| } |
| |
| // The wrapper for byte array |
| ByteArrayRef byteArrayRef; |
| |
| /** |
| * Deserialize a table record to a Lazy struct. |
| */ |
| @SuppressWarnings("deprecation") |
| @Override |
| public Object deserialize(Writable field) throws SerDeException { |
| if (byteArrayRef == null) { |
| byteArrayRef = new ByteArrayRef(); |
| } |
| if (field instanceof BytesWritable) { |
| BytesWritable b = (BytesWritable) field; |
| if (b.getSize() == 0) { |
| return null; |
| } |
| // For backward-compatibility with hadoop 0.17 |
| byteArrayRef.setData(b.get()); |
| cachedLazyStruct.init(byteArrayRef.getData(), 0, b.getSize()); |
| } else if (field instanceof Text) { |
| Text t = (Text) field; |
| if (t.getLength() == 0) { |
| return null; |
| } |
| byteArrayRef.setData(t.getBytes()); |
| cachedLazyStruct.init(byteArrayRef.getData(), 0, t.getLength()); |
| } else { |
| throw new SerDeException(getClass().toString() + ": expects either BytesWritable or Text object!"); |
| } |
| return cachedLazyStruct; |
| } |
| |
| /** |
| * The reusable output buffer and serialize byte buffer. |
| */ |
| BytesWritable serializeBytesWritable = new BytesWritable(); |
| ByteStream.Output serializeByteStream = new ByteStream.Output(); |
| |
| /** |
| * Serialize an object to a byte buffer in a binary compact way. |
| */ |
| @Override |
| public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { |
| // make sure it is a struct record or not |
| serializeByteStream.reset(); |
| |
| if (objInspector.getCategory() != Category.STRUCT) { |
| // serialize the primitive object |
| serialize(serializeByteStream, obj, objInspector); |
| } else { |
| // serialize the row as a struct |
| serializeStruct(serializeByteStream, obj, (StructObjectInspector) objInspector); |
| } |
| // return the serialized bytes |
| serializeBytesWritable.set(serializeByteStream.getData(), 0, serializeByteStream.getCount()); |
| return serializeBytesWritable; |
| } |
| |
| boolean nullMapKey = false; |
| |
| /** |
| * Serialize a struct object without writing the byte size. This function is |
| * shared by both row serialization and struct serialization. |
| * |
| * @param byteStream |
| * the byte stream storing the serialization data |
| * @param obj |
| * the struct object to serialize |
| * @param objInspector |
| * the struct object inspector |
| */ |
| private void serializeStruct(Output byteStream, Object obj, StructObjectInspector soi) { |
| // do nothing for null struct |
| if (null == obj) { |
| return; |
| } |
| /* |
| * Interleave serializing one null byte and 8 struct fields in each |
| * round, in order to support data deserialization with different table |
| * schemas |
| */ |
| List<? extends StructField> fields = soi.getAllStructFieldRefs(); |
| int size = fields.size(); |
| int lasti = 0; |
| byte nullByte = 0; |
| for (int i = 0; i < size; i++) { |
| // set bit to 1 if a field is not null |
| if (null != soi.getStructFieldData(obj, fields.get(i))) { |
| nullByte |= 1 << (i % 8); |
| } |
| // write the null byte every eight elements or |
| // if this is the last element and serialize the |
| // corresponding 8 struct fields at the same time |
| if (7 == i % 8 || i == size - 1) { |
| serializeByteStream.write(nullByte); |
| for (int j = lasti; j <= i; j++) { |
| serialize(serializeByteStream, soi.getStructFieldData(obj, fields.get(j)), fields.get(j) |
| .getFieldObjectInspector()); |
| } |
| lasti = i + 1; |
| nullByte = 0; |
| } |
| } |
| } |
| |
| /** |
| * A recursive function that serialize an object to a byte buffer based on |
| * its object inspector. |
| * |
| * @param byteStream |
| * the byte stream storing the serialization data |
| * @param obj |
| * the object to serialize |
| * @param objInspector |
| * the object inspector |
| */ |
| private void serialize(Output byteStream, Object obj, ObjectInspector objInspector) { |
| |
| // do nothing for null object |
| if (null == obj) { |
| return; |
| } |
| |
| switch (objInspector.getCategory()) { |
| case PRIMITIVE: { |
| PrimitiveObjectInspector poi = (PrimitiveObjectInspector) objInspector; |
| switch (poi.getPrimitiveCategory()) { |
| case VOID: { |
| return; |
| } |
| case BOOLEAN: { |
| boolean v = ((BooleanObjectInspector) poi).get(obj); |
| byteStream.write((byte) (v ? 1 : 0)); |
| return; |
| } |
| case BYTE: { |
| ByteObjectInspector boi = (ByteObjectInspector) poi; |
| byte v = boi.get(obj); |
| byteStream.write(v); |
| return; |
| } |
| case SHORT: { |
| ShortObjectInspector spoi = (ShortObjectInspector) poi; |
| short v = spoi.get(obj); |
| byteStream.write((byte) (v >> 8)); |
| byteStream.write((byte) (v)); |
| return; |
| } |
| case INT: { |
| IntObjectInspector ioi = (IntObjectInspector) poi; |
| int v = ioi.get(obj); |
| LazyUtils.writeVInt(byteStream, v); |
| return; |
| } |
| case LONG: { |
| LongObjectInspector loi = (LongObjectInspector) poi; |
| long v = loi.get(obj); |
| LazyUtils.writeVLong(byteStream, v); |
| return; |
| } |
| case FLOAT: { |
| FloatObjectInspector foi = (FloatObjectInspector) poi; |
| int v = Float.floatToIntBits(foi.get(obj)); |
| byteStream.write((byte) (v >> 24)); |
| byteStream.write((byte) (v >> 16)); |
| byteStream.write((byte) (v >> 8)); |
| byteStream.write((byte) (v)); |
| return; |
| } |
| case DOUBLE: { |
| DoubleObjectInspector doi = (DoubleObjectInspector) poi; |
| long v = Double.doubleToLongBits(doi.get(obj)); |
| byteStream.write((byte) (v >> 56)); |
| byteStream.write((byte) (v >> 48)); |
| byteStream.write((byte) (v >> 40)); |
| byteStream.write((byte) (v >> 32)); |
| byteStream.write((byte) (v >> 24)); |
| byteStream.write((byte) (v >> 16)); |
| byteStream.write((byte) (v >> 8)); |
| byteStream.write((byte) (v)); |
| return; |
| } |
| case STRING: { |
| StringObjectInspector soi = (StringObjectInspector) poi; |
| Text t = soi.getPrimitiveWritableObject(obj); |
| /* write byte size of the string which is a vint */ |
| int length = t.getLength(); |
| LazyUtils.writeVInt(byteStream, length); |
| /* write string itself */ |
| byte[] data = t.getBytes(); |
| byteStream.write(data, 0, length); |
| return; |
| } |
| default: { |
| throw new RuntimeException("Unrecognized type: " + poi.getPrimitiveCategory()); |
| } |
| } |
| } |
| case LIST: { |
| ListObjectInspector loi = (ListObjectInspector) objInspector; |
| ObjectInspector eoi = loi.getListElementObjectInspector(); |
| |
| // 1/ reserve spaces for the byte size of the list |
| // which is a integer and takes four bytes |
| int byteSizeStart = byteStream.getCount(); |
| byteStream.write((byte) 0); |
| byteStream.write((byte) 0); |
| byteStream.write((byte) 0); |
| byteStream.write((byte) 0); |
| int listStart = byteStream.getCount(); |
| |
| // 2/ write the size of the list as a VInt |
| int size = loi.getListLength(obj); |
| LazyUtils.writeVInt(byteStream, size); |
| |
| // 3/ write the null bytes |
| byte nullByte = 0; |
| for (int eid = 0; eid < size; eid++) { |
| // set the bit to 1 if an element is not null |
| if (null != loi.getListElement(obj, eid)) { |
| nullByte |= 1 << (eid % 8); |
| } |
| // store the byte every eight elements or |
| // if this is the last element |
| if (7 == eid % 8 || eid == size - 1) { |
| byteStream.write(nullByte); |
| nullByte = 0; |
| } |
| } |
| |
| // 4/ write element by element from the list |
| for (int eid = 0; eid < size; eid++) { |
| serialize(byteStream, loi.getListElement(obj, eid), eoi); |
| } |
| |
| // 5/ update the list byte size |
| int listEnd = byteStream.getCount(); |
| int listSize = listEnd - listStart; |
| byte[] bytes = byteStream.getData(); |
| bytes[byteSizeStart] = (byte) (listSize >> 24); |
| bytes[byteSizeStart + 1] = (byte) (listSize >> 16); |
| bytes[byteSizeStart + 2] = (byte) (listSize >> 8); |
| bytes[byteSizeStart + 3] = (byte) (listSize); |
| |
| return; |
| } |
| case MAP: { |
| MapObjectInspector moi = (MapObjectInspector) objInspector; |
| ObjectInspector koi = moi.getMapKeyObjectInspector(); |
| ObjectInspector voi = moi.getMapValueObjectInspector(); |
| Map<?, ?> map = moi.getMap(obj); |
| |
| // 1/ reserve spaces for the byte size of the map |
| // which is a integer and takes four bytes |
| int byteSizeStart = byteStream.getCount(); |
| byteStream.write((byte) 0); |
| byteStream.write((byte) 0); |
| byteStream.write((byte) 0); |
| byteStream.write((byte) 0); |
| int mapStart = byteStream.getCount(); |
| |
| // 2/ write the size of the map which is a VInt |
| int size = map.size(); |
| LazyUtils.writeVInt(byteStream, size); |
| |
| // 3/ write the null bytes |
| int b = 0; |
| byte nullByte = 0; |
| for (Map.Entry<?, ?> entry : map.entrySet()) { |
| // set the bit to 1 if a key is not null |
| if (null != entry.getKey()) { |
| nullByte |= 1 << (b % 8); |
| } else if (!nullMapKey) { |
| nullMapKey = true; |
| LOG.warn("Null map key encountered! Ignoring similar problems."); |
| } |
| b++; |
| // set the bit to 1 if a value is not null |
| if (null != entry.getValue()) { |
| nullByte |= 1 << (b % 8); |
| } |
| b++; |
| // write the byte to stream every 4 key-value pairs |
| // or if this is the last key-value pair |
| if (0 == b % 8 || b == size * 2) { |
| byteStream.write(nullByte); |
| nullByte = 0; |
| } |
| } |
| |
| // 4/ write key-value pairs one by one |
| for (Map.Entry<?, ?> entry : map.entrySet()) { |
| serialize(byteStream, entry.getKey(), koi); |
| serialize(byteStream, entry.getValue(), voi); |
| } |
| |
| // 5/ update the byte size of the map |
| int mapEnd = byteStream.getCount(); |
| int mapSize = mapEnd - mapStart; |
| byte[] bytes = byteStream.getData(); |
| bytes[byteSizeStart] = (byte) (mapSize >> 24); |
| bytes[byteSizeStart + 1] = (byte) (mapSize >> 16); |
| bytes[byteSizeStart + 2] = (byte) (mapSize >> 8); |
| bytes[byteSizeStart + 3] = (byte) (mapSize); |
| |
| return; |
| } |
| case STRUCT: { |
| // 1/ reserve spaces for the byte size of the struct |
| // which is a integer and takes four bytes |
| int byteSizeStart = byteStream.getCount(); |
| byteStream.write((byte) 0); |
| byteStream.write((byte) 0); |
| byteStream.write((byte) 0); |
| byteStream.write((byte) 0); |
| int structStart = byteStream.getCount(); |
| |
| // 2/ serialize the struct |
| serializeStruct(byteStream, obj, (StructObjectInspector) objInspector); |
| |
| // 3/ update the byte size of the struct |
| int structEnd = byteStream.getCount(); |
| int structSize = structEnd - structStart; |
| byte[] bytes = byteStream.getData(); |
| bytes[byteSizeStart] = (byte) (structSize >> 24); |
| bytes[byteSizeStart + 1] = (byte) (structSize >> 16); |
| bytes[byteSizeStart + 2] = (byte) (structSize >> 8); |
| bytes[byteSizeStart + 3] = (byte) (structSize); |
| |
| return; |
| } |
| default: { |
| throw new RuntimeException("Unrecognized type: " + objInspector.getCategory()); |
| } |
| } |
| } |
| } |