/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hcatalog.data;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
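
/**
 * A SerDe that maps one JSON-encoded record per line of text to an HCatRecord
 * and back.
 *
 * A minimal usage sketch (standalone driver code, not part of this class; the
 * column names and values are made up for illustration):
 * <pre>{@code
 *   Properties tbl = new Properties();
 *   tbl.setProperty(Constants.LIST_COLUMNS, "id,name");
 *   tbl.setProperty(Constants.LIST_COLUMN_TYPES, "int,string");
 *
 *   JsonSerDe serde = new JsonSerDe();
 *   serde.initialize(new Configuration(), tbl);
 *
 *   HCatRecord rec = (HCatRecord) serde.deserialize(new Text("{\"id\":1,\"name\":\"alice\"}"));
 *   Text json = (Text) serde.serialize(rec, serde.getObjectInspector());
 * }</pre>
 */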
public class JsonSerDe implements SerDe {
private static final Logger LOG = LoggerFactory.getLogger(JsonSerDe.class);
private List<String> columnNames;
private List<TypeInfo> columnTypes;
private StructTypeInfo rowTypeInfo;
private HCatSchema schema;
private JsonFactory jsonFactory = null;
private HCatRecordObjectInspector cachedObjectInspector;
@Override
public void initialize(Configuration conf, Properties tbl)
throws SerDeException {
LOG.debug("Initializing JsonSerDe");
LOG.debug("props to serde: {}", tbl.entrySet());
// Get column names and types
String columnNameProperty = tbl.getProperty(Constants.LIST_COLUMNS);
String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
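    // e.g. (made-up columns) LIST_COLUMNS = "id,name,details" and
    //      LIST_COLUMN_TYPES = "int,string,struct<a:int,b:string>"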
// all table column names
if (columnNameProperty.length() == 0) {
columnNames = new ArrayList<String>();
} else {
columnNames = Arrays.asList(columnNameProperty.split(","));
}
// all column types
if (columnTypeProperty.length() == 0) {
columnTypes = new ArrayList<TypeInfo>();
} else {
columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
}
LOG.debug("columns: {}, {}", columnNameProperty, columnNames);
LOG.debug("types: {}, {} ", columnTypeProperty, columnTypes);
assert (columnNames.size() == columnTypes.size());
rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
cachedObjectInspector = HCatRecordObjectInspectorFactory.getHCatRecordObjectInspector(rowTypeInfo);
try {
schema = HCatSchemaUtils.getHCatSchema(rowTypeInfo).get(0).getStructSubSchema();
LOG.debug("schema : {}", schema);
LOG.debug("fields : {}", schema.getFieldNames());
} catch (HCatException e) {
throw new SerDeException(e);
}
jsonFactory = new JsonFactory();
}
/**
 * Takes a JSON string in Text form and returns an object representation of it
 * that is readable by the corresponding object inspector.
*
* For this implementation, since we're using the jackson parser, we can construct
* our own object implementation, and we use HCatRecord for it
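 *
 * For example (a sketch; the columns are made up): with a table declared as
 * (id int, name string), the input line {"id":1,"name":"alice"} deserializes
 * to an HCatRecord equivalent to the list [1, "alice"].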
*/
@Override
public Object deserialize(Writable blob) throws SerDeException {
Text t = (Text) blob;
JsonParser p;
List<Object> r = new ArrayList<Object>(Collections.nCopies(columnNames.size(), null));
try {
      // Text.getBytes() returns the underlying buffer, which may be longer than
      // getLength(); bound the stream so stale trailing bytes are not parsed.
      p = jsonFactory.createJsonParser(new ByteArrayInputStream(t.getBytes(), 0, t.getLength()));
if (p.nextToken() != JsonToken.START_OBJECT) {
throw new IOException("Start token not found where expected");
}
JsonToken token;
while (((token = p.nextToken()) != JsonToken.END_OBJECT) && (token != null)) {
// iterate through each token, and create appropriate object here.
populateRecord(r, token, p, schema);
}
} catch (JsonParseException e) {
LOG.warn("Error [{}] parsing json text [{}].", e, t);
LOG.debug(null, e);
throw new SerDeException(e);
} catch (IOException e) {
LOG.warn("Error [{}] parsing json text [{}].", e, t);
LOG.debug(null, e);
throw new SerDeException(e);
}
return new DefaultHCatRecord(r);
}
private void populateRecord(List<Object> r, JsonToken token, JsonParser p, HCatSchema s) throws IOException {
if (token != JsonToken.FIELD_NAME) {
throw new IOException("Field name expected");
}
String fieldName = p.getText();
int fpos;
try {
fpos = s.getPosition(fieldName);
} catch (NullPointerException npe) {
      fpos = getPositionFromHiveInternalColumnName(fieldName);
      LOG.debug("NPE finding position for field [{}] in schema [{}]", fieldName, s);
      if (fpos == -1) {
        // Unknown field: consume and discard its value so the parser stays
        // aligned on the next field name, then skip the field. (The original
        // code checked the name encoding first, which made this branch
        // unreachable and left the value token unconsumed.)
        JsonToken valueToken = p.nextToken();
        if ((valueToken == JsonToken.START_ARRAY) || (valueToken == JsonToken.START_OBJECT)) {
          p.skipChildren();
        }
        return;
      }
      if (!fieldName.equalsIgnoreCase(getHiveInternalColumnName(fpos))) {
        LOG.error("Hive internal column name {} and position "
            + "encoding {} for the column name are at odds", fieldName, fpos);
        throw npe;
      }
}
HCatFieldSchema hcatFieldSchema = s.getFields().get(fpos);
Object currField = extractCurrentField(p, null, hcatFieldSchema, false);
r.set(fpos, currField);
}
public String getHiveInternalColumnName(int fpos) {
return HiveConf.getColumnInternalName(fpos);
}
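  /**
   * Inverse of {@link #getHiveInternalColumnName(int)}: for example, "_col0"
   * yields 0 and "_col12" yields 12; any name not of that form yields -1.
   */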
public int getPositionFromHiveInternalColumnName(String internalName) {
// return HiveConf.getPositionFromInternalName(fieldName);
// The above line should have been all the implementation that
// we need, but due to a bug in that impl which recognizes
// only single-digit columns, we need another impl here.
Pattern internalPattern = Pattern.compile("_col([0-9]+)");
Matcher m = internalPattern.matcher(internalName);
if (!m.matches()) {
return -1;
} else {
return Integer.parseInt(m.group(1));
}
}
/**
* Utility method to extract current expected field from given JsonParser
*
 * To get the field, we need either a type or an HCatFieldSchema (the latter is
 * necessary for complex types). One of them may be null; when the field schema
 * is present, the type is taken from it.
*
* isTokenCurrent is a boolean variable also passed in, which determines
* if the JsonParser is already at the token we expect to read next, or
* needs advancing to the next before we read.
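 *
 * For example, when the parser is at the value of an int field this returns an
 * Integer; for an array field it recurses once per element and returns a List.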
*/
private Object extractCurrentField(JsonParser p, Type t,
HCatFieldSchema hcatFieldSchema, boolean isTokenCurrent) throws IOException, JsonParseException,
HCatException {
Object val = null;
JsonToken valueToken;
if (isTokenCurrent) {
valueToken = p.getCurrentToken();
} else {
valueToken = p.nextToken();
}
if (hcatFieldSchema != null) {
t = hcatFieldSchema.getType();
}
switch (t) {
case INT:
val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getIntValue();
break;
case TINYINT:
val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getByteValue();
break;
case SMALLINT:
val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getShortValue();
break;
case BIGINT:
val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getLongValue();
break;
case BOOLEAN:
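      // Read the token as text rather than as a JSON boolean, so both the
      // literal true/false and the quoted string form are accepted.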
String bval = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
if (bval != null) {
val = Boolean.valueOf(bval);
} else {
val = null;
}
break;
case FLOAT:
val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getFloatValue();
break;
case DOUBLE:
val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getDoubleValue();
break;
case STRING:
val = (valueToken == JsonToken.VALUE_NULL) ? null : p.getText();
break;
case BINARY:
throw new IOException("JsonSerDe does not support BINARY type");
case ARRAY:
if (valueToken == JsonToken.VALUE_NULL) {
val = null;
break;
}
if (valueToken != JsonToken.START_ARRAY) {
throw new IOException("Start of Array expected");
}
List<Object> arr = new ArrayList<Object>();
while ((valueToken = p.nextToken()) != JsonToken.END_ARRAY) {
arr.add(extractCurrentField(p, null, hcatFieldSchema.getArrayElementSchema().get(0), true));
}
val = arr;
break;
case MAP:
if (valueToken == JsonToken.VALUE_NULL) {
val = null;
break;
}
if (valueToken != JsonToken.START_OBJECT) {
throw new IOException("Start of Object expected");
}
Map<Object, Object> map = new LinkedHashMap<Object, Object>();
Type keyType = hcatFieldSchema.getMapKeyType();
HCatFieldSchema valueSchema = hcatFieldSchema.getMapValueSchema().get(0);
while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) {
Object k = getObjectOfCorrespondingPrimitiveType(p.getCurrentName(), keyType);
Object v;
if (valueSchema.getType() == HCatFieldSchema.Type.STRUCT) {
v = extractCurrentField(p, null, valueSchema, false);
} else {
v = extractCurrentField(p, null, valueSchema, true);
}
map.put(k, v);
}
val = map;
break;
case STRUCT:
if (valueToken == JsonToken.VALUE_NULL) {
val = null;
break;
}
if (valueToken != JsonToken.START_OBJECT) {
throw new IOException("Start of Object expected");
}
HCatSchema subSchema = hcatFieldSchema.getStructSubSchema();
int sz = subSchema.getFieldNames().size();
List<Object> struct = new ArrayList<Object>(Collections.nCopies(sz, null));
while ((valueToken = p.nextToken()) != JsonToken.END_OBJECT) {
populateRecord(struct, valueToken, p, subSchema);
}
val = struct;
break;
}
return val;
}
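  /**
   * Converts a JSON object key, which is always a string in JSON text, into the
   * Java object matching the declared map key type; e.g. the key "42" becomes
   * Integer.valueOf(42) for a column of type map<int,string>.
   */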
private Object getObjectOfCorrespondingPrimitiveType(String s, Type t) throws IOException {
switch (t) {
case INT:
return Integer.valueOf(s);
case TINYINT:
return Byte.valueOf(s);
case SMALLINT:
return Short.valueOf(s);
case BIGINT:
return Long.valueOf(s);
case BOOLEAN:
return (s.equalsIgnoreCase("true"));
case FLOAT:
return Float.valueOf(s);
case DOUBLE:
return Double.valueOf(s);
case STRING:
return s;
case BINARY:
throw new IOException("JsonSerDe does not support BINARY type");
}
throw new IOException("Could not convert from string to map type " + t);
}
/**
* Given an object and object inspector pair, traverse the object
* and generate a Text representation of the object.
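 *
 * For example (a sketch with made-up column names): a record [1, "alice"] over
 * columns (id int, name string) serializes to {"id":1,"name":"alice"}.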
*/
@Override
public Writable serialize(Object obj, ObjectInspector objInspector)
throws SerDeException {
StringBuilder sb = new StringBuilder();
try {
StructObjectInspector soi = (StructObjectInspector) objInspector;
List<? extends StructField> structFields = soi.getAllStructFieldRefs();
assert (columnNames.size() == structFields.size());
if (obj == null) {
sb.append("null");
} else {
sb.append(SerDeUtils.LBRACE);
for (int i = 0; i < structFields.size(); i++) {
if (i > 0) {
sb.append(SerDeUtils.COMMA);
}
sb.append(SerDeUtils.QUOTE);
sb.append(columnNames.get(i));
sb.append(SerDeUtils.QUOTE);
sb.append(SerDeUtils.COLON);
buildJSONString(sb, soi.getStructFieldData(obj, structFields.get(i)),
structFields.get(i).getFieldObjectInspector());
}
sb.append(SerDeUtils.RBRACE);
}
} catch (IOException e) {
LOG.warn("Error generating json text from object.", e);
throw new SerDeException(e);
}
return new Text(sb.toString());
}
  // TODO : code section copied over from SerDeUtils because of non-standard json production there:
  // it should use quotes for all field names. We should fix it there, and then remove this copy.
  // See http://jackson.codehaus.org/1.7.3/javadoc/org/codehaus/jackson/JsonParser.Feature.html#ALLOW_UNQUOTED_FIELD_NAMES
  // for details - trying to enable Jackson to ignore that doesn't seem to work (compilation failure
  // when attempting to use that feature), so we have to change the production itself.
  // Also, this throws IOException when BINARY is detected.
private static void buildJSONString(StringBuilder sb, Object o, ObjectInspector oi) throws IOException {
switch (oi.getCategory()) {
case PRIMITIVE: {
PrimitiveObjectInspector poi = (PrimitiveObjectInspector) oi;
if (o == null) {
sb.append("null");
} else {
switch (poi.getPrimitiveCategory()) {
case BOOLEAN: {
boolean b = ((BooleanObjectInspector) poi).get(o);
sb.append(b ? "true" : "false");
break;
}
case BYTE: {
sb.append(((ByteObjectInspector) poi).get(o));
break;
}
case SHORT: {
sb.append(((ShortObjectInspector) poi).get(o));
break;
}
case INT: {
sb.append(((IntObjectInspector) poi).get(o));
break;
}
case LONG: {
sb.append(((LongObjectInspector) poi).get(o));
break;
}
case FLOAT: {
sb.append(((FloatObjectInspector) poi).get(o));
break;
}
case DOUBLE: {
sb.append(((DoubleObjectInspector) poi).get(o));
break;
}
case STRING: {
sb.append('"');
sb.append(SerDeUtils.escapeString(((StringObjectInspector) poi)
.getPrimitiveJavaObject(o)));
sb.append('"');
break;
}
case TIMESTAMP: {
sb.append('"');
sb.append(((TimestampObjectInspector) poi)
.getPrimitiveWritableObject(o));
sb.append('"');
break;
}
case BINARY: {
throw new IOException("JsonSerDe does not support BINARY type");
}
default:
throw new RuntimeException("Unknown primitive type: "
+ poi.getPrimitiveCategory());
}
}
break;
}
case LIST: {
ListObjectInspector loi = (ListObjectInspector) oi;
ObjectInspector listElementObjectInspector = loi
.getListElementObjectInspector();
List<?> olist = loi.getList(o);
if (olist == null) {
sb.append("null");
} else {
sb.append(SerDeUtils.LBRACKET);
for (int i = 0; i < olist.size(); i++) {
if (i > 0) {
sb.append(SerDeUtils.COMMA);
}
buildJSONString(sb, olist.get(i), listElementObjectInspector);
}
sb.append(SerDeUtils.RBRACKET);
}
break;
}
case MAP: {
MapObjectInspector moi = (MapObjectInspector) oi;
ObjectInspector mapKeyObjectInspector = moi.getMapKeyObjectInspector();
ObjectInspector mapValueObjectInspector = moi
.getMapValueObjectInspector();
Map<?, ?> omap = moi.getMap(o);
if (omap == null) {
sb.append("null");
} else {
sb.append(SerDeUtils.LBRACE);
boolean first = true;
for (Object entry : omap.entrySet()) {
if (first) {
first = false;
} else {
sb.append(SerDeUtils.COMMA);
}
Map.Entry<?, ?> e = (Map.Entry<?, ?>) entry;
StringBuilder keyBuilder = new StringBuilder();
buildJSONString(keyBuilder, e.getKey(), mapKeyObjectInspector);
String keyString = keyBuilder.toString().trim();
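          // JSON requires keys to be strings: primitive keys (e.g. an int)
          // render without quotes, so add them; string keys already begin
          // with a quote and are emitted as-is.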
boolean doQuoting = (!keyString.isEmpty()) && (keyString.charAt(0) != SerDeUtils.QUOTE);
if (doQuoting) {
sb.append(SerDeUtils.QUOTE);
}
sb.append(keyString);
if (doQuoting) {
sb.append(SerDeUtils.QUOTE);
}
sb.append(SerDeUtils.COLON);
buildJSONString(sb, e.getValue(), mapValueObjectInspector);
}
sb.append(SerDeUtils.RBRACE);
}
break;
}
case STRUCT: {
StructObjectInspector soi = (StructObjectInspector) oi;
List<? extends StructField> structFields = soi.getAllStructFieldRefs();
if (o == null) {
sb.append("null");
} else {
sb.append(SerDeUtils.LBRACE);
for (int i = 0; i < structFields.size(); i++) {
if (i > 0) {
sb.append(SerDeUtils.COMMA);
}
sb.append(SerDeUtils.QUOTE);
sb.append(structFields.get(i).getFieldName());
sb.append(SerDeUtils.QUOTE);
sb.append(SerDeUtils.COLON);
buildJSONString(sb, soi.getStructFieldData(o, structFields.get(i)),
structFields.get(i).getFieldObjectInspector());
}
sb.append(SerDeUtils.RBRACE);
}
break;
}
case UNION: {
UnionObjectInspector uoi = (UnionObjectInspector) oi;
if (o == null) {
sb.append("null");
} else {
sb.append(SerDeUtils.LBRACE);
sb.append(uoi.getTag(o));
sb.append(SerDeUtils.COLON);
buildJSONString(sb, uoi.getField(o),
uoi.getObjectInspectors().get(uoi.getTag(o)));
sb.append(SerDeUtils.RBRACE);
}
break;
}
default:
throw new RuntimeException("Unknown type in ObjectInspector!");
}
}
/**
* Returns an object inspector for the specified schema that
* is capable of reading in the object representation of the JSON string
*/
@Override
public ObjectInspector getObjectInspector() throws SerDeException {
return cachedObjectInspector;
}
@Override
public Class<? extends Writable> getSerializedClass() {
return Text.class;
}
@Override
public SerDeStats getSerDeStats() {
// no support for statistics yet
return null;
}
}