blob: 187dba7f4fce769793a0a7be0fac444fec50b3b4 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "thread.h"
#include <proton/condition.h>
#include <proton/raw_connection.h>
#include <proton/listener.h>
#include <proton/netaddr.h>
#include <proton/proactor.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* Application-wide state; a single instance is created in main() and a
 * pointer to it is handed to every worker thread.
 */
typedef struct app_data_t {
  /* Listen address components, filled in from the command line by main() */
  const char *host;
  const char *port;
  /* Not referenced by the code visible in this file */
  const char *amqp_address;

  pn_proactor_t *proactor;   /* proactor that every worker thread waits on */
  pn_listener_t *listener;   /* set to NULL once PN_LISTENER_CLOSE is seen */

  /* Guards the timing and counter fields below */
  pthread_mutex_t lock;
  int64_t first_idle_time;   /* 0 until a timeout finds no open connections */
  int64_t wake_conn_time;    /* next time the periodic connection wake is due */
  int connects;              /* connections that reached CONNECTED */
  int disconnects;           /* connections that reached DISCONNECTED */
} app_data_t;
/* Number of slots in the conn_data pool, i.e. how many raw connections can be
 * tracked at once. */
#define MAX_CONNECTIONS 5
/* Per-connection bookkeeping. A slot is considered free while `connection`
 * is NULL (see make_conn_data / free_conn_data). */
typedef struct conn_data_t {
pn_raw_connection_t *connection; /* raw connection using this slot, or NULL */
int64_t last_recv_time; /* pn_proactor_now_64() at the most recent READ event */
int bytes; /* sum of the sizes of the read buffers taken so far */
int buffers; /* number of read buffers taken so far */
} conn_data_t;
/* Fixed pool of connection slots; zero-initialised so every slot starts free. */
static conn_data_t conn_data[MAX_CONNECTIONS] = {{0}};
/* Process exit status returned from main(); set to 1 by check_condition_fatal. */
static int exit_code = 0;
/* Close the given raw connection (when there is one) and the listener (when
 * it is still open), so that a PN_PROACTOR_INACTIVE event is delivered and
 * the program exits once all outstanding events have been processed.
 */
static void close_all(pn_raw_connection_t *c, app_data_t *app) {
  if (c != NULL) {
    pn_raw_connection_close(c);
  }

  pn_listener_t *open_listener = app->listener;
  if (open_listener != NULL) {
    pn_listener_close(open_listener);
  }
}
/* Report an error condition attached to an event.
 *
 * When `cond` is set, prints "<event type>: <condition name>: <description>"
 * to stderr and returns true; otherwise prints nothing and returns false.
 * `app` is accepted for symmetry with check_condition_fatal but is not used.
 */
static bool check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
  (void)app;

  if (!pn_condition_is_set(cond)) {
    return false;
  }

  const char *event_name = pn_event_type_name(pn_event_type(e));
  const char *cond_name = pn_condition_get_name(cond);
  const char *cond_text = pn_condition_get_description(cond);
  fprintf(stderr, "%s: %s: %s\n", event_name, cond_name, cond_text);
  return true;
}
/* Like check_condition, but treats a set condition as fatal: the event's raw
 * connection (if any) and the listener are closed and the process exit code
 * is set to 1.
 */
static void check_condition_fatal(pn_event_t *e, pn_condition_t *cond, app_data_t *app) {
  const bool failed = check_condition(e, cond, app);
  if (!failed) {
    return;
  }
  close_all(pn_event_raw_connection(e), app);
  exit_code = 1;
}
/* Queue a copy of `msg` (without its terminating NUL) for writing on `c`.
 *
 * The text is copied into a freshly allocated 1024-byte buffer; anything
 * beyond that capacity is truncated rather than overflowing the buffer.
 * If the connection accepts the buffer it is released later, when it comes
 * back through pn_raw_connection_take_written_buffers() in the event
 * handler. If it is not accepted (or allocation fails) the message is
 * dropped and nothing is leaked.
 */
