blob: e015fec41f8773f5efbab708c13bb09039b1ccd7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.service
import java.{lang, util}
import java.util.concurrent.Executor
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestActorRef, TestKit, TestProbe}
import akka.util.Timeout
import com.google.protobuf.ByteString
import com.ibm.etcd.api.{Event, KeyValue, ResponseHeader}
import com.ibm.etcd.api.Event.EventType
import com.ibm.etcd.client.kv.KvClient.Watch
import com.ibm.etcd.client.kv.WatchUpdate
import com.ibm.etcd.client.{EtcdClient => Client}
import common.StreamLogging
import org.apache.openwhisk.core.entity.SchedulerInstanceId
import org.apache.openwhisk.core.etcd.EtcdClient
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
@RunWith(classOf[JUnitRunner])
class WatcherServiceTests
extends TestKit(ActorSystem("WatcherService"))
with ImplicitSender
with FlatSpecLike
with ScalaFutures
with Matchers
with MockFactory
with BeforeAndAfterAll
with StreamLogging {
implicit val timeout: Timeout = Timeout(5.seconds)
implicit val ece: ExecutionContextExecutor = system.dispatcher
private val watchName = "test-watcher-service"
val schedulerId = SchedulerInstanceId("scheduler0")
val client: Client = {
val hostAndPorts = "172.17.0.1:2379"
Client.forEndpoints(hostAndPorts).withPlainText().build()
}
val watch = new Watch {
override def close(): Unit = {}
override def addListener(listener: Runnable, executor: Executor): Unit = {}
override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
override def isCancelled: Boolean = true
override def isDone: Boolean = true
override def get(): lang.Boolean = true
override def get(timeout: Long, unit: TimeUnit): lang.Boolean = true
}
private def watchEtcd(etcdClient: EtcdClient): Unit = {
(etcdClient
.watchAllKeys(_: WatchUpdate => Unit, _: Throwable => Unit, _: () => Unit))
.expects(*, *, *)
.returning(watch)
}
behavior of "WatcherService"
it should "watch a endpoint" in {
val etcdClient = mock[EtcdClient]
val key = "testKey"
val value = "testValue"
watchEtcd(etcdClient)
val service = TestActorRef(new WatcherService(etcdClient))
service ! WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent))
service.underlyingActor.deleteWatchers.size shouldBe 1
service ! WatchEndpoint(key, value, isPrefix = false, watchName, Set(PutEvent))
service.underlyingActor.putWatchers.size shouldBe 1
service ! WatchEndpoint(key, value, isPrefix = false, watchName + 1, Set(DeleteEvent, PutEvent))
service.underlyingActor.deleteWatchers.size shouldBe 2
service.underlyingActor.putWatchers.size shouldBe 2
service ! WatchEndpoint(key, value, isPrefix = true, watchName, Set(DeleteEvent))
service.underlyingActor.prefixDeleteWatchers.size shouldBe 1
service ! WatchEndpoint(key, value, isPrefix = true, watchName, Set(PutEvent))
service.underlyingActor.prefixPutWatchers.size shouldBe 1
service ! WatchEndpoint(key, value, isPrefix = true, watchName + 1, Set(DeleteEvent, PutEvent))
service.underlyingActor.prefixDeleteWatchers.size shouldBe 2
service.underlyingActor.prefixPutWatchers.size shouldBe 2
}
it should "close the watcher upon UnWatchEndpoint event" in {
val etcdClient = mock[EtcdClient]
val key = "testKey"
val value = "testValue"
watchEtcd(etcdClient)
val service = TestActorRef(new WatcherService(etcdClient))
service ! WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent))
service.underlyingActor.deleteWatchers.size shouldBe 1
service ! WatchEndpoint(key, value, isPrefix = false, watchName + "1", Set(DeleteEvent))
service.underlyingActor.deleteWatchers.size shouldBe 2
service ! UnwatchEndpoint(key, isPrefix = false, watchName)
service.underlyingActor.deleteWatchers.size shouldBe 1
service ! UnwatchEndpoint(key, isPrefix = false, watchName + "1")
service.underlyingActor.deleteWatchers.size shouldBe 0
}
it should "notify the recipient if a deletion or put event occurs" in {
val etcdClient = new MockWatchClient(client)(ece)
val key = "testKey"
val value = "testValue"
val probe = TestProbe()
val service = TestActorRef(new WatcherService(etcdClient))
val request = WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent, PutEvent))
probe.send(service, request)
service.underlyingActor.deleteWatchers.size shouldBe 1
service.underlyingActor.putWatchers.size shouldBe 1
etcdClient.onNext should not be null
etcdClient.publishEvents(EventType.DELETE, key, value)
probe.expectMsg(WatchEndpointRemoved(request.key, key, value, request.isPrefix))
etcdClient.publishEvents(EventType.PUT, key, value)
probe.expectMsg(WatchEndpointInserted(request.key, key, value, request.isPrefix))
service ! UnwatchEndpoint(key, false, watchName)
val request2 = WatchEndpoint("test", "", isPrefix = true, watchName, Set(DeleteEvent, PutEvent))
probe.send(service, request2)
etcdClient.publishEvents(EventType.DELETE, key, value)
probe.expectMsg(WatchEndpointRemoved(request2.key, key, value, request2.isPrefix))
etcdClient.publishEvents(EventType.PUT, key, value)
probe.expectMsg(WatchEndpointInserted(request2.key, key, value, request2.isPrefix))
}
it should "not notify the recipient if event type mismatched" in {
val etcdClient = new MockWatchClient(client)(ece)
val key = "testKey"
val value = "testValue"
val probe = TestProbe()
val service = TestActorRef(new WatcherService(etcdClient))
val request = WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent))
probe.send(service, request)
service.underlyingActor.deleteWatchers.size shouldBe 1
etcdClient.onNext should not be null
etcdClient.publishEvents(EventType.PUT, key, value)
probe.expectNoMessage()
service ! UnwatchEndpoint(key, false, watchName) // close the watcher for delete event
val request2 = WatchEndpoint(key, value, isPrefix = false, watchName, Set(PutEvent))
probe.send(service, request2)
service.underlyingActor.putWatchers.size shouldBe 1
etcdClient.onNext should not be null
etcdClient.publishEvents(EventType.DELETE, key, value)
probe.expectNoMessage()
}
it should "not register a watch request if there is already registered one" in {
val etcdClient = mock[EtcdClient]
val key = "testKey"
val value = "testValue"
val numOfTries = 3
watchEtcd(etcdClient)
val service = TestActorRef(new WatcherService(etcdClient))
(1 to numOfTries).foreach { _ =>
service ! WatchEndpoint(key, value, isPrefix = false, watchName, Set(DeleteEvent))
}
service.underlyingActor.deleteWatchers.size shouldBe 1
}
it should "register a watch request if there is already registered one but with different watch name" in {
val etcdClient = mock[EtcdClient]
val key = "testKey"
val value = "testValue"
val numOfTries = 3
watchEtcd(etcdClient)
val service = TestActorRef(new WatcherService(etcdClient))
(1 to numOfTries).foreach { index =>
service ! WatchEndpoint(key, value, isPrefix = false, watchName + index, Set(DeleteEvent))
}
service.underlyingActor.deleteWatchers.size shouldBe 3
}
}
class mockWatchUpdate extends WatchUpdate {
private val eventLists: util.List[Event] = new util.ArrayList[Event]()
override def getHeader: ResponseHeader = ???
def addEvents(event: Event): WatchUpdate = {
eventLists.add(event)
this
}
override def getEvents: util.List[Event] = eventLists
}
class MockWatchClient(client: Client)(ece: ExecutionContextExecutor) extends EtcdClient(client)(ece) {
var onNext: WatchUpdate => Unit = null
override def watchAllKeys(next: WatchUpdate => Unit, error: Throwable => Unit, completed: () => Unit): Watch = {
onNext = next
new Watch {
override def close(): Unit = {}
override def addListener(listener: Runnable, executor: Executor): Unit = {}
override def cancel(mayInterruptIfRunning: Boolean): Boolean = true
override def isCancelled: Boolean = true
override def isDone: Boolean = true
override def get(): lang.Boolean = true
override def get(timeout: Long, unit: TimeUnit): lang.Boolean = true
}
}
def publishEvents(eventType: EventType, key: String, value: String): Unit = {
val eType = eventType match {
case EventType.PUT => EventType.PUT
case EventType.DELETE => EventType.DELETE
case EventType.UNRECOGNIZED => EventType.UNRECOGNIZED
}
val event = Event
.newBuilder()
.setType(eType)
.setPrevKv(
KeyValue
.newBuilder()
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8(value))
.build())
.setKv(
KeyValue
.newBuilder()
.setKey(ByteString.copyFromUtf8(key))
.setValue(ByteString.copyFromUtf8(value))
.build())
.build()
onNext(new mockWatchUpdate().addEvents(event))
}
}