refactor: refactor http api module (#198)
diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go index bf3550b..b750195 100644 --- a/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go
@@ -15,6 +15,7 @@ "github.com/CeresDB/ceresmeta/server/storage" "github.com/looplab/fsm" "github.com/pkg/errors" + "go.uber.org/zap" ) const ( @@ -60,6 +61,8 @@ return } + log.Debug("create table metadata finish", zap.String("tableName", createTableMetadataRequest.TableName)) + shardVersionUpdate := metadata.ShardVersionUpdate{ ShardID: params.ShardID, CurrVersion: req.p.relatedVersionInfo.ShardWithVersion[params.ShardID] + 1, @@ -72,12 +75,16 @@ return } + log.Debug("dispatch createTableOnShard finish", zap.String("tableName", createTableMetadataRequest.TableName)) + createTableResult, err := params.ClusterMetadata.AddTableTopology(req.ctx, params.ShardID, result.Table) if err != nil { - procedure.CancelEventWithLog(event, err, "create table metadata") + procedure.CancelEventWithLog(event, err, "add table topology") return } + log.Debug("add table topology finish", zap.String("tableName", createTableMetadataRequest.TableName)) + req.createTableResult = createTableResult }
diff --git a/server/coordinator/procedure/ddl/droptable/drop_table.go b/server/coordinator/procedure/ddl/droptable/drop_table.go index 2afba95..9af81b9 100644 --- a/server/coordinator/procedure/ddl/droptable/drop_table.go +++ b/server/coordinator/procedure/ddl/droptable/drop_table.go
@@ -63,12 +63,16 @@ return } + log.Debug("dispatch dropTableOnShard finish", zap.String("tableName", params.SourceReq.GetName()), zap.Uint64("procedureID", params.ID)) + result, err := params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) if err != nil { procedure.CancelEventWithLog(event, err, "cluster drop table") return } + log.Debug("drop table finish", zap.String("tableName", params.SourceReq.GetName()), zap.Uint64("procedureID", params.ID)) + if len(result.ShardVersionUpdate) != 1 { procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult is %d", len(result.ShardVersionUpdate))) return
diff --git a/server/coordinator/procedure/operation/transferleader/transfer_leader.go b/server/coordinator/procedure/operation/transferleader/transfer_leader.go index b1e8655..ab32098 100644 --- a/server/coordinator/procedure/operation/transferleader/transfer_leader.go +++ b/server/coordinator/procedure/operation/transferleader/transfer_leader.go
@@ -247,6 +247,8 @@ return } + log.Info("try to close shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("oldLeader", req.p.params.OldLeaderNodeName)) + closeShardRequest := eventdispatch.CloseShardRequest{ ShardID: uint32(req.p.params.ShardID), } @@ -254,6 +256,8 @@ procedure.CancelEventWithLog(event, err, "close shard", zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("oldLeaderName", req.p.params.OldLeaderNodeName)) return } + + log.Info("close shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("oldLeader", req.p.params.OldLeaderNodeName)) } func openNewShardCallback(event *fsm.Event) { @@ -275,20 +279,24 @@ Shard: metadata.ShardInfo{ID: req.p.params.ShardID, Role: storage.ShardRoleLeader, Version: preVersion + 1}, } + log.Info("try to open shard", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName)) + if err := req.p.params.Dispatch.OpenShard(ctx, req.p.params.NewLeaderNodeName, openShardRequest); err != nil { procedure.CancelEventWithLog(event, err, "open shard", zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("newLeaderNode", req.p.params.NewLeaderNodeName)) return } + + log.Info("open shard finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint64("shardID", req.p.params.ID), zap.String("newLeader", req.p.params.NewLeaderNodeName)) } func finishCallback(event *fsm.Event) { - request, err := procedure.GetRequestFromEvent[callbackRequest](event) + req, err := procedure.GetRequestFromEvent[callbackRequest](event) if err != nil { procedure.CancelEventWithLog(event, err, "get request from event") return } - log.Info("transfer leader finish", zap.Uint32("shardID", uint32(request.p.params.ShardID)), zap.String("oldLeaderNode", request.p.params.OldLeaderNodeName), zap.String("newLeaderNode", request.p.params.NewLeaderNodeName)) + log.Info("transfer leader finish", zap.Uint64("procedureID", req.p.ID()), zap.Uint32("shardID", uint32(req.p.params.ShardID)), zap.String("oldLeaderNode", req.p.params.OldLeaderNodeName), zap.String("newLeaderNode", req.p.params.NewLeaderNodeName)) } func (p *Procedure) updateStateWithLock(state procedure.State) {
diff --git a/server/coordinator/scheduler/assign_shard_scheduler.go b/server/coordinator/scheduler/assign_shard_scheduler.go index dedba8a..26f756b 100644 --- a/server/coordinator/scheduler/assign_shard_scheduler.go +++ b/server/coordinator/scheduler/assign_shard_scheduler.go
@@ -4,6 +4,7 @@ import ( "context" + "fmt" "github.com/CeresDB/ceresmeta/server/cluster/metadata" "github.com/CeresDB/ceresmeta/server/coordinator" @@ -54,7 +55,7 @@ } return ScheduleResult{ Procedure: p, - Reason: AssignReason, + Reason: fmt.Sprintf("try to assign shard:%d to node:%s ,reason:%v", shardView.ShardID, newLeaderNode.Node.Name, AssignReason), }, nil } return ScheduleResult{}, nil
diff --git a/server/service/http/api.go b/server/service/http/api.go index 6a59dde..6eb4aaa 100644 --- a/server/service/http/api.go +++ b/server/service/http/api.go
@@ -18,6 +18,7 @@ "github.com/CeresDB/ceresmeta/server/config" "github.com/CeresDB/ceresmeta/server/coordinator" "github.com/CeresDB/ceresmeta/server/limiter" + "github.com/CeresDB/ceresmeta/server/member" "github.com/CeresDB/ceresmeta/server/status" "github.com/CeresDB/ceresmeta/server/storage" "go.uber.org/zap" @@ -52,6 +53,7 @@ router := New().WithPrefix(apiPrefix).WithInstrumentation(printRequestInsmt) // Register API. + router.Get("/leader", a.getLeaderAddr) router.Post("/getShardTables", a.getShardTables) router.Post("/transferLeader", a.transferLeader) router.Post("/split", a.split) @@ -159,32 +161,61 @@ } } -type GetShardTables struct { +func (a *API) getLeaderAddr(writer http.ResponseWriter, req *http.Request) { + leaderAddr, err := a.forwardClient.GetLeaderAddr(req.Context()) + if err != nil { + log.Error("get leader addr failed", zap.Error(err)) + a.respondError(writer, member.ErrGetLeader, fmt.Sprintf("err: %s", err.Error())) + return + } + a.respond(writer, leaderAddr) +} + +type GetShardTablesRequest struct { ClusterName string `json:"clusterName"` - NodeName string `json:"nodeName"` ShardIDs []uint32 `json:"shardIDs"` } func (a *API) getShardTables(writer http.ResponseWriter, req *http.Request) { - var getShardTables GetShardTables - err := json.NewDecoder(req.Body).Decode(&getShardTables) + resp, isLeader, err := a.forwardClient.forwardToLeader(req) + if err != nil { + log.Error("forward to leader failed", zap.Error(err)) + a.respondError(writer, ErrForwardToLeader, fmt.Sprintf("err: %s", err.Error())) + return + } + + if !isLeader { + a.respondForward(writer, resp) + return + } + + var getShardTablesReq GetShardTablesRequest + err = json.NewDecoder(req.Body).Decode(&getShardTablesReq) if err != nil { log.Error("decode request body failed", zap.Error(err)) a.respondError(writer, ErrParseRequest, fmt.Sprintf("err: %s", err.Error())) return } - log.Info("get shard tables request", zap.String("request", fmt.Sprintf("%+v", getShardTables))) + log.Info("get shard tables request", zap.String("request", fmt.Sprintf("%+v", getShardTablesReq))) - c, err := a.clusterManager.GetCluster(req.Context(), getShardTables.ClusterName) + c, err := a.clusterManager.GetCluster(req.Context(), getShardTablesReq.ClusterName) if err != nil { - log.Error("get cluster failed", zap.String("clusterName", getShardTables.ClusterName), zap.Error(err)) - a.respondError(writer, ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", getShardTables.ClusterName, err.Error())) + log.Error("get cluster failed", zap.String("clusterName", getShardTablesReq.ClusterName), zap.Error(err)) + a.respondError(writer, ErrGetCluster, fmt.Sprintf("clusterName: %s, err: %s", getShardTablesReq.ClusterName, err.Error())) return } - shardIDs := make([]storage.ShardID, len(getShardTables.ShardIDs)) - for _, shardID := range getShardTables.ShardIDs { - shardIDs = append(shardIDs, storage.ShardID(shardID)) + // If ShardIDs in the request is empty, query with all shardIDs in the cluster. + shardIDs := make([]storage.ShardID, len(getShardTablesReq.ShardIDs)) + if len(getShardTablesReq.ShardIDs) != 0 { + for _, shardID := range getShardTablesReq.ShardIDs { + shardIDs = append(shardIDs, storage.ShardID(shardID)) + } + } else { + shardViewsMapping := c.GetMetadata().GetClusterSnapshot().Topology.ShardViewsMapping + for shardID := range shardViewsMapping { + shardIDs = append(shardIDs, shardID) + } } shardTables := c.GetMetadata().GetShardTables(shardIDs) @@ -199,8 +230,20 @@ } func (a *API) transferLeader(writer http.ResponseWriter, req *http.Request) { + resp, isLeader, err := a.forwardClient.forwardToLeader(req) + if err != nil { + log.Error("forward to leader failed", zap.Error(err)) + a.respondError(writer, ErrForwardToLeader, fmt.Sprintf("err: %s", err.Error())) + return + } + + if !isLeader { + a.respondForward(writer, resp) + return + } + var transferLeaderRequest TransferLeaderRequest - err := json.NewDecoder(req.Body).Decode(&transferLeaderRequest) + err = json.NewDecoder(req.Body).Decode(&transferLeaderRequest) if err != nil { log.Error("decode request body failed", zap.Error(err)) a.respondError(writer, ErrParseRequest, fmt.Sprintf("err: %s", err.Error()))
diff --git a/server/service/http/forward.go b/server/service/http/forward.go index b941d63..f8df6d2 100644 --- a/server/service/http/forward.go +++ b/server/service/http/forward.go
@@ -45,6 +45,15 @@ } } +func (s *ForwardClient) GetLeaderAddr(ctx context.Context) (string, error) { + resp, err := s.member.GetLeaderAddr(ctx) + if err != nil { + return "", err + } + + return resp.LeaderEndpoint, nil +} + func (s *ForwardClient) getForwardedAddr(ctx context.Context) (string, bool, error) { resp, err := s.member.GetLeaderAddr(ctx) if err != nil {