blob: a68c9286159b08d4d97fae504c0edd3ad44255a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.contrib.parser;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.beanutils.ConversionException;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.beanutils.converters.DateConverter;
import org.apache.commons.beanutils.converters.DateTimeConverter;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting;
import com.datatorrent.api.Context;
import com.datatorrent.lib.parser.Parser;
import com.datatorrent.lib.util.KeyValPair;
/**
 * Operator that parses each incoming tuple with a regex pattern and populates a POJO
 * matching the user defined schema.<br>
 * This operator expects the upstream operator to send every line in the file as a byte array.
 * {@code splitRegexPattern} contains the regex pattern of lines in the file; the n-th
 * capturing group of the first match is assigned to the n-th field of the schema.<br>
 * Schema is specified in a json format as per {@link DelimitedSchema} that
 * contains field information and constraints for each field.<br>
 * Schema field names should match with the POJO variable names.<br>
 * Assumption is that each field in the delimited data should map to a simple
 * java type.<br>
 * <br>
 * <b>Properties</b> <br>
 * <b>splitRegexPattern</b>: Regex pattern as a string<br>
 *
 * @displayName RegexParser
 * @category Parsers
 * @tags pojo parser regex logs server
 * @since 3.7.0
 */
@InterfaceStability.Evolving
public class RegexParser extends Parser<byte[], KeyValPair<String, String>>
{
  /**
   * Contents of the schema. Schema is specified in a json format as per
   * {@link DelimitedSchema}.
   */
  @NotNull
  private String schema;

  /**
   * Schema is read into this object (in {@link #setup}) to access fields.
   */
  private transient DelimitedSchema delimitedParserSchema;

  /**
   * Regex pattern defined for the tuple.
   */
  @NotNull
  private String splitRegexPattern;

  /**
   * Compiled form of {@link #splitRegexPattern}, built in {@link #setup}.
   */
  private transient Pattern pattern;

  /**
   * Parses the configured schema and compiles the configured regex pattern.
   *
   * @param context operator context (not read by this method)
   */
  @Override
  public void setup(Context.OperatorContext context)
  {
    delimitedParserSchema = new DelimitedSchema(schema);
    pattern = Pattern.compile(splitRegexPattern);
  }

  /**
   * Parses one tuple into a POJO.
   * <p>
   * The bytes are decoded into a String and matched against the regex. On the first match,
   * capturing group {@code i + 1} is copied into schema field {@code i} of a new instance of
   * the configured POJO class, which is then emitted on {@code out}.
   * <p>
   * Null, blank, non-matching and unconvertible tuples are emitted on {@code err} (when it is
   * connected) together with a reason, and are counted in {@code errorTupleCount}. When
   * {@code out} is not connected or no POJO class is set, a non-blank tuple is ignored.
   *
   * @param tuple raw bytes of one input line; may be null
   */
  @Override
  public void processTuple(byte[] tuple)
  {
    if (tuple == null) {
      if (err.isConnected()) {
        err.emit(new KeyValPair<String, String>(null, "Blank/null tuple"));
      }
      errorTupleCount++;
      return;
    }
    // NOTE(review): decodes with the platform default charset; consider an explicit charset.
    String incomingString = new String(tuple);
    if (StringUtils.isBlank(incomingString)) {
      if (err.isConnected()) {
        err.emit(new KeyValPair<String, String>(incomingString, "Blank tuple"));
      }
      errorTupleCount++;
      return;
    }
    try {
      if (out.isConnected() && clazz != null) {
        Matcher matcher = pattern.matcher(incomingString);
        // Check the match before paying for reflective instantiation of the POJO.
        if (!matcher.find()) {
          throw new ConversionException("The incoming tuple do not match with the Regex pattern defined.");
        }
        int groupCount = matcher.groupCount();
        int fieldCount = delimitedParserSchema.getFields().size();
        // Groups map to schema fields by position. A surplus group has no field to go to and
        // previously escaped this method as an uncaught IndexOutOfBoundsException.
        if (groupCount > fieldCount) {
          throw new ConversionException("The Regex pattern defines " + groupCount
              + " capturing groups but the schema defines only " + fieldCount + " fields.");
        }
        Constructor<?> ctor = clazz.getConstructor();
        Object object = ctor.newInstance();
        for (int i = 0; i < groupCount; i++) {
          if (delimitedParserSchema.getFields().get(i).getType() == DelimitedSchema.FieldType.DATE) {
            // ConvertUtils keeps one converter per target class, so the Date converter is
            // re-registered with this field's format right before the property is set.
            DateTimeConverter dtConverter = new DateConverter();
            dtConverter.setPattern(
                (String)delimitedParserSchema.getFields().get(i).getConstraints().get(DelimitedSchema.DATE_FORMAT));
            ConvertUtils.register(dtConverter, Date.class);
          }
          BeanUtils.setProperty(object, delimitedParserSchema.getFields().get(i).getName(), matcher.group(i + 1));
        }
        out.emit(object);
        emittedObjectCount++;
      }
    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException | InstantiationException
        | ConversionException e) {
      String reason = failureReason(e);
      if (err.isConnected()) {
        err.emit(new KeyValPair<String, String>(incomingString, reason));
        logger.debug("Regex Expression : {} Incoming tuple : {}", splitRegexPattern, incomingString);
      }
      errorTupleCount++;
      logger.error("Tuple could not be parsed. Reason {}", reason);
    }
  }

