| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package rafthttp |
| |
| import ( |
| "errors" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "net/http" |
| "sync" |
| "testing" |
| "time" |
| |
| "github.com/coreos/etcd/etcdserver/stats" |
| "github.com/coreos/etcd/pkg/testutil" |
| "github.com/coreos/etcd/pkg/types" |
| "github.com/coreos/etcd/raft/raftpb" |
| "github.com/coreos/etcd/version" |
| ) |
| |
| // TestPipelineSend tests that pipeline could send data using roundtripper |
| // and increase success count in stats. |
| func TestPipelineSend(t *testing.T) { |
| tr := &roundTripperRecorder{rec: testutil.NewRecorderStream()} |
| picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) |
| tp := &Transport{pipelineRt: tr} |
| p := startTestPipeline(tp, picker) |
| |
| p.msgc <- raftpb.Message{Type: raftpb.MsgApp} |
| tr.rec.Wait(1) |
| p.stop() |
| if p.followerStats.Counts.Success != 1 { |
| t.Errorf("success = %d, want 1", p.followerStats.Counts.Success) |
| } |
| } |
| |
| // TestPipelineKeepSendingWhenPostError tests that pipeline can keep |
| // sending messages if previous messages meet post error. |
| func TestPipelineKeepSendingWhenPostError(t *testing.T) { |
| tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")} |
| picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) |
| tp := &Transport{pipelineRt: tr} |
| p := startTestPipeline(tp, picker) |
| defer p.stop() |
| |
| for i := 0; i < 50; i++ { |
| p.msgc <- raftpb.Message{Type: raftpb.MsgApp} |
| } |
| |
| _, err := tr.rec.Wait(50) |
| if err != nil { |
| t.Errorf("unexpected wait error %v", err) |
| } |
| } |
| |
| func TestPipelineExceedMaximumServing(t *testing.T) { |
| rt := newRoundTripperBlocker() |
| picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) |
| tp := &Transport{pipelineRt: rt} |
| p := startTestPipeline(tp, picker) |
| defer p.stop() |
| |
| // keep the sender busy and make the buffer full |
| // nothing can go out as we block the sender |
| for i := 0; i < connPerPipeline+pipelineBufSize; i++ { |
| select { |
| case p.msgc <- raftpb.Message{}: |
| case <-time.After(time.Second): |
| t.Errorf("failed to send out message") |
| } |
| } |
| |
| // try to send a data when we are sure the buffer is full |
| select { |
| case p.msgc <- raftpb.Message{}: |
| t.Errorf("unexpected message sendout") |
| default: |
| } |
| |
| // unblock the senders and force them to send out the data |
| rt.unblock() |
| |
| // It could send new data after previous ones succeed |
| select { |
| case p.msgc <- raftpb.Message{}: |
| case <-time.After(time.Second): |
| t.Errorf("failed to send out message") |
| } |
| } |
| |
| // TestPipelineSendFailed tests that when send func meets the post error, |
| // it increases fail count in stats. |
| func TestPipelineSendFailed(t *testing.T) { |
| picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) |
| rt := newRespRoundTripper(0, errors.New("blah")) |
| rt.rec = testutil.NewRecorderStream() |
| tp := &Transport{pipelineRt: rt} |
| p := startTestPipeline(tp, picker) |
| |
| p.msgc <- raftpb.Message{Type: raftpb.MsgApp} |
| if _, err := rt.rec.Wait(1); err != nil { |
| t.Fatal(err) |
| } |
| |
| p.stop() |
| |
| if p.followerStats.Counts.Fail != 1 { |
| t.Errorf("fail = %d, want 1", p.followerStats.Counts.Fail) |
| } |
| } |
| |
| func TestPipelinePost(t *testing.T) { |
| tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}} |
| picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) |
| tp := &Transport{ClusterID: types.ID(1), pipelineRt: tr} |
| p := startTestPipeline(tp, picker) |
| if err := p.post([]byte("some data")); err != nil { |
| t.Fatalf("unexpected post error: %v", err) |
| } |
| act, err := tr.rec.Wait(1) |
| if err != nil { |
| t.Fatal(err) |
| } |
| p.stop() |
| |
| req := act[0].Params[0].(*http.Request) |
| |
| if g := req.Method; g != "POST" { |
| t.Errorf("method = %s, want %s", g, "POST") |
| } |
| if g := req.URL.String(); g != "http://localhost:2380/raft" { |
| t.Errorf("url = %s, want %s", g, "http://localhost:2380/raft") |
| } |
| if g := req.Header.Get("Content-Type"); g != "application/protobuf" { |
| t.Errorf("content type = %s, want %s", g, "application/protobuf") |
| } |
| if g := req.Header.Get("X-Server-Version"); g != version.Version { |
| t.Errorf("version = %s, want %s", g, version.Version) |
| } |
| if g := req.Header.Get("X-Min-Cluster-Version"); g != version.MinClusterVersion { |
| t.Errorf("min version = %s, want %s", g, version.MinClusterVersion) |
| } |
| if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" { |
| t.Errorf("cluster id = %s, want %s", g, "1") |
| } |
| b, err := ioutil.ReadAll(req.Body) |
| if err != nil { |
| t.Fatalf("unexpected ReadAll error: %v", err) |
| } |
| if string(b) != "some data" { |
| t.Errorf("body = %s, want %s", b, "some data") |
| } |
| } |
| |
| func TestPipelinePostBad(t *testing.T) { |
| tests := []struct { |
| u string |
| code int |
| err error |
| }{ |
| // RoundTrip returns error |
| {"http://localhost:2380", 0, errors.New("blah")}, |
| // unexpected response status code |
| {"http://localhost:2380", http.StatusOK, nil}, |
| {"http://localhost:2380", http.StatusCreated, nil}, |
| } |
| for i, tt := range tests { |
| picker := mustNewURLPicker(t, []string{tt.u}) |
| tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)} |
| p := startTestPipeline(tp, picker) |
| err := p.post([]byte("some data")) |
| p.stop() |
| |
| if err == nil { |
| t.Errorf("#%d: err = nil, want not nil", i) |
| } |
| } |
| } |
| |
| func TestPipelinePostErrorc(t *testing.T) { |
| tests := []struct { |
| u string |
| code int |
| err error |
| }{ |
| {"http://localhost:2380", http.StatusForbidden, nil}, |
| } |
| for i, tt := range tests { |
| picker := mustNewURLPicker(t, []string{tt.u}) |
| tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)} |
| p := startTestPipeline(tp, picker) |
| p.post([]byte("some data")) |
| p.stop() |
| select { |
| case <-p.errorc: |
| default: |
| t.Fatalf("#%d: cannot receive from errorc", i) |
| } |
| } |
| } |
| |
| func TestStopBlockedPipeline(t *testing.T) { |
| picker := mustNewURLPicker(t, []string{"http://localhost:2380"}) |
| tp := &Transport{pipelineRt: newRoundTripperBlocker()} |
| p := startTestPipeline(tp, picker) |
| // send many messages that most of them will be blocked in buffer |
| for i := 0; i < connPerPipeline*10; i++ { |
| p.msgc <- raftpb.Message{} |
| } |
| |
| done := make(chan struct{}) |
| go func() { |
| p.stop() |
| done <- struct{}{} |
| }() |
| select { |
| case <-done: |
| case <-time.After(time.Second): |
| t.Fatalf("failed to stop pipeline in 1s") |
| } |
| } |
| |
// roundTripperBlocker holds the state of a test round tripper whose requests
// can be held back and cancelled.
// NOTE(review): nothing visible here waits on unblockc or adds entries to
// cancel; presumably a RoundTrip method does — confirm where it is defined.
type roundTripperBlocker struct {
	// unblockc is closed by unblock.
	unblockc chan struct{}

	// mu guards cancel.
	mu sync.Mutex
	// cancel maps a request to the channel signalled by CancelRequest.
	cancel map[*http.Request]chan struct{}
}

