fix: drop partition table (#231)

diff --git a/server/cluster/metadata/cluster_metadata.go b/server/cluster/metadata/cluster_metadata.go
index a409152..d1db967 100644
--- a/server/cluster/metadata/cluster_metadata.go
+++ b/server/cluster/metadata/cluster_metadata.go
@@ -234,7 +234,7 @@
 	}
 
 	if exists {
-		return CreateTableMetadataResult{}, ErrTableAlreadyExists
+		return CreateTableMetadataResult{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", request.TableName)
 	}
 
 	// Create table in table manager.
@@ -311,7 +311,7 @@
 	}
 
 	if exists {
-		return CreateTableResult{}, ErrTableAlreadyExists
+		return CreateTableResult{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", request.TableName)
 	}
 
 	// Create table in table manager.
diff --git a/server/cluster/metadata/table_manager.go b/server/cluster/metadata/table_manager.go
index 5981797..1119fb3 100644
--- a/server/cluster/metadata/table_manager.go
+++ b/server/cluster/metadata/table_manager.go
@@ -112,7 +112,7 @@
 	}
 
 	if exists {
-		return storage.Table{}, ErrTableAlreadyExists
+		return storage.Table{}, errors.WithMessagef(ErrTableAlreadyExists, "tableName:%s", tableName)
 	}
 
 	// Create table in storage.
diff --git a/server/coordinator/factory_test.go b/server/coordinator/factory_test.go
index 4adcc37..48b9d12 100644
--- a/server/coordinator/factory_test.go
+++ b/server/coordinator/factory_test.go
@@ -112,9 +112,9 @@
 		OnFailed:    nil,
 	})
 	// Drop non-existing partition table.
-	re.Error(err)
-	re.False(ok)
-	re.Nil(p)
+	re.NoError(err)
+	re.True(ok)
+	re.NotNil(p)
 }
 
 func TestTransferLeader(t *testing.T) {
diff --git a/server/coordinator/procedure/ddl/common_util.go b/server/coordinator/procedure/ddl/common_util.go
index 1cdbcd9..3d1c1c9 100644
--- a/server/coordinator/procedure/ddl/common_util.go
+++ b/server/coordinator/procedure/ddl/common_util.go
@@ -67,18 +67,25 @@
 	}
 }
 
-func GetShardVersionByTableName(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string, shardVersions map[storage.ShardID]uint64) (storage.Table, metadata.ShardVersionUpdate, error) {
+func GetTableMetadata(clusterMetadata *metadata.ClusterMetadata, schemaName, tableName string) (storage.Table, error) {
 	table, exists, err := clusterMetadata.GetTable(schemaName, tableName)
 	if err != nil {
-		return storage.Table{}, metadata.ShardVersionUpdate{}, err
+		return storage.Table{}, err
 	}
 	if !exists {
-		return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrTableNotExists, "table not exists")
+		return storage.Table{}, errors.WithMessagef(procedure.ErrTableNotExists, "table not exists, tableName:%s", tableName)
 	}
+	return table, nil
+}
 
+// BuildShardVersionUpdate builds metadata.ShardVersionUpdate to assist DDL on the shard.
+//
+// If no error is returned, the returned boolean value tells whether this table is allocated to a shard.
+// In some cases, we need to use this value to determine whether DDL can be executed normally.
+func BuildShardVersionUpdate(table storage.Table, clusterMetadata *metadata.ClusterMetadata, shardVersions map[storage.ShardID]uint64) (metadata.ShardVersionUpdate, bool, error) {
 	shardNodesResult, err := clusterMetadata.GetShardNodeByTableIDs([]storage.TableID{table.ID})
 	if err != nil {
-		return storage.Table{}, metadata.ShardVersionUpdate{}, err
+		return metadata.ShardVersionUpdate{}, false, err
 	}
 
 	leader := storage.ShardNode{}
@@ -92,21 +99,22 @@
 	}
 
 	if !found {
-		return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessage(procedure.ErrShardLeaderNotFound, "can't find leader")
+		log.Warn("table can't find leader shard", zap.String("tableName", table.Name))
+		return metadata.ShardVersionUpdate{}, false, nil
 	}
 
 	prevVersion, exists := shardVersions[leader.ID]
 	if !exists {
-		return storage.Table{}, metadata.ShardVersionUpdate{}, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID)
+		return metadata.ShardVersionUpdate{}, false, errors.WithMessagef(metadata.ErrShardNotFound, "shard not found in shardVersions, shardID:%d", leader.ID)
 	}
 
 	currVersion := prevVersion + 1
 
