| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "pncompat/misc_funcs.inc"

#include "proton/reactor.h"
#include "proton/message.h"
#include "proton/connection.h"
#include "proton/session.h"
#include "proton/link.h"
#include "proton/delivery.h"
#include "proton/event.h"
#include "proton/handlers.h"
#include "proton/transport.h"
#include "proton/url.h"
| |
// Size in bytes of the buffer a received message is read into before it is
// decoded for display.
#define MAX_SIZE 512

// Amount of link credit kept available when receiving without a limit (-c 0)
static const int CAPACITY = 100;

// Non-zero suppresses the per-message output on stdout (set by -q)
static int quiet = 0;
| |
| // Example application data. This data will be instantiated in the event |
| // handler, and is available during event processing. In this example it |
| // holds configuration and state information. |
| // |
| typedef struct { |
| int count; // # of messages to receive before exiting |
| const char *source; // name of the source node to receive from |
| pn_message_t *message; // holds the received message |
| } app_data_t; |
| |
// Yields the app_data_t stored in the memory attached to a pn_handler_t
//
#define GET_APP_DATA(h) ((app_data_t *)pn_handler_mem((h)))
| |
| // Called when reactor exits to clean up app_data |
| // |
| static void delete_handler(pn_handler_t *handler) |
| { |
| app_data_t *d = GET_APP_DATA(handler); |
| if (d->message) { |
| pn_decref(d->message); |
| d->message = NULL; |
| } |
| } |
| |
| |
| /* Process each event posted by the reactor. |
| */ |
| static void event_handler(pn_handler_t *handler, |
| pn_event_t *event, |
| pn_event_type_t type) |
| { |
| app_data_t *data = GET_APP_DATA(handler); |
| |
| switch (type) { |
| |
| case PN_CONNECTION_INIT: { |
| // Create and open all the endpoints needed to send a message |
| // |
| pn_connection_t *conn; |
| pn_session_t *ssn; |
| pn_link_t *receiver; |
| |
| conn = pn_event_connection(event); |
| pn_connection_open(conn); |
| ssn = pn_session(conn); |
| pn_session_open(ssn); |
| receiver = pn_receiver(ssn, "MyReceiver"); |
| pn_terminus_set_address(pn_link_source(receiver), data->source); |
| pn_link_open(receiver); |
| // cannot receive without granting credit: |
| pn_link_flow(receiver, data->count ? data->count : CAPACITY); |
| } break; |
| |
| case PN_DELIVERY: { |
| // A message has been received |
| // |
| pn_link_t *link = NULL; |
| pn_delivery_t *dlv = pn_event_delivery(event); |
| if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { |
| // A full message has arrived |
| if (!quiet) { |
| static char buffer[MAX_SIZE]; |
| ssize_t len; |
| pn_bytes_t bytes; |
| bool found = false; |
| |
| // try to decode the message body |
| if (pn_delivery_pending(dlv) < MAX_SIZE) { |
| // read in the raw data |
| len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); |
| if (len > 0) { |
| // decode it into a proton message |
| pn_message_clear(data->message); |
| if (PN_OK == pn_message_decode(data->message, buffer, |
| len)) { |
| // Assuming the message came from the sender |
| // example, try to parse out a single string from |
| // the payload |
| // |
| pn_data_scan(pn_message_body(data->message), "?S", |
| &found, &bytes); |
| } |
| } |
| } |
| if (found) { |
| fprintf(stdout, "Message: [%.*s]\n", (int)bytes.size, |
| bytes.start); |
| } else { |
| fprintf(stdout, "Message received!\n"); |
| } |
| } |
| |
| link = pn_delivery_link(dlv); |
| |
| if (!pn_delivery_settled(dlv)) { |
| // remote has not settled, so it is tracking the delivery. Ack |
| // it. |
| pn_delivery_update(dlv, PN_ACCEPTED); |
| } |
| |
| // done with the delivery, move to the next and free it |
| pn_link_advance(link); |
| pn_delivery_settle(dlv); // dlv is now freed |
| |
| if (data->count == 0) { |
| // send forever - see if more credit is needed |
| if (pn_link_credit(link) < CAPACITY/2) { |
| // Grant enough credit to bring it up to CAPACITY: |
| pn_link_flow(link, CAPACITY - pn_link_credit(link)); |
| } |
| } else if (--data->count == 0) { |
| // done receiving, close the endpoints |
| pn_session_t *ssn = pn_link_session(link); |
| pn_link_close(link); |
| pn_session_close(ssn); |
| pn_connection_close(pn_session_connection(ssn)); |
| } |
| } |
| } break; |
| |
| case PN_TRANSPORT_ERROR: { |
| // The connection to the peer failed. |
| // |
| pn_transport_t *tport = pn_event_transport(event); |
| pn_condition_t *cond = pn_transport_condition(tport); |
| fprintf(stderr, "Network transport failed!\n"); |
| if (pn_condition_is_set(cond)) { |
| const char *name = pn_condition_get_name(cond); |
| const char *desc = pn_condition_get_description(cond); |
| fprintf(stderr, " Error: %s Description: %s\n", |
| (name) ? name : "<error name not provided>", |
| (desc) ? desc : "<no description provided>"); |
| } |
| // pn_reactor_process() will exit with a false return value, stopping |
| // the main loop. |
| } break; |
| |
| default: |
| break; |
| } |
| } |
| |
/* Print the command line help to stdout and terminate the process with exit
 * status 1. This function does not return.
 */
