blob: 3663d2d1ee150a5ab4df0e58e65c92eb872dc002 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.RowComparator;
import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/**
 * {@link TypeInformation} for {@link Row}.
 *
 * <p>A row type consists of an ordered list of field types and one name per field. Fields can be
 * referenced by position (e.g. {@code "0"}), by name (e.g. {@code "f0"}), or by a nested
 * expression (e.g. {@code "f0.inner"}) when the referenced field is itself a {@link CompositeType}.
 * If no field names are supplied, the names default to {@code f0, f1, ...}.
 */
@PublicEvolving
public class RowTypeInfo extends TupleTypeInfoBase<Row> {

    private static final long serialVersionUID = 9158518989896601963L;

    // Field expression grammar: "<name-or-index>[.<tail>]", or one of the select-all wildcards.
    private static final String REGEX_INT_FIELD = "[0-9]+";
    private static final String REGEX_STR_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
    private static final String REGEX_FIELD = REGEX_STR_FIELD + "|" + REGEX_INT_FIELD;
    private static final String REGEX_NESTED_FIELDS = "(" + REGEX_FIELD + ")(\\.(.+))?";
    private static final String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS + "|\\"
        + ExpressionKeys.SELECT_ALL_CHAR + "|\\"
        + ExpressionKeys.SELECT_ALL_CHAR_SCALA;

    private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
    private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
    private static final Pattern PATTERN_INT_FIELD = Pattern.compile(REGEX_INT_FIELD);

    // --------------------------------------------------------------------------------------------

    /** Names of the fields; same length and order as the field types. */
    protected final String[] fieldNames;

    /**
     * Temporary variable for directly passing orders to comparators.
     *
     * <p>Set by {@link #createComparator(int[], boolean[], int, ExecutionConfig)} for the duration
     * of the super call and read by {@link #createTypeComparatorBuilder()}; {@code null} otherwise.
     * NOTE(review): this hand-off through an instance field is unsynchronized.
     */
    private boolean[] comparatorOrders = null;

    /**
     * Creates a row type with the given field types and default field names
     * {@code f0, f1, ..., f(n-1)}.
     *
     * @param types the types of the row fields, in field order
     */
    public RowTypeInfo(TypeInformation<?>... types) {
        super(Row.class, types);

        this.fieldNames = new String[types.length];
        for (int i = 0; i < types.length; i++) {
            fieldNames[i] = "f" + i;
        }
    }

    /**
     * Creates a row type with the given field types and field names.
     *
     * @param types the types of the row fields, in field order
     * @param fieldNames the names of the row fields; must be non-null, unique, and of the same
     *                   length as {@code types}. The array is copied.
     * @throws NullPointerException if {@code fieldNames} is null
     * @throws IllegalArgumentException if the lengths differ or the names are not unique
     */
    public RowTypeInfo(TypeInformation<?>[] types, String[] fieldNames) {
        super(Row.class, types);

        checkNotNull(fieldNames, "FieldNames should not be null.");
        checkArgument(
            types.length == fieldNames.length,
            "Number of field types and names is different.");
        checkArgument(
            !hasDuplicateFieldNames(fieldNames),
            "Field names are not unique.");

        // defensive copy so later changes to the caller's array do not affect this type
        this.fieldNames = Arrays.copyOf(fieldNames, fieldNames.length);
    }

