refactor: refactor http api module (#198)

diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go
index bf3550b..b750195 100644
--- a/server/coordinator/procedure/ddl/createtable/create_table.go
+++ b/server/coordinator/procedure/ddl/createtable/create_table.go
@@ -15,6 +15,7 @@
 	"github.com/CeresDB/ceresmeta/server/storage"
 	"github.com/looplab/fsm"
 	"github.com/pkg/errors"
+	"go.uber.org/zap"
 )
 
 const (
@@ -60,6 +61,8 @@
 		return
 	}
 
+	log.Debug("create table metadata finish", zap.String("tableName", createTableMetadataRequest.TableName))
+
 	shardVersionUpdate := metadata.ShardVersionUpdate{
 		ShardID:     params.ShardID,
 		CurrVersion: req.p.relatedVersionInfo.ShardWithVersion[params.ShardID] + 1,
@@ -72,12 +75,16 @@
 		return
 	}
 
+	log.Debug("dispatch createTableOnShard finish", zap.String("tableName", createTableMetadataRequest.TableName))
+
 	createTableResult, err := params.ClusterMetadata.AddTableTopology(req.ctx, params.ShardID, result.Table)
 	if err != nil {
-		procedure.CancelEventWithLog(event, err, "create table metadata")
+		procedure.CancelEventWithLog(event, err, "add table topology")
 		return
 	}
 
+	log.Debug("add table topology finish", zap.String("tableName", createTableMetadataRequest.TableName))
+
 	req.createTableResult = createTableResult
 }
 
diff --git a/server/coordinator/procedure/ddl/droptable/drop_table.go b/server/coordinator/procedure/ddl/droptable/drop_table.go
index 2afba95..9af81b9 100644
--- a/server/coordinator/procedure/ddl/droptable/drop_table.go
+++ b/server/coordinator/procedure/ddl/droptable/drop_table.go
@@ -63,12 +63,16 @@
 		return
 	}
 
+	log.Debug("dispatch dropTableOnShard finish", zap.String("tableName", params.SourceReq.GetName()), zap.Uint64("procedureID", params.ID))
+
 	result, err := params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
 	if err != nil {
 		procedure.CancelEventWithLog(event, err, "cluster drop table")
 		return
 	}
 
+	log.Debug("drop table finish", zap.String("tableName", params.SourceReq.GetName()), zap.Uint64("procedureID", params.ID))
+
 	if len(result.ShardVersionUpdate) != 1 {
 		procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult is %d", len(result.ShardVersionUpdate)))
 		return
diff --git a/server/coordinator/procedure/operation/transferleader/transfer_leader.go b/server/coordinator/procedure/operation/transferleader/transfer_leader.go
index b1e8655..ab32098 100644
--- a/server/coordinator/procedure/operation/transferleader/transfer_leader.go
+++ b/server/coordinator/procedure/operation/transferleader/transfer_leader.go
@@ -247,6 +247,8 @@
 		return
 	}
 
+	log.Info("try to close shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("oldLeader", req.p.params.OldLeaderNodeName))
+
 	closeShardRequest := eventdispatch.CloseShardRequest{
 		ShardID: uint32(req.p.params.ShardID),
 	}
@@ -254,6 +256,8 @@
 		procedure.CancelEventWithLog(event, err, "close shard", zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("oldLeaderName", req.p.params.OldLeaderNodeName))
 		return
 	}
+
+	log.Info("close shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("oldLeader", req.p.params.OldLeaderNodeName))
 }
 
 func openNewShardCallback(event *fsm.Event) {
@@ -275,20 +279,24 @@
 		Shard: metadata.ShardInfo{ID: req.p.params.ShardID, Role: storage.ShardRoleLeader, Version: preVersion + 1},
 	}
 
