blob: 6038462c078a68c6cf5c79120ad0cef3a78c3148 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package subscriber
import (
"errors"
"reflect"
"github.com/duke-git/lancet/v2/slice"
"github.com/duke-git/lancet/v2/strutil"
"k8s.io/client-go/tools/cache"
"github.com/apache/dubbo-admin/pkg/common/bizerror"
"github.com/apache/dubbo-admin/pkg/common/constants"
enginecfg "github.com/apache/dubbo-admin/pkg/config/engine"
"github.com/apache/dubbo-admin/pkg/core/events"
"github.com/apache/dubbo-admin/pkg/core/logger"
meshresource "github.com/apache/dubbo-admin/pkg/core/resource/apis/mesh/v1alpha1"
coremodel "github.com/apache/dubbo-admin/pkg/core/resource/model"
"github.com/apache/dubbo-admin/pkg/core/store"
"github.com/apache/dubbo-admin/pkg/core/store/index"
)
type RuntimeInstanceEventSubscriber struct {
instanceStore store.ResourceStore
eventEmitter events.Emitter
}
func NewRuntimeInstanceEventSubscriber(instanceResourceStore store.ResourceStore, emitter events.Emitter) events.Subscriber {
return &RuntimeInstanceEventSubscriber{
instanceStore: instanceResourceStore,
eventEmitter: emitter,
}
}
func (s *RuntimeInstanceEventSubscriber) ResourceKind() coremodel.ResourceKind {
return meshresource.RuntimeInstanceKind
}
func (s *RuntimeInstanceEventSubscriber) Name() string {
return "Engine-" + s.ResourceKind().ToString()
}
func (s *RuntimeInstanceEventSubscriber) ProcessEvent(event events.Event) error {
newObj, ok := event.NewObj().(*meshresource.RuntimeInstanceResource)
if !ok && event.NewObj() != nil {
return bizerror.NewAssertionError(meshresource.RuntimeInstanceKind, reflect.TypeOf(event.NewObj()).Name())
}
oldObj, ok := event.OldObj().(*meshresource.RuntimeInstanceResource)
if !ok && event.OldObj() != nil {
return bizerror.NewAssertionError(meshresource.RuntimeInstanceKind, reflect.TypeOf(event.OldObj()).Name())
}
var processErr error
switch event.Type() {
case cache.Added, cache.Updated, cache.Replaced, cache.Sync:
if newObj == nil {
errStr := "process runtime instance upsert event, but new obj is nil, skipped processing"
logger.Error(errStr)
return errors.New(errStr)
}
processErr = s.processUpsert(newObj)
case cache.Deleted:
if oldObj == nil {
errStr := "process runtime instance delete event, but old obj is nil, skipped processing"
logger.Error(errStr)
return errors.New(errStr)
}
processErr = s.processDelete(oldObj)
}
if processErr != nil {
logger.Errorf("process runtime instance event failed, cause: %s event: %s", processErr.Error(), event.String())
return processErr
}
logger.Infof("process runtime instance event successfully, event: %s", event.String())
return nil
}
// processUpsert when runtime instance added or updated, we should add/update the corresponding instance resource
func (s *RuntimeInstanceEventSubscriber) processUpsert(rtInstanceRes *meshresource.RuntimeInstanceResource) error {
var instanceResource *meshresource.InstanceResource
var err error
switch rtInstanceRes.Spec.SourceEngineType {
case string(enginecfg.Kubernetes):
instanceResource, err = s.getRelatedInstanceByIP(rtInstanceRes)
default:
instanceResource, err = s.getRelatedInstanceByName(rtInstanceRes)
}
if err != nil {
return err
}
// if instance resource exists, that is to say the rpc instance exists in remote registry and has been watched by discovery.
// so we should merge the runtime info into it
if instanceResource != nil {
meshresource.MergeRuntimeInstanceIntoInstance(rtInstanceRes, instanceResource)
return s.instanceStore.Update(instanceResource)
}
// if instance resource does not exist, that is to say the rpc instance does not exist in remote registry.
// we need to check whether the runtime instance resource is enough to create a new instance resource
if !checkAttributesEnough(rtInstanceRes) {
logger.Warnf("cannot identify runtime instance %s to a dubbo instance, skipped creating new instance", rtInstanceRes.ResourceKey())
return nil
}
// if conditions met, we should create a new instance resource by runtime instance
instanceRes := meshresource.FromRuntimeInstance(rtInstanceRes)
if err = s.instanceStore.Add(instanceRes); err != nil {
logger.Errorf("add instance resource failed, instance: %s, err: %s", instanceRes.ResourceKey(), err.Error())
return err
}
instanceAddEvent := events.NewResourceChangedEvent(cache.Added, nil, instanceRes)
s.eventEmitter.Send(instanceAddEvent)
logger.Debugf("runtime instance upsert trigger instance add event, event: %s", instanceAddEvent.String())
return nil
}
// processDelete when runtime instance deleted, we should delete the corresponding instance resource
func (s *RuntimeInstanceEventSubscriber) processDelete(rtInstanceRes *meshresource.RuntimeInstanceResource) error {
var instanceResource *meshresource.InstanceResource
var err error
switch rtInstanceRes.Spec.SourceEngineType {
case string(enginecfg.Kubernetes):
instanceResource, err = s.getRelatedInstanceByIP(rtInstanceRes)
default:
instanceResource, err = s.getRelatedInstanceByName(rtInstanceRes)
}
if err != nil {
return err
}
if instanceResource == nil {
logger.Warnf("cannot find instance resource by runtime instance %s, skipped deleting instance", rtInstanceRes.ResourceKey())
return nil
}
if err = s.instanceStore.Delete(instanceResource); err != nil {
logger.Errorf("delete instance resource failed, instance: %s, err: %s", instanceResource.ResourceKey(), err.Error())
return err
}
instanceDeleteEvent := events.NewResourceChangedEvent(cache.Deleted, instanceResource, nil)
s.eventEmitter.Send(instanceDeleteEvent)
logger.Debugf("runtime instance delete trigger instance delete event, event: %s", instanceDeleteEvent.String())
return nil
}
func (s *RuntimeInstanceEventSubscriber) getRelatedInstanceByName(
rtInstanceRes *meshresource.RuntimeInstanceResource) (*meshresource.InstanceResource, error) {
if rtInstanceRes.Spec == nil || strutil.IsBlank(rtInstanceRes.Spec.AppName) ||
strutil.IsBlank(rtInstanceRes.Spec.Ip) || rtInstanceRes.Spec.RpcPort <= 0 {
return nil, nil
}
instanceResName := meshresource.BuildInstanceResName(rtInstanceRes.Spec.AppName, rtInstanceRes.Spec.Ip, rtInstanceRes.Spec.RpcPort)
resources, err := s.instanceStore.ListByIndexes(map[string]string{
index.ByInstanceNameIndex: instanceResName,
})
if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, nil
}
instanceResList := make([]*meshresource.InstanceResource, len(resources))
for i, item := range resources {
res, ok := item.(*meshresource.InstanceResource)
if !ok {
return nil, bizerror.NewAssertionError("InstanceResource", reflect.TypeOf(item).Name())
}
instanceResList[i] = res
}
if len(instanceResList) > 1 {
resKeys := slice.Map(instanceResList, func(index int, item *meshresource.InstanceResource) string {
return item.ResourceKey()
})
logger.Warnf("there are more than two instances which have the same name, instance keys: %s, name: %s", resKeys, instanceResName)
return nil, nil
}
return instanceResList[0], nil
}
func (s *RuntimeInstanceEventSubscriber) getRelatedInstanceByIP(
rtInstanceRes *meshresource.RuntimeInstanceResource) (*meshresource.InstanceResource, error) {
resources, err := s.instanceStore.ListByIndexes(map[string]string{
index.ByInstanceIpIndex: rtInstanceRes.Spec.Ip,
})
if err != nil {
return nil, err
}
if len(resources) == 0 {
return nil, nil
}
instanceResList := make([]*meshresource.InstanceResource, len(resources))
for i, item := range resources {
res, ok := item.(*meshresource.InstanceResource)
if !ok {
return nil, bizerror.NewAssertionError("InstanceResource", reflect.TypeOf(item).Name())
}
instanceResList[i] = res
}
if len(instanceResList) > 1 {
resKeys := slice.Map(instanceResList, func(index int, item *meshresource.InstanceResource) string {
return item.ResourceKey()
})
logger.Warnf("there are instances which have same ip, instance keys: %s, ip: %s", resKeys, rtInstanceRes.Spec.Ip)
return nil, nil
}
return instanceResList[0], nil
}
func checkAttributesEnough(rtInstanceRes *meshresource.RuntimeInstanceResource) bool {
if rtInstanceRes.Spec == nil || strutil.IsBlank(rtInstanceRes.Spec.AppName) ||
strutil.IsBlank(rtInstanceRes.Spec.Ip) || rtInstanceRes.Spec.RpcPort <= 0 ||
strutil.IsBlank(rtInstanceRes.Mesh) || constants.DefaultMesh == rtInstanceRes.Mesh {
return false
}
return true
}