blob: 3c4f8c687b6b3ff75f0c0b5fb9b0f5f07b49b758 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.service
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.ibm.etcd.api.Event.EventType
import com.ibm.etcd.client.kv.WatchUpdate
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdType._
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
// messages received by the WatcherService actor
/**
 * Request asking WatcherService to register the sending actor as a watcher.
 *
 * @param key          the etcd key to watch, or a key prefix when `isPrefix` is true
 * @param value        a value carried along with the request; WatcherService does not read it
 * @param isPrefix     true to match every key starting with `key`, false to match `key` exactly
 * @param name         the watcher's name; combined with `key` to form the registration's WatcherKey
 * @param listenEvents the event types to subscribe to; with the default empty set
 *                     WatcherService registers nothing for this request
 */
case class WatchEndpoint(
  key: String,
  value: String,
  isPrefix: Boolean,
  name: String,
  listenEvents: Set[EtcdEvent] = Set.empty
)
/**
 * Request asking WatcherService to drop a previously registered watcher.
 *
 * @param watchKey     the key (or prefix) the watcher was registered with
 * @param isPrefix     whether the registration to drop is a prefix registration
 * @param watchName    the name the watcher was registered with
 * @param needFeedback when true, WatcherService replies to the sender with a WatcherClosed message
 */
case class UnwatchEndpoint(
  watchKey: String,
  isPrefix: Boolean,
  watchName: String,
  needFeedback: Boolean = false
)
/**
 * Base type of the notifications that WatcherService sends to registered watchers.
 *
 * `watchKey` is the string the user asked to watch and may be a prefix; `key` is the key of
 * the concrete record in etcd. Consequently, when `isPrefix` is true `watchKey` is a prefix
 * of `key` (and generally differs from it); when `isPrefix` is false the two are equal.
 *
 * @param watchKey the key or prefix the watcher registered for
 * @param key      the key of the etcd record the event refers to
 * @param value    the value of that record
 * @param isPrefix whether the notification results from a prefix registration
 */
sealed abstract class WatchEndpointOperation(
  val watchKey: String,
  val key: String,
  val value: String,
  val isPrefix: Boolean
)

/** Notification that the record stored under `key` was deleted from etcd. */
case class WatchEndpointRemoved(
  override val watchKey: String,
  override val key: String,
  override val value: String,
  override val isPrefix: Boolean
) extends WatchEndpointOperation(watchKey, key, value, isPrefix)

/** Notification that a record was put into etcd under `key`. */
case class WatchEndpointInserted(
  override val watchKey: String,
  override val key: String,
  override val value: String,
  override val isPrefix: Boolean
) extends WatchEndpointOperation(watchKey, key, value, isPrefix)
/**
 * Reply sent by WatcherService for an UnwatchEndpoint request that asked for feedback.
 *
 * @param key      the watch key carried by the UnwatchEndpoint request
 * @param isPrefix the `isPrefix` flag carried by the UnwatchEndpoint request
 */
case class WatcherClosed(
  key: String,
  isPrefix: Boolean
)
/** The kinds of etcd events a watcher can subscribe to through `WatchEndpoint.listenEvents`. */
sealed trait EtcdEvent

/** Subscribes the watcher to PUT events. */
case object PutEvent extends EtcdEvent

/** Subscribes the watcher to DELETE events. */
case object DeleteEvent extends EtcdEvent
/**
 * Identifies one watcher registration.
 *
 * Several watchers may be registered for the same watch key, so the watcher's name is part
 * of the identity to tell them apart.
 *
 * @param watchKey  the key or prefix being watched
 * @param watchName the name of the watcher
 */
case class WatcherKey(
  watchKey: String,
  watchName: String
)
/**
 * Actor that fans etcd key events out to the actors that registered for them.
 *
 * On construction it starts a single watch via `etcdClient.watchAllKeys`. For every PUT or
 * DELETE event delivered to that callback it sends a [[WatchEndpointInserted]] or
 * [[WatchEndpointRemoved]] message to
 *  - every exact watcher whose watch key equals the event's key, and
 *  - every prefix watcher whose watch key is a prefix of the event's key.
 *
 * Handled messages:
 *  - [[WatchEndpoint]]: registers the sender under `WatcherKey(key, name)` for each event type
 *    listed in `listenEvents`. Registering the same key again replaces the previous actor.
 *    An empty `listenEvents` registers nothing.
 *  - [[UnwatchEndpoint]]: removes the put and delete registrations for the given key from the
 *    exact or the prefix registry (chosen by `isPrefix`) and, if `needFeedback` is set, replies
 *    with [[WatcherClosed]].
 *
 * The watch is closed in `postStop` so that it does not outlive the actor.
 *
 * @param etcdClient the client used to open the watch on all keys
 */
