| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package serf |
| |
| import ( |
| "context" |
| "errors" |
| "time" |
| |
| "github.com/apache/servicecomb-service-center/pkg/log" |
| "github.com/hashicorp/serf/cmd/serf/command/agent" |
| "github.com/hashicorp/serf/serf" |
| ) |
| |
| // Agent warps the serf agent |
| type Agent struct { |
| *agent.Agent |
| conf *Config |
| readyCh chan struct{} |
| errorCh chan error |
| } |
| |
| // Create create serf agent with config |
| func Create(conf *Config) (*Agent, error) { |
| // config cover to serf config |
| serfConf, err := conf.convertToSerf() |
| if err != nil { |
| return nil, err |
| } |
| |
| // create serf agent with serf config |
| serfAgent, err := agent.Create(conf.Config, serfConf, nil) |
| if err != nil { |
| return nil, err |
| } |
| return &Agent{ |
| Agent: serfAgent, |
| conf: conf, |
| readyCh: make(chan struct{}), |
| errorCh: make(chan error), |
| }, nil |
| } |
| |
| // Start agent |
| func (a *Agent) Start(ctx context.Context) { |
| err := a.Agent.Start() |
| if err != nil { |
| log.Errorf(err, "start serf agent failed") |
| a.errorCh <- err |
| return |
| } |
| a.RegisterEventHandler(a) |
| |
| err = a.retryJoin(ctx) |
| if err != nil { |
| log.Errorf(err, "start serf agent failed") |
| if err != ctx.Err() && a.errorCh != nil { |
| a.errorCh <- err |
| } |
| } |
| } |
| |
| // HandleEvent Handles serf.EventMemberJoin events, |
| // which will wait for members to join until the number of group members is equal to "groupExpect" |
| // when the startup mode is "ModeCluster", |
| // used for logical grouping of serf nodes |
| func (a *Agent) HandleEvent(event serf.Event) { |
| if event.EventType() != serf.EventMemberJoin { |
| return |
| } |
| |
| if a.conf.Mode == ModeCluster { |
| if len(a.GroupMembers(a.conf.ClusterName)) < groupExpect { |
| return |
| } |
| } |
| a.DeregisterEventHandler(a) |
| close(a.readyCh) |
| } |
| |
| // Ready Returns a channel that will be closed when serf is ready |
| func (a *Agent) Ready() <-chan struct{} { |
| return a.readyCh |
| } |
| |
| // Error Returns a channel that will be transmit a serf error |
| func (a *Agent) Error() <-chan error { |
| return a.errorCh |
| } |
| |
| // Stop serf agent |
| func (a *Agent) Stop() { |
| if a.errorCh != nil { |
| a.Leave() |
| a.Shutdown() |
| close(a.errorCh) |
| a.errorCh = nil |
| } |
| } |
| |
| // LocalMember returns the Member information for the local node |
| func (a *Agent) LocalMember() *serf.Member { |
| serfAgent := a.Agent.Serf() |
| if serfAgent != nil { |
| member := serfAgent.LocalMember() |
| return &member |
| } |
| return nil |
| } |
| |
| // GroupMembers returns a point-in-time snapshot of the members of by groupName |
| func (a *Agent) GroupMembers(groupName string) (members []serf.Member) { |
| serfAgent := a.Agent.Serf() |
| if serfAgent != nil { |
| for _, member := range serfAgent.Members() { |
| log.Debugf("member = %s, groupName = %s", member.Name, member.Tags[tagKeyClusterName]) |
| if member.Tags[tagKeyClusterName] == groupName { |
| members = append(members, member) |
| } |
| } |
| } |
| return |
| } |
| |
| // Member get member information with node |
| func (a *Agent) Member(node string) *serf.Member { |
| serfAgent := a.Agent.Serf() |
| if serfAgent != nil { |
| ms := serfAgent.Members() |
| for _, m := range ms { |
| if m.Name == node { |
| return &m |
| } |
| } |
| } |
| return nil |
| } |
| |
| // SerfConfig get serf config |
| func (a *Agent) SerfConfig() *serf.Config { |
| return a.Agent.SerfConfig() |
| } |
| |
| // Join serf clusters through one or more members |
| func (a *Agent) Join(addrs []string, replay bool) (n int, err error) { |
| return a.Agent.Join(addrs, replay) |
| } |
| |
| // UserEvent sends a UserEvent on Serf |
| func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error { |
| return a.Agent.UserEvent(name, payload, coalesce) |
| } |
| |
| // Query sends a Query on Serf |
| func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { |
| return a.Agent.Query(name, payload, params) |
| } |
| |
| func (a *Agent) retryJoin(ctx context.Context) (err error) { |
| if len(a.conf.RetryJoin) == 0 { |
| log.Infof("retry join mumber %d", len(a.conf.RetryJoin)) |
| return nil |
| } |
| |
| // Count of attempts |
| attempt := 0 |
| ticker := time.NewTicker(a.conf.RetryInterval) |
| for { |
| log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin) |
| var n int |
| |
| // Try to join the specified serf nodes |
| n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin) |
| if err == nil { |
| log.Infof("serf: Join completed. Synced with %d initial agents", n) |
| break |
| } |
| attempt++ |
| |
| // If RetryMaxAttempts is greater than 0, agent will exit |
| // and throw an error when the number of attempts exceeds RetryMaxAttempts, |
| // else agent will try to join other nodes until successful always |
| if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts { |
| err = errors.New("serf: maximum retry join attempts made, exiting") |
| log.Errorf(err, err.Error()) |
| break |
| } |
| select { |
| case <-ctx.Done(): |
| err = ctx.Err() |
| goto done |
| // Waiting for ticker to trigger |
| case <-ticker.C: |
| } |
| } |
| done: |
| ticker.Stop() |
| return |
| } |