Merge pull request #286 from apache/feature/async_svc_registration

Feature/async svc registration
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 75fd9bd..96238c5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -32,6 +32,8 @@
     message( FATAL_ERROR "Building Celix using CMake 3.3 and makefiles is not supported due to a bug in the Makefile Generator (see Bug 15696). Please change the used CMake version - both, CMake 3.2 and CMake 3.4 are working fine. Or use a different generator (e.g. Ninja)." )
 ENDIF()
 
+# Options
+option(ENABLE_TESTING "Enables unit/bundle testing" FALSE)
 if (ENABLE_TESTING)
     find_package(GTest CONFIG QUIET)
     if (NOT GTest_FOUND)
@@ -120,8 +122,6 @@
 # Default bundle version
 set(DEFAULT_VERSION 1.0.0)
 
-# Options
-option(ENABLE_TESTING "Enables unit/bundle testing" FALSE)
 if (ENABLE_TESTING)
     enable_testing()
 endif()
diff --git a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
index a3bedf4..a19b425 100644
--- a/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_admin_tcp/CMakeLists.txt
@@ -28,6 +28,7 @@
         src/pubsub_tcp_topic_sender.c
         src/pubsub_tcp_topic_receiver.c
         src/pubsub_tcp_handler.c
+        src/pubsub_tcp_common.c
 )
 
 set_target_properties(celix_pubsub_admin_tcp PROPERTIES INSTALL_RPATH "$ORIGIN")
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
index a5ae576..d93c478 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/psa_activator.c
@@ -121,9 +121,7 @@
         celix_properties_t *props = celix_properties_create();
         celix_properties_set(props, CELIX_SHELL_COMMAND_NAME, "celix::psa_tcp");
         celix_properties_set(props, CELIX_SHELL_COMMAND_USAGE, "psa_tcp");
-        celix_properties_set(props,
-                             CELIX_SHELL_COMMAND_DESCRIPTION,
-                             "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
+        celix_properties_set(props, CELIX_SHELL_COMMAND_DESCRIPTION, "Print the information about the TopicSender and TopicReceivers for the TCP PSA");
         act->cmdSvcId = celix_bundleContext_registerService(ctx, &act->cmdSvc, CELIX_SHELL_COMMAND_SERVICE_NAME, props);
     }
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
index 4284042..d493eaa 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_psa_tcp_constants.h
@@ -23,7 +23,7 @@
 #define PSA_TCP_BASE_PORT                       "PSA_TCP_BASE_PORT"
 #define PSA_TCP_MAX_PORT                        "PSA_TCP_MAX_PORT"
 
-#define PSA_TCP_MAX_RECV_SESSIONS               "PSA_TCP_MAX_RECV_SESSIONS"
+#define PSA_TCP_MAX_MESSAGE_SIZE                "PSA_TCP_MAX_MESSAGE_SIZE"
 #define PSA_TCP_RECV_BUFFER_SIZE                "PSA_TCP_RECV_BUFFER_SIZE"
 #define PSA_TCP_TIMEOUT                         "PSA_TCP_TIMEOUT"
 #define PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT   "PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT"
@@ -31,8 +31,7 @@
 #define PSA_TCP_DEFAULT_BASE_PORT               5501
 #define PSA_TCP_DEFAULT_MAX_PORT                6000
 
-#define PSA_TCP_DEFAULT_MAX_RECV_SESSIONS       1
-
+#define PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE        UINT32_MAX
 #define PSA_TCP_DEFAULT_RECV_BUFFER_SIZE        65 * 1024
 #define PSA_TCP_DEFAULT_TIMEOUT                 2000 // 2 seconds
 #define PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT 250 // 250 ms
@@ -83,7 +82,7 @@
  * Name of environment variable with ip/url to bind to
  * e.g. PSA_TCP_STATIC_BIND_FOR_topic_scope="tcp://0.0.0.0:4444"
  */
-#define PUBSUB_TCP_STATIC_BIND_URL_FOR "PSA_TCP_STATIC_BIND_URL_FOR_"
+#define PUBSUB_TCP_STATIC_BIND_URL_FOR          "PSA_TCP_STATIC_BIND_URL_FOR_"
 
 /**
  * Can be set in the topic properties to fix a static url used for discovery
@@ -102,21 +101,33 @@
  */
 #define PUBSUB_TCP_STATIC_CONNECT_URLS          "tcp.static.connect.urls"
 
+
+/**
+ * Defines if the publisher / subscriber is a passive endpoint and shares
+ * the connection with publisher / subscriber endpoint with the matching (passive) key
+ * e.g. tcp.passive.configured="true" means that a publisher / subscriber is passive,
+ * when a publisher / subscriber is found with a matching key (for example tcp.passive.key="localhost").
+ * This creates full-duplex connection using a single socket.
+ */
+#define PUBSUB_TCP_PASSIVE_CONFIGURED            "tcp.passive.configured"
+#define PUBSUB_TCP_PASSIVE_KEY                   "tcp.passive.key"
+
+/**
+ * Name of environment variable to indicate that passive endpoint is configured
+ * e.g. PSA_TCP_PASSIVE_CONFIGURED_topic_scope="true"
+ */
+#define PUBSUB_TCP_PASSIVE_ENABLED               "PSA_TCP_PASSIVE_CONFIGURED_"
+/**
+ * Name of environment variable to configure the passive key (see PUBSUB_TCP_PASSIVE_KEY )
+ * e.g. PSA_TCP_PASSIVE_KEY__topic_scope="tcp://localhost:4444"
+ */
+#define PUBSUB_TCP_PASSIVE_SELECTION_KEY         "PSA_TCP_PASSIVE_KEY_"
+
 /**
  * Name of environment variable with space-separated list of ips/urls to connect to
  * e.g. PSA_TCP_STATIC_CONNECT_FOR_topic_scope="tcp://127.0.0.1:4444 tcp://127.0.0.2:4444"
  */
-#define PUBSUB_TCP_STATIC_CONNECT_URLS_FOR "PSA_TCP_STATIC_CONNECT_URL_FOR_"
-
-/**
- * The static endpoint type which a static endpoint should be configured.
- * Can be set in the topic properties.
- */
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE         "tcp.static.endpoint.type"
-
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER  "server"
-#define PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT  "client"
-
+#define PUBSUB_TCP_STATIC_CONNECT_URLS_FOR       "PSA_TCP_STATIC_CONNECT_URL_FOR_"
 
 /**
  * Realtime thread prio and scheduling information. This is used to setup the thread prio/sched of the
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
index 89f5863..ba3bdb8 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_admin.c
@@ -18,11 +18,6 @@
  */
 
 #include <memory.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netdb.h>
-#include <ifaddrs.h>
 #include <pubsub_endpoint.h>
 #include <pubsub_serializer.h>
 #include <ip_utils.h>
@@ -51,7 +46,6 @@
     char *ipAddress;
 
     unsigned int basePort;
-    unsigned int maxPort;
 
     double qosSampleScore;
     double qosControlScore;
@@ -119,9 +113,7 @@
     psa->verbose = celix_bundleContext_getPropertyAsBool(ctx, PUBSUB_TCP_VERBOSE_KEY, PUBSUB_TCP_VERBOSE_DEFAULT);
     psa->fwUUID = celix_bundleContext_getProperty(ctx, OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
     long basePort = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_BASE_PORT, PSA_TCP_DEFAULT_BASE_PORT);
-    long maxPort = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_PORT, PSA_TCP_DEFAULT_MAX_PORT);
     psa->basePort = (unsigned int) basePort;
-    psa->maxPort = (unsigned int) maxPort;
     psa->defaultScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_DEFAULT_SCORE_KEY, PSA_TCP_DEFAULT_SCORE);
     psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_SAMPLE_SCORE_KEY,
                                                                   PSA_TCP_DEFAULT_QOS_SAMPLE_SCORE);
@@ -434,17 +426,19 @@
             const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
             const char *serType = serEntry->serType;
             const char *protType = protEntry->protType;
-            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
-                                                serType, protType, NULL);
+            newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
             celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
 
             celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
-            celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
-
+            if (pubsub_tcpTopicSender_isPassive(sender)) {
+                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
+            } else {
+                celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_SYSTEM_VISIBILITY);
+            }
             //if available also set container name
             const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
             if (cn != NULL)
-                celix_properties_set(newEndpoint, "container_name", cn);
+              celix_properties_set(newEndpoint, "container_name", cn);
             hashMap_put(psa->topicSenders.map, key, sender);
         } else {
             L_ERROR("[PSA TCP] Error creating a TopicSender");
@@ -458,10 +452,6 @@
     celixThreadMutex_unlock(&psa->protocols.mutex);
     celixThreadMutex_unlock(&psa->serializers.mutex);
 
-    if (sender != NULL && newEndpoint != NULL) {
-        //TODO connect endpoints to sender, NOTE is this needed for a tcp topic sender?
-    }
-
     if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
         *outPublisherEndpoint = newEndpoint;
     }
@@ -483,7 +473,6 @@
         char *mapKey = hashMapEntry_getKey(entry);
         pubsub_tcp_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
         free(mapKey);
-        //TODO disconnect endpoints to sender. note is this needed for a tcp topic sender?
         pubsub_tcpTopicSender_destroy(sender);
     } else {
         L_ERROR("[PSA TCP] Cannot teardown TopicSender with scope/topic %s/%s. Does not exists",
@@ -636,24 +625,13 @@
     //note can be called with discoveredEndpoint.mutex lock
     celix_status_t status = CELIX_SUCCESS;
 
-    const char *scope = pubsub_tcpTopicReceiver_scope(receiver);
-    const char *topic = pubsub_tcpTopicReceiver_topic(receiver);
-
-    const char *eScope = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_SCOPE, NULL);
-    const char *eTopic = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TOPIC_NAME, NULL);
     const char *url = celix_properties_get(endpoint, PUBSUB_TCP_URL_KEY, NULL);
 
     if (url == NULL) {
         L_WARN("[PSA TCP] Error got endpoint without tcp url");
         status = CELIX_BUNDLE_EXCEPTION;
     } else {
-        if (eTopic != NULL && topic != NULL && strncmp(eTopic, topic, 1024 * 1024) == 0) {
-            if (scope == NULL && eScope == NULL) {
-                pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
-            } else if (scope != NULL && eScope != NULL && strncmp(eScope, scope, 1024 * 1024) == 0) {
-                pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
-            }
-        }
+        pubsub_tcpTopicReceiver_disconnectFrom(receiver, url);
     }
 
     return status;
@@ -689,6 +667,21 @@
                                     FILE *errStream __attribute__((unused))) {
     pubsub_tcp_admin_t *psa = handle;
     celix_status_t status = CELIX_SUCCESS;
+    char *line = celix_utils_strdup(commandLine);
+    char *token = line;
+    strtok_r(line, " ", &token); //first token is command name
+    strtok_r(NULL, " ", &token); //second token is sub command
+
+    if (celix_utils_stringEquals(token, "nr_of_receivers")) {
+        celixThreadMutex_lock(&psa->topicReceivers.mutex);
+        fprintf(out,"%i\n", hashMap_size(psa->topicReceivers.map));
+        celixThreadMutex_unlock(&psa->topicReceivers.mutex);
+    }
+    if (celix_utils_stringEquals(token, "nr_of_senders")) {
+        celixThreadMutex_lock(&psa->topicSenders.mutex);
+        fprintf(out, "%i\n", hashMap_size(psa->topicSenders.map));
+        celixThreadMutex_unlock(&psa->topicSenders.mutex);
+    }
 
     fprintf(out, "\n");
     fprintf(out, "Topic Senders:\n");
@@ -707,11 +700,12 @@
         const char *scope = pubsub_tcpTopicSender_scope(sender);
         const char *topic = pubsub_tcpTopicSender_topic(sender);
         const char *url = pubsub_tcpTopicSender_url(sender);
+        const char *isPassive = pubsub_tcpTopicSender_isPassive(sender) ? " (passive)" : "";
         const char *postUrl = pubsub_tcpTopicSender_isStatic(sender) ? " (static)" : "";
         fprintf(out, "|- Topic Sender %s/%s\n", scope == NULL ? "(null)" : scope, topic);
         fprintf(out, "   |- serializer type = %s\n", serType);
         fprintf(out, "   |- protocol type = %s\n", protType);
-        fprintf(out, "   |- url            = %s%s\n", url, postUrl);
+        fprintf(out, "   |- url            = %s%s%s\n", url, postUrl, isPassive);
     }
     celixThreadMutex_unlock(&psa->topicSenders.mutex);
     celixThreadMutex_unlock(&psa->protocols.mutex);
@@ -788,4 +782,4 @@
     celixThreadMutex_unlock(&psa->topicReceivers.mutex);
 
     return result;
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
new file mode 100644
index 0000000..d8f05f7
--- /dev/null
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.c
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <stdio.h>
+#include <string.h>
+#include "pubsub_tcp_common.h"
+
+
+bool psa_tcp_isPassive(const char* buffer) {
+    bool isPassive = false;
+    // Parse Properties
+    if (buffer != NULL) {
+        char buf[32];
+        snprintf(buf, 32, "%s", buffer);
+        char *trimmed = utils_stringTrim(buf);
+        if (strncasecmp("true", trimmed, strlen("true")) == 0) {
+            isPassive = true;
+        } else if (strncasecmp("false", trimmed, strlen("false")) == 0) {
+            isPassive = false;
+        }
+    }
+    return isPassive;
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
index 1fe1d00..9ea31db 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_common.h
@@ -28,4 +28,6 @@
     hash_map_t *map;
 } pubsub_tcp_endPointStore_t;
 
+bool psa_tcp_isPassive(const char* buffer);
+
 #endif //CELIX_PUBSUB_TCP_COMMON_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
