| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package rafthttp |
| |
| import ( |
| "context" |
| "errors" |
| "fmt" |
| "io" |
| "net/http" |
| "net/http/httptest" |
| "reflect" |
| "sync" |
| "testing" |
| "time" |
| |
| "golang.org/x/time/rate" |
| |
| "github.com/coreos/etcd/etcdserver/stats" |
| "github.com/coreos/etcd/pkg/testutil" |
| "github.com/coreos/etcd/pkg/types" |
| "github.com/coreos/etcd/raft/raftpb" |
| "github.com/coreos/etcd/version" |
| "github.com/coreos/go-semver/semver" |
| ) |
| |
| // TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached |
| // to streamWriter. After that, streamWriter can use it to send messages |
| // continuously, and closes it when stopped. |
| func TestStreamWriterAttachOutgoingConn(t *testing.T) { |
| sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) |
| // the expected initial state of streamWriter is not working |
| if _, ok := sw.writec(); ok { |
| t.Errorf("initial working status = %v, want false", ok) |
| } |
| |
| // repeat tests to ensure streamWriter can use last attached connection |
| var wfc *fakeWriteFlushCloser |
| for i := 0; i < 3; i++ { |
| prevwfc := wfc |
| wfc = newFakeWriteFlushCloser(nil) |
| sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) |
| |
| // previous attached connection should be closed |
| if prevwfc != nil { |
| select { |
| case <-prevwfc.closed: |
| case <-time.After(time.Second): |
| t.Errorf("#%d: close of previous connection timed out", i) |
| } |
| } |
| |
| // if prevwfc != nil, the new msgc is ready since prevwfc has closed |
| // if prevwfc == nil, the first connection may be pending, but the first |
| // msgc is already available since it's set on calling startStreamwriter |
| msgc, _ := sw.writec() |
| msgc <- raftpb.Message{} |
| |
| select { |
| case <-wfc.writec: |
| case <-time.After(time.Second): |
| t.Errorf("#%d: failed to write to the underlying connection", i) |
| } |
| // write chan is still available |
| if _, ok := sw.writec(); !ok { |
| t.Errorf("#%d: working status = %v, want true", i, ok) |
| } |
| } |
| |
| sw.stop() |
| // write chan is unavailable since the writer is stopped. |
| if _, ok := sw.writec(); ok { |
| t.Errorf("working status after stop = %v, want false", ok) |
| } |
| if !wfc.Closed() { |
| t.Errorf("failed to close the underlying connection") |
| } |
| } |
| |
| // TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad |
| // outgoingConn will close the outgoingConn and fall back to non-working status. |
| func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { |
| sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) |
| defer sw.stop() |
| wfc := newFakeWriteFlushCloser(errors.New("blah")) |
| sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc}) |
| |
| sw.msgc <- raftpb.Message{} |
| select { |
| case <-wfc.closed: |
| case <-time.After(time.Second): |
| t.Errorf("failed to close the underlying connection in time") |
| } |
| // no longer working |
| if _, ok := sw.writec(); ok { |
| t.Errorf("working = %v, want false", ok) |
| } |
| } |
| |
| func TestStreamReaderDialRequest(t *testing.T) { |
| for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} { |
| tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}} |
| sr := &streamReader{ |
| peerID: types.ID(2), |
| tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)}, |
| picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), |
| ctx: context.Background(), |
| } |
| sr.dial(tt) |
| |
| act, err := tr.rec.Wait(1) |
| if err != nil { |
| t.Fatal(err) |
| } |
| req := act[0].Params[0].(*http.Request) |
| |
| wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint() + "/1") |
| if req.URL.String() != wurl { |
| t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl) |
| } |
| if w := "GET"; req.Method != w { |
| t.Errorf("#%d: method = %s, want %s", i, req.Method, w) |
| } |
| if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" { |
| t.Errorf("#%d: header X-Etcd-Cluster-ID = %s, want 1", i, g) |
| } |
| if g := req.Header.Get("X-Raft-To"); g != "2" { |
| t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g) |
| } |
| } |
| } |
| |
| // TestStreamReaderDialResult tests the result of the dial func call meets the |
| // HTTP response received. |
| func TestStreamReaderDialResult(t *testing.T) { |
| tests := []struct { |
| code int |
| err error |
| wok bool |
| whalt bool |
| }{ |
| {0, errors.New("blah"), false, false}, |
| {http.StatusOK, nil, true, false}, |
| {http.StatusMethodNotAllowed, nil, false, false}, |
| {http.StatusNotFound, nil, false, false}, |
| {http.StatusPreconditionFailed, nil, false, false}, |
| {http.StatusGone, nil, false, true}, |
| } |
| for i, tt := range tests { |
| h := http.Header{} |
| h.Add("X-Server-Version", version.Version) |
| tr := &respRoundTripper{ |
| code: tt.code, |
| header: h, |
| err: tt.err, |
| } |
| sr := &streamReader{ |
| peerID: types.ID(2), |
| tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, |
| picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), |
| errorc: make(chan error, 1), |
| ctx: context.Background(), |
| } |
| |
| _, err := sr.dial(streamTypeMessage) |
| if ok := err == nil; ok != tt.wok { |
| t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok) |
| } |
| if halt := len(sr.errorc) > 0; halt != tt.whalt { |
| t.Errorf("#%d: halt = %v, want %v", i, halt, tt.whalt) |
| } |
| } |
| } |
| |
| // TestStreamReaderStopOnDial tests a stream reader closes the connection on stop. |
| func TestStreamReaderStopOnDial(t *testing.T) { |
| defer testutil.AfterTest(t) |
| h := http.Header{} |
| h.Add("X-Server-Version", version.Version) |
| tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}} |
| sr := &streamReader{ |
| peerID: types.ID(2), |
| tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, |
| picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), |
| errorc: make(chan error, 1), |
| typ: streamTypeMessage, |
| status: newPeerStatus(types.ID(2)), |
| rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), |
| } |
| tr.onResp = func() { |
| // stop() waits for the run() goroutine to exit, but that exit |
| // needs a response from RoundTrip() first; use goroutine |
| go sr.stop() |
| // wait so that stop() is blocked on run() exiting |
| time.Sleep(10 * time.Millisecond) |
| // sr.run() completes dialing then begins decoding while stopped |
| } |
| sr.start() |
| select { |
| case <-sr.done: |
| case <-time.After(time.Second): |
| t.Fatal("streamReader did not stop in time") |
| } |
| } |
| |
| type respWaitRoundTripper struct { |
| rrt *respRoundTripper |
| onResp func() |
| } |
| |
| func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { |
| resp, err := t.rrt.RoundTrip(req) |
| resp.Body = newWaitReadCloser() |
| t.onResp() |
| return resp, err |
| } |
| |
| type waitReadCloser struct{ closec chan struct{} } |
| |
| func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} } |
| func (wrc *waitReadCloser) Read(p []byte) (int, error) { |
| <-wrc.closec |
| return 0, io.EOF |
| } |
| func (wrc *waitReadCloser) Close() error { |
| close(wrc.closec) |
| return nil |
| } |
| |
| // TestStreamReaderDialDetectUnsupport tests that dial func could find |
| // out that the stream type is not supported by the remote. |
| func TestStreamReaderDialDetectUnsupport(t *testing.T) { |
| for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} { |
| // the response from etcd 2.0 |
| tr := &respRoundTripper{ |
| code: http.StatusNotFound, |
| header: http.Header{}, |
| } |
| sr := &streamReader{ |
| peerID: types.ID(2), |
| tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, |
| picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), |
| ctx: context.Background(), |
| } |
| |
| _, err := sr.dial(typ) |
| if err != errUnsupportedStreamType { |
| t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType) |
| } |
| } |
| } |
| |
| // TestStream tests that streamReader and streamWriter can build stream to |
| // send messages between each other. |
| func TestStream(t *testing.T) { |
| recvc := make(chan raftpb.Message, streamBufSize) |
| propc := make(chan raftpb.Message, streamBufSize) |
| msgapp := raftpb.Message{ |
| Type: raftpb.MsgApp, |
| From: 2, |
| To: 1, |
| Term: 1, |
| LogTerm: 1, |
| Index: 3, |
| Entries: []raftpb.Entry{{Term: 1, Index: 4}}, |
| } |
| |
| tests := []struct { |
| t streamType |
| m raftpb.Message |
| wc chan raftpb.Message |
| }{ |
| { |
| streamTypeMessage, |
| raftpb.Message{Type: raftpb.MsgProp, To: 2}, |
| propc, |
| }, |
| { |
| streamTypeMessage, |
| msgapp, |
| recvc, |
| }, |
| { |
| streamTypeMsgAppV2, |
| msgapp, |
| recvc, |
| }, |
| } |
| for i, tt := range tests { |
| h := &fakeStreamHandler{t: tt.t} |
| srv := httptest.NewServer(h) |
| defer srv.Close() |
| |
| sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{}) |
| defer sw.stop() |
| h.sw = sw |
| |
| picker := mustNewURLPicker(t, []string{srv.URL}) |
| tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)} |
| |
| sr := &streamReader{ |
| peerID: types.ID(2), |
| typ: tt.t, |
| tr: tr, |
| picker: picker, |
| status: newPeerStatus(types.ID(2)), |
| recvc: recvc, |
| propc: propc, |
| rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), |
| } |
| sr.start() |
| |
| // wait for stream to work |
| var writec chan<- raftpb.Message |
| for { |
| var ok bool |
| if writec, ok = sw.writec(); ok { |
| break |
| } |
| time.Sleep(time.Millisecond) |
| } |
| |
| writec <- tt.m |
| var m raftpb.Message |
| select { |
| case m = <-tt.wc: |
| case <-time.After(time.Second): |
| t.Fatalf("#%d: failed to receive message from the channel", i) |
| } |
| if !reflect.DeepEqual(m, tt.m) { |
| t.Fatalf("#%d: message = %+v, want %+v", i, m, tt.m) |
| } |
| |
| sr.stop() |
| } |
| } |
| |
| func TestCheckStreamSupport(t *testing.T) { |
| tests := []struct { |
| v *semver.Version |
| t streamType |
| w bool |
| }{ |
| // support |
| { |
| semver.Must(semver.NewVersion("2.1.0")), |
| streamTypeMsgAppV2, |
| true, |
| }, |
| // ignore patch |
| { |
| semver.Must(semver.NewVersion("2.1.9")), |
| streamTypeMsgAppV2, |
| true, |
| }, |
| // ignore prerelease |
| { |
| semver.Must(semver.NewVersion("2.1.0-alpha")), |
| streamTypeMsgAppV2, |
| true, |
| }, |
| } |
| for i, tt := range tests { |
| if g := checkStreamSupport(tt.v, tt.t); g != tt.w { |
| t.Errorf("#%d: check = %v, want %v", i, g, tt.w) |
| } |
| } |
| } |
| |
| type fakeWriteFlushCloser struct { |
| mu sync.Mutex |
| err error |
| written int |
| closed chan struct{} |
| writec chan struct{} |
| } |
| |
| func newFakeWriteFlushCloser(err error) *fakeWriteFlushCloser { |
| return &fakeWriteFlushCloser{ |
| err: err, |
| closed: make(chan struct{}), |
| writec: make(chan struct{}, 1), |
| } |
| } |
| |
| func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) { |
| wfc.mu.Lock() |
| defer wfc.mu.Unlock() |
| select { |
| case wfc.writec <- struct{}{}: |
| default: |
| } |
| wfc.written += len(p) |
| return len(p), wfc.err |
| } |
| |
| func (wfc *fakeWriteFlushCloser) Flush() {} |
| |
| func (wfc *fakeWriteFlushCloser) Close() error { |
| close(wfc.closed) |
| return wfc.err |
| } |
| |
| func (wfc *fakeWriteFlushCloser) Written() int { |
| wfc.mu.Lock() |
| defer wfc.mu.Unlock() |
| return wfc.written |
| } |
| |
| func (wfc *fakeWriteFlushCloser) Closed() bool { |
| select { |
| case <-wfc.closed: |
| return true |
| default: |
| return false |
| } |
| } |
| |
| type fakeStreamHandler struct { |
| t streamType |
| sw *streamWriter |
| } |
| |
| func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
| w.Header().Add("X-Server-Version", version.Version) |
| w.(http.Flusher).Flush() |
| c := newCloseNotifier() |
| h.sw.attach(&outgoingConn{ |
| t: h.t, |
| Writer: w, |
| Flusher: w.(http.Flusher), |
| Closer: c, |
| }) |
| <-c.closeNotify() |
| } |