blob: 34b099304fab970014ef3a1822d48d92b8cd6ae0 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <proton/messenger.h>
#include <proton/driver.h>
#include <proton/util.h>
#include <proton/ssl.h>
#include <proton/object.h>
#include <assert.h>
#include <ctype.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include "../util.h"
#include "../platform.h"
#include "../platform_fmt.h"
#include "store.h"
#include "transform.h"
typedef struct {
pn_string_t *text;
bool passive;
char *scheme;
char *user;
char *pass;
char *host;
char *port;
char *name;
} pn_address_t;
struct pn_messenger_t {
char *name;
char *certificate;
char *private_key;
char *password;
char *trusted_certificates;
int timeout;
bool blocking;
pn_driver_t *driver;
int send_threshold;
int receiving;
int credit_batch;
int credit;
int distributed;
uint64_t next_tag;
pni_store_t *outgoing;
pni_store_t *incoming;
pn_subscription_t *subscriptions;
size_t sub_capacity;
size_t sub_count;
pn_subscription_t *incoming_subscription;
pn_error_t *error;
pn_transform_t *routes;
pn_transform_t *rewrites;
pn_address_t address;
pn_tracker_t outgoing_tracker;
pn_tracker_t incoming_tracker;
pn_string_t *original;
pn_string_t *rewritten;
bool worked;
};
struct pn_subscription_t {
char *scheme;
void *context;
};
typedef struct {
char *host;
char *port;
pn_subscription_t *subscription;
} pn_listener_ctx_t;
pn_subscription_t *pn_subscription(pn_messenger_t *messenger, const char *scheme);
static pn_listener_ctx_t *pn_listener_ctx(pn_listener_t *lnr,
pn_messenger_t *messenger,
const char *scheme,
const char *host,
const char *port)
{
pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(lnr);
assert(!ctx);
ctx = (pn_listener_ctx_t *) malloc(sizeof(pn_listener_ctx_t));
pn_subscription_t *sub = pn_subscription(messenger, scheme);
ctx->subscription = sub;
ctx->host = pn_strdup(host);
ctx->port = pn_strdup(port);
pn_listener_set_context(lnr, ctx);
return ctx;
}
static void pn_listener_ctx_free(pn_listener_t *lnr)
{
pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(lnr);
// XXX: subscriptions are freed when the messenger is freed pn_subscription_free(ctx->subscription);
free(ctx->host);
free(ctx->port);
free(ctx);
pn_listener_set_context(lnr, NULL);
}
typedef struct {
char *address;
char *scheme;
char *user;
char *pass;
char *host;
char *port;
} pn_connection_ctx_t;
static pn_connection_ctx_t *pn_connection_ctx(pn_connection_t *conn,
const char *scheme,
const char *user,
const char *pass,
const char *host,
const char *port)
{
pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
assert(!ctx);
ctx = (pn_connection_ctx_t *) malloc(sizeof(pn_connection_ctx_t));
ctx->scheme = pn_strdup(scheme);
ctx->user = pn_strdup(user);
ctx->pass = pn_strdup(pass);
ctx->host = pn_strdup(host);
ctx->port = pn_strdup(port);
pn_connection_set_context(conn, ctx);
return ctx;
}
static void pn_connection_ctx_free(pn_connection_t *conn)
{
pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
free(ctx->scheme);
free(ctx->user);
free(ctx->pass);
free(ctx->host);
free(ctx->port);
free(ctx);
pn_connection_set_context(conn, NULL);
}
#define OUTGOING (0x0000000000000000)
#define INCOMING (0x1000000000000000)
#define pn_tracker(direction, sequence) ((direction) | (sequence))
#define pn_tracker_direction(tracker) ((tracker) & (0x1000000000000000))
#define pn_tracker_sequence(tracker) ((pn_sequence_t) ((tracker) & (0x00000000FFFFFFFF)))
static char *build_name(const char *name)
{
if (name) {
return pn_strdup(name);
} else {
return pn_i_genuuid();
}
}
pn_messenger_t *pn_messenger(const char *name)
{
pn_messenger_t *m = (pn_messenger_t *) malloc(sizeof(pn_messenger_t));
if (m) {
m->name = build_name(name);
m->certificate = NULL;
m->private_key = NULL;
m->password = NULL;
m->trusted_certificates = NULL;
m->timeout = -1;
m->blocking = true;
m->driver = pn_driver();
m->receiving = 0;
m->credit_batch = 1024;
m->credit = 0;
m->distributed = 0;
m->next_tag = 0;
m->outgoing = pni_store();
m->incoming = pni_store();
m->subscriptions = NULL;
m->sub_capacity = 0;
m->sub_count = 0;
m->incoming_subscription = NULL;
m->error = pn_error();
m->routes = pn_transform();
m->rewrites = pn_transform();
m->outgoing_tracker = 0;
m->incoming_tracker = 0;
m->address.text = pn_string(NULL);
m->original = pn_string(NULL);
m->rewritten = pn_string(NULL);
}
return m;
}
const char *pn_messenger_name(pn_messenger_t *messenger)
{
return messenger->name;
}
int pn_messenger_set_certificate(pn_messenger_t *messenger, const char *certificate)
{
if (messenger->certificate) free(messenger->certificate);
messenger->certificate = pn_strdup(certificate);
return 0;
}
const char *pn_messenger_get_certificate(pn_messenger_t *messenger)
{
return messenger->certificate;
}
int pn_messenger_set_private_key(pn_messenger_t *messenger, const char *private_key)
{
if (messenger->private_key) free(messenger->private_key);
messenger->private_key = pn_strdup(private_key);
return 0;
}
const char *pn_messenger_get_private_key(pn_messenger_t *messenger)
{
return messenger->private_key;
}
int pn_messenger_set_password(pn_messenger_t *messenger, const char *password)
{
if (messenger->password) free(messenger->password);
messenger->password = pn_strdup(password);
return 0;
}
const char *pn_messenger_get_password(pn_messenger_t *messenger)
{
return messenger->password;
}
int pn_messenger_set_trusted_certificates(pn_messenger_t *messenger, const char *trusted_certificates)
{
if (messenger->trusted_certificates) free(messenger->trusted_certificates);
messenger->trusted_certificates = pn_strdup(trusted_certificates);
return 0;
}
const char *pn_messenger_get_trusted_certificates(pn_messenger_t *messenger)
{
return messenger->trusted_certificates;
}
int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout)
{
if (!messenger) return PN_ARG_ERR;
messenger->timeout = timeout;
return 0;
}
int pn_messenger_get_timeout(pn_messenger_t *messenger)
{
return messenger ? messenger->timeout : 0;
}
bool pn_messenger_is_blocking(pn_messenger_t *messenger)
{
assert(messenger);
return messenger->blocking;
}
int pn_messenger_set_blocking(pn_messenger_t *messenger, bool blocking)
{
messenger->blocking = blocking;
return 0;
}
static void pni_driver_reclaim(pn_driver_t *driver)
{
pn_listener_t *l = pn_listener_head(driver);
while (l) {
pn_listener_ctx_free(l);
l = pn_listener_next(l);
}
}
void pn_messenger_free(pn_messenger_t *messenger)
{
if (messenger) {
pn_free(messenger->rewritten);
pn_free(messenger->original);
pn_free(messenger->address.text);
free(messenger->name);
free(messenger->certificate);
free(messenger->private_key);
free(messenger->password);
free(messenger->trusted_certificates);
pni_driver_reclaim(messenger->driver);
pn_driver_free(messenger->driver);
pn_error_free(messenger->error);
pni_store_free(messenger->incoming);
pni_store_free(messenger->outgoing);
for (unsigned i = 0; i < messenger->sub_count; i++) {
free(messenger->subscriptions[i].scheme);
}
free(messenger->subscriptions);
pn_free(messenger->rewrites);
pn_free(messenger->routes);
free(messenger);
}
}
int pn_messenger_errno(pn_messenger_t *messenger)
{
if (messenger) {
return pn_error_code(messenger->error);
} else {
return PN_ARG_ERR;
}
}
pn_error_t *pn_messenger_error(pn_messenger_t *messenger)
{
assert(messenger);
return messenger->error;
}
void pn_messenger_flow(pn_messenger_t *messenger)
{
int link_ct = 0;
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (link) {
if (pn_link_is_receiver(link)) link_ct++;
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
ctor = pn_connector_next(ctor);
}
if (link_ct == 0) return;
if (messenger->receiving == -1) {
messenger->credit = link_ct * messenger->credit_batch - pn_messenger_incoming(messenger);
} else {
int total = messenger->credit + messenger->distributed;
if (messenger->receiving > total)
messenger->credit += (messenger->receiving - total);
}
int batch = (messenger->credit < link_ct) ? 1
: (messenger->credit/link_ct);
ctor = pn_connector_head(messenger->driver);
while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (link) {
if (pn_link_is_receiver(link)) {
int have = pn_link_credit(link);
if (have < batch) {
int need = batch - have;
int amount = (messenger->credit < need) ? messenger->credit : need;
pn_link_flow(link, amount);
messenger->distributed += amount;
messenger->credit -= amount;
if (messenger->credit == 0) return;
}
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
ctor = pn_connector_next(ctor);
}
}
static void pn_transport_config(pn_messenger_t *messenger,
pn_connector_t *connector,
pn_connection_t *connection)
{
pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
pn_transport_t *transport = pn_connector_transport(connector);
if (ctx->scheme && !strcmp(ctx->scheme, "amqps")) {
pn_ssl_domain_t *d = pn_ssl_domain( PN_SSL_MODE_CLIENT );
if (messenger->certificate && messenger->private_key) {
pn_ssl_domain_set_credentials( d, messenger->certificate,
messenger->private_key,
messenger->password);
}
if (messenger->trusted_certificates) {
pn_ssl_domain_set_trusted_ca_db(d, messenger->trusted_certificates);
pn_ssl_domain_set_peer_authentication(d, PN_SSL_VERIFY_PEER, NULL);
} else {
pn_ssl_domain_set_peer_authentication(d, PN_SSL_ANONYMOUS_PEER, NULL);
}
pn_ssl_t *ssl = pn_ssl(transport);
pn_ssl_init(ssl, d, NULL);
pn_ssl_set_peer_hostname(ssl, pn_connection_get_hostname(connection));
pn_ssl_domain_free( d );
}
pn_sasl_t *sasl = pn_sasl(transport);
if (ctx->user) {
pn_sasl_plain(sasl, ctx->user, ctx->pass);
} else {
pn_sasl_mechanisms(sasl, "ANONYMOUS");
pn_sasl_client(sasl);
}
}
static void pn_error_report(const char *pfx, const char *error)
{
fprintf(stderr, "%s ERROR %s\n", pfx, error);
}
static void pn_condition_report(const char *pfx, pn_condition_t *condition)
{
if (pn_condition_is_redirect(condition)) {
fprintf(stderr, "%s NOTICE (%s) redirecting to %s:%i\n",
pfx,
pn_condition_get_name(condition),
pn_condition_redirect_host(condition),
pn_condition_redirect_port(condition));
} else if (pn_condition_is_set(condition)) {
char error[1024];
snprintf(error, 1024, "(%s) %s",
pn_condition_get_name(condition),
pn_condition_get_description(condition));
pn_error_report(pfx, error);
}
}
int pni_pump_in(pn_messenger_t *messenger, const char *address, pn_link_t *receiver)
{
pn_delivery_t *d = pn_link_current(receiver);
if (!pn_delivery_readable(d) && !pn_delivery_partial(d)) {
return 0;
}
pni_entry_t *entry = pni_store_put(messenger->incoming, address);
pn_buffer_t *buf = pni_entry_bytes(entry);
pni_entry_set_delivery(entry, d);
pn_subscription_t *sub = (pn_subscription_t *) pn_link_get_context(receiver);
pni_entry_set_context(entry, sub);
size_t pending = pn_delivery_pending(d);
int err = pn_buffer_ensure(buf, pending + 1);
if (err) return pn_error_format(messenger->error, err, "get: error growing buffer");
char *encoded = pn_buffer_bytes(buf).start;
ssize_t n = pn_link_recv(receiver, encoded, pending);
if (n != (ssize_t) pending) {
return pn_error_format(messenger->error, n,
"didn't receive pending bytes: %" PN_ZI " %" PN_ZI,
n, pending);
}
n = pn_link_recv(receiver, encoded + pending, 1);
pn_link_advance(receiver);
if (n != PN_EOS) {
return pn_error_format(messenger->error, n, "PN_EOS expected");
}
pn_buffer_append(buf, encoded, pending); // XXX
return 0;
}
int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *sender);
void pn_messenger_endpoints(pn_messenger_t *messenger, pn_connection_t *conn, pn_connector_t *ctor)
{
if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
pn_connection_open(conn);
}
pn_delivery_t *d = pn_work_head(conn);
while (d) {
pn_link_t *link = pn_delivery_link(d);
if (pn_delivery_updated(d)) {
if (pn_link_is_sender(link)) {
pn_delivery_update(d, pn_delivery_remote_state(d));
}
pni_entry_t *e = (pni_entry_t *) pn_delivery_get_context(d);
if (e) pni_entry_updated(e);
}
pn_delivery_clear(d);
if (pn_delivery_readable(d)) {
int err = pni_pump_in(messenger, pn_terminus_get_address(pn_link_source(link)), link);
if (err) {
fprintf(stderr, "%s\n", pn_error_text(messenger->error));
}
}
d = pn_work_next(d);
}
if (pn_work_head(conn)) {
return;
}
pn_session_t *ssn = pn_session_head(conn, PN_LOCAL_UNINIT);
while (ssn) {
pn_session_open(ssn);
ssn = pn_session_next(ssn, PN_LOCAL_UNINIT);
}
pn_link_t *link = pn_link_head(conn, PN_LOCAL_UNINIT);
while (link) {
pn_terminus_copy(pn_link_source(link), pn_link_remote_source(link));
pn_terminus_copy(pn_link_target(link), pn_link_remote_target(link));
pn_link_open(link);
if (pn_link_is_receiver(link)) {
pn_listener_t *listener = pn_connector_listener(ctor);
pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(listener);
pn_link_set_context(link, ctx ? ctx->subscription : NULL);
}
link = pn_link_next(link, PN_LOCAL_UNINIT);
}
link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
while (link) {
if (pn_link_is_sender(link)) {
pni_pump_out(messenger, pn_terminus_get_address(pn_link_target(link)), link);
}
link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE);
}
pn_messenger_flow(messenger);
ssn = pn_session_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
while (ssn) {
pn_condition_report("SESSION", pn_session_remote_condition(ssn));
pn_session_close(ssn);
ssn = pn_session_next(ssn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}
link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
while (link) {
pn_condition_report("LINK", pn_link_remote_condition(link));
pn_link_close(link);
// XXX: should free link
link = pn_link_next(link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}
if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
pn_condition_t *condition = pn_connection_remote_condition(conn);
pn_condition_report("CONNECTION", condition);
pn_connection_close(conn);
if (pn_condition_is_redirect(condition)) {
const char *host = pn_condition_redirect_host(condition);
char buf[1024];
sprintf(buf, "%i", pn_condition_redirect_port(condition));
pn_connector_process(ctor);
pn_connector_set_connection(ctor, NULL);
pn_driver_t *driver = messenger->driver;
pn_connector_t *connector = pn_connector(driver, host, buf, NULL);
pn_transport_unbind(pn_connector_transport(ctor));
pn_connection_reset(conn);
pn_transport_config(messenger, connector, conn);
pn_connector_set_connection(connector, conn);
}
} else if (pn_connector_closed(ctor) && !(pn_connection_state(conn) & PN_REMOTE_CLOSED)) {
pn_error_report("CONNECTION", "connection aborted");
}
}
void pn_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn)
{
pn_link_t *link = pn_link_head(conn, 0);
while (link) {
if (pn_link_is_receiver(link) && pn_link_credit(link) > 0) {
int credit = pn_link_credit(link);
messenger->credit += credit;
messenger->distributed -= credit;
}
pn_delivery_t *d = pn_unsettled_head(link);
while (d) {
pni_entry_t *e = (pni_entry_t *) pn_delivery_get_context(d);
if (e) {
pni_entry_set_delivery(e, NULL);
}
d = pn_unsettled_next(d);
}
link = pn_link_next(link, 0);
}
}
pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
char *scheme,
char *user,
char *pass,
char *host,
char *port)
{
pn_connection_t *connection = pn_connection();
if (!connection) return NULL;
pn_connection_ctx(connection, scheme, user, pass, host, port);
pn_connection_set_container(connection, messenger->name);
pn_connection_set_hostname(connection, host);
return connection;
}
int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
{
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_messenger_endpoints(messenger, conn, ctor);
pn_connector_process(ctor);
ctor = pn_connector_next(ctor);
}
pn_timestamp_t now = pn_i_now();
long int deadline = now + timeout;
bool pred;
while (true) {
pred = predicate(messenger);
int remaining = deadline - now;
if (pred || (timeout >= 0 && remaining < 0)) break;
int error = pn_driver_wait(messenger->driver, remaining);
if (error && error != PN_INTR) return error;
pn_listener_t *l;
while ((l = pn_driver_listener(messenger->driver))) {
messenger->worked = true;
pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(l);
pn_subscription_t *sub = ctx->subscription;
char *scheme = sub->scheme;
pn_connector_t *c = pn_listener_accept(l);
pn_transport_t *t = pn_connector_transport(c);
pn_ssl_domain_t *d = pn_ssl_domain( PN_SSL_MODE_SERVER );
if (messenger->certificate) {
pn_ssl_domain_set_credentials(d, messenger->certificate,
messenger->private_key,
messenger->password);
}
if (!(scheme && !strcmp(scheme, "amqps"))) {
pn_ssl_domain_allow_unsecured_client(d);
}
pn_ssl_t *ssl = pn_ssl(t);
pn_ssl_init(ssl, d, NULL);
pn_ssl_domain_free( d );
pn_sasl_t *sasl = pn_sasl(t);
pn_sasl_mechanisms(sasl, "ANONYMOUS");
pn_sasl_server(sasl);
pn_sasl_done(sasl, PN_SASL_OK);
pn_connection_t *conn =
pn_messenger_connection(messenger, scheme, NULL, NULL, NULL, NULL);
pn_connector_set_connection(c, conn);
}
pn_connector_t *c;
while ((c = pn_driver_connector(messenger->driver))) {
messenger->worked = true;
pn_connector_process(c);
pn_connection_t *conn = pn_connector_connection(c);
pn_messenger_endpoints(messenger, conn, c);
if (pn_connector_closed(c)) {
pn_connector_free(c);
if (conn) {
pn_messenger_reclaim(messenger, conn);
pn_connection_ctx_free(conn);
pn_connection_free(conn);
pn_messenger_flow(messenger);
}
} else {
pn_connector_process(c);
}
}
if (timeout >= 0) {
now = pn_i_now();
}
if (error == PN_INTR) {
return pred ? 0 : PN_INTR;
}
}
return pred ? 0 : PN_TIMEOUT;
}
int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
{
if (messenger->blocking) {
return pn_messenger_tsync(messenger, predicate, messenger->timeout);
} else {
int err = pn_messenger_tsync(messenger, predicate, 0);
if (err == PN_TIMEOUT) {
return PN_INPROGRESS;
} else {
return err;
}
}
}
int pn_messenger_start(pn_messenger_t *messenger)
{
if (!messenger) return PN_ARG_ERR;
// right now this is a noop
return 0;
}
bool pn_messenger_stopped(pn_messenger_t *messenger)
{
return pn_connector_head(messenger->driver) == NULL;
}
int pn_messenger_stop(pn_messenger_t *messenger)
{
if (!messenger) return PN_ARG_ERR;
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (link) {
pn_link_close(link);
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
pn_connection_close(conn);
ctor = pn_connector_next(ctor);
}
pn_listener_t *l = pn_listener_head(messenger->driver);
while (l) {
pn_listener_close(l);
pn_listener_t *prev = l;
l = pn_listener_next(l);
pn_listener_ctx_free(prev);
pn_listener_free(prev);
}
return pn_messenger_sync(messenger, pn_messenger_stopped);
}
static bool pn_streq(const char *a, const char *b)
{
return a == b || (a && b && !strcmp(a, b));
}
static const char *default_port(const char *scheme)
{
if (scheme && pn_streq(scheme, "amqps"))
return "5671";
else
return "5672";
}
static void pni_parse(pn_address_t *address)
{
address->passive = false;
address->scheme = NULL;
address->user = NULL;
address->pass = NULL;
address->host = NULL;
address->port = NULL;
address->name = NULL;
parse_url(pn_string_buffer(address->text), &address->scheme, &address->user,
&address->pass, &address->host, &address->port, &address->name);
if (address->host[0] == '~') {
address->passive = true;
address->host++;
}
}
static int pni_route(pn_messenger_t *messenger, const char *address)
{
pn_address_t *addr = &messenger->address;
int err = pn_transform_apply(messenger->routes, address, addr->text);
if (err) return pn_error_format(messenger->error, PN_ERR,
"transformation error");
pni_parse(addr);
return 0;
}
pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *address, char **name)
{
char domain[1024];
if (address && sizeof(domain) < strlen(address) + 1) {
pn_error_format(messenger->error, PN_ERR,
"address exceeded maximum length: %s", address);
return NULL;
}
int err = pni_route(messenger, address);
if (err) return NULL;
bool passive = messenger->address.passive;
char *scheme = messenger->address.scheme;
char *user = messenger->address.user;
char *pass = messenger->address.pass;
char *host = messenger->address.host;
char *port = messenger->address.port;
*name = messenger->address.name;
if (passive) {
pn_listener_t *lnr = pn_listener_head(messenger->driver);
while (lnr) {
pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(lnr);
if (pn_streq(host, ctx->host) && pn_streq(port, ctx->port)) {
return NULL;
}
lnr = pn_listener_next(lnr);
}
lnr = pn_listener(messenger->driver, host, port ? port : default_port(scheme), NULL);
if (lnr) {
pn_listener_ctx(lnr, messenger, scheme, host, port);
} else {
pn_error_format(messenger->error, PN_ERR,
"unable to bind to address %s: %s:%s", address, host, port,
pn_driver_error(messenger->driver));
}
return NULL;
}
domain[0] = '\0';
if (user) {
strcat(domain, user);
strcat(domain, "@");
}
strcat(domain, host);
if (port) {
strcat(domain, ":");
strcat(domain, port);
}
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
pn_connection_t *connection = pn_connector_connection(ctor);
pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
if (pn_streq(scheme, ctx->scheme) && pn_streq(user, ctx->user) &&
pn_streq(pass, ctx->pass) && pn_streq(host, ctx->host) &&
pn_streq(port, ctx->port)) {
return connection;
}
const char *container = pn_connection_remote_container(connection);
if (pn_streq(container, domain)) {
return connection;
}
ctor = pn_connector_next(ctor);
}
pn_connector_t *connector = pn_connector(messenger->driver, host,
port ? port : default_port(scheme),
NULL);
if (!connector) {
pn_error_format(messenger->error, PN_ERR,
"unable to connect to %s: %s", address,
pn_driver_error(messenger->driver));
return NULL;
}
pn_connection_t *connection =
pn_messenger_connection(messenger, scheme, user, pass, host, port);
pn_transport_config(messenger, connector, connection);
pn_connection_open(connection);
pn_connector_set_connection(connector, connection);
return connection;
}
pn_subscription_t *pn_subscription(pn_messenger_t *messenger, const char *scheme)
{
PN_ENSURE(messenger->subscriptions, messenger->sub_capacity, messenger->sub_count + 1, pn_subscription_t);
pn_subscription_t *sub = messenger->subscriptions + messenger->sub_count++;
sub->scheme = pn_strdup(scheme);
sub->context = NULL;
return sub;
}
void *pn_subscription_get_context(pn_subscription_t *sub)
{
assert(sub);
return sub->context;
}
void pn_subscription_set_context(pn_subscription_t *sub, void *context)
{
assert(sub);
sub->context = context;
}
pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender)
{
char *name = NULL;
pn_connection_t *connection = pn_messenger_resolve(messenger, address, &name);
if (!connection) return NULL;
pn_link_t *link = pn_link_head(connection, PN_LOCAL_ACTIVE);
while (link) {
if (pn_link_is_sender(link) == sender) {
const char *terminus = pn_link_is_sender(link) ?
pn_terminus_get_address(pn_link_target(link)) :
pn_terminus_get_address(pn_link_source(link));
if (pn_streq(name, terminus))
return link;
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
pn_session_t *ssn = pn_session(connection);
pn_session_open(ssn);
link = sender ? pn_sender(ssn, "sender-xxx") : pn_receiver(ssn, "receiver-xxx");
if ((sender && pn_messenger_get_outgoing_window(messenger)) ||
(!sender && pn_messenger_get_incoming_window(messenger))) {
// use explicit settlement via dispositions (not pre-settled)
pn_link_set_snd_settle_mode( link, PN_SND_UNSETTLED );
pn_link_set_rcv_settle_mode( link, PN_RCV_SECOND );
}
// XXX
pn_terminus_set_address(pn_link_target(link), name);
pn_terminus_set_address(pn_link_source(link), name);
if (!sender) {
pn_subscription_t *sub = pn_subscription(messenger, NULL);
pn_link_set_context(link, sub);
}
pn_link_open(link);
return link;
}
pn_link_t *pn_messenger_source(pn_messenger_t *messenger, const char *source)
{
return pn_messenger_link(messenger, source, false);
}
pn_link_t *pn_messenger_target(pn_messenger_t *messenger, const char *target)
{
return pn_messenger_link(messenger, target, true);
}
pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source)
{
pni_route(messenger, source);
if (pn_error_code(messenger->error)) return NULL;
bool passive = messenger->address.passive;
char *scheme = messenger->address.scheme;
char *host = messenger->address.host;
char *port = messenger->address.port;
if (passive) {
pn_listener_t *lnr = pn_listener(messenger->driver, host,
port ? port : default_port(scheme), NULL);
if (lnr) {
pn_listener_ctx_t *ctx = pn_listener_ctx(lnr, messenger, scheme, host, port);
return ctx->subscription;
} else {
pn_error_format(messenger->error, PN_ERR,
"unable to subscribe to address %s: %s", source,
pn_driver_error(messenger->driver));
return NULL;
}
} else {
pn_link_t *src = pn_messenger_source(messenger, source);
if (!src) return NULL;
pn_subscription_t *sub = (pn_subscription_t *) pn_link_get_context(src);
return sub;
}
}
int pn_messenger_get_outgoing_window(pn_messenger_t *messenger)
{
return pni_store_get_window(messenger->outgoing);
}
int pn_messenger_set_outgoing_window(pn_messenger_t *messenger, int window)
{
pni_store_set_window(messenger->outgoing, window);
return 0;
}
int pn_messenger_get_incoming_window(pn_messenger_t *messenger)
{
return pni_store_get_window(messenger->incoming);
}
int pn_messenger_set_incoming_window(pn_messenger_t *messenger, int window)
{
pni_store_set_window(messenger->incoming, window);
return 0;
}
static void outward_munge(pn_messenger_t *mng, pn_message_t *msg)
{
char stackbuf[256];
char *heapbuf = NULL;
char *buf = stackbuf;
const char *address = pn_message_get_reply_to(msg);
int len = address ? strlen(address) : 0;
if (len > 1 && address[0] == '~' && address[1] == '/') {
unsigned needed = len + strlen(mng->name) + 9;
if (needed > sizeof(stackbuf)) {
heapbuf = (char *) malloc(needed);
buf = heapbuf;
}
sprintf(buf, "amqp://%s/%s", mng->name, address + 2);
pn_message_set_reply_to(msg, buf);
} else if (len == 0) {
unsigned needed = strlen(mng->name) + 8;
if (needed > sizeof(stackbuf)) {
heapbuf = (char *) malloc(needed);
buf = heapbuf;
}
sprintf(buf, "amqp://%s", mng->name);
pn_message_set_reply_to(msg, buf);
}
if (heapbuf) free (heapbuf);
}
int pni_pump_out(pn_messenger_t *messenger, const char *address, pn_link_t *sender)
{
pni_entry_t *entry = pni_store_get(messenger->outgoing, address);
if (!entry) return 0;
pn_buffer_t *buf = pni_entry_bytes(entry);
pn_bytes_t bytes = pn_buffer_bytes(buf);
char *encoded = bytes.start;
size_t size = bytes.size;
// XXX: proper tag
char tag[8];
void *ptr = &tag;
uint64_t next = messenger->next_tag++;
*((uint64_t *) ptr) = next;
pn_delivery_t *d = pn_delivery(sender, pn_dtag(tag, 8));
pni_entry_set_delivery(entry, d);
ssize_t n = pn_link_send(sender, encoded, size);
if (n < 0) {
pni_entry_free(entry);
return pn_error_format(messenger->error, n, "send error: %s",
pn_error_text(pn_link_error(sender)));
} else {
pn_link_advance(sender);
pni_entry_free(entry);
return 0;
}
}
static void pni_default_rewrite(pn_messenger_t *messenger, const char *address,
pn_string_t *dst)
{
pn_address_t *addr = &messenger->address;
if (address && strstr(address, "@")) {
int err = pn_string_set(addr->text, address);
if (err) assert(false);
pni_parse(addr);
if (addr->user || addr->pass)
{
pn_string_format(messenger->rewritten, "%s%s%s%s%s%s%s",
addr->scheme ? addr->scheme : "",
addr->scheme ? "://" : "",
addr->host,
addr->port ? ":" : "",
addr->port ? addr->port : "",
addr->name ? "/" : "",
addr->name ? addr->name : "");
}
}
}
static void pni_rewrite(pn_messenger_t *messenger, pn_message_t *msg)
{
const char *address = pn_message_get_address(msg);
pn_string_set(messenger->original, address);
int err = pn_transform_apply(messenger->rewrites, address,
messenger->rewritten);
if (err) assert(false);
if (!pn_transform_matched(messenger->rewrites)) {
pni_default_rewrite(messenger, pn_string_get(messenger->rewritten),
messenger->rewritten);
}
pn_message_set_address(msg, pn_string_get(messenger->rewritten));
}
static void pni_restore(pn_messenger_t *messenger, pn_message_t *msg)
{
pn_message_set_address(msg, pn_string_get(messenger->original));
}
int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
{
if (!messenger) return PN_ARG_ERR;
if (!msg) return pn_error_set(messenger->error, PN_ARG_ERR, "null message");
outward_munge(messenger, msg);
const char *address = pn_message_get_address(msg);
pni_entry_t *entry = pni_store_put(messenger->outgoing, address);
if (!entry)
return pn_error_format(messenger->error, PN_ERR, "store error");
messenger->outgoing_tracker = pn_tracker(OUTGOING, pni_entry_track(entry));
pn_buffer_t *buf = pni_entry_bytes(entry);
pni_rewrite(messenger, msg);
while (true) {
char *encoded = pn_buffer_bytes(buf).start;
size_t size = pn_buffer_capacity(buf);
int err = pn_message_encode(msg, encoded, &size);
if (err == PN_OVERFLOW) {
err = pn_buffer_ensure(buf, 2*pn_buffer_capacity(buf));
if (err) {
pni_entry_free(entry);
pni_restore(messenger, msg);
return pn_error_format(messenger->error, err, "put: error growing buffer");
}
} else if (err) {
pni_restore(messenger, msg);
return pn_error_format(messenger->error, err, "encode error: %s",
pn_message_error(msg));
} else {
pni_restore(messenger, msg);
pn_buffer_append(buf, encoded, size); // XXX
pn_link_t *sender = pn_messenger_target(messenger, address);
if (!sender) return 0;
return pni_pump_out(messenger, address, sender);
}
}
return PN_ERR;
}
pn_tracker_t pn_messenger_outgoing_tracker(pn_messenger_t *messenger)
{
return messenger->outgoing_tracker;
}
pni_store_t *pn_tracker_store(pn_messenger_t *messenger, pn_tracker_t tracker)
{
if (pn_tracker_direction(tracker) == OUTGOING) {
return messenger->outgoing;
} else {
return messenger->incoming;
}
}
pn_status_t pn_messenger_status(pn_messenger_t *messenger, pn_tracker_t tracker)
{
pni_store_t *store = pn_tracker_store(messenger, tracker);
pni_entry_t *e = pni_store_entry(store, pn_tracker_sequence(tracker));
if (e) {
return pni_entry_get_status(e);
} else {
return PN_STATUS_UNKNOWN;
}
}
int pn_messenger_settle(pn_messenger_t *messenger, pn_tracker_t tracker, int flags)
{
pni_store_t *store = pn_tracker_store(messenger, tracker);
return pni_store_update(store, pn_tracker_sequence(tracker), PN_STATUS_UNKNOWN, flags, true, true);
}
// true if all pending output has been sent to peer
bool pn_messenger_sent(pn_messenger_t *messenger)
{
int total = pni_store_size(messenger->outgoing);
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
// check if transport is done generating output
pn_transport_t *transport = pn_connector_transport(ctor);
if (transport) {
if (!pn_transport_quiesced(transport)) {
pn_connector_process(ctor);
return false;
}
}
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (link) {
if (pn_link_is_sender(link)) {
total += pn_link_queued(link);
pn_delivery_t *d = pn_unsettled_head(link);
while (d) {
if (!pn_delivery_remote_state(d) && !pn_delivery_settled(d)) {
total++;
}
d = pn_unsettled_next(d);
}
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
ctor = pn_connector_next(ctor);
}
return total <= messenger->send_threshold;
}
bool pn_messenger_rcvd(pn_messenger_t *messenger)
{
if (pni_store_size(messenger->incoming) > 0) return true;
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_delivery_t *d = pn_work_head(conn);
while (d) {
if (pn_delivery_readable(d) && !pn_delivery_partial(d)) {
return true;
}
d = pn_work_next(d);
}
ctor = pn_connector_next(ctor);
}
if (!pn_connector_head(messenger->driver) && !pn_listener_head(messenger->driver)) {
return true;
} else {
return false;
}
}
static bool work_pred(pn_messenger_t *messenger) {
return messenger->worked;
}
int pn_messenger_work(pn_messenger_t *messenger, int timeout)
{
messenger->worked = false;
return pn_messenger_tsync(messenger, work_pred, timeout);
}
int pn_messenger_interrupt(pn_messenger_t *messenger)
{
assert(messenger);
if (messenger->driver) {
return pn_driver_wakeup(messenger->driver);
} else {
return 0;
}
}
int pn_messenger_send(pn_messenger_t *messenger, int n)
{
if (n == -1) {
messenger->send_threshold = 0;
} else {
messenger->send_threshold = pn_messenger_outgoing(messenger) - n;
if (messenger->send_threshold < 0)
messenger->send_threshold = 0;
}
return pn_messenger_sync(messenger, pn_messenger_sent);
}
int pn_messenger_recv(pn_messenger_t *messenger, int n)
{
if (!messenger) return PN_ARG_ERR;
if (messenger->blocking && !pn_listener_head(messenger->driver)
&& !pn_connector_head(messenger->driver))
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
messenger->receiving = n;
pn_messenger_flow(messenger);
int err = pn_messenger_sync(messenger, pn_messenger_rcvd);
if (err) return err;
if (!pn_messenger_incoming(messenger) &&
messenger->blocking &&
!pn_listener_head(messenger->driver) &&
!pn_connector_head(messenger->driver)) {
return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
} else {
return 0;
}
}
int pn_messenger_receiving(pn_messenger_t *messenger)
{
assert(messenger);
return messenger->receiving;
}
int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
{
if (!messenger) return PN_ARG_ERR;
pni_entry_t *entry = pni_store_get(messenger->incoming, NULL);
// XXX: need to drain credit before returning EOS
if (!entry) return PN_EOS;
messenger->incoming_tracker = pn_tracker(INCOMING, pni_entry_track(entry));
pn_buffer_t *buf = pni_entry_bytes(entry);
pn_bytes_t bytes = pn_buffer_bytes(buf);
const char *encoded = bytes.start;
size_t size = bytes.size;
messenger->distributed--;
messenger->incoming_subscription = (pn_subscription_t *) pni_entry_get_context(entry);
if (msg) {
int err = pn_message_decode(msg, encoded, size);
pni_entry_free(entry);
if (err) {
return pn_error_format(messenger->error, err, "error decoding message: %s",
pn_message_error(msg));
} else {
return 0;
}
} else {
pni_entry_free(entry);
return 0;
}
}
pn_tracker_t pn_messenger_incoming_tracker(pn_messenger_t *messenger)
{
return messenger->incoming_tracker;
}
pn_subscription_t *pn_messenger_incoming_subscription(pn_messenger_t *messenger)
{
assert(messenger);
return messenger->incoming_subscription;
}
int pn_messenger_accept(pn_messenger_t *messenger, pn_tracker_t tracker, int flags)
{
if (pn_tracker_direction(tracker) != INCOMING) {
return pn_error_format(messenger->error, PN_ARG_ERR,
"invalid tracker, incoming tracker required");
}
return pni_store_update(messenger->incoming, pn_tracker_sequence(tracker),
PN_STATUS_ACCEPTED, flags, false, false);
}
int pn_messenger_reject(pn_messenger_t *messenger, pn_tracker_t tracker, int flags)
{
if (pn_tracker_direction(tracker) != INCOMING) {
return pn_error_format(messenger->error, PN_ARG_ERR,
"invalid tracker, incoming tracker required");
}
return pni_store_update(messenger->incoming, pn_tracker_sequence(tracker),
PN_STATUS_REJECTED, flags, false, false);
}
int pn_messenger_queued(pn_messenger_t *messenger, bool sender)
{
if (!messenger) return 0;
int result = 0;
pn_connector_t *ctor = pn_connector_head(messenger->driver);
while (ctor) {
pn_connection_t *conn = pn_connector_connection(ctor);
pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
while (link) {
if (pn_link_is_sender(link)) {
if (sender) {
result += pn_link_queued(link);
}
} else if (!sender) {
result += pn_link_queued(link);
}
link = pn_link_next(link, PN_LOCAL_ACTIVE);
}
ctor = pn_connector_next(ctor);
}
return result;
}
int pn_messenger_outgoing(pn_messenger_t *messenger)
{
return pni_store_size(messenger->outgoing) + pn_messenger_queued(messenger, true);
}
int pn_messenger_incoming(pn_messenger_t *messenger)
{
return pni_store_size(messenger->incoming) + pn_messenger_queued(messenger, false);
}
int pn_messenger_route(pn_messenger_t *messenger, const char *pattern, const char *address)
{
pn_transform_rule(messenger->routes, pattern, address);
return 0;
}
int pn_messenger_rewrite(pn_messenger_t *messenger, const char *pattern, const char *address)
{
pn_transform_rule(messenger->rewrites, pattern, address);
return 0;
}