// blob: 90074ca49b9fb133585e18fbe0fc1037ba3fc539 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.database
import java.io.File
import java.nio.file.Paths
import java.time.Instant
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import akka.testkit.TestKit
import common.StreamLogging
import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.{Logging, TransactionId, WhiskInstants}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size.SizeInt
import pureconfig._
import pureconfig.generic.auto._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.io.Source
@RunWith(classOf[JUnitRunner])
class ArtifactWithFileStorageActivationStoreTests()
extends TestKit(ActorSystem("ArtifactWithFileStorageActivationStoreTests"))
with FlatSpecLike
with Matchers
with BeforeAndAfterAll
with ScalaFutures
with StreamLogging
with WhiskInstants {
// Implicit values picked up by the activation store calls made in the tests below.
implicit val transid: TransactionId = TransactionId.testing
implicit val notifier: Option[CacheChangeNotification] = None

// Test identity: the same uuid is used for the namespace and for the auth key.
private val uuid = UUID()
private val subject = Subject()
private val user = {
  val namespace = Namespace(EntityName(subject.asString), uuid)
  val authKey = BasicAuthenticationAuthKey(uuid, Secret())
  Identity(subject, namespace, authKey)
}

// User context handed to every store/get/delete call.
private val context = UserContext(user, HttpRequest())
/**
 * Shuts down the actor system backing this TestKit and then delegates to
 * the inherited `afterAll`.
 */
override def afterAll(): Unit = {
// Stop the actor system first, then run the parent's cleanup.
TestKit.shutdownActorSystem(system)
super.afterAll()
}
/**
 * Blocks the calling thread until `awaitable` completes and returns its value.
 *
 * @param awaitable the future to wait for
 * @param timeout   maximum time to wait (10 seconds unless specified)
 * @return the value the future completed with
 */
private def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds): T = {
  Await.result(awaitable, timeout)
}
/**
 * Activation responses exercised by the tests: success without a result,
 * success with a result, an application error and a whisk error.
 */
def responsePermutations = {
  val payload = JsObject("result key" -> JsString("result value"))

  val successful = Seq(ActivationResponse.success(None), ActivationResponse.success(Some(payload)))
  val failed = Seq(ActivationResponse.applicationError(payload), ActivationResponse.whiskError(payload))

  successful ++ failed
}
/**
 * Activation logs exercised by the tests: no log line, a single log line
 * and two log lines.
 */
def logPermutations = {
  val noLines = ActivationLogs()
  val singleLine = ActivationLogs(Vector("2018-03-05T02:10:38.196689520Z stdout: single log line"))
  val multipleLines = ActivationLogs(
    Vector(
      "2018-03-05T02:10:38.196689522Z stdout: first log line of multiple lines",
      "2018-03-05T02:10:38.196754258Z stdout: second log line of multiple lines"))

  Seq(noLines, singleLine, multipleLines)
}
/**
 * Builds the JSON records the tests expect to find in the log file for one activation:
 * one `user_log` object per log line, followed by a single `activation_record` object.
 *
 * @param activation                    the activation that was stored
 * @param includeResult                 if true the record's `message` is the compact-printed result
 *                                      (or `JsNull` when there is none), otherwise a fixed descriptive text
 * @param additionalFieldsForLogs       extra fields appended to every `user_log` object
 * @param additionalFieldsForActivation extra fields appended to the `activation_record` object
 * @return the `user_log` objects in log order with the `activation_record` object last
 */
