/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.types;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import static org.apache.flink.types.RowUtils.deepEqualsRow;
import static org.apache.flink.types.RowUtils.deepHashCodeRow;
/**
* A row is a fixed-length, null-aware composite type for storing multiple values in a deterministic
* field order. Every field can be null regardless of the field's type. The type of row fields
* cannot be automatically inferred; therefore, it is required to provide type information whenever
* a row is produced.
*
* <p>The main purpose of rows is to bridge between Flink's Table and SQL ecosystem and other APIs.
* Therefore, a row does not only consist of a schema part (containing the fields) but also attaches
* a {@link RowKind} for encoding a change in a changelog. Thus, a row can be considered as an entry
* in a changelog. For example, in regular batch scenarios, a changelog would consist of a bounded
* stream of {@link RowKind#INSERT} rows. The row kind is kept separate from the fields and can be
* accessed by using {@link #getKind()} and {@link #setKind(RowKind)}.
*
* <p>Fields of a row can be accessed either position-based or name-based. An implementer can decide
* in which field mode a row should operate during creation. Rows that were produced by the
* framework support a hybrid of both field modes (i.e. named positions):
*
* <h1>Position-based field mode</h1>
*
* <p>{@link Row#withPositions(int)} creates a fixed-length row. The fields can be accessed by
* position (zero-based) using {@link #getField(int)} and {@link #setField(int, Object)}. Every
* field is initialized with {@code null} by default.
*
* <h1>Name-based field mode</h1>
*
* <p>{@link Row#withNames()} creates a variable-length row. The fields can be accessed by name
* using {@link #getField(String)} and {@link #setField(String, Object)}. Every field is initialized
* during the first call to {@link #setField(String, Object)} for the given name. However, the
* framework will initialize missing fields with {@code null} and reorder all fields once more type
* information is available during serialization or input conversion. Thus, even name-based rows
* eventually become fixed-length composite types with a deterministic field order. Name-based rows
* perform worse than position-based rows but simplify row creation and code readability.
*
* <h1>Hybrid / named-position field mode</h1>
*
* <p>Rows that were produced by the framework (after deserialization or output conversion) are
* fixed-length rows with a deterministic field order that can map static field names to field
* positions. Thus, fields can be accessed both via {@link #getField(int)} and {@link
* #getField(String)}. Both {@link #setField(int, Object)} and {@link #setField(String, Object)} are
* supported for existing fields. However, adding new field names via {@link #setField(String,
* Object)} is not allowed. A hybrid row's {@link #equals(Object)} supports comparing to all kinds
* of rows. A hybrid row's {@link #hashCode()} is only valid for position-based rows.
*
* <p>A row instance is in principle {@link Serializable}. However, it may contain non-serializable
* fields in which case serialization will fail if the row is not serialized with Flink's
* serialization stack.
*
* <p>The {@link #equals(Object)} and {@link #hashCode()} methods of this class support all external
* conversion classes of the table ecosystem.
*/
@PublicEvolving
public final class Row implements Serializable {

    private static final long serialVersionUID = 3L;

    /** The kind of change a row describes in a changelog. */
    private RowKind kind;

    /** Fields organized by position. Either this or {@link #fieldByName} is set. */
    private final @Nullable Object[] fieldByPosition;

    /** Fields organized by name. Either this or {@link #fieldByPosition} is set. */
    private final @Nullable Map<String, Object> fieldByName;

    /** Mapping from field names to positions. Requires {@link #fieldByPosition} semantics. */
    private final @Nullable LinkedHashMap<String, Integer> positionByName;

    /**
     * Internal constructor that stores all arguments as given. Nothing is copied and nothing is
     * validated here; callers are responsible for passing a consistent combination.
     *
     * @param kind kind of change a row describes in a changelog
     * @param fieldByPosition backing array for position-based (or hybrid) field mode, or null
     * @param fieldByName backing map for name-based field mode, or null
     * @param positionByName name-to-position mapping for hybrid field mode, or null
     */
    Row(
            RowKind kind,
            @Nullable Object[] fieldByPosition,
            @Nullable Map<String, Object> fieldByName,
            @Nullable LinkedHashMap<String, Integer> positionByName) {
        this.kind = kind;
        this.fieldByPosition = fieldByPosition;
        this.fieldByName = fieldByName;
        this.positionByName = positionByName;
    }

