| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| |
| package electron |
| |
| import ( |
| "fmt" |
| "testing" |
| "time" |
| |
| "github.com/apache/qpid-proton/go/pkg/amqp" |
| "github.com/apache/qpid-proton/go/pkg/internal/test" |
| ) |
| |
| // Send a message one way with a client sender and server receiver, verify ack. |
| func TestClientSender(t *testing.T) { |
| p := newPipe(t, nil, nil) |
| defer func() { p.close() }() |
| |
| nLinks := 3 |
| nMessages := 3 |
| |
| s := make([]Sender, nLinks) |
| r := make([]Receiver, nLinks) |
| for i := 0; i < nLinks; i++ { |
| s[i], r[i] = p.sender(Target(fmt.Sprintf("foo%d", i))) |
| } |
| |
| for i := 0; i < nLinks; i++ { |
| for j := 0; j < nMessages; j++ { |
| // Client send |
| ack := make(chan Outcome, 1) |
| sendDone := make(chan struct{}) |
| go func() { |
| defer close(sendDone) |
| m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j)) |
| var err error |
| s[i].SendAsync(m, ack, "testing") |
| test.FatalIf(t, err) |
| }() |
| |
| // Server receive |
| rm, err := r[i].Receive() |
| test.FatalIf(t, err) |
| if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got { |
| t.Errorf("%#v != %#v", want, got) |
| } |
| |
| // Should not be acknowledged on client yet |
| <-sendDone |
| select { |
| case <-ack: |
| t.Errorf("unexpected ack") |
| default: |
| } |
| |
| // Server send ack |
| if err = rm.Reject(); err != nil { |
| t.Error(err) |
| } |
| // Client get ack. |
| if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected { |
| t.Error("unexpected ack: ", a.Status, a.Error, a.Value) |
| } |
| } |
| } |
| } |
| |
| func TestClientReceiver(t *testing.T) { |
| nMessages := 3 |
| p := newPipe(t, nil, nil) |
| defer func() { p.close() }() |
| r, s := p.receiver(Source("foo"), Capacity(nMessages), Prefetch(true)) |
| go func() { |
| for i := 0; i < nMessages; i++ { // Server sends |
| out := s.SendSync(amqp.NewMessageWith(int32(i))) |
| test.FatalIf(t, out.Error) |
| } |
| }() |
| for i := 0; i < nMessages; i++ { // Client receives |
| rm, err := r.Receive() |
| test.FatalIf(t, err) |
| test.ErrorIf(t, test.Differ(int32(i), rm.Message.Body())) |
| test.ErrorIf(t, rm.Accept()) |
| } |
| } |
| |
| // Test timeout versions of waiting functions. |
| func TestTimeouts(t *testing.T) { |
| p := newPipe(t, nil, nil) |
| defer func() { p.close() }() |
| snd, rcv := p.sender(Target("test")) |
| |
| // Test send with timeout |
| short := time.Millisecond |
| long := time.Second |
| m := amqp.NewMessage() |
| if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout. |
| t.Error("want Timeout got", err) |
| } |
| if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout. |
| t.Error("want Timeout got", err) |
| } |
| // Test receive with timeout |
| if _, err := rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout. |
| t.Error("want Timeout got", err) |
| } |
| // Test receive with timeout |
| if _, err := rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout. |
| t.Error("want Timeout got", err) |
| } |
| // There is now a credit on the link due to receive |
| ack := make(chan Outcome) |
| snd.SendAsyncTimeout(m, ack, nil, short) |
| // Disposition should timeout |
| select { |
| case <-ack: |
| t.Errorf("want Timeout got %#v", ack) |
| case <-time.After(short): |
| } |
| |
| // Receive and accept |
| rm, err := rcv.ReceiveTimeout(long) |
| if err != nil { |
| t.Fatal(err) |
| } |
| if err = rm.Accept(); err != nil { |
| t.Fatal(err) |
| } |
| // Sender get ack |
| if a := <-ack; a.Status != Accepted || a.Error != nil { |
| t.Errorf("want (accepted, nil) got %#v", a) |
| } |
| } |
| |
| type result struct { |
| label string |
| err error |
| value interface{} |
| } |
| |
| func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) } |
| |
| func doSend(snd Sender, results chan result) { |
| err := snd.SendSync(amqp.NewMessage()).Error |
| results <- result{"send", err, nil} |
| } |
| |
| func doReceive(rcv Receiver, results chan result) { |
| msg, err := rcv.Receive() |
| results <- result{"receive", err, msg} |
| } |
| |
| func doDisposition(ack <-chan Outcome, results chan result) { |
| results <- result{"disposition", (<-ack).Error, nil} |
| } |
| |
| // Senders get credit immediately if receivers have prefetch set |
| func TestSendReceivePrefetch(t *testing.T) { |
| p := newPipe(t, nil, nil) |
| p.prefetch = true |
| s, r := p.sender() |
| s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit. |
| if _, err := r.Receive(); err != nil { |
| t.Error(err) |
| } |
| } |
| |
| // Senders do not get credit till Receive() if receivers don't have prefetch |
| func TestSendReceiveNoPrefetch(t *testing.T) { |
| p := newPipe(t, nil, nil) |
| p.prefetch = false |
| s, r := p.sender() |
| done := make(chan struct{}, 1) |
| go func() { |
| s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit. |
| close(done) |
| }() |
| select { |
| case <-done: |
| t.Errorf("send should be blocked on credit") |
| default: |
| if _, err := r.Receive(); err != nil { |
| t.Error(err) |
| } else { |
| <-done |
| } // Should be unblocked now |
| } |
| } |
| |
| // Test that closing Links interrupts blocked link functions. |
| func TestLinkCloseInterrupt(t *testing.T) { |
| want := amqp.Error{Name: "x", Description: "all bad"} |
| p := newPipe(t, nil, nil) |
| results := make(chan result) // Collect expected errors |
| |
| // Note closing the link does not interrupt Send() calls, the AMQP spec says |
| // that deliveries can be settled after the link is closed. |
| |
| // Receiver.Close() interrupts Receive() |
| snd, rcv := p.sender() |
| go doReceive(rcv, results) |
| rcv.Close(want) |
| if r := <-results; want != r.err { |
| t.Errorf("want %#v got %#v", want, r) |
| } |
| |
| // Remote Sender.Close() interrupts Receive() |
| snd, rcv = p.sender() |
| go doReceive(rcv, results) |
| snd.Close(want) |
| if r := <-results; want != r.err { |
| t.Errorf("want %#v got %#v", want, r) |
| } |
| } |
| |
| // Test closing the server end of a connection. |
| func TestConnectionCloseInterrupt1(t *testing.T) { |
| want := amqp.Error{Name: "x", Description: "bad"} |
| p := newSocketPair(t, nil, nil) |
| p.prefetch = true |
| results := make(chan result) // Collect expected errors |
| |
| // Connection.Close() interrupts Send, Receive, Disposition. |
| snd, rcv := p.sender() |
| go doSend(snd, results) |
| |
| if _, err := rcv.Receive(); err != nil { |
| t.Error("receive", err) |
| } |
| rcv, snd = p.receiver() |
| go doReceive(rcv, results) |
| |
| snd, rcv = p.sender() |
| ack := snd.SendWaitable(amqp.NewMessage()) |
| if _, err := rcv.Receive(); err != nil { |
| t.Error("receive", err) |
| } |
| go doDisposition(ack, results) |
| |
| p.server.Close(want) |
| for i := 0; i < 3; i++ { |
| if r := <-results; want != r.err { |
| t.Errorf("want %v got %v", want, r) |
| } |
| } |
| } |
| |
| // Test closing the client end of the connection. |
| func TestConnectionCloseInterrupt2(t *testing.T) { |
| want := amqp.Error{Name: "x", Description: "bad"} |
| p := newSocketPair(t, nil, nil) |
| p.prefetch = true |
| results := make(chan result) // Collect expected errors |
| |
| // Connection.Close() interrupts Send, Receive, Disposition. |
| snd, rcv := p.sender() |
| go doSend(snd, results) |
| if _, err := rcv.Receive(); err != nil { |
| t.Error("receive", err) |
| } |
| |
| rcv, snd = p.receiver() |
| go doReceive(rcv, results) |
| |
| snd, rcv = p.sender() |
| ack := snd.SendWaitable(amqp.NewMessage()) |
| go doDisposition(ack, results) |
| |
| p.client.Connection().Close(want) |
| for i := 0; i < 3; i++ { |
| if r := <-results; want != r.err { |
| t.Errorf("want %v got %v", want, r.err) |
| } |
| } |
| } |
| |
| func TestHeartbeat(t *testing.T) { |
| p := newSocketPair(t, |
| []ConnectionOption{Heartbeat(102 * time.Millisecond)}, |
| []ConnectionOption{Heartbeat(101 * time.Millisecond)}) |
| defer func() { p.close() }() |
| |
| // Function to freeze the server to stop it sending heartbeats. |
| unfreeze := make(chan bool) |
| defer close(unfreeze) |
| freeze := func() error { return p.server.(*connection).engine.Inject(func() { <-unfreeze }) } |
| |
| test.FatalIf(t, p.client.Sync()) |
| test.ErrorIf(t, test.Differ(101*time.Millisecond, p.client.Connection().Heartbeat())) |
| test.ErrorIf(t, test.Differ(102*time.Millisecond, p.server.Heartbeat())) |
| |
| // Freeze the server for less than a heartbeat |
| test.FatalIf(t, freeze()) |
| time.Sleep(5 * time.Millisecond) |
| unfreeze <- true |
| // Make sure server is still responding. |
| s, _ := p.sender() |
| test.ErrorIf(t, s.Sync()) |
| |
| // Freeze the server till the p.client times out the connection |
| test.FatalIf(t, freeze()) |
| select { |
| case <-p.client.Done(): |
| if amqp.ResourceLimitExceeded != p.client.Error().(amqp.Error).Name { |
| t.Error("bad timeout error:", p.client.Error()) |
| } |
| case <-time.After(1400 * time.Millisecond): |
| t.Error("connection failed to time out") |
| } |
| |
| unfreeze <- true // Unfreeze the server |
| <-p.server.Done() |
| if p.server.Error() == nil { |
| t.Error("expected server side time-out or connection abort error") |
| } |
| } |