| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package polaris |
| |
| import ( |
| "fmt" |
| "net/url" |
| "strconv" |
| ) |
| |
| import ( |
| gxchan "github.com/dubbogo/gost/container/chan" |
| "github.com/dubbogo/gost/log/logger" |
| |
| perrors "github.com/pkg/errors" |
| |
| "github.com/polarismesh/polaris-go/pkg/model" |
| ) |
| |
| import ( |
| "dubbo.apache.org/dubbo-go/v3/common" |
| "dubbo.apache.org/dubbo-go/v3/common/constant" |
| "dubbo.apache.org/dubbo-go/v3/config_center" |
| "dubbo.apache.org/dubbo-go/v3/registry" |
| "dubbo.apache.org/dubbo-go/v3/remoting" |
| ) |
| |
| type polarisListener struct { |
| watcher *PolarisServiceWatcher |
| events *gxchan.UnboundedChan |
| closeCh chan struct{} |
| } |
| |
| // NewPolarisListener new polaris listener |
| func NewPolarisListener(watcher *PolarisServiceWatcher) (*polarisListener, error) { |
| listener := &polarisListener{ |
| watcher: watcher, |
| events: gxchan.NewUnboundedChan(32), |
| closeCh: make(chan struct{}), |
| } |
| listener.startListen() |
| return listener, nil |
| } |
| func (pl *polarisListener) startListen() { |
| pl.watcher.AddSubscriber(func(et remoting.EventType, ins []model.Instance) { |
| for i := range ins { |
| pl.events.In() <- &config_center.ConfigChangeEvent{Value: ins[i], ConfigType: et} |
| } |
| }) |
| } |
| |
| // Next returns next service event once received |
| func (pl *polarisListener) Next() (*registry.ServiceEvent, error) { |
| for { |
| select { |
| case <-pl.closeCh: |
| logger.Warnf("polaris listener is close") |
| return nil, perrors.New("listener stopped") |
| case val := <-pl.events.Out(): |
| e, _ := val.(*config_center.ConfigChangeEvent) |
| logger.Debugf("got polaris event %s", e) |
| instance := e.Value.(model.Instance) |
| return ®istry.ServiceEvent{Action: e.ConfigType, Service: generateUrl(instance)}, nil |
| } |
| } |
| } |
| |
| // Close closes this listener |
| func (pl *polarisListener) Close() { |
| // TODO need to add UnWatch in polaris |
| close(pl.closeCh) |
| } |
| |
| func generateUrl(instance model.Instance) *common.URL { |
| if instance.GetMetadata() == nil { |
| logger.Errorf("polaris instance metadata is empty,instance:%+v", instance) |
| return nil |
| } |
| path := instance.GetMetadata()["path"] |
| myInterface := instance.GetMetadata()["interface"] |
| if len(path) == 0 && len(myInterface) == 0 { |
| logger.Errorf("polaris instance metadata does not have both path key and interface key,instance:%+v", instance) |
| return nil |
| } |
| if len(path) == 0 && len(myInterface) != 0 { |
| path = "/" + myInterface |
| } |
| protocol := instance.GetProtocol() |
| if len(protocol) == 0 { |
| logger.Errorf("polaris instance metadata does not have protocol key,instance:%+v", instance) |
| return nil |
| } |
| urlMap := url.Values{} |
| for k, v := range instance.GetMetadata() { |
| urlMap.Set(k, v) |
| } |
| urlMap.Set(constant.PolarisInstanceID, instance.GetId()) |
| urlMap.Set(constant.PolarisInstanceHealthStatus, fmt.Sprintf("%+v", instance.IsHealthy())) |
| urlMap.Set(constant.PolarisInstanceIsolatedStatus, fmt.Sprintf("%+v", instance.IsIsolated())) |
| instance.GetCircuitBreakerStatus() |
| return common.NewURLWithOptions( |
| common.WithIp(instance.GetHost()), |
| common.WithPort(strconv.Itoa(int(instance.GetPort()))), |
| common.WithProtocol(protocol), |
| common.WithParams(urlMap), |
| common.WithPath(path), |
| ) |
| } |