package electron
import (
// Settings associated with a link
type LinkSettings interface {
// Source address that messages are coming from.
Source() string
// Target address that messages are going to.
Target() string
// Name is a unique name for the link among links between the same
// containers in the same direction. By default generated automatically.
LinkName() string
// IsSender is true if this is the sending end of the link.
IsSender() bool
// IsReceiver is true if this is the receiving end of the link.
IsReceiver() bool
// SndSettle defines when the sending end of the link settles message delivery.
SndSettle() SndSettleMode
// RcvSettle defines when the sending end of the link settles message delivery.
RcvSettle() RcvSettleMode
// Session containing the Link
Session() Session
// Filter for the link
Filter() map[amqp.Symbol]interface{}
// Advanced settings for the source
SourceSettings() TerminusSettings
// Advanced settings for the target
TargetSettings() TerminusSettings
// LinkOption can be passed when creating a sender or receiver link to set optional configuration.
type LinkOption func(*linkSettings)
// Source returns a LinkOption that sets address that messages are coming from.
func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s } }
// Target returns a LinkOption that sets address that messages are going to.
func Target(s string) LinkOption { return func(l *linkSettings) { = s } }
// LinkName returns a LinkOption that sets the link name.
func LinkName(s string) LinkOption { return func(l *linkSettings) { l.linkName = s } }
// SndSettle returns a LinkOption that sets the send settle mode
func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sndSettle = m } }
// RcvSettle returns a LinkOption that sets the send settle mode
func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { l.rcvSettle = m } }
// Capacity returns a LinkOption that sets the link capacity
func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = n } }
// Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender.
func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = p } }
// DurableSubscription returns a LinkOption that configures a Receiver as a named durable
// subscription. The name overrides (and is overridden by) LinkName() so you should normally
// only use one of these options.
func DurableSubscription(name string) LinkOption {
return func(l *linkSettings) {
l.linkName = name
l.sourceSettings.Durability = proton.Deliveries
l.sourceSettings.Expiry = proton.ExpireNever
// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
// are sent but no acknowledgment is received, messages can be lost if there is
// a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
func AtMostOnce() LinkOption {
return func(l *linkSettings) {
// AtLeastOnce returns a LinkOption that requests acknowledgment for every
// message, acknowledgment indicates the message was definitely received. In the
// event of a failure, unacknowledged messages can be re-sent but there is a
// chance that the message will be received twice in this case. Sets
// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
func AtLeastOnce() LinkOption {
return func(l *linkSettings) {
// Filter returns a LinkOption that sets a filter.
func Filter(m map[amqp.Symbol]interface{}) LinkOption {
return func(l *linkSettings) { l.filter = m }
// SourceSettings returns a LinkOption that sets all the SourceSettings.
// Note: it will override the source address set by a Source() option
func SourceSettings(ts TerminusSettings) LinkOption {
return func(l *linkSettings) { l.sourceSettings = ts }
// TargetSettings returns a LinkOption that sets all the TargetSettings.
// Note: it will override the target address set by a Target() option
func TargetSettings(ts TerminusSettings) LinkOption {
return func(l *linkSettings) { l.targetSettings = ts }
// SndSettleMode defines when the sending end of the link settles message delivery.
type SndSettleMode proton.SndSettleMode
const (
// Messages are sent unsettled
SndUnsettled = SndSettleMode(proton.SndUnsettled)
// Messages are sent already settled
SndSettled = SndSettleMode(proton.SndSettled)
// Sender can send either unsettled or settled messages.
SndMixed = SndSettleMode(proton.SndMixed)
// RcvSettleMode defines when the receiving end of the link settles message delivery.
type RcvSettleMode proton.RcvSettleMode
const (
// Receiver settles first.
RcvFirst = RcvSettleMode(proton.RcvFirst)
// Receiver waits for sender to settle before settling.
RcvSecond = RcvSettleMode(proton.RcvSecond)
type linkSettings struct {
source string
sourceSettings TerminusSettings
target string
targetSettings TerminusSettings
linkName string
isSender bool
sndSettle SndSettleMode
rcvSettle RcvSettleMode
capacity int
prefetch bool
filter map[amqp.Symbol]interface{}
session *session
pLink proton.Link
// Advanced AMQP settings for the source or target of a link.
// Usually these can be set via a more descriptive LinkOption, e.g. DurableSubscription()
// and do not need to be set/examined directly.
type TerminusSettings struct {
Durability proton.Durability
Expiry proton.ExpiryPolicy
Timeout time.Duration
Dynamic bool
func makeTerminusSettings(t proton.Terminus) TerminusSettings {
return TerminusSettings{
Durability: t.Durability(),
Expiry: t.ExpiryPolicy(),
Timeout: t.Timeout(),
Dynamic: t.IsDynamic(),
type link struct {
func (l *linkSettings) Source() string { return l.source }
func (l *linkSettings) Target() string { return }
func (l *linkSettings) LinkName() string { return l.linkName }
func (l *linkSettings) IsSender() bool { return l.isSender }
func (l *linkSettings) IsReceiver() bool { return !l.isSender }
func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
func (l *linkSettings) Filter() map[amqp.Symbol]interface{} { return l.filter }
func (l *linkSettings) SourceSettings() TerminusSettings { return l.sourceSettings }
func (l *linkSettings) TargetSettings() TerminusSettings { return l.targetSettings }
func (l *link) Session() Session { return l.session }
func (l *link) Connection() Connection { return l.session.Connection() }
func (l *link) engine() *proton.Engine { return l.session.connection.engine }
func (l *link) handler() *handler { return l.session.connection.handler }
// Open a link and return the linkSettings.
func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSettings, error) {
l := linkSettings{
isSender: isSender,
capacity: 1,
prefetch: false,
session: sn,
for _, set := range setting {
if l.linkName == "" {
l.linkName = l.session.connection.container.nextLinkName()
if l.IsSender() {
l.pLink = l.session.pSession.Sender(l.linkName)
} else {
l.pLink = l.session.pSession.Receiver(l.linkName)
if l.pLink.IsNil() {
return l, fmt.Errorf("cannot create link %s", l.pLink)
if len(l.filter) > 0 {
if err := l.pLink.Source().Filter().Marshal(l.filter); err != nil {
panic(err) // Shouldn't happen
return l, nil
func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
l := linkSettings{
isSender: pLink.IsSender(),
source: pLink.RemoteSource().Address(),
sourceSettings: makeTerminusSettings(pLink.RemoteSource()),
target: pLink.RemoteTarget().Address(),
targetSettings: makeTerminusSettings(pLink.RemoteTarget()),
linkName: pLink.Name(),
sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()),
rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()),
capacity: 1,
prefetch: false,
pLink: pLink,
session: sn,
filter := l.pLink.RemoteSource().Filter()
if !filter.Empty() {
filter.Unmarshal(&l.filter) // TODO aconway 2017-06-08: ignoring errors
return l
// Not part of Link interface but use by Sender and Receiver.
func (l *link) Credit() (credit int, err error) {
err = l.engine().InjectWait(func() error {
if l.Error() != nil {
return l.Error()
credit = l.pLink.Credit()
return nil
// Not part of Link interface but use by Sender and Receiver.
func (l *link) Capacity() int { return l.capacity }
func (l *link) Close(err error) {
_ = l.engine().Inject(func() {
if l.Error() == nil {
localClose(l.pLink, err)