blob: 41c56e316c7c9d487e8cc8719c96fa1c28abcc5a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.avro;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting;
import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.Getter;
/**
 * <p>
 * PojoToAvro
 * </p>
 * A generic implementation for POJO to Avro conversion. Each incoming POJO is
 * converted to a {@link GenericRecord} based on the Avro schema supplied through
 * {@link #setSchemaString(String)}: for every field of the schema, the POJO field with
 * the same name is read and its string form is handed to
 * {@code AvroRecordHelper.convertValueStringToAvroKeyType}.<br>
 * As of now only primitive types are supported.<br>
 * Tuples that cannot be converted are emitted on the errorPort if it is connected.
 *
 * @displayName Pojo To Avro
 * @category Converter
 * @tags avro
 *
 * @since 3.4.0
 */
@InterfaceStability.Evolving
public class PojoToAvro extends BaseOperator
{
  // The following state is derived from schemaString and the port's TUPLE_CLASS and is
  // rebuilt in data.setup(), so it does not need to be serialized with the operator.
  private transient List<Field> columnNames;

  private transient Class<?> cls;

  private transient List<Getter> keyMethodMap;

  // schemaString is the operator's configuration. It is deliberately NOT transient so that it
  // is serialized along with the operator; a transient field would presumably be dropped when
  // the engine serializes the operator for deployment/checkpointing, leaving setup() without a
  // schema.
  private String schemaString;

  // Parsed form of schemaString; rebuilt in setup(), hence transient.
  private transient Schema schema;

  /** Number of records successfully emitted on {@link #output} in the current window. */
  @AutoMetric
  @VisibleForTesting
  int recordCount = 0;

  /** Number of tuples that could not be converted in the current window (each counted once). */
  @AutoMetric
  @VisibleForTesting
  int errorCount = 0;

  /** Number of individual fields that could not be converted in the current window. */
  @AutoMetric
  @VisibleForTesting
  int fieldErrorCount = 0;

  public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<>();

  public final transient DefaultOutputPort<Object> errorPort = new DefaultOutputPort<>();

  /**
   * Parses {@link #schemaString} into an Avro {@link Schema} and stores it.
   *
   * @throws IllegalStateException if no schema string has been configured
   * @throws IOException declared for callers; parsing failures from Avro surface as runtime
   *         exceptions
   */
  private void parseSchema() throws IOException
  {
    if (getSchemaString() == null) {
      throw new IllegalStateException("schemaString must be set before the operator is set up");
    }
    setSchema(new Schema.Parser().parse(getSchemaString()));
  }

  /**
   * Returns the schema string for Avro Generic Record
   *
   * @return schemaString
   */
  public String getSchemaString()
  {
    return schemaString;
  }

  /**
   * Sets the Avro schema (JSON text) used to build the output generic records.
   *
   * @param schemaString Avro schema definition
   */
  public void setSchemaString(String schemaString)
  {
    this.schemaString = schemaString;
  }

  /**
   * Returns the parsed schema object.
   *
   * @return schema, or {@code null} before setup
   */
  private Schema getSchema()
  {
    return schema;
  }

  /**
   * Sets the parsed schema object.
   *
   * @param schema parsed Avro schema
   */
  private void setSchema(Schema schema)
  {
    this.schema = schema;
  }

  /**
   * Returns the list of fields from the provided Avro schema.
   *
   * @return List of Fields
   */
  private List<Field> getColumnNames()
  {
    return columnNames;
  }

  /**
   * Sets the list of fields representing the columns of the Avro schema.
   *
   * @param columnNames schema fields, in schema order
   */
  private void setColumnNames(List<Field> columnNames)
  {
    this.columnNames = columnNames;
  }

  /**
   * Generates a getter for the given declared field of a class. Primitive field types are
   * mapped to their wrapper class before the getter is created.
   *
   * @param cls POJO class
   * @param inputFieldName name of a field declared directly on {@code cls}
   * @return Getter for the field
   * @throws NoSuchFieldException if {@code cls} does not declare the field
   * @throws SecurityException if reflective access is denied
   */
  private Getter<?, ?> generateGettersForField(Class<?> cls, String inputFieldName)
      throws NoSuchFieldException, SecurityException
  {
    java.lang.reflect.Field f = cls.getDeclaredField(inputFieldName);
    Class<?> c = ClassUtils.primitiveToWrapper(f.getType());
    Getter<?, ?> classGetter = PojoUtils.createGetter(cls, inputFieldName, c);
    return classGetter;
  }

  /**
   * Initializes the column list from the schema and builds one POJO getter per schema field,
   * in the same order, so that {@code keyMethodMap.get(i)} reads {@code columnNames.get(i)}.
   *
   * @param schema parsed Avro record schema
   * @throws RuntimeException if the POJO class lacks a field named in the schema
   */
  private void initializeColumnMap(Schema schema)
  {
    setColumnNames(schema.getFields());
    keyMethodMap = new ArrayList<>(getColumnNames().size());
    for (Field field : getColumnNames()) {
      try {
        keyMethodMap.add(generateGettersForField(cls, field.name()));
      } catch (NoSuchFieldException | SecurityException e) {
        throw new RuntimeException("Failed to initialize pojo class getters for field - " + field.name(), e);
      }
    }
  }

