/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gearpump.integrationtest.checklist

import org.scalatest.TestData

import org.apache.gearpump.integrationtest.kafka._
import org.apache.gearpump.integrationtest.{TestSpecBase, Util}

/**
 * Checks the Kafka data source connector (KafkaSource and KafkaSink).
 */
class ConnectorKafkaSpec extends TestSpecBase {

  private lazy val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway)
  private lazy val kafkaJar = cluster.queryBuiltInExampleJars("kafka").head
  private var producer: NumericalDataProducer = null

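  // Start a dedicated Kafka cluster before any test in this spec runs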
  override def beforeAll(): Unit = {
    super.beforeAll()
    kafkaCluster.start()
  }

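  // Tear the Kafka cluster down once the whole spec has finished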
  override def afterAll(): Unit = {
    kafkaCluster.shutDown()
    super.afterAll()
  }

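  // Stop the data producer a test may have started, so it cannot leak into the next test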
  override def afterEach(test: TestData): Unit = {
    super.afterEach(test)
    if (producer != null) {
      producer.stop()
      producer = null
    }
  }

"KafkaSource and KafkaSink" should {
"read from and write to kafka" in {
// setup
val sourceTopic = "topic1"
val sinkTopic = "topic2"
val messageNum = 10000
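      // pre-populate the source topic with messageNum records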
      kafkaCluster.produceDataToKafka(sourceTopic, messageNum)

      // exercise
      val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
        "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
        "-brokerList", kafkaCluster.getBrokerListConnectString,
        "-sourceTopic", sourceTopic,
        "-sinkTopic", sinkTopic).mkString(" ")
      val appId = restClient.getNextAvailableAppId()
      val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
      success shouldBe true

      // verify
      expectAppIsRunning(appId, "KafkaReadWrite")
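      // the sink topic's latest offset reaches messageNum once every record has been forwarded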
      Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == messageNum,
        "kafka all message written")
    }
  }

"Gearpump with Kafka" should {
"support at-least-once message delivery" in {
// setup
val sourcePartitionNum = 2
val sourceTopic = "topic3"
val sinkTopic = "topic4"
// Generate number sequence (1, 2, 3, ...) to the topic
kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
producer = new NumericalDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString)
producer.start()
// exercise
val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite",
"-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
"-brokerList", kafkaCluster.getBrokerListConnectString,
"-sourceTopic", sourceTopic,
"-sinkTopic", sinkTopic,
"-source", sourcePartitionNum).mkString(" ")
val appId = restClient.getNextAvailableAppId()
val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
success shouldBe true
// verify #1
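      // the application clock only advances after the source has started processing messages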
      expectAppIsRunning(appId, "KafkaReadWrite")
      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running")

      // verify #2
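      // kill the executor with the highest id and wait for a replacement with a larger id,
      // which shows the application recovered from the failure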
      val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max
      restClient.killExecutor(appId, executorToKill) shouldBe true
      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
        .map(_.executorId).max > executorToKill,
        s"executor $executorToKill killed")

      // verify #3
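      // read the sink topic back and check that every number written so far shows up at least once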
      val detector = new MessageLossDetector(producer.lastWriteNum)
      val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
        host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
      Util.retryUntil(() => {
        kafkaReader.read()
        detector.allReceived
      }, "kafka all message read")
    }
  }
}