// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
package metadata
import (
"context"
"fmt"
"sync"
"github.com/CeresDB/ceresmeta/server/id"
"github.com/CeresDB/ceresmeta/server/storage"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// TopologyManager manages the cluster topology, including the mapping relationships among shards, nodes, and tables.
type TopologyManager interface {
// Load loads the cluster topology from storage.
Load(ctx context.Context) error
// GetVersion returns the cluster view version.
GetVersion() uint64
// GetClusterState returns the cluster view state.
GetClusterState() storage.ClusterState
// GetTableIDs returns the table IDs (together with the shard view version) of the given shards.
GetTableIDs(shardIDs []storage.ShardID) map[storage.ShardID]ShardTableIDs
// AddTable adds the given tables to the target shard in the cluster topology.
AddTable(ctx context.Context, shardID storage.ShardID, tables []storage.Table) (ShardVersionUpdate, error)
// RemoveTable removes the given tables from the target shard in the cluster topology.
RemoveTable(ctx context.Context, shardID storage.ShardID, tableIDs []storage.TableID) (ShardVersionUpdate, error)
// EvictTable evicts the given table from every shard it resides on.
EvictTable(ctx context.Context, tableID storage.TableID) ([]ShardVersionUpdate, error)
// GetShardNodesByID returns the shardNodes of the given shard.
GetShardNodesByID(shardID storage.ShardID) ([]storage.ShardNode, error)
// GetShardNodesByTableIDs returns the shardNodes of the given tables.
GetShardNodesByTableIDs(tableIDs []storage.TableID) (GetShardNodesByTableIDsResult, error)
// GetShardNodes returns all shardNodes in the cluster topology.
GetShardNodes() GetShardNodesResult
// DropShardNodes drops the target shardNodes from the cluster topology.
DropShardNodes(ctx context.Context, shardNodes []storage.ShardNode) error
// InitClusterView initializes the cluster view when a new cluster is created.
InitClusterView(ctx context.Context) error
// UpdateClusterView updates the cluster view with the given state and shardNodes.
UpdateClusterView(ctx context.Context, state storage.ClusterState, shardNodes []storage.ShardNode) error
// UpdateClusterViewByNode updates the cluster view with the given shardNodes; only the shardNodes of the listed node names are replaced.
UpdateClusterViewByNode(ctx context.Context, shardNodes map[string][]storage.ShardNode) error
// GetClusterView returns the current cluster view.
GetClusterView() storage.ClusterView
// CreateShardViews creates the given shardViews.
CreateShardViews(ctx context.Context, shardViews []CreateShardView) error
// GetTopology returns a snapshot of the current topology.
GetTopology() Topology
}
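// ShardTableIDs holds the table IDs on a shard together with the version of the shard view they were read from.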
type ShardTableIDs struct {
TableIDs []storage.TableID
Version uint64
}
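// GetShardTablesByNodeResult contains the ShardTableIDs of every shard on a node.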
type GetShardTablesByNodeResult struct {
ShardTableIDs map[storage.ShardID]ShardTableIDs
}
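// GetShardNodesByTableIDsResult maps each table to the shardNodes it resides on, along with the shard view version of every involved shard.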
type GetShardNodesByTableIDsResult struct {
ShardNodes map[storage.TableID][]storage.ShardNode
Version map[storage.ShardID]uint64
}
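// GetShardNodesResult contains all shardNodes in the cluster topology together with the shard view version of every shard.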
type GetShardNodesResult struct {
shardNodes []storage.ShardNode
versions map[storage.ShardID]uint64
}
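// CreateShardView describes a shard view to be created: the shard ID and the tables initially placed on it.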
type CreateShardView struct {
ShardID storage.ShardID
Tables []storage.TableID
}
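// Topology is a point-in-time snapshot of the cluster view and all shard views.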
type Topology struct {
ShardViewsMapping map[storage.ShardID]storage.ShardView
ClusterView storage.ClusterView
}
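// IsStable reports whether the cluster is in the stable state and every shard view has a matching shard-node assignment.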
func (t *Topology) IsStable() bool {
if t.ClusterView.State != storage.ClusterStateStable {
return false
}
if len(t.ClusterView.ShardNodes) != len(t.ShardViewsMapping) {
return false
}
return true
}
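// IsPrepareFinished reports whether the cluster is in the prepare state and shard views have been created for all assigned shards.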
func (t *Topology) IsPrepareFinished() bool {
if t.ClusterView.State != storage.ClusterStatePrepare {
return false
}
if len(t.ShardViewsMapping) != len(t.ClusterView.ShardNodes) {
return false
}
return true
}
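// TopologyManagerImpl implements TopologyManager with a write-through cache: every mutation is persisted to storage first and then applied to (or reloaded into) the in-memory mappings.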
type TopologyManagerImpl struct {
logger *zap.Logger
storage storage.Storage
clusterID storage.ClusterID
shardIDAlloc id.Allocator
// lock protects all the following fields.
lock sync.RWMutex
clusterView storage.ClusterView // Cluster view in memory.
shardNodesMapping map[storage.ShardID][]storage.ShardNode // ShardID -> nodes the shard is assigned to.
nodeShardsMapping map[string][]storage.ShardNode // NodeName -> shards assigned to the node.
// Shard views in memory.
shardViews []storage.ShardView
shardTablesMapping map[storage.ShardID]*storage.ShardView // ShardID -> shard view (table IDs and version).
tableShardMapping map[storage.TableID][]storage.ShardID // TableID -> shards the table resides on.
nodes map[string]storage.Node // Nodes in memory, keyed by node name.
}
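// NewTopologyManagerImpl creates a TopologyManager backed by the given storage.
// The returned manager holds no in-memory state until Load is called.
// A minimal usage sketch (hypothetical caller, assuming a ready storage.Storage
// and id.Allocator are at hand):
//
//	mgr := NewTopologyManagerImpl(logger, store, clusterID, alloc)
//	if err := mgr.Load(ctx); err != nil {
//		return err
//	}
//	topology := mgr.GetTopology()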
func NewTopologyManagerImpl(logger *zap.Logger, storage storage.Storage, clusterID storage.ClusterID, shardIDAlloc id.Allocator) TopologyManager {
return &TopologyManagerImpl{
logger: logger,
storage: storage,
clusterID: clusterID,
shardIDAlloc: shardIDAlloc,
}
}
func (m *TopologyManagerImpl) Load(ctx context.Context) error {
m.lock.Lock()
defer m.lock.Unlock()
if err := m.loadClusterView(ctx); err != nil {
return errors.WithMessage(err, "load cluster view")
}
if err := m.loadShardViews(ctx); err != nil {
return errors.WithMessage(err, "load shard views")
}
if err := m.loadNodes(ctx); err != nil {
return errors.WithMessage(err, "load nodes")
}
return nil
}
func (m *TopologyManagerImpl) GetVersion() uint64 {
m.lock.RLock()
defer m.lock.RUnlock()
return m.clusterView.Version
}
func (m *TopologyManagerImpl) GetClusterState() storage.ClusterState {
m.lock.RLock()
defer m.lock.RUnlock()
return m.clusterView.State
}
func (m *TopologyManagerImpl) GetTableIDs(shardIDs []storage.ShardID) map[storage.ShardID]ShardTableIDs {
m.lock.RLock()
defer m.lock.RUnlock()
shardTableIDs := make(map[storage.ShardID]ShardTableIDs, len(shardIDs))
for _, shardID := range shardIDs {
shardView, ok := m.shardTablesMapping[shardID]
if !ok {
// Skip unknown shards instead of dereferencing a nil shard view.
continue
}
shardTableIDs[shardID] = ShardTableIDs{
TableIDs: shardView.TableIDs,
Version: shardView.Version,
}
}
return shardTableIDs
}
func (m *TopologyManagerImpl) AddTable(ctx context.Context, shardID storage.ShardID, tables []storage.Table) (ShardVersionUpdate, error) {
m.lock.Lock()
defer m.lock.Unlock()
shardView, ok := m.shardTablesMapping[shardID]
if !ok {
return ShardVersionUpdate{}, ErrShardNotFound.WithCausef("shard id:%d", shardID)
}
prevVersion := shardView.Version
tableIDsToAdd := make([]storage.TableID, 0, len(tables))
for _, table := range tables {
tableIDsToAdd = append(tableIDsToAdd, table.ID)
}
tableIDs := make([]storage.TableID, 0, len(shardView.TableIDs)+1)
tableIDs = append(tableIDs, shardView.TableIDs...)
tableIDs = append(tableIDs, tableIDsToAdd...)
newShardView := storage.NewShardView(shardID, prevVersion+1, tableIDs)
// Update shard view in storage.
err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{
ClusterID: m.clusterID,
ShardView: newShardView,
LatestVersion: prevVersion,
})
if err != nil {
return ShardVersionUpdate{}, errors.WithMessage(err, "storage update shard view")
}
// Update shard view in memory.
m.shardTablesMapping[shardID] = &newShardView
for _, tableID := range tableIDsToAdd {
// append on a nil slice allocates a new one, so no existence check is needed.
m.tableShardMapping[tableID] = append(m.tableShardMapping[tableID], shardID)
}
m.updateShardView(shardID, newShardView)
return ShardVersionUpdate{
ShardID: shardID,
CurrVersion: prevVersion + 1,
PrevVersion: prevVersion,
}, nil
}
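// updateShardView replaces the cached copy of the shard view for shardID in m.shardViews. The caller must hold m.lock.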
func (m *TopologyManagerImpl) updateShardView(shardID storage.ShardID, newShardView storage.ShardView) {
for i := range m.shardViews {
if m.shardViews[i].ShardID == shardID {
m.shardViews[i] = newShardView
break
}
}
}
func (m *TopologyManagerImpl) RemoveTable(ctx context.Context, shardID storage.ShardID, tableIDs []storage.TableID) (ShardVersionUpdate, error) {
m.lock.Lock()
defer m.lock.Unlock()
shardView, ok := m.shardTablesMapping[shardID]
if !ok {
return ShardVersionUpdate{}, ErrShardNotFound.WithCausef("shard id:%d", shardID)
}
prevVersion := shardView.Version
// Build the removal set first so that each retained table ID is appended exactly once.
tableIDsToRemove := make(map[storage.TableID]struct{}, len(tableIDs))
for _, tableID := range tableIDs {
tableIDsToRemove[tableID] = struct{}{}
}
newTableIDs := make([]storage.TableID, 0, len(shardView.TableIDs))
for _, tableID := range shardView.TableIDs {
if _, ok := tableIDsToRemove[tableID]; !ok {
newTableIDs = append(newTableIDs, tableID)
}
}
// Update shardView in storage.
newShardView := storage.NewShardView(shardView.ShardID, prevVersion+1, newTableIDs)
if err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{
ClusterID: m.clusterID,
ShardView: newShardView,
LatestVersion: prevVersion,
}); err != nil {
return ShardVersionUpdate{}, errors.WithMessage(err, "storage update shard view")
}
// Update shardView in memory. shardView aliases the entry stored in
// m.shardTablesMapping, so assigning through it updates the mapping as well.
shardView.Version = prevVersion + 1
shardView.TableIDs = newTableIDs
for _, tableID := range tableIDs {
delete(m.tableShardMapping, tableID)
}
m.updateShardView(shardID, newShardView)
return ShardVersionUpdate{
ShardID: shardID,
CurrVersion: prevVersion + 1,
PrevVersion: prevVersion,
}, nil
}
func (m *TopologyManagerImpl) EvictTable(ctx context.Context, tableID storage.TableID) ([]ShardVersionUpdate, error) {
m.lock.Lock()
defer m.lock.Unlock()
shardIDs, ok := m.tableShardMapping[tableID]
if !ok {
return []ShardVersionUpdate{}, ErrTableNotFound.WithCausef("table id:%d", tableID)
}
result := []ShardVersionUpdate{}
for _, shardID := range shardIDs {
shardView, ok := m.shardTablesMapping[shardID]
if !ok {
return nil, ErrShardNotFound.WithCausef("shard id:%d", shardID)
}
prevVersion := shardView.Version
tableIDs := make([]storage.TableID, 0, len(shardView.TableIDs))
for _, id := range shardView.TableIDs {
if id != tableID {
tableIDs = append(tableIDs, id)
}
}
// Update shardView in storage.
// TODO: Move the code that modifies the version to the outside, and the actual version should be obtained from the call result of CeresDB.
newShardView := storage.NewShardView(shardView.ShardID, prevVersion+1, tableIDs)
if err := m.storage.UpdateShardView(ctx, storage.UpdateShardViewRequest{
ClusterID: m.clusterID,
ShardView: newShardView,
LatestVersion: prevVersion,
}); err != nil {
return nil, errors.WithMessage(err, "storage update shard view")
}
// Update shardView in memory.
shardView.Version = prevVersion + 1
shardView.TableIDs = tableIDs
delete(m.tableShardMapping, tableID)
m.updateShardView(shardView.ShardID, newShardView)
result = append(result, ShardVersionUpdate{
ShardID: shardID,
CurrVersion: prevVersion + 1,
PrevVersion: prevVersion,
})
}
return result, nil
}
func (m *TopologyManagerImpl) GetShardNodesByID(shardID storage.ShardID) ([]storage.ShardNode, error) {
m.lock.RLock()
defer m.lock.RUnlock()
shardNodes, ok := m.shardNodesMapping[shardID]
if !ok {
return nil, ErrShardNotFound.WithCausef("shard id:%d", shardID)
}
return shardNodes, nil
}
func (m *TopologyManagerImpl) GetShardNodesByTableIDs(tableIDs []storage.TableID) (GetShardNodesByTableIDsResult, error) {
m.lock.RLock()
defer m.lock.RUnlock()
tableShardNodes := make(map[storage.TableID][]storage.ShardNode, len(tableIDs))
shardViewVersions := make(map[storage.ShardID]uint64)
for _, tableID := range tableIDs {
shardIDs, ok := m.tableShardMapping[tableID]
// If the table is not assigned to any shard, return an empty slice.
if !ok {
tableShardNodes[tableID] = []storage.ShardNode{}
continue
}
for _, shardID := range shardIDs {
shardNodes, ok := m.shardNodesMapping[shardID]
if !ok {
// The shard is not assigned to any node; keep an empty entry for the
// table without discarding nodes already collected from other shards.
if _, exists := tableShardNodes[tableID]; !exists {
tableShardNodes[tableID] = []storage.ShardNode{}
}
continue
}
tableShardNodes[tableID] = append(tableShardNodes[tableID], shardNodes...)
if _, ok := shardViewVersions[shardID]; !ok {
shardViewVersions[shardID] = m.shardTablesMapping[shardID].Version
}
}
}
return GetShardNodesByTableIDsResult{
ShardNodes: tableShardNodes,
Version: shardViewVersions,
}, nil
}
func (m *TopologyManagerImpl) GetShardNodes() GetShardNodesResult {
m.lock.RLock()
defer m.lock.RUnlock()
shardNodes := make([]storage.ShardNode, 0, len(m.shardNodesMapping))
shardViewVersions := make(map[storage.ShardID]uint64, len(m.shardTablesMapping))
for _, shardNode := range m.shardNodesMapping {
shardNodes = append(shardNodes, shardNode...)
}
for shardID, shardView := range m.shardTablesMapping {
shardViewVersions[shardID] = shardView.Version
}
return GetShardNodesResult{
shardNodes: shardNodes,
versions: shardViewVersions,
}
}
func (m *TopologyManagerImpl) DropShardNodes(ctx context.Context, shardNodes []storage.ShardNode) error {
m.lock.Lock()
defer m.lock.Unlock()
newShardNodes := make([]storage.ShardNode, 0, len(m.clusterView.ShardNodes))
for i := 0; i < len(m.clusterView.ShardNodes); i++ {
if !contains(shardNodes, m.clusterView.ShardNodes[i]) {
newShardNodes = append(newShardNodes, m.clusterView.ShardNodes[i])
}
}
return m.updateClusterViewWithLock(ctx, m.clusterView.State, newShardNodes)
}
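// contains reports whether originShardNode appears in shardNodes, matching on both node name and shard ID.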
func contains(shardNodes []storage.ShardNode, originShardNode storage.ShardNode) bool {
for _, dropShardNode := range shardNodes {
if originShardNode.NodeName == dropShardNode.NodeName && originShardNode.ID == dropShardNode.ID {
return true
}
}
return false
}
func (m *TopologyManagerImpl) InitClusterView(ctx context.Context) error {
clusterView := storage.NewClusterView(m.clusterID, 0, storage.ClusterStateEmpty, []storage.ShardNode{})
err := m.storage.CreateClusterView(ctx, storage.CreateClusterViewRequest{ClusterView: clusterView})
if err != nil {
return errors.WithMessage(err, "storage create cluster view")
}
return nil
}
func (m *TopologyManagerImpl) UpdateClusterView(ctx context.Context, state storage.ClusterState, shardNodes []storage.ShardNode) error {
m.lock.Lock()
defer m.lock.Unlock()
return m.updateClusterViewWithLock(ctx, state, shardNodes)
}
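// updateClusterViewWithLock persists a cluster view with the version bumped by one and then reloads it into memory. LatestVersion lets the storage layer reject a concurrent update of the same view. The caller must hold m.lock.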
func (m *TopologyManagerImpl) updateClusterViewWithLock(ctx context.Context, state storage.ClusterState, shardNodes []storage.ShardNode) error {
// Update cluster view in storage.
newClusterView := storage.NewClusterView(m.clusterID, m.clusterView.Version+1, state, shardNodes)
if err := m.storage.UpdateClusterView(ctx, storage.UpdateClusterViewRequest{
ClusterID: m.clusterID,
ClusterView: newClusterView,
LatestVersion: m.clusterView.Version,
}); err != nil {
return errors.WithMessage(err, "storage update cluster view")
}
// Load cluster view into memory.
if err := m.loadClusterView(ctx); err != nil {
return errors.WithMessage(err, "load cluster view")
}
return nil
}
func (m *TopologyManagerImpl) UpdateClusterViewByNode(ctx context.Context, shardNodes map[string][]storage.ShardNode) error {
m.lock.Lock()
defer m.lock.Unlock()
newShardNodes := make([]storage.ShardNode, 0, len(m.clusterView.ShardNodes))
for _, shardNode := range shardNodes {
newShardNodes = append(newShardNodes, shardNode...)
}
originShardNodes := m.clusterView.ShardNodes
for _, shardNode := range originShardNodes {
if _, exists := shardNodes[shardNode.NodeName]; !exists {
newShardNodes = append(newShardNodes, shardNode)
}
}
return m.updateClusterViewWithLock(ctx, m.clusterView.State, newShardNodes)
}
func (m *TopologyManagerImpl) GetClusterView() storage.ClusterView {
m.lock.RLock()
defer m.lock.RUnlock()
return m.clusterView
}
func (m *TopologyManagerImpl) CreateShardViews(ctx context.Context, createShardViews []CreateShardView) error {
m.lock.Lock()
defer m.lock.Unlock()
// Create shard view in storage.
shardViews := make([]storage.ShardView, 0, len(createShardViews))
for _, createShardView := range createShardViews {
shardViews = append(shardViews, storage.NewShardView(createShardView.ShardID, 0, createShardView.Tables))
}
if err := m.storage.CreateShardViews(ctx, storage.CreateShardViewsRequest{
ClusterID: m.clusterID,
ShardViews: shardViews,
}); err != nil {
return errors.WithMessage(err, "storage create shard view")
}
// Load shard view into memory.
if err := m.loadShardViews(ctx); err != nil {
return errors.WithMessage(err, "load shard view")
}
return nil
}
func (m *TopologyManagerImpl) GetTopology() Topology {
m.lock.RLock()
defer m.lock.RUnlock()
shardViewsMapping := make(map[storage.ShardID]storage.ShardView, len(m.shardViews))
for i := 0; i < len(m.shardViews); i++ {
shardViewsMapping[m.shardViews[i].ShardID] = m.shardViews[i]
}
return Topology{
ShardViewsMapping: shardViewsMapping,
ClusterView: m.clusterView,
}
}
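// loadClusterView fetches the cluster view from storage and rebuilds shardNodesMapping and nodeShardsMapping. The caller must hold m.lock.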
func (m *TopologyManagerImpl) loadClusterView(ctx context.Context) error {
clusterViewResult, err := m.storage.GetClusterView(ctx, storage.GetClusterViewRequest{
ClusterID: m.clusterID,
})
if err != nil {
return errors.WithMessage(err, "storage get cluster view")
}
m.logger.Debug("load cluster view", zap.String("clusterViews", fmt.Sprintf("%+v", clusterViewResult)))
m.shardNodesMapping = make(map[storage.ShardID][]storage.ShardNode, len(clusterViewResult.ClusterView.ShardNodes))
m.nodeShardsMapping = make(map[string][]storage.ShardNode, len(clusterViewResult.ClusterView.ShardNodes))
for _, shardNode := range clusterViewResult.ClusterView.ShardNodes {
m.shardNodesMapping[shardNode.ID] = append(m.shardNodesMapping[shardNode.ID], shardNode)
m.nodeShardsMapping[shardNode.NodeName] = append(m.nodeShardsMapping[shardNode.NodeName], shardNode)
}
m.clusterView = clusterViewResult.ClusterView
return nil
}
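// loadShardViews fetches all shard views from storage and rebuilds shardViews, shardTablesMapping and tableShardMapping. The caller must hold m.lock.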
func (m *TopologyManagerImpl) loadShardViews(ctx context.Context) error {
shardViewsResult, err := m.storage.ListShardViews(ctx, storage.ListShardViewsRequest{
ClusterID: m.clusterID,
})
if err != nil {
return errors.WithMessage(err, "storage list shard views")
}
m.logger.Debug("load shard views", zap.Int32("clusterID", int32(m.clusterID)), zap.String("shardViews", fmt.Sprintf("%+v", shardViewsResult)))
// Reset data in memory.
m.shardViews = shardViewsResult.ShardViews
m.shardTablesMapping = make(map[storage.ShardID]*storage.ShardView, len(shardViewsResult.ShardViews))
m.tableShardMapping = make(map[storage.TableID][]storage.ShardID)
for _, shardView := range shardViewsResult.ShardViews {
// Copy the loop variable so every map entry points at its own ShardView.
view := shardView
m.shardTablesMapping[shardView.ShardID] = &view
for _, tableID := range shardView.TableIDs {
m.tableShardMapping[tableID] = append(m.tableShardMapping[tableID], shardView.ShardID)
}
}
return nil
}
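// loadNodes fetches all registered nodes from storage into m.nodes. The caller must hold m.lock.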
func (m *TopologyManagerImpl) loadNodes(ctx context.Context) error {
nodesResult, err := m.storage.ListNodes(ctx, storage.ListNodesRequest{ClusterID: m.clusterID})
if err != nil {
return errors.WithMessage(err, "storage list nodes")
}
// Reset data in memory.
m.nodes = make(map[string]storage.Node, len(nodesResult.Nodes))
for _, node := range nodesResult.Nodes {
m.nodes[node.Name] = node
}
return nil
}