| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| */ |
| |
| package electron |
| |
| // #include <proton/disposition.h> |
| import "C" |
| |
| import ( |
| "fmt" |
| "time" |
| |
| "github.com/apache/qpid-proton/go/pkg/amqp" |
| "github.com/apache/qpid-proton/go/pkg/proton" |
| ) |
| |
| // Sender is a Link that sends messages. |
| // |
| // The result of sending a message is provided by an Outcome value. |
| // |
| // A sender can buffer messages up to the credit limit provided by the remote receiver. |
| // All the Send* methods will block if the buffer is full until there is space. |
| // Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error. |
| // |
| type Sender interface { |
| Endpoint |
| LinkSettings |
| |
| // SendSync sends a message and blocks until the message is acknowledged by the remote receiver. |
| // Returns an Outcome, which may contain an error if the message could not be sent. |
| SendSync(m amqp.Message) Outcome |
| |
| // SendWaitable puts a message in the send buffer and returns a channel that |
| // you can use to wait for the Outcome of just that message. The channel is |
| // buffered so you can receive from it whenever you want without blocking. |
| // |
| // Note: can block if there is no space to buffer the message. |
| SendWaitable(m amqp.Message) <-chan Outcome |
| |
| // SendForget buffers a message for sending and returns, with no notification of the outcome. |
| // |
| // Note: can block if there is no space to buffer the message. |
| SendForget(m amqp.Message) |
| |
| // SendAsync puts a message in the send buffer and returns immediately. An |
| // Outcome with Value = value will be sent to the ack channel when the remote |
| // receiver has acknowledged the message or if there is an error. |
| // |
| // You can use the same ack channel for many calls to SendAsync(), possibly on |
| // many Senders. The channel will receive the outcomes in the order they |
| // become available. The channel should be buffered and/or served by dedicated |
| // goroutines to avoid blocking the connection. |
| // |
| // If ack == nil no Outcome is sent. |
| // |
| // Note: can block if there is no space to buffer the message. |
| SendAsync(m amqp.Message, ack chan<- Outcome, value interface{}) |
| |
| SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration) |
| |
| SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome |
| |
| SendForgetTimeout(m amqp.Message, timeout time.Duration) |
| |
| SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome |
| } |
| |
| // Outcome provides information about the outcome of sending a message. |
| type Outcome struct { |
| // Status of the message: was it sent, how was it acknowledged. |
| Status SentStatus |
| // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise. |
| Error error |
| // Value provided by the application in SendAsync() |
| Value interface{} |
| } |
| |
| func (o Outcome) send(ack chan<- Outcome) { |
| if ack != nil { |
| ack <- o |
| } |
| } |
| |
| // SentStatus indicates the status of a sent message. |
| type SentStatus int |
| |
| const ( |
| // Message was never sent |
| Unsent SentStatus = iota |
| // Message was sent but never acknowledged. It may or may not have been received. |
| Unacknowledged |
| // Message was accepted by the receiver (or was sent pre-settled, accept is assumed) |
| Accepted |
| // Message was rejected as invalid by the receiver |
| Rejected |
| // Message was not processed by the receiver but may be valid for a different receiver |
| Released |
| // Receiver responded with an unrecognized status. |
| Unknown |
| ) |
| |
| // String human readable name for SentStatus. |
| func (s SentStatus) String() string { |
| switch s { |
| case Unsent: |
| return "unsent" |
| case Unacknowledged: |
| return "unacknowledged" |
| case Accepted: |
| return "accepted" |
| case Rejected: |
| return "rejected" |
| case Released: |
| return "released" |
| case Unknown: |
| return "unknown" |
| default: |
| return fmt.Sprintf("invalid(%d)", s) |
| } |
| } |
| |
| // Convert proton delivery state code to SentStatus value |
| func sentStatus(d uint64) SentStatus { |
| switch d { |
| case proton.Accepted: |
| return Accepted |
| case proton.Rejected: |
| return Rejected |
| case proton.Released, proton.Modified: |
| return Released |
| default: |
| return Unknown |
| } |
| } |
| |
| type sendable struct { |
| m amqp.Message |
| ack chan<- Outcome // Channel for acknowledgement of m |
| v interface{} // Correlation value |
| sent chan struct{} // Closed when m is encoded and will be sent |
| } |
| |
| func (sm *sendable) unsent(err error) { |
| Outcome{Unsent, err, sm.v}.send(sm.ack) |
| } |
| |
| type sender struct { |
| link |
| sending []*sendable |
| } |
| |
| func newSender(ls linkSettings) *sender { |
| s := &sender{link: link{linkSettings: ls}} |
| s.endpoint.init(s.link.pLink.String()) |
| s.handler().addLink(s.pLink, s) |
| s.link.pLink.Open() |
| return s |
| } |
| |
| // Called in handler goroutine |
| func (s *sender) startSend(sm *sendable) { |
| s.sending = append(s.sending, sm) |
| s.trySend() |
| } |
| |
| // Called in handler goroutine |
| func (s *sender) trySend() { |
| for s.pLink.Credit() > 0 && len(s.sending) > 0 { |
| sm := s.sending[0] |
| s.sending = s.sending[1:] |
| s.send(sm) |
| } |
| } |
| |
| // Called in handler goroutine with credit > 0 |
| func (s *sender) send(sm *sendable) { |
| if err := s.Error(); err != nil { |
| sm.unsent(err) |
| return |
| } |
| bytes, err := s.session.connection.mc.Encode(sm.m, nil) |
| close(sm.sent) // Safe to re-use sm.m now |
| if err != nil { |
| sm.unsent(err) |
| return |
| } |
| d, err := s.pLink.SendMessageBytes(bytes) |
| if err != nil { |
| sm.unsent(err) |
| return |
| } |
| if s.SndSettle() == SndSettled || (s.SndSettle() == SndMixed && sm.ack == nil) { |
| d.Settle() // Pre-settled |
| Outcome{Accepted, nil, sm.v}.send(sm.ack) // Assume accepted |
| } else { |
| // Register with handler to receive the remote outcome |
| s.handler().sent[d] = sm |
| } |
| } |
| |
| func (s *sender) timeoutSend(sm *sendable) { |
| for i, sm2 := range s.sending { |
| if sm2 == sm { |
| n := copy(s.sending[i:], s.sending[i+1:]) |
| s.sending = s.sending[:i+n] // delete |
| close(sm.sent) |
| return |
| } |
| } |
| } |
| |
| func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) { |
| sm := &sendable{m, ack, v, make(chan struct{})} |
| s.engine().Inject(func() { s.startSend(sm) }) |
| select { |
| case <-sm.sent: // OK |
| case <-After(t): // Try to timeout sm |
| s.engine().Inject(func() { s.timeoutSend(sm) }) |
| } |
| } |
| |
| func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome { |
| out := make(chan Outcome, 1) |
| s.SendAsyncTimeout(m, out, nil, t) |
| return out |
| } |
| |
| func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) { |
| s.SendAsyncTimeout(m, nil, nil, t) |
| } |
| |
| func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome { |
| deadline := time.Now().Add(t) |
| ack := s.SendWaitableTimeout(m, t) |
| t = deadline.Sub(time.Now()) // Adjust for time already spent. |
| if t < 0 { |
| t = 0 |
| } |
| if out, err := timedReceive(ack, t); err == nil { |
| return out.(Outcome) |
| } else { |
| if err == Closed && s.Error() != nil { |
| err = s.Error() |
| } |
| return Outcome{Unacknowledged, err, nil} |
| } |
| } |
| |
| func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) { |
| s.SendAsyncTimeout(m, ack, v, Forever) |
| } |
| |
| func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome { |
| return s.SendWaitableTimeout(m, Forever) |
| } |
| |
| func (s *sender) SendForget(m amqp.Message) { |
| s.SendForgetTimeout(m, Forever) |
| } |
| |
| func (s *sender) SendSync(m amqp.Message) Outcome { |
| return <-s.SendWaitable(m) |
| } |
| |
| // handler goroutine |
| func (s *sender) closed(err error) error { |
| for _, sm := range s.sending { |
| close(sm.sent) |
| } |
| s.sending = nil |
| return s.link.closed(err) |
| } |
| |
| // IncomingSender is sent on the Connection.Incoming() channel when there is |
| // an incoming request to open a sender link. |
| type IncomingSender struct { |
| incoming |
| linkSettings |
| } |
| |
| func newIncomingSender(sn *session, pLink proton.Link) *IncomingSender { |
| return &IncomingSender{ |
| incoming: makeIncoming(pLink), |
| linkSettings: makeIncomingLinkSettings(pLink, sn), |
| } |
| } |
| |
| // Accept accepts an incoming sender endpoint |
| func (in *IncomingSender) Accept() Endpoint { |
| return in.accept(func() Endpoint { return newSender(in.linkSettings) }) |
| } |
| |
| // Call in injected functions to check if the sender is valid. |
| func (s *sender) valid() bool { |
| s2, ok := s.handler().links[s.pLink].(*sender) |
| return ok && s2 == s |
| } |