blob: 9098a2109d2917df33de4e3cb7b632252476e21a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.api;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.types.DataType;
import org.apache.flink.table.api.types.DataTypes;
import org.apache.flink.table.api.types.InternalType;
import org.apache.flink.table.api.types.RowType;
import org.apache.flink.table.api.types.TypeConverters;
import org.apache.flink.table.sources.IndexKey;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* A RichTableSchema represents a Table's structure.
*/
public class RichTableSchema implements Serializable {
// Column names of the table; Index#toIndexKey resolves key names to positions in this array.
private final String[] columnNames;
// Column types, handed together with columnNames to the row type factories.
private final InternalType[] columnTypes;
// Per-column nullable flags; the two-argument constructor sets every entry to true.
private final boolean[] nullables;
// TODO should introduce table constraints later.
// Column names forming the primary key; empty until setPrimaryKey is called.
private final List<String> primaryKeys = new ArrayList<>();
// Unique keys; each inner list holds the column names of one (possibly combined) unique key.
private final List<List<String>> uniqueKeys = new ArrayList<>();
// index info, may include unique index or non-unique index or both.
private final List<Index> indexes = new ArrayList<>();
// Names of the columns registered as header fields through setHeaderFields.
private final List<String> headerFields = new ArrayList<>();
/**
 * Creates a new RichTableSchema with column names and column types, marking every
 * column as nullable. All the columns exist in the origin table, no computed column exists.
 *
 * @param columnNames column names
 * @param columnTypes column types
 */
public RichTableSchema(String[] columnNames, InternalType[] columnTypes) {
    // No nullability information is supplied, so every column defaults to nullable.
    boolean[] nullables = new boolean[columnNames.length];
    Arrays.fill(nullables, true);
    this.columnNames = columnNames;
    this.columnTypes = columnTypes;
    this.nullables = nullables;
}
/**
 * Creates a new RichTableSchema with column names, column types and explicit
 * per-column nullable flags. The arrays are stored as given, without copying.
 *
 * @param columnNames column names
 * @param columnTypes column types
 * @param nullables nullable flag of each column
 */
public RichTableSchema(String[] columnNames, InternalType[] columnTypes, boolean[] nullables) {
    this.nullables = nullables;
    this.columnTypes = columnTypes;
    this.columnNames = columnNames;
}
/**
 * Returns the names of the header fields, i.e. the fields that are in the properties
 * of the source record. The returned list is the schema's own list, not a copy.
 */
public List<String> getHeaderFields() {
    return this.headerFields;
}
/**
 * Sets the header fields, i.e. the fields that are in the properties of the source record.
 * May only be called while no header field has been set. The operation is all-or-nothing:
 * if any field is rejected, no field is recorded and the call can be retried.
 *
 * @param fields header field names, each of which must be a column of this schema
 * @throws IllegalArgumentException if header fields have already been set, or if a field
 *         is not a column of this schema
 */
public void setHeaderFields(List<String> fields) {
    checkArgument(headerFields.size() == 0, "header fields has been set");
    Set<String> columnNames = new HashSet<>(Arrays.asList(getColumnNames()));
    // Validate every field before mutating anything. Adding while validating would leave
    // a partially filled list behind on failure, and the "has been set" guard above would
    // then reject every later attempt to set the header fields correctly.
    for (String field : fields) {
        if (!columnNames.contains(field)) {
            throw new IllegalArgumentException("The HEADER column field '" + field +
                "' is not in the table schema");
        }
    }
    headerFields.addAll(fields);
}
/**
 * Sets the primary key. One or more columns may be combined to form the primary key.
 * May only be called while no primary key has been set. The operation is all-or-nothing:
 * if any key is rejected, no key is recorded and the call can be retried.
 *
 * @param keys one or more column names that together form the primary key
 * @throws IllegalArgumentException if a primary key has already been set, or if a key
 *         is not a column of this schema
 */
public void setPrimaryKey(String... keys) {
    checkArgument(primaryKeys.size() == 0, "primary key has been set");
    Set<String> columnNames = new HashSet<>(Arrays.asList(getColumnNames()));
    // Validate every key before mutating anything. Adding while validating would leave
    // a partial primary key behind on failure, and the "has been set" guard above would
    // then reject every later attempt to set it correctly.
    for (String key : keys) {
        if (!columnNames.contains(key)) {
            throw new IllegalArgumentException("The primary key '" + key + "' is not in the table schema");
        }
    }
    primaryKeys.addAll(Arrays.asList(keys));
}
/**
 * Sets the unique keys. Each inner list is one unique key, made up of one or more
 * combined columns. May only be called while no unique key has been set. The operation
 * is all-or-nothing: if any key column is rejected, no unique key is recorded and the
 * call can be retried.
 *
 * @param uniqueKeys the unique keys; the inner lists are stored as given, without copying
 * @throws IllegalArgumentException if unique keys have already been set, or if a key
 *         column is not a column of this schema
 */
public void setUniqueKeys(List<List<String>> uniqueKeys) {
    checkArgument(this.uniqueKeys.size() == 0, "unique key has been set");
    Set<String> columnNames = new HashSet<>(Arrays.asList(getColumnNames()));
    // Validate every unique key before mutating anything. Adding while validating would
    // leave the earlier keys registered on failure, and the "has been set" guard above
    // would then reject every later attempt to set the unique keys correctly.
    for (List<String> uk : uniqueKeys) {
        for (String key : uk) {
            if (!columnNames.contains(key)) {
                throw new IllegalArgumentException("The unique key '" + key + "' is not in the table schema");
            }
        }
    }
    this.uniqueKeys.addAll(uniqueKeys);
}
/**
 * Sets the indexes of this table. May only be called while no index is registered.
 * Each given index is registered through {@link #addIndex(Index, Set)}, which checks
 * its key columns against the schema's column names.
 *
 * @param keys the indexes to register
 */
public void setIndexes(List<Index> keys) {
    checkArgument(indexes.isEmpty(), "indexes has been set");
    Set<String> knownColumns = new HashSet<>(Arrays.asList(getColumnNames()));
    for (Index candidate : keys) {
        addIndex(candidate, knownColumns);
    }
}
/**
 * Adds a single index after checking its key columns against the schema's column names.
 * This method does not require the index list to be empty beforehand.
 *
 * @param index the index to add
 */
public void addSingleIndex(Index index) {
    Set<String> knownColumns = new HashSet<>(Arrays.asList(getColumnNames()));
    addIndex(index, knownColumns);
}
/**
 * Registers the given index once all of its key columns are found in {@code columnNames}.
 *
 * @param index the index to register
 * @param columnNames the set of column names the index keys are checked against
 * @throws IllegalArgumentException if an index key is not contained in {@code columnNames}
 */
protected void addIndex(Index index, Set<String> columnNames) {
    for (String key : index.keyList) {
        if (columnNames.contains(key)) {
            continue;
        }
        throw new IllegalArgumentException("The index key '" + key + "' is not in the table schema");
    }
    this.indexes.add(index);
}
/**
 * Returns the column names of the primary key; one or more columns may be combined
 * to form it. The returned list is the schema's own list, not a copy.
 */
public List<String> getPrimaryKeys() {
    return this.primaryKeys;
}
/**
 * Returns the unique keys, each given as the list of its column names.
 * The returned list is the schema's own list, not a copy.
 */
public List<List<String>> getUniqueKeys() {
    return this.uniqueKeys;
}
/**
 * Returns the original indexes defined in the table, i.e. only those registered as
 * indexes, without the ones deduced from the primary key or the unique keys.
 */
public List<Index> getIndexes() {
    return this.indexes;
}
/**
 * Returns all deduced indexes defined in the table: a unique index for the primary key
 * (if one is set), a unique index per unique key, followed by the explicitly registered
 * indexes.
 *
 * @return a newly created list; the unique indexes in it wrap the schema's own key lists
 */
public List<Index> deduceAllIndexes() {
    List<Index> indexList = new ArrayList<>();
    // primaryKeys is a final field initialised at declaration, so it can never be null;
    // only emptiness needs to be checked.
    if (!primaryKeys.isEmpty()) {
        indexList.add(new UniqueIndex(primaryKeys));
    }
    for (List<String> uk : uniqueKeys) {
        indexList.add(new UniqueIndex(uk));
    }
    indexList.addAll(indexes);
    return indexList;
}
/**
 * Utility method converting every deduced sql Index of this schema (see
 * {@link #deduceAllIndexes()}) to a table api IndexKey.
 *
 * @return one IndexKey per deduced index, in the order the indexes were deduced
 */
public List<IndexKey> toIndexKeys() {
    List<IndexKey> converted = new ArrayList<>();
    for (Index deduced : deduceAllIndexes()) {
        converted.add(deduced.toIndexKey(this));
    }
    return converted;
}
/**
 * Returns the final result type of this table as a {@link RowType}, built from the
 * schema's column types and column names.
 * This type info includes the computed columns (if they exist) and excludes the proctime field.
 */
public RowType getResultType() {
    InternalType[] fieldTypes = getColumnTypes();
    String[] fieldNames = getColumnNames();
    return new RowType(fieldTypes, fieldNames);
}
/**
 * Returns the final result type of this table as a {@link DataType}, created from the
 * schema's column types and column names.
 * This type info includes the computed columns (if they exist) and excludes the proctime field.
 */
public DataType getResultRowType() {
    InternalType[] fieldTypes = getColumnTypes();
    String[] fieldNames = getColumnNames();
    return DataTypes.createRowType(fieldTypes, fieldNames);
}
/**
 * Returns the result type of this table as a {@link RowTypeInfo}, obtained by converting
 * the row type returned by {@link #getResultRowType()} to its external type information.
 */
public RowTypeInfo getResultTypeInfo() {
    DataType rowType = getResultRowType();
    return (RowTypeInfo) TypeConverters.createExternalTypeInfoFromDataType(rowType);
}
/**
 * Returns the column names of this schema. The schema's own array is returned, not a copy.
 */
public String[] getColumnNames() {
    return this.columnNames;
}
/**
 * Returns the column types of this schema. The schema's own array is returned, not a copy.
 */
public InternalType[] getColumnTypes() {
    return this.columnTypes;
}
/**
 * Returns the per-column nullable flags of this schema. The schema's own array is
 * returned, not a copy.
 */
public boolean[] getNullables() {
    return this.nullables;
}
/**
 * Describes the structure of a unique index: an {@link Index} whose {@code unique}
 * flag is always {@code true}.
 */
public static class UniqueIndex extends Index {