    /**
     * Creates a fixed-length row in position-based field mode.
     *
     * <p>The semantics are equivalent to {@link Row#withPositions(RowKind, int)}. This constructor
     * exists for backwards compatibility.
     *
     * @param kind kind of change a row describes in a changelog, must not be null
     * @param arity the number of fields in the row
     */
    public Row(RowKind kind, int arity) {
        // The kind is validated before the field array is allocated.
        this(
                Preconditions.checkNotNull(kind, "Row kind must not be null."),
                new Object[arity],
                null,
                null);
    }

    /**
     * Creates a fixed-length row in position-based field mode.
     *
     * <p>The semantics are equivalent to {@link Row#withPositions(int)}. This constructor exists
     * for backwards compatibility.
     *
     * @param arity the number of fields in the row
     */
    public Row(int arity) {
        this(RowKind.INSERT, arity);
    }

    /**
     * Creates a fixed-length row in position-based field mode.
     *
     * <p>Fields can be accessed by position via {@link #setField(int, Object)} and {@link
     * #getField(int)}.
     *
     * <p>See the class documentation of {@link Row} for more information.
     *
     * @param kind kind of change a row describes in a changelog, must not be null
     * @param arity the number of fields in the row
     * @return a new row instance
     */
    public static Row withPositions(RowKind kind, int arity) {
        // Delegates to the public constructor so that a null kind is rejected consistently with
        // the constructor and setKind(RowKind).
        return new Row(kind, arity);
    }

    /**
     * Creates a fixed-length row in position-based field mode.
     *
     * <p>Fields can be accessed by position via {@link #setField(int, Object)} and {@link
     * #getField(int)}.
     *
     * <p>By default, a row describes an {@link RowKind#INSERT} change.
     *
     * <p>See the class documentation of {@link Row} for more information.
     *
     * @param arity the number of fields in the row
     * @return a new row instance
     */
    public static Row withPositions(int arity) {
        return withPositions(RowKind.INSERT, arity);
    }

    /**
     * Creates a variable-length row in name-based field mode.
     *
     * <p>Fields can be accessed by name via {@link #setField(String, Object)} and {@link
     * #getField(String)}.
     *
     * <p>See the class documentation of {@link Row} for more information.
     *
     * @param kind kind of change a row describes in a changelog, must not be null
     * @return a new row instance
     */
    public static Row withNames(RowKind kind) {
        // Same validation (and message) as the public constructor and setKind(RowKind).
        Preconditions.checkNotNull(kind, "Row kind must not be null.");
        return new Row(kind, null, new HashMap<>(), null);
    }

    /**
     * Creates a variable-length row in name-based field mode.
     *
     * <p>Fields can be accessed by name via {@link #setField(String, Object)} and {@link
     * #getField(String)}.
     *
     * <p>By default, a row describes an {@link RowKind#INSERT} change.
     *
     * <p>See the class documentation of {@link Row} for more information.
     *
     * @return a new row instance
     */
    public static Row withNames() {
        return withNames(RowKind.INSERT);
    }

    /**
     * Returns the kind of change that this row describes in a changelog.
     *
     * <p>By default, a row describes an {@link RowKind#INSERT} change.
     *
     * @see RowKind
     */
    public RowKind getKind() {
        return kind;
    }

    /**
     * Sets the kind of change that this row describes in a changelog.
     *
     * <p>By default, a row describes an {@link RowKind#INSERT} change.
     *
     * @param kind the new kind, must not be null
     * @see RowKind
     */
    public void setKind(RowKind kind) {
        Preconditions.checkNotNull(kind, "Row kind must not be null.");
        this.kind = kind;
    }