    /**
     * Resolves a field expression into flat field descriptors and appends them to {@code result}.
     *
     * <p>Supported expressions are a select-all wildcard, a field name or index, and a field name
     * or index followed by {@code "."} and a nested expression that is delegated to the
     * referenced composite field.
     *
     * @param fieldExpression the expression to resolve
     * @param offset the flat position at which this row type starts
     * @param result the list that receives the resolved descriptors
     * @throws InvalidFieldReferenceException if the expression is malformed or a nested
     *         expression is applied to a non-composite field
     */
    @Override
    public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
        Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);

        if (!matcher.matches()) {
            throw new InvalidFieldReferenceException(
                "Invalid tuple field reference \"" + fieldExpression + "\".");
        }

        String field = matcher.group(0);

        if (field.equals(ExpressionKeys.SELECT_ALL_CHAR)
                || field.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
            // handle select all
            int keyPosition = 0;
            for (TypeInformation<?> fType : types) {
                if (fType instanceof CompositeType) {
                    CompositeType<?> cType = (CompositeType<?>) fType;
                    cType.getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset + keyPosition, result);
                    // the composite occupies getTotalFields() flat positions; one of them is
                    // accounted for by the increment below
                    keyPosition += cType.getTotalFields() - 1;
                } else {
                    result.add(new FlatFieldDescriptor(offset + keyPosition, fType));
                }
                keyPosition++;
            }
            return;
        }

        // group(1) is the leading field name or index, group(3) the optional nested tail
        int fieldIndex = resolveFieldIndex(matcher.group(1));

        // fetch the field type will throw exception if the index is illegal
        TypeInformation<?> fieldType = this.getTypeAt(fieldIndex);

        // compute the offset: skip the flat fields of all preceding fields
        for (int i = 0; i < fieldIndex; i++) {
            offset += this.getTypeAt(i).getTotalFields();
        }

        String tail = matcher.group(3);

        if (tail == null) {
            // expression hasn't nested field
            if (fieldType instanceof CompositeType) {
                ((CompositeType<?>) fieldType).getFlatFields(ExpressionKeys.SELECT_ALL_CHAR, offset, result);
            } else {
                result.add(new FlatFieldDescriptor(offset, fieldType));
            }
        } else {
            // expression has nested field
            if (fieldType instanceof CompositeType) {
                ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
            } else {
                throw new InvalidFieldReferenceException(
                    "Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
            }
        }
    }

    /**
     * Returns the type of the field referenced by the given expression.
     *
     * <p>The expression is a field name or index, optionally followed by {@code "."} and a nested
     * expression that is delegated to the referenced composite field. Wildcards are rejected.
     *
     * @param fieldExpression the expression to resolve
     * @return the type of the referenced (possibly nested) field
     * @throws InvalidFieldReferenceException if the expression is a wildcard, is malformed, or
     *         applies a nested expression to a non-composite field
     */
    @Override
    public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
        Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);

        if (!matcher.matches()) {
            if (fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR) ||
                fieldExpression.equals(ExpressionKeys.SELECT_ALL_CHAR_SCALA)) {
                throw new InvalidFieldReferenceException("Wildcard expressions are not allowed here.");
            } else {
                throw new InvalidFieldReferenceException("Invalid format of Row field expression \""+fieldExpression+"\".");
            }
        }

        int fieldIndex = resolveFieldIndex(matcher.group(1));

        // fetch the field type will throw exception if the index is illegal
        TypeInformation<X> fieldType = this.getTypeAt(fieldIndex);

        String tail = matcher.group(3);
        if (tail == null) {
            // found the type
            return fieldType;
        }

        if (fieldType instanceof CompositeType) {
            return ((CompositeType<?>) fieldType).getTypeAt(tail);
        }
        throw new InvalidFieldReferenceException(
            "Nested field expression \""+ tail + "\" not possible on atomic type "+fieldType+".");
    }

    /**
     * Creates a comparator for the given key fields.
     *
     * <p>The sort orders are handed to {@link #createTypeComparatorBuilder()} through
     * {@link #comparatorOrders} while the super implementation runs.
     */
    @Override
    public TypeComparator<Row> createComparator(
        int[] logicalKeyFields,
        boolean[] orders,
        int logicalFieldOffset,
        ExecutionConfig config) {

        comparatorOrders = orders;
        try {
            return super.createComparator(
                logicalKeyFields,
                orders,
                logicalFieldOffset,
                config);
        } finally {
            // always clear the temporary hand-off, also when the super call fails, so a later
            // createTypeComparatorBuilder() cannot pick up stale orders
            comparatorOrders = null;
        }
    }

    /**
     * Creates the builder used while a comparator is being created.
     *
     * @throws IllegalStateException if no sort orders are currently set, i.e. the call does not
     *         happen within {@link #createComparator(int[], boolean[], int, ExecutionConfig)}
     */
    @Override
    protected TypeComparatorBuilder<Row> createTypeComparatorBuilder() {
        if (comparatorOrders == null) {
            throw new IllegalStateException("Cannot create comparator builder without orders.");
        }

        return new RowTypeComparatorBuilder(comparatorOrders);
    }

    /**
     * Returns the field names of the row.
     *
     * <p>The internal array is returned without copying; callers should not modify it.
     */
    @Override
    public String[] getFieldNames() {
        return fieldNames;
    }

    /**
     * Returns the position of the field with the given name.
     *
     * @param fieldName the name to look up
     * @return the zero-based field index, or {@code -1} if no field has that name
     */
    @Override
    public int getFieldIndex(String fieldName) {
        for (int i = 0; i < fieldNames.length; i++) {
            if (fieldNames[i].equals(fieldName)) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Creates a {@link RowSerializer} built from the serializers of all field types.
     */
    @Override
    public TypeSerializer<Row> createSerializer(ExecutionConfig config) {
        int len = getArity();
        TypeSerializer<?>[] fieldSerializers = new TypeSerializer[len];
        for (int i = 0; i < len; i++) {
            fieldSerializers[i] = types[i].createSerializer(config);
        }
        return new RowSerializer(fieldSerializers);
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof RowTypeInfo;
    }

    /**
     * Returns a hash code that is consistent with the inherited {@code equals}.
     *
     * <p>Field names are deliberately not part of the hash: {@link #schemaEquals(Object)} has to
     * compare the names in addition to {@code equals}, so two instances that differ only in their
     * field names can be equal and must therefore produce the same hash code.
     */
    @Override
    public int hashCode() {
        return 31 * super.hashCode();
    }

    /**
     * Renders the row type as {@code Row(name0: type0, name1: type1, ...)}, or just {@code Row}
     * if there are no fields.
     */
    @Override
    public String toString() {
        StringBuilder bld = new StringBuilder("Row");
        if (types.length > 0) {
            bld.append('(').append(fieldNames[0]).append(": ").append(types[0]);

            for (int i = 1; i < types.length; i++) {
                bld.append(", ").append(fieldNames[i]).append(": ").append(types[i]);
            }

            bld.append(')');
        }
        return bld.toString();
    }

    /**
     * Returns the field types of the row. The order matches the order of the field names.
     */
    public TypeInformation<?>[] getFieldTypes() {
        return types;
    }

    /**
     * Tests whether another object describes the same, schema-equivalent row information, i.e. it
     * is equal to this type and additionally has the same field names in the same order.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is a {@link RowTypeInfo} that equals this type and has
     *         identical field names, {@code false} otherwise
     */
    public boolean schemaEquals(Object obj) {
        // the instanceof guard keeps the cast below safe regardless of how equals is implemented
        return obj instanceof RowTypeInfo
            && equals(obj)
            && Arrays.equals(fieldNames, ((RowTypeInfo) obj).fieldNames);
    }

    /**
     * Converts the leading part of a field expression into a field index.
     *
     * @param field a field name, or a decimal field position
     * @return the parsed position for numeric input, otherwise the result of
     *         {@link #getFieldIndex(String)} (which is {@code -1} for unknown names)
     */
    private int resolveFieldIndex(String field) {
        if (PATTERN_INT_FIELD.matcher(field).matches()) {
            // field expression is an integer
            return Integer.parseInt(field);
        }
        return getFieldIndex(field);
    }

    /** Returns {@code true} if at least one name occurs more than once in the given array. */
    private static boolean hasDuplicateFieldNames(String[] fieldNames) {
        HashSet<String> names = new HashSet<>();
        for (String field : fieldNames) {
            // add() returns false when the set already contains the element
            if (!names.add(field)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Collects key fields and their comparators and assembles a {@link RowComparator} from them.
     *
     * <p>Not static: it reads the field types and arity of the enclosing {@link RowTypeInfo}.
     */
    private class RowTypeComparatorBuilder implements TypeComparatorBuilder<Row> {

        private final ArrayList<TypeComparator<?>> fieldComparators = new ArrayList<TypeComparator<?>>();
        private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>();
        private final boolean[] comparatorOrders;

        public RowTypeComparatorBuilder(boolean[] comparatorOrders) {
            this.comparatorOrders = comparatorOrders;
        }

        @Override
        public void initializeTypeComparatorBuilder(int size) {
            fieldComparators.ensureCapacity(size);
            logicalKeyFields.ensureCapacity(size);
        }

        @Override
        public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
            fieldComparators.add(comparator);
            logicalKeyFields.add(fieldId);
        }

        /**
         * Builds the {@link RowComparator} from the fields added so far.
         *
         * @throws IllegalStateException if no fields were added or the collected state is
         *         inconsistent
         */
        @Override
        public TypeComparator<Row> createTypeComparator(ExecutionConfig config) {
            checkState(
                fieldComparators.size() > 0,
                "No field comparators were defined for the TupleTypeComparatorBuilder."
            );

            checkState(
                logicalKeyFields.size() > 0,
                "No key fields were defined for the TupleTypeComparatorBuilder."
            );

            checkState(
                fieldComparators.size() == logicalKeyFields.size(),
                "The number of field comparators and key fields is not equal."
            );

            final int maxKey = Collections.max(logicalKeyFields);

            checkState(
                maxKey >= 0,
                "The maximum key field must be greater or equal than 0."
            );

            // serializers are only created for the fields up to and including the largest key
            TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1];
            for (int i = 0; i <= maxKey; i++) {
                fieldSerializers[i] = types[i].createSerializer(config);
            }

            int[] keyPositions = new int[logicalKeyFields.size()];
            for (int i = 0; i < keyPositions.length; i++) {
                keyPositions[i] = logicalKeyFields.get(i);
            }

            TypeComparator[] comparators =
                fieldComparators.toArray(new TypeComparator[fieldComparators.size()]);

            //noinspection unchecked
            return new RowComparator(
                getArity(),
                keyPositions,
                comparators,
                (TypeSerializer<Object>[]) fieldSerializers,
                comparatorOrders);
        }
    }

    /**
     * Creates a {@link RowTypeInfo} with projected fields.
     *
     * @param rowType The original RowTypeInfo whose fields are projected
     * @param fieldMapping The field mapping of the projection; entry {@code i} is the index of
     *                     the original field that becomes field {@code i} of the result
     * @return A RowTypeInfo with projected fields.
     */
    public static RowTypeInfo projectFields(RowTypeInfo rowType, int[] fieldMapping) {
        String[] sourceNames = rowType.getFieldNames();

        TypeInformation<?>[] projectedTypes = new TypeInformation<?>[fieldMapping.length];
        String[] projectedNames = new String[fieldMapping.length];
        for (int i = 0; i < fieldMapping.length; i++) {
            projectedTypes[i] = rowType.getTypeAt(fieldMapping[i]);
            projectedNames[i] = sourceNames[fieldMapping[i]];
        }
        return new RowTypeInfo(projectedTypes, projectedNames);
    }
}