| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| /* |
| * pubsub_skt_handler.c |
| * |
| * \date July 18, 2019 |
| * \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a> |
| * \copyright Apache License, Version 2.0 |
| */ |
| |
| #include <stdio.h> |
| #include <string.h> |
| #include <unistd.h> |
| #include <stdlib.h> |
| #include <errno.h> |
| #include <array_list.h> |
| #include <pthread.h> |
| #if defined(__APPLE__) |
| #include <sys/types.h> |
| #include <sys/event.h> |
| #include <sys/time.h> |
| #else |
| #include <sys/epoll.h> |
| #endif |
| #include <limits.h> |
| #include <fcntl.h> |
| #include <arpa/inet.h> |
| #include <netinet/tcp.h> |
| //#include <netinet/udp.h> |
| #include "hash_map.h" |
| #include "utils.h" |
| #include "pubsub_skt_handler.h" |
| |
| #define MAX_EVENTS 64 |
| #define MAX_DEFAULT_BUFFER_SIZE 4u |
| |
| #if defined(__APPLE__) |
| #ifndef MSG_NOSIGNAL |
| #define MSG_NOSIGNAL (0) |
| #endif |
| #endif |
| |
| #define L_DEBUG(...) \ |
| celix_logHelper_log(handle->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__) |
| #define L_INFO(...) \ |
| celix_logHelper_log(handle->logHelper, CELIX_LOG_LEVEL_INFO, __VA_ARGS__) |
| #define L_WARN(...) \ |
| celix_logHelper_log(handle->logHelper, CELIX_LOG_LEVEL_WARNING, __VA_ARGS__) |
| #define L_ERROR(...) \ |
| celix_logHelper_log(handle->logHelper, CELIX_LOG_LEVEL_ERROR, __VA_ARGS__) |
| |
| // |
| // Entry administration |
| // |
| typedef struct psa_skt_connection_entry { |
| char *interface_url; |
| char *url; |
| int fd; |
| int socket_domain; |
| int socket_type; |
| char* protocol; |
| struct sockaddr_in addr; |
| struct sockaddr_in dst_addr; |
| socklen_t len; |
| bool connected; |
| bool headerError; |
| pubsub_protocol_message_t header; |
| size_t maxMsgSize; |
| size_t readHeaderSize; |
| size_t readHeaderBufferSize; // Size of headerBuffer |
| void *readHeaderBuffer; |
| size_t writeHeaderBufferSize; // Size of headerBuffer |
| void *writeHeaderBuffer; |
| size_t readFooterSize; |
| size_t readFooterBufferSize; |
| void *readFooterBuffer; |
| size_t writeFooterBufferSize; |
| void *writeFooterBuffer; |
| size_t bufferSize; |
| void *buffer; |
| size_t readMetaBufferSize; |
| void *readMetaBuffer; |
| size_t writeMetaBufferSize; |
| void *writeMetaBuffer; |
| unsigned int retryCount; |
| celix_thread_mutex_t writeMutex; |
| struct msghdr readMsg; |
| struct sockaddr_in readMsgAddr; |
| } psa_skt_connection_entry_t; |
| |
| // |
| // Handle administration |
| // |
| struct pubsub_sktHandler { |
| celix_thread_rwlock_t dbLock; |
| unsigned int timeout; |
| hash_map_t *connection_url_map; |
| hash_map_t *connection_fd_map; |
| hash_map_t *interface_url_map; |
| hash_map_t *interface_fd_map; |
| int efd; |
| int fd; |
| pubsub_sktHandler_receiverConnectMessage_callback_t receiverConnectMessageCallback; |
| pubsub_sktHandler_receiverConnectMessage_callback_t receiverDisconnectMessageCallback; |
| void *receiverConnectPayload; |
| pubsub_sktHandler_acceptConnectMessage_callback_t acceptConnectMessageCallback; |
| pubsub_sktHandler_acceptConnectMessage_callback_t acceptDisconnectMessageCallback; |
| void *acceptConnectPayload; |
| pubsub_sktHandler_processMessage_callback_t processMessageCallback; |
| void *processMessagePayload; |
| celix_log_helper_t *logHelper; |
| pubsub_protocol_service_t *protocol; |
| unsigned int bufferSize; |
| unsigned int maxMsgSize; |
| unsigned int maxSendRetryCount; |
| unsigned int maxRcvRetryCount; |
| double sendTimeout; |
| double rcvTimeout; |
| celix_thread_t thread; |
| bool running; |
| bool enableReceiveEvent; |
| }; |
| |
| static inline int pubsub_sktHandler_closeConnectionEntry(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, bool lock); |
| static inline int pubsub_sktHandler_closeInterfaceEntry(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry); |
| static inline int pubsub_sktHandler_makeNonBlocking(pubsub_sktHandler_t *handle, int fd); |
| static inline struct sockaddr_in pubsub_sktHandler_getMultiCastAddr(psa_skt_connection_entry_t *entry, struct sockaddr_in* sin, struct sockaddr_in* intf_addr ); |
| static inline psa_skt_connection_entry_t* pubsub_sktHandler_createEntry(pubsub_sktHandler_t *handle, int fd, char *url, char *interface_url, struct sockaddr_in *addr); |
| static inline void pubsub_sktHandler_freeEntry(psa_skt_connection_entry_t *entry); |
| static inline void pubsub_sktHandler_releaseEntryBuffer(pubsub_sktHandler_t *handle, int fd, unsigned int index); |
| static inline long int pubsub_sktHandler_getMsgSize(psa_skt_connection_entry_t *entry); |
| static inline void pubsub_sktHandler_ensureReadBufferCapacity(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry); |
| static inline bool pubsub_sktHandler_readHeader(pubsub_sktHandler_t *handle, int fd, psa_skt_connection_entry_t *entry, long int* msgSize); |
| static inline void pubsub_sktHandler_decodePayload(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry); |
| static inline long int pubsub_sktHandler_readPayload(pubsub_sktHandler_t *handle, int fd, psa_skt_connection_entry_t *entry); |
| static inline void pubsub_sktHandler_connectionHandler(pubsub_sktHandler_t *handle, int fd); |
| static inline void pubsub_sktHandler_handler(pubsub_sktHandler_t *handle); |
| static void *pubsub_sktHandler_thread(void *data); |
| |
| |
| |
| // |
| // Create a handle |
| // |
| pubsub_sktHandler_t *pubsub_sktHandler_create(pubsub_protocol_service_t *protocol, celix_log_helper_t *logHelper) { |
| pubsub_sktHandler_t *handle = calloc(sizeof(*handle), 1); |
| if (handle != NULL) { |
| #if defined(__APPLE__) |
| handle->efd = kqueue(); |
| #else |
| handle->efd = epoll_create1(0); |
| #endif |
| handle->fd = -1; |
| handle->connection_url_map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); |
| handle->connection_fd_map = hashMap_create(NULL, NULL, NULL, NULL); |
| handle->interface_url_map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL); |
| handle->interface_fd_map = hashMap_create(NULL, NULL, NULL, NULL); |
| handle->timeout = 2000; // default 2 sec |
| handle->logHelper = logHelper; |
| handle->protocol = protocol; |
| handle->bufferSize = MAX_DEFAULT_BUFFER_SIZE; |
| celixThreadRwlock_create(&handle->dbLock, 0); |
| handle->running = true; |
| celixThread_create(&handle->thread, NULL, pubsub_sktHandler_thread, handle); |
| // signal(SIGPIPE, SIG_IGN); |
| } |
| return handle; |
| } |
| |
| // |
| // Destroys the handle |
| // |
| void pubsub_sktHandler_destroy(pubsub_sktHandler_t *handle) { |
| if (handle != NULL) { |
| celixThreadRwlock_readLock(&handle->dbLock); |
| bool running = handle->running; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| if (running) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->running = false; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| celixThread_join(handle->thread, NULL); |
| } |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| hash_map_iterator_t connection_iter = hashMapIterator_construct(handle->connection_url_map); |
| while (hashMapIterator_hasNext(&connection_iter)) { |
| psa_skt_connection_entry_t *entry = hashMapIterator_nextValue(&connection_iter); |
| if (entry != NULL) { |
| pubsub_sktHandler_closeConnectionEntry(handle, entry, true); |
| } |
| } |
| hash_map_iterator_t interface_iter = hashMapIterator_construct(handle->interface_url_map); |
| while (hashMapIterator_hasNext(&interface_iter)) { |
| psa_skt_connection_entry_t *entry = hashMapIterator_nextValue(&interface_iter); |
| if (entry != NULL) { |
| pubsub_sktHandler_closeInterfaceEntry(handle, entry); |
| } |
| } |
| if (handle->efd >= 0) close(handle->efd); |
| hashMap_destroy(handle->connection_url_map, false, false); |
| hashMap_destroy(handle->connection_fd_map, false, false); |
| hashMap_destroy(handle->interface_url_map, false, false); |
| hashMap_destroy(handle->interface_fd_map, false, false); |
| celixThreadRwlock_unlock(&handle->dbLock); |
| celixThreadRwlock_destroy(&handle->dbLock); |
| free(handle); |
| } |
| } |
| |
| |
| |
| // |
| // Open the socket using an url |
| // |
| int pubsub_sktHandler_open(pubsub_sktHandler_t *handle, int socket_type, char *url) { |
| int rc = 0; |
| celixThreadRwlock_readLock(&handle->dbLock); |
| pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url); |
| int socket_domain = AF_INET; |
| //int socket_type = SOCK_STREAM(tcp); SOCK_DGRAM(udp); |
| if (url_info->protocol) { |
| // IPC is not supported !!! |
| //socket_domain = (!strcmp("ipc", url_info->protocol)) ? AF_LOCAL : AF_INET; |
| int url_socket_type = (!strcmp("udp", url_info->protocol)) ? SOCK_DGRAM : SOCK_STREAM; |
| if (url_socket_type != socket_type) { |
| L_ERROR("[SKT Socket] unexpected url socket type %s != %s \n", url, socket_type==SOCK_STREAM ? "tcp" : "udp"); |
| return -1; |
| } |
| } |
| // bool useBind = (socket_type == SOCK_DGRAM) ? false : true; |
| int fd = socket(socket_domain , socket_type, socket_type == SOCK_STREAM ? IPPROTO_TCP : IPPROTO_UDP); |
| if (fd >= 0) { |
| int setting = 1; |
| rc = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &setting, sizeof(setting)); |
| if (rc != 0) { |
| close(fd); |
| L_ERROR("[SKT Handler] Error setsockopt(SO_REUSEADDR): %s\n", strerror(errno)); |
| } |
| if (socket_type == SOCK_STREAM) { |
| if (rc == 0) { |
| rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &setting, sizeof(setting)); |
| if (rc != 0) { |
| close(fd); |
| L_ERROR("[TCP SKT Handler] Error setsockopt(SKT_NODELAY): %s\n", strerror(errno)); |
| } |
| } else { |
| L_ERROR("[TCP SKT Handler] Error creating socket: %s\n", strerror(errno)); |
| } |
| } |
| if (rc == 0 && handle->sendTimeout != 0.0) { |
| struct timeval tv; |
| tv.tv_sec = (long int) handle->sendTimeout; |
| tv.tv_usec = (long int) ((handle->sendTimeout - tv.tv_sec) * 1000000.0); |
| rc = setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); |
| if (rc != 0) { |
| L_ERROR("[SKT Handler] Error setsockopt (SO_SNDTIMEO) to set send timeout: %s", strerror(errno)); |
| } |
| } |
| if (rc == 0 && handle->rcvTimeout != 0.0) { |
| struct timeval tv; |
| tv.tv_sec = (long int) handle->rcvTimeout; |
| tv.tv_usec = (long int) ((handle->rcvTimeout - tv.tv_sec) * 1000000.0); |
| rc = setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); |
| if (rc != 0) { |
| L_ERROR("[SKT Handler] Error setsockopt (SO_RCVTIMEO) to set send timeout: %s", strerror(errno)); |
| } |
| } |
| } else { |
| L_ERROR("[SKT Handler] Error creating socket: %s\n", strerror(errno)); |
| } |
| pubsub_utils_url_free(url_info); |
| celixThreadRwlock_unlock(&handle->dbLock); |
| return (!rc) ? fd : rc; |
| } |
| |
| // |
| // Open the socket using an url |
| // |
| int pubsub_sktHandler_bind(pubsub_sktHandler_t *handle, int fd, char *url, unsigned int port_nr) { |
| int rc = 0; |
| celixThreadRwlock_readLock(&handle->dbLock); |
| struct sockaddr_in *addr = NULL; |
| int socket_domain = AF_INET; |
| #if !defined(__APPLE__) |
| socklen_t length = sizeof(int); |
| rc = getsockopt(fd, SOL_SOCKET, SO_DOMAIN, &socket_domain, &length); |
| #endif |
| if (url) { |
| pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url); |
| if (url_info->interface) { |
| addr = pubsub_utils_url_getInAddr(url_info->interface, (!port_nr) ? url_info->interface_port_nr : port_nr); |
| } else { |
| addr = pubsub_utils_url_getInAddr(url_info->hostname, (!port_nr) ? url_info->port_nr : port_nr); |
| } |
| pubsub_utils_url_free(url_info); |
| } else { |
| addr = pubsub_utils_url_getInAddr(NULL, port_nr); |
| } |
| if (addr) { |
| addr->sin_family = socket_domain; |
| rc = bind(fd, (struct sockaddr *) addr, sizeof(struct sockaddr)); |
| if (rc != 0) { |
| close(fd); |
| L_ERROR("[SKT Handler] Error bind: %s\n", strerror(errno)); |
| fd = -1; |
| } |
| free(addr); |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| return (!rc) ? fd : rc; |
| } |
| |
| |
| // |
| // Closes the discriptor with it's connection/interfaces (receiver/sender) |
| // |
| int pubsub_sktHandler_close(pubsub_sktHandler_t *handle, int fd) { |
| int rc = 0; |
| if (handle != NULL) { |
| psa_skt_connection_entry_t *entry = NULL; |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd); |
| if (entry) { |
| entry = hashMap_remove(handle->interface_url_map, (void *) (intptr_t) entry->url); |
| rc = pubsub_sktHandler_closeInterfaceEntry(handle, entry); |
| entry = NULL; |
| } |
| entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); |
| if (entry) { |
| entry = hashMap_remove(handle->connection_url_map, (void *) (intptr_t) entry->url); |
| rc = pubsub_sktHandler_closeConnectionEntry(handle, entry, false); |
| entry = NULL; |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| return rc; |
| } |
| |
| // |
| // Create connection/interface entry |
| // |
| static inline psa_skt_connection_entry_t * |
| pubsub_sktHandler_createEntry(pubsub_sktHandler_t *handle, int fd, char *url, char *interface_url, struct sockaddr_in *addr) { |
| psa_skt_connection_entry_t *entry = NULL; |
| if (fd >= 0) { |
| entry = calloc(sizeof(psa_skt_connection_entry_t), 1); |
| entry->fd = fd; |
| celixThreadMutex_create(&entry->writeMutex, NULL); |
| if (url) { |
| entry->url = strndup(url, 1024 * 1024); |
| } |
| if (interface_url) { |
| entry->interface_url = strndup(interface_url, 1024 * 1024); |
| } |
| entry->len = sizeof(struct sockaddr_in); |
| size_t headerSize = 0; |
| size_t footerSize = 0; |
| socklen_t length = sizeof(int); |
| entry->socket_domain = AF_INET; |
| #if !defined(__APPLE__) |
| getsockopt(fd, SOL_SOCKET, SO_DOMAIN, &entry->socket_domain, &length); |
| #endif |
| getsockopt(fd, SOL_SOCKET, SO_TYPE, &entry->socket_type, &length); |
| if (addr) { |
| entry->addr = *addr; |
| entry->addr.sin_family = entry->socket_domain; |
| } |
| entry->protocol = strndup((entry->socket_type == SOCK_STREAM) ? "tcp" : "udp",10); |
| handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize); |
| handle->protocol->getFooterSize(handle->protocol->handle, &footerSize); |
| entry->readHeaderBufferSize = headerSize; |
| entry->writeHeaderBufferSize = headerSize; |
| |
| entry->readFooterBufferSize = footerSize; |
| entry->writeFooterBufferSize = footerSize; |
| entry->bufferSize = MAX(handle->bufferSize, headerSize); |
| entry->connected = false; |
| unsigned minimalMsgSize = entry->writeHeaderBufferSize + entry->writeFooterBufferSize; |
| if ((minimalMsgSize > handle->maxMsgSize) && (handle->maxMsgSize)) { |
| L_ERROR("[SKT Handler] maxMsgSize (%d) < headerSize + FooterSize (%d): %s\n", handle->maxMsgSize, minimalMsgSize); |
| } else { |
| entry->maxMsgSize = (handle->maxMsgSize) ? handle->maxMsgSize : LONG_MAX; |
| } |
| entry->readHeaderBuffer = calloc(sizeof(char), headerSize); |
| entry->writeHeaderBuffer = calloc(sizeof(char), headerSize); |
| if (entry->readFooterBufferSize ) entry->readFooterBuffer = calloc(sizeof(char), entry->readFooterBufferSize); |
| if (entry->writeFooterBufferSize) entry->writeFooterBuffer = calloc(sizeof(char), entry->writeFooterBufferSize); |
| if (entry->bufferSize) entry->buffer = calloc(sizeof(char), entry->bufferSize); |
| memset(&entry->readMsg, 0x00, sizeof(struct msghdr)); |
| entry->readMsg.msg_iov = calloc(sizeof(struct iovec), IOV_MAX); |
| } |
| return entry; |
| } |
| |
| // |
| // Free connection/interface entry |
| // |
| static inline void |
| pubsub_sktHandler_freeEntry(psa_skt_connection_entry_t *entry) { |
| if (entry) { |
| free(entry->url); |
| free(entry->interface_url); |
| free(entry->buffer); |
| free(entry->protocol); |
| free(entry->readHeaderBuffer); |
| free(entry->writeHeaderBuffer); |
| free(entry->readFooterBuffer); |
| free(entry->writeFooterBuffer); |
| free(entry->readMetaBuffer); |
| free(entry->writeMetaBuffer); |
| free(entry->readMsg.msg_iov); |
| celixThreadMutex_destroy(&entry->writeMutex); |
| free(entry); |
| } |
| } |
| |
| // |
| // Releases the Buffer |
| // |
| static inline void |
| pubsub_sktHandler_releaseEntryBuffer(pubsub_sktHandler_t *handle, int fd, unsigned int index __attribute__((unused))) { |
| psa_skt_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); |
| if (entry != NULL) { |
| entry->buffer = NULL; |
| entry->bufferSize = 0; |
| } |
| } |
| |
| static |
| int pubsub_sktHandler_add_fd_event(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, bool useInputEvent) |
| { |
| int rc = 0; |
| if ((handle->efd >= 0) && entry) { |
| #if defined(__APPLE__) |
| struct kevent ev; |
| EV_SET (&ev, entry->fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0); // EVFILT_READ | EVFILT_WRITE |
| rc = kevent(handle->efd, &ev, 1, NULL, 0, NULL); |
| #else |
| struct epoll_event event; |
| bzero(&event, sizeof(event)); // zero the struct |
| event.events = EPOLLRDHUP | EPOLLERR; |
| if (useInputEvent) { |
| event.events |= EPOLLIN; |
| } |
| event.data.fd = entry->fd; |
| rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event); |
| #endif |
| |
| if (rc < 0) { |
| L_ERROR("[ %s SKT Handler] Cannot create poll: %s\n", entry->protocol, strerror(errno)); |
| errno = 0; |
| } |
| } |
| return rc; |
| }; |
| |
| |
| |
| |
| // |
| // Connect to url (receiver) |
| // |
| static |
| int pubsub_sktHandler_config_udp_connect(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, pubsub_utils_url_t *url_info) { |
| int rc = 0; |
| struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr); |
| if (!addr) return -1; |
| bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname); |
| bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname); |
| if (is_multicast) { |
| struct ip_mreq mc_addr; |
| bzero(&mc_addr, sizeof(struct ip_mreq)); |
| mc_addr.imr_multiaddr.s_addr = addr->sin_addr.s_addr; |
| mc_addr.imr_interface.s_addr = entry->addr.sin_addr.s_addr; |
| if (rc == 0) { |
| rc = setsockopt(entry->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mc_addr, sizeof(mc_addr)); |
| } |
| if (rc != 0) { |
| L_ERROR("[UDP SKT Handler] Error setsockopt (IP_ADD_MEMBERSHIP): %s", strerror(errno)); |
| } |
| } else if (is_broadcast) { |
| int setting = 1; |
| rc = setsockopt(entry->fd, SOL_SOCKET, SO_BROADCAST, &setting, sizeof(setting)); |
| if (rc != 0) { |
| L_ERROR("[UDP SKT Handler] Error setsockopt(SO_BROADCAST): %s", strerror(errno)); |
| } |
| } else { |
| entry->dst_addr = *addr; |
| } |
| |
| if (rc != 0) { |
| L_ERROR("[UDP SKT Handler] Cannot connect %s\n", strerror(errno)); |
| } |
| |
| free(addr); |
| return rc; |
| } |
| |
| // |
| // Connect to url (receiver) |
| // |
| int pubsub_sktHandler_tcp_connect(pubsub_sktHandler_t *handle, char *url) { |
| int rc = 0; |
| psa_skt_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url); |
| if (entry == NULL) { |
| pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url); |
| int fd = pubsub_sktHandler_open(handle, SOCK_STREAM, url_info->interface_url); |
| fd = pubsub_sktHandler_bind(handle, fd, url_info->interface_url, 0); |
| rc = fd; |
| // Connect to sender |
| struct sockaddr_in sin; |
| socklen_t len = sizeof(sin); |
| getsockname(fd, (struct sockaddr *) &sin, &len); |
| char *interface_url = pubsub_utils_url_get_url(&sin, NULL); |
| struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr); |
| if ((rc >= 0) && addr) { |
| rc = connect(fd, (struct sockaddr *) addr, sizeof(struct sockaddr)); |
| if (rc < 0 && errno != EINPROGRESS) { |
| L_ERROR("[TCP SKT Handler] Cannot connect to %s:%d: using %s err(%d): %s\n", url_info->hostname, url_info->port_nr, interface_url, errno, strerror(errno)); |
| close(fd); |
| } else { |
| entry = pubsub_sktHandler_createEntry(handle, fd, url, interface_url, &sin); |
| } |
| free(addr); |
| } |
| free(interface_url); |
| if (rc >= 0) { |
| rc = pubsub_sktHandler_add_fd_event(handle, entry, true); |
| } |
| |
| if (rc < 0) { |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| |
| if ((rc >= 0) && (entry)) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| hashMap_put(handle->connection_url_map, entry->url, entry); |
| hashMap_put(handle->connection_fd_map, (void *) (intptr_t) entry->fd, entry); |
| celixThreadRwlock_unlock(&handle->dbLock); |
| pubsub_sktHandler_connectionHandler(handle, fd); |
| if (entry->interface_url) { |
| L_INFO("[TCP SKT Handler] Connect to %s using: %s\n", entry->url, entry->interface_url); |
| } else { |
| L_INFO("[TCP SKT Handler] Connect to %s\n", entry->url); |
| } |
| } |
| pubsub_utils_url_free(url_info); |
| } |
| return rc; |
| } |
| |
| // |
| // Connect to url (receiver) |
| // |
| int pubsub_sktHandler_udp_connect(pubsub_sktHandler_t *handle, char *url) { |
| int rc = 0; |
| psa_skt_connection_entry_t *entry = hashMap_get(handle->connection_url_map, (void *) (intptr_t) url); |
| if (entry == NULL) { |
| pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url); |
| bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname); |
| bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname); |
| int fd = pubsub_sktHandler_open(handle, SOCK_DGRAM, url_info->interface_url); |
| if (is_multicast || is_broadcast) { |
| fd = pubsub_sktHandler_bind(handle, fd, NULL, url_info->port_nr); |
| } else { |
| fd = pubsub_sktHandler_bind(handle, fd, url_info->interface_url, 0); |
| } |
| rc = fd; |
| char *pUrl = NULL; |
| struct sockaddr_in *sin = NULL; |
| sin = pubsub_utils_url_from_fd(fd); |
| // check if socket is bind |
| if (sin->sin_port) { |
| pUrl = pubsub_utils_url_get_url(sin, "udp"); |
| } |
| // Make handler fd entry |
| if (fd >= 0) { |
| entry = pubsub_sktHandler_createEntry(handle, fd, url_info->url, pUrl, sin); |
| rc = pubsub_sktHandler_config_udp_connect(handle, entry, url_info); |
| } |
| |
| if (rc >= 0) { |
| rc = pubsub_sktHandler_add_fd_event(handle, entry, true); |
| } |
| |
| if (rc < 0) { |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| |
| if ((rc>=0) && entry) { |
| L_INFO("[%s SKT Handler] Using %s for service annunciation", entry->protocol, entry->url); |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| hashMap_put(handle->connection_url_map, entry->url, entry); |
| hashMap_put(handle->connection_fd_map, (void *) (intptr_t) entry->fd, entry); |
| psa_skt_connection_entry_t *interface_entry = pubsub_sktHandler_createEntry(handle, entry->fd, entry->url, entry->interface_url, &entry->addr); |
| hashMap_put(handle->interface_fd_map, (void *) (intptr_t) interface_entry->fd, interface_entry); |
| hashMap_put(handle->interface_url_map, entry->interface_url ? interface_entry->interface_url : interface_entry->url, interface_entry); |
| celixThreadRwlock_unlock(&handle->dbLock); |
| pubsub_sktHandler_connectionHandler(handle, entry->fd); |
| __atomic_store_n(&interface_entry->connected, true, __ATOMIC_RELEASE); |
| __atomic_store_n(&entry->connected, true, __ATOMIC_RELEASE); |
| if (!entry->interface_url) { |
| L_INFO("[%s SKT Handler] Connect to %s", entry->protocol, entry->url); |
| } else { |
| L_INFO("[%s SKT Handler] Connect to %s using: %s", entry->protocol, entry->url, entry->interface_url); |
| } |
| } |
| free(sin); |
| free(pUrl); |
| pubsub_utils_url_free(url_info); |
| } |
| return rc; |
| } |
| |
| // |
| // Disconnect from url |
| // |
| int pubsub_sktHandler_disconnect(pubsub_sktHandler_t *handle, char *url) { |
| int rc = 0; |
| if (handle != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| psa_skt_connection_entry_t *entry = NULL; |
| entry = hashMap_remove(handle->connection_url_map, url); |
| if (entry) { |
| pubsub_sktHandler_closeConnectionEntry(handle, entry, false); |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| return rc; |
| } |
| |
| // loses the connection entry (of receiver) |
| // |
| static inline int pubsub_sktHandler_closeConnectionEntry( |
| pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, bool lock) { |
| int rc = 0; |
| if (handle != NULL && entry != NULL) { |
| L_INFO("[%s SKT Handler] Close connection to url: %s: ", entry->protocol, entry->url); |
| hashMap_remove(handle->connection_fd_map, (void *) (intptr_t) entry->fd); |
| if ((handle->efd >= 0)) { |
| // For TCP remove the connection socket |
| if (entry->socket_type == SOCK_STREAM) { |
| #if defined(__APPLE__) |
| struct kevent ev; |
| EV_SET (&ev, entry->fd, EVFILT_READ, EV_DELETE , 0, 0, 0); |
| rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL); |
| #else |
| struct epoll_event event; |
| bzero(&event, sizeof(struct epoll_event)); // zero the struct |
| rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event); |
| #endif |
| if (rc < 0) { |
| L_ERROR("[SKT Handler] Error disconnecting %s", strerror(errno)); |
| } |
| } |
| } |
| if (entry->fd >= 0) { |
| if (handle->receiverDisconnectMessageCallback) |
| handle->receiverDisconnectMessageCallback(handle->receiverConnectPayload, entry->url, lock); |
| if (handle->acceptConnectMessageCallback) |
| handle->acceptConnectMessageCallback(handle->acceptConnectPayload, entry->url); |
| if (entry->socket_type == SOCK_STREAM) { |
| close(entry->fd); |
| } |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| } |
| return rc; |
| } |
| |
| // |
| // Closes the interface entry (of sender) |
| // |
| static inline int |
| pubsub_sktHandler_closeInterfaceEntry(pubsub_sktHandler_t *handle, |
| psa_skt_connection_entry_t *entry) { |
| int rc = 0; |
| if (handle != NULL && entry != NULL) { |
| L_INFO("[%s SKT Handler] Close interface url: %s: ", entry->protocol ,entry->interface_url ? entry->interface_url : entry->url); |
| hashMap_remove(handle->interface_fd_map, (void *) (intptr_t) entry->fd); |
| if ((handle->efd >= 0)) { |
| #if defined(__APPLE__) |
| struct kevent ev; |
| EV_SET (&ev, entry->fd, EVFILT_READ, EV_DELETE , 0, 0, 0); |
| rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL); |
| #else |
| struct epoll_event event; |
| bzero(&event, sizeof(struct epoll_event)); // zero the struct |
| rc = epoll_ctl(handle->efd, EPOLL_CTL_DEL, entry->fd, &event); |
| #endif |
| if (rc < 0) { |
| L_ERROR("[SKT Handler] Error disconnecting %s", strerror(errno)); |
| } |
| } |
| if (entry->fd >= 0) { |
| close(entry->fd); |
| pubsub_sktHandler_freeEntry(entry); |
| } |
| } |
| return rc; |
| } |
| |
| static inline |
| struct sockaddr_in pubsub_sktHandler_getMultiCastAddr(psa_skt_connection_entry_t *entry, struct sockaddr_in* sin, struct sockaddr_in* intf_addr ) { |
| pubsub_utils_url_t* multiCastUrl = calloc(1, sizeof(pubsub_utils_url_t)); |
| pubsub_utils_url_parse_url(entry->url, multiCastUrl); |
| char* hostname = NULL; |
| if (multiCastUrl->hostname) hostname = strchr(multiCastUrl->hostname, '/'); |
| if (intf_addr->sin_addr.s_addr && hostname) { |
| in_addr_t listDigit = inet_lnaof(sin->sin_addr); |
| in_addr_t listDigitIntf = inet_lnaof(intf_addr->sin_addr); |
| uint32_t s_addr = ntohl(sin->sin_addr.s_addr); |
| sin->sin_addr.s_addr = htonl(s_addr - listDigit + listDigitIntf); |
| } |
| pubsub_utils_url_free(multiCastUrl); |
| return *sin; |
| } |
| |
| // |
| // Make accept file descriptor non blocking |
| // |
| static inline int pubsub_sktHandler_makeNonBlocking(pubsub_sktHandler_t *handle, int fd) { |
| int rc = 0; |
| int flags = fcntl(fd, F_GETFL, 0); |
| if (flags == -1) |
| rc = flags; |
| else { |
| rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK); |
| if (rc < 0) { |
| L_ERROR("[SKT Handler] Cannot set to NON_BLOCKING: %s\n", strerror(errno)); |
| } |
| } |
| return rc; |
| } |
| |
| // |
| // setup listening to interface (sender) using an url |
| // |
| int pubsub_sktHandler_tcp_listen(pubsub_sktHandler_t *handle, char *url) { |
| int rc = 0; |
| celixThreadRwlock_readLock(&handle->dbLock); |
| psa_skt_connection_entry_t *entry = |
| hashMap_get(handle->connection_url_map, (void *) (intptr_t) url); |
| celixThreadRwlock_unlock(&handle->dbLock); |
| if (entry == NULL) { |
| int fd = pubsub_sktHandler_open(handle, SOCK_STREAM, url); |
| fd = pubsub_sktHandler_bind(handle, fd, url, 0); |
| rc = fd; |
| struct sockaddr_in *sin = pubsub_utils_url_from_fd(fd); |
| // Make handler fd entry |
| char *pUrl = pubsub_utils_url_get_url(sin, "tcp"); |
| entry = pubsub_sktHandler_createEntry(handle, fd, pUrl, NULL, sin); |
| if (entry != NULL) { |
| __atomic_store_n(&entry->connected, true, __ATOMIC_RELEASE); |
| free(pUrl); |
| free(sin); |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| if (rc >= 0) { |
| rc = listen(fd, SOMAXCONN); |
| if (rc != 0) { |
| L_ERROR("[TCP SKT Handler] Error listen: %s\n", strerror(errno)); |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| } |
| if (rc >= 0) { |
| rc = pubsub_sktHandler_makeNonBlocking(handle, fd); |
| if (rc < 0) { |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| } |
| if (rc >= 0) { |
| rc = pubsub_sktHandler_add_fd_event(handle, entry, true); |
| } |
| |
| if (rc < 0) { |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| |
| if ((rc>=0) && entry) { |
| if (entry->interface_url) { |
| L_INFO("[TCP SKT Handler] Using %s:%s for service annunciation", entry->protocol, entry->url, entry->interface_url); |
| } else { |
| L_INFO("[TCP SKT Handler] Using %s for service annunciation", entry->protocol, entry->url); |
| } |
| hashMap_put(handle->interface_fd_map, (void *) (intptr_t) entry->fd, entry); |
| hashMap_put(handle->interface_url_map, entry->url, entry); |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } else { |
| L_ERROR("[TCP SKT Handler] Error listen socket cannot bind to %s: %s\n", url ? url : "", strerror(errno)); |
| } |
| } |
| return rc; |
| } |
| |
| // |
| // setup listening to interface (sender) using an url |
| // |
| static |
| int pubsub_sktHandler_config_udp_bind(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry, pubsub_utils_url_t *url_info) { |
| /** Check UDP type*/ |
| int rc = 0; |
| bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname); |
| bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname); |
| if (is_multicast) { |
| char loop = 1; |
| char ttl = 1; |
| struct sockaddr_in *intf_addr = pubsub_utils_url_getInAddr(url_info->interface, url_info->interface_port_nr); |
| if (!intf_addr) return -1; |
| rc = setsockopt(entry->fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)); |
| if (rc != 0) { |
| L_ERROR("[UDP SKT Handler] Error setsockopt (IP_MULTICAST_LOOP): %s", strerror(errno)); |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| rc = setsockopt(entry->fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)); |
| if (rc != 0) { |
| L_ERROR("[UDP SKT Handler] Error setsockopt (IP_MULTICAST_LOOP): %s", strerror(errno)); |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| if (!rc) { |
| rc = setsockopt(entry->fd, IPPROTO_IP, IP_MULTICAST_IF, &intf_addr->sin_addr, sizeof(struct in_addr)); |
| if (rc != 0) { |
| L_ERROR("[UDP SKT Handler] Error setsockopt(IP_MULTICAST_IF): %s", strerror(errno)); |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| } |
| // bind multi cast address |
| struct sockaddr_in *addr = pubsub_utils_url_getInAddr(url_info->hostname, url_info->port_nr); |
| if (!rc && addr) { |
| rc = bind(entry->fd, (struct sockaddr *) addr, sizeof(*addr)); |
| if (rc != 0) { |
| L_ERROR("[UDP SKT Handler] Cannot bind to multicast %s:%d: err(%d): %s\n", url_info->url, url_info->port_nr, errno, strerror(errno)); |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } else { |
| struct sockaddr_in *sin = pubsub_utils_url_from_fd(entry->fd); |
| entry->dst_addr = pubsub_sktHandler_getMultiCastAddr(entry, sin, &entry->addr); |
| free(sin); |
| } |
| } |
| free(addr); |
| free(intf_addr); |
| } else if (is_broadcast) { |
| int setting = 1; |
| if (!rc) { |
| rc = setsockopt(entry->fd, SOL_SOCKET, SO_BROADCAST, &setting, sizeof(setting)); |
| } |
| if (!entry->dst_addr.sin_port) entry->dst_addr.sin_port = entry->addr.sin_port; |
| if (rc != 0) { |
| L_ERROR("[UDP SKT Handler] Error setsockopt(SO_BROADCAST): %s", strerror(errno)); |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| } |
| |
| // Store connection); |
| if (!rc && entry) { |
| if (is_multicast || is_broadcast) { |
| free(entry->url); |
| free(entry->interface_url); |
| entry->url = pubsub_utils_url_get_url(&entry->dst_addr, url_info->protocol ? url_info->protocol : entry->protocol); |
| entry->interface_url = pubsub_utils_url_get_url(&entry->addr, url_info->protocol ? url_info->protocol : entry->protocol); |
| } |
| psa_skt_connection_entry_t *connection_entry = pubsub_sktHandler_createEntry(handle, entry->fd, entry->url, entry->interface_url, &entry->addr); |
| connection_entry->dst_addr = entry->dst_addr; |
| __atomic_store_n(&entry->connected, true, __ATOMIC_RELEASE); |
| __atomic_store_n(&connection_entry->connected, true, __ATOMIC_RELEASE); |
| hashMap_put(handle->connection_fd_map, (void *) (intptr_t) connection_entry->fd, connection_entry); |
| hashMap_put(handle->connection_url_map, connection_entry->url, connection_entry); |
| } |
| |
| // Remove not connected interface |
| if (!entry->connected) { |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| if (!rc && entry) { |
| rc = pubsub_sktHandler_add_fd_event(handle, entry, (!is_multicast && !is_broadcast)); |
| } |
| if (rc < 0) { |
| pubsub_sktHandler_freeEntry(entry); |
| entry = NULL; |
| } |
| |
| if ((rc>=0) && entry) { |
| if (entry->interface_url) { |
| L_INFO("[UDP SKT Handler] Using %s:%s for service annunciation", entry->protocol, entry->url, entry->interface_url); |
| } else { |
| L_INFO("[UDP SKT Handler] Using %s for service annunciation", entry->protocol, entry->url); |
| } |
| hashMap_put(handle->interface_fd_map, (void *) (intptr_t) entry->fd, entry); |
| hashMap_put(handle->interface_url_map, entry->url, entry); |
| } |
| return rc; |
| } |
| |
| // |
| // setup listening to interface (sender) using an url |
| // |
| int pubsub_sktHandler_udp_bind(pubsub_sktHandler_t *handle, char *url) { |
| int rc = 0; |
| celixThreadRwlock_readLock(&handle->dbLock); |
| psa_skt_connection_entry_t *entry = hashMap_get(handle->interface_url_map, (void *) (intptr_t) url); |
| celixThreadRwlock_unlock(&handle->dbLock); |
| if (entry == NULL) { |
| pubsub_utils_url_t *url_info = pubsub_utils_url_parse(url); |
| bool is_multicast = pubsub_utils_url_is_multicast(url_info->hostname); |
| bool is_broadcast = pubsub_utils_url_is_broadcast(url_info->hostname); |
| int fd = pubsub_sktHandler_open(handle, SOCK_DGRAM, url); |
| char *pUrl = NULL; |
| struct sockaddr_in *sin = NULL; |
| if (!is_multicast) { |
| // Make handler fd entry |
| if (is_broadcast) { |
| fd = pubsub_sktHandler_bind(handle, fd, url_info->interface_url ? url_info->interface_url : NULL, url_info->port_nr); |
| } else { |
| fd = pubsub_sktHandler_bind(handle, fd, url, 0); |
| } |
| sin = pubsub_utils_url_from_fd(fd); |
| pUrl = pubsub_utils_url_get_url(sin, "udp"); |
| } |
| rc = fd; |
| if (is_multicast || is_broadcast) { |
| // Create entry for multicast / broadcast |
| entry = pubsub_sktHandler_createEntry(handle, fd, url_info->url, pUrl, sin); |
| } else { |
| // Create entry for unicast |
| entry = pubsub_sktHandler_createEntry(handle, fd, pUrl, NULL, sin); |
| } |
| if (entry != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| rc = pubsub_sktHandler_config_udp_bind(handle, entry, url_info); |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } else { |
| L_ERROR("[UDP SKT Socket] Error publish socket cannot bind to %s: %s\n", url ? url : "", strerror(errno)); |
| } |
| free(sin); |
| free(pUrl); |
| pubsub_utils_url_free(url_info); |
| } |
| return rc; |
| } |
| |
| // |
| // Setup default receive buffer size. |
| // This size is used to allocated the initial read buffer, to avoid receive buffer reallocting. |
| // The default receive buffer is allocated in the createEntry when the connection is establised |
| // |
| int pubsub_sktHandler_setReceiveBufferSize(pubsub_sktHandler_t *handle, unsigned int size) { |
| if (handle != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->bufferSize = size; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| return 0; |
| } |
| |
| // |
| // Set Maximum message size |
| // |
| int pubsub_sktHandler_setMaxMsgSize(pubsub_sktHandler_t *handle, unsigned int size) { |
| if (handle != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->maxMsgSize = size; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| return 0; |
| } |
| |
| // |
| // Setup thread timeout |
| // |
| void pubsub_sktHandler_setTimeout(pubsub_sktHandler_t *handle, |
| unsigned int timeout) { |
| if (handle != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->timeout = timeout; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| } |
| |
| // |
| // Setup thread name |
| // |
| void pubsub_sktHandler_setThreadName(pubsub_sktHandler_t *handle, |
| const char *topic, const char *scope) { |
| if ((handle != NULL) && (topic)) { |
| char *thread_name = NULL; |
| if ((scope) && (topic)) |
| asprintf(&thread_name, "SKT TS %s/%s", scope, topic); |
| else |
| asprintf(&thread_name, "SKT TS %s", topic); |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| celixThread_setName(&handle->thread, thread_name); |
| celixThreadRwlock_unlock(&handle->dbLock); |
| free(thread_name); |
| } |
| } |
| |
| // |
| // Setup thread priorities |
| // |
| void pubsub_sktHandler_setThreadPriority(pubsub_sktHandler_t *handle, long prio, |
| const char *sched) { |
| if (handle == NULL) |
| return; |
| |
| if (sched != NULL) { |
| int policy = SCHED_OTHER; |
| if (strncmp("SCHED_OTHER", sched, 16) == 0) { |
| policy = SCHED_OTHER; |
| #if !defined(__APPLE__) |
| } else if (strncmp("SCHED_BATCH", sched, 16) == 0) { |
| policy = SCHED_BATCH; |
| } else if (strncmp("SCHED_IDLE", sched, 16) == 0) { |
| policy = SCHED_IDLE; |
| #endif |
| } else if (strncmp("SCHED_FIFO", sched, 16) == 0) { |
| policy = SCHED_FIFO; |
| } else if (strncmp("SCHED_RR", sched, 16) == 0) { |
| policy = SCHED_RR; |
| } |
| |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| if (prio > 0 && prio < 100) { |
| struct sched_param sch; |
| bzero(&sch, sizeof(struct sched_param)); |
| sch.sched_priority = (int)prio; |
| pthread_setschedparam(handle->thread.thread, policy, &sch); |
| } else { |
| L_INFO("Skipping configuration of thread prio to %i and thread " |
| "scheduling to %s. No permission\n", |
| (int) prio, sched); |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| } |
| |
| void pubsub_sktHandler_setSendRetryCnt(pubsub_sktHandler_t *handle, unsigned int count) { |
| if (handle != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->maxSendRetryCount = count; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| } |
| |
| void pubsub_sktHandler_setReceiveRetryCnt(pubsub_sktHandler_t *handle, unsigned int count) { |
| if (handle != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->maxRcvRetryCount = count; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| } |
| |
| void pubsub_sktHandler_setSendTimeOut(pubsub_sktHandler_t *handle, double timeout) { |
| if (handle != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->sendTimeout = timeout; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| } |
| |
| void pubsub_sktHandler_setReceiveTimeOut(pubsub_sktHandler_t *handle, double timeout) { |
| if (handle != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->rcvTimeout = timeout; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| } |
| |
| void pubsub_sktHandler_enableReceiveEvent(pubsub_sktHandler_t *handle,bool enable) { |
| if (handle != NULL) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->enableReceiveEvent = enable; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| } |
| |
| |
| bool pubsub_sktHandler_isPassive(const char* buffer) { |
| bool isPassive = false; |
| // Parse Properties |
| if (buffer != NULL) { |
| char buf[32]; |
| snprintf(buf, 32, "%s", buffer); |
| char *trimmed = utils_stringTrim(buf); |
| if (strncasecmp("true", trimmed, strlen("true")) == 0) { |
| isPassive = true; |
| } else if (strncasecmp("false", trimmed, strlen("false")) == 0) { |
| isPassive = false; |
| } |
| } |
| return isPassive; |
| } |
| |
| |
| static inline long int pubsub_sktHandler_getMsgSize(psa_skt_connection_entry_t *entry) { |
| // Note header message is already read |
| return (long int)entry->header.header.payloadPartSize + (long int)entry->header.header.metadataSize + (long int)entry->readFooterSize; |
| } |
| |
| static inline |
| bool pubsub_sktHandler_readHeader(pubsub_sktHandler_t *handle, int fd, psa_skt_connection_entry_t *entry, long int* msgSize) { |
| bool result = false; |
| size_t syncSize = 0; |
| size_t protocolHeaderBufferSize = 0; |
| // Get Sync Size |
| handle->protocol->getSyncHeaderSize(handle->protocol->handle, &syncSize); |
| // Get HeaderSize of the Protocol Header |
| handle->protocol->getHeaderSize(handle->protocol->handle, &entry->readHeaderSize); |
| // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints) |
| handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize); |
| |
| // Ensure capacity in header buffer |
| pubsub_sktHandler_ensureReadBufferCapacity(handle, entry); |
| |
| entry->readMsg.msg_name = &entry->readMsgAddr; |
| entry->readMsg.msg_namelen = entry->len; |
| entry->readMsg.msg_iovlen = 0; |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readHeaderBuffer; |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->readHeaderBufferSize; |
| entry->readMsg.msg_iovlen++; |
| |
| // Read the message |
| long int nbytes = 0; |
| // Use peek flag to find sync word or when header is part of the payload |
| bool isUdp = (entry->socket_type == SOCK_DGRAM) ? true : false; |
| unsigned int flag = (entry->headerError || (!protocolHeaderBufferSize) || isUdp) ? MSG_PEEK : 0; |
| if (entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL | flag); |
| if (nbytes >= entry->readHeaderSize) { |
| if (handle->protocol->decodeHeader(handle->protocol->handle, |
| entry->readMsg.msg_iov[0].iov_base, |
| entry->readMsg.msg_iov[0].iov_len, |
| &entry->header) == CELIX_SUCCESS) { |
| // read header from queue, when recovered from headerError and when header is not part of the payload. (Because of MSG_PEEK) |
| if (entry->headerError && protocolHeaderBufferSize && entry->readHeaderSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL); |
| entry->headerError = false; |
| result = true; |
| } else { |
| // Did not receive correct header |
| // skip sync word and try to read next header |
| if (!entry->headerError) { |
| L_WARN("[SKT Handler] Failed to decode message header (fd: %d) (url: %s)", entry->fd, entry->url); |
| } |
| entry->headerError = true; |
| entry->readMsg.msg_iovlen = 0; |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = syncSize; |
| entry->readMsg.msg_iovlen++; |
| // remove sync item from the queue |
| if (syncSize) nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL); |
| } |
| } |
| if (msgSize) *msgSize = nbytes; |
| return result; |
| } |
| |
| |
| static inline void pubsub_sktHandler_ensureReadBufferCapacity(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry) { |
| if (entry->readHeaderSize > entry->readHeaderBufferSize) { |
| free(entry->readHeaderBuffer); |
| entry->readHeaderBuffer = malloc((size_t) entry->readHeaderSize); |
| entry->readHeaderBufferSize = entry->readHeaderSize; |
| } |
| |
| if (entry->header.header.payloadSize > entry->bufferSize) { |
| free(entry->buffer); |
| entry->buffer = malloc((size_t)entry->header.header.payloadSize); |
| entry->bufferSize = entry->header.header.payloadSize; |
| } |
| |
| if (entry->header.header.metadataSize > entry->readMetaBufferSize) { |
| free(entry->readMetaBuffer); |
| entry->readMetaBuffer = malloc((size_t) entry->header.header.metadataSize); |
| entry->readMetaBufferSize = entry->header.header.metadataSize; |
| } |
| |
| if (entry->readFooterSize > entry->readFooterBufferSize) { |
| free(entry->readFooterBuffer); |
| entry->readFooterBuffer = malloc( (size_t) entry->readFooterSize); |
| entry->readFooterBufferSize = entry->readFooterSize; |
| } |
| } |
| |
| static inline |
| void pubsub_sktHandler_decodePayload(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *entry) { |
| |
| if (entry->header.header.payloadSize > 0) { |
| handle->protocol->decodePayload(handle->protocol->handle, entry->buffer, entry->header.header.payloadSize, &entry->header); |
| } |
| if (entry->header.header.metadataSize > 0) { |
| handle->protocol->decodeMetadata(handle->protocol->handle, entry->readMetaBuffer, |
| entry->header.header.metadataSize, &entry->header); |
| } |
| if (handle->processMessageCallback && entry->header.payload.payload != NULL && entry->header.payload.length) { |
| struct timespec receiveTime; |
| clock_gettime(CLOCK_REALTIME, &receiveTime); |
| bool releaseEntryBuffer = false; |
| handle->processMessageCallback(handle->processMessagePayload, &entry->header, &releaseEntryBuffer, &receiveTime); |
| if (releaseEntryBuffer) { |
| pubsub_sktHandler_releaseEntryBuffer(handle, entry->fd, 0); |
| } |
| } |
| celix_properties_destroy(entry->header.metadata.metadata); |
| entry->header.metadata.metadata = NULL; |
| } |
| |
| static inline |
| long int pubsub_sktHandler_readPayload(pubsub_sktHandler_t *handle, int fd, psa_skt_connection_entry_t *entry) { |
| entry->readMsg.msg_iovlen = 0; |
| handle->protocol->getFooterSize(handle->protocol->handle, &entry->readFooterSize); |
| |
| // from the header can be determined how large buffers should be. Even before receiving all data these buffers can be allocated |
| pubsub_sktHandler_ensureReadBufferCapacity(handle, entry); |
| |
| // Read UDP packet in one message |
| if (entry->readHeaderSize && (entry->socket_type == SOCK_DGRAM)) { |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readHeaderBuffer; |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->readHeaderBufferSize; |
| entry->readMsg.msg_iovlen++; |
| } |
| if (entry->header.header.payloadPartSize) { |
| char* buffer = entry->buffer; |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = &buffer[entry->header.header.payloadOffset]; |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->header.header.payloadPartSize; |
| entry->readMsg.msg_iovlen++; |
| } |
| if (entry->header.header.metadataSize) { |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readMetaBuffer; |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->header.header.metadataSize; |
| entry->readMsg.msg_iovlen++; |
| } |
| |
| if (entry->readFooterSize) { |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_base = entry->readFooterBuffer; |
| entry->readMsg.msg_iov[entry->readMsg.msg_iovlen].iov_len = entry->readFooterSize; |
| entry->readMsg.msg_iovlen++; |
| } |
| |
| long int nbytes = recvmsg(fd, &(entry->readMsg), MSG_NOSIGNAL | MSG_WAITALL); |
| if (nbytes >= pubsub_sktHandler_getMsgSize(entry)) { |
| bool valid = true; |
| if (entry->readFooterSize) { |
| if (handle->protocol->decodeFooter(handle->protocol->handle, entry->readFooterBuffer, entry->readFooterBufferSize, &entry->header) != CELIX_SUCCESS) { |
| // Did not receive correct footer |
| L_ERROR("[SKT Handler] Failed to decode message footer seq %d (received corrupt message, transmit buffer full?) (fd: %d) (url: %s)", entry->header.header.seqNr, entry->fd, entry->url); |
| valid = false; |
| } |
| } |
| if (!entry->header.header.isLastSegment) { |
| // Not last Segment of message |
| valid = false; |
| } |
| |
| if (entry->socket_type == SOCK_DGRAM && entry->readMsg.msg_name && !entry->dst_addr.sin_port) { |
| entry->dst_addr = entry->readMsgAddr; |
| psa_skt_connection_entry_t *connection_entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); |
| if (connection_entry != NULL) { |
| connection_entry->dst_addr = entry->readMsgAddr;; |
| } |
| } |
| |
| if (valid) { |
| // Complete message is received |
| pubsub_sktHandler_decodePayload(handle, entry); |
| } |
| } |
| return nbytes; |
| } |
| |
| // |
| // Reads data from the filedescriptor which has date (determined by epoll()) and stores it in the internal structure |
| // If the message is completely reassembled true is returned and the index and size have valid values |
| // |
| int pubsub_sktHandler_read(pubsub_sktHandler_t *handle, int fd) { |
| celixThreadRwlock_readLock(&handle->dbLock); |
| psa_skt_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) fd); |
| if (entry == NULL) { |
| entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); |
| } |
| // Find FD entry |
| if (entry == NULL) { |
| celixThreadRwlock_unlock(&handle->dbLock); |
| return -1; |
| } |
| // If it's not connected return from function |
| if (!__atomic_load_n(&entry->connected, __ATOMIC_ACQUIRE)) { |
| celixThreadRwlock_unlock(&handle->dbLock); |
| return -1; |
| } |
| long int nbytes = 0; |
| // if not yet enough bytes are received the header can not be read |
| if (pubsub_sktHandler_readHeader(handle, fd, entry, &nbytes)) { |
| nbytes = pubsub_sktHandler_readPayload(handle, fd, entry); |
| } |
| if (nbytes > 0) { |
| entry->retryCount = 0; |
| } else if (nbytes < 0) { |
| if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { |
| // Non blocking interrupt |
| entry->retryCount = 0; |
| } else if (entry->retryCount < handle->maxRcvRetryCount) { |
| entry->retryCount++; |
| L_WARN( |
| "[SKT Handler] Failed to receive message (fd: %d), try again. error(%d): %s, Retry count %u of %u.", |
| entry->fd, errno, strerror(errno), entry->retryCount, handle->maxSendRetryCount); |
| } else { |
| L_ERROR("[SKT Handler] Failed to receive message (fd: %d) after %u retries! Closing connection... Error: %s", |
| entry->fd, handle->maxRcvRetryCount, strerror(errno)); |
| nbytes = 0; //Return 0 as indicator to close the connection |
| } |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| return (int)nbytes; |
| } |
| |
| int pubsub_sktHandler_addMessageHandler(pubsub_sktHandler_t *handle, void *payload, |
| pubsub_sktHandler_processMessage_callback_t processMessageCallback) { |
| int result = 0; |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->processMessageCallback = processMessageCallback; |
| handle->processMessagePayload = payload; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| return result; |
| } |
| |
| int pubsub_sktHandler_addReceiverConnectionCallback(pubsub_sktHandler_t *handle, void *payload, |
| pubsub_sktHandler_receiverConnectMessage_callback_t connectMessageCallback, |
| pubsub_sktHandler_receiverConnectMessage_callback_t disconnectMessageCallback) { |
| int result = 0; |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| handle->receiverConnectMessageCallback = connectMessageCallback; |
| handle->receiverDisconnectMessageCallback = disconnectMessageCallback; |
| handle->receiverConnectPayload = payload; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| return result; |
| } |
| |
| // |
| // Write large data to socket . |
| // |
| int pubsub_sktHandler_write(pubsub_sktHandler_t *handle, pubsub_protocol_message_t *message, struct iovec *msgIoVec, |
| size_t msg_iov_len, int flags) { |
| int result = 0; |
| if (handle == NULL) { |
| return -1; |
| } |
| int connFdCloseQueue[hashMap_size(handle->connection_fd_map)+1]; // +1 to ensure a size of 0 never occurs. |
| int nofConnToClose = 0; |
| if (handle) { |
| celixThreadRwlock_readLock(&handle->dbLock); |
| hash_map_iterator_t iter = hashMapIterator_construct(handle->connection_fd_map); |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_skt_connection_entry_t *entry = hashMapIterator_nextValue(&iter); |
| if (!__atomic_load_n(&entry->connected, __ATOMIC_ACQUIRE)) { |
| continue; |
| } |
| // When maxMsgSize is zero then payloadSize is disabled |
| if (entry->maxMsgSize == 0) { |
| // if max msg size is set to zero nothing will be send |
| continue; |
| } |
| celixThreadMutex_lock(&entry->writeMutex); |
| void *payloadData = NULL; |
| size_t payloadSize = 0; |
| if (msg_iov_len == 1) { |
| handle->protocol->encodePayload(handle->protocol->handle, message, &payloadData, &payloadSize); |
| } else { |
| for (size_t i = 0; i < msg_iov_len; i++) { |
| payloadSize += msgIoVec[i].iov_len; |
| } |
| } |
| |
| size_t protocolHeaderBufferSize = 0; |
| // Get HeaderBufferSize of the Protocol Header, when headerBufferSize == 0, the protocol header is included in the payload (needed for endpoints) |
| handle->protocol->getHeaderBufferSize(handle->protocol->handle, &protocolHeaderBufferSize); |
| size_t footerSize = 0; |
| // Get size of the Protocol Footer |
| handle->protocol->getFooterSize(handle->protocol->handle, &footerSize); |
| size_t max_msg_iov_len = IOV_MAX; // header , footer, padding |
| max_msg_iov_len -= (protocolHeaderBufferSize) ? 1 : 0; |
| max_msg_iov_len -= (footerSize) ? 1 : 0; |
| |
| // check if message is not too large |
| bool isMessageSegmentationSupported = false; |
| handle->protocol->isMessageSegmentationSupported(handle->protocol->handle, &isMessageSegmentationSupported); |
| if (!isMessageSegmentationSupported && (msg_iov_len > max_msg_iov_len || payloadSize > entry->maxMsgSize)) { |
| L_WARN("[SKT Handler] Failed to send message (fd: %d), Message segmentation is not supported\n", entry->fd); |
| celixThreadMutex_unlock(&entry->writeMutex); |
| continue; |
| } |
| |
| message->header.convertEndianess = 0; |
| message->header.payloadSize = payloadSize; |
| message->header.payloadPartSize = payloadSize; |
| message->header.payloadOffset = 0; |
| message->header.isLastSegment = 1; |
| |
| void *metadataData = NULL; |
| size_t metadataSize = 0; |
| if (message->metadata.metadata) { |
| metadataData = entry->writeMetaBuffer; |
| metadataSize = entry->writeMetaBufferSize; |
| // When maxMsgSize is smaller than meta data is disabled |
| if (metadataSize > entry->maxMsgSize) { |
| metadataSize = 0; |
| } |
| handle->protocol->encodeMetadata(handle->protocol->handle, message, &metadataData, &metadataSize); |
| } |
| message->header.metadataSize = metadataSize; |
| size_t totalMsgSize = payloadSize + metadataSize; |
| |
| size_t sendMsgSize = 0; |
| size_t msgPayloadOffset = 0; |
| size_t msgIovOffset = 0; |
| bool allPayloadAdded = (payloadSize == 0); |
| long int nbytes = LONG_MAX; |
| while (sendMsgSize < totalMsgSize && nbytes > 0) { |
| struct msghdr msg; |
| struct iovec msg_iov[IOV_MAX]; |
| memset(&msg, 0x00, sizeof(struct msghdr)); |
| msg.msg_name = &entry->dst_addr; |
| msg.msg_namelen = entry->len; |
| msg.msg_flags = flags; |
| msg.msg_iov = msg_iov; |
| |
| size_t msgPartSize = 0; |
| message->header.payloadPartSize = 0; |
| message->header.payloadOffset = 0; |
| message->header.metadataSize = 0; |
| message->header.isLastSegment = 0; |
| size_t maxMsgSize = entry->maxMsgSize - protocolHeaderBufferSize - footerSize; |
| |
| // reserve space for the header if required, header is added later when size of message is known (message can split in parts) |
| if (protocolHeaderBufferSize) { |
| msg.msg_iovlen++; |
| } |
| // Write generic seralized payload in vector buffer |
| if (!allPayloadAdded) { |
| if (payloadSize && payloadData && maxMsgSize) { |
| char *buffer = payloadData; |
| msg.msg_iov[msg.msg_iovlen].iov_base = &buffer[msgPayloadOffset]; |
| msg.msg_iov[msg.msg_iovlen].iov_len = MIN((payloadSize - msgPayloadOffset), maxMsgSize); |
| msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; |
| msg.msg_iovlen++; |
| |
| } else { |
| // copy serialized vector into vector buffer |
| size_t i; |
| for (i = msgIovOffset; i < MIN(msg_iov_len, msgIovOffset + max_msg_iov_len); i++) { |
| if ((msgPartSize + msgIoVec[i].iov_len) > maxMsgSize) { |
| break; |
| } |
| // When iov_len is zero, skip item and send next item |
| if (!msgIoVec[i].iov_len) { |
| msgIovOffset = ++i; |
| } |
| msg.msg_iov[msg.msg_iovlen].iov_base = msgIoVec[i].iov_base; |
| msg.msg_iov[msg.msg_iovlen].iov_len = msgIoVec[i].iov_len; |
| msgPartSize += msg.msg_iov[msg.msg_iovlen].iov_len; |
| msg.msg_iovlen++; |
| } |
| // if no entry could be added |
| if (i == msgIovOffset) { |
| // TODO element can be split in parts? |
| L_ERROR("[SKT Handler] vector io element is larger than max msg size"); |
| break; |
| } |
| msgIovOffset = i; |
| } |
| message->header.payloadPartSize = msgPartSize; |
| message->header.payloadOffset = msgPayloadOffset; |
| msgPayloadOffset += message->header.payloadPartSize; |
| sendMsgSize = msgPayloadOffset; |
| allPayloadAdded= msgPayloadOffset >= payloadSize; |
| } |
| |
| // Write optional metadata in vector buffer |
| if (allPayloadAdded && |
| (metadataSize != 0 && metadataData) && |
| (msgPartSize < maxMsgSize) && |
| (msg.msg_iovlen-1 < max_msg_iov_len)) { // header is already included |
| msg.msg_iov[msg.msg_iovlen].iov_base = metadataData; |
| msg.msg_iov[msg.msg_iovlen].iov_len = metadataSize; |
| msg.msg_iovlen++; |
| msgPartSize += metadataSize; |
| message->header.metadataSize = metadataSize; |
| sendMsgSize += metadataSize; |
| } |
| if (sendMsgSize >= totalMsgSize) { |
| message->header.isLastSegment = 0x1; |
| } |
| |
| void *headerData = NULL; |
| size_t headerSize = 0; |
| // Get HeaderSize of the Protocol Header |
| handle->protocol->getHeaderSize(handle->protocol->handle, &headerSize); |
| |
| // check if header is not part of the payload (=> headerBufferSize = 0) |
| if (protocolHeaderBufferSize) { |
| headerData = entry->writeHeaderBuffer; |
| // Encode the header, with payload size and metadata size |
| handle->protocol->encodeHeader(handle->protocol->handle, message, &headerData, &headerSize); |
| entry->writeHeaderBufferSize = MAX(headerSize, entry->writeHeaderBufferSize); |
| if (headerData && entry->writeHeaderBuffer != headerData) { |
| entry->writeHeaderBuffer = headerData; |
| } |
| if (headerSize && headerData) { |
| // Write header in 1st vector buffer item |
| msg.msg_iov[0].iov_base = headerData; |
| msg.msg_iov[0].iov_len = headerSize; |
| msgPartSize += msg.msg_iov[0].iov_len; |
| } else { |
| L_ERROR("[SKT Handler] No header buffer is generated"); |
| break; |
| } |
| } |
| |
| void *footerData = NULL; |
| // Write optional footerData in vector buffer |
| if (footerSize) { |
| footerData = entry->writeFooterBuffer; |
| handle->protocol->encodeFooter(handle->protocol->handle, message, &footerData, &footerSize); |
| if (footerData && entry->writeFooterBuffer != footerData) { |
| entry->writeFooterBuffer = footerData; |
| entry->writeFooterBufferSize = footerSize; |
| } |
| if (footerData) { |
| msg.msg_iov[msg.msg_iovlen].iov_base = footerData; |
| msg.msg_iov[msg.msg_iovlen].iov_len = footerSize; |
| msg.msg_iovlen++; |
| msgPartSize += footerSize; |
| } |
| } |
| nbytes = sendmsg(entry->fd, &msg, flags | MSG_NOSIGNAL); |
| |
| // When a specific socket keeps reporting errors can indicate a subscriber |
| // which is not active anymore, the connection will remain until the retry |
| // counter exceeds the maximum retry count. |
| // Btw, also, SIGSTOP issued by a debugging tool can result in EINTR error. |
| if (nbytes == -1) { |
| if (entry->retryCount < handle->maxSendRetryCount) { |
| entry->retryCount++; |
| L_ERROR( |
| "[SKT Handler] Failed to send message (fd: %d), try again. Retry count %u of %u, error(%d): %s.", |
| entry->fd, entry->retryCount, handle->maxSendRetryCount, errno, strerror(errno)); |
| } else { |
| L_ERROR( |
| "[SKT Handler] Failed to send message (fd: %d) after %u retries! Closing connection... Error: %s", entry->fd, handle->maxSendRetryCount, strerror(errno)); |
| connFdCloseQueue[nofConnToClose++] = entry->fd; |
| } |
| result = -1; //At least one connection failed sending |
| } else if (msgPartSize) { |
| entry->retryCount = 0; |
| if (nbytes != msgPartSize) { |
| L_ERROR("[SKT Handler] seq: %d MsgSize not correct: %d != %d (%s)\n", message->header.seqNr, msgPartSize, nbytes, strerror(errno)); |
| } |
| } |
| // Note: serialized Payload is deleted by serializer |
| if (payloadData && (payloadData != message->payload.payload)) { |
| free(payloadData); |
| } |
| if (metadataData && (metadataData != entry->writeMetaBuffer)) { |
| free(metadataData); |
| } |
| } |
| celixThreadMutex_unlock(&entry->writeMutex); |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| //Force close all connections that are queued in a list, done outside of locking handle->dbLock to prevent deadlock |
| for (int i = 0; i < nofConnToClose; i++) { |
| pubsub_sktHandler_close(handle, connFdCloseQueue[i]); |
| } |
| return result; |
| } |
| |
| // |
| // get interface URL |
| // |
| char *pubsub_sktHandler_get_interface_url(pubsub_sktHandler_t *handle) { |
| hash_map_iterator_t iter = |
| hashMapIterator_construct(handle->interface_url_map); |
| char *url = NULL; |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_skt_connection_entry_t *entry = |
| hashMapIterator_nextValue(&iter); |
| if (entry && entry->url) { |
| if (!url) { |
| url = celix_utils_strdup(entry->url); |
| } else { |
| char *tmp = url; |
| asprintf(&url, "%s %s", tmp, entry->url); |
| free(tmp); |
| } |
| } |
| } |
| return url; |
| } |
| |
| // |
| // get interface URL |
| // |
| char *pubsub_sktHandler_get_connection_url(pubsub_sktHandler_t *handle) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| hash_map_iterator_t iter = |
| hashMapIterator_construct(handle->connection_url_map); |
| char *url = NULL; |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_skt_connection_entry_t *entry = |
| hashMapIterator_nextValue(&iter); |
| if (entry && entry->url) { |
| if (!url) { |
| pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url); |
| url = celix_utils_strdup(url_info->interface_url ? url_info->interface_url : entry->url); |
| pubsub_utils_url_free(url_info); |
| } else { |
| char *tmp = url; |
| pubsub_utils_url_t *url_info = pubsub_utils_url_parse(entry->url); |
| asprintf(&url, "%s %s", tmp, url_info->interface_url ? url_info->interface_url : entry->url); |
| pubsub_utils_url_free(url_info); |
| free(tmp); |
| } |
| } |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| return url; |
| } |
| |
| // |
| // get interface URL |
| // |
| void pubsub_sktHandler_get_connection_urls(pubsub_sktHandler_t *handle, celix_array_list_t *urls) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| hash_map_iterator_t iter = |
| hashMapIterator_construct(handle->connection_url_map); |
| char *url = NULL; |
| while (hashMapIterator_hasNext(&iter)) { |
| psa_skt_connection_entry_t *entry = |
| hashMapIterator_nextValue(&iter); |
| if (entry && entry->url) { |
| asprintf(&url, "%s", entry->url); |
| celix_arrayList_add(urls, url); |
| free(url); |
| url = NULL; |
| } |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| |
| |
| |
| // |
| // Handle non-blocking accept (sender) |
| // |
| static inline |
| int pubsub_sktHandler_acceptHandler(pubsub_sktHandler_t *handle, psa_skt_connection_entry_t *pendingConnectionEntry) { |
| celixThreadRwlock_writeLock(&handle->dbLock); |
| // new connection available |
| struct sockaddr_in their_addr; |
| socklen_t len = sizeof(struct sockaddr_in); |
| int fd = accept(pendingConnectionEntry->fd, (struct sockaddr*)&their_addr, &len); |
| int rc = fd; |
| if (rc == -1) { |
| L_ERROR("[TCP SKT Handler] accept failed: %s\n", strerror(errno)); |
| } |
| if (rc >= 0) { |
| // handle new connection: |
| struct sockaddr_in sin; |
| getsockname(pendingConnectionEntry->fd, (struct sockaddr *) &sin, &len); |
| char *interface_url = pubsub_utils_url_get_url(&sin, NULL); |
| char *url = pubsub_utils_url_get_url(&their_addr, NULL); |
| psa_skt_connection_entry_t *entry = pubsub_sktHandler_createEntry(handle, fd, url, interface_url, &their_addr); |
| #if defined(__APPLE__) |
| struct kevent ev; |
| EV_SET (&ev, entry->fd, EVFILT_READ, EV_ADD | EV_ENABLE , 0, 0, 0); |
| rc = kevent (handle->efd, &ev, 1, NULL, 0, NULL); |
| #else |
| struct epoll_event event; |
| bzero(&event, sizeof(event)); // zero the struct |
| event.events = EPOLLRDHUP | EPOLLERR; |
| if (handle->enableReceiveEvent) event.events |= EPOLLIN; |
| event.data.fd = entry->fd; |
| // Register Read to epoll |
| rc = epoll_ctl(handle->efd, EPOLL_CTL_ADD, entry->fd, &event); |
| #endif |
| if (rc < 0) { |
| pubsub_sktHandler_freeEntry(entry); |
| free(entry); |
| L_ERROR("[TCP SKT Handler] Cannot create epoll\n"); |
| } else { |
| // Call Accept Connection callback |
| if (handle->acceptConnectMessageCallback) |
| handle->acceptConnectMessageCallback(handle->acceptConnectPayload, url); |
| hashMap_put(handle->connection_fd_map, (void *) (intptr_t) entry->fd, entry); |
| hashMap_put(handle->connection_url_map, entry->url, entry); |
| L_INFO("[TCP SKT Handler] New connection to url: %s: \n", url); |
| } |
| free(url); |
| free(interface_url); |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| return fd; |
| } |
| |
| // |
| // Handle sockets connection (sender) |
| // |
| static inline |
| void pubsub_sktHandler_connectionHandler(pubsub_sktHandler_t *handle, int fd) { |
| celixThreadRwlock_readLock(&handle->dbLock); |
| psa_skt_connection_entry_t *entry = hashMap_get(handle->connection_fd_map, (void *) (intptr_t) fd); |
| if (entry) |
| if (!__atomic_exchange_n(&entry->connected, true, __ATOMIC_ACQ_REL)) { |
| // tell sender that an receiver is connected |
| if (handle->receiverConnectMessageCallback) |
| handle->receiverConnectMessageCallback(handle->receiverConnectPayload, entry->url, false); |
| } |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } |
| |
| #if defined(__APPLE__) |
| // |
| // The main socket event loop |
| // |
| static inline |
| void pubsub_sktHandler_handler(pubsub_sktHandler_t *handle) { |
| int rc = 0; |
| if (handle->efd >= 0) { |
| int nof_events = 0; |
| // Wait for events. |
| struct kevent events[MAX_EVENTS]; |
| struct timespec ts = {handle->timeout / 1000, (handle->timeout % 1000) * 1000000}; |
| nof_events = kevent(handle->efd, NULL, 0, &events[0], MAX_EVENTS, handle->timeout ? &ts : NULL); |
| if (nof_events < 0) { |
| if ((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) { |
| } else |
| L_ERROR("[SKT Handler] Cannot create poll wait (%d) %s\n", nof_events, strerror(errno)); |
| } |
| for (int i = 0; i < nof_events; i++) { |
| psa_skt_connection_entry_t |
| *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) events[i].ident); |
| if (entry && (entry->socket_type == SOCK_STREAM)) { |
| int fd = pubsub_sktHandler_acceptHandler(handle, entry); |
| pubsub_sktHandler_connectionHandler(handle, fd); |
| } else if (events[i].filter & EVFILT_READ) { |
| rc = pubsub_sktHandler_read(handle, events[i].ident); |
| if (rc == 0) |
| pubsub_sktHandler_close(handle, events[i].ident); |
| } else if (events[i].flags & EV_EOF) { |
| int err = 0; |
| socklen_t len = sizeof(int); |
| rc = getsockopt(events[i].ident, SOL_SOCKET, SO_ERROR, &err, &len); |
| if (rc != 0) { |
| L_ERROR("[SKT Handler]:EV_EOF ERROR read from socket %s\n", strerror(errno)); |
| continue; |
| } |
| pubsub_sktHandler_close(handle, events[i].ident); |
| } else if (events[i].flags & EV_ERROR) { |
| L_ERROR("[SKT Handler]:EV_ERROR ERROR read from socket %s\n", strerror(errno)); |
| pubsub_sktHandler_close(handle, events[i].ident); |
| continue; |
| } |
| } |
| } |
| return; |
| } |
| |
| #else |
| |
| // |
| // The main socket event loop |
| // |
| static inline |
| void pubsub_sktHandler_handler(pubsub_sktHandler_t *handle) { |
| int rc = 0; |
| if (handle->efd >= 0) { |
| int nof_events = 0; |
| struct epoll_event events[MAX_EVENTS]; |
| nof_events = epoll_wait(handle->efd, events, MAX_EVENTS, (int)handle->timeout); |
| if ((nof_events < 0) && (!((errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)))) |
| L_ERROR("[SKT Socket] Cannot create epoll wait (%d) %s\n", nof_events, strerror(errno)); |
| for (int i = 0; i < nof_events; i++) { |
| psa_skt_connection_entry_t *entry = hashMap_get(handle->interface_fd_map, (void *) (intptr_t) events[i].data.fd ); |
| if (entry && (entry->socket_type == SOCK_STREAM)) { |
| int fd = pubsub_sktHandler_acceptHandler(handle, entry); |
| pubsub_sktHandler_connectionHandler(handle, fd); |
| } else if (events[i].events & EPOLLIN) { |
| rc = pubsub_sktHandler_read(handle, events[i].data.fd); |
| if (rc == 0) pubsub_sktHandler_close(handle, events[i].data.fd); |
| } else if (events[i].events & EPOLLRDHUP) { |
| int err = 0; |
| socklen_t len = sizeof(int); |
| rc = getsockopt(events[i].data.fd, SOL_SOCKET, SO_ERROR, &err, &len); |
| if (rc != 0) { |
| L_ERROR("[SKT Handler]:EPOLLRDHUP ERROR read from socket %s\n", strerror(errno)); |
| continue; |
| } |
| pubsub_sktHandler_close(handle, events[i].data.fd); |
| } else if (events[i].events & EPOLLERR) { |
| L_ERROR("[SKT Handler]:EPOLLERR ERROR read from socket %s\n", strerror(errno)); |
| pubsub_sktHandler_close(handle, events[i].data.fd); |
| continue; |
| } |
| } |
| } |
| } |
| #endif |
| |
| // |
| // The socket thread |
| // |
| static void *pubsub_sktHandler_thread(void *data) { |
| pubsub_sktHandler_t *handle = data; |
| celixThreadRwlock_readLock(&handle->dbLock); |
| bool running = handle->running; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| |
| while (running) { |
| pubsub_sktHandler_handler(handle); |
| celixThreadRwlock_readLock(&handle->dbLock); |
| running = handle->running; |
| celixThreadRwlock_unlock(&handle->dbLock); |
| } // while |
| return NULL; |
| } |