blob: 031e21fb0a95010ad133aa6d5ac9d07972aa15e4 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/* Enable POSIX features beyond c99 for modern pthread and standard strerror_r() */
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
/* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
#undef _GNU_SOURCE
#include "proton/raw_connection.h"
#include "proton/event.h"
#include "proton/listener.h"
#include "proton/object.h"
#include "proton/proactor.h"
#include "proton/types.h"
#include "core/util.h"
#include "proactor-internal.h"
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include "raw_connection-internal.h"
PN_STRUCT_CLASSDEF(pn_raw_connection)
void pni_raw_initialize(pn_raw_connection_t *conn) {
// Link together free lists
for (buff_ptr i = 1; i<=read_buffer_count; i++) {
conn->rbuffers[i-1].next = i==read_buffer_count ? 0 : i+1;
conn->rbuffers[i-1].type = buff_rempty;
conn->wbuffers[i-1].next = i==read_buffer_count ? 0 : i+1;
conn->wbuffers[i-1].type = buff_wempty;
}
conn->condition = pn_condition();
conn->collector = pn_collector();
conn->attachments = pn_record();
conn->rbuffer_first_empty = 1;
conn->wbuffer_first_empty = 1;
conn->state = conn_init;
}
typedef enum {
conn_connected = 0,
conn_read_closed = 1,
conn_write_closed = 2,
conn_write_drained = 3,
int_read_shutdown = 4,
int_write_shutdown = 5,
int_disconnect = 6,
api_read_close = 7,
api_write_close = 8,
} raw_event;
/*
* There's a little trick in this state table - both conn_init - the initial state and the self transition are represented by
* 0. This is because no state ever transitions to the initial state.
*/
const uint8_t state_transitions[][9] = {
/* State\event cconnected, crclosed, cwclosed, cwdrained, irshutdown, iwshutdown, idsconnect, arclose, awclose */
[conn_init] = {conn_ro_wo, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_rc_wo, conn_ro_wd},
[conn_ro_wo] = {conn_wrong, conn_rc_wo, conn_ro_wc, 0, conn_wrong, conn_wrong, conn_wrong, conn_rc_wo, conn_ro_wd},
[conn_ro_wd] = {conn_wrong, conn_rc_wd, conn_ro_wc, conn_ro_wc, conn_wrong, conn_wrong, conn_wrong, conn_rc_wd, 0},
[conn_ro_wc] = {conn_wrong, conn_rs_ws, 0, conn_wrong, conn_wrong, conn_ro_ws, conn_wrong, conn_rs_ws, 0},
[conn_ro_ws] = {conn_wrong, conn_rs_ws, 0, 0, conn_wrong, conn_wrong, conn_wrong, conn_rs_ws, 0},
[conn_rc_wo] = {conn_wrong, 0, conn_rs_ws, 0, conn_rs_wo, conn_wrong, conn_wrong, 0, conn_rc_wd},
[conn_rc_wd] = {conn_wrong, 0, conn_rs_ws, conn_rs_ws, conn_rs_wd, conn_wrong, conn_wrong, 0, 0},
[conn_rs_wo] = {conn_wrong, 0, conn_rs_ws, 0, conn_wrong, conn_wrong, conn_wrong, 0, conn_rs_wd},
[conn_rs_wd] = {conn_wrong, 0, conn_rs_ws, conn_rs_ws, conn_wrong, conn_wrong, conn_wrong, 0, 0},
[conn_rs_ws] = {conn_wrong, 0, 0, conn_wrong, conn_wrong, conn_wrong, conn_fini, 0, 0},
[conn_fini] = {conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, conn_wrong, 0, 0, 0},
};
static inline uint8_t pni_raw_new_state(pn_raw_connection_t *conn, raw_event event) {
uint8_t old_state = conn->state;
uint8_t new_state = state_transitions[old_state][event];
assert(new_state != (uint8_t)conn_wrong);
return new_state == 0 ? old_state : new_state;
}
// Open for read by application
static inline bool pni_raw_ropen(pn_raw_connection_t *conn) {
switch (conn->state) {
case conn_ro_wo:
case conn_ro_wd:
case conn_ro_wc:
case conn_ro_ws:
return true;
default:
return false;
}
}
// Closed for reading by application
static inline bool pni_raw_rclosed(pn_raw_connection_t *conn) {
switch (conn->state) {
case conn_rc_wo:
case conn_rc_wd:
case conn_rs_wo:
case conn_rs_wd:
case conn_rs_ws:
case conn_fini :
return true;
default:
return false;
}
}
// Not yet sent read closed event
static inline bool pni_raw_rclosing(pn_raw_connection_t *conn) {
switch (conn->state) {
case conn_rc_wo:
case conn_rc_wd:
return true;
default:
return false;
}
}
// Unused code
#if 0
// Open for write by application
static inline bool pni_raw_wopen(pn_raw_connection_t *conn) {
switch (conn->state) {
case conn_ro_wo:
case conn_rc_wo:
case conn_rs_wo:
return true;
default:
return false;
}
}
#endif
// Return whether closed for writing by application (name a little confusing!)
// NB Initial state is NOT closed for writing - app can write before connecting
static inline bool pni_raw_wclosed(pn_raw_connection_t *conn) {
switch (conn->state) {
case conn_ro_wd:
case conn_ro_wc:
case conn_ro_ws:
case conn_rc_wd:
case conn_rs_wd:
case conn_rs_ws:
case conn_fini :
return true;
default:
return false;
}
}
// Return whether closed for application and all writes drained
static inline bool pni_raw_wdrained(pn_raw_connection_t *conn) {
switch (conn->state) {
case conn_ro_wc:
case conn_ro_ws:
case conn_rs_ws:
case conn_fini :
return true;
default:
return false;
}
}
// Not yet sent write closed event
static inline bool pni_raw_wclosing(pn_raw_connection_t *conn) {
switch (conn->state) {
case conn_ro_wc:
return true;
default:
return false;
}
}
// Fully closed for read/write - sent both closed events
static inline bool pni_raw_rwclosed(pn_raw_connection_t *conn) {
switch (conn->state) {
case conn_rs_ws:
case conn_fini :
return true;
default:
return false;
}
}
bool pni_raw_validate(pn_raw_connection_t *conn) {
int rempty_count = 0;
for (buff_ptr i = conn->rbuffer_first_empty; i; i = conn->rbuffers[i-1].next) {
if (conn->rbuffers[i-1].type != buff_rempty) return false;
rempty_count++;
}
int runused_count = 0;
for (buff_ptr i = conn->rbuffer_first_unused; i; i = conn->rbuffers[i-1].next) {
if (conn->rbuffers[i-1].type != buff_unread) return false;
runused_count++;
}
int rread_count = 0;
for (buff_ptr i = conn->rbuffer_first_read; i; i = conn->rbuffers[i-1].next) {
if (conn->rbuffers[i-1].type != buff_read) return false;
rread_count++;
}
if (rempty_count+runused_count+rread_count != read_buffer_count) return false;
if (!conn->rbuffer_first_unused && conn->rbuffer_last_unused) return false;
if (conn->rbuffer_last_unused &&
(conn->rbuffers[conn->rbuffer_last_unused-1].type != buff_unread || conn->rbuffers[conn->rbuffer_last_unused-1].next != 0)) return false;
if (!conn->rbuffer_first_read && conn->rbuffer_last_read) return false;
if (conn->rbuffer_last_read &&
(conn->rbuffers[conn->rbuffer_last_read-1].type != buff_read || conn->rbuffers[conn->rbuffer_last_read-1].next != 0)) return false;
int wempty_count = 0;
for (buff_ptr i = conn->wbuffer_first_empty; i; i = conn->wbuffers[i-1].next) {
if (conn->wbuffers[i-1].type != buff_wempty) return false;
wempty_count++;
}
int wunwritten_count = 0;
for (buff_ptr i = conn->wbuffer_first_towrite; i; i = conn->wbuffers[i-1].next) {
if (conn->wbuffers[i-1].type != buff_unwritten) return false;
wunwritten_count++;
}
int wwritten_count = 0;
for (buff_ptr i = conn->wbuffer_first_written; i; i = conn->wbuffers[i-1].next) {
if (conn->wbuffers[i-1].type != buff_written) return false;
wwritten_count++;
}
if (wempty_count+wunwritten_count+wwritten_count != write_buffer_count) return false;
if (!conn->wbuffer_first_towrite && conn->wbuffer_last_towrite) return false;
if (conn->wbuffer_last_towrite &&
(conn->wbuffers[conn->wbuffer_last_towrite-1].type != buff_unwritten || conn->wbuffers[conn->wbuffer_last_towrite-1].next != 0)) return false;
if (!conn->wbuffer_first_written && conn->wbuffer_last_written) return false;
if (conn->wbuffer_last_written &&
(conn->wbuffers[conn->wbuffer_last_written-1].type != buff_written || conn->wbuffers[conn->wbuffer_last_written-1].next != 0)) return false;
return true;
}
void pni_raw_finalize(pn_raw_connection_t *conn) {
pn_condition_free(conn->condition);
pn_collector_free(conn->collector);
pn_free(conn->attachments);
}
size_t pn_raw_connection_read_buffers_capacity(pn_raw_connection_t *conn) {
assert(conn);
bool rclosed = pni_raw_rclosed(conn);
return rclosed ? 0 : (read_buffer_count - conn->rbuffer_count);
}
size_t pn_raw_connection_write_buffers_capacity(pn_raw_connection_t *conn) {
assert(conn);
bool wclosed = pni_raw_wclosed(conn);
return wclosed ? 0 : (write_buffer_count-conn->wbuffer_count);
}
size_t pn_raw_connection_give_read_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t const *buffers, size_t num) {
assert(conn);
size_t can_take = pn_min(num, pn_raw_connection_read_buffers_capacity(conn));
if ( can_take==0 ) return 0;
buff_ptr current = conn->rbuffer_first_empty;
assert(current);
buff_ptr previous;
for (size_t i = 0; i < can_take; i++) {
// Get next free
assert(conn->rbuffers[current-1].type == buff_rempty);
conn->rbuffers[current-1].context = buffers[i].context;
conn->rbuffers[current-1].bytes = buffers[i].bytes;
conn->rbuffers[current-1].capacity = buffers[i].capacity;
conn->rbuffers[current-1].size = 0;
conn->rbuffers[current-1].offset = buffers[i].offset;
conn->rbuffers[current-1].type = buff_unread;
previous = current;
current = conn->rbuffers[current-1].next;
}
if (!conn->rbuffer_last_unused) {
conn->rbuffer_last_unused = previous;
}
conn->rbuffers[previous-1].next = conn->rbuffer_first_unused;
conn->rbuffer_first_unused = conn->rbuffer_first_empty;
conn->rbuffer_first_empty = current;
conn->rbuffer_count += can_take;
conn->rrequestedbuffers = false;
return can_take;
}
size_t pn_raw_connection_take_read_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t *buffers, size_t num) {
assert(conn);
size_t count = 0;
buff_ptr current = conn->rbuffer_first_read;
if (!current) return 0;
buff_ptr previous;
for (; current && count < num; count++) {
assert(conn->rbuffers[current-1].type == buff_read);
buffers[count].context = conn->rbuffers[current-1].context;
buffers[count].bytes = conn->rbuffers[current-1].bytes;
buffers[count].capacity = conn->rbuffers[current-1].capacity;
buffers[count].size = conn->rbuffers[current-1].size;
buffers[count].offset = conn->rbuffers[current-1].offset - conn->rbuffers[current-1].size;
conn->rbuffers[current-1].type = buff_rempty;
previous = current;
current = conn->rbuffers[current-1].next;
}
if (!count) return 0;
conn->rbuffers[previous-1].next = conn->rbuffer_first_empty;
conn->rbuffer_first_empty = conn->rbuffer_first_read;
conn->rbuffer_first_read = current;
if (!current) {
conn->rbuffer_last_read = 0;
}
conn->rbuffer_count -= count;
return count;
}
size_t pn_raw_connection_write_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t const *buffers, size_t num) {
assert(conn);
size_t can_take = pn_min(num, pn_raw_connection_write_buffers_capacity(conn));
if ( can_take==0 ) return 0;
buff_ptr current = conn->wbuffer_first_empty;
assert(current);
buff_ptr previous;
for (size_t i = 0; i < can_take; i++) {
// Get next free
assert(conn->wbuffers[current-1].type == buff_wempty);
conn->wbuffers[current-1].context = buffers[i].context;
conn->wbuffers[current-1].bytes = buffers[i].bytes;
conn->wbuffers[current-1].capacity = buffers[i].capacity;
conn->wbuffers[current-1].size = buffers[i].size;
conn->wbuffers[current-1].offset = buffers[i].offset;
conn->wbuffers[current-1].type = buff_unwritten;
previous = current;
current = conn->wbuffers[current-1].next;
}
if (!conn->wbuffer_first_towrite) {
conn->wbuffer_first_towrite = conn->wbuffer_first_empty;
}
if (conn->wbuffer_last_towrite) {
conn->wbuffers[conn->wbuffer_last_towrite-1].next = conn->wbuffer_first_empty;
}
conn->wbuffer_last_towrite = previous;
conn->wbuffers[previous-1].next = 0;
conn->wbuffer_first_empty = current;
conn->wbuffer_count += can_take;
conn->wrequestedbuffers = false;
return can_take;
}
size_t pn_raw_connection_take_written_buffers(pn_raw_connection_t *conn, pn_raw_buffer_t *buffers, size_t num) {
assert(conn);
size_t count = 0;
buff_ptr current = conn->wbuffer_first_written;
if (!current) return 0;
buff_ptr previous;
for (; current && count < num; count++) {
assert(conn->wbuffers[current-1].type == buff_written);
buffers[count].context = conn->wbuffers[current-1].context;
buffers[count].bytes = conn->wbuffers[current-1].bytes;
buffers[count].capacity = conn->wbuffers[current-1].capacity;
buffers[count].size = conn->wbuffers[current-1].size;
buffers[count].offset = conn->wbuffers[current-1].offset;
conn->wbuffers[current-1].type = buff_wempty;
previous = current;
current = conn->wbuffers[current-1].next;
}
if (!count) return 0;
conn->wbuffers[previous-1].next = conn->wbuffer_first_empty;
conn->wbuffer_first_empty = conn->wbuffer_first_written;
conn->wbuffer_first_written = current;
if (!current) {
conn->wbuffer_last_written = 0;
}
conn->wbuffer_count -= count;
return count;
}
static inline void pni_raw_put_event(pn_raw_connection_t *conn, pn_event_type_t type) {
pn_collector_put(conn->collector, PN_CLASSCLASS(pn_raw_connection), (void*)conn, type);
}
static inline void pni_raw_release_buffers(pn_raw_connection_t *conn) {
for(;conn->rbuffer_first_unused;) {
buff_ptr p = conn->rbuffer_first_unused;
assert(conn->rbuffers[p-1].type == buff_unread);
conn->rbuffers[p-1].size = 0;
if (!conn->rbuffer_first_read) {
conn->rbuffer_first_read = p;
}
if (conn->rbuffer_last_read) {
conn->rbuffers[conn->rbuffer_last_read-1].next = p;
}
conn->rbuffer_last_read = p;
conn->rbuffer_first_unused = conn->rbuffers[p-1].next;
conn->rbuffers[p-1].next = 0;
conn->rbuffers[p-1].type = buff_read;
}
conn->rbuffer_last_unused = 0;
for(;conn->wbuffer_first_towrite;) {
buff_ptr p = conn->wbuffer_first_towrite;
assert(conn->wbuffers[p-1].type == buff_unwritten);
if (!conn->wbuffer_first_written) {
conn->wbuffer_first_written = p;
}
if (conn->wbuffer_last_written) {
conn->wbuffers[conn->wbuffer_last_written-1].next = p;
}
conn->wbuffer_last_written = p;
conn->wbuffer_first_towrite = conn->wbuffers[p-1].next;
conn->wbuffers[p-1].next = 0;
conn->wbuffers[p-1].type = buff_written;
}
conn->wbuffer_last_towrite = 0;
}
static inline void pni_raw_disconnect(pn_raw_connection_t *conn) {
pni_raw_release_buffers(conn);
conn->disconnectpending = true;
conn->disconnect_state = disc_init;
conn->state = pni_raw_new_state(conn, int_disconnect);
}
void pni_raw_connected(pn_raw_connection_t *conn) {
pn_condition_clear(conn->condition);
pni_raw_put_event(conn, PN_RAW_CONNECTION_CONNECTED);
conn->state = pni_raw_new_state(conn, conn_connected);
}
void pni_raw_connect_failed(pn_raw_connection_t *conn) {
conn->state = conn_fini;
pni_raw_disconnect(conn);
}
void pni_raw_wake(pn_raw_connection_t *conn) {
conn->wakepending = true;
}
void pni_raw_read(pn_raw_connection_t *conn, int sock, long (*recv)(int, void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
assert(conn);
if (!pni_raw_ropen(conn)) return;
bool closed = false;
for(;conn->rbuffer_first_unused;) {
buff_ptr p = conn->rbuffer_first_unused;
assert(conn->rbuffers[p-1].type == buff_unread);
char *bytes = conn->rbuffers[p-1].bytes+conn->rbuffers[p-1].offset;
size_t s = conn->rbuffers[p-1].capacity-conn->rbuffers[p-1].offset;
int r = recv(sock, bytes, s);
if (r < 0) {
switch (errno) {
// Interrupted system call try again
case EINTR: continue;
// Would block
case EWOULDBLOCK: goto finished_reading;
// Detected an error
default:
set_error(conn, "recv error", errno);
pni_raw_close(conn);
return;
}
}
conn->rbuffers[p-1].size += r;
conn->rbuffers[p-1].offset += r;
if (!conn->rbuffer_first_read) {
conn->rbuffer_first_read = p;
}
if (conn->rbuffer_last_read) {
conn->rbuffers[conn->rbuffer_last_read-1].next = p;
}
conn->rbuffer_last_read = p;
conn->rbuffer_first_unused = conn->rbuffers[p-1].next;
conn->rbuffers[p-1].next = 0;
conn->rbuffers[p-1].type = buff_read;
// Checking for end of stream here ensures that there is a buffer at the end with nothing in it
if (r == 0) {
closed = true;
break;
}
}
finished_reading:
if (!conn->rbuffer_first_unused) {
conn->rbuffer_last_unused = 0;
}
// Read something - we are now either out of buffers; end of stream; or blocked for read
if (conn->rbuffer_first_read && !conn->rpending) {
conn->rpending = true;
}
uint8_t old_state = conn->state;
// Socket closed for read
if (closed) {
conn->state = pni_raw_new_state(conn, conn_read_closed);
conn->rclosedpending = true;
}
if (conn->state != old_state) {
if (pni_raw_rwclosed(conn)) {
pni_raw_disconnect(conn);
}
}
return;
}
void pni_raw_write(pn_raw_connection_t *conn, int sock, long (*send)(int, const void*, size_t), void(*set_error)(pn_raw_connection_t *, const char *, int)) {
assert(conn);
if (pni_raw_wdrained(conn)) return;
bool closed = false;
bool drained = false;
for(;conn->wbuffer_first_towrite;) {
buff_ptr p = conn->wbuffer_first_towrite;
assert(conn->wbuffers[p-1].type == buff_unwritten);
char *bytes = conn->wbuffers[p-1].bytes+conn->wbuffers[p-1].offset+conn->unwritten_offset;
size_t s = conn->wbuffers[p-1].size-conn->unwritten_offset;
int r = send(sock, bytes, s);
if (r < 0) {
// Interrupted system call try again
switch (errno) {
// Interrupted system call try again
case EINTR: continue;
case EWOULDBLOCK:
goto finished_writing;
default:
set_error(conn, "send error", errno);
pni_raw_release_buffers(conn);
pni_raw_close(conn);
return;
}
}
// return of 0 was never observed in testing and the documentation
// implies that 0 could only be returned if 0 bytes were sent; however
// leaving this case here seems safe.
if (r == 0 && s > 0) {
closed = true;
break;
}
// Only wrote a partial buffer - adjust buffer
if (r != (int)s) {
conn->unwritten_offset += r;
break;
}
conn->unwritten_offset = 0;
if (!conn->wbuffer_first_written) {
conn->wbuffer_first_written = p;
}
if (conn->wbuffer_last_written) {
conn->wbuffers[conn->wbuffer_last_written-1].next = p;
}
conn->wbuffer_last_written = p;
conn->wbuffer_first_towrite = conn->wbuffers[p-1].next;
conn->wbuffers[p-1].next = 0;
conn->wbuffers[p-1].type = buff_written;
}
finished_writing:
if (!conn->wbuffer_first_towrite) {
conn->wbuffer_last_towrite = 0;
drained = true;
}
// Wrote something; end of stream; out of buffers; or blocked for write
if (conn->wbuffer_first_written && !conn->wpending) {
conn->wpending = true;
}
uint8_t old_state = conn->state;
// Drained all writes
if (drained) {
conn->state = pni_raw_new_state(conn, conn_write_drained);
}
// Socket closed for write
if (closed) {
conn->state = pni_raw_new_state(conn, conn_write_closed);
conn->wclosedpending = true;
}
if (conn->state != old_state) {
if (pni_raw_rwclosed(conn)) {
pni_raw_disconnect(conn);
}
}
return;
}
void pni_raw_process_shutdown(pn_raw_connection_t *conn, int sock, int (*shutdown_rd)(int), int (*shutdown_wr)(int)) {
assert(conn);
if (pni_raw_rclosing(conn)) {
shutdown_rd(sock);
conn->state = pni_raw_new_state(conn, int_read_shutdown);
}
if (pni_raw_wclosing(conn)) {
shutdown_wr(sock);
conn->state = pni_raw_new_state(conn, int_write_shutdown);
}
}
bool pni_raw_can_read(pn_raw_connection_t *conn) {
return pni_raw_ropen(conn) && conn->rbuffer_first_unused;
}
bool pni_raw_can_write(pn_raw_connection_t *conn) {
return !pni_raw_wdrained(conn) && conn->wbuffer_first_towrite;
}
pn_event_t *pni_raw_event_next(pn_raw_connection_t *conn) {
assert(conn);
do {
pn_event_t *event = pn_collector_next(conn->collector);
if (event) {
return pni_log_event(conn, event);
} else if (conn->wakepending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_WAKE);
conn->wakepending = false;
} else if (conn->rpending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_READ);
conn->rpending = false;
} else if (conn->wpending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_WRITTEN);
conn->wpending = false;
} else if (conn->rclosedpending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_CLOSED_READ);
conn->rclosedpending = false;
} else if (conn->wclosedpending) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_CLOSED_WRITE);
conn->wclosedpending = false;
} else if (conn->disconnectpending) {
switch (conn->disconnect_state) {
case disc_init:
if (conn->rbuffer_first_read || conn->wbuffer_first_written) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_DRAIN_BUFFERS);
}
conn->disconnect_state = disc_drain_msg;
break;
// TODO: We'll leave the read/written events in here for the moment for backward compatibility
// remove them soon (after dispatch uses DRAIN_BUFFER)
case disc_drain_msg:
if (conn->rbuffer_first_read) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_READ);
}
conn->disconnect_state = disc_read_msg;
break;
case disc_read_msg:
if (conn->wbuffer_first_written) {
pni_raw_put_event(conn, PN_RAW_CONNECTION_WRITTEN);
}
conn->disconnect_state = disc_written_msg;
break;
case disc_written_msg:
pni_raw_put_event(conn, PN_RAW_CONNECTION_DISCONNECTED);
conn->disconnectpending = false;
conn->disconnect_state = disc_fini;
break;
}
} else if (!pni_raw_wdrained(conn) && !conn->wbuffer_first_towrite && !conn->wrequestedbuffers) {
// Ran out of write buffers
pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_WRITE_BUFFERS);
conn->wrequestedbuffers = true;
} else if (!pni_raw_rclosed(conn) && !conn->rbuffer_first_unused && !conn->rrequestedbuffers) {
// Ran out of read buffers
pni_raw_put_event(conn, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
conn->rrequestedbuffers = true;
} else {
return NULL;
}
} while (true);
}
void pni_raw_read_close(pn_raw_connection_t *conn) {
// If already fully closed nothing to do
if (pni_raw_rwclosed(conn)) {
return;
}
if (!pni_raw_rclosed(conn)) {
conn->rclosedpending = true;
}
conn->state = pni_raw_new_state(conn, api_read_close);
if (pni_raw_rwclosed(conn)) {
pni_raw_disconnect(conn);
}
}
void pni_raw_write_close(pn_raw_connection_t *conn) {
// If already fully closed nothing to do
if (pni_raw_rwclosed(conn)) {
return;
}
if (!pni_raw_wdrained(conn)) {
if (!pni_raw_wclosed(conn)) {
conn->wclosedpending = true;
}
conn->state = pni_raw_new_state(conn, api_write_close);
if (!conn->wbuffer_first_towrite) {
conn->state = pni_raw_new_state(conn, conn_write_drained);
}
}
if (pni_raw_rwclosed(conn)) {
pni_raw_disconnect(conn);
}
}
void pni_raw_close(pn_raw_connection_t *conn) {
// If already fully closed nothing to do
if (pni_raw_rwclosed(conn)) {
return;
}
if (!pni_raw_rclosed(conn)) {
conn->rclosedpending = true;
}
conn->state = pni_raw_new_state(conn, api_read_close);
if (!pni_raw_wdrained(conn)) {
if (!pni_raw_wclosed(conn)) {
conn->wclosedpending = true;
}
conn->state = pni_raw_new_state(conn, api_write_close);
if (!conn->wbuffer_first_towrite) {
conn->state = pni_raw_new_state(conn, conn_write_drained);
}
}
if (pni_raw_rwclosed(conn)) {
pni_raw_disconnect(conn);
}
}
bool pn_raw_connection_is_read_closed(pn_raw_connection_t *conn) {
assert(conn);
return pni_raw_rclosed(conn);
}
bool pn_raw_connection_is_write_closed(pn_raw_connection_t *conn) {
assert(conn);
return pni_raw_wclosed(conn);
}
pn_condition_t *pn_raw_connection_condition(pn_raw_connection_t *conn) {
assert(conn);
return conn->condition;
}
void *pn_raw_connection_get_context(pn_raw_connection_t *conn) {
assert(conn);
return pn_record_get(conn->attachments, PN_LEGCTX);
}
void pn_raw_connection_set_context(pn_raw_connection_t *conn, void *context) {
assert(conn);
pn_record_set(conn->attachments, PN_LEGCTX, context);
}
pn_record_t *pn_raw_connection_attachments(pn_raw_connection_t *conn) {
assert(conn);
return conn->attachments;
}
pn_raw_connection_t *pn_event_raw_connection(pn_event_t *event) {
return (pn_event_class(event) == PN_CLASSCLASS(pn_raw_connection)) ? (pn_raw_connection_t*)pn_event_context(event) : NULL;
}