package org.apache.openwhisk.core.containerpool.logging
import spray.json._
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Accept
import akka.stream.scaladsl.Flow
import akka.testkit.TestKit
import akka.http.scaladsl.model.HttpMethods.POST
import common.StreamLogging
import org.apache.openwhisk.core.containerpool.logging.ElasticSearchJsonProtocol._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Success, Try}
/**
 * Tests for `ElasticSearchRestClient` and for the JSON serialisation of the `EsQuery` model.
 *
 * The suite owns an `ActorSystem` named "ElasticSearchRestClient", supplied through `TestKit`. Every
 * client under test is constructed with the stub flow returned by `testFlow`.
 *
 * NOTE(review): `RunWith` and `JUnitRunner` are imported, yet no `@RunWith(classOf[JUnitRunner])`
 * annotation is visible on this class — confirm whether it was dropped.
 */
class ElasticSearchRestClientTests
extends TestKit(ActorSystem("ElasticSearchRestClient"))
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures
with StreamLogging {
// Execution context for the futures created in this suite: the dispatcher of the TestKit actor system.
implicit val ec: ExecutionContext = system.dispatcher
// One log document as raw JSON; the search-result test compares the first hit's `source` against it.
private val defaultResponseSource =
"""{"stream":"stdout","activationId":"197d60b33137424ebd60b33137d24ea3","action":"guest/someAction","@version":"1","@timestamp":"2018-03-27T15:48:09.112Z","type":"user_logs","tenant":"19bc46b1-71f6-4ed5-8c54-816aa4f8c502","message":"namespace :\n","time_date":"2018-03-27T15:48:08.716152793Z"}"""
// NOTE(review): no right-hand side is visible for this definition — the next line already starts
// another `val`. `defaultHttpResponse` uses `defaultResponse` as its JSON entity body, so a value is
// required here; restore it before relying on the search-result test.
private val defaultResponse =
// Canned HTTP response with no explicit status set and `defaultResponse` as its JSON entity.
private val defaultHttpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, defaultResponse))
// Request that several tests tell the stub flow to expect: an `Accept: application/json` header and a
// JSON entity holding the serialised default query `EsQuery(EsQueryAll())`.
// NOTE(review): `HttpMethods.POST` is imported but not passed here, so this request carries the
// `HttpRequest` default method — confirm that this matches what the client sends.
private val defaultHttpRequest = HttpRequest(
headers = List(Accept(MediaTypes.`application/json`)),
entity = HttpEntity(ContentTypes.`application/json`, EsQuery(EsQueryAll()).toJson.toString))
/**
 * Runs once after the last test: shuts down the actor system created for this suite through
 * `TestKit`, then hands over to the next `afterAll` in the mixed-in trait chain.
 *
 * Without the shutdown the actor system would stay alive after the suite has finished.
 */
override def afterAll(): Unit = {
  TestKit.shutdownActorSystem(system)
  super.afterAll()
}
/**
 * Builds the stub flow that the tests pass to `ElasticSearchRestClient` as its optional flow argument.
 *
 * For every `(request, promise)` element the flow asserts that the request equals `httpRequest` and
 * then emits `Success(httpResponse)` paired with the very same promise.
 *
 * @param httpResponse response emitted for each incoming request
 * @param httpRequest  request each incoming element is expected to equal
 * @return a flow from `(HttpRequest, Promise[HttpResponse])` to `(Try[HttpResponse], Promise[HttpResponse])`
 */
private def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
  : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
  Flow[(HttpRequest, Promise[HttpResponse])]
    .mapAsyncUnordered(1) {
      case (request, userContext) =>
        // A mismatch fails the stage, which surfaces as a failure in the test awaiting the result.
        request shouldBe httpRequest
        Future.successful((Success(httpResponse), userContext))
    }
/**
 * Blocks the calling thread until `awaitable` completes and returns its value.
 *
 * @param awaitable future to wait for
 * @param timeout   longest time to wait; ten seconds unless stated otherwise
 * @return the value the future completed with; a failed future or an elapsed timeout is rethrown
 *         by `Await.result`
 */
