refactor: make cluster manager compatible with topology persistence (#191)

diff --git a/go.mod b/go.mod
index 9988f61..3a53779 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@
 
 require (
 	github.com/AlekSi/gocov-xml v1.0.0
-	github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60
+	github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c
 	github.com/axw/gocov v1.1.0
 	github.com/caarlos0/env/v6 v6.10.1
 	github.com/julienschmidt/httprouter v1.3.0
diff --git a/go.sum b/go.sum
index d26eaee..b4f80a2 100644
--- a/go.sum
+++ b/go.sum
@@ -20,6 +20,8 @@
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60 h1:+/bcJ6M6SnXWjhA80c5Qq6u+LASrPGxoDCMIZoJcmaQ=
 github.com/CeresDB/ceresdbproto/golang v0.0.0-20230515021908-1b3a3eae3d60/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c h1:Z/FkMasq2ZTcsKsFuiUaLi26mLyx23mxwlbt1NC/eRY=
+github.com/CeresDB/ceresdbproto/golang v0.0.0-20230531074927-f60b148b3a5c/go.mod h1:qLTh6jtSu2ZLIFsU3iiDIKvkrQvyY/Csg6Mk0Ub0QZ4=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
diff --git a/server/cluster/manager.go b/server/cluster/manager.go
index 9350b9f..3696aa7 100644
--- a/server/cluster/manager.go
+++ b/server/cluster/manager.go
@@ -4,6 +4,7 @@
 
 import (
 	"context"
+	"fmt"
 	"path"
 	"sync"
 	"time"
@@ -56,9 +57,12 @@
 	alloc           id.Allocator
 	rootPath        string
 	idAllocatorStep uint
+
+	// TODO: topologyType exists only for compatibility with the cluster data change and should be removed later.
+	topologyType storage.TopologyType
 }
 
-func NewManagerImpl(storage storage.Storage, kv clientv3.KV, client *clientv3.Client, rootPath string, idAllocatorStep uint) (Manager, error) {
+func NewManagerImpl(storage storage.Storage, kv clientv3.KV, client *clientv3.Client, rootPath string, idAllocatorStep uint, topologyType storage.TopologyType) (Manager, error) {
 	alloc := id.NewAllocatorImpl(log.GetLogger(), kv, path.Join(rootPath, AllocClusterIDPrefix), idAllocatorStep)
 
 	manager := &managerImpl{
@@ -69,6 +73,7 @@
 		clusters:        make(map[string]*Cluster, 0),
 		rootPath:        rootPath,
 		idAllocatorStep: idAllocatorStep,
+		topologyType:    topologyType,
 	}
 
 	return manager, nil
@@ -319,16 +324,40 @@
 	for _, metadataStorage := range clusters.Clusters {
 		logger := log.With(zap.String("clusterName", metadataStorage.Name))
 		clusterMetadata := metadata.NewClusterMetadata(logger, metadataStorage, m.storage, m.kv, m.rootPath, m.idAllocatorStep)
-		if err := clusterMetadata.Load(ctx); err != nil {
+		if err = clusterMetadata.Load(ctx); err != nil {
 			log.Error("fail to load cluster", zap.String("cluster", clusterMetadata.Name()), zap.Error(err))
 			return errors.WithMessage(err, "fail to load cluster")
 		}
+
+		// TODO: this backfill of an unknown TopologyType exists only for compatibility with the cluster data change and should be removed later.
+		if clusterMetadata.GetStorageMetadata().TopologyType == storage.TopologyTypeUnknown {
+			req := storage.UpdateClusterRequest{
+				Cluster: storage.Cluster{
+					ID:             metadataStorage.ID,
+					Name:           metadataStorage.Name,
+					MinNodeCount:   metadataStorage.MinNodeCount,
+					ShardTotal:     metadataStorage.ShardTotal,
+					EnableSchedule: metadataStorage.EnableSchedule,
+					TopologyType:   m.topologyType,
+					CreatedAt:      metadataStorage.CreatedAt,
+					ModifiedAt:     uint64(time.Now().UnixMilli()),
+				},
+			}
+			if err := m.storage.UpdateCluster(ctx, req); err != nil {
+				return errors.WithMessagef(err, "update cluster topology type failed, clusterName:%s", clusterMetadata.Name())
+			}
+			log.Info("update cluster topology type successfully", zap.String("request", fmt.Sprintf("%v", req)))
+			if err := clusterMetadata.LoadMetadata(ctx); err != nil {
+				log.Error("fail to load cluster", zap.String("clusterName", clusterMetadata.Name()), zap.Error(err))
+				return err
+			}
+		}
+
 		log.Info("open cluster successfully", zap.String("cluster", clusterMetadata.Name()))
 		c, err := NewCluster(logger, clusterMetadata, m.client, m.rootPath)
 		if err != nil {
 			return errors.WithMessage(err, "new cluster")
 		}
-
 		m.clusters[clusterMetadata.Name()] = c
 		if err := c.Start(ctx); err != nil {
 			return errors.WithMessage(err, "start cluster")
diff --git a/server/cluster/manager_test.go b/server/cluster/manager_test.go
index 02c0879..3cfbea5 100644
--- a/server/cluster/manager_test.go
+++ b/server/cluster/manager_test.go
@@ -42,7 +42,7 @@
 }
 
 func newClusterManagerWithStorage(storage storage.Storage, kv clientv3.KV, client *clientv3.Client) (cluster.Manager, error) {
-	return cluster.NewManagerImpl(storage, kv, client, testRootPath, defaultIDAllocatorStep)
+	return cluster.NewManagerImpl(storage, kv, client, testRootPath, defaultIDAllocatorStep, defaultTopologyType)
 }
 
 func TestClusterManager(t *testing.T) {
diff --git a/server/server.go b/server/server.go
index 781edf2..c77bf62 100644
--- a/server/server.go
+++ b/server/server.go
@@ -161,9 +161,14 @@
 		MaxScanLimit: srv.cfg.MaxScanLimit, MinScanLimit: srv.cfg.MinScanLimit,
 	})
 
-	manager, err := cluster.NewManagerImpl(storage, srv.etcdCli, srv.etcdCli, srv.cfg.StorageRootPath, srv.cfg.IDAllocatorStep)
+	topologyType, err := metadata.ParseTopologyType(srv.cfg.TopologyType)
 	if err != nil {
-		return errors.WithMessage(err, "start server")
+		return err
+	}
+
+	manager, err := cluster.NewManagerImpl(storage, srv.etcdCli, srv.etcdCli, srv.cfg.StorageRootPath, srv.cfg.IDAllocatorStep, topologyType)
+	if err != nil {
+		return err
 	}
 	srv.clusterManager = manager
 	srv.flowLimiter = limiter.NewFlowLimiter(srv.cfg.FlowLimiter)
diff --git a/server/storage/types.go b/server/storage/types.go
index 71aaea8..a80cb82 100644
--- a/server/storage/types.go
+++ b/server/storage/types.go
@@ -24,6 +24,7 @@
 	ClusterStateStable
 	ClusterStatePrepare
 
+	TopologyTypeUnknown = "unknown"
 	TopologyTypeStatic  = "static"
 	TopologyTypeDynamic = "dynamic"
 )
@@ -283,6 +284,8 @@
 
 func convertTopologyTypeToPB(topologyType TopologyType) clusterpb.Cluster_TopologyType {
 	switch topologyType {
+	case TopologyTypeUnknown:
+		return clusterpb.Cluster_UNKNOWN
 	case TopologyTypeStatic:
 		return clusterpb.Cluster_STATIC
 	case TopologyTypeDynamic:
@@ -293,6 +296,8 @@
 
 func convertTopologyTypePB(topologyType clusterpb.Cluster_TopologyType) TopologyType {
 	switch topologyType {
+	case clusterpb.Cluster_UNKNOWN:
+		return TopologyTypeUnknown
 	case clusterpb.Cluster_STATIC:
 		return TopologyTypeStatic
 	case clusterpb.Cluster_DYNAMIC: