blob: 28fd1691f362e407b70db8f0104d3e3581feb6f5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.parser;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.external.api.IDataParser;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IStreamDataParser;
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.util.NonTaggedFormatUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
import org.apache.hyracks.dataflow.std.file.FieldCursorForDelimitedDataParser;
/**
 * Parses delimited text (fields separated by a delimiter character, optionally quoted) into records of a
 * given {@link ARecordType}.
 * <p>
 * Two modes are supported:
 * <ul>
 * <li>stream mode: {@link #setInputStream(InputStream)} / {@link #reset(InputStream)} install a reader and
 * {@link #parse(DataOutput)} pulls records from it;</li>
 * <li>record mode: {@link #parse(IRawRecord, DataOutput)} parses one already-extracted record.</li>
 * </ul>
 * The i-th field of a record is parsed by the i-th value parser and stored under the i-th field of the record
 * type. Instances hold mutable per-record state and are not synchronized.
 */
public class DelimitedDataParser extends AbstractDataParser implements IStreamDataParser, IRecordDataParser<char[]> {
    private final char fieldDelimiter;
    private final char quote;
    private final boolean hasHeader;
    private final ARecordType recordType;
    private final IARecordBuilder recBuilder;
    private final ArrayBackedValueStorage fieldValueBuffer;
    private final DataOutput fieldValueBufferOutput;
    private final IValueParser[] valueParsers;
    private FieldCursorForDelimitedDataParser cursor;
    // Serialized type tag of each declared field, indexed by field position.
    private final byte[] fieldTypeTags;
    // Builder field id per position; negative when the builder did not resolve the name.
    private final int[] fldIds;
    // Pre-serialized field names, populated only for positions whose fldIds entry is negative.
    private final ArrayBackedValueStorage[] nameBuffers;

    /**
     * Creates a parser for the given record type.
     *
     * @param valueParserFactories one factory per field position, used to parse that field's text
     * @param fieldDelimiter character separating fields
     * @param quote quote character used by the field cursor
     * @param hasHeader when true, the first record of a stream installed via
     *            {@link #setInputStream(InputStream)} is skipped
     * @param recordType type of the records to produce
     * @param isStreamParser when false, a cursor without a reader is created up front for record-mode parsing
     * @throws HyracksDataException if a field name is not resolved by the record builder and the type is closed
     */
    public DelimitedDataParser(IValueParserFactory[] valueParserFactories, char fieldDelimiter, char quote,
            boolean hasHeader, ARecordType recordType, boolean isStreamParser) throws HyracksDataException {
        this.fieldDelimiter = fieldDelimiter;
        this.quote = quote;
        this.hasHeader = hasHeader;
        this.recordType = recordType;

        valueParsers = new IValueParser[valueParserFactories.length];
        for (int i = 0; i < valueParserFactories.length; ++i) {
            valueParsers[i] = valueParserFactories[i].createValueParser();
        }

        fieldValueBuffer = new ArrayBackedValueStorage();
        fieldValueBufferOutput = fieldValueBuffer.getDataOutput();
        recBuilder = new RecordBuilder();
        recBuilder.reset(recordType);
        recBuilder.init();

        String[] fieldNames = recordType.getFieldNames();
        int n = fieldNames.length;
        fieldTypeTags = new byte[n];
        fldIds = new int[n];
        nameBuffers = new ArrayBackedValueStorage[n];
        AMutableString str = new AMutableString(null);
        for (int i = 0; i < n; i++) {
            fieldTypeTags[i] = recordType.getFieldTypes()[i].getTypeTag().serialize();
            String name = fieldNames[i];
            fldIds[i] = recBuilder.getFieldId(name);
            if (fldIds[i] < 0) {
                if (!recordType.isOpen()) {
                    throw new HyracksDataException("Illegal field " + name + " in closed type " + recordType);
                }
                // Unresolved names are serialized once here so they can be added by name for every record.
                nameBuffers[i] = new ArrayBackedValueStorage();
                str.setValue(name);
                IDataParser.toBytes(str, nameBuffers[i], stringSerde);
            }
        }

        if (!isStreamParser) {
            // Record mode: the cursor is fed per record via nextRecord(char[], int), so no reader is needed.
            cursor = new FieldCursorForDelimitedDataParser(null, fieldDelimiter, quote);
        }
    }