index 5ab1ea6..262940b 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.c
@@ -39,10 +39,6 @@
 #include <sys/epoll.h>
 #endif
 #include <limits.h>
-#include <assert.h>
-#include "ctype.h"
-#include <netdb.h>
-#include <signal.h>
 #include <fcntl.h>
 #include <arpa/inet.h>
 #include <netinet/tcp.h>
@@ -80,18 +76,26 @@
     bool connected;
     bool headerError;
     pubsub_protocol_message_t header;
-    unsigned int syncSize;
-    unsigned int headerSize;
-    unsigned int headerBufferSize; // Size of headerBuffer, size = 0, no headerBuffer -> included in payload
-    void *headerBuffer;
-    unsigned int footerSize;
-    void *footerBuffer;
-    unsigned int bufferSize;
+    size_t maxMsgSize;
+    size_t readHeaderSize;
+    size_t readHeaderBufferSize; // Size of headerBuffer
+    void *readHeaderBuffer;
+    size_t writeHeaderBufferSize; // Size of headerBuffer
+    void *writeHeaderBuffer;
+    size_t readFooterSize;
+    size_t readFooterBufferSize;
+    void *readFooterBuffer;
+    size_t writeFooterBufferSize;
+    void *writeFooterBuffer;
+    size_t bufferSize;
     void *buffer;
-    unsigned int bufferReadSize;
-    unsigned int metaBufferSize;
-    void *metaBuffer;
+    size_t readMetaBufferSize;
+    void *readMetaBuffer;
+    size_t writeMetaBufferSize;
+    void *writeMetaBuffer;
     unsigned int retryCount;
+    celix_thread_mutex_t writeMutex;
+    struct msghdr readMsg;
 } psa_tcp_connection_entry_t;
 
 //
@@ -116,40 +120,33 @@
     celix_log_helper_t *logHelper;
     pubsub_protocol_service_t *protocol;
     unsigned int bufferSize;
-    unsigned int maxNofBuffer;
+    unsigned int maxMsgSize;
     unsigned int maxSendRetryCount;
     unsigned int maxRcvRetryCount;
     double sendTimeout;
     double rcvTimeout;
     celix_thread_t thread;
     bool running;
+    bool enableReceiveEvent;
 };
 
-static inline int
-pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
-
+static inline int pubsub_tcpHandler_closeConnectionEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, bool lock);
 static inline int pubsub_tcpHandler_closeInterfaceEntry(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
 static inline int pubsub_tcpHandler_makeNonBlocking(pubsub_tcpHandler_t *handle, int fd);
-
-static inline psa_tcp_connection_entry_t *
-pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *external_url,
-                              struct sockaddr_in *addr);
-
+static inline psa_tcp_connection_entry_t* pubsub_tcpHandler_createEntry(pubsub_tcpHandler_t *handle, int fd, char *url, char *interface_url, struct sockaddr_in *addr);
 static inline void pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry);
-
 static inline void pubsub_tcpHandler_releaseEntryBuffer(pubsub_tcpHandler_t *handle, int fd, unsigned int index);
-
-static inline int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* buffer, unsigned int offset, unsigned int size, int flag );
-
+static inline long int pubsub_tcpHandler_getMsgSize(psa_tcp_connection_entry_t *entry);
+static inline void pubsub_tcpHandler_ensureReadBufferCapacity(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
+static inline bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry, long int* msgSize);
 static inline void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry);
-
+static inline long int pubsub_tcpHandler_readPayload(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry);
 static inline void pubsub_tcpHandler_connectionHandler(pubsub_tcpHandler_t *handle, int fd);
-
 static inline void pubsub_tcpHandler_handler(pubsub_tcpHandler_t *handle);
-
 static void *pubsub_tcpHandler_thread(void *data);
 
+
+
 //
 // Create a handle
 //
@@ -169,7 +166,6 @@
         handle->logHelper = logHelper;
         handle->protocol = protocol;
         handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE;
-        handle->maxNofBuffer = 1; // Reserved for future Use;
         celixThreadRwlock_create(&handle->dbLock, 0);
         handle->running = true;
         celixThread_create(&handle->thread, NULL, pubsub_tcpHandler_thread, handle);
@@ -263,7 +259,7 @@
                 L_ERROR("[TCP Socket] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno));
             }
         }
-        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         if (addr) {
             rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
             if (rc != 0) {
@@ -313,33 +309,45 @@
     if (fd >= 0) {
         entry = calloc(sizeof(psa_tcp_connection_entry_t), 1);
         entry->fd = fd;
-        if (url)
+        celixThreadMutex_create(&entry->writeMutex, NULL);
+        if (url) {
             entry->url = strndup(url, 1024 * 1024);
+        }
         if (interface_url) {
             entry->interface_url = strndup(interface_url, 1024 * 1024);
         } else {
-            if (url)
+            if (url) {
                 entry->interface_url = strndup(url, 1024 * 1024);
+            }
         }
-        if (addr)
+        if (addr) {
             entry->addr = *addr;
-        entry->len = sizeof(struct sockaddr_in);
-        size_t size = 0;
-        handle->protocol->getHeaderSize(handle->protocol->handle, &size);
-        entry->headerSize = size;
-        handle->protocol->getHeaderBufferSize(handle->protocol->handle, &size);
-        entry->headerBufferSize = size;
-        handle->protocol->getSyncHeaderSize(handle->protocol->handle, &size);
-        entry->syncSize = size;
-        handle->protocol->getFooterSize(handle->protocol->handle, &size);
-        entry->footerSize = size;
-        entry->bufferSize = handle->bufferSize;
-        entry->connected = false;
-        if (entry->headerBufferSize) {
-            entry->headerBuffer = calloc(sizeof(char), entry->headerSize);
         }
-        if (entry->footerSize) entry->footerBuffer = calloc(sizeof(char), entry->footerSize);
+        entry->len = sizeof(struct sockaddr_in);
+        size_t headerSize = 0;
+        size_t footerSize = 0;
+        handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize);
+        handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
+        entry->readHeaderBufferSize = headerSize;
+        entry->writeHeaderBufferSize = headerSize;
+
+        entry->readFooterBufferSize = footerSize;
+        entry->writeFooterBufferSize = footerSize;
+        entry->bufferSize = MAX(handle->bufferSize, headerSize);
+        entry->connected = false;
+        unsigned minimalMsgSize = entry->writeHeaderBufferSize + entry->writeFooterBufferSize;
+        if ((minimalMsgSize > handle->maxMsgSize) && (handle->maxMsgSize)) {
+            L_ERROR("[TCP Socket] maxMsgSize (%d) < headerSize + FooterSize (%d): %s\n", handle->maxMsgSize, minimalMsgSize);
+        } else {
+            entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : LONG_MAX;
+        }
+        entry->readHeaderBuffer = calloc(sizeof(char), headerSize);
+        entry->writeHeaderBuffer = calloc(sizeof(char), headerSize);
+        if (entry->readFooterBufferSize ) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterBufferSize );
+        if (entry->writeFooterBufferSize) entry->writeFooterBuffer = calloc(sizeof(char), entry->writeFooterBufferSize);
         if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize);
+        memset(&entry->readMsg, 0x00, sizeof(struct msghdr));
+        entry->readMsg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX);
     }
     return entry;
 }
@@ -350,40 +358,18 @@
 static inline void
 pubsub_tcpHandler_freeEntry(psa_tcp_connection_entry_t *entry) {
     if (entry) {
-        if (entry->url) {
-            free(entry->url);
-            entry->url = NULL;
-        }
-        if (entry->interface_url) {
-            free(entry->interface_url);
-            entry->interface_url = NULL;
-        }
-        if (entry->fd >= 0) {
-            close(entry->fd);
-            entry->fd = -1;
-        }
-        if (entry->buffer) {
-            free(entry->buffer);
-            entry->buffer = NULL;
-            entry->bufferSize = 0;
-        }
-        if (entry->headerBuffer) {
-            free(entry->headerBuffer);
-            entry->headerBuffer = NULL;
-            entry->headerBufferSize = 0;
-        }
-
-        if (entry->footerBuffer) {
-            free(entry->footerBuffer);
-            entry->footerBuffer = NULL;
-        }
-
-        if (entry->metaBuffer) {
-            free(entry->metaBuffer);
-            entry->metaBuffer = NULL;
-            entry->metaBufferSize = 0;
-        }
-        entry->connected = false;
+        free(entry->url);
+        free(entry->interface_url);
+        if (entry->fd >= 0) close(entry->fd);
+        free(entry->buffer);
+        free(entry->readHeaderBuffer);
+        free(entry->writeHeaderBuffer);
+        free(entry->readFooterBuffer);
+        free(entry->writeFooterBuffer);
+        free(entry->readMetaBuffer);
+        free(entry->writeMetaBuffer);
+        free(entry->readMsg.msg_iov);
+        celixThreadMutex_destroy(&entry->writeMutex);
         free(entry);
     }
 }
@@ -405,8 +391,7 @@
 //
 int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url) {
     int rc = 0;
-    psa_tcp_connection_entry_t *entry =
-        hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
+    psa_tcp_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url);
     if (entry == NULL) {
         pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url);
         int fd = pubsub_tcpHandler_open(handle, url_info->interface_url);
@@ -416,12 +401,11 @@
         socklen_t len = sizeof(sin);
         getsockname(fd, (struct sockaddr *) &sin, &len);
         char *interface_url = pubsub_utils_url_get_url(&sin, NULL);
-        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         if ((rc >= 0) && addr) {
             rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr));
             if (rc < 0 && errno != EINPROGRESS) {
-                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using; %s err: %s\n", url_info->hostname, url_info->portnr, interface_url,
-                        strerror(errno));
+                L_ERROR("[TCP Socket] Cannot connect to %s:%d: using %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno, strerror(errno));
                 close(fd);
             } else {
                 entry = pubsub_tcpHandler_createEntry(handle, fd, url, interface_url, &sin);
@@ -454,7 +438,7 @@
             hashMap_put(handle->connection_fd_map, (void *) (intptr_t) entry->fd, entry);
             celixThreadRwlock_unlock(&handle->dbLock);
             pubsub_tcpHandler_connectionHandler(handle, fd);
-            L_INFO("[TCP Socket] Connect to %s using; %s\n", entry->url, entry->interface_url);
+            L_INFO("[TCP Socket] Connect to %s using: %s\n", entry->url, entry->interface_url);
         }
         pubsub_utils_url_free(url_info);
     }
@@ -554,7 +538,7 @@
     else {
         rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
         if (rc < 0) {
-            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING epoll: %s\n", strerror(errno));
+            L_ERROR("[TCP Socket] Cannot set to NON_BLOCKING: %s\n", strerror(errno));
         }
     }
     return rc;
@@ -572,6 +556,7 @@
     if (entry == NULL) {
         char protocol[] = "tcp";
         int fd = pubsub_tcpHandler_open(handle, url);
+        rc = fd;
         struct sockaddr_in *sin = pubsub_utils_url_from_fd(fd);
         // Make handler fd entry
         char *pUrl = pubsub_utils_url_get_url(sin, protocol);
@@ -581,7 +566,6 @@
             free(pUrl);
             free(sin);
             celixThreadRwlock_writeLock(&handle->dbLock);
-            rc = fd;
             if (rc >= 0) {
                 rc = listen(fd, SOMAXCONN);
                 if (rc != 0) {
@@ -630,16 +614,26 @@
 }
 
 //
-// Setup buffer sizes
+// Setup default receive buffer size.
+// This size is used to allocated the initial read buffer, to avoid receive buffer reallocting.
+// The default receive buffer is allocated in the createEntry when the connection is establised
 //
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
-                                               unsigned int maxNofBuffers
-                                               __attribute__((__unused__)),
-                                               unsigned int bufferSize) {
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size) {
     if (handle != NULL) {
         celixThreadRwlock_writeLock(&handle->dbLock);
-        handle->bufferSize = bufferSize;
-        handle->maxNofBuffer = maxNofBuffers;
+        handle->bufferSize = size;
+        celixThreadRwlock_unlock(&handle->dbLock);
+    }
+    return 0;
+}
+
+//
+// Set Maximum message size
+//
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->maxMsgSize = size;
         celixThreadRwlock_unlock(&handle->dbLock);
     }
     return 0;
@@ -710,7 +704,7 @@
             if (prio > 0 && prio < 100) {
                 struct sched_param sch;
                 bzero(&sch, sizeof(struct sched_param));
-                sch.sched_priority = prio;
+                sch.sched_priority = (int)prio;
                 pthread_setschedparam(handle->thread.thread, policy, &sch);
             } else {
                 L_INFO("Skipping configuration of thread prio to %i and thread "
@@ -754,35 +748,107 @@
     }
 }
 
