blob: 16a796977602530d45a14072d63a93c1d1e6d11d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nacos
import (
"fmt"
"reflect"
"strings"
"sync"
)
import (
"github.com/nacos-group/nacos-sdk-go/clients/cache"
model2 "github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/util"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/adapter/springcloud/servicediscovery"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/remote/nacos"
)
type nacosServiceDiscovery struct {
targetService []string
//descriptor string
client *nacos.NacosClient
config *model.RemoteConfig
listener servicediscovery.ServiceEventListener
instanceMap map[string]servicediscovery.ServiceInstance
cacheLock sync.Mutex
callbackFlagMap cache.ConcurrentMap
//done chan struct{}
}
func (n *nacosServiceDiscovery) Subscribe() error {
if n.client == nil {
return perrors.New("nacos naming client stopped")
}
serviceNames := n.listener.GetServiceNames()
for _, serviceName := range serviceNames {
subscribeParam := &vo.SubscribeParam{
ServiceName: serviceName,
GroupName: n.config.Group,
Clusters: []string{"DEFAULT"},
SubscribeCallback: n.Callback,
}
key := util.GetServiceCacheKey(util.GetGroupName(subscribeParam.ServiceName, subscribeParam.GroupName), strings.Join(subscribeParam.Clusters, ","))
if n.callbackFlagMap.Has(key) {
continue
}
go func() {
err := n.client.Subscribe(subscribeParam)
if err != nil {
logger.Warnf("[Pixiu-Nacos] Subscribe %s error %s", subscribeParam.ServiceName, err)
return
}
n.callbackFlagMap.Set(key, true)
}()
}
return nil
}
func (n *nacosServiceDiscovery) Unsubscribe() error {
if n.client == nil {
return perrors.New("nacos naming client stopped")
}
serviceNames := n.listener.GetServiceNames()
for _, serviceName := range serviceNames {
subscribeParam := &vo.SubscribeParam{
ServiceName: serviceName,
GroupName: n.config.Group,
Clusters: []string{"DEFAULT"},
SubscribeCallback: n.Callback,
}
_ = n.client.Unsubscribe(subscribeParam)
}
return nil
}
func (n *nacosServiceDiscovery) Callback(services []model2.SubscribeService, err error) {
addInstances := make([]servicediscovery.ServiceInstance, 0, len(services))
delInstances := make([]servicediscovery.ServiceInstance, 0, len(services))
updateInstances := make([]servicediscovery.ServiceInstance, 0, len(services))
newInstanceMap := make(map[string]servicediscovery.ServiceInstance, len(services))
n.cacheLock.Lock()
for i := range services {
service := services[i]
if !service.Enable {
// instance is not available,so ignore it
continue
}
instance := fromSubscribeServiceToServiceInstance(service)
key := instance.GetUniqKey()
newInstanceMap[instance.GetUniqKey()] = instance
if old, ok := n.instanceMap[key]; !ok {
// instance does not exist in cache, add it to cache
addInstances = append(addInstances, instance)
} else {
// instance is not different from cache, update it to cache
if !reflect.DeepEqual(old, instance) {
updateInstances = append(updateInstances, instance)
}
}
}
for host, inst := range n.instanceMap {
if _, ok := newInstanceMap[host]; !ok {
// cache instance does not exist in new instance list, remove it from cache
delInstances = append(delInstances, inst)
}
}
n.instanceMap = newInstanceMap
n.cacheLock.Unlock()
for _, instance := range addInstances {
n.listener.OnAddServiceInstance(&instance)
}
for _, instance := range delInstances {
n.listener.OnDeleteServiceInstance(&instance)
}
for _, instance := range updateInstances {
n.listener.OnUpdateServiceInstance(&instance)
}
}
func (n *nacosServiceDiscovery) QueryServicesByName(serviceNames []string) ([]servicediscovery.ServiceInstance, error) {
res := make([]servicediscovery.ServiceInstance, 0, len(serviceNames))
// need get all service instance api
for _, serviceName := range serviceNames {
instances, err := n.client.SelectInstances(vo.SelectInstancesParam{
ServiceName: serviceName,
GroupName: n.config.Group,
Clusters: []string{"DEFAULT"},
HealthyOnly: true,
})
if err != nil {
logger.Warnf("QueryServices SelectInstances {key:%s} = error{%s}", serviceName, err)
continue
}
for _, instance := range instances {
si := fromInstanceToServiceInstance(serviceName, instance)
res = append(res, si)
}
}
n.cacheLock.Lock()
defer n.cacheLock.Unlock()
for _, instance := range res {
n.instanceMap[instance.GetUniqKey()] = instance
}
return res, nil
}
func (n *nacosServiceDiscovery) QueryAllServices() ([]servicediscovery.ServiceInstance, error) {
services, err := n.client.GetAllServicesInfo(vo.GetAllServiceInfoParam{
GroupName: n.config.Group,
PageSize: 10,
})
if err != nil {
return nil, err
}
return n.QueryServicesByName(services.Doms)
}
func (n *nacosServiceDiscovery) Register() error {
panic("implement me")
}
func (n *nacosServiceDiscovery) UnRegister() error {
panic("implement me")
}
func (n *nacosServiceDiscovery) Get(s string) []*servicediscovery.ServiceInstance {
panic("implement me")
}
func (n *nacosServiceDiscovery) StartPeriodicalRefresh() error {
panic("implement me")
}
func NewNacosServiceDiscovery(targetService []string, config *model.RemoteConfig, l servicediscovery.ServiceEventListener) (servicediscovery.ServiceDiscovery, error) {
client, err := nacos.NewNacosClient(config)
if err != nil {
return nil, err
}
return &nacosServiceDiscovery{
targetService: targetService,
client: client,
config: config,
listener: l,
instanceMap: make(map[string]servicediscovery.ServiceInstance),
callbackFlagMap: cache.NewConcurrentMap(),
}, nil
}
func fromInstanceToServiceInstance(serviceName string, instance model2.Instance) servicediscovery.ServiceInstance {
addr := instance.Ip + ":" + fmt.Sprint(instance.Port)
return servicediscovery.ServiceInstance{
// nacos sdk return empty instanceId, so use addr
//ID: instance.InstanceId,
ID: addr,
ServiceName: serviceName,
Host: instance.Ip,
Port: int(instance.Port),
// SelectInstances default return all health instance, not unhealthy
Healthy: instance.Healthy,
Enable: instance.Enable,
CLusterName: instance.ClusterName,
Metadata: instance.Metadata,
}
}
func fromSubscribeServiceToServiceInstance(instance model2.SubscribeService) servicediscovery.ServiceInstance {
addr := instance.Ip + ":" + fmt.Sprint(instance.Port)
// because it value is DEFAULT_GROUP@@user-service, so split it with @@, and get service name
serviceName := instance.ServiceName
tmp := strings.Split(serviceName, "@@")
if len(tmp) == 2 {
serviceName = tmp[1]
}
return servicediscovery.ServiceInstance{
// nacos sdk return empty instanceId, so use addr
//ID: instance.InstanceId,
ID: addr,
ServiceName: serviceName,
Host: instance.Ip,
Port: int(instance.Port),
// subscribe callback service should be healthy
Healthy: true,
Enable: instance.Enable,
CLusterName: instance.ClusterName,
Metadata: instance.Metadata,
}
}