/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.servicecomb.transport.rest.client.http;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import javax.ws.rs.core.Response.Status;

import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.common.rest.RestConst;
import org.apache.servicecomb.common.rest.codec.param.RestClientRequestImpl;
import org.apache.servicecomb.common.rest.definition.RestOperationMeta;
import org.apache.servicecomb.common.rest.filter.HttpClientFilter;
import org.apache.servicecomb.common.rest.filter.HttpClientFilterBeforeSendRequestExecutor;
import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.definition.OperationConfig;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.invocation.InvocationStageTrace;
import org.apache.servicecomb.foundation.common.http.HttpStatus;
import org.apache.servicecomb.foundation.common.net.IpPort;
import org.apache.servicecomb.foundation.common.net.URIEndpointObject;
import org.apache.servicecomb.foundation.common.utils.ExceptionUtils;
import org.apache.servicecomb.foundation.common.utils.JsonUtils;
import org.apache.servicecomb.foundation.vertx.client.http.HttpClientWithContext;
import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart;
import org.apache.servicecomb.foundation.vertx.http.VertxClientRequestToHttpServletRequest;
import org.apache.servicecomb.foundation.vertx.http.VertxClientResponseToHttpServletResponse;
import org.apache.servicecomb.registry.definition.DefinitionConst;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;

public class RestClientInvocation {
  private static final Logger LOGGER = LoggerFactory.getLogger(RestClientInvocation.class);

  private static final String[] INTERNAL_HEADERS = new String[] {
      org.apache.servicecomb.core.Const.CSE_CONTEXT,
      org.apache.servicecomb.core.Const.TARGET_MICROSERVICE
  };

  private final HttpClientWithContext httpClientWithContext;

  private Invocation invocation;

  private RestOperationMeta restOperationMeta;

  private AsyncResponse asyncResp;

  private final List<HttpClientFilter> httpClientFilters;

  private HttpClientRequest clientRequest;

  private HttpClientResponse clientResponse;

  private final Handler<Throwable> throwableHandler = this::fail;

  private boolean alreadyFailed = false;

  public RestClientInvocation(HttpClientWithContext httpClientWithContext, List<HttpClientFilter> httpClientFilters) {
    this.httpClientWithContext = httpClientWithContext;
    this.httpClientFilters = httpClientFilters;
  }

