blob: 30c6cd500e5e72026b033954a3202fb4cfea06cf [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package electron
// #include <proton/disposition.h>
import "C"
import (
"crypto/rand"
"encoding/hex"
"fmt"
"net"
"sync"
"time"
"github.com/apache/qpid-proton/go/pkg/amqp"
"github.com/apache/qpid-proton/go/pkg/proton"
)
// Settings associated with a Connection.
type ConnectionSettings interface {
// Authenticated user name associated with the connection.
User() string
// The AMQP virtual host name for the connection.
//
// Optional, useful when the server has multiple names and provides different
// service based on the name the client uses to connect.
//
// By default it is set to the DNS host name that the client uses to connect,
// but it can be set to something different at the client side with the
// VirtualHost() option.
//
// Returns error if the connection fails to authenticate.
VirtualHost() string
// Heartbeat is the maximum delay between sending frames that the remote peer
// has requested of us. If the interval expires an empty "heartbeat" frame
// will be sent automatically to keep the connection open.
Heartbeat() time.Duration
}
// Connection is an AMQP connection, created by a Container.
type Connection interface {
Endpoint
ConnectionSettings
// Sender opens a new sender on the DefaultSession.
Sender(...LinkOption) (Sender, error)
// Receiver opens a new Receiver on the DefaultSession().
Receiver(...LinkOption) (Receiver, error)
// DefaultSession() returns a default session for the connection. It is opened
// on the first call to DefaultSession and returned on subsequent calls.
DefaultSession() (Session, error)
// Session opens a new session.
Session(...SessionOption) (Session, error)
// Container for the connection.
Container() Container
// Disconnect the connection abruptly with an error.
Disconnect(error)
// Wait waits for the connection to be disconnected.
Wait() error
// WaitTimeout is like Wait but returns Timeout if the timeout expires.
WaitTimeout(time.Duration) error
// Incoming returns a channel for incoming endpoints opened by the remote peer.
// See the Incoming interface for more detail.
//
// Note: this channel will first return an *IncomingConnection for the
// connection itself which allows you to look at security information and
// decide whether to Accept() or Reject() the connection. Then it will return
// *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened
// by the remote end.
//
// Note 2: you must receiving from Incoming() and call Accept/Reject to avoid
// blocking electron event loop. Normally you would run a loop in a goroutine
// to handle incoming types that interest and Accept() those that don't.
Incoming() <-chan Incoming
}
type connectionSettings struct {
user, virtualHost string
heartbeat time.Duration
}
func (c connectionSettings) User() string { return c.user }
func (c connectionSettings) VirtualHost() string { return c.virtualHost }
func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
// ConnectionOption arguments can be passed when creating a connection to configure it.
type ConnectionOption func(*connection)
// User returns a ConnectionOption sets the user name for a connection
func User(user string) ConnectionOption {
return func(c *connection) {
c.user = user
c.pConnection.SetUser(user)
}
}
// VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection.
// Only applies to outbound client connection.
func VirtualHost(virtualHost string) ConnectionOption {
return func(c *connection) {
c.virtualHost = virtualHost
c.pConnection.SetHostname(virtualHost)
}
}
// Password returns a ConnectionOption to set the password used to establish a
// connection. Only applies to outbound client connection.
//
// The connection will erase its copy of the password from memory as soon as it
// has been used to authenticate. If you are concerned about passwords staying in
// memory you should never store them as strings, and should overwrite your
// copy as soon as you are done with it.
//
func Password(password []byte) ConnectionOption {
return func(c *connection) { c.pConnection.SetPassword(password) }
}
// Server returns a ConnectionOption to put the connection in server mode for incoming connections.
//
// A server connection will do protocol negotiation to accept a incoming AMQP
// connection. Normally you would call this for a connection created by
// net.Listener.Accept()
//
func Server() ConnectionOption {
return func(c *connection) { c.setServer() }
}
// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
// Connection.Incoming() This is automatically set for Server() connections.
func AllowIncoming() ConnectionOption {
return func(c *connection) { c.incoming = make(chan Incoming) }
}
// Parent returns a ConnectionOption that associates the Connection with it's Container
// If not set a connection will create its own default container.
func Parent(cont Container) ConnectionOption {
return func(c *connection) { c.container = cont.(*container) }
}
// ContainerId returns a ConnectionOption that creates a new Container
// with id and associates it with the connection
func ContainerId(id string) ConnectionOption {
return func(c *connection) { c.container = NewContainer(id).(*container) }
}
type connection struct {
endpoint
connectionSettings
defaultSessionOnce, closeOnce sync.Once
container *container
conn net.Conn
server, client bool
incoming chan Incoming
handler *handler
engine *proton.Engine
pConnection proton.Connection
mc amqp.MessageCodec
defaultSession Session
}
// NewConnection creates a connection with the given options.
// Options are applied in order.
func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) {
c := &connection{
conn: conn,
}
c.handler = newHandler(c)
var err error
c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
if err != nil {
return nil, err
}
c.pConnection = c.engine.Connection()
for _, opt := range opts {
opt(c)
// If the first option is not Server(), then we are a client.
// Applying Server() after other options is an error
if !c.server {
c.client = true
}
}
if c.container == nil {
// Generate a random container-id. Not an RFC4122-compliant UUID but probably-unique
id := make([]byte, 16)
if _, err = rand.Read(id); err != nil {
return nil, err
}
c.container = NewContainer(hex.EncodeToString(id)).(*container)
}
c.pConnection.SetContainer(c.container.Id())
saslConfig.setup(c.engine)
c.endpoint.init(c.engine.String())
go c.run()
return c, nil
}
func (c *connection) setServer() {
if c.client {
panic("electron.Server() must be first in the ConnectionOption list")
}
c.server = true
c.engine.Server()
AllowIncoming()(c)
}
func (c *connection) run() {
if !c.server {
c.pConnection.Open()
}
_ = c.engine.Run()
if c.incoming != nil {
close(c.incoming)
}
_ = c.closed(Closed)
}
func (c *connection) Close(err error) {
c.closeOnce.Do(func() {
c.err.Set(err)
c.engine.Close(err)
c.mc.Close()
})
}
func (c *connection) Disconnect(err error) {
c.err.Set(err)
c.engine.Disconnect(err)
}
func (c *connection) Session(opts ...SessionOption) (Session, error) {
var s Session
err := c.engine.InjectWait(func() error {
if c.Error() != nil {
return c.Error()
}
pSession, err := c.engine.Connection().Session()
if err == nil {
pSession.Open()
if err == nil {
s = newSession(c, pSession, opts...)
}
}
return err
})
return s, err
}
func (c *connection) Container() Container { return c.container }
func (c *connection) DefaultSession() (s Session, err error) {
c.defaultSessionOnce.Do(func() {
c.defaultSession, err = c.Session()
})
if err == nil {
err = c.Error()
}
return c.defaultSession, err
}
func (c *connection) Sender(opts ...LinkOption) (Sender, error) {
if s, err := c.DefaultSession(); err == nil {
return s.Sender(opts...)
} else {
return nil, err
}
}
func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) {
if s, err := c.DefaultSession(); err == nil {
return s.Receiver(opts...)
} else {
return nil, err
}
}
func (c *connection) Connection() Connection { return c }
func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
func (c *connection) WaitTimeout(timeout time.Duration) error {
_, err := timedReceive(c.done, timeout)
if err == Timeout {
return Timeout
}
return c.Error()
}
func (c *connection) Incoming() <-chan Incoming {
if c.incoming == nil {
panic(fmt.Errorf("Incoming() only allowed on Connection created with the Server() option: %s", c))
}
return c.incoming
}
type IncomingConnection struct {
incoming
connectionSettings
c *connection
}
func newIncomingConnection(c *connection) *IncomingConnection {
c.user = c.pConnection.Transport().User()
c.virtualHost = c.pConnection.RemoteHostname()
c.heartbeat = c.pConnection.Transport().RemoteIdleTimeout()
return &IncomingConnection{
incoming: makeIncoming(c.pConnection),
connectionSettings: c.connectionSettings,
c: c}
}
// AcceptConnection is like Accept() but takes ConnectionOption
// arguments like NewConnection(). For example you can set the
// Heartbeat() for the incoming connection.
func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection {
return in.accept(func() Endpoint {
for _, opt := range opts {
opt(in.c)
}
in.c.pConnection.Open()
return in.c
}).(Connection)
}
func (in *IncomingConnection) Accept() Endpoint {
return in.AcceptConnection()
}
func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
// SASLEnable returns a ConnectionOption that enables SASL authentication.
// Only required if you don't set any other SASL options.
func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } }
// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL
// mechanisms.
//
// Can be used on the client or the server to restrict the SASL for a connection.
// mechs is a space-separated list of mechanism names.
//
// The mechanisms allowed by default are determined by your SASL
// library and system configuration, with two exceptions: GSSAPI
// and GSS-SPNEGO are disabled by default. To enable them, you
// must explicitly add them using this option.
//
// Clients must set the allowed mechanisms before the the
// outgoing connection is attempted. Servers must set them
// before the listening connection is setup.
//
func SASLAllowedMechs(mechs string) ConnectionOption {
return func(c *connection) { sasl(c).AllowedMechs(mechs) }
}
// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear
// text SASL authentication mechanisms
//
// By default the SASL layer is configured not to allow mechanisms that disclose
// the clear text of the password over an unencrypted AMQP connection. This specifically
// will disallow the use of the PLAIN mechanism without using SSL encryption.
//
// This default is to avoid disclosing password information accidentally over an
// insecure network.
//
func SASLAllowInsecure(b bool) ConnectionOption {
return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
}
// Heartbeat returns a ConnectionOption that requests the maximum delay
// between sending frames for the remote peer. If we don't receive any frames
// within 2*delay we will close the connection.
//
func Heartbeat(delay time.Duration) ConnectionOption {
// Proton-C divides the idle-timeout by 2 before sending, so compensate.
return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * delay) }
}
type saslConfigState struct {
lock sync.Mutex
name string
dir string
initialized bool
}
func (s *saslConfigState) set(target *string, value string) {
s.lock.Lock()
defer s.lock.Unlock()
if s.initialized {
panic("SASL configuration cannot be changed after a Connection has been created")
}
*target = value
}
// Apply the global SASL configuration the first time a proton.Engine needs it
//
// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we
// can realistically offer is global configuration. Later if/when the pn_sasl C
// impl is fixed we can offer per connection over-rides.
func (s *saslConfigState) setup(eng *proton.Engine) {
s.lock.Lock()
defer s.lock.Unlock()
if !s.initialized {
s.initialized = true
sasl := eng.Transport().SASL()
if s.name != "" {
sasl.ConfigName(saslConfig.name)
}
if s.dir != "" {
sasl.ConfigPath(saslConfig.dir)
}
}
}
var saslConfig saslConfigState
// GlobalSASLConfigDir sets the SASL configuration directory for every
// Connection created in this process. If not called, the default is determined
// by your SASL installation.
//
// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
//
// Must be called at most once, before any connections are created.
func GlobalSASLConfigDir(dir string) { saslConfig.set(&saslConfig.dir, dir) }
// GlobalSASLConfigName sets the SASL configuration name for every Connection
// created in this process. If not called the default is "proton-server".
//
// The complete configuration file name is
// <sasl-config-dir>/<sasl-config-name>.conf
//
// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
//
// Must be called at most once, before any connections are created.
func GlobalSASLConfigName(name string) { saslConfig.set(&saslConfig.name, name) }
// Do we support extended SASL negotiation?
// All implementations of Proton support ANONYMOUS and EXTERNAL on both
// client and server sides and PLAIN on the client side.
//
// Extended SASL implememtations use an external library (Cyrus SASL)
// to support other mechanisms beyond these basic ones.
func SASLExtended() bool { return proton.SASLExtended() }
// Dial is shorthand for using net.Dial() then NewConnection()
// See net.Dial() for the meaning of the network, address arguments.
func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) {
conn, err := net.Dial(network, address)
if err == nil {
c, err = NewConnection(conn, opts...)
}
return
}
// DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
// See net.Dial() for the meaning of the network, address arguments.
func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error) {
conn, err := dialer.Dial(network, address)
if err == nil {
c, err = NewConnection(conn, opts...)
}
return
}