| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <pubsub_serializer.h> |
| #include <pubsub_protocol.h> |
| #include <stdlib.h> |
| #include <pubsub/subscriber.h> |
| #include <memory.h> |
| #include <pubsub_constants.h> |
| #if !defined(__APPLE__) |
| #include <sys/epoll.h> |
| #endif |
| |
| #include <assert.h> |
| #include <arpa/inet.h> |
| #include <czmq.h> |
| #include <uuid/uuid.h> |
| |
| #include "pubsub_endpoint.h" |
| #include "celix_log_helper.h" |
| #include "pubsub_zmq_topic_receiver.h" |
| #include "pubsub_psa_zmq_constants.h" |
| #include "pubsub_admin_metrics.h" |
| #include "pubsub_utils.h" |
| #include "celix_api.h" |
| #include "celix_version.h" |
| #include "pubsub_serializer_handler.h" |
| #include "pubsub_interceptors_handler.h" |
| #include "celix_utils_api.h" |
| #include "pubsub_zmq_admin.h" |
| |
| #define PSA_ZMQ_RECV_TIMEOUT 1000 |
| |
| #ifndef UUID_STR_LEN |
| #define UUID_STR_LEN 37 |
| #endif |
| |
| |
| #define L_DEBUG(...) \ |
| celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__) |
| #define L_INFO(...) \ |
| celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_INFO, __VA_ARGS__) |
| #define L_WARN(...) \ |
| celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_WARNING, __VA_ARGS__) |
| #define L_ERROR(...) \ |
| celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_ERROR, __VA_ARGS__) |
| |
| struct pubsub_zmq_topic_receiver { |
| celix_bundle_context_t *ctx; |
| celix_log_helper_t *logHelper; |
| pubsub_serializer_handler_t* serializerHandler; |
| void *admin; |
| long protocolSvcId; |
| pubsub_protocol_service_t *protocol; |
| char *scope; |
| char *topic; |
| |
| pubsub_interceptors_handler_t *interceptorsHandler; |
| |
| void *zmqCtx; |
| void *zmqSock; |
| |
| char sync[8]; |
| |
| struct { |
| celix_thread_t thread; |
| celix_thread_mutex_t mutex; |
| bool running; |
| } recvThread; |
| |
| struct { |
| celix_thread_mutex_t mutex; |
| hash_map_t *map; //key = zmq url, value = psa_zmq_requested_connection_entry_t* |
| bool allConnected; //true if all requestedConnectection are connected |
| } requestedConnections; |
| |
| long subscriberTrackerId; |
| struct { |
| celix_thread_mutex_t mutex; |
| hash_map_t *map; //key = bnd id, value = psa_zmq_subscriber_entry_t |
| bool allInitialized; |
| } subscribers; |
| }; |
| |
| typedef struct psa_zmq_requested_connection_entry { |
| char *url; |
| bool connected; |
| bool statically; //true if the connection is statically configured through the topic properties. |
| } psa_zmq_requested_connection_entry_t; |
| |
| typedef struct psa_zmq_subscriber_entry { |
| hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t* |
| bool initialized; //true if the init function is called through the receive thread |
| } psa_zmq_subscriber_entry_t; |
| |
| |
| static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); |
| static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd); |
| static void* psa_zmq_recvThread(void * data); |
| static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver); |
| static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver); |
| static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties); |
| static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties); |
| |
| |
| pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx, |
| celix_log_helper_t *logHelper, |
| const char *scope, |
| const char *topic, |
| const celix_properties_t *topicProperties, |
| pubsub_serializer_handler_t* serHandler, |
| void *admin, |
| long protocolSvcId, |
| pubsub_protocol_service_t *protocol) { |
| pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver)); |
| receiver->ctx = ctx; |
| receiver->logHelper = logHelper; |
| receiver->serializerHandler = serHandler; |
| receiver->admin = admin; |
| receiver->protocolSvcId = protocolSvcId; |
| receiver->protocol = protocol; |
| receiver->scope = scope == NULL ? NULL : celix_utils_strdup(scope); |
| receiver->topic = celix_utils_strdup(topic); |
| |
| pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler); |
| |
| #ifdef BUILD_WITH_ZMQ_SECURITY |
| char* keys_bundle_dir = pubsub_getKeysBundleDir(bundle_context); |
| if (keys_bundle_dir == NULL) { |
| return CELIX_SERVICE_EXCEPTION; |
| } |
| |
| const char* keys_file_path = NULL; |
| const char* keys_file_name = NULL; |
| bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_PATH, &keys_file_path); |
| bundleContext_getProperty(bundle_context, PROPERTY_KEYS_FILE_NAME, &keys_file_name); |
| |
| char sub_cert_path[MAX_CERT_PATH_LENGTH]; |
| char pub_cert_path[MAX_CERT_PATH_LENGTH]; |
| |
| //certificate path ".cache/bundle{id}/version0.0/./META-INF/keys/subscriber/private/sub_{topic}.key.enc" |
| snprintf(sub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/subscriber/private/sub_%s.key.enc", keys_bundle_dir, topic); |
| snprintf(pub_cert_path, MAX_CERT_PATH_LENGTH, "%s/META-INF/keys/publisher/public/pub_%s.pub", keys_bundle_dir, topic); |
| free(keys_bundle_dir); |
| |
| printf("PSA_ZMQ_PSA_ZMQ_TS: Loading subscriber key '%s'\n", sub_cert_path); |
| printf("PSA_ZMQ_PSA_ZMQ_TS: Loading publisher key '%s'\n", pub_cert_path); |
| |
| zcert_t* sub_cert = get_zcert_from_encoded_file((char *) keys_file_path, (char *) keys_file_name, sub_cert_path); |
| if (sub_cert == NULL) { |
| printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", sub_cert_path); |
| return CELIX_SERVICE_EXCEPTION; |
| } |
| |
| zcert_t* pub_cert = zcert_load(pub_cert_path); |
| if (pub_cert == NULL) { |
| zcert_destroy(&sub_cert); |
| printf("PSA_ZMQ_PSA_ZMQ_TS: Cannot load key '%s'\n", pub_cert_path); |
| return CELIX_SERVICE_EXCEPTION; |
| } |
| |
| const char* pub_key = zcert_public_txt(pub_cert); |
| #endif |
| receiver->zmqCtx = zmq_ctx_new(); |
| if (receiver->zmqCtx != NULL) { |
| psa_zmq_setupZmqContext(receiver, topicProperties); |
| receiver->zmqSock = zmq_socket(receiver->zmqCtx, ZMQ_SUB); |
| } else { |
| //LOG ctx problem |
| } |
| if (receiver->zmqSock != NULL) { |
| psa_zmq_setupZmqSocket(receiver, topicProperties); |
| } else if (receiver->zmqCtx != NULL) { |
| //LOG sock problem |
| } |
| |
| if (receiver->zmqSock == NULL) { |
| #ifdef BUILD_WITH_ZMQ_SECURITY |
| zcert_destroy(&sub_cert); |
| zcert_destroy(&pub_cert); |
| #endif |
| } |
| |
| |
| if (receiver->zmqSock != NULL) { |
| celixThreadMutex_create(&receiver->subscribers.mutex, NULL); |
| celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL); |
| celixThreadMutex_create(&receiver->recvThread.mutex, NULL); |
| |
| receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL); |
| receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); |
| } |
| |
| const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_ZMQ_STATIC_CONNECT_URLS_FOR, topic, scope); |
| if(staticConnectUrls == NULL) { |
| staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_ZMQ_STATIC_CONNECT_URLS, NULL); |
| } |
| if (receiver->zmqSock != NULL && staticConnectUrls != NULL) { |
| char *urlsCopy = celix_utils_strdup(staticConnectUrls); |
| char* url; |
| char* save = urlsCopy; |
| |
| while ((url = strtok_r(save, " ", &save))) { |
| psa_zmq_requested_connection_entry_t *entry = calloc(1, sizeof(*entry)); |
| entry->statically = true; |
| entry->connected = false; |
| entry->url = celix_utils_strdup(url); |
| hashMap_put(receiver->requestedConnections.map, entry->url, entry); |
| receiver->requestedConnections.allConnected = false; |
| } |
| free(urlsCopy); |
| } |
| |
| //track subscribers |
| if (receiver->zmqSock != NULL ) { |
| int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); |
| char buf[size+1]; |
| snprintf(buf, (size_t)size+1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic); |
| celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS; |
| opts.filter.ignoreServiceLanguage = true; |
| opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME; |
| opts.filter.filter = buf; |
| opts.callbackHandle = receiver; |
| opts.addWithOwner = pubsub_zmqTopicReceiver_addSubscriber; |
| opts.removeWithOwner = pubsub_zmqTopicReceiver_removeSubscriber; |
| |
| receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts); |
| } |
| |
| if (receiver->zmqSock != NULL ) { |
| receiver->recvThread.running = true; |
| celixThread_create(&receiver->recvThread.thread, NULL, psa_zmq_recvThread, receiver); |
| char name[64]; |
| snprintf(name, 64, "ZMQ TR %s/%s", scope == NULL ? "(null)" : scope, topic); |
| celixThread_setName(&receiver->recvThread.thread, name); |
| } |
| |
| if (receiver->zmqSock == NULL) { |
| if (receiver->scope != NULL) { |
| free(receiver->scope); |
| } |
| free(receiver->topic); |
| free(receiver); |
| receiver = NULL; |
| L_ERROR("[PSA_ZMQ] Cannot create TopicReceiver for %s/%s", scope == NULL ? "(null)" : scope, topic); |
| } |
| |
| return receiver; |
| } |
| |
| void pubsub_zmqTopicReceiver_destroy(pubsub_zmq_topic_receiver_t *receiver) { |
| if (receiver != NULL) { |
| |
| celixThreadMutex_lock(&receiver->recvThread.mutex); |
| receiver->recvThread.running = false; |
| celixThreadMutex_unlock(&receiver->recvThread.mutex); |
| celixThread_join(receiver->recvThread.thread, NULL); |
| |
| celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId); |
| |
| celixThreadMutex_lock(&receiver->subscribers.mutex); |
| hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); |
| if (entry != NULL) { |
| hashMap_destroy(entry->subscriberServices, false, false); |
| free(entry); |
| } |
| } |
| hashMap_destroy(receiver->subscribers.map, false, false); |
| celixThreadMutex_unlock(&receiver->subscribers.mutex); |
| |
| celixThreadMutex_lock(&receiver->requestedConnections.mutex); |
| iter = hashMapIterator_construct(receiver->requestedConnections.map); |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); |
| if (entry != NULL) { |
| free(entry->url); |
| free(entry); |
| } |
| } |
| hashMap_destroy(receiver->requestedConnections.map, false, false); |
| celixThreadMutex_unlock(&receiver->requestedConnections.mutex); |
| |
| celixThreadMutex_destroy(&receiver->subscribers.mutex); |
| celixThreadMutex_destroy(&receiver->requestedConnections.mutex); |
| celixThreadMutex_destroy(&receiver->recvThread.mutex); |
| |
| zmq_close(receiver->zmqSock); |
| zmq_ctx_term(receiver->zmqCtx); |
| |
| pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler); |
| |
| free(receiver->scope); |
| free(receiver->topic); |
| } |
| free(receiver); |
| } |
| |
| const char* pubsub_zmqTopicReceiver_scope(pubsub_zmq_topic_receiver_t *receiver) { |
| return receiver->scope; |
| } |
| const char* pubsub_zmqTopicReceiver_topic(pubsub_zmq_topic_receiver_t *receiver) { |
| return receiver->topic; |
| } |
| |
| const char* pubsub_zmqTopicReceiver_serializerType(pubsub_zmq_topic_receiver_t *receiver) { |
| return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler); |
| } |
| |
| long pubsub_zmqTopicReceiver_protocolSvcId(pubsub_zmq_topic_receiver_t *receiver) { |
| return receiver->protocolSvcId; |
| } |
| |
| void pubsub_zmqTopicReceiver_listConnections(pubsub_zmq_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) { |
| celixThreadMutex_lock(&receiver->requestedConnections.mutex); |
| hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); |
| char *url = NULL; |
| asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : ""); |
| if (entry->connected) { |
| celix_arrayList_add(connectedUrls, url); |
| } else { |
| celix_arrayList_add(unconnectedUrls, url); |
| } |
| } |
| celixThreadMutex_unlock(&receiver->requestedConnections.mutex); |
| } |
| |
| |
| void pubsub_zmqTopicReceiver_connectTo( |
| pubsub_zmq_topic_receiver_t *receiver, |
| const char *url) { |
| L_DEBUG("[PSA_ZMQ] TopicReceiver %s/%s connecting to zmq url %s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic, url); |
| |
| celixThreadMutex_lock(&receiver->requestedConnections.mutex); |
| psa_zmq_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url); |
| if (entry == NULL) { |
| entry = calloc(1, sizeof(*entry)); |
| entry->url = celix_utils_strdup(url); |
| entry->connected = false; |
| entry->statically = false; |
| hashMap_put(receiver->requestedConnections.map, (void*)entry->url, entry); |
| receiver->requestedConnections.allConnected = false; |
| } |
| celixThreadMutex_unlock(&receiver->requestedConnections.mutex); |
| |
| psa_zmq_connectToAllRequestedConnections(receiver); |
| } |
| |
| void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url) { |
| L_DEBUG("[PSA ZMQ] TopicReceiver %s/%s disconnect from zmq url %s", receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic, url); |
| |
| celixThreadMutex_lock(&receiver->requestedConnections.mutex); |
| psa_zmq_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url); |
| if (entry != NULL && entry->connected) { |
| if (zmq_disconnect(receiver->zmqSock, url) == 0) { |
| entry->connected = false; |
| } else { |
| L_WARN("[PSA_ZMQ] Error disconnecting from zmq url %s. (%s)", url, strerror(errno)); |
| } |
| } |
| if (entry != NULL) { |
| free(entry->url); |
| free(entry); |
| } |
| celixThreadMutex_unlock(&receiver->requestedConnections.mutex); |
| } |
| |
| static void pubsub_zmqTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { |
| pubsub_zmq_topic_receiver_t *receiver = handle; |
| |
| long bndId = celix_bundle_getId(bnd); |
| long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); |
| const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL); |
| if (receiver->scope == NULL) { |
| if (subScope != NULL) { |
| return; |
| } |
| } else if (subScope != NULL) { |
| if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) { |
| //not the same scope. ignore |
| return; |
| } |
| } else { |
| //receiver scope is not NULL, but subScope is NULL -> ignore |
| return; |
| } |
| |
| celixThreadMutex_lock(&receiver->subscribers.mutex); |
| psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); |
| if (entry != NULL) { |
| hashMap_put(entry->subscriberServices, (void*)svcId, svc); |
| } else { |
| //new create entry |
| entry = calloc(1, sizeof(*entry)); |
| entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL); |
| entry->initialized = false; |
| hashMap_put(entry->subscriberServices, (void*)svcId, svc); |
| hashMap_put(receiver->subscribers.map, (void*)bndId, entry); |
| } |
| celixThreadMutex_unlock(&receiver->subscribers.mutex); |
| } |
| |
| static void pubsub_zmqTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd) { |
| pubsub_zmq_topic_receiver_t *receiver = handle; |
| |
| long bndId = celix_bundle_getId(bnd); |
| long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1); |
| |
| celixThreadMutex_lock(&receiver->subscribers.mutex); |
| psa_zmq_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void*)bndId); |
| if (entry != NULL) { |
| hashMap_remove(entry->subscriberServices, (void*)svcId); |
| } |
| if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) { |
| //remove entry |
| hashMap_remove(receiver->subscribers.map, (void*)bndId); |
| hashMap_destroy(entry->subscriberServices, false, false); |
| free(entry); |
| } |
| celixThreadMutex_unlock(&receiver->subscribers.mutex); |
| } |
| |
| static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) { |
| const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); |
| if (msgFqn == NULL) { |
| L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId); |
| return; |
| } |
| |
| void *deserializedMsg = NULL; |
| bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion); |
| if (validVersion) { |
| struct iovec deSerializeBuffer; |
| deSerializeBuffer.iov_base = message->payload.payload; |
| deSerializeBuffer.iov_len = message->payload.length; |
| celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg); |
| if (status == CELIX_SUCCESS) { |
| uint32_t msgId = message->header.msgId; |
| celix_properties_t *metadata = message->metadata.metadata; |
| bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata); |
| bool release = true; |
| if (cont) { |
| hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices); |
| while (hashMapIterator_hasNext(&iter2)) { |
| pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2); |
| svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release); |
| pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata); |
| if (!release && hashMapIterator_hasNext(&iter2)) { |
| //receive function has taken ownership and still more receive function to come .. |
| //deserialize again for new message |
| status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg); |
| if (status != CELIX_SUCCESS) { |
| L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); |
| break; |
| } |
| release = true; |
| } |
| } |
| if (release) { |
| pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg); |
| } |
| } |
| } else { |
| L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic); |
| } |
| } else { |
| L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x", |
| msgFqn, |
| pubsub_serializerHandler_getSerializationType(receiver->serializerHandler), |
| (int)message->header.msgMajorVersion, |
| (int)message->header.msgMinorVersion, |
| pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId), |
| pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId)); |
| } |
| } |
| |
| static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) { |
| celixThreadMutex_lock(&receiver->subscribers.mutex); |
| hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); |
| if (entry != NULL) { |
| processMsgForSubscriberEntry(receiver, entry, message, receiveTime); |
| } |
| } |
| celixThreadMutex_unlock(&receiver->subscribers.mutex); |
| } |
| |
| static void* psa_zmq_recvThread(void * data) { |
| pubsub_zmq_topic_receiver_t *receiver = data; |
| |
| celixThreadMutex_lock(&receiver->recvThread.mutex); |
| bool running = receiver->recvThread.running; |
| celixThreadMutex_unlock(&receiver->recvThread.mutex); |
| |
| celixThreadMutex_lock(&receiver->requestedConnections.mutex); |
| bool allConnected = receiver->requestedConnections.allConnected; |
| celixThreadMutex_unlock(&receiver->requestedConnections.mutex); |
| |
| celixThreadMutex_lock(&receiver->subscribers.mutex); |
| bool allInitialized = receiver->subscribers.allInitialized; |
| celixThreadMutex_unlock(&receiver->subscribers.mutex); |
| |
| while (running) { |
| if (!allConnected) { |
| psa_zmq_connectToAllRequestedConnections(receiver); |
| } |
| if (!allInitialized) { |
| psa_zmq_initializeAllSubscribers(receiver); |
| } |
| |
| zmsg_t *zmsg = zmsg_recv(receiver->zmqSock); |
| if (zmsg != NULL) { |
| if (zmsg_size(zmsg) < 2) { |
| L_WARN("[PSA_ZMQ_TR] Always expecting at least frames per zmsg (header + payload (+ metadata) (+ footer)), got %i frames", (int)zmsg_size(zmsg)); |
| } else { |
| zframe_t *header = zmsg_pop(zmsg); // header |
| zframe_t *payload = NULL; |
| zframe_t *metadata = NULL; |
| zframe_t *footer = NULL; |
| |
| pubsub_protocol_message_t message; |
| size_t footerSize = 0; |
| receiver->protocol->getFooterSize(receiver->protocol->handle, &footerSize); |
| receiver->protocol->decodeHeader(receiver->protocol->handle, zframe_data(header), zframe_size(header), &message); |
| if (message.header.payloadSize > 0) { |
| payload = zmsg_pop(zmsg); |
| receiver->protocol->decodePayload(receiver->protocol->handle, zframe_data(payload), zframe_size(payload), &message); |
| } else { |
| message.payload.payload = NULL; |
| message.payload.length = 0; |
| } |
| if (message.header.metadataSize > 0) { |
| metadata = zmsg_pop(zmsg); |
| receiver->protocol->decodeMetadata(receiver->protocol->handle, zframe_data(metadata), zframe_size(metadata), &message); |
| } else { |
| message.metadata.metadata = NULL; |
| } |
| if (footerSize > 0) { |
| footer = zmsg_pop(zmsg); // footer |
| receiver->protocol->decodeFooter(receiver->protocol->handle, zframe_data(footer), zframe_size(footer), &message); |
| } |
| if (header != NULL && payload != NULL) { |
| struct timespec receiveTime; |
| clock_gettime(CLOCK_REALTIME, &receiveTime); |
| processMsg(receiver, &message, &receiveTime); |
| } |
| celix_properties_destroy(message.metadata.metadata); |
| zframe_destroy(&header); |
| zframe_destroy(&payload); |
| zframe_destroy(&metadata); |
| zframe_destroy(&footer); |
| } |
| zmsg_destroy(&zmsg); |
| } else { |
| if (errno == EAGAIN) { |
| //nop |
| } else if (errno == EINTR) { |
| L_DEBUG("[PSA_ZMQ_TR] zmsg_recv interrupted"); |
| } else { |
| L_WARN("[PSA_ZMQ_TR] Error receiving zmq message: %s", strerror(errno)); |
| } |
| } |
| |
| celixThreadMutex_lock(&receiver->recvThread.mutex); |
| running = receiver->recvThread.running; |
| celixThreadMutex_unlock(&receiver->recvThread.mutex); |
| |
| celixThreadMutex_lock(&receiver->requestedConnections.mutex); |
| allConnected = receiver->requestedConnections.allConnected; |
| celixThreadMutex_unlock(&receiver->requestedConnections.mutex); |
| |
| celixThreadMutex_lock(&receiver->subscribers.mutex); |
| allInitialized = receiver->subscribers.allInitialized; |
| celixThreadMutex_unlock(&receiver->subscribers.mutex); |
| } // while |
| |
| return NULL; |
| } |
| |
| |
| static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver) { |
| celixThreadMutex_lock(&receiver->requestedConnections.mutex); |
| if (!receiver->requestedConnections.allConnected) { |
| bool allConnected = true; |
| hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map); |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_zmq_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter); |
| if (!entry->connected) { |
| if (zmq_connect(receiver->zmqSock, entry->url) == 0) { |
| entry->connected = true; |
| } else { |
| L_WARN("[PSA_ZMQ] Error connecting to zmq url %s. (%s)", entry->url, strerror(errno)); |
| allConnected = false; |
| } |
| } |
| } |
| receiver->requestedConnections.allConnected = allConnected; |
| } |
| celixThreadMutex_unlock(&receiver->requestedConnections.mutex); |
| } |
| |
| static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver) { |
| celixThreadMutex_lock(&receiver->subscribers.mutex); |
| if (!receiver->subscribers.allInitialized) { |
| bool allInitialized = true; |
| hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map); |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_zmq_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter); |
| if (!entry->initialized) { |
| hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices); |
| while (hashMapIterator_hasNext(&iter2)) { |
| pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2); |
| int rc = 0; |
| if (svc != NULL && svc->init != NULL) { |
| rc = svc->init(svc->handle); |
| } |
| if (rc == 0) { |
| //note now only initialized on first subscriber entries added. |
| entry->initialized = true; |
| } else { |
| L_WARN("Cannot initialize subscriber svc. Got rc %i", rc); |
| allInitialized = false; |
| } |
| } |
| } |
| } |
| receiver->subscribers.allInitialized = allInitialized; |
| } |
| celixThreadMutex_unlock(&receiver->subscribers.mutex); |
| } |
| |
| static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties) { |
| //NOTE. ZMQ will abort when performing a sched_setscheduler without permission. |
| //As result permission has to be checked first. |
| //TODO update this to use cap_get_pid and cap-get_flag instead of check user is root (note adds dep to -lcap) |
| bool gotPermission = false; |
| if (getuid() == 0) { |
| gotPermission = true; |
| } |
| |
| |
| long prio = celix_properties_getAsLong(topicProperties, PUBSUB_ZMQ_THREAD_REALTIME_PRIO, -1L); |
| if (prio > 0 && prio < 100) { |
| if (gotPermission) { |
| zmq_ctx_set(receiver->zmqCtx, ZMQ_THREAD_PRIORITY, (int) prio); |
| } else { |
| L_INFO("Skipping configuration of thread prio to %i. No permission\n", (int)prio); |
| } |
| } |
| |
| const char *sched = celix_properties_get(topicProperties, PUBSUB_ZMQ_THREAD_REALTIME_SCHED, NULL); |
| if (sched != NULL) { |
| int policy = ZMQ_THREAD_SCHED_POLICY_DFLT; |
| if (strncmp("SCHED_OTHER", sched, 16) == 0) { |
| policy = SCHED_OTHER; |
| #if !defined(__APPLE__) |
| } else if (strncmp("SCHED_BATCH", sched, 16) == 0) { |
| policy = SCHED_BATCH; |
| } else if (strncmp("SCHED_IDLE", sched, 16) == 0) { |
| policy = SCHED_IDLE; |
| #endif |
| } else if (strncmp("SCHED_FIFO", sched, 16) == 0) { |
| policy = SCHED_FIFO; |
| } else if (strncmp("SCHED_RR", sched, 16) == 0) { |
| policy = SCHED_RR; |
| } |
| if (gotPermission) { |
| zmq_ctx_set(receiver->zmqCtx, ZMQ_THREAD_SCHED_POLICY, policy); |
| } else { |
| L_INFO("Skipping configuration of thread scheduling to %s. No permission\n", sched); |
| } |
| } |
| } |
| |
| static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties) { |
| int timeout = PSA_ZMQ_RECV_TIMEOUT; |
| int res = zmq_setsockopt(receiver->zmqSock, ZMQ_RCVTIMEO, &timeout, sizeof(timeout)); |
| if (res) { |
| L_ERROR("[PSA_ZMQ] Cannot set ZMQ socket option ZMQ_RCVTIMEO errno=%d", errno); |
| } |
| |
| #ifdef ZMQ_HWM |
| long hwmProp = celix_properties_getAsLong(topicProperties, PUBSUB_ZMQ_HWM, -1L); |
| if (hwmProp >= 0) { |
| unsigned long hwm = (unsigned long)hwmProp; |
| zmq_setsockopt(receiver->zmqSock, ZMQ_HWM, &hwm, sizeof(hwm)); |
| } |
| #endif |
| |
| #ifdef BUILD_WITH_ZMQ_SECURITY |
| |
| zcert_apply (sub_cert, zmq_s); |
| zsock_set_curve_serverkey (zmq_s, pub_key); //apply key of publisher to socket of subscriber |
| #endif |
| receiver->protocol->getSyncHeader(receiver->protocol->handle, receiver->sync); |
| zsock_set_subscribe(receiver->zmqSock, receiver->sync); |
| |
| #ifdef BUILD_WITH_ZMQ_SECURITY |
| ts->zmq_cert = sub_cert; |
| ts->zmq_pub_cert = pub_cert; |
| #endif |
| } |