static void send_message(pn_raw_connection_t *c, const char* msg) {
  const size_t capacity = 1024;
  pn_raw_buffer_t buffer = {0};   /* zero every field, including context */
  size_t len = strlen(msg);
  char *buf = malloc(capacity);

  if (!buf) {
    printf("**Couldn't send message: out of memory**\n");
    return;
  }
  if (len > capacity) {
    len = capacity;               /* never write past the allocation */
  }
  memcpy(buf, msg, len);

  buffer.bytes = buf;
  buffer.capacity = (uint32_t)capacity;
  buffer.offset = 0;
  buffer.size = (uint32_t)len;

  // If message not accepted just throw it away!
  if (pn_raw_connection_write_buffers(c, &buffer, 1) < 1) {
    printf("**Couldn't send message: write not accepted**\n");
    free(buf);
  }
}
/* Copy the valid payload of a received buffer to stdout.
 *
 * The payload is the `size` bytes that start `offset` bytes into `bytes`;
 * an empty or unbacked buffer produces no output.
 */
static void recv_message(pn_raw_buffer_t buf) {
  if (buf.bytes && buf.size) {
    fwrite(buf.bytes + buf.offset, buf.size, 1, stdout);
  }
}
/* Claim the first free slot in the conn_data pool for raw connection `c`.
 *
 * The slot's statistics are cleared before it is handed out, so counters
 * left behind by a previous connection that used the same slot do not leak
 * into the new one. Returns the claimed slot, or NULL when all
 * MAX_CONNECTIONS slots are already in use.
 */
conn_data_t *make_conn_data(pn_raw_connection_t *c) {
  for (int i = 0; i < MAX_CONNECTIONS; ++i) {
    conn_data_t *slot = &conn_data[i];
    if (!slot->connection) {
      slot->last_recv_time = 0;
      slot->bytes = 0;
      slot->buffers = 0;
      slot->connection = c;
      return slot;
    }
  }
  return NULL;
}
/* Return a slot to the conn_data pool by clearing its connection pointer.
 * Passing NULL is allowed and does nothing.
 */
void free_conn_data(conn_data_t *c) {
  if (c != NULL) {
    c->connection = NULL;
  }
}
/* Number of raw buffers handled per batch by the event handlers. */
#define READ_BUFFERS 4
/* Release the byte storage of the first `n` buffers in `buffs`, in order.
 * Only the `bytes` allocations are freed; the descriptor array itself is
 * left untouched.
 */
