| // Copyright 2018 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package tester |
| |
| import ( |
| "context" |
| "fmt" |
| "strings" |
| "time" |
| |
| "github.com/coreos/etcd/clientv3" |
| "github.com/coreos/etcd/functional/rpcpb" |
| |
| "go.uber.org/zap" |
| ) |
| |
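| // fetchSnapshotCaseQuorum saves a snapshot on the leader, wipes a quorum |
| // of members plus the leader itself, and then recovers the cluster from |
| // scratch by restoring the snapshot and re-adding the wiped members. |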
| type fetchSnapshotCaseQuorum struct { |
| desc string |
| rpcpbCase rpcpb.Case |
| injected map[int]struct{} // indexes of members whose data is wiped |
| snapshotted int // index of the leader member that saved the snapshot |
| } |
| |
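| // Inject saves a snapshot on the current leader, records which member |
| // was snapshotted, and then SIGQUITs a quorum of non-leader members and |
| // finally the leader itself, removing their data directories. |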
| func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error { |
| // 1. Assume node C is the current leader with the most up-to-date data. |
| lead, err := clus.GetLeader() |
| if err != nil { |
| return err |
| } |
| c.snapshotted = lead |
| |
| // 2. Download a snapshot from node C before destroying nodes A and B. |
| clus.lg.Info( |
| "save snapshot on leader node START", |
| zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint), |
| ) |
| var resp *rpcpb.Response |
| resp, err = clus.sendOpWithResp(lead, rpcpb.Operation_SAVE_SNAPSHOT) |
| if err != nil || resp == nil || !resp.Success { |
| clus.lg.Info( |
| "save snapshot on leader node FAIL", |
| zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint), |
| zap.Error(err), |
| ) |
| if err != nil { |
| return err |
| } |
| // the RPC itself succeeded but the agent reported failure |
| return fmt.Errorf("save snapshot failed on %q (response: %+v)", clus.Members[lead].EtcdClientEndpoint, resp) |
| } |
| clus.lg.Info( |
| "save snapshot on leader node SUCCESS", |
| zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint), |
| zap.String("member-name", resp.SnapshotInfo.MemberName), |
| zap.Strings("member-client-urls", resp.SnapshotInfo.MemberClientURLs), |
| zap.String("snapshot-path", resp.SnapshotInfo.SnapshotPath), |
| zap.String("snapshot-file-size", resp.SnapshotInfo.SnapshotFileSize), |
| zap.String("snapshot-total-size", resp.SnapshotInfo.SnapshotTotalSize), |
| zap.Int64("snapshot-total-key", resp.SnapshotInfo.SnapshotTotalKey), |
| zap.Int64("snapshot-hash", resp.SnapshotInfo.SnapshotHash), |
| zap.Int64("snapshot-revision", resp.SnapshotInfo.SnapshotRevision), |
| zap.String("took", resp.SnapshotInfo.Took), |
| ) |
| clus.Members[lead].SnapshotInfo = resp.SnapshotInfo |
| |
| leaderc, err := clus.Members[lead].CreateEtcdClient() |
| if err != nil { |
| return err |
| } |
| defer leaderc.Close() |
| var mresp *clientv3.MemberListResponse |
| ctx, cancel := context.WithTimeout(context.Background(), time.Minute) |
| mresp, err = leaderc.MemberList(ctx) |
| cancel() |
| mss := []string{} |
| if err == nil && mresp != nil { |
| mss = describeMembers(mresp) |
| } |
| clus.lg.Info( |
| "member list before disastrous machine failure", |
| zap.String("request-to", clus.Members[lead].EtcdClientEndpoint), |
| zap.Strings("members", mss), |
| zap.Error(err), |
| ) |
| if err != nil { |
| return err |
| } |
| |
| // simulate real life; machine failures may happen |
| // some time after the last snapshot save |
| time.Sleep(time.Second) |
| |
| // 3. Destroy nodes A and B, making the whole cluster inoperable. |
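| // Keep picking a quorum of members until the chosen set excludes the |
| // snapshotted leader, so its data survives for the restore step. |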
| for { |
| c.injected = pickQuorum(len(clus.Members)) |
| if _, ok := c.injected[lead]; !ok { |
| break |
| } |
| } |
| for idx := range c.injected { |
| clus.lg.Info( |
| "disastrous machine failure to quorum START", |
| zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint), |
| ) |
| err = clus.sendOp(idx, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA) |
| clus.lg.Info( |
| "disastrous machine failure to quorum END", |
| zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint), |
| zap.Error(err), |
| ) |
| if err != nil { |
| return err |
| } |
| } |
| |
| // 4. Now node C cannot operate either. |
| // 5. SIGQUIT node C and remove its data directories. |
| clus.lg.Info( |
| "disastrous machine failure to old leader START", |
| zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint), |
| ) |
| err = clus.sendOp(lead, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA) |
| clus.lg.Info( |
| "disastrous machine failure to old leader END", |
| zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint), |
| zap.Error(err), |
| ) |
| return err |
| } |
| |
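| // Recover restores a single-node seed cluster on the old leader from its |
| // saved snapshot, then grows the cluster back by issuing MemberAdd for |
| // each wiped member and restarting it with a fresh data directory. |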
| func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error { |
| // 6. Restore a new seed member from node C's latest snapshot file. |
| oldlead := c.snapshotted |
| |
| // configuration on restart from the recovered snapshot: |
| // the seed member's configuration is the same as before, except that |
| // the initial cluster string now describes a single-node cluster |
| clus.Members[oldlead].EtcdOnSnapshotRestore = clus.Members[oldlead].Etcd |
| clus.Members[oldlead].EtcdOnSnapshotRestore.InitialClusterState = "existing" |
| name := clus.Members[oldlead].Etcd.Name |
| initClus := []string{} |
| for _, u := range clus.Members[oldlead].Etcd.AdvertisePeerURLs { |
| initClus = append(initClus, fmt.Sprintf("%s=%s", name, u)) |
| } |
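| // Each entry has the form "name=peerURL"; for the seed member the joined |
| // string contains only its own entries, e.g. "m1=https://10.0.1.1:2380" |
| // (illustrative values). |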
| clus.Members[oldlead].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",") |
| |
| clus.lg.Info( |
| "restore snapshot and restart from snapshot request START", |
| zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint), |
| zap.Strings("initial-cluster", initClus), |
| ) |
| err := clus.sendOp(oldlead, rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT) |
| clus.lg.Info( |
| "restore snapshot and restart from snapshot request END", |
| zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint), |
| zap.Strings("initial-cluster", initClus), |
| zap.Error(err), |
| ) |
| if err != nil { |
| return err |
| } |
| |
| leaderc, err := clus.Members[oldlead].CreateEtcdClient() |
| if err != nil { |
| return err |
| } |
| defer leaderc.Close() |
| |
| // 7. Add another member to establish a 2-node cluster. |
| // 8. Add another member to establish a 3-node cluster. |
| // 9. Add more members if any remain. |
| idxs := make([]int, 0, len(c.injected)) |
| for idx := range c.injected { |
| idxs = append(idxs, idx) |
| } |
| clus.lg.Info("member add START", zap.Int("members-to-add", len(idxs))) |
| for i, idx := range idxs { |
| clus.lg.Info( |
| "member add request SENT", |
| zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint), |
| zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs), |
| ) |
| ctx, cancel := context.WithTimeout(context.Background(), time.Minute) |
| _, err := leaderc.MemberAdd(ctx, clus.Members[idx].Etcd.AdvertisePeerURLs) |
| cancel() |
| clus.lg.Info( |
| "member add request DONE", |
| zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint), |
| zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs), |
| zap.Error(err), |
| ) |
| if err != nil { |
| return err |
| } |
| |
| // Start the newly added member with fresh data. Note that initClus |
| // accumulates across iterations, so each new member's initial cluster |
| // string lists the seed plus every member restored so far, including itself. |
| clus.Members[idx].EtcdOnSnapshotRestore = clus.Members[idx].Etcd |
| clus.Members[idx].EtcdOnSnapshotRestore.InitialClusterState = "existing" |
| name := clus.Members[idx].Etcd.Name |
| for _, u := range clus.Members[idx].Etcd.AdvertisePeerURLs { |
| initClus = append(initClus, fmt.Sprintf("%s=%s", name, u)) |
| } |
| clus.Members[idx].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",") |
| clus.lg.Info( |
| "restart from snapshot request SENT", |
| zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint), |
| zap.Strings("initial-cluster", initClus), |
| ) |
| err = clus.sendOp(idx, rpcpb.Operation_RESTART_FROM_SNAPSHOT) |
| clus.lg.Info( |
| "restart from snapshot request DONE", |
| zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint), |
| zap.Strings("initial-cluster", initClus), |
| zap.Error(err), |
| ) |
| if err != nil { |
| return err |
| } |
| |
| if i != len(c.injected)-1 { |
| // wait until membership reconfiguration entry gets applied |
| // TODO: test concurrent member add |
| dur := 5 * clus.Members[idx].ElectionTimeout() |
| clus.lg.Info( |
| "waiting after restart from snapshot request", |
| zap.Int("i", i), |
| zap.Int("idx", idx), |
| zap.Duration("sleep", dur), |
| ) |
| time.Sleep(dur) |
| } else { |
| clus.lg.Info( |
| "restart from snapshot request ALL END", |
| zap.Int("i", i), |
| zap.Int("idx", idx), |
| ) |
| } |
| } |
| return nil |
| } |
| |
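| // Desc returns the case description, falling back to the rpcpb.Case name |
| // when no custom description is set. |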
| func (c *fetchSnapshotCaseQuorum) Desc() string { |
| if c.desc != "" { |
| return c.desc |
| } |
| return c.rpcpbCase.String() |
| } |
| |
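| // TestCase returns the rpcpb.Case identifier for this test case. |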
| func (c *fetchSnapshotCaseQuorum) TestCase() rpcpb.Case { |
| return c.rpcpbCase |
| } |
| |
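| // new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH |
| // wraps the case in a caseDelay so that recovery starts only after the |
| // cluster's configured case delay has elapsed. |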
| func new_Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH(clus *Cluster) Case { |
| c := &fetchSnapshotCaseQuorum{ |
| rpcpbCase: rpcpb.Case_SIGQUIT_AND_REMOVE_QUORUM_AND_RESTORE_LEADER_SNAPSHOT_FROM_SCRATCH, |
| injected: make(map[int]struct{}), |
| snapshotted: -1, |
| } |
| // simulate real life; machine replacements may happen |
| // some time after the disaster |
| return &caseDelay{ |
| Case: c, |
| delayDuration: clus.GetCaseDelayDuration(), |
| } |
| } |