    /**
     * Returns the number of fields in the row.
     *
     * <p>Note: The row kind is kept separate from the fields and is not included in this number.
     *
     * @return the number of fields in the row
     */
    public int getArity() {
        if (fieldByPosition != null) {
            return fieldByPosition.length;
        }
        assert fieldByName != null;
        return fieldByName.size();
    }

    /**
     * Returns the field's content at the specified field position.
     *
     * <p>Note: The row must operate in position-based or hybrid field mode.
     *
     * @param pos the position of the field, 0-based
     * @return the field's content at the specified position
     * @throws IllegalArgumentException if the row operates in name-based field mode
     */
    public @Nullable Object getField(int pos) {
        if (fieldByPosition == null) {
            throw new IllegalArgumentException(
                    "Accessing a field by position is not supported in name-based field mode.");
        }
        return fieldByPosition[pos];
    }

    /**
     * Returns the field's content at the specified field position.
     *
     * <p>Note: The row must operate in position-based or hybrid field mode.
     *
     * <p>This method avoids a lot of manual casting in the user implementation. The cast to
     * {@code T} is unchecked.
     *
     * @param pos the position of the field, 0-based
     * @return the field's content at the specified position
     * @throws IllegalArgumentException if the row operates in name-based field mode
     */
    @SuppressWarnings("unchecked")
    public <T> T getFieldAs(int pos) {
        return (T) getField(pos);
    }

    /**
     * Returns the field's content using the specified field name.
     *
     * <p>Note: The row must operate in name-based or hybrid field mode.
     *
     * @param name the name of the field
     * @return the field's content; in name-based field mode this is null if the field was not set
     *     previously
     * @throws IllegalArgumentException if the row operates in position-based field mode, or if the
     *     name is unknown to a row in hybrid field mode
     */
    public @Nullable Object getField(String name) {
        if (fieldByName != null) {
            return fieldByName.get(name);
        }
        if (positionByName == null) {
            throw new IllegalArgumentException(
                    "Accessing a field by name is not supported in position-based field mode.");
        }
        // Hybrid mode: translate the name into a position first.
        final Integer pos = positionByName.get(name);
        if (pos == null) {
            throw new IllegalArgumentException(
                    String.format("Unknown field name '%s' for mapping to a position.", name));
        }
        assert fieldByPosition != null;
        return fieldByPosition[pos];
    }

    /**
     * Returns the field's content using the specified field name.
     *
     * <p>Note: The row must operate in name-based or hybrid field mode.
     *
     * <p>This method avoids a lot of manual casting in the user implementation. The cast to
     * {@code T} is unchecked.
     *
     * @param name the name of the field, set previously
     * @return the field's content
     * @throws IllegalArgumentException if the row operates in position-based field mode, or if the
     *     name is unknown to a row in hybrid field mode
     */
    @SuppressWarnings("unchecked")
    public <T> T getFieldAs(String name) {
        return (T) getField(name);
    }

    /**
     * Sets the field's content at the specified position.
     *
     * <p>Note: The row must operate in position-based or hybrid field mode.
     *
     * @param pos the position of the field, 0-based
     * @param value the value to be assigned to the field at the specified position
     * @throws IllegalArgumentException if the row operates in name-based field mode
     */
    public void setField(int pos, @Nullable Object value) {
        if (fieldByPosition == null) {
            throw new IllegalArgumentException(
                    "Accessing a field by position is not supported in name-based field mode.");
        }
        fieldByPosition[pos] = value;
    }

    /**
     * Sets the field's content using the specified field name.
     *
     * <p>Note: The row must operate in name-based or hybrid field mode. In hybrid field mode only
     * names that are already mapped to a position are accepted.
     *
     * @param name the name of the field
     * @param value the value to be assigned to the field
     * @throws IllegalArgumentException if the row operates in position-based field mode, or if the
     *     name is unknown to a row in hybrid field mode
     */
    public void setField(String name, @Nullable Object value) {
        if (fieldByName != null) {
            fieldByName.put(name, value);
            return;
        }
        if (positionByName == null) {
            throw new IllegalArgumentException(
                    "Accessing a field by name is not supported in position-based field mode.");
        }
        // Hybrid mode: translate the name into a position first.
        final Integer pos = positionByName.get(name);
        if (pos == null) {
            throw new IllegalArgumentException(
                    String.format(
                            "Unknown field name '%s' for mapping to a row position. "
                                    + "Available names are: %s",
                            name, positionByName.keySet()));
        }
        assert fieldByPosition != null;
        fieldByPosition[pos] = value;
    }

