fix: drop partition table (#231)
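
Dropping a partition table could fail when some of its sub-tables were never actually created: the old GetShardVersionByTableName helper returned an error as soon as a sub-table had no leader shard, so the partition table could not be dropped at all. This change:

- splits GetShardVersionByTableName into GetTableMetadata and BuildShardVersionUpdate, where the latter reports whether the table is assigned to a shard instead of erroring out;
- makes the drop-table and drop-partition-table procedures skip missing sub-tables and directly remove the metadata of tables that have no shard;
- attaches the table name to ErrTableAlreadyExists / ErrTableNotExists messages;
- invokes OnFailed whenever a procedure's state machine fails;
- adds cost-time logging to the CreateTable/DropTable gRPC handlers and to procedure execution.

For reference, below is a minimal sketch (not part of the patch) of how a drop-table callback is expected to combine the two new helpers; imports are omitted, and ctx, clusterMeta, dispatch, schemaName, tableName and shardVersions are placeholder variables:

    // Fetch the table metadata first; a missing table is still a hard error here.
    table, err := ddl.GetTableMetadata(clusterMeta, schemaName, tableName)
    if err != nil {
        return err
    }
    // Build the shard version update; shardExists reports whether the table is
    // actually assigned to a shard.
    shardVersionUpdate, shardExists, err := ddl.BuildShardVersionUpdate(table, clusterMeta, shardVersions)
    if err != nil {
        return err
    }
    if !shardExists {
        // Table creation never finished on any shard, so only the metadata
        // needs to be removed.
        _, err = clusterMeta.DropTableMetadata(ctx, schemaName, tableName)
        return err
    }
    // Normal path: ask the shard leader to drop the table.
    return ddl.DispatchDropTable(ctx, clusterMeta, dispatch, schemaName, table, shardVersionUpdate)
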
diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go
index a409152..d1db967 100644
--- a/server/cluster/metadata/cluster_metadata.go
+++ b/server/cluster/metadata/cluster_metadata.go
@@ -234,7 +234,7 @@
}
if exists {
- return CreateTableMetadataResult{}, ErrTableAlreadyExists
+ return CreateTableMetadataResult{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", request.TableName)
}
// Create table in table manager.
@@ -311,7 +311,7 @@
}
if exists {
- return CreateTableResult{}, ErrTableAlreadyExists
+ return CreateTableResult{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", request.TableName)
}
// Create table in table manager.
diff --git a/server/cluster/metadata/table_manager.go b/server/cluster/metadata/table_manager.go
index 5981797..1119fb3 100644
--- a/server/cluster/metadata/table_manager.go
+++ b/server/cluster/metadata/table_manager.go
@@ -112,7 +112,7 @@
}
if exists {
- return storage.Table{}, ErrTableAlreadyExists
+ return storage.Table{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", tableName)
}
// Create table in storage.
diff --git a/server/coordinator/factory_test.go b/server/coordinator/factory_test.go
index 4adcc37..48b9d12 100644
--- a/server/coordinator/factory_test.go
+++ b/server/coordinator/factory_test.go
@@ -112,9 +112,9 @@
OnFailed: nil,
})
// Drop non-existing partition table.
- re.Error(err)
- re.False(ok)
- re.Nil(p)
+ re.NoError(err)
+ re.True(ok)
+ re.NotNil(p)
}
func TestTransferLeader(t *testing.T) {
diff --git a/server/coordinator/procedure/ddl/common_util.go b/server/coordinator/procedure/ddl/common_util.go
index 1cdbcd9..3d1c1c9 100644
--- a/server/coordinator/procedure/ddl/common_util.go
+++ b/server/coordinator/procedure/ddl/common_util.go
@@ -67,18 +67,25 @@
}
}
-func GetShardVersionByTableName(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string, shardVersions map[storage.ShardID]uint64) (storage.Table, metadata.ShardVersionUpdate, error) {
+func GetTableMetadata(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string) (storage.Table, error) {
table, exists, err := clusterMetadata.GetTable(schemaName, tableName)
if err != nil {
- return storage.Table{}, metadata.ShardVersionUpdate{}, err
+ return storage.Table{}, err
}
if !exists {
- return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrTableNotExists, "table not exists")
+ return storage.Table{}, errors.WithMessagef(procedure.ErrTableNotExists, "table not exists, tableName:%s", tableName)
}
+ return table, nil
+}
+// BuildShardVersionUpdate builds the metadata.ShardVersionUpdate needed to run DDL on the shard.
+//
+// If no error is returned, the boolean result reports whether this table is assigned to a shard.
+// In some cases, callers need this value to decide whether the DDL can proceed normally.
+func BuildShardVersionUpdate(table storage.Table, clusterMetadata *metadata.ClusterMetadata, shardVersions map[storage.ShardID]uint64) (metadata.ShardVersionUpdate, bool, error) {
shardNodesResult, err := clusterMetadata.GetShardNodeByTableIDs([]storage.TableID{table.ID})
if err != nil {
- return storage.Table{}, metadata.ShardVersionUpdate{}, err
+ return metadata.ShardVersionUpdate{}, false, err
}
leader := storage.ShardNode{}
@@ -92,21 +99,22 @@
}
if !found {
- return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrShardLeaderNotFound, "can't find leader")
+ log.Warn("table can't find leader shard", zap.String("tableName", table.Name))
+ return metadata.ShardVersionUpdate{}, false, nil
}
prevVersion, exists := shardVersions[leader.ID]
if !exists {
- return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID)
+ return metadata.ShardVersionUpdate{}, false, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID)
}
currVersion := prevVersion + 1
- return table, metadata.ShardVersionUpdate{
+ return metadata.ShardVersionUpdate{
ShardID: leader.ID,
CurrVersion: currVersion,
PrevVersion: prevVersion,
- }, nil
+ }, true, nil
}
func DispatchDropTable(ctx context.Context, clusterMetadata *metadata.ClusterMetadata, dispatch eventdispatch.Dispatch, schemaName string, table storage.Table, version metadata.ShardVersionUpdate) error {
diff --git a/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go
index 5b91a3c..f07e06e 100644
--- a/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go
+++ b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go
@@ -139,6 +139,7 @@
}
if err := p.fsm.Event(eventCreatePartitionTable, createPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "create partition table")
}
case stateCreatePartitionTable:
@@ -147,6 +148,7 @@
}
if err := p.fsm.Event(eventCreateSubTables, createPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "create data tables")
}
case stateCreateSubTables:
@@ -155,12 +157,14 @@
}
if err := p.fsm.Event(eventFinish, createPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "update table shard metadata")
}
case stateFinish:
// TODO: The state update sequence here is inconsistent with the previous one. Consider reconstructing the state update logic of the state machine.
p.updateStateWithLock(procedure.StateFinished)
if err := p.persist(ctx); err != nil {
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "create partition table procedure persist")
}
return nil
diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go
index b750195..f4ae07a 100644
--- a/server/coordinator/procedure/ddl/createtable/create_table.go
+++ b/server/coordinator/procedure/ddl/createtable/create_table.go
@@ -203,10 +203,12 @@
if err1 != nil {
err = errors.WithMessagef(err, "send eventFailed, err:%v", err1)
}
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "send eventPrepare")
}
if err := p.fsm.Event(eventSuccess, req); err != nil {
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "send eventSuccess")
}
diff --git a/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go
index af3a010..d6ffc49 100644
--- a/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go
+++ b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go
@@ -101,11 +101,11 @@
return procedure.RelatedVersionInfo{}, errors.WithMessagef(err, "get sub table, tableName:%s", subTableName)
}
if !exists {
- return procedure.RelatedVersionInfo{}, errors.WithMessagef(procedure.ErrTableNotExists, "get sub table, tableName:%s", subTableName)
+ continue
}
shardID, exists := tableShardMapping[table.ID]
if !exists {
- return procedure.RelatedVersionInfo{}, errors.WithMessagef(metadata.ErrShardNotFound, "get shard of sub table, tableID:%d", table.ID)
+ continue
}
shardView, exists := params.ClusterSnapshot.Topology.ShardViewsMapping[shardID]
if !exists {
@@ -154,6 +154,7 @@
}
if err := p.fsm.Event(eventDropDataTable, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure")
}
case stateDropDataTable:
@@ -162,6 +163,7 @@
}
if err := p.fsm.Event(eventDropPartitionTable, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure drop data table")
}
case stateDropPartitionTable:
@@ -170,11 +172,13 @@
}
if err := p.fsm.Event(eventFinish, dropPartitionTableRequest); err != nil {
p.updateStateWithLock(procedure.StateFailed)
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure drop partition table")
}
case stateFinish:
p.updateStateWithLock(procedure.StateFinished)
if err := p.persist(ctx); err != nil {
+ _ = p.params.OnFailed(err)
return errors.WithMessage(err, "drop partition table procedure persist")
}
return nil
@@ -278,11 +282,28 @@
shardVersions := req.p.relatedVersionInfo.ShardWithVersion
for _, tableName := range params.SourceReq.PartitionTableInfo.SubTableNames {
- table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, req.schemaName(), tableName, shardVersions)
+ table, err := ddl.GetTableMetadata(params.ClusterMetadata, req.schemaName(), tableName)
if err != nil {
- procedure.CancelEventWithLog(event, err, fmt.Sprintf("get shard version by table, table:%s", tableName))
+ log.Warn("get table metadata failed", zap.String("tableName", tableName))
+ continue
+ }
+
+ shardVersionUpdate, shardExists, err := ddl.BuildShardVersionUpdate(table, params.ClusterMetadata, shardVersions)
+ if err != nil {
+ log.Error("get shard version by table", zap.String("tableName", tableName), zap.Error(err))
+ procedure.CancelEventWithLog(event, err, "build shard version update", zap.String("tableName", tableName))
return
}
+ // If the shard corresponding to this table does not exist, the actual table creation must have failed.
+ // To ensure the table can still be dropped, delete its metadata directly.
+ if !shardExists {
+ _, err := params.ClusterMetadata.DropTableMetadata(req.ctx, req.schemaName(), tableName)
+ if err != nil {
+ procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", tableName))
+ return
+ }
+ continue
+ }
if err := ddl.DispatchDropTable(req.ctx, params.ClusterMetadata, params.Dispatch, params.SourceReq.GetSchemaName(), table, shardVersionUpdate); err != nil {
procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
diff --git a/server/coordinator/procedure/ddl/droptable/drop_table.go b/server/coordinator/procedure/ddl/droptable/drop_table.go
index e546f64..940daa8 100644
--- a/server/coordinator/procedure/ddl/droptable/drop_table.go
+++ b/server/coordinator/procedure/ddl/droptable/drop_table.go
@@ -51,9 +51,33 @@
}
params := req.p.params
- table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName(), req.p.relatedVersionInfo.ShardWithVersion)
+ table, err := ddl.GetTableMetadata(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
if err != nil {
- procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName()))
+ procedure.CancelEventWithLog(event, err, "get table metadata", zap.String("tableName", params.SourceReq.GetName()), zap.Error(err))
+ return
+ }
+ req.ret = metadata.TableInfo{
+ ID: table.ID,
+ Name: table.Name,
+ SchemaID: table.SchemaID,
+ SchemaName: params.SourceReq.GetSchemaName(),
+ PartitionInfo: table.PartitionInfo,
+ }
+
+ shardVersionUpdate, shardExists, err := ddl.BuildShardVersionUpdate(table, params.ClusterMetadata, req.p.relatedVersionInfo.ShardWithVersion)
+ if err != nil {
+ log.Error("get shard version by table", zap.String("tableName", params.SourceReq.GetName()), zap.Bool("shardExists", shardExists), zap.Error(err))
+ procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName()), zap.Bool("shardExists", shardExists), zap.Error(err))
+ return
+ }
+ // If the shard corresponding to this table does not exist, the actual table creation must have failed.
+ // To ensure the table can still be dropped, delete its metadata directly.
+ if !shardExists {
+ _, err = params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
+ if err != nil {
+ procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", params.SourceReq.GetName()))
+ return
+ }
return
}
@@ -77,14 +101,6 @@
procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult is %d", len(result.ShardVersionUpdate)))
return
}
-
- req.ret = metadata.TableInfo{
- ID: table.ID,
- Name: table.Name,
- SchemaID: table.SchemaID,
- SchemaName: params.SourceReq.GetSchemaName(),
- PartitionInfo: table.PartitionInfo,
- }
}
func successCallback(event *fsm.Event) {
diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go
index d60437d..425150a 100644
--- a/server/coordinator/procedure/manager_impl.go
+++ b/server/coordinator/procedure/manager_impl.go
@@ -156,12 +156,14 @@
func (m *ManagerImpl) startProcedureWorker(ctx context.Context, newProcedure Procedure, procedureWorkerChan chan struct{}) {
go func() {
+ start := time.Now()
m.logger.Info("procedure start", zap.Uint64("procedureID", newProcedure.ID()))
err := newProcedure.Start(ctx)
if err != nil {
- m.logger.Error("procedure start failed", zap.Error(err))
+ m.logger.Error("procedure start failed", zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds()))
+ } else {
+ m.logger.Info("procedure start finish", zap.Uint64("procedureID", newProcedure.ID()), zap.Int64("costTime", time.Since(start).Milliseconds()))
}
- m.logger.Info("procedure finish", zap.Uint64("procedureID", newProcedure.ID()))
for shardID := range newProcedure.RelatedVersionInfo().ShardWithVersion {
m.lock.Lock()
delete(m.runningProcedures, shardID)
diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go
index 2775e7e..70b106c 100644
--- a/server/service/grpc/service.go
+++ b/server/service/grpc/service.go
@@ -146,6 +146,7 @@
// CreateTable implements gRPC CeresmetaServer.
func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTableRequest) (*metaservicepb.CreateTableResponse, error) {
+ start := time.Now()
// Since there may be too many table creation requests, a flow limiter is added here.
if ok, err := s.allow(); !ok {
return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table grpc request is rejected by flow limiter")}, nil
@@ -201,6 +202,7 @@
select {
case ret := <-resultCh:
+ log.Info("create table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.CreateTableResponse{
Header: okResponseHeader(),
CreatedTable: &metaservicepb.TableInfo{
@@ -216,12 +218,14 @@
},
}, nil
case err = <-errorCh:
+ log.Warn("create table failed", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil
}
}
// DropTable implements gRPC CeresmetaServer.
func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableRequest) (*metaservicepb.DropTableResponse, error) {
+ start := time.Now()
// Since there may be too many table dropping requests, a flow limiter is added here.
if ok, err := s.allow(); !ok {
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table grpc request is rejected by flow limiter")}, nil
@@ -275,17 +279,19 @@
err = c.GetProcedureManager().Submit(ctx, procedure)
if err != nil {
- log.Error("fail to drop table, manager submit procedure", zap.Error(err))
+ log.Error("fail to drop table, manager submit procedure", zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil
}
select {
case ret := <-resultCh:
+ log.Info("drop table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.DropTableResponse{
Header: okResponseHeader(),
DroppedTable: metadata.ConvertTableInfoToPB(ret),
}, nil
case err = <-errorCh:
+ log.Info("drop table failed", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil
}
}