-static inline
-int pubsub_tcpHandler_readSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, int fd, void* _buffer, unsigned int offset, unsigned int size, int flag ) {
-    int expectedReadSize = size;
-    int nbytes = size;
-    int msgSize = 0;
-    char* buffer = (char*)_buffer;
-    while (nbytes > 0 && expectedReadSize > 0) {
-        // Read the message header
-        nbytes = recv(fd, &buffer[offset], expectedReadSize, flag | MSG_NOSIGNAL);
-        // Update buffer administration
-        offset += nbytes;
-        expectedReadSize -= nbytes;
-        msgSize += nbytes;
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle,bool enable) {
+    if (handle != NULL) {
+        celixThreadRwlock_writeLock(&handle->dbLock);
+        handle->enableReceiveEvent = enable;
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    if (nbytes <=0)  msgSize = nbytes;
-    return msgSize;
 }
 
+static inline long int pubsub_tcpHandler_getMsgSize(psa_tcp_connection_entry_t *entry) {
+    // Note header message is already read
+    return (long int)entry->header.header.payloadPartSize + (long int)entry->header.header.metadataSize + (long int)entry->readFooterSize;
+}
+
+static inline 
+bool pubsub_tcpHandler_readHeader(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry, long int* msgSize) {
+    bool result = false;
+    size_t syncSize = 0;
+    size_t protocolHeaderBufferSize = 0;
+    // Get Sync Size
+    handle->protocol->getSyncHeaderSize(handle->protocol->handle, &syncSize);
+    // Get HeaderSize of the Protocol Header
+    handle->protocol->getHeaderSize(handle->protocol->handle, &entry->readHeaderSize);
+    // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
+    handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+
+    // Ensure capacity in header buffer
+    pubsub_tcpHandler_ensureReadBufferCapacity(handle, entry);
+
+    entry->readMsg.msg_iovlen = 0;
+    entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readHeaderBuffer;
+    entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len  = entry->readHeaderBufferSize;
+    entry->readMsg.msg_iovlen++;
+
+    // Read the message
+    long int nbytes = 0;
+    // Use peek flag to find sync word or when header is part of the payload
+    unsigned int flag = (entry->headerError || (!protocolHeaderBufferSize)) ? MSG_PEEK : 0;
+    if (entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL | flag);
+    if (nbytes >= entry->readHeaderSize) {
+        if (handle->protocol->decodeHeader(handle->protocol->handle,
+                                           entry->readMsg.msg_iov[0].iov_base,
+                                           entry->readMsg.msg_iov[0].iov_len,
+                                           &entry->header) == CELIX_SUCCESS) {
+            // read header from queue, when recovered from headerError and when header is not part of the payload. (Because of MSG_PEEK)
+            if (entry->headerError && protocolHeaderBufferSize && entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL);
+            entry->headerError = false;
+            result = true;
+        } else {
+            // Did not receive correct header
+            // skip sync word and try to read next header
+            if (!entry->headerError) {
+                L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
+            }
+            entry->headerError = true;
+            entry->readMsg.msg_iovlen = 0;
+            entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = syncSize;
+            entry->readMsg.msg_iovlen++;
+            // remove sync item from the queue
+            if (syncSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL);
+        }
+    }
+    if (msgSize) *msgSize = nbytes;
+    return result;
+}
+
+
+static inline void pubsub_tcpHandler_ensureReadBufferCapacity(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
+    if (entry->readHeaderSize > entry->readHeaderBufferSize) {
+        free(entry->readHeaderBuffer);
+        entry->readHeaderBuffer = malloc((size_t) entry->readHeaderSize);
+        entry->readHeaderBufferSize = entry->readHeaderSize;
+    }
+
+    if (entry->header.header.payloadSize > entry->bufferSize) {
+        free(entry->buffer);
+        entry->buffer = malloc((size_t)entry->header.header.payloadSize);
+        entry->bufferSize = entry->header.header.payloadSize;
+    }
+
+    if (entry->header.header.metadataSize > entry->readMetaBufferSize) {
+        free(entry->readMetaBuffer);
+        entry->readMetaBuffer = malloc((size_t) entry->header.header.metadataSize);
+        entry->readMetaBufferSize = entry->header.header.metadataSize;
+    }
+
+    if (entry->readFooterSize > entry->readFooterBufferSize) {
+        free(entry->readFooterBuffer);
+        entry->readFooterBuffer = malloc( (size_t) entry->readFooterSize);
+        entry->readFooterBufferSize = entry->readFooterSize;
+    }
+}
 
 static inline
 void pubsub_tcpHandler_decodePayload(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry) {
 
   if (entry->header.header.payloadSize > 0) {
-    handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
+      handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header);
   }
   if (entry->header.header.metadataSize > 0) {
-    handle->protocol->decodeMetadata(handle->protocol->handle, entry->metaBuffer,
-                                     entry->header.header.metadataSize, &entry->header);
-    entry->metaBufferSize = entry->header.header.metadataSize;
+      handle->protocol->decodeMetadata(handle->protocol->handle, entry->readMetaBuffer,
+                                       entry->header.header.metadataSize, &entry->header);
   }
   if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) {
     struct timespec receiveTime;
@@ -793,16 +859,65 @@
   }
 }
 
+static inline
+long int pubsub_tcpHandler_readPayload(pubsub_tcpHandler_t *handle, int fd, psa_tcp_connection_entry_t *entry) {
+    entry->readMsg.msg_iovlen = 0;
+    handle->protocol->getFooterSize(handle->protocol->handle, &entry->readFooterSize);
+
+    // from the header can be determined how large buffers should be. Even before receiving all data these buffers can be allocated
+    pubsub_tcpHandler_ensureReadBufferCapacity(handle, entry);
+
+    if (entry->header.header.payloadPartSize) {
+        char* buffer = entry->buffer;
+        entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = &buffer[entry->header.header.payloadOffset];
+        entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->header.header.payloadPartSize;
+        entry->readMsg.msg_iovlen++;
+    }
+    if (entry->header.header.metadataSize) {
+        entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readMetaBuffer;
+        entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len  = entry->header.header.metadataSize;
+        entry->readMsg.msg_iovlen++;
+    }
+
+    if (entry->readFooterSize) {
+        entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readFooterBuffer;
+        entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len  = entry->readFooterSize;
+        entry->readMsg.msg_iovlen++;
+    }
+
+    long int nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL);
+    if (nbytes >= pubsub_tcpHandler_getMsgSize(entry)) {
+        bool valid = true;
+        if (entry->readFooterSize) {
+            if (handle->protocol->decodeFooter(handle->protocol->handle, entry->readFooterBuffer, entry->readFooterBufferSize, &entry->header) != CELIX_SUCCESS) {
+                // Did not receive correct footer
+                L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
+                valid = false;
+            }
+        }
+        if (!entry->header.header.isLastSegment) {
+            // Not last Segment of message
+            valid = false;
+        }
+
+        if (valid) {
+            // Complete message is received
+            pubsub_tcpHandler_decodePayload(handle, entry);
+        }
+    }
+    return nbytes;
+}
 
 //
 // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure
 // If the message is completely reassembled true is returned and the index and size have valid values
 //
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd) {
-    celixThreadRwlock_writeLock(&handle->dbLock);
+    celixThreadRwlock_readLock(&handle->dbLock);
     psa_tcp_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd);
-    if (entry == NULL)
+    if (entry == NULL) {
         entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd);
+    }
     // Find FD entry
     if (entry == NULL) {
         celixThreadRwlock_unlock(&handle->dbLock);
@@ -813,114 +928,22 @@
         celixThreadRwlock_unlock(&handle->dbLock);
         return -1;
     }
-
-    // Message buffer is to small, reallocate to make it bigger
-    if ((!entry->headerBufferSize) && (entry->headerSize > entry->bufferSize)) {
-        handle->bufferSize = MAX(handle->bufferSize, entry->headerSize );
-        if (entry->buffer) free(entry->buffer);
-            entry->buffer = malloc((size_t) handle->bufferSize);
-            entry->bufferSize = handle->bufferSize;
-        }
-    // Read the message
-    bool validMsg = false;
-    char* header_buffer = (entry->headerBufferSize) ? entry->headerBuffer : entry->buffer;
-    int nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, MSG_PEEK);
-    if (nbytes > 0) {
-        // Check header message buffer
-        if (handle->protocol->decodeHeader(handle->protocol->handle, header_buffer, entry->headerSize, &entry->header) != CELIX_SUCCESS) {
-            // Did not receive correct header
-            // skip sync word and try to read next header
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->syncSize, 0);
-            if (!entry->headerError) {
-                L_WARN("[TCP Socket] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url);
-            }
-            entry->headerError = true;
-            entry->bufferReadSize = 0;
-        } else {
-            // Read header message from queue
-            nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, header_buffer, 0, entry->headerSize, 0);
-            if ((nbytes > 0) && (nbytes == entry->headerSize)) {
-                entry->headerError = false;
-                // For headerless message, add header to bufferReadSize;
-                if (!entry->headerBufferSize)
-                    entry->bufferReadSize += nbytes;
-                // Alloc message buffers
-                if (entry->header.header.payloadSize > entry->bufferSize) {
-                    handle->bufferSize = MAX(handle->bufferSize, entry->header.header.payloadSize);
-                    if (entry->buffer)
-                        free(entry->buffer);
-                    entry->buffer = malloc((size_t) handle->bufferSize);
-                    entry->bufferSize = handle->bufferSize;
-                }
-                if (entry->header.header.metadataSize > entry->metaBufferSize) {
-                    if (entry->metaBuffer) {
-                      free(entry->metaBuffer);
-                    }
-                    entry->metaBuffer = malloc((size_t) entry->header.header.metadataSize);
-                    entry->metaBufferSize = entry->header.header.metadataSize;
-                    L_WARN("[TCP Socket] socket: %d, url: %s,  realloc read meta buffer: (%d, %d) \n", entry->fd,
-                      entry->url, entry->metaBufferSize, entry->header.header.metadataSize);
-                }
-
-                if (entry->header.header.payloadSize) {
-                    unsigned int offset = entry->header.header.payloadOffset;
-                    unsigned int size = entry->header.header.payloadPartSize;
-                    // For header less messages adjust offset and msg size;
-                    if (!entry->headerBufferSize) {
-                        offset = entry->headerSize;
-                        size -= offset;
-                    }
-                    // Read payload data from queue
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->buffer, offset, size, 0);
-                    if (nbytes > 0) {
-                        if (nbytes == size) {
-                            entry->bufferReadSize += nbytes;
-                        } else {
-                            entry->bufferReadSize = 0;
-                            L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                        }
-                    }
-                }
-                if (nbytes > 0 && entry->header.header.metadataSize) {
-                    // Read meta data from queue
-                    unsigned int size = entry->header.header.metadataSize;
-                    nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->metaBuffer,0, size,0);
-                    if ((nbytes > 0) && (nbytes != size)) {
-                        L_ERROR("[TCP Socket] Failed to receive complete payload buffer (fd: %d) nbytes : %d = msgSize %d", entry->fd, nbytes, size);
-                    }
-                }
-                // Check for end of message using, footer of message. Because of streaming protocol
-                if (nbytes > 0) {
-                    if (entry->footerSize > 0) {
-                        nbytes = pubsub_tcpHandler_readSocket(handle, entry, fd, entry->footerBuffer,0, entry->footerSize,0);
-                        if (handle->protocol->decodeFooter(handle->protocol->handle, entry->footerBuffer, entry->footerSize, &entry->header) == CELIX_SUCCESS) {
-                            // valid footer, this means that the message is valid
-                            validMsg = true;
-                        } else {
-                            // Did not receive correct header
-                            L_ERROR("[TCP Socket] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url);
-                            entry->bufferReadSize = 0;
-                        }
-                    } else {
-                        // No Footer, then complete message is received
-                        validMsg = true;
-                    }
-                }
-            }
-        }
+    long int nbytes = 0;
+    // if not yet enough bytes are received the header can not be read
+    if (pubsub_tcpHandler_readHeader(handle, fd, entry, &nbytes)) {
+        nbytes = pubsub_tcpHandler_readPayload(handle, fd, entry);
     }
     if (nbytes > 0) {
         entry->retryCount = 0;
-        // Check if complete message is received
-        if ((entry->bufferReadSize >= entry->header.header.payloadSize) && validMsg) {
-            entry->bufferReadSize = 0;
-            pubsub_tcpHandler_decodePayload(handle, entry);
-        }
-    } else {
-        if (entry->retryCount < handle->maxRcvRetryCount) {
+    } else if (nbytes < 0) {
+        if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+            // Non blocking interrupt
+            entry->retryCount = 0;
+        } else if (entry->retryCount < handle->maxRcvRetryCount) {
             entry->retryCount++;
-            L_WARN("[TCP Socket] Failed to receive message (fd: %d), error: %s. Retry count %u of %u,", entry->fd,
-                   strerror(errno), entry->retryCount, handle->maxRcvRetryCount);
+            L_WARN(
+                    "[TCP Socket] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.",
+                    entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
         } else {
             L_ERROR("[TCP Socket] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s",
                     entry->fd, handle->maxRcvRetryCount, strerror(errno));
@@ -928,10 +951,9 @@
         }
     }
     celixThreadRwlock_unlock(&handle->dbLock);
-    return nbytes;
+    return (int)nbytes;
 }
 
-
 int pubsub_tcpHandler_addMessageHandler(pubsub_tcpHandler_t *handle, void *payload,
                                         pubsub_tcpHandler_processMessage_callback_t processMessageCallback) {
     int result = 0;
@@ -954,70 +976,32 @@
     return result;
 }
 
