* Copyright (c) 2019 The StreamX Project
* <p>
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package com.streamxhub.streamx.flink.core.scala.sink
import com.streamxhub.streamx.common.conf.ConfigConst
import com.streamxhub.streamx.common.util.{ConfigUtils, Logger, Utils}
import com.streamxhub.streamx.flink.core.scala.StreamingContext
import org.apache.flink.api.common.serialization.{SerializationSchema, SimpleStringSchema}
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.{DEFAULT_KAFKA_PRODUCERS_POOL_SIZE, Semantic}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Optional, Properties}
import javax.annotation.Nullable
import scala.annotation.meta.param
import scala.util.Try
/**
 * Companion factory for [[KafkaSink]].
 */
object KafkaSink {

  /**
   * Creates a [[KafkaSink]] bound to the implicitly available [[StreamingContext]].
   * Every argument is forwarded unchanged to the [[KafkaSink]] constructor.
   *
   * @param property    extra producer properties handed to the sink (empty by default)
   * @param parallelism parallelism value handed to the sink (0 by default)
   * @param name        name handed to the sink (null by default)
   * @param uid         uid handed to the sink (null by default)
   * @param ctx         streaming context the sink is created for
   * @return a new [[KafkaSink]] instance
   */
  def apply(@(transient@param)
            property: Properties = new Properties(),
            parallelism: Int = 0,
            name: String = null,
            uid: String = null)(implicit ctx: StreamingContext): KafkaSink = {
    new KafkaSink(
      ctx = ctx,
      property = property,
      parallelism = parallelism,
      name = name,
      uid = uid
    )
  }

}
/**
 * Sink that writes a [[DataStream]] to Kafka through a [[FlinkKafkaProducer]].
 *
 * @param ctx         streaming context; its parameters supply the Kafka sink configuration
 * @param property    extra properties copied via `Utils.copyProperties(property, prop)` before the producer is built
 * @param parallelism value passed on to `afterSink`
 * @param name        value passed on to `afterSink`
 * @param uid         value passed on to `afterSink`
 */
class KafkaSink(@(transient@param) val ctx: StreamingContext,
                property: Properties = new Properties(),
                parallelism: Int = 0,
                name: String = null,
                uid: String = null) extends Sink {

  /**
   * for scala
   *
   * @param stream      the stream whose records are written to Kafka
   * @param alias       alias passed to `ConfigUtils.getKafkaSinkConf` when resolving the sink configuration
   * @param topic       topic passed to `ConfigUtils.getKafkaSinkConf` when resolving the sink configuration
   * @param serializer  serialization schema; SimpleStringSchema is used when none is given
   * @param partitioner Kafka partitioner. Defaults to <b>KafkaEqualityPartitioner</b>, which is meant to
   *                    spread records evenly over all partitions. Note: Flink's own default is
   *                    <span style="color:RED">FlinkFixedPartitioner</span>, which needs the sink
   *                    parallelism and the number of Kafka partitions to be chosen carefully, otherwise
   *                    records may end up in a single partition. Passing `null` means no custom
   *                    partitioner is handed to the producer.
   * @tparam T type of the records in the stream
   * @return the result of `afterSink` applied to the created sink
   * @throws IllegalArgumentException if no topic is configured or the configured semantic is unknown
   */
  def sink[T](stream: DataStream[T],
              alias: String = "",
              topic: String = "",
              serializer: SerializationSchema[T] = new SimpleStringSchema().asInstanceOf[SerializationSchema[T]],
              partitioner: FlinkKafkaPartitioner[T] = new KafkaEqualityPartitioner[T](ctx.getParallelism)): DataStreamSink[T] = {

    val producer = {
      val prop = ConfigUtils.getKafkaSinkConf(ctx.parameter.toMap, topic, alias)
      Utils.copyProperties(property, prop)

      // The topic and semantic entries are removed from the properties before they reach the producer.
      val topicId = Option(prop.remove(ConfigConst.KEY_KAFKA_TOPIC)) match {
        case Some(value) => value.toString
        case None => throw new IllegalArgumentException("[StreamX] kafka.sink topic must be specified")
      }

      /**
       * kafkaProducersPoolSize is only used under the EXACTLY_ONCE semantic.
       */
      val semantic = Option(prop.remove(ConfigConst.KEY_KAFKA_SEMANTIC)).map(_.toString.toUpperCase) match {
        case None => Semantic.AT_LEAST_ONCE // AT_LEAST_ONCE is the default
        case Some("AT_LEAST_ONCE") => Semantic.AT_LEAST_ONCE
        case Some("EXACTLY_ONCE") => Semantic.EXACTLY_ONCE
        case Some("NONE") => Semantic.NONE
        case _ => throw new IllegalArgumentException("[StreamX] kafka.sink semantic error,must be (AT_LEAST_ONCE|EXACTLY_ONCE|NONE) ")
      }

      val schema = new KeyedSerializationSchemaWrapper[T](serializer)

      // A null partitioner becomes an empty Optional.
      val customPartitioner: Optional[FlinkKafkaPartitioner[T]] = Optional.ofNullable(partitioner)

      new FlinkKafkaProducer[T](topicId, schema, prop, customPartitioner, semantic, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE)
    }

    /**
     * versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
     * this method is not available for earlier Kafka versions.
     * NOTE(review): no timestamp option is set on the producer here - confirm whether that is intended.
     */
    val sink = stream.addSink(producer)
    afterSink(sink, parallelism, name, uid)
  }

}
/**
 * <b>KafkaEqualityPartitioner</b>: a partitioner that, as the name suggests, is meant to spread
 * records evenly over all partitions of the target topic.
 *
 * @param parallelism parallelism compared against the partition count to choose between the
 *                    fixed subtask-to-partition mapping and round-robin
 * @tparam T type of the records being partitioned
 */
