blob: 3608e95d3eda75376542a0ff668a214f32bc4699 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.catalog.doris;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.commons.compress.utils.Lists;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.connection.JdbcConnectionProvider;
import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.DorisSystemException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
 * Doris System Operate.
 *
 * <p>Runs metadata lookups (databases, tables, columns) and DDL statements against Doris through
 * a JDBC connection obtained from a {@link JdbcConnectionProvider}, and offers static helpers for
 * assembling {@code CREATE TABLE} statements and quoting identifiers.
 */
@Public
public class DorisSystem implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(DorisSystem.class);

    /** Property key that is skipped when rendering the {@code PROPERTIES} clause. */
    private static final String TABLE_BUCKETS = "table-buckets";

    private final JdbcConnectionProvider jdbcConnectionProvider;

    /** Databases filtered out of {@link #listDatabases()}. */
    private static final List<String> builtinDatabases =
            Collections.singletonList("information_schema");

    /**
     * Creates an instance backed by a {@link SimpleJdbcConnectionProvider}.
     *
     * @param options connection options handed to the connection provider
     */
    public DorisSystem(DorisConnectionOptions options) {
        this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options);
    }

    /**
     * Lists the schema names reported by {@code INFORMATION_SCHEMA.SCHEMATA}, excluding the
     * built-in databases.
     *
     * @return the database names
     */
    public List<String> listDatabases() {
        return extractColumnValuesBySQL(
                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
                1,
                dbName -> !builtinDatabases.contains(dbName));
    }

    /**
     * Checks whether {@code database} is among the names returned by {@link #listDatabases()}.
     *
     * @param database database name; must not be null or whitespace-only
     * @return {@code true} if the database is listed
     */
    public boolean databaseExists(String database) {
        checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
        return listDatabases().contains(database);
    }

    /**
     * Executes {@code CREATE DATABASE IF NOT EXISTS} for the given name.
     *
     * <p>The name is placed into the statement verbatim (no quoting or escaping is applied here).
     *
     * @param database database name
     * @return always {@code true}; failures surface as {@link DorisSystemException}
     */
    public boolean createDatabase(String database) {
        execute(String.format("CREATE DATABASE IF NOT EXISTS %s", database));
        return true;
    }

    /**
     * Executes {@code DROP DATABASE IF EXISTS} for the given name.
     *
     * <p>The name is placed into the statement verbatim (no quoting or escaping is applied here).
     *
     * @param database database name
     * @return always {@code true}; failures surface as {@link DorisSystemException}
     */
    public boolean dropDatabase(String database) {
        execute(String.format("DROP DATABASE IF EXISTS %s", database));
        return true;
    }

    /**
     * Checks whether the database exists and lists a table with the given name.
     *
     * @param database database name
     * @param table table name
     * @return {@code true} if both the database and the table are found
     */
    public boolean tableExists(String database, String table) {
        return databaseExists(database) && listTables(database).contains(table);
    }

    /**
     * Checks whether {@code information_schema.COLUMNS} contains the given column for the table.
     *
     * @param database database name
     * @param table table name
     * @param columnName column name
     * @return {@code true} if the table exists and the column is found
     */
    public boolean columnExists(String database, String table, String columnName) {
        if (!tableExists(database, table)) {
            return false;
        }
        List<String> columns =
                extractColumnValuesBySQL(
                        "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?",
                        1,
                        null,
                        database,
                        table,
                        columnName);
        return columns != null && !columns.isEmpty();
    }

    /**
     * Lists the table names recorded in {@code information_schema.TABLES} for a database.
     *
     * @param databaseName database name
     * @return the table names
     * @throws DorisRuntimeException if the database does not exist
     */
    public List<String> listTables(String databaseName) {
        if (!databaseExists(databaseName)) {
            throw new DorisRuntimeException("database " + databaseName + " is not exists");
        }
        return extractColumnValuesBySQL(
                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
                1,
                null,
                databaseName);
    }

    /**
     * Executes {@code DROP TABLE IF EXISTS} for the given name.
     *
     * <p>The name is placed into the statement verbatim (no quoting or escaping is applied here).
     *
     * @param tableName table name as it should appear in the statement
     */
    public void dropTable(String tableName) {
        execute(String.format("DROP TABLE IF EXISTS %s", tableName));
    }

    /**
     * Builds the DDL for {@code schema} via {@link #buildCreateTableDDL(TableSchema)}, logs it and
     * executes it.
     *
     * @param schema table definition
     */
    public void createTable(TableSchema schema) {
        String ddl = buildCreateTableDDL(schema);
        LOG.info("Create table with ddl:{}", ddl);
        execute(ddl);
    }

    /**
     * Executes a single SQL statement. The statement is closed afterwards; the connection comes
     * from the provider and is not closed here.
     *
     * @param sql statement text
     * @throws DorisSystemException wrapping any failure raised while executing the statement
     */
    public void execute(String sql) {
        try (Statement statement =
                jdbcConnectionProvider.getOrEstablishConnection().createStatement()) {
            statement.execute(sql);
        } catch (Exception e) {
            throw new DorisSystemException(
                    String.format("SQL query could not be executed: %s", sql), e);
        }
    }

    /**
     * Runs a query and collects one column of the result as strings.
     *
     * @param sql query text, optionally containing {@code ?} placeholders
     * @param columnIndex 1-based index of the result column to read
     * @param filterFunc optional filter; when {@code null} every value is kept
     * @param params values bound to the placeholders in order
     * @return the collected column values
     * @throws DorisSystemException wrapping any failure raised while running the query
     */
    public List<String> extractColumnValuesBySQL(
            String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {
        List<String> columnValues = Lists.newArrayList();
        try (PreparedStatement ps =
                jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
            if (Objects.nonNull(params)) {
                for (int i = 0; i < params.length; i++) {
                    ps.setObject(i + 1, params[i]);
                }
            }
            // Close the result set explicitly instead of relying on the statement to do it.
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String columnValue = rs.getString(columnIndex);
                    if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
                        columnValues.add(columnValue);
                    }
                }
            }
            return columnValues;
        } catch (Exception e) {
            throw new DorisSystemException(
                    String.format("The following SQL query could not be executed: %s", sql), e);
        }
    }

    /**
     * Renders a {@code CREATE TABLE IF NOT EXISTS} statement for the given schema.
     *
     * <p>Key columns are emitted first in key order, followed by the remaining fields. The
     * {@code table-buckets} entry of the schema properties is never rendered into the
     * {@code PROPERTIES} clause.
     *
     * @param schema table definition
     * @return the DDL text
     * @throws CreateTableException if a key is missing from the field map or the bucket count is
     *     not positive
     */
    public static String buildCreateTableDDL(TableSchema schema) {
        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
        sb.append(identifier(schema.getDatabase()))
                .append(".")
                .append(identifier(schema.getTable()))
                .append("(");

        Map<String, FieldSchema> fields = schema.getFields();
        List<String> keys = schema.getKeys();
        // append keys
        for (String key : keys) {
            if (!fields.containsKey(key)) {
                throw new CreateTableException("key " + key + " not found in column list");
            }
            buildColumn(sb, fields.get(key), true);
        }

        // append values
        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
            if (keys.contains(entry.getKey())) {
                continue;
            }
            buildColumn(sb, entry.getValue(), false);
        }
        // drop the trailing comma left by the last column
        sb.deleteCharAt(sb.length() - 1);
        sb.append(" ) ");

        // append uniq model
        if (DataModel.UNIQUE.equals(schema.getModel())) {
            sb.append(schema.getModel().name())
                    .append(" KEY(")
                    .append(String.join(",", identifier(schema.getKeys())))
                    .append(")");
        }

        // append table comment
        if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
            sb.append(" COMMENT '").append(quoteComment(schema.getTableComment())).append("' ");
        }

        // append distribute key
        sb.append(" DISTRIBUTED BY HASH(")
                .append(String.join(",", identifier(schema.getDistributeKeys())))
                .append(")");

        if (schema.getTableBuckets() != null) {
            int bucketsNum = schema.getTableBuckets();
            if (bucketsNum <= 0) {
                throw new CreateTableException("The number of buckets must be positive.");
            }
            sb.append(" BUCKETS ").append(bucketsNum);
        } else {
            sb.append(" BUCKETS AUTO ");
        }

        // append properties
        // Filter first and join afterwards so the clause is always closed, no matter where the
        // skipped table-buckets entry falls in the map's iteration order.
        Map<String, String> properties = schema.getProperties();
        if (properties != null) {
            String propertyList =
                    properties.entrySet().stream()
                            .filter(entry -> !TABLE_BUCKETS.equals(entry.getKey()))
                            .map(
                                    entry ->
                                            quoteProperties(entry.getKey())
                                                    + "="
                                                    + quoteProperties(entry.getValue()))
                            .collect(Collectors.joining(","));
            if (!propertyList.isEmpty()) {
                sb.append(" PROPERTIES (").append(propertyList).append(")");
            }
        }
        return sb.toString();
    }

    /**
     * Reads column names and data types of a table from {@code information_schema.COLUMNS}.
     *
     * @param databaseName database name
     * @param tableName table name
     * @return map from column name to the reported {@code DATA_TYPE}
     * @throws DorisRuntimeException if the database does not exist
     * @throws DorisSystemException wrapping any failure raised while running the query
     */
    public Map<String, String> getTableFieldNames(String databaseName, String tableName) {
        if (!databaseExists(databaseName)) {
            throw new DorisRuntimeException("database " + databaseName + " is not exists");
        }
        // Bind the names as parameters rather than formatting them into the SQL text.
        String sql =
                "SELECT COLUMN_NAME,DATA_TYPE "
                        + "FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= ? AND `TABLE_NAME`= ?";

        Map<String, String> columnValues = new HashMap<>();
        try (PreparedStatement ps =
                jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
            ps.setString(1, databaseName);
            ps.setString(2, tableName);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String fieldName = rs.getString(1);
                    String dataType = rs.getString(2);
                    columnValues.put(fieldName, dataType);
                }
            }
            return columnValues;
        } catch (Exception e) {
            throw new DorisSystemException(
                    String.format("The following SQL query could not be executed: %s", sql), e);
        }
    }

    /**
     * Appends one column definition, terminated by a comma, to {@code sql}.
     *
     * <p>A key column whose type string equals {@code DorisType.STRING} is rendered as
     * {@code VARCHAR(65533)} instead.
     */
    private static void buildColumn(StringBuilder sql, FieldSchema field, boolean isKey) {
        String fieldType = field.getTypeString();
        if (isKey && DorisType.STRING.equals(fieldType)) {
            fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
        }
        sql.append(identifier(field.getName())).append(" ").append(fieldType);

        if (field.getDefaultValue() != null) {
            sql.append(" DEFAULT ").append(quoteDefaultValue(field.getDefaultValue()));
        }
        sql.append(" COMMENT '").append(quoteComment(field.getComment())).append("',");
    }

    /**
     * Wraps a default value in single quotes, except for {@code current_timestamp} (any case),
     * which is returned as is. Quotes embedded in the value are not escaped.
     *
     * @param defaultValue default value text; must not be {@code null}
     * @return the text to place after {@code DEFAULT}
     */
    public static String quoteDefaultValue(String defaultValue) {
        // DEFAULT current_timestamp not need quote
        if (defaultValue.equalsIgnoreCase("current_timestamp")) {
            return defaultValue;
        }
        return "'" + defaultValue + "'";
    }

    /**
     * Escapes single quotes in a comment by prefixing each with a backslash.
     *
     * @param comment comment text, may be {@code null}
     * @return the escaped comment, or an empty string for {@code null}
     */
    public static String quoteComment(String comment) {
        if (comment == null) {
            return "";
        }
        // Literal replacement: ' becomes \'
        return comment.replace("'", "\\'");
    }

    /** Applies {@link #identifier(String)} to every element of the list. */
    private static List<String> identifier(List<String> name) {
        return name.stream().map(item -> identifier(item)).collect(Collectors.toList());
    }

    /**
     * Wraps a name in backticks. Backticks inside the name are not escaped.
     *
     * @param name identifier text
     * @return the backtick-quoted identifier
     */
    public static String identifier(String name) {
        return "`" + name + "`";
    }

    /**
     * Quotes a {@code database.table} identifier as {@code `database`.`table`}.
     *
     * @param tableIdentifier identifier that splits into exactly two parts on {@code .}
     * @return the quoted identifier
     * @throws IllegalArgumentException if the identifier does not split into exactly two parts
     */
    public static String quoteTableIdentifier(String tableIdentifier) {
        String[] dbTable = tableIdentifier.split("\\.");
        Preconditions.checkArgument(dbTable.length == 2);
        return identifier(dbTable[0]) + "." + identifier(dbTable[1]);
    }

    /** Wraps a property key or value in single quotes. */
    private static String quoteProperties(String name) {
        return "'" + name + "'";
    }
}