blob: c27a298e8798a174e2e8c98c1605bf9d10434677 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.mesos.test
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.testkit.TestKit
import akka.testkit.TestProbe
import com.adobe.api.platform.runtime.mesos.Bridge
import com.adobe.api.platform.runtime.mesos.CommandDef
import com.adobe.api.platform.runtime.mesos.Constraint
import com.adobe.api.platform.runtime.mesos.DeleteTask
import com.adobe.api.platform.runtime.mesos.LIKE
import com.adobe.api.platform.runtime.mesos.Running
import com.adobe.api.platform.runtime.mesos.SubmitTask
import com.adobe.api.platform.runtime.mesos.Subscribe
import com.adobe.api.platform.runtime.mesos.SubscribeComplete
import com.adobe.api.platform.runtime.mesos.TaskDef
import com.adobe.api.platform.runtime.mesos.UNLIKE
import com.adobe.api.platform.runtime.mesos.User
import common.StreamLogging
import org.apache.mesos.v1.Protos.TaskID
import org.apache.mesos.v1.Protos.TaskState
import org.apache.mesos.v1.Protos.TaskStatus
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers}
import org.scalatest.junit.JUnitRunner
import scala.collection.immutable.Map
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.containerpool.ContainerArgsConfig
import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
import org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStore
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.mesos.MesosConfig
import org.apache.openwhisk.core.mesos.MesosContainerFactory
import org.apache.openwhisk.core.mesos.MesosTimeoutConfig
import org.apache.openwhisk.utils.retry
import scala.collection.JavaConverters._
@RunWith(classOf[JUnitRunner])
class MesosContainerFactoryTest
extends TestKit(ActorSystem("MesosActorSystem"))
with FlatSpecLike
with Matchers
with StreamLogging
with BeforeAndAfterEach
with BeforeAndAfterAll {
/** Awaits the given future, throws the exception enclosed in Failure. */
def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result[A](f, timeout)
implicit val wskConfig =
new WhiskConfig(Map(wskApiHostname -> "apihost") ++ wskApiHost)
var count = 0
var lastTaskId: String = null
def testTaskId() = {
count += 1
lastTaskId = "testTaskId" + count
lastTaskId
}
// 80 slots, each 265MB
val poolConfig = ContainerPoolConfig(21200.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.seconds)
val actionMemory = 265.MB
val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0
val containerArgsConfig =
new ContainerArgsConfig(
"net1",
Seq("dns1", "dns2"),
Seq.empty,
Seq.empty,
Seq.empty,
Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4")))
private var factory: MesosContainerFactory = _
override def beforeEach() = {
stream.reset()
}
override protected def afterEach(): Unit = {
super.afterEach()
Option(factory).foreach(_.close())
}
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system, verifySystemShutdown = true)
retry({
val threadNames = Thread.getAllStackTraces.asScala.keySet.map(_.getName)
withClue(s"Threads related to MesosActorSystem found to be active $threadNames") {
assert(!threadNames.exists(_.startsWith("MesosActorSystem")))
}
}, 10, Some(1.second))
super.afterAll()
}
val timeouts = MesosTimeoutConfig(1.seconds, 1.seconds, 1.seconds, 1.seconds, 1.seconds)
val mesosConfig =
MesosConfig("http://master:5050", None, "*", true, Seq.empty, " ", Seq.empty, true, None, 1.seconds, 2, timeouts)
behavior of "MesosContainerFactory"
it should "send Subscribe on init" in {
val wskConfig = new WhiskConfig(Map.empty)
factory = new MesosContainerFactory(
wskConfig,
system,
logging,
Map("--arg1" -> Set("v1", "v2")),
containerArgsConfig,
mesosConfig = mesosConfig,
clientFactory = (system, mesosConfig) => testActor)
expectMsg(Subscribe)
}
it should "send SubmitTask (with constraints) on create" in {
val mesosConfig = MesosConfig(
"http://master:5050",
None,
"*",
true,
Seq("att1 LIKE v1", "att2 UNLIKE v2"),
" ",
Seq("bbatt1 LIKE v1", "bbatt2 UNLIKE v2"),
true,
None,
1.seconds,
2,
timeouts)
factory = new MesosContainerFactory(
wskConfig,
system,
logging,
Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
containerArgsConfig,
mesosConfig = mesosConfig,
clientFactory = (_, _) => testActor,
taskIdGenerator = testTaskId _)
expectMsg(Subscribe)
factory.createContainer(
TransactionId.testing,
"mesosContainer",
ImageName("fakeImage"),
false,
actionMemory,
poolConfig.cpuShare(actionMemory))
expectMsg(
SubmitTask(TaskDef(
lastTaskId,
"mesosContainer",
"fakeImage",
mesosCpus,
actionMemory.toMB.toInt,
List(8080),
None,
false,
User("net1"),
Map(
"arg1" -> Set("v1", "v2"),
"arg2" -> Set("v3", "v4"),
"other" -> Set("v5", "v6"),
"dns" -> Set("dns1", "dns2"),
"extra1" -> Set("e1", "e2"),
"extra2" -> Set("e3", "e4")),
Some(CommandDef(Map("__OW_API_HOST" -> wskConfig.wskApiHost))),
Seq(Constraint("att1", LIKE, "v1"), Constraint("att2", UNLIKE, "v2")).toSet)))
}
it should "send DeleteTask on destroy" in {
val probe = TestProbe()
factory = new MesosContainerFactory(
wskConfig,
system,
logging,
Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
containerArgsConfig,
mesosConfig = mesosConfig,
clientFactory = (system, mesosConfig) => probe.testActor,
taskIdGenerator = testTaskId _)
probe.expectMsg(Subscribe)
//emulate successful subscribe
probe.reply(new SubscribeComplete("testid"))
//create the container
val c = factory.createContainer(
TransactionId.testing,
"mesosContainer",
ImageName("fakeImage"),
false,
actionMemory,
poolConfig.cpuShare(actionMemory))
probe.expectMsg(
SubmitTask(TaskDef(
lastTaskId,
"mesosContainer",
"fakeImage",
mesosCpus,
actionMemory.toMB.toInt,
List(8080),
None,
false,
User("net1"),
Map(
"arg1" -> Set("v1", "v2"),
"arg2" -> Set("v3", "v4"),
"other" -> Set("v5", "v6"),
"dns" -> Set("dns1", "dns2"),
"extra1" -> Set("e1", "e2"),
"extra2" -> Set("e3", "e4")),
Some(CommandDef(Map("__OW_API_HOST" -> wskConfig.wskApiHost))))))
//emulate successful task launch
val taskId = TaskID.newBuilder().setValue(lastTaskId)
probe.reply(
Running(
taskId.getValue,
"testAgentID",
TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).build(),
"agenthost",
Seq(30000)))
//wait for container
val container = await(c)
//destroy the container
implicit val tid = TransactionId.testing
val deleted = container.destroy()
probe.expectMsg(DeleteTask(lastTaskId))
probe.reply(TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_KILLED).build())
await(deleted)
}
it should "return static message for logs" in {
val probe = TestProbe()
factory = new MesosContainerFactory(
wskConfig,
system,
logging,
Map("--arg1" -> Set("v1", "v2"), "--arg2" -> Set("v3", "v4"), "other" -> Set("v5", "v6")),
new ContainerArgsConfig(
"bridge",
Seq.empty,
Seq.empty,
Seq.empty,
Seq.empty,
Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))),
mesosConfig = mesosConfig,
clientFactory = (system, mesosConfig) => probe.testActor,
taskIdGenerator = testTaskId _)
probe.expectMsg(Subscribe)
//emulate successful subscribe
probe.reply(new SubscribeComplete("testid"))
//create the container
val c = factory.createContainer(
TransactionId.testing,
"mesosContainer",
ImageName("fakeImage"),
false,
actionMemory,
poolConfig.cpuShare(actionMemory))
probe.expectMsg(
SubmitTask(TaskDef(
lastTaskId,
"mesosContainer",
"fakeImage",
mesosCpus,
actionMemory.toMB.toInt,
List(8080),
None,
false,
Bridge,
Map(
"arg1" -> Set("v1", "v2"),
"arg2" -> Set("v3", "v4"),
"other" -> Set("v5", "v6"),
"extra1" -> Set("e1", "e2"),
"extra2" -> Set("e3", "e4")),
Some(CommandDef(Map("__OW_API_HOST" -> wskConfig.wskApiHost))))))
//emulate successful task launch
val taskId = TaskID.newBuilder().setValue(lastTaskId)
probe.reply(
Running(
taskId.getValue,
"testAgentID",
TaskStatus.newBuilder().setTaskId(taskId).setState(TaskState.TASK_RUNNING).build(),
"agenthost",
Seq(30000)))
//wait for container
val container = await(c)
implicit val tid = TransactionId.testing
val logs = container
.logs(actionMemory, false)
.via(DockerToActivationLogStore.toFormattedString)
.runWith(Sink.seq)
await(logs)(0) should endWith
" stdout: Logs are not collected from Mesos containers currently. You can browse the logs for Mesos Task ID testTaskId using the mesos UI at http://master:5050"
}
}