blob: da986df8f9b3fea0d9451ef2d6f0f1f3812fb7c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.api.types.TimestampType;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* A table schema that represents a table's structure with field names and types.
*/
@PublicEvolving
public class TableSchema {
private final Column[] columns;
private final String[] primaryKeys;
private final String[][] uniqueKeys;
private final String[][] indexes;
private final ComputedColumn[] computedColumns;
private final Watermark[] watermarks;
private final Map<String, Integer> columnNameToColumnIndex;
public TableSchema(Column[] columns) {
this(columns, new String[]{}, new String[][]{}, new String[][]{}, new ComputedColumn[]{}, new Watermark[]{});
}
public TableSchema(
Column[] columns, String[] primaryKeys, String[][] uniqueKeys, String[][] indexes,
ComputedColumn[] computedColumns, Watermark[] watermarks) {
this.columns = columns;
this.primaryKeys = primaryKeys;
this.uniqueKeys = uniqueKeys;
this.indexes = indexes;
this.computedColumns = computedColumns;
this.watermarks = watermarks;
// validate and create name to column index
columnNameToColumnIndex = new HashMap<>();
final Set<String> duplicateNames = new HashSet<>();
final Set<String> uniqueNames = new HashSet<>();
for (int i = 0; i < this.columns.length; i++) {
Preconditions.checkNotNull(columns[i]);
final String fieldName = this.columns[i].name();
columnNameToColumnIndex.put(fieldName, i);
if (uniqueNames.contains(fieldName)) {
duplicateNames.add(fieldName);
} else {
uniqueNames.add(fieldName);
}
}
if (!duplicateNames.isEmpty()) {
throw new TableException(
"Table column names must be unique.\n" +
"The duplicate columns are: " + duplicateNames.toString() + "\n" +
"All column names: " +
Arrays.toString(Arrays.stream(this.columns).map(Column::name).toArray(String[]::new))
);
}
// validate primary keys
for (int i = 0; i < primaryKeys.length; i++) {
if (!columnNameToColumnIndex.containsKey(primaryKeys[i])) {
throw new TableException(
"Primary key field: " + primaryKeys[i] + " not found in table schema."
);
}
}
// validate unique keys
for (int i = 0; i < uniqueKeys.length; i++) {
String[] uniqueKey = uniqueKeys[i];
if (null == uniqueKey || 0 == uniqueKey.length) {
throw new TableException("Unique key should not be empty.");
}
for (int j = 0; j < uniqueKey.length; j++) {
if (!columnNameToColumnIndex.containsKey(uniqueKey[j])) {
throw new TableException(
"Unique key field: " + uniqueKey[j] + " not found in table schema."
);
}
}
}
}
public TableSchema(String[] names, InternalType[] types, boolean[] nulls) {
this(validate(names, types, nulls));
}
public TableSchema(String[] names, InternalType[] types) {
this(validate(names, types));
}
private static Column[] validate(String[] names, InternalType[] types, boolean[] nulls) {
if (names.length != types.length) {
throw new TableException(
"Number of column indexes and column names must be equal.\n" +
"Column names count is [" + names.length + "]\n" +
"Column types count is [" + types.length + "]\n" +
"Column names: " + Arrays.toString(names) + "\n" +
"Column types: " + Arrays.toString(types)
);
}
if (names.length != nulls.length) {
throw new TableException(
"Number of column names and nullabilities must be equal.\n" +
"Column names count is: " + names.length + "\n" +
"Column nullabilities count is: " + nulls.length + "\n" +
"List of all field names: " + Arrays.toString(names) + "\n" +
"List of all field nullabilities: " + Arrays.toString(nulls)
);
}
List<Column> columns = new ArrayList<>();
for (int i = 0; i < names.length; i++) {
columns.add(new Column(names[i], types[i], nulls[i]));
}
return columns.toArray(new Column[columns.size()]);
}
private static Column[] validate(String[] names, InternalType[] types) {
boolean[] nulls = new boolean[types.length];
for (int i = 0; i < types.length; i++) {
nulls[i] = !TimestampType.ROWTIME_INDICATOR.equals(types[i])
&& !TimestampType.PROCTIME_INDICATOR.equals(types[i]);
}
return validate(names, types, nulls);
}
public Column[] getColumns() {
return this.columns;
}
public String[] getPrimaryKeys() {
return this.primaryKeys;
}
public String[][] getUniqueKeys() {
return this.uniqueKeys;
}
public String[][] getNormalIndexes() {
// keep the original index order
List<String[]> normalIndexes = new ArrayList<>();
for (String[] fields : indexes) {
if (!isUniqueColumns(fields)) {
normalIndexes.add(fields);
}
}
return normalIndexes.toArray(new String[0][]);
}
public String[][] getUniqueIndexes() {
// keep the original index order
List<String[]> uniqueIndexes = new ArrayList<>();
for (String[] fields : indexes) {
if (isUniqueColumns(fields)) {
uniqueIndexes.add(fields);
}
}
return uniqueIndexes.toArray(new String[0][]);
}
public int getColumnIndex(String columnName) {
if (columnNameToColumnIndex.containsKey(columnName)) {
return columnNameToColumnIndex.get(columnName);
} else {
throw new IllegalArgumentException("The given column name is not in the table schema.");
}
}
public boolean isUniqueColumns(String[] fields) {
for (String[] uniqueKey : uniqueKeys) {
if (Arrays.equals(uniqueKey, fields)) {
return true;
}
}
return Arrays.equals(primaryKeys, fields);
}
/**
* Returns all field type information as an array.
*/
public InternalType[] getFieldTypes() {
return Arrays.stream(columns).map(Column::internalType).toArray(InternalType[]::new);
}
/**
* Returns the specified type information for the given field index.
*
* @param fieldIndex the index of the field
*/
public Optional<InternalType> getFieldType(int fieldIndex) {
if (fieldIndex < 0 || fieldIndex >= columns.length) {
return Optional.empty();
}
return Optional.of(columns[fieldIndex].internalType());
}
/**
* Returns the specified type information for the given field name.
*
* @param fieldName the name of the field
*/
public Optional<InternalType> getFieldType(String fieldName) {
if (columnNameToColumnIndex.containsKey(fieldName)) {
return Optional.of(columns[columnNameToColumnIndex.get(fieldName)].internalType());
}
return Optional.empty();
}
/**
* Returns the number of fields.
*/
public int getFieldCount() {
return columns.length;
}
/**
* Returns all field names as an array.
*/
public String[] getFieldNames() {
return Arrays.stream(columns).map(Column::name).toArray(String[]::new);
}
/**
* Returns the specified name for the given field index.
*
* @param fieldIndex the index of the field
*/
public Optional<String> getFieldName(int fieldIndex) {
if (fieldIndex < 0 || fieldIndex >= columns.length) {
return Optional.empty();
}
return Optional.of(columns[fieldIndex].name());
}
/**
* Returns all field nullables as an array.
*/
public boolean[] getFieldNullables() {
boolean[] nulls = new boolean[columns.length];
for (int i = 0; i < columns.length; i++) {
nulls[i] = columns[i].isNullable();
}
return nulls;
}
/**
* @deprecated Use {@link TableSchema#getFieldTypes()} instead. Can be dropped after 1.7.
*/
@Deprecated
public InternalType[] getTypes() {
return Arrays.stream(columns).map(Column::internalType).toArray(InternalType[]::new);
}
/**
* @deprecated Use {@link TableSchema#getFieldNames()} instead. Can be dropped after 1.7.
*/
@Deprecated
public String[] getColumnNames() {
return Arrays.stream(columns).map(Column::name).toArray(String[]::new);
}
/**
* @deprecated Use {@link TableSchema#getFieldNullables()} instead. Can be dropped after 1.7.
*/
@Deprecated
public boolean[] getNullables() {
return getFieldNullables();
}
public ComputedColumn[] getComputedColumns() {
return computedColumns;
}
public Watermark[] getWatermarks() {
return watermarks;
}
/**
* Returns the specified column for the given field index.
*
* @param fieldIndex the index of the field
*/
public Column getColumn(int fieldIndex) {
Preconditions.checkArgument(fieldIndex >= 0 && fieldIndex < columns.length);
return columns[fieldIndex];
}
/**
* Returns the map for column name to field index.
*/
public Map<String, Integer> columnNameToIndex() {
return columnNameToColumnIndex;
}
/**
* @deprecated Use {@link TableSchema#getFieldType(int)} instead. Can be dropped after 1.7.
*/
@Deprecated
public InternalType getType(int fieldIndex) {
Preconditions.checkArgument(fieldIndex >= 0 && fieldIndex < columns.length);
return columns[fieldIndex].internalType();
}
/**
* @deprecated Use {@link TableSchema#getFieldType(String)} instead. Can be dropped after 1.7.
*/
@Deprecated
public Optional<InternalType> getType(String fieldName) {
if (columnNameToColumnIndex.containsKey(fieldName)) {
return Optional.of(columns[columnNameToColumnIndex.get(fieldName)].internalType());
}
return Optional.empty();
}
/**
* @deprecated Use {@link TableSchema#getFieldName(int)} instead. Can be dropped after 1.7.
*/
@Deprecated
public String getColumnName(int fieldIndex) {
Preconditions.checkArgument(fieldIndex >= 0 && fieldIndex < columns.length);
return columns[fieldIndex].name();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("root\n");
for (int i = 0; i < columns.length; i++) {
sb.append(" |-- name: ").append(columns[i].name()).append("\n");
sb.append(" |-- type: ").append(columns[i].internalType()).append("\n");
sb.append(" |-- isNullable: ").append(columns[i].isNullable()).append("\n");
}
if (primaryKeys.length > 0) {
sb.append("primary keys\n");
sb.append(" |-- ").append(String.join(", ", primaryKeys)).append("\n");
}
if (uniqueKeys.length > 0) {
sb.append("unique keys\n");
for (int i = 0; i < uniqueKeys.length; i++) {
sb.append(" |-- ").append(String.join(", ", uniqueKeys[i])).append("\n");
}
}
if (indexes.length > 0) {
sb.append("indexes\n");
for (int i = 0; i < indexes.length; i++) {
sb.append(" |-- ").append(String.join(", ", indexes[i])).append("\n");
}
}
return sb.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TableSchema schema = (TableSchema) o;
return Arrays.equals(columns, schema.columns) &&
Arrays.equals(primaryKeys, schema.primaryKeys) &&
Arrays.equals(uniqueKeys, schema.uniqueKeys) &&
Arrays.equals(indexes, schema.indexes);
}
@Override
public int hashCode() {
int result = Arrays.hashCode(columns);
result = 31 * result + Arrays.hashCode(primaryKeys);
result = 31 * result + Arrays.hashCode(uniqueKeys);
result = 31 * result + Arrays.hashCode(indexes);
return result;
}
public static Builder builder() {
return new Builder();
}
// --------------------------------------------------------------------------------------------
/**
* Builder for creating a {@link TableSchema}.
*/
public static class Builder {
private final List<Column> columns;
private final List<String> primaryKey;
private final List<List<String>> uniqueKeys;
private final List<List<String>> indexes;
private final List<ComputedColumn> computedColumns;
private final List<Watermark> watermarks;
public Builder() {
columns = new ArrayList<>();
primaryKey = new ArrayList<>();
uniqueKeys = new ArrayList<>();
indexes = new ArrayList<>();
computedColumns = new ArrayList<>();
watermarks = new ArrayList<>();
}
/**
* Add a field with name and type. The call order of this method determines the order
* of fields in the schema.
*/
@Deprecated
public Builder field(String name, InternalType type) {
columns.add(new Column(name, type));
return this;
}
@Deprecated
public Builder field(String name, InternalType type, boolean nullable) {
columns.add(new Column(name, type, nullable));
return this;
}
public Builder column(String name, InternalType type) {
columns.add(new Column(name, type));
return this;
}
public Builder column(String name, InternalType type, boolean nullable) {
columns.add(new Column(name, type, nullable));
return this;
}
public Builder primaryKey(String... fields) {
Preconditions.checkArgument(
primaryKey.isEmpty(),
"A primary key " + primaryKey +
" have been defined, can not define another primary key " +
Arrays.toString(fields));
primaryKey.addAll(Arrays.asList(fields));
// add index for primary key
indexes.add(Arrays.asList(fields));
return this;
}
public Builder uniqueKey(String... fields) {
uniqueKeys.add(Arrays.asList(fields));
// add index for unique key
indexes.add(Arrays.asList(fields));
return this;
}
/**
* Declare the given fields is a "normal" index which is not an unique index or
* clustered index or other type indexes.
* @param fields the column fields to be the normal index
*/
public Builder normalIndex(String... fields) {
indexes.add(Arrays.asList(fields));
return this;
}
/**
* Declare the given fields is an unique index which the fields are unique in the table.
* @param fields the column fields to be the unique index
*/
public Builder uniqueIndex(String... fields) {
// unique index is the same as unique key
return uniqueKey(fields);
}
public Builder computedColumn(String name, String expression) {
computedColumns.add(new ComputedColumn(name, expression));
return this;
}
public Builder watermark(String name, String eventTime, long offset) {
watermarks.add(new Watermark(name, eventTime, offset));
return this;
}
/**
* Returns a {@link TableSchema} instance.
*/
public TableSchema build() {
return new TableSchema(
columns.toArray(new Column[0]),
primaryKey.toArray(new String[0]),
// List<List<String>> -> String[][]
uniqueKeys
.stream()
.map(u -> u.toArray(new String[0])) // mapping each List to an array
.collect(Collectors.toList()) // collecting as a List<String[]>
.toArray(new String[uniqueKeys.size()][]),
// List<List<String>> -> String[][]
indexes
.stream()
.map(u -> u.toArray(new String[0])) // mapping each List to an array
.collect(Collectors.toList()) // collecting as a List<String[]>
.toArray(new String[indexes.size()][]),
computedColumns.toArray(new ComputedColumn[0]),
watermarks.toArray(new Watermark[0]));
}
}
}
// TODO: add tests