static void free_buffers(pn_raw_buffer_t buffs[], size_t n) {
  for (; n > 0; --n, ++buffs) {
    free(buffs->bytes);
  }
}
/* This function handles events when we are acting as the receiver */
static void handle_receive(app_data_t *app, pn_event_t* event) {
switch (pn_event_type(event)) {
case PN_RAW_CONNECTION_CONNECTED: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
pn_raw_buffer_t buffers[READ_BUFFERS] = {{0}};
if (cd) {
int i = READ_BUFFERS;
printf("**raw connection %tu connected\n", cd-conn_data);
pthread_mutex_lock(&app->lock);
app->connects++;
pthread_mutex_unlock(&app->lock);
for (; i; --i) {
pn_raw_buffer_t *buff = &buffers[READ_BUFFERS-i];
buff->bytes = (char*) malloc(1024);
buff->capacity = 1024;
buff->size = 0;
buff->offset = 0;
}
pn_raw_connection_give_read_buffers(c, buffers, READ_BUFFERS);
} else {
printf("**too many raw connections connected: closing\n");
pn_raw_connection_close(c);
}
} break;
case PN_RAW_CONNECTION_WAKE: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
printf("**raw connection %tu woken\n", cd-conn_data);
} break;
case PN_RAW_CONNECTION_DISCONNECTED: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
if (cd) {
pthread_mutex_lock(&app->lock);
app->disconnects++;
pthread_mutex_unlock(&app->lock);
printf("**raw connection %tu disconnected: bytes: %d, buffers: %d\n", cd-conn_data, cd->bytes, cd->buffers);
} else {
printf("**raw connection disconnected: not connected\n");
}
check_condition(event, pn_raw_connection_condition(c), app);
pn_raw_connection_wake(c);
free_conn_data(cd);
} break;
case PN_RAW_CONNECTION_DRAIN_BUFFERS: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
pn_raw_buffer_t buffs[READ_BUFFERS];
size_t n;
while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
free_buffers(buffs, n);
}
while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
free_buffers(buffs, n);
}
}
case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
} break;
case PN_RAW_CONNECTION_READ: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
conn_data_t *cd = (conn_data_t *) pn_raw_connection_get_context(c);
pn_raw_buffer_t buffs[READ_BUFFERS];
size_t n;
cd->last_recv_time = pn_proactor_now_64();
while ( (n = pn_raw_connection_take_read_buffers(c, buffs, READ_BUFFERS)) ) {
unsigned i;
for (i=0; i<n && buffs[i].bytes; ++i) {
cd->bytes += buffs[i].size;
recv_message(buffs[i]);
}
cd->buffers += n;
// Echo back if we can
if (!pn_raw_connection_is_write_closed(c)) {
pn_raw_connection_write_buffers(c, buffs, n);
} else if (!pn_raw_connection_is_read_closed(c)) {
pn_raw_connection_give_read_buffers(c, buffs, n);
} else {
free_buffers(buffs, n);
}
}
} break;
case PN_RAW_CONNECTION_CLOSED_READ: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
if (!pn_raw_connection_is_write_closed(c)) {
send_message(c, "** Goodbye **");
}
}
case PN_RAW_CONNECTION_CLOSED_WRITE:{
pn_raw_connection_t *c = pn_event_raw_connection(event);
pn_raw_connection_close(c);
} break;
case PN_RAW_CONNECTION_WRITTEN: {
pn_raw_connection_t *c = pn_event_raw_connection(event);
pn_raw_buffer_t buffs[READ_BUFFERS];
size_t n;
while ( (n = pn_raw_connection_take_written_buffers(c, buffs, READ_BUFFERS)) ) {
if (!pn_raw_connection_is_read_closed(c)) {
pn_raw_connection_give_read_buffers(c, buffs, n);
} else {
free_buffers(buffs, n);
}
};
} break;
default:
break;
}
}
#define WRITE_BUFFERS 4
/* Handle all events: listener and proactor events are processed here, raw
 * connection events are delegated to handle_receive.
 *
 * Return true to continue, false to exit (an INACTIVE/INTERRUPT event was
 * seen, or a fatal condition set exit_code).
 */
