blob: 440953e0709709dee3cc0aca174e87ad256955c3 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package local
import (
import (
import (
func init() {
extension.SetLocalMetadataService(constant.DefaultKey, GetLocalMetadataService)
// version will be used by Version func
const (
version = "1.0.0"
// MetadataService is store and query the metadata info in memory when each service registry
type MetadataService struct {
exportedServiceURLs *sync.Map
subscribedServiceURLs *sync.Map
serviceDefinitions *sync.Map
lock *sync.RWMutex
mOnce *sync.Once
metadataInfo *common.MetadataInfo
metadataServiceURL *common.URL
var (
metadataServiceInstance *MetadataService
metadataServiceInitOnce sync.Once
// GetLocalMetadataService which should be singleton initiates a metadata service
func GetLocalMetadataService() (service.MetadataService, error) {
metadataServiceInitOnce.Do(func() {
metadataServiceInstance = &MetadataService{
// todo(DMwangnima): use external config
BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name),
exportedServiceURLs: &sync.Map{},
subscribedServiceURLs: &sync.Map{},
serviceDefinitions: &sync.Map{},
lock: &sync.RWMutex{},
metadataInfo: nil,
mOnce: &sync.Once{},
return metadataServiceInstance, nil
// addURL will add URL in memory
func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
var (
urlSet interface{}
loaded bool
if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded {
wantedUrl := urlSet.(*skip.SkipList).Get(url)
if len(wantedUrl) > 0 && wantedUrl[0] != nil {
return false
// double chk
wantedUrl := urlSet.(*skip.SkipList).Get(url)
if len(wantedUrl) > 0 && wantedUrl[0] != nil {
return false
return true
// removeURL is used to remove specified url
func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) {
if value, loaded := targetMap.Load(url.ServiceKey()); loaded {
defer mts.lock.RUnlock()
if value.(*skip.SkipList).Len() == 0 {
// getAllService can return all the exportedUrlString except for metadataService
func (mts *MetadataService) getAllService(services *sync.Map) []*common.URL {
// using skip list to dedup and sorting
var res []*common.URL
services.Range(func(key, value interface{}) bool {
urls := value.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := urls.ByPosition(i).(*common.URL)
if url.Service() != constant.MetadataServiceName {
res = append(res, url)
return true
return res
// getSpecifiedService can return specified service url by serviceKey
func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) []*common.URL {
var res []*common.URL
serviceList, loaded := services.Load(serviceKey)
if loaded {
urls := serviceList.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := urls.ByPosition(i).(*common.URL)
if len(protocol) == 0 || protocol == constant.AnyValue || url.Protocol == protocol || url.GetParam(constant.ProtocolKey, "") == protocol {
res = append(res, url)
return res
// ExportURL can store the in memory
func (mts *MetadataService) ExportURL(url *common.URL) (bool, error) {
if constant.MetadataServiceName == url.GetParam(constant.InterfaceKey, "") {
mts.metadataServiceURL = url
return true, nil
mts.mOnce.Do(func() {
mts.metadataInfo = common.NewMetadataInfWithApp(config.GetApplicationConfig().Name)
return mts.addURL(mts.exportedServiceURLs, url), nil
// UnexportURL can remove the url store in memory
func (mts *MetadataService) UnexportURL(url *common.URL) error {
if constant.MetadataServiceName == url.GetParam(constant.InterfaceKey, "") {
mts.metadataServiceURL = nil
return nil
if mts.metadataInfo != nil {
mts.removeURL(mts.exportedServiceURLs, url)
return nil
// SubscribeURL can store the in memory
func (mts *MetadataService) SubscribeURL(url *common.URL) (bool, error) {
return mts.addURL(mts.subscribedServiceURLs, url), nil
// UnsubscribeURL can remove the url store in memory
func (mts *MetadataService) UnsubscribeURL(url *common.URL) error {
mts.removeURL(mts.subscribedServiceURLs, url)
return nil
// PublishServiceDefinition publish url's service metadata info, and write into memory
func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error {
if common.RoleType(common.CONSUMER).Role() == url.GetParam(constant.SideKey, "") {
return nil
interfaceName := url.GetParam(constant.InterfaceKey, "")
isGeneric := url.GetParamBool(constant.GenericKey, false)
if len(interfaceName) > 0 && !isGeneric {
tmpService := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())
sd := definition.BuildFullDefinition(*tmpService, url)
data, err := sd.ToBytes()
if err != nil {
logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error:%v ", url, err)
return nil
mts.serviceDefinitions.Store(url.ServiceKey(), string(data))
return nil
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
return nil
// GetExportedURLs get all exported urls
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]*common.URL, error) {
if serviceInterface == constant.AnyValue {
return mts.getAllService(mts.exportedServiceURLs), nil
} else {
serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version)
return mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol), nil
// GetSubscribedURLs get all subscribedUrl
func (mts *MetadataService) GetSubscribedURLs() ([]*common.URL, error) {
return mts.getAllService(mts.subscribedServiceURLs), nil
// GetServiceDefinition can get service definition by interfaceName, group and version
func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
serviceKey := definition.ServiceDescriperBuild(interfaceName, group, version)
v, _ := mts.serviceDefinitions.Load(serviceKey)
return v.(string), nil
// GetServiceDefinitionByServiceKey can get service definition by serviceKey
func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
v, _ := mts.serviceDefinitions.Load(serviceKey)
return v.(string), nil
// GetMetadataInfo can get metadata in memory
func (mts *MetadataService) GetMetadataInfo(revision string) (*common.MetadataInfo, error) {
if revision == "" {
return mts.metadataInfo, nil
if mts.metadataInfo.CalAndGetRevision() != revision {
return nil, nil
return mts.metadataInfo, nil
// GetExportedServiceURLs get exported service urls
func (mts *MetadataService) GetExportedServiceURLs() ([]*common.URL, error) {
return mts.getAllService(mts.exportedServiceURLs), nil
// RefreshMetadata will always return true because it will be implement by remote service
func (mts *MetadataService) RefreshMetadata(string, string) (bool, error) {
return true, nil
// Version will return the version of metadata service
func (mts *MetadataService) Version() (string, error) {
return version, nil
// GetMetadataServiceURL get url of MetadataService
func (mts *MetadataService) GetMetadataServiceURL() (*common.URL, error) {
return mts.metadataServiceURL, nil
// SetMetadataServiceURL save url of MetadataService
func (mts *MetadataService) SetMetadataServiceURL(url *common.URL) error {
mts.metadataServiceURL = url
return nil