-int pubsub_tcpHandler_addAcceptConnectionCallback(pubsub_tcpHandler_t *handle, void *payload,
-                                                  pubsub_tcpHandler_acceptConnectMessage_callback_t connectMessageCallback,
-                                                  pubsub_tcpHandler_acceptConnectMessage_callback_t disconnectMessageCallback) {
-    int result = 0;
-    celixThreadRwlock_writeLock(&handle->dbLock);
-    handle->acceptConnectMessageCallback = connectMessageCallback;
-    handle->acceptDisconnectMessageCallback = disconnectMessageCallback;
-    handle->acceptConnectPayload = payload;
-    celixThreadRwlock_unlock(&handle->dbLock);
-    return result;
-}
-
-static inline
-int pubsub_tcpHandler_writeSocket(pubsub_tcpHandler_t *handle, psa_tcp_connection_entry_t *entry, struct msghdr* msg, unsigned int size, int flag ) {
-  int nbytes = 0;
-  int msgSize = 0;
-  if (entry->fd >= 0 && size && msg->msg_iovlen) {
-    int expectedReadSize = size;
-    unsigned int offset = 0;
-    nbytes = size;
-    while (nbytes > 0 && expectedReadSize > 0) {
-      // Read the message header
-      nbytes = sendmsg(entry->fd, msg, flag | MSG_NOSIGNAL);
-      // Update admin
-      expectedReadSize -= nbytes;
-      msgSize += nbytes;
-      // Not all written
-      if (expectedReadSize && nbytes > 0) {
-        unsigned int readSize = 0;
-        unsigned int readIndex = 0;
-        unsigned int i = 0;
-        for (i = 0; i < msg->msg_iovlen; i++) {
-          if (nbytes < msg->msg_iov[i].iov_len) {
-            readIndex = i;
-            break;
-          }
-          readSize+= msg->msg_iov[i].iov_len;
-        }
-        msg->msg_iov = &msg->msg_iov[readIndex];
-        msg->msg_iovlen -= readIndex;
-        char* buffer = (char*)msg->msg_iov->iov_base;
-        offset = nbytes - readSize;
-        msg->msg_iov->iov_base = &buffer[offset];
-        msg->msg_iov->iov_len  = msg->msg_iov->iov_len - offset;
-      }
-    }
-  }
-  if (nbytes <=0)  msgSize = nbytes;
-  return msgSize;
-}
 //
 // Write large data to TCP. .
 //
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec,
                             size_t msg_iov_len, int flags) {
-    celixThreadRwlock_readLock(&handle->dbLock);
     int result = 0;
+    if (handle == NULL) {
+        return -1;
+    }
     int connFdCloseQueue[hashMap_size(handle->connection_fd_map)];
     int nofConnToClose = 0;
     if (handle) {
+        celixThreadRwlock_readLock(&handle->dbLock);
         hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map);
+        size_t max_msg_iov_len = IOV_MAX - 2; // header , footer, padding
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if (!entry->connected) continue;
+            if (!entry->connected) {
+                continue;
+            }
+            // When maxMsgSize is zero then payloadSize is disabled
+            if (entry->maxMsgSize == 0) {
+                // if max msg size is set to zero nothing will be send
+                continue;
+            }
+            celixThreadMutex_lock(&entry->writeMutex);
             void *payloadData = NULL;
             size_t payloadSize = 0;
             if (msg_iov_len == 1) {
@@ -1027,6 +1011,16 @@
                     payloadSize += msgIoVec[i].iov_len;
                 }
             }
+
+            // check if message is not too large
+            bool isMessageSegmentationSupported = false;
+            handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, &isMessageSegmentationSupported);
+            if (!isMessageSegmentationSupported && (msg_iov_len > max_msg_iov_len || payloadSize > entry->maxMsgSize)) {
+                L_WARN("[TCP Socket] Failed to send message (fd: %d), Message segmentation is not supported\n", entry->fd);
+                celixThreadMutex_unlock(&entry->writeMutex);
+                continue;
+            }
+
             message->header.convertEndianess = 0;
             message->header.payloadSize = payloadSize;
             message->header.payloadPartSize = payloadSize;
@@ -1036,130 +1030,176 @@
             void *metadataData = NULL;
             size_t metadataSize = 0;
             if (message->metadata.metadata) {
-                metadataData = entry->metaBuffer;
-                handle->protocol->encodeMetadata(handle->protocol->handle, message,
-                                                 &metadataData,
-                                                 &metadataSize);
-                entry->metaBufferSize = metadataSize;
+                metadataSize = entry->writeMetaBufferSize;
+                metadataData = entry->writeMetaBuffer;
+                // When maxMsgSize is smaller then meta data is disabled
+                if (metadataSize > entry->maxMsgSize) {
+                    metadataSize = 0;
+                }
+                handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize);
             }
+
             message->header.metadataSize = metadataSize;
+            size_t totalMsgSize = payloadSize + metadataSize;
 
-            void *footerData = NULL;
-            size_t footerDataSize = 0;
-            if (entry->footerSize) {
-                footerData = entry->footerBuffer;
-                handle->protocol->encodeFooter(handle->protocol->handle, message,
-                                                 &footerData,
-                                                 &footerDataSize);
-                entry->footerSize = footerDataSize;
-            }
+            size_t sendMsgSize = 0;
+            size_t msgPayloadOffset = 0;
+            size_t msgIovOffset     = 0;
+            bool allPayloadAdded = (payloadSize == 0);
+            long int nbytes = LONG_MAX;
+            while (sendMsgSize < totalMsgSize && nbytes > 0) {
+                struct msghdr msg;
+                struct iovec msg_iov[IOV_MAX];
+                memset(&msg, 0x00, sizeof(struct msghdr));
+                msg.msg_name = &entry->addr;
+                msg.msg_namelen = entry->len;
+                msg.msg_flags = flags;
+                msg.msg_iov = msg_iov;
 
-            size_t msgSize = 0;
-            struct msghdr msg;
-            struct iovec msg_iov[IOV_MAX];
-            memset(&msg, 0x00, sizeof(struct msghdr));
-            msg.msg_name = &entry->addr;
-            msg.msg_namelen = entry->len;
-            msg.msg_flags = flags;
-            msg.msg_iov = msg_iov;
+                size_t msgPartSize = 0;
+                message->header.payloadPartSize = 0;
+                message->header.payloadOffset = 0;
+                message->header.metadataSize = 0;
+                message->header.isLastSegment = 0;
 
-            // Write generic seralized payload in vector buffer
-            if (payloadSize && payloadData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = payloadData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = payloadSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            } else {
-                // copy serialized vector into vector buffer
-                for (size_t i = 0; i < MIN(msg_iov_len, IOV_MAX - 2); i++) {
+                size_t protocolHeaderBufferSize = 0;
+                // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints)
+                handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize);
+                size_t footerSize = 0;
+                // Get size of the Protocol Footer
+                handle->protocol->getFooterSize(handle->protocol->handle, &footerSize);
+                size_t maxMsgSize = entry->maxMsgSize - protocolHeaderBufferSize - footerSize;
+
+                // reserve space for the header if required, header is added later when size of message is known (message can split in parts)
+                if (protocolHeaderBufferSize) {
                     msg.msg_iovlen++;
-                    msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
-                    msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
-                    msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                }
+                // Write generic seralized payload in vector buffer
+                if (!allPayloadAdded) {
+                    if (payloadSize && payloadData && maxMsgSize) {
+                        char *buffer = payloadData;
+                        msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadOffset];
+                        msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadOffset), maxMsgSize);
+                        msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                        msg.msg_iovlen++;
+
+                    } else {
+                        // copy serialized vector into vector buffer
+                        size_t i;
+                        for (i = msgIovOffset; i < MIN(msg_iov_len, msgIovOffset + max_msg_iov_len); i++) {
+                            if ((msgPartSize + msgIoVec[i].iov_len) > maxMsgSize) {
+                                break;
+                            }
+                            msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base;
+                            msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len;
+                            msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len;
+                            msg.msg_iovlen++;
+                        }
+                        // if no entry could be added
+                        if (i == msgIovOffset) {
+                            // TODO element can be split in parts?
+                            L_ERROR("[TCP Socket] vector io element is larger than max msg size");
+                            break;
+                        }
+                        msgIovOffset = i;
+                    }
+                    message->header.payloadPartSize = msgPartSize;
+                    message->header.payloadOffset   = msgPayloadOffset;
+                    msgPayloadOffset += message->header.payloadPartSize;
+                    sendMsgSize = msgPayloadOffset;
+                    allPayloadAdded= msgPayloadOffset >= payloadSize;
+                }
+
+                // Write optional metadata in vector buffer
+                if (allPayloadAdded &&
+                    (metadataSize != 0 && metadataData) &&
+                    (msgPartSize < maxMsgSize) &&
+                    (msg.msg_iovlen-1 < max_msg_iov_len)) {  // header is already included
+                    msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
+                    msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
+                    msg.msg_iovlen++;
+                    msgPartSize += metadataSize;
+                    message->header.metadataSize = metadataSize;
+                    sendMsgSize += metadataSize;
+                }
+                if (sendMsgSize >= totalMsgSize) {
+                    message->header.isLastSegment = 0x1;
+                }
+
+                void *headerData = NULL;
+                size_t headerSize = 0;
+                // Get HeaderSize of the Protocol Header
+                handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize);
+
+                // check if header is not part of the payload (=> headerBufferSize = 0)
+                if (protocolHeaderBufferSize) {
+                    headerData = entry->writeHeaderBuffer;
+                    // Encode the header, with payload size and metadata size
+                    handle->protocol->encodeHeader(handle->protocol->handle, message, &headerData, &headerSize);
+                    entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize);
+                    if (headerData && entry->writeHeaderBuffer != headerData) {
+                        entry->writeHeaderBuffer = headerData;
+                    }
+                    if (headerSize && headerData) {
+                        // Write header in 1st vector buffer item
+                        msg.msg_iov[0].iov_base = headerData;
+                        msg.msg_iov[0].iov_len = headerSize;
+                        msgPartSize += msg.msg_iov[0].iov_len;
+                    } else {
+                        L_ERROR("[TCP Socket] No header buffer is generated");
+                        break;
+                    }
+                }
+
+                void *footerData = NULL;
+                // Write optional footerData in vector buffer
+                if (footerSize) {
+                    footerData = entry->writeFooterBuffer;
+                    handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerSize);
+                    if (footerData && entry->writeFooterBuffer != footerData) {
+                        entry->writeFooterBuffer = footerData;
+                        entry->writeFooterBufferSize = footerSize;
+                    }
+                    if (footerData) {
+                        msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
+                        msg.msg_iov[msg.msg_iovlen].iov_len  = footerSize;
+                        msg.msg_iovlen++;
+                        msgPartSize += footerSize;
+                    }
+                }
+                nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL);
+
+                //  When a specific socket keeps reporting errors can indicate a subscriber
+                //  which is not active anymore, the connection will remain until the retry
+                //  counter exceeds the maximum retry count.
+                //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
+                if (nbytes == -1) {
+                    if (entry->retryCount < handle->maxSendRetryCount) {
+                        entry->retryCount++;
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.",
+                            entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno));
+                    } else {
+                        L_ERROR(
+                            "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno));
+                        connFdCloseQueue[nofConnToClose++] = entry->fd;
+                    }
+                    result = -1; //At least one connection failed sending
+                } else if (msgPartSize) {
+                    entry->retryCount = 0;
+                    if (nbytes != msgPartSize) {
+                        L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgPartSize, nbytes, strerror(errno));
+                    }
+                }
+                // Note: serialized Payload is deleted by serializer
+                if (payloadData && (payloadData != message->payload.payload)) {
+                    free(payloadData);
                 }
             }
-
-            // Write optional metadata in vector buffer
-            if (metadataSize && metadataData) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = metadataData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
-
-            // Write optional footerData in vector buffer
-            if (footerData && footerDataSize) {
-                msg.msg_iovlen++;
-                msg.msg_iov[msg.msg_iovlen].iov_base = footerData;
-                msg.msg_iov[msg.msg_iovlen].iov_len = footerDataSize;
-                msgSize += msg.msg_iov[msg.msg_iovlen].iov_len;
-            }
-
-            void *headerData = NULL;
-            size_t headerSize = 0;
-            // check if header is not part of the payload (=> headerBufferSize = 0)s
-            if (entry->headerBufferSize) {
-              headerData = entry->headerBuffer;
-              // Encode the header, with payload size and metadata size
-              handle->protocol->encodeHeader(handle->protocol->handle, message,
-                                             &headerData,
-                                             &headerSize);
-              entry->headerBufferSize = headerSize;
-            }
-            if (!entry->headerBufferSize) {
-              // Skip header buffer, when header is part of payload;
-              msg.msg_iov = &msg_iov[1];
-            } else if (headerSize && headerData) {
-              // Write header in 1st vector buffer item
-                msg.msg_iov[0].iov_base = headerData;
-                msg.msg_iov[0].iov_len = headerSize;
-                msgSize += msg.msg_iov[0].iov_len;
-                msg.msg_iovlen++;
-            } else {
-              L_ERROR("[TCP Socket] No header buffer is generated");
-              msg.msg_iovlen = 0;
-            }
-            long int nbytes = pubsub_tcpHandler_writeSocket(handle, entry, &msg, msgSize, flags);
-            //  When a specific socket keeps reporting errors can indicate a subscriber
-            //  which is not active anymore, the connection will remain until the retry
-            //  counter exceeds the maximum retry count.
-            //  Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error.
-            if (nbytes == -1) {
-                if (entry->retryCount < handle->maxSendRetryCount) {
-                    entry->retryCount++;
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d), error: %s. try again. Retry count %u of %u, ",
-                        entry->fd, strerror(errno), entry->retryCount, handle->maxSendRetryCount);
-                } else {
-                    L_ERROR(
-                        "[TCP Socket] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s",
-                        entry->fd, handle->maxSendRetryCount, strerror(errno));
-                    connFdCloseQueue[nofConnToClose++] = entry->fd;
-                }
-                result = -1; //At least one connection failed sending
-            } else if (msgSize) {
-                entry->retryCount = 0;
-                if (nbytes != msgSize) {
-                    L_ERROR("[TCP Socket] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgSize, nbytes,  strerror(errno));
-                }
-            }
-            // Release data
-            if (headerData && headerData != entry->headerBuffer) {
-                free(headerData);
-            }
-            // Note: serialized Payload is deleted by serializer
-            if (payloadData && (payloadData != message->payload.payload)) {
-                free(payloadData);
-            }
-            if (metadataData && metadataData != entry->metaBuffer) {
-                free(metadataData);
-            }
-            if (footerData && footerData != entry->footerBuffer) {
-                free(footerData);
-            }
+            celixThreadMutex_unlock(&entry->writeMutex);
         }
+        celixThreadRwlock_unlock(&handle->dbLock);
     }
