blob: 5a02027580dd0b4da3544b0e9bed498eecad0f2a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.types.parser;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.FloatValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.ShortValue;
import org.apache.flink.types.StringValue;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
/**
 * A FieldParser is used to parse a field from a sequence of bytes. Fields occur in a byte sequence and are
 * terminated by the end of the byte sequence or a delimiter.
 * <p>
 * The parsers do not throw exceptions in general, but set an error state. That way, they can be used in functions
 * that ignore invalid lines, rather than failing on them.
 *
 * @param <T> The type that is parsed.
 */
@PublicEvolving
public abstract class FieldParser<T> {

	/**
	 * An enumeration of different types of errors that may occur.
	 */
	public enum ParseErrorState {
		/** No error occurred. */
		NONE,

		/** The domain of the numeric type is not large enough to hold the parsed value. */
		NUMERIC_VALUE_OVERFLOW_UNDERFLOW,

		/** A stand-alone sign was encountered while parsing a numeric type. */
		NUMERIC_VALUE_ORPHAN_SIGN,

		/** An illegal character was encountered while parsing a numeric type. */
		NUMERIC_VALUE_ILLEGAL_CHARACTER,

		/** The field was not in a correct format for the numeric type. */
		NUMERIC_VALUE_FORMAT_ERROR,

		/** A quoted string was not terminated until the line end. */
		UNTERMINATED_QUOTED_STRING,

		/** The parser found characters between the end of the quoted string and the delimiter. */
		UNQUOTED_CHARS_AFTER_QUOTED_STRING,

		/** The column is empty. */
		EMPTY_COLUMN,

		/** Invalid Boolean value. */
		BOOLEAN_INVALID,

		/**
		 * A charset-related error. This class never sets it itself; presumably subclasses use it when a
		 * field cannot be handled with the configured charset — TODO confirm against the subclasses.
		 */
		CHARSET_INVALID
	}

	/** Charset used by this parser; defaults to UTF-8. */
	private Charset charset = StandardCharsets.UTF_8;

	/** Error state of the most recent parse; reset by {@link #resetParserState()}. */
	private ParseErrorState errorState = ParseErrorState.NONE;

	/**
	 * Parses the value of a field from the byte array, first resetting the state of this parser via
	 * {@link #resetParserState()}.
	 * The start position within the byte array and the array's valid length are given.
	 * The content of the value is delimited by a field delimiter.
	 *
	 * @param bytes The byte array that holds the value.
	 * @param startPos The index where the field starts.
	 * @param limit The limit unto which the byte contents is valid for the parser. The limit is the
	 *              position one after the last valid byte.
	 * @param delim The field delimiter byte sequence.
	 * @param reuse An optional reusable field to hold the value.
	 * @param emptyColumnAsNull If {@code true} and no valid bytes remain for the field
	 *                          ({@code startPos == limit}), the concrete parser is not invoked: the error
	 *                          state is set to {@link ParseErrorState#EMPTY_COLUMN} and {@code startPos} is
	 *                          returned. In every other case parsing is delegated to
	 *                          {@link #parseField(byte[], int, int, byte[], Object)}.
	 *
	 * @return The index of the next delimiter, if the field was parsed correctly. A value less than 0 otherwise.
	 */
	public int resetErrorStateAndParse(
			byte[] bytes, int startPos, int limit, byte[] delim, T reuse, boolean emptyColumnAsNull) {
		resetParserState();
		// The field is empty at the end of the valid range exactly when no bytes remain. Do NOT compare
		// against (limit - delim.length + 1): that is merely the first index at which a delimiter can no
		// longer start, and for multi-byte delimiters a non-empty trailing field may begin there.
		if (emptyColumnAsNull && startPos == limit) {
			setErrorState(ParseErrorState.EMPTY_COLUMN);
			return startPos;
		}
		return parseField(bytes, startPos, limit, delim, reuse);
	}

	/**
	 * Parses the value of a field from the byte array after resetting the parser state. Equivalent to
	 * {@link #resetErrorStateAndParse(byte[], int, int, byte[], Object, boolean)} with
	 * {@code emptyColumnAsNull == false}.
	 *
	 * @param bytes The byte array that holds the value.
	 * @param startPos The index where the field starts.
	 * @param limit The position one after the last valid byte.
	 * @param delim The field delimiter byte sequence.
	 * @param reuse An optional reusable field to hold the value.
	 * @return The value returned by {@link #parseField(byte[], int, int, byte[], Object)}.
	 */
	public int resetErrorStateAndParse(byte[] bytes, int startPos, int limit, byte[] delim, T reuse) {
		return resetErrorStateAndParse(bytes, startPos, limit, delim, reuse, false);
	}

