blob: d412ac1ecabd4882902297a92e46f07efb92ece5 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package electron
import (
"fmt"
"time"
"github.com/apache/qpid-proton/go/pkg/amqp"
"github.com/apache/qpid-proton/go/pkg/proton"
)
// Receiver is a Link that receives messages.
//
type Receiver interface {
Endpoint
LinkSettings
// Receive blocks until a message is available or until the Receiver is closed
// and has no more buffered messages.
Receive() (ReceivedMessage, error)
// ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
//
// Note that that if Prefetch is false, after a Timeout the credit issued by
// Receive remains on the link. It will be used by the next call to Receive.
ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)
// Prefetch==true means the Receiver will automatically issue credit to the
// remote sender to keep its buffer as full as possible, i.e. it will
// "pre-fetch" messages independently of the application calling
// Receive(). This gives good throughput for applications that handle a
// continuous stream of messages. Larger capacity may improve throughput, the
// optimal value depends on the characteristics of your application.
//
// Prefetch==false means the Receiver will issue only issue credit when you
// call Receive(), and will only issue enough credit to satisfy the calls
// actually made. This gives lower throughput but will not fetch any messages
// in advance. It is good for synchronous applications that need to evaluate
// each message before deciding whether to receive another. The
// request-response pattern is a typical example. If you make concurrent
// calls to Receive with pre-fetch disabled, you can improve performance by
// setting the capacity close to the expected number of concurrent calls.
//
Prefetch() bool
// Capacity is the size (number of messages) of the local message buffer
// These are messages received but not yet returned to the application by a call to Receive()
Capacity() int
}
// Receiver implementation
type receiver struct {
link
buffer chan ReceivedMessage
callers int
}
func (r *receiver) Capacity() int { return cap(r.buffer) }
func (r *receiver) Prefetch() bool { return r.prefetch }
// Call in proton goroutine
func newReceiver(ls linkSettings) *receiver {
r := &receiver{link: link{linkSettings: ls}}
r.endpoint.init(r.link.pLink.String())
if r.capacity < 1 {
r.capacity = 1
}
r.buffer = make(chan ReceivedMessage, r.capacity)
r.handler().addLink(r.pLink, r)
r.link.pLink.Open()
if r.prefetch {
r.flow(r.maxFlow())
}
return r
}
// Call in proton goroutine. Max additional credit we can request.
func (r *receiver) maxFlow() int { return cap(r.buffer) - len(r.buffer) - r.pLink.Credit() }
func (r *receiver) flow(credit int) {
if credit > 0 {
r.pLink.Flow(credit)
}
}
// Inject flow check per-caller call when prefetch is off.
// Called with inc=1 at start of call, inc = -1 at end
func (r *receiver) caller(inc int) {
_ = r.engine().Inject(func() {
r.callers += inc
need := r.callers - (len(r.buffer) + r.pLink.Credit())
max := r.maxFlow()
if need > max {
need = max
}
r.flow(need)
})
}
// Inject flow top-up if prefetch is enabled
func (r *receiver) flowTopUp() {
if r.prefetch {
_ = r.engine().Inject(func() { r.flow(r.maxFlow()) })
}
}
func (r *receiver) Receive() (rm ReceivedMessage, err error) {
return r.ReceiveTimeout(Forever)
}
func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
if r.buffer == nil {
panic(fmt.Errorf("Receiver is not open: %s", r))
}
if !r.prefetch { // Per-caller flow control
select { // Check for immediate availability, avoid caller() inject
case rm2, ok := <-r.buffer:
if ok {
rm = rm2
} else {
err = r.Error()
}
return
default: // Not immediately available, inject caller() counts
r.caller(+1)
defer r.caller(-1)
}
}
rmi, err := timedReceive(r.buffer, timeout)
switch err {
case nil:
r.flowTopUp()
rm = rmi.(ReceivedMessage)
case Closed:
err = r.Error()
}
return
}
// Called in proton goroutine on MMessage event.
func (r *receiver) message(delivery proton.Delivery) {
if r.pLink.State().RemoteClosed() {
localClose(r.pLink, r.pLink.RemoteCondition().Error())
return
}
if delivery.HasMessage() {
bytes, err := delivery.MessageBytes()
var m amqp.Message
if err == nil {
m = amqp.NewMessage()
err = r.session.connection.mc.Decode(m, bytes)
}
if err != nil {
localClose(r.pLink, err)
return
}
r.pLink.Advance()
if r.pLink.Credit() < 0 {
localClose(r.pLink, fmt.Errorf("received message in excess of credit limit"))
} else {
// We never issue more credit than cap(buffer) so this will not block.
r.buffer <- ReceivedMessage{m, delivery, r}
}
}
}
func (r *receiver) closed(err error) error {
e := r.link.closed(err)
if r.buffer != nil {
close(r.buffer)
}
return e
}
// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
type ReceivedMessage struct {
// Message is the received message.
Message amqp.Message
pDelivery proton.Delivery
receiver Receiver
}
// Acknowledge a ReceivedMessage with the given delivery status.
func (rm *ReceivedMessage) acknowledge(status uint64) error {
return rm.receiver.(*receiver).engine().Inject(func() {
// Deliveries are valid as long as the connection is, unless settled.
rm.pDelivery.SettleAs(uint64(status))
})
}
// Accept tells the sender that we take responsibility for processing the message.
func (rm *ReceivedMessage) Accept() error { return rm.acknowledge(proton.Accepted) }
// Reject tells the sender we consider the message invalid and unusable.
func (rm *ReceivedMessage) Reject() error { return rm.acknowledge(proton.Rejected) }
// Release tells the sender we will not process the message but some other
// receiver might.
func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Released) }
// IncomingReceiver is sent on the Connection.Incoming() channel when there is
// an incoming request to open a receiver link.
type IncomingReceiver struct {
incoming
linkSettings
}
func newIncomingReceiver(sn *session, pLink proton.Link) *IncomingReceiver {
return &IncomingReceiver{
incoming: makeIncoming(pLink),
linkSettings: makeIncomingLinkSettings(pLink, sn),
}
}
// SetCapacity sets the capacity of the incoming receiver, call before Accept()
func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity }
// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept()
func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch }
// Accept accepts an incoming receiver endpoint
func (in *IncomingReceiver) Accept() Endpoint {
return in.accept(func() Endpoint { return newReceiver(in.linkSettings) })
}