blob: 4f8b628e00773a3fc9988f35d1c52b73263e42a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.parser;
import static org.apache.hyracks.api.exceptions.ErrorCode.PARSING_ERROR;
import java.io.DataOutput;
import java.io.IOException;
import java.util.BitSet;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.IAsterixListBuilder;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.parser.jackson.ADMToken;
import org.apache.asterix.external.parser.jackson.GeometryCoParser;
import org.apache.asterix.external.parser.jackson.ParserContext;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.base.AUnorderedList;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.AbstractCollectionType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.RecordUtil;
import org.apache.asterix.runtime.exceptions.UnsupportedTypeException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.hyracks.util.ParseUtil;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonStreamContext;
import com.fasterxml.jackson.core.JsonToken;
/**
* JSON format parser using Jackson parser.
*/
public abstract class AbstractJsonDataParser extends AbstractNestedDataParser<ADMToken> {
protected final ParserContext parserContext;
protected final JsonFactory jsonFactory;
protected final ARecordType rootType;
protected final GeometryCoParser geometryCoParser;
protected Supplier<String> dataSourceName = ExternalDataConstants.EMPTY_STRING;
protected LongSupplier lineNumber = ExternalDataConstants.NO_LINES;
protected JsonParser jsonParser;
/**
* Initialize JSONDataParser with GeometryCoParser
*
* @param recordType defined type.
* @param jsonFactory Jackson JSON parser factory.
*/
public AbstractJsonDataParser(ARecordType recordType, JsonFactory jsonFactory) {
// recordType currently cannot be null, however this is to guarantee for any future changes.
this.rootType = recordType != null ? recordType : RecordUtil.FULLY_OPEN_RECORD_TYPE;
this.jsonFactory = jsonFactory;
//GeometryCoParser to parse GeoJSON objects to AsterixDB internal spatial types.
geometryCoParser = new GeometryCoParser(jsonParser);
parserContext = new ParserContext();
}
/*
****************************************************
* Public methods
****************************************************
*/
public boolean parseAnyValue(DataOutput out) throws HyracksDataException {
try {
if (nextToken() == ADMToken.EOF) {
return false;
}
parseValue(BuiltinType.ANY, out);
return true;
} catch (IOException e) {
throw createException(e);
}
}
/*
****************************************************
* Abstract method implementation
****************************************************
*/
/**
* Jackson token to ADM token mapper
*/
@Override
protected final ADMToken advanceToNextToken() throws IOException {
final JsonToken jsonToken = jsonParser.nextToken();
if (jsonToken == null) {
return ADMToken.EOF;
}
ADMToken token;
switch (jsonToken) {
case VALUE_FALSE:
token = ADMToken.FALSE;
break;
case VALUE_TRUE:
token = ADMToken.TRUE;
break;
case VALUE_STRING:
token = ADMToken.STRING;
break;
case VALUE_NULL:
token = ADMToken.NULL;
break;
case VALUE_NUMBER_FLOAT:
token = ADMToken.DOUBLE;
break;
case VALUE_NUMBER_INT:
token = ADMToken.INT;
break;
case START_OBJECT:
token = ADMToken.OBJECT_START;
break;
case END_OBJECT:
token = ADMToken.OBJECT_END;
break;
case START_ARRAY:
token = ADMToken.ARRAY_START;
break;
case END_ARRAY:
token = ADMToken.ARRAY_END;
break;
case FIELD_NAME:
token = ADMToken.FIELD_NAME;
break;
default:
throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, jsonParser.currentToken().toString());
}
return token;
}
/*
****************************************************
* Overridden methods
****************************************************
*/
/**
* In the case of JSON, we can parse GeoJSON objects as internal AsterixDB spatial types.
*/
@Override
protected boolean isConvertable(ATypeTag parsedTypeTag, ATypeTag definedTypeTag) {
if (parsedTypeTag == ATypeTag.OBJECT && (definedTypeTag == ATypeTag.POINT || definedTypeTag == ATypeTag.LINE
|| definedTypeTag == ATypeTag.POLYGON)) {
return true;
}
return super.isConvertable(parsedTypeTag, definedTypeTag);
}
/*
****************************************************
* Complex types parsers
****************************************************
*/
@Override
protected final void parseObject(ARecordType recordType, DataOutput out) throws IOException {
final IMutableValueStorage valueBuffer = parserContext.enterObject();
final IARecordBuilder objectBuilder = parserContext.getObjectBuilder(recordType);
final BitSet nullBitMap = parserContext.getNullBitmap(recordType.getFieldTypes().length);
while (nextToken() != ADMToken.OBJECT_END) {
/*
* Jackson parser calls String.intern() for field names (if enabled).
* Calling getCurrentName() will not create multiple objects.
*/
final String fieldName = jsonParser.getCurrentName();
final int fieldIndex = recordType.getFieldIndex(fieldName);
if (!recordType.isOpen() && fieldIndex < 0) {
throw new RuntimeDataException(ErrorCode.PARSER_ADM_DATA_PARSER_EXTRA_FIELD_IN_CLOSED_RECORD,
LogRedactionUtil.userData(fieldName));
}
valueBuffer.reset();
nextToken();
if (fieldIndex < 0) {
//field is not defined and the type is open
parseValue(BuiltinType.ANY, valueBuffer.getDataOutput());
objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), valueBuffer);
} else {
//field is defined
final IAType fieldType = recordType.getFieldType(fieldName);
//fail fast if the current field is not nullable
if (currentToken() == ADMToken.NULL && !isNullableType(fieldType)) {
throw new RuntimeDataException(ErrorCode.PARSER_EXT_DATA_PARSER_CLOSED_FIELD_NULL,
LogRedactionUtil.userData(fieldName));
}
nullBitMap.set(fieldIndex);
parseValue(fieldType, valueBuffer.getDataOutput());
objectBuilder.addField(fieldIndex, valueBuffer);
}
}
/*
* Check for any possible missed values for a defined (non-nullable) type.
* Throws exception if there is a violation
*/
if (nullBitMap != null) {
checkOptionalConstraints(recordType, nullBitMap);
}
parserContext.exitObject(valueBuffer, nullBitMap, objectBuilder);
objectBuilder.write(out, true);
}
/**
* Geometry in GeoJSON is an object
*
* @param typeTag geometry typeTag
* @param out
* @throws IOException
*/
private void parseGeometry(ATypeTag typeTag, DataOutput out) throws IOException {
//Start the co-parser
geometryCoParser.starGeometry();
while (nextToken() != ADMToken.OBJECT_END) {
if (currentToken() == ADMToken.FIELD_NAME) {
geometryCoParser.checkFieldName(jsonParser.getCurrentName());
} else if (!geometryCoParser.checkValue(currentToken())) {
throw new IOException(geometryCoParser.getErrorMessage());
}
}
geometryCoParser.serialize(typeTag, out);
}
@Override
protected final void parseArray(AOrderedListType listType, DataOutput out) throws IOException {
parseCollection(listType, ADMToken.ARRAY_END, out);
}
@Override
protected void parseMultiset(AUnorderedList listType, DataOutput out) throws IOException {
throw new UnsupportedTypeException("JSON parser", ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG);
}
protected final void parseCollection(AbstractCollectionType collectionType, ADMToken endToken, DataOutput out)
throws IOException {
final IMutableValueStorage valueBuffer = parserContext.enterCollection();
final IAsterixListBuilder arrayBuilder = parserContext.getCollectionBuilder(collectionType);
final boolean isOpen = collectionType.getItemType().getTypeTag() == ATypeTag.ANY;
while (nextToken() != endToken) {
valueBuffer.reset();
if (isOpen) {
parseValue(BuiltinType.ANY, valueBuffer.getDataOutput());
} else {
//fail fast if current value is null
if (currentToken() == ADMToken.NULL) {
throw new RuntimeDataException(ErrorCode.PARSER_COLLECTION_ITEM_CANNOT_BE_NULL);
}
parseValue(collectionType.getItemType(), valueBuffer.getDataOutput());
}
arrayBuilder.addItem(valueBuffer);
}
parserContext.exitCollection(valueBuffer, arrayBuilder);
arrayBuilder.write(out, true);
}
/*
****************************************************
* Value parsers and serializers
****************************************************
*/
/**
* Parse JSON object or GeoJSON object.
*
* @param actualType
* @param out
* @throws IOException
*/
protected void parseObject(IAType actualType, DataOutput out) throws IOException {
if (actualType.getTypeTag() == ATypeTag.OBJECT) {
parseObject((ARecordType) actualType, out);
} else {
parseGeometry(actualType.getTypeTag(), out);
}
}
protected void parseValue(IAType definedType, DataOutput out) throws IOException {
final ATypeTag currentTypeTag = currentToken().getTypeTag();
/*
* In case of type mismatch, checkAndGetType will throw an exception.
*/
final IAType actualType = checkAndGetType(definedType, currentTypeTag);
switch (currentToken()) {
case NULL:
nullSerde.serialize(ANull.NULL, out);
break;
case FALSE:
booleanSerde.serialize(ABoolean.FALSE, out);
break;
case TRUE:
booleanSerde.serialize(ABoolean.TRUE, out);
break;
case INT:
case DOUBLE:
serializeNumeric(actualType.getTypeTag(), out);
break;
case STRING:
serializeString(actualType.getTypeTag(), out);
break;
case OBJECT_START:
parseObject(actualType, out);
break;
case ARRAY_START:
parseArray((AOrderedListType) actualType, out);
break;
default:
throw new RuntimeDataException(ErrorCode.PARSE_ERROR, jsonParser.currentToken().toString());
}
}
/**
* Given that numeric values may underflow or overflow, an exception will be thrown.
*
* @param numericType
* @param out
* @throws IOException
*/
protected void serializeNumeric(ATypeTag numericType, DataOutput out) throws IOException {
final ATypeTag typeToUse = numericType == ATypeTag.ANY ? currentToken().getTypeTag() : numericType;
switch (typeToUse) {
case BIGINT:
aInt64.setValue(jsonParser.getLongValue());
int64Serde.serialize(aInt64, out);
break;
case INTEGER:
aInt32.setValue(jsonParser.getIntValue());
int32Serde.serialize(aInt32, out);
break;
case SMALLINT:
aInt16.setValue(jsonParser.getShortValue());
int16Serde.serialize(aInt16, out);
break;
case TINYINT:
aInt8.setValue(jsonParser.getByteValue());
int8Serde.serialize(aInt8, out);
break;
case DOUBLE:
aDouble.setValue(jsonParser.getDoubleValue());
doubleSerde.serialize(aDouble, out);
break;
case FLOAT:
aFloat.setValue(jsonParser.getFloatValue());
floatSerde.serialize(aFloat, out);
break;
default:
throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, jsonParser.currentToken().toString());
}
}
/**
* Serialize the string value.
* TODO(wyk) avoid String objects for type STRING
*
* @param stringVariantType
* @param out
* @throws IOException
*/
protected void serializeString(ATypeTag stringVariantType, DataOutput out) throws IOException {
char[] buffer = jsonParser.getTextCharacters();
int begin = jsonParser.getTextOffset();
int len = jsonParser.getTextLength();
final ATypeTag typeToUse = stringVariantType == ATypeTag.ANY ? currentToken().getTypeTag() : stringVariantType;
switch (typeToUse) {
case STRING:
parseString(buffer, begin, len, out);
break;
case DATE:
parseDate(buffer, begin, len, out);
break;
case DATETIME:
parseDateTime(buffer, begin, len, out);
break;
case TIME:
parseTime(buffer, begin, len, out);
break;
case YEARMONTHDURATION:
parseYearMonthDuration(buffer, begin, len, out);
break;
case DAYTIMEDURATION:
parseDateTimeDuration(buffer, begin, len, out);
break;
case DURATION:
parseDuration(buffer, begin, len, out);
break;
case UUID:
parseUUID(buffer, begin, len, out);
break;
default:
throw new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, jsonParser.currentToken().toString());
}
}
protected HyracksDataException createException(Exception e) {
if (jsonParser != null) {
String msg;
if (e instanceof JsonParseException) {
msg = ((JsonParseException) e).getOriginalMessage();
} else {
Throwable rootCause = ExceptionUtils.getRootCause(e);
if (rootCause instanceof ParseException) {
msg = ((ParseException) rootCause).getOriginalMessage();
} else {
msg = ExceptionUtils.getRootCause(e).getMessage();
}
}
if (msg == null) {
msg = ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM.errorMessage();
}
long lineNum = lineNumber.getAsLong() + jsonParser.getCurrentLocation().getLineNr() - 1;
JsonStreamContext parsingContext = jsonParser.getParsingContext();
String fieldName = null;
while (parsingContext != null && fieldName == null) {
fieldName = parsingContext.getCurrentName();
parsingContext = parsingContext.getParent();
}
final String locationDetails = ParseUtil.asLocationDetailString(dataSourceName.get(), lineNum, fieldName);
return HyracksDataException.create(PARSING_ERROR, locationDetails, msg);
}
return new RuntimeDataException(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, e);
}
}