| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.seatunnel.spark.kafka.sink |
| |
| import java.util.Properties |
| |
import scala.collection.JavaConverters._
| |
| import org.apache.kafka.clients.producer.ProducerConfig |
| import org.apache.kafka.common.serialization.StringSerializer |
| import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils} |
| import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory |
| import org.apache.seatunnel.spark.SparkEnvironment |
| import org.apache.seatunnel.spark.batch.SparkBatchSink |
| import org.apache.spark.broadcast.Broadcast |
| import org.apache.spark.internal.Logging |
| import org.apache.spark.sql.{Dataset, Row} |
| |
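/**
 * A Spark batch sink that writes each row of a Dataset to a Kafka topic.
 *
 * A minimal example configuration (illustrative values; the option names follow
 * checkConfig() and the defaults applied in prepare()):
 *
 * {{{
 * kafka {
 *   topic = "my_topic"
 *   producer.bootstrap.servers = "localhost:9092"
 *   # optional, defaults to "json"; "text" requires a single-column dataset
 *   format = "text"
 * }
 * }}}
 */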
| class Kafka extends SparkBatchSink with Logging { |
| |
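  // Options under this prefix are passed to the Kafka producer with the prefix stripped.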
| val producerPrefix = "producer." |
| |
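  // Broadcast from the driver so every executor reuses one producer helper. KafkaProducerUtil
  // is assumed to create its KafkaProducer lazily, since KafkaProducer itself is not serializable.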
| var kafkaSink: Option[Broadcast[KafkaProducerUtil]] = None |
| |
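  /** Validates that both [topic] and [producer.bootstrap.servers] are configured. */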
  override def checkConfig(): CheckResult = {
    val producerConfig = TypesafeConfigUtils.extractSubConfig(config, producerPrefix, false)
    if (config.hasPath("topic") &&
      producerConfig.hasPath(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
      CheckResult.success()
    } else {
      CheckResult.error("Please specify both [topic] and [producer.bootstrap.servers]")
    }
| } |
| |
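  /**
   * Applies defaults (json format, String key/value serializers), collects all
   * producer.* options into Properties, and broadcasts the producer helper.
   */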
| override def prepare(env: SparkEnvironment): Unit = { |
    val defaultConfig = ConfigFactory.parseMap(
      Map(
        "format" -> "json",
        producerPrefix + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
        producerPrefix + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName).asJava)
| |
| config = config.withFallback(defaultConfig) |
| |
    // Copy every producer.* option (prefix stripped) into java Properties for the Kafka producer.
    val props = new Properties()
    TypesafeConfigUtils
      .extractSubConfig(config, producerPrefix, false)
      .entrySet()
      .asScala
      .foreach { entry =>
        val key = entry.getKey
        val value = String.valueOf(entry.getValue.unwrapped())
        props.put(key, value)
      }
| |
| log.info("Kafka Output properties: ") |
| props.foreach(entry => { |
| val (key, value) = entry |
| log.info(key + " = " + value) |
| }) |
| |
| kafkaSink = Some(env.getSparkSession.sparkContext.broadcast(KafkaProducerUtil(props))) |
| } |
| |
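  /**
   * Writes the dataset to the configured topic: "text" sends the single column
   * as-is; any other format serializes each row to JSON.
   */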
  override def output(df: Dataset[Row], environment: SparkEnvironment): Unit = {
    val topic = config.getString("topic")
    // An explicit [serializer] option takes precedence over [format].
    var format = config.getString("format")
    if (config.hasPath("serializer")) {
      format = config.getString("serializer")
    }
| format match { |
| case "text" => |
        if (df.schema.size != 1) {
          throw new Exception(
            s"Text data source supports only a single column, and you have ${df.schema.size} columns.")
        } else {
| df.foreach { row => |
| kafkaSink.foreach { ks => |
| ks.value.send(topic, row.getAs[String](0)) |
| } |
| } |
| } |
      case _ =>
        // Any other format falls back to serializing each row as JSON.
        df.toJSON.foreach { json =>
          kafkaSink.foreach { ks =>
            ks.value.send(topic, json)
          }
        }
| } |
| } |
| |
| override def getPluginName: String = "Kafka" |
| } |