| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.openwhisk.http |
| |
| import akka.actor.ActorSystem |
| import akka.event.Logging |
| import akka.http.scaladsl.{Http, HttpConnectionContext} |
| import akka.http.scaladsl.model.{HttpRequest, _} |
| import akka.http.scaladsl.server.RouteResult.Rejected |
| import akka.http.scaladsl.server._ |
| import akka.http.scaladsl.server.directives._ |
| import akka.stream.ActorMaterializer |
| import kamon.metric.MeasurementUnit |
| import spray.json._ |
| import org.apache.openwhisk.common.Https.HttpsConfig |
| import org.apache.openwhisk.common._ |
| |
| import scala.collection.immutable.Seq |
| import scala.concurrent.duration.DurationInt |
| import scala.concurrent.{Await, Future} |
| |
/**
 * Builds on the Akka HTTP `Directives` to provide the request/response logging, transaction id
 * assignment and response metrics shared by OpenWhisk REST services.
 */
trait BasicHttpService extends Directives {

  /** Request header that can switch on additional logging for a single transaction. */
  val OW_EXTRA_LOGGING_HEADER = "X-OW-EXTRA-LOGGING"

  /**
   * The routes served by the concrete HTTP service.
   *
   * @param transid the transaction id assigned to the request being routed
   */
  def routes(implicit transid: TransactionId): Route

  /**
   * Selects the level at which the request and response for a path are logged.
   * Every path maps to InfoLevel unless a service overrides this method.
   *
   * @param route the request path
   * @return the log level to use for that path
   */
  def loglevelForRoute(route: String): Logging.LogLevel = Logging.InfoLevel

  /**
   * Narrows a list of rejections to the most relevant one: the first content-type or validation
   * rejection when present; otherwise the full list is passed on untouched.
   */
  val prioritizeRejections = recoverRejections { rejections =>
    rejections
      .collectFirst {
        case preferred @ (_: UnacceptedResponseContentTypeRejection | _: ValidationRejection) =>
          Rejected(Seq(preferred))
      }
      .getOrElse(Rejected(rejections))
  }

  /**
   * The complete route of the service: `routes` wrapped, from the outside in, with transaction id
   * assignment, the transaction id response header, request logging, response logging, removal of
   * the extra-logging header, rejection handling and strict-entity conversion.
   */
  def route: Route = {
    assignId { implicit transid =>
      // echo the transaction id back to the caller
      respondWithHeader(transid.toHeader) {
        DebuggingDirectives.logRequest(logRequestInfo _) {
          DebuggingDirectives.logRequestResult(logResponseInfo _) {
            // the extra-logging header has already been consumed by assignId; drop it from the request
            BasicDirectives.mapRequest(_.removeHeader(OW_EXTRA_LOGGING_HEADER)) {
              handleRejections(BasicHttpService.customRejectionHandler) {
                prioritizeRejections {
                  // convert the request entity to a strict one (30 second timeout) before routing
                  toStrictEntity(30.seconds) {
                    routes
                  }
                }
              }
            }
          }
        }
      }
    }
  }

  /**
   * Extracts a [[TransactionId]] for the current request.
   *
   * The id is taken from the transaction id request header unless that value starts with the
   * system prefix; in that case, or when the header is absent, a new id is generated.
   */
  protected def assignId =
    HeaderDirectives.optionalHeaderValueByName(OW_EXTRA_LOGGING_HEADER).flatMap { loggingHeader =>
      // Only the value "on" (in any letter case) enables additional logging for this transaction;
      // a missing header or any other value leaves logging as configured.
      val extraLogging = loggingHeader.exists(_.toLowerCase == "on")

      extract { ctx =>
        val suppliedTid = ctx.request.headers
          .find(_.is(TransactionId.generatorConfig.lowerCaseHeader))
          .map(_.value)
          .filterNot(_.startsWith(TransactionId.systemPrefix))

        // Generating an id here is only a fallback: the tid should be generated by nginx.
        val tid = suppliedTid.getOrElse(TransactionId.generateTid())

        TransactionId(tid, extraLogging)
      }
    }

