blob: 671e801b92cc4773e336a3f7c9c9aefb2f104880 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
)
import (
dubboCommon "dubbo.apache.org/dubbo-go/v3/common"
ex "dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata/definition"
dr "dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper/curator_discovery"
"github.com/dubbo-go-pixiu/pixiu-api/pkg/api/config"
"github.com/dubbogo/go-zookeeper/zk"
)
import (
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/common"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/registry"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/adapter/dubboregistry/remoting/zookeeper"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pixiu/pkg/logger"
)
var _ registry.Listener = new(applicationServiceListener)
// applicationServiceListener normally monitors the /services/[:application]
type applicationServiceListener struct {
urls []*dubboCommon.URL
servicePath string
client *zookeeper.ZooKeeperClient
adapterListener common.RegistryEventListener
exit chan struct{}
wg sync.WaitGroup
}
// newApplicationServiceListener creates a new zk service listener
func newApplicationServiceListener(path string, client *zookeeper.ZooKeeperClient, adapterListener common.RegistryEventListener) *applicationServiceListener {
return &applicationServiceListener{
servicePath: path,
client: client,
exit: make(chan struct{}),
adapterListener: adapterListener,
}
}
func (asl *applicationServiceListener) WatchAndHandle() {
defer asl.wg.Done()
var (
failTimes int64 = 0
delayTimer = time.NewTimer(ConnDelay * time.Duration(failTimes))
)
defer delayTimer.Stop()
for {
children, e, err := asl.client.GetChildrenW(asl.servicePath)
// error handling
if err != nil {
failTimes++
logger.Infof("watching (path{%s}) = error{%v}", asl.servicePath, err)
// Exit the watch if root node is in error
if err == zookeeper.ErrNilNode {
logger.Errorf("watching (path{%s}) got errNilNode,so exit listen", asl.servicePath)
return
}
if failTimes > MaxFailTimes {
logger.Errorf("Error happens on (path{%s}) exceed max fail times: %v,so exit listen",
asl.servicePath, MaxFailTimes)
return
}
delayTimer.Reset(ConnDelay * time.Duration(failTimes))
<-delayTimer.C
continue
}
failTimes = 0
if continueLoop := asl.waitEventAndHandlePeriod(children, e); !continueLoop {
return
}
}
}
func (asl *applicationServiceListener) waitEventAndHandlePeriod(children []string, e <-chan zk.Event) bool {
tickerTTL := defaultTTL
ticker := time.NewTicker(tickerTTL)
defer ticker.Stop()
asl.handleEvent(children)
for {
select {
case <-ticker.C:
asl.handleEvent(children)
case zkEvent := <-e:
logger.Warnf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, zookeeper.StateToString(zkEvent.State), zkEvent.Err)
if zkEvent.Type != zk.EventNodeChildrenChanged {
return true
}
asl.handleEvent(children)
return true
case <-asl.exit:
logger.Warnf("listen(path{%s}) goroutine exit now...", asl.servicePath)
return false
}
}
}
func (asl *applicationServiceListener) handleEvent(children []string) {
fetchChildren, err := asl.client.GetChildren(asl.servicePath)
if err != nil {
logger.Warnf("Error when retrieving newChildren in path: %s, Error:%s", asl.servicePath, err.Error())
// disable the API
for _, url := range asl.urls {
bkConf, _, _, _ := registry.ParseDubboString(url.String())
apiPattern := registry.GetAPIPattern(bkConf)
if err := asl.adapterListener.OnDeleteRouter(config.Resource{Path: apiPattern}); err != nil {
logger.Errorf("Error={%s} when try to remove API by path: %s", err.Error(), apiPattern)
}
}
return
}
asl.urls = asl.getUrls(fetchChildren[0])
for _, url := range asl.urls {
bkConfig, _, location, err := registry.ParseDubboString(url.String())
if err != nil {
logger.Warnf("Parse dubbo interface provider %s failed; due to %s", url.String(), err.Error())
continue
}
if len(bkConfig.ApplicationName) == 0 || len(bkConfig.Interface) == 0 {
continue
}
methods, err := asl.getMethods(bkConfig.Interface)
if err != nil {
logger.Warnf("Get methods of interface %s failed; due to %s", bkConfig.Interface, err.Error())
continue
}
apiPattern := registry.GetAPIPattern(bkConfig)
mappingParams := []config.MappingParam{
{
Name: "requestBody.values",
MapTo: "opt.values",
},
{
Name: "requestBody.types",
MapTo: "opt.types",
},
}
for i := range methods {
api := registry.CreateAPIConfig(apiPattern, location, bkConfig, methods[i], mappingParams)
if err := asl.adapterListener.OnAddAPI(api); err != nil {
logger.Errorf("Error={%s} happens when try to add api %s", err.Error(), api.Path)
}
}
}
}
// getUrls return exported urls from instance
func (asl *applicationServiceListener) getUrls(path string) []*dubboCommon.URL {
insPath := strings.Join([]string{asl.servicePath, path}, constant.PathSlash)
data, err := asl.client.GetContent(insPath)
if err != nil {
logger.Errorf("Error when get content in path: %s, Error:%s", insPath, err.Error())
return nil
}
// convert the data to service instance
iss := &curator_discovery.ServiceInstance{}
err = json.Unmarshal(data, iss)
if err != nil {
logger.Warnf("Parse service instance %s failed due to %s", insPath, err.Error())
return nil
}
instance := toZookeeperInstance(iss)
metaData := instance.GetMetadata()
metadataStorageType, ok := metaData[constant.MetadataStorageTypeKey]
if !ok {
metadataStorageType = constant.DefaultMetadataStorageType
}
// get metadata service proxy factory according to the metadataStorageType
proxyFactory := ex.GetMetadataServiceProxyFactory(metadataStorageType)
if proxyFactory == nil {
return nil
}
metadataService := proxyFactory.GetProxy(instance)
if metadataService == nil {
logger.Warnf("Get metadataService of instance %s failed", instance)
return nil
}
// call GetExportedURLs to get the exported urls
urls, err := metadataService.GetExportedURLs(constant.AnyValue, constant.AnyValue, constant.AnyValue, constant.AnyValue)
if err != nil {
logger.Errorf("Get exported urls of instance %s failed; due to %s", instance, err.Error())
return nil
}
return urls
}
// toZookeeperInstance convert to registry's service instance
func toZookeeperInstance(cris *curator_discovery.ServiceInstance) dr.ServiceInstance {
pl, ok := cris.Payload.(map[string]interface{})
if !ok {
logger.Errorf("toZookeeperInstance{%s} payload is not map[string]interface{}", cris.ID)
return nil
}
mdi, ok := pl["metadata"].(map[string]interface{})
if !ok {
logger.Errorf("toZookeeperInstance{%s} metadata is not map[string]interface{}", cris.ID)
return nil
}
md := make(map[string]string, len(mdi))
for k, v := range mdi {
md[k] = fmt.Sprint(v)
}
return &dr.DefaultServiceInstance{
ID: cris.ID,
ServiceName: cris.Name,
Host: cris.Address,
Port: cris.Port,
Enable: true,
Healthy: true,
Metadata: md,
}
}
// getMethods return the methods of a service
func (asl *applicationServiceListener) getMethods(in string) ([]string, error) {
methods := []string{}
path := strings.Join([]string{methodsRootPath, in}, constant.PathSlash)
data, err := asl.client.GetContent(path)
if err != nil {
return nil, err
}
sd := &definition.ServiceDefinition{}
err = json.Unmarshal(data, sd)
if err != nil {
return nil, err
}
for _, m := range sd.Methods {
methods = append(methods, m.Name)
}
return methods, nil
}
// Close closes this listener
func (asl *applicationServiceListener) Close() {
close(asl.exit)
asl.wg.Wait()
}