	/**
	 * Each parser's logic should be implemented inside this method. Implementations report failures by
	 * calling {@link #setErrorState(ParseErrorState)} rather than by throwing.
	 *
	 * @param bytes The byte array that holds the value.
	 * @param startPos The index where the field starts.
	 * @param limit The position one after the last valid byte.
	 * @param delim The field delimiter byte sequence.
	 * @param reuse An optional reusable field to hold the value.
	 * @return The position reached by the parser, or a value less than 0 on failure.
	 */
	protected abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse);

	/**
	 * Resets the state of the parser. Called as the very first method inside
	 * {@link FieldParser#resetErrorStateAndParse(byte[], int, int, byte[], Object, boolean)}. By default it only
	 * resets the error state to {@link ParseErrorState#NONE}; subclasses with additional state may extend it.
	 */
	protected void resetParserState() {
		this.errorState = ParseErrorState.NONE;
	}

	/**
	 * Gets the parsed field. This method returns the value parsed by the last successful invocation of
	 * {@link #parseField(byte[], int, int, byte[], Object)}. If objects are mutable and reused, it will return
	 * the object instance that was passed to the parse function.
	 *
	 * @return The latest parsed field.
	 */
	public abstract T getLastResult();

	/**
	 * Returns an instance of the parsed value type.
	 *
	 * @return An instance of the parsed value type.
	 */
	public abstract T createValue();

	/**
	 * Checks if the delimiter starts at the given start position of the byte array.
	 * <p>
	 * Attention: This method assumes that enough characters follow the start position for the delimiter check,
	 * i.e. {@code startPos + delim.length <= bytes.length}. Otherwise an
	 * {@link ArrayIndexOutOfBoundsException} may be thrown.
	 *
	 * @param bytes The byte array that holds the value.
	 * @param startPos The index of the byte array where the check for the delimiter starts.
	 * @param delim The delimiter to check for.
	 *
	 * @return true if a delimiter starts at the given start position, false otherwise.
	 */
	public static final boolean delimiterNext(byte[] bytes, int startPos, byte[] delim) {
		for (int pos = 0; pos < delim.length; pos++) {
			// check each position
			if (delim[pos] != bytes[startPos + pos]) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Checks if the given bytes end with the delimiter at the given end position (inclusive).
	 * <p>
	 * Returns false if fewer than {@code delim.length} bytes precede and include {@code endPos}.
	 * Assumes {@code endPos < bytes.length}.
	 *
	 * @param bytes The byte array that holds the value.
	 * @param endPos The index of the byte array where the check for the delimiter ends.
	 * @param delim The delimiter to check for.
	 *
	 * @return true if a delimiter ends at the given end position, false otherwise.
	 */
	public static final boolean endsWithDelimiter(byte[] bytes, int endPos, byte[] delim) {
		if (endPos < delim.length - 1) {
			return false;
		}
		// Index of the byte that must match delim[0].
		final int delimStart = endPos - delim.length + 1;
		for (int pos = 0; pos < delim.length; ++pos) {
			if (delim[pos] != bytes[delimStart + pos]) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Sets the error state of the parser. Called by subclasses of the parser to set the type of error
	 * when failing a parse.
	 *
	 * @param error The error state to set.
	 */
	protected void setErrorState(ParseErrorState error) {
		this.errorState = error;
	}

	/**
	 * Gets the error state of the parser, as a value of the enumeration {@link ParseErrorState}.
	 * If no error occurred, the error state will be {@link ParseErrorState#NONE}.
	 *
	 * @return The current error state of the parser.
	 */
	public ParseErrorState getErrorState() {
		return this.errorState;
	}

	/**
	 * Returns the end position of a string field: the index at which {@code delimiter} next starts, or
	 * {@code limit} if no delimiter starts within the valid range. Sets the error state to
	 * {@link ParseErrorState#EMPTY_COLUMN} if the field is empty.
	 *
	 * @param bytes The byte array that holds the value.
	 * @param startPos The index where the field starts.
	 * @param limit The position one after the last valid byte.
	 * @param delimiter The field delimiter byte sequence.
	 * @return the (exclusive) end position of the string, or -1 if the field is empty.
	 */
	protected final int nextStringEndPos(byte[] bytes, int startPos, int limit, byte[] delimiter) {
		int endPos = startPos;
		// A delimiter starting at or after this index would run past limit.
		final int delimLimit = limit - delimiter.length + 1;
		while (endPos < limit) {
			if (endPos < delimLimit && delimiterNext(bytes, endPos, delimiter)) {
				break;
			}
			endPos++;
		}
		if (endPos == startPos) {
			setErrorState(ParseErrorState.EMPTY_COLUMN);
			return -1;
		}
		return endPos;
	}

	/**
	 * Returns the number of bytes from {@code startPos} up to (excluding) the first occurrence of
	 * {@code delimiter}, scanning at most {@code length} bytes.
	 *
	 * @param bytes The byte array that holds the value.
	 * @param startPos The index where the string starts.
	 * @param length The maximum number of bytes to scan; must be positive.
	 * @param delimiter The delimiter; it is narrowed to a {@code byte} before comparison.
	 * @return the length of the string.
	 * @throws IllegalArgumentException if {@code length <= 0}.
	 */
	protected static final int nextStringLength(byte[] bytes, int startPos, int length, char delimiter) {
		if (length <= 0) {
			throw new IllegalArgumentException("Invalid input: Empty string");
		}
		int limitedLength = 0;
		final byte delByte = (byte) delimiter;
		while (limitedLength < length && bytes[startPos + limitedLength] != delByte) {
			limitedLength++;
		}
		return limitedLength;
	}

	/**
	 * Gets the character set used for this parser.
	 *
	 * @return the charset used for this parser.
	 */
	public Charset getCharset() {
		return this.charset;
	}

	/**
	 * Sets the character set used for this parser.
	 *
	 * @param charset charset used for this parser.
	 */
	public void setCharset(Charset charset) {
		this.charset = charset;
	}

	// --------------------------------------------------------------------------------------------
	//  Mapping from types to parsers
	// --------------------------------------------------------------------------------------------

	/**
	 * Gets the parser for the type specified by the given class. Returns null, if no parser for that class
	 * is known.
	 *
	 * @param type The class of the type to get the parser for.
	 * @return The parser for the given type, or null, if no such parser exists.
	 */
	public static <T> Class<FieldParser<T>> getParserForType(Class<T> type) {
		Class<? extends FieldParser<?>> parser = PARSERS.get(type);
		if (parser == null) {
			return null;
		}
		// Safe by construction: PARSERS only maps each type to a parser for that same type.
		@SuppressWarnings("unchecked")
		Class<FieldParser<T>> typedParser = (Class<FieldParser<T>>) parser;
		return typedParser;
	}

	/** Registry of the built-in parsers, keyed by the parsed type. Populated once in the static initializer. */
	private static final Map<Class<?>, Class<? extends FieldParser<?>>> PARSERS = new HashMap<>();

	static {
		// basic types
		PARSERS.put(Byte.class, ByteParser.class);
		PARSERS.put(Short.class, ShortParser.class);
		PARSERS.put(Integer.class, IntParser.class);
		PARSERS.put(Long.class, LongParser.class);
		PARSERS.put(String.class, StringParser.class);
		PARSERS.put(Float.class, FloatParser.class);
		PARSERS.put(Double.class, DoubleParser.class);
		PARSERS.put(Boolean.class, BooleanParser.class);
		PARSERS.put(BigDecimal.class, BigDecParser.class);
		PARSERS.put(BigInteger.class, BigIntParser.class);

		// value types
		PARSERS.put(ByteValue.class, ByteValueParser.class);
		PARSERS.put(ShortValue.class, ShortValueParser.class);
		PARSERS.put(IntValue.class, IntValueParser.class);
		PARSERS.put(LongValue.class, LongValueParser.class);
		PARSERS.put(StringValue.class, StringValueParser.class);
		PARSERS.put(FloatValue.class, FloatValueParser.class);
		PARSERS.put(DoubleValue.class, DoubleValueParser.class);
		PARSERS.put(BooleanValue.class, BooleanValueParser.class);

		// SQL date/time types
		PARSERS.put(java.sql.Time.class, SqlTimeParser.class);
		PARSERS.put(java.sql.Date.class, SqlDateParser.class);
		PARSERS.put(java.sql.Timestamp.class, SqlTimestampParser.class);
	}
}