static void usage(void)
{
    static const char *const help[] = {
        "Usage: receiver <options>\n",
        "-a \tThe host address [localhost:5672]\n",
        "-c \t# of messages to receive, 0=receive forever [1]\n",
        "-s \tSource address [examples]\n",
        "-i \tContainer name [ReceiveExample]\n",
        "-q \tQuiet - turn off stdout\n"
    };
    size_t i;

    for (i = 0; i < sizeof(help) / sizeof(help[0]); i++) {
        fputs(help[i], stdout);
    }
    exit(1);
}
| |
| int main(int argc, char** argv) |
| { |
| const char *address = "localhost"; |
| const char *container = "ReceiveExample"; |
| int c; |
| pn_reactor_t *reactor = NULL; |
| pn_url_t *url = NULL; |
| pn_connection_t *conn = NULL; |
| |
| /* create a handler for the connection's events. |
| * event_handler will be called for each event. The handler will allocate |
| * a app_data_t instance which can be accessed when the event_handler is |
| * called. |
| */ |
| pn_handler_t *handler = pn_handler_new(event_handler, |
| sizeof(app_data_t), |
| delete_handler); |
| |
| /* set up the application data with defaults */ |
| app_data_t *app_data = GET_APP_DATA(handler); |
| memset(app_data, 0, sizeof(app_data_t)); |
| app_data->count = 1; |
| app_data->source = "examples"; |
| app_data->message = pn_message(); |
| |
| /* Attach the pn_handshaker() handler. This handler deals with endpoint |
| * events from the peer so we don't have to. |
| */ |
| { |
| pn_handler_t *handshaker = pn_handshaker(); |
| pn_handler_add(handler, handshaker); |
| pn_decref(handshaker); |
| } |
| |
| /* command line options */ |
| opterr = 0; |
| while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) { |
| switch(c) { |
| case 'h': usage(); break; |
| case 'a': address = optarg; break; |
| case 'c': |
| app_data->count = atoi(optarg); |
| if (app_data->count < 0) usage(); |
| break; |
| case 's': app_data->source = optarg; break; |
| case 'i': container = optarg; break; |
| case 'q': quiet = 1; break; |
| default: |
| usage(); |
| break; |
| } |
| } |
| |
| reactor = pn_reactor(); |
| |
| url = pn_url_parse(address); |
| if (url == NULL) { |
| fprintf(stderr, "Invalid host address %s\n", address); |
| exit(1); |
| } |
| conn = pn_reactor_connection_to_host(reactor, |
| pn_url_get_host(url), |
| pn_url_get_port(url), |
| handler); |
| pn_decref(url); |
| pn_decref(handler); |
| |
| // the container name should be unique for each client |
| pn_connection_set_container(conn, container); |
| |
| // wait up to 5 seconds for activity before returning from |
| // pn_reactor_process() |
| pn_reactor_set_timeout(reactor, 5000); |
| |
| pn_reactor_start(reactor); |
| |
| while (pn_reactor_process(reactor)) { |
| /* Returns 'true' until the connection is shut down. |
| * pn_reactor_process() will return true at least once every 5 seconds |
| * (due to the timeout). If no timeout was configured, |
| * pn_reactor_process() returns as soon as it finishes processing all |
| * pending I/O and events. Once the connection has closed, |
| * pn_reactor_process() will return false. |
| */ |
| } |
| pn_decref(reactor); |
| |
| return 0; |
| } |