blob: 09d3e10d0ec2eb24cc036267ba703e7857224fd1 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package uniform
import (
perrors ""
// RouterChain contains all uniform router logic
// it has UniformRouter list,
type RouterChain struct {
routers []*UniformRouter
virtualServiceConfigBytes []byte
destinationRuleConfigBytes []byte
notify chan struct{}
// NewUniformRouterChain return
func NewUniformRouterChain(virtualServiceConfig, destinationRuleConfig []byte, notify chan struct{}) (router.PriorityRouter, error) {
uniformRouters := make([]*UniformRouter, 0)
var err error
fromFileConfig := true
uniformRouters, err = parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig, notify)
if err != nil {
fromFileConfig = false
logger.Warnf("parse router config form local file failed, error = %+v", err)
r := &RouterChain{
virtualServiceConfigBytes: virtualServiceConfig,
destinationRuleConfigBytes: destinationRuleConfig,
routers: uniformRouters,
notify: notify,
if err := k8s_api.SetK8sEventListener(r); err != nil {
logger.Warnf("try listen K8s router config failed, error = %+v", err)
if !fromFileConfig {
return nil, perrors.New("No config file from both local file and k8s")
return r, nil
// Route route invokers using RouterChain's routers one by one
func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
for _, v := range r.routers {
invokers = v.Route(invokers, url, invocation)
return invokers
func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) {
fmt.Printf("on processed event = %+v\n", *event)
if event.ConfigType == remoting.EventTypeAdd || event.ConfigType == remoting.EventTypeUpdate {
fmt.Println("event type add or update ")
switch event.Key {
case k8s_api.VirtualServiceEventKey:
fmt.Println("virtul service event key")
newVSValue, ok := event.Value.(*config.VirtualServiceConfig)
if !ok {
logger.Error("event.Value assertion error")
newVSJsonValue, ok := newVSValue.ObjectMeta.Annotations[""]
if !ok {
logger.Error("newVSValue.ObjectMeta.Annotations has no key named")
fmt.Println("json file = ", newVSJsonValue)
newVirtualServiceConfig := &config.VirtualServiceConfig{}
if err := json.Unmarshal([]byte(newVSJsonValue), newVirtualServiceConfig); err != nil {
logger.Error("on process json data unmarshal error = ", err)
newVirtualServiceConfig.YamlAPIVersion = newVirtualServiceConfig.APIVersion
newVirtualServiceConfig.YamlKind = newVirtualServiceConfig.Kind
newVirtualServiceConfig.MetaData.Name = newVirtualServiceConfig.ObjectMeta.Name
fmt.Printf("get event after asseration = %+v\n", newVirtualServiceConfig)
data, err := yaml.Marshal(newVirtualServiceConfig)
if err != nil {
logger.Error("Process change of virtual service: event.Value marshal error:", err)
r.routers, err = parseFromConfigToRouters(data, r.destinationRuleConfigBytes, r.notify)
if err != nil {
logger.Error("Process change of virtual service: parseFromConfigToRouters:", err)
case k8s_api.DestinationRuleEventKey:
fmt.Println("dest rule event key")
newDRValue, ok := event.Value.(*config.DestinationRuleConfig)
if !ok {
logger.Error("event.Value assertion error")
newDRJsonValue, ok := newDRValue.ObjectMeta.Annotations[""]
if !ok {
logger.Error("newVSValue.ObjectMeta.Annotations has no key named")
fmt.Println("json file = ", newDRJsonValue)
newDestRuleConfig := &config.DestinationRuleConfig{}
if err := json.Unmarshal([]byte(newDRJsonValue), newDestRuleConfig); err != nil {
logger.Error("on process json data unmarshal error = ", err)
newDestRuleConfig.YamlAPIVersion = newDestRuleConfig.APIVersion
newDestRuleConfig.YamlKind = newDestRuleConfig.Kind
newDestRuleConfig.MetaData.Name = newDestRuleConfig.ObjectMeta.Name
fmt.Printf("get event after asseration = %+v\n", newDestRuleConfig)
data, err := yaml.Marshal(newDestRuleConfig)
if err != nil {
logger.Error("Process change of dest rule: event.Value marshal error:", err)
r.routers, err = parseFromConfigToRouters(r.virtualServiceConfigBytes, data, r.notify)
if err != nil {
logger.Error("Process change of dest rule: parseFromConfigToRouters:", err)
logger.Error("unknow unsupported event key:", event.Key)
if event.ConfigType == remoting.EventTypeDel {
// todo delete router
// Name get name of ConnCheckerRouter
func (r *RouterChain) Name() string {
return name
// Priority get Router priority level
func (r *RouterChain) Priority() int64 {
return 0
// URL Return URL in router
func (r *RouterChain) URL() *common.URL {
return nil
// parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list
func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte, notify chan struct{}) ([]*UniformRouter, error) {
var virtualServiceConfigList []*config.VirtualServiceConfig
destRuleConfigsMap := make(map[string]map[string]map[string]string)
vsDecoder := yaml.NewDecoder(strings.NewReader(string(virtualServiceConfig)))
drDecoder := yaml.NewDecoder(strings.NewReader(string(destinationRuleConfig)))
for {
virtualServiceCfg := &config.VirtualServiceConfig{}
err := vsDecoder.Decode(virtualServiceCfg)
if err == io.EOF {
if err != nil {
logger.Error("parseFromConfigTo virtual service err = ", err)
return nil, err
virtualServiceConfigList = append(virtualServiceConfigList, virtualServiceCfg)
for {
destRuleCfg := &config.DestinationRuleConfig{}
err := drDecoder.Decode(destRuleCfg)
if err == io.EOF {
if err != nil {
logger.Error("parseFromConfigTo destination rule err = ", err)
return nil, err
destRuleCfgMap := make(map[string]map[string]string)
for _, v := range destRuleCfg.Spec.SubSets {
destRuleCfgMap[v.Name] = v.Labels
destRuleConfigsMap[destRuleCfg.Spec.Host] = destRuleCfgMap
routers := make([]*UniformRouter, 0)
for _, v := range virtualServiceConfigList {
tempSerivceNeedsDescMap := make(map[string]map[string]string)
for _, host := range v.Spec.Hosts {
targetDestMap := destRuleConfigsMap[host]
// copy to new Map
mapCombine(tempSerivceNeedsDescMap, targetDestMap)
data, err := json.Marshal(v.Spec.Dubbo)
if err != nil {
logger.Error("v.Spec.Dubbo unmarshal error = ", err)
fmt.Printf("===v.Spec.Dubbo = %s\n", string(data))
newRule, err := newDubboRouterRule(v.Spec.Dubbo, tempSerivceNeedsDescMap)
if err != nil {
logger.Error("Parse config to uniform rule err = ", err)
return nil, err
rtr, err := NewUniformRouter(newRule, notify)
if err != nil {
logger.Error("new uniform router err = ", err)
return nil, err
routers = append(routers, rtr)
logger.Debug("parsed successed! with router size = ", len(routers))
return routers, nil
func mapCombine(dist map[string]map[string]string, from map[string]map[string]string) {
for k, v := range from {
dist[k] = v