Feature: append inner properties when heartbeat success (#1259)
* Feature: append inner properties when heartbeat success
* Optimize: print err log
diff --git a/etc/conf/app.yaml b/etc/conf/app.yaml
index ed039f4..f822961 100644
--- a/etc/conf/app.yaml
+++ b/etc/conf/app.yaml
@@ -145,7 +145,7 @@
name:
region:
availableZone:
- # properties params for instance
+ # inner properties params for instance, sc will always append these to instance properties
properties:
schema:
diff --git a/server/service/disco/instance.go b/server/service/disco/instance.go
index 226c24d..767d6ce 100644
--- a/server/service/disco/instance.go
+++ b/server/service/disco/instance.go
@@ -47,6 +47,13 @@
propertiesMap map[string]string
)
+func getInnerProperties() map[string]string {
+ once.Do(func() {
+ propertiesMap = config.GetStringMap("registry.instance.properties")
+ })
+ return propertiesMap
+}
+
func RegisterInstance(ctx context.Context, in *pb.RegisterInstanceRequest) (*pb.RegisterInstanceResponse, error) {
remoteIP := util.GetIPFromContext(ctx)
@@ -104,27 +111,22 @@
}
instance.Version = microservice.Version
- setPropertiesToInstance(instance)
+ appendInnerPropertiesToInstance(instance)
return nil
}
-func setPropertiesToInstance(instance *pb.MicroServiceInstance) {
+func appendInnerPropertiesToInstance(instance *pb.MicroServiceInstance) {
if instance.Properties == nil {
instance.Properties = make(map[string]string)
}
- once.Do(func() {
- propertiesMap = config.GetStringMap("registry.instance.properties")
- })
-
- if len(propertiesMap) <= 0 {
+ innerProps := getInnerProperties()
+ if len(innerProps) <= 0 {
return
}
- for k, v := range propertiesMap {
- if _, ok := instance.Properties[k]; !ok {
- instance.Properties[k] = v
- }
+ for k, v := range innerProps {
+ instance.Properties[k] = v
}
}
@@ -141,13 +143,64 @@
func SendHeartbeat(ctx context.Context, in *pb.HeartbeatRequest) error {
remoteIP := util.GetIPFromContext(ctx)
+ instanceID := in.InstanceId
+ serviceID := in.ServiceId
if err := validator.ValidateHeartbeatRequest(in); err != nil {
- log.Error(fmt.Sprintf("heartbeat failed, invalid parameters, operator %s", remoteIP), err)
+ log.Error(fmt.Sprintf("send heartbeat[%s/%s] failed, invalid parameters, operator %s",
+ serviceID, instanceID, remoteIP), err)
return pb.NewError(pb.ErrInvalidParams, err.Error())
}
- return datasource.GetMetadataManager().SendHeartbeat(ctx, in)
+ err := datasource.GetMetadataManager().SendHeartbeat(ctx, in)
+ if err != nil {
+ log.Error(fmt.Sprintf("send heartbeat[%s/%s] failed, operator %s", serviceID, instanceID, remoteIP), err)
+ return err
+ }
+
+ // append the inner properties
+ err = appendInnerProperties(ctx, serviceID, instanceID)
+ if err != nil {
+ log.Error(fmt.Sprintf("append inner instance[%s/%s] properties failed, operator %s",
+ serviceID, instanceID, remoteIP), err)
+ return err
+ }
+ return nil
+}
+
+func appendInnerProperties(ctx context.Context, serviceID string, instanceID string) error {
+ resp, err := datasource.GetMetadataManager().GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceID, ProviderInstanceId: instanceID})
+ if err != nil {
+ log.Error(fmt.Sprintf("get instance[%s/%s] failed", serviceID, instanceID), err)
+ return err
+ }
+ instance := resp.Instance
+ if !shouldAppendInnerProperties(instance) {
+ return nil
+ }
+ props := make(map[string]string, len(resp.Instance.Properties))
+ for k, v := range resp.Instance.Properties {
+ props[k] = v
+ }
+ return PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
+ ServiceId: serviceID,
+ InstanceId: instanceID,
+ Properties: props,
+ })
+}
+
+func shouldAppendInnerProperties(instance *pb.MicroServiceInstance) bool {
+ instProps := instance.Properties
+ innerProps := getInnerProperties()
+ if len(innerProps) == 0 {
+ return false
+ }
+ for k, v := range innerProps {
+ if prop, ok := instProps[k]; !ok || prop != v {
+ return true
+ }
+ }
+ return false
}
func SendManyHeartbeat(ctx context.Context, in *pb.HeartbeatSetRequest) (*pb.HeartbeatSetResponse, error) {
@@ -374,6 +427,13 @@
return pb.NewError(pb.ErrInvalidParams, err.Error())
}
+ properties := getInnerProperties()
+ if in.Properties == nil {
+ in.Properties = make(map[string]string, len(properties))
+ }
+ for k, v := range properties {
+ in.Properties[k] = v
+ }
return datasource.GetMetadataManager().PutInstanceProperties(ctx, in)
}
diff --git a/server/service/disco/instance_test.go b/server/service/disco/instance_test.go
index d5c9ab0..1cf4dc5 100644
--- a/server/service/disco/instance_test.go
+++ b/server/service/disco/instance_test.go
@@ -24,11 +24,12 @@
"strings"
"testing"
+ _ "github.com/apache/servicecomb-service-center/test"
+
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
"github.com/apache/servicecomb-service-center/server/core"
discosvc "github.com/apache/servicecomb-service-center/server/service/disco"
- _ "github.com/apache/servicecomb-service-center/test"
pb "github.com/go-chassis/cari/discovery"
"github.com/go-chassis/cari/pkg/errsvc"
"github.com/stretchr/testify/assert"
@@ -1862,13 +1863,26 @@
})
t.Run("when update a lease, should be passed", func(t *testing.T) {
- err := discosvc.SendHeartbeat(ctx, &pb.HeartbeatRequest{
+ resp, err := discosvc.GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceId, ProviderInstanceId: instanceId1})
+ assert.NoError(t, err)
+ resp.Instance.Properties = nil
+
+ err = discosvc.PutInstance(ctx, &pb.RegisterInstanceRequest{Instance: resp.Instance})
+ assert.NoError(t, err)
+
+ err = discosvc.SendHeartbeat(ctx, &pb.HeartbeatRequest{
ServiceId: serviceId,
InstanceId: instanceId1,
})
assert.NoError(t, err)
- err = discosvc.SendHeartbeat(ctx, &pb.HeartbeatRequest{
+ resp, err = discosvc.GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceId, ProviderInstanceId: instanceId1})
+ assert.NoError(t, err)
+ assert.Equal(t, "test_engineID", resp.Instance.Properties["engineID"])
+ })
+
+ t.Run("when update lease with invalid request, should be failed", func(t *testing.T) {
+ err := discosvc.SendHeartbeat(ctx, &pb.HeartbeatRequest{
ServiceId: "",
InstanceId: instanceId1,
})
@@ -2118,6 +2132,10 @@
})
assert.NoError(t, err)
+ resp, err := discosvc.GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceId, ProviderInstanceId: instanceId})
+ assert.NoError(t, err)
+ assert.Equal(t, "test", resp.Instance.Properties["test"])
+
size := 1000
properties := make(map[string]string, size)
for i := 0; i < size; i++ {
@@ -2133,6 +2151,19 @@
err = discosvc.PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
ServiceId: serviceId,
+ InstanceId: instanceId,
+ })
+ assert.NoError(t, err)
+
+ resp, err = discosvc.GetInstance(ctx, &pb.GetOneInstanceRequest{ProviderServiceId: serviceId, ProviderInstanceId: instanceId})
+ assert.NoError(t, err)
+ _, ok := resp.Instance.Properties["test"]
+ assert.False(t, ok)
+ })
+
+ t.Run("when update instance properties with invalid request, should be failed", func(t *testing.T) {
+ err = discosvc.PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
+ ServiceId: serviceId,
InstanceId: "notexistins",
Properties: map[string]string{
"test": "test",
@@ -2198,12 +2229,6 @@
assert.Equal(t, pb.ErrInvalidParams, testErr.Code)
err = discosvc.PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
- ServiceId: serviceId,
- InstanceId: instanceId,
- })
- assert.NoError(t, err)
-
- err = discosvc.PutInstanceProperties(ctx, &pb.UpdateInstancePropsRequest{
ServiceId: "notexistservice",
InstanceId: instanceId,
Properties: map[string]string{