blob: 1f8dc5c2b72863c2414ea93580e9f1439a1fc853 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.contrib.parser;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Locale;
import java.util.StringTokenizer;

import org.elasticsearch.common.primitives.Ints;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;

import com.google.common.collect.Lists;

import com.datatorrent.api.Context;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.parser.Parser;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.FieldInfo.SupportType;
import com.datatorrent.lib.util.KeyValPair;
import com.datatorrent.lib.util.PojoUtils;
/**
 * Operator that parses a JSON string tuple and emits a POJO on the output port
 * and tuples that could not be converted on the error port. Upstream operator
 * needs to ensure that a full JSON record is emitted.<br>
 * <b>Properties</b><br>
 * <b>pojoClass</b>:POJO class <br>
 * <b>(optional)fieldMappingString</b>String of format
 * fieldNameInJson:fieldNameInPOJO:DataType<br>
 * <b>Ports</b> <br>
 * <b>in</b>:input tuple as a byte array that is decoded as a UTF-8 JSON string<br>
 * <b>output</b>:tuples that are validated as per the user defined POJO are
 * emitted as POJO on this port<br>
 * <b>err</b>:tuples that could not be converted are emitted on this port as
 * {@code KeyValPair<String,String>}<br>
 * Key being the tuple and Val being the reason
 *
 * @displayName SimpleStreamingJsonParser
 * @category Parsers
 * @tags json pojo parser streaming
 *
 * @since 3.5.0
 */
