blob: ba640f18f6be1198f078300c1d9e46c0f9b43548 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2009, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs, Allan Stockdill-Mander - SSL updates
* Ian Craggs - fix for issue #244, issue #20
*******************************************************************************/
/**
* @file
* \brief Socket buffering related functions
*
* Some other related functions are in the Socket module
*/
#include "SocketBuffer.h"
#include "LinkedList.h"
#include "Log.h"
#include "Messages.h"
#include "StackTrace.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "Heap.h"
/* On Windows builds, map the iov_len / iov_base field names used in this file
 * onto len / buf.
 * NOTE(review): presumably iobuf is WSABUF on Windows - confirm in SocketBuffer.h. */
#if defined(WIN32) || defined(WIN64)
#define iov_len len
#define iov_base buf
#endif
/**
 * Default input queue buffer.
 * Used for reads on a socket that has no saved queue in the queues list.
 */
static socket_queue* def_queue;
/**
 * List of queued input buffers (socket_queue entries, looked up by socket
 * number with socketcompare)
 */
static List* queues;
/**
 * List of queued write buffers (pending_writes entries, looked up by socket
 * number with pending_socketcompare)
 */
static List writes;
/* Forward declarations of the helpers defined later in this file */
int socketcompare(void* a, void* b);
void SocketBuffer_newDefQ(void);
void SocketBuffer_freeDefQ(void);
int pending_socketcompare(void* a, void* b);
/**
* List callback function for comparing socket_queues by socket
* @param a first integer value
* @param b second integer value
* @return boolean indicating whether a and b are equal
*/
int socketcompare(void* a, void* b)
{
return ((socket_queue*)a)->socket == *(int*)b;
}
/**
* Create a new default queue when one has just been used.
*/
void SocketBuffer_newDefQ(void)
{
def_queue = malloc(sizeof(socket_queue));
def_queue->buflen = 1000;
def_queue->buf = malloc(def_queue->buflen);
def_queue->socket = def_queue->index = 0;
def_queue->buflen = def_queue->datalen = 0;
}
/**
 * Initialize the socketBuffer module: create the default input queue, the
 * list of saved input queues and the (empty) list of pending writes.
 */
void SocketBuffer_initialize(void)
{
	FUNC_ENTRY;

	/* the queue used for sockets that have no saved read state */
	SocketBuffer_newDefQ();

	/* saved input queues, one per socket with an interrupted read */
	queues = ListInitialize();

	/* writes is a statically allocated List, so it is zeroed in place */
	ListZero(&writes);

	FUNC_EXIT;
}
/**
 * Free the default queue memory.
 *
 * Releases both the data buffer and the queue structure, then clears
 * def_queue so that no dangling pointer is left behind. Does nothing if
 * there is no default queue.
 */
void SocketBuffer_freeDefQ(void)
{
	if (def_queue == NULL)
		return;

	free(def_queue->buf);
	free(def_queue);
	def_queue = NULL; /* avoid a later double free or use after free */
}
/**
 * Terminate the socketBuffer module.
 *
 * Empties the pending writes list, frees the data buffer of every saved
 * input queue, frees the queues list and finally the default queue.
 */
void SocketBuffer_terminate(void)
{
	ListElement* cur = NULL;

	/* trace entry first, as in every other function of this module, so the
	   whole of the teardown happens inside the traced scope */
	FUNC_ENTRY;
	ListEmpty(&writes);
	/* only buf is freed here; NOTE(review): the socket_queue structures
	   themselves are presumably released by ListFree - confirm in LinkedList */
	while (ListNextElement(queues, &cur))
	{
		free(((socket_queue*)(cur->content))->buf);
	}
	ListFree(queues);
	SocketBuffer_freeDefQ();
	FUNC_EXIT;
}
/**
* Cleanup any buffers for a specific socket
* @param socket the socket to clean up
*/
void SocketBuffer_cleanup(int socket)
{
FUNC_ENTRY;
SocketBuffer_writeComplete(socket); /* clean up write buffers */
if (ListFindItem(queues, &socket, socketcompare))
{
free(((socket_queue*)(queues->current->content))->buf);
ListRemove(queues, queues->current->content);
}
if (def_queue->socket == socket)
{
def_queue->socket = def_queue->index = 0;
def_queue->headerlen = def_queue->datalen = 0;
}
FUNC_EXIT;
}
/**
 * Get any queued data for a specific socket.
 *
 * Uses the socket's saved queue if it has one, otherwise the default queue,
 * and makes sure the queue's buffer can hold at least the requested number
 * of bytes, preserving data already queued.
 * @param socket the socket to get queued data for
 * @param bytes the number of bytes of data to retrieve
 * @param actual_len the actual length returned: the amount of data already
 * queued for the socket, or 0 if the default queue is used
 * @return the queue's data buffer, or NULL if the buffer could not be
 * enlarged (the queue then keeps its previous buffer and size)
 */
