blob: f1b2e28e2ac138e2be7c4ee91a0dcd630fa0bf66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package serf
import (
"context"
"errors"
"time"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/hashicorp/serf/cmd/serf/command/agent"
"github.com/hashicorp/serf/serf"
)
// Agent wraps the serf agent and adds ready/error signalling channels to it.
type Agent struct {
// Embedded serf agent that the methods of this type delegate to.
*agent.Agent
// conf is the configuration that was passed to Create.
conf *Config
// readyCh is closed by HandleEvent once the agent is considered ready.
readyCh chan struct{}
// errorCh carries errors reported by Start; Stop closes it and sets it to nil.
errorCh chan error
}
// Create builds an Agent from conf. The configuration is first converted into
// a serf configuration, which is then used together with conf.Config to
// create the underlying serf agent. The returned Agent has fresh, unbuffered
// ready and error channels; Create itself does not start the agent.
func Create(conf *Config) (*Agent, error) {
	// Convert our configuration into a serf configuration.
	serfConf, err := conf.convertToSerf()
	if err != nil {
		return nil, err
	}

	// Create the underlying serf agent from both configurations.
	inner, err := agent.Create(conf.Config, serfConf, nil)
	if err != nil {
		return nil, err
	}

	wrapped := &Agent{Agent: inner, conf: conf}
	wrapped.readyCh = make(chan struct{})
	wrapped.errorCh = make(chan error)
	return wrapped, nil
}
// Start starts the underlying serf agent, registers this Agent as an event
// handler and then tries to join the configured nodes via retryJoin.
//
// Failures are logged and sent on the error channel. That channel is
// unbuffered, so the send blocks until a receiver on Error() takes the value.
// An error equal to ctx.Err() (the context ended while joining) is logged but
// not sent.
func (a *Agent) Start(ctx context.Context) {
	if err := a.Agent.Start(); err != nil {
		log.Errorf(err, "start serf agent failed")
		// Stop sets errorCh to nil and a send on a nil channel blocks
		// forever, so apply the same guard as the join failure path below.
		if a.errorCh != nil {
			a.errorCh <- err
		}
		return
	}

	a.RegisterEventHandler(a)

	err := a.retryJoin(ctx)
	if err == nil {
		return
	}
	log.Errorf(err, "start serf agent failed")
	if err != ctx.Err() && a.errorCh != nil {
		a.errorCh <- err
	}
}
// HandleEvent reacts to serf.EventMemberJoin events and ignores every other
// event type. When the startup mode is ModeCluster it keeps waiting until the
// group identified by the configured cluster name has at least groupExpect
// members. Once that condition holds (or immediately in any other mode) the
// agent deregisters itself as an event handler and closes the ready channel.
func (a *Agent) HandleEvent(event serf.Event) {
	if event.EventType() != serf.EventMemberJoin {
		return
	}

	// Group size only matters in cluster mode; the member lookup is skipped otherwise.
	clusterMode := a.conf.Mode == ModeCluster
	if clusterMode && len(a.GroupMembers(a.conf.ClusterName)) < groupExpect {
		return
	}

	a.DeregisterEventHandler(a)
	close(a.readyCh)
}
// Ready returns a receive-only channel that HandleEvent closes once the
// agent is considered ready.
func (a *Agent) Ready() <-chan struct{} {
return a.readyCh
}
// Error returns a receive-only view of the agent's error channel, on which
// Start sends start-up and join errors. Stop closes that channel and then
// sets the field to nil, so a call made after Stop returns a nil channel.
func (a *Agent) Error() <-chan error {
return a.errorCh
}
// Stop makes the agent leave the cluster, shuts it down and closes the error
// channel. A nil error channel marks the agent as already stopped, so
// repeated calls do nothing.
//
// Errors returned by Leave and Shutdown are logged instead of being dropped;
// the remaining steps still run so that the error channel is always closed.
func (a *Agent) Stop() {
	if a.errorCh == nil {
		return
	}

	if err := a.Leave(); err != nil {
		log.Errorf(err, "serf agent leave failed")
	}
	if err := a.Shutdown(); err != nil {
		log.Errorf(err, "serf agent shutdown failed")
	}

	close(a.errorCh)
	a.errorCh = nil
}
// LocalMember returns a pointer to a copy of the local node's member
// information, or nil when the underlying serf instance is not available.
func (a *Agent) LocalMember() *serf.Member {
	s := a.Agent.Serf()
	if s == nil {
		return nil
	}
	local := s.LocalMember()
	return &local
}
// GroupMembers returns the members currently reported by serf whose
// cluster-name tag equals groupName. The result is nil when the underlying
// serf instance is not available or when no member matches.
func (a *Agent) GroupMembers(groupName string) (members []serf.Member) {
	s := a.Agent.Serf()
	if s == nil {
		return members
	}

	for _, candidate := range s.Members() {
		group := candidate.Tags[tagKeyClusterName]
		log.Debugf("member = %s, groupName = %s", candidate.Name, group)
		if group != groupName {
			continue
		}
		members = append(members, candidate)
	}
	return members
}
// Member looks up the member whose name equals node and returns a pointer to
// a copy of its information. It returns nil when the underlying serf instance
// is not available or when no member carries that name.
func (a *Agent) Member(node string) *serf.Member {
	s := a.Agent.Serf()
	if s == nil {
		return nil
	}

	all := s.Members()
	for i := range all {
		if all[i].Name != node {
			continue
		}
		// Hand out a copy rather than a pointer into the snapshot slice.
		found := all[i]
		return &found
	}
	return nil
}
// SerfConfig returns the serf configuration held by the embedded serf agent.
func (a *Agent) SerfConfig() *serf.Config {
return a.Agent.SerfConfig()
}
// Join asks the embedded serf agent to join a cluster through one or more
// member addresses. addrs and replay are passed through unchanged, and the
// results are returned exactly as the embedded agent reports them.
// NOTE(review): retryJoin logs n as the number of agents synced with —
// confirm against the serf agent documentation.
func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
return a.Agent.Join(addrs, replay)
}
// UserEvent sends a user event through the embedded serf agent. All
// arguments are passed through unchanged and the agent's error is returned.
func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error {
return a.Agent.UserEvent(name, payload, coalesce)
}
// Query sends a query through the embedded serf agent. All arguments are
// passed through unchanged, and the agent's response and error are returned
// as-is.
func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) {
return a.Agent.Query(name, payload, params)
}
// retryJoin repeatedly tries to join the serf nodes listed in
// a.conf.RetryJoin, waiting one a.conf.RetryInterval tick between attempts.
//
// It returns nil when the list is empty or as soon as a join succeeds. It
// returns ctx.Err() when ctx is done while waiting for the next attempt. When
// a.conf.RetryMaxAttempts is greater than zero and the number of failed
// attempts exceeds it, a "maximum retry join attempts" error is returned;
// otherwise the agent keeps retrying until it succeeds.
func (a *Agent) retryJoin(ctx context.Context) (err error) {
	if len(a.conf.RetryJoin) == 0 {
		log.Infof("retry join mumber %d", len(a.conf.RetryJoin))
		return nil
	}

	// The ticker paces the attempts and is released on every exit path.
	ticker := time.NewTicker(a.conf.RetryInterval)
	defer ticker.Stop()

	// try is the 1-based number of the current attempt, which equals the
	// number of failures once the attempt has failed.
	for try := 1; ; try++ {
		log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)

		// Try to join the specified serf nodes.
		joined, joinErr := a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
		if joinErr == nil {
			log.Infof("serf: Join completed. Synced with %d initial agents", joined)
			return nil
		}

		// Give up once the configured retry budget is exceeded.
		if a.conf.RetryMaxAttempts > 0 && try > a.conf.RetryMaxAttempts {
			err = errors.New("serf: maximum retry join attempts made, exiting")
			log.Errorf(err, err.Error())
			return err
		}

		// Wait for the next tick unless the context ends first.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}