| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package etcdv3 |
| |
| import ( |
| "strings" |
| "sync" |
| ) |
| |
| import ( |
| perrors "github.com/pkg/errors" |
| ) |
| |
| import ( |
| "github.com/apache/dubbo-go/common" |
| "github.com/apache/dubbo-go/common/logger" |
| "github.com/apache/dubbo-go/config_center" |
| "github.com/apache/dubbo-go/registry" |
| "github.com/apache/dubbo-go/remoting" |
| ) |
| |
| type dataListener struct { |
| interestedURL []*common.URL |
| listener config_center.ConfigurationListener |
| } |
| |
| // NewRegistryDataListener creates a data listener for etcd |
| func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { |
| return &dataListener{listener: listener} |
| } |
| |
| // AddInterestedURL adds a registration @url to listen |
| func (l *dataListener) AddInterestedURL(url *common.URL) { |
| l.interestedURL = append(l.interestedURL, url) |
| } |
| |
| // DataChange processes the data change event from registry center of etcd |
| func (l *dataListener) DataChange(eventType remoting.Event) bool { |
| index := strings.Index(eventType.Path, "/providers/") |
| if index == -1 { |
| logger.Warnf("Listen with no url, event.path={%v}", eventType.Path) |
| return false |
| } |
| url := eventType.Path[index+len("/providers/"):] |
| serviceURL, err := common.NewURL(url) |
| if err != nil { |
| logger.Warnf("Listen NewURL(r{%s}) = error{%v}", eventType.Path, err) |
| return false |
| } |
| |
| for _, v := range l.interestedURL { |
| if serviceURL.URLEqual(v) { |
| l.listener.Process( |
| &config_center.ConfigChangeEvent{ |
| Key: eventType.Path, |
| Value: serviceURL, |
| ConfigType: eventType.Action, |
| }, |
| ) |
| return true |
| } |
| } |
| return false |
| } |
| |
| type configurationListener struct { |
| registry *etcdV3Registry |
| events chan *config_center.ConfigChangeEvent |
| closeOnce sync.Once |
| } |
| |
| // NewConfigurationListener for listening the event of etcdv3. |
| func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { |
| // add a new waiter |
| reg.WaitGroup().Add(1) |
| return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)} |
| } |
| |
| // Process data change event from config center of etcd |
| func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) { |
| l.events <- configType |
| } |
| |
| // Next returns next service event once received |
| func (l *configurationListener) Next() (*registry.ServiceEvent, error) { |
| for { |
| select { |
| case <-l.registry.Done(): |
| logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.") |
| return nil, perrors.New("listener stopped") |
| |
| case e := <-l.events: |
| logger.Infof("got etcd event %#v", e) |
| if e.ConfigType == remoting.EventTypeDel && l.registry.client.Valid() { |
| select { |
| case <-l.registry.Done(): |
| logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value) |
| default: |
| } |
| continue |
| } |
| return ®istry.ServiceEvent{Action: e.ConfigType, Service: e.Value.(*common.URL)}, nil |
| } |
| } |
| } |
| |
| // Close etcd registry center |
| func (l *configurationListener) Close() { |
| l.closeOnce.Do(func() { |
| l.registry.WaitGroup().Done() |
| }) |
| } |