blob: 7aa314f63a7ce7d8c4476e188d76de0b8b5a5c44 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.catalog.doris;
import org.apache.commons.compress.utils.Lists;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.connection.JdbcConnectionProvider;
import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
import org.apache.doris.flink.exception.CreateTableException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.DorisSystemException;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
 * Helper that runs metadata queries and DDL statements against Doris through JDBC.
 *
 * <p>Connections are obtained from a {@link JdbcConnectionProvider}. This class closes the
 * statements and result sets it opens, but never the connection itself.
 */
@Public
public class DorisSystem {
    private static final Logger LOG = LoggerFactory.getLogger(DorisSystem.class);

    /** Databases that are hidden from {@link #listDatabases()}. */
    private static final List<String> BUILTIN_DATABASES = Collections.singletonList("information_schema");

    private final JdbcConnectionProvider jdbcConnectionProvider;

    /**
     * Creates a helper whose connections are built from the given options.
     *
     * @param options connection options handed to {@link SimpleJdbcConnectionProvider}
     */
    public DorisSystem(DorisConnectionOptions options) {
        this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(options);
    }

    /**
     * Lists every schema reported by {@code INFORMATION_SCHEMA.SCHEMATA} except the built-in ones.
     *
     * @return database names, in the order returned by the query
     */
    public List<String> listDatabases() {
        return extractColumnValuesBySQL(
                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;",
                1,
                dbName -> !BUILTIN_DATABASES.contains(dbName));
    }

    /**
     * Checks whether a database is present in {@link #listDatabases()}.
     *
     * @param database database name; must not be null or blank
     * @return {@code true} if the database is listed
     * @throws IllegalArgumentException if {@code database} is null or whitespace only
     */
    public boolean databaseExists(String database) {
        checkArgument(!StringUtils.isNullOrWhitespaceOnly(database));
        return listDatabases().contains(database);
    }

    /**
     * Issues {@code CREATE DATABASE IF NOT EXISTS}.
     *
     * <p>The name is interpolated into the statement verbatim (no quoting or escaping), so it
     * must be a trusted identifier.
     *
     * @param database database name
     * @return always {@code true}; failures surface as {@link DorisSystemException}
     */
    public boolean createDatabase(String database) {
        execute(String.format("CREATE DATABASE IF NOT EXISTS %s", database));
        return true;
    }

    /**
     * Issues {@code DROP DATABASE IF EXISTS}.
     *
     * <p>The name is interpolated into the statement verbatim (no quoting or escaping), so it
     * must be a trusted identifier.
     *
     * @param database database name
     * @return always {@code true}; failures surface as {@link DorisSystemException}
     */
    public boolean dropDatabase(String database) {
        execute(String.format("DROP DATABASE IF EXISTS %s", database));
        return true;
    }

    /**
     * Checks whether a table exists inside a database.
     *
     * @param database database name; must not be null or blank
     * @param table table name
     * @return {@code true} if the database exists and contains the table
     */
    public boolean tableExists(String database, String table) {
        // Query the tables directly: going through listTables() would repeat the database lookup.
        return databaseExists(database) && queryTables(database).contains(table);
    }

    /**
     * Lists the tables of a database.
     *
     * @param databaseName database name; must not be null or blank
     * @return table names found in {@code information_schema.TABLES} for that schema
     * @throws DorisRuntimeException if the database does not exist
     */
    public List<String> listTables(String databaseName) {
        if (!databaseExists(databaseName)) {
            throw new DorisRuntimeException("database " + databaseName + " does not exist");
        }
        return queryTables(databaseName);
    }

    /** Reads the table names of a schema without checking that the schema exists. */
    private List<String> queryTables(String databaseName) {
        return extractColumnValuesBySQL(
                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
                1,
                null,
                databaseName);
    }

    /**
     * Issues {@code DROP TABLE IF EXISTS}.
     *
     * <p>The name is interpolated into the statement verbatim (no quoting or escaping), so it
     * must be a trusted identifier.
     *
     * @param tableName table name as it should appear in the statement
     */
    public void dropTable(String tableName) {
        execute(String.format("DROP TABLE IF EXISTS %s", tableName));
    }

    /**
     * Builds the DDL for the schema, logs it and executes it.
     *
     * @param schema table definition
     * @throws CreateTableException if the schema cannot be turned into DDL
     * @throws DorisSystemException if executing the DDL fails
     */
    public void createTable(TableSchema schema) {
        String ddl = buildCreateTableDDL(schema);
        LOG.info("Create table with ddl:{}", ddl);
        execute(ddl);
    }

    /**
     * Executes a single SQL statement and discards any result.
     *
     * @param sql statement text
     * @throws DorisSystemException wrapping any failure, with the statement in the message
     */
    public void execute(String sql) {
        try (Statement statement = jdbcConnectionProvider.getOrEstablishConnection().createStatement()) {
            statement.execute(sql);
        } catch (Exception e) {
            throw new DorisSystemException(String.format("SQL query could not be executed: %s", sql), e);
        }
    }

