| /******************************************************************************* |
| * Copyright (c) 2012, 2013 IBM Corp. |
| * |
| * All rights reserved. This program and the accompanying materials |
| * are made available under the terms of the Eclipse Public License v1.0 |
| * and Eclipse Distribution License v1.0 which accompany this distribution. |
| * |
| * The Eclipse Public License is available at |
| * http://www.eclipse.org/legal/epl-v10.html |
| * and the Eclipse Distribution License is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * |
| * Contributors: |
| * Ian Craggs - initial contribution |
| * Ian Craggs - fix for bug 413429 - connectionLost not called |
| * Guilherme Maciel Ferreira - add keep alive option |
| *******************************************************************************/ |
| |
| /* |
| |
| stdout subscriber for the asynchronous client |
| |
| compulsory parameters: |
| |
| --topic topic to subscribe to |
| |
| defaulted parameters: |
| |
| --host localhost |
| --port 1883 |
| --qos 2 |
| --delimiter \n |
| --clientid stdout-subscriber-async |
| --showtopics off |
| --keepalive 10 |
| |
| --userid none |
| --password none |
| |
| */ |
| |
| #include "MQTTAsync.h" |
| #include "MQTTClientPersistence.h" |
| |
| #include <stdio.h> |
| #include <signal.h> |
| #include <string.h> |
| #include <stdlib.h> |
| |
| |
| #if defined(WIN32) |
| #include <windows.h> |
| #define sleep Sleep |
| #else |
| #include <sys/time.h> |
| #include <unistd.h> |
| #endif |
| |
| #include <OsWrapper.h> |
| |
| volatile int finished = 0; |
| char* topic = NULL; |
| int subscribed = 0; |
| int disconnected = 0; |
| |
| |
| void cfinish(int sig) |
| { |
| signal(SIGINT, NULL); |
| finished = 1; |
| } |
| |
| |
| struct |
| { |
| char* clientid; |
| int nodelimiter; |
| char delimiter; |
| int qos; |
| char* username; |
| char* password; |
| char* host; |
| char* port; |
| int showtopics; |
| int keepalive; |
| } opts = |
| { |
| "stdout-subscriber-async", 1, '\n', 2, NULL, NULL, "localhost", "1883", 0, 10 |
| }; |
| |
| |
| void usage(void) |
| { |
| printf("MQTT stdout subscriber\n"); |
| printf("Usage: stdoutsub topicname <options>, where options are:\n"); |
| printf(" --host <hostname> (default is %s)\n", opts.host); |
| printf(" --port <port> (default is %s)\n", opts.port); |
| printf(" --qos <qos> (default is %d)\n", opts.qos); |
| printf(" --delimiter <delim> (default is no delimiter)\n"); |
| printf(" --clientid <clientid> (default is %s)\n", opts.clientid); |
| printf(" --username none\n"); |
| printf(" --password none\n"); |
| printf(" --showtopics <on or off> (default is on if the topic has a wildcard, else off)\n"); |
| printf(" --keepalive <seconds> (default is 10 seconds)\n"); |
| exit(EXIT_FAILURE); |
| } |
| |
| |
| void getopts(int argc, char** argv) |
| { |
| int count = 2; |
| |
| while (count < argc) |
| { |
| if (strcmp(argv[count], "--qos") == 0) |
| { |
| if (++count < argc) |
| { |
| if (strcmp(argv[count], "0") == 0) |
| opts.qos = 0; |
| else if (strcmp(argv[count], "1") == 0) |
| opts.qos = 1; |
| else if (strcmp(argv[count], "2") == 0) |
| opts.qos = 2; |
| else |
| usage(); |
| } |
| else |
| usage(); |
| } |
| else if (strcmp(argv[count], "--host") == 0) |
| { |
| if (++count < argc) |
| opts.host = argv[count]; |
| else |
| usage(); |
| } |
| else if (strcmp(argv[count], "--port") == 0) |
| { |
| if (++count < argc) |
| opts.port = argv[count]; |
| else |
| usage(); |
| } |
| else if (strcmp(argv[count], "--clientid") == 0) |
| { |
| if (++count < argc) |
| opts.clientid = argv[count]; |
| else |
| usage(); |
| } |
| else if (strcmp(argv[count], "--username") == 0) |
| { |
| if (++count < argc) |
| opts.username = argv[count]; |
| else |
| usage(); |
| } |
| else if (strcmp(argv[count], "--password") == 0) |
| { |
| if (++count < argc) |
| opts.password = argv[count]; |
| else |
| usage(); |
| } |
| else if (strcmp(argv[count], "--delimiter") == 0) |
| { |
| if (++count < argc) |
| { |
| if (strcmp("newline", argv[count]) == 0) |
| opts.delimiter = '\n'; |
| else |
| opts.delimiter = argv[count][0]; |
| opts.nodelimiter = 0; |
| } |
| else |
| usage(); |
| } |
| else if (strcmp(argv[count], "--showtopics") == 0) |
| { |
| if (++count < argc) |
| { |
| if (strcmp(argv[count], "on") == 0) |
| opts.showtopics = 1; |
| else if (strcmp(argv[count], "off") == 0) |
| opts.showtopics = 0; |
| else |
| usage(); |
| } |
| else |
| usage(); |
| } |
| else if (strcmp(argv[count], "--keepalive") == 0) |
| { |
| if (++count < argc) |
| opts.keepalive = atoi(argv[count]); |
| else |
| usage(); |
| } |
| count++; |
| } |
| |
| } |
| |
| |
| int messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message) |
| { |
| if (opts.showtopics) |
| printf("%s\t", topicName); |
| if (opts.nodelimiter) |
| printf("%.*s", message->payloadlen, (char*)message->payload); |
| else |
| printf("%.*s%c", message->payloadlen, (char*)message->payload, opts.delimiter); |
| fflush(stdout); |
| MQTTAsync_freeMessage(&message); |
| MQTTAsync_free(topicName); |
| return 1; |
| } |
| |
| |
| void onDisconnect(void* context, MQTTAsync_successData* response) |
| { |
| disconnected = 1; |
| } |
| |
| |
| void onSubscribe(void* context, MQTTAsync_successData* response) |
| { |
| subscribed = 1; |
| } |
| |
| |
| void onSubscribeFailure(void* context, MQTTAsync_failureData* response) |
| { |
| printf("Subscribe failed, rc %d\n", response->code); |
| finished = 1; |
| } |
| |
| |
| void onConnectFailure(void* context, MQTTAsync_failureData* response) |
| { |
| printf("Connect failed, rc %d\n", response ? response->code : -99); |
| finished = 1; |
| } |
| |
| |
| void onConnect(void* context, MQTTAsync_successData* response) |
| { |
| MQTTAsync client = (MQTTAsync)context; |
| MQTTAsync_responseOptions ropts = MQTTAsync_responseOptions_initializer; |
| int rc; |
| |
| if (opts.showtopics) |
| printf("Subscribing to topic %s with client %s at QoS %d\n", topic, opts.clientid, opts.qos); |
| |
| ropts.onSuccess = onSubscribe; |
| ropts.onFailure = onSubscribeFailure; |
| ropts.context = client; |
| if ((rc = MQTTAsync_subscribe(client, topic, opts.qos, &ropts)) != MQTTASYNC_SUCCESS) |
| { |
| printf("Failed to start subscribe, return code %d\n", rc); |
| finished = 1; |
| } |
| } |
| |
| |
| MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; |
| |
| |
| void connectionLost(void *context, char *cause) |
| { |
| MQTTAsync client = (MQTTAsync)context; |
| int rc; |
| |
| printf("connectionLost called\n"); |
| if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) |
| { |
| printf("Failed to start reconnect, return code %d\n", rc); |
| finished = 1; |
| } |
| } |
| |
| |
| int main(int argc, char** argv) |
| { |
| MQTTAsync client; |
| MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; |
| int rc = 0; |
| char url[100]; |
| |
| if (argc < 2) |
| usage(); |
| |
| topic = argv[1]; |
| |
| if (strchr(topic, '#') || strchr(topic, '+')) |
| opts.showtopics = 1; |
| if (opts.showtopics) |
| printf("topic is %s\n", topic); |
| |
| getopts(argc, argv); |
| sprintf(url, "%s:%s", opts.host, opts.port); |
| |
| rc = MQTTAsync_create(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL); |
| |
| MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL); |
| |
| signal(SIGINT, cfinish); |
| signal(SIGTERM, cfinish); |
| |
| conn_opts.keepAliveInterval = opts.keepalive; |
| conn_opts.cleansession = 1; |
| conn_opts.username = opts.username; |
| conn_opts.password = opts.password; |
| conn_opts.onSuccess = onConnect; |
| conn_opts.onFailure = onConnectFailure; |
| conn_opts.context = client; |
| if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) |
| { |
| printf("Failed to start connect, return code %d\n", rc); |
| exit(EXIT_FAILURE); |
| } |
| |
| while (!subscribed) |
| #if defined(WIN32) |
| Sleep(100); |
| #else |
| usleep(10000L); |
| #endif |
| |
| if (finished) |
| goto exit; |
| |
| while (!finished) |
| #if defined(WIN32) |
| Sleep(100); |
| #else |
| usleep(10000L); |
| #endif |
| |
| disc_opts.onSuccess = onDisconnect; |
| if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) |
| { |
| printf("Failed to start disconnect, return code %d\n", rc); |
| exit(EXIT_FAILURE); |
| } |
| |
| while (!disconnected) |
| #if defined(WIN32) |
| Sleep(100); |
| #else |
| usleep(10000L); |
| #endif |
| |
| exit: |
| MQTTAsync_destroy(&client); |
| |
| return EXIT_SUCCESS; |
| } |