/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.scala.examples.kafka
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
/**
 * An example that shows how to read from and write to Kafka. This will read String messages
 * from the input topic, prefix them by a configured prefix and output to the output topic.
 *
 * Please pass the following arguments to run the example:
 * {{{
 * --input-topic test-input
 * --output-topic test-output
 * --bootstrap.servers localhost:9092
 * --zookeeper.connect localhost:2181
 * --group.id myconsumer
 * }}}
 *
 * An optional `--prefix <prefix>` overrides the default prefix `PREFIX:`.
 *
 * Only `input-topic` and `output-topic` are read directly by this example. Every parsed
 * argument is additionally handed, as a property, to both the Kafka consumer and the Kafka
 * producer.
 *
 * NOTE(review): `zookeeper.connect` is kept in the argument list for parity with the usage
 * text; this code never reads it itself -- confirm whether the 0.10 connectors need it.
 */
object Kafka010Example {

  /** Minimum number of command line parameters accepted before the job is started. */
  private val MinParameterCount = 4

  /** Keys that this example fetches with `getRequired`; they must be present up front. */
  private val RequiredKeys = Seq("input-topic", "output-topic")

  /** Number of restart attempts for the fixed-delay restart strategy. */
  private val RestartAttempts = 4

  /** Delay between restart attempts, in milliseconds. */
  private val RestartDelayMs = 10000L

  /** Checkpoint interval in milliseconds (a checkpoint every 5 seconds). */
  private val CheckpointIntervalMs = 5000L

  /** Text printed when the command line arguments are insufficient. */
  private val Usage =
    "Missing parameters!\n" +
      "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
      "--bootstrap.servers <kafka brokers> " +
      "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]"

  /**
   * Entry point: parses the arguments and either runs the streaming job or prints the usage
   * text when the arguments are insufficient.
   *
   * @param args command line arguments in `--key value` form
   */
  def main(args: Array[String]): Unit = {
    // parse input arguments
    val params = ParameterTool.fromArgs(args)

    if (hasUsableArguments(params)) runJob(params) else println(Usage)
  }

  /**
   * Checks that enough parameters were supplied and that every key this example reads with
   * `getRequired` is present, so that a missing topic results in the usage text rather than
   * an exception raised after the execution environment has been set up.
   *
   * @param params the parsed command line parameters
   * @return `true` if the job can be started with these parameters
   */
  private def hasUsableArguments(params: ParameterTool): Boolean =
    params.getNumberOfParameters >= MinParameterCount &&
      RequiredKeys.forall(key => params.has(key))

  /**
   * Builds and executes the pipeline: Kafka source -> prefixing map -> Kafka sink.
   *
   * @param params the parsed command line parameters; also forwarded as connector properties
   */
  private def runJob(params: ParameterTool): Unit = {
    val prefix = params.get("prefix", "PREFIX:")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableSysoutLogging()
    env.getConfig.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(RestartAttempts, RestartDelayMs))

    // create a checkpoint every 5 seconds
    env.enableCheckpointing(CheckpointIntervalMs)

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    // create a Kafka streaming source consumer for Kafka 0.10.x
    val kafkaConsumer = new FlinkKafkaConsumer010(
      params.getRequired("input-topic"),
      new SimpleStringSchema,
      params.getProperties)

    // the lambda only closes over the local `prefix` string
    val messageStream = env
      .addSource(kafkaConsumer)
      .map(in => prefix + in)

    // create a Kafka producer for Kafka 0.10.x
    val kafkaProducer = new FlinkKafkaProducer010(
      params.getRequired("output-topic"),
      new SimpleStringSchema,
      params.getProperties)

    // write data into Kafka
    messageStream.addSink(kafkaProducer)

    env.execute("Kafka 0.10 Example")
  }
}