blob: 3a3aa082c6b19f20a7cec7254dbdec06544612b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRecord;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@SuppressWarnings("magicnumber")
public class ClickhouseProxy {
private final ClickHouseRequest<?> clickhouseRequest;
private final ClickHouseClient client;
private final Map<Shard, ClickHouseClient> shardToDataSource = new ConcurrentHashMap<>(16);
public ClickhouseProxy(ClickHouseNode node) {
this.client = ClickHouseClient.newInstance(node.getProtocol());
this.clickhouseRequest =
client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
}
public ClickHouseRequest<?> getClickhouseConnection() {
return this.clickhouseRequest;
}
public ClickHouseRequest<?> getClickhouseConnection(Shard shard) {
ClickHouseClient c = shardToDataSource
.computeIfAbsent(shard, s -> ClickHouseClient.newInstance(s.getNode().getProtocol()));
return c.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
}
public DistributedEngine getClickhouseDistributedTable(String database, String table) {
ClickHouseRequest<?> request = getClickhouseConnection();
return getClickhouseDistributedTable(request, database, table);
}
public DistributedEngine getClickhouseDistributedTable(ClickHouseRequest<?> connection, String database,
String table) {
String sql = String.format("select engine_full from system.tables where database = '%s' and name = '%s' and engine = 'Distributed'", database, table);
try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
if (!records.isEmpty()) {
ClickHouseRecord record = records.get(0);
// engineFull field will be like : Distributed(cluster, database, table[, sharding_key[, policy_name]])
String engineFull = record.getValue(0).asString();
List<String> infos = Arrays.stream(engineFull.substring(12).split(","))
.map(s -> s.replace("'", "").trim()).collect(Collectors.toList());
return new DistributedEngine(infos.get(0), infos.get(1), infos.get(2).replace("\\)", "").trim());
}
throw new RuntimeException("Cannot get distributed table from clickhouse, resultSet is empty");
} catch (ClickHouseException e) {
throw new RuntimeException("Cannot get distributed table from clickhouse", e);
}
}
/**
* Get ClickHouse table schema, the key is fileName, value is value type.
*
* @param table table name.
* @return schema map.
*/
public Map<String, String> getClickhouseTableSchema(String table) {
ClickHouseRequest<?> request = getClickhouseConnection();
return getClickhouseTableSchema(request, table);
}
public Map<String, String> getClickhouseTableSchema(ClickHouseRequest<?> request, String table) {
String sql = "desc " + table;
Map<String, String> schema = new LinkedHashMap<>();
try (ClickHouseResponse response = request.query(sql).executeAndWait()) {
response.records().forEach(r -> schema.put(r.getValue(0).asString(), r.getValue(1).asString()));
} catch (ClickHouseException e) {
throw new RuntimeException("Cannot get table schema from clickhouse", e);
}
return schema;
}
/**
* Get the shard of the given cluster.
*
* @param connection clickhouse connection.
* @param clusterName cluster name.
* @param database database of the shard.
* @param port port of the shard.
* @return shard list.
*/
public List<Shard> getClusterShardList(ClickHouseRequest<?> connection, String clusterName,
String database, int port, String username, String password) {
String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "'";
List<Shard> shardList = new ArrayList<>();
try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
response.records().forEach(r -> {
shardList.add(new Shard(
r.getValue(0).asInteger(),
r.getValue(1).asInteger(),
r.getValue(2).asInteger(),
r.getValue(3).asString(),
r.getValue(4).asString(),
port, database, username, password));
});
return shardList;
} catch (ClickHouseException e) {
throw new RuntimeException("Cannot get cluster shard list from clickhouse", e);
}
}
/**
* Get ClickHouse table info.
*
* @param database database of the table.
* @param table table name of the table.
* @return clickhouse table info.
*/
public ClickhouseTable getClickhouseTable(String database, String table) {
String sql = String.format("select engine,create_table_query,engine_full,data_paths from system.tables where database = '%s' and name = '%s'", database, table);
try (ClickHouseResponse response = clickhouseRequest.query(sql).executeAndWait()) {
List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
if (records.isEmpty()) {
throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty");
}
ClickHouseRecord record = records.get(0);
String engine = record.getValue(0).asString();
String createTableDDL = record.getValue(1).asString();
String engineFull = record.getValue(2).asString();
List<String> dataPaths = record.getValue(3).asTuple().stream().map(Object::toString).collect(Collectors.toList());
DistributedEngine distributedEngine = null;
if ("Distributed".equals(engine)) {
distributedEngine = getClickhouseDistributedTable(clickhouseRequest, database, table);
String localTableSQL = String.format("select engine,create_table_query from system.tables where database = '%s' and name = '%s'",
distributedEngine.getDatabase(), distributedEngine.getTable());
try (ClickHouseResponse rs = clickhouseRequest.query(localTableSQL).executeAndWait()) {
List<ClickHouseRecord> localTableRecords = rs.stream().collect(Collectors.toList());
if (localTableRecords.isEmpty()) {
throw new RuntimeException("Cannot get table from clickhouse, resultSet is empty");
}
String localEngine = localTableRecords.get(0).getValue(0).asString();
String createLocalTableDDL = localTableRecords.get(0).getValue(1).asString();
createTableDDL = localizationEngine(localEngine, createLocalTableDDL);
}
}
return new ClickhouseTable(
database,
table,
distributedEngine,
engine,
createTableDDL,
engineFull,
dataPaths,
getClickhouseTableSchema(clickhouseRequest, table));
} catch (ClickHouseException e) {
throw new RuntimeException("Cannot get clickhouse table", e);
}
}
/**
* Localization the engine in clickhouse local table's createTableDDL to support specific engine.
* For example: change ReplicatedMergeTree to MergeTree.
*
* @param engine original engine of clickhouse local table
* @param ddl createTableDDL of clickhouse local table
* @return createTableDDL of clickhouse local table which can support specific engine
* TODO: support more engine
*/
public String localizationEngine(String engine, String ddl) {
if ("ReplicatedMergeTree".equalsIgnoreCase(engine)) {
return ddl.replaceAll("ReplicatedMergeTree(\\([^\\)]*\\))", "MergeTree()");
} else {
return ddl;
}
}
public void close() {
if (this.client != null) {
this.client.close();
}
shardToDataSource.values().forEach(ClickHouseClient::close);
}
}