package org.apache.hawq.pxf.plugins.hive;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.commons.lang.CharUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.*;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hawq.pxf.api.BadRecordException;
import org.apache.hawq.pxf.api.OneField;
import org.apache.hawq.pxf.api.OneRow;
import org.apache.hawq.pxf.api.UnsupportedTypeException;
import org.apache.hawq.pxf.api.io.DataType;
import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
import org.apache.hawq.pxf.api.utilities.InputData;
import org.apache.hawq.pxf.api.utilities.Utilities;
import org.apache.hawq.pxf.plugins.hdfs.utilities.HdfsUtilities;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import static org.apache.hawq.pxf.api.io.DataType.*;
/**
 * Specialized HiveResolver for a Hive table stored as an ORC file.
 * Use together with HiveInputFormatFragmenter/HiveORCAccessor.
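 * <p>
 * A minimal usage sketch; the profile, host, port and table names below are illustrative
 * assumptions rather than values taken from this class:
 * <pre>
 * CREATE EXTERNAL TABLE orc_sales (id int, total numeric)
 * LOCATION ('pxf://namenode:51200/hive_orc_sales?PROFILE=HiveORC')
 * FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
 * </pre>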
*/
public class HiveORCSerdeResolver extends HiveResolver {
private static final Log LOG = LogFactory.getLog(HiveORCSerdeResolver.class);
private OrcSerde deserializer;
private boolean firstColumn;
private StringBuilder builder;
private StringBuilder parts;
private int numberOfPartitions;
private HiveInputFormatFragmenter.PXF_HIVE_SERDES serdeType;
private static final String MAPKEY_DELIM = ":";
private static final String COLLECTION_DELIM = ",";
private String collectionDelim;
private String mapkeyDelim;
public HiveORCSerdeResolver(InputData input) throws Exception {
super(input);
}
/* read the data supplied by the fragmenter: inputformat name, serde name, partition keys */
@Override
void parseUserData(InputData input) throws Exception {
String[] toks = HiveInputFormatFragmenter.parseToks(input);
String serdeEnumStr = toks[HiveInputFormatFragmenter.TOK_SERDE];
if (serdeEnumStr.equals(HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE.name())) {
serdeType = HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE;
} else {
throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeEnumStr);
}
parts = new StringBuilder();
partitionKeys = toks[HiveInputFormatFragmenter.TOK_KEYS];
parseDelimiterChar(input);
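        // Delimiters used when flattening Hive collection and map values into text for HAWQ;
        // they default to ',' and ':' unless the COLLECTION_DELIM / MAPKEY_DELIM user
        // properties override them in the table definition.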
collectionDelim = input.getUserProperty("COLLECTION_DELIM") == null ? COLLECTION_DELIM
: input.getUserProperty("COLLECTION_DELIM");
mapkeyDelim = input.getUserProperty("MAPKEY_DELIM") == null ? MAPKEY_DELIM
: input.getUserProperty("MAPKEY_DELIM");
}
@Override
void initPartitionFields() {
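        // Delegates to the parent HiveResolver, which serializes the partition values into
        // 'parts' and returns the number of partition columns defined for the table.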
numberOfPartitions = initPartitionFields(parts);
}
    /**
     * getFields returns a list of OneField items, one per column of the Hive record.
     * Each OneField item contains two fields: an integer code identifying the field's
     * data type and a Java Object holding the field's value, as resolved from the
     * deserialized ORC struct.
     */
@Override
public List<OneField> getFields(OneRow onerow) throws Exception {
Object tuple = deserializer.deserialize((Writable) onerow.getData());
// Each Hive record is a Struct
StructObjectInspector soi = (StructObjectInspector) deserializer.getObjectInspector();
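        // traverseStruct (inherited from HiveResolver) walks the struct through its object
        // inspector and maps each Hive field to a OneField carrying its data type and value.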
List<OneField> record = traverseStruct(tuple, soi, false);
return record;
}
    /*
     * Get and init the deserializer for the records of this Hive data fragment.
     * SuppressWarnings added because deserializer.initialize is an abstract function that is
     * deprecated, but its implementation here (OrcSerde) still uses the deprecated interface.
     */
@SuppressWarnings("deprecation")
@Override
void initSerde(InputData input) throws Exception {
Properties serdeProperties = new Properties();
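        // Partition columns are not stored in the ORC data itself (their values come from the
        // fragment's partition keys), so only the remaining data columns are described to the serde.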
int numberOfDataColumns = input.getColumns() - numberOfPartitions;
LOG.debug("Serde number of columns is " + numberOfDataColumns);
StringBuilder columnNames = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
StringBuilder columnTypes = new StringBuilder(numberOfDataColumns * 2); // column + delimiter
String delim = "";
for (int i = 0; i < numberOfDataColumns; i++) {
ColumnDescriptor column = input.getColumn(i);
String columnName = column.columnName();
String columnType = HiveInputFormatFragmenter.toHiveType(DataType.get(column.columnTypeCode()), columnName);
columnNames.append(delim).append(columnName);
columnTypes.append(delim).append(columnType);
delim = ",";
}
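        // Hand the reconstructed schema to the serde: comma-delimited column names under
        // serdeConstants.LIST_COLUMNS and the matching Hive type names under LIST_COLUMN_TYPES.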
serdeProperties.put(serdeConstants.LIST_COLUMNS, columnNames.toString());
serdeProperties.put(serdeConstants.LIST_COLUMN_TYPES, columnTypes.toString());
if (serdeType == HiveInputFormatFragmenter.PXF_HIVE_SERDES.ORC_SERDE) {
deserializer = new OrcSerde();
} else {
throw new UnsupportedTypeException("Unsupported Hive Serde: " + serdeType.name()); /* we should not get here */
}
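        // Initialize the OrcSerde with a fresh JobConf; the column names and types supplied in
        // serdeProperties determine the object inspector it returns for use in getFields.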
deserializer.initialize(new JobConf(new Configuration(), HiveORCSerdeResolver.class), serdeProperties);
}
}