@InterfaceStability.Evolving
public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, String>>
{
  private static final Logger LOG = LoggerFactory.getLogger(StreamingJsonParser.class);
  private static final String FIELD_SEPARATOR = ":";
  private static final String RECORD_SEPARATOR = ",";

  private transient JSONParser jsonParser;
  // NOTE(review): this is a user-settable property but is declared transient;
  // confirm that the configured value survives operator serialization.
  private transient String fieldMappingString;
  private transient List<FieldInfo> fieldInfos;
  private transient List<ActiveFieldInfo> columnFieldSetters;
  protected JsonKeyFinder finder;
  private transient ArrayList<String> columnFields;
  private transient Class<?> pojoClass;

  public StreamingJsonParser()
  {
  }

  /**
   * @return POJO class
   */
  private Class<?> getPojoClass()
  {
    return pojoClass;
  }

  /**
   * Sets the POJO class
   */
  public void setPojoClass(Class<?> pojoClass)
  {
    this.pojoClass = pojoClass;
  }

  /**
   * Returns the string that maps JSON fields to POJO fields
   */
  public String getFieldMappingString()
  {
    return fieldMappingString;
  }

  /**
   * Comma separated list mapping a field in the JSON record to a POJO field eg :
   * fieldNameInJSON:fieldNameInPOJO:DataType
   */
  public void setFieldMappingString(String pojoFieldsToJsonMapping)
  {
    this.fieldMappingString = pojoFieldsToJsonMapping;
  }

  @Override
  public void beginWindow(long windowId)
  {
    super.beginWindow(windowId);
  }

  /**
   * Converts the incoming tuple and emits the resulting POJO on the output
   * port. Nothing is emitted when the conversion yields null.
   */
  @Override
  public void processTuple(byte[] tuple)
  {
    incomingTuplesCount++;
    Object pojo = convert(tuple);
    if (pojo != null) {
      output.emit(pojo);
      emittedObjectCount++;
    }
  }

  /**
   * Parse an incoming tuple and return a POJO object.
   *
   * @param tuple UTF-8 encoded JSON record, may be null
   * @return the populated POJO, or null when the tuple is null or its values
   *         could not be set on the POJO (an error tuple is emitted in both cases)
   * @throws RuntimeException wrapping the cause when the JSON cannot be parsed
   *           or the POJO class cannot be instantiated
   */
  @Override
  public Object convert(byte[] tuple)
  {
    if (tuple == null) {
      reportErrorTuple(null, "null tuple");
      return null;
    }
    // UTF-8 is always available through StandardCharsets, so no checked
    // encoding exception has to be handled here.
    String str = new String(tuple, StandardCharsets.UTF_8);
    try {
      finder.setKeyCount(0);
      finder.getKeyValMap().clear();
      // do/while so that at least one parse pass runs for every tuple, even if
      // the finder still reports the end of the previous document on entry
      // (presumably the flag is cleared when a new document starts - confirm
      // against JsonKeyFinder). Parsing stops once all required keys are found.
      do {
        jsonParser.parse(str, finder, true);
      } while (finder.getKeyCount() != columnFields.size() && !finder.isEnd());
      Object pojo = setPojoFields(finder.getKeyValMap());
      if (pojo == null) {
        // the cause has already been logged by setPojoFields
        reportErrorTuple(str, "Exception in setting value");
      }
      return pojo;
    } catch (ParseException | IllegalAccessException | InstantiationException e) {
      reportErrorTuple(str, e.getMessage());
      LOG.error("Exception in parsing the record", e);
      // NOTE(review): the record is emitted on the error port and the operator
      // still fails; kept as is because callers may rely on the exception.
      throw new RuntimeException(e);
    } finally {
      // the parser is used in resume mode, so it must be reset before the next
      // tuple regardless of how this one ended
      jsonParser.reset();
    }
  }

  /**
   * Counts an error tuple and emits it on the error port if that port is
   * connected.
   *
   * @param tuple the offending record, may be null
   * @param reason why the record was rejected
   */
  private void reportErrorTuple(String tuple, String reason)
  {
    if (err.isConnected()) {
      err.emit(new KeyValPair<String, String>(tuple, reason));
    }
    errorTupleCount++;
  }

  /**
   * Builds the field infos from a mapping string. Entries are separated by
   * commas; each entry is split on ':' and its first three tokens are passed to
   * the {@link FieldInfo} constructor in order, the third one being resolved as
   * a {@link SupportType}. Entries that cannot be converted are logged and
   * skipped.
   *
   * @return List of FieldInfo
   */
  private List<FieldInfo> createFieldInfoMap(String str)
  {
    List<FieldInfo> infos = new ArrayList<FieldInfo>();
    StringTokenizer entries = new StringTokenizer(str, RECORD_SEPARATOR);
    while (entries.hasMoreTokens()) {
      String entry = entries.nextToken();
      String[] token = entry.split(FIELD_SEPARATOR);
      try {
        infos.add(new FieldInfo(token[0], token[1], SupportType.valueOf(token[2])));
      } catch (RuntimeException e) {
        // covers both an unknown support type and an entry with too few tokens
        LOG.error("Invalid support type or malformed field mapping entry: {}", entry, e);
      }
    }
    return infos;
  }

  @Override
  public KeyValPair<String, String> processErrorTuple(byte[] input)
  {
    throw new UnsupportedOperationException("Not supported");
  }

  /**
   * Class that maps fieldInfo to its getters or setters
   */
  protected static class ActiveFieldInfo
  {
    final FieldInfo fieldInfo;
    Object setterOrGetter;

    ActiveFieldInfo(FieldInfo fieldInfo)
    {
      this.fieldInfo = fieldInfo;
    }
  }

  /**
   * A list of {@link FieldInfo}s where each item maps a column name to a pojo
   * field name.
   */
  private List<FieldInfo> getFieldInfos()
  {
    return fieldInfos;
  }

  /**
   * Sets the {@link FieldInfo}s. A {@link FieldInfo} maps a store column to a
   * pojo field name.<br/>
   * The value from fieldInfo.column is assigned to
   * fieldInfo.pojoFieldExpression.
   *
   * @description $[].columnName name of the Output Field in POJO
   * @description $[].pojoFieldExpression expression to get the respective field
   *              from generic record
   * @useSchema $[].pojoFieldExpression outputPort.fields[].name
   */
  private void setFieldInfos(List<FieldInfo> fieldInfos)
  {
    this.fieldInfos = fieldInfos;
  }

  /**
   * Use reflection to generate field info values if the user has not provided
   * the inputs mapping. Every declared instance field yields an entry
   * name:name:TYPE where TYPE is the upper-cased simple name of the field type
   * (primitives are mapped to their wrapper first). Static and synthetic fields
   * are skipped since they are not per-record data.
   *
   * @return String representing the POJO field to JSON field mapping, empty if
   *         the class declares no instance fields
   */
  private String generateFieldInfoInputs(Class<?> cls)
  {
    StringBuilder sb = new StringBuilder();
    for (java.lang.reflect.Field f : cls.getDeclaredFields()) {
      if (f.isSynthetic() || java.lang.reflect.Modifier.isStatic(f.getModifiers())) {
        continue;
      }
      if (sb.length() > 0) {
        sb.append(RECORD_SEPARATOR);
      }
      Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
      // Locale.ROOT keeps the enum name stable under locales with special
      // casing rules
      sb.append(f.getName()).append(FIELD_SEPARATOR).append(f.getName()).append(FIELD_SEPARATOR)
          .append(c.getSimpleName().toUpperCase(Locale.ROOT));
    }
    return sb.toString();
  }

  /**
   * Adds the Active Fields to the columnFieldSetters {@link ActiveFieldInfo}s
   */
  private void initColumnFieldSetters(List<FieldInfo> fieldInfos)
  {
    for (FieldInfo fi : fieldInfos) {
      if (columnFieldSetters == null) {
        columnFieldSetters = Lists.newArrayList();
      }
      columnFieldSetters.add(new StreamingJsonParser.ActiveFieldInfo(fi));
    }
  }

  /**
   * Initialize the setters for generating the POJO. One setter is created per
   * active field, chosen by the support type of the field; any type without a
   * dedicated case gets a generic setter created for Byte.
   */
  private void initializeActiveFieldSetters()
  {
    for (ActiveFieldInfo activeFieldInfo : columnFieldSetters) {
      String expression = activeFieldInfo.fieldInfo.getPojoFieldExpression();
      SupportType st = activeFieldInfo.fieldInfo.getType();
      switch (st) {
        case BOOLEAN:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterBoolean(getPojoClass(), expression);
          break;
        case DOUBLE:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterDouble(getPojoClass(), expression);
          break;
        case FLOAT:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterFloat(getPojoClass(), expression);
          break;
        case INTEGER:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterInt(getPojoClass(), expression);
          break;
        case STRING:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(), expression, st.getJavaType());
          break;
        case LONG:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetterLong(getPojoClass(), expression);
          break;
        default:
          activeFieldInfo.setterOrGetter = PojoUtils.createSetter(getPojoClass(), expression, Byte.class);
          break;
      }
    }
  }

  /**
   * Returns a new POJO populated from the key/value pairs collected while
   * parsing the JSON record. A field keeps its default value if its key is not
   * present in the map or is mapped to null.
   *
   * @param tuple values found in the JSON record keyed by JSON field name
   * @return the populated POJO, or null if a value could not be set (wrong
   *         value type, integer overflow or unsupported field type)
   * @throws InstantiationException if the POJO class cannot be instantiated
   * @throws IllegalAccessException if the POJO constructor is not accessible
   */
  @SuppressWarnings("unchecked")
  private Object setPojoFields(HashMap<Object, Object> tuple) throws InstantiationException, IllegalAccessException
  {
    Object newObj = getPojoClass().newInstance();
    for (ActiveFieldInfo afi : columnFieldSetters) {
      String column = afi.fieldInfo.getColumnName();
      Object val = tuple.get(column);
      //Nothing to set if a value is null
      if (val == null) {
        continue;
      }
      try {
        switch (afi.fieldInfo.getType()) {
          case BOOLEAN:
            ((PojoUtils.SetterBoolean<Object>)afi.setterOrGetter).set(newObj, (Boolean)val);
            break;
          case DOUBLE:
            // accept any JSON number so that an integral literal such as 10
            // can populate a double field
            ((PojoUtils.SetterDouble<Object>)afi.setterOrGetter).set(newObj, ((Number)val).doubleValue());
            break;
          case FLOAT:
            ((PojoUtils.SetterFloat<Object>)afi.setterOrGetter).set(newObj, ((Number)val).floatValue());
            break;
          case INTEGER:
            // checkedCast rejects values that do not fit into an int
            ((PojoUtils.SetterInt<Object>)afi.setterOrGetter).set(newObj, Ints.checkedCast((Long)val));
            break;
          case STRING:
            ((PojoUtils.Setter<Object, String>)afi.setterOrGetter).set(newObj, val.toString());
            break;
          case LONG:
            ((PojoUtils.SetterLong<Object>)afi.setterOrGetter).set(newObj, (Long)val);
            break;
          default:
            throw new RuntimeException("Invalid Support Type");
        }
      } catch (RuntimeException e) {
        LOG.error("Exception in setting value for field -" + column + "-", e);
        return null;
      }
    }
    return newObj;
  }

  /**
   * Output port on which the POJOs are emitted. Its setup reads the tuple class
   * from the port context and prepares the parser, the key finder and the
   * field setters.
   */
  @OutputPortFieldAnnotation(schemaRequired = true)
  public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>()
  {
    @Override
    public void setup(PortContext context)
    {
      jsonParser = new JSONParser();
      finder = new JsonKeyFinder();
      columnFields = new ArrayList<String>();
      columnFieldSetters = Lists.newArrayList();
      setPojoClass(context.getValue(Context.PortContext.TUPLE_CLASS));
      // fall back to a mapping derived from the POJO class when none is given
      String mapping = getFieldMappingString();
      if (mapping == null) {
        mapping = generateFieldInfoInputs(getPojoClass());
      }
      setFieldInfos(createFieldInfoMap(mapping));
      initColumnFieldSetters(getFieldInfos());
      initializeActiveFieldSetters();
      ListIterator<FieldInfo> itr = fieldInfos.listIterator();
      while (itr.hasNext()) {
        columnFields.add(itr.next().getColumnName());
      }
      finder.setMatchKeyList(columnFields);
    }
  };
}