static bool handle(app_data_t* app, pn_event_t* event) {
  switch (pn_event_type(event)) {

    case PN_LISTENER_OPEN: {
      char port[256];             /* Get the listening port */
      pn_netaddr_host_port(pn_listener_addr(pn_event_listener(event)), NULL, 0, port, sizeof(port));
      printf("**listening on %s\n", port);
      fflush(stdout);
      break;
    }

    case PN_LISTENER_ACCEPT: {
      pn_listener_t *listener = pn_event_listener(event);
      pn_raw_connection_t *c = pn_raw_connection();
      void *cd = make_conn_data(c);
      int64_t now = pn_proactor_now_64();
      if (cd) {
        pthread_mutex_lock(&app->lock);
        app->first_idle_time = 0;  /* activity: cancel any pending idle shutdown */
        if (app->wake_conn_time < now) {
          app->wake_conn_time = now + 5000;
          pn_proactor_set_timeout(pn_listener_proactor(listener), 5000);
        }
        pn_raw_connection_set_context(c, cd);
        pn_listener_raw_accept(listener, c);
        pthread_mutex_unlock(&app->lock);
      } else {
        printf("**too many connections, trying again later...\n");
        /* No other sensible/correct way to reject connection - have to defer closing to event handler */
        pn_raw_connection_set_context(c, 0);
        pn_listener_raw_accept(listener, c);
      }
    } break;

    case PN_LISTENER_CLOSE: {
      pn_listener_t *listener = pn_event_listener(event);
      app->listener = NULL;        /* Listener is closed */
      printf("**listener closed\n");
      check_condition_fatal(event, pn_listener_condition(listener), app);
    } break;

    case PN_PROACTOR_TIMEOUT: {
      pn_proactor_t *proactor = pn_event_proactor(event);
      pn_millis_t timeout = 5000;
      bool rearm = true;           /* cleared once shutdown has been started */
      pthread_mutex_lock(&app->lock);
      int64_t now = pn_proactor_now_64();
      if (app->connects - app->disconnects == 0) {
        timeout = 20000;
        if (app->first_idle_time == 0) {
          printf("**idle detected, shutting down in %ums\n", (unsigned)timeout);
          app->first_idle_time = now;
        } else if (app->first_idle_time + 20000 <= now) {
          printf("**no activity for %ums: shutting down now\n", (unsigned)timeout);
          if (app->listener) pn_listener_close(app->listener);
          rearm = false;           /* No more timeouts */
        }
      } else if (now >= app->wake_conn_time) {
        int i;
        for (i = 0; i < MAX_CONNECTIONS; ++i) {
          if (conn_data[i].connection) pn_raw_connection_wake(conn_data[i].connection);
        }
        app->wake_conn_time = now + 5000;
      }
      if (rearm) {
        pn_proactor_set_timeout(proactor, timeout);
      }
      /* Every path out of this case must release the lock */
      pthread_mutex_unlock(&app->lock);
    } break;

    case PN_PROACTOR_INACTIVE:
    case PN_PROACTOR_INTERRUPT: {
      /* Re-issue the interrupt so the other worker threads also stop */
      pn_proactor_t *proactor = pn_event_proactor(event);
      pn_proactor_interrupt(proactor);
      return false;
    }

    default: {
      pn_raw_connection_t *c = pn_event_raw_connection(event);
      if (c) {
        handle_receive(app, event);
      }
    } break;
  }
  return exit_code == 0;
}
/* Worker thread body: `arg` is the shared app_data_t.
 *
 * Repeatedly waits on the proactor for a batch of events and passes each
 * event to handle() until handle() returns false. The batch is always
 * returned to the proactor with pn_proactor_done(). Always returns NULL.
 */
void* run(void *arg) {
  app_data_t *app = arg;
  bool keep_going = true;

  while (true) {
    pn_event_batch_t *batch = pn_proactor_wait(app->proactor);
    pn_event_t *ev = pn_event_batch_next(batch);

    while (ev && keep_going) {
      keep_going = handle(app, ev);
      ev = pn_event_batch_next(batch);
    }
    pn_proactor_done(app->proactor, batch);

    if (!keep_going) {
      break;
    }
  }
  return NULL;
}
/* Entry point: raw_echo [host [port]]
 *
 * Starts listening on host:port (defaults: "" and "amqp"), then runs the
 * event loop on three extra threads plus the main thread. Returns the global
 * exit_code once every thread has left the loop.
 */
int main(int argc, char **argv) {
  struct app_data_t app = {0};
  char addr[PN_MAX_ADDR];
  const size_t thread_count = 3;
  pthread_t *threads;
  size_t n;

  pthread_mutex_init(&app.lock, NULL);
  app.host = (argc > 1) ? argv[1] : "";
  app.port = (argc > 2) ? argv[2] : "amqp";

  /* Create the proactor and listen */
  app.proactor = pn_proactor();
  app.listener = pn_listener();
  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
  pn_proactor_listen(app.proactor, app.listener, addr, 16);

  threads = calloc(thread_count, sizeof *threads);
  if (!threads) {
    fprintf(stderr, "Failed to allocate thread handles\n");
    exit(-1);
  }
  for (n = 0; n < thread_count; n++) {
    int rc = pthread_create(&threads[n], 0, run, (void*)&app);
    if (rc) {
      fprintf(stderr, "Failed to create thread\n");
      exit(-1);
    }
  }

  /* The main thread takes part in event processing as well */
  run(&app);

  for (n = 0; n < thread_count; n++) {
    pthread_join(threads[n], 0);
  }
  free(threads);
  pn_proactor_free(app.proactor);
  return exit_code;
}