| // Copyright 2017 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package clientv3 |
| |
| import ( |
| "context" |
| "errors" |
| "net" |
| "sync" |
| "testing" |
| "time" |
| |
| pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| "github.com/coreos/etcd/pkg/testutil" |
| |
| "google.golang.org/grpc" |
| ) |
| |
| var endpoints = []string{"localhost:2379", "localhost:22379", "localhost:32379"} |
| |
| func TestBalancerGetUnblocking(t *testing.T) { |
| hb := newHealthBalancer(endpoints, minHealthRetryDuration, func(string) (bool, error) { return true, nil }) |
| defer hb.Close() |
| if addrs := <-hb.Notify(); len(addrs) != len(endpoints) { |
| t.Errorf("Initialize newHealthBalancer should have triggered Notify() chan, but it didn't") |
| } |
| unblockingOpts := grpc.BalancerGetOptions{BlockingWait: false} |
| |
| _, _, err := hb.Get(context.Background(), unblockingOpts) |
| if err != ErrNoAddrAvilable { |
| t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err) |
| } |
| |
| down1 := hb.Up(grpc.Address{Addr: endpoints[1]}) |
| if addrs := <-hb.Notify(); len(addrs) != 1 { |
| t.Errorf("first Up() should have triggered balancer to send the first connected address via Notify chan so that other connections can be closed") |
| } |
| down2 := hb.Up(grpc.Address{Addr: endpoints[2]}) |
| addrFirst, putFun, err := hb.Get(context.Background(), unblockingOpts) |
| if err != nil { |
| t.Errorf("Get() with up endpoints should success, got %v", err) |
| } |
| if addrFirst.Addr != endpoints[1] { |
| t.Errorf("Get() didn't return expected address, got %v", addrFirst) |
| } |
| if putFun == nil { |
| t.Errorf("Get() returned unexpected nil put function") |
| } |
| addrSecond, _, _ := hb.Get(context.Background(), unblockingOpts) |
| if addrFirst.Addr != addrSecond.Addr { |
| t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond) |
| } |
| |
| down1(errors.New("error")) |
| if addrs := <-hb.Notify(); len(addrs) != len(endpoints)-1 { // we call down on one endpoint |
| t.Errorf("closing the only connection should triggered balancer to send the %d endpoints via Notify chan so that we can establish a connection", len(endpoints)-1) |
| } |
| down2(errors.New("error")) |
| _, _, err = hb.Get(context.Background(), unblockingOpts) |
| if err != ErrNoAddrAvilable { |
| t.Errorf("Get() with no up endpoints should return ErrNoAddrAvailable, got: %v", err) |
| } |
| } |
| |
| func TestBalancerGetBlocking(t *testing.T) { |
| hb := newHealthBalancer(endpoints, minHealthRetryDuration, func(string) (bool, error) { return true, nil }) |
| defer hb.Close() |
| if addrs := <-hb.Notify(); len(addrs) != len(endpoints) { |
| t.Errorf("Initialize newHealthBalancer should have triggered Notify() chan, but it didn't") |
| } |
| blockingOpts := grpc.BalancerGetOptions{BlockingWait: true} |
| |
| ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) |
| _, _, err := hb.Get(ctx, blockingOpts) |
| cancel() |
| if err != context.DeadlineExceeded { |
| t.Errorf("Get() with no up endpoints should timeout, got %v", err) |
| } |
| |
| downC := make(chan func(error), 1) |
| |
| go func() { |
| // ensure hb.Up() will be called after hb.Get() to see if Up() releases blocking Get() |
| time.Sleep(time.Millisecond * 100) |
| f := hb.Up(grpc.Address{Addr: endpoints[1]}) |
| if addrs := <-hb.Notify(); len(addrs) != 1 { |
| t.Errorf("first Up() should have triggered balancer to send the first connected address via Notify chan so that other connections can be closed") |
| } |
| downC <- f |
| }() |
| addrFirst, putFun, err := hb.Get(context.Background(), blockingOpts) |
| if err != nil { |
| t.Errorf("Get() with up endpoints should success, got %v", err) |
| } |
| if addrFirst.Addr != endpoints[1] { |
| t.Errorf("Get() didn't return expected address, got %v", addrFirst) |
| } |
| if putFun == nil { |
| t.Errorf("Get() returned unexpected nil put function") |
| } |
| down1 := <-downC |
| |
| down2 := hb.Up(grpc.Address{Addr: endpoints[2]}) |
| addrSecond, _, _ := hb.Get(context.Background(), blockingOpts) |
| if addrFirst.Addr != addrSecond.Addr { |
| t.Errorf("Get() didn't return the same address as previous call, got %v and %v", addrFirst, addrSecond) |
| } |
| |
| down1(errors.New("error")) |
| if addrs := <-hb.Notify(); len(addrs) != len(endpoints)-1 { // we call down on one endpoint |
| t.Errorf("closing the only connection should triggered balancer to send the %d endpoints via Notify chan so that we can establish a connection", len(endpoints)-1) |
| } |
| down2(errors.New("error")) |
| ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*100) |
| _, _, err = hb.Get(ctx, blockingOpts) |
| cancel() |
| if err != context.DeadlineExceeded { |
| t.Errorf("Get() with no up endpoints should timeout, got %v", err) |
| } |
| } |
| |
| // TestHealthBalancerGraylist checks one endpoint is tried after the other |
| // due to gray listing. |
| func TestHealthBalancerGraylist(t *testing.T) { |
| var wg sync.WaitGroup |
| // Use 3 endpoints so gray list doesn't fallback to all connections |
| // after failing on 2 endpoints. |
| lns, eps := make([]net.Listener, 3), make([]string, 3) |
| wg.Add(3) |
| connc := make(chan string, 2) |
| for i := range eps { |
| ln, err := net.Listen("tcp", ":0") |
| testutil.AssertNil(t, err) |
| lns[i], eps[i] = ln, ln.Addr().String() |
| go func() { |
| defer wg.Done() |
| for { |
| conn, err := ln.Accept() |
| if err != nil { |
| return |
| } |
| _, err = conn.Read(make([]byte, 512)) |
| conn.Close() |
| if err == nil { |
| select { |
| case connc <- ln.Addr().String(): |
| // sleep some so balancer catches up |
| // before attempted next reconnect. |
| time.Sleep(50 * time.Millisecond) |
| default: |
| } |
| } |
| } |
| }() |
| } |
| |
| tf := func(s string) (bool, error) { return false, nil } |
| hb := newHealthBalancer(eps, 5*time.Second, tf) |
| |
| conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb)) |
| testutil.AssertNil(t, err) |
| defer conn.Close() |
| |
| kvc := pb.NewKVClient(conn) |
| <-hb.ready() |
| |
| kvc.Range(context.TODO(), &pb.RangeRequest{}) |
| ep1 := <-connc |
| kvc.Range(context.TODO(), &pb.RangeRequest{}) |
| ep2 := <-connc |
| for _, ln := range lns { |
| ln.Close() |
| } |
| wg.Wait() |
| |
| if ep1 == ep2 { |
| t.Fatalf("expected %q != %q", ep1, ep2) |
| } |
| } |
| |
| // TestBalancerDoNotBlockOnClose ensures that balancer and grpc don't deadlock each other |
| // due to rapid open/close conn. The deadlock causes balancer.Close() to block forever. |
| // See issue: https://github.com/coreos/etcd/issues/7283 for more detail. |
| func TestBalancerDoNotBlockOnClose(t *testing.T) { |
| defer testutil.AfterTest(t) |
| |
| kcl := newKillConnListener(t, 3) |
| defer kcl.close() |
| |
| for i := 0; i < 5; i++ { |
| hb := newHealthBalancer(kcl.endpoints(), minHealthRetryDuration, func(string) (bool, error) { return true, nil }) |
| conn, err := grpc.Dial("", grpc.WithInsecure(), grpc.WithBalancer(hb)) |
| if err != nil { |
| t.Fatal(err) |
| } |
| kvc := pb.NewKVClient(conn) |
| <-hb.readyc |
| |
| var wg sync.WaitGroup |
| wg.Add(100) |
| cctx, cancel := context.WithCancel(context.TODO()) |
| for j := 0; j < 100; j++ { |
| go func() { |
| defer wg.Done() |
| kvc.Range(cctx, &pb.RangeRequest{}, grpc.FailFast(false)) |
| }() |
| } |
| // balancer.Close() might block |
| // if balancer and grpc deadlock each other. |
| bclosec, cclosec := make(chan struct{}), make(chan struct{}) |
| go func() { |
| defer close(bclosec) |
| hb.Close() |
| }() |
| go func() { |
| defer close(cclosec) |
| conn.Close() |
| }() |
| select { |
| case <-bclosec: |
| case <-time.After(3 * time.Second): |
| testutil.FatalStack(t, "balancer close timeout") |
| } |
| select { |
| case <-cclosec: |
| case <-time.After(3 * time.Second): |
| t.Fatal("grpc conn close timeout") |
| } |
| |
| cancel() |
| wg.Wait() |
| } |
| } |
| |
| // killConnListener listens incoming conn and kills it immediately. |
| type killConnListener struct { |
| wg sync.WaitGroup |
| eps []string |
| stopc chan struct{} |
| t *testing.T |
| } |
| |
| func newKillConnListener(t *testing.T, size int) *killConnListener { |
| kcl := &killConnListener{stopc: make(chan struct{}), t: t} |
| |
| for i := 0; i < size; i++ { |
| ln, err := net.Listen("tcp", ":0") |
| if err != nil { |
| t.Fatal(err) |
| } |
| kcl.eps = append(kcl.eps, ln.Addr().String()) |
| kcl.wg.Add(1) |
| go kcl.listen(ln) |
| } |
| return kcl |
| } |
| |
| func (kcl *killConnListener) endpoints() []string { |
| return kcl.eps |
| } |
| |
| func (kcl *killConnListener) listen(l net.Listener) { |
| go func() { |
| defer kcl.wg.Done() |
| for { |
| conn, err := l.Accept() |
| select { |
| case <-kcl.stopc: |
| return |
| default: |
| } |
| if err != nil { |
| kcl.t.Fatal(err) |
| } |
| time.Sleep(1 * time.Millisecond) |
| conn.Close() |
| } |
| }() |
| <-kcl.stopc |
| l.Close() |
| } |
| |
| func (kcl *killConnListener) close() { |
| close(kcl.stopc) |
| kcl.wg.Wait() |
| } |