| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.openwhisk.core.database |
| |
| import java.nio.charset.StandardCharsets |
| |
| import scala.concurrent.Future |
| import scala.concurrent.duration.DurationInt |
| import scala.util.Failure |
| import scala.util.Success |
| import scala.util.Try |
| import akka.actor.ActorSystem |
| import akka.actor.Props |
| import spray.json._ |
| import org.apache.openwhisk.common.Logging |
| import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} |
| import org.apache.openwhisk.core.connector.Message |
| import org.apache.openwhisk.core.connector.MessageFeed |
| import org.apache.openwhisk.core.connector.MessagingProvider |
| import org.apache.openwhisk.core.entity.CacheKey |
| import org.apache.openwhisk.core.entity.ControllerInstanceId |
| import org.apache.openwhisk.core.entity.WhiskAction |
| import org.apache.openwhisk.core.entity.WhiskActionMetaData |
| import org.apache.openwhisk.core.entity.WhiskPackage |
| import org.apache.openwhisk.core.entity.WhiskRule |
| import org.apache.openwhisk.core.entity.WhiskTrigger |
| import org.apache.openwhisk.spi.SpiLoader |
| import pureconfig._ |
| |
/**
 * Message stating that the entry identified by `key` has been invalidated.
 *
 * @param key the cache key the message refers to
 * @param instanceId identifier of the instance that produced the message
 */
case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message {

  /** Renders this message as compact JSON using the `serdes` format of the companion object. */
  override def serialize: String = {
    val json = CacheInvalidationMessage.serdes.write(this)
    json.compactPrint
  }
}
| |
/**
 * JSON (de)serialization support for [[CacheInvalidationMessage]].
 */
object CacheInvalidationMessage extends DefaultJsonProtocol {

  /**
   * JSON format mapping a message to an object with the fields `key` and `instanceId`.
   *
   * The type is spelled out explicitly: an implicit whose type is inferred is not eligible
   * for implicit resolution at use sites that precede its definition in the file.
   */
  implicit val serdes: RootJsonFormat[CacheInvalidationMessage] =
    jsonFormat(CacheInvalidationMessage.apply _, "key", "instanceId")

  /**
   * Parses JSON text into a [[CacheInvalidationMessage]].
   *
   * @param msg the raw JSON text
   * @return `Success` with the decoded message, or `Failure` wrapping any non-fatal exception
   *         thrown while parsing the text or reading it with `serdes`
   */
  def parse(msg: String): Try[CacheInvalidationMessage] = Try(serdes.read(msg.parseJson))
}
| |
/**
 * Exchanges cache invalidation messages with other instances over a shared messaging topic.
 *
 * On construction this creates a consumer and a producer through the configured
 * [[MessagingProvider]] and starts a [[MessageFeed]] actor that hands every received message to
 * `removeFromLocalCache`.
 *
 * @param config the configuration passed to the messaging provider
 * @param component name of the component; first part of this instance's identifier
 * @param instance the instance whose `asString` forms the second part of the identifier
 * @param logging logger used to report messages that cannot be parsed
 * @param as actor system that hosts the feed actor and supplies the dispatcher
 */
class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: ControllerInstanceId)(
  implicit logging: Logging,
  as: ActorSystem) {
  import RemoteCacheInvalidation._

  // All Futures created in this class run on this dispatcher.
  implicit private val ec = as.dispatchers.lookup("dispatchers.kafka-dispatcher")

  // Tags outgoing messages so that this instance can recognise and skip its own messages.
  private val instanceId = s"$component${instance.asString}"

  private val msgProvider = SpiLoader.get[MessagingProvider]

  // The second argument is derived from the instance id and is therefore unique per instance
  // (presumably the consumer group, so that every instance sees every message; confirm against
  // MessagingProvider.getConsumer).
  private val cacheInvalidationConsumer =
    msgProvider.getConsumer(config, s"$cacheInvalidationTopic$instanceId", cacheInvalidationTopic, maxPeek = 128)
  private val cacheInvalidationProducer = msgProvider.getProducer(config)

  /**
   * Publishes a [[CacheInvalidationMessage]] for `key`, tagged with this instance's identifier,
   * on the cache invalidation topic.
   *
   * @param key the cache key to announce
   * @return a future that completes once the producer's send has completed; the send result is
   *         discarded
   */
  def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = {
    cacheInvalidationProducer.send(cacheInvalidationTopic, CacheInvalidationMessage(key, instanceId)).map(_ => ())
  }

  // Feed actor that pulls from the consumer and passes each message to removeFromLocalCache.
  // The handler must answer every message with MessageFeed.Processed.
  private val invalidationFeed = as.actorOf(Props {
    new MessageFeed(
      "cacheInvalidation",
      logging,
      cacheInvalidationConsumer,
      cacheInvalidationConsumer.maxPeek,
      1.second,
      removeFromLocalCache)
  })

  /**
   * Calls `WhiskActionMetaData.removeId` for the given key.
   *
   * @param key the cache key to remove
   */
  def invalidateWhiskActionMetaData(key: CacheKey) =
    WhiskActionMetaData.removeId(key)

  /**
   * Handles one message from the feed.
   *
   * The bytes are decoded as UTF-8 and parsed as a [[CacheInvalidationMessage]]. If the message
   * was produced by a different instance, `removeId` is called with its key on each entity type.
   * Messages produced by this instance are ignored, and unparsable messages are logged.
   *
   * `MessageFeed.Processed` is sent to the feed in every case, including when handling throws,
   * so a failing message never leaves the feed waiting for an acknowledgement.
   *
   * @param bytes the raw message payload
   * @return a future that completes when the message has been handled; it fails if handling
   *         threw an exception
   */
  private def removeFromLocalCache(bytes: Array[Byte]): Future[Unit] = Future {
    try {
      val raw = new String(bytes, StandardCharsets.UTF_8)

      CacheInvalidationMessage.parse(raw) match {
        case Success(msg) if msg.instanceId != instanceId =>
          WhiskActionMetaData.removeId(msg.key)
          WhiskAction.removeId(msg.key)
          WhiskPackage.removeId(msg.key)
          WhiskRule.removeId(msg.key)
          WhiskTrigger.removeId(msg.key)
        case Success(_) => // produced by this instance, nothing to do
        case Failure(t) => logging.error(this, s"failed processing message: $raw with $t")
      }
    } finally {
      // Acknowledge unconditionally; the exception, if any, still fails the returned future.
      invalidationFeed ! MessageFeed.Processed
    }
  }
}
| |
/** Topic names used by [[RemoteCacheInvalidation]]. */
object RemoteCacheInvalidation {

  /** Prefix for topic names, loaded from the configuration entry `ConfigKeys.kafkaTopicsPrefix`. */
  val topicPrefix: String = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)

  /** Name of the cache invalidation topic: the prefix followed by `cacheInvalidation`. */
  val cacheInvalidationTopic: String = s"${topicPrefix}cacheInvalidation"
}