blob: c9cd2b3d69c2c2cc745735b2fc4797f74ec271d5 [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"reflect"
"sync"
"testing"
"time"
"golang.org/x/time/rate"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/version"
"github.com/coreos/go-semver/semver"
)
// TestStreamWriterAttachOutgoingConn tests that outgoingConn can be attached
// to streamWriter. After that, streamWriter can use it to send messages
// continuously, and closes it when stopped.
func TestStreamWriterAttachOutgoingConn(t *testing.T) {
sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
// the expected initial state of streamWriter is not working
if _, ok := sw.writec(); ok {
t.Errorf("initial working status = %v, want false", ok)
}
// repeat tests to ensure streamWriter can use last attached connection
var wfc *fakeWriteFlushCloser
for i := 0; i < 3; i++ {
prevwfc := wfc
wfc = newFakeWriteFlushCloser(nil)
sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
// previous attached connection should be closed
if prevwfc != nil {
select {
case <-prevwfc.closed:
case <-time.After(time.Second):
t.Errorf("#%d: close of previous connection timed out", i)
}
}
// if prevwfc != nil, the new msgc is ready since prevwfc has closed
// if prevwfc == nil, the first connection may be pending, but the first
// msgc is already available since it's set on calling startStreamwriter
msgc, _ := sw.writec()
msgc <- raftpb.Message{}
select {
case <-wfc.writec:
case <-time.After(time.Second):
t.Errorf("#%d: failed to write to the underlying connection", i)
}
// write chan is still available
if _, ok := sw.writec(); !ok {
t.Errorf("#%d: working status = %v, want true", i, ok)
}
}
sw.stop()
// write chan is unavailable since the writer is stopped.
if _, ok := sw.writec(); ok {
t.Errorf("working status after stop = %v, want false", ok)
}
if !wfc.Closed() {
t.Errorf("failed to close the underlying connection")
}
}
// TestStreamWriterAttachBadOutgoingConn tests that streamWriter with bad
// outgoingConn will close the outgoingConn and fall back to non-working status.
func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
defer sw.stop()
wfc := newFakeWriteFlushCloser(errors.New("blah"))
sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
sw.msgc <- raftpb.Message{}
select {
case <-wfc.closed:
case <-time.After(time.Second):
t.Errorf("failed to close the underlying connection in time")
}
// no longer working
if _, ok := sw.writec(); ok {
t.Errorf("working = %v, want false", ok)
}
}
func TestStreamReaderDialRequest(t *testing.T) {
for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}}
sr := &streamReader{
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
ctx: context.Background(),
}
sr.dial(tt)
act, err := tr.rec.Wait(1)
if err != nil {
t.Fatal(err)
}
req := act[0].Params[0].(*http.Request)
wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint() + "/1")
if req.URL.String() != wurl {
t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl)
}
if w := "GET"; req.Method != w {
t.Errorf("#%d: method = %s, want %s", i, req.Method, w)
}
if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" {
t.Errorf("#%d: header X-Etcd-Cluster-ID = %s, want 1", i, g)
}
if g := req.Header.Get("X-Raft-To"); g != "2" {
t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g)
}
}
}
// TestStreamReaderDialResult tests the result of the dial func call meets the
// HTTP response received.
func TestStreamReaderDialResult(t *testing.T) {
tests := []struct {
code int
err error
wok bool
whalt bool
}{
{0, errors.New("blah"), false, false},
{http.StatusOK, nil, true, false},
{http.StatusMethodNotAllowed, nil, false, false},
{http.StatusNotFound, nil, false, false},
{http.StatusPreconditionFailed, nil, false, false},
{http.StatusGone, nil, false, true},
}
for i, tt := range tests {
h := http.Header{}
h.Add("X-Server-Version", version.Version)
tr := &respRoundTripper{
code: tt.code,
header: h,
err: tt.err,
}
sr := &streamReader{
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
errorc: make(chan error, 1),
ctx: context.Background(),
}
_, err := sr.dial(streamTypeMessage)
if ok := err == nil; ok != tt.wok {
t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
}
if halt := len(sr.errorc) > 0; halt != tt.whalt {
t.Errorf("#%d: halt = %v, want %v", i, halt, tt.whalt)
}
}
}
// TestStreamReaderStopOnDial tests a stream reader closes the connection on stop.
func TestStreamReaderStopOnDial(t *testing.T) {
defer testutil.AfterTest(t)
h := http.Header{}
h.Add("X-Server-Version", version.Version)
tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
sr := &streamReader{
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
errorc: make(chan error, 1),
typ: streamTypeMessage,
status: newPeerStatus(types.ID(2)),
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}
tr.onResp = func() {
// stop() waits for the run() goroutine to exit, but that exit
// needs a response from RoundTrip() first; use goroutine
go sr.stop()
// wait so that stop() is blocked on run() exiting
time.Sleep(10 * time.Millisecond)
// sr.run() completes dialing then begins decoding while stopped
}
sr.start()
select {
case <-sr.done:
case <-time.After(time.Second):
t.Fatal("streamReader did not stop in time")
}
}
type respWaitRoundTripper struct {
rrt *respRoundTripper
onResp func()
}
func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := t.rrt.RoundTrip(req)
resp.Body = newWaitReadCloser()
t.onResp()
return resp, err
}
type waitReadCloser struct{ closec chan struct{} }
func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
func (wrc *waitReadCloser) Read(p []byte) (int, error) {
<-wrc.closec
return 0, io.EOF
}
func (wrc *waitReadCloser) Close() error {
close(wrc.closec)
return nil
}
// TestStreamReaderDialDetectUnsupport tests that dial func could find
// out that the stream type is not supported by the remote.
func TestStreamReaderDialDetectUnsupport(t *testing.T) {
for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} {
// the response from etcd 2.0
tr := &respRoundTripper{
code: http.StatusNotFound,
header: http.Header{},
}
sr := &streamReader{
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
ctx: context.Background(),
}
_, err := sr.dial(typ)
if err != errUnsupportedStreamType {
t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType)
}
}
}
// TestStream tests that streamReader and streamWriter can build stream to
// send messages between each other.
func TestStream(t *testing.T) {
recvc := make(chan raftpb.Message, streamBufSize)
propc := make(chan raftpb.Message, streamBufSize)
msgapp := raftpb.Message{
Type: raftpb.MsgApp,
From: 2,
To: 1,
Term: 1,
LogTerm: 1,
Index: 3,
Entries: []raftpb.Entry{{Term: 1, Index: 4}},
}
tests := []struct {
t streamType
m raftpb.Message
wc chan raftpb.Message
}{
{
streamTypeMessage,
raftpb.Message{Type: raftpb.MsgProp, To: 2},
propc,
},
{
streamTypeMessage,
msgapp,
recvc,
},
{
streamTypeMsgAppV2,
msgapp,
recvc,
},
}
for i, tt := range tests {
h := &fakeStreamHandler{t: tt.t}
srv := httptest.NewServer(h)
defer srv.Close()
sw := startStreamWriter(types.ID(1), newPeerStatus(types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
defer sw.stop()
h.sw = sw
picker := mustNewURLPicker(t, []string{srv.URL})
tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)}
sr := &streamReader{
peerID: types.ID(2),
typ: tt.t,
tr: tr,
picker: picker,
status: newPeerStatus(types.ID(2)),
recvc: recvc,
propc: propc,
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}
sr.start()
// wait for stream to work
var writec chan<- raftpb.Message
for {
var ok bool
if writec, ok = sw.writec(); ok {
break
}
time.Sleep(time.Millisecond)
}
writec <- tt.m
var m raftpb.Message
select {
case m = <-tt.wc:
case <-time.After(time.Second):
t.Fatalf("#%d: failed to receive message from the channel", i)
}
if !reflect.DeepEqual(m, tt.m) {
t.Fatalf("#%d: message = %+v, want %+v", i, m, tt.m)
}
sr.stop()
}
}
func TestCheckStreamSupport(t *testing.T) {
tests := []struct {
v *semver.Version
t streamType
w bool
}{
// support
{
semver.Must(semver.NewVersion("2.1.0")),
streamTypeMsgAppV2,
true,
},
// ignore patch
{
semver.Must(semver.NewVersion("2.1.9")),
streamTypeMsgAppV2,
true,
},
// ignore prerelease
{
semver.Must(semver.NewVersion("2.1.0-alpha")),
streamTypeMsgAppV2,
true,
},
}
for i, tt := range tests {
if g := checkStreamSupport(tt.v, tt.t); g != tt.w {
t.Errorf("#%d: check = %v, want %v", i, g, tt.w)
}
}
}
type fakeWriteFlushCloser struct {
mu sync.Mutex
err error
written int
closed chan struct{}
writec chan struct{}
}
func newFakeWriteFlushCloser(err error) *fakeWriteFlushCloser {
return &fakeWriteFlushCloser{
err: err,
closed: make(chan struct{}),
writec: make(chan struct{}, 1),
}
}
func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) {
wfc.mu.Lock()
defer wfc.mu.Unlock()
select {
case wfc.writec <- struct{}{}:
default:
}
wfc.written += len(p)
return len(p), wfc.err
}
func (wfc *fakeWriteFlushCloser) Flush() {}
func (wfc *fakeWriteFlushCloser) Close() error {
close(wfc.closed)
return wfc.err
}
func (wfc *fakeWriteFlushCloser) Written() int {
wfc.mu.Lock()
defer wfc.mu.Unlock()
return wfc.written
}
func (wfc *fakeWriteFlushCloser) Closed() bool {
select {
case <-wfc.closed:
return true
default:
return false
}
}
type fakeStreamHandler struct {
t streamType
sw *streamWriter
}
func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Add("X-Server-Version", version.Version)
w.(http.Flusher).Flush()
c := newCloseNotifier()
h.sw.attach(&outgoingConn{
t: h.t,
Writer: w,
Flusher: w.(http.Flusher),
Closer: c,
})
<-c.closeNotify()
}