blob: b92e335fdb69f82210d2977789902eb6123201b8 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package consul
import (
import (
consul ""
perrors ""
import (
const (
	// registryConnDelay is the number of seconds subscribe sleeps before
	// retrying after it fails to obtain a listener. It is a bare count,
	// not a time.Duration: the use site multiplies it by time.Second.
	registryConnDelay = 3
	// registryDestroyDefaultTimeout bounds how long Destroy waits for the
	// unregister call to complete.
	registryDestroyDefaultTimeout = time.Second * 3
)
func init() {
extension.SetRegistry("consul", newConsulRegistry)
// Consul registry wraps the consul client, to
// register and subscribe service.
type consulRegistry struct {
// Registry url.
// Consul client.
client *consul.Client
// Done field represents whether
// consul registry is closed.
done chan struct{}
func newConsulRegistry(url *common.URL) (registry.Registry, error) {
config := &consul.Config{Address: url.Location}
client, err := consul.NewClient(config)
if err != nil {
return nil, err
r := &consulRegistry{
URL: url,
client: client,
done: make(chan struct{}),
return r, nil
// Register register @url
// it delegate the job to register() method
func (r *consulRegistry) Register(url common.URL) error {
var err error
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.PROVIDER {
err = r.register(url)
if err != nil {
return err
return nil
// register actually register the @url
func (r *consulRegistry) register(url common.URL) error {
service, err := buildService(url)
if err != nil {
return err
return r.client.Agent().ServiceRegister(service)
// UnRegister unregister the @url
// it delegate the job to unregister() method
func (r *consulRegistry) UnRegister(url common.URL) error {
var err error
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.PROVIDER {
err = r.unregister(url)
if err != nil {
return err
return nil
// unregister actually unregister the @url
func (r *consulRegistry) unregister(url common.URL) error {
return r.client.Agent().ServiceDeregister(buildId(url))
// Subscribe subscribe the @url with the @notifyListener
func (r *consulRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
if role == common.CONSUMER {
r.subscribe(url, notifyListener)
return nil
// UnSubscribe is not supported yet
func (r *consulRegistry) UnSubscribe(url *common.URL, notifyListener registry.NotifyListener) error {
return perrors.New("UnSubscribe not support in consulRegistry")
// subscribe actually subscribe the @url
// it loops forever until success
func (r *consulRegistry) subscribe(url *common.URL, notifyListener registry.NotifyListener) {
for {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
listener, err := r.getListener(*url)
if err != nil {
if !r.IsAvailable() {
logger.Warnf("event listener game over.")
logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
time.Sleep(time.Duration(registryConnDelay) * time.Second)
for {
serviceEvent, err := listener.Next()
if err != nil {
logger.Warnf(" = error{%v}", perrors.WithStack(err))
logger.Infof("update begin, service event: %v", serviceEvent.String())
func (r *consulRegistry) getListener(url common.URL) (registry.Listener, error) {
listener, err := newConsulListener(*r.URL, url)
return listener, err
// GetUrl get registry URL of consul registry center
func (r *consulRegistry) GetUrl() common.URL {
return *r.URL
// IsAvailable checks consul registry center whether is available
func (r *consulRegistry) IsAvailable() bool {
select {
case <-r.done:
return false
return true
// Destroy consul registry center.
// When the registry has a URL, UnRegister is run for that URL in a separate
// goroutine and Destroy waits for it to finish, giving up after
// registryDestroyDefaultTimeout.
// NOTE(review): nothing visible here closes r.done, which IsAvailable
// relies on to report shutdown — confirm where that happens.
func (r *consulRegistry) Destroy() {
if r.URL != nil {
// Buffered with capacity 1 so the goroutine's send does not block even
// if the select below has already taken the timeout arm.
done := make(chan struct{}, 1)
go func() {
defer func() {
// Recover from a panic raised while unregistering and log it.
if e := recover(); e != nil {
logger.Errorf("consulRegistry destory with panic: %v", e)
// Signal the waiting select below.
done <- struct{}{}
// An unregister failure is only logged; it is not returned to the caller.
if err := r.UnRegister(*r.URL); err != nil {
logger.Errorf("consul registry unregister with err: %s", err.Error())
// Wait for the goroutine to signal, or for the timeout, whichever
// comes first.
select {
case <-done:
logger.Infof("consulRegistry unregister done")
case <-time.After(registryDestroyDefaultTimeout):
logger.Errorf("consul unregister timeout")