blob: 8f837a9ce91e5a7d21855e7785effcfc37dac245 [file] [log] [blame]
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
package procedure
import (
"context"
"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/cluster"
"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
"github.com/CeresDB/ceresmeta/server/id"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
"go.uber.org/zap"
)
type Factory struct {
idAllocator id.Allocator
dispatch eventdispatch.Dispatch
storage Storage
clusterManager cluster.Manager
shardPicker ShardPicker
// TODO: This is a temporary implementation version, which needs to be refined to the table level later.
partitionTableProportionOfNodes float32
}
type ScatterRequest struct {
Cluster *cluster.Cluster
ShardIDs []storage.ShardID
}
type CreateTableRequest struct {
Cluster *cluster.Cluster
SourceReq *metaservicepb.CreateTableRequest
OnSucceeded func(cluster.CreateTableResult) error
OnFailed func(error) error
}
type DropTableRequest struct {
Cluster *cluster.Cluster
SourceReq *metaservicepb.DropTableRequest
OnSucceeded func(cluster.TableInfo) error
OnFailed func(error) error
}
type TransferLeaderRequest struct {
ClusterName string
ShardID storage.ShardID
OldLeaderNodeName string
NewLeaderNodeName string
ClusterVersion uint64
}
type SplitRequest struct {
ClusterName string
SchemaName string
TableNames []string
ShardID storage.ShardID
NewShardID storage.ShardID
TargetNodeName string
ClusterVersion uint64
}
type CreatePartitionTableRequest struct {
ClusterName string
SourceReq *metaservicepb.CreateTableRequest
PartitionTableRatioOfNodes float32
OnSucceeded func(cluster.CreateTableResult) error
OnFailed func(error) error
}
func NewFactory(allocator id.Allocator, dispatch eventdispatch.Dispatch, storage Storage, manager cluster.Manager, partitionTableProportionOfNodes float32) *Factory {
return &Factory{
idAllocator: allocator,
dispatch: dispatch,
storage: storage,
clusterManager: manager,
shardPicker: NewRandomShardPicker(manager),
partitionTableProportionOfNodes: partitionTableProportionOfNodes,
}
}
func (f *Factory) CreateScatterProcedure(ctx context.Context, request ScatterRequest) (Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}
procedure := NewScatterProcedure(f.dispatch, request.Cluster, id, request.ShardIDs)
return procedure, nil
}
func (f *Factory) MakeCreateTableProcedure(ctx context.Context, request CreateTableRequest) (Procedure, error) {
procedure, err := NewCreateTableProcedure(ctx, f, request.Cluster,
request.SourceReq, request.OnSucceeded, request.OnFailed)
if err != nil {
return nil, err
}
return procedure, nil
}
func (f *Factory) makeCreateNormalTableProcedure(ctx context.Context, request CreateTableRequest) (Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}
procedure := NewCreateNormalTableProcedure(f.dispatch, request.Cluster, id,
request.SourceReq, request.OnSucceeded, request.OnFailed)
return procedure, nil
}
func (f *Factory) makeCreatePartitionTableProcedure(ctx context.Context, request CreatePartitionTableRequest) (Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}
c, err := f.clusterManager.GetCluster(ctx, request.ClusterName)
if err != nil {
log.Error("cluster not found", zap.String("clusterName", request.ClusterName))
return nil, cluster.ErrClusterNotFound
}
getNodeShardResult, err := c.GetNodeShards(ctx)
if err != nil {
log.Error("cluster get node shard result")
return nil, err
}
nodeNames := make(map[string]int)
for _, nodeShard := range getNodeShardResult.NodeShards {
nodeNames[nodeShard.ShardNode.NodeName] = 1
}
partitionTableNum := Max(1, int(float32(len(nodeNames))*request.PartitionTableRatioOfNodes))
partitionTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, partitionTableNum, false)
if err != nil {
return nil, errors.WithMessage(err, "pick partition table shards")
}
dataTableShards, err := f.shardPicker.PickShards(ctx, request.ClusterName, len(request.SourceReq.PartitionTableInfo.SubTableNames), true)
if err != nil {
return nil, errors.WithMessage(err, "pick data table shards")
}
procedure := NewCreatePartitionTableProcedure(CreatePartitionTableProcedureRequest{
id: id, cluster: c, dispatch: f.dispatch, storage: f.storage,
req: request.SourceReq, partitionTableShards: partitionTableShards, dataTablesShards: dataTableShards,
onSucceeded: request.OnSucceeded, onFailed: request.OnFailed,
})
return procedure, nil
}
func (f *Factory) CreateDropTableProcedure(ctx context.Context, request DropTableRequest) (Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}
procedure := NewDropTableProcedure(f.dispatch, request.Cluster, id,
request.SourceReq, request.OnSucceeded, request.OnFailed)
return procedure, nil
}
func (f *Factory) CreateTransferLeaderProcedure(ctx context.Context, request TransferLeaderRequest) (Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}
c, err := f.clusterManager.GetCluster(ctx, request.ClusterName)
if err != nil {
log.Error("cluster not found", zap.String("clusterName", request.ClusterName))
return nil, cluster.ErrClusterNotFound
}
return NewTransferLeaderProcedure(f.dispatch, c, f.storage,
request.ShardID, request.OldLeaderNodeName, request.NewLeaderNodeName, id)
}
func (f *Factory) CreateSplitProcedure(ctx context.Context, request SplitRequest) (Procedure, error) {
id, err := f.allocProcedureID(ctx)
if err != nil {
return nil, err
}
c, err := f.clusterManager.GetCluster(ctx, request.ClusterName)
if err != nil {
log.Error("cluster not found", zap.String("clusterName", request.ClusterName))
return nil, cluster.ErrClusterNotFound
}
procedure := NewSplitProcedure(id, f.dispatch, f.storage, c, request.SchemaName, request.ShardID, request.NewShardID, request.TableNames, request.TargetNodeName)
return procedure, nil
}
func (f *Factory) allocProcedureID(ctx context.Context) (uint64, error) {
id, err := f.idAllocator.Alloc(ctx)
if err != nil {
return 0, errors.WithMessage(err, "alloc procedure id")
}
return id, nil
}