blob: 3ff1fcbb0cf0747a691dabf57135cc06bdc75f30 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.flink;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.utils.EncodingUtils;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* SHOW CREATE statement Util.
*
* <p>This code is mostly copied from {@link org.apache.flink.table.api.internal.ShowCreateUtil}.
*/
public class ShowCreateUtil {
private ShowCreateUtil() {}
public static String buildShowCreateTable(
ResolvedCatalogBaseTable<?> table,
ObjectIdentifier tableIdentifier,
boolean ignoreIfExists) {
if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
throw new TableException(
String.format(
"SHOW CREATE TABLE is only supported for tables, but %s is a view. Please use SHOW CREATE VIEW instead.",
tableIdentifier.asSerializableString()));
}
final String printIndent = " ";
StringBuilder sb =
new StringBuilder()
.append(buildCreateFormattedPrefix(ignoreIfExists, tableIdentifier));
sb.append(extractFormattedColumns(table, printIndent));
extractFormattedWatermarkSpecs(table, printIndent)
.ifPresent(watermarkSpecs -> sb.append(",\n").append(watermarkSpecs));
extractFormattedPrimaryKey(table, printIndent).ifPresent(pk -> sb.append(",\n").append(pk));
sb.append("\n) ");
extractFormattedComment(table)
.ifPresent(
c -> sb.append(String.format("COMMENT '%s'%s", c, System.lineSeparator())));
extractFormattedPartitionedInfo((ResolvedCatalogTable) table)
.ifPresent(
partitionedInfoFormatted ->
sb.append("PARTITIONED BY (")
.append(partitionedInfoFormatted)
.append(")\n"));
extractFormattedOptions(table, printIndent)
.ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n"));
return sb.toString();
}
private static String extractFormattedColumns(
ResolvedCatalogBaseTable<?> table, String printIndent) {
return table.getResolvedSchema().getColumns().stream()
.map(column -> String.format("%s%s", printIndent, getColumnString(column)))
.collect(Collectors.joining(",\n"));
}
private static Optional<String> extractFormattedWatermarkSpecs(
ResolvedCatalogBaseTable<?> table, String printIndent) {
if (table.getResolvedSchema().getWatermarkSpecs().isEmpty()) {
return Optional.empty();
}
return Optional.of(
table.getResolvedSchema().getWatermarkSpecs().stream()
.map(
watermarkSpec ->
String.format(
"%sWATERMARK FOR %s AS %s",
printIndent,
EncodingUtils.escapeIdentifier(
watermarkSpec.getRowtimeAttribute()),
watermarkSpec
.getWatermarkExpression()
.asSerializableString()))
.collect(Collectors.joining("\n")));
}
private static Optional<String> extractFormattedComment(ResolvedCatalogBaseTable<?> table) {
String comment = table.getComment();
if (StringUtils.isNotEmpty(comment)) {
return Optional.of(EncodingUtils.escapeSingleQuotes(comment));
}
return Optional.empty();
}
private static Optional<String> extractFormattedPartitionedInfo(
ResolvedCatalogTable catalogTable) {
if (!catalogTable.isPartitioned()) {
return Optional.empty();
}
return Optional.of(
catalogTable.getPartitionKeys().stream()
.map(EncodingUtils::escapeIdentifier)
.collect(Collectors.joining(", ")));
}
private static Optional<String> extractFormattedOptions(
ResolvedCatalogBaseTable<?> table, String printIndent) {
if (Objects.isNull(table.getOptions()) || table.getOptions().isEmpty()) {
return Optional.empty();
}
return Optional.of(
table.getOptions().entrySet().stream()
.map(
entry ->
String.format(
"%s'%s' = '%s'",
printIndent,
EncodingUtils.escapeSingleQuotes(entry.getKey()),
EncodingUtils.escapeSingleQuotes(entry.getValue())))
.collect(Collectors.joining(",\n")));
}
private static String buildCreateFormattedPrefix(
boolean ignoreIfExists, ObjectIdentifier identifier) {
return String.format(
"CREATE TABLE%s %s (%s",
ignoreIfExists ? " IF NOT EXISTS " : "",
identifier.asSerializableString(),
System.lineSeparator());
}
private static Optional<String> extractFormattedPrimaryKey(
ResolvedCatalogBaseTable<?> table, String printIndent) {
Optional<UniqueConstraint> primaryKey = table.getResolvedSchema().getPrimaryKey();
return primaryKey.map(
uniqueConstraint -> String.format("%s%s", printIndent, uniqueConstraint));
}
private static String getColumnString(Column column) {
final StringBuilder sb = new StringBuilder();
sb.append(EncodingUtils.escapeIdentifier(column.getName()));
sb.append(" ");
// skip data type for computed column
if (column instanceof Column.ComputedColumn) {
sb.append(
column.explainExtras()
.orElseThrow(
() ->
new TableException(
String.format(
"Column expression can not be null for computed column '%s'",
column.getName()))));
} else {
sb.append(column.getDataType().getLogicalType().asSerializableString());
column.explainExtras()
.ifPresent(
e -> {
sb.append(" ");
sb.append(e);
});
}
// TODO: Print the column comment until FLINK-18958 is fixed
return sb.toString();
}
}