| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.api.common.io; |
| |
| import org.apache.flink.annotation.Internal; |
| import org.apache.flink.core.fs.FileInputSplit; |
| import org.apache.flink.core.fs.Path; |
| import org.apache.flink.types.parser.FieldParser; |
| import org.apache.flink.types.parser.StringParser; |
| import org.apache.flink.types.parser.StringValueParser; |
| import org.apache.flink.util.InstantiationUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Map; |
| import java.util.TimeZone; |
| import java.util.TreeMap; |
| |
| |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| @Internal |
| public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> { |
| |
| private static final long serialVersionUID = 1L; |
| |
| |
| private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class); |
| |
| private static final Class<?>[] EMPTY_TYPES = new Class<?>[0]; |
| |
| private static final boolean[] EMPTY_INCLUDED = new boolean[0]; |
| |
| private static final byte[] DEFAULT_FIELD_DELIMITER = new byte[] {','}; |
| |
| private static final byte BACKSLASH = 92; |
| |
| // -------------------------------------------------------------------------------------------- |
| // Variables for internal operation. |
| // They are all transient, because we do not want them so be serialized |
| // -------------------------------------------------------------------------------------------- |
| |
| private transient FieldParser<?>[] fieldParsers; |
| |
| // To speed up readRecord processing. Used to find windows line endings. |
| // It is set when open so that readRecord does not have to evaluate it |
| protected boolean lineDelimiterIsLinebreak = false; |
| |
| protected transient int commentCount; |
| protected transient int invalidLineCount; |
| |
| |
| // -------------------------------------------------------------------------------------------- |
| // The configuration parameters. Configured on the instance and serialized to be shipped. |
| // -------------------------------------------------------------------------------------------- |
| |
| private Class<?>[] fieldTypes = EMPTY_TYPES; |
| |
| protected boolean[] fieldIncluded = EMPTY_INCLUDED; |
| |
| // The byte representation of the delimiter is updated consistent with |
| // current charset. |
| private byte[] fieldDelim = DEFAULT_FIELD_DELIMITER; |
| private String fieldDelimString = null; |
| |
| private boolean lenient; |
| |
| private boolean skipFirstLineAsHeader; |
| |
| private boolean quotedStringParsing = false; |
| |
| private byte quoteCharacter; |
| |
| // The byte representation of the comment prefix is updated consistent with |
| // current charset. |
| protected byte[] commentPrefix = null; |
| private String commentPrefixString = null; |
| |
| private TimeZone timezone = TimeZone.getTimeZone("UTC"); |
| |
| // -------------------------------------------------------------------------------------------- |
| // Constructors and getters/setters for the configurable parameters |
| // -------------------------------------------------------------------------------------------- |
| |
| protected GenericCsvInputFormat() { |
| super(); |
| } |
| |
/**
 * Creates a format for the given file; all CSV options keep their defaults.
 *
 * @param filePath path handed to the superclass constructor, together with a {@code null}
 *                 second argument
 */
