blob: e70467a67ab9e9244a7689aff275d57a2315de0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.docker.test
import java.io.IOException
import java.time.Instant
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import common.TimingHelpers
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.Future
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.BeforeAndAfterEach
import org.scalatest.FlatSpec
import org.apache.openwhisk.core.containerpool.logging.{DockerToActivationLogStore, LogLine}
import org.scalatest.junit.JUnitRunner
import org.scalatest.Matchers
import common.{StreamLogging, WskActorSystem}
import spray.json._
import org.apache.openwhisk.common.LoggingMarkers._
import org.apache.openwhisk.common.LogMarker
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.docker._
import org.apache.openwhisk.core.entity.ActivationResponse
import org.apache.openwhisk.core.entity.ActivationResponse.ContainerResponse
import org.apache.openwhisk.core.entity.ActivationResponse.Timeout
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.http.Messages
import DockerContainerTests._
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
object DockerContainerTests {
/** Awaits the given future, throws the exception enclosed in Failure. */
def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result[A](f, timeout)
/** Creates an interval starting at EPOCH with the given duration. */
def intervalOf(duration: FiniteDuration) = Interval(Instant.EPOCH, Instant.ofEpochMilli(duration.toMillis))
def toRawLog(log: Seq[LogLine], appendSentinel: Boolean = true): ByteString = {
val appendedLog = if (appendSentinel) {
val lastTime = log.lastOption.map { case LogLine(time, _, _) => time }.getOrElse(Instant.EPOCH.toString)
log :+
LogLine(lastTime, "stderr", s"${Container.ACTIVATION_LOG_SENTINEL}\n") :+
LogLine(lastTime, "stdout", s"${Container.ACTIVATION_LOG_SENTINEL}\n")
} else {
log
}
ByteString(appendedLog.map(_.toJson.compactPrint).mkString("", "\n", "\n"))
}
}
/**
* Unit tests for ContainerPool schedule
*/
@RunWith(classOf[JUnitRunner])
class DockerContainerTests
extends FlatSpec
with Matchers
with MockFactory
with StreamLogging
with BeforeAndAfterEach
with WskActorSystem
with TimingHelpers {
override def beforeEach() = {
stream.reset()
}
/** Reads logs into memory and awaits them */
def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 500.milliseconds): Vector[String] =
Await.result(source.via(DockerToActivationLogStore.toFormattedString).runWith(Sink.seq[String]), timeout).toVector
val containerId = ContainerId("id")
/**
* Constructs a testcontainer with overridden IO methods. Results of the override can be provided
* as parameters.
*/
def dockerContainer(id: ContainerId = containerId, addr: ContainerAddress = ContainerAddress("ip"))(
ccRes: Future[RunResult] =
Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))),
awaitLogs: FiniteDuration = 2.seconds)(implicit docker: DockerApiWithFileAccess, runc: RuncApi): DockerContainer = {
new DockerContainer(id, addr, true) {
override protected def callContainer(
path: String,
body: JsObject,
timeout: FiniteDuration,
concurrent: Int,
retry: Boolean = false,
reschedule: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
ccRes
}
override protected val logCollectingIdleTimeout = awaitLogs
override protected val filePollInterval = 1.millisecond
}
}
behavior of "DockerContainer"
implicit val transid = TransactionId.testing
val parameters = Map(
"--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
"--ulimit" -> Set("nofile=1024:1024"),
"--pids-limit" -> Set("1024"))
/*
* CONTAINER CREATION
*/
it should "create a new instance" in {
implicit val docker = new TestDockerClient
implicit val runc = stub[RuncApi]
val image = "image"
val memory = 128.MB
val cpuShares = 1
val environment = Map("test" -> "hi")
val network = "testwork"
val name = "myContainer"
val container = DockerContainer.create(
transid = transid,
image = Right(ImageName(image)),
memory = memory,
cpuShares = cpuShares,
environment = environment,
network = network,
name = Some(name),
dockerRunParameters = parameters)
await(container)
docker.pulls should have size 0
docker.runs should have size 1
docker.inspects should have size 1
docker.rms should have size 0
val (testImage, args) = docker.runs.head
testImage shouldBe "image"
// Assert fixed values are passed as well
args should contain allOf ("--cap-drop", "NET_RAW", "NET_ADMIN")
args should contain inOrder ("--ulimit", "nofile=1024:1024")
args should contain inOrder ("--pids-limit", "1024") // OW PR 2119
// Assert proper parameter translation
args should contain inOrder ("--memory", s"${memory.toMB}m")
args should contain inOrder ("--memory-swap", s"${memory.toMB}m")
args should contain inOrder ("--cpu-shares", cpuShares.toString)
args should contain inOrder ("--network", network)
args should contain inOrder ("--name", name)
// Assert proper environment passing
args should contain allOf ("-e", "test=hi")
}
it should "pull a user provided image before creating the container" in {
implicit val docker = new TestDockerClient
implicit val runc = stub[RuncApi]
val container =
DockerContainer.create(transid = transid, image = Left(ImageName("image")), dockerRunParameters = parameters)
await(container)
docker.pulls should have size 1
docker.runs should have size 1
docker.inspects should have size 1
docker.rms should have size 0
}
it should "remove the container if inspect fails" in {
implicit val docker = new TestDockerClient {
override def inspectIPAddress(id: ContainerId,
network: String)(implicit transid: TransactionId): Future[ContainerAddress] = {
inspects += ((id, network))
Future.failed(new RuntimeException())
}
}
implicit val runc = stub[RuncApi]
val container =
DockerContainer.create(transid = transid, image = Right(ImageName("image")), dockerRunParameters = parameters)
a[WhiskContainerStartupError] should be thrownBy await(container)
docker.pulls should have size 0
docker.runs should have size 1
docker.inspects should have size 1
docker.rms should have size 1
}
it should "provide a proper error if run fails for blackbox containers" in {
implicit val docker = new TestDockerClient {
override def run(image: String,
args: Seq[String] = Seq.empty[String])(implicit transid: TransactionId): Future[ContainerId] = {
runs += ((image, args))
Future.failed(ProcessUnsuccessfulException(ExitStatus(1), "", ""))
}
}
implicit val runc = stub[RuncApi]
val container =
DockerContainer.create(transid = transid, image = Left(ImageName("image")), dockerRunParameters = parameters)
a[WhiskContainerStartupError] should be thrownBy await(container)
docker.pulls should have size 1
docker.runs should have size 1
docker.inspects should have size 0
docker.rms should have size 0
}
it should "remove the container if run fails with a broken container" in {
implicit val docker = new TestDockerClient {
override def run(image: String,
args: Seq[String] = Seq.empty[String])(implicit transid: TransactionId): Future[ContainerId] = {
runs += ((image, args))
Future.failed(BrokenDockerContainer(containerId, "Broken container"))
}
}
implicit val runc = stub[RuncApi]
val container =
DockerContainer.create(transid = transid, image = Right(ImageName("image")), dockerRunParameters = parameters)
a[WhiskContainerStartupError] should be thrownBy await(container)
docker.pulls should have size 0
docker.runs should have size 1
docker.inspects should have size 0
docker.rms should have size 1
}
it should "provide a proper error if inspect fails for blackbox containers" in {
implicit val docker = new TestDockerClient {
override def inspectIPAddress(id: ContainerId,
network: String)(implicit transid: TransactionId): Future[ContainerAddress] = {
inspects += ((id, network))
Future.failed(new RuntimeException())
}
}
implicit val runc = stub[RuncApi]
val container =
DockerContainer.create(transid = transid, image = Left(ImageName("image")), dockerRunParameters = parameters)
a[WhiskContainerStartupError] should be thrownBy await(container)
docker.pulls should have size 1
docker.runs should have size 1
docker.inspects should have size 1
docker.rms should have size 1
}
it should "return a specific error if pulling a user provided image failed (given the image does not define a tag)" in {
implicit val docker = new TestDockerClient {
override def pull(image: String)(implicit transid: TransactionId): Future[Unit] = {
pulls += image
Future.failed(new RuntimeException())
}
}
implicit val runc = stub[RuncApi]
val imageName = "image"
val container =
DockerContainer.create(transid = transid, image = Left(ImageName(imageName)), dockerRunParameters = parameters)
val exception = the[BlackboxStartupError] thrownBy await(container)
exception.msg shouldBe Messages.imagePullError(imageName)
docker.pulls should have size 1
docker.runs should have size 0 // run is **not** called as a backup measure because no tag is defined
docker.inspects should have size 0
docker.rms should have size 0
}
it should "recover a failed image pull if the subsequent docker run succeeds" in {
implicit val docker = new TestDockerClient {
override def pull(image: String)(implicit transid: TransactionId): Future[Unit] = {
pulls += image
Future.failed(new RuntimeException())
}
}
implicit val runc = stub[RuncApi]
val container =
DockerContainer.create(
transid = transid,
image = Left(ImageName("image", tag = Some("prod"))),
dockerRunParameters = parameters)
noException should be thrownBy await(container)
docker.pulls should have size 1
docker.runs should have size 1 // run is called as a backup measure in case the image is locally available
docker.inspects should have size 1
docker.rms should have size 0
}
it should "throw a pull exception if a recovering docker run fails as well" in {
implicit val docker = new TestDockerClient {
override def pull(image: String)(implicit transid: TransactionId): Future[Unit] = {
pulls += image
Future.failed(new RuntimeException())
}
override def run(image: String, args: Seq[String])(implicit transid: TransactionId): Future[ContainerId] = {
runs += ((image, args))
Future.failed(new RuntimeException())
}
}
implicit val runc = stub[RuncApi]
val imageName = ImageName("image", tag = Some("prod"))
val container =
DockerContainer.create(transid = transid, image = Left(imageName), dockerRunParameters = parameters)
val exception = the[BlackboxStartupError] thrownBy await(container)
exception.msg shouldBe Messages.imagePullError(imageName.resolveImageName())
docker.pulls should have size 1
docker.runs should have size 1 // run is called as a backup measure in case the image is locally available
docker.inspects should have size 0 // inspect is never called because the run failed as well
docker.rms should have size 0
}
/*
* DOCKER COMMANDS
*/
it should "pause and resume container via runc" in {
implicit val docker = new TestDockerClient
implicit val runc = new TestRuncClient
val id = ContainerId("id")
val container = new DockerContainer(id, ContainerAddress("ip"), true)
val suspend = container.suspend()
val resume = container.resume()
await(suspend)
await(resume)
docker.unpauses should have size 0
docker.pauses should have size 0
runc.pauses should have size 1
runc.resumes should have size 1
}
it should "pause and unpause container via docker" in {
implicit val docker = new TestDockerClient
implicit val runc = new TestRuncClient
val id = ContainerId("id")
val container = new DockerContainer(id, ContainerAddress("ip"), false)
val suspend = container.suspend()
val resume = container.resume()
await(suspend)
await(resume)
docker.unpauses should have size 1
docker.pauses should have size 1
runc.pauses should have size 0
runc.resumes should have size 0
}
it should "destroy a container via Docker" in {
implicit val docker = stub[DockerApiWithFileAccess]
implicit val runc = stub[RuncApi]
val id = ContainerId("id")
val container = new DockerContainer(id, ContainerAddress("ip"), true)
container.destroy()
(docker.rm(_: ContainerId)(_: TransactionId)).verify(id, transid)
}
/*
* INITIALIZE
*
* Only tests for quite simple cases. Disambiguation of errors is delegated to ActivationResponse
* and so are the tests for those.
*/
it should "initialize a container" in {
implicit val docker = stub[DockerApiWithFileAccess]
implicit val runc = stub[RuncApi]
val initTimeout = 1.second
val interval = intervalOf(1.millisecond)
val container = dockerContainer() {
Future.successful(RunResult(interval, Right(ContainerResponse(true, "", None))))
}
val initInterval = container.initialize(JsObject.empty, initTimeout, 1)
await(initInterval, initTimeout) shouldBe interval
// assert the starting log is there
val start = LogMarker.parse(logLines.head)
start.token shouldBe INVOKER_ACTIVATION_INIT
// assert the end log is there
val end = LogMarker.parse(logLines.last)
end.token shouldBe INVOKER_ACTIVATION_INIT.asFinish
end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
}
it should "properly deal with a timeout during initialization" in {
implicit val docker = stub[DockerApiWithFileAccess]
implicit val runc = stub[RuncApi]
val initTimeout = 1.second
val interval = intervalOf(initTimeout + 1.nanoseconds)
val container = dockerContainer() {
Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val init = container.initialize(JsObject.empty, initTimeout, 1)
val error = the[InitializationError] thrownBy await(init, initTimeout)
error.interval shouldBe interval
error.response.statusCode shouldBe ActivationResponse.DeveloperError
// assert the finish log is there
val end = LogMarker.parse(logLines.last)
end.token shouldBe INVOKER_ACTIVATION_INIT.asFinish
}
/*
* RUN
*
* Only tests for quite simple cases. Disambiguation of errors is delegated to ActivationResponse
* and so are the tests for those.
*/
it should "run a container" in {
implicit val docker = stub[DockerApiWithFileAccess]
implicit val runc = stub[RuncApi]
val interval = intervalOf(1.millisecond)
val result = JsObject.empty
val container = dockerContainer() {
Future.successful(RunResult(interval, Right(ContainerResponse(true, result.compactPrint, None))))
}
val runResult = container.run(JsObject.empty, JsObject.empty, 1.second, 1)
await(runResult) shouldBe (interval, ActivationResponse.success(Some(result), Some(2)))
// assert the starting log is there
val start = LogMarker.parse(logLines.head)
start.token shouldBe INVOKER_ACTIVATION_RUN
// assert the end log is there
val end = LogMarker.parse(logLines.last)
end.token shouldBe INVOKER_ACTIVATION_RUN.asFinish
end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
}
it should "properly deal with a timeout during run" in {
implicit val docker = stub[DockerApiWithFileAccess]
implicit val runc = stub[RuncApi]
val runTimeout = 1.second
val interval = intervalOf(runTimeout + 1.nanoseconds)
val container = dockerContainer() {
Future.successful(RunResult(interval, Left(Timeout(new Throwable()))))
}
val runResult = container.run(JsObject.empty, JsObject.empty, runTimeout, 1)
await(runResult) shouldBe (interval, ActivationResponse.developerError(
Messages.timedoutActivation(runTimeout, false)))
// assert the finish log is there
val end = LogMarker.parse(logLines.last)
end.token shouldBe INVOKER_ACTIVATION_RUN.asFinish
}
/*
* LOGS
*/
it should "read a simple log with sentinel" in {
val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
val rawLog = toRawLog(Seq(expectedLogEntry), appendSentinel = true)
implicit val docker = new TestDockerClient {
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.single(rawLog)
}
}
implicit val runc = stub[RuncApi]
val container = dockerContainer(id = containerId)()
// Read with tight limit to verify that no truncation occurs
val processedLogs = awaitLogs(container.logs(limit = rawLog.length.bytes, waitForSentinel = true))
docker.rawContainerLogsInvocations should have size 1
val (id, fromPos, pollInterval) = docker.rawContainerLogsInvocations(0)
id shouldBe containerId
fromPos shouldBe 0
pollInterval shouldBe 'defined
processedLogs should have size 1
processedLogs shouldBe Vector(expectedLogEntry.toFormattedString)
}
it should "read a simple log without sentinel" in {
val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
val rawLog = toRawLog(Seq(expectedLogEntry), appendSentinel = false)
implicit val docker = new TestDockerClient {
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.single(rawLog)
}
}
implicit val runc = stub[RuncApi]
val container = dockerContainer(id = containerId)()
// Read without tight limit so that the full read result is processed
val processedLogs = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = false))
docker.rawContainerLogsInvocations should have size 1
val (id, fromPos, pollInterval) = docker.rawContainerLogsInvocations(0)
id shouldBe containerId
fromPos shouldBe 0
pollInterval should not be 'defined
processedLogs should have size 1
processedLogs shouldBe Vector(expectedLogEntry.toFormattedString)
}
it should "fail log reading if error occurs during file reading" in {
implicit val docker = new TestDockerClient {
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.failed(new IOException)
}
}
implicit val runc = stub[RuncApi]
val container = dockerContainer()()
an[IOException] should be thrownBy awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
docker.rawContainerLogsInvocations should have size 1
val (id, fromPos, _) = docker.rawContainerLogsInvocations(0)
id shouldBe containerId
fromPos shouldBe 0
}
it should "read two consecutive logs with sentinel" in {
val firstLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is the first log.\n")
val secondLogEntry = LogLine(Instant.EPOCH.plusSeconds(1L).toString, "stderr", "This is the second log.\n")
val firstRawLog = toRawLog(Seq(firstLogEntry), appendSentinel = true)
val secondRawLog = toRawLog(Seq(secondLogEntry), appendSentinel = true)
val returnValues = mutable.Queue(firstRawLog, secondRawLog)
implicit val docker = new TestDockerClient {
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.single(returnValues.dequeue())
}
}
implicit val runc = stub[RuncApi]
val container = dockerContainer()()
// Read without tight limit so that the full read result is processed
val processedFirstLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
val processedSecondLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
docker.rawContainerLogsInvocations should have size 2
val (_, fromPos1, _) = docker.rawContainerLogsInvocations(0)
fromPos1 shouldBe 0
val (_, fromPos2, _) = docker.rawContainerLogsInvocations(1)
fromPos2 shouldBe firstRawLog.length // second read should start behind the first line
processedFirstLog should have size 1
processedFirstLog shouldBe Vector(firstLogEntry.toFormattedString)
processedSecondLog should have size 1
processedSecondLog shouldBe Vector(secondLogEntry.toFormattedString)
}
it should "eventually terminate even if no sentinels can be found" in {
val expectedLog = Seq(LogLine(Instant.EPOCH.toString, "stdout", s"This is log entry.\n"))
val rawLog = toRawLog(expectedLog, appendSentinel = false)
implicit val docker = new TestDockerClient {
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
// "Fakes" an infinite source with only 1 entry
Source.tick(0.milliseconds, 10.seconds, rawLog)
}
}
implicit val runc = stub[RuncApi]
val waitForLogs = 100.milliseconds
val container = dockerContainer()(awaitLogs = waitForLogs)
// Read without tight limit so that the full read result is processed
val (interval, processedLog) = durationOf(awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true)))
interval.toMillis should (be >= waitForLogs.toMillis and be < (waitForLogs * 2).toMillis)
docker.rawContainerLogsInvocations should have size 1
processedLog should have size expectedLog.length + 1 //error log should be appended
processedLog.head shouldBe expectedLog.head.toFormattedString
processedLog(1) should include(Messages.logFailure)
}
it should "truncate logs and advance reading position to end of current read" in {
val firstLogFirstEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is the first line in first log.\n")
val firstLogSecondEntry =
LogLine(Instant.EPOCH.plusMillis(1L).toString, "stderr", "This is the second line in first log.\n")
val secondLogFirstEntry =
LogLine(Instant.EPOCH.plusMillis(2L).toString, "stdout", "This is the first line in second log.\n")
val secondLogSecondEntry =
LogLine(Instant.EPOCH.plusMillis(3L).toString, "stdout", "This is the second line in second log.\n")
val secondLogLimit = 4
val thirdLogFirstEntry =
LogLine(Instant.EPOCH.plusMillis(4L).toString, "stdout", "This is the first line in third log.\n")
val firstRawLog = toRawLog(Seq(firstLogFirstEntry, firstLogSecondEntry), appendSentinel = false)
val secondRawLog = toRawLog(Seq(secondLogFirstEntry, secondLogSecondEntry), appendSentinel = false)
val thirdRawLog = toRawLog(Seq(thirdLogFirstEntry), appendSentinel = true)
val returnValues = mutable.Queue(firstRawLog, secondRawLog, thirdRawLog)
implicit val docker = new TestDockerClient {
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.single(returnValues.dequeue())
}
}
implicit val runc = stub[RuncApi]
val container = dockerContainer()()
val processedFirstLog = awaitLogs(container.logs(limit = (firstRawLog.length - 1).bytes, waitForSentinel = true))
val processedSecondLog =
awaitLogs(container.logs(limit = (secondRawLog.length - 1).bytes, waitForSentinel = false))
val processedThirdLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
docker.rawContainerLogsInvocations should have size 3
val (_, fromPos1, _) = docker.rawContainerLogsInvocations(0)
fromPos1 shouldBe 0
val (_, fromPos2, _) = docker.rawContainerLogsInvocations(1)
fromPos2 shouldBe firstRawLog.length // second read should start behind full content of first read
val (_, fromPos3, _) = docker.rawContainerLogsInvocations(2)
fromPos3 shouldBe firstRawLog.length + secondRawLog.length // third read should start behind full content of first and second read
processedFirstLog should have size 2
processedFirstLog(0) shouldBe firstLogFirstEntry.toFormattedString
// Allowing just 1 byte less than the JSON structure causes the entire line to drop
processedFirstLog(1) should include(Messages.truncateLogs((firstRawLog.length - 1).bytes))
processedSecondLog should have size 2
processedSecondLog(0) shouldBe secondLogFirstEntry.toFormattedString
processedSecondLog(1) should include(Messages.truncateLogs((secondRawLog.length - 1).bytes))
processedThirdLog should have size 1
processedThirdLog(0) shouldBe thirdLogFirstEntry.toFormattedString
}
it should "not fail if the last log-line is incomplete" in {
val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
// "destroy" the second log entry by dropping some bytes
val rawLog = toRawLog(Seq(expectedLogEntry, expectedLogEntry), appendSentinel = false).dropRight(10)
implicit val docker = new TestDockerClient {
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.single(rawLog)
}
}
implicit val runc = stub[RuncApi]
val container = dockerContainer(id = containerId)()
// Read with tight limit to verify that no truncation occurs
val processedLogs = awaitLogs(container.logs(limit = rawLog.length.bytes, waitForSentinel = false))
docker.rawContainerLogsInvocations should have size 1
val (id, fromPos, _) = docker.rawContainerLogsInvocations(0)
id shouldBe containerId
fromPos shouldBe 0
processedLogs should have size 2
processedLogs(0) shouldBe expectedLogEntry.toFormattedString
processedLogs(1) should include(Messages.logFailure)
}
it should "include an incomplete warning if sentinels have not been found only if we wait for sentinels" in {
val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
val rawLog = toRawLog(Seq(expectedLogEntry, expectedLogEntry), appendSentinel = false)
implicit val docker = new TestDockerClient {
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.single(rawLog)
}
}
implicit val runc = stub[RuncApi]
val container = dockerContainer(id = containerId)()
// Read with tight limit to verify that no truncation occurs
val processedLogs = awaitLogs(container.logs(limit = rawLog.length.bytes, waitForSentinel = true))
docker.rawContainerLogsInvocations should have size 1
val (id, fromPos, _) = docker.rawContainerLogsInvocations(0)
id shouldBe containerId
fromPos shouldBe 0
processedLogs should have size 3
processedLogs(0) shouldBe expectedLogEntry.toFormattedString
processedLogs(1) shouldBe expectedLogEntry.toFormattedString
processedLogs(2) should include(Messages.logFailure)
val processedLogsFalse = awaitLogs(container.logs(limit = rawLog.length.bytes, waitForSentinel = false))
processedLogsFalse should have size 2
processedLogsFalse(0) shouldBe expectedLogEntry.toFormattedString
processedLogsFalse(1) shouldBe expectedLogEntry.toFormattedString
}
it should "strip sentinel lines if it waits or doesn't wait for them" in {
val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is a log entry.\n")
val rawLog = toRawLog(Seq(expectedLogEntry), appendSentinel = true)
implicit val docker = new TestDockerClient {
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.single(rawLog)
}
}
implicit val runc = stub[RuncApi]
val container = dockerContainer(id = containerId)()
val processedLogs = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
processedLogs should have size 1
processedLogs(0) shouldBe expectedLogEntry.toFormattedString
val processedLogsFalse = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = false))
processedLogsFalse should have size 1
processedLogsFalse(0) shouldBe expectedLogEntry.toFormattedString
}
class TestRuncClient extends RuncApi {
var resumes = mutable.Buffer.empty[ContainerId]
var pauses = mutable.Buffer.empty[ContainerId]
override def resume(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
resumes += id
Future.successful(())
}
override def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
pauses += id
Future.successful(())
}
}
class TestDockerClient extends DockerApiWithFileAccess {
var runs = mutable.Buffer.empty[(String, Seq[String])]
var inspects = mutable.Buffer.empty[(ContainerId, String)]
var pauses = mutable.Buffer.empty[ContainerId]
var unpauses = mutable.Buffer.empty[ContainerId]
var rms = mutable.Buffer.empty[ContainerId]
var pulls = mutable.Buffer.empty[String]
var rawContainerLogsInvocations = mutable.Buffer.empty[(ContainerId, Long, Option[FiniteDuration])]
def clientVersion: String = "mock-test-client"
def run(image: String, args: Seq[String] = Seq.empty[String])(
implicit transid: TransactionId): Future[ContainerId] = {
runs += ((image, args))
Future.successful(ContainerId("testId"))
}
def inspectIPAddress(id: ContainerId, network: String)(
implicit transid: TransactionId): Future[ContainerAddress] = {
inspects += ((id, network))
Future.successful(ContainerAddress("testIp"))
}
def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
pauses += id
Future.successful(())
}
def unpause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
unpauses += id
Future.successful(())
}
def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
rms += id
Future.successful(())
}
def ps(filters: Seq[(String, String)] = Seq.empty, all: Boolean = false)(
implicit transid: TransactionId): Future[Seq[ContainerId]] = ???
def pull(image: String)(implicit transid: TransactionId): Future[Unit] = {
pulls += image
Future.successful(())
}
override def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] = ???
override def rawContainerLogs(containerId: ContainerId,
fromPos: Long,
pollInterval: Option[FiniteDuration]): Source[ByteString, Any] = {
rawContainerLogsInvocations += ((containerId, fromPos, pollInterval))
Source.single(ByteString.empty)
}
}
}