| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.openwhisk.core.containerpool |
| |
| import java.net.NoRouteToHostException |
| import java.nio.charset.StandardCharsets |
| import java.time.Instant |
| |
| import org.apache.commons.io.IOUtils |
| import org.apache.http.{HttpHeaders, NoHttpResponseException} |
| import org.apache.http.client.config.RequestConfig |
| import org.apache.http.client.methods.{HttpPost, HttpRequestBase} |
| import org.apache.http.client.utils.{HttpClientUtils, URIBuilder} |
| import org.apache.http.conn.HttpHostConnectException |
| import org.apache.http.entity.StringEntity |
| import org.apache.http.impl.NoConnectionReuseStrategy |
| import org.apache.http.impl.client.HttpClientBuilder |
| import org.apache.http.impl.conn.PoolingHttpClientConnectionManager |
| import org.apache.http.util.EntityUtils |
| import spray.json._ |
| import org.apache.openwhisk.common.{Logging, TransactionId} |
| import org.apache.openwhisk.core.ConfigKeys |
| import org.apache.openwhisk.core.entity.ActivationResponse._ |
| import org.apache.openwhisk.core.entity.ByteSize |
| import org.apache.openwhisk.core.entity.size.SizeLong |
| import pureconfig._ |
| import pureconfig.generic.auto._ |
| |
| import scala.annotation.tailrec |
| import scala.concurrent._ |
| import scala.concurrent.{Await, ExecutionContext, Future} |
| import scala.concurrent.duration._ |
| import scala.util.{Failure, Success, Try} |
| import scala.util.control.NoStackTrace |
| |
| // Used internally to wrap all exceptions for which the request can be retried |
| protected[containerpool] case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace |
| |
| /** |
| * This HTTP client is used only in the invoker to communicate with the action container. |
| * It allows to POST a JSON object and receive JSON object back; that is the |
| * content type and the accept headers are both 'application/json. |
| * The reason we still use this class for the action container is a mysterious hang |
| * in the Akka http client where a future fails to properly timeout and we have not |
| * determined why that is. |
| * |
| * @param hostname the host name |
| * @param timeout the timeout in msecs to wait for a response |
| * @param maxResponse the maximum size in bytes the connection will accept |
| * @param maxConcurrent the maximum number of concurrent requests allowed (Default is 1) |
| */ |
| protected class ApacheBlockingContainerClient(hostname: String, |
| timeout: FiniteDuration, |
| maxResponse: ByteSize, |
| maxConcurrent: Int = 1)(implicit logging: Logging, ec: ExecutionContext) |
| extends ContainerClient { |
| |
| /** |
| * Closes the HttpClient and all resources allocated by it. |
| * |
| * This will close the HttpClient that is generated for this instance of ApacheBlockingContainerClient. That will also cause the |
| * ConnectionManager to be closed alongside. |
| */ |
| def close(): Future[Unit] = Future.successful(HttpClientUtils.closeQuietly(connection)) |
| |
| /** |
| * Posts to hostname/endpoint the given JSON object. |
| * Waits up to timeout before aborting on a good connection. |
| * If the endpoint is not ready, retry up to timeout. |
| * Every retry reduces the available timeout so that this method should not |
| * wait longer than the total timeout (within a small slack allowance). |
| * |
| * @param endpoint the path the api call relative to hostname |
| * @param body the JSON value to post (this is usually a JSON objecT) |
| * @param retry whether or not to retry on connection failure |
| * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String) |
| */ |
| def post(endpoint: String, body: JsValue, retry: Boolean, reschedule: Boolean = false)( |
| implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = { |
| val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8) |
| entity.setContentType("application/json") |
| |
| val request = new HttpPost(baseUri.setPath(endpoint).build) |
| request.addHeader(HttpHeaders.ACCEPT, "application/json") |
| request.setEntity(entity) |
| |
| Future { |
| blocking { |
| execute(request, timeout, maxConcurrent, retry, reschedule) |
| } |
| } |
| } |
| |
| // Annotation will make the compiler complain if no tail recursion is possible |
| @tailrec private def execute( |
| request: HttpRequestBase, |
| timeout: FiniteDuration, |
| maxConcurrent: Int, |
| retry: Boolean, |
| reschedule: Boolean = false)(implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = { |
| val start = Instant.now |
| |
| Try(connection.execute(request)).map { response => |
| val containerResponse = Option(response.getEntity) |
| .map { entity => |
| val statusCode = response.getStatusLine.getStatusCode |
| val contentLength = entity.getContentLength |
| |
| // Negative contentLength means unknown or overflow. We don't want to consume in either case. |
| if (contentLength >= 0) { |
| if (contentLength <= maxResponseBytes) { |
| // optimized route to consume the entire stream into a string |
| val str = EntityUtils.toString(entity, StandardCharsets.UTF_8) // consumes and closes the whole stream |
| Right(ContainerResponse(statusCode, str, None)) |
| } else { |
| // only consume a bounded number of bytes according to the system limits |
| val str = new String(IOUtils.toByteArray(entity.getContent, maxResponseBytes), StandardCharsets.UTF_8) |
| EntityUtils.consumeQuietly(entity) // consume the rest of the stream to free the connection |
| Right(ContainerResponse(statusCode, str, Some(contentLength.B, maxResponse))) |
| } |
| } else { |
| EntityUtils.consumeQuietly(entity) // silently consume the whole stream to free the connection |
| Left(NoResponseReceived()) |
| } |
| } |
| .getOrElse { |
| // entity is null |
| Left(NoResponseReceived()) |
| } |
| |
| response.close() |
| containerResponse |
| } recoverWith { |
| // The route to target socket as well as the target socket itself may need some time to be available - |
| // particularly on a loaded system. |
| // The following exceptions occur on such transient conditions. In addition, no data has been transmitted |
| // yet if these exceptions occur. For this reason, it is safe and reasonable to retry. |
| // |
| // HttpHostConnectException: no target socket is listening (yet). |
| case t: HttpHostConnectException => Failure(RetryableConnectionError(t)) |
| // |
| // NoRouteToHostException: route to target host is not known (yet). |
| case t: NoRouteToHostException => Failure(RetryableConnectionError(t)) |
| |
| //In general with NoHttpResponseException it cannot be said if server has processed the request or not |
| //For some cases like in standalone mode setup it should be fine to retry |
| case t: NoHttpResponseException if ApacheBlockingContainerClient.clientConfig.retryNoHttpResponseException => |
| Failure(RetryableConnectionError(t)) |
| } match { |
| case Success(response) => response |
| case Failure(_: RetryableConnectionError) if reschedule => |
| //propagate as a failed future; clients can retry at a different container |
| throw ContainerHealthError(tid, request.getURI.toString) |
| case Failure(t: RetryableConnectionError) if retry => |
| if (timeout > Duration.Zero) { |
| Thread.sleep(50) // Sleep for 50 milliseconds |
| val newTimeout = timeout - (Instant.now.toEpochMilli - start.toEpochMilli).milliseconds |
| execute(request, newTimeout, maxConcurrent, retry = true) |
| } else { |
| logging.warn(this, s"POST failed with $t - no retry because timeout exceeded.") |
| Left(Timeout(t)) |
| } |
| case Failure(t: Throwable) => Left(ConnectionError(t)) |
| } |
| } |
| |
| private val maxResponseBytes = maxResponse.toBytes |
| |
| private val baseUri = new URIBuilder() |
| .setScheme("http") |
| .setHost(hostname) |
| |
| private val httpconfig = RequestConfig.custom |
| .setConnectTimeout(timeout.toMillis.toInt) |
| .setConnectionRequestTimeout(timeout.toMillis.toInt) |
| .setSocketTimeout(timeout.toMillis.toInt) |
| .build |
| |
| private val connection = HttpClientBuilder.create |
| .setDefaultRequestConfig(httpconfig) |
| // Connections are not reused by most of the available runtimes. To circumvent any issues we might have regarding |
| // connections randomly breaking due to our pause/resume cycle, we don't reuse connections at all. |
| .setConnectionReuseStrategy(new NoConnectionReuseStrategy) |
| .setConnectionManager { |
| // A PoolingHttpClientConnectionManager is the default when not specifying any ConnectionManager. |
| // The PoolingHttpClientConnectionManager has the benefit of actively checking if a connection has become stale, |
| // which is very important because pausing/resuming containers can cause a connection to become silently broken. |
| // This causes very subtle bugs, especially when containers are reused after a pretty long time (like > 5 minutes). |
| // |
| // The BasicHttpClientConnectionManager (which would be alternative here) doesn't have such a mechanism and thus |
| // isn't suitable for our usage. |
| val cm = new PoolingHttpClientConnectionManager() |
| // perRoute effectively means per host in our use-case, which means setting it to the same value as the maximum |
| // total of all connections in the pool is appropriate here. |
| cm.setDefaultMaxPerRoute(maxConcurrent) |
| cm.setMaxTotal(maxConcurrent) |
| cm |
| } |
| .useSystemProperties() |
| .disableAutomaticRetries() |
| .build |
| } |
| |
| case class ApacheClientConfig(retryNoHttpResponseException: Boolean) |
| |
| object ApacheBlockingContainerClient { |
| val clientConfig: ApacheClientConfig = loadConfigOrThrow[ApacheClientConfig](ConfigKeys.apacheClientConfig) |
| |
| /** A helper method to post one single request to a connection. Used for container tests. */ |
| def post(host: String, port: Int, endPoint: String, content: JsValue)( |
| implicit logging: Logging, |
| tid: TransactionId, |
| ec: ExecutionContext): (Int, Option[JsObject]) = { |
| val timeout = 90.seconds |
| val connection = new ApacheBlockingContainerClient(s"$host:$port", timeout, 1.MB) |
| val response = executeRequest(connection, endPoint, content) |
| val result = Await.result(response, timeout) |
| connection.close() |
| result |
| } |
| |
| /** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */ |
| def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: Duration)( |
| implicit logging: Logging, |
| tid: TransactionId, |
| ec: ExecutionContext): Seq[(Int, Option[JsObject])] = { |
| val connection = new ApacheBlockingContainerClient(s"$host:$port", 90.seconds, 1.MB, contents.size) |
| val futureResults = contents.map { content => |
| executeRequest(connection, endPoint, content) |
| } |
| val results = Await.result(Future.sequence(futureResults), timeout) |
| connection.close() |
| results |
| } |
| |
| private def executeRequest(connection: ApacheBlockingContainerClient, endpoint: String, content: JsValue)( |
| implicit logging: Logging, |
| tid: TransactionId, |
| ec: ExecutionContext): Future[(Int, Option[JsObject])] = { |
| connection.post(endpoint, content, retry = true) map { |
| case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption) |
| case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container") |
| case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException() |
| case Left(ConnectionError(_: java.net.SocketTimeoutException)) => |
| throw new java.util.concurrent.TimeoutException() |
| case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage) |
| } |
| |
| } |
| } |