char* SocketBuffer_getQueuedData(int socket, size_t bytes, size_t* actual_len)
{
	socket_queue* queue = NULL;
	char* rc = NULL;

	FUNC_ENTRY;
	if (ListFindItem(queues, &socket, socketcompare))
	{ /* if there is queued data for this socket, add any data read to it */
		queue = (socket_queue*)(queues->current->content);
		*actual_len = queue->datalen;
	}
	else
	{
		*actual_len = 0;
		queue = def_queue;
	}
	if (bytes > queue->buflen)
	{
		void* newmem = NULL;

		if (queue->datalen > 0)
		{
			/* copy across only the bytes that hold data, not the whole old buffer */
			newmem = malloc(bytes);
			if (newmem != NULL)
			{
				memcpy(newmem, queue->buf, queue->datalen);
				free(queue->buf);
			}
		}
		else
			newmem = realloc(queue->buf, bytes);

		/* on failure the old buffer is still owned by the queue and buflen
		   still describes it, so nothing is leaked or misreported */
		if (newmem == NULL)
			goto exit;
		queue->buf = newmem;
		queue->buflen = bytes;
	}
	rc = queue->buf;
exit:
	FUNC_EXIT;
	return rc;
}
/**
* Get any queued character for a specific socket
* @param socket the socket to get queued data for
* @param c the character returned if any
* @return completion code
*/
int SocketBuffer_getQueuedChar(int socket, char* c)
{
int rc = SOCKETBUFFER_INTERRUPTED;
FUNC_ENTRY;
if (ListFindItem(queues, &socket, socketcompare))
{ /* if there is queued data for this socket, read that first */
socket_queue* queue = (socket_queue*)(queues->current->content);
if (queue->index < queue->headerlen)
{
*c = queue->fixed_header[(queue->index)++];
Log(TRACE_MAX, -1, "index is now %d, headerlen %d", queue->index, queue->headerlen);
rc = SOCKETBUFFER_COMPLETE;
goto exit;
}
else if (queue->index > 4)
{
Log(LOG_FATAL, -1, "header is already at full length");
rc = SOCKET_ERROR;
goto exit;
}
}
exit:
FUNC_EXIT_RC(rc);
return rc; /* there was no queued char if rc is SOCKETBUFFER_INTERRUPTED*/
}
/**
 * A socket read was interrupted so we need to queue data.
 *
 * If the socket has no saved queue yet, the default queue becomes its saved
 * queue and a new default queue is created.
 * @param socket the socket to get queued data for
 * @param actual_len the actual length of data that was read
 */
void SocketBuffer_interrupted(int socket, size_t actual_len)
{
	socket_queue* queue = NULL;
	ListElement* found = NULL;

	FUNC_ENTRY;
	found = ListFindItem(queues, &socket, socketcompare);
	if (found == NULL)
	{ /* new saved queue */
		queue = def_queue;
		/* if SocketBuffer_queueChar() has not yet been called, then the socket number
		   in def_queue will not have been set. Issue #244.
		   If actual_len == 0 then we may not need to do anything - I'll leave that
		   optimization for another time. */
		queue->socket = socket;
		ListAppend(queues, queue, sizeof(socket_queue) + queue->buflen);
		/* def_queue now belongs to the list, so a replacement is needed */
		SocketBuffer_newDefQ();
	}
	else
		queue = (socket_queue*)(found->content);

	queue->index = 0;
	queue->datalen = actual_len;
	FUNC_EXIT;
}
/**
* A socket read has now completed so we can get rid of the queue
* @param socket the socket for which the operation is now complete
* @return pointer to the default queue data
*/
char* SocketBuffer_complete(int socket)
{
FUNC_ENTRY;
if (ListFindItem(queues, &socket, socketcompare))
{
socket_queue* queue = (socket_queue*)(queues->current->content);
SocketBuffer_freeDefQ();
def_queue = queue;
ListDetach(queues, queue);
}
def_queue->socket = def_queue->index = 0;
def_queue->headerlen = def_queue->datalen = 0;
FUNC_EXIT;
return def_queue->buf;
}
/**
* A socket operation had now completed so we can get rid of the queue
* @param socket the socket for which the operation is now complete
* @param c the character to queue
*/
void SocketBuffer_queueChar(int socket, char c)
{
int error = 0;
socket_queue* curq = def_queue;
FUNC_ENTRY;
if (ListFindItem(queues, &socket, socketcompare))
curq = (socket_queue*)(queues->current->content);
else if (def_queue->socket == 0)
{
def_queue->socket = socket;
def_queue->index = 0;
def_queue->datalen = 0;
}
else if (def_queue->socket != socket)
{
Log(LOG_FATAL, -1, "attempt to reuse socket queue");
error = 1;
}
if (curq->index > 4)
{
Log(LOG_FATAL, -1, "socket queue fixed_header field full");
error = 1;
}
if (!error)
{
curq->fixed_header[(curq->index)++] = c;
curq->headerlen = curq->index;
}
Log(TRACE_MAX, -1, "queueChar: index is now %d, headerlen %d", curq->index, curq->headerlen);
FUNC_EXIT;
}
/**
 * A socket write was interrupted so store the remaining data.
 *
 * The buffer descriptors and their free flags are copied into a new
 * pending_writes entry which is appended to the writes list. Nothing is
 * queued, and an error is logged, if memory cannot be allocated or if count
 * exceeds the number of buffers a pending_writes entry can hold.
 * @param socket the socket for which the write was interrupted
 * @param ssl the SSL session for the socket (OPENSSL builds only)
 * @param count the number of iovec buffers
 * @param iovecs buffer array
 * @param frees array of flags, one per buffer, copied alongside the buffers
 * @param total total data length to be written
 * @param bytes actual data length that was written
 */
