blob: 3e0f98bf548fda7ef3cc88bb6c934b6642db53cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.transport.highway;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.core.Response.Status;
import com.google.common.annotations.VisibleForTesting;
import org.apache.servicecomb.codec.protobuf.definition.OperationProtobuf;
import org.apache.servicecomb.codec.protobuf.definition.ProtobufManager;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.foundation.ssl.SSLCustom;
import org.apache.servicecomb.foundation.ssl.SSLOption;
import org.apache.servicecomb.foundation.ssl.SSLOptionFactory;
import org.apache.servicecomb.foundation.vertx.VertxTLSBuilder;
import org.apache.servicecomb.foundation.vertx.VertxUtils;
import org.apache.servicecomb.foundation.vertx.client.ClientPoolManager;
import org.apache.servicecomb.foundation.vertx.client.ClientVerticle;
import org.apache.servicecomb.foundation.vertx.client.tcp.TcpClientConfig;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.config.DynamicPropertyFactory;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Vertx;
public class HighwayClient {
private static final Logger LOGGER = LoggerFactory.getLogger(HighwayClient.class);
private static final String SSL_KEY = "highway.consumer";
private ClientPoolManager<HighwayClientConnectionPool> clientMgr;
public void init(Vertx vertx) throws Exception {
TcpClientConfig normalConfig = createTcpClientConfig();
normalConfig.setSsl(false);
TcpClientConfig sslConfig = createTcpClientConfig();
sslConfig.setSsl(true);
clientMgr = new ClientPoolManager<>(vertx, new HighwayClientPoolFactory(normalConfig, sslConfig));
DeploymentOptions deployOptions = VertxUtils.createClientDeployOptions(clientMgr,
HighwayConfig.getClientThreadCount());
VertxUtils.blockDeploy(vertx, ClientVerticle.class, deployOptions);
}
@VisibleForTesting
TcpClientConfig createTcpClientConfig() {
TcpClientConfig tcpClientConfig = new TcpClientConfig();
// global request timeout to be login timeout
tcpClientConfig.setMsLoginTimeout(DynamicPropertyFactory.getInstance()
.getLongProperty("servicecomb.request.timeout", TcpClientConfig.DEFAULT_LOGIN_TIMEOUT).get());
SSLOptionFactory factory =
SSLOptionFactory.createSSLOptionFactory(SSL_KEY, null);
SSLOption sslOption;
if (factory == null) {
sslOption = SSLOption.buildFromYaml(SSL_KEY);
} else {
sslOption = factory.createSSLOption();
}
SSLCustom sslCustom = SSLCustom.createSSLCustom(sslOption.getSslCustomClass());
VertxTLSBuilder.buildClientOptionsBase(sslOption, sslCustom, tcpClientConfig);
return tcpClientConfig;
}
public void send(Invocation invocation, AsyncResponse asyncResp) throws Exception {
invocation.getInvocationStageTrace().startGetConnection();
HighwayClientConnection tcpClient = findClientPool(invocation);
invocation.getInvocationStageTrace().startClientFiltersRequest();
OperationProtobuf operationProtobuf = ProtobufManager.getOrCreateOperation(invocation);
HighwayClientPackage clientPackage = createClientPackage(invocation, operationProtobuf);
invocation.onStartSendRequest();
tcpClient.send(clientPackage, ar -> {
invocation.getInvocationStageTrace().finishWriteToBuffer(clientPackage.getFinishWriteToBuffer());
invocation.getInvocationStageTrace().finishReceiveResponse();
// 此时是在网络线程中,转换线程
invocation.getResponseExecutor().execute(() -> {
invocation.getInvocationStageTrace().startClientFiltersResponse();
if (ar.failed()) {
// 只会是本地异常
invocation.getInvocationStageTrace().finishClientFiltersResponse();
if (ar.cause() instanceof TimeoutException) {
// give an accurate cause for timeout exception
// The timeout period of 30000ms has been exceeded while executing GET /xxx for server 1.1.1.1:8080
// should not copy the message to invocationException to avoid leak server ip address
LOGGER.info("Request timeout, Details: {}.", ar.cause().getMessage());
asyncResp.consumerFail(new InvocationException(Status.REQUEST_TIMEOUT,
new CommonExceptionData("Request Timeout.")));
return;
}
asyncResp.consumerFail(ar.cause());
return;
}
// 处理应答
try {
Response response =
HighwayCodec.decodeResponse(invocation,
operationProtobuf,
ar.result());
invocation.getInvocationStageTrace().finishClientFiltersResponse();
asyncResp.complete(response);
} catch (Throwable e) {
invocation.getInvocationStageTrace().finishClientFiltersResponse();
asyncResp.consumerFail(e);
}
});
});
}
public HighwayClientPackage createClientPackage(Invocation invocation, OperationProtobuf operationProtobuf) {
long msRequestTimeout = invocation.getOperationMeta().getConfig().getMsRequestTimeout();
return new HighwayClientPackage(invocation, operationProtobuf, msRequestTimeout);
}
public HighwayClientConnection findClientPool(Invocation invocation) {
HighwayClientConnection tcpClient = clientMgr.findClientPool(invocation.isSync())
.findOrCreateClient(invocation.getEndpoint().getEndpoint());
invocation.getInvocationStageTrace().finishGetConnection();
return tcpClient;
}
}