    /**
     * Runs a query and collects one column of its result as strings.
     *
     * @param sql query text, optionally containing {@code ?} placeholders
     * @param columnIndex 1-based index of the column to read
     * @param filterFunc keeps a value when it returns {@code true}; {@code null} keeps everything
     * @param params values bound to the placeholders in order
     * @return the retained column values in result order
     * @throws DorisSystemException wrapping any failure, with the query in the message
     */
    public List<String> extractColumnValuesBySQL(
            String sql,
            int columnIndex,
            Predicate<String> filterFunc,
            Object... params) {
        List<String> columnValues = Lists.newArrayList();
        try (PreparedStatement ps = jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
            if (Objects.nonNull(params)) {
                for (int i = 0; i < params.length; i++) {
                    // JDBC placeholders are 1-based.
                    ps.setObject(i + 1, params[i]);
                }
            }
            // Close the result set explicitly instead of relying on the statement to do it.
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String columnValue = rs.getString(columnIndex);
                    if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
                        columnValues.add(columnValue);
                    }
                }
            }
            return columnValues;
        } catch (Exception e) {
            throw new DorisSystemException(
                    String.format(
                            "The following SQL query could not be executed: %s", sql),
                    e);
        }
    }

    /**
     * Renders a {@code CREATE TABLE IF NOT EXISTS} statement for the schema.
     *
     * <p>Key columns are emitted first, in key order, followed by the remaining columns in the
     * iteration order of {@code schema.getFields()}. A {@code KEY(...)} clause is only emitted for
     * the {@code UNIQUE} model.
     *
     * @param schema table definition
     * @return the DDL text
     * @throws CreateTableException if a key is not among the fields, or there are no columns
     */
    public String buildCreateTableDDL(TableSchema schema) {
        Map<String, FieldSchema> fields = schema.getFields();
        List<String> keys = schema.getKeys();

        List<String> columns = Lists.newArrayList();
        // append keys
        for (String key : keys) {
            if (!fields.containsKey(key)) {
                throw new CreateTableException("key " + key + " not found in column list");
            }
            columns.add(buildColumn(fields.get(key), true));
        }
        // append values
        for (Map.Entry<String, FieldSchema> entry : fields.entrySet()) {
            if (!keys.contains(entry.getKey())) {
                columns.add(buildColumn(entry.getValue(), false));
            }
        }
        if (columns.isEmpty()) {
            // Without this guard the statement would be emitted with an empty column list.
            throw new CreateTableException("no columns defined for table " + schema.getTable());
        }

        StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
        sb.append(identifier(schema.getDatabase()))
                .append(".")
                .append(identifier(schema.getTable()))
                .append("(")
                .append(String.join(",", columns))
                .append(" ) ");

        // append uniq model
        if (DataModel.UNIQUE.equals(schema.getModel())) {
            sb.append(schema.getModel().name())
                    .append(" KEY(")
                    .append(String.join(",", identifier(schema.getKeys())))
                    .append(")");
        }

        // append table comment
        if (!StringUtils.isNullOrWhitespaceOnly(schema.getTableComment())) {
            sb.append(" COMMENT '")
                    .append(quoteComment(schema.getTableComment()))
                    .append("' ");
        }

        // append distribute key
        sb.append(" DISTRIBUTED BY HASH(")
                .append(String.join(",", identifier(schema.getDistributeKeys())))
                .append(") BUCKETS AUTO ");

        // append properties; the clause is omitted entirely when there are none
        Map<String, String> properties = schema.getProperties();
        if (!properties.isEmpty()) {
            sb.append(properties.entrySet().stream()
                    .map(entry -> quoteProperties(entry.getKey()) + "=" + quoteProperties(entry.getValue()))
                    .collect(Collectors.joining(",", " PROPERTIES (", ")")));
        }
        return sb.toString();
    }

    /**
     * Renders one column definition: quoted name, type and comment, without a trailing comma.
     *
     * @param field column description
     * @param isKey whether the column is one of the table keys
     */
    private String buildColumn(FieldSchema field, boolean isKey) {
        String fieldType = field.getTypeString();
        if (isKey && DorisType.STRING.equals(fieldType)) {
            // NOTE(review): presumably STRING is not accepted for key columns and 65533 is the
            // VARCHAR maximum length - confirm against the Doris data type documentation.
            fieldType = String.format("%s(%s)", DorisType.VARCHAR, 65533);
        }
        return identifier(field.getName())
                + " "
                + fieldType
                + " COMMENT '"
                + quoteComment(field.getComment())
                + "'";
    }

    /** Escapes single quotes with a backslash so the text fits in a quoted literal; null becomes "". */
    private String quoteComment(String comment) {
        if (comment == null) {
            return "";
        }
        return comment.replace("'", "\\'");
    }

    /** Wraps every name in backticks. */
    private List<String> identifier(List<String> name) {
        return name.stream().map(m -> identifier(m)).collect(Collectors.toList());
    }

    /** Wraps the name in backticks; backticks inside the name are not escaped. */
    private String identifier(String name) {
        return "`" + name + "`";
    }

    /** Wraps the text in single quotes; quotes inside the text are not escaped. */
    private String quoteProperties(String name) {
        return "'" + name + "'";
    }
}