-    celixThreadRwlock_unlock(&handle->dbLock);
     //Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock
     for (int i = 0; i < nofConnToClose; i++) {
         pubsub_tcpHandler_close(handle, connFdCloseQueue[i]);
@@ -1171,12 +1211,12 @@
 // get interface URL
 //
 char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle) {
-    hash_map_iterator_t interface_iter =
+    hash_map_iterator_t iter =
         hashMapIterator_construct(handle->interface_url_map);
     char *url = NULL;
-    while (hashMapIterator_hasNext(&interface_iter)) {
+    while (hashMapIterator_hasNext(&iter)) {
         psa_tcp_connection_entry_t *entry =
-            hashMapIterator_nextValue(&interface_iter);
+            hashMapIterator_nextValue(&iter);
         if (entry && entry->url) {
             if (!url) {
                 url = celix_utils_strdup(entry->url);
@@ -1189,6 +1229,34 @@
     }
     return url;
 }
+//
+// get interface URL
+//
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle) {
+    celixThreadRwlock_writeLock(&handle->dbLock);
+    hash_map_iterator_t iter =
+            hashMapIterator_construct(handle->connection_url_map);
+    char *url = NULL;
+    while (hashMapIterator_hasNext(&iter)) {
+        psa_tcp_connection_entry_t *entry =
+                hashMapIterator_nextValue(&iter);
+        if (entry && entry->url) {
+            if (!url) {
+                pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url);
+                url = celix_utils_strdup(url_info->interface_url ? url_info->interface_url : entry->url);
+                pubsub_utils_url_free(url_info);
+            } else {
+                char *tmp = url;
+                pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url);
+                asprintf(&url, "%s %s", tmp, url_info->interface_url ? url_info->interface_url : entry->url);
+                pubsub_utils_url_free(url_info);
+                free(tmp);
+            }
+        }
+    }
+    celixThreadRwlock_unlock(&handle->dbLock);
+    return url;
+}
 
 //
 // Handle non-blocking accept (sender)
@@ -1218,7 +1286,8 @@
 #else
         struct epoll_event event;
         bzero(&event, sizeof(event)); // zero the struct
-        event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
+        event.events = EPOLLRDHUP | EPOLLERR;
+        if (handle->enableReceiveEvent) event.events |= EPOLLIN;
         event.data.fd = entry->fd;
         // Register Read to epoll
         rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event);
@@ -1321,7 +1390,7 @@
     if (handle->efd >= 0) {
         int nof_events = 0;
         struct epoll_event events[MAX_EVENTS];
-        nof_events = epoll_wait(handle->efd, events, MAX_EVENTS, handle->timeout);
+        nof_events = epoll_wait(handle->efd, events, MAX_EVENTS, (int)handle->timeout);
         if (nof_events < 0) {
             if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
             } else
@@ -1357,7 +1426,6 @@
             }
         }
     }
-    return;
 }
 #endif
 
@@ -1377,4 +1445,4 @@
         celixThreadRwlock_unlock(&handle->dbLock);
     } // while
     return NULL;
-}
\ No newline at end of file
+}
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
index ed4581c..2d97634 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_handler.h
@@ -58,15 +58,14 @@
 int pubsub_tcpHandler_connect(pubsub_tcpHandler_t *handle, char *url);
 int pubsub_tcpHandler_disconnect(pubsub_tcpHandler_t *handle, char *url);
 int pubsub_tcpHandler_listen(pubsub_tcpHandler_t *handle, char *url);
-
-int pubsub_tcpHandler_createReceiveBufferStore(pubsub_tcpHandler_t *handle,
-                                               unsigned int maxNofBuffers,
-                                               unsigned int bufferSize);
+int pubsub_tcpHandler_setReceiveBufferSize(pubsub_tcpHandler_t *handle, unsigned int size);
+int pubsub_tcpHandler_setMaxMsgSize(pubsub_tcpHandler_t *handle, unsigned int size);
 void pubsub_tcpHandler_setTimeout(pubsub_tcpHandler_t *handle, unsigned int timeout);
 void pubsub_tcpHandler_setSendRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
 void pubsub_tcpHandler_setReceiveRetryCnt(pubsub_tcpHandler_t *handle, unsigned int count);
 void pubsub_tcpHandler_setSendTimeOut(pubsub_tcpHandler_t *handle, double timeout);
 void pubsub_tcpHandler_setReceiveTimeOut(pubsub_tcpHandler_t *handle, double timeout);
+void pubsub_tcpHandler_enableReceiveEvent(pubsub_tcpHandler_t *handle, bool enable);
 
 int pubsub_tcpHandler_read(pubsub_tcpHandler_t *handle, int fd);
 int pubsub_tcpHandler_write(pubsub_tcpHandler_t *handle,
@@ -86,6 +85,7 @@
                                                   pubsub_tcpHandler_acceptConnectMessage_callback_t connectMessageCallback,
                                                   pubsub_tcpHandler_acceptConnectMessage_callback_t disconnectMessageCallback);
 char *pubsub_tcpHandler_get_interface_url(pubsub_tcpHandler_t *handle);
+char *pubsub_tcpHandler_get_connection_url(pubsub_tcpHandler_t *handle);
 void pubsub_tcpHandler_setThreadPriority(pubsub_tcpHandler_t *handle, long prio, const char *sched);
 void pubsub_tcpHandler_setThreadName(pubsub_tcpHandler_t *handle, const char *topic, const char *scope);
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
index 4fa4586..b1b2c58 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.c
@@ -24,23 +24,19 @@
 #include <pubsub/subscriber.h>
 #include <memory.h>
 #include <pubsub_constants.h>
-#include <assert.h>
-#include <pubsub_endpoint.h>
 #include <arpa/inet.h>
 #include <celix_log_helper.h>
-#include <math.h>
 #include "pubsub_tcp_handler.h"
 #include "pubsub_tcp_topic_receiver.h"
 #include "pubsub_psa_tcp_constants.h"
 #include "pubsub_tcp_common.h"
 
-#include "celix_utils_api.h"
 #include <uuid/uuid.h>
 #include <pubsub_admin_metrics.h>
 #include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
 #include <celix_api.h>
 
-#define MAX_EPOLL_EVENTS     16
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN  37
 #endif
@@ -65,8 +61,10 @@
     char *topic;
     size_t timeout;
     bool metricsEnabled;
+    bool isPassive;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     struct {
         celix_thread_t thread;
@@ -107,7 +105,6 @@
     double averageDelayInSeconds;
     double maxDelayInSeconds;
     double minDelayInSeconds;
-    unsigned int lastSeqNr;
     unsigned long nrOfMissingSeqNumbers;
 } psa_tcp_subscriber_metrics_entry_t;
 
@@ -118,24 +115,14 @@
     bool initialized; //true if the init function is called through the receive thread
 } psa_tcp_subscriber_entry_t;
 
-static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                  const celix_bundle_t *owner);
-
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
-                                                     const celix_bundle_t *owner);
-
+static void pubsub_tcpTopicReceiver_addSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props, const celix_bundle_t *owner);
 static void *psa_tcp_recvThread(void *data);
-
 static void psa_tcp_connectToAllRequestedConnections(pubsub_tcp_topic_receiver_t *receiver);
-
 static void psa_tcp_initializeAllSubscribers(pubsub_tcp_topic_receiver_t *receiver);
-
 static void processMsg(void *handle, const pubsub_protocol_message_t *hdr, bool *release, struct timespec *receiveTime);
-
 static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
-
 static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-
 static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
 
 pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -143,7 +130,7 @@
                                                             const char *scope,
                                                             const char *topic,
                                                             const celix_properties_t *topicProperties,
-                                                            pubsub_tcp_endPointStore_t *endPointStore,
+                                                            pubsub_tcp_endPointStore_t *handlerStore,
                                                             long serializerSvcId,
                                                             pubsub_serializer_service_t *serializer,
                                                             long protocolSvcId,
@@ -157,52 +144,45 @@
     receiver->protocol = protocol;
     receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
     receiver->topic = strndup(topic, 1024 * 1024);
-    bool isServerEndPoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
+    const char *staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
 
-    /* Check if it's a static endpoint */
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
-    const char *staticConnectUrls = NULL;
-
-    staticConnectUrls = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_CONNECT_URLS_FOR, topic, scope);
-
+    if (isPassive) {
+        receiver->isPassive = psa_tcp_isPassive(isPassive);
+    }
     if (topicProperties != NULL) {
         if(staticConnectUrls == NULL) {
             staticConnectUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
         }
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-        if (endPointType != NULL) {
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = staticConnectUrls;
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_BIND_URL, NULL);
-                isServerEndPoint = true;
-            }
+        if (isPassive == NULL) {
+            receiver->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, false);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
         }
     }
+
     // Set receiver connection thread timeout.
     // property is in ms, timeout value in us. (convert ms to us).
     receiver->timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_SUBSCRIBER_CONNECTION_TIMEOUT,
                                                               PSA_TCP_SUBSCRIBER_CONNECTION_DEFAULT_TIMEOUT) * 1000;
     /* When it's an endpoint share the socket with the sender */
-    if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
-        celixThreadMutex_lock(&endPointStore->mutex);
-        const char *endPointUrl = (staticServerEndPointUrls) ? staticServerEndPointUrls : staticClientEndPointUrls;
-        pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
         if (entry == NULL) {
             if (receiver->socketHandler == NULL)
                 receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
             entry = receiver->socketHandler;
             receiver->sharedSocketHandler = receiver->socketHandler;
-            hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
         } else {
             receiver->socketHandler = entry;
             receiver->sharedSocketHandler = entry;
         }
-        celixThreadMutex_unlock(&endPointStore->mutex);
+        celixThreadMutex_unlock(&handlerStore->mutex);
     } else {
         receiver->socketHandler = pubsub_tcpHandler_create(receiver->protocol, receiver->logHelper);
     }
@@ -213,14 +193,12 @@
         long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_KEY,
                                                    PUBSUB_TCP_SUBSCRIBER_RETRY_CNT_DEFAULT);
         double rcvTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_KEY, PUBSUB_TCP_SUBSCRIBER_RCVTIMEO_DEFAULT);
-        long sessions = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_MAX_RECV_SESSIONS,
-                                                              PSA_TCP_DEFAULT_MAX_RECV_SESSIONS);
-        long buffer_size = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
+        long bufferSize = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_RECV_BUFFER_SIZE,
                                                                  PSA_TCP_DEFAULT_RECV_BUFFER_SIZE);
         long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+
         pubsub_tcpHandler_setThreadName(receiver->socketHandler, topic, scope);
-        pubsub_tcpHandler_createReceiveBufferStore(receiver->socketHandler, (unsigned int) sessions,
-                                                   (unsigned int) buffer_size);
+        pubsub_tcpHandler_setReceiveBufferSize(receiver->socketHandler, (unsigned int) bufferSize);
         pubsub_tcpHandler_setTimeout(receiver->socketHandler, (unsigned int) timeout);
         pubsub_tcpHandler_addMessageHandler(receiver->socketHandler, receiver, processMsg);
         pubsub_tcpHandler_addReceiverConnectionCallback(receiver->socketHandler, receiver, psa_tcp_connectHandler,
@@ -230,8 +208,7 @@
         pubsub_tcpHandler_setReceiveTimeOut(receiver->socketHandler, rcvTimeout);
     }
     receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                     PSA_TCP_DEFAULT_METRICS_ENABLED);
-
+                                                                          PSA_TCP_DEFAULT_METRICS_ENABLED);
     celixThreadMutex_create(&receiver->subscribers.mutex, NULL);
     celixThreadMutex_create(&receiver->requestedConnections.mutex, NULL);
     celixThreadMutex_create(&receiver->thread.mutex, NULL);
@@ -239,7 +216,7 @@
     receiver->subscribers.map = hashMap_create(NULL, NULL, NULL, NULL);
     receiver->requestedConnections.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
 
-    if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (staticServerEndPointUrls == NULL)) {
+    if ((staticConnectUrls != NULL) && (receiver->socketHandler != NULL) && (!receiver->isPassive)) {
         char *urlsCopy = strndup(staticConnectUrls, 1024 * 1024);
         char *url;
         char *save = urlsCopy;
@@ -255,7 +232,7 @@
         free(urlsCopy);
     }
 
-    if (receiver->socketHandler != NULL && (!isServerEndPoint)) {
+    if (receiver->socketHandler != NULL && (!receiver->isPassive)) {
         // Configure Receiver thread
         receiver->thread.running = true;
         celixThread_create(&receiver->thread.thread, NULL, psa_tcp_recvThread, receiver);
@@ -310,15 +287,14 @@
             if (entry != NULL) {
                 receiver->serializer->destroySerializerMap(receiver->serializer->handle, entry->msgTypes);
                 hashMap_destroy(entry->subscriberServices, false, false);
+                hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
+                while (hashMapIterator_hasNext(&iter2)) {
+                    hash_map_t *origins = hashMapIterator_nextValue(&iter2);
+                    hashMap_destroy(origins, true, true);
+                }
+                hashMap_destroy(entry->metrics, false, false);
                 free(entry);
             }
-
-            hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
-            while (hashMapIterator_hasNext(&iter2)) {
-                hash_map_t *origins = hashMapIterator_nextValue(&iter2);
-                hashMap_destroy(origins, true, true);
-            }
-            hashMap_destroy(entry->metrics, false, false);
         }
         hashMap_destroy(receiver->subscribers.map, false, false);
 
@@ -346,7 +322,7 @@
             pubsub_tcpHandler_destroy(receiver->socketHandler);
             receiver->socketHandler = NULL;
         }
-
+        pubsubInterceptorsHandler_destroy(receiver->interceptorsHandler);
         if (receiver->scope != NULL) {
             free(receiver->scope);
         }
@@ -374,20 +350,36 @@
 void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver, celix_array_list_t *connectedUrls,
                                              celix_array_list_t *unconnectedUrls) {
     celixThreadMutex_lock(&receiver->requestedConnections.mutex);
-    hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
-    while (hashMapIterator_hasNext(&iter)) {
-        psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+    if (receiver->isPassive) {
+        char* interface_url = pubsub_tcpHandler_get_interface_url(receiver->socketHandler);
         char *url = NULL;
-        asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
-        if (entry->connected) {
+        asprintf(&url, "%s (passive)", interface_url ? interface_url : "");
+        if (interface_url) {
             celix_arrayList_add(connectedUrls, url);
         } else {
             celix_arrayList_add(unconnectedUrls, url);
         }
+        free(interface_url);
+    } else {
+        hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
+            char *url = NULL;
+            asprintf(&url, "%s%s", entry->url, entry->statically ? " (static)" : "");
+            if (entry->connected) {
+                celix_arrayList_add(connectedUrls, url);
+            } else {
+                celix_arrayList_add(unconnectedUrls, url);
+            }
+        }
     }
     celixThreadMutex_unlock(&receiver->requestedConnections.mutex);
 }
 
+bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *receiver) {
+    return receiver->isPassive;
+}
+
 void pubsub_tcpTopicReceiver_connectTo(
     pubsub_tcp_topic_receiver_t *receiver,
     const char *url) {
@@ -466,8 +458,7 @@
 
         hashMap_put(entry->subscriberServices, (void*)svcId, svc);
 
-        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd,
-                                                           &entry->msgTypes);
+        int rc = receiver->serializer->createSerializerMap(receiver->serializer->handle, (celix_bundle_t *) bnd, &entry->msgTypes);
 
         if (rc == 0) {
             entry->metrics = hashMap_create(NULL, NULL, NULL, NULL);
@@ -491,14 +482,13 @@
     celixThreadMutex_unlock(&receiver->subscribers.mutex);
 }
 
