blob: de9b0230e54fe032e42f3db4caa33aee3174f489 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
// This is a re-implementation of send.c using non-blocking/asynchronous calls.
#include "proton/message.h"
#include "proton/messenger.h"
#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#if EMSCRIPTEN
#include <emscripten.h>
#endif
pn_message_t * message;
pn_messenger_t * messenger;
pn_tracker_t tracker;
int running = 1;
#define check(messenger) \
{ \
if(pn_messenger_errno(messenger)) \
{ \
die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
} \
} \
void die(const char *file, int line, const char *message)
{
fprintf(stderr, "%s:%i: %s\n", file, line, message);
exit(1);
}
void usage(void)
{
printf("Usage: send [-a addr] [message]\n");
printf("-a \tThe target address [amqp[s]://domain[/name]]\n");
printf("message\tA text string to send.\n");
exit(0);
}
void process(void) {
pn_status_t status = pn_messenger_status(messenger, tracker);
if (status != PN_STATUS_PENDING) {
if (running) {
pn_messenger_stop(messenger);
running = 0;
}
}
if (pn_messenger_stopped(messenger)) {
pn_message_free(message);
pn_messenger_free(messenger);
message = NULL;
messenger = NULL;
}
}
#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
void pump(int fd, void* userData) {
while (pn_messenger_work(messenger, 0) >= 0) {
process();
}
}
void onclose(int fd, void* userData) {
process();
}
void onerror(int fd, int errno, const char* msg, void* userData) {
printf("error callback fd = %d, errno = %d, msg = %s\n", fd, errno, msg);
}
#endif
int main(int argc, char** argv)
{
int c;
char * address = (char *) "amqp://0.0.0.0";
char * msgtext = (char *) "Hello World!";
pn_data_t* body;
opterr = 0;
while((c = getopt(argc, argv, "ha:b:c:")) != -1)
{
switch(c)
{
case 'a': address = optarg; break;
case 'h': usage(); break;
case '?':
if(optopt == 'a')
{
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
}
else if(isprint(optopt))
{
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
else
{
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
return 1;
default:
abort();
}
}
if (optind < argc) msgtext = argv[optind];
message = pn_message();
messenger = pn_messenger(NULL);
pn_messenger_set_blocking(messenger, false); // Needs to be set non-blocking to behave asynchronously.
pn_messenger_set_outgoing_window(messenger, 1024);
pn_messenger_start(messenger);
pn_message_set_address(message, address);
body = pn_message_body(message);
pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
pn_messenger_put(messenger, message);
check(messenger);
tracker = pn_messenger_outgoing_tracker(messenger);
#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
emscripten_set_socket_error_callback(NULL, onerror);
emscripten_set_socket_open_callback(NULL, pump);
emscripten_set_socket_connection_callback(NULL, pump);
emscripten_set_socket_message_callback(NULL, pump);
emscripten_set_socket_close_callback(NULL, onclose);
#else // For native compiler.
while (running) {
pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
process();
}
while (messenger && !pn_messenger_stopped(messenger)) {
pn_messenger_work(messenger, 0);
process();
}
#endif
return 0;
}