Extend ConsulClient to support catalog and health API
- remove rotten consul code
diff --git a/tests/src/whisk/consul/ConsulClientTests.scala b/tests/src/whisk/consul/ConsulClientTests.scala
index 064991e..fe7ef2e 100644
--- a/tests/src/whisk/consul/ConsulClientTests.scala
+++ b/tests/src/whisk/consul/ConsulClientTests.scala
@@ -17,7 +17,6 @@
package whisk.consul
import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
@@ -31,35 +30,93 @@
import whisk.core.WhiskConfig.consulServer
import common.WskActorSystem
+import akka.stream.ActorMaterializer
+
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.unmarshalling._
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import whisk.common.ConsulService
@RunWith(classOf[JUnitRunner])
class ConsulClientTests extends FlatSpec with ScalaFutures with Matchers with WskActorSystem {
- implicit val testConfig = PatienceConfig(5 seconds)
+ implicit val testConfig = PatienceConfig(5.seconds)
+ implicit val materializer = ActorMaterializer()
- private val config = new WhiskConfig(consulServer)
- private val consul = new ConsulClient(config.consulServer)
+ val config = new WhiskConfig(consulServer)
+ val consul = new ConsulClient(config.consulServer)
- "ConsulClient" should "be able to put, get and delete" in {
+ val consulHost :: consulPort :: Nil = config.consulServer.split(":").toList
+ val consulUri = Uri().withScheme("http").withHost(consulHost).withPort(consulPort.toInt)
+ val checkInterval = 1.second
+
+ /**
+ * Registers a service in consul with a given healthcheck
+ */
+ def registerService(name: String, id: String, checkScript: Option[String] = None) = {
+ val obj = Map(
+ "ID" -> id.toJson,
+ "Name" -> name.toJson
+ ) ++ checkScript.map { script =>
+ "Check" -> JsObject(
+ "Script" -> script.toJson,
+ "Interval" -> s"${checkInterval.toSeconds}s".toJson
+ )
+ }
+
+ Marshal(obj.toJson).to[RequestEntity].flatMap { entity =>
+ val r = Http().singleRequest(
+ HttpRequest(
+ method = HttpMethods.PUT,
+ uri = consulUri.withPath(Uri.Path("/v1/agent/service/register")),
+ entity = entity
+ )
+ )
+ r.flatMap { response =>
+ Unmarshal(response).to[Any]
+ }
+ }
+ }
+
+ /**
+ * Deregisters a service in consul
+ */
+ def deregisterService(id: String) = {
+ val r = Http().singleRequest(
+ HttpRequest(
+ method = HttpMethods.PUT,
+ uri = consulUri.withPath(Uri.Path(s"/v1/agent/service/deregister/$id"))
+ )
+ )
+ r.flatMap { response =>
+ Unmarshal(response).to[Any]
+ }
+ }
+
+ "Consul KV client" should "be able to put, get and delete" in {
val key = "emperor"
val value = "palpatine"
// Create an entry
- noException should be thrownBy consul.put(key, value).futureValue
+ noException should be thrownBy consul.kv.put(key, value).futureValue
// Gets the entry
- consul.get(key).futureValue should equal(value)
+ consul.kv.get(key).futureValue should equal(value)
// Deletes the entry
- noException should be thrownBy consul.del(key).futureValue
+ noException should be thrownBy consul.kv.del(key).futureValue
// Asserts that the entry is gone
- an[Exception] should be thrownBy consul.get(key).futureValue
+ an[Exception] should be thrownBy consul.kv.get(key).futureValue
}
it should "return an Exception if a non-existent key is queried" in {
val key = "no_such_key"
- an[Exception] should be thrownBy consul.get(key).futureValue
+ an[Exception] should be thrownBy consul.kv.get(key).futureValue
}
it should "be able to retrieve many keys recursively and checking against stored values" in {
@@ -71,13 +128,47 @@
// Write all values to the key/value store
values foreach {
- case (key, value) => noException should be thrownBy consul.put(key, value).futureValue
+ case (key, value) => noException should be thrownBy consul.kv.put(key, value).futureValue
}
- consul.getRecurse(prefix).futureValue should equal(values)
+ consul.kv.getRecurse(prefix).futureValue should equal(values)
}
- it should "drop the first part of the key" in {
+ "Consul Catalog client" should "return a list of all services" in {
+ consul.catalog.services().futureValue.size should be > 0
+ }
+
+ "Consul Health client" should "return an empty list of health results" in {
+ val services = consul.health.service("bogus").futureValue
+ services shouldBe List.empty
+ }
+
+ it should "return a list of health results" in {
+ val service = ConsulService("testservice", "testservice_1")
+ registerService(service.name, service.id).futureValue
+
+ val services = consul.health.service(service.name).futureValue
+ services.head shouldBe service
+
+ deregisterService(service.id).futureValue
+ }
+
+ it should "return only the passing service" in {
+ val passing = ConsulService("testservice", "testservice_passing")
+ val failing = ConsulService("testservice", "testservice_failing")
+ registerService(passing.name, passing.id, Some("exit 0")).futureValue
+ registerService(failing.name, failing.id, Some("exit 1")).futureValue
+
+ Thread.sleep(checkInterval.toMillis)
+
+ val services = consul.health.service(passing.name, true).futureValue
+ services.head shouldBe passing
+
+ deregisterService(passing.id).futureValue
+ deregisterService(failing.id).futureValue
+ }
+
+ "ConsulClient helper methods" should "drop the first part of the key" in {
val test = Map(
"nested/k1" -> "v1",
"inner/k2" -> "v2")
diff --git a/tests/src/whisk/consul/ConsulHealthTests.scala b/tests/src/whisk/consul/ConsulHealthTests.scala
new file mode 100644
index 0000000..e4114ac
--- /dev/null
+++ b/tests/src/whisk/consul/ConsulHealthTests.scala
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2015-2016 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.consul
+
+import scala.concurrent.duration.DurationInt
+
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.Matchers
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.time.Span.convertDurationToSpan
+
+import whisk.common.ConsulClient
+import whisk.core.WhiskConfig
+import whisk.core.WhiskConfig.consulServer
+
+import common.WskActorSystem
+import scala.concurrent.Future
+
+@RunWith(classOf[JUnitRunner])
+class ConsulHealthTests extends FlatSpec with ScalaFutures with Matchers with WskActorSystem {
+
+ implicit val testConfig = PatienceConfig(5.seconds)
+
+ private val config = new WhiskConfig(consulServer)
+ private val consul = new ConsulClient(config.consulServer)
+
+ "Consul" should "have all components passing" in {
+ val services = consul.catalog.services().futureValue
+
+ val health = services.map { service => consul.health.service(service) }.toList
+ val healthResults = Future.sequence(health).futureValue
+
+ val filteredHealth = services.map { service => consul.health.service(service, true) }.toList
+ val filteredHealthResults = Future.sequence(filteredHealth).futureValue
+
+ healthResults should contain theSameElementsAs filteredHealthResults
+ }
+
+}