blob: eee65a3575996b0390ef58c8497d475cbd0874cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.containerpool.docker.test
import java.nio.charset.StandardCharsets
import java.time.Instant
import scala.concurrent.duration._
import org.apache.http.HttpRequest
import org.apache.http.HttpResponse
import org.apache.http.entity.StringEntity
import org.apache.http.localserver.LocalServerTestBase
import org.apache.http.protocol.HttpContext
import org.apache.http.protocol.HttpRequestHandler
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.BeforeAndAfter
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import spray.json.JsObject
import common.StreamLogging
import common.WskActorSystem
import org.apache.http.conn.HttpHostConnectException
import scala.concurrent.Await
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.containerpool.ApacheBlockingContainerClient
import org.apache.openwhisk.core.containerpool.ContainerHealthError
import org.apache.openwhisk.core.containerpool.RetryableConnectionError
import org.apache.openwhisk.core.entity.ActivationResponse.Timeout
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.entity.ActivationResponse._
/**
* Unit tests for ApacheBlockingContainerClient which communicate with containers.
*/
@RunWith(classOf[JUnitRunner])
class ApacheBlockingContainerClientTests
extends FlatSpec
with Matchers
with BeforeAndAfter
with BeforeAndAfterAll
with StreamLogging
with WskActorSystem {
implicit val transid = TransactionId.testing
implicit val ec = actorSystem.dispatcher
var testHang: FiniteDuration = 0.second
var testStatusCode: Int = 200
var testResponse: String = null
val mockServer = new LocalServerTestBase {
override def setUp() = {
super.setUp()
this.serverBootstrap.registerHandler("/init", new HttpRequestHandler() {
override def handle(request: HttpRequest, response: HttpResponse, context: HttpContext) = {
if (testHang.length > 0) {
Thread.sleep(testHang.toMillis)
}
response.setStatusCode(testStatusCode);
if (testResponse != null) {
response.setEntity(new StringEntity(testResponse, StandardCharsets.UTF_8))
}
}
})
}
}
mockServer.setUp()
val httpHost = mockServer.start()
val hostWithPort = s"${httpHost.getHostName}:${httpHost.getPort}"
before {
testHang = 0.second
testStatusCode = 200
testResponse = null
stream.reset()
}
override def afterAll = {
mockServer.shutDown()
}
behavior of "ApacheBlockingContainerClient"
it should "not wait longer than set timeout" in {
val timeout = 5.seconds
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B)
testHang = timeout * 2
val start = Instant.now()
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
val end = Instant.now()
val waited = end.toEpochMilli - start.toEpochMilli
result shouldBe 'left
waited should be > timeout.toMillis
waited should be < (timeout * 2).toMillis
}
it should "handle empty entity response" in {
val timeout = 5.seconds
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B)
testStatusCode = 204
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
result shouldBe Left(NoResponseReceived())
}
it should "retry till timeout on HttpHostConnectException" in {
val timeout = 5.seconds
val badHostAndPort = "0.0.0.0:12345"
val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B)
testStatusCode = 204
val start = Instant.now()
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
val end = Instant.now()
val waited = end.toEpochMilli - start.toEpochMilli
result match {
case Left(Timeout(RetryableConnectionError(_: HttpHostConnectException))) => // all good
case _ =>
fail(s"$result was not a Timeout(RetryableConnectionError(HttpHostConnectException)))")
}
waited should be > timeout.toMillis
waited should be < (timeout * 2).toMillis
}
it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
val timeout = 5.seconds
val badHostAndPort = "0.0.0.0:12345"
val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B)
assertThrows[ContainerHealthError] {
Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
}
}
it should "not truncate responses within limit" in {
val timeout = 1.minute.toMillis
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, 50.B)
Seq(true, false).foreach { success =>
Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
testResponse = r
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
result shouldBe Right {
ContainerResponse(okStatus = success, if (r != null) r else "", None)
}
}
}
}
it should "truncate responses that exceed limit" in {
val timeout = 1.minute.toMillis
val limit = 1.B
val excess = limit + 1.B
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, limit)
Seq(true, false).foreach { success =>
Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
testStatusCode = if (success) 200 else 500
testResponse = r
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
result shouldBe Right {
ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), Some((r.length.B, limit)))
}
}
}
}
}