protected GenericCsvInputFormat(Path filePath) {
    super(filePath, null);
}
| |
| // -------------------------------------------------------------------------------------------- |
| |
| public int getNumberOfFieldsTotal() { |
| return this.fieldIncluded.length; |
| } |
| |
| public int getNumberOfNonNullFields() { |
| return this.fieldTypes.length; |
| } |
| |
| @Override |
| public void setCharset(String charset) { |
| super.setCharset(charset); |
| |
| if (this.fieldDelimString != null) { |
| this.fieldDelim = fieldDelimString.getBytes(getCharset()); |
| } |
| |
| if (this.commentPrefixString != null) { |
| this.commentPrefix = commentPrefixString.getBytes(getCharset()); |
| } |
| } |
| |
| public byte[] getCommentPrefix() { |
| return commentPrefix; |
| } |
| |
| public void setCommentPrefix(String commentPrefix) { |
| if (commentPrefix != null) { |
| this.commentPrefix = commentPrefix.getBytes(getCharset()); |
| } else { |
| this.commentPrefix = null; |
| } |
| this.commentPrefixString = commentPrefix; |
| } |
| |
| public byte[] getFieldDelimiter() { |
| return fieldDelim; |
| } |
| |
| public void setFieldDelimiter(String delimiter) { |
| if (delimiter == null) { |
| throw new IllegalArgumentException("Delimiter must not be null"); |
| } |
| |
| this.fieldDelim = delimiter.getBytes(getCharset()); |
| this.fieldDelimString = delimiter; |
| } |
| |
| public boolean isLenient() { |
| return lenient; |
| } |
| |
| public void setLenient(boolean lenient) { |
| this.lenient = lenient; |
| } |
| |
| public boolean isSkippingFirstLineAsHeader() { |
| return skipFirstLineAsHeader; |
| } |
| |
| public void setSkipFirstLineAsHeader(boolean skipFirstLine) { |
| this.skipFirstLineAsHeader = skipFirstLine; |
| } |
| |
| public void enableQuotedStringParsing(char quoteCharacter) { |
| quotedStringParsing = true; |
| this.quoteCharacter = (byte)quoteCharacter; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| protected FieldParser<?>[] getFieldParsers() { |
| return this.fieldParsers; |
| } |
| |
| protected Class<?>[] getGenericFieldTypes() { |
| // check if we are dense, i.e., we read all fields |
| if (this.fieldIncluded.length == this.fieldTypes.length) { |
| return this.fieldTypes; |
| } |
| else { |
| // sparse type array which we made dense for internal book keeping. |
| // create a sparse copy to return |
| Class<?>[] types = new Class<?>[this.fieldIncluded.length]; |
| |
| for (int i = 0, k = 0; i < this.fieldIncluded.length; i++) { |
| if (this.fieldIncluded[i]) { |
| types[i] = this.fieldTypes[k++]; |
| } |
| } |
| |
| return types; |
| } |
| } |
| |
| |
| protected void setFieldTypesGeneric(Class<?> ... fieldTypes) { |
| if (fieldTypes == null) { |
| throw new IllegalArgumentException("Field types must not be null."); |
| } |
| |
| this.fieldIncluded = new boolean[fieldTypes.length]; |
| ArrayList<Class<?>> types = new ArrayList<Class<?>>(); |
| |
| // check if we support parsers for these types |
| for (int i = 0; i < fieldTypes.length; i++) { |
| Class<?> type = fieldTypes[i]; |
| |
| if (type != null) { |
| if (FieldParser.getParserForType(type) == null) { |
| throw new IllegalArgumentException("The type '" + type.getName() + "' is not supported for the CSV input format."); |
| } |
| types.add(type); |
| fieldIncluded[i] = true; |
| } |
| } |
| |
| this.fieldTypes = types.toArray(new Class<?>[types.size()]); |
| } |
| |
| protected void setFieldsGeneric(int[] sourceFieldIndices, Class<?>[] fieldTypes) { |
| checkNotNull(sourceFieldIndices); |
| checkNotNull(fieldTypes); |
| checkArgument(sourceFieldIndices.length == fieldTypes.length, |
| "Number of field indices and field types must match."); |
| |
| for (int i : sourceFieldIndices) { |
| if (i < 0) { |
| throw new IllegalArgumentException("Field indices must not be smaller than zero."); |
| } |
| } |
| |
| int largestFieldIndex = max(sourceFieldIndices); |
| this.fieldIncluded = new boolean[largestFieldIndex + 1]; |
| ArrayList<Class<?>> types = new ArrayList<Class<?>>(); |
| |
| // check if we support parsers for these types |
| for (int i = 0; i < fieldTypes.length; i++) { |
| Class<?> type = fieldTypes[i]; |
| |
| if (type != null) { |
| if (FieldParser.getParserForType(type) == null) { |
| throw new IllegalArgumentException("The type '" + type.getName() |
| + "' is not supported for the CSV input format."); |
| } |
| types.add(type); |
| fieldIncluded[sourceFieldIndices[i]] = true; |
| } |
| } |
| |
| this.fieldTypes = types.toArray(new Class<?>[types.size()]); |
| } |
| |
| protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) { |
| checkNotNull(includedMask); |
| checkNotNull(fieldTypes); |
| |
| ArrayList<Class<?>> types = new ArrayList<Class<?>>(); |
| |
| // check if types are valid for included fields |
| int typeIndex = 0; |
| for (int i = 0; i < includedMask.length; i++) { |
| |
| if (includedMask[i]) { |
| if (typeIndex > fieldTypes.length - 1) { |
| throw new IllegalArgumentException("Missing type for included field " + i + "."); |
| } |
| Class<?> type = fieldTypes[typeIndex++]; |
| |
| if (type == null) { |
| throw new IllegalArgumentException("Type for included field " + i + " should not be null."); |
| } else { |
| // check if we support parsers for this type |
| if (FieldParser.getParserForType(type) == null) { |
| throw new IllegalArgumentException("The type '" + type.getName() + "' is not supported for the CSV input format."); |
| } |
| types.add(type); |
| } |
| } |
| } |
| |
| this.fieldTypes = types.toArray(new Class<?>[types.size()]); |
| this.fieldIncluded = includedMask; |
| } |
| |
| public TimeZone getTimezone() { |
| return timezone; |
| } |
| |
| public void setTimezone(TimeZone timezone) { |
| this.timezone = timezone; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| // Runtime methods |
| // -------------------------------------------------------------------------------------------- |
| |
| @Override |
| public void open(FileInputSplit split) throws IOException { |
| super.open(split); |
| |
| // instantiate the parsers |
| FieldParser<?>[] parsers = new FieldParser<?>[fieldTypes.length]; |
| |
| for (int i = 0; i < fieldTypes.length; i++) { |
| if (fieldTypes[i] != null) { |
| Class<? extends FieldParser<?>> parserType = FieldParser.getParserForType(fieldTypes[i]); |
| if (parserType == null) { |
| throw new RuntimeException("No parser available for type '" + fieldTypes[i].getName() + "'."); |
| } |
| |
| FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class); |
| |
| p.setCharset(getCharset()); |
| if (this.quotedStringParsing) { |
| if (p instanceof StringParser) { |
| ((StringParser)p).enableQuotedStringParsing(this.quoteCharacter); |
| } else if (p instanceof StringValueParser) { |
| ((StringValueParser)p).enableQuotedStringParsing(this.quoteCharacter); |
| } |
| } |
| |
| parsers[i] = p; |
| } |
| } |
| this.fieldParsers = parsers; |
| |
| // skip the first line, if we are at the beginning of a file and have the option set |
| if (this.skipFirstLineAsHeader && this.splitStart == 0) { |
| readLine(); // read and ignore |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| if (this.invalidLineCount > 0) { |
| if (LOG.isWarnEnabled()) { |
| LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped."); |
| } |
| } |
| |
| if (this.commentCount > 0) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped."); |
| } |
| } |
| super.close(); |
| } |
| |
| protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int numBytes) throws ParseException { |
| |
| boolean[] fieldIncluded = this.fieldIncluded; |
| |
| int startPos = offset; |
| final int limit = offset + numBytes; |
| |
| for (int field = 0, output = 0; field < fieldIncluded.length; field++) { |
| |
| // check valid start position |
| if (startPos > limit || (startPos == limit && field != fieldIncluded.length - 1)) { |
| if (lenient) { |
| return false; |
| } else { |
| throw new ParseException("Row too short: " + new String(bytes, offset, numBytes, getCharset())); |
| } |
| } |
| |
| if (fieldIncluded[field]) { |
| // parse field |
| @SuppressWarnings("unchecked") |
| FieldParser<Object> parser = (FieldParser<Object>) this.fieldParsers[output]; |
| Object reuse = holders[output]; |
| startPos = parser.resetErrorStateAndParse(bytes, startPos, limit, this.fieldDelim, reuse, false); |
| holders[output] = parser.getLastResult(); |
| |
| // check parse result |
| if (startPos < 0) { |
| // no good |
| if (lenient) { |
| return false; |
| } else { |
| String lineAsString = new String(bytes, offset, numBytes, getCharset()); |
| throw new ParseException("Line could not be parsed: '" + lineAsString + "'\n" |
| + "ParserError " + parser.getErrorState() + " \n" |
| + "Expect field types: "+fieldTypesToString() + " \n" |
| + "in file: " + filePath); |
| } |
| } |
| else if (startPos == limit |
| && field != fieldIncluded.length - 1 |
| && !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) { |
| // We are at the end of the record, but not all fields have been read |
| // and the end is not a field delimiter indicating an empty last field. |
| if (lenient) { |
| return false; |
| } else { |
| throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)); |
| } |
| } |
| output++; |
| } |
| else { |
| // skip field |
| startPos = skipFields(bytes, startPos, limit, this.fieldDelim); |
| if (startPos < 0) { |
| if (!lenient) { |
| String lineAsString = new String(bytes, offset, numBytes, getCharset()); |
| throw new ParseException("Line could not be parsed: '" + lineAsString+"'\n" |
| + "Expect field types: "+fieldTypesToString()+" \n" |
| + "in file: "+filePath); |
| } else { |
| return false; |
| } |
| } |
| else if (startPos == limit |
| && field != fieldIncluded.length - 1 |
| && !FieldParser.endsWithDelimiter(bytes, startPos - 1, fieldDelim)) { |
| // We are at the end of the record, but not all fields have been read |
| // and the end is not a field delimiter indicating an empty last field. |
| if (lenient) { |
| return false; |
| } else { |
| throw new ParseException("Row too short: " + new String(bytes, offset, numBytes)); |
| } |
| } |
| } |
| } |
| return true; |
| } |
| |
| private String fieldTypesToString() { |
| StringBuilder string = new StringBuilder(); |
| string.append(this.fieldTypes[0].toString()); |
| |
| for (int i = 1; i < this.fieldTypes.length; i++) { |
| string.append(", ").append(this.fieldTypes[i]); |
| } |
| |
| return string.toString(); |
| } |
| |
| protected int skipFields(byte[] bytes, int startPos, int limit, byte[] delim) { |
| |
| int i = startPos; |
| |
| final int delimLimit = limit - delim.length + 1; |
| |
| if (quotedStringParsing && bytes[i] == quoteCharacter) { |
| |
| // quoted string parsing enabled and field is quoted |
| i++; |
| |
| // search for ending quote character, continue when it is escaped |
| while (i < limit && (bytes[i] != quoteCharacter |
| || bytes[i - 1] == BACKSLASH |
| || (i + 1 < limit && bytes[i + 1] == quoteCharacter))) { |
| if (bytes[i - 1] != BACKSLASH |
| && bytes[i] == quoteCharacter |
| && i + 1 < limit |
| && bytes[i + 1] == quoteCharacter) { |
| i++; |
| } |
| i++; |
| } |
| i++; |
| |
| if (i == limit) { |
| // we are at the end of the record |
| return limit; |
| } else if ( i < delimLimit && FieldParser.delimiterNext(bytes, i, delim)) { |
| // we are not at the end, check if delimiter comes next |
| return i + delim.length; |
| } else { |
| // delimiter did not follow end quote. Error... |
| return -1; |
| } |
| } else { |
| // field is not quoted |
| while(i < delimLimit && !FieldParser.delimiterNext(bytes, i, delim)) { |
| i++; |
| } |
| |
| if (i >= delimLimit) { |
| // no delimiter found. We are at the end of the record |
| return limit; |
| } else { |
| // delimiter found. |
| return i + delim.length; |
| } |
| } |
| } |
| |
| @SuppressWarnings("unused") |
| protected static void checkAndCoSort(int[] positions, Class<?>[] types) { |
| if (positions.length != types.length) { |
| throw new IllegalArgumentException("The positions and types must be of the same length"); |
| } |
| |
| TreeMap<Integer, Class<?>> map = new TreeMap<Integer, Class<?>>(); |
| |
| for (int i = 0; i < positions.length; i++) { |
| if (positions[i] < 0) { |
| throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid."); |
| } |
| if (types[i] == null) { |
| throw new IllegalArgumentException("The type " + i + " is invalid (null)"); |
| } |
| |
| if (map.containsKey(positions[i])) { |
| throw new IllegalArgumentException("The position " + positions[i] + " occurs multiple times."); |
| } |
| |
| map.put(positions[i], types[i]); |
| } |
| |
| int i = 0; |
| for (Map.Entry<Integer, Class<?>> entry : map.entrySet()) { |
| positions[i] = entry.getKey(); |
| types[i] = entry.getValue(); |
| i++; |
| } |
| } |
| |
| protected static void checkForMonotonousOrder(int[] positions, Class<?>[] types) { |
| if (positions.length != types.length) { |
| throw new IllegalArgumentException("The positions and types must be of the same length"); |
| } |
| |
| int lastPos = -1; |
| |
| for (int i = 0; i < positions.length; i++) { |
| if (positions[i] < 0) { |
| throw new IllegalArgumentException("The field " + " (" + positions[i] + ") is invalid."); |
| } |
| if (types[i] == null) { |
| throw new IllegalArgumentException("The type " + i + " is invalid (null)"); |
| } |
| |
| if (positions[i] <= lastPos) { |
| throw new IllegalArgumentException("The positions must be strictly increasing (no permutations are supported)."); |
| } |
| |
| lastPos = positions[i]; |
| } |
| } |
| |
| private static int max(int[] ints) { |
| checkArgument(ints.length > 0); |
| |
| int max = ints[0]; |
| for (int i = 1 ; i < ints.length; i++) { |
| max = Math.max(max, ints[i]); |
| } |
| return max; |
| } |
| } |