blob: a49ff2358b2425e4c9522f1f453187b8b11eb47c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <pubsub_serializer.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <pubsub/subscriber.h>
#include <memory.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
#include "pubsub_tcp_handler.h"
#include "pubsub_tcp_topic_receiver.h"
#include "pubsub_psa_tcp_constants.h"
#include "pubsub_tcp_common.h"
#include "pubsub_tcp_admin.h"
#include <uuid/uuid.h>
#include <pubsub_utils.h>
#include "pubsub_interceptors_handler.h"
#include <celix_api.h>
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
#endif
#define L_DEBUG(...) \
celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
#define L_INFO(...) \
celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_INFO, __VA_ARGS__)
#define L_WARN(...) \
celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_WARNING, __VA_ARGS__)
#define L_ERROR(...) \
celix_logHelper_log(receiver->logHelper, CELIX_LOG_LEVEL_ERROR, __VA_ARGS__)
struct pubsub_tcp_topic_receiver {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
char *scope;
char *topic;
pubsub_serializer_handler_t* serializerHandler;
void *admin;
size_t timeout;
bool isPassive;
pubsub_tcpHandler_t *socketHandler;
pubsub_tcpHandler_t *sharedSocketHandler;
pubsub_interceptors_handler_t *interceptorsHandler;
struct {
celix_thread_t thread;
celix_thread_mutex_t mutex;
bool running;
} thread;
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = tcp url, value = psa_tcp_requested_connection_entry_t*
bool allConnected; //true if all requestedConnection are connected
} requestedConnections;
long subscriberTrackerId;
struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = bnd id, value = psa_tcp_subscriber_entry_t
bool allInitialized;
} subscribers;
};
typedef struct psa_tcp_requested_connection_entry {
pubsub_tcp_topic_receiver_t *parent;
char *url;
bool connected;
bool statically; //true if the connection is statically configured through the topic properties.
} psa_tcp_requested_connection_entry_t;
typedef struct psa_tcp_subscriber_entry {
hash_map_t *subscriberServices; //key = servide id, value = pubsub_subscriber_t*
bool initialized; //true if the init function is called through the receive thread
} psa_tcp_subscriber_entry_t;
static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *bnd);
static void *psa_tcp_recvThread(void *data);
static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
static void processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime);
static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
long protocolSvcId,
pubsub_protocol_service_t *protocol) {
pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
receiver->serializerHandler = serializerHandler;
receiver->admin = admin;
receiver->protocolSvcId = protocolSvcId;
receiver->protocol = protocol;
receiver->scope = celix_utils_strdup(scope);
receiver->topic = celix_utils_strdup(topic);
pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
if (isPassive) {
receiver->isPassive = psa_tcp_isPassive(isPassive);
}
if (topicProperties != NULL) {
if(staticConnectUrls == NULL) {
staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
}
if (isPassive == NULL) {
receiver->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, false);
}
if (passiveKey == NULL) {
passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
}
}
// Set receiver connection thread timeout.
// property is in ms, timeout value in us. (convert ms to us).
receiver->timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT,
PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT) * 1000;
/* When it's an endpoint share the socket with the sender */
if (passiveKey != NULL) {
celixThreadMutex_lock(&handlerStore->mutex);
pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
if (entry == NULL) {
if (receiver->socketHandler == NULL)
receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
entry = receiver->socketHandler;
receiver->sharedSocketHandler = receiver->socketHandler;
hashMap_put(handlerStore->map, (void *) passiveKey, entry);
} else {
receiver->socketHandler = entry;
receiver->sharedSocketHandler = entry;
}
celixThreadMutex_unlock(&handlerStore->mutex);
} else {
receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
}
if (receiver->socketHandler != NULL) {
long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,
psa_tcp_disConnectHandler);
pubsub_tcpHandler_setThreadPriority(receiver->socketHandler, prio, sched);
pubsub_tcpHandler_setReceiveRetryCnt(receiver->socketHandler, (unsigned int) retryCnt);
pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
}
celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
celixThreadMutex_create(&receiver->thread.mutex, NULL);
receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (!receiver->isPassive)) {
char *urlsCopy = celix_utils_strdup(staticConnectUrls);
char *url;
char *save = urlsCopy;
while ((url = strtok_r(save, " ", &save))) {
psa_tcp_requested_connection_entry_t *entry = calloc(1, sizeof(*entry));
entry->statically = true;
entry->connected = false;
entry->url = celix_utils_strdup(url);
entry->parent = receiver;
hashMap_put(receiver->requestedConnections.map, entry->url, entry);
receiver->requestedConnections.allConnected = false;
}
free(urlsCopy);
}
if (receiver->socketHandler != NULL && (!receiver->isPassive)) {
// Configure Receiver thread
receiver->thread.running = true;
celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver);
char name[64];
snprintf(name, 64, "TCP TR %s/%s", scope == NULL ? "(null)" : scope, topic);
celixThread_setName(&receiver->thread.thread, name);
}
//track subscribers
if (receiver->socketHandler != NULL) {
int size = snprintf(NULL, 0, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
char buf[size + 1];
snprintf(buf, (size_t) size + 1, "(%s=%s)", PUBSUB_SUBSCRIBER_TOPIC, topic);
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
opts.filter.ignoreServiceLanguage = true;
opts.filter.serviceName = PUBSUB_SUBSCRIBER_SERVICE_NAME;
opts.filter.filter = buf;
opts.callbackHandle = receiver;
opts.addWithOwner = pubsub_tcpTopicReceiver_addSubscriber;
opts.removeWithOwner = pubsub_tcpTopicReceiver_removeSubscriber;
receiver->subscriberTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
}
if (receiver->socketHandler == NULL) {
if (receiver->scope != NULL) {
free(receiver->scope);
}
free(receiver->topic);
free(receiver);
receiver = NULL;
L_ERROR("[PSA_TCP] Cannot create TopicReceiver for %s/%s", scope == NULL ? "(null)" : scope, topic);
}
return receiver;
}
void pubsub_tcpTopicReceiver_destroy(pubsub_tcp_topic_receiver_t *receiver) {
if (receiver != NULL) {
celixThreadMutex_lock(&receiver->thread.mutex);
if (receiver->thread.running) {
receiver->thread.running = false;
celixThreadMutex_unlock(&receiver->thread.mutex);
celixThread_join(receiver->thread.thread, NULL);
}
celix_bundleContext_stopTracker(receiver->ctx, receiver->subscriberTrackerId);
celixThreadMutex_lock(&receiver->subscribers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
}
hashMap_destroy(receiver->subscribers.map, false, false);
celixThreadMutex_unlock(&receiver->subscribers.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
free(entry->url);
free(entry);
}
}
hashMap_destroy(receiver->requestedConnections.map, false, false);
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
celixThreadMutex_destroy(&receiver->subscribers.mutex);
celixThreadMutex_destroy(&receiver->requestedConnections.mutex);
celixThreadMutex_destroy(&receiver->thread.mutex);
pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, NULL, NULL);
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, NULL, NULL, NULL);
if ((receiver->socketHandler) && (receiver->sharedSocketHandler == NULL)) {
pubsub_tcpHandler_destroy(receiver->socketHandler);
receiver->socketHandler = NULL;
}
pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
if (receiver->scope != NULL) {
free(receiver->scope);
}
free(receiver->topic);
}
free(receiver);
}
const char *pubsub_tcpTopicReceiver_scope(pubsub_tcp_topic_receiver_t *receiver) {
return receiver->scope;
}
const char *pubsub_tcpTopicReceiver_topic(pubsub_tcp_topic_receiver_t *receiver) {
return receiver->topic;
}
const char *pubsub_tcpTopicReceiver_serializerType(pubsub_tcp_topic_receiver_t *receiver) {
return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
}
long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver) {
return receiver->protocolSvcId;
}
void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
celix_array_list_t *unconnectedUrls) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
if (receiver->isPassive) {
char* interface_url = pubsub_tcpHandler_get_interface_url(receiver->socketHandler);
char *url = NULL;
asprintf(&url, "%s (passive)", interface_url ? interface_url : "");
if (interface_url) {
celix_arrayList_add(connectedUrls, url);
} else {
celix_arrayList_add(unconnectedUrls, url);
}
free(interface_url);
} else {
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
char *url = NULL;
asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
if (entry->connected) {
celix_arrayList_add(connectedUrls, url);
} else {
celix_arrayList_add(unconnectedUrls, url);
}
}
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *receiver) {
return receiver->isPassive;
}
void pubsub_tcpTopicReceiver_connectTo(
pubsub_tcp_topic_receiver_t *receiver,
const char *url) {
L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s",
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic,
url);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
if (entry == NULL) {
entry = calloc(1, sizeof(*entry));
entry->url = celix_utils_strdup(url);
entry->connected = false;
entry->statically = false;
entry->parent = receiver;
hashMap_put(receiver->requestedConnections.map, (void *) entry->url, entry);
receiver->requestedConnections.allConnected = false;
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
psa_tcp_connectToAllRequestedConnections(receiver);
}
void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url) {
L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s",
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic,
url);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
psa_tcp_requested_connection_entry_t *entry = hashMap_remove(receiver->requestedConnections.map, url);
if (entry != NULL) {
int rc = pubsub_tcpHandler_disconnect(receiver->socketHandler, entry->url);
if (rc < 0)
L_WARN("[PSA_TCP] Error disconnecting from tcp url %s. (%s)", url, strerror(errno));
}
if (entry != NULL) {
free(entry->url);
free(entry);
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
const celix_bundle_t *bnd) {
pubsub_tcp_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
const char *subScope = celix_properties_get(props, PUBSUB_SUBSCRIBER_SCOPE, NULL);
if (receiver->scope == NULL) {
if (subScope != NULL) {
return;
}
} else if (subScope != NULL) {
if (strncmp(subScope, receiver->scope, strlen(receiver->scope)) != 0) {
//not the same scope. ignore
return;
}
} else {
//receiver scope is not NULL, but subScope is NULL -> ignore
return;
}
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *)bndId);
if (entry != NULL) {
hashMap_put(entry->subscriberServices, (void*)svcId, svc);
} else {
//new create entry
entry = calloc(1, sizeof(*entry));
entry->subscriberServices = hashMap_create(NULL, NULL, NULL, NULL);
entry->initialized = false;
receiver->subscribers.allInitialized = false;
hashMap_put(entry->subscriberServices, (void*)svcId, svc);
hashMap_put(receiver->subscribers.map, (void *)bndId, entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props,
const celix_bundle_t *bnd) {
pubsub_tcp_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
if (entry != NULL) {
hashMap_remove(entry->subscriberServices, (void*)svcId);
}
if (entry != NULL && hashMap_size(entry->subscriberServices) == 0) {
//remove entry
hashMap_remove(receiver->subscribers.map, (void *) bndId);
hashMap_destroy(entry->subscriberServices, false, false);
free(entry);
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
static inline void
processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
//NOTE receiver->subscribers.mutex locked
const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
if (msgFqn == NULL) {
L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
return;
}
void *deSerializedMsg = NULL;
bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
if (validVersion) {
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = message->payload.payload;
deSerializeBuffer.iov_len = message->payload.length;
celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
// When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver
if (message->payload.payload == deSerializedMsg) {
*releaseMsg = true;
}
if (status == CELIX_SUCCESS) {
uint32_t msgId = message->header.msgId;
celix_properties_t *metadata = message->metadata.metadata;
bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata);
bool release = true;
if (cont) {
hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
while (hashMapIterator_hasNext(&iter)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release);
pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
if (!release && hashMapIterator_hasNext(&iter)) {
//receive function has taken ownership and still more receive function to come ..
//deserialize again for new message
status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
if (status != CELIX_SUCCESS) {
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic);
break;
}
release = true;
}
}
if (release) {
pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
}
if (message->metadata.metadata) {
celix_properties_destroy(message->metadata.metadata);
}
}
} else {
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
} else {
L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
msgFqn,
pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
(int)message->header.msgMajorVersion,
(int)message->header.msgMinorVersion,
pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
}
}
static void
processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime) {
pubsub_tcp_topic_receiver_t *receiver = handle;
celixThreadMutex_lock(&receiver->subscribers.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (entry != NULL) {
processMsgForSubscriberEntry(receiver, entry, message, release, receiveTime);
}
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
static void *psa_tcp_recvThread(void *data) {
pubsub_tcp_topic_receiver_t *receiver = data;
celixThreadMutex_lock(&receiver->thread.mutex);
bool running = receiver->thread.running;
celixThreadMutex_unlock(&receiver->thread.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
bool allConnected = receiver->requestedConnections.allConnected;
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
celixThreadMutex_lock(&receiver->subscribers.mutex);
bool allInitialized = receiver->subscribers.allInitialized;
celixThreadMutex_unlock(&receiver->subscribers.mutex);
while (running) {
if (!allConnected) {
psa_tcp_connectToAllRequestedConnections(receiver);
}
if (!allInitialized) {
psa_tcp_initializeAllSubscribers(receiver);
}
usleep(receiver->timeout);
celixThreadMutex_lock(&receiver->thread.mutex);
running = receiver->thread.running;
celixThreadMutex_unlock(&receiver->thread.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
allConnected = receiver->requestedConnections.allConnected;
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
celixThreadMutex_lock(&receiver->subscribers.mutex);
allInitialized = receiver->subscribers.allInitialized;
celixThreadMutex_unlock(&receiver->subscribers.mutex);
} // while
return NULL;
}
static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
if (!receiver->requestedConnections.allConnected) {
bool allConnected = true;
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
if ((entry) && (!entry->connected) && (!receiver->isPassive)) {
int rc = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
if (rc < 0) {
allConnected = false;
}
}
}
receiver->requestedConnections.allConnected = allConnected;
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
static void psa_tcp_connectHandler(void *handle, const char *url, bool lock) {
pubsub_tcp_topic_receiver_t *receiver = handle;
L_DEBUG("[PSA_TCP] TopicReceiver %s/%s connecting to tcp url %s",
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic,
url);
if (lock)
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
if (entry == NULL) {
entry = calloc(1, sizeof(*entry));
entry->parent = receiver;
entry->url = celix_utils_strdup(url);
entry->statically = true;
hashMap_put(receiver->requestedConnections.map, (void *) entry->url, entry);
receiver->requestedConnections.allConnected = false;
}
entry->connected = true;
if (lock)
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock) {
pubsub_tcp_topic_receiver_t *receiver = handle;
L_DEBUG("[PSA TCP] TopicReceiver %s/%s disconnect from tcp url %s",
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic,
url);
if (lock)
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
psa_tcp_requested_connection_entry_t *entry = hashMap_get(receiver->requestedConnections.map, url);
if (entry != NULL) {
entry->connected = false;
receiver->requestedConnections.allConnected = false;
}
if (lock)
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver) {
celixThreadMutex_lock(&receiver->subscribers.mutex);
if (!receiver->subscribers.allInitialized) {
bool allInitialized = true;
hash_map_iterator_t iter = hashMapIterator_construct(receiver->subscribers.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_subscriber_entry_t *entry = hashMapIterator_nextValue(&iter);
if (!entry->initialized) {
hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
while (hashMapIterator_hasNext(&iter2)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
int rc = 0;
if (svc != NULL && svc->init != NULL) {
rc = svc->init(svc->handle);
}
if (rc == 0) {
//note now only initialized on first subscriber entries added.
entry->initialized = true;
} else {
L_WARN("Cannot initialize subscriber svc. Got rc %i", rc);
allInitialized = false;
}
}
}
}
receiver->subscribers.allInitialized = allInitialized;
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}