-	return table, metadata.ShardVersionUpdate{
+	return metadata.ShardVersionUpdate{
 		ShardID:     leader.ID,
 		CurrVersion: currVersion,
 		PrevVersion: prevVersion,
-	}, nil
+	}, true, nil
 }
 
 func DispatchDropTable(ctx context.Context, clusterMetadata *metadata.ClusterMetadata, dispatch eventdispatch.Dispatch, schemaName string, table storage.Table, version metadata.ShardVersionUpdate) error {
diff --git a/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go
index 5b91a3c..f07e06e 100644
--- a/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go
+++ b/server/coordinator/procedure/ddl/createpartitiontable/create_partition_table.go
@@ -139,6 +139,7 @@
 			}
 			if err := p.fsm.Event(eventCreatePartitionTable, createPartitionTableRequest); err != nil {
 				p.updateStateWithLock(procedure.StateFailed)
+				_ = p.params.OnFailed(err)
 				return errors.WithMessage(err, "create partition table")
 			}
 		case stateCreatePartitionTable:
@@ -147,6 +148,7 @@
 			}
 			if err := p.fsm.Event(eventCreateSubTables, createPartitionTableRequest); err != nil {
 				p.updateStateWithLock(procedure.StateFailed)
+				_ = p.params.OnFailed(err)
 				return errors.WithMessage(err, "create data tables")
 			}
 		case stateCreateSubTables:
@@ -155,12 +157,14 @@
 			}
 			if err := p.fsm.Event(eventFinish, createPartitionTableRequest); err != nil {
 				p.updateStateWithLock(procedure.StateFailed)
+				_ = p.params.OnFailed(err)
 				return errors.WithMessage(err, "update table shard metadata")
 			}
 		case stateFinish:
 			// TODO: The state update sequence here is inconsistent with the previous one. Consider reconstructing the state update logic of the state machine.
 			p.updateStateWithLock(procedure.StateFinished)
 			if err := p.persist(ctx); err != nil {
+				_ = p.params.OnFailed(err)
 				return errors.WithMessage(err, "create partition table procedure persist")
 			}
 			return nil
diff --git a/server/coordinator/procedure/ddl/createtable/create_table.go b/server/coordinator/procedure/ddl/createtable/create_table.go
index b750195..f4ae07a 100644
--- a/server/coordinator/procedure/ddl/createtable/create_table.go
+++ b/server/coordinator/procedure/ddl/createtable/create_table.go
@@ -203,10 +203,12 @@
 		if err1 != nil {
 			err = errors.WithMessagef(err, "send eventFailed, err:%v", err1)
 		}
+		_ = p.params.OnFailed(err)
 		return errors.WithMessage(err, "send eventPrepare")
 	}
 
 	if err := p.fsm.Event(eventSuccess, req); err != nil {
+		_ = p.params.OnFailed(err)
 		return errors.WithMessage(err, "send eventSuccess")
 	}
 
diff --git a/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go
index af3a010..d6ffc49 100644
--- a/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go
+++ b/server/coordinator/procedure/ddl/droppartitiontable/drop_partition_table.go
@@ -101,11 +101,11 @@
 			return procedure.RelatedVersionInfo{}, errors.WithMessagef(err, "get sub table, tableName:%s", subTableName)
 		}
 		if !exists {
-			return procedure.RelatedVersionInfo{}, errors.WithMessagef(procedure.ErrTableNotExists, "get sub table, tableName:%s", subTableName)
+			continue
 		}
 		shardID, exists := tableShardMapping[table.ID]
 		if !exists {
-			return procedure.RelatedVersionInfo{}, errors.WithMessagef(metadata.ErrShardNotFound, "get shard of sub table, tableID:%d", table.ID)
+			continue
 		}
 		shardView, exists := params.ClusterSnapshot.Topology.ShardViewsMapping[shardID]
 		if !exists {
@@ -154,6 +154,7 @@
 			}
 			if err := p.fsm.Event(eventDropDataTable, dropPartitionTableRequest); err != nil {
 				p.updateStateWithLock(procedure.StateFailed)
+				_ = p.params.OnFailed(err)
 				return errors.WithMessage(err, "drop partition table procedure")
 			}
 		case stateDropDataTable:
@@ -162,6 +163,7 @@
 			}
 			if err := p.fsm.Event(eventDropPartitionTable, dropPartitionTableRequest); err != nil {
 				p.updateStateWithLock(procedure.StateFailed)
+				_ = p.params.OnFailed(err)
 				return errors.WithMessage(err, "drop partition table procedure drop data table")
 			}
 		case stateDropPartitionTable:
