blob: 5e35239a84d38a1069de237ef92b14bd82a66d8a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.json;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import net.minidev.json.JSONObject;
import net.minidev.json.parser.JSONParser;
import net.minidev.json.parser.ParseException;
import org.apache.commons.net.util.Base64;
import org.apache.tajo.catalog.*;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.exception.NotImplementedException;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.text.TextLineDeserializer;
import org.apache.tajo.storage.text.TextLineParsingError;
import java.io.IOException;
import java.nio.charset.CharsetDecoder;
import java.util.Map;
import java.util.TimeZone;
public class JsonLineDeserializer extends TextLineDeserializer {
private JSONParser parser;
// Full Path -> Type
private final Map<String, Type> types;
private final String [] projectedPaths;
private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
private final TimeZone timezone;
public JsonLineDeserializer(Schema schema, TableMeta meta, Column [] projected) {
super(schema, meta);
projectedPaths = SchemaUtil.convertColumnsToPaths(Lists.newArrayList(projected), true);
types = SchemaUtil.buildTypeMap(schema.getAllColumns(), projectedPaths);
timezone = TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE,
StorageUtil.TAJO_CONF.getSystemTimezone().getID()));
}
@Override
public void init() {
parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
}
/**
*
*
* @param object
* @param pathElements
* @param depth
* @param fieldIndex
* @param output
* @throws IOException
*/
private void getValue(JSONObject object,
String fullPath,
String [] pathElements,
int depth,
int fieldIndex,
Tuple output) throws IOException {
String fieldName = pathElements[depth];
if (!object.containsKey(fieldName)) {
output.put(fieldIndex, NullDatum.get());
}
switch (types.get(fullPath)) {
case BOOLEAN:
String boolStr = object.getAsString(fieldName);
if (boolStr != null) {
output.put(fieldIndex, DatumFactory.createBool(boolStr.equals("true")));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case CHAR:
String charStr = object.getAsString(fieldName);
if (charStr != null) {
output.put(fieldIndex, DatumFactory.createChar(charStr));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case INT1:
case INT2:
Number int2Num = object.getAsNumber(fieldName);
if (int2Num != null) {
output.put(fieldIndex, DatumFactory.createInt2(int2Num.shortValue()));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case INT4:
Number int4Num = object.getAsNumber(fieldName);
if (int4Num != null) {
output.put(fieldIndex, DatumFactory.createInt4(int4Num.intValue()));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case INT8:
Number int8Num = object.getAsNumber(fieldName);
if (int8Num != null) {
output.put(fieldIndex, DatumFactory.createInt8(int8Num.longValue()));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case FLOAT4:
Number float4Num = object.getAsNumber(fieldName);
if (float4Num != null) {
output.put(fieldIndex, DatumFactory.createFloat4(float4Num.floatValue()));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case FLOAT8:
Number float8Num = object.getAsNumber(fieldName);
if (float8Num != null) {
output.put(fieldIndex, DatumFactory.createFloat8(float8Num.doubleValue()));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case TEXT:
String textStr = object.getAsString(fieldName);
if (textStr != null) {
output.put(fieldIndex, DatumFactory.createText(textStr));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case TIMESTAMP:
String timestampStr = object.getAsString(fieldName);
if (timestampStr != null) {
output.put(fieldIndex, DatumFactory.createTimestamp(timestampStr, timezone));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case TIME:
String timeStr = object.getAsString(fieldName);
if (timeStr != null) {
output.put(fieldIndex, DatumFactory.createTime(timeStr));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case DATE:
String dateStr = object.getAsString(fieldName);
if (dateStr != null) {
output.put(fieldIndex, DatumFactory.createDate(dateStr));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case BIT:
case BINARY:
case VARBINARY:
case BLOB: {
Object jsonObject = object.getAsString(fieldName);
if (jsonObject == null) {
output.put(fieldIndex, NullDatum.get());
break;
}
output.put(fieldIndex, DatumFactory.createBlob(Base64.decodeBase64((String) jsonObject)));
break;
}
case INET4:
String inetStr = object.getAsString(fieldName);
if (inetStr != null) {
output.put(fieldIndex, DatumFactory.createInet4(inetStr));
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case RECORD:
JSONObject nestedObject = (JSONObject) object.get(fieldName);
if (nestedObject != null) {
getValue(nestedObject, fullPath + "/" + pathElements[depth+1], pathElements, depth + 1, fieldIndex, output);
} else {
output.put(fieldIndex, NullDatum.get());
}
break;
case NULL_TYPE:
output.put(fieldIndex, NullDatum.get());
break;
default:
throw new TajoRuntimeException(
new NotImplementedException("" + types.get(fullPath).name() + " for json"));
}
}
@Override
public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError {
String line = decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString();
JSONObject object;
try {
object = (JSONObject) parser.parse(line);
} catch (ParseException pe) {
throw new TextLineParsingError(line, pe);
} catch (ArrayIndexOutOfBoundsException ae) {
// truncated value
throw new TextLineParsingError(line, ae);
}
for (int i = 0; i < projectedPaths.length; i++) {
String [] paths = projectedPaths[i].split(NestedPathUtil.PATH_DELIMITER);
getValue(object, paths[0], paths, 0, i, output);
}
}
@Override
public void release() {
}
}