    /**
     * Returns the set of field names if this row operates in name-based field mode, otherwise null.
     *
     * <p>This method is a helper method for serializers and converters but can also be useful for
     * other row transformations.
     *
     * <p>Note: The returned set is the key set of the row's internal map and not a copy.
     *
     * @param includeNamedPositions whether or not to include named positions when this row operates
     *     in a hybrid field mode
     * @return the field names, or null if no names are available for the requested mode
     */
    public @Nullable Set<String> getFieldNames(boolean includeNamedPositions) {
        if (fieldByName != null) {
            return fieldByName.keySet();
        }
        if (includeNamedPositions && positionByName != null) {
            return positionByName.keySet();
        }
        return null;
    }

    /**
     * Clears all fields of this row.
     *
     * <p>With positional storage every position is reset to {@code null} and the arity stays the
     * same; with name-based storage all entries are removed.
     */
    public void clear() {
        if (fieldByPosition != null) {
            Arrays.fill(fieldByPosition, null);
            return;
        }
        assert fieldByName != null;
        fieldByName.clear();
    }

    @Override
    public String toString() {
        return RowUtils.deepToStringRow(kind, fieldByPosition, fieldByName);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final Row other = (Row) o;
        return deepEqualsRow(
                kind,
                fieldByPosition,
                fieldByName,
                positionByName,
                other.kind,
                other.fieldByPosition,
                other.fieldByName,
                other.positionByName);
    }

    @Override
    public int hashCode() {
        return deepHashCodeRow(kind, fieldByPosition, fieldByName);
    }

    // --------------------------------------------------------------------------------------------
    // Utility methods
    // --------------------------------------------------------------------------------------------

    /**
     * Creates a fixed-length row in position-based field mode and assigns the given values to the
     * row's fields.
     *
     * <p>This method should be more convenient than {@link Row#withPositions(int)} in many cases.
     *
     * <p>For example:
     *
     * <pre>
     *     Row.of("hello", true, 1L);
     * </pre>
     *
     * instead of
     *
     * <pre>
     *     Row row = Row.withPositions(3);
     *     row.setField(0, "hello");
     *     row.setField(1, true);
     *     row.setField(2, 1L);
     * </pre>
     *
     * <p>By default, a row describes an {@link RowKind#INSERT} change.
     *
     * @param values the field values in positional order; the array itself is not retained
     * @return a new row instance
     */
    public static Row of(Object... values) {
        return ofKind(RowKind.INSERT, values);
    }

    /**
     * Creates a fixed-length row in position-based field mode with given kind and assigns the given
     * values to the row's fields.
     *
     * <p>This method should be more convenient than {@link Row#withPositions(RowKind, int)} in many
     * cases.
     *
     * <p>For example:
     *
     * <pre>
     *     Row.ofKind(RowKind.INSERT, "hello", true, 1L);
     * </pre>
     *
     * instead of
     *
     * <pre>
     *     Row row = Row.withPositions(RowKind.INSERT, 3);
     *     row.setField(0, "hello");
     *     row.setField(1, true);
     *     row.setField(2, 1L);
     * </pre>
     *
     * @param kind kind of change a row describes in a changelog, must not be null
     * @param values the field values in positional order; the array itself is not retained
     * @return a new row instance
     */
    public static Row ofKind(RowKind kind, Object... values) {
        final Row row = new Row(kind, values.length);
        assert row.fieldByPosition != null;
        // Copy into the row's own Object[] instead of keeping a reference to the caller's array.
        System.arraycopy(values, 0, row.fieldByPosition, 0, values.length);
        return row;
    }