@@ -170,11 +172,13 @@
 			}
 			if err := p.fsm.Event(eventFinish, dropPartitionTableRequest); err != nil {
 				p.updateStateWithLock(procedure.StateFailed)
+				_ = p.params.OnFailed(err)
 				return errors.WithMessage(err, "drop partition table procedure drop partition table")
 			}
 		case stateFinish:
 			p.updateStateWithLock(procedure.StateFinished)
 			if err := p.persist(ctx); err != nil {
+				_ = p.params.OnFailed(err)
 				return errors.WithMessage(err, "drop partition table procedure persist")
 			}
 			return nil
@@ -278,11 +282,28 @@
 
 	shardVersions := req.p.relatedVersionInfo.ShardWithVersion
 	for _, tableName := range params.SourceReq.PartitionTableInfo.SubTableNames {
-		table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, req.schemaName(), tableName, shardVersions)
+		table, err := ddl.GetTableMetadata(params.ClusterMetadata, req.schemaName(), tableName)
 		if err != nil {
-			procedure.CancelEventWithLog(event, err, fmt.Sprintf("get shard version by table, table:%s", tableName))
+			log.Warn("get table metadata failed", zap.String("tableName", tableName))
+			continue
+		}
+
+		shardVersionUpdate, shardExists, err := ddl.BuildShardVersionUpdate(table, params.ClusterMetadata, shardVersions)
+		if err != nil {
+			log.Error("get shard version by table", zap.String("tableName", tableName), zap.Error(err))
+			procedure.CancelEventWithLog(event, err, "build shard version update", zap.String("tableName", tableName))
 			return
 		}
+		// If the shard corresponding to this table does not exist, it means that the actual table creation failed.
+		// In order to ensure that the table can be deleted normally, we need to directly delete the metadata of the table.
+		if !shardExists {
+			_, err := params.ClusterMetadata.DropTableMetadata(req.ctx, req.schemaName(), tableName)
+			if err != nil {
+				procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", tableName))
+				return
+			}
+			continue
+		}
 
 		if err := ddl.DispatchDropTable(req.ctx, params.ClusterMetadata, params.Dispatch, params.SourceReq.GetSchemaName(), table, shardVersionUpdate); err != nil {
 			procedure.CancelEventWithLog(event, err, fmt.Sprintf("drop table, table:%s", tableName))
diff --git a/server/coordinator/procedure/ddl/droptable/drop_table.go b/server/coordinator/procedure/ddl/droptable/drop_table.go
index e546f64..940daa8 100644
--- a/server/coordinator/procedure/ddl/droptable/drop_table.go
+++ b/server/coordinator/procedure/ddl/droptable/drop_table.go
@@ -51,9 +51,33 @@
 	}
 	params := req.p.params
 
-	table, shardVersionUpdate, err := ddl.GetShardVersionByTableName(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName(), req.p.relatedVersionInfo.ShardWithVersion)
+	table, err := ddl.GetTableMetadata(params.ClusterMetadata, params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
 	if err != nil {
-		procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName()))
+		procedure.CancelEventWithLog(event, err, "get table metadata", zap.String("tableName", params.SourceReq.GetName()), zap.Error(err))
+		return
+	}
+	req.ret = metadata.TableInfo{
+		ID:            table.ID,
+		Name:          table.Name,
+		SchemaID:      table.SchemaID,
+		SchemaName:    params.SourceReq.GetSchemaName(),
+		PartitionInfo: table.PartitionInfo,
+	}
+
+	shardVersionUpdate, shardExists, err := ddl.BuildShardVersionUpdate(table, params.ClusterMetadata, req.p.relatedVersionInfo.ShardWithVersion)
+	if err != nil {
+		log.Error("get shard version by table", zap.String("tableName", params.SourceReq.GetName()), zap.Bool("shardExists", shardExists), zap.Error(err))
+		procedure.CancelEventWithLog(event, err, "get shard version by table name", zap.String("tableName", params.SourceReq.GetName()), zap.Bool("shardExists", shardExists), zap.Error(err))
+		return
+	}
+	// If the shard corresponding to this table does not exist, it means that the actual table creation failed.
+	// In order to ensure that the table can be deleted normally, we need to directly delete the metadata of the table.
+	if !shardExists {
+		_, err = params.ClusterMetadata.DropTable(req.ctx, params.SourceReq.GetSchemaName(), params.SourceReq.GetName())
+		if err != nil {
+			procedure.CancelEventWithLog(event, err, "drop table metadata", zap.String("tableName", params.SourceReq.GetName()))
+			return
+		}
 		return
 	}
 
