blob: 1e127fef2fcc353a7fb04702ce86abd4944c4257 [file] [log] [blame]
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package services
import java.util.Calendar
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import org.apache.kafka.clients.consumer.CommitFailedException
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
import akka.event.Logging.DebugLevel
import whisk.common.TransactionId
import whisk.connector.kafka.KafkaConsumerConnector
import whisk.connector.kafka.KafkaProducerConnector
import whisk.core.WhiskConfig
import whisk.core.connector.Message
import whisk.utils.ExecutionContextFactory
import common.WskActorSystem
@RunWith(classOf[JUnitRunner])
class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll {
implicit val transid = TransactionId.testing
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
val config = new WhiskConfig(WhiskConfig.kafkaHost)
assert(config.isValid)
val groupid = "kafkatest"
val topic = "Dinosaurs"
val sessionTimeout = 10 seconds
val producer = new KafkaProducerConnector(config.kafkaHost, ec)
val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, sessionTimeout = sessionTimeout)
producer.setVerbosity(DebugLevel)
consumer.setVerbosity(DebugLevel)
override def afterAll() {
producer.close()
consumer.close()
super.afterAll()
}
behavior of "Kafka connector"
it should "send and receive a kafka message which sets up the topic" in {
for (i <- 0 until 5) {
val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
val start = java.lang.System.currentTimeMillis
val sent = Await.result(producer.send(topic, message), 10 seconds)
val received = consumer.peek(10 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
val end = java.lang.System.currentTimeMillis
val elapsed = end - start
println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
received.size should be >= 1
received.last should be(message.serialize)
consumer.commit()
}
}
it should "send and receive a kafka message even after session timeout" in {
for (i <- 0 until 4) {
val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
val start = java.lang.System.currentTimeMillis
val sent = Await.result(producer.send(topic, message), 1 seconds)
val received = consumer.peek(1 seconds).map { case (_, _, _, msg) => new String(msg, "utf-8") }
val end = java.lang.System.currentTimeMillis
val elapsed = end - start
println(s"($i) Received ${received.size}. Took $elapsed msec: $received\n")
// only the last iteration will have an updated cursor
// iteration 0: get whatever is on the topic (at least 1 but may be more if a previous test failed)
// iteration 1: get iteration 0 records + 1 more (since we intentionally failed the commit on previous iteration)
// iteration 2: get iteration 1 records + 1 more (since we intentionally failed the commit on previous iteration)
// iteration 3: get exactly 1 records since iteration 2 should have forwarded the cursor
if (i < 3) {
received.size should be >= i + 1
} else {
received.size should be(1)
}
received.last should be(message.serialize)
if (i < 2) {
Thread.sleep((sessionTimeout + 1.second).toMillis)
a[CommitFailedException] should be thrownBy {
consumer.commit() // sleep should cause commit to fail
}
} else consumer.commit()
}
}
}