| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package registry |
| |
| import ( |
| "fmt" |
| "net/url" |
| "os" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| ) |
| |
| import ( |
| "github.com/dubbogo/gost/log/logger" |
| |
| perrors "github.com/pkg/errors" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/common" |
| "dubbo.apache.org/dubbo-go/v3/common/constant" |
| "dubbo.apache.org/dubbo-go/v3/metrics" |
| metricsRegistry "dubbo.apache.org/dubbo-go/v3/metrics/registry" |
| ) |
| |
| const ( |
| RegistryConnDelay = 3 // connection delay |
| MaxWaitInterval = 3 * time.Second // max wait interval |
| ) |
| |
| var ( |
| processID = "" |
| localIP = "" |
| ) |
| |
| func init() { |
| processID = fmt.Sprintf("%d", os.Getpid()) |
| localIP = common.GetLocalIp() |
| } |
| |
| type createPathFunc func(dubboPath string) error |
| |
| // FacadeBasedRegistry is the interface of Registry, and it is designed for registry who |
| // want to inherit BaseRegistry. If there is no special case, you'd better inherit BaseRegistry |
| // and implement the FacadeBasedRegistry interface instead of directly implementing the |
| // Registry interface. |
| // |
| // CreatePath method create the path in the registry. |
| // |
| // DoRegister method actually does the register job. |
| // |
| // DoUnregister method does the unregister job. |
| // |
| // DoSubscribe method actually subscribes the URL. |
| // |
| // DoUnsubscribe method does unsubscribe the URL. |
| // |
| // CloseAndNilClient method closes the client and then reset the client in registry to nil |
| // you should notice that this method will be invoked inside a lock. |
| // So you should implement this method as light weighted as you can. |
| // |
| // CloseListener method closes listeners. |
| // |
| // InitListeners method init listeners |
| type FacadeBasedRegistry interface { |
| Registry |
| |
| CreatePath(string) error |
| DoRegister(string, string) error |
| DoUnregister(string, string) error |
| DoSubscribe(conf *common.URL) (Listener, error) |
| DoUnsubscribe(conf *common.URL) (Listener, error) |
| CloseAndNilClient() |
| CloseListener() |
| InitListeners() |
| } |
| |
| // BaseRegistry is a common logic abstract for registry. It implement Registry interface. |
| type BaseRegistry struct { |
| facadeBasedRegistry FacadeBasedRegistry |
| *common.URL |
| birth int64 // time of file birth, seconds since Epoch; 0 if unknown |
| wg sync.WaitGroup // wg+done for zk restart |
| done chan struct{} |
| registered *sync.Map |
| cltLock sync.RWMutex |
| } |
| |
| // InitBaseRegistry for init some local variables and set BaseRegistry's subclass to it |
| func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBasedRegistry) Registry { |
| r.URL = url |
| r.birth = time.Now().UnixNano() |
| r.done = make(chan struct{}) |
| r.registered = &sync.Map{} |
| r.facadeBasedRegistry = facadeRegistry |
| return r |
| } |
| |
| // GetURL for get registry's url |
| func (r *BaseRegistry) GetURL() *common.URL { |
| return r.URL |
| } |
| |
| // Destroy for graceful down |
| func (r *BaseRegistry) Destroy() { |
| // first step close registry's all listeners |
| r.facadeBasedRegistry.CloseListener() |
| // then close r.done to notify other program who listen to it |
| close(r.Done()) |
| // wait waitgroup done (wait listeners outside close over) |
| r.WaitGroup().Wait() |
| |
| // close registry client |
| r.closeRegisters() |
| } |
| |
| // Register implement interface registry to register |
| func (r *BaseRegistry) Register(url *common.URL) error { |
| start := time.Now() |
| // todo bug when provider、consumer simultaneous initialization |
| if _, ok := r.registered.Load(url.Key()); ok { |
| return perrors.Errorf("Service {%s} has been registered", url.Key()) |
| } |
| |
| err := r.register(url) |
| defer metrics.Publish(metricsRegistry.NewRegisterEvent(err == nil, start)) |
| if err == nil { |
| r.registered.Store(url.Key(), url) |
| |
| } else { |
| err = perrors.WithMessagef(err, "register(url:%+v)", url) |
| } |
| |
| return err |
| } |
| |
| // UnRegister implement interface registry to unregister |
| func (r *BaseRegistry) UnRegister(url *common.URL) error { |
| if _, ok := r.registered.Load(url.Key()); !ok { |
| return perrors.Errorf("Service {%s} has not registered", url.Key()) |
| } |
| err := r.unregister(url) |
| metrics.Publish(metricsRegistry.NewSubscribeEvent(err == nil)) |
| if err == nil { |
| r.registered.Delete(url.Key()) |
| } else { |
| err = perrors.WithMessagef(err, "unregister(url:%+v)", url) |
| } |
| |
| return err |
| } |
| |
| // service is for getting service path stored in url |
| func (r *BaseRegistry) service(c *common.URL) string { |
| return url.QueryEscape(c.Service()) |
| } |
| |
| // RestartCallBack for reregister when reconnect |
| func (r *BaseRegistry) RestartCallBack() bool { |
| flag := true |
| r.registered.Range(func(key, value interface{}) bool { |
| registeredUrl := value.(*common.URL) |
| err := r.register(registeredUrl) |
| if err != nil { |
| flag = false |
| logger.Errorf("failed to re-register service :%v, error{%#v}", |
| registeredUrl, perrors.WithStack(err)) |
| return flag |
| } |
| |
| logger.Infof("success to re-register service :%v", registeredUrl.Key()) |
| return flag |
| }) |
| |
| if flag { |
| r.facadeBasedRegistry.InitListeners() |
| } |
| |
| return flag |
| } |
| |
| // register for register url to registry, include init params |
| func (r *BaseRegistry) register(c *common.URL) error { |
| return r.processURL(c, r.facadeBasedRegistry.DoRegister, r.createPath) |
| } |
| |
| // unregister for unregister url to registry, include init params |
| func (r *BaseRegistry) unregister(c *common.URL) error { |
| return r.processURL(c, r.facadeBasedRegistry.DoUnregister, nil) |
| } |
| |
| func (r *BaseRegistry) processURL(c *common.URL, f func(string, string) error, cpf createPathFunc) error { |
| if f == nil { |
| panic(" Must provide a `function(string, string) error` to process URL. ") |
| } |
| var ( |
| err error |
| params url.Values |
| rawURL string |
| encodedURL string |
| dubboPath string |
| ) |
| params = url.Values{} |
| |
| c.RangeParams(func(key, value string) bool { |
| params.Add(key, value) |
| return true |
| }) |
| |
| role, _ := strconv.Atoi(c.GetParam(constant.RegistryRoleKey, "")) |
| switch role { |
| case common.PROVIDER: |
| dubboPath, rawURL, err = r.providerRegistry(c, params, cpf) |
| case common.CONSUMER: |
| dubboPath, rawURL, err = r.consumerRegistry(c, params, cpf) |
| default: |
| return perrors.Errorf("@c{%v} type is not referencer or provider", c) |
| } |
| if err != nil { |
| return perrors.WithMessagef(err, "@c{%v} registry fail", c) |
| } |
| |
| encodedURL = url.QueryEscape(rawURL) |
| dubboPath = strings.ReplaceAll(dubboPath, "$", "%24") |
| err = f(dubboPath, encodedURL) |
| |
| if err != nil { |
| return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL) |
| } |
| return nil |
| } |
| |
| // createPath will create dubbo path in register |
| func (r *BaseRegistry) createPath(dubboPath string) error { |
| r.cltLock.Lock() |
| defer r.cltLock.Unlock() |
| return r.facadeBasedRegistry.CreatePath(dubboPath) |
| } |
| |
| // providerRegistry for provider role do |
| func (r *BaseRegistry) providerRegistry(c *common.URL, params url.Values, f createPathFunc) (string, string, error) { |
| var ( |
| dubboPath string |
| rawURL string |
| err error |
| ) |
| if c.Path == "" || len(c.Methods) == 0 { |
| return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods) |
| } |
| dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.PROVIDER]) |
| if f != nil { |
| err = f(dubboPath) |
| } |
| if err != nil { |
| logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err)) |
| return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath) |
| } |
| params.Add(constant.AnyhostKey, "true") |
| |
| // Dubbo java consumer to start looking for the provider url,because the category does not match, |
| // the provider will not find, causing the consumer can not start, so we use consumers. |
| |
| if len(c.Methods) != 0 { |
| params.Add(constant.MethodsKey, strings.Join(c.Methods, ",")) |
| } |
| logger.Debugf("provider url params:%#v", params) |
| var host string |
| if len(c.Ip) == 0 { |
| host = localIP |
| } else { |
| host = c.Ip |
| } |
| host += ":" + c.Port |
| |
| // delete empty param key |
| for key, val := range params { |
| if len(val) > 0 && val[0] == "" { |
| params.Del(key) |
| } |
| } |
| |
| s, _ := url.QueryUnescape(params.Encode()) |
| rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, s) |
| // Print your own registration service providers. |
| logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL) |
| return dubboPath, rawURL, nil |
| } |
| |
| // consumerRegistry for consumer role do |
| func (r *BaseRegistry) consumerRegistry(c *common.URL, params url.Values, f createPathFunc) (string, string, error) { |
| var ( |
| dubboPath string |
| rawURL string |
| err error |
| ) |
| dubboPath = fmt.Sprintf("/%s/%s/%s", r.URL.GetParam(constant.RegistryGroupKey, "dubbo"), r.service(c), common.DubboNodes[common.CONSUMER]) |
| |
| if f != nil { |
| err = f(dubboPath) |
| } |
| |
| if err != nil { |
| logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err)) |
| return "", "", perrors.WithStack(err) |
| } |
| |
| params.Add("protocol", c.Protocol) |
| s, _ := url.QueryUnescape(params.Encode()) |
| rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, s) |
| logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL) |
| return dubboPath, rawURL, nil |
| } |
| |
| // sleepWait... |
| func sleepWait(n int) { |
| wait := time.Duration((n + 1) * 2e8) |
| if wait > MaxWaitInterval { |
| wait = MaxWaitInterval |
| } |
| time.Sleep(wait) |
| } |
| |
| // Subscribe :subscribe from registry, event will notify by notifyListener |
| func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) error { |
| for { |
| if !r.IsAvailable() { |
| logger.Warnf("event listener game over.") |
| return perrors.New("BaseRegistry is not available.") |
| } |
| |
| listener, err := r.facadeBasedRegistry.DoSubscribe(url) |
| if err != nil { |
| if !r.IsAvailable() { |
| logger.Warnf("event listener game over.") |
| return err |
| } |
| logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) |
| time.Sleep(time.Duration(RegistryConnDelay) * time.Second) |
| continue |
| } |
| |
| for { |
| if serviceEvent, err := listener.Next(); err != nil { |
| logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err)) |
| listener.Close() |
| return nil |
| } else { |
| logger.Debugf("[Registry] update begin, service event: %v", serviceEvent.String()) |
| notifyListener.Notify(serviceEvent) |
| } |
| } |
| } |
| } |
| |
| // UnSubscribe URL |
| func (r *BaseRegistry) UnSubscribe(url *common.URL, notifyListener NotifyListener) error { |
| if !r.IsAvailable() { |
| logger.Warnf("event listener game over.") |
| return perrors.New("BaseRegistry is not available.") |
| } |
| |
| listener, err := r.facadeBasedRegistry.DoUnsubscribe(url) |
| if err != nil { |
| if !r.IsAvailable() { |
| logger.Warnf("event listener game over.") |
| return perrors.New("BaseRegistry is not available.") |
| } |
| logger.Warnf("getListener() = err:%v", perrors.WithStack(err)) |
| return perrors.WithStack(err) |
| } |
| |
| if listener != nil { |
| listener.Close() |
| } |
| |
| return nil |
| } |
| |
| // LoadSubscribeInstances load subscribe instance |
| func (r *BaseRegistry) LoadSubscribeInstances(url *common.URL, notify NotifyListener) error { |
| return r.facadeBasedRegistry.LoadSubscribeInstances(url, notify) |
| } |
| |
| // closeRegisters close and remove registry client and reset services map |
| func (r *BaseRegistry) closeRegisters() { |
| logger.Infof("begin to close provider client") |
| r.cltLock.Lock() |
| defer r.cltLock.Unlock() |
| // Close and remove(set to nil) the registry client |
| r.facadeBasedRegistry.CloseAndNilClient() |
| // reset the services map |
| r.registered = nil |
| } |
| |
| // IsAvailable judge to is registry not closed by chan r.done |
| func (r *BaseRegistry) IsAvailable() bool { |
| select { |
| case <-r.Done(): |
| return false |
| default: |
| return true |
| } |
| } |
| |
| // WaitGroup open for outside add the waitgroup to add some logic before registry destroyed over(graceful down) |
| func (r *BaseRegistry) WaitGroup() *sync.WaitGroup { |
| return &r.wg |
| } |
| |
| // Done open for outside to listen the event of registry Destroy() called. |
| func (r *BaseRegistry) Done() chan struct{} { |
| return r.done |
| } |