DISPATCH-774: modify HTTP code to work with proton 0.17
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index dc8ff58..96734f8 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -22,7 +22,9 @@
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/timer.h>
-#include <proton/connection_driver.h>
+#include <proton/connection.h>
+#include <proton/event.h>
+#include <proton/transport.h>
#include <libwebsockets.h>
@@ -90,8 +92,8 @@
/* AMQPWS connection: set as lws user data and qd_conn->context */
struct qd_http_connection_t {
- pn_connection_driver_t driver;
- qd_connection_t* qd_conn;
+ qd_connection_t *qd_conn;
+ pn_transport_t *transport;
buffer_t wbuf; /* LWS requires allocated header space at start of buffer */
struct lws *wsi;
char name[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port */
@@ -150,10 +152,11 @@
return unexpected_close(c->wsi, "not-established");
}
qd_connection_process(c->qd_conn);
- if (pn_connection_driver_write_buffer(&c->driver).size) {
- lws_callback_on_writable(c->wsi);
+ if (pn_transport_pending(c->transport) > 0) {
+ lws_callback_on_writable(c->wsi);
}
- if (pn_connection_driver_finished(&c->driver)) {
+ bool has_event = pn_collector_peek(pn_connection_collector(c->qd_conn->pn_conn));
+ if (pn_transport_closed(c->transport) && !has_event) {
lws_close_reason(c->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
c->closed = true;
qd_connection_process(c->qd_conn);
@@ -395,10 +398,18 @@
c->qd_conn->listener = hl->listener;
lws_get_peer_simple(wsi, c->hostip, sizeof(c->hostip));
strncpy(c->name, c->hostip, sizeof(c->name));
- int err = pn_connection_driver_init(&c->driver, c->qd_conn->pn_conn, NULL);
- if (err) {
- return unexpected_close(c->wsi, pn_code(err));
+
+ c->qd_conn->pn_conn = pn_connection();
+ c->transport = pn_transport();
+ c->qd_conn->collector = pn_collector();
+ if (!c->qd_conn->pn_conn || !c->transport || !c->qd_conn->collector) {
+ if (c->qd_conn->pn_conn) pn_connection_free(c->qd_conn->pn_conn);
+ if (c->transport) pn_transport_free(c->transport);
+ if (c->qd_conn->collector) pn_collector_free(c->qd_conn->collector);
+ return unexpected_close(c->wsi, "out of memory");
}
+ pn_connection_collect(c->qd_conn->pn_conn, c->qd_conn->collector);
+
c->qd_conn->http = c;
c->qd_conn->server = hs->server;
c->qd_conn->connection_id = qd_server_connection_id(c->qd_conn->server);
@@ -406,24 +417,23 @@
c->qd_conn->policy_counted = false;
const qd_server_config_t *config = hl->listener->config;
c->qd_conn->role = strdup(config->role);
- c->qd_conn->pn_conn = c->driver.connection;
pn_connection_set_context(c->qd_conn->pn_conn, ctx);
- c->qd_conn->collector = c->driver.collector;
qd_server_decorate_connection(c->qd_conn->server, c->qd_conn->pn_conn, config);
qd_log(hs->log, QD_LOG_DEBUG,
"[%"PRIu64"] upgraded HTTP connection from %s to AMQPWS",
qd_connection_connection_id(c->qd_conn), c->hostip);
- pn_connection_driver_bind(&c->driver);
+ pn_transport_bind(c->transport, c->qd_conn->pn_conn);
return handle_events(c);
}
case LWS_CALLBACK_SERVER_WRITEABLE: {
if (handle_events(c)) return -1;
- pn_bytes_t dbuf = pn_connection_driver_write_buffer(&c->driver);
- if (dbuf.size) {
+ ssize_t pending = pn_transport_pending(c->transport);
+ if (pending > 0) {
+ pn_bytes_t dbuf = pn_bytes(pending, pn_transport_head(c->transport));
/* lws_write() demands LWS_PRE bytes of free space before the data,
- * so we must copy from the driver's buffer to larger temporary wbuf
+ * so we must copy from the transport buffer to larger temporary wbuf
*/
buffer_set_size(&c->wbuf, LWS_PRE + dbuf.size);
if (c->wbuf.start == NULL) {
@@ -433,10 +443,10 @@
memcpy(buf, dbuf.start, dbuf.size);
ssize_t wrote = lws_write(wsi, buf, dbuf.size, LWS_WRITE_BINARY);
if (wrote < 0) {
- pn_connection_driver_write_close(&c->driver);
+ pn_transport_close_head(c->transport);
return unexpected_close(c->wsi, "write-error");
} else {
- pn_connection_driver_write_done(&c->driver, wrote);
+ pn_transport_pop(c->transport, wrote);
}
}
return handle_events(c);
@@ -445,13 +455,14 @@
case LWS_CALLBACK_RECEIVE: {
while (len > 0) {
if (handle_events(c)) return -1;
- pn_rwbytes_t dbuf = pn_connection_driver_read_buffer(&c->driver);
- if (dbuf.size == 0) {
- return unexpected_close(c->wsi, "unexpected-data");
+ ssize_t cap = pn_transport_capacity(c->transport);
+ if (cap <= 0) {
+ return unexpected_close(c->wsi, "unexpected-close");
}
+ pn_rwbytes_t dbuf = pn_rwbytes(cap, pn_transport_tail(c->transport));
size_t copy = (len < dbuf.size) ? len : dbuf.size;
memcpy(dbuf.start, in, copy);
- pn_connection_driver_read_done(&c->driver, copy);
+ pn_transport_process(c->transport, copy);
len -= copy;
in = (char*)in + copy;
}
@@ -459,7 +470,7 @@
}
case LWS_CALLBACK_USER: {
- pn_timestamp_t next_tick = pn_transport_tick(c->driver.transport, hs->now);
+ pn_timestamp_t next_tick = pn_transport_tick(c->transport, hs->now);
if (next_tick && next_tick > hs->now && next_tick < hs->next_tick) {
hs->next_tick = next_tick;
}
@@ -467,20 +478,21 @@
}
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
- pn_connection_driver_read_close(&c->driver);
+ pn_transport_close_tail(c->transport);
return handle_events(c);
}
case LWS_CALLBACK_CLOSED: {
qd_log(wsi_log(wsi), QD_LOG_DEBUG, "HTTP connection closed to %s from %s",
wsi_listener(wsi)->host_port, c->name);
- if (c->driver.transport) {
- pn_connection_driver_close(&c->driver);
+ if (c->transport) {
+ pn_transport_close_tail(c->transport);
+ pn_transport_close_head(c->transport);
handle_events(c);
}
- pn_connection_driver_destroy(&c->driver);
- c->qd_conn->pn_conn = NULL;
- c->qd_conn->collector = NULL;
+ pn_transport_free(c->transport);
+ pn_connection_free(c->qd_conn->pn_conn);
+ pn_collector_free(c->qd_conn->collector);
qd_connection_free(c->qd_conn);
free(c->wbuf.start);
return -1;
@@ -539,7 +551,7 @@
break;
case W_WAKE: {
qd_http_connection_t *c = w.value;
- pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection,
+ pn_collector_put(c->qd_conn->collector, PN_OBJECT, c->qd_conn->pn_conn,
PN_CONNECTION_WAKE);
handle_events(c);
break;