    /**
     * Creates a unique index over the given key columns.
     *
     * @param keyList names of the columns making up the unique index
     */
    public UniqueIndex(List<String> keyList) {
        super(true, keyList);
    }
}
/**
 * Describes the structure of an Index: whether it is unique and the names of its key columns.
 *
 * <p>The class is {@link Serializable} because the enclosing {@code RichTableSchema} is
 * serializable and keeps its registered indexes in a non-transient field; without this,
 * serializing a schema that has at least one index fails with a
 * {@code NotSerializableException}.
 */
public static class Index implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Whether this index is a unique index. */
    public final boolean unique;

    /** Names of the key columns of this index; stored as given, without copying. */
    public final List<String> keyList;

    /**
     * Creates an index description.
     *
     * @param unique whether the index is unique
     * @param keyList names of the key columns; must not be null (only checked when
     *        assertions are enabled)
     */
    public Index(boolean unique, List<String> keyList) {
        assert keyList != null;
        this.unique = unique;
        this.keyList = keyList;
    }

    /**
     * Converts this Index to the internal IndexKey by resolving every key column name
     * to its position in the column names of the given schema.
     *
     * <p>NOTE(review): a key column that is not part of {@code schema} resolves to
     * position {@code -1}, which is passed on to {@code IndexKey.of} unchanged —
     * confirm whether callers rely on this or whether it should be rejected.
     *
     * @param schema the schema whose column names are used to resolve the positions
     * @return the IndexKey built from the unique flag and the resolved positions
     */
    public IndexKey toIndexKey(RichTableSchema schema) {
        assert null != schema;
        String[] fieldNames = schema.getColumnNames();
        int[] indexes = new int[keyList.size()];
        for (int i = 0; i < keyList.size(); i++) {
            indexes[i] = findIndex(keyList.get(i), fieldNames);
        }
        return IndexKey.of(unique, indexes);
    }

    /**
     * Returns the position of the first entry of {@code fieldNames} that equals
     * {@code column}, or {@code -1} if there is none.
     */
    int findIndex(String column, String[] fieldNames) {
        for (int j = 0; j < fieldNames.length; j++) {
            if (fieldNames[j].equals(column)) {
                return j;
            }
        }
        return -1;
    }
}
}