  /**
   * Builds a human-readable reason for a parse failure.
   * <p>
   * An exception thrown by the POJO constructor or a setter arrives wrapped in an
   * {@link InvocationTargetException}, whose own message is typically null. In that case the
   * wrapped cause is reported. When no message is available, the exception's
   * {@code toString()} is used, so the reason is never null.
   *
   * @param e the caught exception
   * @return a non-null description of the failure
   */
  private static String failureReason(Exception e)
  {
    Throwable source = e;
    if (e instanceof InvocationTargetException && e.getCause() != null) {
      source = e.getCause();
    }
    String message = source.getMessage();
    return message != null ? message : source.toString();
  }

  /**
   * Set the schema that defines the format of the tuple.
   *
   * @param schema schema json as per {@link DelimitedSchema}
   */
  public void setSchema(String schema)
  {
    this.schema = schema;
  }

  /**
   * Set the regex pattern expected for the incoming tuple.
   *
   * @param splitRegexPattern regex whose capturing groups map, in order, to the schema fields
   */
  public void setSplitRegexPattern(String splitRegexPattern)
  {
    this.splitRegexPattern = splitRegexPattern;
  }

  /**
   * Get the schema value.
   *
   * @return schema
   */
  public String getSchema()
  {
    return schema;
  }

  /**
   * Get the regex pattern value.
   *
   * @return splitRegexPattern
   */
  public String getSplitRegexPattern()
  {
    return splitRegexPattern;
  }

  /**
   * Not supported by this parser; conversion happens inside {@link #processTuple(byte[])}.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public Object convert(byte[] tuple)
  {
    throw new UnsupportedOperationException("Not supported");
  }

  /**
   * Not supported by this parser; error tuples are built inside {@link #processTuple(byte[])}.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public KeyValPair<String, String> processErrorTuple(byte[] input)
  {
    throw new UnsupportedOperationException("Not supported");
  }

  /**
   * Get errorTupleCount.
   *
   * @return errorTupleCount
   */
  @VisibleForTesting
  public long getErrorTupleCount()
  {
    return errorTupleCount;
  }

  /**
   * Get emittedObjectCount.
   *
   * @return emittedObjectCount
   */
  @VisibleForTesting
  public long getEmittedObjectCount()
  {
    return emittedObjectCount;
  }

  /**
   * Get incomingTuplesCount.
   *
   * @return incomingTuplesCount
   */
  @VisibleForTesting
  public long getIncomingTuplesCount()
  {
    return incomingTuplesCount;
  }

  private static final Logger logger = LoggerFactory.getLogger(RegexParser.class);
}