blob: 1a4040d2e0d56d4de28a3fb785c01de957ee3aa4 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 2012, 2016 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial contribution
* Guilherme Maciel Ferreira - add keep alive option
*******************************************************************************/
/*
stdin publisher
compulsory parameters:
--topic topic to publish on
defaulted parameters:
--host localhost
--port 1883
--qos 0
--delimiters \n
--clientid stdin-publisher-async
--maxdatalen 100
--keepalive 10
--userid none
--password none
*/
#include "MQTTAsync.h"
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#if defined(WIN32)
#include <windows.h>
#define sleep Sleep
#else
#include <unistd.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#include <OsWrapper.h>
volatile int toStop = 0;
struct
{
char* clientid;
char* delimiter;
int maxdatalen;
int qos;
int retained;
char* username;
char* password;
char* host;
char* port;
int verbose;
int keepalive;
} opts =
{
"stdin-publisher-async", "\n", 100, 0, 0, NULL, NULL, "localhost", "1883", 0, 10
};
void usage(void)
{
printf("MQTT stdin publisher\n");
printf("Usage: stdinpub topicname <options>, where options are:\n");
printf(" --host <hostname> (default is %s)\n", opts.host);
printf(" --port <port> (default is %s)\n", opts.port);
printf(" --qos <qos> (default is %d)\n", opts.qos);
printf(" --retained (default is %s)\n", opts.retained ? "on" : "off");
printf(" --delimiter <delim> (default is \\n)\n");
printf(" --clientid <clientid> (default is %s)\n", opts.clientid);
printf(" --maxdatalen <bytes> (default is %d)\n", opts.maxdatalen);
printf(" --username none\n");
printf(" --password none\n");
printf(" --keepalive <seconds> (default is 10 seconds)\n");
exit(EXIT_FAILURE);
}
void cfinish(int sig)
{
signal(SIGINT, NULL);
toStop = 1;
}
void getopts(int argc, char** argv);
int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
{
/* not expecting any messages */
return 1;
}
static int disconnected = 0;
void onDisconnect(void* context, MQTTAsync_successData* response)
{
disconnected = 1;
}
static int connected = 0;
void myconnect(MQTTAsync* client);
void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
printf("Connect failed, rc %d\n", response ? response->code : -1);
connected = -1;
MQTTAsync client = (MQTTAsync)context;
myconnect(client);
}
void onConnect(void* context, MQTTAsync_successData* response)
{
printf("Connected\n");
connected = 1;
}
void myconnect(MQTTAsync* client)
{
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
int rc = 0;
printf("Connecting\n");
conn_opts.keepAliveInterval = opts.keepalive;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
ssl_opts.enableServerCertAuth = 0;
conn_opts.ssl = &ssl_opts;
conn_opts.automaticReconnect = 1;
connected = 0;
if ((rc = MQTTAsync_connect(*client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
static int published = 0;
void onPublishFailure(void* context, MQTTAsync_failureData* response)
{
printf("Publish failed, rc %d\n", response ? -1 : response->code);
published = -1;
}
void onPublish(void* context, MQTTAsync_successData* response)
{
published = 1;
}
void connectionLost(void* context, char* cause)
{
MQTTAsync client = (MQTTAsync)context;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer;
int rc = 0;
printf("Connecting\n");
conn_opts.keepAliveInterval = 10;
conn_opts.cleansession = 1;
conn_opts.username = opts.username;
conn_opts.password = opts.password;
conn_opts.onSuccess = onConnect;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = client;
ssl_opts.enableServerCertAuth = 0;
conn_opts.ssl = &ssl_opts;
connected = 0;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
}
int main(int argc, char** argv)
{
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer;
MQTTAsync client;
char* topic = NULL;
char* buffer = NULL;
int rc = 0;
char url[100];
if (argc < 2)
usage();
getopts(argc, argv);
sprintf(url, "%s:%s", opts.host, opts.port);
if (opts.verbose)
printf("URL is %s\n", url);
topic = argv[1];
printf("Using topic %s\n", topic);
create_opts.sendWhileDisconnected = 1;
rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
signal(SIGINT, cfinish);
signal(SIGTERM, cfinish);
rc = MQTTAsync_setCallbacks(client, client, connectionLost, messageArrived, NULL);
myconnect(&client);
buffer = malloc(opts.maxdatalen);
while (!toStop)
{
int data_len = 0;
int delim_len = 0;
delim_len = (int)strlen(opts.delimiter);
do
{
buffer[data_len++] = getchar();
if (data_len > delim_len)
{
/* printf("comparing %s %s\n", opts.delimiter, &buffer[data_len - delim_len]); */
if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0)
break;
}
} while (data_len < opts.maxdatalen);
if (opts.verbose)
printf("Publishing data of length %d\n", data_len);
pub_opts.onSuccess = onPublish;
pub_opts.onFailure = onPublishFailure;
do
{
rc = MQTTAsync_send(client, topic, data_len, buffer, opts.qos, opts.retained, &pub_opts);
}
while (rc != MQTTASYNC_SUCCESS);
}
printf("Stopping\n");
free(buffer);
disc_opts.onSuccess = onDisconnect;
if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to start disconnect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
while (!disconnected)
#if defined(WIN32)
Sleep(100);
#else
usleep(10000L);
#endif
MQTTAsync_destroy(&client);
return EXIT_SUCCESS;
}
void getopts(int argc, char** argv)
{
int count = 2;
while (count < argc)
{
if (strcmp(argv[count], "--retained") == 0)
opts.retained = 1;
if (strcmp(argv[count], "--verbose") == 0)
opts.verbose = 1;
else if (strcmp(argv[count], "--qos") == 0)
{
if (++count < argc)
{
if (strcmp(argv[count], "0") == 0)
opts.qos = 0;
else if (strcmp(argv[count], "1") == 0)
opts.qos = 1;
else if (strcmp(argv[count], "2") == 0)
opts.qos = 2;
else
usage();
}
else
usage();
}
else if (strcmp(argv[count], "--host") == 0)
{
if (++count < argc)
opts.host = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--port") == 0)
{
if (++count < argc)
opts.port = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--clientid") == 0)
{
if (++count < argc)
opts.clientid = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--username") == 0)
{
if (++count < argc)
opts.username = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--password") == 0)
{
if (++count < argc)
opts.password = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--maxdatalen") == 0)
{
if (++count < argc)
opts.maxdatalen = atoi(argv[count]);
else
usage();
}
else if (strcmp(argv[count], "--delimiter") == 0)
{
if (++count < argc)
opts.delimiter = argv[count];
else
usage();
}
else if (strcmp(argv[count], "--keepalive") == 0)
{
if (++count < argc)
opts.keepalive = atoi(argv[count]);
else
usage();
}
count++;
}
}