-static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc, const celix_properties_t *props,
+static void pubsub_tcpTopicReceiver_removeSubscriber(void *handle, void *svc __attribute__((unused)), const celix_properties_t *props,
                                                      const celix_bundle_t *bnd) {
     pubsub_tcp_topic_receiver_t *receiver = handle;
 
     long bndId = celix_bundle_getId(bnd);
     long svcId = celix_properties_getAsLong(props, OSGI_FRAMEWORK_SERVICE_ID, -1);
 
-
     celixThreadMutex_lock(&receiver->subscribers.mutex);
     psa_tcp_subscriber_entry_t *entry = hashMap_get(receiver->subscribers.map, (void *) bndId);
     if (entry != NULL) {
@@ -527,7 +517,7 @@
 
 static inline void
 processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
-                             const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime) {
+                             const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
     //NOTE receiver->subscribers.mutex locked
     pubsub_msg_serializer_t *msgSer = hashMap_get(entry->msgTypes, (void *) (uintptr_t) (message->header.msgId));
     bool monitor = receiver->metricsEnabled;
@@ -559,31 +549,39 @@
             }
 
             if (status == CELIX_SUCCESS) {
-                hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                const char *msgType = msgSer->msgName;
+                uint32_t msgId = message->header.msgId;
+                celix_properties_t *metadata = message->metadata.metadata;
+                bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
                 bool release = true;
-                while (hashMapIterator_hasNext(&iter)) {
-                    pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
-                    svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata,
-                                 &release);
-                    if (!release && hashMapIterator_hasNext(&iter)) {
-                        //receive function has taken ownership and still more receive function to come ..
-                        //deserialize again for new message
-                        status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
-                        if (status != CELIX_SUCCESS) {
-                            L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
-                                   receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
-                            break;
+                if (cont) {
+                    hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
+                    while (hashMapIterator_hasNext(&iter)) {
+                        pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
+                        svc->receive(svc->handle, msgSer->msgName, msgSer->msgId, deSerializedMsg, message->metadata.metadata, &release);
+                        pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
+                        if (!release && hashMapIterator_hasNext(&iter)) {
+                            //receive function has taken ownership and still more receive function to come ..
+                            //deserialize again for new message
+                            status = msgSer->deserialize(msgSer->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+                            if (status != CELIX_SUCCESS) {
+                                L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
+                                       msgSer->msgName,
+                                       receiver->scope == NULL ? "(null)" : receiver->scope,
+                                       receiver->topic);
+                                break;
+                            }
+                            release = true;
                         }
-                        release = true;
                     }
+                    if (release) {
+                        msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
+                    }
+                    if (message->metadata.metadata) {
+                        celix_properties_destroy(message->metadata.metadata);
+                    }
+                    updateReceiveCount += 1;
                 }
-                if (release) {
-                    msgSer->freeDeserializeMsg(msgSer->handle, deSerializedMsg);
-                }
-                if (message->metadata.metadata) {
-                    celix_properties_destroy(message->metadata.metadata);
-                }
-                updateReceiveCount += 1;
             } else {
                 updateSerError += 1;
                 L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgSer->msgName,
@@ -650,10 +648,7 @@
 
 pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver) {
     pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
-    snprintf(result->scope,
-             PUBSUB_AMDIN_METRICS_NAME_MAX,
-             "%s",
-             receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
+    snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
     snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
 
     int msgTypesCount = 0;
@@ -677,8 +672,7 @@
         hash_map_iterator_t iter2 = hashMapIterator_construct(entry->metrics);
         while (hashMapIterator_hasNext(&iter2)) {
             hash_map_t *origins = hashMapIterator_nextValue(&iter2);
-            result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins),
-                                                 sizeof(*(result->msgTypes[i].origins)));
+            result->msgTypes[i].origins = calloc((size_t) hashMap_size(origins), sizeof(*(result->msgTypes[i].origins)));
             result->msgTypes[i].nrOfOrigins = hashMap_size(origins);
             int k = 0;
             hash_map_iterator_t iter3 = hashMapIterator_construct(origins);
@@ -694,10 +688,8 @@
                     result->msgTypes[i].origins[k].averageDelayInSeconds = metrics->averageDelayInSeconds;
                     result->msgTypes[i].origins[k].maxDelayInSeconds = metrics->maxDelayInSeconds;
                     result->msgTypes[i].origins[k].minDelayInSeconds = metrics->minDelayInSeconds;
-                    result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds =
-                        metrics->averageTimeBetweenMessagesInSeconds;
-                    result->msgTypes[i].origins[k].averageSerializationTimeInSeconds =
-                        metrics->averageSerializationTimeInSeconds;
+                    result->msgTypes[i].origins[k].averageTimeBetweenMessagesInSeconds = metrics->averageTimeBetweenMessagesInSeconds;
+                    result->msgTypes[i].origins[k].averageSerializationTimeInSeconds = metrics->averageSerializationTimeInSeconds;
                     result->msgTypes[i].origins[k].lastMessageReceived = metrics->lastMessageReceived;
                     result->msgTypes[i].origins[k].nrOfMissingSeqNumbers = metrics->nrOfMissingSeqNumbers;
 
@@ -721,7 +713,7 @@
         hash_map_iterator_t iter = hashMapIterator_construct(receiver->requestedConnections.map);
         while (hashMapIterator_hasNext(&iter)) {
             psa_tcp_requested_connection_entry_t *entry = hashMapIterator_nextValue(&iter);
-            if ((entry) && (!entry->connected)) {
+            if ((entry) && (!entry->connected) && (!receiver->isPassive)) {
                 int rc = pubsub_tcpHandler_connect(entry->parent->socketHandler, entry->url);
                 if (rc < 0) {
                     allConnected = false;
@@ -812,12 +804,11 @@
 
     int versionMajor;
     int versionMinor;
-    if (msgVersion != NULL) {
+    if (msgVersion!=NULL) {
         version_getMajor(msgVersion, &versionMajor);
         version_getMinor(msgVersion, &versionMinor);
-        if (major == ((unsigned char) versionMajor)) { /* Different major means incompatible */
-            check = (minor >=
-                ((unsigned char) versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
+        if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
+            check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
         }
     }
 
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
index 50d5a97..118bf11 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_receiver.h
@@ -32,7 +32,7 @@
                                                             const char *scope,
                                                             const char *topic,
                                                             const celix_properties_t *topicProperties,
-                                                            pubsub_tcp_endPointStore_t *endPointStore,
+                                                            pubsub_tcp_endPointStore_t *handlerStore,
                                                             long serializerSvcId,
                                                             pubsub_serializer_service_t *serializer,
                                                             long protocolSvcId,
@@ -47,6 +47,7 @@
 void pubsub_tcpTopicReceiver_listConnections(pubsub_tcp_topic_receiver_t *receiver,
                                              celix_array_list_t *connectedUrls,
                                              celix_array_list_t *unconnectedUrls);
+bool pubsub_tcpTopicReceiver_isPassive(pubsub_tcp_topic_receiver_t *sender);
 
 void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
 void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
index dbb5e26..b287ebd 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.c
@@ -33,10 +33,9 @@
 #include "pubsub_tcp_common.h"
 #include <uuid/uuid.h>
 #include "celix_constants.h"
-#include <signal.h>
 #include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
 
-#define FIRST_SEND_DELAY_IN_SECONDS              2
 #define TCP_BIND_MAX_RETRY                      10
 
 #define L_DEBUG(...) \
@@ -59,13 +58,15 @@
     bool metricsEnabled;
     pubsub_tcpHandler_t *socketHandler;
     pubsub_tcpHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
 
     char *scope;
     char *topic;
     char *url;
     bool isStatic;
-
+    bool isPassive;
     bool verbose;
+    unsigned long send_delay;
 
     struct {
         long svcId;
@@ -128,7 +129,7 @@
     const char *scope,
     const char *topic,
     const celix_properties_t *topicProperties,
-    pubsub_tcp_endPointStore_t *endPointStore,
+    pubsub_tcp_endPointStore_t *handlerStore,
     long serializerSvcId,
     pubsub_serializer_service_t *ser,
     long protocolSvcId,
@@ -144,52 +145,44 @@
     if (uuid != NULL) {
         uuid_parse(uuid, sender->fwUUID);
     }
-    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED,
-                                                                   PSA_TCP_DEFAULT_METRICS_ENABLED);
-    bool isEndpoint = false;
+    pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
+    sender->isPassive = false;
+    sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_TCP_METRICS_ENABLED, PSA_TCP_DEFAULT_METRICS_ENABLED);
     char *urls = NULL;
     const char *ip = celix_bundleContext_getProperty(ctx, PUBSUB_TCP_PSA_IP_KEY, NULL);
-    const char *discUrl = NULL;
-    const char *staticClientEndPointUrls = NULL;
-    const char *staticServerEndPointUrls = NULL;
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_PASSIVE_SELECTION_KEY, topic, scope);
 
-    discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, PUBSUB_TCP_STATIC_BIND_URL_FOR, topic, scope);
-
+    if (isPassive) {
+        sender->isPassive = psa_tcp_isPassive(isPassive);
+    }
     if (topicProperties != NULL) {
         if (discUrl == NULL) {
             discUrl = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_DISCOVER_URL, NULL);
         }
-        /* Check if it's a static endpoint */
-        const char *endPointType = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_ENDPOINT_TYPE, NULL);
-        if (endPointType != NULL) {
-            isEndpoint = true;
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_CLIENT)) == 0) {
-                staticClientEndPointUrls = celix_properties_get(topicProperties, PUBSUB_TCP_STATIC_CONNECT_URLS, NULL);
-            }
-            if (strncmp(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER, endPointType,
-                        strlen(PUBSUB_TCP_STATIC_ENDPOINT_TYPE_SERVER)) == 0) {
-                staticServerEndPointUrls = discUrl;
-            }
+        if (isPassive == NULL) {
+            sender->isPassive = celix_properties_getAsBool(topicProperties, PUBSUB_TCP_PASSIVE_CONFIGURED, false);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, PUBSUB_TCP_PASSIVE_KEY, NULL);
         }
     }
-
     /* When it's an endpoint share the socket with the receiver */
-    if ((staticClientEndPointUrls != NULL) || (staticServerEndPointUrls)) {
-        celixThreadMutex_lock(&endPointStore->mutex);
-        const char *endPointUrl = (staticClientEndPointUrls) ? staticClientEndPointUrls : staticServerEndPointUrls;
-        pubsub_tcpHandler_t *entry = hashMap_get(endPointStore->map, endPointUrl);
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_tcpHandler_t *entry = hashMap_get(handlerStore->map, passiveKey);
         if (entry == NULL) {
             if (sender->socketHandler == NULL)
                 sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
             entry = sender->socketHandler;
             sender->sharedSocketHandler = sender->socketHandler;
-            hashMap_put(endPointStore->map, (void *) endPointUrl, entry);
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
         } else {
             sender->socketHandler = entry;
             sender->sharedSocketHandler = entry;
         }
-        celixThreadMutex_unlock(&endPointStore->mutex);
+        celixThreadMutex_unlock(&handlerStore->mutex);
     } else {
         sender->socketHandler = pubsub_tcpHandler_create(sender->protocol, sender->logHelper);
     }
@@ -197,66 +190,66 @@
     if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
         long prio = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_THREAD_REALTIME_PRIO, -1L);
         const char *sched = celix_properties_get(topicProperties, PUBSUB_TCP_THREAD_REALTIME_SCHED, NULL);
-        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY,
-                                                   PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
-        double timeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY,
-                                                                       (!isEndpoint) ? PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT :
-                                                                                       PUBSUB_TCP_PUBLISHER_SNDTIMEO_ENDPOINT_DEFAULT);
+        long retryCnt = celix_properties_getAsLong(topicProperties, PUBSUB_TCP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_TCP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double sendTimeout = celix_properties_getAsDouble(topicProperties, PUBSUB_TCP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_TCP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, PSA_TCP_MAX_MESSAGE_SIZE, PSA_TCP_DEFAULT_MAX_MESSAGE_SIZE);
+        long timeout = celix_bundleContext_getPropertyAsLong(ctx, PSA_TCP_TIMEOUT, PSA_TCP_DEFAULT_TIMEOUT);
+        sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx,  PUBSUB_UTILS_PSA_SEND_DELAY, PUBSUB_UTILS_PSA_DEFAULT_SEND_DELAY);
         pubsub_tcpHandler_setThreadName(sender->socketHandler, topic, scope);
         pubsub_tcpHandler_setThreadPriority(sender->socketHandler, prio, sched);
         pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
-        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, timeout);
+        pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+        pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
+        // Hhen passiveKey is specified, enable receive event for full-duplex connection using key.
+        // Because the topic receiver is already started, enable the receive event.
+        pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
+        pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
     }
 
