blob: d3727c50c7fc2254e0b0ea70debba7a0e3336f6c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.entity
import scala.util.Try
import akka.http.scaladsl.model.StatusCodes.OK
import spray.json._
import spray.json.DefaultJsonProtocol
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.http.Messages._
protected[core] case class ActivationResponse private (statusCode: Int,
result: Option[JsValue],
size: Option[Int] = None) {
def toJsonObject = ActivationResponse.serdes.write(this).asJsObject
// Used when presenting to end-users, to hide the statusCode (which is an implementation detail),
// and to provide a convenience boolean "success" field.
def toExtendedJson: JsObject = {
val baseFields = this.toJsonObject.fields
JsObject(
(baseFields - "statusCode") ++ Seq(
"success" -> JsBoolean(this.isSuccess),
"status" -> JsString(ActivationResponse.messageForCode(statusCode))))
}
def isSuccess = statusCode == ActivationResponse.Success
def isApplicationError = statusCode == ActivationResponse.ApplicationError
def isContainerError = statusCode == ActivationResponse.DeveloperError
def isWhiskError = statusCode == ActivationResponse.WhiskError
def withoutResult = ActivationResponse(statusCode, None)
override def toString = toJsonObject.compactPrint
}
protected[core] object ActivationResponse extends DefaultJsonProtocol {
/* The field name that is universally recognized as the marker of an error, from the application or otherwise. */
val ERROR_FIELD: String = "error"
// These constants need to be synchronized with messageForCode() method below
val Success = 0 // action ran successfully and produced a result
val ApplicationError = 1 // action ran but there was an error and it was handled
val DeveloperError = 2 // action ran but failed to handle an error, or action did not run and failed to initialize
val WhiskError = 3 // internal system error
val statusSuccess = "success"
val statusApplicationError = "application_error"
val statusDeveloperError = "action_developer_error"
val statusWhiskError = "whisk_internal_error"
protected[core] def statusForCode(code: Int) = {
require(code >= 0 && code <= 3)
code match {
case Success => statusSuccess
case ApplicationError => statusApplicationError
case DeveloperError => statusDeveloperError
case WhiskError => statusWhiskError
}
}
protected[core] def messageForCode(code: Int) = {
require(code >= 0 && code <= 3)
code match {
case Success => "success"
case ApplicationError => "application error"
case DeveloperError => "action developer error"
case WhiskError => "whisk internal error"
}
}
private def error(code: Int, errorValue: JsValue, size: Option[Int]) = {
require(code == ApplicationError || code == DeveloperError || code == WhiskError)
ActivationResponse(code, Some(JsObject(ERROR_FIELD -> errorValue)), size)
}
protected[core] def success(result: Option[JsValue] = None, size: Option[Int] = None) =
ActivationResponse(Success, result, size)
protected[core] def applicationError(errorValue: JsValue, size: Option[Int] = None) =
error(ApplicationError, errorValue, size)
protected[core] def applicationError(errorMsg: String) =
error(ApplicationError, JsString(errorMsg), None)
protected[core] def developerError(errorValue: JsValue, size: Option[Int]) =
error(DeveloperError, errorValue, size)
protected[core] def developerError(errorMsg: String, size: Option[Int] = None) =
error(DeveloperError, JsString(errorMsg), size)
protected[core] def whiskError(errorValue: JsValue) = error(WhiskError, errorValue, None)
protected[core] def whiskError(errorMsg: String) =
error(WhiskError, JsString(errorMsg), None)
/**
* Returns an ActivationResponse that is used as a placeholder for payload
* Used as a feed for starting a sequence.
* NOTE: the code is application error (since this response could be used as a response for the sequence
* if the payload contains an error)
*/
protected[core] def payloadPlaceholder(payload: Option[JsObject]) = ActivationResponse(ApplicationError, payload)
/**
* Class of errors for invoker-container communication.
*/
protected[core] sealed trait ContainerConnectionError
protected[core] sealed trait ContainerHttpError extends ContainerConnectionError
protected[core] case class ConnectionError(t: Throwable) extends ContainerHttpError
protected[core] case class NoResponseReceived() extends ContainerHttpError
protected[core] case class Timeout(t: Throwable) extends ContainerHttpError
protected[core] case class MemoryExhausted() extends ContainerConnectionError
/**
* @param statusCode the container HTTP response code (e.g., 200 OK)
* @param entity the entity response as string
* @param truncated either None to indicate complete entity or Some(actual length, max allowed)
*/
protected[core] case class ContainerResponse(statusCode: Int,
entity: String,
truncated: Option[(ByteSize, ByteSize)]) {
/** true iff status code is OK (HTTP 200 status code), anything else is considered an error. **/
val okStatus = statusCode == OK.intValue
val ok = okStatus && truncated.isEmpty
override def toString = {
val base = if (okStatus) "ok" else "not ok"
val rest = truncated.map(e => s", truncated ${e.toString}").getOrElse("")
base + rest
}
}
protected[core] object ContainerResponse {
def apply(okStatus: Boolean, entity: String, truncated: Option[(ByteSize, ByteSize)] = None): ContainerResponse = {
ContainerResponse(if (okStatus) OK.intValue else 500, entity, truncated)
}
}
/**
* Interprets response from container after initialization. This method is only called when the initialization failed.
*
* @param response an either a container error or container response (HTTP Status Code, HTTP response bytes as String)
* @return appropriate ActivationResponse representing initialization error
*/
protected[core] def processInitResponseContent(response: Either[ContainerConnectionError, ContainerResponse],
logger: Logging): ActivationResponse = {
require(
response.isLeft || !response.right.exists(_.ok),
s"should not interpret init response when status code is OK")
response match {
case Right(ContainerResponse(code, str, truncated)) =>
val sizeOpt = Option(str).map(_.length)
truncated match {
case None =>
Try { str.parseJson.asJsObject } match {
case scala.util.Success(result @ JsObject(fields)) =>
// If the response is a JSON object container an error field, accept it as the response error.
val errorOpt = fields.get(ERROR_FIELD)
val errorContent = errorOpt getOrElse invalidInitResponse(str).toJson
developerError(errorContent, sizeOpt)
case _ =>
developerError(invalidInitResponse(str), sizeOpt)
}
case Some((length, maxlength)) =>
developerError(truncatedResponse(str, length, maxlength), Some(length.toBytes.toInt))
}
case Left(_: MemoryExhausted) =>
developerError(memoryExhausted)
case Left(e) =>
// This indicates a terminal failure in the container (it exited prematurely).
developerError(abnormalInitialization)
}
}
/**
* Interprets response from container after running the action. This method is only called when the initialization succeeded.
*
* @param response an Option (HTTP Status Code, HTTP response bytes as String)
* @return appropriate ActivationResponse representing run result
*/
protected[core] def processRunResponseContent(response: Either[ContainerConnectionError, ContainerResponse],
logger: Logging): ActivationResponse = {
response match {
case Right(res @ ContainerResponse(_, str, truncated)) =>
truncated match {
case None =>
val sizeOpt = Option(str).map(_.length)
Try { str.parseJson.asJsObject } match {
case scala.util.Success(result @ JsObject(fields)) =>
// If the response is a JSON object container an error field, accept it as the response error.
val errorOpt = fields.get(ERROR_FIELD)
if (res.okStatus) {
errorOpt map { error =>
applicationError(error, sizeOpt)
} getOrElse {
// The happy path.
success(Some(result), sizeOpt)
}
} else {
// Any non-200 code is treated as a container failure. We still need to check whether
// there was a useful error message in there.
val errorContent = errorOpt getOrElse invalidRunResponse(str).toJson
developerError(errorContent, sizeOpt)
}
case scala.util.Success(notAnObj) =>
// This should affect only blackbox containers, since our own containers should already test for that.
developerError(invalidRunResponse(str), sizeOpt)
case scala.util.Failure(t) =>
// This should affect only blackbox containers, since our own containers should already test for that.
logger.warn(this, s"response did not json parse: '$str' led to $t")
developerError(invalidRunResponse(str), sizeOpt)
}
case Some((length, maxlength)) =>
applicationError(JsString(truncatedResponse(str, length, maxlength)), Some(length.toBytes.toInt))
}
case Left(_: MemoryExhausted) =>
developerError(memoryExhausted)
case Left(e) =>
// This indicates a terminal failure in the container (it exited prematurely).
developerError(abnormalRun)
}
}
protected[core] implicit val serdes = jsonFormat3(ActivationResponse.apply)
}