def expectedFileContent(activation: WhiskActivation,
                        includeResult: Boolean,
                        additionalFieldsForLogs: Seq[JsField] = Seq(),
                        additionalFieldsForActivation: Seq[JsField] = Seq()) = {
  // JSON form of the annotation looked up under `key`, as produced by `toJson` on the lookup result.
  def annotationJson(key: String) = activation.annotations.get(key).toJson

  val userLogRecords = activation.logs.logs.map { line =>
    val fields = Seq(
      "type" -> "user_log".toJson,
      "message" -> line.toJson,
      "activationId" -> activation.activationId.toJson,
      "namespace" -> activation.namespace.asString.toJson,
      "namespaceId" -> user.namespace.uuid.toJson) ++ additionalFieldsForLogs
    JsObject(fields: _*)
  }

  val recordMessage =
    if (includeResult) JsString(activation.response.result.getOrElse(JsNull).compactPrint)
    else JsString(s"Activation record '${activation.activationId}' for entity '${activation.name}'")

  val activationFields = Seq(
    "type" -> "activation_record".toJson,
    "duration" -> activation.duration.toJson,
    "name" -> activation.name.toJson,
    "subject" -> activation.subject.toJson,
    "waitTime" -> annotationJson("waitTime"),
    "activationId" -> activation.activationId.toJson,
    "namespaceId" -> user.namespace.uuid.toJson,
    "publish" -> activation.publish.toJson,
    "version" -> activation.version.toJson,
    "response" -> activation.response.withoutResult.toExtendedJson,
    "end" -> activation.end.toEpochMilli.toJson,
    "message" -> recordMessage,
    "kind" -> annotationJson("kind"),
    "start" -> activation.start.toEpochMilli.toJson,
    "limits" -> annotationJson("limits"),
    "initTime" -> annotationJson("initTime"),
    "namespace" -> activation.namespace.toJson) ++ additionalFieldsForActivation
  val activationRecord = JsObject(activationFields: _*)

  // Log lines come first, the activation record is always the last entry.
  userLogRecords :+ activationRecord
}
// Stores one activation per response/log permutation, verifies each can be read back from the
// store, and then checks that the log file holds exactly the expected records without results.
it should "store activations in artifact store and to file without result" in {
  // NOTE(review): the trailing `false` is presumably the write-result-to-file flag — confirm against the config class.
  val config = ArtifactWithFileStorageActivationStoreConfig("userlogs", "logs", "namespaceId", false)
  val activationStore = new ArtifactWithFileStorageActivationStore(system, logging, config)
  val logDir = new File(new File(".").getCanonicalPath, config.logPath)

  try {
    logDir.mkdir()

    val activations = for {
      response <- responsePermutations
      logs <- logPermutations
    } yield {
      val activation = WhiskActivation(
        namespace = EntityPath(subject.asString),
        name = EntityName("name"),
        subject = subject,
        activationId = ActivationId.generate(),
        start = Instant.now.inMills,
        end = Instant.now.inMills,
        response = response,
        logs = logs,
        duration = Some(101L),
        annotations = Parameters("kind", "nodejs:14") ++ Parameters(
          "limits",
          ActionLimits(TimeLimit(60.second), MemoryLimit(256.MB), LogLimit(10.MB)).toJson) ++
          Parameters("waitTime", 16.toJson) ++
          Parameters("initTime", 44.toJson))

      // Round trip through the store: store, read back, delete.
      val docInfo = await(activationStore.store(activation, context))
      val fullyQualifiedActivationId = ActivationId(docInfo.id.asString)
      await(activationStore.get(fullyQualifiedActivationId, context)) shouldBe activation
      await(activationStore.delete(fullyQualifiedActivationId, context))
      activation
    }

    // Read the whole log file and close the Source so the file handle is released
    // before the finally block deletes the file.
    val source = Source.fromFile(activationStore.getLogFile.toFile.getAbsoluteFile)
    val loggedRecords = try {
      source.getLines().toList.map(_.parseJson)
    } finally {
      source.close()
    }

    loggedRecords.toJson.convertTo[JsArray] shouldBe activations
      .flatMap(a => expectedFileContent(a, includeResult = false))
      .toJson
      .convertTo[JsArray]
  } finally {
    // Remove the log file and its directory regardless of the test outcome.
    activationStore.getLogFile.toFile.getAbsoluteFile.delete()
    logDir.delete()
  }
}
// Stores one activation per response/log permutation, verifies each can be read back from the
// store, and then checks that the log file holds exactly the expected records including results.
it should "store activations in artifact store and to file with result" in {
  // NOTE(review): the trailing `true` is presumably the write-result-to-file flag — confirm against the config class.
  val config = ArtifactWithFileStorageActivationStoreConfig("userlogs", "logs", "namespaceId", true)
  val activationStore = new ArtifactWithFileStorageActivationStore(system, logging, config)
  val logDir = new File(new File(".").getCanonicalPath, config.logPath)

  try {
    logDir.mkdir()

    val activations = for {
      response <- responsePermutations
      logs <- logPermutations
    } yield {
      val activation = WhiskActivation(
        namespace = EntityPath(subject.asString),
        name = EntityName("name"),
        subject = subject,
        activationId = ActivationId.generate(),
        start = Instant.now.inMills,
        end = Instant.now.inMills,
        response = response,
        logs = logs,
        duration = Some(101L),
        annotations = Parameters("kind", "nodejs:14") ++ Parameters(
          "limits",
          ActionLimits(TimeLimit(60.second), MemoryLimit(256.MB), LogLimit(10.MB)).toJson) ++
          Parameters("waitTime", 16.toJson) ++
          Parameters("initTime", 44.toJson))

      // Round trip through the store: store, read back, delete.
      val docInfo = await(activationStore.store(activation, context))
      val fullyQualifiedActivationId = ActivationId(docInfo.id.asString)
      await(activationStore.get(fullyQualifiedActivationId, context)) shouldBe activation
      await(activationStore.delete(fullyQualifiedActivationId, context))
      activation
    }

    // Read the whole log file and close the Source so the file handle is released
    // before the finally block deletes the file.
    val source = Source.fromFile(activationStore.getLogFile.toFile.getAbsoluteFile)
    val loggedRecords = try {
      source.getLines().toList.map(_.parseJson)
    } finally {
      source.close()
    }

    loggedRecords.toJson.convertTo[JsArray] shouldBe activations
      .flatMap(a => expectedFileContent(a, includeResult = true))
      .toJson
      .convertTo[JsArray]
  } finally {
    // Remove the log file and its directory regardless of the test outcome.
    activationStore.getLogFile.toFile.getAbsoluteFile.delete()
    logDir.delete()
  }
}
// Registers one test per `includeResult` value. Each test wires a small example store that writes
// activations to file through `activationToFileExtended` with extra fields and an explicit
// include-result flag, then verifies the log file content.
for (includeResult <- Seq(false, true)) {
  it should "test activationToFileExtended: store activations in artifact store and in file " +
    (if (includeResult) "with" else "without") + " result" in {
    // used in ArtifactWithFileStorageActivationStoreExtended and for test data
    val additionalFieldsForLogs = Map("field1" -> JsString("value1"))
    val additionalFieldsForActivation = Map("field2" -> JsString("value2"))

    // START - example of a simple ArtifactActivationStore implementation that uses activationToFileExtended
    case class ArtifactWithFileStorageActivationStoreConfigExtendedTest(logFilePrefix: String,
                                                                        logPath: String,
                                                                        userIdField: String,
                                                                        writeResultToFile: Boolean)

    class ArtifactWithFileStorageActivationStoreExtendedTest(
      actorSystem: ActorSystem,
      logging: Logging,
      config: ArtifactWithFileStorageActivationStoreConfigExtendedTest =
        loadConfigOrThrow[ArtifactWithFileStorageActivationStoreConfigExtendedTest](
          ConfigKeys.activationStoreWithFileStorage))
        extends ArtifactActivationStore(actorSystem, logging) {

      private val activationFileStorage =
        new ActivationFileStorage(
          config.logFilePrefix,
          Paths.get(config.logPath),
          config.writeResultToFile,
          actorSystem,
          logging)

      def getLogFile = activationFileStorage.getLogFile

      def shallResultBeIncluded: Boolean = includeResult
      // other simple example for the flag: (includeResult && !activation.response.isSuccess)

      // Writes the activation to file with the additional fields, then stores it in the artifact store.
      override def store(activation: WhiskActivation, context: UserContext)(
        implicit
        transid: TransactionId,
        notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
        val additionalFields = Map(config.userIdField -> context.user.namespace.uuid.toJson)

        activationFileStorage.activationToFileExtended(
          activation,
          context,
          additionalFields ++ additionalFieldsForLogs ++ Map("namespace" -> JsString(subject.asString)),
          additionalFields ++ additionalFieldsForActivation,
          shallResultBeIncluded)
        super.store(activation, context)
      }
    }
    // END - example of a simple ArtifactActivationStore implementation that uses activationToFileExtended

    // writeResultToFile is defined with the inverted value of includeResult and should be overridden by the test
    val config =
      ArtifactWithFileStorageActivationStoreConfigExtendedTest("userlogs", "logs", "namespaceId", !includeResult)
    val activationStore =
      new ArtifactWithFileStorageActivationStoreExtendedTest(system, logging, config)
    val logDir = new File(new File(".").getCanonicalPath, config.logPath)

    try {
      logDir.mkdir()

      val activations = responsePermutations.flatMap { response =>
        logPermutations.map { logs =>
          val activation = WhiskActivation(
            namespace = EntityPath(subject.asString),
            name = EntityName("name"),
            subject = subject,
            activationId = ActivationId.generate(),
            start = Instant.now.inMills,
            end = Instant.now.inMills,
            response = response,
            logs = logs,
            duration = Some(101L),
            annotations = Parameters("kind", "nodejs:14") ++ Parameters(
              "limits",
              ActionLimits(TimeLimit(60.second), MemoryLimit(256.MB), LogLimit(10.MB)).toJson) ++
              Parameters("waitTime", 16.toJson) ++
              Parameters("initTime", 44.toJson))

          // Round trip through the store: store, read back, delete.
          val docInfo = await(activationStore.store(activation, context))
          val fullyQualifiedActivationId = ActivationId(docInfo.id.asString)
          await(activationStore.get(fullyQualifiedActivationId, context)) shouldBe activation
          await(activationStore.delete(fullyQualifiedActivationId, context))
          activation
        }
      }

      // Read the whole log file and close the Source so the file handle is released
      // before the finally block deletes the file.
      val source = Source.fromFile(activationStore.getLogFile.toFile.getAbsoluteFile)
      val loggedRecords = try {
        source.getLines().toList.map(_.parseJson)
      } finally {
        source.close()
      }

      loggedRecords.toJson.convertTo[JsArray] shouldBe activations
        .flatMap(a =>
          expectedFileContent(a, includeResult, additionalFieldsForLogs.toSeq, additionalFieldsForActivation.toSeq))
        .toJson
        .convertTo[JsArray]
    } finally {
      // Remove the log file and its directory regardless of the test outcome.
      activationStore.getLogFile.toFile.getAbsoluteFile.delete()
      logDir.delete()
    }
  }
}
}