// blob: dcdcb9b422e13ff19397c080ef84e388d2df6b51 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.logging
import java.time.ZonedDateTime
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpMethods.{GET, POST}
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Accept, RawHeader}
import akka.stream.scaladsl.Flow
import akka.testkit.TestKit
import common.StreamLogging
import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import pureconfig.error.ConfigReaderException
import spray.json._
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.database.UserContext
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Success, Try}
/**
 * Tests for `ElasticSearchLogStore`.
 *
 * Most cases hand the store a stubbed HTTP flow (see `testFlow`) that checks the outgoing request
 * and replies with a canned response, so no ElasticSearch instance is contacted. The
 * "invalid host" case is the exception: it constructs the store without a stub flow.
 */
@RunWith(classOf[JUnitRunner])
class ElasticSearchLogStoreTests
    extends TestKit(ActorSystem("ElasticSearchLogStore"))
    with FlatSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures
    with StreamLogging {

  implicit val ec: ExecutionContext = system.dispatcher

  // Identity of the tenant whose logs are requested in every test.
  private val uuid = UUID()
  private val tenantId = s"testSpace_${uuid}"
  private val user =
    Identity(Subject(), Namespace(EntityName(tenantId), uuid), BasicAuthenticationAuthKey(uuid, Secret()))
  private val activationId = ActivationId.generate()

  // Search endpoint shared by the default configurations and the requests expected from them.
  private val searchPath = "/whisk_user_logs/_search"

  private val defaultLogSchema =
    ElasticSearchLogFieldConfig("user_logs", "message", "tenantId", "activationId_str", "stream_str", "time_date")

  // Configuration that does not demand any headers from the incoming user request.
  private val defaultConfig =
    ElasticSearchLogStoreConfig("https", "host", 443, searchPath, defaultLogSchema)

  // Same endpoint, but two auth headers are listed as required.
  private val defaultConfigRequiredHeaders =
    ElasticSearchLogStoreConfig(
      "https",
      "host",
      443,
      searchPath,
      defaultLogSchema,
      Seq("x-auth-token", "x-auth-project-id"))

  // Canned search result with two stdout hits for `activationId`. Both hits carry the tenant under
  // the `tenantId` key so that they are consistent with the query built in `defaultPayload`.
  private val defaultHttpResponse = HttpResponse(
    StatusCodes.OK,
    entity = HttpEntity(
      ContentTypes.`application/json`,
      s"""{"took":799,"timed_out":false,"_shards":{"total":204,"successful":204,"failed":0},"hits":{"total":2,"max_score":null,"hits":[{"_index":"logstash-2018.03.05.02","_type":"user_logs","_id":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","_score":null,"_source":{"time_date":"2018-03-05T02:10:38.196689522Z","accountId":null,"message":"some log stuff\\n","type":"user_logs","event_uuid":"1c00007f-ecb9-4083-8d2e-4d5e2849621f","activationId_str":"$activationId","action_str":"user@email.com/logs","tenantId":"${tenantId}","logmet_cluster":"topic1-elasticsearch_1","@timestamp":"2018-03-05T02:11:37.687Z","@version":"1","stream_str":"stdout","timestamp":"2018-03-05T02:10:39.131Z"},"sort":[1520215897687]},{"_index":"logstash-2018.03.05.02","_type":"user_logs","_id":"14c2a5b7-8cad-4ec0-992e-70fab1996465","_score":null,"_source":{"time_date":"2018-03-05T02:10:38.196754258Z","accountId":null,"message":"more logs\\n","type":"user_logs","event_uuid":"14c2a5b7-8cad-4ec0-992e-70fab1996465","activationId_str":"$activationId","action_str":"user@email.com/logs","tenantId":"${tenantId}","logmet_cluster":"topic1-elasticsearch_1","@timestamp":"2018-03-05T02:11:37.701Z","@version":"1","stream_str":"stdout","timestamp":"2018-03-05T02:10:39.131Z"},"sort":[1520215897701]}]}}"""))

  // Request body the store is expected to send: filter by type, tenant and activation, oldest first.
  private val defaultPayload = JsObject(
    "query" -> JsObject("query_string" -> JsObject("query" -> JsString(
      s"_type: ${defaultConfig.logSchema.userLogs} AND ${defaultConfig.logSchema.tenantId}: $tenantId AND ${defaultConfig.logSchema.activationId}: $activationId"))),
    "sort" -> JsArray(JsObject(defaultConfig.logSchema.time -> JsObject("order" -> JsString("asc")))),
    "from" -> JsNumber(0)).compactPrint

  private val defaultHttpRequest = expectedSearchRequest(searchPath)

  // Incoming user request without any headers, wrapped in the context passed to `fetchLogs`.
  private val defaultLogStoreHttpRequest =
    HttpRequest(method = GET, uri = "https://some.url", entity = HttpEntity.Empty)
  private val defaultContext = UserContext(user, defaultLogStoreHttpRequest)

  // The two log lines contained in `defaultHttpResponse`, in the format the tests expect back.
  private val expectedLogs = ActivationLogs(
    Vector("2018-03-05T02:10:38.196689522Z stdout: some log stuff", "2018-03-05T02:10:38.196754258Z stdout: more logs"))

  private val activation = WhiskActivation(
    namespace = EntityPath(tenantId),
    name = EntityName("name"),
    Subject(),
    activationId = activationId,
    start = ZonedDateTime.now.toInstant,
    end = ZonedDateTime.now.toInstant,
    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
    logs = expectedLogs,
    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson))

  override def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  /**
   * Builds the POST search request the store is expected to emit.
   *
   * @param path         request path (relative URI) of the search endpoint
   * @param extraHeaders headers expected after the `Accept: application/json` header
   * @return the expected request, carrying `defaultPayload` as its JSON entity
   */
  private def expectedSearchRequest(path: String, extraHeaders: List[HttpHeader] = Nil): HttpRequest =
    HttpRequest(
      POST,
      Uri(path),
      Accept(MediaTypes.`application/json`) :: extraHeaders,
      HttpEntity(ContentTypes.`application/json`, defaultPayload))

  /**
   * Stub HTTP flow handed to the store in place of a real connection.
   *
   * Every request passing through is asserted to equal `httpRequest`; the flow then answers with
   * `httpResponse` and passes the accompanying promise through unchanged.
   *
   * @param httpResponse canned response to emit for each request
   * @param httpRequest  request the store is expected to send
   */
  private def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
    Flow[(HttpRequest, Promise[HttpResponse])]
      .mapAsyncUnordered(1) {
        case (request, promise) =>
          request shouldBe httpRequest
          Future.successful((Success(httpResponse), promise))
      }

  /** Blocks until `awaitable` completes (or `timeout` elapses) and returns its result. */
  private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)

  /**
   * Calls `fetchLogs` on `logStore` for the given activation record, without a start or end bound.
   *
   * @param logStore store under test
   * @param record   activation whose namespace, id and logs are passed to the store
   * @param context  user context accompanying the call
   */
  private def fetchLogsFor(logStore: ElasticSearchLogStore, record: WhiskActivation, context: UserContext) =
    logStore.fetchLogs(record.namespace.asString, record.activationId, None, None, Some(record.logs), context)

  behavior of "ElasticSearch Log Store"

  it should "fail when loading out of box configs since whisk.logstore.elasticsearch does not exist" in {
    a[ConfigReaderException[_]] should be thrownBy new ElasticSearchLogStore(system)
  }

  it should "get user logs from ElasticSearch when there are no required headers needed" in {
    val esLogStore =
      new ElasticSearchLogStore(
        system,
        Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
        elasticSearchConfig = defaultConfig)

    // The record is passed without logs, so the expected lines have to come from the stubbed response.
    await(fetchLogsFor(esLogStore, activation.withoutLogs, defaultContext)) shouldBe expectedLogs
  }

  it should "get logs from supplied activation record when required headers are not present" in {
    val esLogStore =
      new ElasticSearchLogStore(
        system,
        Some(testFlow(defaultHttpResponse, defaultHttpRequest)),
        elasticSearchConfig = defaultConfigRequiredHeaders)

    // `defaultContext` carries none of the configured required headers; the record keeps its logs.
    await(fetchLogsFor(esLogStore, activation, defaultContext)) shouldBe expectedLogs
  }

  it should "get user logs from ElasticSearch when required headers are needed" in {
    val authHeaders = List(RawHeader("x-auth-token", "token"), RawHeader("x-auth-project-id", "projectId"))

    // The outgoing search request is expected to carry the headers of the incoming user request.
    val esLogStore =
      new ElasticSearchLogStore(
        system,
        Some(testFlow(defaultHttpResponse, expectedSearchRequest(searchPath, authHeaders))),
        elasticSearchConfig = defaultConfigRequiredHeaders)
    val requiredHeadersHttpRequest =
      HttpRequest(uri = "https://some.url", headers = authHeaders, entity = HttpEntity.Empty)
    val context = UserContext(user, requiredHeadersHttpRequest)

    await(fetchLogsFor(esLogStore, activation.withoutLogs, context)) shouldBe expectedLogs
  }

  it should "dynamically replace $UUID in request path" in {
    // The `%s` placeholder in the configured path is expected to be filled with the namespace UUID.
    val dynamicPathConfig =
      ElasticSearchLogStoreConfig("https", "host", 443, "/elasticsearch/logstash-%s*/_search", defaultLogSchema)
    val httpRequest = expectedSearchRequest(s"/elasticsearch/logstash-${user.namespace.uuid.asString}*/_search")
    val esLogStore = new ElasticSearchLogStore(
      system,
      Some(testFlow(defaultHttpResponse, httpRequest)),
      elasticSearchConfig = dynamicPathConfig)

    await(fetchLogsFor(esLogStore, activation.withoutLogs, defaultContext)) shouldBe expectedLogs
  }

  it should "fail to connect to invalid host" in {
    // No stub flow is supplied here, so the store is constructed with its default HTTP flow.
    val esLogStore = new ElasticSearchLogStore(system, elasticSearchConfig = defaultConfig)

    a[Throwable] should be thrownBy await(fetchLogsFor(esLogStore, activation, defaultContext))
  }

  it should "forward errors from ElasticSearch" in {
    val httpResponse = HttpResponse(StatusCodes.InternalServerError)
    val esLogStore =
      new ElasticSearchLogStore(
        system,
        Some(testFlow(httpResponse, defaultHttpRequest)),
        elasticSearchConfig = defaultConfig)

    a[RuntimeException] should be thrownBy await(fetchLogsFor(esLogStore, activation, defaultContext))
  }

  it should "error when configuration protocol is invalid" in {
    val invalidHostConfig =
      ElasticSearchLogStoreConfig("protocol", "host", 443, "/whisk_user_logs", defaultLogSchema, Seq.empty)

    a[IllegalArgumentException] should be thrownBy new ElasticSearchLogStore(
      system,
      elasticSearchConfig = invalidHostConfig)
  }
}