#if defined(OPENSSL)
void SocketBuffer_pendingWrite(int socket, SSL* ssl, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
#else
void SocketBuffer_pendingWrite(int socket, int count, iobuf* iovecs, int* frees, size_t total, size_t bytes)
#endif
{
	int i = 0;
	pending_writes* pw = NULL;

	FUNC_ENTRY;
	/* store the buffers until the whole packet is written */
	pw = malloc(sizeof(pending_writes));
	if (pw == NULL)
	{
		Log(LOG_FATAL, -1, "unable to allocate memory for pending write");
		goto exit;
	}
	/* iovecs and frees are storage embedded in pending_writes, so copying more
	   entries than they hold would write past the end of the allocation */
	if (count > 0 &&
		((size_t)count > sizeof(pw->iovecs) / sizeof(pw->iovecs[0]) ||
		 (size_t)count > sizeof(pw->frees) / sizeof(pw->frees[0])))
	{
		Log(LOG_FATAL, -1, "too many buffers (%d) for pending write", count);
		free(pw);
		goto exit;
	}
	pw->socket = socket;
#if defined(OPENSSL)
	pw->ssl = ssl;
#endif
	pw->bytes = bytes;
	pw->total = total;
	pw->count = count;
	for (i = 0; i < count; i++)
	{
		pw->iovecs[i] = iovecs[i];
		pw->frees[i] = frees[i];
	}
	/* size recorded is that of the structure, not of a pointer to it */
	ListAppend(&writes, pw, sizeof(*pw) + total);
exit:
	FUNC_EXIT;
}
/**
* List callback function for comparing pending_writes by socket
* @param a first integer value
* @param b second integer value
* @return boolean indicating whether a and b are equal
*/
int pending_socketcompare(void* a, void* b)
{
return ((pending_writes*)a)->socket == *(int*)b;
}
/**
 * Get any queued write data for a specific socket
 * @param socket the socket to get queued data for
 * @return pointer to the pending write found for the socket, or NULL if
 * there is none
 */
pending_writes* SocketBuffer_getWrite(int socket)
{
	ListElement* found = ListFindItem(&writes, &socket, pending_socketcompare);

	if (found == NULL)
		return NULL;
	return (pending_writes*)(found->content);
}
/**
 * A socket write has now completed so we can get rid of the queue.
 *
 * Removes the pending write matching the socket from the writes list.
 * @param socket the socket for which the operation is now complete
 * @return completion code, boolean - was the queue removed?
 */
int SocketBuffer_writeComplete(int socket)
{
	int removed = ListRemoveItem(&writes, &socket, pending_socketcompare);

	return removed;
}
/**
 * Update the queued write data for a socket in the case of QoS 0 messages.
 *
 * If the socket has a pending write made up of exactly 4 buffers, buffers 2
 * and 3 are pointed at the supplied topic and payload. A pending write with
 * any other buffer count is returned unchanged.
 * @param socket the socket for which the operation is now complete
 * @param topic the topic of the QoS 0 write
 * @param payload the payload of the QoS 0 write
 * @return pointer to the updated queued data structure, or NULL
 */
pending_writes* SocketBuffer_updateWrite(int socket, char* topic, char* payload)
{
	pending_writes* pw = NULL;
	ListElement* found = NULL;

	FUNC_ENTRY;
	found = ListFindItem(&writes, &socket, pending_socketcompare);
	if (found != NULL)
	{
		pw = (pending_writes*)(found->content);
		if (pw->count == 4)
		{
			pw->iovecs[2].iov_base = topic;
			pw->iovecs[3].iov_base = payload;
		}
	}
	FUNC_EXIT;
	return pw;
}