@@ -77,14 +101,6 @@
 		procedure.CancelEventWithLog(event, procedure.ErrDropTableResult, fmt.Sprintf("legnth of shardVersionResult is %d", len(result.ShardVersionUpdate)))
 		return
 	}
-
-	req.ret = metadata.TableInfo{
-		ID:            table.ID,
-		Name:          table.Name,
-		SchemaID:      table.SchemaID,
-		SchemaName:    params.SourceReq.GetSchemaName(),
-		PartitionInfo: table.PartitionInfo,
-	}
 }
 
 func successCallback(event *fsm.Event) {
diff --git a/server/coordinator/procedure/manager_impl.go b/server/coordinator/procedure/manager_impl.go
index d60437d..425150a 100644
--- a/server/coordinator/procedure/manager_impl.go
+++ b/server/coordinator/procedure/manager_impl.go
@@ -156,12 +156,14 @@
 
 func (m *ManagerImpl) startProcedureWorker(ctx context.Context, newProcedure Procedure, procedureWorkerChan chan struct{}) {
 	go func() {
+		start := time.Now()
 		m.logger.Info("procedure start", zap.Uint64("procedureID", newProcedure.ID()))
 		err := newProcedure.Start(ctx)
 		if err != nil {
-			m.logger.Error("procedure start failed", zap.Error(err))
+			m.logger.Error("procedure start failed", zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds()))
+		} else {
+			m.logger.Info("procedure start finish", zap.Uint64("procedureID", newProcedure.ID()), zap.Int64("costTime", time.Since(start).Milliseconds()))
 		}
-		m.logger.Info("procedure finish", zap.Uint64("procedureID", newProcedure.ID()))
 		for shardID := range newProcedure.RelatedVersionInfo().ShardWithVersion {
 			m.lock.Lock()
 			delete(m.runningProcedures, shardID)
diff --git a/server/service/grpc/service.go b/server/service/grpc/service.go
index 2775e7e..70b106c 100644
--- a/server/service/grpc/service.go
+++ b/server/service/grpc/service.go
@@ -146,6 +146,7 @@
 
 // CreateTable implements gRPC CeresmetaServer.
 func (s *Service) CreateTable(ctx context.Context, req *metaservicepb.CreateTableRequest) (*metaservicepb.CreateTableResponse, error) {
+	start := time.Now()
 	// Since there may be too many table creation requests, a flow limiter is added here.
 	if ok, err := s.allow(); !ok {
 		return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table grpc request is rejected by flow limiter")}, nil
@@ -201,6 +202,7 @@
 
 	select {
 	case ret := <-resultCh:
+		log.Info("create table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
 		return &metaservicepb.CreateTableResponse{
 			Header: okResponseHeader(),
 			CreatedTable: &metaservicepb.TableInfo{
@@ -216,12 +218,14 @@
 			},
 		}, nil
 	case err = <-errorCh:
+		log.Warn("create table failed", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
 		return &metaservicepb.CreateTableResponse{Header: responseHeader(err, "create table")}, nil
 	}
 }
 
 // DropTable implements gRPC CeresmetaServer.
 func (s *Service) DropTable(ctx context.Context, req *metaservicepb.DropTableRequest) (*metaservicepb.DropTableResponse, error) {
+	start := time.Now()
 	// Since there may be too many table dropping requests, a flow limiter is added here.
 	if ok, err := s.allow(); !ok {
 		return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table grpc request is rejected by flow limiter")}, nil
@@ -275,17 +279,19 @@
 
 	err = c.GetProcedureManager().Submit(ctx, procedure)
 	if err != nil {
-		log.Error("fail to drop table, manager submit procedure", zap.Error(err))
+		log.Error("fail to drop table, manager submit procedure", zap.Error(err), zap.Int64("costTime", time.Since(start).Milliseconds()))
 		return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil
 	}
 
 	select {
 	case ret := <-resultCh:
+		log.Info("drop table finish", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
 		return &metaservicepb.DropTableResponse{
 			Header:       okResponseHeader(),
 			DroppedTable: metadata.ConvertTableInfoToPB(ret),
 		}, nil
 	case err = <-errorCh:
+		log.Info("drop table failed", zap.String("tableName", req.Name), zap.Int64("costTime", time.Since(start).Milliseconds()))
 		return &metaservicepb.DropTableResponse{Header: responseHeader(err, "drop table")}, nil
 	}
 }