blob: c5b5e019b3a3329e86684c4c36d3210d7a7c1340 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.database
import java.nio.charset.StandardCharsets
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.actor.ActorSystem
import akka.actor.Props
import spray.json._
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.Message
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.entity.CacheKey
import org.apache.openwhisk.core.entity.ControllerInstanceId
import org.apache.openwhisk.core.entity.WhiskAction
import org.apache.openwhisk.core.entity.WhiskActionMetaData
import org.apache.openwhisk.core.entity.WhiskPackage
import org.apache.openwhisk.core.entity.WhiskRule
import org.apache.openwhisk.core.entity.WhiskTrigger
import org.apache.openwhisk.spi.SpiLoader
import pureconfig._
/**
 * Message announcing that a cache entry has been invalidated.
 *
 * @param key the cache key that was invalidated
 * @param instanceId identifier of the instance that produced the message
 */
case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message {

  /** Renders this message as a compact (single-line) JSON string using the companion's format. */
  override def serialize: String = {
    val json = CacheInvalidationMessage.serdes.write(this)
    json.compactPrint
  }
}
/**
 * JSON (de)serialization support for [[CacheInvalidationMessage]].
 */
object CacheInvalidationMessage extends DefaultJsonProtocol {

  /**
   * Parses a JSON string into a [[CacheInvalidationMessage]].
   *
   * @param msg the JSON text to parse
   * @return a `Success` holding the message, or a `Failure` holding whatever exception was thrown
   *         while parsing the text or reading it with `serdes`
   */
  def parse(msg: String): Try[CacheInvalidationMessage] = Try(serdes.read(msg.parseJson))

  /**
   * JSON format mapping the case class fields to the object fields "key" and "instanceId".
   *
   * The type is stated explicitly: an implicit whose type is inferred is not eligible for
   * implicit search at use sites that precede it in the same file, and is rejected by Scala 3.
   */
  implicit val serdes: RootJsonFormat[CacheInvalidationMessage] =
    jsonFormat(CacheInvalidationMessage.apply _, "key", "instanceId")
}
/**
 * Propagates cache invalidations between instances through the messaging provider.
 *
 * An instance publishes a [[CacheInvalidationMessage]] tagged with its own id on the cache
 * invalidation topic, and a message feed hands every message received from the consumer to
 * `removeFromLocalCache`, which evicts keys announced by other instances.
 *
 * @param config configuration passed to the messaging provider when creating consumer and producer
 * @param component component name; concatenated with the instance id to form this sender's id
 * @param instance the controller instance this invalidator belongs to
 * @param logging logger used to report messages that cannot be parsed
 * @param as actor system providing the dispatcher and hosting the message feed actor
 */
class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: ControllerInstanceId)(
  implicit logging: Logging,
  as: ActorSystem) {
  import RemoteCacheInvalidation._

  // All futures created in this class run on the dispatcher configured under this name.
  implicit private val ec = as.dispatchers.lookup("dispatchers.kafka-dispatcher")

  // Identifies this instance as sender; used to recognise (and skip) our own messages.
  private val instanceId = s"$component${instance.asString}"

  private val msgProvider = SpiLoader.get[MessagingProvider]

  // The second argument is unique per instance (topic name + instance id).
  // NOTE(review): presumably this is the consumer group id, so that every instance
  // receives every invalidation message - confirm against MessagingProvider.
  private val cacheInvalidationConsumer =
    msgProvider.getConsumer(config, s"$cacheInvalidationTopic$instanceId", cacheInvalidationTopic, maxPeek = 128)

  private val cacheInvalidationProducer = msgProvider.getProducer(config)

  /**
   * Publishes an invalidation message for `key`, tagged with this instance's id.
   *
   * @param key the cache key that was invalidated locally
   * @return a future that completes once the producer's send has completed; the send result is discarded
   */
  def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = {
    cacheInvalidationProducer.send(cacheInvalidationTopic, CacheInvalidationMessage(key, instanceId)).map(_ => ())
  }

  // Feed actor that passes each received message to removeFromLocalCache.
  // NOTE(review): the 1.second argument is presumably the poll duration - confirm against MessageFeed.
  private val invalidationFeed = as.actorOf(Props {
    new MessageFeed(
      "cacheInvalidation",
      logging,
      cacheInvalidationConsumer,
      cacheInvalidationConsumer.maxPeek,
      1.second,
      removeFromLocalCache)
  })

  /**
   * Removes `key` from the [[WhiskActionMetaData]] cache only.
   *
   * @param key the cache key to remove
   */
  def invalidateWhiskActionMetaData(key: CacheKey) =
    WhiskActionMetaData.removeId(key)

  /**
   * Handles one raw message from the feed.
   *
   * The bytes are decoded as UTF-8 and parsed as a [[CacheInvalidationMessage]]. If the message was
   * produced by a different instance, its key is removed from the action metadata, action, package,
   * rule and trigger caches. Messages that cannot be parsed are logged and otherwise ignored.
   *
   * @param bytes the raw message payload
   * @return a future that completes when the message has been handled
   */
  private def removeFromLocalCache(bytes: Array[Byte]): Future[Unit] = Future {
    try {
      val raw = new String(bytes, StandardCharsets.UTF_8)

      CacheInvalidationMessage.parse(raw) match {
        case Success(msg) =>
          // Our own invalidations were already applied locally when they were issued.
          if (msg.instanceId != instanceId) {
            WhiskActionMetaData.removeId(msg.key)
            WhiskAction.removeId(msg.key)
            WhiskPackage.removeId(msg.key)
            WhiskRule.removeId(msg.key)
            WhiskTrigger.removeId(msg.key)
          }
        case Failure(t) => logging.error(this, s"failed processing message: $raw with $t")
      }
    } finally {
      // Acknowledge in `finally` so that an exception thrown above cannot skip it.
      // NOTE(review): the feed presumably relies on one Processed per delivered message to
      // release handler capacity; a missing acknowledgement would then stall it - confirm.
      invalidationFeed ! MessageFeed.Processed
    }
  }
}
/**
 * Topic naming shared by all [[RemoteCacheInvalidation]] instances.
 */
object RemoteCacheInvalidation {

  /** Prefix for topic names, loaded from the configuration key `ConfigKeys.kafkaTopicsPrefix`. */
  val topicPrefix: String = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)

  /** Name of the cache invalidation topic: the configured prefix followed by "cacheInvalidation". */
  val cacheInvalidationTopic: String = s"${topicPrefix}cacheInvalidation"
}