blob: c3c34cbadeb61f5e48660098eb0bd56d2962f1fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.logging
import spray.json._
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Accept
import akka.stream.scaladsl.Flow
import akka.testkit.TestKit
import akka.http.scaladsl.model.HttpMethods.POST
import common.StreamLogging
import org.apache.openwhisk.core.containerpool.logging.ElasticSearchJsonProtocol._
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Success, Try}
@RunWith(classOf[JUnitRunner])
class ElasticSearchRestClientTests
extends TestKit(ActorSystem("ElasticSearchRestClient"))
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures
with StreamLogging {
implicit val ec: ExecutionContext = system.dispatcher
private val defaultResponseSource =
"""{"stream":"stdout","activationId":"197d60b33137424ebd60b33137d24ea3","action":"guest/someAction","@version":"1","@timestamp":"2018-03-27T15:48:09.112Z","type":"user_logs","tenant":"19bc46b1-71f6-4ed5-8c54-816aa4f8c502","message":"namespace : user@email.com\n","time_date":"2018-03-27T15:48:08.716152793Z"}"""
private val defaultResponse =
s"""{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1375,"max_score":1.0,"hits":[{"_index":"whisk_user_logs","_type":"user_logs","_id":"AWJoJSwAMGbzgxiD1jr9","_score":1.0,"_source":$defaultResponseSource}]}}"""
private val defaultHttpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, defaultResponse))
private val defaultHttpRequest = HttpRequest(
POST,
headers = List(Accept(MediaTypes.`application/json`)),
entity = HttpEntity(ContentTypes.`application/json`, EsQuery(EsQueryAll()).toJson.toString))
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
super.afterAll()
}
private def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
Flow[(HttpRequest, Promise[HttpResponse])]
.mapAsyncUnordered(1) {
case (request, userContext) =>
request shouldBe httpRequest
Future.successful((Success(httpResponse), userContext))
}
private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
behavior of "ElasticSearch Rest Client"
it should "construct a query with must" in {
val queryTerms = Vector(EsQueryBoolMatch("someKey1", "someValue1"), EsQueryBoolMatch("someKey2", "someValue2"))
val queryMust = EsQueryMust(queryTerms)
EsQuery(queryMust).toJson shouldBe JsObject(
"query" ->
JsObject(
"bool" ->
JsObject(
"must" ->
JsArray(
JsObject("match" -> JsObject("someKey1" -> JsString("someValue1"))),
JsObject("match" -> JsObject("someKey2" -> JsString("someValue2")))))),
"from" -> 0.toJson)
// Test must with ranges
Seq((EsRangeGte, "gte"), (EsRangeGt, "gt"), (EsRangeLte, "lte"), (EsRangeLt, "lt")).foreach {
case (rangeArg, rangeValue) =>
val queryRange1 = EsQueryRange("someKey1", rangeArg, "someValue1")
val queryRange2 = EsQueryRange("someKey2", rangeArg, "someValue2")
val queryTerms = Vector(EsQueryBoolMatch("someKey1", "someValue1"), EsQueryBoolMatch("someKey2", "someValue2"))
val queryMust = EsQueryMust(queryTerms, Vector(queryRange1, queryRange2))
EsQuery(queryMust).toJson shouldBe JsObject(
"query" ->
JsObject(
"bool" ->
JsObject(
"must" ->
JsArray(
JsObject("match" -> JsObject("someKey1" -> JsString("someValue1"))),
JsObject("match" -> JsObject("someKey2" -> JsString("someValue2")))),
"filter" ->
JsArray(
JsObject("range" -> JsObject("someKey1" -> JsObject(rangeValue -> "someValue1".toJson))),
JsObject("range" -> JsObject("someKey2" -> JsObject(rangeValue -> "someValue2".toJson)))))),
"from" -> 0.toJson)
}
}
it should "construct a query with aggregations" in {
Seq((EsAggMax, "max"), (EsAggMin, "min")).foreach {
case (aggArg, aggValue) =>
val queryAgg = EsQueryAggs("someAgg", aggArg, "someField")
EsQuery(EsQueryAll(), aggs = Some(queryAgg)).toJson shouldBe JsObject(
"query" -> JsObject("match_all" -> JsObject.empty),
"aggs" -> JsObject("someAgg" -> JsObject(aggValue -> JsObject("field" -> "someField".toJson))),
"from" -> 0.toJson)
}
}
it should "construct a query with match" in {
val queryMatch = EsQueryMatch("someField", "someValue")
EsQuery(queryMatch).toJson shouldBe JsObject(
"query" -> JsObject("match" -> JsObject("someField" -> JsObject("query" -> "someValue".toJson))),
"from" -> 0.toJson)
// Test match with types
Seq((EsMatchPhrase, "phrase"), (EsMatchPhrasePrefix, "phrase_prefix")).foreach {
case (typeArg, typeValue) =>
val queryMatch = EsQueryMatch("someField", "someValue", Some(typeArg))
EsQuery(queryMatch).toJson shouldBe JsObject(
"query" -> JsObject(
"match" -> JsObject("someField" -> JsObject("query" -> "someValue".toJson, "type" -> typeValue.toJson))),
"from" -> 0.toJson)
}
}
it should "construct a query with term" in {
val queryTerm = EsQueryTerm("user", "someUser")
EsQuery(queryTerm).toJson shouldBe JsObject(
"query" -> JsObject("term" -> JsObject("user" -> JsString("someUser"))),
"from" -> 0.toJson)
}
it should "construct a query with query string" in {
val queryString = EsQueryString("_type: someType")
EsQuery(queryString).toJson shouldBe JsObject(
"query" -> JsObject("query_string" -> JsObject("query" -> JsString("_type: someType"))),
"from" -> 0.toJson)
}
it should "construct a query with order" in {
Seq((EsOrderAsc, "asc"), (EsOrderDesc, "desc")).foreach {
case (orderArg, orderValue) =>
val queryOrder = EsQueryOrder("someField", orderArg)
EsQuery(EsQueryAll(), Some(queryOrder)).toJson shouldBe JsObject(
"query" -> JsObject("match_all" -> JsObject.empty),
"sort" -> JsArray(JsObject("someField" -> JsObject("order" -> orderValue.toJson))),
"from" -> 0.toJson)
}
}
it should "construct query with size" in {
EsQuery(EsQueryAll(), size = Some(1)).toJson shouldBe JsObject(
"query" -> JsObject("match_all" -> JsObject.empty),
"size" -> 1.toJson,
"from" -> 0.toJson)
}
it should "construct query with from" in {
EsQuery(EsQueryAll(), from = 1).toJson shouldBe JsObject(
"query" -> JsObject("match_all" -> JsObject.empty),
"from" -> 1.toJson)
}
it should "error when search response does not match expected type" in {
val esClient = new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpRequest = defaultHttpRequest)))
a[RuntimeException] should be thrownBy await(esClient.search[JsObject]("/"))
}
it should "parse search response into EsSearchResult" in {
val esClient =
new ElasticSearchRestClient("https", "host", 443, Some(testFlow(defaultHttpResponse, defaultHttpRequest)))
val response = await(esClient.search[EsSearchResult]("/"))
response shouldBe 'right
response.right.get.hits.hits should have size 1
response.right.get.hits.total shouldBe 1375
response.right.get.hits.hits(0).source shouldBe defaultResponseSource.parseJson.asJsObject
}
it should "return status code when HTTP error occurs" in {
val httpResponse = HttpResponse(StatusCodes.InternalServerError)
val esClient = new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, defaultHttpRequest)))
val response = await(esClient.search[JsObject]("/"))
response shouldBe 'left
response.left.get shouldBe StatusCodes.InternalServerError
}
it should "perform info request" in {
val responseBody = s"""{"cluster_name" : "elasticsearch"}"""
val httpRequest = HttpRequest(headers = List(Accept(MediaTypes.`application/json`)))
val httpResponse = HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, responseBody))
val esClient =
new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, httpRequest)))
val response = await(esClient.info())
response shouldBe 'right
response.right.get shouldBe responseBody.parseJson.asJsObject
}
it should "perform index request" in {
val responseBody = s"""{"some_index" : {}}"""
val httpRequest = HttpRequest(uri = Uri("some_index"), headers = List(Accept(MediaTypes.`application/json`)))
val httpResponse = HttpResponse(StatusCodes.OK, entity = HttpEntity(ContentTypes.`application/json`, responseBody))
val esClient =
new ElasticSearchRestClient("https", "host", 443, Some(testFlow(httpResponse, httpRequest)))
val response = await(esClient.index("some_index"))
response shouldBe 'right
response.right.get shouldBe responseBody.parseJson.asJsObject
}
}