blob: eb8af8563ec59dc0ac6133e5ebd0967fcd797967 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package electron
// #include <proton/disposition.h>
import "C"
import (
"fmt"
"time"
"github.com/apache/qpid-proton/go/pkg/amqp"
"github.com/apache/qpid-proton/go/pkg/proton"
)
// Sender is a Link that sends messages.
//
// The result of sending a message is provided by an Outcome value.
//
// A sender can buffer messages up to the credit limit provided by the remote receiver.
// All the Send* methods will block if the buffer is full until there is space.
// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.
//
type Sender interface {
Endpoint
LinkSettings
// SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
// Returns an Outcome, which may contain an error if the message could not be sent.
SendSync(m amqp.Message) Outcome
// SendWaitable puts a message in the send buffer and returns a channel that
// you can use to wait for the Outcome of just that message. The channel is
// buffered so you can receive from it whenever you want without blocking.
//
// Note: can block if there is no space to buffer the message.
SendWaitable(m amqp.Message) <-chan Outcome
// SendForget buffers a message for sending and returns, with no notification of the outcome.
//
// Note: can block if there is no space to buffer the message.
SendForget(m amqp.Message)
// SendAsync puts a message in the send buffer and returns immediately. An
// Outcome with Value = value will be sent to the ack channel when the remote
// receiver has acknowledged the message or if there is an error.
//
// You can use the same ack channel for many calls to SendAsync(), possibly on
// many Senders. The channel will receive the outcomes in the order they
// become available. The channel should be buffered and/or served by dedicated
// goroutines to avoid blocking the connection.
//
// If ack == nil no Outcome is sent.
//
// Note: can block if there is no space to buffer the message.
SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})
SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration)
SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome
SendForgetTimeout(m amqp.Message, timeout time.Duration)
SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
}
// Outcome provides information about the outcome of sending a message.
type Outcome struct {
// Status of the message: was it sent, how was it acknowledged.
Status SentStatus
// Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise.
Error error
// Value provided by the application in SendAsync()
Value interface{}
}
func (o Outcome) send(ack chan<- Outcome) {
if ack != nil {
ack <- o
}
}
// SentStatus indicates the status of a sent message.
type SentStatus int
const (
// Message was never sent
Unsent SentStatus = iota
// Message was sent but never acknowledged. It may or may not have been received.
Unacknowledged
// Message was accepted by the receiver (or was sent pre-settled, accept is assumed)
Accepted
// Message was rejected as invalid by the receiver
Rejected
// Message was not processed by the receiver but may be valid for a different receiver
Released
// Receiver responded with an unrecognized status.
Unknown
)
// String human readable name for SentStatus.
func (s SentStatus) String() string {
switch s {
case Unsent:
return "unsent"
case Unacknowledged:
return "unacknowledged"
case Accepted:
return "accepted"
case Rejected:
return "rejected"
case Released:
return "released"
case Unknown:
return "unknown"
default:
return fmt.Sprintf("invalid(%d)", s)
}
}
// Convert proton delivery state code to SentStatus value
func sentStatus(d uint64) SentStatus {
switch d {
case proton.Accepted:
return Accepted
case proton.Rejected:
return Rejected
case proton.Released, proton.Modified:
return Released
default:
return Unknown
}
}
type sendable struct {
m amqp.Message
ack chan<- Outcome // Channel for acknowledgement of m
v interface{} // Correlation value
sent chan struct{} // Closed when m is encoded and will be sent
}
func (sm *sendable) unsent(err error) {
Outcome{Unsent, err, sm.v}.send(sm.ack)
}
type sender struct {
link
sending []*sendable
}
func newSender(ls linkSettings) *sender {
s := &sender{link: link{linkSettings: ls}}
s.endpoint.init(s.link.pLink.String())
s.handler().addLink(s.pLink, s)
s.link.pLink.Open()
return s
}
// Called in handler goroutine
func (s *sender) startSend(sm *sendable) {
s.sending = append(s.sending, sm)
s.trySend()
}
// Called in handler goroutine
func (s *sender) trySend() {
for s.pLink.Credit() > 0 && len(s.sending) > 0 {
sm := s.sending[0]
s.sending = s.sending[1:]
s.send(sm)
}
}
// Called in handler goroutine with credit > 0
func (s *sender) send(sm *sendable) {
if err := s.Error(); err != nil {
sm.unsent(err)
return
}
bytes, err := s.session.connection.mc.Encode(sm.m, nil)
close(sm.sent) // Safe to re-use sm.m now
if err != nil {
sm.unsent(err)
return
}
d, err := s.pLink.SendMessageBytes(bytes)
if err != nil {
sm.unsent(err)
return
}
if s.SndSettle() == SndSettled || (s.SndSettle() == SndMixed && sm.ack == nil) {
d.Settle() // Pre-settled
Outcome{Accepted, nil, sm.v}.send(sm.ack) // Assume accepted
} else {
// Register with handler to receive the remote outcome
s.handler().sent[d] = sm
}
}
func (s *sender) timeoutSend(sm *sendable) {
for i, sm2 := range s.sending {
if sm2 == sm {
n := copy(s.sending[i:], s.sending[i+1:])
s.sending = s.sending[:i+n] // delete
close(sm.sent)
return
}
}
}
func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) {
sm := &sendable{m, ack, v, make(chan struct{})}
s.engine().Inject(func() { s.startSend(sm) })
select {
case <-sm.sent: // OK
case <-After(t): // Try to timeout sm
s.engine().Inject(func() { s.timeoutSend(sm) })
}
}
func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome {
out := make(chan Outcome, 1)
s.SendAsyncTimeout(m, out, nil, t)
return out
}
func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) {
s.SendAsyncTimeout(m, nil, nil, t)
}
func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome {
deadline := time.Now().Add(t)
ack := s.SendWaitableTimeout(m, t)
t = deadline.Sub(time.Now()) // Adjust for time already spent.
if t < 0 {
t = 0
}
if out, err := timedReceive(ack, t); err == nil {
return out.(Outcome)
} else {
if err == Closed && s.Error() != nil {
err = s.Error()
}
return Outcome{Unacknowledged, err, nil}
}
}
func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) {
s.SendAsyncTimeout(m, ack, v, Forever)
}
func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome {
return s.SendWaitableTimeout(m, Forever)
}
func (s *sender) SendForget(m amqp.Message) {
s.SendForgetTimeout(m, Forever)
}
func (s *sender) SendSync(m amqp.Message) Outcome {
return <-s.SendWaitable(m)
}
// handler goroutine
func (s *sender) closed(err error) error {
for _, sm := range s.sending {
close(sm.sent)
}
s.sending = nil
return s.link.closed(err)
}
// IncomingSender is sent on the Connection.Incoming() channel when there is
// an incoming request to open a sender link.
type IncomingSender struct {
incoming
linkSettings
}
func newIncomingSender(sn *session, pLink proton.Link) *IncomingSender {
return &IncomingSender{
incoming: makeIncoming(pLink),
linkSettings: makeIncomingLinkSettings(pLink, sn),
}
}
// Accept accepts an incoming sender endpoint
func (in *IncomingSender) Accept() Endpoint {
return in.accept(func() Endpoint { return newSender(in.linkSettings) })
}
// Call in injected functions to check if the sender is valid.
func (s *sender) valid() bool {
s2, ok := s.handler().links[s.pLink].(*sender)
return ok && s2 == s
}