/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.spark.clickhouse.sink

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import net.jpountz.xxhash.XXHashFactory
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.seatunnel.spark.clickhouse.Config.{CLICKHOUSE_LOCAL_PATH, COPY_METHOD, DATABASE, FIELDS, HOST, NODE_ADDRESS, NODE_FREE_PASSWORD, NODE_PASS, PASSWORD, SHARDING_KEY, TABLE, TMP_BATCH_CACHE_LINE, USERNAME}
import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse._
import org.apache.seatunnel.spark.clickhouse.sink.ClickhouseFile.{CLICKHOUSE_FILE_PREFIX, LOGGER, UUID_LENGTH, getClickhouseTableInfo}
import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.{FileTransfer, RsyncFileTransfer, ScpFileTransfer}
import org.apache.seatunnel.spark.clickhouse.sink.filetransfer.TransferMethod.{RSYNC, SCP, TransferMethod, getCopyMethod}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.{Dataset, Encoders, Row}
import org.slf4j.LoggerFactory
import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl}
import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}
import java.util
import java.util.concurrent.ThreadLocalRandom
import java.util.{Objects, Properties, UUID}
import scala.collection.JavaConversions.collectionAsScalaIterable
import scala.collection.{JavaConversions, mutable}
import scala.sys.process._
import scala.util.{Failure, Success, Try}

/**
 * Clickhouse sink that uses the clickhouse-local program. For details see feature
 * <a href="https://github.com/apache/incubator-seatunnel/issues/1382">ST-1382</a>.
 */
class ClickhouseFile extends SparkBatchSink {

  private val properties: Properties = new Properties()
  private var clickhouseLocalPath: String = _
  private var table: Table = _
  private var fields: List[String] = _
  private var nodeUser: Map[String, String] = _
  private var nodePass: Map[String, String] = _
  private val random = ThreadLocalRandom.current()
  private var freePass: Boolean = false
  private var copyFileMethod: TransferMethod = SCP
  private var tmpBatchCacheLine = 100000

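  /**
   * Writes the dataset: computes the target shard of every row, groups the rows by shard,
   * generates ClickHouse data parts locally with clickhouse-local, transfers the parts to the
   * owning shard node and attaches them to the local table there.
   */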
  override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
    if (!config.hasPath(FIELDS)) {
      this.fields = data.schema.fieldNames.toList
    }
    val session = env.getSparkSession
    import session.implicits._
    val encoder = Encoders.tuple(
      ExpressionEncoder[Shard],
      RowEncoder(data.schema))
    data.map(item => {
      val hashInstance = XXHashFactory.fastestInstance().hash64()
      val shard = getRowShard(distributedEngine.equals(this.table.engine), this.table.shards,
        this.table.shardKey, this.table.shardKeyType, this.table.shardWeightCount, this.random,
        hashInstance, item)
      (shard, item)
    })(encoder).groupByKey(si => si._1).mapGroups((shard, rows) => {
      val paths = generateClickhouseFile(rows)
      moveFileToServer(shard, paths)
      attachClickhouseFile(shard, paths)
      clearLocalFile(paths.head.substring(0, CLICKHOUSE_FILE_PREFIX.length + UUID_LENGTH + 1))
      0
    }).foreach(_ => {})
  }

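  /**
   * Dumps the rows of one shard into a temporary tab-separated data file, feeds it to
   * clickhouse-local to build the data parts under a temporary path, and returns the generated
   * part directories (the "detached" directory is excluded).
   */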
  private def generateClickhouseFile(rows: Iterator[(Shard, Row)]): List[String] = {
    def getValue(kv: util.Map.Entry[String, String]): String = {
      if (this.fields.contains(kv.getKey)) {
        kv.getKey
      } else {
        val v = getDefaultValue(kv.getValue)
        if (v == null) {
          "NULL"
        } else if (v.isInstanceOf[Integer]) {
          "0"
        } else {
          s"'${v.toString}'"
        }
      }
    }

    val uuid = UUID.randomUUID().toString.substring(0, UUID_LENGTH).replaceAll("-", "_")
    val targetPath = java.lang.String.format("%s/%s", CLICKHOUSE_FILE_PREFIX, uuid)
    val target = new File(targetPath)
    target.mkdirs()
    val tmpDataPath = targetPath + "/local_data.log"
    mmapSaveDataSafely(tmpDataPath, rows.map(r => r._2))
    val exec = mutable.ListBuffer[String]()
    exec.appendAll(clickhouseLocalPath.trim.split(" "))
    exec.append("-S")
    exec.append(fields.map(f => s"$f ${this.table.tableSchema.get(f)}").mkString(","))
    exec.append("-N")
    exec.append("temp_table" + uuid)
    exec.append("-q")
    exec.append(java.lang.String.format("%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;",
      this.table.getCreateDDLNoDatabase.replaceAll("`", ""), this.table.getLocalTableName,
      this.table.tableSchema.entrySet.map(getValue).mkString(","), uuid))
    exec.append("--path")
    exec.append(targetPath)
    val command = Process(Seq("less", tmpDataPath)) #| exec
    LOGGER.info(command.lineStream.mkString("\n"))
    new File(targetPath + "/data/_local/" + this.table.getLocalTableName).listFiles().filter(f => f.isDirectory)
      .filterNot(f => f.getName.equals("detached")).map(f => f.getAbsolutePath).toList
  }

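  /**
   * Writes rows to the given file through a memory-mapped channel, flushing in batches of at most
   * `tmpBatchCacheLine` rows so that only a bounded number of rows is cached in memory.
   */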
  private def mmapSaveDataSafely(path: String, rows: Iterator[Row]): Unit = {
    val outputChannel = FileChannel.open(Paths.get(path), StandardOpenOption.WRITE, StandardOpenOption.READ,
      StandardOpenOption.CREATE_NEW)
    val cache = mutable.ListBuffer[Row]()
    while (rows.hasNext) {
      cache.append(rows.next())
      if (cache.length >= tmpBatchCacheLine) {
        mmapSaveData(outputChannel, cache.toList)
        cache.clear()
      }
    }
    if (cache.nonEmpty) {
      mmapSaveData(outputChannel, cache.toList)
    }
    outputChannel.close()
  }

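  /** Appends the rows as tab-separated lines at the current end of the memory-mapped file. */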
  private def mmapSaveData(outputChannel: FileChannel, rows: List[Row]): Unit = {
    val data = rows.map(r => {
      this.fields.map(f => r.getAs[Object](f).toString).mkString("\t") + "\n"
    }).mkString
    val buffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, outputChannel.size(), data.getBytes.length)
    buffer.put(data.getBytes)
  }

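  /**
   * Copies the generated part directories into the `detached` directory of the target shard,
   * using scp or rsync depending on the configured copy method.
   */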
  private def moveFileToServer(shard: Shard, paths: List[String]): Unit = {
    var fileTransfer: FileTransfer = null
    this.copyFileMethod match {
      case SCP => {
        if (freePass || !nodePass.contains(shard.hostAddress)) {
          fileTransfer = new ScpFileTransfer(shard.hostAddress)
        } else {
          fileTransfer = new ScpFileTransfer(shard.hostAddress, nodeUser(shard.hostAddress), nodePass(shard.hostAddress))
        }
      }
      case RSYNC => {
        if (freePass || !nodePass.contains(shard.hostAddress)) {
          fileTransfer = new RsyncFileTransfer(shard.hostAddress)
        } else {
          fileTransfer = new RsyncFileTransfer(shard.hostAddress, nodeUser(shard.hostAddress), nodePass(shard.hostAddress))
        }
      }
      case _ => throw new UnsupportedOperationException(s"unknown copy file method: '$copyFileMethod', please use " +
        s"scp/rsync instead")
    }
    fileTransfer.init()
    fileTransfer.transferAndChown(paths, s"${this.table.getLocalDataPath(shard).head}detached/")
    fileTransfer.close()
  }

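  /** Attaches the transferred parts on the target shard with `ALTER TABLE ... ATTACH PART`. */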
  private def attachClickhouseFile(shard: Shard, paths: List[String]): Unit = {
    val balanced: BalancedClickhouseDataSource =
      new BalancedClickhouseDataSource(
        s"jdbc:clickhouse://${shard.hostAddress}:${shard.port}/${shard.database}", properties)
    val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
    paths.map(path => fromPathGetPart(path)).foreach(part => {
      conn.createStatement().execute(s"ALTER TABLE ${this.table.getLocalTableName} ATTACH PART '$part'")
    })
  }

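  /** Extracts the part name, i.e. the last path segment, from a part directory path. */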
  private def fromPathGetPart(path: String): String = {
    path.substring(path.lastIndexOf("/") + 1)
  }

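  /** Deletes the local temporary directory of the current batch; a failure is only logged. */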
  private def clearLocalFile(path: String): Unit = {
    val r = Try(FileUtils.deleteDirectory(new File(path)))
    r match {
      case Failure(exception) =>
        LOGGER.warn(s"delete folder failed, path : $path", exception)
      case Success(_) =>
    }
  }

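  /**
   * Validates the sink configuration, loads the ClickHouse table metadata and resolves the copy
   * method, the node credentials and the sharding key.
   */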
  override def checkConfig(): CheckResult = {
    var checkResult = checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD,
      CLICKHOUSE_LOCAL_PATH)
    if (checkResult.isSuccess) {
      clickhouseLocalPath = config.getString(CLICKHOUSE_LOCAL_PATH)
      properties.put("user", config.getString(USERNAME))
      properties.put("password", config.getString(PASSWORD))
      val hosts = parseHost(config.getString(HOST))
      val database = config.getString(DATABASE)
      val table = config.getString(TABLE)
      val conn = getClickhouseConnection(hosts.map(_.hostAndPort).mkString(","), database, properties)
      if (config.hasPath(COPY_METHOD)) {
        this.copyFileMethod = getCopyMethod(config.getString(COPY_METHOD))
      }
      if (config.hasPath(TMP_BATCH_CACHE_LINE)) {
        this.tmpBatchCacheLine = config.getInt(TMP_BATCH_CACHE_LINE)
      }
      val (result, tableInfo) = getClickhouseTableInfo(conn, database, table)
      if (!Objects.isNull(result)) {
        checkResult = result
      } else {
        this.table = tableInfo
        tableInfo.initTableInfo(hosts, conn)
        tableInfo.initShardDataPath(config.getString(USERNAME), config.getString(PASSWORD))
        // check whether the node password config is complete
        if (config.hasPath(NODE_FREE_PASSWORD) && config.getBoolean(NODE_FREE_PASSWORD)) {
          this.freePass = true
        } else if (config.hasPath(NODE_PASS)) {
          val nodePass = config.getObjectList(NODE_PASS)
          val nodeUserMap = mutable.Map[String, String]()
          val nodePassMap = mutable.Map[String, String]()
          nodePass.foreach(np => {
            val address = np.toConfig.getString(NODE_ADDRESS)
            // default user "root"
            val username = if (np.toConfig.hasPath(USERNAME)) np.toConfig.getString(USERNAME) else "root"
            val password = np.toConfig.getString(PASSWORD)
            nodeUserMap(address) = username
            nodePassMap(address) = password
          })
          this.nodeUser = nodeUserMap.toMap
          this.nodePass = nodePassMap.toMap
          checkResult = checkNodePass(this.nodePass, tableInfo.shards.values().toList)
        } else {
          checkResult = CheckResult.error("if the clickhouse nodes allow password-free login from the spark " +
            "nodes, set 'node_free_password' to true. Otherwise provide the password of each clickhouse node " +
            "(user 'root' by default) in the node_pass config.")
        }
        if (checkResult.isSuccess) {
          // check sharding method
          if (config.hasPath(SHARDING_KEY) && StringUtils.isNotEmpty(config.getString(SHARDING_KEY))) {
            this.table.shardKey = config.getString(SHARDING_KEY)
          }
          checkResult = this.table.prepareShardInfo(conn)
          if (checkResult.isSuccess) {
            if (this.config.hasPath(FIELDS)) {
              this.fields = config.getStringList(FIELDS).toList
              checkResult = acceptedClickHouseSchema(this.fields, JavaConversions.mapAsScalaMap(this.table
                .tableSchema).toMap, this.table.name)
            }
          }
        }
      }
    }
    checkResult
  }

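  /** Checks that a password is configured for every shard, matched by host address or hostname. */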
  private def checkNodePass(nodePassMap: Map[String, String], shardInfo: List[Shard]): CheckResult = {
    val noPassShard = shardInfo.filter(shard => !nodePassMap.contains(shard.hostAddress) &&
      !nodePassMap.contains(shard.hostname))
    if (noPassShard.nonEmpty) {
      CheckResult.error(s"can't find the password of node(s) ${
        String.join(",", JavaConversions.asJavaIterable(noPassShard.map(s => s.hostAddress)))
      } in the node_pass config")
    } else {
      CheckResult.success()
    }
  }

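  /** No extra preparation work is needed for this sink. */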
  override def prepare(prepareEnv: SparkEnvironment): Unit = {
  }

  override def getPluginName: String = "ClickhouseFile"
}

object ClickhouseFile {

  private final val CLICKHOUSE_FILE_PREFIX = "/tmp/clickhouse-local/spark-file"
  private val LOGGER = LoggerFactory.getLogger(classOf[ClickhouseFile])
  private val UUID_LENGTH = 10
  private val OBJECT_MAPPER = new ObjectMapper()
  OBJECT_MAPPER.registerModule(DefaultScalaModule)

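  /**
   * Reads engine, create DDL, full engine definition and data paths of the target table from
   * `system.tables`. Returns (null, table) when the table exists, otherwise (error, null).
   */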
  def getClickhouseTableInfo(conn: ClickHouseConnectionImpl, database: String, table: String):
    (CheckResult, Table) = {
    val sql = s"select engine,create_table_query,engine_full,data_paths from system.tables where database " +
      s"= '$database' and name = '$table'"
    val rs = conn.createStatement().executeQuery(sql)
    if (rs.next()) {
      (null, new Table(table, database, rs.getString(1), rs.getString(2),
        rs.getString(3),
        OBJECT_MAPPER.readValue(rs.getString(4).replaceAll("'", "\""),
          classOf[util.List[String]]).toList))
    } else {
      (CheckResult.error(s"can't find table '$table' in database '$database', please check config file"),
        null)
    }
  }
}