blob: 52a9701507d4db2d7b29197bd968a476df9a9fbb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.xml;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.SchemaInferenceEngine;
import org.apache.nifi.schema.inference.RecordSourceFactory;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.xml.inference.XmlNode;
import org.apache.nifi.xml.inference.XmlRecordSource;
import org.apache.nifi.xml.inference.XmlSchemaInference;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.apache.nifi.schema.inference.SchemaInferenceUtil.INFER_SCHEMA;
/**
 * Record reader factory that produces {@link XMLRecordReader} instances for XML content.
 *
 * <p>In addition to the properties inherited from {@link SchemaRegistryService}, this service exposes
 * the schema cache, the record format (single record vs. wrapped array of records), an optional
 * attribute prefix, an optional content field name, and the date / time / timestamp formats. It also
 * adds {@code INFER_SCHEMA} to the schema access strategies and uses it as the default.</p>
 */
@Tags({"xml", "record", "reader", "parser"})
@CapabilityDescription("Reads XML content and creates Record objects. Records are expected in the second level of " +
        "XML data, embedded in an enclosing root tag.")
public class XMLReader extends SchemaRegistryService implements RecordReaderFactory {

    /** Record format value: the content is one record with no wrapper element. */
    public static final AllowableValue RECORD_SINGLE = new AllowableValue("false", "false",
            "Each FlowFile will consist of a single record without any sort of \"wrapper\".");

    /** Record format value: the outer-most element is a wrapper around zero or more records. */
    public static final AllowableValue RECORD_ARRAY = new AllowableValue("true", "true",
            "Each FlowFile will consist of zero or more records. The outer-most XML element is expected to be a \"wrapper\" and will be ignored.");

    /** Record format value: the decision is taken from the {@code xml.stream.is.array} attribute via Expression Language. */
    public static final AllowableValue RECORD_EVALUATE = new AllowableValue("${xml.stream.is.array}", "Use attribute 'xml.stream.is.array'",
            "Whether to treat a FlowFile as a single Record or an array of multiple Records is determined by the value of the 'xml.stream.is.array' attribute. "
                    + "If the value of the attribute is 'true' (case-insensitive), then the XML Reader will treat the FlowFile as a series of Records with the outer element being ignored. "
                    + "If the value of the attribute is 'false' (case-insensitive), then the FlowFile is treated as a single Record and no wrapper element is assumed. "
                    + "If the attribute is missing or its value is anything other than 'true' or 'false', then an Exception will be thrown and no records will be parsed.");

    /** Selects between single-record and wrapped-array parsing; see {@link #isMultipleRecords}. */
    public static final PropertyDescriptor RECORD_FORMAT = new PropertyDescriptor.Builder()
            .name("record_format")
            .displayName("Expect Records as Array")
            .description("This property defines whether the reader expects a FlowFile to consist of a single Record or a series of Records with a \"wrapper element\". Because XML does not "
                    + "provide for a way to read a series of XML documents from a stream directly, it is common to combine many XML documents by concatenating them and then wrapping the entire "
                    + "XML blob with a \"wrapper element\". This property dictates whether the reader expects a FlowFile to consist of a single Record or a series of Records with a \"wrapper element\" "
                    + "that will be ignored.")
            .allowableValues(RECORD_SINGLE, RECORD_ARRAY, RECORD_EVALUATE)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .defaultValue(RECORD_SINGLE.getValue())
            .required(true)
            .build();