+	log.Info("try to open shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName))
+
 	if err := req.p.params.Dispatch.OpenShard(ctx, req.p.params.NewLeaderNodeName, openShardRequest); err != nil {
 		procedure.CancelEventWithLog(event, err, "open shard", zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("newLeaderNode", req.p.params.NewLeaderNodeName))
 		return
 	}
+
+	log.Info("open shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName))
 }
 
 func finishCallback(event *fsm.Event) {
-	request, err := procedure.GetRequestFromEvent[callbackRequest](event)
+	req, err := procedure.GetRequestFromEvent[callbackRequest](event)
 	if err != nil {
 		procedure.CancelEventWithLog(event, err, "get request from event")
 		return
 	}
 
-	log.Info("transfer leader finish", zap.Uint32("shardID", uint32(request.p.params.ShardID)), zap.String("oldLeaderNode", request.p.params.OldLeaderNodeName), zap.String("newLeaderNode", request.p.params.NewLeaderNodeName))
+	log.Info("transfer leader finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("oldLeaderNode", req.p.params.OldLeaderNodeName), zap.String("newLeaderNode", req.p.params.NewLeaderNodeName))
 }
 
 func (p *Procedure) updateStateWithLock(state procedure.State) {
diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go
index dedba8a..26f756b 100644
--- a/server/coordinator/scheduler/assign_shard_scheduler.go
+++ b/server/coordinator/scheduler/assign_shard_scheduler.go
@@ -4,6 +4,7 @@
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/CeresDB/ceresmeta/server/cluster/metadata"
 	"github.com/CeresDB/ceresmeta/server/coordinator"
@@ -54,7 +55,7 @@
 		}
 		return ScheduleResult{
 			Procedure: p,
-			Reason:    AssignReason,
+			Reason:    fmt.Sprintf("try to assign shard:%d to node:%s ,reason:%v", shardView.ShardID, newLeaderNode.Node.Name, AssignReason),
 		}, nil
 	}
 	return ScheduleResult{}, nil
diff --git a/server/service/http/api.go b/server/service/http/api.go
index 6a59dde..6eb4aaa 100644
--- a/server/service/http/api.go
+++ b/server/service/http/api.go
@@ -18,6 +18,7 @@
 	"github.com/CeresDB/ceresmeta/server/config"
 	"github.com/CeresDB/ceresmeta/server/coordinator"
 	"github.com/CeresDB/ceresmeta/server/limiter"
+	"github.com/CeresDB/ceresmeta/server/member"
 	"github.com/CeresDB/ceresmeta/server/status"
 	"github.com/CeresDB/ceresmeta/server/storage"
 	"go.uber.org/zap"
@@ -52,6 +53,7 @@
 	router := New().WithPrefix(apiPrefix).WithInstrumentation(printRequestInsmt)
 
 	// Register API.
+	router.Get("/leader", a.getLeaderAddr)
 	router.Post("/getShardTables", a.getShardTables)
 	router.Post("/transferLeader", a.transferLeader)
 	router.Post("/split", a.split)
@@ -159,32 +161,61 @@
 	}
 }
 
