blob: c0f009356f393682189582768c7d494d92fcc2b8 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package proton
import (
"fmt"
"net"
"os"
"strings"
"sync"
"time"
"unsafe"
)
/*
#include <proton/connection.h>
#include <proton/event.h>
#include <proton/error.h>
#include <proton/handlers.h>
#include <proton/session.h>
#include <proton/transport.h>
#include <memory.h>
#include <stdlib.h>
*/
import "C"
// Injecter allows functions to be "injected" into the event-processing loop, to
// be called in the same goroutine as event handlers.
type Injecter interface {
// Inject a function into the engine goroutine.
//
// f() will be called in the same goroutine as event handlers, so it can safely
// use values belonging to event handlers without synchronization. f() should
// not block, no further events or injected functions can be processed until
// f() returns.
//
// Returns a non-nil error if the function could not be injected and will
// never be called. Otherwise the function will eventually be called.
//
// Note that proton values (Link, Session, Connection etc.) that existed when
// Inject(f) was called may have become invalid by the time f() is executed.
// Handlers should handle keep track of Closed events to ensure proton values
// are not used after they become invalid. One technique is to have map from
// proton values to application values. Check that the map has the correct
// proton/application value pair at the start of the injected function and
// delete the value from the map when handling a Closed event.
Inject(f func()) error
// InjectWait is like Inject but does not return till f() has completed.
// If f() cannot be injected it returns the error from Inject(), otherwise
// it returns the error from f()
InjectWait(f func() error) error
}
// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
// Handler functions sequentially in a single goroutine. Actions taken by
// Handler functions (such as sending messages) are encoded and written to the
// net.Conn. You can create multiple Engines to handle multiple connections
// concurrently.
//
// You implement the EventHandler and/or MessagingHandler interfaces and provide
// those values to NewEngine(). Their HandleEvent method will be called in the
// event-handling goroutine.
//
// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
// other goroutines, store them, or use them as map indexes. Effectively they are
// just pointers. Other goroutines cannot call their methods directly but they can
// can create a function closure to call such methods and pass it to Engine.Inject()
// to have it evaluated in the engine goroutine.
//
// You are responsible for ensuring you don't use an event value after it is
// invalid. The handler methods will tell you when a value is no longer valid. For
// example after a LinkClosed event, that link is no longer valid. If you do
// Link.Close() yourself (in a handler or injected function) the link remains valid
// until the corresponing LinkClosed event is received by the handler.
//
// Engine.Close() will take care of cleaning up any remaining values when you are
// done with the Engine. All values associated with a engine become invalid when you
// call Engine.Close()
//
// The qpid.apache.org/proton/concurrent package will do all this for you, so it
// may be a better choice for some applications.
//
type Engine struct {
// Error is set on exit from Run() if there was an error.
err ErrorHolder
inject chan func()
conn net.Conn
connection Connection
transport Transport
collector *C.pn_collector_t
handlers []EventHandler // Handlers for proton events.
running chan struct{} // This channel will be closed when the goroutines are done.
closeOnce sync.Once
timer *time.Timer
traceEvent bool
}
const bufferSize = 4096
func envBool(name string) bool {
v := strings.ToLower(os.Getenv(name))
return v == "true" || v == "1" || v == "yes" || v == "on"
}
// Create a new Engine and call Initialize() with conn and handlers
func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
eng := &Engine{}
return eng, eng.Initialize(conn, handlers...)
}
// Initialize an Engine with a connection and handlers. Start it with Run()
func (eng *Engine) Initialize(conn net.Conn, handlers ...EventHandler) error {
eng.inject = make(chan func())
eng.conn = conn
eng.connection = Connection{C.pn_connection()}
eng.transport = Transport{C.pn_transport()}
eng.collector = C.pn_collector()
eng.handlers = handlers
eng.running = make(chan struct{})
eng.timer = time.NewTimer(0)
eng.traceEvent = envBool("PN_TRACE_EVT")
if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil {
eng.free()
return fmt.Errorf("proton.NewEngine cannot allocate")
}
C.pn_connection_collect(eng.connection.pn, eng.collector)
return nil
}
// Create a byte slice backed by C memory.
// Empty or error (size <= 0) returns a nil byte slice.
func cByteSlice(start unsafe.Pointer, size int) []byte {
if start == nil || size <= 0 {
return nil
} else {
// Slice from very large imaginary array in C memory
return (*[1 << 30]byte)(start)[:size:size]
}
}
func (eng *Engine) Connection() Connection {
return eng.connection
}
func (eng *Engine) Transport() Transport {
return eng.transport
}
func (eng *Engine) String() string {
return fmt.Sprintf("[%s]%s-%s", eng.Id(), eng.conn.LocalAddr(), eng.conn.RemoteAddr())
}
func (eng *Engine) Id() string {
// Use transport address to match default PN_TRACE_FRM=1 output.
return fmt.Sprintf("%p", eng.Transport().CPtr())
}
func (eng *Engine) Error() error {
return eng.err.Get()
}
// Inject a function into the Engine's event loop.
//
// f() will be called in the same event-processing goroutine that calls Handler
// methods. f() can safely call methods on values that belong to this engine
// (Sessions, Links etc)
//
// The injected function has no parameters or return values. It is normally a
// closure and can use channels to communicate with the injecting goroutine if
// necessary.
//
// Returns a non-nil error if the engine is closed before the function could be
// injected.
func (eng *Engine) Inject(f func()) error {
select {
case eng.inject <- f:
return nil
case <-eng.running:
return eng.Error()
}
}
// InjectWait is like Inject but does not return till f() has completed or the
// engine is closed, and returns an error value from f()
func (eng *Engine) InjectWait(f func() error) error {
done := make(chan error)
defer close(done)
err := eng.Inject(func() { done <- f() })
if err != nil {
return err
}
select {
case <-eng.running:
return eng.Error()
case err := <-done:
return err
}
}
// Server puts the Engine in server mode, meaning it will auto-detect security settings on
// the incoming connnection such as use of SASL and SSL.
// Must be called before Run()
//
func (eng *Engine) Server() { eng.Transport().SetServer() }
func (eng *Engine) disconnect(err error) {
cond := eng.Transport().Condition()
cond.SetError(err) // Set the provided error.
cond.SetError(eng.conn.Close()) // Use connection error if cond is not already set.
eng.transport.CloseTail()
eng.transport.CloseHead()
}
// Close the engine's connection.
// If err != nil pass it to the remote end as the close condition.
// Returns when the remote end closes or disconnects.
func (eng *Engine) Close(err error) {
_ = eng.Inject(func() { CloseError(eng.Connection(), err) })
<-eng.running
}
// CloseTimeout like Close but disconnect if the remote end doesn't close within timeout.
func (eng *Engine) CloseTimeout(err error, timeout time.Duration) {
_ = eng.Inject(func() { CloseError(eng.Connection(), err) })
select {
case <-eng.running:
case <-time.After(timeout):
eng.Disconnect(err)
}
}
// Disconnect the engine's connection immediately without an AMQP close.
// Process any termination events before returning.
func (eng *Engine) Disconnect(err error) {
_ = eng.Inject(func() { eng.disconnect(err) })
<-eng.running
}
// Let proton run timed activity and set up the next tick
func (eng *Engine) tick() {
now := time.Now()
next := eng.Transport().Tick(now)
if !next.IsZero() {
eng.timer.Reset(next.Sub(now))
}
}
func (eng *Engine) dispatch() bool {
for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) {
e := makeEvent(ce, eng)
if eng.traceEvent {
eng.transport.Log(e.String())
}
for _, h := range eng.handlers {
h.HandleEvent(e)
}
if e.Type() == EConnectionRemoteOpen {
eng.tick() // Update the tick if changed by remote.
}
C.pn_collector_pop(eng.collector)
}
return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != nil
}
func (eng *Engine) writeBuffer() []byte {
size := eng.Transport().Pending() // Evaluate before Head(), may change buffer.
start := eng.Transport().Head()
return cByteSlice(start, size)
}
func (eng *Engine) readBuffer() []byte {
size := eng.Transport().Capacity()
start := eng.Transport().Tail()
return cByteSlice(start, size)
}
func (eng *Engine) free() {
if !eng.transport.IsNil() {
eng.transport.Unbind()
eng.transport.Free()
eng.transport = Transport{}
}
if !eng.connection.IsNil() {
eng.connection.Free()
eng.connection = Connection{}
}
if eng.collector != nil {
C.pn_collector_release(eng.collector)
C.pn_collector_free(eng.collector)
eng.collector = nil
}
}
// Run the engine. Engine.Run() will exit when the engine is closed or
// disconnected. You can check for errors after exit with Engine.Error().
//
func (eng *Engine) Run() error {
defer eng.free()
eng.transport.Bind(eng.connection)
eng.tick() // Start ticking if needed
// Channels for read and write buffers going in and out of the read/write goroutines.
// The channels are unbuffered: we want to exchange buffers in seuquence.
readsIn, writesIn := make(chan []byte), make(chan []byte)
readsOut, writesOut := make(chan []byte), make(chan []byte)
wait := sync.WaitGroup{}
wait.Add(2) // Read and write goroutines
go func() { // Read goroutine
defer wait.Done()
for {
rbuf, ok := <-readsIn
if !ok {
return
}
n, err := eng.conn.Read(rbuf)
if n > 0 {
readsOut <- rbuf[:n]
} else if err != nil {
_ = eng.Inject(func() {
eng.Transport().Condition().SetError(err)
eng.Transport().CloseTail()
})
return
}
}
}()
go func() { // Write goroutine
defer wait.Done()
for {
wbuf, ok := <-writesIn
if !ok {
return
}
n, err := eng.conn.Write(wbuf)
if n > 0 {
writesOut <- wbuf[:n]
} else if err != nil {
_ = eng.Inject(func() {
eng.Transport().Condition().SetError(err)
eng.Transport().CloseHead()
})
return
}
}
}()
for eng.dispatch() {
readBuf := eng.readBuffer()
writeBuf := eng.writeBuffer()
// Note that getting the buffers can generate events (eg. SASL events) that
// might close the transport. Check if we are already finished before
// blocking for IO.
if !eng.dispatch() {
break
}
// sendReads/sendWrites are nil (not sendable in select) unless we have a
// buffer to read/write
var sendReads, sendWrites chan []byte
if readBuf != nil {
sendReads = readsIn
}
if writeBuf != nil {
sendWrites = writesIn
}
// Send buffers to the read/write goroutines if we have them.
// Get buffers from the read/write goroutines and process them
// Check for injected functions
select {
case sendReads <- readBuf:
case sendWrites <- writeBuf:
case buf := <-readsOut:
eng.transport.Process(uint(len(buf)))
case buf := <-writesOut:
eng.transport.Pop(uint(len(buf)))
case f, ok := <-eng.inject: // Function injected from another goroutine
if ok {
f()
}
case <-eng.timer.C:
eng.tick()
}
}
eng.err.Set(EndpointError(eng.Connection()))
eng.err.Set(eng.Transport().Condition().Error())
close(readsIn)
close(writesIn)
close(eng.running) // Signal goroutines have exited and Error is set, disable Inject()
_ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject)
wait.Wait() // Wait for goroutines
return eng.err.Get()
}