    /** Optional prefix applied to XML attribute names when they are added to a record. */
    public static final PropertyDescriptor ATTRIBUTE_PREFIX = new PropertyDescriptor.Builder()
            .name("attribute_prefix")
            .displayName("Attribute Prefix")
            .description("If this property is set, the name of attributes will be prepended with a prefix when they are added to a record.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(false)
            .build();

    /** Optional field name used for the text content of tags that the schema models as nested records. */
    public static final PropertyDescriptor CONTENT_FIELD_NAME = new PropertyDescriptor.Builder()
            .name("content_field_name")
            .displayName("Field Name for Content")
            .description("If tags with content (e. g. <field>content</field>) are defined as nested records in the schema, " +
                    "the name of the tag will be used as name for the record and the value of this property will be used as name for the field. " +
                    "If tags with content shall be parsed together with attributes (e. g. <field attribute=\"123\">content</field>), " +
                    "they have to be defined as records. For additional information, see the section of processor usage.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(false)
            .build();

    // Format strings captured in onEnabled; volatile because they are written there and read
    // later from createRecordReader and from the schema inference supplier.
    private volatile String dateFormat;
    private volatile String timeFormat;
    private volatile String timestampFormat;

    /**
     * Captures the configured date, time and timestamp formats when the service is enabled.
     *
     * @param context the configuration the service is being enabled with
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
        this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
        this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
    }

    /**
     * Returns the inherited property descriptors followed by the XML-specific ones.
     *
     * @return a new mutable list; the order of the additions is the order in which they are appended below
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
        properties.add(SchemaInferenceUtil.SCHEMA_CACHE);
        properties.add(RECORD_FORMAT);
        properties.add(ATTRIBUTE_PREFIX);
        properties.add(CONTENT_FIELD_NAME);
        properties.add(DateTimeUtils.DATE_FORMAT);
        properties.add(DateTimeUtils.TIME_FORMAT);
        properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
        return properties;
    }

    /**
     * Returns the inherited schema access strategies plus {@code INFER_SCHEMA}.
     *
     * @return a new mutable list of allowable strategy values
     */
    @Override
    protected List<AllowableValue> getSchemaAccessStrategyValues() {
        final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
        allowableValues.add(INFER_SCHEMA);
        return allowableValues;
    }

    /**
     * Resolves the schema access strategy, wiring in the XML record source and the XML schema
     * inference engine, and supplying the superclass strategy lookup as the fallback.
     *
     * @param strategy       the configured strategy value
     * @param schemaRegistry the schema registry passed through to the superclass lookup
     * @param context        the property context used to evaluate {@link #RECORD_FORMAT}
     * @return the strategy chosen by {@link SchemaInferenceUtil#getSchemaAccessStrategy}
     */
    @Override
    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
        // The record format is evaluated per invocation because it may depend on FlowFile attributes.
        final RecordSourceFactory<XmlNode> sourceFactory = (variables, contentStream) -> new XmlRecordSource(contentStream, isMultipleRecords(context, variables));
        // Built lazily so the formats are read when the engine is requested, not when the strategy is created.
        final Supplier<SchemaInferenceEngine<XmlNode>> schemaInference = () -> new XmlSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));

        return SchemaInferenceUtil.getSchemaAccessStrategy(strategy, context, getLogger(), sourceFactory, schemaInference,
                () -> super.getSchemaAccessStrategy(strategy, schemaRegistry, context));
    }

    /**
     * Evaluates {@link #RECORD_FORMAT} against the given variables and interprets the result.
     *
     * @param context   the property context to read the property from
     * @param variables the attributes used for Expression Language evaluation
     * @return {@code true} if the trimmed value equals "true" ignoring case, {@code false} if it equals "false" ignoring case
     * @throws ProcessException if the value is anything else, including {@code null}
     */
    private boolean isMultipleRecords(final PropertyContext context, final Map<String, String> variables) {
        // Null-safe trim: a null evaluation result must surface as the descriptive ProcessException
        // below rather than as a NullPointerException.
        final String recordFormat = trim(context.getProperty(RECORD_FORMAT).evaluateAttributeExpressions(variables).getValue());

        if ("true".equalsIgnoreCase(recordFormat)) {
            return true;
        } else if ("false".equalsIgnoreCase(recordFormat)) {
            return false;
        } else {
            throw new ProcessException("Cannot parse XML Records because the '" + RECORD_FORMAT.getDisplayName() + "' property evaluates to '"
                    + recordFormat + "', which is neither 'true' nor 'false'");
        }
    }

    /**
     * @return {@code INFER_SCHEMA}, the default strategy for this reader
     */
    @Override
    protected AllowableValue getDefaultSchemaAccessStrategy() {
        return INFER_SCHEMA;
    }

    /**
     * Creates an {@link XMLRecordReader} for the given content.
     *
     * @param variables   the attributes used for Expression Language evaluation and schema lookup
     * @param in          the XML content
     * @param inputLength the content length; not used by this implementation
     * @param logger      the logger handed to the created reader
     * @return a reader configured with the resolved schema, record format, attribute prefix, content field name and formats
     * @throws IOException              declared by the interface; propagated from schema resolution or reader construction
     * @throws SchemaNotFoundException  declared by the interface; propagated from schema resolution
     * @throws MalformedRecordException declared by the interface; propagated from reader construction
     */
    @Override
    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
            throws IOException, SchemaNotFoundException, MalformedRecordException {
        final ConfigurationContext context = getConfigurationContext();

        // NOTE(review): the same stream is passed to getSchema and then to the reader; presumably the
        // schema access strategy leaves the stream positioned at the start — confirm.
        final RecordSchema schema = getSchema(variables, in, null);

        final String attributePrefix = trim(context.getProperty(ATTRIBUTE_PREFIX).evaluateAttributeExpressions(variables).getValue());
        final String contentFieldName = trim(context.getProperty(CONTENT_FIELD_NAME).evaluateAttributeExpressions(variables).getValue());
        final boolean isArray = isMultipleRecords(context, variables);

        return new XMLRecordReader(in, schema, isArray, attributePrefix, contentFieldName, dateFormat, timeFormat, timestampFormat, logger);
    }

    /**
     * Null-safe {@link String#trim()}.
     *
     * @param value the value to trim, may be {@code null}
     * @return {@code null} if {@code value} is {@code null}, otherwise the trimmed value
     */
    private static String trim(final String value) {
        return value == null ? null : value.trim();
    }
}