// newRoundTripperBlocker returns a blocker with an open unblockc channel and
// an empty cancel map.
func newRoundTripperBlocker() *roundTripperBlocker {
	rt := new(roundTripperBlocker)
	rt.unblockc = make(chan struct{})
	rt.cancel = make(map[*http.Request]chan struct{})
	return rt
}

// unblock closes unblockc. It must be called at most once, since closing an
// already closed channel panics.
func (t *roundTripperBlocker) unblock() {
	close(t.unblockc)
}

// CancelRequest sends a signal on the channel registered for req and removes
// the registration. It does nothing if req has no registered channel.
func (t *roundTripperBlocker) CancelRequest(req *http.Request) {
	t.mu.Lock()
	defer t.mu.Unlock()

	c, registered := t.cancel[req]
	if !registered {
		return
	}
	// The send happens while mu is held, exactly as the lookup and delete.
	c <- struct{}{}
	delete(t.cancel, req)
}
| |
| type respRoundTripper struct { |
| mu sync.Mutex |
| rec testutil.Recorder |
| |
| code int |
| header http.Header |
| err error |
| } |
| |
| func newRespRoundTripper(code int, err error) *respRoundTripper { |
| return &respRoundTripper{code: code, err: err} |
| } |
| func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| if t.rec != nil { |
| t.rec.Record(testutil.Action{Name: "req", Params: []interface{}{req}}) |
| } |
| return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err |
| } |
| |
| type roundTripperRecorder struct { |
| rec testutil.Recorder |
| } |
| |
| func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) { |
| if t.rec != nil { |
| t.rec.Record(testutil.Action{Name: "req", Params: []interface{}{req}}) |
| } |
| return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil |
| } |
| |
// nopReadCloser is an always-empty body: reads hit end of file immediately
// and closing does nothing.
type nopReadCloser struct{}

// Compile-time check that *nopReadCloser can be used as an io.ReadCloser.
var _ io.ReadCloser = (*nopReadCloser)(nil)

// Read leaves p untouched and reports zero bytes read together with io.EOF.
func (n *nopReadCloser) Read(p []byte) (int, error) {
	return 0, io.EOF
}

// Close always returns nil.
func (n *nopReadCloser) Close() error {
	return nil
}
| |
| func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline { |
| p := &pipeline{ |
| peerID: types.ID(1), |
| tr: tr, |
| picker: picker, |
| status: newPeerStatus(types.ID(1)), |
| raft: &fakeRaft{}, |
| followerStats: &stats.FollowerStats{}, |
| errorc: make(chan error, 1), |
| } |
| p.start() |
| return p |
| } |