refactor: compatible with topology persistence (#191)
diff --git a/go.mod b/go.mod
index 9988f61..3a53779 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@
require (
github.com/AlekSi/gocov-xml v1.0.0
- github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60
+ github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c
github.com/axw/gocov v1.1.0
github.com/caarlos0/env/v6 v6.10.1
github.com/julienschmidt/httprouter v1.3.0
diff --git a/go.sum b/go.sum
index d26eaee..b4f80a2 100644
--- a/go.sum
+++ b/go.sum
@@ -20,6 +20,8 @@
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ=
github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
diff --git a/server/cluster/manager.go b/server/cluster/manager.go
index 9350b9f..3696aa7 100644
--- a/server/cluster/manager.go
+++ b/server/cluster/manager.go
@@ -4,6 +4,7 @@
import (
"context"
+ "fmt"
"path"
"sync"
"time"
@@ -56,9 +57,12 @@
alloc id.Allocator
rootPath string
idAllocatorStep uint
+
+ // TODO: topologyType is used to be compatible with cluster data changes and needs to be deleted later.
+ topologyType storage.TopologyType
}
-func NewManagerImpl(storage storage.Storage, kv clientv3.KV, client *clientv3.Client, rootPath string, idAllocatorStep uint) (Manager, error) {
+func NewManagerImpl(storage storage.Storage, kv clientv3.KV, client *clientv3.Client, rootPath string, idAllocatorStep uint, topologyType storage.TopologyType) (Manager, error) {
alloc := id.NewAllocatorImpl(log.GetLogger(), kv, path.Join(rootPath, AllocClusterIDPrefix), idAllocatorStep)
manager := &managerImpl{
@@ -69,6 +73,7 @@
clusters: make(map[string]*Cluster, 0),
rootPath: rootPath,
idAllocatorStep: idAllocatorStep,
+ topologyType: topologyType,
}
return manager, nil
@@ -319,16 +324,40 @@
for _, metadataStorage := range clusters.Clusters {
logger := log.With(zap.String("clusterName", metadataStorage.Name))
clusterMetadata := metadata.NewClusterMetadata(logger, metadataStorage, m.storage, m.kv, m.rootPath, m.idAllocatorStep)
- if err := clusterMetadata.Load(ctx); err != nil {
+ if err = clusterMetadata.Load(ctx); err != nil {
log.Error("fail to load cluster", zap.String("cluster", clusterMetadata.Name()), zap.Error(err))
return errors.WithMessage(err, "fail to load cluster")
}
+
+ // TODO: topologyType is used to be compatible with cluster data changes and needs to be deleted later
+ if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown {
+ req := storage.UpdateClusterRequest{
+ Cluster: storage.Cluster{
+ ID: metadataStorage.ID,
+ Name: metadataStorage.Name,
+ MinNodeCount: metadataStorage.MinNodeCount,
+ ShardTotal: metadataStorage.ShardTotal,
+ EnableSchedule: metadataStorage.EnableSchedule,
+ TopologyType: m.topologyType,
+ CreatedAt: metadataStorage.CreatedAt,
+ ModifiedAt: uint64(time.Now().UnixMilli()),
+ },
+ }
+ if err := m.storage.UpdateCluster(ctx, req); err != nil {
+ return errors.WithMessagef(err, "update cluster topology type failed, clusterName:%s", clusterMetadata.Name())
+ }
+ log.Info("update cluster topology type successfully", zap.String("request", fmt.Sprintf("%v", req)))
+ if err := clusterMetadata.LoadMetadata(ctx); err != nil {
+ log.Error("fail to load cluster", zap.String("clusterName", clusterMetadata.Name()), zap.Error(err))
+ return err
+ }
+ }
+
log.Info("open cluster successfully", zap.String("cluster", clusterMetadata.Name()))
c, err := NewCluster(logger, clusterMetadata, m.client, m.rootPath)
if err != nil {
return errors.WithMessage(err, "new cluster")
}
-
m.clusters[clusterMetadata.Name()] = c
if err := c.Start(ctx); err != nil {
return errors.WithMessage(err, "start cluster")
diff --git a/server/cluster/manager_test.go b/server/cluster/manager_test.go
index 02c0879..3cfbea5 100644
--- a/server/cluster/manager_test.go
+++ b/server/cluster/manager_test.go
@@ -42,7 +42,7 @@
}
func newClusterManagerWithStorage(storage storage.Storage, kv clientv3.KV, client *clientv3.Client) (cluster.Manager, error) {
- return cluster.NewManagerImpl(storage, kv, client, testRootPath, defaultIDAllocatorStep)
+ return cluster.NewManagerImpl(storage, kv, client, testRootPath, defaultIDAllocatorStep, defaultTopologyType)
}
func TestClusterManager(t *testing.T) {
diff --git a/server/server.go b/server/server.go
index 781edf2..c77bf62 100644
--- a/server/server.go
+++ b/server/server.go
@@ -161,9 +161,14 @@
MaxScanLimit: srv.cfg.MaxScanLimit, MinScanLimit: srv.cfg.MinScanLimit,
})
- manager, err := cluster.NewManagerImpl(storage, srv.etcdCli, srv.etcdCli, srv.cfg.StorageRootPath, srv.cfg.IDAllocatorStep)
+ topologyType, err := metadata.ParseTopologyType(srv.cfg.TopologyType)
if err != nil {
- return errors.WithMessage(err, "start server")
+ return err
+ }
+
+ manager, err := cluster.NewManagerImpl(storage, srv.etcdCli, srv.etcdCli, srv.cfg.StorageRootPath, srv.cfg.IDAllocatorStep, topologyType)
+ if err != nil {
+ return err
}
srv.clusterManager = manager
srv.flowLimiter = limiter.NewFlowLimiter(srv.cfg.FlowLimiter)
diff --git a/server/storage/types.go b/server/storage/types.go
index 71aaea8..a80cb82 100644
--- a/server/storage/types.go
+++ b/server/storage/types.go
@@ -24,6 +24,7 @@
ClusterStateStable
ClusterStatePrepare
+ TopologyTypeUnknown = "unknown"
TopologyTypeStatic = "static"
TopologyTypeDynamic = "dynamic"
)
@@ -283,6 +284,8 @@
func convertTopologyTypeToPB(topologyType TopologyType) clusterpb.Cluster_TopologyType {
switch topologyType {
+ case TopologyTypeUnknown:
+ return clusterpb.Cluster_UNKNOWN
case TopologyTypeStatic:
return clusterpb.Cluster_STATIC
case TopologyTypeDynamic:
@@ -293,6 +296,8 @@
func convertTopologyTypePB(topologyType clusterpb.Cluster_TopologyType) TopologyType {
switch topologyType {
+ case clusterpb.Cluster_UNKNOWN:
+ return TopologyTypeUnknown
case clusterpb.Cluster_STATIC:
return TopologyTypeStatic
case clusterpb.Cluster_DYNAMIC: