| /******************************************************************************* |
| * Copyright (c) 2009, 2013 IBM Corp. |
| * |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * and Eclipse Distribution License v1.0 which accompany this distribution. |
| * |
| * The Eclipse Public License is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * and the Eclipse Distribution License is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * |
| * Contributors: |
| * Ian Craggs - initial API and implementation and/or initial documentation |
| * Ian Craggs - async client updates |
| * Ian Craggs - fix for bug 432903 - queue persistence |
| *******************************************************************************/ |
| |
| /** |
| * @file |
| * \brief Functions that apply to persistence operations. |
| * |
| */ |
| |
| #include <stdio.h> |
| #include <string.h> |
| |
| #include "MQTTPersistence.h" |
| #include "MQTTPersistenceDefault.h" |
| #include "MQTTProtocolClient.h" |
| #include "Heap.h" |
| |
| |
| static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen); |
| static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size); |
| |
| /** |
| * Creates a ::MQTTClient_persistence structure representing a persistence implementation. |
| * @param persistence the ::MQTTClient_persistence structure. |
| * @param type the type of the persistence implementation. See ::MQTTClient_create. |
| * @param pcontext the context for this persistence implementation. See ::MQTTClient_create. |
| * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. |
| */ |
| #include "StackTrace.h" |
| |
| int MQTTPersistence_create(MQTTClient_persistence** persistence, int type, void* pcontext) |
| { |
| int rc = 0; |
| MQTTClient_persistence* per = NULL; |
| |
| FUNC_ENTRY; |
| #if !defined(NO_PERSISTENCE) |
| switch (type) |
| { |
| case MQTTCLIENT_PERSISTENCE_NONE : |
| per = NULL; |
| break; |
| case MQTTCLIENT_PERSISTENCE_DEFAULT : |
| per = malloc(sizeof(MQTTClient_persistence)); |
| if ( per != NULL ) |
| { |
| if ( pcontext != NULL ) |
| { |
| per->context = malloc(strlen(pcontext) + 1); |
| strcpy(per->context, pcontext); |
| } |
| else |
| per->context = "."; /* working directory */ |
| /* file system functions */ |
| per->popen = pstopen; |
| per->pclose = pstclose; |
| per->pput = pstput; |
| per->pget = pstget; |
| per->premove = pstremove; |
| per->pkeys = pstkeys; |
| per->pclear = pstclear; |
| per->pcontainskey = pstcontainskey; |
| } |
| else |
| rc = MQTTCLIENT_PERSISTENCE_ERROR; |
| break; |
| case MQTTCLIENT_PERSISTENCE_USER : |
| per = (MQTTClient_persistence *)pcontext; |
| if ( per == NULL || (per != NULL && (per->context == NULL || per->pclear == NULL || |
| per->pclose == NULL || per->pcontainskey == NULL || per->pget == NULL || per->pkeys == NULL || |
| per->popen == NULL || per->pput == NULL || per->premove == NULL)) ) |
| rc = MQTTCLIENT_PERSISTENCE_ERROR; |
| break; |
| default: |
| rc = MQTTCLIENT_PERSISTENCE_ERROR; |
| break; |
| } |
| #endif |
| |
| *persistence = per; |
| |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| |
| |
| /** |
| * Open persistent store and restore any persisted messages. |
| * @param client the client as ::Clients. |
| * @param serverURI the URI of the remote end. |
| * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. |
| */ |
| int MQTTPersistence_initialize(Clients *c, const char *serverURI) |
| { |
| int rc = 0; |
| |
| FUNC_ENTRY; |
| if ( c->persistence != NULL ) |
| { |
| rc = c->persistence->popen(&(c->phandle), c->clientID, serverURI, c->persistence->context); |
| if ( rc == 0 ) |
| rc = MQTTPersistence_restore(c); |
| } |
| |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| |
| |
| /** |
| * Close persistent store. |
| * @param client the client as ::Clients. |
| * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. |
| */ |
| int MQTTPersistence_close(Clients *c) |
| { |
| int rc =0; |
| |
| FUNC_ENTRY; |
| if (c->persistence != NULL) |
| { |
| rc = c->persistence->pclose(c->phandle); |
| c->phandle = NULL; |
| #if !defined(NO_PERSISTENCE) |
| if ( c->persistence->popen == pstopen ) |
| free(c->persistence); |
| #endif |
| c->persistence = NULL; |
| } |
| |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| |
| /** |
| * Clears the persistent store. |
| * @param client the client as ::Clients. |
| * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. |
| */ |
| int MQTTPersistence_clear(Clients *c) |
| { |
| int rc = 0; |
| |
| FUNC_ENTRY; |
| if (c->persistence != NULL) |
| rc = c->persistence->pclear(c->phandle); |
| |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| |
| |
| /** |
| * Restores the persisted records to the outbound and inbound message queues of the |
| * client. |
| * @param client the client as ::Clients. |
| * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. |
| */ |
| int MQTTPersistence_restore(Clients *c) |
| { |
| int rc = 0; |
| char **msgkeys = NULL, |
| *buffer = NULL; |
| int nkeys, buflen; |
| int i = 0; |
| int msgs_sent = 0; |
| int msgs_rcvd = 0; |
| |
| FUNC_ENTRY; |
| if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0) |
| { |
| while (rc == 0 && i < nkeys) |
| { |
| if (strncmp(msgkeys[i], PERSISTENCE_COMMAND_KEY, strlen(PERSISTENCE_COMMAND_KEY)) == 0) |
| { |
| ; |
| } |
| else if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) == 0) |
| { |
| ; |
| } |
| else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0) |
| { |
| MQTTPacket* pack = MQTTPersistence_restorePacket(buffer, buflen); |
| if ( pack != NULL ) |
| { |
| if ( strstr(msgkeys[i],PERSISTENCE_PUBLISH_RECEIVED) != NULL ) |
| { |
| Publish* publish = (Publish*)pack; |
| Messages* msg = NULL; |
| msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain); |
| msg->nextMessageType = PUBREL; |
| /* order does not matter for persisted received messages */ |
| ListAppend(c->inboundMsgs, msg, msg->len); |
| publish->topic = NULL; |
| MQTTPacket_freePublish(publish); |
| msgs_rcvd++; |
| } |
| else if ( strstr(msgkeys[i],PERSISTENCE_PUBLISH_SENT) != NULL ) |
| { |
| Publish* publish = (Publish*)pack; |
| Messages* msg = NULL; |
| char *key = malloc(MESSAGE_FILENAME_LENGTH + 1); |
| sprintf(key, "%s%d", PERSISTENCE_PUBREL, publish->msgId); |
| msg = MQTTProtocol_createMessage(publish, &msg, publish->header.bits.qos, publish->header.bits.retain); |
| if ( c->persistence->pcontainskey(c->phandle, key) == 0 ) |
| /* PUBLISH Qo2 and PUBREL sent */ |
| msg->nextMessageType = PUBCOMP; |
| /* else: PUBLISH QoS1, or PUBLISH QoS2 and PUBREL not sent */ |
| /* retry at the first opportunity */ |
| msg->lastTouch = 0; |
| MQTTPersistence_insertInOrder(c->outboundMsgs, msg, msg->len); |
| publish->topic = NULL; |
| MQTTPacket_freePublish(publish); |
| free(key); |
| msgs_sent++; |
| } |
| else if ( strstr(msgkeys[i],PERSISTENCE_PUBREL) != NULL ) |
| { |
| /* orphaned PUBRELs ? */ |
| Pubrel* pubrel = (Pubrel*)pack; |
| char *key = malloc(MESSAGE_FILENAME_LENGTH + 1); |
| sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, pubrel->msgId); |
| if ( c->persistence->pcontainskey(c->phandle, key) != 0 ) |
| rc = c->persistence->premove(c->phandle, msgkeys[i]); |
| free(pubrel); |
| free(key); |
| } |
| } |
| else /* pack == NULL -> bad persisted record */ |
| rc = c->persistence->premove(c->phandle, msgkeys[i]); |
| } |
| if (buffer) |
| { |
| free(buffer); |
| buffer = NULL; |
| } |
| if (msgkeys[i]) |
| free(msgkeys[i]); |
| i++; |
| } |
| if (msgkeys) |
| free(msgkeys); |
| } |
| Log(TRACE_MINIMUM, -1, "%d sent messages and %d received messages restored for client %s\n", |
| msgs_sent, msgs_rcvd, c->clientID); |
| MQTTPersistence_wrapMsgID(c); |
| |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| |
| |
| /** |
| * Returns a MQTT packet restored from persisted data. |
| * @param buffer the persisted data. |
| * @param buflen the number of bytes of the data buffer. |
| */ |
| void* MQTTPersistence_restorePacket(char* buffer, size_t buflen) |
| { |
| void* pack = NULL; |
| Header header; |
| int fixed_header_length = 1, ptype, remaining_length = 0; |
| char c; |
| int multiplier = 1; |
| extern pf new_packets[]; |
| |
| FUNC_ENTRY; |
| header.byte = buffer[0]; |
| /* decode the message length according to the MQTT algorithm */ |
| do |
| { |
| c = *(++buffer); |
| remaining_length += (c & 127) * multiplier; |
| multiplier *= 128; |
| fixed_header_length++; |
| } while ((c & 128) != 0); |
| |
| if ( (fixed_header_length + remaining_length) == buflen ) |
| { |
| ptype = header.bits.type; |
| if (ptype >= CONNECT && ptype <= DISCONNECT && new_packets[ptype] != NULL) |
| pack = (*new_packets[ptype])(header.byte, ++buffer, remaining_length); |
| } |
| |
| FUNC_EXIT; |
| return pack; |
| } |
| |
| |
| /** |
| * Inserts the specified message into the list, maintaining message ID order. |
| * @param list the list to insert the message into. |
| * @param content the message to add. |
| * @param size size of the message. |
| */ |
| void MQTTPersistence_insertInOrder(List* list, void* content, size_t size) |
| { |
| ListElement* index = NULL; |
| ListElement* current = NULL; |
| |
| FUNC_ENTRY; |
| while(ListNextElement(list, ¤t) != NULL && index == NULL) |
| { |
| if ( ((Messages*)content)->msgid < ((Messages*)current->content)->msgid ) |
| index = current; |
| } |
| |
| ListInsert(list, content, size, index); |
| FUNC_EXIT; |
| } |
| |
| |
| /** |
| * Adds a record to the persistent store. This function must not be called for QoS0 |
| * messages. |
| * @param socket the socket of the client. |
| * @param buf0 fixed header. |
| * @param buf0len length of the fixed header. |
| * @param count number of buffers representing the variable header and/or the payload. |
| * @param buffers the buffers representing the variable header and/or the payload. |
| * @param buflens length of the buffers representing the variable header and/or the payload. |
| * @param msgId the message ID. |
| * @param scr 0 indicates message in the sending direction; 1 indicates message in the |
| * receiving direction. |
| * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. |
| */ |
| int MQTTPersistence_put(int socket, char* buf0, size_t buf0len, int count, |
| char** buffers, size_t* buflens, int htype, int msgId, int scr ) |
| { |
| int rc = 0; |
| extern ClientStates* bstate; |
| int nbufs, i; |
| int* lens = NULL; |
| char** bufs = NULL; |
| char *key; |
| Clients* client = NULL; |
| |
| FUNC_ENTRY; |
| client = (Clients*)(ListFindItem(bstate->clients, &socket, clientSocketCompare)->content); |
| if (client->persistence != NULL) |
| { |
| key = malloc(MESSAGE_FILENAME_LENGTH + 1); |
| nbufs = 1 + count; |
| lens = (int *)malloc(nbufs * sizeof(int)); |
| bufs = (char **)malloc(nbufs * sizeof(char *)); |
| lens[0] = (int)buf0len; |
| bufs[0] = buf0; |
| for (i = 0; i < count; i++) |
| { |
| lens[i+1] = (int)buflens[i]; |
| bufs[i+1] = buffers[i]; |
| } |
| |
| /* key */ |
| if ( scr == 0 ) |
| { /* sending */ |
| if (htype == PUBLISH) /* PUBLISH QoS1 and QoS2*/ |
| sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId); |
| if (htype == PUBREL) /* PUBREL */ |
| sprintf(key, "%s%d", PERSISTENCE_PUBREL, msgId); |
| } |
| if ( scr == 1 ) /* receiving PUBLISH QoS2 */ |
| sprintf(key, "%s%d", PERSISTENCE_PUBLISH_RECEIVED, msgId); |
| |
| rc = client->persistence->pput(client->phandle, key, nbufs, bufs, lens); |
| |
| free(key); |
| free(lens); |
| free(bufs); |
| } |
| |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| |
| |
| /** |
| * Deletes a record from the persistent store. |
| * @param client the client as ::Clients. |
| * @param type the type of the persisted record: #PERSISTENCE_PUBLISH_SENT, #PERSISTENCE_PUBREL |
| * or #PERSISTENCE_PUBLISH_RECEIVED. |
| * @param qos the qos field of the message. |
| * @param msgId the message ID. |
| * @return 0 if success, #MQTTCLIENT_PERSISTENCE_ERROR otherwise. |
| */ |
| int MQTTPersistence_remove(Clients* c, char *type, int qos, int msgId) |
| { |
| int rc = 0; |
| |
| FUNC_ENTRY; |
| if (c->persistence != NULL) |
| { |
| char *key = malloc(MESSAGE_FILENAME_LENGTH + 1); |
| if ( (strcmp(type,PERSISTENCE_PUBLISH_SENT) == 0) && qos == 2 ) |
| { |
| sprintf(key, "%s%d", PERSISTENCE_PUBLISH_SENT, msgId) ; |
| rc = c->persistence->premove(c->phandle, key); |
| sprintf(key, "%s%d", PERSISTENCE_PUBREL, msgId) ; |
| rc = c->persistence->premove(c->phandle, key); |
| } |
| else /* PERSISTENCE_PUBLISH_SENT && qos == 1 */ |
| { /* or PERSISTENCE_PUBLISH_RECEIVED */ |
| sprintf(key, "%s%d", type, msgId) ; |
| rc = c->persistence->premove(c->phandle, key); |
| } |
| free(key); |
| } |
| |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| |
| |
| /** |
| * Checks whether the message IDs wrapped by looking for the largest gap between two consecutive |
| * message IDs in the outboundMsgs queue. |
| * @param client the client as ::Clients. |
| */ |
| void MQTTPersistence_wrapMsgID(Clients *client) |
| { |
| ListElement* wrapel = NULL; |
| ListElement* current = NULL; |
| |
| FUNC_ENTRY; |
| if ( client->outboundMsgs->count > 0 ) |
| { |
| int firstMsgID = ((Messages*)client->outboundMsgs->first->content)->msgid; |
| int lastMsgID = ((Messages*)client->outboundMsgs->last->content)->msgid; |
| int gap = MAX_MSG_ID - lastMsgID + firstMsgID; |
| current = ListNextElement(client->outboundMsgs, ¤t); |
| |
| while(ListNextElement(client->outboundMsgs, ¤t) != NULL) |
| { |
| int curMsgID = ((Messages*)current->content)->msgid; |
| int curPrevMsgID = ((Messages*)current->prev->content)->msgid; |
| int curgap = curMsgID - curPrevMsgID; |
| if ( curgap > gap ) |
| { |
| gap = curgap; |
| wrapel = current; |
| } |
| } |
| } |
| |
| if ( wrapel != NULL ) |
| { |
| /* put wrapel at the beginning of the queue */ |
| client->outboundMsgs->first->prev = client->outboundMsgs->last; |
| client->outboundMsgs->last->next = client->outboundMsgs->first; |
| client->outboundMsgs->first = wrapel; |
| client->outboundMsgs->last = wrapel->prev; |
| client->outboundMsgs->first->prev = NULL; |
| client->outboundMsgs->last->next = NULL; |
| } |
| FUNC_EXIT; |
| } |
| |
| |
| #if !defined(NO_PERSISTENCE) |
| int MQTTPersistence_unpersistQueueEntry(Clients* client, MQTTPersistence_qEntry* qe) |
| { |
| int rc = 0; |
| char key[PERSISTENCE_MAX_KEY_LENGTH + 1]; |
| |
| FUNC_ENTRY; |
| sprintf(key, "%s%u", PERSISTENCE_QUEUE_KEY, qe->seqno); |
| if ((rc = client->persistence->premove(client->phandle, key)) != 0) |
| Log(LOG_ERROR, 0, "Error %d removing qEntry from persistence", rc); |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| |
| |
| int MQTTPersistence_persistQueueEntry(Clients* aclient, MQTTPersistence_qEntry* qe) |
| { |
| int rc = 0; |
| int nbufs = 8; |
| int bufindex = 0; |
| char key[PERSISTENCE_MAX_KEY_LENGTH + 1]; |
| int* lens = NULL; |
| void** bufs = NULL; |
| |
| FUNC_ENTRY; |
| lens = (int*)malloc(nbufs * sizeof(int)); |
| bufs = malloc(nbufs * sizeof(char *)); |
| |
| bufs[bufindex] = &qe->msg->payloadlen; |
| lens[bufindex++] = sizeof(qe->msg->payloadlen); |
| |
| bufs[bufindex] = qe->msg->payload; |
| lens[bufindex++] = qe->msg->payloadlen; |
| |
| bufs[bufindex] = &qe->msg->qos; |
| lens[bufindex++] = sizeof(qe->msg->qos); |
| |
| bufs[bufindex] = &qe->msg->retained; |
| lens[bufindex++] = sizeof(qe->msg->retained); |
| |
| bufs[bufindex] = &qe->msg->dup; |
| lens[bufindex++] = sizeof(qe->msg->dup); |
| |
| bufs[bufindex] = &qe->msg->msgid; |
| lens[bufindex++] = sizeof(qe->msg->msgid); |
| |
| bufs[bufindex] = qe->topicName; |
| lens[bufindex++] = (int)strlen(qe->topicName) + 1; |
| |
| bufs[bufindex] = &qe->topicLen; |
| lens[bufindex++] = sizeof(qe->topicLen); |
| |
| sprintf(key, "%s%d", PERSISTENCE_QUEUE_KEY, ++aclient->qentry_seqno); |
| qe->seqno = aclient->qentry_seqno; |
| |
| if ((rc = aclient->persistence->pput(aclient->phandle, key, nbufs, (char**)bufs, lens)) != 0) |
| Log(LOG_ERROR, 0, "Error persisting queue entry, rc %d", rc); |
| |
| free(lens); |
| free(bufs); |
| |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| |
| |
| static MQTTPersistence_qEntry* MQTTPersistence_restoreQueueEntry(char* buffer, size_t buflen) |
| { |
| MQTTPersistence_qEntry* qe = NULL; |
| char* ptr = buffer; |
| int data_size; |
| |
| FUNC_ENTRY; |
| qe = malloc(sizeof(MQTTPersistence_qEntry)); |
| memset(qe, '\0', sizeof(MQTTPersistence_qEntry)); |
| |
| qe->msg = malloc(sizeof(MQTTPersistence_message)); |
| memset(qe->msg, '\0', sizeof(MQTTPersistence_message)); |
| |
| qe->msg->payloadlen = *(int*)ptr; |
| ptr += sizeof(int); |
| |
| data_size = qe->msg->payloadlen; |
| qe->msg->payload = malloc(data_size); |
| memcpy(qe->msg->payload, ptr, data_size); |
| ptr += data_size; |
| |
| qe->msg->qos = *(int*)ptr; |
| ptr += sizeof(int); |
| |
| qe->msg->retained = *(int*)ptr; |
| ptr += sizeof(int); |
| |
| qe->msg->dup = *(int*)ptr; |
| ptr += sizeof(int); |
| |
| qe->msg->msgid = *(int*)ptr; |
| ptr += sizeof(int); |
| |
| data_size = (int)strlen(ptr) + 1; |
| qe->topicName = malloc(data_size); |
| strcpy(qe->topicName, ptr); |
| ptr += data_size; |
| |
| qe->topicLen = *(int*)ptr; |
| ptr += sizeof(int); |
| |
| FUNC_EXIT; |
| return qe; |
| } |
| |
| |
| static void MQTTPersistence_insertInSeqOrder(List* list, MQTTPersistence_qEntry* qEntry, size_t size) |
| { |
| ListElement* index = NULL; |
| ListElement* current = NULL; |
| |
| FUNC_ENTRY; |
| while (ListNextElement(list, ¤t) != NULL && index == NULL) |
| { |
| if (qEntry->seqno < ((MQTTPersistence_qEntry*)current->content)->seqno) |
| index = current; |
| } |
| ListInsert(list, qEntry, size, index); |
| FUNC_EXIT; |
| } |
| |
| |
| /** |
| * Restores a queue of messages from persistence to memory |
| * @param c the client as ::Clients - the client object to restore the messages to |
| * @return return code, 0 if successful |
| */ |
| int MQTTPersistence_restoreMessageQueue(Clients* c) |
| { |
| int rc = 0; |
| char **msgkeys; |
| int nkeys; |
| int i = 0; |
| int entries_restored = 0; |
| |
| FUNC_ENTRY; |
| if (c->persistence && (rc = c->persistence->pkeys(c->phandle, &msgkeys, &nkeys)) == 0) |
| { |
| while (rc == 0 && i < nkeys) |
| { |
| char *buffer = NULL; |
| int buflen; |
| |
| if (strncmp(msgkeys[i], PERSISTENCE_QUEUE_KEY, strlen(PERSISTENCE_QUEUE_KEY)) != 0) |
| { |
| ; |
| } |
| else if ((rc = c->persistence->pget(c->phandle, msgkeys[i], &buffer, &buflen)) == 0) |
| { |
| MQTTPersistence_qEntry* qe = MQTTPersistence_restoreQueueEntry(buffer, buflen); |
| |
| if (qe) |
| { |
| qe->seqno = atoi(msgkeys[i]+2); |
| MQTTPersistence_insertInSeqOrder(c->messageQueue, qe, sizeof(MQTTPersistence_qEntry)); |
| free(buffer); |
| c->qentry_seqno = max(c->qentry_seqno, qe->seqno); |
| entries_restored++; |
| } |
| } |
| if (msgkeys[i]) |
| { |
| free(msgkeys[i]); |
| } |
| i++; |
| } |
| if (msgkeys != NULL) |
| free(msgkeys); |
| } |
| Log(TRACE_MINIMUM, -1, "%d queued messages restored for client %s", entries_restored, c->clientID); |
| FUNC_EXIT_RC(rc); |
| return rc; |
| } |
| #endif |