blob: 303f815b8f12b9842afb825a8aca0d20c220e5af [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package electron
import (
"fmt"
"github.com/apache/qpid-proton/go/pkg/amqp"
"github.com/apache/qpid-proton/go/pkg/proton"
"time"
)
// LinkSettings exposes the settings associated with a link.
type LinkSettings interface {
// Source address that messages are coming from.
Source() string
// Target address that messages are going to.
Target() string
// LinkName is a unique name for the link among links between the same
// containers in the same direction. By default generated automatically.
LinkName() string
// IsSender is true if this is the sending end of the link.
IsSender() bool
// IsReceiver is true if this is the receiving end of the link.
IsReceiver() bool
// SndSettle defines when the sending end of the link settles message delivery.
SndSettle() SndSettleMode
// RcvSettle defines when the receiving end of the link settles message delivery.
RcvSettle() RcvSettleMode
// Session containing the Link.
Session() Session
// Filter for the link.
Filter() map[amqp.Symbol]interface{}
// SourceSettings returns the advanced settings for the source.
SourceSettings() TerminusSettings
// TargetSettings returns the advanced settings for the target.
TargetSettings() TerminusSettings
}
// LinkOption can be passed when creating a sender or receiver link to set
// optional configuration. Each option is a function that mutates the
// linkSettings of the link being created.
type LinkOption func(*linkSettings)
// Source returns a LinkOption that sets the address that messages are coming from.
func Source(s string) LinkOption {
	return func(settings *linkSettings) {
		settings.source = s
	}
}
// Target returns a LinkOption that sets the address that messages are going to.
func Target(s string) LinkOption {
	return func(settings *linkSettings) {
		settings.target = s
	}
}
// LinkName returns a LinkOption that sets the link name.
func LinkName(s string) LinkOption {
	return func(settings *linkSettings) {
		settings.linkName = s
	}
}
// SndSettle returns a LinkOption that sets the send settle mode.
func SndSettle(m SndSettleMode) LinkOption {
	return func(settings *linkSettings) {
		settings.sndSettle = m
	}
}
// RcvSettle returns a LinkOption that sets the receive settle mode.
func RcvSettle(m RcvSettleMode) LinkOption {
	return func(settings *linkSettings) {
		settings.rcvSettle = m
	}
}
// Capacity returns a LinkOption that sets the link capacity.
func Capacity(n int) LinkOption {
	return func(settings *linkSettings) {
		settings.capacity = n
	}
}
// Prefetch returns a LinkOption that sets a receiver's pre-fetch flag.
// Not relevant for a sender.
func Prefetch(p bool) LinkOption {
	return func(settings *linkSettings) {
		settings.prefetch = p
	}
}
// DurableSubscription returns a LinkOption that configures a Receiver as a
// named durable subscription: it sets the link name to name and sets the
// source terminus durability to proton.Deliveries and its expiry policy to
// proton.ExpireNever.
//
// The name overrides (and is overridden by) LinkName(), whichever option is
// applied last wins, so you should normally only use one of these options.
func DurableSubscription(name string) LinkOption {
	return func(settings *linkSettings) {
		settings.linkName = name

		// Only these two source fields are touched; Timeout and Dynamic keep
		// whatever value they already had.
		src := &settings.sourceSettings
		src.Durability = proton.Deliveries
		src.Expiry = proton.ExpireNever
	}
}
// AtMostOnce returns a LinkOption that sets "fire and forget" mode: messages
// are sent but no acknowledgment is received, so messages can be lost if there
// is a network failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst.
func AtMostOnce() LinkOption {
	return func(settings *linkSettings) {
		settings.sndSettle = SndSettled
		settings.rcvSettle = RcvFirst
	}
}
// AtLeastOnce returns a LinkOption that requests acknowledgment for every
// message; an acknowledgment indicates the message was definitely received.
// In the event of a failure, unacknowledged messages can be re-sent, but there
// is a chance that a message will be received twice in that case.
// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst.
func AtLeastOnce() LinkOption {
	return func(settings *linkSettings) {
		settings.sndSettle = SndUnsettled
		settings.rcvSettle = RcvFirst
	}
}
// Filter returns a LinkOption that sets the link's filter map. The map is
// stored as given, not copied.
func Filter(m map[amqp.Symbol]interface{}) LinkOption {
	setFilter := func(settings *linkSettings) {
		settings.filter = m
	}
	return setFilter
}
// SourceSettings returns a LinkOption that replaces all the source
// TerminusSettings at once, including any values stored there by an earlier
// option such as DurableSubscription(). The source address is kept in a
// separate field and is not changed by this option.
func SourceSettings(ts TerminusSettings) LinkOption {
	return func(settings *linkSettings) {
		settings.sourceSettings = ts
	}
}
// TargetSettings returns a LinkOption that replaces all the target
// TerminusSettings at once. The target address is kept in a separate field
// and is not changed by this option.
func TargetSettings(ts TerminusSettings) LinkOption {
	return func(settings *linkSettings) {
		settings.targetSettings = ts
	}
}
// SndSettleMode defines when the sending end of the link settles message delivery.
// It is a distinct type converted from proton.SndSettleMode.
type SndSettleMode proton.SndSettleMode
const (
// SndUnsettled means messages are sent unsettled.
SndUnsettled = SndSettleMode(proton.SndUnsettled)
// SndSettled means messages are sent already settled.
SndSettled = SndSettleMode(proton.SndSettled)
// SndMixed means the sender can send either unsettled or settled messages.
SndMixed = SndSettleMode(proton.SndMixed)
)
// RcvSettleMode defines when the receiving end of the link settles message delivery.
// It is a distinct type converted from proton.RcvSettleMode.
type RcvSettleMode proton.RcvSettleMode
const (
// RcvFirst means the receiver settles first.
RcvFirst = RcvSettleMode(proton.RcvFirst)
// RcvSecond means the receiver waits for the sender to settle before settling.
RcvSecond = RcvSettleMode(proton.RcvSecond)
)
// linkSettings holds the configuration of one end of a link together with the
// session and proton.Link it belongs to. LinkOption functions mutate a
// linkSettings value; its methods provide the read-only accessors declared by
// the LinkSettings interface (Session is supplied by the enclosing link type).
type linkSettings struct {
// source is the source address.
source string
// sourceSettings are the advanced settings for the source terminus.
sourceSettings TerminusSettings
// target is the target address.
target string
// targetSettings are the advanced settings for the target terminus.
targetSettings TerminusSettings
// linkName is the link name; makeLocalLink generates one if it is empty.
linkName string
// isSender is true for the sending end, false for the receiving end.
isSender bool
// sndSettle is the send settle mode.
sndSettle SndSettleMode
// rcvSettle is the receive settle mode.
rcvSettle RcvSettleMode
// capacity is the link capacity; both constructors in this file default it to 1.
capacity int
// prefetch is the receiver pre-fetch flag; both constructors default it to false.
prefetch bool
// filter is the filter map for the link's source.
filter map[amqp.Symbol]interface{}
// session is the session that contains the link.
session *session
// pLink is the underlying proton link.
pLink proton.Link
}
// TerminusSettings holds advanced AMQP settings for the source or target of a link.
// Usually these can be set via a more descriptive LinkOption, e.g. DurableSubscription(),
// and do not need to be set or examined directly.
type TerminusSettings struct {
// Durability is the terminus durability (applied with SetDurability on a local link).
Durability proton.Durability
// Expiry is the terminus expiry policy (applied with SetExpiryPolicy on a local link).
Expiry proton.ExpiryPolicy
// Timeout is the terminus timeout (applied with SetTimeout on a local link).
Timeout time.Duration
// Dynamic is the terminus dynamic flag (applied with SetDynamic on a local link).
Dynamic bool
}
// makeTerminusSettings captures the durability, expiry policy, timeout and
// dynamic flag reported by the proton terminus t in a TerminusSettings value.
func makeTerminusSettings(t proton.Terminus) TerminusSettings {
	var ts TerminusSettings
	ts.Durability = t.Durability()
	ts.Expiry = t.ExpiryPolicy()
	ts.Timeout = t.Timeout()
	ts.Dynamic = t.IsDynamic()
	return ts
}
// link embeds an endpoint and the linkSettings of the link, so it exposes the
// methods of both.
type link struct {
endpoint
linkSettings
}
// Source returns the address that messages are coming from.
func (l *linkSettings) Source() string {
	return l.source
}
// Target returns the address that messages are going to.
func (l *linkSettings) Target() string {
	return l.target
}
// LinkName returns the name of the link.
func (l *linkSettings) LinkName() string {
	return l.linkName
}
// IsSender reports whether this is the sending end of the link.
func (l *linkSettings) IsSender() bool {
	return l.isSender
}
// IsReceiver reports whether this is the receiving end of the link, i.e. it
// is not the sending end.
func (l *linkSettings) IsReceiver() bool {
	sender := l.isSender
	return !sender
}
// SndSettle returns the send settle mode of the link.
func (l *linkSettings) SndSettle() SndSettleMode {
	return l.sndSettle
}
// RcvSettle returns the receive settle mode of the link.
func (l *linkSettings) RcvSettle() RcvSettleMode {
	return l.rcvSettle
}
// Filter returns the filter map of the link. The stored map itself is
// returned, not a copy.
func (l *linkSettings) Filter() map[amqp.Symbol]interface{} {
	return l.filter
}
// SourceSettings returns the advanced settings for the source of the link.
func (l *linkSettings) SourceSettings() TerminusSettings {
	return l.sourceSettings
}
// TargetSettings returns the advanced settings for the target of the link.
func (l *linkSettings) TargetSettings() TerminusSettings {
	return l.targetSettings
}
// Session returns the session containing the link.
func (l *link) Session() Session {
	return l.session
}
// Connection returns the connection of the session containing the link.
func (l *link) Connection() Connection {
	return l.session.Connection()
}
// engine returns the proton engine of the connection that owns the link's session.
func (l *link) engine() *proton.Engine {
	return l.session.connection.engine
}
// handler returns the handler of the connection that owns the link's session.
func (l *link) handler() *handler {
	return l.session.connection.handler
}
// makeLocalLink creates and opens a locally-initiated link on session sn and
// returns the settings it was opened with.
//
// The defaults (capacity 1, prefetch off) are set first and then every
// LinkOption is applied in order, so a later option overrides an earlier one.
// If no option supplied a link name, one is taken from the container's
// nextLinkName(). isSender selects whether a sender or a receiver is created
// on the underlying proton session.
//
// An error is returned if the proton link cannot be created, or if the filter
// supplied via a Filter() option cannot be marshaled. In both cases the link
// is not opened.
func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSettings, error) {
	l := linkSettings{
		isSender: isSender,
		capacity: 1,
		prefetch: false,
		session:  sn,
	}
	for _, set := range setting {
		set(&l)
	}
	if l.linkName == "" {
		l.linkName = l.session.connection.container.nextLinkName()
	}
	if l.IsSender() {
		l.pLink = l.session.pSession.Sender(l.linkName)
	} else {
		l.pLink = l.session.pSession.Receiver(l.linkName)
	}
	if l.pLink.IsNil() {
		// pLink is nil here, so identify the link by the name that was requested.
		return l, fmt.Errorf("cannot create link %q", l.linkName)
	}

	// Source terminus.
	l.pLink.Source().SetAddress(l.source)
	if len(l.filter) > 0 {
		if err := l.pLink.Source().Filter().Marshal(l.filter); err != nil {
			// The filter values are caller-supplied, so report the failure
			// through the existing error return rather than panicking. The
			// proton link has been created at this point but is left unopened.
			return l, fmt.Errorf("cannot set filter on link %q: %v", l.linkName, err)
		}
	}
	l.pLink.Source().SetDurability(l.sourceSettings.Durability)
	l.pLink.Source().SetExpiryPolicy(l.sourceSettings.Expiry)
	l.pLink.Source().SetTimeout(l.sourceSettings.Timeout)
	l.pLink.Source().SetDynamic(l.sourceSettings.Dynamic)

	// Target terminus.
	l.pLink.Target().SetAddress(l.target)
	l.pLink.Target().SetDurability(l.targetSettings.Durability)
	l.pLink.Target().SetExpiryPolicy(l.targetSettings.Expiry)
	l.pLink.Target().SetTimeout(l.targetSettings.Timeout)
	l.pLink.Target().SetDynamic(l.targetSettings.Dynamic)

	// Settle modes are converted back to the proton types they were defined from.
	l.pLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
	l.pLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
	l.pLink.Open()
	return l, nil
}
// makeIncomingLinkSettings builds the linkSettings for a link described by
// pLink on session sn. Addresses, terminus settings and settle modes are read
// from the remote side of pLink; capacity is set to 1 and prefetch to false.
// If the remote source carries a non-empty filter it is unmarshaled into the
// settings' filter map.
func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
	var in linkSettings
	in.isSender = pLink.IsSender()
	in.source = pLink.RemoteSource().Address()
	in.sourceSettings = makeTerminusSettings(pLink.RemoteSource())
	in.target = pLink.RemoteTarget().Address()
	in.targetSettings = makeTerminusSettings(pLink.RemoteTarget())
	in.linkName = pLink.Name()
	in.sndSettle = SndSettleMode(pLink.RemoteSndSettleMode())
	in.rcvSettle = RcvSettleMode(pLink.RemoteRcvSettleMode())
	in.capacity = 1
	in.prefetch = false
	in.pLink = pLink
	in.session = sn

	remoteFilter := in.pLink.RemoteSource().Filter()
	if remoteFilter.Empty() {
		return in
	}
	remoteFilter.Unmarshal(&in.filter) // TODO aconway 2017-06-08: ignoring errors
	return in
}
// Credit returns the value reported by the proton link's Credit(). The read
// happens in a function handed to the engine's InjectWait. If the link already
// has an error, that error is returned and credit is left at zero.
//
// Not part of the Link interface but used by Sender and Receiver.
func (l *link) Credit() (credit int, err error) {
	readCredit := func() error {
		if linkErr := l.Error(); linkErr != nil {
			return linkErr
		}
		credit = l.pLink.Credit()
		return nil
	}
	err = l.engine().InjectWait(readCredit)
	return credit, err
}
// Capacity returns the capacity configured for the link.
//
// Not part of the Link interface but used by Sender and Receiver.
func (l *link) Capacity() int {
	return l.capacity
}
// Close injects a function into the engine that calls localClose on the
// proton link with err, unless the link already has an error, in which case
// nothing is done. The result of Inject is deliberately discarded.
func (l *link) Close(err error) {
	closeLink := func() {
		if l.Error() != nil {
			return
		}
		localClose(l.pLink, err)
	}
	_ = l.engine().Inject(closeLink)
}