class WatcherService(etcdClient: EtcdClient)(implicit logging: Logging, actorSystem: ActorSystem) extends Actor {

  implicit val ec = context.dispatcher

  // The registries are written from `receive` and read from the etcd watch callback.
  // NOTE(review): the callback presumably runs on an etcd client thread rather than the actor's
  // thread, which would be why concurrent maps are used here; confirm against EtcdClient.
  private[service] val putWatchers = TrieMap[WatcherKey, ActorRef]()
  private[service] val deleteWatchers = TrieMap[WatcherKey, ActorRef]()
  private[service] val prefixPutWatchers = TrieMap[WatcherKey, ActorRef]()
  private[service] val prefixDeleteWatchers = TrieMap[WatcherKey, ActorRef]()

  // Handle of the watch on all keys; kept so that it can be closed in postStop.
  private val watcher = etcdClient.watchAllKeys { res: WatchUpdate =>
    res.getEvents.asScala.foreach { event =>
      event.getType match {
        case EventType.DELETE =>
          // For a deletion the key and value are taken from the previous key-value of the event.
          val key = ByteStringToString(event.getPrevKv.getKey)
          val value = ByteStringToString(event.getPrevKv.getValue)
          val watchEvent = WatchEndpointRemoved(key, key, value, false)
          deleteWatchers.foreach {
            case (registration, subscriber) =>
              if (registration.watchKey == key) {
                subscriber ! watchEvent
              }
          }
          prefixDeleteWatchers.foreach {
            case (registration, subscriber) =>
              if (key.startsWith(registration.watchKey)) {
                // Prefix watchers are told which prefix matched as well as the concrete key.
                subscriber ! WatchEndpointRemoved(registration.watchKey, key, value, true)
              }
          }

        case EventType.PUT =>
          val key = ByteStringToString(event.getKv.getKey)
          val value = ByteStringToString(event.getKv.getValue)
          val watchEvent = WatchEndpointInserted(key, key, value, false)
          putWatchers.foreach {
            case (registration, subscriber) =>
              if (registration.watchKey == key) {
                subscriber ! watchEvent
              }
          }
          prefixPutWatchers.foreach {
            case (registration, subscriber) =>
              if (key.startsWith(registration.watchKey)) {
                subscriber ! WatchEndpointInserted(registration.watchKey, key, value, true)
              }
          }

        case msg =>
          logging.debug(this, s"watch event received: $msg.")
      }
    }
  }

  /**
   * Closes the etcd watch when the actor stops (this also runs for the old instance on restart),
   * so the watch and its callback do not leak beyond the actor's lifetime.
   */
  override def postStop(): Unit = {
    // Option(...) guards against a client implementation that hands back null.
    Option(watcher).foreach(_.close())
    super.postStop()
  }

  override def receive: Receive = {
    case request: WatchEndpoint =>
      logging.info(this, s"watch endpoint: $request")
      val watcherKey = WatcherKey(request.key, request.name)
      val subscriber = sender()
      if (request.listenEvents.contains(PutEvent)) {
        val registry = if (request.isPrefix) prefixPutWatchers else putWatchers
        registry.update(watcherKey, subscriber)
      }
      if (request.listenEvents.contains(DeleteEvent)) {
        val registry = if (request.isPrefix) prefixDeleteWatchers else deleteWatchers
        registry.update(watcherKey, subscriber)
      }

    case request: UnwatchEndpoint =>
      val watcherKey = WatcherKey(request.watchKey, request.watchName)
      if (request.isPrefix) {
        prefixPutWatchers.remove(watcherKey)
        prefixDeleteWatchers.remove(watcherKey)
      } else {
        putWatchers.remove(watcherKey)
        deleteWatchers.remove(watcherKey)
      }

      // always send WatcherClosed back to the sender if it needs feedback,
      // whether or not a registration actually existed
      if (request.needFeedback) {
        sender() ! WatcherClosed(request.watchKey, request.isPrefix)
      }
  }
}
/** Companion holding the Akka `Props` factory for [[WatcherService]]. */
object WatcherService {

  /**
   * Builds the `Props` used to create a WatcherService actor.
   *
   * @param etcdClient the etcd client handed to the actor's constructor
   * @return Props that instantiate a WatcherService with the given client
   */
  def props(etcdClient: EtcdClient)(implicit logging: Logging, actorSystem: ActorSystem): Props =
    Props(new WatcherService(etcdClient))
}