[New Scheduler] Add a centralized watcher for etcd data (#5069)
* Add a centralized watcher for etcd data
* Fix class name format
* Fix compilation error
* Enable test for WatcherService
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
new file mode 100644
index 0000000..3c4f8c6
--- /dev/null
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/service/WatcherService.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+import com.ibm.etcd.api.Event.EventType
+import com.ibm.etcd.client.kv.WatchUpdate
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.etcd.EtcdClient
+import org.apache.openwhisk.core.etcd.EtcdType._
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+
+// messages received by this actor
+case class WatchEndpoint(key: String,
+ value: String,
+ isPrefix: Boolean,
+ name: String,
+ listenEvents: Set[EtcdEvent] = Set.empty)
+case class UnwatchEndpoint(watchKey: String, isPrefix: Boolean, watchName: String, needFeedback: Boolean = false)
+
+// the watchKey is the string user want to watch, it can be a prefix, the key is a record's key in Etcd
+// so if `isPrefix = true`, the `watchKey != key`, else the `watchKey == key`
+sealed abstract class WatchEndpointOperation(val watchKey: String,
+ val key: String,
+ val value: String,
+ val isPrefix: Boolean)
+case class WatchEndpointRemoved(override val watchKey: String,
+ override val key: String,
+ override val value: String,
+ override val isPrefix: Boolean)
+ extends WatchEndpointOperation(watchKey, key, value, isPrefix)
+case class WatchEndpointInserted(override val watchKey: String,
+ override val key: String,
+ override val value: String,
+ override val isPrefix: Boolean)
+ extends WatchEndpointOperation(watchKey, key, value, isPrefix)
+case class WatcherClosed(key: String, isPrefix: Boolean)
+
+sealed trait EtcdEvent
+case object PutEvent extends EtcdEvent
+case object DeleteEvent extends EtcdEvent
+
+// there may be several watchers for a same watcher key, so add a watcherName to distinguish them
+case class WatcherKey(watchKey: String, watchName: String)
+
+class WatcherService(etcdClient: EtcdClient)(implicit logging: Logging, actorSystem: ActorSystem) extends Actor {
+
+ implicit val ec = context.dispatcher
+
+ private[service] val putWatchers = TrieMap[WatcherKey, ActorRef]()
+ private[service] val deleteWatchers = TrieMap[WatcherKey, ActorRef]()
+ private[service] val prefixPutWatchers = TrieMap[WatcherKey, ActorRef]()
+ private[service] val prefixDeleteWatchers = TrieMap[WatcherKey, ActorRef]()
+
+ private val watcher = etcdClient.watchAllKeys { res: WatchUpdate =>
+ res.getEvents.asScala.foreach { event =>
+ event.getType match {
+ case EventType.DELETE =>
+ val key = ByteStringToString(event.getPrevKv.getKey)
+ val value = ByteStringToString(event.getPrevKv.getValue)
+ val watchEvent = WatchEndpointRemoved(key, key, value, false)
+ deleteWatchers
+ .foreach { watcher =>
+ if (watcher._1.watchKey == key) {
+ watcher._2 ! watchEvent
+ }
+ }
+ prefixDeleteWatchers
+ .foreach { watcher =>
+ if (key.startsWith(watcher._1.watchKey)) {
+ watcher._2 ! WatchEndpointRemoved(watcher._1.watchKey, key, value, true)
+ }
+ }
+ case EventType.PUT =>
+ val key = ByteStringToString(event.getKv.getKey)
+ val value = ByteStringToString(event.getKv.getValue)
+ val watchEvent = WatchEndpointInserted(key, key, value, false)
+ putWatchers
+ .foreach { watcher =>
+ if (watcher._1.watchKey == key) {
+ watcher._2 ! watchEvent
+ }
+ }
+ prefixPutWatchers
+ .foreach { watcher =>
+ if (key.startsWith(watcher._1.watchKey)) {
+ watcher._2 ! WatchEndpointInserted(watcher._1.watchKey, key, value, true)
+ }
+ }
+ case msg =>
+ logging.debug(this, s"watch event received: $msg.")
+ }
+ }
+
+ }
+
+ override def receive: Receive = {
+ case request: WatchEndpoint =>
+ logging.info(this, s"watch endpoint: $request")
+ val watcherKey = WatcherKey(request.key, request.name)
+ if (request.listenEvents.contains(PutEvent))
+ if (request.isPrefix)
+ prefixPutWatchers.update(watcherKey, sender())
+ else
+ putWatchers.update(watcherKey, sender())
+
+ if (request.listenEvents.contains(DeleteEvent))
+ if (request.isPrefix)
+ prefixDeleteWatchers.update(watcherKey, sender())
+ else
+ deleteWatchers.update(watcherKey, sender())
+
+ case request: UnwatchEndpoint =>
+ val watcherKey = WatcherKey(request.watchKey, request.watchName)
+ if (request.isPrefix) {
+ prefixPutWatchers.remove(watcherKey)
+ prefixDeleteWatchers.remove(watcherKey)
+ } else {
+ putWatchers.remove(watcherKey)
+ deleteWatchers.remove(watcherKey)
+ }
+
+ // always send WatcherClosed back to sender if it need a feedback
+ if (request.needFeedback)
+ sender ! WatcherClosed(request.watchKey, request.isPrefix)
+ }
+}
+
+object WatcherService {
+ def props(etcdClient: EtcdClient)(implicit logging: Logging, actorSystem: ActorSystem): Props = {
+ Props(new WatcherService(etcdClient))
+ }
+}
diff --git a/tests/build.gradle b/tests/build.gradle
index b9100ed..2d82234 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -90,6 +90,7 @@
"includes" : [
"org/apache/openwhisk/common/etcd/**",
"org/apache/openwhisk/core/scheduler/**",
+ "org/apache/openwhisk/core/service/**",
]
],
"REQUIRE_MULTI_RUNTIME" : [
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/service/WatcherServiceTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/service/WatcherServiceTests.scala
new file mode 100644
index 0000000..e015fec
--- /dev/null
+++ b/tests/src/test/scala/org/apache/openwhisk/core/service/WatcherServiceTests.scala
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.service
+
+import java.{lang, util}
+import java.util.concurrent.Executor
+
+import akka.actor.ActorSystem
+import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
+import akka.util.Timeout
+import com.google.protobuf.ByteString
+import com.ibm.etcd.api.{Event, KeyValue, ResponseHeader}
+import com.ibm.etcd.api.Event.EventType
+import com.ibm.etcd.client.kv.KvClient.Watch
+import com.ibm.etcd.client.kv.WatchUpdate
+import com.ibm.etcd.client.{EtcdClient => Client}
+import common.StreamLogging
+import org.apache.openwhisk.core.entity.SchedulerInstanceId
+import org.apache.openwhisk.core.etcd.EtcdClient
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
+
+import scala.concurrent.ExecutionContextExecutor
+import scala.concurrent.duration._
+
+@RunWith(classOf[JUnitRunner])
+class WatcherServiceTests
+ extends TestKit(ActorSystem("WatcherService"))
+ with ImplicitSender
+ with FlatSpecLike
+ with ScalaFutures
+ with Matchers
+ with MockFactory
+ with BeforeAndAfterAll
+ with StreamLogging {
+
+ implicit val timeout: Timeout = Timeout(5.seconds)
+ implicit val ece: ExecutionContextExecutor = system.dispatcher
+
+ private val watchName = "test-watcher-service"
+
+ val schedulerId = SchedulerInstanceId("scheduler0")
+
+ val client: Client = {
+ val hostAndPorts = "172.17.0.1:2379"
+ Client.forEndpoints(hostAndPorts).withPlainText().build()
+ }
+
+ val watch = new Watch {
+ override def close(): Unit = {}
+
+ override def addListener(listener: Runnable, executor: Executor): Unit = {}
+
+ override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+
+ override def isCancelled: Boolean = true
+
+ override def isDone: Boolean = true
+
+ override def get(): lang.Boolean = true
+
+ override def get(timeout: Long, unit: TimeUnit): lang.Boolean = true
+ }
+
+ private def watchEtcd(etcdClient: EtcdClient): Unit = {
+ (etcdClient
+ .watchAllKeys(_: WatchUpdate => Unit, _: Throwable => Unit, _: () => Unit))
+ .expects(*, *, *)
+ .returning(watch)
+ }
+
+ behavior of "WatcherService"
+
+ it should "watch a endpoint" in {
+ val etcdClient = mock[EtcdClient]
+
+ val key = "testKey"
+ val value = "testValue"
+
+ watchEtcd(etcdClient)
+
+ val service = TestActorRef(new WatcherService(etcdClient))
+ service ! WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent))
+ service.underlyingActor.deleteWatchers.size shouldBe 1
+
+ service ! WatchEndpoint(key, value, isPrefix = false, watchName, Set(PutEvent))
+ service.underlyingActor.putWatchers.size shouldBe 1
+
+ service ! WatchEndpoint(key, value, isPrefix = false, watchName + 1, Set(DeleteEvent, PutEvent))
+ service.underlyingActor.deleteWatchers.size shouldBe 2
+ service.underlyingActor.putWatchers.size shouldBe 2
+
+ service ! WatchEndpoint(key, value, isPrefix = true, watchName, Set(DeleteEvent))
+ service.underlyingActor.prefixDeleteWatchers.size shouldBe 1
+
+ service ! WatchEndpoint(key, value, isPrefix = true, watchName, Set(PutEvent))
+ service.underlyingActor.prefixPutWatchers.size shouldBe 1
+
+ service ! WatchEndpoint(key, value, isPrefix = true, watchName + 1, Set(DeleteEvent, PutEvent))
+ service.underlyingActor.prefixDeleteWatchers.size shouldBe 2
+ service.underlyingActor.prefixPutWatchers.size shouldBe 2
+ }
+
+ it should "close the watcher upon UnWatchEndpoint event" in {
+ val etcdClient = mock[EtcdClient]
+
+ val key = "testKey"
+ val value = "testValue"
+
+ watchEtcd(etcdClient)
+
+ val service = TestActorRef(new WatcherService(etcdClient))
+
+ service ! WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent))
+ service.underlyingActor.deleteWatchers.size shouldBe 1
+
+ service ! WatchEndpoint(key, value, isPrefix = false, watchName + "1", Set(DeleteEvent))
+ service.underlyingActor.deleteWatchers.size shouldBe 2
+
+ service ! UnwatchEndpoint(key, isPrefix = false, watchName)
+ service.underlyingActor.deleteWatchers.size shouldBe 1
+
+ service ! UnwatchEndpoint(key, isPrefix = false, watchName + "1")
+ service.underlyingActor.deleteWatchers.size shouldBe 0
+ }
+
+ it should "notify the recipient if a deletion or put event occurs" in {
+ val etcdClient = new MockWatchClient(client)(ece)
+ val key = "testKey"
+ val value = "testValue"
+
+ val probe = TestProbe()
+ val service = TestActorRef(new WatcherService(etcdClient))
+ val request = WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent, PutEvent))
+
+ probe.send(service, request)
+
+ service.underlyingActor.deleteWatchers.size shouldBe 1
+ service.underlyingActor.putWatchers.size shouldBe 1
+
+ etcdClient.onNext should not be null
+
+ etcdClient.publishEvents(EventType.DELETE, key, value)
+ probe.expectMsg(WatchEndpointRemoved(request.key, key, value, request.isPrefix))
+
+ etcdClient.publishEvents(EventType.PUT, key, value)
+ probe.expectMsg(WatchEndpointInserted(request.key, key, value, request.isPrefix))
+
+ service ! UnwatchEndpoint(key, false, watchName)
+
+ val request2 = WatchEndpoint("test", "", isPrefix = true, watchName, Set(DeleteEvent, PutEvent))
+ probe.send(service, request2)
+
+ etcdClient.publishEvents(EventType.DELETE, key, value)
+ probe.expectMsg(WatchEndpointRemoved(request2.key, key, value, request2.isPrefix))
+
+ etcdClient.publishEvents(EventType.PUT, key, value)
+ probe.expectMsg(WatchEndpointInserted(request2.key, key, value, request2.isPrefix))
+ }
+
+ it should "not notify the recipient if event type mismatched" in {
+ val etcdClient = new MockWatchClient(client)(ece)
+ val key = "testKey"
+ val value = "testValue"
+
+ val probe = TestProbe()
+ val service = TestActorRef(new WatcherService(etcdClient))
+ val request = WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent))
+
+ probe.send(service, request)
+
+ service.underlyingActor.deleteWatchers.size shouldBe 1
+
+ etcdClient.onNext should not be null
+
+ etcdClient.publishEvents(EventType.PUT, key, value)
+ probe.expectNoMessage()
+ service ! UnwatchEndpoint(key, false, watchName) // close the watcher for delete event
+
+ val request2 = WatchEndpoint(key, value, isPrefix = false, watchName, Set(PutEvent))
+
+ probe.send(service, request2)
+
+ service.underlyingActor.putWatchers.size shouldBe 1
+
+ etcdClient.onNext should not be null
+
+ etcdClient.publishEvents(EventType.DELETE, key, value)
+ probe.expectNoMessage()
+ }
+
+ it should "not register a watch request if there is already registered one" in {
+ val etcdClient = mock[EtcdClient]
+
+ val key = "testKey"
+ val value = "testValue"
+ val numOfTries = 3
+
+ watchEtcd(etcdClient)
+
+ val service = TestActorRef(new WatcherService(etcdClient))
+
+ (1 to numOfTries).foreach { _ =>
+ service ! WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent))
+ }
+
+ service.underlyingActor.deleteWatchers.size shouldBe 1
+ }
+
+ it should "register a watch request if there is already registered one but with different watch name" in {
+ val etcdClient = mock[EtcdClient]
+
+ val key = "testKey"
+ val value = "testValue"
+ val numOfTries = 3
+
+ watchEtcd(etcdClient)
+
+ val service = TestActorRef(new WatcherService(etcdClient))
+
+ (1 to numOfTries).foreach { index =>
+ service ! WatchEndpoint(key, value, isPrefix = false, watchName + index, Set(DeleteEvent))
+ }
+
+ service.underlyingActor.deleteWatchers.size shouldBe 3
+ }
+
+}
+
+class mockWatchUpdate extends WatchUpdate {
+ private val eventLists: util.List[Event] = new util.ArrayList[Event]()
+ override def getHeader: ResponseHeader = ???
+
+ def addEvents(event: Event): WatchUpdate = {
+ eventLists.add(event)
+ this
+ }
+
+ override def getEvents: util.List[Event] = eventLists
+}
+
+class MockWatchClient(client: Client)(ece: ExecutionContextExecutor) extends EtcdClient(client)(ece) {
+ var onNext: WatchUpdate => Unit = null
+
+ override def watchAllKeys(next: WatchUpdate => Unit, error: Throwable => Unit, completed: () => Unit): Watch = {
+ onNext = next
+ new Watch {
+ override def close(): Unit = {}
+
+ override def addListener(listener: Runnable, executor: Executor): Unit = {}
+
+ override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
+
+ override def isCancelled: Boolean = true
+
+ override def isDone: Boolean = true
+
+ override def get(): lang.Boolean = true
+
+ override def get(timeout: Long, unit: TimeUnit): lang.Boolean = true
+ }
+ }
+
+ def publishEvents(eventType: EventType, key: String, value: String): Unit = {
+ val eType = eventType match {
+ case EventType.PUT => EventType.PUT
+ case EventType.DELETE => EventType.DELETE
+ case EventType.UNRECOGNIZED => EventType.UNRECOGNIZED
+ }
+ val event = Event
+ .newBuilder()
+ .setType(eType)
+ .setPrevKv(
+ KeyValue
+ .newBuilder()
+ .setKey(ByteString.copyFromUtf8(key))
+ .setValue(ByteString.copyFromUtf8(value))
+ .build())
+ .setKv(
+ KeyValue
+ .newBuilder()
+ .setKey(ByteString.copyFromUtf8(key))
+ .setValue(ByteString.copyFromUtf8(value))
+ .build())
+ .build()
+ onNext(new mockWatchUpdate().addEvents(event))
+ }
+}