-    //setting up tcp socket for TCP TopicSender
-    if (staticClientEndPointUrls != NULL) {
-        // Store url for client static endpoint
-        sender->url = strndup(staticClientEndPointUrls, 1024 * 1024);
-        sender->isStatic = true;
-    } else if (discUrl != NULL) {
-        urls = strndup(discUrl, 1024 * 1024);
-        sender->isStatic = true;
-    } else if (ip != NULL) {
-        urls = strndup(ip, 1024 * 1024);
-    } else {
-        struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
-        urls = pubsub_utils_url_get_url(sin, NULL);
-        free(sin);
-    }
-    if (!sender->url) {
-        char *urlsCopy = strndup(urls, 1024 * 1024);
-        char *url;
-        char *save = urlsCopy;
-        while ((url = strtok_r(save, " ", &save))) {
-            int retry = 0;
-            while (url && retry < TCP_BIND_MAX_RETRY) {
-                pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
-                int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
-                if (rc < 0) {
-                    L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
-                } else {
-                    url = NULL;
-                }
-                pubsub_utils_url_free(urlInfo);
-                retry++;
-            }
+    if (!sender->isPassive) {
+        //setting up tcp socket for TCP TopicSender
+        if (discUrl != NULL) {
+            urls = strndup(discUrl, 1024 * 1024);
+            sender->isStatic = true;
+        } else if (ip != NULL) {
+            urls = strndup(ip, 1024 * 1024);
+        } else {
+            struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
+            urls = pubsub_utils_url_get_url(sin, NULL);
+            free(sin);
         }
-        free(urlsCopy);
-        sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
-    }
-    if (urls)
+        if (!sender->url) {
+            char *urlsCopy = strndup(urls, 1024 * 1024);
+            char *url;
+            char *save = urlsCopy;
+            while ((url = strtok_r(save, " ", &save))) {
+                int retry = 0;
+                while (url && retry < TCP_BIND_MAX_RETRY) {
+                    pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
+                    int rc = pubsub_tcpHandler_listen(sender->socketHandler, urlInfo->url);
+                    if (rc < 0) {
+                        L_WARN("Error for tcp_bind using dynamic bind url '%s'. %s", urlInfo->url, strerror(errno));
+                    } else {
+                        url = NULL;
+                    }
+                    pubsub_utils_url_free(urlInfo);
+                    retry++;
+                }
+            }
+            free(urlsCopy);
+            sender->url = pubsub_tcpHandler_get_interface_url(sender->socketHandler);
+        }
         free(urls);
+    }
 
-    if (sender->url != NULL) {
+    //register publisher services using a service factory
+    if ((sender->url != NULL) ||  (sender->isPassive)) {
         sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
         sender->topic = strndup(topic, 1024 * 1024);
 
         celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
         sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
-    }
 
-    //register publisher services using a service factory
-    if (sender->url != NULL) {
         sender->publisher.factory.handle = sender;
         sender->publisher.factory.getService = psa_tcp_getPublisherService;
         sender->publisher.factory.ungetService = psa_tcp_ungetPublisherService;
@@ -274,9 +267,7 @@
         opts.properties = props;
 
         sender->publisher.svcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
-    }
-
-    if (sender->url == NULL) {
+    } else {
         free(sender);
         sender = NULL;
     }
@@ -312,6 +303,7 @@
         celixThreadMutex_unlock(&sender->boundedServices.mutex);
         celixThreadMutex_destroy(&sender->boundedServices.mutex);
 
+        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
         if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
             pubsub_tcpHandler_destroy(sender->socketHandler);
             sender->socketHandler = NULL;
@@ -343,18 +335,25 @@
 }
 
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
-    return sender->url;
+    if (sender->isPassive) {
+        return pubsub_tcpHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
 }
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender) {
     return sender->isStatic;
 }
 
-void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {
     //TODO subscriber count -> topic info
 }
 
-void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint) {
+void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender __attribute__((unused)), const celix_properties_t *endpoint __attribute__((unused))) {
     //TODO
 }
 
@@ -483,8 +482,7 @@
             result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
             result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
             result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
-            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds =
-                mEntry->metrics.averageTimeBetweenMessagesInSeconds;
+            result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
             result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
             result->msgMetrics[i].bndId = entry->bndId;
             result->msgMetrics[i].typeId = mEntry->type;
@@ -533,7 +531,11 @@
             clock_gettime(CLOCK_REALTIME, &serializationEnd);
         }
 
-        if (status == CELIX_SUCCESS /*ser ok*/) {
+        bool cont = false;
+        if (status == CELIX_SUCCESS) /*ser ok*/ {
+            cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, &metadata);
+        }
+        if (cont) {
             pubsub_protocol_message_t message;
             message.metadata.metadata = NULL;
             message.payload.payload = NULL;
@@ -555,12 +557,12 @@
             entry->seqNr++;
             bool sendOk = true;
             {
-                int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput,
-                                                 serializedIoVecOutputLen, 0);
+                int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
                 if (rc < 0) {
                     status = -1;
                     sendOk = false;
                 }
+                pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, entry->msgSer->msgName, msgTypeId, inMsg, metadata);
                 if (message.metadata.metadata)
                     celix_properties_destroy(message.metadata.metadata);
                 if (serializedIoVecOutput) {
@@ -617,8 +619,10 @@
     static bool firstSend = true;
 
     if (firstSend) {
-        L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
-        sleep(FIRST_SEND_DELAY_IN_SECONDS);
+        if (sender->send_delay ) {
+            L_INFO("PSA_TCP_TP: Delaying first send for late joiners...\n");
+        }
+        usleep(sender->send_delay * 1000);
         firstSend = false;
     }
-}
+}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
index 2217989..ba5c3c9 100644
--- a/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/src/pubsub_tcp_topic_sender.h
@@ -33,28 +33,23 @@
     const char *scope,
     const char *topic,
     const celix_properties_t *topicProperties,
-    pubsub_tcp_endPointStore_t *endPointStore,
+    pubsub_tcp_endPointStore_t *handlerStore,
     long serializerSvcId,
     pubsub_serializer_service_t *ser,
     long protocolSvcId,
     pubsub_protocol_service_t *prot);
 
 void pubsub_tcpTopicSender_destroy(pubsub_tcp_topic_sender_t *sender);
-
 const char *pubsub_tcpTopicSender_scope(pubsub_tcp_topic_sender_t *sender);
-
 const char *pubsub_tcpTopicSender_topic(pubsub_tcp_topic_sender_t *sender);
-
 const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender);
-
 bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
-
+bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
 long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
-
 long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