    /**
     * Reads records from the current stream until one with at least one non-null field is found and writes it.
     * Records whose fields are all null/empty are skipped.
     *
     * @param out destination for the serialized record
     * @return true if a record was written, false when the input is exhausted
     * @throws IOException on read or parse failure
     */
    @Override
    public boolean parse(DataOutput out) throws IOException {
        while (cursor.nextRecord()) {
            if (parseRecord()) {
                recBuilder.write(out, true);
                return true;
            }
        }
        return false;
    }

    /**
     * Parses the fields of the cursor's current record into {@link #recBuilder}.
     * At most {@code valueParsers.length} fields are consumed; parsing stops early if the record has fewer.
     *
     * @return true if at least one field was written with a non-null value
     * @throws IOException on parse failure, or if an empty field maps to a non-optional, non-string type
     */
    private boolean parseRecord() throws IOException {
        recBuilder.reset(recordType);
        recBuilder.init();
        boolean hasNonNullField = false;
        for (int i = 0; i < valueParsers.length; ++i) {
            if (!cursor.nextField()) {
                break;
            }
            fieldValueBuffer.reset();
            if (cursor.fStart == cursor.fEnd && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.STRING
                    && recordType.getFieldTypes()[i].getTypeTag() != ATypeTag.NULL) {
                // An empty field becomes NULL if the type is optional. STRING fields are excluded because
                // they can represent an empty field as the empty string.
                if (!NonTaggedFormatUtil.isOptional(recordType.getFieldTypes()[i])) {
                    throw new HyracksDataException("At record: " + cursor.recordCount + " - Field " + cursor.fieldCount
                            + " is not an optional type so it cannot accept null value. ");
                }
                fieldValueBufferOutput.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
            } else {
                fieldValueBufferOutput.writeByte(fieldTypeTags[i]);
                // Eliminate double quotes in the field that we are going to parse
                if (cursor.isDoubleQuoteIncludedInThisField) {
                    cursor.eliminateDoubleQuote(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart);
                    cursor.fEnd -= cursor.doubleQuoteCount;
                    cursor.isDoubleQuoteIncludedInThisField = false;
                }
                valueParsers[i].parse(cursor.buffer, cursor.fStart, cursor.fEnd - cursor.fStart,
                        fieldValueBufferOutput);
                hasNonNullField = true;
            }
            if (fldIds[i] < 0) {
                recBuilder.addField(nameBuffers[i], fieldValueBuffer);
            } else {
                recBuilder.addField(fldIds[i], fieldValueBuffer);
            }
        }
        return hasNonNullField;
    }

    /**
     * Parses a single raw record. Nothing is written if the record has no non-null field.
     *
     * @param record the raw record characters
     * @param out destination for the serialized record
     * @throws IOException on parse failure
     */
    @Override
    public void parse(IRawRecord<? extends char[]> record, DataOutput out) throws IOException {
        cursor.nextRecord(record.get(), record.size());
        if (parseRecord()) {
            recBuilder.write(out, true);
        }
    }

    /**
     * Installs a new input stream, skipping the first record when the parser was created with a header.
     * A null stream yields a cursor without a reader, matching the record-mode construction.
     *
     * @param in the stream to read from, or null
     * @throws IOException on read failure while skipping the header
     */
    @Override
    public void setInputStream(InputStream in) throws IOException {
        // Check for null before wrapping: InputStreamReader rejects a null stream with an NPE.
        cursor = new FieldCursorForDelimitedDataParser(in == null ? null : newReader(in), fieldDelimiter, quote);
        if (in != null && hasHeader && cursor.nextRecord()) {
            while (cursor.nextField()) {
                // Discard the header's fields.
            }
        }
    }

    /**
     * Replaces the current input with a new stream. Unlike {@link #setInputStream(InputStream)}, no header is
     * skipped.
     *
     * @param in the stream to read from; must not be null
     * @return always true
     * @throws IOException declared by the interface
     */
    @Override
    public boolean reset(InputStream in) throws IOException {
        cursor = new FieldCursorForDelimitedDataParser(newReader(in), fieldDelimiter, quote);
        return true;
    }

    /**
     * Wraps a stream in a reader that decodes explicitly as UTF-8 instead of the JVM's platform default
     * charset, so parsing results do not depend on the host's locale settings.
     */
    private static InputStreamReader newReader(InputStream in) {
        return new InputStreamReader(in, StandardCharsets.UTF_8);
    }
}