fix: drop partition table (#231)
diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go index a409152..d1db967 100644 --- a/server/cluster/metadata/cluster_metadata.go +++ b/server/cluster/metadata/cluster_metadata.go
@@ -234,7 +234,7 @@ } if exists { - return CreateTableMetadataResult{}, ErrTableAlreadyExists + return CreateTableMetadataResult{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", request.TableName) } // Create table in table manager. @@ -311,7 +311,7 @@ } if exists { - return CreateTableResult{}, ErrTableAlreadyExists + return CreateTableResult{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", request.TableName) } // Create table in table manager.
diff --git a/server/cluster/metadata/table_manager.go b/server/cluster/metadata/table_manager.go index 5981797..1119fb3 100644 --- a/server/cluster/metadata/table_manager.go +++ b/server/cluster/metadata/table_manager.go
@@ -112,7 +112,7 @@ } if exists { - return storage.Table{}, ErrTableAlreadyExists + return storage.Table{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", tableName) } // Create table in storage.
diff --git a/server/coordinator/factory_test.go b/server/coordinator/factory_test.go index 4adcc37..48b9d12 100644 --- a/server/coordinator/factory_test.go +++ b/server/coordinator/factory_test.go
@@ -112,9 +112,9 @@ OnFailed: nil, }) // Drop non-existing partition table. - re.Error(err) - re.False(ok) - re.Nil(p) + re.NoError(err) + re.True(ok) + re.NotNil(p) } func TestTransferLeader(t *testing.T) {
diff --git a/server/coordinator/procedure/ddl/common_util.go b/server/coordinator/procedure/ddl/common_util.go index 1cdbcd9..3d1c1c9 100644 --- a/server/coordinator/procedure/ddl/common_util.go +++ b/server/coordinator/procedure/ddl/common_util.go
@@ -67,18 +67,25 @@ } } -func GetShardVersionByTableName(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string, shardVersions map[storage.ShardID]uint64) (storage.Table, metadata.ShardVersionUpdate, error) { +func GetTableMetadata(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string) (storage.Table, error) { table, exists, err := clusterMetadata.GetTable(schemaName, tableName) if err != nil { - return storage.Table{}, metadata.ShardVersionUpdate{}, err + return storage.Table{}, err } if !exists { - return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrTableNotExists, "table not exists") + return storage.Table{}, errors.WithMessagef(procedure.ErrTableNotExists, "table not exists, tableName:%s", tableName) } + return table, nil +} +// BuildShardVersionUpdate builds metadata.ShardVersionUpdate to assist DDL on the shard. +// +// And if no error is thrown, the returned boolean value is used to tell whether this table is allocated to shard. 
+// In some cases, we need to use this value to determine whether DDL can be executed normally. +func BuildShardVersionUpdate(table storage.Table, clusterMetadata *metadata.ClusterMetadata, shardVersions map[storage.ShardID]uint64) (metadata.ShardVersionUpdate, bool, error) { shardNodesResult, err := clusterMetadata.GetShardNodeByTableIDs([]storage.TableID{table.ID}) if err != nil { - return storage.Table{}, metadata.ShardVersionUpdate{}, err + return metadata.ShardVersionUpdate{}, false, err } leader := storage.ShardNode{} @@ -92,21 +99,22 @@ } if !found { - return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrShardLeaderNotFound, "can't find leader") + log.Warn("table can't find leader shard", zap.String("tableName", table.Name)) + return metadata.ShardVersionUpdate{}, false, nil } prevVersion, exists := shardVersions[leader.ID] if !exists { - return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID) + return metadata.ShardVersionUpdate{}, false, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID) } currVersion := prevVersion + 1 - return table, metadata.ShardVersionUpdate{ + return metadata.ShardVersionUpdate{ ShardID: leader.ID, CurrVersion: currVersion, PrevVersion: prevVersion, - }, nil + }, true, nil } func DispatchDropTable(ctx context.Context, clusterMetadata *metadata.ClusterMetadata, dispatch eventdispatch.Dispatch, schemaName string, table storage.Table, version metadata.ShardVersionUpdate) error {
diff --git a/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go index 5b91a3c..f07e06e 100644 --- a/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go +++ b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go
@@ -139,6 +139,7 @@ } if err := p.fsm.Event(eventCreatePartitionTable, createPartitionTableRequest); err != nil { p.updateStateWithLock(procedure.StateFailed) + _ = p.params.OnFailed(err) return errors.WithMessage(err, "create partition table") } case stateCreatePartitionTable: @@ -147,6 +148,7 @@ } if err := p.fsm.Event(eventCreateSubTables, createPartitionTableRequest); err != nil { p.updateStateWithLock(procedure.StateFailed) + _ = p.params.OnFailed(err) return errors.WithMessage(err, "create data tables") } case stateCreateSubTables: @@ -155,12 +157,14 @@ } if err := p.fsm.Event(eventFinish, createPartitionTableRequest); err != nil { p.updateStateWithLock(procedure.StateFailed) + _ = p.params.OnFailed(err) return errors.WithMessage(err, "update table shard metadata") } case stateFinish: // TODO: The state update sequence here is inconsistent with the previous one. Consider reconstructing the state update logic of the state machine. p.updateStateWithLock(procedure.StateFinished) if err := p.persist(ctx); err != nil { + _ = p.params.OnFailed(err) return errors.WithMessage(err, "create partition table procedure persist") } return nil
diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go index b750195..f4ae07a 100644 --- a/server/coordinator/procedure/ddl/createtable/create_table.go +++ b/server/coordinator/procedure/ddl/createtable/create_table.go
@@ -203,10 +203,12 @@ if err1 != nil { err = errors.WithMessagef(err, "send eventFailed, err:%v", err1) } + _ = p.params.OnFailed(err) return errors.WithMessage(err, "send eventPrepare") } if err := p.fsm.Event(eventSuccess, req); err != nil { + _ = p.params.OnFailed(err) return errors.WithMessage(err, "send eventSuccess") }
diff --git a/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go index af3a010..d6ffc49 100644 --- a/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go +++ b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go
@@ -101,11 +101,11 @@ return procedure.RelatedVersionInfo{}, errors.WithMessagef(err, "get sub table, tableName:%s", subTableName) } if !exists { - return procedure.RelatedVersionInfo{}, errors.WithMessagef(procedure.ErrTableNotExists, "get sub table, tableName:%s", subTableName) + continue } shardID, exists := tableShardMapping[table.ID] if !exists { - return procedure.RelatedVersionInfo{}, errors.WithMessagef(metadata.ErrShardNotFound, "get shard of sub table, tableID:%d", table.ID) + continue } shardView, exists := params.ClusterSnapshot.Topology.ShardViewsMapping[shardID] if !exists { @@ -154,6 +154,7 @@ } if err := p.fsm.Event(eventDropDataTable, dropPartitionTableRequest); err != nil { p.updateStateWithLock(procedure.StateFailed) + _ = p.params.OnFailed(err) return errors.WithMessage(err, "drop partition table procedure") } case stateDropDataTable: @@ -162,6 +163,7 @@ } if err := p.fsm.Event(eventDropPartitionTable, dropPartitionTableRequest); err != nil { p.updateStateWithLock(procedure.StateFailed) + _ = p.params.OnFailed(err) return errors.WithMessage(err, "drop partition table procedure drop data table") } case stateDropPartitionTable: @@ -170,11 +172,13 @@ } if err := p.fsm.Event(eventFinish, dropPartitionTableRequest); err != nil { p.updateStateWithLock(procedure.StateFailed) + _ = p.params.OnFailed(err) return errors.WithMessage(err, "drop partition table procedure drop partition table") } case stateFinish: p.updateStateWithLock(procedure.StateFinished) if err := p.persist(ctx); err != nil { + _ = p.params.OnFailed(err) return errors.WithMessage(err, "drop partition table procedure persist") } return nil @@ -278,11 +282,28 @@ shardVersions := req.p.relatedVersionInfo.ShardWithVersion for _, tableName := range params.SourceReq.PartitionTableInfo.SubTableNames { - table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, req.schemaName(), tableName, shardVersions) + table, err := ddl.GetTableMetadata(params.ClusterMetadata, 
req.schemaName(), tableName) if err != nil { - procedure.CancelEventWithLog(event, err, fmt.Sprintf("get shard version by table, table:%s", tableName)) + log.Warn("get table metadata failed", zap.String("tableName", tableName), zap.Error(err)) + continue + } + + shardVersionUpdate, shardExists, err := ddl.BuildShardVersionUpdate(table, params.ClusterMetadata, shardVersions) + if err != nil { + log.Error("get shard version by table", zap.String("tableName", tableName), zap.Error(err)) + procedure.CancelEventWithLog(event, err, "build shard version update", zap.String("tableName", tableName)) return } + // If the shard corresponding to this table does not exist, it means that the actual table creation failed. + // In order to ensure that the table can be deleted normally, we need to directly delete the metadata of the table. + if !shardExists { + _, err := params.ClusterMetadata.DropTableMetadata(req.ctx, req.schemaName(), tableName) + if err != nil { + procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", tableName)) + return + } + continue + } if err := ddl.DispatchDropTable(req.ctx, params.ClusterMetadata, params.Dispatch, params.SourceReq.GetSchemaName(), table, shardVersionUpdate); err != nil { procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
diff --git a/server/coordinator/procedure/ddl/droptable/drop_table.go b/server/coordinator/procedure/ddl/droptable/drop_table.go index e546f64..940daa8 100644 --- a/server/coordinator/procedure/ddl/droptable/drop_table.go +++ b/server/coordinator/procedure/ddl/droptable/drop_table.go
@@ -51,9 +51,33 @@ } params := req.p.params - table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName(), req.p.relatedVersionInfo.ShardWithVersion) + table, err := ddl.GetTableMetadata(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) if err != nil { - procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName())) + procedure.CancelEventWithLog(event, err, "get table metadata", zap.String("tableName", params.SourceReq.GetName()), zap.Error(err)) + return + } + req.ret = metadata.TableInfo{ + ID: table.ID, + Name: table.Name, + SchemaID: table.SchemaID, + SchemaName: params.SourceReq.GetSchemaName(), + PartitionInfo: table.PartitionInfo, + } + + shardVersionUpdate, shardExists, err := ddl.BuildShardVersionUpdate(table, params.ClusterMetadata, req.p.relatedVersionInfo.ShardWithVersion) + if err != nil { + log.Error("get shard version by table", zap.String("tableName", params.SourceReq.GetName()), zap.Bool("shardExists", shardExists), zap.Error(err)) + procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName()), zap.Bool("shardExists", shardExists), zap.Error(err)) + return + } + // If the shard corresponding to this table does not exist, it means that the actual table creation failed. + // In order to ensure that the table can be deleted normally, we need to directly delete the metadata of the table. 
+ if !shardExists { + _, err = params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName()) + if err != nil { + procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", params.SourceReq.GetName())) + return + } return } @@ -77,14 +101,6 @@ procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult is %d", len(result.ShardVersionUpdate))) return } - - req.ret = metadata.TableInfo{ - ID: table.ID, - Name: table.Name, - SchemaID: table.SchemaID, - SchemaName: params.SourceReq.GetSchemaName(), - PartitionInfo: table.PartitionInfo, - } } func successCallback(event *fsm.Event) {
diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go index d60437d..425150a 100644 --- a/server/coordinator/procedure/manager_impl.go +++ b/server/coordinator/procedure/manager_impl.go
@@ -156,12 +156,14 @@ func (m *ManagerImpl) startProcedureWorker(ctx context.Context, newProcedure Procedure, procedureWorkerChan chan struct{}) { go func() { + start := time.Now() m.logger.Info("procedure start", zap.Uint64("procedureID", newProcedure.ID())) err := newProcedure.Start(ctx) if err != nil { - m.logger.Error("procedure start failed", zap.Error(err)) + m.logger.Error("procedure start failed", zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds())) + } else { + m.logger.Info("procedure start finish", zap.Uint64("procedureID", newProcedure.ID()), zap.Int64("costTime", time.Since(start).Milliseconds())) } - m.logger.Info("procedure finish", zap.Uint64("procedureID", newProcedure.ID())) for shardID := range newProcedure.RelatedVersionInfo().ShardWithVersion { m.lock.Lock() delete(m.runningProcedures, shardID)
diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go index 2775e7e..70b106c 100644 --- a/server/service/grpc/service.go +++ b/server/service/grpc/service.go
@@ -146,6 +146,7 @@ // CreateTable implements gRPC CeresmetaServer. func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTableRequest) (*metaservicepb.CreateTableResponse, error) { + start := time.Now() // Since there may be too many table creation requests, a flow limiter is added here. if ok, err := s.allow(); !ok { return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table grpc request is rejected by flow limiter")}, nil @@ -201,6 +202,7 @@ select { case ret := <-resultCh: + log.Info("create table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds())) return &metaservicepb.CreateTableResponse{ Header: okResponseHeader(), CreatedTable: &metaservicepb.TableInfo{ @@ -216,12 +218,14 @@ }, }, nil case err = <-errorCh: + log.Warn("create table failed", zap.String("tableName", req.Name), zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds())) return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil } } // DropTable implements gRPC CeresmetaServer. func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableRequest) (*metaservicepb.DropTableResponse, error) { + start := time.Now() // Since there may be too many table dropping requests, a flow limiter is added here. 
if ok, err := s.allow(); !ok { return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table grpc request is rejected by flow limiter")}, nil @@ -275,17 +279,19 @@ err = c.GetProcedureManager().Submit(ctx, procedure) if err != nil { - log.Error("fail to drop table, manager submit procedure", zap.Error(err)) + log.Error("fail to drop table, manager submit procedure", zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds())) return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil } select { case ret := <-resultCh: + log.Info("drop table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds())) return &metaservicepb.DropTableResponse{ Header: okResponseHeader(), DroppedTable: metadata.ConvertTableInfoToPB(ret), }, nil case err = <-errorCh: + log.Warn("drop table failed", zap.String("tableName", req.Name), zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds())) return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil } }