/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.parser;

import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.OrderedListBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.builders.UnorderedListBuilder;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.om.base.temporal.GregorianCalendarSystem;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AUnionType;
import org.apache.asterix.om.types.AUnorderedListType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.util.string.UTF8StringWriter;
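
/**
 * Parses Hive records (as Hadoop Writables) into AsterixDB's tagged binary
 * record format. A configurable Hive SerDe deserializes each raw record, and
 * the resulting object is walked via Hive ObjectInspectors to write typed
 * field values. Uses the pre-serdeConstants Hive SerDe API, hence the
 * deprecation suppression.
 */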
@SuppressWarnings("deprecation")
public class HiveRecordParser implements IRecordDataParser<Writable> {
private ARecordType recordType;
private SerDe hiveSerde;
private StructObjectInspector oi;
private IARecordBuilder recBuilder;
private ArrayBackedValueStorage fieldValueBuffer;
private ArrayBackedValueStorage listItemBuffer;
private byte[] fieldTypeTags;
private IAType[] fieldTypes;
private OrderedListBuilder orderedListBuilder;
private UnorderedListBuilder unorderedListBuilder;
private List<? extends StructField> fieldRefs;
private UTF8StringWriter utf8Writer = new UTF8StringWriter();
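
/**
 * Builds the Hive table schema (column names and types) from the Asterix
 * record type, instantiates and initializes the configured SerDe, and
 * pre-serializes the type tag of each field.
 */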
public HiveRecordParser(ARecordType recordType, JobConf hadoopConfiguration, String hiveSerdeClassName)
throws HyracksDataException {
try {
this.recordType = recordType;
int n = recordType.getFieldNames().length;
fieldTypes = recordType.getFieldTypes();
//create the hive table schema.
Properties tbl = new Properties();
tbl.put(Constants.LIST_COLUMNS, getCommaDelimitedColNames(this.recordType));
tbl.put(Constants.LIST_COLUMN_TYPES, getColTypes(this.recordType));
hiveSerde = (SerDe) Class.forName(hiveSerdeClassName).newInstance();
hiveSerde.initialize(hadoopConfiguration, tbl);
oi = (StructObjectInspector) hiveSerde.getObjectInspector();
fieldValueBuffer = new ArrayBackedValueStorage();
// the item buffer must be allocated up front; list parsing would otherwise NPE
listItemBuffer = new ArrayBackedValueStorage();
recBuilder = new RecordBuilder();
recBuilder.reset(recordType);
recBuilder.init();
fieldTypeTags = new byte[n];
for (int i = 0; i < n; i++) {
ATypeTag tag = fieldTypes[i].getTypeTag();
fieldTypeTags[i] = tag.serialize();
}
fieldRefs = oi.getAllStructFieldRefs();
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
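
/**
 * Deserializes one raw record through the SerDe, then writes every field as
 * its type tag followed by its binary value into the record builder.
 */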
@Override
public void parse(IRawRecord<? extends Writable> record, DataOutput out) throws HyracksDataException {
try {
Writable hiveRawRecord = record.get();
Object hiveObject = hiveSerde.deserialize(hiveRawRecord);
int n = recordType.getFieldNames().length;
List<Object> attributesValues = oi.getStructFieldsDataAsList(hiveObject);
recBuilder.reset(recordType);
recBuilder.init();
for (int i = 0; i < n; i++) {
final Object value = attributesValues.get(i);
final ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
fieldValueBuffer.reset();
final DataOutput dataOutput = fieldValueBuffer.getDataOutput();
// write the field's type tag, then its value
dataOutput.writeByte(fieldTypeTags[i]);
parseItem(fieldTypes[i], value, foi, dataOutput, false);
recBuilder.addField(i, fieldValueBuffer);
}
recBuilder.write(out, true);
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
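
/**
 * Dispatches on the Asterix type tag and writes the value using the
 * corresponding Hive ObjectInspector. When primitiveOnly is set (i.e. while
 * parsing list items), nested lists are rejected.
 */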
private void parseItem(IAType itemType, Object value, ObjectInspector foi, DataOutput dataOutput,
boolean primitiveOnly) throws IOException {
switch (itemType.getTypeTag()) {
case BOOLEAN:
parseBoolean(value, (BooleanObjectInspector) foi, dataOutput);
break;
case TIME:
parseTime(value, (TimestampObjectInspector) foi, dataOutput);
break;
case DATE:
parseDate(value, (TimestampObjectInspector) foi, dataOutput);
break;
case DATETIME:
parseDateTime(value, (TimestampObjectInspector) foi, dataOutput);
break;
case DOUBLE:
parseDouble(value, (DoubleObjectInspector) foi, dataOutput);
break;
case FLOAT:
parseFloat(value, (FloatObjectInspector) foi, dataOutput);
break;
case INT8:
parseInt8(value, (ByteObjectInspector) foi, dataOutput);
break;
case INT16:
parseInt16(value, (ShortObjectInspector) foi, dataOutput);
break;
case INT32:
parseInt32(value, (IntObjectInspector) foi, dataOutput);
break;
case INT64:
parseInt64(value, (LongObjectInspector) foi, dataOutput);
break;
case STRING:
parseString(value, (StringObjectInspector) foi, dataOutput);
break;
case ORDEREDLIST:
if (primitiveOnly) {
throw new HyracksDataException("doesn't support hive data with list of non-primitive types");
}
parseOrderedList((AOrderedListType) itemType, value, (ListObjectInspector) foi);
break;
case UNORDEREDLIST:
if (primitiveOnly) {
throw new HyracksDataException("doesn't support hive data with list of non-primitive types");
}
parseUnorderedList((AUnorderedListType) itemType, value, (ListObjectInspector) foi);
break;
default:
throw new HyracksDataException("Can't get hive type for field of type " + itemType.getTypeTag());
}
}
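
/**
 * Returns the comma-delimited Hive type names of the record's fields, used
 * as the SerDe's LIST_COLUMN_TYPES property.
 */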
private String getColTypes(ARecordType record) throws Exception {
int n = record.getFieldTypes().length;
if (n < 1) {
throw new HyracksDataException("Failed to get columns of record");
}
StringBuilder cols = new StringBuilder(getHiveTypeString(record.getFieldTypes(), 0));
for (int i = 1; i < n; i++) {
cols.append(',').append(getHiveTypeString(record.getFieldTypes(), i));
}
return cols.toString();
}
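
/**
 * Returns the comma-delimited field names of the record, used as the SerDe's
 * LIST_COLUMNS property.
 */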
private String getCommaDelimitedColNames(ARecordType record) throws Exception {
String[] fieldNames = record.getFieldNames();
if (fieldNames.length < 1) {
throw new HyracksDataException("Can't deserialize hive records with no closed columns");
}
StringBuilder cols = new StringBuilder(fieldNames[0]);
for (int i = 1; i < fieldNames.length; i++) {
cols.append(',').append(fieldNames[i]);
}
return cols.toString();
}
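
/**
 * Maps an Asterix type tag to the corresponding Hive type name. Optional
 * (nullable) fields are first unwrapped to their actual type; TIME maps to
 * the Hive datetime type because Hive has no standalone time type.
 */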
private String getHiveTypeString(IAType[] types, int i) throws Exception {
final IAType type = types[i];
ATypeTag tag = type.getTypeTag();
if (tag == ATypeTag.UNION) {
// only optional (nullable) unions can be unwrapped to their actual type
if (!NonTaggedFormatUtil.isOptional(type)) {
throw new NotImplementedException("Non-optional UNION type is not supported.");
}
tag = ((AUnionType) type).getActualType().getTypeTag();
}
if (tag == null) {
throw new NotImplementedException("Failed to get the type information for field " + i + ".");
}
switch (tag) {
case BOOLEAN:
return Constants.BOOLEAN_TYPE_NAME;
case DATE:
return Constants.DATE_TYPE_NAME;
case DATETIME:
return Constants.DATETIME_TYPE_NAME;
case DOUBLE:
return Constants.DOUBLE_TYPE_NAME;
case FLOAT:
return Constants.FLOAT_TYPE_NAME;
case INT16:
return Constants.SMALLINT_TYPE_NAME;
case INT32:
return Constants.INT_TYPE_NAME;
case INT64:
return Constants.BIGINT_TYPE_NAME;
case INT8:
return Constants.TINYINT_TYPE_NAME;
case ORDEREDLIST:
return Constants.LIST_TYPE_NAME;
case STRING:
return Constants.STRING_TYPE_NAME;
case TIME:
return Constants.DATETIME_TYPE_NAME;
case UNORDEREDLIST:
return Constants.LIST_TYPE_NAME;
default:
throw new HyracksDataException("Can't get hive type for field of type " + tag);
}
}
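
// Primitive writers: each writes the raw binary value only; the type tag has
// already been written by the caller.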
private void parseInt64(Object obj, LongObjectInspector foi, DataOutput dataOutput) throws IOException {
dataOutput.writeLong(foi.get(obj));
}
private void parseInt32(Object obj, IntObjectInspector foi, DataOutput dataOutput) throws IOException {
if (obj == null) {
throw new HyracksDataException("can't parse null field");
}
dataOutput.writeInt(foi.get(obj));
}
private void parseInt16(Object obj, ShortObjectInspector foi, DataOutput dataOutput) throws IOException {
dataOutput.writeShort(foi.get(obj));
}
private void parseFloat(Object obj, FloatObjectInspector foi, DataOutput dataOutput) throws IOException {
dataOutput.writeFloat(foi.get(obj));
}
private void parseDouble(Object obj, DoubleObjectInspector foi, DataOutput dataOutput) throws IOException {
dataOutput.writeDouble(foi.get(obj));
}
private void parseDateTime(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
dataOutput.writeLong(foi.getPrimitiveJavaObject(obj).getTime());
}
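
/**
 * Converts the timestamp to days since the epoch, rounding toward negative
 * infinity for pre-epoch timestamps that don't fall exactly on a day
 * boundary.
 */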
private void parseDate(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
long chrononTimeInMs = foi.getPrimitiveJavaObject(obj).getTime();
int dayOffset = 0;
if (chrononTimeInMs < 0 && chrononTimeInMs % GregorianCalendarSystem.CHRONON_OF_DAY != 0) {
dayOffset = 1;
}
dataOutput.writeInt((int) (chrononTimeInMs / GregorianCalendarSystem.CHRONON_OF_DAY) - dayOffset);
}
private void parseBoolean(Object obj, BooleanObjectInspector foi, DataOutput dataOutput) throws IOException {
dataOutput.writeBoolean(foi.get(obj));
}
private void parseInt8(Object obj, ByteObjectInspector foi, DataOutput dataOutput) throws IOException {
dataOutput.writeByte(foi.get(obj));
}
private void parseString(Object obj, StringObjectInspector foi, DataOutput dataOutput) throws IOException {
utf8Writer.writeUTF8(foi.getPrimitiveJavaObject(obj), dataOutput);
}
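
// Writes the time of day as milliseconds elapsed since midnight.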
private void parseTime(Object obj, TimestampObjectInspector foi, DataOutput dataOutput) throws IOException {
dataOutput.writeInt((int) (foi.getPrimitiveJavaObject(obj).getTime() % GregorianCalendarSystem.CHRONON_OF_DAY));
}
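
/**
 * Writes a Hive list as an Asterix ordered list: each item is reset into the
 * item buffer, tagged with the item type, and appended to the list builder.
 */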
private void parseOrderedList(AOrderedListType aOrderedListType, Object obj, ListObjectInspector foi)
throws IOException {
OrderedListBuilder orderedListBuilder = getOrderedListBuilder();
IAType itemType = null;
if (aOrderedListType != null) {
itemType = aOrderedListType.getItemType();
}
if (itemType == null) {
throw new HyracksDataException("can't parse hive list without a known item type");
}
byte tagByte = itemType.getTypeTag().serialize();
orderedListBuilder.reset(aOrderedListType);
int n = foi.getListLength(obj);
for (int i = 0; i < n; i++) {
Object element = foi.getListElement(obj, i);
ObjectInspector eoi = foi.getListElementObjectInspector();
if (element == null) {
throw new HyracksDataException("can't parse hive list with null values");
}
// reset and tag the item buffer before each item, mirroring
// parseUnorderedList; without the reset, items would accumulate
listItemBuffer.reset();
final DataOutput dataOutput = listItemBuffer.getDataOutput();
dataOutput.writeByte(tagByte);
parseItem(itemType, element, eoi, dataOutput, true);
orderedListBuilder.addItem(listItemBuffer);
}
orderedListBuilder.write(fieldValueBuffer.getDataOutput(), true);
}
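
/**
 * Writes a Hive list as an Asterix unordered list; items are buffered and
 * tagged exactly as in parseOrderedList.
 */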
private void parseUnorderedList(AUnorderedListType uoltype, Object obj, ListObjectInspector loi)
throws IOException {
UnorderedListBuilder unorderedListBuilder = getUnorderedListBuilder();
IAType itemType = null;
if (uoltype != null) {
itemType = uoltype.getItemType();
}
if (itemType == null) {
throw new HyracksDataException("can't parse hive list without a known item type");
}
byte tagByte = itemType.getTypeTag().serialize();
unorderedListBuilder.reset(uoltype);
int n = loi.getListLength(obj);
for (int i = 0; i < n; i++) {
Object element = loi.getListElement(obj, i);
ObjectInspector eoi = loi.getListElementObjectInspector();
if (element == null) {
throw new HyracksDataException("can't parse hive list with null values");
}
listItemBuffer.reset();
final DataOutput dataOutput = listItemBuffer.getDataOutput();
dataOutput.writeByte(tagByte);
parseItem(itemType, element, eoi, dataOutput, true);
unorderedListBuilder.addItem(listItemBuffer);
}
unorderedListBuilder.write(fieldValueBuffer.getDataOutput(), true);
}
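
// List builders are created lazily and reused across records.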
private OrderedListBuilder getOrderedListBuilder() {
if (orderedListBuilder == null) {
orderedListBuilder = new OrderedListBuilder();
}
return orderedListBuilder;
}

private UnorderedListBuilder getUnorderedListBuilder() {
if (unorderedListBuilder == null) {
unorderedListBuilder = new UnorderedListBuilder();
}
return unorderedListBuilder;
}
}