blob: 6d622172cfadb106314b996bcddedfeca91472d2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package servicecenter
import (
func TestClusterIndexer_Sync(t *testing.T) {
indexer := &ClusterIndexer{}
cache := discovery.NewKvCache("test", discovery.Configure())
cfg := discovery.Configure()
sccacher := NewServiceCenterCacher(cfg, cache)
arr := model.MicroserviceIndexSlice{}
// case: sync empty data
cfg.WithEventFunc(func(discovery.KvEvent) {
t.Fatalf("TestClusterIndexer_Sync failed")
indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
t.Fatalf("TestClusterIndexer_Sync failed")
// case: CREATE
cfg.WithEventFunc(func(evt discovery.KvEvent) {
if evt.Type != proto.EVT_CREATE {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "a", Rev: 1, ClusterName: "a"})
indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
t.Fatalf("TestClusterIndexer_Sync failed")
// case: UPDATE
cfg.WithEventFunc(func(evt discovery.KvEvent) {
if evt.Type != proto.EVT_UPDATE {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"})
indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
// case: UPDATE the same one
cfg.WithEventFunc(func(evt discovery.KvEvent) {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
t.Fatalf("TestClusterIndexer_Sync failed")
// case: conflict but not print log
cfg.WithEventFunc(func(evt discovery.KvEvent) {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"})
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "b"})
indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, indexer.logConflictFunc)
// case: conflict and print log
func() {
defer log.Recover()
cfg.WithEventFunc(func(evt discovery.KvEvent) {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"})
arr.SetValue(&model.KV{Key: "/a", Value: "ab", Rev: 2, ClusterName: "b"})
indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, indexer.logConflictFunc)
// '/a' is an incorrect key and logConflictFunc is expected to panic here
t.Fatalf("TestClusterIndexer_Sync failed")
// case: some cluster err and do not overwrite the cache
cfg.WithEventFunc(func(evt discovery.KvEvent) {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "ab", Rev: 3, ClusterName: "b"})
indexer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
// case: DELETE but the cluster err
cfg.WithEventFunc(func(evt discovery.KvEvent) {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
arr = model.MicroserviceIndexSlice{}
indexer.checkWithConflictHandleFunc(sccacher, &arr, map[string]error{"a": fmt.Errorf("error")}, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
// case: DELETE
cfg.WithEventFunc(func(evt discovery.KvEvent) {
if evt.Type != proto.EVT_DELETE {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
arr = model.MicroserviceIndexSlice{}
indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)
// case: CREATE again and set cluster to local cluster name
cfg.WithEventFunc(func(evt discovery.KvEvent) {
if evt.Type != proto.EVT_CREATE {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "a", Rev: 1, ClusterName: registry.Configuration().ClusterName})
indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(*model.KV, model.Getter, int) {
t.Fatalf("TestClusterIndexer_Sync failed")
// case: UPDATE but skip local cluster
cfg.WithEventFunc(func(evt discovery.KvEvent) {
if evt.Type != proto.EVT_UPDATE && evt.KV.Value != "aa" {
t.Fatalf("TestClusterIndexer_Sync failed, %v", evt)
arr = model.MicroserviceIndexSlice{}
arr.SetValue(&model.KV{Key: "/a", Value: "x", Rev: 2, ClusterName: registry.Configuration().ClusterName})
arr.SetValue(&model.KV{Key: "/a", Value: "aa", Rev: 2, ClusterName: "a"})
indexer.checkWithConflictHandleFunc(sccacher, &arr, nil, func(kv *model.KV, _ model.Getter, _ int) {
t.Fatalf("TestClusterIndexer_Sync failed %v", kv)