blob: c2da2668ae23a267ab867ca7105990d68edb8179 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.docker.test
import common.StreamLogging
import common.WskActorSystem
import java.nio.charset.StandardCharsets
import java.time.Instant
import org.apache.http.HttpRequest
import org.apache.http.HttpResponse
import org.apache.http.entity.StringEntity
import org.apache.http.localserver.LocalServerTestBase
import org.apache.http.protocol.HttpContext
import org.apache.http.protocol.HttpRequestHandler
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfter
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner
import scala.concurrent.Await
import scala.concurrent.TimeoutException
import scala.concurrent.duration._
import spray.json.JsObject
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.AkkaContainerClient
import org.apache.openwhisk.core.containerpool.ContainerHealthError
import org.apache.openwhisk.core.entity.ActivationResponse._
import org.apache.openwhisk.core.entity.size._
/**
 * Unit tests for [[AkkaContainerClient]].
 *
 * Most tests talk to a local mock HTTP server whose "/init" handler is steered through the
 * mutable `test*` knobs declared below; two tests instead target "0.0.0.0:12345", where no
 * server is expected to be listening, to exercise the connection-failure paths.
 */
@RunWith(classOf[JUnitRunner])
class AkkaContainerClientTests
    extends FlatSpec
    with Matchers
    with BeforeAndAfter
    with BeforeAndAfterAll
    with StreamLogging
    with WskActorSystem {

  implicit val transid = TransactionId.testing
  implicit val ec = actorSystem.dispatcher

  // Knobs read by the mock "/init" handler. Each test sets what it needs; `before` restores the defaults.

  /** How long the handler sleeps before doing anything else (no sleep when zero). */
  var testHang: FiniteDuration = 0.second

  /** Status code the handler sets on the response. */
  var testStatusCode: Int = 200

  /** Response body the handler sends; `null` means no entity is attached to the response. */
  var testResponse: String = null

  /** Number of requests the handler fails (by throwing) before it starts answering. */
  var testConnectionFailCount: Int = 0

  /** Number of requests the handler has failed so far; reset before every test so tests stay independent. */
  var observedConnectionFailures: Int = 0

  /** Local HTTP server with a single "/init" handler driven by the knobs above. */
  val mockServer = new LocalServerTestBase {
    override def setUp() = {
      super.setUp()
      this.serverBootstrap
        .registerHandler(
          "/init",
          new HttpRequestHandler() {
            override def handle(request: HttpRequest, response: HttpResponse, context: HttpContext) = {
              // Optionally stall the request to simulate a slow/hanging container.
              if (testHang.length > 0) {
                Thread.sleep(testHang.toMillis)
              }
              // Fail the first `testConnectionFailCount` requests by throwing from the handler.
              if (testConnectionFailCount > 0 && observedConnectionFailures < testConnectionFailCount) {
                observedConnectionFailures += 1
                println("failing in test")
                throw new RuntimeException("failing...")
              }
              response.setStatusCode(testStatusCode)
              // Only attach an entity when a body was configured (null => no entity).
              Option(testResponse).foreach { body =>
                response.setEntity(new StringEntity(body, StandardCharsets.UTF_8))
              }
            }
          })
    }
  }

  mockServer.setUp()
  val httpHost = mockServer.start()
  val hostWithPort = s"${httpHost.getHostName}:${httpHost.getPort}"

  // Restore all handler knobs and clear the captured log stream before each test.
  before {
    testHang = 0.second
    testStatusCode = 200
    testResponse = null
    testConnectionFailCount = 0
    observedConnectionFailures = 0
    stream.reset()
  }

  /**
   * Stops the mock server and then chains to the mixed-in traits' `afterAll`, so that any
   * teardown they perform (e.g. for the `actorSystem` provided by `WskActorSystem`) still runs.
   * The super call sits in `finally` so it happens even if the server shutdown throws.
   */
  override def afterAll(): Unit = {
    try {
      mockServer.shutDown()
    } finally {
      super.afterAll()
    }
  }

  /**
   * Evaluates `block` and returns its result together with the elapsed wall-clock time.
   *
   * @param block the computation to time
   * @return the block's result and the elapsed time in milliseconds
   */
  private def measureMillis[T](block: => T): (T, Long) = {
    val start = Instant.now()
    val outcome = block
    val end = Instant.now()
    (outcome, end.toEpochMilli - start.toEpochMilli)
  }

  behavior of "AkkaContainerClient"

  it should "not wait longer than set timeout" in {
    val timeout = 5.seconds
    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100)
    // Make the server stall for twice the client timeout.
    testHang = timeout * 2
    val (result, waited) = measureMillis {
      Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
    }
    result shouldBe 'left
    // The client must give up after its own timeout, well before the server would have answered.
    waited should be > timeout.toMillis
    waited should be < (timeout * 2).toMillis
  }

  it should "handle empty entity response" in {
    val timeout = 5.seconds
    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100)
    testStatusCode = 204
    val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
    result shouldBe Left(NoResponseReceived())
  }

  it should "retry till timeout on StreamTcpException" in {
    val timeout = 5.seconds
    // No server is expected to listen on this address, so every connection attempt should fail.
    val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 100)
    val (result, waited) = measureMillis {
      Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
    }
    result match {
      case Left(Timeout(_: TimeoutException)) => // good
      case _                                  => fail(s"$result was not a Timeout(TimeoutException)")
    }
    waited should be > timeout.toMillis
    waited should be < (timeout * 2).toMillis
  }

  it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
    val timeout = 5.seconds
    val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 100)
    assertThrows[ContainerHealthError] {
      Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
    }
  }

  it should "retry till success within timeout limit" in {
    val timeout = 5.seconds
    val retryInterval = 500.milliseconds
    val connection =
      new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100, retryInterval)
    // The handler fails the first five requests, then answers with an empty body.
    testConnectionFailCount = 5
    testResponse = ""
    val (result, waited) = measureMillis {
      Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
    }
    result shouldBe Right {
      ContainerResponse(true, "", None)
    }
    // Success must come after the failed attempts' retry intervals, but inside the overall timeout.
    waited should be > (testConnectionFailCount * retryInterval).toMillis
    waited should be < timeout.toMillis
  }

  it should "not truncate responses within limit" in {
    val timeout = 1.minute
    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 50.B, 100)
    Seq(true, false).foreach { success =>
      Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
        testStatusCode = if (success) 200 else 500
        testResponse = r
        val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
        result shouldBe Right {
          ContainerResponse(okStatus = success, if (r != null) r else "", None)
        }
      }
    }
  }

  it should "truncate responses that exceed limit" in {
    val timeout = 1.minute
    val limit = 1.B
    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, limit, 100)
    Seq(true, false).foreach { success =>
      Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
        testStatusCode = if (success) 200 else 500
        testResponse = r
        val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
        result shouldBe Right {
          ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), Some((r.length.B, limit)))
        }
      }
    }
  }

  it should "truncate large responses that exceed limit" in {
    val timeout = 1.minute
    // Use a limit large enough that the response does not fit into a single ByteString, since the
    // response entity is parsed into multiple ByteStrings (chunk size varies, often ~64k or ~128k).
    val limit = 300.KB
    val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, limit, 100)
    Seq(true, false).foreach { success =>
      // Generate a response that's 1MB
      val response = "0" * 1024 * 1024
      testStatusCode = if (success) 200 else 500
      testResponse = response
      val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
      result shouldBe Right {
        ContainerResponse(okStatus = success, response.take(limit.toBytes.toInt), Some((response.length.B, limit)))
      }
    }
  }
}