blob: a7b0683d9dfc138379c4cacaee7186d6d041f433 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.remoting.api.ConnectionManager;
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.PathResolver;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.model.StubServiceDescriptor;
import org.apache.dubbo.rpc.protocol.AbstractExporter;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.protocol.tri.compressor.DeCompressor;
import org.apache.dubbo.rpc.protocol.tri.service.TriBuiltinService;
import org.apache.dubbo.triple.TripleWrapper;
import com.google.protobuf.ByteString;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
public class TripleProtocol extends AbstractProtocol {
public static final String METHOD_ATTR_PACK = "pack";
private static final Logger logger = LoggerFactory.getLogger(TripleProtocol.class);
private final PathResolver pathResolver;
private final TriBuiltinService triBuiltinService;
private final ConnectionManager connectionManager;
private final String acceptEncodings;
private boolean versionChecked = false;
public TripleProtocol(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
this.triBuiltinService = new TriBuiltinService(frameworkModel);
this.pathResolver = frameworkModel.getExtensionLoader(PathResolver.class)
.getDefaultExtension();
Set<String> supported = frameworkModel.getExtensionLoader(DeCompressor.class)
.getSupportedExtensions();
this.acceptEncodings = String.join(",", supported);
this.connectionManager = frameworkModel.getExtensionLoader(ConnectionManager.class)
.getExtension("multiple");
}
@Override
public int getDefaultPort() {
return 50051;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
checkProtobufVersion(url);
String key = serviceKey(url);
final AbstractExporter<T> exporter = new AbstractExporter<T>(invoker) {
@Override
public void afterUnExport() {
pathResolver.remove(url.getServiceKey());
pathResolver.remove(url.getServiceModel().getServiceModel().getInterfaceName());
// set service status
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceKey(), ServingStatus.NOT_SERVING);
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceInterface(), ServingStatus.NOT_SERVING);
exporterMap.remove(key);
}
};
exporterMap.put(key, exporter);
invokers.add(invoker);
pathResolver.add(url.getServiceKey(), invoker);
pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(), invoker);
// set service status
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
triBuiltinService.getHealthStatusManager()
.setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);
// init
url.getOrDefaultApplicationModel().getExtensionLoader(ExecutorRepository.class)
.getDefaultExtension()
.createExecutorIfAbsent(url);
PortUnificationExchanger.bind(url, new DefaultPuHandler());
optimizeSerialization(url);
return exporter;
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
optimizeSerialization(url);
ExecutorService streamExecutor = getOrCreateStreamExecutor(
url.getOrDefaultApplicationModel(), url);
TripleInvoker<T> invoker = new TripleInvoker<>(type, url, acceptEncodings,
connectionManager, invokers, streamExecutor);
invokers.add(invoker);
return invoker;
}
private ExecutorService getOrCreateStreamExecutor(ApplicationModel applicationModel, URL url) {
ExecutorService executor = applicationModel.getExtensionLoader(ExecutorRepository.class)
.getDefaultExtension()
.createExecutorIfAbsent(url);
Objects.requireNonNull(executor,
String.format("No available executor found in %s", url));
return executor;
}
@Override
protected <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException {
return null;
}
@Override
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroying protocol [" + this.getClass().getSimpleName() + "] ...");
}
PortUnificationExchanger.close();
pathResolver.destroy();
super.destroy();
}
private void checkProtobufVersion(URL url) {
if (versionChecked) {
return;
}
if (url.getServiceModel() == null) {
return;
}
ServiceDescriptor descriptor = url.getServiceModel().getServiceModel();
if (descriptor == null) {
return;
}
if (descriptor instanceof StubServiceDescriptor) {
return;
}
TripleWrapper.TripleResponseWrapper responseWrapper = TripleWrapper.TripleResponseWrapper.newBuilder()
.setData(ByteString.copyFromUtf8("Test"))
.setSerializeType("Test")
.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
responseWrapper.writeTo(baos);
} catch (IOException e) {
throw new IllegalStateException(
"Bad protobuf-java version detected! Please make sure the version of user's "
+ "classloader is " + "greater than 3.11.0 ", e);
}
this.versionChecked = true;
}
}