    /**
     * Creates a new row which is copied from another row (including its {@link RowKind}).
     *
     * <p>This method does not perform a deep copy. Use {@link RowSerializer#copy(Row)} if required.
     * The field containers (array or map) are duplicated, the field values themselves and the
     * name-to-position mapping are shared with the given row.
     *
     * @param row the row to copy
     * @return a new row instance
     */
    public static Row copy(Row row) {
        // Copy into a plain Object[] regardless of the runtime component type of the source array.
        final Object[] newFieldByPosition =
                row.fieldByPosition == null
                        ? null
                        : Arrays.copyOf(
                                row.fieldByPosition, row.fieldByPosition.length, Object[].class);
        final Map<String, Object> newFieldByName =
                row.fieldByName == null ? null : new HashMap<>(row.fieldByName);
        return new Row(row.kind, newFieldByPosition, newFieldByName, row.positionByName);
    }

    /**
     * Creates a new row with projected fields and identical {@link RowKind} from another row.
     *
     * <p>This method does not perform a deep copy.
     *
     * <p>Note: The row must operate in position-based or hybrid field mode. Field names are not
     * projected.
     *
     * @param row the row to read the fields from
     * @param fieldPositions field indices to be projected
     * @return a new row in position-based field mode
     */
    public static Row project(Row row, int[] fieldPositions) {
        final Object[] projected = new Object[fieldPositions.length];
        for (int i = 0; i < fieldPositions.length; i++) {
            projected[i] = row.getField(fieldPositions[i]);
        }
        return new Row(row.kind, projected, null, null);
    }

    /**
     * Creates a new row with projected fields and identical {@link RowKind} from another row.
     *
     * <p>This method does not perform a deep copy.
     *
     * <p>Note: The row must operate in name-based or hybrid field mode. The resulting row always
     * operates in name-based field mode.
     *
     * @param row the row to read the fields from
     * @param fieldNames field names to be projected
     * @return a new row in name-based field mode
     */
    public static Row project(Row row, String[] fieldNames) {
        final Map<String, Object> projected = new HashMap<>();
        for (String fieldName : fieldNames) {
            projected.put(fieldName, row.getField(fieldName));
        }
        return new Row(row.getKind(), null, projected, null);
    }

    /**
     * Creates a new row with fields that are copied from the other rows and appended to the
     * resulting row in the given order. The {@link RowKind} of the first row determines the {@link
     * RowKind} of the result.
     *
     * <p>This method does not perform a deep copy.
     *
     * <p>Note: All rows must provide positional fields (position-based or hybrid field mode).
     * Field names are not carried over to the result.
     *
     * @param first the row whose fields come first and whose kind is used for the result
     * @param remainings the rows whose fields are appended in the given order
     * @return a new row in position-based field mode
     */
    public static Row join(Row first, Row... remainings) {
        Preconditions.checkArgument(
                first.fieldByPosition != null,
                "All rows must operate in position-based field mode.");
        // First pass: validate every row and determine the total arity.
        int newLength = first.fieldByPosition.length;
        for (Row remaining : remainings) {
            Preconditions.checkArgument(
                    remaining.fieldByPosition != null,
                    "All rows must operate in position-based field mode.");
            newLength += remaining.fieldByPosition.length;
        }

        final Row joinedRow = new Row(first.kind, newLength);
        final Object[] target = joinedRow.fieldByPosition;
        assert target != null;

        // Second pass: append the fields of every row one after another.
        int offset = appendFields(first, target, 0);
        for (Row remaining : remainings) {
            offset = appendFields(remaining, target, offset);
        }
        return joinedRow;
    }

    /** Copies the positional fields of {@code source} to {@code target} and returns the next offset. */
    private static int appendFields(Row source, Object[] target, int offset) {
        final Object[] fields = source.fieldByPosition;
        assert fields != null;
        System.arraycopy(fields, 0, target, offset, fields.length);
        return offset + fields.length;
    }
}