blob: 65033d3db978873d408797bfff6359be237616ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.logging
import java.nio.file.{Path, Paths}
import java.time.Instant
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import akka.http.scaladsl.model._
import org.apache.openwhisk.core.entity.{ActivationId, ActivationLogs, Identity}
import org.apache.openwhisk.core.containerpool.logging.ElasticSearchJsonProtocol._
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.database.UserContext
import scala.concurrent.{Future, Promise}
import scala.util.Try
import spray.json._
import pureconfig._
import pureconfig.generic.auto._
case class ElasticSearchLogFieldConfig(userLogs: String,
message: String,
tenantId: String,
activationId: String,
stream: String,
time: String)
case class ElasticSearchLogStoreConfig(protocol: String,
host: String,
port: Int,
path: String,
logSchema: ElasticSearchLogFieldConfig,
requiredHeaders: Seq[String] = Seq.empty)
/**
* ElasticSearch based implementation of a DockerToActivationFileLogStore. When using the JSON log driver, docker writes
* stdout/stderr to JSON formatted files. Those files can be processed by a backend service asynchronously to store
* user logs in ElasticSearch. This log store allows user logs then to be fetched from ElasticSearch.
*/
class ElasticSearchLogStore(
system: ActorSystem,
httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
destinationDirectory: Path = Paths.get("logs"),
elasticSearchConfig: ElasticSearchLogStoreConfig =
loadConfigOrThrow[ElasticSearchLogStoreConfig](ConfigKeys.logStoreElasticSearch))
extends DockerToActivationFileLogStore(system, destinationDirectory) {
// Schema of resultant logs from ES
case class UserLogEntry(message: String, stream: String, time: String) {
def toFormattedString = s"${time} ${stream}: ${message.stripLineEnd}"
}
object UserLogEntry extends DefaultJsonProtocol {
implicit val serdes =
jsonFormat(
UserLogEntry.apply,
elasticSearchConfig.logSchema.message,
elasticSearchConfig.logSchema.stream,
elasticSearchConfig.logSchema.time)
}
private val esClient = new ElasticSearchRestClient(
elasticSearchConfig.protocol,
elasticSearchConfig.host,
elasticSearchConfig.port,
httpFlow)
private def transcribeLogs(queryResult: EsSearchResult): ActivationLogs =
ActivationLogs(queryResult.hits.hits.map(_.source.convertTo[UserLogEntry].toFormattedString))
private def extractRequiredHeaders(headers: Seq[HttpHeader]) =
headers.filter(h => elasticSearchConfig.requiredHeaders.contains(h.lowercaseName)).toList
private def generatePayload(namespace: String, activationId: ActivationId) = {
val logQuery =
s"_type: ${elasticSearchConfig.logSchema.userLogs} AND ${elasticSearchConfig.logSchema.tenantId}: ${namespace} AND ${elasticSearchConfig.logSchema.activationId}: ${activationId}"
val queryString = EsQueryString(logQuery)
val queryOrder = EsQueryOrder(elasticSearchConfig.logSchema.time, EsOrderAsc)
EsQuery(queryString, Some(queryOrder))
}
private def generatePath(user: Identity) = elasticSearchConfig.path.format(user.namespace.uuid.asString)
override def fetchLogs(namespace: String,
activationId: ActivationId,
start: Option[Instant],
end: Option[Instant],
activationLogs: Option[ActivationLogs],
context: UserContext): Future[ActivationLogs] = {
val headers = extractRequiredHeaders(context.request.headers)
// Return logs from ElasticSearch, or return logs from activation if required headers are not present
if (headers.length == elasticSearchConfig.requiredHeaders.length) {
esClient
.search[EsSearchResult](generatePath(context.user), generatePayload(namespace, activationId), headers)
.flatMap {
case Right(queryResult) =>
Future.successful(transcribeLogs(queryResult))
case Left(code) =>
Future.failed(new RuntimeException(s"Status code '$code' was returned from log store"))
}
} else {
activationLogs match {
case Some(logs) => Future.successful(logs)
case None =>
Future.failed(new RuntimeException(s"Activation logs not available for activation ${activationId}"))
}
}
}
}
object ElasticSearchLogStoreProvider extends LogStoreProvider {
override def instance(actorSystem: ActorSystem): LogStore = new ElasticSearchLogStore(actorSystem)
}