  /**
   * Builds the log entry written for every incoming request: transaction id, HTTP method, path
   * and query. A query that cannot be parsed is replaced by a description of the problem.
   */
  protected def logRequestInfo(req: HttpRequest)(implicit tid: TransactionId): LogEntry = {
    val method = req.method.name
    val path = req.uri.path.toString

    // parsing the query may throw, so fall back to a diagnostic text instead of failing the request
    val query: String =
      try req.uri.query().toString
      catch {
        case _: IllegalUriException => s"Bad query parameters:${req.uri.toString()}"
        case e: Exception => s"Query parsing error: ${e.getMessage}"
      }

    val level = loglevelForRoute(path)
    LogEntry(s"[$tid] $method $path $query", level)
  }

  /**
   * Handles the outcome of a request. For a completed response a histogram metric and a counter
   * metric are emitted, and a log entry carrying the marker is returned when
   * `TransactionId.metricsLog` is set. Any other route result yields no log entry.
   */
  protected def logResponseInfo(req: HttpRequest)(implicit tid: TransactionId): RouteResult => Option[LogEntry] = {
    case RouteResult.Complete(response: HttpResponse) =>
      val method = req.method.name
      val path = req.uri.path.toString
      val level = loglevelForRoute(path)

      val component = "BasicHttpService"
      val statusCode = response.status.intValue.toString

      val token =
        LogMarkerToken(
          "http",
          method.toLowerCase,
          LoggingMarkers.counter,
          Some(statusCode),
          Map("statusCode" -> statusCode))(MeasurementUnit.time.milliseconds)
      val marker = LogMarker(token, tid.deltaToStart, Some(tid.deltaToStart))

      MetricEmitter.emitHistogramMetric(token, tid.deltaToStart)
      MetricEmitter.emitCounterMetric(token)

      if (TransactionId.metricsLog) Some(LogEntry(s"[$tid] [$component] $marker", level))
      else None

    case _ => None // rejections and any other kind of result are not logged here
  }
}
| |
object BasicHttpService {

  /**
   * Starts an HTTP(S) route handler on the given port and registers a shutdown hook.
   *
   * @param route the route to serve
   * @param port the port to bind to
   * @param config optional HTTPS configuration; when absent a plain HTTP connection context is used
   * @param interface the network interface to bind to, all interfaces ("0.0.0.0") by default
   */
  def startHttpService(route: Route, port: Int, config: Option[HttpsConfig] = None, interface: String = "0.0.0.0")(
    implicit actorSystem: ActorSystem,
    materializer: ActorMaterializer): Unit = {
    val connectionContext = config.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext)
    val httpBinding = Http().bindAndHandle(route, interface, port, connectionContext = connectionContext)
    addShutdownHook(httpBinding)
  }

  /**
   * Registers a JVM shutdown hook that unbinds the server and then waits for the actor system
   * to terminate, allowing up to 30 seconds for each step.
   *
   * @param binding the (possibly still pending) server binding to release on shutdown
   */
  def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem,
                                                           materializer: ActorMaterializer): Unit = {
    implicit val executionContext = actorSystem.dispatcher
    sys.addShutdownHook {
      // flatMap (rather than map) so that the await covers the unbind itself and not merely the
      // availability of the binding; map would yield a nested future whose inner result is ignored.
      Await.result(binding.flatMap(_.unbind()), 30.seconds)
      Await.result(actorSystem.whenTerminated, 30.seconds)
    }
  }

  /**
   * Rejection handler that delegates to the default Akka handler and re-wraps its response:
   * when the response carries a strict entity, the entity text becomes the message of a JSON
   * `ErrorResponse` that also carries the transaction id. Other responses pass through unchanged.
   *
   * @param transid the transaction id to include in the error response
   */
  def customRejectionHandler(implicit transid: TransactionId): RejectionHandler = {
    RejectionHandler.default.mapRejectionResponse {
      case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
        val error = ErrorResponse(ent.data.utf8String, transid).toJson
        res.copy(entity = HttpEntity(ContentTypes.`application/json`, error.compactPrint))
      case x => x
    }
  }

}