  @InputPortFieldAnnotation(schemaRequired = true)
  public final transient DefaultInputPort<Object> data = new DefaultInputPort<Object>()
  {
    /**
     * Reads the tuple class from the port context, parses the schema and builds the getters.
     * Fails fast on configuration errors instead of running with a null schema.
     */
    @Override
    public void setup(PortContext context)
    {
      cls = context.getValue(Context.PortContext.TUPLE_CLASS);
      try {
        parseSchema();
      } catch (IOException e) {
        throw new RuntimeException("Exception in parsing schema", e);
      }
      initializeColumnMap(getSchema());
    }

    @Override
    public void process(Object tuple)
    {
      processTuple(tuple);
    }
  };

  /**
   * Converts an incoming tuple into a generic record and emits it on {@link #output}.
   * <p>
   * If conversion fails (an exception is thrown or no record is produced), the tuple is
   * counted exactly once in {@link #errorCount} and, when {@link #errorPort} is connected,
   * emitted there.
   *
   * @param tuple the incoming POJO
   */
  protected void processTuple(Object tuple)
  {
    GenericRecord record = null;
    try {
      record = getGenericRecord(tuple);
    } catch (Exception e) {
      LOG.error("Exception in creating record", e);
    }

    if (record != null) {
      output.emit(record);
      recordCount++;
      return;
    }

    // Count each failed tuple once, independent of whether the error port is connected.
    errorCount++;
    if (errorPort.isConnected()) {
      errorPort.emit(tuple);
    }
  }

  /**
   * Builds a generic record for {@code tuple} by reading every schema field from the POJO.
   * <p>
   * A null POJO value is stored as null when the field's Avro schema admits null (the null
   * type itself, or a union containing null); for any other field it is a field error.
   * Non-null values are converted from their {@code toString()} form via
   * {@code AvroRecordHelper.convertValueStringToAvroKeyType}. Every failed field increments
   * {@link #fieldErrorCount}.
   *
   * @param tuple the incoming POJO
   * @return the populated record, or {@code null} if the schema has fields and every one of
   *         them failed with an error other than {@link AvroRuntimeException}
   * @throws Exception if an error occurs outside the per-field handling (e.g. the record
   *         cannot be created)
   */
  private GenericRecord getGenericRecord(Object tuple) throws Exception
  {
    Schema recordSchema = getSchema();
    GenericRecord rec = new GenericData.Record(recordSchema);
    int writeErrorCount = 0;
    for (int i = 0; i < columnNames.size(); i++) {
      Field field = columnNames.get(i);
      String fieldName = field.name();
      try {
        Object value = keyMethodMap.get(i).get(tuple);
        if (value == null) {
          if (isNullable(field.schema())) {
            // Null is a legal value for this field; there is nothing to convert.
            rec.put(fieldName, null);
          } else {
            LOG.error("Null value for non-nullable field [{}]", fieldName);
            fieldErrorCount++;
            writeErrorCount++;
          }
          continue;
        }
        rec.put(fieldName, AvroRecordHelper.convertValueStringToAvroKeyType(recordSchema, fieldName,
            value.toString()));
      } catch (AvroRuntimeException e) {
        LOG.error("Could not set Field [{}] in the generic record", fieldName, e);
        fieldErrorCount++;
      } catch (Exception e) {
        LOG.error("Parse Exception", e);
        fieldErrorCount++;
        writeErrorCount++;
      }
    }
    // Reject the record only when there were fields and all of them failed to be written;
    // an empty record schema yields a valid (empty) record.
    if (!columnNames.isEmpty() && writeErrorCount == columnNames.size()) {
      return null;
    }
    return rec;
  }

  /**
   * Returns whether an Avro field schema accepts a null value: either the null type itself
   * or a union that has the null type among its branches.
   *
   * @param fieldSchema schema of a single record field
   * @return true if null is a valid value for the field
   */
  private static boolean isNullable(Schema fieldSchema)
  {
    if (fieldSchema.getType() == Schema.Type.NULL) {
      return true;
    }
    if (fieldSchema.getType() == Schema.Type.UNION) {
      for (Schema branch : fieldSchema.getTypes()) {
        if (branch.getType() == Schema.Type.NULL) {
          return true;
        }
      }
    }
    return false;
  }

  /** Resets the per-window metrics. */
  @Override
  public void beginWindow(long windowId)
  {
    recordCount = 0;
    errorCount = 0;
    fieldErrorCount = 0;
  }

  private static final Logger LOG = LoggerFactory.getLogger(PojoToAvro.class);
}