-type GetShardTables struct {
+func (a *API) getLeaderAddr(writer http.ResponseWriter, req *http.Request) {
+	leaderAddr, err := a.forwardClient.GetLeaderAddr(req.Context())
+	if err != nil {
+		log.Error("get leader addr failed", zap.Error(err))
+		a.respondError(writer, member.ErrGetLeader, fmt.Sprintf("err: %s", err.Error()))
+		return
+	}
+	a.respond(writer, leaderAddr)
+}
+
+type GetShardTablesRequest struct {
 	ClusterName string   `json:"clusterName"`
-	NodeName    string   `json:"nodeName"`
 	ShardIDs    []uint32 `json:"shardIDs"`
 }
 
 func (a *API) getShardTables(writer http.ResponseWriter, req *http.Request) {
-	var getShardTables GetShardTables
-	err := json.NewDecoder(req.Body).Decode(&getShardTables)
+	resp, isLeader, err := a.forwardClient.forwardToLeader(req)
+	if err != nil {
+		log.Error("forward to leader failed", zap.Error(err))
+		a.respondError(writer, ErrForwardToLeader, fmt.Sprintf("err: %s", err.Error()))
+		return
+	}
+
+	if !isLeader {
+		a.respondForward(writer, resp)
+		return
+	}
+
+	var getShardTablesReq GetShardTablesRequest
+	err = json.NewDecoder(req.Body).Decode(&getShardTablesReq)
 	if err != nil {
 		log.Error("decode request body failed", zap.Error(err))
 		a.respondError(writer, ErrParseRequest, fmt.Sprintf("err: %s", err.Error()))
 		return
 	}
-	log.Info("get shard tables request", zap.String("request", fmt.Sprintf("%+v", getShardTables)))
+	log.Info("get shard tables request", zap.String("request", fmt.Sprintf("%+v", getShardTablesReq)))
 
-	c, err := a.clusterManager.GetCluster(req.Context(), getShardTables.ClusterName)
+	c, err := a.clusterManager.GetCluster(req.Context(), getShardTablesReq.ClusterName)
 	if err != nil {
-		log.Error("get cluster failed", zap.String("clusterName", getShardTables.ClusterName), zap.Error(err))
-		a.respondError(writer, ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", getShardTables.ClusterName, err.Error()))
+		log.Error("get cluster failed", zap.String("clusterName", getShardTablesReq.ClusterName), zap.Error(err))
+		a.respondError(writer, ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", getShardTablesReq.ClusterName, err.Error()))
 		return
 	}
 
-	shardIDs := make([]storage.ShardID, len(getShardTables.ShardIDs))
-	for _, shardID := range getShardTables.ShardIDs {
-		shardIDs = append(shardIDs, storage.ShardID(shardID))
+	// If ShardIDs in the request is empty, query with all shardIDs in the cluster.
+	shardIDs := make([]storage.ShardID, len(getShardTablesReq.ShardIDs))
+	if len(getShardTablesReq.ShardIDs) != 0 {
+		for _, shardID := range getShardTablesReq.ShardIDs {
+			shardIDs = append(shardIDs, storage.ShardID(shardID))
+		}
+	} else {
+		shardViewsMapping := c.GetMetadata().GetClusterSnapshot().Topology.ShardViewsMapping
+		for shardID := range shardViewsMapping {
+			shardIDs = append(shardIDs, shardID)
+		}
 	}
 
 	shardTables := c.GetMetadata().GetShardTables(shardIDs)
@@ -199,8 +230,20 @@
 }
 
 func (a *API) transferLeader(writer http.ResponseWriter, req *http.Request) {
+	resp, isLeader, err := a.forwardClient.forwardToLeader(req)
+	if err != nil {
+		log.Error("forward to leader failed", zap.Error(err))
+		a.respondError(writer, ErrForwardToLeader, fmt.Sprintf("err: %s", err.Error()))
+		return
+	}
+
+	if !isLeader {
+		a.respondForward(writer, resp)
+		return
+	}
+
 	var transferLeaderRequest TransferLeaderRequest
-	err := json.NewDecoder(req.Body).Decode(&transferLeaderRequest)
+	err = json.NewDecoder(req.Body).Decode(&transferLeaderRequest)
 	if err != nil {
 		log.Error("decode request body failed", zap.Error(err))
 		a.respondError(writer, ErrParseRequest, fmt.Sprintf("err: %s", err.Error()))
diff --git a/server/service/http/forward.go b/server/service/http/forward.go
index b941d63..f8df6d2 100644
--- a/server/service/http/forward.go
+++ b/server/service/http/forward.go
@@ -45,6 +45,15 @@
 	}
 }
 
+func (s *ForwardClient) GetLeaderAddr(ctx context.Context) (string, error) {
+	resp, err := s.member.GetLeaderAddr(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	return resp.LeaderEndpoint, nil
+}
+
 func (s *ForwardClient) getForwardedAddr(ctx context.Context) (string, bool, error) {
 	resp, err := s.member.GetLeaderAddr(ctx)
 	if err != nil {