Merge pull request #286 from apache/feature/async_svc_registration
Feature/async svc registration
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 75fd9bd..96238c5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -32,6 +32,8 @@
message( FATAL_ERROR "Building Celix using CMake 3.3 and makefiles is not supported due to a bug in the Makefile Generator (see Bug 15696). Please change the used CMake version - both, CMake 3.2 and CMake 3.4 are working fine. Or use a different generator (e.g. Ninja)." )
ENDIF()
+# Options
+option(ENABLE_TESTING "Enables unit/bundle testing" FALSE)
if (ENABLE_TESTING)
find_package(GTest CONFIG QUIET)
if (NOT GTest_FOUND)
@@ -120,8 +122,6 @@
# Default bundle version
set(DEFAULT_VERSION 1.0.0)
-# Options
-option(ENABLE_TESTING "Enables unit/bundle testing" FALSE)
if (ENABLE_TESTING)
enable_testing()
endif()
diff --git a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
index a3bedf4..a19b425 100644
--- a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
@@ -28,6 +28,7 @@
src/pubsub_tcp_topic_sender.c
src/pubsub_tcp_topic_receiver.c
src/pubsub_tcp_handler.c
+ src/pubsub_tcp_common.c
)
set_target_properties(celix_pubsub_admin_tcp PROPERTIES INSTALL_RPATH "$ORIGIN")
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
index a5ae576..d93c478 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
@@ -121,9 +121,7 @@
celix_properties_t *props = celix_properties_create();
celix_properties_set(props, CELIX_SHELL_COMMAND_NAME, "celix::psa_tcp");
celix_properties_set(props, CELIX_SHELL_COMMAND_USAGE, "psa_tcp");
- celix_properties_set(props,
- CELIX_SHELL_COMMAND_DESCRIPTION,
- "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
+ celix_properties_set(props, CELIX_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, CELIX_SHELL_COMMAND_SERVICE_NAME, props);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 4284042..d493eaa 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -23,7 +23,7 @@
#define PSA_TCP_BASE_PORT "PSA_TCP_BASE_PORT"
#define PSA_TCP_MAX_PORT "PSA_TCP_MAX_PORT"
-#define PSA_TCP_MAX_RECV_SESSIONS "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE "PSA_TCP_MAX_MESSAGE_SIZE"
#define PSA_TCP_RECV_BUFFER_SIZE "PSA_TCP_RECV_BUFFER_SIZE"
#define PSA_TCP_TIMEOUT "PSA_TCP_TIMEOUT"
#define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
@@ -31,8 +31,7 @@
#define PSA_TCP_DEFAULT_BASE_PORT 5501
#define PSA_TCP_DEFAULT_MAX_PORT 6000
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS 1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE UINT32_MAX
#define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE 65 * 1024
#define PSA_TCP_DEFAULT_TIMEOUT 2000 // 2 seconds
#define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
@@ -83,7 +82,7 @@
* Name of environment variable with ip/url to bind to
* e.g. PSA_TCP_STATIC_BIND_FOR_topic_scope="tcp://0.0.0.0:4444"
*/
-#define PUBSUB_TCP_STATIC_BIND_URL_FOR "PSA_TCP_STATIC_BIND_URL_FOR_"
+#define PUBSUB_TCP_STATIC_BIND_URL_FOR "PSA_TCP_STATIC_BIND_URL_FOR_"
/**
* Can be set in the topic properties to fix a static url used for discovery
@@ -102,21 +101,33 @@
*/
#define PUBSUB_TCP_STATIC_CONNECT_URLS "tcp.static.connect.urls"
+
+/**
+ * Defines whether the publisher / subscriber is a passive endpoint that shares
+ * the connection of the publisher / subscriber endpoint with the matching (passive) key,
+ * e.g. tcp.passive.configured="true" means that a publisher / subscriber is passive
+ * when a publisher / subscriber is found with a matching key (for example tcp.passive.key="localhost").
+ * This creates a full-duplex connection using a single socket.
+ */
+#define PUBSUB_TCP_PASSIVE_CONFIGURED "tcp.passive.configured"
+#define PUBSUB_TCP_PASSIVE_KEY "tcp.passive.key"
+
+/**
+ * Name of environment variable to indicate that passive endpoint is configured
+ * e.g. PSA_TCP_PASSIVE_CONFIGURED_topic_scope="true"
+ */
+#define PUBSUB_TCP_PASSIVE_ENABLED "PSA_TCP_PASSIVE_CONFIGURED_"
+/**
+ * Name of environment variable to configure the passive key (see PUBSUB_TCP_PASSIVE_KEY )
+ * e.g. PSA_TCP_PASSIVE_KEY_topic_scope="tcp://localhost:4444"
+ */
+#define PUBSUB_TCP_PASSIVE_SELECTION_KEY "PSA_TCP_PASSIVE_KEY_"
+
/**
* Name of environment variable with space-separated list of ips/urls to connect to
* e.g. PSA_TCP_STATIC_CONNECT_FOR_topic_scope="tcp://127.0.0.1:4444 tcp://127.0.0.2:4444"
*/
-#define PUBSUB_TCP_STATIC_CONNECT_URLS_FOR "PSA_TCP_STATIC_CONNECT_URL_FOR_"
-
-/**
- * The static endpoint type which a static endpoint should be configured.
- * Can be set in the topic properties.
- */
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE "tcp.static.endpoint.type"
-
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER "server"
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT "client"
-
+#define PUBSUB_TCP_STATIC_CONNECT_URLS_FOR "PSA_TCP_STATIC_CONNECT_URL_FOR_"
/**
* Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index 89f5863..ba3bdb8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -18,11 +18,6 @@
*/
#include <memory.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netdb.h>
-#include <ifaddrs.h>
#include <pubsub_endpoint.h>
#include <pubsub_serializer.h>
#include <ip_utils.h>
@@ -51,7 +46,6 @@
char *ipAddress;
unsigned int basePort;
- unsigned int maxPort;
double qosSampleScore;
double qosControlScore;
@@ -119,9 +113,7 @@
psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_TCP_VERBOSE_KEY, PUBSUB_TCP_VERBOSE_DEFAULT);
psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
long basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_BASE_PORT, PSA_TCP_DEFAULT_BASE_PORT);
- long maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_PORT, PSA_TCP_DEFAULT_MAX_PORT);
psa->basePort = (unsigned int) basePort;
- psa->maxPort = (unsigned int) maxPort;
psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_DEFAULT_SCORE_KEY, PSA_TCP_DEFAULT_SCORE);
psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_SAMPLE_SCORE_KEY,
PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE);
@@ -434,17 +426,19 @@
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *serType = serEntry->serType;
const char *protType = protEntry->protType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, protType, NULL);
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
- celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
-
+ if (pubsub_tcpTopicSender_isPassive(sender)) {
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+ } else {
+ celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
+ }
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL)
- celix_properties_set(newEndpoint, "container_name", cn);
+ celix_properties_set(newEndpoint, "container_name", cn);
hashMap_put(psa->topicSenders.map, key, sender);
} else {
L_ERROR("[PSA TCP] Error creating a TopicSender");
@@ -458,10 +452,6 @@
celixThreadMutex_unlock(&psa->protocols.mutex);
celixThreadMutex_unlock(&psa->serializers.mutex);
- if (sender != NULL && newEndpoint != NULL) {
- //TODO connect endpoints to sender, NOTE is this needed for a tcp topic sender?
- }
-
if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
*outPublisherEndpoint = newEndpoint;
}
@@ -483,7 +473,6 @@
char *mapKey = hashMapEntry_getKey(entry);
pubsub_tcp_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
free(mapKey);
- //TODO disconnect endpoints to sender. note is this needed for a tcp topic sender?
pubsub_tcpTopicSender_destroy(sender);
} else {
L_ERROR("[PSA TCP] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists",
@@ -636,24 +625,13 @@
//note can be called with discoveredEndpoint.mutex lock
celix_status_t status = CELIX_SUCCESS;
- const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
- const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
-
- const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
- const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
if (url == NULL) {
L_WARN("[PSA TCP] Error got endpoint without tcp url");
status = CELIX_BUNDLE_EXCEPTION;
} else {
- if (eTopic != NULL && topic != NULL && strncmp(eTopic, topic, 1024 * 1024) == 0) {
- if (scope == NULL && eScope == NULL) {
- pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
- } else if (scope != NULL && eScope != NULL && strncmp(eScope, scope, 1024 * 1024) == 0) {
- pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
- }
- }
+ pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
}
return status;
@@ -689,6 +667,21 @@
FILE *errStream __attribute__((unused))) {
pubsub_tcp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
+ char *line = celix_utils_strdup(commandLine);
+ char *token = line;
+ strtok_r(line, " ", &token); //first token is command name
+ strtok_r(NULL, " ", &token); //second token is sub command
+
+ if (celix_utils_stringEquals(token, "nr_of_receivers")) {
+ celixThreadMutex_lock(&psa->topicReceivers.mutex);
+ fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map));
+ celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+ }
+ if (celix_utils_stringEquals(token, "nr_of_senders")) {
+ celixThreadMutex_lock(&psa->topicSenders.mutex);
+ fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map));
+ celixThreadMutex_unlock(&psa->topicSenders.mutex);
+ }
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
@@ -707,11 +700,12 @@
const char *scope = pubsub_tcpTopicSender_scope(sender);
const char *topic = pubsub_tcpTopicSender_topic(sender);
const char *url = pubsub_tcpTopicSender_url(sender);
+ const char *isPassive = pubsub_tcpTopicSender_isPassive(sender) ? " (passive)" : "";
const char *postUrl = pubsub_tcpTopicSender_isStatic(sender) ? " (static)" : "";
fprintf(out, "|- Topic Sender %s/%s\n", scope == NULL ? "(null)" : scope, topic);
fprintf(out, " |- serializer type = %s\n", serType);
fprintf(out, " |- protocol type = %s\n", protType);
- fprintf(out, " |- url = %s%s\n", url, postUrl);
+ fprintf(out, " |- url = %s%s%s\n", url, postUrl, isPassive);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
celixThreadMutex_unlock(&psa->protocols.mutex);
@@ -788,4 +782,4 @@
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
return result;
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
new file mode 100644
index 0000000..d8f05f7
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <stdio.h>
+#include <string.h>
+#include "pubsub_tcp_common.h"
+
+
+/*
+ * Interprets a textual boolean setting. Yields true only when buffer is non-NULL and the
+ * string returned by utils_stringTrim starts with "true" (compared case-insensitively);
+ * every other input, including NULL, yields false. At most 31 characters are inspected.
+ */
+bool psa_tcp_isPassive(const char* buffer) {
+    if (buffer == NULL) {
+        return false;
+    }
+    static const char truthy[] = "true";
+    char scratch[32];
+    snprintf(scratch, sizeof(scratch), "%s", buffer);
+    const char *candidate = utils_stringTrim(scratch);
+    return strncasecmp(truthy, candidate, sizeof(truthy) - 1) == 0;
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
index 1fe1d00..9ea31db 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
@@ -28,4 +28,6 @@
hash_map_t *map;
} pubsub_tcp_endPointStore_t;
+bool psa_tcp_isPassive(const char* buffer);
+
#endif //CELIX_PUBSUB_TCP_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 5ab1ea6..262940b 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -39,10 +39,6 @@
#include <sys/epoll.h>
#endif
#include <limits.h>
-#include <assert.h>
-#include "ctype.h"
-#include <netdb.h>
-#include <signal.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
@@ -80,18 +76,26 @@
bool connected;
bool headerError;
pubsub_protocol_message_t header;
- unsigned int syncSize;
- unsigned int headerSize;
- unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
- void *headerBuffer;
- unsigned int footerSize;
- void *footerBuffer;
- unsigned int bufferSize;
+ size_t maxMsgSize;
+ size_t readHeaderSize;
+ size_t readHeaderBufferSize; // Size of headerBuffer
+ void *readHeaderBuffer;
+ size_t writeHeaderBufferSize; // Size of headerBuffer
+ void *writeHeaderBuffer;
+ size_t readFooterSize;
+ size_t readFooterBufferSize;
+ void *readFooterBuffer;
+ size_t writeFooterBufferSize;
+ void *writeFooterBuffer;
+ size_t bufferSize;
void *buffer;
- unsigned int bufferReadSize;
- unsigned int metaBufferSize;
- void *metaBuffer;
+ size_t readMetaBufferSize;
+ void *readMetaBuffer;
+ size_t writeMetaBufferSize;
+ void *writeMetaBuffer;
unsigned int retryCount;
+ celix_thread_mutex_t writeMutex;
+ struct msghdr readMsg;
} psa_tcp_connection_entry_t;
//
@@ -116,40 +120,33 @@
celix_log_helper_t *logHelper;
pubsub_protocol_service_t *protocol;
unsigned int bufferSize;
- unsigned int maxNofBuffer;
+ unsigned int maxMsgSize;
unsigned int maxSendRetryCount;
unsigned int maxRcvRetryCount;
double sendTimeout;
double rcvTimeout;
celix_thread_t thread;
bool running;
+ bool enableReceiveEvent;
};
-static inline int
-pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
-
+static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
-
-static inline psa_tcp_connection_entry_t *
-pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url,
- struct sockaddr_in *addr);
-
+static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *interface_url, struct sockaddr_in *addr);
static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry);
-
static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
-
-static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
-
+static inline long int pubsub_tcpHandler_getMsgSize(psa_tcp_connection_entry_t *entry);
+static inline void pubsub_tcpHandler_ensureReadBufferCapacity(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
+static inline bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry, long int* msgSize);
static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
+static inline long int pubsub_tcpHandler_readPayload(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry);
static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
-
static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
-
static void *pubsub_tcpHandler_thread(void *data);
+
+
//
// Create a handle
//
@@ -169,7 +166,6 @@
handle->logHelper = logHelper;
handle->protocol = protocol;
handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
- handle->maxNofBuffer = 1; // Reserved for future Use;
celixThreadRwlock_create(&handle->dbLock, 0);
handle->running = true;
celixThread_create(&handle->thread, NULL, pubsub_tcpHandler_thread, handle);
@@ -263,7 +259,7 @@
L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno));
}
}
- struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+ struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
if (addr) {
rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
if (rc != 0) {
@@ -313,33 +309,45 @@
if (fd >= 0) {
entry = calloc(sizeof(psa_tcp_connection_entry_t), 1);
entry->fd = fd;
- if (url)
+ celixThreadMutex_create(&entry->writeMutex, NULL);
+ if (url) {
entry->url = strndup(url, 1024 * 1024);
+ }
if (interface_url) {
entry->interface_url = strndup(interface_url, 1024 * 1024);
} else {
- if (url)
+ if (url) {
entry->interface_url = strndup(url, 1024 * 1024);
+ }
}
- if (addr)
+ if (addr) {
entry->addr = *addr;
- entry->len = sizeof(struct sockaddr_in);
- size_t size = 0;
- handle->protocol->getHeaderSize(handle->protocol->handle, &size);
- entry->headerSize = size;
- handle->protocol->getHeaderBufferSize(handle->protocol->handle, &size);
- entry->headerBufferSize = size;
- handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
- entry->syncSize = size;
- handle->protocol->getFooterSize(handle->protocol->handle, &size);
- entry->footerSize = size;
- entry->bufferSize = handle->bufferSize;
- entry->connected = false;
- if (entry->headerBufferSize) {
- entry->headerBuffer = calloc(sizeof(char), entry->headerSize);
}
- if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+ entry->len = sizeof(struct sockaddr_in);
+ size_t headerSize = 0;
+ size_t footerSize = 0;
+ handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize);
+ handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
+ entry->readHeaderBufferSize = headerSize;
+ entry->writeHeaderBufferSize = headerSize;
+
+ entry->readFooterBufferSize = footerSize;
+ entry->writeFooterBufferSize = footerSize;
+ entry->bufferSize = MAX(handle->bufferSize, headerSize);
+ entry->connected = false;
+ unsigned minimalMsgSize = entry->writeHeaderBufferSize + entry->writeFooterBufferSize;
+ if ((minimalMsgSize > handle->maxMsgSize) && (handle->maxMsgSize)) {
+ L_ERROR("[TCP Socket] maxMsgSize (%d) < headerSize + FooterSize (%d): %s\n", handle->maxMsgSize, minimalMsgSize);
+ } else {
+ entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : LONG_MAX;
+ }
+ entry->readHeaderBuffer = calloc(sizeof(char), headerSize);
+ entry->writeHeaderBuffer = calloc(sizeof(char), headerSize);
+ if (entry->readFooterBufferSize ) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterBufferSize );
+ if (entry->writeFooterBufferSize) entry->writeFooterBuffer = calloc(sizeof(char), entry->writeFooterBufferSize);
if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
+ memset(&entry->readMsg, 0x00, sizeof(struct msghdr));
+ entry->readMsg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX);
}
return entry;
}
@@ -350,40 +358,18 @@
static inline void
pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
if (entry) {
- if (entry->url) {
- free(entry->url);
- entry->url = NULL;
- }
- if (entry->interface_url) {
- free(entry->interface_url);
- entry->interface_url = NULL;
- }
- if (entry->fd >= 0) {
- close(entry->fd);
- entry->fd = -1;
- }
- if (entry->buffer) {
- free(entry->buffer);
- entry->buffer = NULL;
- entry->bufferSize = 0;
- }
- if (entry->headerBuffer) {
- free(entry->headerBuffer);
- entry->headerBuffer = NULL;
- entry->headerBufferSize = 0;
- }
-
- if (entry->footerBuffer) {
- free(entry->footerBuffer);
- entry->footerBuffer = NULL;
- }
-
- if (entry->metaBuffer) {
- free(entry->metaBuffer);
- entry->metaBuffer = NULL;
- entry->metaBufferSize = 0;
- }
- entry->connected = false;
+ free(entry->url);
+ free(entry->interface_url);
+ if (entry->fd >= 0) close(entry->fd);
+ free(entry->buffer);
+ free(entry->readHeaderBuffer);
+ free(entry->writeHeaderBuffer);
+ free(entry->readFooterBuffer);
+ free(entry->writeFooterBuffer);
+ free(entry->readMetaBuffer);
+ free(entry->writeMetaBuffer);
+ free(entry->readMsg.msg_iov);
+ celixThreadMutex_destroy(&entry->writeMutex);
free(entry);
}
}
@@ -405,8 +391,7 @@
//
int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
int rc = 0;
- psa_tcp_connection_entry_t *entry =
- hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
+ psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
if (entry == NULL) {
pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
int fd = pubsub_tcpHandler_open(handle, url_info->interface_url);
@@ -416,12 +401,11 @@
socklen_t len = sizeof(sin);
getsockname(fd, (struct sockaddr *) &sin, &len);
char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
- struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+ struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
if ((rc >= 0) && addr) {
rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
if (rc < 0 && errno != EINPROGRESS) {
- L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->portnr, interface_url,
- strerror(errno));
+ L_ERROR("[TCP Socket] Cannot connect to %s:%d: using %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno, strerror(errno));
close(fd);
} else {
entry = pubsub_tcpHandler_createEntry(handle, fd, url, interface_url, &sin);
@@ -454,7 +438,7 @@
hashMap_put(handle->connection_fd_map, (void *) (intptr_t) entry->fd, entry);
celixThreadRwlock_unlock(&handle->dbLock);
pubsub_tcpHandler_connectionHandler(handle, fd);
- L_INFO("[TCP Socket] Connect to %s using; %s\n", entry->url, entry->interface_url);
+ L_INFO("[TCP Socket] Connect to %s using: %s\n", entry->url, entry->interface_url);
}
pubsub_utils_url_free(url_info);
}
@@ -554,7 +538,7 @@
else {
rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
if (rc < 0) {
- L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
+ L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno));
}
}
return rc;
@@ -572,6 +556,7 @@
if (entry == NULL) {
char protocol[] = "tcp";
int fd = pubsub_tcpHandler_open(handle, url);
+ rc = fd;
struct sockaddr_in *sin = pubsub_utils_url_from_fd(fd);
// Make handler fd entry
char *pUrl = pubsub_utils_url_get_url(sin, protocol);
@@ -581,7 +566,6 @@
free(pUrl);
free(sin);
celixThreadRwlock_writeLock(&handle->dbLock);
- rc = fd;
if (rc >= 0) {
rc = listen(fd, SOMAXCONN);
if (rc != 0) {
@@ -630,16 +614,26 @@
}
//
-// Setup buffer sizes
+// Setup default receive buffer size.
+// This size is used to allocate the initial read buffer, to avoid reallocating the receive buffer.
+// The default receive buffer is allocated in createEntry when the connection is established.
//
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
- unsigned int maxNofBuffers
- __attribute__((__unused__)),
- unsigned int bufferSize) {
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) {
if (handle != NULL) {
celixThreadRwlock_writeLock(&handle->dbLock);
- handle->bufferSize = bufferSize;
- handle->maxNofBuffer = maxNofBuffers;
+ handle->bufferSize = size;
+ celixThreadRwlock_unlock(&handle->dbLock);
+ }
+ return 0;
+}
+
+//
+// Set Maximum message size
+//
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->maxMsgSize = size;
celixThreadRwlock_unlock(&handle->dbLock);
}
return 0;
@@ -710,7 +704,7 @@
if (prio > 0 && prio < 100) {
struct sched_param sch;
bzero(&sch, sizeof(struct sched_param));
- sch.sched_priority = prio;
+ sch.sched_priority = (int)prio;
pthread_setschedparam(handle->thread.thread, policy, &sch);
} else {
L_INFO("Skipping configuration of thread prio to %i and thread "
@@ -754,35 +748,107 @@
}
}
-static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
- int expectedReadSize = size;
- int nbytes = size;
- int msgSize = 0;
- char* buffer = (char*)_buffer;
- while (nbytes > 0 && expectedReadSize > 0) {
- // Read the message header
- nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
- // Update buffer administration
- offset += nbytes;
- expectedReadSize -= nbytes;
- msgSize += nbytes;
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle,bool enable) {
+ if (handle != NULL) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ handle->enableReceiveEvent = enable;
+ celixThreadRwlock_unlock(&handle->dbLock);
}
- if (nbytes <=0) msgSize = nbytes;
- return msgSize;
}
+// Size of the message remainder (payload part + metadata + footer); the header is already read.
+static inline long int pubsub_tcpHandler_getMsgSize(psa_tcp_connection_entry_t *entry) {
+    return (long int)entry->readFooterSize + (long int)entry->header.header.metadataSize + (long int)entry->header.header.payloadPartSize;
+}
+
+/*
+ * Reads the protocol header of the next message from fd and decodes it into entry->header.
+ * When decoding fails, syncSize bytes are consumed from the socket and entry->headerError is set,
+ * so following calls peek at the stream until a header decodes again.
+ * msgSize (may be NULL) receives the result of the last recvmsg call, or 0 when none was made.
+ * Returns true when a header was decoded successfully.
+ */
+static inline
+bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry, long int* msgSize) {
+    bool result = false;
+    size_t syncSize = 0;
+    size_t protocolHeaderBufferSize = 0;
+    handle->protocol->getSyncHeaderSize(handle->protocol->handle, &syncSize);
+    handle->protocol->getHeaderSize(handle->protocol->handle, &entry->readHeaderSize);
+    // When headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
+    handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+
+    // Ensure capacity in header buffer
+    pubsub_tcpHandler_ensureReadBufferCapacity(handle, entry);
+
+    entry->readMsg.msg_iovlen = 1;
+    entry->readMsg.msg_iov[0].iov_base = entry->readHeaderBuffer;
+    entry->readMsg.msg_iov[0].iov_len = entry->readHeaderBufferSize;
+    long int nbytes = 0;
+    // Use peek flag to find sync word or when header is part of the payload
+    unsigned int flag = (entry->headerError || (!protocolHeaderBufferSize)) ? MSG_PEEK : 0;
+    if (entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL | flag);
+    // Rule out a negative (error) result before comparing as size_t: a bare
+    // `nbytes >= readHeaderSize` converts -1 to a huge unsigned value and would wrongly pass.
+    if (nbytes >= 0 && (size_t) nbytes >= entry->readHeaderSize) {
+        if (handle->protocol->decodeHeader(handle->protocol->handle, entry->readMsg.msg_iov[0].iov_base,
+                                           entry->readMsg.msg_iov[0].iov_len, &entry->header) == CELIX_SUCCESS) {
+            // read header from queue, when recovered from headerError and when header is not part of the payload. (Because of MSG_PEEK)
+            if (entry->headerError && protocolHeaderBufferSize && entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL);
+            entry->headerError = false;
+            result = true;
+        } else {
+            // Did not receive correct header: skip sync word and try to read next header
+            if (!entry->headerError) {
+                L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+            }
+            entry->headerError = true;
+            entry->readMsg.msg_iovlen = 1;
+            entry->readMsg.msg_iov[0].iov_len = syncSize;
+            // remove sync item from the queue
+            if (syncSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL);
+        }
+    }
+    if (msgSize) *msgSize = nbytes;
+    return result;
+}
+
+
+// Grows the read-side header, payload, metadata and footer buffers of entry whenever the sizes
+// recorded in entry (readHeaderSize, header payloadSize/metadataSize, readFooterSize) exceed the
+// current capacity. Old contents are discarded, buffers never shrink, malloc is not checked.
+static inline void pubsub_tcpHandler_ensureReadBufferCapacity(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
+    if (entry->readHeaderBufferSize < entry->readHeaderSize) {
+        free(entry->readHeaderBuffer);
+        entry->readHeaderBufferSize = entry->readHeaderSize;
+        entry->readHeaderBuffer = malloc(entry->readHeaderSize);
+    }
+    if (entry->bufferSize < entry->header.header.payloadSize) {
+        free(entry->buffer);
+        entry->bufferSize = entry->header.header.payloadSize;
+        entry->buffer = malloc((size_t) entry->header.header.payloadSize);
+    }
+    if (entry->readMetaBufferSize < entry->header.header.metadataSize) {
+        free(entry->readMetaBuffer);
+        entry->readMetaBufferSize = entry->header.header.metadataSize;
+        entry->readMetaBuffer = malloc((size_t) entry->header.header.metadataSize);
+    }
+    if (entry->readFooterBufferSize < entry->readFooterSize) {
+        free(entry->readFooterBuffer);
+        entry->readFooterBufferSize = entry->readFooterSize;
+        entry->readFooterBuffer = malloc(entry->readFooterSize);
+    }
+}
static inline
void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
if (entry->header.header.payloadSize > 0) {
- handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
+ handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
}
if (entry->header.header.metadataSize > 0) {
- handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
- entry->header.header.metadataSize, &entry->header);
- entry->metaBufferSize = entry->header.header.metadataSize;
+ handle->protocol->decodeMetadata(handle->protocol->handle, entry->readMetaBuffer,
+ entry->header.header.metadataSize, &entry->header);
}
if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) {
struct timespec receiveTime;
@@ -793,16 +859,65 @@
}
}
+static inline
+long int pubsub_tcpHandler_readPayload(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry) {
+ entry->readMsg.msg_iovlen = 0;
+ handle->protocol->getFooterSize(handle->protocol->handle, &entry->readFooterSize);
+
+ // The header determines how large the buffers must be, so they can be allocated even before all data has been received
+ pubsub_tcpHandler_ensureReadBufferCapacity(handle, entry);
+
+ if (entry->header.header.payloadPartSize) {
+ char* buffer = entry->buffer;
+ entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = &buffer[entry->header.header.payloadOffset];
+ entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->header.header.payloadPartSize;
+ entry->readMsg.msg_iovlen++;
+ }
+ if (entry->header.header.metadataSize) {
+ entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readMetaBuffer;
+ entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->header.header.metadataSize;
+ entry->readMsg.msg_iovlen++;
+ }
+
+ if (entry->readFooterSize) {
+ entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readFooterBuffer;
+ entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->readFooterSize;
+ entry->readMsg.msg_iovlen++;
+ }
+
+ long int nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL);
+ if (nbytes >= pubsub_tcpHandler_getMsgSize(entry)) {
+ bool valid = true;
+ if (entry->readFooterSize) {
+ if (handle->protocol->decodeFooter(handle->protocol->handle, entry->readFooterBuffer, entry->readFooterBufferSize, &entry->header) != CELIX_SUCCESS) {
+ // Did not receive correct footer
+ L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+ valid = false;
+ }
+ }
+ if (!entry->header.header.isLastSegment) {
+ // Not last Segment of message
+ valid = false;
+ }
+
+ if (valid) {
+ // Complete message is received
+ pubsub_tcpHandler_decodePayload(handle, entry);
+ }
+ }
+ return nbytes;
+}
//
// Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
// If the message is completely reassembled true is returned and the index and size have valid values
//
int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
- celixThreadRwlock_writeLock(&handle->dbLock);
+ celixThreadRwlock_readLock(&handle->dbLock);
psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
- if (entry == NULL)
+ if (entry == NULL) {
entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
+ }
// Find FD entry
if (entry == NULL) {
celixThreadRwlock_unlock(&handle->dbLock);
@@ -813,114 +928,22 @@
celixThreadRwlock_unlock(&handle->dbLock);
return -1;
}
-
- // Message buffer is to small, reallocate to make it bigger
- if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
- handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
- if (entry->buffer) free(entry->buffer);
- entry->buffer = malloc((size_t) handle->bufferSize);
- entry->bufferSize = handle->bufferSize;
- }
- // Read the message
- bool validMsg = false;
- char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
- int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
- if (nbytes > 0) {
- // Check header message buffer
- if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
- // Did not receive correct header
- // skip sync word and try to read next header
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
- if (!entry->headerError) {
- L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
- }
- entry->headerError = true;
- entry->bufferReadSize = 0;
- } else {
- // Read header message from queue
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
- if ((nbytes > 0) && (nbytes == entry->headerSize)) {
- entry->headerError = false;
- // For headerless message, add header to bufferReadSize;
- if (!entry->headerBufferSize)
- entry->bufferReadSize += nbytes;
- // Alloc message buffers
- if (entry->header.header.payloadSize > entry->bufferSize) {
- handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
- if (entry->buffer)
- free(entry->buffer);
- entry->buffer = malloc((size_t) handle->bufferSize);
- entry->bufferSize = handle->bufferSize;
- }
- if (entry->header.header.metadataSize > entry->metaBufferSize) {
- if (entry->metaBuffer) {
- free(entry->metaBuffer);
- }
- entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
- entry->metaBufferSize = entry->header.header.metadataSize;
- L_WARN("[TCP Socket] socket: %d, url: %s, realloc read meta buffer: (%d, %d) \n", entry->fd,
- entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
- }
-
- if (entry->header.header.payloadSize) {
- unsigned int offset = entry->header.header.payloadOffset;
- unsigned int size = entry->header.header.payloadPartSize;
- // For header less messages adjust offset and msg size;
- if (!entry->headerBufferSize) {
- offset = entry->headerSize;
- size -= offset;
- }
- // Read payload data from queue
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
- if (nbytes > 0) {
- if (nbytes == size) {
- entry->bufferReadSize += nbytes;
- } else {
- entry->bufferReadSize = 0;
- L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
- }
- }
- }
- if (nbytes > 0 && entry->header.header.metadataSize) {
- // Read meta data from queue
- unsigned int size = entry->header.header.metadataSize;
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
- if ((nbytes > 0) && (nbytes != size)) {
- L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
- }
- }
- // Check for end of message using, footer of message. Because of streaming protocol
- if (nbytes > 0) {
- if (entry->footerSize > 0) {
- nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0);
- if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
- // valid footer, this means that the message is valid
- validMsg = true;
- } else {
- // Did not receive correct header
- L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
- entry->bufferReadSize = 0;
- }
- } else {
- // No Footer, then complete message is received
- validMsg = true;
- }
- }
- }
- }
+ long int nbytes = 0;
+ // The header cannot be read until enough bytes have been received
+ if (pubsub_tcpHandler_readHeader(handle, fd, entry, &nbytes)) {
+ nbytes = pubsub_tcpHandler_readPayload(handle, fd, entry);
}
if (nbytes > 0) {
entry->retryCount = 0;
- // Check if complete message is received
- if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) {
- entry->bufferReadSize = 0;
- pubsub_tcpHandler_decodePayload(handle, entry);
- }
- } else {
- if (entry->retryCount < handle->maxRcvRetryCount) {
+ } else if (nbytes < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ // Non blocking interrupt
+ entry->retryCount = 0;
+ } else if (entry->retryCount < handle->maxRcvRetryCount) {
entry->retryCount++;
- L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd,
- strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+ L_WARN(
+ "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.",
+ entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
} else {
L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
entry->fd, handle->maxRcvRetryCount, strerror(errno));
@@ -928,10 +951,9 @@
}
}
celixThreadRwlock_unlock(&handle->dbLock);
- return nbytes;
+ return (int)nbytes;
}
-
int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload,
pubsub_tcpHandler_processMessage_callback_t processMessageCallback) {
int result = 0;
@@ -954,70 +976,32 @@
return result;
}
-int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, void *payload,
- pubsub_tcpHandler_acceptConnectMessage_callback_t connectMessageCallback,
- pubsub_tcpHandler_acceptConnectMessage_callback_t disconnectMessageCallback) {
- int result = 0;
- celixThreadRwlock_writeLock(&handle->dbLock);
- handle->acceptConnectMessageCallback = connectMessageCallback;
- handle->acceptDisconnectMessageCallback = disconnectMessageCallback;
- handle->acceptConnectPayload = payload;
- celixThreadRwlock_unlock(&handle->dbLock);
- return result;
-}
-
-static inline
-int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) {
- int nbytes = 0;
- int msgSize = 0;
- if (entry->fd >= 0 && size && msg->msg_iovlen) {
- int expectedReadSize = size;
- unsigned int offset = 0;
- nbytes = size;
- while (nbytes > 0 && expectedReadSize > 0) {
- // Read the message header
- nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
- // Update admin
- expectedReadSize -= nbytes;
- msgSize += nbytes;
- // Not all written
- if (expectedReadSize && nbytes > 0) {
- unsigned int readSize = 0;
- unsigned int readIndex = 0;
- unsigned int i = 0;
- for (i = 0; i < msg->msg_iovlen; i++) {
- if (nbytes < msg->msg_iov[i].iov_len) {
- readIndex = i;
- break;
- }
- readSize+= msg->msg_iov[i].iov_len;
- }
- msg->msg_iov = &msg->msg_iov[readIndex];
- msg->msg_iovlen -= readIndex;
- char* buffer = (char*)msg->msg_iov->iov_base;
- offset = nbytes - readSize;
- msg->msg_iov->iov_base = &buffer[offset];
- msg->msg_iov->iov_len = msg->msg_iov->iov_len - offset;
- }
- }
- }
- if (nbytes <=0) msgSize = nbytes;
- return msgSize;
-}
//
// Write large data to TCP. .
//
int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec,
size_t msg_iov_len, int flags) {
- celixThreadRwlock_readLock(&handle->dbLock);
int result = 0;
+ if (handle == NULL) {
+ return -1;
+ }
int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];
int nofConnToClose = 0;
if (handle) {
+ celixThreadRwlock_readLock(&handle->dbLock);
hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
+ size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (!entry->connected) continue;
+ if (!entry->connected) {
+ continue;
+ }
+ // When maxMsgSize is zero then payloadSize is disabled
+ if (entry->maxMsgSize == 0) {
+ // if max msg size is set to zero nothing will be sent
+ continue;
+ }
+ celixThreadMutex_lock(&entry->writeMutex);
void *payloadData = NULL;
size_t payloadSize = 0;
if (msg_iov_len == 1) {
@@ -1027,6 +1011,16 @@
payloadSize += msgIoVec[i].iov_len;
}
}
+
+ // check if message is not too large
+ bool isMessageSegmentationSupported = false;
+ handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, &isMessageSegmentationSupported);
+ if (!isMessageSegmentationSupported && (msg_iov_len > max_msg_iov_len || payloadSize > entry->maxMsgSize)) {
+ L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n", entry->fd);
+ celixThreadMutex_unlock(&entry->writeMutex);
+ continue;
+ }
+
message->header.convertEndianess = 0;
message->header.payloadSize = payloadSize;
message->header.payloadPartSize = payloadSize;
@@ -1036,130 +1030,176 @@
void *metadataData = NULL;
size_t metadataSize = 0;
if (message->metadata.metadata) {
- metadataData = entry->metaBuffer;
- handle->protocol->encodeMetadata(handle->protocol->handle, message,
- &metadataData,
- &metadataSize);
- entry->metaBufferSize = metadataSize;
+ metadataSize = entry->writeMetaBufferSize;
+ metadataData = entry->writeMetaBuffer;
+ // When maxMsgSize is smaller than the write metadata buffer size, metadata is disabled
+ if (metadataSize > entry->maxMsgSize) {
+ metadataSize = 0;
+ }
+ handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize);
}
+
message->header.metadataSize = metadataSize;
+ size_t totalMsgSize = payloadSize + metadataSize;
- void *footerData = NULL;
- size_t footerDataSize = 0;
- if (entry->footerSize) {
- footerData = entry->footerBuffer;
- handle->protocol->encodeFooter(handle->protocol->handle, message,
- &footerData,
- &footerDataSize);
- entry->footerSize = footerDataSize;
- }
+ size_t sendMsgSize = 0;
+ size_t msgPayloadOffset = 0;
+ size_t msgIovOffset = 0;
+ bool allPayloadAdded = (payloadSize == 0);
+ long int nbytes = LONG_MAX;
+ while (sendMsgSize < totalMsgSize && nbytes > 0) {
+ struct msghdr msg;
+ struct iovec msg_iov[IOV_MAX];
+ memset(&msg, 0x00, sizeof(struct msghdr));
+ msg.msg_name = &entry->addr;
+ msg.msg_namelen = entry->len;
+ msg.msg_flags = flags;
+ msg.msg_iov = msg_iov;
- size_t msgSize = 0;
- struct msghdr msg;
- struct iovec msg_iov[IOV_MAX];
- memset(&msg, 0x00, sizeof(struct msghdr));
- msg.msg_name = &entry->addr;
- msg.msg_namelen = entry->len;
- msg.msg_flags = flags;
- msg.msg_iov = msg_iov;
+ size_t msgPartSize = 0;
+ message->header.payloadPartSize = 0;
+ message->header.payloadOffset = 0;
+ message->header.metadataSize = 0;
+ message->header.isLastSegment = 0;
- // Write generic seralized payload in vector buffer
- if (payloadSize && payloadData) {
- msg.msg_iovlen++;
- msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
- msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
- } else {
- // copy serialized vector into vector buffer
- for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
+ size_t protocolHeaderBufferSize = 0;
+ // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
+ handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+ size_t footerSize = 0;
+ // Get size of the Protocol Footer
+ handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
+ size_t maxMsgSize = entry->maxMsgSize - protocolHeaderBufferSize - footerSize;
+
+ // reserve space for the header if required; the header is added later when the size of the message is known (message can be split in parts)
+ if (protocolHeaderBufferSize) {
msg.msg_iovlen++;
- msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
- msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ }
+ // Write generic serialized payload in vector buffer
+ if (!allPayloadAdded) {
+ if (payloadSize && payloadData && maxMsgSize) {
+ char *buffer = payloadData;
+ msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadOffset];
+ msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadOffset), maxMsgSize);
+ msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
+
+ } else {
+ // copy serialized vector into vector buffer
+ size_t i;
+ for (i = msgIovOffset; i < MIN(msg_iov_len, msgIovOffset + max_msg_iov_len); i++) {
+ if ((msgPartSize + msgIoVec[i].iov_len) > maxMsgSize) {
+ break;
+ }
+ msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
+ msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
+ msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+ msg.msg_iovlen++;
+ }
+ // if no entry could be added
+ if (i == msgIovOffset) {
+ // TODO element can be split in parts?
+ L_ERROR("[TCP Socket] vector io element is larger than max msg size");
+ break;
+ }
+ msgIovOffset = i;
+ }
+ message->header.payloadPartSize = msgPartSize;
+ message->header.payloadOffset = msgPayloadOffset;
+ msgPayloadOffset += message->header.payloadPartSize;
+ sendMsgSize = msgPayloadOffset;
+ allPayloadAdded= msgPayloadOffset >= payloadSize;
+ }
+
+ // Write optional metadata in vector buffer
+ if (allPayloadAdded &&
+ (metadataSize != 0 && metadataData) &&
+ (msgPartSize < maxMsgSize) &&
+ (msg.msg_iovlen-1 < max_msg_iov_len)) { // header is already included
+ msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+ msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
+ msg.msg_iovlen++;
+ msgPartSize += metadataSize;
+ message->header.metadataSize = metadataSize;
+ sendMsgSize += metadataSize;
+ }
+ if (sendMsgSize >= totalMsgSize) {
+ message->header.isLastSegment = 0x1;
+ }
+
+ void *headerData = NULL;
+ size_t headerSize = 0;
+ // Get HeaderSize of the Protocol Header
+ handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize);
+
+ // check if header is not part of the payload (=> headerBufferSize = 0)
+ if (protocolHeaderBufferSize) {
+ headerData = entry->writeHeaderBuffer;
+ // Encode the header, with payload size and metadata size
+ handle->protocol->encodeHeader(handle->protocol->handle, message, &headerData, &headerSize);
+ entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize);
+ if (headerData && entry->writeHeaderBuffer != headerData) {
+ entry->writeHeaderBuffer = headerData;
+ }
+ if (headerSize && headerData) {
+ // Write header in 1st vector buffer item
+ msg.msg_iov[0].iov_base = headerData;
+ msg.msg_iov[0].iov_len = headerSize;
+ msgPartSize += msg.msg_iov[0].iov_len;
+ } else {
+ L_ERROR("[TCP Socket] No header buffer is generated");
+ break;
+ }
+ }
+
+ void *footerData = NULL;
+ // Write optional footerData in vector buffer
+ if (footerSize) {
+ footerData = entry->writeFooterBuffer;
+ handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerSize);
+ if (footerData && entry->writeFooterBuffer != footerData) {
+ entry->writeFooterBuffer = footerData;
+ entry->writeFooterBufferSize = footerSize;
+ }
+ if (footerData) {
+ msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+ msg.msg_iov[msg.msg_iovlen].iov_len = footerSize;
+ msg.msg_iovlen++;
+ msgPartSize += footerSize;
+ }
+ }
+ nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
+
+ // A specific socket that keeps reporting errors can indicate a subscriber
+ // which is not active anymore; the connection will remain until the retry
+ // counter exceeds the maximum retry count.
+ // Note: a SIGSTOP issued by a debugging tool can also result in an EINTR error.
+ if (nbytes == -1) {
+ if (entry->retryCount < handle->maxSendRetryCount) {
+ entry->retryCount++;
+ L_ERROR(
+ "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.",
+ entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno));
+ } else {
+ L_ERROR(
+ "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+ connFdCloseQueue[nofConnToClose++] = entry->fd;
+ }
+ result = -1; //At least one connection failed sending
+ } else if (msgPartSize) {
+ entry->retryCount = 0;
+ if (nbytes != msgPartSize) {
+ L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgPartSize, nbytes, strerror(errno));
+ }
+ }
+ // Note: serialized Payload is deleted by serializer
+ if (payloadData && (payloadData != message->payload.payload)) {
+ free(payloadData);
}
}
-
- // Write optional metadata in vector buffer
- if (metadataSize && metadataData) {
- msg.msg_iovlen++;
- msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
- msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
- }
-
- // Write optional footerData in vector buffer
- if (footerData && footerDataSize) {
- msg.msg_iovlen++;
- msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
- msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
- msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
- }
-
- void *headerData = NULL;
- size_t headerSize = 0;
- // check if header is not part of the payload (=> headerBufferSize = 0)s
- if (entry->headerBufferSize) {
- headerData = entry->headerBuffer;
- // Encode the header, with payload size and metadata size
- handle->protocol->encodeHeader(handle->protocol->handle, message,
- &headerData,
- &headerSize);
- entry->headerBufferSize = headerSize;
- }
- if (!entry->headerBufferSize) {
- // Skip header buffer, when header is part of payload;
- msg.msg_iov = &msg_iov[1];
- } else if (headerSize && headerData) {
- // Write header in 1st vector buffer item
- msg.msg_iov[0].iov_base = headerData;
- msg.msg_iov[0].iov_len = headerSize;
- msgSize += msg.msg_iov[0].iov_len;
- msg.msg_iovlen++;
- } else {
- L_ERROR("[TCP Socket] No header buffer is generated");
- msg.msg_iovlen = 0;
- }
- long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
- // When a specific socket keeps reporting errors can indicate a subscriber
- // which is not active anymore, the connection will remain until the retry
- // counter exceeds the maximum retry count.
- // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
- if (nbytes == -1) {
- if (entry->retryCount < handle->maxSendRetryCount) {
- entry->retryCount++;
- L_ERROR(
- "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
- entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
- } else {
- L_ERROR(
- "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
- entry->fd, handle->maxSendRetryCount, strerror(errno));
- connFdCloseQueue[nofConnToClose++] = entry->fd;
- }
- result = -1; //At least one connection failed sending
- } else if (msgSize) {
- entry->retryCount = 0;
- if (nbytes != msgSize) {
- L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes, strerror(errno));
- }
- }
- // Release data
- if (headerData && headerData != entry->headerBuffer) {
- free(headerData);
- }
- // Note: serialized Payload is deleted by serializer
- if (payloadData && (payloadData != message->payload.payload)) {
- free(payloadData);
- }
- if (metadataData && metadataData != entry->metaBuffer) {
- free(metadataData);
- }
- if (footerData && footerData != entry->footerBuffer) {
- free(footerData);
- }
+ celixThreadMutex_unlock(&entry->writeMutex);
}
+ celixThreadRwlock_unlock(&handle->dbLock);
}
- celixThreadRwlock_unlock(&handle->dbLock);
//Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
for (int i = 0; i < nofConnToClose; i++) {
pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);
@@ -1171,12 +1211,12 @@
// get interface URL
//
char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
- hash_map_iterator_t interface_iter =
+ hash_map_iterator_t iter =
hashMapIterator_construct(handle->interface_url_map);
char *url = NULL;
- while (hashMapIterator_hasNext(&interface_iter)) {
+ while (hashMapIterator_hasNext(&iter)) {
psa_tcp_connection_entry_t *entry =
- hashMapIterator_nextValue(&interface_iter);
+ hashMapIterator_nextValue(&iter);
if (entry && entry->url) {
if (!url) {
url = celix_utils_strdup(entry->url);
@@ -1189,6 +1229,34 @@
}
return url;
}
+//
+// get connection URL(s), space separated
+//
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle) {
+ celixThreadRwlock_writeLock(&handle->dbLock);
+ hash_map_iterator_t iter =
+ hashMapIterator_construct(handle->connection_url_map);
+ char *url = NULL;
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_tcp_connection_entry_t *entry =
+ hashMapIterator_nextValue(&iter);
+ if (entry && entry->url) {
+ if (!url) {
+ pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url);
+ url = celix_utils_strdup(url_info->interface_url ? url_info->interface_url : entry->url);
+ pubsub_utils_url_free(url_info);
+ } else {
+ char *tmp = url;
+ pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url);
+ asprintf(&url, "%s %s", tmp, url_info->interface_url ? url_info->interface_url : entry->url);
+ pubsub_utils_url_free(url_info);
+ free(tmp);
+ }
+ }
+ }
+ celixThreadRwlock_unlock(&handle->dbLock);
+ return url;
+}
//
// Handle non-blocking accept (sender)
@@ -1218,7 +1286,8 @@
#else
struct epoll_event event;
bzero(&event, sizeof(event)); // zero the struct
- event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+ event.events = EPOLLRDHUP | EPOLLERR;
+ if (handle->enableReceiveEvent) event.events |= EPOLLIN;
event.data.fd = entry->fd;
// Register Read to epoll
rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
@@ -1321,7 +1390,7 @@
if (handle->efd >= 0) {
int nof_events = 0;
struct epoll_event events[MAX_EVENTS];
- nof_events = epoll_wait(handle->efd, events, MAX_EVENTS, handle->timeout);
+ nof_events = epoll_wait(handle->efd, events, MAX_EVENTS, (int)handle->timeout);
if (nof_events < 0) {
if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
} else
@@ -1357,7 +1426,6 @@
}
}
}
- return;
}
#endif
@@ -1377,4 +1445,4 @@
celixThreadRwlock_unlock(&handle->dbLock);
} // while
return NULL;
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index ed4581c..2d97634 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -58,15 +58,14 @@
int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url);
int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url);
int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url);
-
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
- unsigned int maxNofBuffers,
- unsigned int bufferSize);
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size);
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size);
void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle, bool enable);
int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
@@ -86,6 +85,7 @@
pubsub_tcpHandler_acceptConnectMessage_callback_t connectMessageCallback,
pubsub_tcpHandler_acceptConnectMessage_callback_t disconnectMessageCallback);
char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle);
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle);
void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio, const char *sched);
void pubsub_tcpHandler_setThreadName(pubsub_tcpHandler_t *handle, const char *topic, const char *scope);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 4fa4586..b1b2c58 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -24,23 +24,19 @@
#include <pubsub/subscriber.h>
#include <memory.h>
#include <pubsub_constants.h>
-#include <assert.h>
-#include <pubsub_endpoint.h>
#include <arpa/inet.h>
#include <celix_log_helper.h>
-#include <math.h>
#include "pubsub_tcp_handler.h"
#include "pubsub_tcp_topic_receiver.h"
#include "pubsub_psa_tcp_constants.h"
#include "pubsub_tcp_common.h"
-#include "celix_utils_api.h"
#include <uuid/uuid.h>
#include <pubsub_admin_metrics.h>
#include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
#include <celix_api.h>
-#define MAX_EPOLL_EVENTS 16
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
#endif
@@ -65,8 +61,10 @@
char *topic;
size_t timeout;
bool metricsEnabled;
+ bool isPassive;
pubsub_tcpHandler_t *socketHandler;
pubsub_tcpHandler_t *sharedSocketHandler;
+ pubsub_interceptors_handler_t *interceptorsHandler;
struct {
celix_thread_t thread;
@@ -107,7 +105,6 @@
double averageDelayInSeconds;
double maxDelayInSeconds;
double minDelayInSeconds;
- unsigned int lastSeqNr;
unsigned long nrOfMissingSeqNumbers;
} psa_tcp_subscriber_metrics_entry_t;
@@ -118,24 +115,14 @@
bool initialized; //true if the init function is called through the receive thread
} psa_tcp_subscriber_entry_t;
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
- const celix_bundle_t *owner);
-
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
- const celix_bundle_t *owner);
-
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
static void *psa_tcp_recvThread(void *data);
-
static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
-
static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
-
static void processMsg(void *handle, const pubsub_protocol_message_t *hdr, bool *release, struct timespec *receiveTime);
-
static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
-
static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-
static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -143,7 +130,7 @@
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *endPointStore,
+ pubsub_tcp_endPointStore_t *handlerStore,
long serializerSvcId,
pubsub_serializer_service_t *serializer,
long protocolSvcId,
@@ -157,52 +144,45 @@
receiver->protocol = protocol;
receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
- bool isServerEndPoint = false;
+ pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+ const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
+ const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+ const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
- /* Check if it's a static endpoint */
- const char *staticClientEndPointUrls = NULL;
- const char *staticServerEndPointUrls = NULL;
- const char *staticConnectUrls = NULL;
-
- staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
-
+ if (isPassive) {
+ receiver->isPassive = psa_tcp_isPassive(isPassive);
+ }
if (topicProperties != NULL) {
if(staticConnectUrls == NULL) {
staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
}
- const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
- if (endPointType != NULL) {
- if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
- strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
- staticClientEndPointUrls = staticConnectUrls;
- }
- if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
- strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
- staticServerEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL);
- isServerEndPoint = true;
- }
+ if (isPassive == NULL) {
+ receiver->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, false);
+ }
+ if (passiveKey == NULL) {
+ passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
}
}
+
// Set receiver connection thread timeout.
// property is in ms, timeout value in us. (convert ms to us).
receiver->timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT,
PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT) * 1000;
/* When it's an endpoint share the socket with the sender */
- if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
- celixThreadMutex_lock(&endPointStore->mutex);
- const char *endPointUrl = (staticServerEndPointUrls) ? staticServerEndPointUrls : staticClientEndPointUrls;
- pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+ if (passiveKey != NULL) {
+ celixThreadMutex_lock(&handlerStore->mutex);
+ pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
if (entry == NULL) {
if (receiver->socketHandler == NULL)
receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
entry = receiver->socketHandler;
receiver->sharedSocketHandler = receiver->socketHandler;
- hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+ hashMap_put(handlerStore->map, (void *) passiveKey, entry);
} else {
receiver->socketHandler = entry;
receiver->sharedSocketHandler = entry;
}
- celixThreadMutex_unlock(&endPointStore->mutex);
+ celixThreadMutex_unlock(&handlerStore->mutex);
} else {
receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
}
@@ -213,14 +193,12 @@
long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
- long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS,
- PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
- long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
+ long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+
pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
- pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions,
- (unsigned int) buffer_size);
+ pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,
@@ -230,8 +208,7 @@
pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
}
receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
- PSA_TCP_DEFAULT_METRICS_ENABLED);
-
+ PSA_TCP_DEFAULT_METRICS_ENABLED);
celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
celixThreadMutex_create(&receiver->thread.mutex, NULL);
@@ -239,7 +216,7 @@
receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
- if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (staticServerEndPointUrls == NULL)) {
+ if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (!receiver->isPassive)) {
char *urlsCopy = strndup(staticConnectUrls, 1024 * 1024);
char *url;
char *save = urlsCopy;
@@ -255,7 +232,7 @@
free(urlsCopy);
}
- if (receiver->socketHandler != NULL && (!isServerEndPoint)) {
+ if (receiver->socketHandler != NULL && (!receiver->isPassive)) {
// Configure Receiver thread
receiver->thread.running = true;
celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver);
@@ -310,15 +287,14 @@
if (entry != NULL) {
receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
hashMap_destroy(entry->subscriberServices, false, false);
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+ while (hashMapIterator_hasNext(&iter2)) {
+ hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+ hashMap_destroy(origins, true, true);
+ }
+ hashMap_destroy(entry->metrics, false, false);
free(entry);
}
-
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
- while (hashMapIterator_hasNext(&iter2)) {
- hash_map_t *origins = hashMapIterator_nextValue(&iter2);
- hashMap_destroy(origins, true, true);
- }
- hashMap_destroy(entry->metrics, false, false);
}
hashMap_destroy(receiver->subscribers.map, false, false);
@@ -346,7 +322,7 @@
pubsub_tcpHandler_destroy(receiver->socketHandler);
receiver->socketHandler = NULL;
}
-
+ pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
if (receiver->scope != NULL) {
free(receiver->scope);
}
@@ -374,20 +350,36 @@
void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
celix_array_list_t *unconnectedUrls) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ if (receiver->isPassive) {
+ char* interface_url = pubsub_tcpHandler_get_interface_url(receiver->socketHandler);
char *url = NULL;
- asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
- if (entry->connected) {
+ asprintf(&url, "%s (passive)", interface_url ? interface_url : "");
+ if (interface_url) {
celix_arrayList_add(connectedUrls, url);
} else {
celix_arrayList_add(unconnectedUrls, url);
}
+ free(interface_url);
+ } else {
+ hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+ char *url = NULL;
+ asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
+ if (entry->connected) {
+ celix_arrayList_add(connectedUrls, url);
+ } else {
+ celix_arrayList_add(unconnectedUrls, url);
+ }
+ }
}
celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
}
+bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *receiver) {
+ return receiver->isPassive;
+}
+
void pubsub_tcpTopicReceiver_connectTo(
pubsub_tcp_topic_receiver_t *receiver,
const char *url) {
@@ -466,8 +458,7 @@
hashMap_put(entry->subscriberServices, (void*)svcId, svc);
- int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
- &entry->msgTypes);
+ int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, &entry->msgTypes);
if (rc == 0) {
entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
@@ -491,14 +482,13 @@
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props,
const celix_bundle_t *bnd) {
pubsub_tcp_topic_receiver_t *receiver = handle;
long bndId = celix_bundle_getId(bnd);
long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
-
celixThreadMutex_lock(&receiver->subscribers.mutex);
psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
if (entry != NULL) {
@@ -527,7 +517,7 @@
static inline void
processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
- const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime) {
+ const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
//NOTE receiver->subscribers.mutex locked
pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) (message->header.msgId));
bool monitor = receiver->metricsEnabled;
@@ -559,31 +549,39 @@
}
if (status == CELIX_SUCCESS) {
- hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+ const char *msgType = msgSer->msgName;
+ uint32_t msgId = message->header.msgId;
+ celix_properties_t *metadata = message->metadata.metadata;
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
bool release = true;
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
- &release);
- if (!release && hashMapIterator_hasNext(&iter)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
- status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
- if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
- receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
- break;
+ if (cont) {
+ hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+ svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
+ if (!release && hashMapIterator_hasNext(&iter)) {
+ //receive function has taken ownership and still more receive function to come ..
+ //deserialize again for new message
+ status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
+ msgSer->msgName,
+ receiver->scope == NULL ? "(null)" : receiver->scope,
+ receiver->topic);
+ break;
+ }
+ release = true;
}
- release = true;
}
+ if (release) {
+ msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+ }
+ if (message->metadata.metadata) {
+ celix_properties_destroy(message->metadata.metadata);
+ }
+ updateReceiveCount += 1;
}
- if (release) {
- msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
- }
- if (message->metadata.metadata) {
- celix_properties_destroy(message->metadata.metadata);
- }
- updateReceiveCount += 1;
} else {
updateSerError += 1;
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
@@ -650,10 +648,7 @@
pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope,
- PUBSUB_AMDIN_METRICS_NAME_MAX,
- "%s",
- receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
+ snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
int msgTypesCount = 0;
@@ -677,8 +672,7 @@
hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
while (hashMapIterator_hasNext(&iter2)) {
hash_map_t *origins = hashMapIterator_nextValue(&iter2);
- result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins),
- sizeof(*(result->msgTypes[i].origins)));
+ result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
int k = 0;
hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
@@ -694,10 +688,8 @@
result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
- result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds =
- metrics->averageTimeBetweenMessagesInSeconds;
- result->msgTypes[i].origins[k].averageSerializationTimeInSeconds =
- metrics->averageSerializationTimeInSeconds;
+ result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+ result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
@@ -721,7 +713,7 @@
hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
- if ((entry) && (!entry->connected)) {
+ if ((entry) && (!entry->connected) && (!receiver->isPassive)) {
int rc = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
if (rc < 0) {
allConnected = false;
@@ -812,12 +804,11 @@
int versionMajor;
int versionMinor;
- if (msgVersion != NULL) {
+ if (msgVersion!=NULL) {
version_getMajor(msgVersion, &versionMajor);
version_getMinor(msgVersion, &versionMinor);
- if (major == ((unsigned char) versionMajor)) { /* Different major means incompatible */
- check = (minor >=
- ((unsigned char) versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+ if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
+ check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
}
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
index 50d5a97..118bf11 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
@@ -32,7 +32,7 @@
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *endPointStore,
+ pubsub_tcp_endPointStore_t *handlerStore,
long serializerSvcId,
pubsub_serializer_service_t *serializer,
long protocolSvcId,
@@ -47,6 +47,7 @@
void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver,
celix_array_list_t *connectedUrls,
celix_array_list_t *unconnectedUrls);
+bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *receiver); /* returns the receiver's isPassive flag */
void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index dbb5e26..b287ebd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -33,10 +33,9 @@
#include "pubsub_tcp_common.h"
#include <uuid/uuid.h>
#include "celix_constants.h"
-#include <signal.h>
#include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
-#define FIRST_SEND_DELAY_IN_SECONDS 2
#define TCP_BIND_MAX_RETRY 10
#define L_DEBUG(...) \
@@ -59,13 +58,15 @@
bool metricsEnabled;
pubsub_tcpHandler_t *socketHandler;
pubsub_tcpHandler_t *sharedSocketHandler;
+ pubsub_interceptors_handler_t *interceptorsHandler;
char *scope;
char *topic;
char *url;
bool isStatic;
-
+ bool isPassive;
bool verbose;
+ unsigned long send_delay;
struct {
long svcId;
@@ -128,7 +129,7 @@
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *endPointStore,
+ pubsub_tcp_endPointStore_t *handlerStore,
long serializerSvcId,
pubsub_serializer_service_t *ser,
long protocolSvcId,
@@ -144,52 +145,44 @@
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
}
- sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
- PSA_TCP_DEFAULT_METRICS_ENABLED);
- bool isEndpoint = false;
+ pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+ sender->isPassive = false;
+ sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
char *urls = NULL;
const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
- const char *discUrl = NULL;
- const char *staticClientEndPointUrls = NULL;
- const char *staticServerEndPointUrls = NULL;
+ const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+ const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+ const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
- discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
-
+ if (isPassive) {
+ sender->isPassive = psa_tcp_isPassive(isPassive);
+ }
if (topicProperties != NULL) {
if (discUrl == NULL) {
discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
}
- /* Check if it's a static endpoint */
- const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
- if (endPointType != NULL) {
- isEndpoint = true;
- if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
- strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
- staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
- }
- if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
- strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
- staticServerEndPointUrls = discUrl;
- }
+ if (isPassive == NULL) {
+ sender->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, false);
+ }
+ if (passiveKey == NULL) {
+ passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
}
}
-
/* When it's an endpoint share the socket with the receiver */
- if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
- celixThreadMutex_lock(&endPointStore->mutex);
- const char *endPointUrl = (staticClientEndPointUrls) ? staticClientEndPointUrls : staticServerEndPointUrls;
- pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+ if (passiveKey != NULL) {
+ celixThreadMutex_lock(&handlerStore->mutex);
+ pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
if (entry == NULL) {
if (sender->socketHandler == NULL)
sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
entry = sender->socketHandler;
sender->sharedSocketHandler = sender->socketHandler;
- hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+ hashMap_put(handlerStore->map, (void *) passiveKey, entry);
} else {
sender->socketHandler = entry;
sender->sharedSocketHandler = entry;
}
- celixThreadMutex_unlock(&endPointStore->mutex);
+ celixThreadMutex_unlock(&handlerStore->mutex);
} else {
sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
}
@@ -197,66 +190,66 @@
if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
- long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
- PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
- double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
- (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
- PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+ long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+ double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+ long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+ long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+ sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx, PUBSUB_UTILS_PSA_SEND_DELAY, PUBSUB_UTILS_PSA_DEFAULT_SEND_DELAY);
pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
- pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+ pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+ pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+ // When passiveKey is specified, enable the receive event for a full-duplex connection using the key.
+ // Because the topic receiver is already started, enable the receive event.
+ pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
+ pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
}
- //setting up tcp socket for TCP TopicSender
- if (staticClientEndPointUrls != NULL) {
- // Store url for client static endpoint
- sender->url = strndup(staticClientEndPointUrls, 1024 * 1024);
- sender->isStatic = true;
- } else if (discUrl != NULL) {
- urls = strndup(discUrl, 1024 * 1024);
- sender->isStatic = true;
- } else if (ip != NULL) {
- urls = strndup(ip, 1024 * 1024);
- } else {
- struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
- urls = pubsub_utils_url_get_url(sin, NULL);
- free(sin);
- }
- if (!sender->url) {
- char *urlsCopy = strndup(urls, 1024 * 1024);
- char *url;
- char *save = urlsCopy;
- while ((url = strtok_r(save, " ", &save))) {
- int retry = 0;
- while (url && retry < TCP_BIND_MAX_RETRY) {
- pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
- int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
- if (rc < 0) {
- L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
- } else {
- url = NULL;
- }
- pubsub_utils_url_free(urlInfo);
- retry++;
- }
+ if (!sender->isPassive) {
+ //setting up tcp socket for TCP TopicSender
+ if (discUrl != NULL) {
+ urls = strndup(discUrl, 1024 * 1024);
+ sender->isStatic = true;
+ } else if (ip != NULL) {
+ urls = strndup(ip, 1024 * 1024);
+ } else {
+ struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
+ urls = pubsub_utils_url_get_url(sin, NULL);
+ free(sin);
}
- free(urlsCopy);
- sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
- }
- if (urls)
+ if (!sender->url) {
+ char *urlsCopy = strndup(urls, 1024 * 1024);
+ char *url;
+ char *save = urlsCopy;
+ while ((url = strtok_r(save, " ", &save))) {
+ int retry = 0;
+ while (url && retry < TCP_BIND_MAX_RETRY) {
+ pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
+ int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
+ if (rc < 0) {
+ L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
+ } else {
+ url = NULL;
+ }
+ pubsub_utils_url_free(urlInfo);
+ retry++;
+ }
+ }
+ free(urlsCopy);
+ sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
+ }
free(urls);
+ }
- if (sender->url != NULL) {
+ //register publisher services using a service factory
+ if ((sender->url != NULL) || (sender->isPassive)) {
sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
sender->topic = strndup(topic, 1024 * 1024);
celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
- }
- //register publisher services using a service factory
- if (sender->url != NULL) {
sender->publisher.factory.handle = sender;
sender->publisher.factory.getService = psa_tcp_getPublisherService;
sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService;
@@ -274,9 +267,7 @@
opts.properties = props;
sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
- }
-
- if (sender->url == NULL) {
+ } else {
free(sender);
sender = NULL;
}
@@ -312,6 +303,7 @@
celixThreadMutex_unlock(&sender->boundedServices.mutex);
celixThreadMutex_destroy(&sender->boundedServices.mutex);
+ pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
pubsub_tcpHandler_destroy(sender->socketHandler);
sender->socketHandler = NULL;
@@ -343,18 +335,25 @@
}
const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
- return sender->url;
+ if (sender->isPassive) {
+ return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+ } else {
+ return sender->url;
+ }
}
-
bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
return sender->isStatic;
}
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+ return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {
//TODO subscriber count -> topic info
}
-void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {
//TODO
}
@@ -483,8 +482,7 @@
result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
- result->msgMetrics[i].averageTimeBetweenMessagesInSeconds =
- mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+ result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
result->msgMetrics[i].bndId = entry->bndId;
result->msgMetrics[i].typeId = mEntry->type;
@@ -533,7 +531,11 @@
clock_gettime(CLOCK_REALTIME, &serializationEnd);
}
- if (status == CELIX_SUCCESS /*ser ok*/) {
+ bool cont = false;
+ if (status == CELIX_SUCCESS) /*ser ok*/ {
+ cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
+ }
+ if (cont) {
pubsub_protocol_message_t message;
message.metadata.metadata = NULL;
message.payload.payload = NULL;
@@ -555,12 +557,12 @@
entry->seqNr++;
bool sendOk = true;
{
- int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput,
- serializedIoVecOutputLen, 0);
+ int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
if (rc < 0) {
status = -1;
sendOk = false;
}
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
if (message.metadata.metadata)
celix_properties_destroy(message.metadata.metadata);
if (serializedIoVecOutput) {
@@ -617,8 +619,10 @@
static bool firstSend = true;
if (firstSend) {
- L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
- sleep(FIRST_SEND_DELAY_IN_SECONDS);
+ if (sender->send_delay ) {
+ L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
+ }
+ usleep(sender->send_delay * 1000);
firstSend = false;
}
-}
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
index 2217989..ba5c3c9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
@@ -33,28 +33,23 @@
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- pubsub_tcp_endPointStore_t *endPointStore,
+ pubsub_tcp_endPointStore_t *handlerStore,
long serializerSvcId,
pubsub_serializer_service_t *ser,
long protocolSvcId,
pubsub_protocol_service_t *prot);
void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender);
-
const char *pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender);
-
const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender);
-
const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
-
bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
-
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
-
long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
-
+/* Note: this function is deprecated and not used */
void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
-
+/* Note: this function is deprecated and not used */
void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
/**
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
index 6bd79d0..36f4f94 100644
--- a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -162,6 +162,7 @@
message->header.seqNr = 0;
message->header.payloadPartSize = message->header.payloadSize;
message->header.payloadOffset = 0;
+ message->header.isLastSegment = 0x1;
}
}
} else {
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 4b4ad90..be91e26 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -39,7 +39,7 @@
#include "pubsub_admin.h"
#include "../../pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h"
-#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS 30L
+#define PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS 30L
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
@@ -79,8 +79,11 @@
manager->loghelper = logHelper;
manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
- manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS);
-
+ unsigned handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS);
+ if ( handlingThreadSleepTime >= 0 ) {
+ manager->handlingThreadSleepTime = handlingThreadSleepTime * 1000L;
+ }
+ manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY, manager->handlingThreadSleepTime);
manager->psaHandling.running = true;
celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager");
@@ -718,9 +721,12 @@
for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
listener->revokeEndpoint(listener->handle, endpoint);
- celix_properties_destroy(endpoint);
}
}
+ for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
+ celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
+ celix_properties_destroy(endpoint);
+ }
celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
celix_arrayList_destroy(revokeEndpoints);
@@ -758,7 +764,6 @@
"Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n",
entry->scope == NULL ? "(null)" : entry->scope, entry->topic, adminType, serType);
}
-
if (entry->endpoint != NULL) {
celix_arrayList_add(revokeEndpoints, celix_properties_copy(entry->endpoint));
struct pstm_teardown_entry* teardownEntry = malloc(sizeof(*teardownEntry));
@@ -808,9 +813,13 @@
for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
listener->revokeEndpoint(listener->handle, endpoint);
- celix_properties_destroy(endpoint);
}
}
+ // Clean-up properties
+ for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
+ celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
+ celix_properties_destroy(endpoint);
+ }
celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
celix_arrayList_destroy(revokeEndpoints);
@@ -1108,7 +1117,7 @@
pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
celixThreadMutex_lock(&manager->psaHandling.mutex);
- celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime, 0L);
+ celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime / 1000, (manager->handlingThreadSleepTime % 1000) * 1000000);
running = manager->psaHandling.running;
celixThreadMutex_unlock(&manager->psaHandling.mutex);
}
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 842d861..9cecf6f 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -31,7 +31,8 @@
#include "pubsub/subscriber.h"
#define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
-#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS"
+#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS"
+#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS"
#define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE false
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils.h
index de0d1d6..a326068 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils.h
@@ -29,7 +29,8 @@
#define PUBSUB_UTILS_QOS_ATTRIBUTE_KEY "qos"
#define PUBSUB_UTILS_QOS_TYPE_SAMPLE "sample" /* A.k.a. unreliable connection */
#define PUBSUB_UTILS_QOS_TYPE_CONTROL "control" /* A.k.a. reliable connection */
-
+#define PUBSUB_UTILS_PSA_SEND_DELAY "PSA_SEND_DELAY"
+#define PUBSUB_UTILS_PSA_DEFAULT_SEND_DELAY 250 // 250 ms
/**
* Returns the pubsub info from the provided filter. A pubsub filter should have a topic and can
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
index 87d4263..b10863c 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
@@ -28,16 +28,16 @@
char *url;
char *protocol;
char *hostname;
- unsigned int portnr;
+ unsigned int port_nr;
char *uri;
char *interface;
- unsigned int interface_portnr;
+ unsigned int interface_port_nr;
char *interface_url;
} pubsub_utils_url_t;
struct sockaddr_in *pubsub_utils_url_from_fd(int fd);
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port);
-char *pubsub_utils_url_generate_url(char *hostname, unsigned int portnr, char *protocol);
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port);
+char *pubsub_utils_url_generate_url(char *hostname, unsigned int port_nr, char *protocol);
char *pubsub_utils_url_get_url(struct sockaddr_in *inp, char *protocol);
bool pubsub_utils_url_is_multicast(char *hostname);
char *pubsub_utils_url_get_multicast_ip(char *hostname);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
index d8d518c..65a1ff2 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
@@ -56,7 +56,7 @@
return inp;
}
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port) {
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port) {
struct hostent *hp;
struct sockaddr_in *inp = malloc(sizeof(struct sockaddr_in));
bzero(inp, sizeof(struct sockaddr_in)); // zero the struct
@@ -220,11 +220,11 @@
maxPortnr += 1;
unsigned int minDigits = (unsigned int) atoi(portnr);
unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
- url_info->portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+ url_info->port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
} else {
unsigned int portDigits = (unsigned int) atoi(portnr);
if (portDigits != 0)
- url_info->portnr = portDigits;
+ url_info->port_nr = portDigits;
uri = strstr(port, "/");
if ((uri) && (!url_info->uri))
url_info->uri = celix_utils_strdup(uri);
@@ -256,11 +256,11 @@
maxPortnr += 1;
unsigned int minDigits = (unsigned int) atoi(portnr);
unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
- url_info->interface_portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+ url_info->interface_port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
} else {
unsigned int portDigits = (unsigned int) atoi(portnr);
if (portDigits != 0)
- url_info->interface_portnr = portDigits;
+ url_info->interface_port_nr = portDigits;
uri = strstr(port, "/");
if ((uri) && (!url_info->uri))
url_info->uri = celix_utils_strdup(uri);
@@ -289,13 +289,13 @@
free(url_info->interface);
url_info->interface = ip;
}
- struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_portnr);
+ struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr);
url_info->interface_url = pubsub_utils_url_get_url(m_sin, NULL);
free(m_sin);
pubsub_utils_url_parse_url(url_info->interface_url, &interface_url_info);
free(url_info->interface);
url_info->interface = interface_url_info.hostname;
- url_info->interface_portnr = interface_url_info.portnr;
+ url_info->interface_port_nr = interface_url_info.port_nr;
}
if (url_info->hostname) {
@@ -306,11 +306,11 @@
free(url_info->hostname);
url_info->hostname = ip;
}
- struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+ struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
url_info->url = pubsub_utils_url_get_url(sin, url_info->protocol);
free(url_info->hostname);
free(sin);
- url_info->portnr = 0;
+ url_info->port_nr = 0;
url_info->hostname = NULL;
pubsub_utils_url_parse_url(url_info->url, url_info);
}
@@ -338,7 +338,7 @@
url_info->hostname = NULL;
url_info->protocol = NULL;
url_info->interface = NULL;
- url_info->portnr = 0;
- url_info->interface_portnr = 0;
+ url_info->port_nr = 0;
+ url_info->interface_port_nr = 0;
free(url_info);
}
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index c0eb5fc..a22122e 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -38,7 +38,7 @@
add_celix_bundle(pubsub_endpoint_tst
#Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
SOURCES
- test/tst_activator.c
+ test/tst_endpoint_activator.c
VERSION 1.0.0
)
target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
@@ -47,7 +47,7 @@
DESTINATION "META-INF/descriptors"
)
celix_bundle_files(pubsub_endpoint_tst
- meta_data/ping2.properties
+ meta_data/pong3.properties
DESTINATION "META-INF/topics/sub"
)
@@ -65,7 +65,7 @@
DESTINATION "META-INF/descriptors"
)
celix_bundle_files(pubsub_loopback
- meta_data/pong2.properties
+ meta_data/ping3.properties
DESTINATION "META-INF/topics/pub"
)
celix_bundle_files(pubsub_loopback
@@ -162,9 +162,9 @@
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_udp_multicast
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
)
@@ -188,7 +188,7 @@
endif()
if (BUILD_PUBSUB_PSA_TCP)
- add_celix_container(pubsub_tcp_tests
+ add_celix_container(pubsub_tcp_wire_v1_tests
USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
DIR ${CMAKE_CURRENT_BINARY_DIR}
@@ -198,17 +198,37 @@
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v1
Celix::pubsub_topology_manager
Celix::pubsub_admin_tcp
- Celix::pubsub_protocol_wire_v2
pubsub_sut
pubsub_tst
)
- target_link_libraries(pubsub_tcp_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
- target_include_directories(pubsub_tcp_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
- add_test(NAME pubsub_tcp_tests COMMAND pubsub_tcp_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_tests,CONTAINER_LOC>)
- setup_target_for_coverage(pubsub_tcp_tests SCAN_DIR ..)
+ target_link_libraries(pubsub_tcp_wire_v1_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
+ target_include_directories(pubsub_tcp_wire_v1_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
+ add_test(NAME pubsub_tcp_wire_v1_tests COMMAND pubsub_tcp_wire_v1_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_wire_v1_tests,CONTAINER_LOC>)
+ setup_target_for_coverage(pubsub_tcp_wire_v1_tests SCAN_DIR ..)
+ add_celix_container(pubsub_tcp_wire_v2_tests
+ USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+ LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+ DIR ${CMAKE_CURRENT_BINARY_DIR}
+ PROPERTIES
+ LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+ BUNDLES
+ Celix::shell
+ Celix::shell_tui
+ Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
+ Celix::pubsub_topology_manager
+ Celix::pubsub_admin_tcp
+ pubsub_sut
+ pubsub_tst
+ )
+ target_link_libraries(pubsub_tcp_wire_v2_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
+ target_include_directories(pubsub_tcp_wire_v2_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
+ add_test(NAME pubsub_tcp_wire_v2_tests COMMAND pubsub_tcp_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_wire_v2_tests,CONTAINER_LOC>)
+ setup_target_for_coverage(pubsub_tcp_wire_v2_tests SCAN_DIR ..)
add_celix_container(pubsub_tcp_endpoint_tests
USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
@@ -220,12 +240,12 @@
Celix::shell
Celix::shell_tui
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_tcp
- Celix::pubsub_protocol_wire_v1
- pubsub_loopback
- pubsub_endpoint_sut
pubsub_endpoint_tst
+ pubsub_endpoint_sut
+ pubsub_loopback
)
target_link_libraries(pubsub_tcp_endpoint_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
target_include_directories(pubsub_tcp_endpoint_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
@@ -238,9 +258,9 @@
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_tcp
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
)
@@ -295,9 +315,9 @@
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_websocket
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
)
@@ -328,9 +348,9 @@
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v1
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v1
pubsub_sut
pubsub_tst
pubsub_serializer
@@ -370,9 +390,9 @@
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v2
pubsub_sut
pubsub_tst
pubsub_serializer
@@ -393,9 +413,9 @@
PSA_ZMQ_ZEROCOPY_ENABLED=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v1
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v1
Celix::shell
Celix::shell_tui
pubsub_sut
@@ -442,9 +462,9 @@
PSA_ZMQ_ZEROCOPY_ENABLED=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
pubsub_sut
@@ -465,9 +485,9 @@
LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
BUNDLES
Celix::pubsub_serializer_json
+ Celix::pubsub_protocol_wire_v2
Celix::pubsub_topology_manager
Celix::pubsub_admin_zmq
- Celix::pubsub_protocol_wire_v2
Celix::shell
Celix::shell_tui
)
diff --git a/bundles/pubsub/test/meta_data/ping2.properties b/bundles/pubsub/test/meta_data/ping2.properties
index 4b42836..ff0dbed 100644
--- a/bundles/pubsub/test/meta_data/ping2.properties
+++ b/bundles/pubsub/test/meta_data/ping2.properties
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
tcp.static.bind.url=tcp://localhost:9500
-tcp.static.endpoint.type=server
+tcp.passive.key=tcp://localhost:9500
#note only effective if run as root
thread.realtime.sched=SCHED_FIFO
diff --git a/bundles/pubsub/test/meta_data/ping3.properties b/bundles/pubsub/test/meta_data/ping3.properties
new file mode 100644
index 0000000..5571705
--- /dev/null
+++ b/bundles/pubsub/test/meta_data/ping3.properties
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+tcp.passive.key=tcp://localhost
+tcp.passive.configured=true
+#note only effective if run as root
+thread.realtime.sched=SCHED_FIFO
+thread.realtime.prio=50
+
diff --git a/bundles/pubsub/test/meta_data/pong2.properties b/bundles/pubsub/test/meta_data/pong2.properties
index fa55718..b95f3bc 100644
--- a/bundles/pubsub/test/meta_data/pong2.properties
+++ b/bundles/pubsub/test/meta_data/pong2.properties
@@ -14,8 +14,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-tcp.static.connect.urls=tcp://localhost:9500;localhost:9501
-tcp.static.endpoint.type=client
+tcp.static.connect.urls=tcp://localhost:9500
+tcp.passive.key=tcp://localhost
#note only effective if run as root
thread.realtime.sched=SCHED_FIFO
diff --git a/bundles/pubsub/test/meta_data/pong3.properties b/bundles/pubsub/test/meta_data/pong3.properties
new file mode 100644
index 0000000..cb64543
--- /dev/null
+++ b/bundles/pubsub/test/meta_data/pong3.properties
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+tcp.passive.key=tcp://localhost:9500
+tcp.passive.configured=true
+#note only effective if run as root
+thread.realtime.sched=SCHED_FIFO
+thread.realtime.prio=50
+
diff --git a/bundles/pubsub/test/test/loopback_activator.c b/bundles/pubsub/test/test/loopback_activator.c
index 43d8a78..8024fd5 100644
--- a/bundles/pubsub/test/test/loopback_activator.c
+++ b/bundles/pubsub/test/test/loopback_activator.c
@@ -42,7 +42,7 @@
celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
char filter[512];
- snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "pong2");
+ snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "ping3");
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
opts.set = sut_pubSet;
opts.callbackHandle = act;
@@ -86,6 +86,7 @@
msg_t *msg = voidMsg;
msg_t send_msg = *msg;
pthread_mutex_lock(&act->mutex);
+
if (act->pubSvc != NULL) {
if (act->count == 0) {
act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &act->msgId);
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.c b/bundles/pubsub/test/test/tst_endpoint_activator.c
index 636b9a6..05520ed 100644
--- a/bundles/pubsub/test/test/tst_endpoint_activator.c
+++ b/bundles/pubsub/test/test/tst_endpoint_activator.c
@@ -48,7 +48,7 @@
{
celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping2");
+ celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "pong3");
act->subSvc.handle = act;
act->subSvc.receive = tst_receive;
act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);