class KafkaEqualityPartitioner[T](parallelism: Int) extends FlinkKafkaPartitioner[T] with Logger {

  // Index of this subtask, assigned in open().
  private[this] var parallelInstanceId = 0

  // Ever-increasing counter for the round-robin branch; reduced modulo the partition count on use.
  private[this] val partitionIndex: AtomicInteger = new AtomicInteger(0)

  /**
   * Records the index of this subtask.
   *
   * @param parallelInstanceId index of this subtask, must not be negative
   * @param parallelInstances  number of subtasks, must be larger than 0
   */
  override def open(parallelInstanceId: Int, parallelInstances: Int): Unit = {
    logInfo(s"KafkaEqualityPartitioner: parallelism $parallelism")
    require(parallelInstanceId >= 0 && parallelInstances > 0, "[StreamX] KafkaEqualityPartitioner:Id of this subtask cannot be negative,Number of subtasks must be larger than 0.")
    this.parallelInstanceId = parallelInstanceId
  }

  /**
   * Chooses the target partition for a record.
   *
   *  - one partition: that partition is always used;
   *  - `parallelism` is a multiple of the partition count: the subtask index is mapped onto a fixed partition;
   *  - otherwise: the partitions are visited in round-robin order, starting with the first one.
   *
   * @param partitions partitions of the target topic, must be non-empty
   * @return an element of `partitions`
   */
  override def partition(record: T, key: Array[Byte], value: Array[Byte], targetTopic: String, partitions: Array[Int]): Int = {
    require(partitions != null && partitions.length > 0, "[StreamX] KafkaEqualityPartitioner:Partitions of the target topic is empty.")
    (parallelism, partitions.length) match {
      // kafka have 1 partition
      case (_, 1) => partitions(0)
      case (x, y) if x % y == 0 => partitions(parallelInstanceId % y)
      case (_, y) =>
        // floorMod keeps the index in [0, y) even after the counter wraps past Int.MaxValue,
        // so every partition, including the first, is visited exactly once per cycle.
        partitions(Math.floorMod(partitionIndex.getAndIncrement(), y))
    }
  }

  /**
   * All instances of this partitioner are considered equal. A plain type test is enough:
   * `this` is always an instance, and `this == o` must not be used here because in Scala
   * `==` delegates to `equals` and would recurse without end.
   */
  override def equals(o: Any): Boolean = o.isInstanceOf[KafkaEqualityPartitioner[_]]

  override def hashCode: Int = classOf[KafkaEqualityPartitioner[T]].hashCode

  /**
   * Throws an IllegalArgumentException carrying `errorMessage` when `condition` is false.
   */
  def checkArgument(condition: Boolean, @Nullable errorMessage: String): Unit = if (!condition) throw new IllegalArgumentException(errorMessage)

}