/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.common.type.TimestampTZ;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampLocalTZWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TimestampLocalTZTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.DateTimeFormatterBuilder;
import org.joda.time.format.DateTimeParser;
import org.joda.time.format.ISODateTimeFormat;
import java.io.IOException;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Basic JSON SerDe meant to keep the Kafka storage handler easy to use and to test with primitive
 * JSON types. For production workloads, please use Hive's native JsonSerDe.
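 * <p>
 * An illustrative table definition (a sketch only; the topic name, broker list, and columns are
 * placeholders, and {@code kafka.serde.class} selects this SerDe on the Kafka storage handler):
 * <pre>
 * CREATE EXTERNAL TABLE kafka_json_table (`ts` timestamp, `page` string, `views` bigint)
 * STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
 * TBLPROPERTIES (
 *   "kafka.topic" = "my-topic",
 *   "kafka.bootstrap.servers" = "localhost:9092",
 *   "kafka.serde.class" = "org.apache.hadoop.hive.kafka.KafkaJsonSerDe");
 * </pre>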
*/
@SuppressWarnings("unused") class KafkaJsonSerDe extends AbstractSerDe {
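  // Per-thread Joda-Time timestamp parser; built by createAutoParser() below.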
  private static final ThreadLocal<DateTimeFormatter> TS_PARSER =
      ThreadLocal.withInitial(KafkaJsonSerDe::createAutoParser);
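  // Maps a primitive Hive TypeInfo to its writable-backed object inspector.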
private static final Function<TypeInfo, ObjectInspector> TYPEINFO_TO_OI =
typeInfo -> PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName()));
private ObjectInspector inspector;
private final ObjectMapper mapper = new ObjectMapper();
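  // Running stats (row count, raw byte size) reported through getSerDeStats().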
private long rowCount = 0L;
private long rawDataSize = 0L;
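
  /**
   * Initializes column names and types from the table properties and builds a standard struct
   * object inspector over the corresponding primitive writable inspectors.
   */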
@Override
public void initialize(Configuration configuration, Properties tableProperties, Properties partitionProperties)
throws SerDeException {
super.initialize(configuration, tableProperties, partitionProperties);
    final List<ObjectInspector> inspectors =
        getColumnTypes().stream().map(TYPEINFO_TO_OI).collect(Collectors.toList());
inspector = ObjectInspectorFactory.getStandardStructObjectInspector(getColumnNames(), inspectors);
}
@Override public Class<? extends Writable> getSerializedClass() {
return BytesRefWritable.class;
}
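
  /** Serialization back to Kafka is not supported by this SerDe. */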
@Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
throw new SerDeException("unimplemented");
}
@Override public SerDeStats getSerDeStats() {
SerDeStats serDeStats = new SerDeStats();
serDeStats.setRawDataSize(rawDataSize);
serDeStats.setRowCount(rowCount);
return serDeStats;
}
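
  /**
   * Deserializes a Kafka record, expected as a {@link BytesWritable} carrying a single JSON
   * object, into a list of Hive column values; missing or null JSON fields become Hive NULL.
   */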
@Override public Object deserialize(Writable blob) throws SerDeException {
BytesWritable record = (BytesWritable) blob;
Map<String, JsonNode> payload;
try {
      payload = parseAsJson(record.getBytes(), record.getLength());
rowCount += 1;
rawDataSize += record.getLength();
} catch (IOException e) {
throw new SerDeException(e);
}
final List<Object> output = new ArrayList<>(getColumnNames().size());
for (int i = 0; i < getColumnNames().size(); i++) {
final String name = getColumnNames().get(i);
final TypeInfo typeInfo = getColumnTypes().get(i);
final JsonNode value = payload.get(name);
      if (value == null || value.isNull()) {
output.add(null);
} else {
        switch (typeInfo.getCategory()) {
case PRIMITIVE:
output.add(parseAsPrimitive(value, typeInfo));
break;
case MAP:
case LIST:
case UNION:
case STRUCT:
default:
throw new SerDeException("not supported yet");
}
}
}
return output;
}
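
  /**
   * Converts a single JSON value to the Hive writable matching the column's primitive category.
   */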
private Object parseAsPrimitive(JsonNode value, TypeInfo typeInfo) throws SerDeException {
switch (TypeInfoFactory.getPrimitiveTypeInfo(typeInfo.getTypeName()).getPrimitiveCategory()) {
case TIMESTAMP:
TimestampWritable timestampWritable = new TimestampWritable();
timestampWritable.setTime(TS_PARSER.get().parseMillis(value.textValue()));
return timestampWritable;
case TIMESTAMPLOCALTZ:
final long numberOfMillis = TS_PARSER.get().parseMillis(value.textValue());
return new TimestampLocalTZWritable(new TimestampTZ(ZonedDateTime.ofInstant(Instant.ofEpochMilli(numberOfMillis),
((TimestampLocalTZTypeInfo) typeInfo).timeZone())));
    case BYTE:
      return new ByteWritable((byte) value.intValue());
    case SHORT:
      return new ShortWritable(value.shortValue());
    case INT:
      return new IntWritable(value.intValue());
    case LONG:
      return new LongWritable(value.longValue());
    case FLOAT:
      return new FloatWritable(value.floatValue());
    case DOUBLE:
      return new DoubleWritable(value.doubleValue());
    case DECIMAL:
      return new HiveDecimalWritable(HiveDecimal.create(value.decimalValue()));
    case CHAR:
      return new HiveCharWritable(new HiveChar(value.textValue(), ((CharTypeInfo) typeInfo).getLength()));
    case VARCHAR:
      // Varchar columns carry a VarcharTypeInfo; casting to CharTypeInfo would fail at runtime.
      return new HiveVarcharWritable(new HiveVarchar(value.textValue(), ((VarcharTypeInfo) typeInfo).getLength()));
    case STRING:
      return new Text(value.textValue());
    case BOOLEAN:
      return new BooleanWritable(value.isBoolean() ? value.booleanValue() : Boolean.parseBoolean(value.textValue()));
default:
throw new SerDeException("Unknown type: " + typeInfo.getTypeName());
}
}
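
  /**
   * Parses the first {@code length} bytes as a JSON object and returns its fields keyed
   * case-insensitively by lower-cased name.
   */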
  private Map<String, JsonNode> parseAsJson(byte[] value, int length) throws IOException {
    // Parse only the valid prefix; BytesWritable.getBytes() may return a padded buffer.
    JsonNode document = mapper.readValue(value, 0, length, JsonNode.class);
    // Hive column names are case-insensitive, so index fields by lower-cased name.
Map<String, JsonNode> documentMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
document.fields().forEachRemaining(field -> documentMap.put(field.getKey().toLowerCase(), field.getValue()));
return documentMap;
}
@Override public ObjectInspector getObjectInspector() throws SerDeException {
if (inspector == null) {
throw new SerDeException("null inspector ??");
}
return inspector;
}
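
  /**
   * Builds a lenient Joda-Time parser accepting an ISO date element, optionally followed by a
   * time element separated by 'T' or a space, optionally followed by a zone offset, e.g.
   * "2023-01-01", "2023-01-01 12:30:45" or "2023-01-01T12:30:45+01:00".
   */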
  private static DateTimeFormatter createAutoParser() {
    final DateTimeFormatter offsetElement =
        new DateTimeFormatterBuilder().appendTimeZoneOffset("Z", true, 2, 4).toFormatter();
    DateTimeParser timeOrOffset = new DateTimeFormatterBuilder()
        .append(null, new DateTimeParser[] {
            new DateTimeFormatterBuilder().appendLiteral('T').toParser(),
            new DateTimeFormatterBuilder().appendLiteral(' ').toParser()})
        .appendOptional(ISODateTimeFormat.timeElementParser().getParser())
        .appendOptional(offsetElement.getParser())
        .toParser();
    return new DateTimeFormatterBuilder().append(ISODateTimeFormat.dateElementParser())
        .appendOptional(timeOrOffset)
        .toFormatter();
  }
}