blob: 7c512dde91fd3a03494a40ddb1e03a0586eb7721 [file] [log] [blame]
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
package procedure
import (
"context"
"encoding/json"
"sync"
"github.com/CeresDB/ceresdbproto/golang/pkg/metaservicepb"
"github.com/CeresDB/ceresmeta/pkg/log"
"github.com/CeresDB/ceresmeta/server/cluster"
"github.com/CeresDB/ceresmeta/server/coordinator/eventdispatch"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/looplab/fsm"
"github.com/pkg/errors"
"go.uber.org/zap"
)
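// FSM state transitions of the create partition table procedure:
//
//	┌───────┐     ┌──────────────────────┐     ┌──────────────────┐     ┌─────────────────────┐     ┌────────┐
//	│ Begin ├────▶│ CreatePartitionTable ├────▶│ CreateDataTables ├────▶│ OpenPartitionTables ├────▶│ Finish │
//	└───────┘     └──────────────────────┘     └──────────────────┘     └─────────────────────┘     └────────┘
//
// Each arrow is one transition; the event that triggers it is listed in
// createPartitionTableEvents (e.g. eventCreateDataTables moves the FSM from
// stateCreatePartitionTable to stateCreateDataTables).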
// Events of the create partition table FSM; each one advances the procedure by one step.
const (
	eventCreatePartitionTable = "EventCreatePartitionTable"
	eventCreateDataTables     = "EventCreateDataTables"
	eventOpenPartitionTables  = "EventOpenPartitionTables"
	// NOTE: the value of eventFinish is "EventSuccess" (it does not follow the naming
	// of the other events); it is kept unchanged.
	eventFinish = "EventSuccess"
)

// States of the create partition table FSM. The current state string is persisted
// (as CreatePartitionTableRawData.FsmState), so changing any of these values changes
// the stored representation of in-flight procedures.
const (
	stateBegin                = "StateCreatePartitionTableBegin"
	stateCreatePartitionTable = "StateCreatePartitionTable"
	stateCreateDataTables     = "StateCreatePartitionTableCreateDataTables"
	stateOpenPartitionTables  = "StateOpenPartitionTables"
	stateFinish               = "StateCreatePartitionTableFinish"
)
// createPartitionTableEvents is the linear transition chain of the procedure:
// stateBegin -> stateCreatePartitionTable -> stateCreateDataTables ->
// stateOpenPartitionTables -> stateFinish, one event per arrow.
var createPartitionTableEvents = fsm.Events{
	{Name: eventCreatePartitionTable, Src: []string{stateBegin}, Dst: stateCreatePartitionTable},
	{Name: eventCreateDataTables, Src: []string{stateCreatePartitionTable}, Dst: stateCreateDataTables},
	{Name: eventOpenPartitionTables, Src: []string{stateCreateDataTables}, Dst: stateOpenPartitionTables},
	{Name: eventFinish, Src: []string{stateOpenPartitionTables}, Dst: stateFinish},
}

// createPartitionTableCallbacks maps each event name to the callback that performs
// that step of the procedure.
// NOTE(review): with looplab/fsm a key that is a bare event name registers an
// after-event callback — confirm against the pinned fsm version.
var createPartitionTableCallbacks = fsm.Callbacks{
	eventCreatePartitionTable: createPartitionTableCallback,
	eventCreateDataTables:     createDataTablesCallback,
	eventOpenPartitionTables:  openPartitionTablesCallback,
	eventFinish:               finishCallback,
}
// CreatePartitionTableProcedure creates a partitioned table in several FSM-driven
// steps: it creates the partition table on the first partition table shard, creates
// the data (sub) tables on their shards, then adds the partition table to the
// remaining partition table shards and reopens them.
type CreatePartitionTableProcedure struct {
	id       uint64
	fsm      *fsm.FSM
	cluster  *cluster.Cluster
	dispatch eventdispatch.Dispatch
	// storage is where the procedure is persisted before each FSM transition.
	storage Storage
	// req is the original create table request from the client.
	req *metaservicepb.CreateTableRequest
	// partitionTableShards[0] receives the newly created partition table; the table is
	// then registered on, and reopened with, every other shard in this list.
	partitionTableShards []cluster.ShardNodeWithVersion
	// dataTablesShards[i] hosts the data table named by the i-th sub table name of
	// req's PartitionTableInfo.
	dataTablesShards []cluster.ShardNodeWithVersion
	// onSucceeded / onFailed are the completion callbacks supplied by the creator.
	onSucceeded func(cluster.CreateTableResult) error
	onFailed    func(error) error
	// lock guards state.
	lock  sync.RWMutex
	state State
}
// CreatePartitionTableProcedureRequest carries the inputs of
// NewCreatePartitionTableProcedure; every field is copied into the procedure
// unchanged. Its fields are unexported, so it can only be built inside this package.
type CreatePartitionTableProcedureRequest struct {
	id       uint64
	cluster  *cluster.Cluster
	dispatch eventdispatch.Dispatch
	storage  Storage
	req      *metaservicepb.CreateTableRequest
	// partitionTableShards[0] gets the partition table; the rest are reopened with it.
	partitionTableShards []cluster.ShardNodeWithVersion
	// dataTablesShards[i] gets the i-th sub table of req's PartitionTableInfo.
	dataTablesShards []cluster.ShardNodeWithVersion
	onSucceeded      func(cluster.CreateTableResult) error
	onFailed         func(error) error
}
// NewCreatePartitionTableProcedure builds a procedure from request. Its FSM starts in
// stateBegin, follows createPartitionTableEvents and runs the step callbacks in
// createPartitionTableCallbacks. The request fields are copied as-is (slices are
// shared, not cloned).
func NewCreatePartitionTableProcedure(request CreatePartitionTableProcedureRequest) *CreatePartitionTableProcedure {
	// Named procedureFSM rather than fsm so that it does not shadow the imported fsm package.
	procedureFSM := fsm.NewFSM(
		stateBegin,
		createPartitionTableEvents,
		createPartitionTableCallbacks,
	)

	return &CreatePartitionTableProcedure{
		id:                   request.id,
		fsm:                  procedureFSM,
		cluster:              request.cluster,
		dispatch:             request.dispatch,
		storage:              request.storage,
		req:                  request.req,
		partitionTableShards: request.partitionTableShards,
		dataTablesShards:     request.dataTablesShards,
		onSucceeded:          request.onSucceeded,
		onFailed:             request.onFailed,
	}
}
// ID returns the identifier the procedure was constructed with.
func (p *CreatePartitionTableProcedure) ID() uint64 { return p.id }

// Typ reports the procedure type; it is always CreatePartitionTable.
func (*CreatePartitionTableProcedure) Typ() Typ { return CreatePartitionTable }
// Start runs the procedure to completion. On every iteration it persists the
// procedure and then fires the event leading out of the current FSM state, until
// stateFinish is reached, at which point the procedure is marked StateFinished.
//
// If an FSM transition fails, the procedure is marked StateFailed, onFailed (when set)
// receives the error, and the error is returned. A persist failure is returned as-is
// without changing the procedure state.
func (p *CreatePartitionTableProcedure) Start(ctx context.Context) error {
	p.updateStateWithLock(StateRunning)

	// The request is shared by pointer across all callbacks: createPartitionTableCallback
	// stores the created table's metadata in it and finishCallback later hands that
	// metadata to onSucceeded. Passed by value, every callback would get its own copy and
	// onSucceeded would receive an empty CreateTableResult.
	createPartitionTableRequest := &CreatePartitionTableCallbackRequest{
		ctx:                  ctx,
		cluster:              p.cluster,
		dispatch:             p.dispatch,
		sourceReq:            p.req,
		partitionTableShards: p.partitionTableShards,
		dataTablesShards:     p.dataTablesShards,
		onSucceeded:          p.onSucceeded,
		onFailed:             p.onFailed,
	}

	for {
		currentState := p.fsm.Current()

		if err := p.persist(ctx); err != nil {
			return errors.WithMessage(err, "create partition table procedure persist")
		}

		var nextEvent, step string
		switch currentState {
		case stateBegin:
			nextEvent, step = eventCreatePartitionTable, "create partition table"
		case stateCreatePartitionTable:
			nextEvent, step = eventCreateDataTables, "create data tables"
		case stateCreateDataTables:
			nextEvent, step = eventOpenPartitionTables, "open partition tables"
		case stateOpenPartitionTables:
			nextEvent, step = eventFinish, "finish"
		case stateFinish:
			p.updateStateWithLock(StateFinished)
			return nil
		default:
			// Without this the loop would spin forever on a state it cannot advance.
			return p.markFailed(errors.Errorf("create partition table procedure unknown fsm state:%s", currentState))
		}

		if err := p.fsm.Event(nextEvent, createPartitionTableRequest); err != nil {
			return p.markFailed(errors.WithMessagef(err, "create partition table procedure %s", step))
		}
	}
}

// markFailed records StateFailed, passes err to onFailed (when set) and returns err
// unchanged so call sites can write `return p.markFailed(err)`.
func (p *CreatePartitionTableProcedure) markFailed(err error) error {
	p.updateStateWithLock(StateFailed)
	if p.onFailed != nil {
		if notifyErr := p.onFailed(err); notifyErr != nil {
			log.Error("create partition table procedure notify failed", zap.Uint64("procedureID", p.id), zap.Error(notifyErr))
		}
	}
	return err
}

// Cancel marks the procedure as StateCancelled. It only updates the recorded state;
// it does not interrupt a Start call that is already running.
func (p *CreatePartitionTableProcedure) Cancel(_ context.Context) error {
	p.updateStateWithLock(StateCancelled)
	return nil
}

// State returns the current procedure state under the read lock.
func (p *CreatePartitionTableProcedure) State() State {
	p.lock.RLock()
	defer p.lock.RUnlock()

	return p.state
}

// CreatePartitionTableCallbackRequest is the argument passed (by pointer) to every FSM
// callback of the procedure. createTableResult is filled in by
// createPartitionTableCallback and consumed by finishCallback.
type CreatePartitionTableCallbackRequest struct {
	ctx       context.Context
	cluster   *cluster.Cluster
	dispatch  eventdispatch.Dispatch
	sourceReq *metaservicepb.CreateTableRequest

	onSucceeded func(cluster.CreateTableResult) error
	onFailed    func(error) error

	createTableResult    cluster.CreateTableResult
	partitionTableShards []cluster.ShardNodeWithVersion
	dataTablesShards     []cluster.ShardNodeWithVersion
}

// 1. Create partition table in target node.
//
// createPartitionTableCallback creates the partition table's metadata and the table
// itself on the first shard of partitionTableShards. It stores the metadata in the
// shared request for finishCallback. Any failure cancels the event.
func createPartitionTableCallback(event *fsm.Event) {
	req, err := getRequestFromEvent[*CreatePartitionTableCallbackRequest](event)
	if err != nil {
		cancelEventWithLog(event, err, "get request from event")
		return
	}

	if len(req.partitionTableShards) == 0 {
		cancelEventWithLog(event, errors.New("partition table shards is empty"), "select partition table shard")
		return
	}
	// Select first shard to create partition table.
	partitionTableShardNode := req.partitionTableShards[0]

	createTableResult, err := createTableMetadata(req.ctx, req.cluster, req.sourceReq.GetSchemaName(), req.sourceReq.GetName(), partitionTableShardNode.ShardNode.NodeName, true)
	if err != nil {
		cancelEventWithLog(event, err, "create table metadata")
		return
	}
	req.createTableResult = createTableResult

	if err = createTableOnShard(req.ctx, req.cluster, req.dispatch, partitionTableShardNode.ShardInfo.ID, buildCreateTableRequest(createTableResult, req.sourceReq, true)); err != nil {
		cancelEventWithLog(event, err, "dispatch create table on shard")
		return
	}
}

// 2. Create data tables in target nodes.
//
// createDataTablesCallback creates the i-th data table, named by the i-th sub table
// name of the request's PartitionTableInfo, on dataTablesShards[i]. It stops at the
// first failure and cancels the event.
func createDataTablesCallback(event *fsm.Event) {
	req, err := getRequestFromEvent[*CreatePartitionTableCallbackRequest](event)
	if err != nil {
		cancelEventWithLog(event, err, "get request from event")
		return
	}

	// The nil-safe getters avoid a panic when PartitionTableInfo is absent; the length
	// check avoids indexing past the sub table names.
	subTableNames := req.sourceReq.GetPartitionTableInfo().GetSubTableNames()
	if len(subTableNames) < len(req.dataTablesShards) {
		cancelEventWithLog(event, errors.New("sub table names are fewer than data tables shards"), "check sub table names",
			zap.Int("subTableNames", len(subTableNames)), zap.Int("dataTablesShards", len(req.dataTablesShards)))
		return
	}

	for i, dataTableShard := range req.dataTablesShards {
		createTableResult, err := createTableMetadata(req.ctx, req.cluster, req.sourceReq.GetSchemaName(), subTableNames[i], dataTableShard.ShardNode.NodeName, false)
		if err != nil {
			cancelEventWithLog(event, err, "create table metadata")
			return
		}

		if err = createTableOnShard(req.ctx, req.cluster, req.dispatch, dataTableShard.ShardInfo.ID, buildCreateTableRequest(createTableResult, req.sourceReq, false)); err != nil {
			cancelEventWithLog(event, err, "dispatch create table on shard")
			return
		}
	}
}

// 3. Open partition table in target nodes.
// TODO: Replace open table implementation, avoid reopening shard.
//
// openPartitionTablesCallback adds the partition table to every partition table shard
// except the first (which already received it in step 1). For each such shard it bumps
// the shard version and persists the new shard tables, then closes and reopens the
// shard on its node.
func openPartitionTablesCallback(event *fsm.Event) {
	req, err := getRequestFromEvent[*CreatePartitionTableCallbackRequest](event)
	if err != nil {
		cancelEventWithLog(event, err, "get request from event")
		return
	}

	partitionTable, _, err := req.cluster.GetTable(req.sourceReq.GetSchemaName(), req.sourceReq.GetName())
	if err != nil {
		cancelEventWithLog(event, err, "get table", zap.String("schemaName", req.sourceReq.GetSchemaName()), zap.String("tableName", req.sourceReq.GetName()))
		return
	}

	// Range over a sub-slice instead of shifting the slice in place: it shares its
	// backing array with the procedure's partitionTableShards field, so an in-place
	// shift would corrupt the shards recorded by later persist calls.
	var otherShards []cluster.ShardNodeWithVersion
	if len(req.partitionTableShards) > 1 {
		otherShards = req.partitionTableShards[1:]
	}

	for _, partitionTableShard := range otherShards {
		shardID := partitionTableShard.ShardInfo.ID
		nodeName := partitionTableShard.ShardNode.NodeName

		// Update table shard mapping.
		originShardTables, ok := req.cluster.GetShardTables([]storage.ShardID{shardID}, nodeName)[shardID]
		if !ok {
			// Updating a zero-valued ShardTables would write bogus shard metadata.
			cancelEventWithLog(event, errors.New("shard tables not found"), "get shard tables",
				zap.Uint32("shardID", uint32(shardID)), zap.String("nodeName", nodeName))
			return
		}
		originShardTables.Shard.Version++
		originShardTables.Tables = append(originShardTables.Tables, cluster.TableInfo{
			ID:         partitionTable.ID,
			Name:       partitionTable.Name,
			SchemaID:   partitionTable.SchemaID,
			SchemaName: req.sourceReq.GetSchemaName(),
		})
		if err := req.cluster.UpdateShardTables(req.ctx, []cluster.ShardTables{originShardTables}); err != nil {
			cancelEventWithLog(event, err, "update shard tables")
			return
		}

		// Reopen partition table shard.
		if err := req.dispatch.CloseShard(req.ctx, nodeName, eventdispatch.CloseShardRequest{
			ShardID: uint32(partitionTableShard.ShardNode.ID),
		}); err != nil {
			cancelEventWithLog(event, err, "close shard")
			return
		}
		if err := req.dispatch.OpenShard(req.ctx, nodeName, eventdispatch.OpenShardRequest{
			Shard: partitionTableShard.ShardInfo,
		}); err != nil {
			cancelEventWithLog(event, err, "open shard")
			return
		}
	}
}

// finishCallback reports the partition table metadata recorded by
// createPartitionTableCallback through onSucceeded; an error from onSucceeded cancels
// the event.
func finishCallback(event *fsm.Event) {
	req, err := getRequestFromEvent[*CreatePartitionTableCallbackRequest](event)
	if err != nil {
		cancelEventWithLog(event, err, "get request from event")
		return
	}
	log.Info("create partition table finish")

	if err := req.onSucceeded(req.createTableResult); err != nil {
		cancelEventWithLog(event, err, "create partition table on succeeded")
		return
	}
}
// updateStateWithLock records state while holding the write lock, so it can run
// concurrently with readers that take the read lock (State, convertToMeta).
func (p *CreatePartitionTableProcedure) updateStateWithLock(state State) {
	p.lock.Lock()
	p.state = state
	p.lock.Unlock()
}
// persist snapshots the procedure with convertToMeta and stores the snapshot through
// storage.CreateOrUpdate, wrapping either failure with context.
func (p *CreatePartitionTableProcedure) persist(ctx context.Context) error {
	meta, err := p.convertToMeta()
	if err != nil {
		return errors.WithMessage(err, "convert to meta")
	}

	if storeErr := p.storage.CreateOrUpdate(ctx, meta); storeErr != nil {
		return errors.WithMessage(storeErr, "createOrUpdate procedure storage")
	}
	return nil
}
// CreatePartitionTableRawData is the JSON payload stored in Meta.RawData when the
// create partition table procedure is persisted.
// NOTE(review): convertToMeta does not set CreateTableResult, so it is always
// serialized as its zero value.
type CreatePartitionTableRawData struct {
	ID uint64
	// FsmState is the current FSM state string (one of the state* constants).
	FsmState string
	// State is the procedure-level state at the time of persisting.
	State                State
	CreateTableResult    cluster.CreateTableResult
	PartitionTableShards []cluster.ShardNodeWithVersion
	DataTablesShards     []cluster.ShardNodeWithVersion
}
// convertToMeta encodes the procedure's id, FSM state, procedure state and shard
// assignments as JSON (CreatePartitionTableRawData) and wraps the bytes in a Meta of
// type CreatePartitionTable. It holds the read lock so the snapshot is consistent with
// concurrent state updates. CreateTableResult is left at its zero value.
func (p *CreatePartitionTableProcedure) convertToMeta() (Meta, error) {
	p.lock.RLock()
	defer p.lock.RUnlock()

	encoded, err := json.Marshal(CreatePartitionTableRawData{
		ID:                   p.id,
		FsmState:             p.fsm.Current(),
		State:                p.state,
		PartitionTableShards: p.partitionTableShards,
		DataTablesShards:     p.dataTablesShards,
	})
	if err != nil {
		return Meta{}, ErrEncodeRawData.WithCausef("marshal raw data, procedureID:%v, err:%v", p.id, err)
	}

	return Meta{
		ID:      p.id,
		Typ:     CreatePartitionTable,
		State:   p.state,
		RawData: encoded,
	}, nil
}