blob: 691acafe17876e0401a2f61bbb7014a282f54b20 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.entity
import java.time.Instant
import scala.concurrent.Future
import scala.util.Try
import spray.json._
import spray.json.DefaultJsonProtocol._
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.{ArtifactStore, CacheChangeNotification, DocumentFactory, StaleParameter}
/**
* A WhiskActivation provides an abstraction of the meta-data
* for a whisk action activation record.
*
* The WhiskActivation object is used as a helper to adapt objects between
* the schema used by the database and the WhiskAuth abstraction.
*
* @param namespace the namespace for the activation
* @param name the name of the activated entity
* @param subject the subject activating the entity
* @param activationId the activation id
* @param start the start of the activation in epoch millis
* @param end the end of the activation in epoch millis
* @param cause the activation id of the activated entity that causes this activation
* @param response the activation response
* @param logs the activation logs
* @param version the semantic version (usually matches the activated entity)
* @param publish true to share the activation or false otherwise
* @param annotations the set of annotations to attribute to the activation
* @param duration of the activation in milliseconds
* @throws IllegalArgumentException if any required argument is undefined
*/
@throws[IllegalArgumentException]
case class WhiskActivation(namespace: EntityPath,
override val name: EntityName,
subject: Subject,
activationId: ActivationId,
start: Instant,
end: Instant,
cause: Option[ActivationId] = None,
response: ActivationResponse = ActivationResponse.success(),
logs: ActivationLogs = ActivationLogs(),
version: SemVer = SemVer(),
publish: Boolean = false,
annotations: Parameters = Parameters(),
duration: Option[Long] = None)
extends WhiskEntity(EntityName(activationId.asString), "activation") {
require(cause != null, "cause undefined")
require(start != null, "start undefined")
require(end != null, "end undefined")
require(response != null, "response undefined")
def toJson = WhiskActivation.serdes.write(this).asJsObject
/**
* This the activation summary as computed by the database view.
* Strictly used in view testing to enforce alignment.
*/
override def summaryAsJson = {
import WhiskActivation.instantSerdes
def actionOrNot() = {
if (end != Instant.EPOCH) {
Map(
"end" -> end.toJson,
"duration" -> (duration getOrElse (end.toEpochMilli - start.toEpochMilli)).toJson,
"statusCode" -> response.statusCode.toJson)
} else Map.empty
}
JsObject(
super.summaryAsJson.fields - "updated" +
("activationId" -> activationId.toJson) +
("start" -> start.toJson) ++
cause.map(("cause" -> _.toJson)) ++
actionOrNot())
}
def resultAsJson = response.result.toJson.asJsObject
def toExtendedJson(removeFields: Seq[String] = Seq.empty, addFields: Map[String, JsValue] = Map.empty) = {
val JsObject(baseFields) = WhiskActivation.serdes.write(this).asJsObject
val newFields = (baseFields - "response") + ("response" -> response.toExtendedJson) -- removeFields ++ addFields
if (end != Instant.EPOCH) {
val durationValue = (duration getOrElse (end.toEpochMilli - start.toEpochMilli)).toJson
JsObject(newFields + ("duration" -> durationValue))
} else {
JsObject(newFields - "end")
}
}
def metadata =
copy(response = response.withoutResult, annotations = Parameters(), logs = ActivationLogs())
.revision[WhiskActivation](rev)
def withoutResult =
copy(response = response.withoutResult)
.revision[WhiskActivation](rev)
def withoutLogsOrResult =
copy(response = response.withoutResult, logs = ActivationLogs()).revision[WhiskActivation](rev)
def withoutLogs = copy(logs = ActivationLogs()).revision[WhiskActivation](rev)
def withLogs(logs: ActivationLogs) = copy(logs = logs).revision[WhiskActivation](rev)
def isTimedoutActivation = annotations.getAs[Boolean](WhiskActivation.timeoutAnnotation).getOrElse(false)
}
object WhiskActivation
extends DocumentFactory[WhiskActivation]
with WhiskEntityQueries[WhiskActivation]
with DefaultJsonProtocol {
/** Some field names for annotations */
val pathAnnotation = "path"
val bindingAnnotation = "binding"
val kindAnnotation = "kind"
val limitsAnnotation = "limits"
val topmostAnnotation = "topmost"
val causedByAnnotation = "causedBy"
val initTimeAnnotation = "initTime"
val waitTimeAnnotation = "waitTime"
val conductorAnnotation = "conductor"
val timeoutAnnotation = "timeout"
val memory = "memory"
val duration = "duration"
val statusCode = "statusCode"
/** Some field names for compositions */
val actionField = "action"
val paramsField = "params"
val stateField = "state"
val valueField = "value"
protected[entity] implicit val instantSerdes: RootJsonFormat[Instant] = new RootJsonFormat[Instant] {
def write(t: Instant) = t.toEpochMilli.toJson
def read(value: JsValue) =
Try {
value match {
case JsString(t) => Instant.parse(t)
case JsNumber(i) => Instant.ofEpochMilli(i.bigDecimal.longValue)
case _ => deserializationError("timestamp malformed")
}
} getOrElse deserializationError("timestamp malformed")
}
override val collectionName = "activations"
/** The main view for activations, keyed by namespace, sorted by date. */
override lazy val view = WhiskQueries.view(WhiskQueries.dbConfig.activationsDdoc, collectionName)
/**
* A view for activations in a namespace additionally keyed by action name
* (and package name if present) sorted by date.
*/
lazy val filtersView = WhiskQueries.view(WhiskQueries.dbConfig.activationsFilterDdoc, collectionName)
override implicit val serdes = jsonFormat13(WhiskActivation.apply)
// Caching activations doesn't make much sense in the common case as usually,
// an activation is only asked for once.
override val cacheEnabled = false
/**
* Queries datastore for activation records which have an entity name matching the
* given parameter.
*
* @return list of records as JSON object if docs parameter is false, as Left
* and a list of the WhiskActivations if including the full record, as Right
*/
def listActivationsMatchingName(db: ArtifactStore[WhiskActivation],
namespace: EntityPath,
path: EntityPath,
skip: Int,
limit: Int,
includeDocs: Boolean = false,
since: Option[Instant] = None,
upto: Option[Instant] = None,
stale: StaleParameter = StaleParameter.No)(
implicit transid: TransactionId): Future[Either[List[JsObject], List[WhiskActivation]]] = {
import WhiskQueries.TOP
val convert = if (includeDocs) Some((o: JsObject) => Try { serdes.read(o) }) else None
val startKey = List(namespace.addPath(path).asString, since map { _.toEpochMilli } getOrElse 0)
val endKey = List(namespace.addPath(path).asString, upto map { _.toEpochMilli } getOrElse TOP, TOP)
query(db, filtersView, startKey, endKey, skip, limit, reduce = false, stale, convert)
}
def put[Wsuper >: WhiskActivation](db: ArtifactStore[Wsuper], doc: WhiskActivation)(
implicit transid: TransactionId,
notifier: Option[CacheChangeNotification]): Future[DocInfo] =
//As activations are not updated we just pass None for the old document
super.put(db, doc, None)
}