/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.openwhisk.core.database.cosmosdb.cache

import java.net.UnknownHostException

import akka.Done
import akka.actor.CoordinatedShutdown
import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
import com.typesafe.config.ConfigFactory
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.openwhisk.common.{AkkaLogging, TransactionId}
import org.apache.openwhisk.core.database.{CacheInvalidationMessage, RemoteCacheInvalidation}
import org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider, CosmosDBTestSupport}
import org.apache.openwhisk.core.entity.{
  DocumentReader,
  EntityName,
  EntityPath,
  WhiskDocumentReader,
  WhiskEntity,
  WhiskEntityJsonFormat,
  WhiskPackage
}
import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{Matchers, TryValues}

import scala.concurrent.duration._
import scala.util.Random
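
/**
 * Tests for the CosmosDB cache invalidator service: a document change in the whisks db
 * should flow through the CosmosDB change feed and be published as a
 * [[CacheInvalidationMessage]] on the Kafka cache-invalidation topic, while broken
 * configuration (missing or unreachable Kafka brokers, an unresolvable CosmosDB
 * endpoint) should fail the service's start/finish futures.
 */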
@RunWith(classOf[JUnitRunner])
class CacheInvalidatorTests
    extends ScalatestKafkaSpec(6061)
    with EmbeddedKafkaLike
    with EmbeddedKafka
    with CosmosDBTestSupport
    with Matchers
    with ScalaFutures
    with TryValues {
  private implicit val logging = new AkkaLogging(system.log)
  implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = 300.seconds)
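
  // The embedded Kafka broker (and its ZooKeeper) listen on ports derived from the base
  // port passed to ScalatestKafkaSpec above; `server` below points producers at that broker.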
  override def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)

  behavior of "CosmosDB CacheInvalidation"

  private val server = s"localhost:$kafkaPort"
  private var dbName: String = _
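
  // Down the actor system via CoordinatedShutdown before the spec's own Kafka teardown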
  override def afterAll(): Unit = {
    super.afterAll()
    CoordinatedShutdown(system).run(CoordinatedShutdown.ClusterDowningReason)
    shutdown()
  }
it should "send event upon entity change" in {
implicit val tid = TransactionId.testing
implicit val docReader: DocumentReader = WhiskDocumentReader
implicit val format = WhiskEntityJsonFormat
dbName = createTestDB().getId
val dbConfig = storeConfig.copy(db = dbName)
val store = CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), EntityName(randomString()))
//Start cache invalidator after the db for whisks is created
val cacheInvalidator = startCacheInvalidator()
val (start, finish) = cacheInvalidator.start()
start.futureValue shouldBe Done
log.info("Cache Invalidator service started")
//Store stuff in db
val info = store.put(pkg).futureValue
log.info(s"Added document ${info.id}")
//This should result in change feed trigger and event to kafka topic
val topic = RemoteCacheInvalidation.cacheInvalidationTopic
val msgs =
consumeNumberMessagesFromTopics(Set(topic), 1, timeout = 60.seconds)(createKafkaConfig, new StringDeserializer())(
topic)
CacheInvalidationMessage.parse(msgs.head).get.key.mainId shouldBe pkg.docid.asString
store.del(info).futureValue
cacheInvalidator.stop(None)
finish.futureValue shouldBe Done
}
it should "exit if there is a missing kafka broker config" in {
implicit val tid = TransactionId.testing
implicit val docReader: DocumentReader = WhiskDocumentReader
implicit val format = WhiskEntityJsonFormat
dbName = createTestDB().getId
val dbConfig = storeConfig.copy(db = dbName)
val store = CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), EntityName(randomString()))
//Start cache invalidator after the db for whisks is created
val cacheInvalidator = startCacheInvalidatorWithoutKafka()
val (start, finish) = cacheInvalidator.start()
start.futureValue shouldBe Done
log.info("Cache Invalidator service started")
//when kafka config is missing, we expect KafkaException from producer immediately (although stopping feed processor takes some time)
finish.failed.futureValue shouldBe an[KafkaException]
}
it should "exit if kafka is not consuming" in {
implicit val tid = TransactionId.testing
implicit val docReader: DocumentReader = WhiskDocumentReader
implicit val format = WhiskEntityJsonFormat
dbName = createTestDB().getId
val dbConfig = storeConfig.copy(db = dbName)
val store = CosmosDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](dbConfig, None)
val pkg = WhiskPackage(EntityPath("cacheInvalidationTest"), EntityName(randomString()))
//Start cache invalidator with a bogus kafka broker after the db for whisks is created
val cacheInvalidator = startCacheInvalidatorWithInvalidKafka()
val (start, finish) = cacheInvalidator.start()
start.futureValue shouldBe Done
log.info("Cache Invalidator service started")
//Store stuff in db
val info = store.put(pkg).futureValue
log.info(s"Added document ${info.id}")
//when we cannot connect to kafka, we expect KafkaException from producer after timeout
finish.failed.futureValue shouldBe an[KafkaException]
}
it should "exit if there is a bad db config" in {
//Start cache invalidator after the db for whisks is created
val cacheInvalidator = startCacheInvalidatorWithoutCosmos()
val (start, finish) = cacheInvalidator.start()
//when db config is broken, we expect reactor.core.Exceptions$ReactiveException (a non-public RuntimeException)
start.failed.futureValue.getCause shouldBe an[UnknownHostException]
}

  private def randomString() = Random.alphanumeric.take(5).mkString
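
  // The factory methods below overlay a small HOCON snippet on the default configuration
  // (via withFallback(ConfigFactory.load())), so only the settings under test differ from
  // the defaults: the Kafka bootstrap servers and the CosmosDB db/endpoint.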
  private def startCacheInvalidator(): CacheInvalidator = {
    val tsconfig = ConfigFactory.parseString(s"""
       |akka.kafka.producer {
       |  kafka-clients {
       |    bootstrap.servers = "$server"
       |  }
       |}
       |whisk {
       |  cache-invalidator {
       |    cosmosdb {
       |      db = "$dbName"
       |      start-from-beginning = true
       |    }
       |  }
       |}
    """.stripMargin).withFallback(ConfigFactory.load())
    new CacheInvalidator(tsconfig)
  }

  private def startCacheInvalidatorWithoutKafka(): CacheInvalidator = {
    val tsconfig = ConfigFactory.parseString(s"""
       |akka.kafka.producer {
       |  kafka-clients {
       |    # bootstrap.servers is intentionally left out
       |  }
       |}
       |whisk {
       |  cache-invalidator {
       |    cosmosdb {
       |      db = "$dbName"
       |      start-from-beginning = true
       |    }
       |  }
       |}
    """.stripMargin).withFallback(ConfigFactory.load())
    new CacheInvalidator(tsconfig)
  }

  private def startCacheInvalidatorWithInvalidKafka(): CacheInvalidator = {
    val tsconfig = ConfigFactory.parseString(s"""
       |akka.kafka.producer {
       |  kafka-clients {
       |    # not the embedded broker's port, so connection attempts will fail
       |    bootstrap.servers = "localhost:9092"
       |  }
       |}
       |whisk {
       |  cache-invalidator {
       |    cosmosdb {
       |      db = "$dbName"
       |      start-from-beginning = true
       |    }
       |  }
       |}
    """.stripMargin).withFallback(ConfigFactory.load())
    new CacheInvalidator(tsconfig)
  }
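
  // The endpoint below points at a host that does not resolve, so starting the change
  // feed processor fails with an UnknownHostException (wrapped in a ReactiveException)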
  private def startCacheInvalidatorWithoutCosmos(): CacheInvalidator = {
    val tsconfig = ConfigFactory.parseString(s"""
       |akka.kafka.producer {
       |  kafka-clients {
       |    bootstrap.servers = "$server"
       |  }
       |}
       |whisk {
       |  cache-invalidator {
       |    cosmosdb {
       |      db = "$dbName"
       |      endpoint = "https://BADENDPOINT-nobody-home.documents.azure.com:443/"
       |      start-from-beginning = true
       |    }
       |  }
       |}
    """.stripMargin).withFallback(ConfigFactory.load())
    new CacheInvalidator(tsconfig)
  }
}