blob: c1380c7db16742bac63555ed4b193ebbdae82195 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.sink.writer;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.util.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
public class SchemaChangeHelper {
private static final List<String> dropFieldSchemas = Lists.newArrayList();
private static final List<FieldSchema> addFieldSchemas = Lists.newArrayList();
// Used to determine whether the doris table supports ddl
private static final List<DDLSchema> ddlSchemas = Lists.newArrayList();
private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s";
private static final String RENAME_DDL = "ALTER TABLE %s RENAME COLUMN %s %s";
public static void compareSchema(Map<String, FieldSchema> updateFiledSchemaMap,
Map<String, FieldSchema> originFieldSchemaMap) {
dropFieldSchemas.clear();
addFieldSchemas.clear();
for (Entry<String, FieldSchema> updateFieldSchema : updateFiledSchemaMap.entrySet()) {
String columName = updateFieldSchema.getKey();
if (!originFieldSchemaMap.containsKey(columName)) {
addFieldSchemas.add(updateFieldSchema.getValue());
originFieldSchemaMap.put(columName, updateFieldSchema.getValue());
}
}
for (Entry<String, FieldSchema> originFieldSchema : originFieldSchemaMap.entrySet()) {
String columName = originFieldSchema.getKey();
if (!updateFiledSchemaMap.containsKey(columName)) {
dropFieldSchemas.add(columName);
}
}
if (CollectionUtils.isNotEmpty(dropFieldSchemas)) {
dropFieldSchemas.forEach(originFieldSchemaMap::remove);
}
}
public static List<String> generateRenameDDLSql(String table, String oldColumnName, String newColumnName,
Map<String, FieldSchema> originFieldSchemaMap) {
ddlSchemas.clear();
List<String> ddlList = Lists.newArrayList();
FieldSchema fieldSchema = null;
for (Entry<String, FieldSchema> originFieldSchema : originFieldSchemaMap.entrySet()) {
if (originFieldSchema.getKey().equals(oldColumnName)) {
fieldSchema = originFieldSchema.getValue();
String renameSQL = String.format(RENAME_DDL, table, oldColumnName, newColumnName);
ddlList.add(renameSQL);
ddlSchemas.add(new DDLSchema(oldColumnName, false));
}
}
originFieldSchemaMap.remove(oldColumnName);
originFieldSchemaMap.put(newColumnName, fieldSchema);
return ddlList;
}
public static List<String> generateDDLSql(String table) {
ddlSchemas.clear();
List<String> ddlList = Lists.newArrayList();
for (FieldSchema fieldSchema : addFieldSchemas) {
String name = fieldSchema.getName();
String type = fieldSchema.getTypeString();
String defaultValue = fieldSchema.getDefaultValue();
String comment = fieldSchema.getComment();
String addDDL = String.format(ADD_DDL, table, name, type);
if (!StringUtils.isNullOrWhitespaceOnly(defaultValue)) {
addDDL = addDDL + " DEFAULT " + defaultValue;
}
if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
addDDL = addDDL + " COMMENT " + comment;
}
ddlList.add(addDDL);
ddlSchemas.add(new DDLSchema(name, false));
}
for (String columName : dropFieldSchemas) {
String dropDDL = String.format(DROP_DDL, table, columName);
ddlList.add(dropDDL);
ddlSchemas.add(new DDLSchema(columName, true));
}
dropFieldSchemas.clear();
addFieldSchemas.clear();
return ddlList;
}
public static List<DDLSchema> getDdlSchemas() {
return ddlSchemas;
}
public static class DDLSchema {
private final String columnName;
private final boolean isDropColumn;
public DDLSchema(String columnName, boolean isDropColumn) {
this.columnName = columnName;
this.isDropColumn = isDropColumn;
}
public String getColumnName() {
return columnName;
}
public boolean isDropColumn() {
return isDropColumn;
}
}
}