blob: 4835cb95f4bfdf6f1c7714af83cb3e9c0fb94b0d [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package electron
import (
"fmt"
"qpid.apache.org/proton"
)
// Link is the common interface for AMQP links. Sender and Receiver provide
// more methods for the sending or receiving end of a link respectively.
type Link interface {
Endpoint
// Source address that messages are coming from.
Source() string
// Target address that messages are going to.
Target() string
// Name is a unique name for the link among links between the same
// containers in the same direction. By default generated automatically.
LinkName() string
// IsSender is true if this is the sending end of the link.
IsSender() bool
// IsReceiver is true if this is the receiving end of the link.
IsReceiver() bool
// SndSettle defines when the sending end of the link settles message delivery.
SndSettle() SndSettleMode
// RcvSettle defines when the sending end of the link settles message delivery.
RcvSettle() RcvSettleMode
// Session containing the Link
Session() Session
// Called in event loop on closed event.
closed(err error)
// Called to open a link (local or accepted incoming link)
open()
}
// LinkOption can be passed when creating a sender or receiver link.
type LinkOption func(*link)
// Source sets address that messages are coming from.
func Source(s string) LinkOption { return func(l *link) { l.source = s } }
// Target sets address that messages are going to.
func Target(s string) LinkOption { return func(l *link) { l.target = s } }
// LinkName sets the link name.
func LinkName(s string) LinkOption { return func(l *link) { l.target = s } }
// SndSettle sets the send settle mode
func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { l.sndSettle = m } }
// RcvSettle sets the send settle mode
func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { l.rcvSettle = m } }
// SndSettleMode defines when the sending end of the link settles message delivery.
type SndSettleMode proton.SndSettleMode
// Capacity sets the link capacity
func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } }
// Prefetch sets a receivers pre-fetch flag. Not relevant for a sender.
func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } }
// AtMostOnce sets "fire and forget" mode, messages are sent but no
// acknowledgment is received, messages can be lost if there is a network
// failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
func AtMostOnce() LinkOption {
return func(l *link) {
SndSettle(SndSettled)(l)
RcvSettle(RcvFirst)(l)
}
}
// AtLeastOnce requests acknowledgment for every message, acknowledgment
// indicates the message was definitely received. In the event of a
// failure, unacknowledged messages can be re-sent but there is a chance
// that the message will be received twice in this case.
// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
func AtLeastOnce() LinkOption {
return func(l *link) {
SndSettle(SndUnsettled)(l)
RcvSettle(RcvFirst)(l)
}
}
const (
// Messages are sent unsettled
SndUnsettled = SndSettleMode(proton.SndUnsettled)
// Messages are sent already settled
SndSettled = SndSettleMode(proton.SndSettled)
// Sender can send either unsettled or settled messages.
SendMixed = SndSettleMode(proton.SndMixed)
)
// RcvSettleMode defines when the receiving end of the link settles message delivery.
type RcvSettleMode proton.RcvSettleMode
const (
// Receiver settles first.
RcvFirst = RcvSettleMode(proton.RcvFirst)
// Receiver waits for sender to settle before settling.
RcvSecond = RcvSettleMode(proton.RcvSecond)
)
type link struct {
endpoint
// Link settings.
source string
target string
linkName string
isSender bool
sndSettle SndSettleMode
rcvSettle RcvSettleMode
capacity int
prefetch bool
session *session
eLink proton.Link
done chan struct{} // Closed when link is closed
}
func (l *link) Source() string { return l.source }
func (l *link) Target() string { return l.target }
func (l *link) LinkName() string { return l.linkName }
func (l *link) IsSender() bool { return l.isSender }
func (l *link) IsReceiver() bool { return !l.isSender }
func (l *link) SndSettle() SndSettleMode { return l.sndSettle }
func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle }
func (l *link) Session() Session { return l.session }
func (l *link) Connection() Connection { return l.session.Connection() }
func (l *link) engine() *proton.Engine { return l.session.connection.engine }
func (l *link) handler() *handler { return l.session.connection.handler }
// Set up link fields and open the proton.Link
func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error) {
l := link{
session: sn,
isSender: isSender,
capacity: 1,
prefetch: false,
done: make(chan struct{}),
}
for _, set := range setting {
set(&l)
}
if l.linkName == "" {
l.linkName = l.session.connection.container.nextLinkName()
}
if l.IsSender() {
l.eLink = l.session.eSession.Sender(l.linkName)
} else {
l.eLink = l.session.eSession.Receiver(l.linkName)
}
if l.eLink.IsNil() {
l.err.Set(fmt.Errorf("cannot create link %s", l))
return l, l.err.Get()
}
l.eLink.Source().SetAddress(l.source)
l.eLink.Target().SetAddress(l.target)
l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
l.str = l.eLink.String()
l.eLink.Open()
return l, nil
}
type incomingLink struct {
incoming
link
}
// Set up a link from an incoming proton.Link.
func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
l := incomingLink{
link: link{
session: sn,
isSender: eLink.IsSender(),
eLink: eLink,
source: eLink.RemoteSource().Address(),
target: eLink.RemoteTarget().Address(),
linkName: eLink.Name(),
sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
capacity: 1,
prefetch: false,
done: make(chan struct{}),
},
}
l.str = eLink.String()
return l
}
// Called in proton goroutine on closed or disconnected
func (l *link) closed(err error) {
l.err.Set(err)
l.err.Set(Closed) // If no error set, mark as closed.
close(l.done)
}
// Not part of Link interface but use by Sender and Receiver.
func (l *link) Credit() (credit int, err error) {
err = l.engine().InjectWait(func() error {
credit = l.eLink.Credit()
return nil
})
return
}
// Not part of Link interface but use by Sender and Receiver.
func (l *link) Capacity() int { return l.capacity }
func (l *link) Close(err error) {
l.engine().Inject(func() { localClose(l.eLink, err) })
}
func (l *link) open() {
l.eLink.Open()
}