-
+/* Note this functions are deprecated and not used */
 void pubsub_tcpTopicSender_connectTo(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
-
+/* Note this functions are deprecated and not used */
 void pubsub_tcpTopicSender_disconnectFrom(pubsub_tcp_topic_sender_t *sender, const celix_properties_t *endpoint);
 
 /**
diff --git a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
index 6bd79d0..36f4f94 100644
--- a/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
+++ b/bundles/pubsub/pubsub_protocol/pubsub_protocol_wire_v1/src/pubsub_wire_protocol_impl.c
@@ -162,6 +162,7 @@
                 message->header.seqNr           = 0;
                 message->header.payloadPartSize = message->header.payloadSize;
                 message->header.payloadOffset   = 0;
+                message->header.isLastSegment   = 0x1;
             }
         }
     } else {
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
index 4b4ad90..be91e26 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.c
@@ -39,7 +39,7 @@
 #include "pubsub_admin.h"
 #include "../../pubsub_admin_udp_mc/src/pubsub_udpmc_topic_sender.h"
 
-#define PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS       30L
+#define PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS       30L
 
 #ifndef UUID_STR_LEN
 #define UUID_STR_LEN    37
@@ -79,8 +79,11 @@
 
     manager->loghelper = logHelper;
     manager->verbose = celix_bundleContext_getPropertyAsBool(context, PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY, PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE);
-    manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_DEFAULT_SLEEPTIME_IN_SECONDS);
-
+    unsigned handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY, PSTM_PSA_HANDLING_SLEEPTIME_IN_SECONDS);
+    if ( handlingThreadSleepTime >= 0 ) {
+        manager->handlingThreadSleepTime = handlingThreadSleepTime * 1000L;
+    }
+    manager->handlingThreadSleepTime = celix_bundleContext_getPropertyAsLong(context, PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY,  manager->handlingThreadSleepTime);
     manager->psaHandling.running = true;
     celixThread_create(&manager->psaHandling.thread, NULL, pstm_psaHandlingThread, manager);
     celixThread_setName(&manager->psaHandling.thread, "PubSub TopologyManager");
@@ -718,9 +721,12 @@
         for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
             celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
             listener->revokeEndpoint(listener->handle, endpoint);
-            celix_properties_destroy(endpoint);
         }
     }
+    for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
+        celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
+        celix_properties_destroy(endpoint);
+    }
     celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
     celix_arrayList_destroy(revokeEndpoints);
 
@@ -758,7 +764,6 @@
                               "Tearing down TopicReceiver for scope/topic %s/%s with psa admin type %s and serializer %s\n",
                               entry->scope == NULL ? "(null)" : entry->scope, entry->topic, adminType, serType);
             }
-
             if (entry->endpoint != NULL) {
                 celix_arrayList_add(revokeEndpoints, celix_properties_copy(entry->endpoint));
                 struct pstm_teardown_entry* teardownEntry = malloc(sizeof(*teardownEntry));
@@ -808,9 +813,13 @@
         for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
             celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
             listener->revokeEndpoint(listener->handle, endpoint);
-            celix_properties_destroy(endpoint);
         }
     }
+    // Clean-up properties
+    for (int k = 0; k < celix_arrayList_size(revokeEndpoints); ++k) {
+        celix_properties_t* endpoint = celix_arrayList_get(revokeEndpoints, k);
+        celix_properties_destroy(endpoint);
+    }
     celixThreadMutex_unlock(&manager->announceEndpointListeners.mutex);
     celix_arrayList_destroy(revokeEndpoints);
 
@@ -1108,7 +1117,7 @@
         pstm_findPsaForEndpoints(manager); //trying to find psa and possible set for endpoints with no psa
 
         celixThreadMutex_lock(&manager->psaHandling.mutex);
-        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime, 0L);
+        celixThreadCondition_timedwaitRelative(&manager->psaHandling.cond, &manager->psaHandling.mutex, manager->handlingThreadSleepTime  / 1000, (manager->handlingThreadSleepTime  % 1000) * 1000000);
         running = manager->psaHandling.running;
         celixThreadMutex_unlock(&manager->psaHandling.mutex);
     }
diff --git a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
index 842d861..9cecf6f 100644
--- a/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
+++ b/bundles/pubsub/pubsub_topology_manager/src/pubsub_topology_manager.h
@@ -31,7 +31,8 @@
 #include "pubsub/subscriber.h"
 
 #define PUBSUB_TOPOLOGY_MANAGER_VERBOSE_KEY         "PUBSUB_TOPOLOGY_MANAGER_VERBOSE"
-#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY         "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS"
+#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS_KEY   "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_SECONDS"
+#define PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS_KEY        "PUBSUB_TOPOLOGY_MANAGER_HANDLING_THREAD_SLEEPTIME_MS"
 #define PUBSUB_TOPOLOGY_MANAGER_DEFAULT_VERBOSE     false
 
 
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils.h
index de0d1d6..a326068 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_utils.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils.h
@@ -29,7 +29,8 @@
 #define PUBSUB_UTILS_QOS_ATTRIBUTE_KEY      "qos"
 #define PUBSUB_UTILS_QOS_TYPE_SAMPLE        "sample"    /* A.k.a. unreliable connection */
 #define PUBSUB_UTILS_QOS_TYPE_CONTROL       "control"   /* A.k.a. reliable connection */
-
+#define PUBSUB_UTILS_PSA_SEND_DELAY         "PSA_SEND_DELAY"
+#define PUBSUB_UTILS_PSA_DEFAULT_SEND_DELAY 250 //  250 ms
 
 /**
  * Returns the pubsub info from the provided filter. A pubsub filter should have a topic and can 
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
index 87d4263..b10863c 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_utils_url.h
@@ -28,16 +28,16 @@
   char *url;
   char *protocol;
   char *hostname;
-  unsigned int portnr;
+  unsigned int port_nr;
   char *uri;
   char *interface;
-  unsigned int interface_portnr;
+  unsigned int interface_port_nr;
   char *interface_url;
 } pubsub_utils_url_t;
 
 struct sockaddr_in *pubsub_utils_url_from_fd(int fd);
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port);
-char *pubsub_utils_url_generate_url(char *hostname, unsigned int portnr, char *protocol);
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port);
+char *pubsub_utils_url_generate_url(char *hostname, unsigned int port_nr, char *protocol);
 char *pubsub_utils_url_get_url(struct sockaddr_in *inp, char *protocol);
 bool pubsub_utils_url_is_multicast(char *hostname);
 char *pubsub_utils_url_get_multicast_ip(char *hostname);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
index d8d518c..65a1ff2 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_utils_url.c
@@ -56,7 +56,7 @@
     return inp;
 }
 
-struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, int port) {
+struct sockaddr_in *pubsub_utils_url_getInAddr(const char *hostname, unsigned int port) {
     struct hostent *hp;
     struct sockaddr_in *inp = malloc(sizeof(struct sockaddr_in));
     bzero(inp, sizeof(struct sockaddr_in)); // zero the struct
@@ -220,11 +220,11 @@
                 maxPortnr += 1;
                 unsigned int minDigits = (unsigned int) atoi(portnr);
                 unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
-                url_info->portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+                url_info->port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
             } else {
                 unsigned int portDigits = (unsigned int) atoi(portnr);
                 if (portDigits != 0)
-                    url_info->portnr = portDigits;
+                    url_info->port_nr = portDigits;
                 uri = strstr(port, "/");
                 if ((uri) && (!url_info->uri))
                     url_info->uri = celix_utils_strdup(uri);
@@ -256,11 +256,11 @@
                 maxPortnr += 1;
                 unsigned int minDigits = (unsigned int) atoi(portnr);
                 unsigned int maxDigits = (unsigned int) atoi(maxPortnr);
-                url_info->interface_portnr = pubsub_utils_url_rand_range(minDigits, maxDigits);
+                url_info->interface_port_nr = pubsub_utils_url_rand_range(minDigits, maxDigits);
             } else {
                 unsigned int portDigits = (unsigned int) atoi(portnr);
                 if (portDigits != 0)
-                    url_info->interface_portnr = portDigits;
+                    url_info->interface_port_nr = portDigits;
                 uri = strstr(port, "/");
                 if ((uri) && (!url_info->uri))
                     url_info->uri = celix_utils_strdup(uri);
@@ -289,13 +289,13 @@
             free(url_info->interface);
             url_info->interface = ip;
         }
-        struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_portnr);
+        struct sockaddr_in *m_sin = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr);
         url_info->interface_url = pubsub_utils_url_get_url(m_sin, NULL);
         free(m_sin);
         pubsub_utils_url_parse_url(url_info->interface_url, &interface_url_info);
         free(url_info->interface);
         url_info->interface = interface_url_info.hostname;
-        url_info->interface_portnr = interface_url_info.portnr;
+        url_info->interface_port_nr = interface_url_info.port_nr;
     }
 
     if (url_info->hostname) {
@@ -306,11 +306,11 @@
             free(url_info->hostname);
             url_info->hostname = ip;
         }
-        struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->portnr);
+        struct sockaddr_in *sin = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr);
         url_info->url = pubsub_utils_url_get_url(sin, url_info->protocol);
         free(url_info->hostname);
         free(sin);
-        url_info->portnr = 0;
+        url_info->port_nr = 0;
         url_info->hostname = NULL;
         pubsub_utils_url_parse_url(url_info->url, url_info);
     }
@@ -338,7 +338,7 @@
     url_info->hostname = NULL;
     url_info->protocol = NULL;
     url_info->interface = NULL;
-    url_info->portnr = 0;
-    url_info->interface_portnr = 0;
+    url_info->port_nr = 0;
+    url_info->interface_port_nr = 0;
     free(url_info);
 }
diff --git a/bundles/pubsub/test/CMakeLists.txt b/bundles/pubsub/test/CMakeLists.txt
index c0eb5fc..a22122e 100644
--- a/bundles/pubsub/test/CMakeLists.txt
+++ b/bundles/pubsub/test/CMakeLists.txt
@@ -38,7 +38,7 @@
 add_celix_bundle(pubsub_endpoint_tst
         #Test bundle containing cpputests and uses celix_test_runner launcher instead of the celix launcher
         SOURCES
-        test/tst_activator.c
+        test/tst_endpoint_activator.c
         VERSION 1.0.0
         )
 target_link_libraries(pubsub_endpoint_tst PRIVATE Celix::framework Celix::pubsub_api)
@@ -47,7 +47,7 @@
         DESTINATION "META-INF/descriptors"
         )
 celix_bundle_files(pubsub_endpoint_tst
-        meta_data/ping2.properties
+        meta_data/pong3.properties
         DESTINATION "META-INF/topics/sub"
         )
 
@@ -65,7 +65,7 @@
         DESTINATION "META-INF/descriptors"
         )
 celix_bundle_files(pubsub_loopback
-        meta_data/pong2.properties
+        meta_data/ping3.properties
         DESTINATION "META-INF/topics/pub"
         )
 celix_bundle_files(pubsub_loopback
@@ -162,9 +162,9 @@
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_udp_multicast
-            Celix::pubsub_protocol_wire_v2
             Celix::shell
             Celix::shell_tui
             )
@@ -188,7 +188,7 @@
 endif()
 
 if (BUILD_PUBSUB_PSA_TCP)
-    add_celix_container(pubsub_tcp_tests
+    add_celix_container(pubsub_tcp_wire_v1_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
             LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
             DIR ${CMAKE_CURRENT_BINARY_DIR}
@@ -198,17 +198,37 @@
             Celix::shell
             Celix::shell_tui
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v1
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_tcp
-            Celix::pubsub_protocol_wire_v2
             pubsub_sut
             pubsub_tst
             )
-    target_link_libraries(pubsub_tcp_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
-    target_include_directories(pubsub_tcp_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
-    add_test(NAME pubsub_tcp_tests COMMAND pubsub_tcp_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_tests,CONTAINER_LOC>)
-    setup_target_for_coverage(pubsub_tcp_tests SCAN_DIR ..)
+    target_link_libraries(pubsub_tcp_wire_v1_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
+    target_include_directories(pubsub_tcp_wire_v1_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
+    add_test(NAME pubsub_tcp_wire_v1_tests COMMAND pubsub_tcp_wire_v1_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_wire_v1_tests,CONTAINER_LOC>)
+    setup_target_for_coverage(pubsub_tcp_wire_v1_tests SCAN_DIR ..)
 
+    add_celix_container(pubsub_tcp_wire_v2_tests
+            USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
+            LAUNCHER_SRC ${CMAKE_CURRENT_LIST_DIR}/test/test_runner.cc
+            DIR ${CMAKE_CURRENT_BINARY_DIR}
+            PROPERTIES
+            LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
+            BUNDLES
+            Celix::shell
+            Celix::shell_tui
+            Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
+            Celix::pubsub_topology_manager
+            Celix::pubsub_admin_tcp
+            pubsub_sut
+            pubsub_tst
+            )
+    target_link_libraries(pubsub_tcp_wire_v2_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
+    target_include_directories(pubsub_tcp_wire_v2_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
+    add_test(NAME pubsub_tcp_wire_v2_tests COMMAND pubsub_tcp_wire_v2_tests WORKING_DIRECTORY $<TARGET_PROPERTY:pubsub_tcp_wire_v2_tests,CONTAINER_LOC>)
+    setup_target_for_coverage(pubsub_tcp_wire_v2_tests SCAN_DIR ..)
 
     add_celix_container(pubsub_tcp_endpoint_tests
             USE_CONFIG #ensures that a config.properties will be created with the launch bundles.
@@ -220,12 +240,12 @@
             Celix::shell
             Celix::shell_tui
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_tcp
-            Celix::pubsub_protocol_wire_v1
-            pubsub_loopback
-            pubsub_endpoint_sut
             pubsub_endpoint_tst
+            pubsub_endpoint_sut
+            pubsub_loopback
             )
     target_link_libraries(pubsub_tcp_endpoint_tests PRIVATE Celix::pubsub_api ${CppUTest_LIBRARIES} Jansson Celix::dfi)
     target_include_directories(pubsub_tcp_endpoint_tests SYSTEM PRIVATE ${CppUTest_INCLUDE_DIR} test)
@@ -238,9 +258,9 @@
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_tcp
-            Celix::pubsub_protocol_wire_v2
             Celix::shell
             Celix::shell_tui
             )
@@ -295,9 +315,9 @@
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_websocket
-            Celix::pubsub_protocol_wire_v2
             Celix::shell
             Celix::shell_tui
             )
@@ -328,9 +348,9 @@
                 LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
                 Celix::pubsub_serializer_json
+                Celix::pubsub_protocol_wire_v1
                 Celix::pubsub_topology_manager
                 Celix::pubsub_admin_zmq
-                Celix::pubsub_protocol_wire_v1
                 pubsub_sut
                 pubsub_tst
                 pubsub_serializer
@@ -370,9 +390,9 @@
         LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
         BUNDLES
         Celix::pubsub_serializer_json
+        Celix::pubsub_protocol_wire_v2
         Celix::pubsub_topology_manager
         Celix::pubsub_admin_zmq
-        Celix::pubsub_protocol_wire_v2
         pubsub_sut
         pubsub_tst
         pubsub_serializer
@@ -393,9 +413,9 @@
                 PSA_ZMQ_ZEROCOPY_ENABLED=true
             BUNDLES
                 Celix::pubsub_serializer_json
+                Celix::pubsub_protocol_wire_v1
                 Celix::pubsub_topology_manager
                 Celix::pubsub_admin_zmq
-                Celix::pubsub_protocol_wire_v1
                 Celix::shell
                 Celix::shell_tui
                 pubsub_sut
@@ -442,9 +462,9 @@
         PSA_ZMQ_ZEROCOPY_ENABLED=true
         BUNDLES
         Celix::pubsub_serializer_json
+        Celix::pubsub_protocol_wire_v2
         Celix::pubsub_topology_manager
         Celix::pubsub_admin_zmq
-        Celix::pubsub_protocol_wire_v2
         Celix::shell
         Celix::shell_tui
         pubsub_sut
@@ -465,9 +485,9 @@
             LOGHELPER_STDOUT_FALLBACK_INCLUDE_DEBUG=true
             BUNDLES
             Celix::pubsub_serializer_json
+            Celix::pubsub_protocol_wire_v2
             Celix::pubsub_topology_manager
             Celix::pubsub_admin_zmq
-            Celix::pubsub_protocol_wire_v2
             Celix::shell
             Celix::shell_tui
             )
diff --git a/bundles/pubsub/test/meta_data/ping2.properties b/bundles/pubsub/test/meta_data/ping2.properties
index 4b42836..ff0dbed 100644
--- a/bundles/pubsub/test/meta_data/ping2.properties
+++ b/bundles/pubsub/test/meta_data/ping2.properties
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 tcp.static.bind.url=tcp://localhost:9500
-tcp.static.endpoint.type=server
+tcp.passive.key=tcp://localhost:9500
 
 #note only effective if run as root
 thread.realtime.sched=SCHED_FIFO
diff --git a/bundles/pubsub/test/meta_data/ping3.properties b/bundles/pubsub/test/meta_data/ping3.properties
new file mode 100644
index 0000000..5571705
--- /dev/null
+++ b/bundles/pubsub/test/meta_data/ping3.properties
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+tcp.passive.key=tcp://localhost
+tcp.passive.configured=true
+#note only effective if run as root
+thread.realtime.sched=SCHED_FIFO
+thread.realtime.prio=50
+
diff --git a/bundles/pubsub/test/meta_data/pong2.properties b/bundles/pubsub/test/meta_data/pong2.properties
index fa55718..b95f3bc 100644
--- a/bundles/pubsub/test/meta_data/pong2.properties
+++ b/bundles/pubsub/test/meta_data/pong2.properties
@@ -14,8 +14,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-tcp.static.connect.urls=tcp://localhost:9500;localhost:9501
-tcp.static.endpoint.type=client
+tcp.static.connect.urls=tcp://localhost:9500
+tcp.passive.key=tcp://localhost
 
 #note only effective if run as root
 thread.realtime.sched=SCHED_FIFO
diff --git a/bundles/pubsub/test/meta_data/pong3.properties b/bundles/pubsub/test/meta_data/pong3.properties
new file mode 100644
index 0000000..cb64543
--- /dev/null
+++ b/bundles/pubsub/test/meta_data/pong3.properties
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+tcp.passive.key=tcp://localhost:9500
+tcp.passive.configured=true
+#note only effective if run as root
+thread.realtime.sched=SCHED_FIFO
+thread.realtime.prio=50
+
diff --git a/bundles/pubsub/test/test/loopback_activator.c b/bundles/pubsub/test/test/loopback_activator.c
index 43d8a78..8024fd5 100644
--- a/bundles/pubsub/test/test/loopback_activator.c
+++ b/bundles/pubsub/test/test/loopback_activator.c
@@ -42,7 +42,7 @@
 celix_status_t bnd_start(struct activator *act, celix_bundle_context_t *ctx) {
 
 	char filter[512];
-	snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "pong2");
+	snprintf(filter, 512, "(%s=%s)", PUBSUB_PUBLISHER_TOPIC, "ping3");
 	celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
 	opts.set = sut_pubSet;
 	opts.callbackHandle = act;
@@ -86,6 +86,7 @@
   msg_t *msg = voidMsg;
   msg_t send_msg = *msg;
   pthread_mutex_lock(&act->mutex);
+
   if (act->pubSvc != NULL) {
     if (act->count == 0) {
       act->pubSvc->localMsgTypeIdForMsgType(act->pubSvc->handle, MSG_NAME, &act->msgId);
diff --git a/bundles/pubsub/test/test/tst_endpoint_activator.c b/bundles/pubsub/test/test/tst_endpoint_activator.c
index 636b9a6..05520ed 100644
--- a/bundles/pubsub/test/test/tst_endpoint_activator.c
+++ b/bundles/pubsub/test/test/tst_endpoint_activator.c
@@ -48,7 +48,7 @@
 
     {
         celix_properties_t *props = celix_properties_create();
-        celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "ping2");
+        celix_properties_set(props, PUBSUB_SUBSCRIBER_TOPIC, "pong3");
         act->subSvc.handle = act;
         act->subSvc.receive = tst_receive;
         act->subSvcId = celix_bundleContext_registerService(ctx, &act->subSvc, PUBSUB_SUBSCRIBER_SERVICE_NAME, props);