/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.xml;

import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.Typifier;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.Attribute;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import java.io.Closeable;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Stack;
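
/**
 * A streaming XML reader for Drill's XML format plugin. The reader walks the stream of XML
 * events, treats elements at the configured data level as rows, writes nested elements to
 * Drill maps, and collects element attributes into a separate map column named
 * {@value #ATTRIBUTE_MAP_NAME}.
 * <p>
 * A minimal usage sketch; the input stream, row writer, and error context below are
 * illustrative placeholders supplied by the surrounding format plugin:
 * <pre>{@code
 * XMLReader reader = new XMLReader(inputStream, 2, false);
 * reader.open(rowSetLoader, errorContext);
 * while (reader.next()) {
 *   // each call fills one batch of rows in the RowSetLoader
 * }
 * reader.close();
 * }</pre>
 */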
public class XMLReader implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(XMLReader.class);
public static final String ATTRIBUTE_MAP_NAME = "attributes";
private final Stack<String> fieldNameStack;
private final Stack<TupleWriter> rowWriterStack;
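// The nesting level at which the data rows reside.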
private final int dataLevel;
private final Map<String, XMLMap> nestedMapCollection;
private final boolean allTextMode;
private TupleWriter attributeWriter;
private CustomErrorContext errorContext;
private RowSetLoader rootRowWriter;
private int currentNestingLevel;
private XMLEvent currentEvent;
private String rootDataFieldName;
private String fieldName;
private xmlState currentState;
private TupleWriter currentTupleWriter;
private boolean rowStarted;
private String attributePrefix;
private String fieldValue;
private InputStream fsStream;
private XMLEventReader reader;
private ImplicitColumns metadata;
private boolean isSelfClosingEvent;
/**
 * This enum represents the various states in which the reader operates. The names should be
 * self-explanatory; the reader uses them as it iterates over the XML tags to decide what to do.
 */
private enum xmlState {
ROW_STARTED,
POSSIBLE_MAP,
NESTED_MAP_STARTED,
GETTING_DATA,
WRITING_DATA,
FIELD_ENDED,
ROW_ENDED
}
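/**
 * Constructs an XMLReader from an input stream. DTD support is disabled on the underlying
 * parser to prevent XXE attacks.
 *
 * @param fsStream An InputStream of the XML data
 * @param dataLevel The nesting level at which the data begins
 * @param allTextMode If true, all scalar fields are read as VARCHAR rather than inferred types
 * @throws XMLStreamException If the XML event reader cannot be created
 */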
public XMLReader(InputStream fsStream, int dataLevel, boolean allTextMode) throws XMLStreamException {
this.fsStream = fsStream;
XMLInputFactory inputFactory = XMLInputFactory.newInstance();
// This property prevents XXE attacks by disallowing DTD support.
inputFactory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
reader = inputFactory.createXMLEventReader(fsStream);
fieldNameStack = new Stack<>();
rowWriterStack = new Stack<>();
nestedMapCollection = new HashMap<>();
this.dataLevel = dataLevel;
isSelfClosingEvent = false;
this.allTextMode = allTextMode;
}
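/**
 * Binds the reader to the row writer and error context for the current scan.
 *
 * @param rootRowWriter The root RowSetLoader to which rows are written
 * @param errorContext The error context used when reporting read failures
 */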
public void open(RowSetLoader rootRowWriter, CustomErrorContext errorContext) {
this.errorContext = errorContext;
this.rootRowWriter = rootRowWriter;
attributeWriter = getAttributeWriter();
}
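/**
 * Reads XML events and writes rows until the current batch is full or the input
 * is exhausted.
 *
 * @return True if the batch is full and more data may remain, false once the
 * input has been fully consumed
 */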
public boolean next() {
while (!rootRowWriter.isFull()) {
try {
if (!processElements()) {
return false;
}
} catch (Exception e) {
throw UserException
.dataReadError(e)
.message("Error parsing file: " + e.getMessage())
.addContext(errorContext)
.build(logger);
}
}
return true;
}
@Override
public void close() {
if (fsStream != null) {
AutoCloseables.closeSilently(fsStream);
fsStream = null;
}
if (reader != null) {
try {
reader.close();
} catch (XMLStreamException e) {
logger.warn("Error when closing XML stream: {}", e.getMessage());
}
reader = null;
}
}
/**
 * This function processes the XML elements. It stops reading when the limit (if any)
 * from the query has been reached or the iterator runs out of elements.
 * @return True if there are more elements to parse, false if not
 */
private boolean processElements() {
XMLEvent nextEvent;
if (!reader.hasNext()) {
// Stop reading if there are no more results
return false;
}
// Iterate over XML events
while (reader.hasNext()) {
// Get the next event
try {
nextEvent = reader.nextEvent();
// If the next event is whitespace, newlines, or other cruft that we don't need,
// ignore the event and move to the next event
if (XMLUtils.isEmptyWhiteSpace(nextEvent)) {
continue;
}
// Determine whether the previous and current events form a self-closing tag.
isSelfClosingEvent = isSelfClosingEvent(currentEvent, nextEvent);
if (isSelfClosingEvent) {
logger.debug("Found self closing event!!");
}
// Capture the previous and current event
XMLEvent lastEvent = currentEvent;
currentEvent = nextEvent;
// Process the event
processEvent(currentEvent, lastEvent, reader.peek());
} catch (XMLStreamException e) {
throw UserException
.dataReadError(e)
.message("Error parsing XML file: " + e.getMessage())
.addContext(errorContext)
.build(logger);
}
}
return true;
}
/**
 * One of the challenges with XML parsing is self-closing events. The streaming XML parser
 * treats a self-closing event as two events: a start event and an end event. The issue is that
 * self-closing events can cause schema issues for Drill: specifically, if a self-closing event
 * is detected prior to a non-self-closing event, and that populated event contains a map or other
 * nested data, Drill will throw a schema change exception.
 * <p>
 * Unfortunately, Java's streaming XML parser, which Drill uses, does not provide a means of
 * identifying self-closing tags. This function does so by comparing the event with the previous
 * event and looking for a condition where one event is a start event and the other is an end
 * event. Additionally, the column numbers and character offsets must be the same, indicating
 * that the two events are in fact the same tag. For example, {@code <book/>} is reported as a
 * start event followed immediately by an end event at the same location.
 *
 * @param e1 The first XMLEvent
 * @param e2 The second XMLEvent
 * @return True if the events represent a self-closing event, false if not.
 */
private boolean isSelfClosingEvent(XMLEvent e1, XMLEvent e2) {
// If either event is null return false.
if (e1 == null || e2 == null) {
return false;
} else if (XMLUtils.hasAttributes(e1) || XMLUtils.hasAttributes(e2)) {
return false;
}
return (e1.getLocation().getCharacterOffset() == e2.getLocation().getCharacterOffset()) &&
(e1.getLocation().getColumnNumber() == e2.getLocation().getColumnNumber()) &&
e1.isStartElement() && e2.isEndElement();
}
/**
 * This function processes an actual XMLEvent. There are three possibilities:
 * 1. The event is a start event
 * 2. The event contains text
 * 3. The event is a closing tag
 * There are other possible event types, but they are not relevant for our purposes.
 *
 * @param currentEvent The current event to be processed
 * @param lastEvent The previous event which was processed
 * @param nextEvent The next event in the stream, obtained by peeking ahead
 */
private void processEvent(XMLEvent currentEvent,
XMLEvent lastEvent, XMLEvent nextEvent) {
String mapName;
switch (currentEvent.getEventType()) {
/*
* This case handles start elements.
* Case 1: The current nesting level is less than the data level.
* In this case, increase the nesting level and stop processing.
*
* Case 2: The nesting level is at or above the data level.
* In this case, a few things must happen.
* 1. We capture the field name
* 2. If the row has not started, we start the row
* 3. Set the possible map flag
* 4. Process attributes
* 5. Push both the field name and writer to the stacks
*/
case XMLStreamConstants.START_ELEMENT:
currentNestingLevel++;
// Case 1: Current nesting level is less than the data level
if (currentNestingLevel < dataLevel) {
// Stop here if the current level of nesting has not reached the data.
break;
}
StartElement startElement = currentEvent.asStartElement();
// Get the field name
fieldName = startElement.getName().getLocalPart();
if (rootDataFieldName == null && currentNestingLevel == dataLevel) {
rootDataFieldName = fieldName;
logger.debug("Root field name: {}", rootDataFieldName);
}
if (!rowStarted) {
currentTupleWriter = startRow(rootRowWriter);
Iterator<Attribute> attributes = startElement.getAttributes();
if (attributes != null && attributes.hasNext()) {
// This would be the root element, so the attribute prefix would simply be the field name.
writeAttributes(fieldName, attributes);
}
} else {
if (lastEvent != null &&
lastEvent.getEventType() == XMLStreamConstants.START_ELEMENT) {
/*
 * Check this flag in the next section. If the next element is a character event AND the
 * flag is set, start a map. If not, ignore it.
 */
changeState(xmlState.POSSIBLE_MAP);
rowWriterStack.push(currentTupleWriter);
}
fieldNameStack.push(fieldName);
if (currentNestingLevel > dataLevel) {
attributePrefix = XMLUtils.addField(attributePrefix, fieldName);
}
Iterator<Attribute> attributes = startElement.getAttributes();
if (attributes != null && attributes.hasNext()) {
writeAttributes(attributePrefix, attributes);
}
}
break;
/*
* This case processes character elements.
*/
case XMLStreamConstants.CHARACTERS:
/*
* This is the case for comments or other characters after a closing tag
*/
if (currentState == xmlState.ROW_ENDED) {
break;
}
// Get the field value but ignore characters outside of rows
if (rowStarted) {
if (currentState == xmlState.POSSIBLE_MAP && currentNestingLevel > dataLevel + 1) {
changeState(xmlState.NESTED_MAP_STARTED);
// Remove the current field name from the stack
if (fieldNameStack.size() > 1) {
fieldNameStack.pop();
}
// Pop the map name from the stack
mapName = fieldNameStack.pop();
currentTupleWriter = getMapWriter(mapName, currentTupleWriter);
} else {
changeState(xmlState.ROW_STARTED);
}
}
// Get the field value
fieldValue = currentEvent.asCharacters().getData().trim();
changeState(xmlState.GETTING_DATA);
break;
case XMLStreamConstants.END_ELEMENT:
currentNestingLevel--;
if (isSelfClosingEvent) {
logger.debug("Closing self-closing event {}. ", fieldName);
isSelfClosingEvent = false;
attributePrefix = XMLUtils.removeField(attributePrefix,fieldName);
break;
}
if (currentNestingLevel < dataLevel - 1) {
break;
} else if (currentEvent.asEndElement().getName().toString().equals(rootDataFieldName)) {
// End the row
currentTupleWriter = endRow();
// Clear stacks
rowWriterStack.clear();
fieldNameStack.clear();
attributePrefix = "";
} else if (currentState == xmlState.FIELD_ENDED && currentNestingLevel >= dataLevel) {
// Case to end nested maps
// Pop tupleWriter off stack
if (rowWriterStack.size() > 0) {
currentTupleWriter = rowWriterStack.pop();
}
// Pop field name
if (fieldNameStack.size() > 0) {
fieldNameStack.pop();
}
attributePrefix = XMLUtils.removeField(attributePrefix, fieldName);
} else if (currentState != xmlState.ROW_ENDED) {
if (!isSelfClosingEvent) {
writeFieldData(fieldName, fieldValue, currentTupleWriter);
}
// Clear out field name and value
attributePrefix = XMLUtils.removeField(attributePrefix, fieldName);
// Pop field name
if (fieldNameStack.size() > 0) {
fieldNameStack.pop();
}
fieldName = null;
fieldValue = null;
}
break;
}
}
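/**
 * Registers the implicit (file metadata) columns which are written with every row.
 *
 * @param metadata The implicit columns of the file being read
 */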
public void implicitFields(ImplicitColumns metadata) {
this.metadata = metadata;
}
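/**
 * Starts a new row if the current nesting level has reached the data level;
 * otherwise the supplied writer is returned unchanged.
 *
 * @param writer The current row writer
 * @return The TupleWriter to use for subsequent fields
 */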
private TupleWriter startRow(RowSetLoader writer) {
if (currentNestingLevel == dataLevel) {
rootRowWriter.start();
rowStarted = true;
rowWriterStack.push(rootRowWriter);
changeState(xmlState.ROW_STARTED);
return rootRowWriter;
} else {
rowStarted = false;
return writer;
}
}
/**
* This method executes the steps to end a row from an XML dataset.
* @return the root row writer
*/
private TupleWriter endRow() {
logger.debug("Ending row");
if (metadata != null) {
metadata.writeImplicitColumns();
}
rootRowWriter.save();
rowStarted = false;
changeState(xmlState.ROW_ENDED);
return rootRowWriter;
}
/**
 * Writes a field. If the field does not have a corresponding ScalarWriter, this method will
 * create one.
 * @param fieldName The field name
 * @param fieldValue The field value to be written
 * @param writer The TupleWriter to which the field will be written
 */
private void writeFieldData(String fieldName, String fieldValue, TupleWriter writer) {
if (fieldName == null) {
return;
}
changeState(xmlState.WRITING_DATA);
// Find the TupleWriter object
int index = writer.tupleSchema().index(fieldName);
if (index == -1) {
if (allTextMode) {
ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
index = writer.addColumn(colSchema);
} else {
index = addNewScalarColumn(fieldName, fieldValue, writer);
}
}
ScalarWriter colWriter = writer.scalar(index);
ColumnMetadata columnMetadata = writer.tupleSchema().metadata(index);
MinorType dataType = columnMetadata.schema().getType().getMinorType();
String dateFormat;
// Write the values depending on their data type. This only applies to scalar fields.
if (fieldValue != null && (currentState != xmlState.ROW_ENDED && currentState != xmlState.FIELD_ENDED)) {
switch (dataType) {
case BIT:
colWriter.setBoolean(Boolean.parseBoolean(fieldValue));
break;
case TINYINT:
case SMALLINT:
case INT:
colWriter.setInt(Integer.parseInt(fieldValue));
break;
case BIGINT:
colWriter.setLong(Long.parseLong(fieldValue));
break;
case FLOAT4:
case FLOAT8:
colWriter.setDouble(Double.parseDouble(fieldValue));
break;
case DATE:
dateFormat = columnMetadata.property("drill.format");
LocalDate localDate;
if (Strings.isNullOrEmpty(dateFormat)) {
// Use typifier if all text mode is disabled.
if (!allTextMode) {
localDate = Typifier.stringAsDate(fieldValue);
} else {
localDate = LocalDate.parse(fieldValue);
}
} else {
localDate = LocalDate.parse(fieldValue, DateTimeFormatter.ofPattern(dateFormat));
}
colWriter.setDate(localDate);
break;
case TIME:
dateFormat = columnMetadata.property("drill.format");
LocalTime localTime;
if (Strings.isNullOrEmpty(dateFormat)) {
localTime = LocalTime.parse(fieldValue);
} else {
localTime = LocalTime.parse(fieldValue, DateTimeFormatter.ofPattern(dateFormat));
}
colWriter.setTime(localTime);
break;
case TIMESTAMP:
dateFormat = columnMetadata.property("drill.format");
Instant timestamp;
if (Strings.isNullOrEmpty(dateFormat)) {
timestamp = Instant.parse(fieldValue);
} else {
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateFormat);
Date parsedDate = simpleDateFormat.parse(fieldValue);
timestamp = Instant.ofEpochMilli(parsedDate.getTime());
} catch (ParseException e) {
throw UserException.parseError(e)
.message("Cannot parse " + fieldValue + " as a timestamp. You can specify a format string in the provided schema to correct this.")
.addContext(errorContext)
.build(logger);
}
}
colWriter.setTimestamp(timestamp);
break;
default:
colWriter.setString(fieldValue);
}
changeState(xmlState.FIELD_ENDED);
}
}
/**
 * Adds a new scalar column to the schema. If the data type is unknown, it will default to
 * VARCHAR.
 * @param fieldName The field name
 * @param fieldValue The field value
 * @param writer A {@link TupleWriter} of the current Drill schema
 * @return The index of the new column
 */
private int addNewScalarColumn(String fieldName, String fieldValue, TupleWriter writer) {
MinorType dataType;
if (Strings.isNullOrEmpty(fieldValue)) {
dataType = MinorType.VARCHAR;
} else {
dataType = Typifier.typifyToDrill(fieldValue);
}
ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, dataType, DataMode.OPTIONAL);
return writer.addColumn(colSchema);
}
/**
 * Writes an attribute. If the field does not have a corresponding ScalarWriter, this method
 * will create one.
 * @param fieldName The field name
 * @param fieldValue The field value to be written
 * @param writer The TupleWriter to which the attribute will be written
 */
private void writeAttributeData(String fieldName, String fieldValue, TupleWriter writer) {
if (fieldName == null) {
return;
}
// Find the TupleWriter object
int index = writer.tupleSchema().index(fieldName);
if (index == -1) {
ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
index = writer.addColumn(colSchema);
}
ScalarWriter colWriter = writer.scalar(index);
if (fieldValue != null) {
colWriter.setString(fieldValue);
}
}
/**
 * Returns a map writer for a given field. If the writer does not exist, adds one to the schema.
 * @param mapName The map's name
 * @param rowWriter The current TupleWriter
 * @return A TupleWriter of the new map
 */
private TupleWriter getMapWriter(String mapName, TupleWriter rowWriter) {
logger.debug("Adding map: {}", mapName);
int index = rowWriter.tupleSchema().index(mapName);
if (index == -1) {
// Check to see if the map already exists in the map collection
// This condition can occur in deeply nested data.
String tempFieldName = mapName + "-" + currentNestingLevel;
XMLMap mapObject = nestedMapCollection.get(tempFieldName);
if (mapObject != null) {
logger.debug("Found map {}", tempFieldName);
return mapObject.getMapWriter();
}
index = rowWriter.addColumn(SchemaBuilder.columnSchema(mapName, MinorType.MAP, DataMode.REQUIRED));
// Add map to map collection for future use
nestedMapCollection.put(tempFieldName, new XMLMap(mapName, rowWriter.tuple(index)));
}
logger.debug("Index: {}, Fieldname: {}", index, mapName);
return rowWriter.tuple(index);
}
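/**
 * Records a transition of the reader's state machine.
 *
 * @param newState The state the reader is entering
 */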
private void changeState(xmlState newState) {
currentState = newState;
}
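/**
 * Returns the TupleWriter for the attribute map, adding the map column to the schema
 * if it does not yet exist.
 *
 * @return A TupleWriter for the attribute map
 */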
private TupleWriter getAttributeWriter() {
int attributeIndex = rootRowWriter.tupleSchema().index(ATTRIBUTE_MAP_NAME);
if (attributeIndex == -1) {
attributeIndex = rootRowWriter.addColumn(SchemaBuilder.columnSchema(ATTRIBUTE_MAP_NAME, MinorType.MAP, DataMode.REQUIRED));
}
return rootRowWriter.tuple(attributeIndex);
}
/**
* Helper function which writes attributes of an XML element.
* @param prefix The attribute prefix
* @param attributes An iterator of Attribute objects
*/
private void writeAttributes(String prefix, Iterator<Attribute> attributes) {
while (attributes.hasNext()) {
Attribute currentAttribute = attributes.next();
String key = prefix + "_" + currentAttribute.getName().toString();
writeAttributeData(key, currentAttribute.getValue(), attributeWriter);
}
}
}