  public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Exception {
    this.invocation = invocation;
    this.asyncResp = asyncResp;

    OperationMeta operationMeta = invocation.getOperationMeta();
    restOperationMeta = operationMeta.getExtData(RestConst.SWAGGER_REST_OPERATION);

    String path = this.createRequestPath(restOperationMeta);
    IpPort ipPort = (IpPort) invocation.getEndpoint().getAddress();

    Future<HttpClientRequest> requestFuture = createRequest(ipPort, path);

    invocation.getInvocationStageTrace().startGetConnection();
    requestFuture.compose(clientRequest -> {
      invocation.getInvocationStageTrace().finishGetConnection();

      this.clientRequest = clientRequest;

      clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName());
      RestClientRequestImpl restClientRequest =
          new RestClientRequestImpl(clientRequest, httpClientWithContext.context(), asyncResp, throwableHandler);
      invocation.getHandlerContext().put(RestConst.INVOCATION_HANDLER_REQUESTCLIENT, restClientRequest);

      Buffer requestBodyBuffer;
      try {
        requestBodyBuffer = restClientRequest.getBodyBuffer();
      } catch (Exception e) {
        return Future.failedFuture(e);
      }
      HttpServletRequestEx requestEx = new VertxClientRequestToHttpServletRequest(clientRequest, requestBodyBuffer);
      invocation.getInvocationStageTrace().startClientFiltersRequest();

      return Future.fromCompletionStage(executeHttpClientFilters(requestEx).thenCompose((v) -> {
        // 从业务线程转移到网络线程中去发送
        invocation.onStartSendRequest();
        httpClientWithContext.runOnContext(httpClient -> {
          clientRequest.setTimeout(operationMeta.getConfig().getMsRequestTimeout());
          clientRequest.response().onComplete(asyncResult -> {
            if (asyncResult.failed()) {
              fail(asyncResult.cause());
              return;
            }
            handleResponse(asyncResult.result());
          });
          processServiceCombHeaders(invocation, operationMeta);
          restClientRequest.end()
              .onComplete((t) -> invocation.getInvocationStageTrace().finishWriteToBuffer(System.nanoTime()));
        });
        return CompletableFuture.completedFuture((Void) null);
      }));
    }).onFailure(failure -> {
      invocation.getTraceIdLogger()
          .error(LOGGER, "Failed to send request, alreadyFailed:{}, local:{}, remote:{}, message={}.",
              alreadyFailed, getLocalAddress(), ipPort.getSocketAddress(),
              ExceptionUtils.getExceptionMessageWithoutTrace(failure));
      throwableHandler.handle(failure);
    });
  }

  private CompletableFuture<Void> executeHttpClientFilters(HttpServletRequestEx requestEx) {
    HttpClientFilterBeforeSendRequestExecutor exec =
        new HttpClientFilterBeforeSendRequestExecutor(httpClientFilters, invocation, requestEx);
    return exec.run();
  }


  /**
   * If this is a 3rd party invocation, ServiceComb related headers should be removed by default to hide inner
   * implementation. Otherwise, the InvocationContext will be set into the request headers.
   *
   * @see OperationConfig#isClientRequestHeaderFilterEnabled()
   * @param invocation  invocation determines whether this is an invocation to 3rd party services
   * @param operationMeta operationMeta determines whether to remove certain headers and which headers should be removed
   */
  private void processServiceCombHeaders(Invocation invocation, OperationMeta operationMeta) {
    if (invocation.isThirdPartyInvocation() && operationMeta.getConfig().isClientRequestHeaderFilterEnabled()) {
      for (String internalHeaderName : INTERNAL_HEADERS) {
        clientRequest.headers().remove(internalHeaderName);
      }
      return;
    }
    this.setCseContext();
  }

  private String getLocalAddress() {
    if (clientRequest == null || clientRequest.connection() == null
        || clientRequest.connection().localAddress() == null) {
      return "not connected";
    }
    return clientRequest.connection().localAddress().toString();
  }

  private HttpMethod getMethod() {
    OperationMeta operationMeta = invocation.getOperationMeta();
    RestOperationMeta swaggerRestOperation = operationMeta.getExtData(RestConst.SWAGGER_REST_OPERATION);
    String method = swaggerRestOperation.getHttpMethod();
    return HttpMethod.valueOf(method);
  }

  Future<HttpClientRequest> createRequest(IpPort ipPort, String path) {
    URIEndpointObject endpoint = (URIEndpointObject) invocation.getEndpoint().getAddress();
    HttpMethod method = getMethod();
    RequestOptions requestOptions = new RequestOptions();
    requestOptions.setHost(ipPort.getHostOrIp())
        .setPort(ipPort.getPort())
        .setSsl(endpoint.isSslEnabled())
        .setMethod(method)
        .setURI(path);

    invocation.getTraceIdLogger()
        .debug(LOGGER, "Sending request by rest, method={}, qualifiedName={}, path={}, endpoint={}.",
            method,
            invocation.getMicroserviceQualifiedName(),
            path,
            invocation.getEndpoint().getEndpoint());
    return httpClientWithContext.getHttpClient().request(requestOptions);
  }

  protected void handleResponse(HttpClientResponse httpClientResponse) {
    this.clientResponse = httpClientResponse;

    if (HttpStatus.isSuccess(clientResponse.statusCode()) && restOperationMeta.isDownloadFile()) {
      ReadStreamPart part = new ReadStreamPart(httpClientWithContext.context(), httpClientResponse);
      invocation.getHandlerContext().put(RestConst.READ_STREAM_PART, part);
      processResponseBody(null);
      return;
    }

    httpClientResponse.exceptionHandler(e -> {
      invocation.getTraceIdLogger().error(LOGGER, "Failed to receive response, local:{}, remote:{}, message={}.",
          getLocalAddress(), httpClientResponse.netSocket().remoteAddress(),
          ExceptionUtils.getExceptionMessageWithoutTrace(e));
      fail(e);
    });

    clientResponse.bodyHandler(this::processResponseBody);
  }

  /**
   * after this method, connection will be recycled to connection pool
   * @param responseBuf response body buffer, when download, responseBuf is null, because download data by ReadStreamPart
   */
  protected void processResponseBody(Buffer responseBuf) {
    invocation.getInvocationStageTrace().finishReceiveResponse();
    invocation.getResponseExecutor().execute(() -> {
      try {
        invocation.getInvocationStageTrace().startClientFiltersResponse();
        HttpServletResponseEx responseEx =
            new VertxClientResponseToHttpServletResponse(clientResponse, responseBuf);
        for (HttpClientFilter filter : httpClientFilters) {
          if (filter.enabled()) {
            Response response = filter.afterReceiveResponse(invocation, responseEx);
            if (response != null) {
              complete(response);
              return;
            }
          }
        }
      } catch (Throwable e) {
        fail(e);
      }
    });
  }

  protected void complete(Response response) {
    invocation.getInvocationStageTrace().finishClientFiltersResponse();
    asyncResp.complete(response);
  }

  protected void fail(Throwable e) {
    if (alreadyFailed) {
      return;
    }

    alreadyFailed = true;

    InvocationStageTrace stageTrace = invocation.getInvocationStageTrace();

    if (stageTrace.getFinishWriteToBuffer() == 0) {
      stageTrace.finishWriteToBuffer(System.nanoTime());
    }

    // even failed and did not received response, still set time for it
    // that will help to know the real timeout time
    if (stageTrace.getFinishReceiveResponse() == 0) {
      stageTrace.finishReceiveResponse();
    }
    if (stageTrace.getStartClientFiltersResponse() == 0) {
      stageTrace.startClientFiltersResponse();
    }

    stageTrace.finishClientFiltersResponse();

    try {
      if (e instanceof TimeoutException) {
        // give an accurate cause for timeout exception
        //   The timeout period of 30000ms has been exceeded while executing GET /xxx for server 1.1.1.1:8080
        // should not copy the message to invocationException to avoid leak server ip address
        LOGGER.info("Request timeout, Details: {}.", e.getMessage());
        asyncResp.consumerFail(new InvocationException(Status.REQUEST_TIMEOUT,
            new CommonExceptionData("Request Timeout.")));
        return;
      }
      asyncResp.fail(invocation.getInvocationType(), e);
    } catch (Throwable e1) {
      invocation.getTraceIdLogger().error(LOGGER, "failed to invoke asyncResp, message={}"
          , ExceptionUtils.getExceptionMessageWithoutTrace(e));
    }
  }

  protected void setCseContext() {
    try {
      clientRequest.putHeader(Const.CSE_CONTEXT, JsonUtils.writeUnicodeValueAsString(invocation.getContext()));
    } catch (Throwable e) {
      invocation.getTraceIdLogger().error(LOGGER, "Failed to encode and set cseContext, message={}."
          , ExceptionUtils.getExceptionMessageWithoutTrace(e));
    }
  }

  protected String createRequestPath(RestOperationMeta swaggerRestOperation) throws Exception {
    URIEndpointObject address = (URIEndpointObject) invocation.getEndpoint().getAddress();
    String urlPrefix = address.getFirst(DefinitionConst.URL_PREFIX);

    String path = (String) invocation.getHandlerContext().get(RestConst.REST_CLIENT_REQUEST_PATH);
    if (path == null) {
      path = swaggerRestOperation.getPathBuilder().createRequestPath(invocation.getSwaggerArguments());
    }

    if (StringUtils.isEmpty(urlPrefix) || path.startsWith(urlPrefix)) {
      return path;
    }

    return urlPrefix + path;
  }
}