private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds): T = {
  val outcome = Await.result(awaitable, timeout)
  outcome
}
// Subject label under which ScalaTest reports the `it should ...` clauses that follow.
behavior of "ElasticSearch Rest Client"
// Checks the JSON produced for a bool/must query: first with match terms only, then with a range
// filter added for each supported range operator.
it should "construct a query with must" in {
  val queryTerms = Vector(EsQueryBoolMatch("someKey1", "someValue1"), EsQueryBoolMatch("someKey2", "someValue2"))
  val queryMust = EsQueryMust(queryTerms)

  EsQuery(queryMust).toJson shouldBe JsObject(
    "query" ->
      JsObject(
        "bool" ->
          JsObject(
            "must" ->
              JsArray(
                JsObject("match" -> JsObject("someKey1" -> JsString("someValue1"))),
                JsObject("match" -> JsObject("someKey2" -> JsString("someValue2")))))),
    "from" -> 0.toJson)

  // Test must with ranges
  Seq((EsRangeGte, "gte"), (EsRangeGt, "gt"), (EsRangeLte, "lte"), (EsRangeLt, "lt")).foreach {
    case (rangeArg, rangeValue) =>
      val queryRange1 = EsQueryRange("someKey1", rangeArg, "someValue1")
      val queryRange2 = EsQueryRange("someKey2", rangeArg, "someValue2")
      val queryTerms = Vector(EsQueryBoolMatch("someKey1", "someValue1"), EsQueryBoolMatch("someKey2", "someValue2"))
      val queryMust = EsQueryMust(queryTerms, Vector(queryRange1, queryRange2))

      EsQuery(queryMust).toJson shouldBe JsObject(
        "query" ->
          JsObject(
            "bool" ->
              JsObject(
                "must" ->
                  JsArray(
                    JsObject("match" -> JsObject("someKey1" -> JsString("someValue1"))),
                    JsObject("match" -> JsObject("someKey2" -> JsString("someValue2")))),
                "filter" ->
                  JsArray(
                    JsObject("range" -> JsObject("someKey1" -> JsObject(rangeValue -> "someValue1".toJson))),
                    JsObject("range" -> JsObject("someKey2" -> JsObject(rangeValue -> "someValue2".toJson)))))),
        "from" -> 0.toJson)
  }
}
// Checks the "aggs" section of the JSON for each supported aggregation function (max and min).
it should "construct a query with aggregations" in {
  Seq((EsAggMax, "max"), (EsAggMin, "min")).foreach {
    case (aggArg, aggValue) =>
      val queryAgg = EsQueryAggs("someAgg", aggArg, "someField")

      EsQuery(EsQueryAll(), aggs = Some(queryAgg)).toJson shouldBe JsObject(
        "query" -> JsObject("match_all" -> JsObject.empty),
        "aggs" -> JsObject("someAgg" -> JsObject(aggValue -> JsObject("field" -> "someField".toJson))),
        "from" -> 0.toJson)
  }
}
// Checks the JSON for a plain match query and for match queries that carry an explicit match type.
it should "construct a query with match" in {
  val queryMatch = EsQueryMatch("someField", "someValue")

  EsQuery(queryMatch).toJson shouldBe JsObject(
    "query" -> JsObject("match" -> JsObject("someField" -> JsObject("query" -> "someValue".toJson))),
    "from" -> 0.toJson)

  // Test match with types
  Seq((EsMatchPhrase, "phrase"), (EsMatchPhrasePrefix, "phrase_prefix")).foreach {
    case (typeArg, typeValue) =>
      val queryMatch = EsQueryMatch("someField", "someValue", Some(typeArg))

      EsQuery(queryMatch).toJson shouldBe JsObject(
        "query" -> JsObject(
          "match" -> JsObject("someField" -> JsObject("query" -> "someValue".toJson, "type" -> typeValue.toJson))),
        "from" -> 0.toJson)
  }
}
// Checks the JSON produced for a term query on a single field.
it should "construct a query with term" in {
  val queryTerm = EsQueryTerm("user", "someUser")

  EsQuery(queryTerm).toJson shouldBe JsObject(
    "query" -> JsObject("term" -> JsObject("user" -> JsString("someUser"))),
    "from" -> 0.toJson)
}
// Checks the JSON produced for a query_string query.
it should "construct a query with query string" in {
  val queryString = EsQueryString("_type: someType")

  EsQuery(queryString).toJson shouldBe JsObject(
    "query" -> JsObject("query_string" -> JsObject("query" -> JsString("_type: someType"))),
    "from" -> 0.toJson)
}
// Checks the "sort" section of the JSON for ascending and descending order.
it should "construct a query with order" in {
  Seq((EsOrderAsc, "asc"), (EsOrderDesc, "desc")).foreach {
    case (orderArg, orderValue) =>
      val queryOrder = EsQueryOrder("someField", orderArg)

      EsQuery(EsQueryAll(), Some(queryOrder)).toJson shouldBe JsObject(
        "query" -> JsObject("match_all" -> JsObject.empty),
        "sort" -> JsArray(JsObject("someField" -> JsObject("order" -> orderValue.toJson))),
        "from" -> 0.toJson)
  }
}
// Checks that an explicit size is written to the JSON next to the default "from" of 0.
it should "construct query with size" in {
  EsQuery(EsQueryAll(), size = Some(1)).toJson shouldBe JsObject(
    "query" -> JsObject("match_all" -> JsObject.empty),
    "size" -> 1.toJson,
    "from" -> 0.toJson)
}
// Checks that a non-default "from" offset is written to the JSON.
it should "construct query with from" in {
  EsQuery(EsQueryAll(), from = 1).toJson shouldBe JsObject(
    "query" -> JsObject("match_all" -> JsObject.empty),
    "from" -> 1.toJson)
}
// Expects a RuntimeException from the awaited call; the stub flow answers with a default, empty
// HttpResponse and requires the incoming request to equal `defaultHttpRequest`.
// NOTE(review): the expression inside await(...) has no receiver or method name — `[JsObject]("/")`
// is not a valid call by itself, and `esClient` is otherwise unused. Going by the test title, a search
// call on `esClient` was presumably intended; confirm against ElasticSearchRestClient.
// NOTE(review): no closing brace for this block is visible.
it should "error when search response does not match expected type" in {
val esClient = new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpRequest = defaultHttpRequest)))
a[RuntimeException] should be thrownBy await([JsObject]("/"))
// Feeds `defaultHttpResponse` through the stub flow and expects a Right whose first hit's `source`
// equals the parsed `defaultResponseSource`.
// NOTE(review): the expression inside await(...) has no receiver or method name — presumably a search
// call on `esClient`; confirm against ElasticSearchRestClient.
// NOTE(review): no closing brace for this block is visible.
it should "parse search response into EsSearchResult" in {
val esClient =
new ElasticSearchRestClient("https", "host", 443, Some(testFlow(defaultHttpResponse, defaultHttpRequest)))
val response = await([EsSearchResult]("/"))
response shouldBe 'right
// NOTE(review): two assertions look fused on the next line — `should have size 1` is followed directly
// by `shouldBe 1375` with no subject of its own. The 1375 presumably belongs to a separate check of
// the result's total hit count; confirm and split.
response.right.get.hits.hits should have size 1 shouldBe 1375
response.right.get.hits.hits(0).source shouldBe defaultResponseSource.parseJson.asJsObject
// Has the stub flow answer with 500 Internal Server Error and expects a Left carrying that status code.
// NOTE(review): the expression inside await(...) has no receiver or method name — presumably a search
// call on `esClient`, since the flow expects `defaultHttpRequest`; confirm against ElasticSearchRestClient.
// NOTE(review): no closing brace for this block is visible.
it should "return status code when HTTP error occurs" in {
val httpResponse = HttpResponse(StatusCodes.InternalServerError)
val esClient = new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, defaultHttpRequest)))
val response = await([JsObject]("/"))
response shouldBe 'left
response.left.get shouldBe StatusCodes.InternalServerError
// Has the stub flow answer 200 OK with a small cluster-info JSON body and expects a Right holding that
// body as a JsObject. The expected request carries only an `Accept: application/json` header.
// NOTE(review): no closing brace for this block is visible.
it should "perform info request" in {
val responseBody = s"""{"cluster_name" : "elasticsearch"}"""
val httpRequest = HttpRequest(headers = List(Accept(MediaTypes.`application/json`)))
val httpResponse = HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, responseBody))
val esClient =
new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, httpRequest)))
// NOTE(review): the argument of await( is missing and the parenthesis is never closed. Going by the
// test title and the sibling index test, an info call on `esClient` was presumably intended; confirm.
val response = await(
response shouldBe 'right
response.right.get shouldBe responseBody.parseJson.asJsObject
// Has the stub flow answer 200 OK with a small JSON body for a request whose URI is "some_index", then
// expects `index("some_index")` to yield a Right holding that body as a JsObject.
it should "perform index request" in {
  val responseBody = s"""{"some_index" : {}}"""
  val httpRequest = HttpRequest(uri = Uri("some_index"), headers = List(Accept(MediaTypes.`application/json`)))
  val httpResponse = HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, responseBody))
  val esClient =
    new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, httpRequest)))
  val response = await(esClient.index("some_index"))

  response shouldBe 'right
  response.right.get shouldBe responseBody.parseJson.asJsObject
}