| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package uniform |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| "github.com/apache/dubbo-go/cluster/router" |
| "github.com/apache/dubbo-go/cluster/router/uniform/k8s_api" |
| "github.com/apache/dubbo-go/common" |
| "github.com/apache/dubbo-go/common/logger" |
| "github.com/apache/dubbo-go/config" |
| "github.com/apache/dubbo-go/config_center" |
| "github.com/apache/dubbo-go/protocol" |
| "github.com/apache/dubbo-go/remoting" |
| perrors "github.com/pkg/errors" |
| "gopkg.in/yaml.v2" |
| "io" |
| "strings" |
| ) |
| |
| // RouterChain contains all uniform router logic |
| // it has UniformRouter list, |
| type RouterChain struct { |
| routers []*UniformRouter |
| virtualServiceConfigBytes []byte |
| destinationRuleConfigBytes []byte |
| notify chan struct{} |
| } |
| |
| // NewUniformRouterChain return |
| func NewUniformRouterChain(virtualServiceConfig, destinationRuleConfig []byte, notify chan struct{}) (router.PriorityRouter, error) { |
| fromFileConfig := true |
| uniformRouters, err := parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig, notify) |
| if err != nil { |
| fromFileConfig = false |
| logger.Warnf("parse router config form local file failed, error = %+v", err) |
| } |
| r := &RouterChain{ |
| virtualServiceConfigBytes: virtualServiceConfig, |
| destinationRuleConfigBytes: destinationRuleConfig, |
| routers: uniformRouters, |
| notify: notify, |
| } |
| if err := k8s_api.SetK8sEventListener(r); err != nil { |
| logger.Warnf("try listen K8s router config failed, error = %+v", err) |
| if !fromFileConfig { |
| return nil, perrors.New("No config file from both local file and k8s") |
| } |
| } |
| return r, nil |
| } |
| |
| // Route route invokers using RouterChain's routers one by one |
| func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { |
| for _, v := range r.routers { |
| invokers = v.Route(invokers, url, invocation) |
| } |
| return invokers |
| } |
| |
| func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) { |
| fmt.Printf("on processed event = %+v\n", *event) |
| if event.ConfigType == remoting.EventTypeAdd || event.ConfigType == remoting.EventTypeUpdate { |
| fmt.Println("event type add or update ") |
| switch event.Key { |
| case k8s_api.VirtualServiceEventKey: |
| fmt.Println("virtul service event key") |
| newVSValue, ok := event.Value.(*config.VirtualServiceConfig) |
| if !ok { |
| logger.Error("event.Value assertion error") |
| return |
| } |
| |
| newVSJsonValue, ok := newVSValue.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] |
| if !ok { |
| logger.Error("newVSValue.ObjectMeta.Annotations has no key named kubectl.kubernetes.io/last-applied-configuration") |
| return |
| } |
| fmt.Println("json file = ", newVSJsonValue) |
| newVirtualServiceConfig := &config.VirtualServiceConfig{} |
| if err := json.Unmarshal([]byte(newVSJsonValue), newVirtualServiceConfig); err != nil { |
| logger.Error("on process json data unmarshal error = ", err) |
| return |
| } |
| newVirtualServiceConfig.YamlAPIVersion = newVirtualServiceConfig.APIVersion |
| newVirtualServiceConfig.YamlKind = newVirtualServiceConfig.Kind |
| newVirtualServiceConfig.MetaData.Name = newVirtualServiceConfig.ObjectMeta.Name |
| fmt.Printf("get event after asseration = %+v\n", newVirtualServiceConfig) |
| data, err := yaml.Marshal(newVirtualServiceConfig) |
| if err != nil { |
| logger.Error("Process change of virtual service: event.Value marshal error:", err) |
| return |
| } |
| r.routers, err = parseFromConfigToRouters(data, r.destinationRuleConfigBytes, r.notify) |
| if err != nil { |
| logger.Error("Process change of virtual service: parseFromConfigToRouters:", err) |
| return |
| } |
| case k8s_api.DestinationRuleEventKey: |
| fmt.Println("dest rule event key") |
| newDRValue, ok := event.Value.(*config.DestinationRuleConfig) |
| if !ok { |
| logger.Error("event.Value assertion error") |
| return |
| } |
| |
| newDRJsonValue, ok := newDRValue.ObjectMeta.Annotations["kubectl.kubernetes.io/last-applied-configuration"] |
| if !ok { |
| logger.Error("newVSValue.ObjectMeta.Annotations has no key named kubectl.kubernetes.io/last-applied-configuration") |
| return |
| } |
| newDestRuleConfig := &config.DestinationRuleConfig{} |
| if err := json.Unmarshal([]byte(newDRJsonValue), newDestRuleConfig); err != nil { |
| logger.Error("on process json data unmarshal error = ", err) |
| return |
| } |
| newDestRuleConfig.YamlAPIVersion = newDestRuleConfig.APIVersion |
| newDestRuleConfig.YamlKind = newDestRuleConfig.Kind |
| newDestRuleConfig.MetaData.Name = newDestRuleConfig.ObjectMeta.Name |
| fmt.Printf("get event after asseration = %+v\n", newDestRuleConfig) |
| data, err := yaml.Marshal(newDestRuleConfig) |
| if err != nil { |
| logger.Error("Process change of dest rule: event.Value marshal error:", err) |
| return |
| } |
| r.routers, err = parseFromConfigToRouters(r.virtualServiceConfigBytes, data, r.notify) |
| if err != nil { |
| logger.Error("Process change of dest rule: parseFromConfigToRouters:", err) |
| return |
| } |
| default: |
| logger.Error("unknow unsupported event key:", event.Key) |
| } |
| } |
| |
| // todo delete router |
| //if event.ConfigType == remoting.EventTypeDel { |
| // |
| //} |
| } |
| |
| // Name get name of ConnCheckerRouter |
| func (r *RouterChain) Name() string { |
| return name |
| } |
| |
| // Priority get Router priority level |
| func (r *RouterChain) Priority() int64 { |
| return 0 |
| } |
| |
| // URL Return URL in router |
| func (r *RouterChain) URL() *common.URL { |
| return nil |
| } |
| |
| // parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list |
| func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte, notify chan struct{}) ([]*UniformRouter, error) { |
| var virtualServiceConfigList []*config.VirtualServiceConfig |
| destRuleConfigsMap := make(map[string]map[string]map[string]string) |
| |
| vsDecoder := yaml.NewDecoder(strings.NewReader(string(virtualServiceConfig))) |
| drDecoder := yaml.NewDecoder(strings.NewReader(string(destinationRuleConfig))) |
| for { |
| virtualServiceCfg := &config.VirtualServiceConfig{} |
| |
| err := vsDecoder.Decode(virtualServiceCfg) |
| if err == io.EOF { |
| break |
| } |
| |
| if err != nil { |
| logger.Error("parseFromConfigTo virtual service err = ", err) |
| return nil, err |
| } |
| virtualServiceConfigList = append(virtualServiceConfigList, virtualServiceCfg) |
| } |
| |
| for { |
| destRuleCfg := &config.DestinationRuleConfig{} |
| err := drDecoder.Decode(destRuleCfg) |
| if err == io.EOF { |
| break |
| } |
| if err != nil { |
| logger.Error("parseFromConfigTo destination rule err = ", err) |
| return nil, err |
| } |
| destRuleCfgMap := make(map[string]map[string]string) |
| for _, v := range destRuleCfg.Spec.SubSets { |
| destRuleCfgMap[v.Name] = v.Labels |
| } |
| destRuleConfigsMap[destRuleCfg.Spec.Host] = destRuleCfgMap |
| } |
| |
| routers := make([]*UniformRouter, 0) |
| |
| for _, v := range virtualServiceConfigList { |
| tempSerivceNeedsDescMap := make(map[string]map[string]string) |
| for _, host := range v.Spec.Hosts { |
| targetDestMap := destRuleConfigsMap[host] |
| |
| // copy to new Map |
| mapCombine(tempSerivceNeedsDescMap, targetDestMap) |
| } |
| // change single config to one rule |
| newRule, err := newDubboRouterRule(v.Spec.Dubbo, tempSerivceNeedsDescMap) |
| if err != nil { |
| logger.Error("Parse config to uniform rule err = ", err) |
| return nil, err |
| } |
| rtr, err := NewUniformRouter(newRule, notify) |
| if err != nil { |
| logger.Error("new uniform router err = ", err) |
| return nil, err |
| } |
| routers = append(routers, rtr) |
| } |
| logger.Debug("parsed successed! with router size = ", len(routers)) |
| return routers, nil |
| } |
| |
// mapCombine copies every entry of from into dist, overwriting entries that
// share a key. The copy is shallow: the inner maps are shared between both
// arguments afterwards. dist must be non-nil when from has entries; a nil or
// empty from is a no-op.
func mapCombine(dist map[string]map[string]string, from map[string]map[string]string) {
	for key := range from {
		dist[key] = from[key]
	}
}