blob: 5c78652e503d0cf78b51b8c32f705875c01d2404 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.registry;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.client.utils.URIBuilder;
import org.apache.servicecomb.foundation.common.event.EnableExceptionPropagation;
import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.foundation.common.net.IpPort;
import org.apache.servicecomb.foundation.common.net.NetUtils;
import org.apache.servicecomb.foundation.common.utils.BeanUtils;
import org.apache.servicecomb.foundation.common.utils.SPIEnabled;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
import org.apache.servicecomb.registry.api.Registration;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceRegisteredEvent;
import org.apache.servicecomb.registry.api.registry.BasePath;
import org.apache.servicecomb.registry.api.registry.Microservice;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstanceStatus;
import org.apache.servicecomb.registry.consumer.MicroserviceManager;
import org.apache.servicecomb.registry.consumer.StaticMicroserviceVersions;
import org.apache.servicecomb.registry.definition.MicroserviceNameParser;
import org.apache.servicecomb.registry.swagger.SwaggerLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.Subscribe;
import com.netflix.config.DynamicPropertyFactory;
import io.vertx.core.json.jackson.JacksonFactory;
import io.vertx.core.spi.json.JsonCodec;
public class RegistrationManager {
// Class logger. Declared before INSTANCE: the constructor calls initPrimary(), which may log a warning.
private static final Logger LOGGER = LoggerFactory.getLogger(RegistrationManager.class);
// Configuration key of the address to publish. The value is either an IP/host, or a network
// interface name wrapped in braces, i.e. {interface name}.
public static final String PUBLISH_ADDRESS = "servicecomb.service.publishAddress";
// Configuration key template of the port to publish; "{transport_name}" is replaced by the
// transport schema before the property is looked up.
private static final String PUBLISH_PORT = "servicecomb.{transport_name}.publishPort";
// Loader returned by getSwaggerLoader(); shared by all users of this class.
private static SwaggerLoader swaggerLoader = new SwaggerLoader();
// The manager instance created when this class is initialized.
public static RegistrationManager INSTANCE = new RegistrationManager();
// Registration implementations managed by this class; most operations are forwarded to each of them.
private final List<Registration> registrationList = new ArrayList<>();
// First element of registrationList, or null when the list is empty (see initPrimary()).
private Registration primary;
private RegistrationManager() {
SPIServiceUtils.getOrLoadSortedService(Registration.class)
.stream()
.filter((SPIEnabled::enabled))
.forEach(registrationList::add);
initPrimary();
}
/**
 * Returns the microservice instance of the primary registration.
 *
 * @return the instance reported by the primary {@link Registration}
 * @throws NullPointerException if no registration is available; the message explains the likely
 *         missing dependency (same check as {@link #getMicroservice()} and {@link #getAppId()})
 */
public MicroserviceInstance getMicroserviceInstance() {
  // Fail with the descriptive message instead of a bare NPE on the dereference below.
  assertPrimaryNotNull();
  return primary.getMicroserviceInstance();
}
/**
 * Returns the microservice definition of the primary registration.
 *
 * @return the microservice reported by the primary {@link Registration}
 * @throws NullPointerException if no registration is available
 */
public Microservice getMicroservice() {
  assertPrimaryNotNull();
  Microservice microservice = primary.getMicroservice();
  return microservice;
}
/**
 * Verifies that a primary registration has been selected.
 *
 * @throws NullPointerException if {@code primary} is {@code null}
 */
private void assertPrimaryNotNull() {
  if (primary != null) {
    return;
  }
  throw new NullPointerException("At least one Registration implementation configured. Missed"
      + " to include dependency ? e.g. <artifactId>registry-service-center</artifactId>");
}
/**
 * Returns the application id of the primary registration.
 *
 * @return the app id reported by the primary {@link Registration}
 * @throws NullPointerException if no registration is available
 */
public String getAppId() {
  assertPrimaryNotNull();
  String appId = primary.getAppId();
  return appId;
}
/**
 * Returns the shared swagger loader.
 *
 * @return the class-level {@link SwaggerLoader}
 */
public SwaggerLoader getSwaggerLoader() {
  return RegistrationManager.swaggerLoader;
}
/**
 * Forwards the new instance status to every managed registration.
 *
 * @param status the status to apply
 */
public void updateMicroserviceInstanceStatus(
    MicroserviceInstanceStatus status) {
  for (Registration registration : registrationList) {
    registration.updateMicroserviceInstanceStatus(status);
  }
}
/**
 * Forwards a schema to every managed registration.
 *
 * @param schemaId id of the schema
 * @param content content of the schema
 */
public void addSchema(String schemaId, String content) {
  for (Registration registration : registrationList) {
    registration.addSchema(schemaId, content);
  }
}
/**
 * Forwards an endpoint to every managed registration.
 *
 * @param endpoint the endpoint to add
 */
public void addEndpoint(String endpoint) {
  for (Registration registration : registrationList) {
    registration.addEndpoint(endpoint);
  }
}
/**
 * Forwards the base paths to every managed registration.
 *
 * @param basePaths the base paths to add; the same collection is handed to each registration
 */
public void addBasePath(Collection<BasePath> basePaths) {
  for (Registration registration : registrationList) {
    registration.addBasePath(basePaths);
  }
}
/**
 * Calls {@code destroy()} on every managed registration, in list order.
 */
public void destroy() {
  registrationList.forEach(Registration::destroy);
}
/**
 * Subscribes a handler that counts one registered-event per managed registration, then calls
 * {@code run()} on every registration in list order.
 */
public void run() {
  AfterServiceInstanceRegistryHandler handler =
      new AfterServiceInstanceRegistryHandler(registrationList.size());
  EventManager.getEventBus().register(handler);
  registrationList.forEach(Registration::run);
}
/**
 * Hands the registration list to {@code BeanUtils.addBeans} for {@link Registration} beans
 * (presumably appending bean-defined registrations), re-selects the primary registration, and
 * initializes every registration in list order.
 */
public void init() {
  BeanUtils.addBeans(Registration.class, registrationList);
  initPrimary();
  for (Registration registration : registrationList) {
    registration.init();
  }
}
/**
 * Selects the first registration in the list as primary, or clears the primary (with a warning)
 * when the list is empty.
 */
private void initPrimary() {
  if (!registrationList.isEmpty()) {
    primary = registrationList.get(0);
    return;
  }
  LOGGER.warn("No registration is enabled. Fix this if only in unit tests.");
  primary = null;
}
/**
 * <p>
 * Registers a third party service if it has not been registered before, and stores its instances in
 * {@linkplain StaticMicroserviceVersions StaticMicroserviceVersions}.
 * </p>
 * <p>
 * The registered third party service shares the {@code appId} and {@code environment} of this microservice
 * instance, and exposes exactly one schema described by {@code schemaIntfCls}, named after {@code microserviceName}.
 * </p>
 * <em>
 * This method only initializes the endpoint configuration of a 3rd party service:
 * if the service has not been registered yet, it is registered and its instances are set;
 * otherwise NOTHING happens.
 * </em>
 *
 * @param microserviceName name of the 3rd party service; it is also used as the schemaId
 * @param version version of the 3rd party service
 * @param instances the instances of the 3rd party service. Callers only need to fill in the endpoint information;
 * the remaining required information is generated and set by the implementation.
 * @param schemaIntfCls the producer interface of the service. It is used to generate the swagger schema and
 * can also serve as the proxy interface for RPC style invocation.
 */
public void registerMicroserviceMapping(String microserviceName, String version, List<MicroserviceInstance> instances,
    Class<?> schemaIntfCls) {
  MicroserviceNameParser nameParser = new MicroserviceNameParser(getAppId(), microserviceName);
  MicroserviceManager manager = DiscoveryManager.INSTANCE.getAppManager()
      .getOrCreateMicroserviceManager(nameParser.getAppId());
  // The mapping function only runs when no entry exists for this name yet.
  manager.getVersionsByName().computeIfAbsent(microserviceName, absentName -> {
    StaticMicroserviceVersions staticVersions = new StaticMicroserviceVersions(
        DiscoveryManager.INSTANCE.getAppManager(), nameParser.getAppId(), microserviceName);
    return staticVersions.init(schemaIntfCls, version, instances);
  });
}
/**
 * Variant of {@link #registerMicroserviceMapping(String, String, List, Class)} that takes plain endpoints.
 *
 * @param microserviceName name of the 3rd party service
 * @param version version of the 3rd party service
 * @param endpoints the endpoints of the 3rd party service. Each endpoint is treated as a separate instance.
 * The format is the same as the endpoints ServiceComb microservices register in service-center,
 * like {@code rest://127.0.0.1:8080}
 * @param schemaIntfCls the producer interface of the service
 * @see #registerMicroserviceMapping(String, String, List, Class)
 */
public void registerMicroserviceMappingByEndpoints(String microserviceName, String version,
    List<String> endpoints, Class<?> schemaIntfCls) {
  List<MicroserviceInstance> instancePerEndpoint = new ArrayList<>();
  endpoints.forEach(endpoint -> {
    // One instance per endpoint, carrying only that endpoint.
    MicroserviceInstance single = new MicroserviceInstance();
    single.setEndpoints(Collections.singletonList(endpoint));
    instancePerEndpoint.add(single);
  });
  registerMicroserviceMapping(microserviceName, version, instancePerEndpoint, schemaIntfCls);
}
/**
 * Resolves the address to publish from the {@link #PUBLISH_ADDRESS} setting.
 *
 * @return the host address from {@code NetUtils} when the setting is blank; the address of the named
 *         network interface when the setting has the form <code>{interface name}</code>; otherwise the
 *         trimmed setting itself
 */
public static String getPublishAddress() {
  String configured = DynamicPropertyFactory.getInstance()
      .getStringProperty(PUBLISH_ADDRESS, "")
      .get()
      .trim();
  if (configured.isEmpty()) {
    return NetUtils.getHostAddress();
  }
  boolean namesInterface = configured.startsWith("{") && configured.endsWith("}");
  if (!namesInterface) {
    return configured;
  }
  // Strip the surrounding braces to obtain the network interface name.
  String interfaceName = configured.substring(1, configured.length() - 1);
  return NetUtils.ensureGetInterfaceAddress(interfaceName).getHostAddress();
}
/**
 * Resolves the host name to publish from the {@link #PUBLISH_ADDRESS} setting.
 *
 * @return the host name from {@code NetUtils} when the setting is blank; the host name of the named
 *         network interface's address when the setting has the form <code>{interface name}</code>;
 *         otherwise the trimmed setting itself
 */
public static String getPublishHostName() {
  String configured = DynamicPropertyFactory.getInstance()
      .getStringProperty(PUBLISH_ADDRESS, "")
      .get()
      .trim();
  if (configured.isEmpty()) {
    return NetUtils.getHostName();
  }
  if (!(configured.startsWith("{") && configured.endsWith("}"))) {
    return configured;
  }
  // Strip the surrounding braces to obtain the network interface name.
  String interfaceName = configured.substring(1, configured.length() - 1);
  return NetUtils.ensureGetInterfaceAddress(interfaceName).getHostName();
}
/**
 * Builds the address to publish for a listening address.
 * In the case that the listening address is configured as 0.0.0.0, the publish address is determined
 * by querying the network interfaces.
 *
 * @param schema the transport schema, used as the URI scheme and to look up the publish port setting
 * @param address the listening address, in a form that is valid after {@code schema + "://"}
 * @return the publish address; {@code null} if {@code address} is null, cannot be parsed as a URI,
 *         or yields no ip/port
 */
public static String getPublishAddress(String schema, String address) {
  if (address == null) {
    return null;
  }
  try {
    URI originalURI = new URI(schema + "://" + address);
    IpPort ipPort = NetUtils.parseIpPort(originalURI);
    if (ipPort == null) {
      LOGGER.warn("address {} not valid.", address);
      return null;
    }
    IpPort publishIpPort = genPublishIpPort(schema, ipPort);
    // Keep every other URI component and replace only host and port.
    URIBuilder builder = new URIBuilder(originalURI);
    return builder.setHost(publishIpPort.getHostOrIp()).setPort(publishIpPort.getPort()).build().toString();
  } catch (URISyntaxException e) {
    // Pass the exception as the trailing argument so the cause is kept in the log.
    LOGGER.warn("address {} not valid.", address, e);
    return null;
  }
}
/**
 * Works out the host and port to publish for a listening address.
 * <ul>
 * <li>No publish address configured and the listening address is a wildcard address: a host address is
 * picked automatically (IPv6 when the listening address is IPv6) and combined with the publish port.</li>
 * <li>No publish address configured and the listening address is specific, or cannot be resolved:
 * {@code ipPort} is returned unchanged.</li>
 * <li>Publish address configured: it is used (resolving <code>{interface name}</code> to that interface's
 * address) together with the publish port.</li>
 * </ul>
 * The publish port is the configured value of {@link #PUBLISH_PORT} for {@code schema}, or the listening
 * port when that value is 0.
 * NOTE(review): on the "returned unchanged" path a configured publish port is not applied; confirm this
 * is intended.
 *
 * @param schema the transport schema, substituted into the publish port key
 * @param ipPort the listening host and port
 * @return the host and port to publish
 */
private static IpPort genPublishIpPort(String schema, IpPort ipPort) {
  String publicAddressSetting = DynamicPropertyFactory.getInstance()
      .getStringProperty(PUBLISH_ADDRESS, "")
      .get();
  publicAddressSetting = publicAddressSetting.trim();
  String publishPortKey = PUBLISH_PORT.replace("{transport_name}", schema);
  int publishPortSetting = DynamicPropertyFactory.getInstance()
      .getIntProperty(publishPortKey, 0)
      .get();
  int publishPort = publishPortSetting == 0 ? ipPort.getPort() : publishPortSetting;
  if (publicAddressSetting.isEmpty()) {
    InetSocketAddress socketAddress = ipPort.getSocketAddress();
    // getAddress() is null for an unresolved socket address; publish the original value instead of
    // failing with a NullPointerException.
    if (socketAddress.isUnresolved() || !socketAddress.getAddress().isAnyLocalAddress()) {
      return ipPort;
    }
    String host = NetUtils.getHostAddress();
    if (socketAddress.getAddress() instanceof Inet6Address) {
      host = NetUtils.getIpv6HostAddress();
    }
    LOGGER.warn("address {}, auto select a host address to publish {}:{}, maybe not the correct one",
        socketAddress,
        host,
        publishPort);
    return new IpPort(host, publishPort);
  }
  // A value wrapped in braces names a network interface rather than an address.
  if (publicAddressSetting.startsWith("{") && publicAddressSetting.endsWith("}")) {
    publicAddressSetting = NetUtils
        .ensureGetInterfaceAddress(
            publicAddressSetting.substring(1, publicAddressSetting.length() - 1))
        .getHostAddress();
  }
  return new IpPort(publicAddressSetting, publishPort);
}
/**
 * Builds a human readable summary of the managed registrations.
 * The general section (app id, service name, version, environment, endpoints) is taken from the first
 * registration; after it, one entry per registration lists its name, service id and instance id.
 *
 * @return the summary text; empty when there is no registration
 */
public String info() {
  StringBuilder summary = new StringBuilder();
  boolean headerPending = true;
  for (Registration registration : registrationList) {
    if (headerPending) {
      headerPending = false;
      summary.append("App ID: ").append(registration.getAppId()).append("\n");
      summary.append("Service Name: ").append(registration.getMicroservice().getServiceName()).append("\n");
      summary.append("Version: ").append(registration.getMicroservice().getVersion()).append("\n");
      summary.append("Environment: ").append(registration.getMicroservice().getEnvironment()).append("\n");
      summary.append("Endpoints: ")
          .append(getEndpoints(registration.getMicroserviceInstance().getEndpoints()))
          .append("\n");
      summary.append("Registration implementations:\n");
    }
    summary.append(" name:").append(registration.name()).append("\n");
    summary.append(" Service ID: ").append(registration.getMicroservice().getServiceId()).append("\n");
    summary.append(" Instance ID: ").append(registration.getMicroserviceInstance().getInstanceId()).append("\n");
  }
  return summary.toString();
}
/**
 * Renders the endpoint list as a string using the Jackson codec.
 *
 * @param endpoints the endpoints to render
 * @return the codec's string form of {@code endpoints}
 */
private String getEndpoints(List<String> endpoints) {
  String rendered = JacksonFactory.CODEC.toString(endpoints);
  return rendered;
}
/**
 * Event subscriber that counts {@link MicroserviceInstanceRegisteredEvent}s and, once the expected number
 * has been received, unregisters itself and posts one further event under the name "Registration Manager".
 */
public static class AfterServiceInstanceRegistryHandler {
  // Number of registered-events still expected; assigned once, so it is final.
  private final AtomicInteger instanceRegisterCounter;

  /**
   * @param counter how many registered-events to wait for before posting the aggregate event
   */
  AfterServiceInstanceRegistryHandler(int counter) {
    instanceRegisterCounter = new AtomicInteger(counter);
  }

  /**
   * Counts one received event; on the last expected one, unregisters this handler and posts the
   * aggregate event.
   *
   * @param event the event reported for a single registration
   */
  @Subscribe
  @EnableExceptionPropagation
  public void afterRegistryInstance(MicroserviceInstanceRegisteredEvent event) {
    LOGGER.info("receive MicroserviceInstanceRegisteredEvent event, registration={}, instance id={}",
        event.getRegistrationName(),
        event.getInstanceId());
    if (instanceRegisterCounter.decrementAndGet() > 0) {
      return;
    }
    // Unregister BEFORE posting: the posted event has the type this handler subscribes to, so a
    // still-registered handler would receive its own event.
    EventManager.unregister(this);
    EventManager.getEventBus().post(new MicroserviceInstanceRegisteredEvent(
        "Registration Manager",
        null,
        true
    ));
  }
}
}