Merge pull request #350 from apache/feature/use_async_api_for_rsa
Feature/use async api for rsa
diff --git a/bundles/cxx_remote_services/CMakeLists.txt b/bundles/cxx_remote_services/CMakeLists.txt
index d768861..f09ae80 100644
--- a/bundles/cxx_remote_services/CMakeLists.txt
+++ b/bundles/cxx_remote_services/CMakeLists.txt
@@ -21,7 +21,14 @@
add_subdirectory(rsa_spi)
add_subdirectory(admin)
add_subdirectory(discovery_configured)
- add_subdirectory(integration)
+
+ #The integration targets link against ZMQ::lib and CZMQ::lib, so the directory is only added when both packages are found.
+ find_package(ZMQ)
+ find_package(CZMQ)
+ if (ZMQ_FOUND AND CZMQ_FOUND)
+ add_subdirectory(integration)
+ else ()
+ message(STATUS "C++ RSA integration tests will not be built, because they require zmq and czmq")
+ endif ()
#NOTE the topology manager is not yet used. The discovery and RSA need to be refactor for this
#add_subdirectory(topology_manager)
diff --git a/bundles/cxx_remote_services/admin/CMakeLists.txt b/bundles/cxx_remote_services/admin/CMakeLists.txt
index 4a6dac3..24b77ab 100644
--- a/bundles/cxx_remote_services/admin/CMakeLists.txt
+++ b/bundles/cxx_remote_services/admin/CMakeLists.txt
@@ -23,8 +23,8 @@
FILENAME "Celix_RemoteServiceAdmin"
SOURCES
src/RemoteServiceAdmin.cc
+ src/RemoteServiceAdminActivator.cc
)
-target_include_directories(RemoteServiceAdmin PRIVATE include)
target_link_libraries(RemoteServiceAdmin PRIVATE
Celix::rsa_spi
Celix::framework
diff --git a/bundles/cxx_remote_services/admin/gtest/src/RemoteServiceAdminTestSuite.cc b/bundles/cxx_remote_services/admin/gtest/src/RemoteServiceAdminTestSuite.cc
index 3ffdd18..d9b00cf 100644
--- a/bundles/cxx_remote_services/admin/gtest/src/RemoteServiceAdminTestSuite.cc
+++ b/bundles/cxx_remote_services/admin/gtest/src/RemoteServiceAdminTestSuite.cc
@@ -87,10 +87,10 @@
std::string cmpUUID{};
};
-class StubExportServiceGuard : public celix::rsa::IExportServiceGuard {
+class StubExportRegistration : public celix::rsa::IExportRegistration {
public:
- explicit StubExportServiceGuard(std::weak_ptr<StubExportedServiceEntry> _entry) : entry{std::move(_entry)} {}
- ~StubExportServiceGuard() noexcept override {
+ explicit StubExportRegistration(std::weak_ptr<StubExportedServiceEntry> _entry) : entry{std::move(_entry)} {}
+ ~StubExportRegistration() noexcept override {
auto e = entry.lock();
if (e) {
e->close();
@@ -101,28 +101,45 @@
const std::weak_ptr<StubExportedServiceEntry> entry;
};
-class StubExportServiceFactory : public celix::rsa::IExportServiceFactory {
-public:
- explicit StubExportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)} {}
-
- std::unique_ptr<celix::rsa::IExportServiceGuard> exportService(const celix::Properties& /*serviceProperties*/) override {
- auto entry = std::make_shared<StubExportedServiceEntry>(ctx);
- std::lock_guard<std::mutex> lock{mutex};
- entries.emplace_back(entry);
- return std::make_unique<StubExportServiceGuard>(entry);
- }
-
-private:
- const std::shared_ptr<celix::BundleContext> ctx;
- std::mutex mutex{};
- std::vector<std::shared_ptr<StubExportedServiceEntry>> entries{};
-};
-
class IDummyService {
public:
virtual ~IDummyService() noexcept = default;
};
+/**
+ * Test stub for celix::rsa::IExportServiceFactory.
+ *
+ * Every exportService() call creates a StubExportedServiceEntry, keeps a shared reference to it in
+ * `entries` and returns a StubExportRegistration that only holds a weak reference to that entry.
+ * The stub reports the type name of IDummyService as remote service type, "test" as its only
+ * supported config and "osgi.basic" as its only supported intent.
+ */
+class StubExportServiceFactory : public celix::rsa::IExportServiceFactory {
+public:
+ explicit StubExportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)} {}
+ ~StubExportServiceFactory() noexcept override = default;
+
+ //The service properties are ignored; each call results in a new entry.
+ std::unique_ptr<celix::rsa::IExportRegistration> exportService(const celix::Properties& /*serviceProperties*/) override {
+ auto entry = std::make_shared<StubExportedServiceEntry>(ctx);
+ std::lock_guard<std::mutex> lock{mutex}; //guards entries
+ entries.emplace_back(entry);
+ return std::make_unique<StubExportRegistration>(entry);
+ }
+
+
+ [[nodiscard]] const std::string &getRemoteServiceType() const override {
+ return serviceType;
+ }
+
+ [[nodiscard]] const std::vector<std::string> &getSupportedIntents() const override {
+ return intents;
+ }
+
+ [[nodiscard]] const std::vector<std::string> &getSupportedConfigs() const override {
+ return configs;
+ }
+
+private:
+ const std::string serviceType = celix::typeName<IDummyService>();
+ const std::vector<std::string> configs = {"test"};
+ const std::vector<std::string> intents = {"osgi.basic"};
+ const std::shared_ptr<celix::BundleContext> ctx;
+ std::mutex mutex{};
+ std::vector<std::shared_ptr<StubExportedServiceEntry>> entries{}; //owning references to all entries created so far
+};
+
class DummyServiceImpl : public IDummyService {
public:
~DummyServiceImpl() noexcept override = default;
@@ -141,7 +158,7 @@
EXPECT_EQ(0, count);
auto reg1 = ctx->registerService<celix::rsa::IExportServiceFactory>(std::make_shared<StubExportServiceFactory>(ctx))
- .addProperty(celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME, celix::typeName<IDummyService>())
+ .addProperty(celix::rsa::IExportServiceFactory::REMOTE_SERVICE_TYPE, celix::typeName<IDummyService>())
.build();
count = ctx->useService<celix::rsa::IExportedService>()
@@ -190,7 +207,7 @@
EXPECT_EQ(0, count);
auto reg2 = ctx->registerService<celix::rsa::IExportServiceFactory>(std::make_shared<StubExportServiceFactory>(ctx))
- .addProperty(celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME, celix::typeName<IDummyService>())
+ .addProperty(celix::rsa::IExportServiceFactory::REMOTE_SERVICE_TYPE, celix::typeName<IDummyService>())
.build();
//rsa called export service factory which created a IExportServiceRegistration, which register the marker interface IExportedService indicating an exported service
@@ -199,7 +216,38 @@
EXPECT_EQ(1, count);
}
+//Checks that a service is only exported when its configured config matches a config supported by the export service factory.
+TEST_F(RemoteServiceAdminTestSuite, exportServiceUsingConfig) {
+ auto bndId = ctx->installBundle(REMOTE_SERVICE_ADMIN_BUNDLE_LOCATION);
+ EXPECT_GE(bndId, 0);
+ auto factoryRegistration = ctx->registerService<celix::rsa::IExportServiceFactory>(std::make_shared<StubExportServiceFactory>(ctx))
+ .addProperty(celix::rsa::IExportServiceFactory::REMOTE_SERVICE_TYPE, celix::typeName<IDummyService>())
+ .build();
+
+ /**
+ * When I add an export service factory and register a service with exported interfaces and a mismatched
+ * service.export.configs, this will not be exported.
+ * NOTE(review): the config is set through celix::rsa::SERVICE_IMPORTED_CONFIGS - confirm that this is the
+ * property meant by "service.export.configs".
+ */
+ auto reg = ctx->registerService<IDummyService>(std::make_shared<DummyServiceImpl>())
+ .addProperty(celix::rsa::SERVICE_EXPORTED_INTERFACES, "*")
+ .addProperty(celix::rsa::SERVICE_IMPORTED_CONFIGS, "non-existing-config")
+ .build();
+ auto count = ctx->useService<celix::rsa::IExportedService>()
+ .build();
+ EXPECT_EQ(0, count); //no IExportedService may be found for the mismatched config
+
+ /**
+ * When I add an export service factory and register a service with exported interfaces and a correct
+ * service.export.configs, the service will be exported.
+ */
+ reg = ctx->registerService<IDummyService>(std::make_shared<DummyServiceImpl>())
+ .addProperty(celix::rsa::SERVICE_EXPORTED_INTERFACES, "*")
+ .addProperty(celix::rsa::SERVICE_IMPORTED_CONFIGS, "test")
+ .build();
+ count = ctx->useService<celix::rsa::IExportedService>()
+ .build();
+ EXPECT_EQ(1, count); //"test" is the config the stub factory supports
+}
/**
@@ -237,10 +285,10 @@
std::string cmpUUID{};
};
-class StubImportServiceGuard : public celix::rsa::IImportServiceGuard {
+class StubImportRegistration : public celix::rsa::IImportRegistration {
public:
- explicit StubImportServiceGuard(std::weak_ptr<StubImportedServiceEntry> _entry) : entry{std::move(_entry)} {}
- ~StubImportServiceGuard() noexcept override {
+ explicit StubImportRegistration(std::weak_ptr<StubImportedServiceEntry> _entry) : entry{std::move(_entry)} {}
+ ~StubImportRegistration() noexcept override {
auto e = entry.lock();
if (e) {
e->close();
@@ -254,19 +302,30 @@
class StubImportServiceFactory : public celix::rsa::IImportServiceFactory {
public:
explicit StubImportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)} {}
+ ~StubImportServiceFactory() noexcept override = default;
- virtual std::unique_ptr<celix::rsa::IImportServiceGuard> importService(const celix::rsa::EndpointDescription& endpoint) {
+ [[nodiscard]] std::unique_ptr<celix::rsa::IImportRegistration> importService(const celix::rsa::EndpointDescription& endpoint) override {
if (endpoint.getInterface() == celix::typeName<IDummyService>()) {
std::lock_guard<std::mutex> lock{mutex};
auto entry = std::make_shared<StubImportedServiceEntry>(ctx);
entries.emplace_back(entry);
- return std::make_unique<StubImportServiceGuard>(entry);
+ return std::make_unique<StubImportRegistration>(entry);
} else {
return {};
}
}
+ [[nodiscard]] const std::string &getRemoteServiceType() const override {
+ return serviceType;
+ }
+
+ [[nodiscard]] const std::vector<std::string> &getSupportedConfigs() const override {
+ return configs;
+ }
+
private:
+ const std::string serviceType{celix::typeName<IDummyService>()};
+ const std::vector<std::string> configs{"test"};
const std::shared_ptr<celix::BundleContext> ctx;
std::mutex mutex{};
std::vector<std::shared_ptr<StubImportedServiceEntry>> entries{};
@@ -285,7 +344,7 @@
EXPECT_EQ(0, count);
auto reg1 = ctx->registerService<celix::rsa::IImportServiceFactory>(std::make_shared<StubImportServiceFactory>(ctx))
- .addProperty(celix::rsa::IImportServiceFactory::TARGET_SERVICE_NAME, celix::typeName<IDummyService>())
+ .addProperty(celix::rsa::IImportServiceFactory::REMOTE_SERVICE_TYPE, celix::typeName<IDummyService>())
.build();
count = ctx->useService<IDummyService>()
@@ -294,7 +353,8 @@
auto endpoint = std::make_shared<celix::rsa::EndpointDescription>(celix::Properties{
{celix::rsa::ENDPOINT_ID, "endpoint-id-1"},
- {celix::SERVICE_NAME,celix::typeName<IDummyService>()}});
+ {celix::SERVICE_NAME,celix::typeName<IDummyService>()},
+ {celix::rsa::SERVICE_IMPORTED_CONFIGS, "test"}});
auto reg2 = ctx->registerService<celix::rsa::EndpointDescription>(std::move(endpoint))
.build();
diff --git a/bundles/cxx_remote_services/admin/src/RemoteServiceAdmin.cc b/bundles/cxx_remote_services/admin/src/RemoteServiceAdmin.cc
index 207feec..116820c 100644
--- a/bundles/cxx_remote_services/admin/src/RemoteServiceAdmin.cc
+++ b/bundles/cxx_remote_services/admin/src/RemoteServiceAdmin.cc
@@ -19,21 +19,20 @@
#include "RemoteServiceAdmin.h"
#include "celix/BundleContext.h"
-#include "celix/BundleActivator.h"
#include "celix/rsa/RemoteConstants.h"
-#include "celix/rsa/IImportServiceFactory.h"
-#include "celix/rsa/IExportServiceFactory.h"
+//Logging shorthands. Each macro expands to a call on an object named `logHelper` that must be in scope
+//where the macro is used (RemoteServiceAdmin has a member with that name).
+#define L_TRACE(...) \
+ logHelper.trace(__VA_ARGS__);
#define L_DEBUG(...) \
- _logHelper.debug(__VA_ARGS__);
+ logHelper.debug(__VA_ARGS__);
#define L_INFO(...) \
- _logHelper.info(__VA_ARGS__);
+ logHelper.info(__VA_ARGS__);
#define L_WARN(...) \
- _logHelper.warning(__VA_ARGS__);
+ logHelper.warning(__VA_ARGS__);
#define L_ERROR(...) \
- _logHelper.error(__VA_ARGS__);
+ logHelper.error(__VA_ARGS__);
-celix::rsa::RemoteServiceAdmin::RemoteServiceAdmin(celix::LogHelper logHelper) : _logHelper{std::move(logHelper)} {}
+//Takes the log helper by value and moves it into the member. The parameter has the same name as the member;
+//inside the initializer braces `logHelper` names the parameter, so the member is initialized from the argument.
+celix::rsa::RemoteServiceAdmin::RemoteServiceAdmin(celix::LogHelper logHelper) : logHelper{std::move(logHelper)} {}
void celix::rsa::RemoteServiceAdmin::addEndpoint(const std::shared_ptr<celix::rsa::EndpointDescription>& endpoint) {
assert(endpoint);
@@ -50,8 +49,8 @@
return;
}
- std::lock_guard l(_m);
- _toBeImportedServices.emplace_back(endpoint);
+ std::lock_guard l(mutex);
+ toBeImportedServices.emplace_back(endpoint);
createImportServices();
}
@@ -64,18 +63,18 @@
return;
}
- std::shared_ptr<celix::rsa::IImportServiceGuard> tmpStore{}; //to ensure destruction outside of lock
+ std::shared_ptr<celix::rsa::IImportRegistration> tmpStore{}; //to ensure destruction outside of lock
{
- std::lock_guard l(_m);
+ std::lock_guard l(mutex);
- _toBeImportedServices.erase(std::remove_if(_toBeImportedServices.begin(), _toBeImportedServices.end(), [&id](auto const &endpoint){
+ toBeImportedServices.erase(std::remove_if(toBeImportedServices.begin(), toBeImportedServices.end(), [&id](auto const &endpoint){
return id == endpoint->getId();
- }), _toBeImportedServices.end());
+ }), toBeImportedServices.end());
- auto it = _importedServices.find(id);
- if (it != _importedServices.end()) {
+ auto it = importedServices.find(id);
+ if (it != importedServices.end()) {
tmpStore = std::move(it->second);
- _importedServices.erase(it);
+ importedServices.erase(it);
}
}
}
@@ -83,21 +82,21 @@
+/**
+ * Stores an import service factory under the value of its REMOTE_SERVICE_TYPE service property and
+ * retries the pending imports. A factory without that property is ignored with a warning.
+ * NOTE(review): a second factory for the same remote service type is rejected here, while
+ * createImportServices() iterates over all factories stored for an interface - confirm whether
+ * multiple import factories per type are meant to be supported.
+ */
void celix::rsa::RemoteServiceAdmin::addImportedServiceFactory(
const std::shared_ptr<celix::rsa::IImportServiceFactory> &factory,
const std::shared_ptr<const celix::Properties> &properties) {
- auto targetServiceName = properties->get(celix::rsa::IImportServiceFactory::TARGET_SERVICE_NAME);
+ auto targetServiceName = properties->get(celix::rsa::IImportServiceFactory::REMOTE_SERVICE_TYPE);
if (targetServiceName.empty()) {
- L_WARN("Adding service factory but missing %s property", celix::rsa::IImportServiceFactory::TARGET_SERVICE_NAME);
+ L_WARN("Adding service factory but missing %s property", celix::rsa::IImportServiceFactory::REMOTE_SERVICE_TYPE);
return;
}
- std::lock_guard l(_m);
- auto existingFactory = _importServiceFactories.find(targetServiceName);
- if (existingFactory != end(_importServiceFactories)) {
+ std::lock_guard l(mutex);
+ auto existingFactory = importServiceFactories.find(targetServiceName);
+ if (existingFactory != end(importServiceFactories)) {
L_WARN("Adding imported factory but factory already exists");
return;
}
- _importServiceFactories.emplace(targetServiceName, factory);
+ importServiceFactories.emplace(targetServiceName, factory);
createImportServices();
}
@@ -105,14 +104,14 @@
+/**
+ * Removes the given import service factory from the factories stored for its REMOTE_SERVICE_TYPE.
+ * A factory without that service property is ignored with a warning.
+ * Only the entry holding `factory` is erased; erasing by key would also drop any other factory that
+ * is stored for the same remote service type.
+ * NOTE(review): relies on the add and remove callbacks receiving the same shared_ptr - confirm.
+ */
void celix::rsa::RemoteServiceAdmin::removeImportedServiceFactory(
- const std::shared_ptr<celix::rsa::IImportServiceFactory> &/*factory*/,
+ const std::shared_ptr<celix::rsa::IImportServiceFactory> &factory,
const std::shared_ptr<const celix::Properties> &properties) {
- auto targetServiceName = properties->get(celix::rsa::IImportServiceFactory::TARGET_SERVICE_NAME);
+ auto targetServiceName = properties->get(celix::rsa::IImportServiceFactory::REMOTE_SERVICE_TYPE);
if (targetServiceName.empty()) {
- L_WARN("Removing service factory but missing %s property", celix::rsa::IImportServiceFactory::TARGET_SERVICE_NAME);
+ L_WARN("Removing service factory but missing %s property", celix::rsa::IImportServiceFactory::REMOTE_SERVICE_TYPE);
return;
}
- std::lock_guard l(_m);
- _importServiceFactories.erase(targetServiceName);
+ std::lock_guard l(mutex);
+ for (auto it = importServiceFactories.lower_bound(targetServiceName); it != importServiceFactories.end() && it->first == targetServiceName; ++it) {
+ if (it->second == factory) {
+ importServiceFactories.erase(it);
+ break;
+ }
+ }
//TODO remove imported services from this factory ??needed
}
@@ -120,28 +119,28 @@
+/**
+ * Stores an export service factory under the value of its REMOTE_SERVICE_TYPE service property and
+ * retries the pending exports. A factory without that property is ignored with a warning.
+ * Several factories may be stored for the same remote service type.
+ */
void celix::rsa::RemoteServiceAdmin::addExportedServiceFactory(
const std::shared_ptr<celix::rsa::IExportServiceFactory> &factory,
const std::shared_ptr<const celix::Properties> &properties) {
- auto targetServiceName = properties->get(celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME);
+ auto targetServiceName = properties->get(celix::rsa::IExportServiceFactory::REMOTE_SERVICE_TYPE);
if (targetServiceName.empty()) {
- L_WARN("Adding service factory but missing %s property", celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME);
+ L_WARN("Adding service factory but missing %s property", celix::rsa::IExportServiceFactory::REMOTE_SERVICE_TYPE);
return;
}
- std::lock_guard<std::mutex> lock{_m};
- _exportServiceFactories.emplace(targetServiceName, factory);
+ std::lock_guard<std::mutex> lock{mutex};
+ exportServiceFactories.emplace(targetServiceName, factory);
createExportServices();
}
+/**
+ * Removes the given export service factory from the factories stored for its REMOTE_SERVICE_TYPE.
+ * A factory without that service property is ignored with a warning.
+ * Only the entry holding `factory` is erased; erasing by key would also drop every other factory
+ * that was added for the same remote service type.
+ * NOTE(review): relies on the add and remove callbacks receiving the same shared_ptr - confirm.
+ */
void celix::rsa::RemoteServiceAdmin::removeExportedServiceFactory(
- const std::shared_ptr<celix::rsa::IExportServiceFactory> &/*factory*/,
+ const std::shared_ptr<celix::rsa::IExportServiceFactory> &factory,
const std::shared_ptr<const celix::Properties> &properties) {
- auto targetServiceName = properties->get(celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME);
+ auto targetServiceName = properties->get(celix::rsa::IExportServiceFactory::REMOTE_SERVICE_TYPE);
if (targetServiceName.empty()) {
- L_WARN("Removing service factory but missing %s property", celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME);
+ L_WARN("Removing service factory but missing %s property", celix::rsa::IExportServiceFactory::REMOTE_SERVICE_TYPE);
return;
}
- std::lock_guard l(_m);
- _exportServiceFactories.erase(targetServiceName);
+ std::lock_guard l(mutex);
+ for (auto it = exportServiceFactories.lower_bound(targetServiceName); it != exportServiceFactories.end() && it->first == targetServiceName; ++it) {
+ if (it->second == factory) {
+ exportServiceFactories.erase(it);
+ break;
+ }
+ }
//TODO remove exported services from this factory ??needed
}
@@ -153,8 +152,8 @@
return;
}
- std::lock_guard<std::mutex> lock{_m};
- _toBeExportedServices.emplace_back(props);
+ std::lock_guard<std::mutex> lock{mutex};
+ toBeExportedServices.emplace_back(props);
createExportServices();
}
@@ -165,94 +164,125 @@
return;
}
- std::lock_guard l(_m);
+ std::lock_guard l(mutex);
- auto instanceIt = _exportedServices.find(svcId);
- if (instanceIt != end(_exportedServices)) {
- _exportedServices.erase(instanceIt);
+ auto instanceIt = exportedServices.find(svcId);
+ if (instanceIt != end(exportedServices)) {
+ exportedServices.erase(instanceIt);
}
//remove to be exported endpoint (if present)
- for (auto it = _toBeExportedServices.begin(); it != _toBeExportedServices.end(); ++it) {
+ for (auto it = toBeExportedServices.begin(); it != toBeExportedServices.end(); ++it) {
if ((*it)->getAsLong(celix::SERVICE_ID, -1) == svcId) {
- _toBeExportedServices.erase(it);
+ toBeExportedServices.erase(it);
break;
}
}
}
-void celix::rsa::RemoteServiceAdmin::createImportServices() {
- auto it = _toBeImportedServices.begin();
- while (it != _toBeImportedServices.end()) {
- auto interface = (*it)->getInterface();
- auto existingFactory = _importServiceFactories.find(interface);
- if (existingFactory == end(_importServiceFactories)) {
- L_DEBUG("Adding endpoint to be imported but no factory available yet, delaying import");
- it++;
- continue;
+/**
+ * Checks whether an import service factory can handle an endpoint, based on configuration types.
+ *
+ * Returns false (with a warning) when the factory has no supported configs or the endpoint has no
+ * configuration types. Otherwise returns true only if every configuration type of the endpoint is
+ * found in the factory's supported configs.
+ */
+bool celix::rsa::RemoteServiceAdmin::isEndpointMatch(const celix::rsa::EndpointDescription& endpoint, const celix::rsa::IImportServiceFactory& factory) const {
+ if (factory.getSupportedConfigs().empty()) {
+ L_WARN("Matching endpoint with a import service factory with no supported configs, this will always fail");
+ return false;
+ }
+ if (endpoint.getConfigurationTypes().empty()) {
+ L_WARN("Matching endpoint with empty configuration types (%s), this will always fail", celix::rsa::SERVICE_IMPORTED_CONFIGS);
+ return false;
+ }
+ size_t matchCount = 0; //number of endpoint configuration types that the factory supports
+ for (const auto& config : endpoint.getConfigurationTypes()) {
+ for (const auto& factoryConfig : factory.getSupportedConfigs()) {
+ if (config == factoryConfig) {
+ ++matchCount;
+ break;
+ }
}
- auto endpointId = (*it)->getId();
- L_DEBUG("Adding endpoint, created service");
- _importedServices.emplace(endpointId, existingFactory->second->importService(**it));
- it = _toBeImportedServices.erase(it);
+ }
+ //compare with the number of requested (endpoint) configs, not with the number of configs the factory offers
+ return matchCount == endpoint.getConfigurationTypes().size(); //note must match all requested configurations
+}
+
+/**
+ * Checks whether an export service factory can export a service with the given service properties.
+ *
+ * The requested configs are read from the SERVICE_IMPORTED_CONFIGS property and the requested intents from
+ * SERVICE_EXPORTED_INTENTS (default "osgi.basic"). If configs are provided, every provided config must be
+ * supported by the factory and the intents are not looked at. Without provided configs, a single intent
+ * that the factory supports is enough.
+ * NOTE(review): the configs of a to-be-exported service are read from SERVICE_IMPORTED_CONFIGS - confirm
+ * that this is the intended property.
+ */
+bool celix::rsa::RemoteServiceAdmin::isExportServiceMatch(const celix::Properties& svcProperties, const celix::rsa::IExportServiceFactory& factory) const {
+ auto providedConfigs = celix::split(svcProperties.get(celix::rsa::SERVICE_IMPORTED_CONFIGS));
+ auto providedIntents = celix::split(svcProperties.get(celix::rsa::SERVICE_EXPORTED_INTENTS, "osgi.basic"));
+ if (providedConfigs.empty() && providedIntents.empty()) {
+ //note cannot match export service with no config or intent
+ return false;
+ }
+ if (factory.getSupportedIntents().empty() && factory.getSupportedConfigs().empty()) {
+ L_WARN("Matching service marked for export with a export service factory with no supported intents or configs, this will always fail");
+ return false;
+ }
+ if (!providedConfigs.empty()) {
+ size_t matchCount = 0; //number of provided configs that the factory supports
+ for (const auto& config : providedConfigs) {
+ for (const auto& factoryConfig : factory.getSupportedConfigs()) {
+ if (config == factoryConfig) {
+ ++matchCount;
+ break;
+ }
+ }
+ }
+ //compare with the number of requested configs, not with the number of configs the factory offers
+ return matchCount == providedConfigs.size(); //note must match all requested configurations
+ } else /*match on intent*/ {
+ for (const auto& intent: providedIntents) {
+ for (const auto& factoryIntent: factory.getSupportedIntents()) {
+ if (intent == factoryIntent) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+}
+
+/**
+ * Tries to import every pending endpoint.
+ *
+ * For each endpoint in toBeImportedServices, the factories stored for the endpoint's interface are tried in
+ * order; the first one accepted by isEndpointMatch() creates the import registration, which is stored under
+ * the endpoint id. Imported endpoints are removed from the pending list, the others stay for a later attempt.
+ */
+void celix::rsa::RemoteServiceAdmin::createImportServices() {
+ //precondition mutex taken
+ auto it = toBeImportedServices.begin();
+ while (it != toBeImportedServices.end()) {
+ auto interface = (*it)->getInterface();
+ bool match = false;
+ for (auto factoryIt = importServiceFactories.lower_bound(interface); factoryIt != importServiceFactories.end() && factoryIt->first == interface; ++factoryIt) {
+ if (isEndpointMatch(**it, *factoryIt->second)) {
+ match = true;
+ auto endpointId = (*it)->getId();
+ L_DEBUG("Adding endpoint %s, created import service for %s", endpointId.c_str(), (*it)->getInterface().c_str());
+ importedServices.emplace(endpointId, factoryIt->second->importService(**it));
+ break;
+ } else {
+ L_TRACE("Config mismatch.");
+ }
+ }
+ if (match) {
+ it = toBeImportedServices.erase(it); //erase returns the iterator to the next pending endpoint
+ } else {
+ L_DEBUG("Adding endpoint to be imported but no matching factory available yet, delaying import");
+ ++it;
+ }
}
}
+/**
+ * Tries to export every pending service.
+ *
+ * For each entry in toBeExportedServices, the factories stored for the service name are tried in order;
+ * the first one accepted by isExportServiceMatch() creates the export registration, which is stored under
+ * the service id. Exported services are removed from the pending list, the others stay for a later attempt.
+ */
void celix::rsa::RemoteServiceAdmin::createExportServices() {
- auto it = _toBeExportedServices.begin();
- while (it != _toBeExportedServices.end()) {
+ //precondition mutex taken
+ auto it = toBeExportedServices.begin();
+ while (it != toBeExportedServices.end()) {
const auto& svcProperties = **it;
auto serviceName = svcProperties.get(celix::SERVICE_NAME, "");
auto svcId = svcProperties.getAsLong(celix::SERVICE_ID, -1);
- if (serviceName.empty()) {
- L_WARN("Adding service to be exported but missing objectclass for svc id %li", svcId);
- it++;
- continue;
+ bool match = false;
+ for (auto factoryIt = exportServiceFactories.lower_bound(serviceName); factoryIt != exportServiceFactories.end() && factoryIt->first == serviceName; ++factoryIt) {
+ if (isExportServiceMatch(svcProperties, *factoryIt->second)) {
+ match = true;
+ exportedServices.emplace(svcId, factoryIt->second->exportService(svcProperties));
+ break;
+ } else {
+ L_TRACE("Intent mismatch");
+ }
}
- auto factory = _exportServiceFactories.find(serviceName);
- if (factory == end(_exportServiceFactories)) {
- L_DEBUG("Adding service to be exported but no factory available yet, delaying creation");
- it++;
- continue;
+ if (match) {
+ it = toBeExportedServices.erase(it);
+ } else {
+ L_DEBUG("Adding service to be exported but no matching factory available yet, delaying creation");
+ ++it;
}
- _exportedServices.emplace(svcId, factory->second->exportService(svcProperties));
- it = _toBeExportedServices.erase(it);
}
-}
-
-class AdminActivator {
-public:
- explicit AdminActivator(const std::shared_ptr<celix::BundleContext>& ctx) {
- auto admin = std::make_shared<celix::rsa::RemoteServiceAdmin>(celix::LogHelper{ctx, celix::typeName<celix::rsa::RemoteServiceAdmin>()});
-
- auto& cmp = ctx->getDependencyManager()->createComponent(admin);
- cmp.createServiceDependency<celix::rsa::EndpointDescription>()
- .setRequired(false)
- .setStrategy(celix::dm::DependencyUpdateStrategy::locking)
- .setCallbacks(&celix::rsa::RemoteServiceAdmin::addEndpoint, &celix::rsa::RemoteServiceAdmin::removeEndpoint);
- cmp.createServiceDependency<celix::rsa::IImportServiceFactory>()
- .setRequired(false)
- .setStrategy(celix::dm::DependencyUpdateStrategy::locking)
- .setCallbacks(&celix::rsa::RemoteServiceAdmin::addImportedServiceFactory, &celix::rsa::RemoteServiceAdmin::removeImportedServiceFactory);
- cmp.createServiceDependency<celix::rsa::IExportServiceFactory>()
- .setRequired(false)
- .setStrategy(celix::dm::DependencyUpdateStrategy::locking)
- .setCallbacks(&celix::rsa::RemoteServiceAdmin::addExportedServiceFactory, &celix::rsa::RemoteServiceAdmin::removeExportedServiceFactory);
- cmp.build();
-
- //note adding void service dependencies is not supported for the dependency manager, using a service tracker instead.
- _remoteServiceTracker = ctx->trackAnyServices()
- .setFilter(std::string{"("}.append(celix::rsa::SERVICE_EXPORTED_INTERFACES).append("=*)"))
- .addAddWithPropertiesCallback([admin](const std::shared_ptr<void>& svc, const std::shared_ptr<const celix::Properties>& properties) {
- admin->addService(svc, properties);
- })
- .addRemWithPropertiesCallback([admin](const std::shared_ptr<void>& svc, const std::shared_ptr<const celix::Properties>& properties) {
- admin->removeService(svc, properties);
- })
- .build();
- }
-private:
- std::shared_ptr<celix::ServiceTracker<void>> _remoteServiceTracker{};
-};
-
-CELIX_GEN_CXX_BUNDLE_ACTIVATOR(AdminActivator)
\ No newline at end of file
+}
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/admin/include/RemoteServiceAdmin.h b/bundles/cxx_remote_services/admin/src/RemoteServiceAdmin.h
similarity index 63%
rename from bundles/cxx_remote_services/admin/include/RemoteServiceAdmin.h
rename to bundles/cxx_remote_services/admin/src/RemoteServiceAdmin.h
index 969a095..6101799 100644
--- a/bundles/cxx_remote_services/admin/include/RemoteServiceAdmin.h
+++ b/bundles/cxx_remote_services/admin/src/RemoteServiceAdmin.h
@@ -17,7 +17,6 @@
* under the License.
*/
-
#include <mutex>
#include "celix/LogHelper.h"
@@ -35,6 +34,15 @@
namespace celix::rsa {
+ /**
+ * @brief Remote Service Admin based on endpoint/proxy factories.
+ *
+ * A Remote Service Admin which does not itself contain any technology, but relies on remote
+ * service endpoint/proxy factories to create exported/imported remote services.
+ *
+ * The RSA can be configured to use different remote service endpoint/proxy factories based on the
+ * intent of the remote service endpoint/proxy factories.
+ */
class RemoteServiceAdmin {
public:
explicit RemoteServiceAdmin(celix::LogHelper logHelper);
@@ -57,24 +65,26 @@
private:
void createExportServices();
void createImportServices();
+ bool isEndpointMatch(const celix::rsa::EndpointDescription& endpoint, const celix::rsa::IImportServiceFactory& factory) const;
+ bool isExportServiceMatch(const celix::Properties& svcProperties, const celix::rsa::IExportServiceFactory& factory) const;
- celix::LogHelper _logHelper;
- std::mutex _m{}; // protects below
+ celix::LogHelper logHelper;
+ std::mutex mutex{}; // protects below
#if __cpp_lib_memory_resource
- std::pmr::unsynchronized_pool_resource _memResource{};
+ std::pmr::unsynchronized_pool_resource memResource{};
- std::pmr::unordered_map<std::string, std::shared_ptr<celix::rsa::IExportServiceFactory>> _exportServiceFactories{&_memResource}; //key = service name
- std::pmr::unordered_map<std::string, std::shared_ptr<celix::rsa::IImportServiceFactory>> _importServiceFactories{&_memResource}; //key = service name
- std::pmr::unordered_map<std::string, std::unique_ptr<celix::rsa::IImportServiceGuard>> _importedServices{&_memResource}; //key = endpoint id
- std::pmr::unordered_map<long, std::unique_ptr<celix::rsa::IExportServiceGuard>> _exportedServices{&_memResource}; //key = service id
+ std::pmr::multimap<std::string, std::shared_ptr<celix::rsa::IExportServiceFactory>> exportServiceFactories{&memResource}; //key = service name
+ std::pmr::multimap<std::string, std::shared_ptr<celix::rsa::IImportServiceFactory>> importServiceFactories{&memResource}; //key = service name
+ std::pmr::unordered_map<std::string, std::unique_ptr<celix::rsa::IImportRegistration>> importedServices{&memResource}; //key = endpoint id
+ std::pmr::unordered_map<long, std::unique_ptr<celix::rsa::IExportRegistration>> exportedServices{&memResource}; //key = service id
#else
- std::unordered_map<std::string, std::shared_ptr<celix::rsa::IExportServiceFactory>> _exportServiceFactories{}; //key = service name
- std::unordered_map<std::string, std::shared_ptr<celix::rsa::IImportServiceFactory>> _importServiceFactories{}; //key = service name
- std::unordered_map<std::string, std::unique_ptr<celix::rsa::IImportServiceGuard>> _importedServices{}; //key = endpoint id
- std::unordered_map<long, std::unique_ptr<celix::rsa::IExportServiceGuard>> _exportedServices{}; //key = service id
+ std::multimap<std::string, std::shared_ptr<celix::rsa::IExportServiceFactory>> exportServiceFactories{}; //key = service name
+ std::multimap<std::string, std::shared_ptr<celix::rsa::IImportServiceFactory>> importServiceFactories{}; //key = service name
+ std::unordered_map<std::string, std::unique_ptr<celix::rsa::IImportRegistration>> importedServices{}; //key = endpoint id
+ std::unordered_map<long, std::unique_ptr<celix::rsa::IExportRegistration>> exportedServices{}; //key = service id
#endif
- std::vector<std::shared_ptr<celix::rsa::EndpointDescription>> _toBeImportedServices{};
- std::vector<std::shared_ptr<const celix::Properties>> _toBeExportedServices{};
+ std::vector<std::shared_ptr<celix::rsa::EndpointDescription>> toBeImportedServices{};
+ std::vector<std::shared_ptr<const celix::Properties>> toBeExportedServices{};
};
}
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/admin/src/RemoteServiceAdminActivator.cc b/bundles/cxx_remote_services/admin/src/RemoteServiceAdminActivator.cc
new file mode 100644
index 0000000..0b646ac
--- /dev/null
+++ b/bundles/cxx_remote_services/admin/src/RemoteServiceAdminActivator.cc
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "celix/BundleActivator.h"
+#include "RemoteServiceAdmin.h"
+
+/**
+ * @brief Activator for the Remote Service Admin, which uses export/import service factories to import/export remote services.
+ *
+ * Supported intents and configs are based on what is supported by the export/import service factories.
+ *
+ * The constructor creates the RemoteServiceAdmin, registers it as a dependency manager component with
+ * optional dependencies on endpoint descriptions and import/export service factories, and starts a
+ * service tracker that forwards services matching the SERVICE_EXPORTED_INTERFACES filter to the admin.
+ */
+class AdminActivator {
+public:
+ explicit AdminActivator(const std::shared_ptr<celix::BundleContext>& ctx) {
+ auto admin = std::make_shared<celix::rsa::RemoteServiceAdmin>(celix::LogHelper{ctx, celix::typeName<celix::rsa::RemoteServiceAdmin>()});
+
+ //all three dependencies are optional and use the locking update strategy
+ auto& cmp = ctx->getDependencyManager()->createComponent(admin);
+ cmp.createServiceDependency<celix::rsa::EndpointDescription>()
+ .setRequired(false)
+ .setStrategy(celix::dm::DependencyUpdateStrategy::locking)
+ .setCallbacks(&celix::rsa::RemoteServiceAdmin::addEndpoint, &celix::rsa::RemoteServiceAdmin::removeEndpoint);
+ cmp.createServiceDependency<celix::rsa::IImportServiceFactory>()
+ .setRequired(false)
+ .setStrategy(celix::dm::DependencyUpdateStrategy::locking)
+ .setCallbacks(&celix::rsa::RemoteServiceAdmin::addImportedServiceFactory, &celix::rsa::RemoteServiceAdmin::removeImportedServiceFactory);
+ cmp.createServiceDependency<celix::rsa::IExportServiceFactory>()
+ .setRequired(false)
+ .setStrategy(celix::dm::DependencyUpdateStrategy::locking)
+ .setCallbacks(&celix::rsa::RemoteServiceAdmin::addExportedServiceFactory, &celix::rsa::RemoteServiceAdmin::removeExportedServiceFactory);
+ cmp.build();
+
+ //note adding void service dependencies is not supported for the dependency manager, using a service tracker instead.
+ //the callbacks capture the admin shared_ptr by value
+ _remoteServiceTracker = ctx->trackAnyServices()
+ .setFilter(std::string{"("}.append(celix::rsa::SERVICE_EXPORTED_INTERFACES).append("=*)"))
+ .addAddWithPropertiesCallback([admin](const std::shared_ptr<void>& svc, const std::shared_ptr<const celix::Properties>& properties) {
+ admin->addService(svc, properties);
+ })
+ .addRemWithPropertiesCallback([admin](const std::shared_ptr<void>& svc, const std::shared_ptr<const celix::Properties>& properties) {
+ admin->removeService(svc, properties);
+ })
+ .build();
+ }
+private:
+ std::shared_ptr<celix::ServiceTracker<void>> _remoteServiceTracker{}; //tracker for services marked for export
+};
+
+CELIX_GEN_CXX_BUNDLE_ACTIVATOR(AdminActivator)
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/discovery_configured/gtest/src/RsaConfiguredDiscoveryTestSuite.cc b/bundles/cxx_remote_services/discovery_configured/gtest/src/RsaConfiguredDiscoveryTestSuite.cc
index dcd900f..cd773be 100644
--- a/bundles/cxx_remote_services/discovery_configured/gtest/src/RsaConfiguredDiscoveryTestSuite.cc
+++ b/bundles/cxx_remote_services/discovery_configured/gtest/src/RsaConfiguredDiscoveryTestSuite.cc
@@ -54,7 +54,7 @@
auto count = ctx->useServices<celix::rsa::EndpointDescription>()
.addUseCallback([](auto& endpoint) {
EXPECT_NE(endpoint.getId(), "");
- EXPECT_NE(endpoint.getConfigurationTypes(), "");
+ EXPECT_NE(endpoint.getConfigurationTypes(), std::vector<std::string>{});
EXPECT_NE(endpoint.getInterface(), "");
EXPECT_NE(endpoint.getFrameworkUUID(), "");
EXPECT_NE(endpoint.getProperties().get("endpoint.scope"), ""); //note async specific
diff --git a/bundles/cxx_remote_services/integration/CMakeLists.txt b/bundles/cxx_remote_services/integration/CMakeLists.txt
index f5bf8c9..4b578ce 100644
--- a/bundles/cxx_remote_services/integration/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/CMakeLists.txt
@@ -21,7 +21,6 @@
target_link_libraries(TestExportImportRemoteServiceFactory PRIVATE Celix::rsa_spi Celix::pubsub_api Celix::Promises Celix::log_helper)
target_compile_options(TestExportImportRemoteServiceFactory PRIVATE -std=c++17) #TODO how can this be improved (bring back -std=c++17 on INTERFACE for promises?
target_include_directories(TestExportImportRemoteServiceFactory PRIVATE include)
-#TODO improve with serializer svc, for now using descriptors
celix_bundle_files(TestExportImportRemoteServiceFactory
resources/Calculator$add$Invoke.descriptor
resources/Calculator$add$Return.descriptor
@@ -61,8 +60,7 @@
#Pubsub needed for remote services on pubsub
Celix::pubsub_serializer_json
Celix::pubsub_topology_manager
- #TODO replace with v1 when marker interfaces for a serializer type are introduced Celix::pubsub_admin_zmq_v2
- Celix::pubsub_admin_zmq
+ Celix::pubsub_admin_zmq_v2
Celix::pubsub_protocol_wire_v2
Celix::pubsub_discovery_etcd
@@ -72,6 +70,7 @@
CalculatorProvider
)
+target_link_libraries(RemoteCalculatorProvider PRIVATE ZMQ::lib CZMQ::lib)
add_celix_container(RemoteCalculatorConsumer
GROUP rsa
@@ -85,7 +84,7 @@
Celix::pubsub_serializer_json
Celix::pubsub_topology_manager
Celix::pubsub_discovery_etcd
- Celix::pubsub_admin_zmq
+ Celix::pubsub_admin_zmq_v2
Celix::pubsub_protocol_wire_v2
#Remote Services
@@ -94,4 +93,5 @@
TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator
CalculatorConsumer
-)
\ No newline at end of file
+)
+target_link_libraries(RemoteCalculatorConsumer PRIVATE ZMQ::lib CZMQ::lib)
diff --git a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
index 824bd48..5c548ec 100644
--- a/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
+++ b/bundles/cxx_remote_services/integration/gtest/CMakeLists.txt
@@ -18,14 +18,14 @@
add_executable(test_cxx_remote_services_integration
src/RemoteServicesIntegrationTestSuite.cc
)
-target_link_libraries(test_cxx_remote_services_integration PRIVATE Celix::framework Celix::Promises Celix::shell_api GTest::gtest GTest::gtest_main)
+target_link_libraries(test_cxx_remote_services_integration PRIVATE Celix::framework Celix::Promises Celix::shell_api ZMQ::lib CZMQ::lib GTest::gtest GTest::gtest_main)
target_compile_options(test_cxx_remote_services_integration PRIVATE -std=c++17)
target_include_directories(test_cxx_remote_services_integration PRIVATE ../include) #Add ICalculator
add_celix_bundle_dependencies(test_cxx_remote_services_integration
Celix::pubsub_serializer_json
Celix::pubsub_topology_manager
- Celix::pubsub_admin_zmq
+ Celix::pubsub_admin_zmq_v2
Celix::pubsub_protocol_wire_v2
Celix::RsaConfiguredDiscovery
Celix::RemoteServiceAdmin
@@ -40,7 +40,7 @@
celix_get_bundle_file(Celix::pubsub_topology_manager PS_PSTM_BUNDLE_LOC)
target_compile_definitions(test_cxx_remote_services_integration PRIVATE PS_PSTM_BUNDLE_LOC="${PS_PSTM_BUNDLE_LOC}")
-celix_get_bundle_file(Celix::pubsub_admin_zmq PS_PSA_BUNDLE_LOC)
+celix_get_bundle_file(Celix::pubsub_admin_zmq_v2 PS_PSA_BUNDLE_LOC)
target_compile_definitions(test_cxx_remote_services_integration PRIVATE PS_PSA_BUNDLE_LOC="${PS_PSA_BUNDLE_LOC}")
celix_get_bundle_file(Celix::pubsub_protocol_wire_v2 PS_WIRE_BUNDLE_LOC)
diff --git a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
index dd4a3a2..ec4ea86 100644
--- a/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
+++ b/bundles/cxx_remote_services/integration/src/CalculatorProvider.cc
@@ -50,7 +50,8 @@
cmp.createProvidedService<ICalculator>()
.addProperty("service.exported.interfaces", celix::typeName<ICalculator>())
.addProperty("endpoint.topic", "test")
- .addProperty("endpoint.scope", "default");
+ .addProperty("endpoint.scope", "default")
+ .addProperty("service.exported.intents", "osgi.async");
cmp.build();
}
};
diff --git a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
index 154e42b..d8a6962 100644
--- a/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
+++ b/bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc
@@ -158,10 +158,10 @@
/**
* A import service guard, which will remove the component if it goes out of scope.
*/
-class ComponentImportServiceGuard final : public celix::rsa::IImportServiceGuard {
+class ComponentImportRegistration final : public celix::rsa::IImportRegistration {
public:
- ComponentImportServiceGuard(std::shared_ptr<celix::BundleContext> _ctx, std::string _componentId) : ctx{std::move(_ctx)}, componentId{std::move(_componentId)} {}
- ~ComponentImportServiceGuard() noexcept override {
+ ComponentImportRegistration(std::shared_ptr<celix::BundleContext> _ctx, std::string _componentId) : ctx{std::move(_ctx)}, componentId{std::move(_componentId)} {}
+ ~ComponentImportRegistration() noexcept override {
auto context = ctx.lock();
if (context) {
context->getDependencyManager()->removeComponentAsync(componentId);
@@ -177,14 +177,12 @@
*/
class CalculatorImportServiceFactory final : public celix::rsa::IImportServiceFactory {
public:
+ static constexpr const char * const CONFIGS = "pubsub";
+
explicit CalculatorImportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)}, logHelper{ctx, "celix::rsa::RemoteServiceFactory"} {}
+ ~CalculatorImportServiceFactory() noexcept override = default;
- std::unique_ptr<celix::rsa::IImportServiceGuard> importService(const celix::rsa::EndpointDescription& endpoint) override {
- if (endpoint.getConfigurationTypes() != "pubsub") {
- ctx->logTrace("skipping endpoint, not pubsub configuration. Found config '%s'", endpoint.getConfigurationTypes().c_str());
- return nullptr;
- }
-
+ std::unique_ptr<celix::rsa::IImportRegistration> importService(const celix::rsa::EndpointDescription& endpoint) override {
auto topic = endpoint.getProperties().get("endpoint.topic");
auto scope = endpoint.getProperties().get("endpoint.topic");
if (topic.empty() || scope.empty()) {
@@ -193,8 +191,17 @@
}
auto componentId = createImportedCalculatorComponent(endpoint);
- return std::make_unique<ComponentImportServiceGuard>(ctx, std::move(componentId));
+ return std::make_unique<ComponentImportRegistration>(ctx, std::move(componentId));
}
+
+ [[nodiscard]] const std::string& getRemoteServiceType() const override {
+ return serviceType;
+ }
+
+ const std::vector<std::string>& getSupportedConfigs() const override {
+ return configs;
+ }
+
private:
std::string createImportedCalculatorComponent(const celix::rsa::EndpointDescription& endpoint) {
auto invokeTopic = endpoint.getProperties().get("endpoint.topic") + "_invoke";
@@ -243,6 +250,8 @@
return cmp.getUUID();
}
+ const std::string serviceType = celix::typeName<ICalculator>();
+ const std::vector<std::string> configs = celix::split(CONFIGS);
std::shared_ptr<celix::BundleContext> ctx;
celix::LogHelper logHelper;
std::mutex mutex{}; //protects below
@@ -341,10 +350,10 @@
/**
* A import service guard, which will remove the component if it goes out of scope.
*/
-class ComponentExportServiceGuard final : public celix::rsa::IExportServiceGuard {
+class ComponentExportRegistration final : public celix::rsa::IExportRegistration {
public:
- ComponentExportServiceGuard(std::shared_ptr<celix::BundleContext> _ctx, std::string _componentId) : ctx{std::move(_ctx)}, componentId{std::move(_componentId)} {}
- ~ComponentExportServiceGuard() noexcept override {
+ ComponentExportRegistration(std::shared_ptr<celix::BundleContext> _ctx, std::string _componentId) : ctx{std::move(_ctx)}, componentId{std::move(_componentId)} {}
+ ~ComponentExportRegistration() noexcept override {
auto context = ctx.lock();
if (context) {
context->getDependencyManager()->removeComponentAsync(componentId);
@@ -360,10 +369,14 @@
*/
class CalculatorExportServiceFactory final : public celix::rsa::IExportServiceFactory {
public:
+ static constexpr const char * const CONFIGS = "pubsub";
+ static constexpr const char * const INTENTS = "osgi.async";
+
explicit CalculatorExportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)},
logHelper{ctx, "celix::rsa::RemoteServiceFactory"} {}
+ ~CalculatorExportServiceFactory() noexcept override = default;
- std::unique_ptr<celix::rsa::IExportServiceGuard> exportService(const celix::Properties& serviceProperties) override {
+ std::unique_ptr<celix::rsa::IExportRegistration> exportService(const celix::Properties& serviceProperties) override {
auto topic = serviceProperties.get("endpoint.topic");
auto scope = serviceProperties.get("endpoint.topic");
if (topic.empty() || scope.empty()) {
@@ -372,8 +385,21 @@
}
auto componentId = createExportedCalculatorComponent(serviceProperties);
- return std::make_unique<ComponentExportServiceGuard>(ctx, std::move(componentId));
+ return std::make_unique<ComponentExportRegistration>(ctx, std::move(componentId));
}
+
+ [[nodiscard]] const std::string& getRemoteServiceType() const override {
+ return serviceType;
+ }
+
+ [[nodiscard]] const std::vector<std::string>& getSupportedIntents() const override {
+ return intents;
+ }
+
+ [[nodiscard]] const std::vector<std::string>& getSupportedConfigs() const override {
+ return configs;
+ }
+
private:
std::string createExportedCalculatorComponent(const celix::Properties& serviceProperties) {
auto invokeTopic = serviceProperties.get("endpoint.topic") + "_invoke";
@@ -417,6 +443,9 @@
return cmp.getUUID();
}
+ const std::string serviceType = celix::typeName<ICalculator>();
+ const std::vector<std::string> configs = celix::split(CONFIGS);
+ const std::vector<std::string> intents = celix::split(INTENTS);
std::shared_ptr<celix::BundleContext> ctx;
celix::LogHelper logHelper;
std::mutex mutex{}; //protects below
@@ -428,12 +457,15 @@
ctx->logInfo("Starting TestExportImportRemoteServiceFactory");
registrations.emplace_back(
ctx->registerService<celix::rsa::IImportServiceFactory>(std::make_shared<CalculatorImportServiceFactory>(ctx))
- .addProperty(celix::rsa::IImportServiceFactory::TARGET_SERVICE_NAME, celix::typeName<ICalculator>())
+ .addProperty(celix::rsa::IImportServiceFactory::REMOTE_SERVICE_TYPE, celix::typeName<ICalculator>())
+ .addProperty(celix::rsa::REMOTE_CONFIGS_SUPPORTED, CalculatorImportServiceFactory::CONFIGS)
.build()
);
registrations.emplace_back(
ctx->registerService<celix::rsa::IExportServiceFactory>(std::make_shared<CalculatorExportServiceFactory>(ctx))
- .addProperty(celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME, celix::typeName<ICalculator>())
+ .addProperty(celix::rsa::IExportServiceFactory::REMOTE_SERVICE_TYPE, celix::typeName<ICalculator>())
+ .addProperty(celix::rsa::REMOTE_CONFIGS_SUPPORTED, CalculatorExportServiceFactory::CONFIGS)
+ .addProperty(celix::rsa::REMOTE_INTENTS_SUPPORTED, CalculatorExportServiceFactory::INTENTS)
.build()
);
registrations.emplace_back(
diff --git a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/EndpointDescription.h b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/EndpointDescription.h
index 4b43142..a747deb 100644
--- a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/EndpointDescription.h
+++ b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/EndpointDescription.h
@@ -21,6 +21,7 @@
#include <string>
#include <map>
+#include "celix/Utils.h"
#include "celix/Constants.h"
#include "celix/Properties.h"
#include "celix/rsa/RemoteConstants.h"
@@ -51,7 +52,13 @@
*
* @param The properties from which to create the Endpoint Description.
*/
- explicit EndpointDescription(celix::Properties properties) : endpointProperties{std::move(properties)} {
+ explicit EndpointDescription(celix::Properties properties) :
+ endpointProperties{std::move(properties)},
+ endpointId{endpointProperties.get(celix::rsa::ENDPOINT_ID)},
+ configurationTypes{celix::split(endpointProperties.get(celix::rsa::SERVICE_IMPORTED_CONFIGS))},
+ frameworkUUID{endpointProperties.get(celix::rsa::ENDPOINT_FRAMEWORK_UUID)},
+ intents{celix::split(endpointProperties.get(celix::rsa::SERVICE_INTENTS))},
+ serviceName{endpointProperties.get(celix::SERVICE_NAME)} {
checkValidEndpoint();
}
@@ -66,8 +73,13 @@
* @param serviceProperties The service properties of a service that can be exported.
* @param rsaProperties The optional properties provided by the Remote Service Admin.
*/
- EndpointDescription(std::string_view frameworkUUID, const celix::Properties& serviceProperties, const celix::Properties& rsaProperties = {}) : endpointProperties{
- importedProperties(frameworkUUID, serviceProperties, rsaProperties)} {
+ EndpointDescription(std::string_view frameworkUUID, const celix::Properties& serviceProperties, const celix::Properties& rsaProperties = {}) :
+ endpointProperties{importedProperties(frameworkUUID, serviceProperties, rsaProperties)},
+ endpointId{endpointProperties.get(celix::rsa::ENDPOINT_ID)},
+ configurationTypes{celix::split(endpointProperties.get(celix::rsa::SERVICE_IMPORTED_CONFIGS))},
+ frameworkUUID{endpointProperties.get(celix::rsa::ENDPOINT_FRAMEWORK_UUID)},
+ intents{celix::split(endpointProperties.get(celix::rsa::SERVICE_INTENTS))},
+ serviceName{endpointProperties.get(celix::SERVICE_NAME)} {
checkValidEndpoint();
}
@@ -79,8 +91,8 @@
* Two Endpoint Descriptions with the same id must represent the same endpoint.
* The value of the id is stored in the RemoteConstants.ENDPOINT_ID property.
*/
- [[nodiscard]] std::string getId() const {
- return endpointProperties.get(celix::rsa::ENDPOINT_ID);
+ [[nodiscard]] const std::string& getId() const {
+ return endpointId;
}
/**
@@ -93,8 +105,8 @@
* increase the change a receiving distribution provider can create a connection to this endpoint.
* This value of the configuration types is stored in the celix::rsa::SERVICE_IMPORTED_CONFIGS service property.
*/
- [[nodiscard]] std::string getConfigurationTypes() const {
- return endpointProperties.get(celix::rsa::SERVICE_IMPORTED_CONFIGS);
+ [[nodiscard]] const std::vector<std::string>& getConfigurationTypes() const {
+ return configurationTypes;
}
/**
@@ -102,8 +114,8 @@
*
* The value of the remote framework UUID is stored in the celix::rsa::ENDPOINT_FRAMEWORK_UUID endpoint property.
*/
- [[nodiscard]] std::string getFrameworkUUID() const {
- return endpointProperties.get(celix::rsa::ENDPOINT_FRAMEWORK_UUID);
+ [[nodiscard]] const std::string& getFrameworkUUID() const {
+ return frameworkUUID;
}
/**
@@ -114,17 +126,16 @@
* All qualified intents must have been expanded.
* This value of the intents is stored in the RemoteConstants.SERVICE_INTENTS service property.
*/
- //TODO make std::vector<std::string>
- [[nodiscard]] std::string getIntents() const {
- return endpointProperties.get(celix::rsa::SERVICE_INTENTS);
+ [[nodiscard]] const std::vector<std::string>& getIntents() const {
+ return intents;
}
/**
* @brief Provide the interface implemented by the exported service.
* The value of the interface is derived from the objectClass property.
*/
- [[nodiscard]] std::string getInterface() const {
- return endpointProperties.get(celix::SERVICE_NAME);
+ [[nodiscard]] const std::string& getInterface() const {
+ return serviceName;
}
/**
@@ -170,17 +181,22 @@
void checkValidEndpoint() const {
std::string baseMsg = "Invalid properties for EndpointDescription, missing mandatory property ";
- if (endpointProperties.get(celix::rsa::ENDPOINT_ID).empty()) {
+ if (endpointId.empty()) {
throw celix::rsa::RemoteServicesException{baseMsg.append(celix::rsa::ENDPOINT_ID)};
}
- if (endpointProperties.get(celix::rsa::ENDPOINT_ID).empty()) {
+ if (configurationTypes.empty()) {
throw celix::rsa::RemoteServicesException{baseMsg.append(celix::rsa::SERVICE_IMPORTED_CONFIGS)};
}
- if (endpointProperties.get(celix::SERVICE_NAME).empty()) {
+ if (serviceName.empty()) {
throw celix::rsa::RemoteServicesException{baseMsg.append(celix::SERVICE_NAME)};
}
}
-
+
const celix::Properties endpointProperties;
+ const std::string endpointId;
+ const std::vector<std::string> configurationTypes;
+ const std::string frameworkUUID;
+ const std::vector<std::string> intents;
+ const std::string serviceName;
};
} // end namespace celix::rsa.
diff --git a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IExportRegistration.h b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IExportRegistration.h
new file mode 100644
index 0000000..81944c6
--- /dev/null
+++ b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IExportRegistration.h
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+namespace celix::rsa {
+
+ /**
+ * @brief IExportRegistration class which represents an (opaque) exported service.
+ * If the lifetime of this object expires, it should remove the underlying exported service.
+ * */
+ class IExportRegistration {
+ public:
+ virtual ~IExportRegistration() noexcept = default;
+ };
+}
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IExportServiceFactory.h b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IExportServiceFactory.h
index 66383b6..73ebe28 100644
--- a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IExportServiceFactory.h
+++ b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IExportServiceFactory.h
@@ -19,24 +19,16 @@
#pragma once
#include <memory>
-#include "celix/rsa/EndpointDescription.h"
+
+#include "celix/rsa/IExportRegistration.h"
namespace celix::rsa {
/**
- * @brief IExportServiceGuard class which represent a (opaque) exported service.
- * If lifetime of this object expires it should remove the underlining exported service.
- * */
- class IExportServiceGuard {
- public:
- virtual ~IExportServiceGuard() noexcept = default;
- };
-
- /**
* @brief A export service factory for a specific service type.
*
- * The service type which this export service factory targets is provided with
- * the mandatory celix::rsa::IExportServiceFactory::TARGET_SERVICE_NAME service property.
+ * The targeted service type is provided with the mandatory service properties:
+ * - celix::rsa::IExportServiceFactory::REMOTE_SERVICE_TYPE
*
*/
class IExportServiceFactory {
@@ -44,16 +36,34 @@
/**
* @brief The service name for which this factory can created exported services.
*/
- static constexpr const char * const TARGET_SERVICE_NAME = "target.service.name";
+ static constexpr const char * const REMOTE_SERVICE_TYPE = "remote.service.type";
virtual ~IExportServiceFactory() noexcept = default;
/**
- * @brief Exports the service identified with svcId
- * @param svcId The service id of the exported service.
- * @return A ExportService.
+ * @brief The service name for which this factory can export services as remote services.
+ *
+ * The value should be based on the "remote.service.type" service property of
+ * this export service factory.
+ */
+ [[nodiscard]] virtual const std::string& getRemoteServiceType() const = 0;
+
+ /**
+ * @brief The supported intents for this export service factory.
+ */
+ [[nodiscard]] virtual const std::vector<std::string>& getSupportedIntents() const = 0;
+
+ /**
+ * @brief The supported configs for this export service factory.
+ */
+ [[nodiscard]] virtual const std::vector<std::string>& getSupportedConfigs() const = 0;
+
+ /**
+ * @brief Exports the service associated with the provided serviceProperties.
+ * @param serviceProperties The service properties of the to be exported service.
+ * @return A new export registration.
* @throws celix::rsa::RemoteServicesException if the export failed.
*/
- virtual std::unique_ptr<celix::rsa::IExportServiceGuard> exportService(const celix::Properties& serviceProperties) = 0;
+ [[nodiscard]] virtual std::unique_ptr<celix::rsa::IExportRegistration> exportService(const celix::Properties& serviceProperties) = 0;
};
}
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IImportRegistration.h b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IImportRegistration.h
new file mode 100644
index 0000000..cb366dc
--- /dev/null
+++ b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IImportRegistration.h
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+namespace celix::rsa {
+
+ /**
+ * @brief IImportRegistration class which represents an (opaque) imported service.
+ * If the lifetime of this object expires, it should remove the underlying imported service.
+ */
+ class IImportRegistration {
+ public:
+ virtual ~IImportRegistration() noexcept = default;
+ };
+}
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IImportServiceFactory.h b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IImportServiceFactory.h
index 7c6f3a1..dd9d3c6 100644
--- a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IImportServiceFactory.h
+++ b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/IImportServiceFactory.h
@@ -19,41 +19,47 @@
#pragma once
#include <memory>
+
#include "celix/rsa/EndpointDescription.h"
+#include "celix/rsa/IImportRegistration.h"
namespace celix::rsa {
/**
- * @brief IImportServiceGuard class which represent a (opaque) imported service.
- * If lifetime of this object expires it should remove the underlining imported service.
- */
- class IImportServiceGuard {
- public:
- virtual ~IImportServiceGuard() noexcept = default;
- };
-
- /**
* @brief A import service factory for a specific service type.
*
- * The service type which this import service factory targets is provided with
- * the mandatory celix::rsa::IImportServiceFactory::TARGET_SERVICE_NAME service property.
+ * The service type which this import service factory targets is provided with `REMOTE_SERVICE_TYPE` and
+ * the supported configs with `REMOTE_CONFIGS_SUPPORTED`.
*
*/
class IImportServiceFactory {
public:
/**
- * @brief The service name for which this factory can created exported services.
+ * @brief The service name for which this factory can import remote services.
*/
- static constexpr const char * const TARGET_SERVICE_NAME = "target.service.name";
+ static constexpr const char * const REMOTE_SERVICE_TYPE = "remote.service.type";
virtual ~IImportServiceFactory() noexcept = default;
/**
- * @brief Imports the service identified with svcId
- * @param svcId The service id of the exported service.
- * @return A ImportService.
+ * @brief The service name for which this factory can import remote services.
+ *
+ * The value should be based on the "remote.service.type" service property of
+ * this import service factory.
+ */
+ [[nodiscard]] virtual const std::string& getRemoteServiceType() const = 0;
+
+ /**
+ * @brief The supported configs for this import service factory.
+ */
+ [[nodiscard]] virtual const std::vector<std::string>& getSupportedConfigs() const = 0;
+
+ /**
+ * @brief Imports a service for the provided remote service endpoint description.
+ * @param endpoint The endpoint description describing the remote service.
+ * @return A new import registration.
* @throws celix::rsa::RemoteServicesException if the import failed.
*/
- virtual std::unique_ptr<celix::rsa::IImportServiceGuard> importService(const celix::rsa::EndpointDescription& endpoint) = 0;
+ [[nodiscard]] virtual std::unique_ptr<celix::rsa::IImportRegistration> importService(const celix::rsa::EndpointDescription& endpoint) = 0;
};
}
\ No newline at end of file
diff --git a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/RemoteConstants.h b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/RemoteConstants.h
index 9dae5e3..7cae240 100644
--- a/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/RemoteConstants.h
+++ b/bundles/cxx_remote_services/rsa_spi/include/celix/rsa/RemoteConstants.h
@@ -91,7 +91,7 @@
/**
* @brief Service property marking the service for export. It defines the interfaces under which this service can be exported.
*
- * Note for Celix only 1 interface can be register per service regiration, so only 1 interface can be exported using
+ * Note for Celix only 1 interface can be register per service registration, so only 1 interface can be exported using
* the service.exported.interfaces property.
* This value must be the exported service type or the value of an asterisk ('*' \u002A).
* The value of this property must be of type string.
diff --git a/bundles/logging/log_helper/include/celix/LogHelper.h b/bundles/logging/log_helper/include/celix/LogHelper.h
index 3f447f2..c22cb35 100644
--- a/bundles/logging/log_helper/include/celix/LogHelper.h
+++ b/bundles/logging/log_helper/include/celix/LogHelper.h
@@ -32,7 +32,7 @@
/**
* @brief Logs to celix_logHelper_log using the CELIX_LOG_LEVEL_TRACE level, printf style
*/
- void trace(const char *format, ...) {
+ void trace(const char *format, ...) const {
va_list args;
va_start(args, format);
vlog(CELIX_LOG_LEVEL_TRACE, format, args);
@@ -42,7 +42,7 @@
/**
* @brief Logs to celix_logHelper_log using the CELIX_LOG_LEVEL_DEBUG level, printf style
*/
- void debug(const char *format, ...) {
+ void debug(const char *format, ...) const {
va_list args;
va_start(args, format);
vlog(CELIX_LOG_LEVEL_DEBUG, format, args);
@@ -52,7 +52,7 @@
/**
* @brief Logs to celix_logHelper_log using the CELIX_LOG_LEVEL_INFO level, printf style
*/
- void info(const char *format, ...) {
+ void info(const char *format, ...) const {
va_list args;
va_start(args, format);
vlog(CELIX_LOG_LEVEL_INFO, format, args);
@@ -62,7 +62,7 @@
/**
* @brief Logs to celix_logHelper_log using the CELIX_LOG_LEVEL_WARNING level, printf style
*/
- void warning(const char *format, ...) {
+ void warning(const char *format, ...) const {
va_list args;
va_start(args, format);
vlog(CELIX_LOG_LEVEL_WARNING, format, args);
@@ -72,7 +72,7 @@
/**
* @brief Logs to celix_logHelper_log using the CELIX_LOG_LEVEL_ERROR level, printf style
*/
- void error(const char *format, ...) {
+ void error(const char *format, ...) const {
va_list args;
va_start(args, format);
vlog(CELIX_LOG_LEVEL_ERROR, format, args);
@@ -82,7 +82,7 @@
/**
* @brief Logs to celix_logHelper_log using the CELIX_LOG_LEVEL_FATAL level, printf style
*/
- void fatal(const char *format, ...) {
+ void fatal(const char *format, ...) const {
va_list args;
va_start(args, format);
vlog(CELIX_LOG_LEVEL_FATAL, format, args);
@@ -94,7 +94,7 @@
*
* Silently ignores log level CELIX_LOG_LEVEL_DISABLED.
*/
- void vlog(celix_log_level_e level, const char *format, va_list formatArgs) {
+ void vlog(celix_log_level_e level, const char *format, va_list formatArgs) const {
celix_logHelper_vlog(logHelper.get(), level, format, formatArgs);
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
index ec9badb..ec3f853 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/psa_activator.c
@@ -20,7 +20,6 @@
#include <stdlib.h>
#include "celix_api.h"
-#include "pubsub_serializer.h"
#include "pubsub_protocol.h"
#include "celix_log_helper.h"
@@ -34,7 +33,6 @@
pubsub_tcp_admin_t *admin;
- long serializersTrackerId;
long protocolsTrackerId;
pubsub_admin_service_t adminService;
@@ -50,7 +48,6 @@
int psa_tcp_start(psa_tcp_activator_t *act, celix_bundle_context_t *ctx) {
act->adminSvcId = -1L;
act->cmdSvcId = -1L;
- act->serializersTrackerId = -1L;
act->protocolsTrackerId = -1L;
act->logHelper = celix_logHelper_create(ctx, "celix_psa_admin_tcp_v2");
@@ -58,17 +55,6 @@
act->admin = pubsub_tcpAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
- //track serializers
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_tcpAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_tcpAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//track protocols
if (status == CELIX_SUCCESS) {
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
@@ -132,7 +118,6 @@
celix_bundleContext_unregisterService(ctx, act->adminSvcId);
celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
- celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
pubsub_tcpAdmin_destroy(act->admin);
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
index 66c92ef..9806991 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.c
@@ -55,11 +55,6 @@
bool verbose;
struct {
- celix_thread_rwlock_t mutex;
- hash_map_t *map; //key = svcId, value = psa_tcp_serializer_entry_t*
- } serializers;
-
- struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = svcId, value = psa_tcp_protocol_entry_t*
} protocols;
@@ -79,6 +74,11 @@
hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints;
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = pubsub message serialization marker svc id (long), value = pubsub_serializer_handler_t*
+ } serializationHandlers;
+
pubsub_tcp_endPointStore_t endpointStore;
};
@@ -101,11 +101,6 @@
return type != NULL && strncmp(PUBSUB_PUBLISHER_ENDPOINT_TYPE, type, strlen(PUBSUB_PUBLISHER_ENDPOINT_TYPE)) == 0;
}
-static void pubsub_tcpAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
- const char** out = handle;
- *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
pubsub_tcp_admin_t *pubsub_tcpAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) {
pubsub_tcp_admin_t *psa = calloc(1, sizeof(*psa));
psa->ctx = ctx;
@@ -120,11 +115,9 @@
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_TCP_QOS_CONTROL_SCORE_KEY,
PSA_TCP_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadRwlock_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
celixThreadMutex_create(&psa->protocols.mutex, NULL);
psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
+
celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -137,6 +130,9 @@
celixThreadMutex_create(&psa->endpointStore.mutex, NULL);
psa->endpointStore.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+ psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
return psa;
}
@@ -177,13 +173,13 @@
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- iter = hashMapIterator_construct(psa->serializers.map);
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ iter = hashMapIterator_construct(psa->serializationHandlers.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
- free(entry);
+ pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+ pubsub_serializerHandler_destroy(entry);
}
- celixThreadRwlock_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
celixThreadMutex_lock(&psa->protocols.mutex);
iter = hashMapIterator_construct(psa->protocols.map);
@@ -205,8 +201,9 @@
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, false, false);
- celixThreadRwlock_destroy(&psa->serializers.mutex);
- hashMap_destroy(psa->serializers.map, false, false);
+ celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+ hashMap_destroy(psa->serializationHandlers.map, false, false);
+
celixThreadMutex_destroy(&psa->protocols.mutex);
hashMap_destroy(psa->protocols.map, false, false);
@@ -215,101 +212,6 @@
free(psa);
}
-void pubsub_tcpAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_tcp_admin_t *psa = handle;
-
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
- const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
- const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
- if (serType == NULL || msgId == -1L || msgFqn == NULL) {
- L_INFO("[PSA_TCP_V2] Ignoring serializer service without one of the following properties: %s or %s or %s",
- PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
- L_INFO("[PSA_TCP_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
- return;
- }
- L_INFO("[PSA_TCP_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries == NULL) {
- typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
- hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
- L_INFO("[PSA_TCP_V2] typeEntries added %p %s", psa->serializers.map, serType);
- }
- psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
- if (entry == NULL) {
- entry = calloc(1, sizeof(psa_tcp_serializer_entry_t));
- entry->svc = svc;
- entry->fqn = celix_utils_strdup(msgFqn);
- entry->version = celix_utils_strdup(msgVersion);
- hashMap_put(typeEntries, (void*)msgId, entry);
- L_INFO("[PSA_TCP_V2] entry added");
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_tcpAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_tcp_admin_t *psa = handle;
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
- //remove serializer
- // 1) First find entry and
- // 2) loop and destroy all topic sender using the serializer and
- // 3) loop and destroy all topic receivers using the serializer
- // Note that it is the responsibility of the topology manager to create new topic senders/receivers
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries != NULL) {
- psa_tcp_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
- free((void*)entry->fqn);
- free((void*)entry->version);
- free(entry);
-
- // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
- if(hashMap_size(typeEntries) == 0) {
- hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_tcp_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
- if (sender != NULL && strncmp(serType, pubsub_tcpTopicSender_serializerType(sender), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_tcpTopicSender_destroy(sender);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter);
- pubsub_tcp_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
- if (receiver != NULL && strncmp(serType, pubsub_tcpTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(receiverEntry);
- hashMapIterator_remove(&iter);
- pubsub_tcpTopicReceiver_destroy(receiver);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
-}
-
void pubsub_tcpAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
pubsub_tcp_admin_t *psa = handle;
@@ -421,30 +323,40 @@
return status;
}
+static pubsub_serializer_handler_t* pubsub_tcpAdmin_getSerializationHandler(pubsub_tcp_admin_t* psa, long msgSerializationMarkerSvcId) { //returns the handler stored for this marker svc id, creating and storing one on first request
+ pubsub_serializer_handler_t* handler = NULL;
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex); //held across both the lookup and the insert below
+ handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId); //the svc id value itself is the map key
+ if (handler == NULL) { //nothing stored yet for this svc id
+ handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+ if (handler != NULL) { //only store a handler that was actually created
+ hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
+ }
+ }
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+ return handler; //NULL when nothing was stored and creation returned NULL
+}
+
celix_status_t pubsub_tcpAdmin_setupTopicSender(void *handle, const char *scope, const char *topic,
const celix_properties_t *topicProperties, long serializerSvcId,
long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_tcp_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
- //1) Create TopicSender
- //2) Store TopicSender
- //3) Connect existing endpoints
- //4) set outPublisherEndpoint
+ //1) Get serialization handler
+ //2) Create TopicSender
+ //3) Store TopicSender
+ //4) Connect existing endpoints
+ //5) set outPublisherEndpoint
+
+ pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic sender without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
-
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_tcpAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
@@ -453,14 +365,15 @@
if (sender == NULL) {
psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
if (protEntry != NULL) {
- sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+ sender = pubsub_tcpTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle, topicProperties,
&psa->endpointStore, protocolSvcId,
protEntry->svc);
}
if (sender != NULL) {
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *protType = protEntry->protType;
- newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+ newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
+ pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
celix_properties_set(newEndpoint, PUBSUB_TCP_URL_KEY, pubsub_tcpTopicSender_url(sender));
celix_properties_setBool(newEndpoint, PUBSUB_TCP_STATIC_CONFIGURED, pubsub_tcpTopicSender_isStatic(sender));
@@ -525,21 +438,15 @@
long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_tcp_admin_t *psa = handle;
+ pubsub_serializer_handler_t* handler = pubsub_tcpAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic receiver without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
+
celix_properties_t *newEndpoint = NULL;
-
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_tcpAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
pubsub_tcp_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
@@ -547,7 +454,8 @@
if (receiver == NULL) {
psa_tcp_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void *) protocolSvcId);
if (protEntry != NULL) {
- receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic, serType, handle, topicProperties,
+ receiver = pubsub_tcpTopicReceiver_create(psa->ctx, psa->log, scope, topic,
+ handler, handle, topicProperties,
&psa->endpointStore, protocolSvcId, protEntry->svc);
} else {
L_ERROR("[PSA_TCP_V2] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
@@ -556,7 +464,7 @@
const char *psaType = PUBSUB_TCP_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, pubsub_serializerHandler_getSerializationType(handler), protType, NULL);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -792,56 +700,10 @@
celixThreadMutex_unlock(&psa->protocols.mutex);
fprintf(out, "\n");
+ free(line);
return status;
}
-
-
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
- pubsub_tcp_admin_t *psa = handle;
- psa_tcp_serializer_entry_t *serializer = NULL;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
- }
-
- return serializer;
-}
-
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer __attribute__((unused))) {
- pubsub_tcp_admin_t *psa = handle;
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
- pubsub_tcp_admin_t *psa = handle;
- int64_t id = -1L;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
- while(hashMapIterator_hasNext(&iterator)) {
- void *key = hashMapIterator_nextKey(&iterator);
- psa_tcp_serializer_entry_t *entry = hashMap_get(typeEntries, key);
- L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn);
- if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
- id = (uint32_t)(uintptr_t)key;
- break;
- }
- }
- } else {
- L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn);
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- L_WARN("[PSA_TCP_V2] pubsub_tcpAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id);
-
- return id;
-}
-
pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle) {
return NULL;
}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
index 2440fbb..513a934 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_admin.h
@@ -82,10 +82,6 @@
void pubsub_tcpAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
bool pubsub_tcpAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
-psa_tcp_serializer_entry_t* pubsub_tcpAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId);
-void pubsub_tcpAdmin_releaseSerializer(void *handle, psa_tcp_serializer_entry_t* serializer);
-int64_t pubsub_tcpAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn);
-
pubsub_admin_metrics_t *pubsub_tcpAdmin_metrics(void *handle);
#endif //CELIX_PUBSUB_TCP_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
index 853e49d..a49ff23 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.c
@@ -56,7 +56,7 @@
pubsub_protocol_service_t *protocol;
char *scope;
char *topic;
- char *serType;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
size_t timeout;
bool isPassive;
@@ -104,13 +104,12 @@
static void processMsg(void *handle, const pubsub_protocol_message_t *message, bool *release, struct timespec *receiveTime);
static void psa_tcp_connectHandler(void *handle, const char *url, bool lock);
static void psa_tcp_disConnectHandler(void *handle, const char *url, bool lock);
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_tcp_topic_receiver_t *pubsub_tcpTopicReceiver_create(celix_bundle_context_t *ctx,
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -119,7 +118,7 @@
pubsub_tcp_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
- receiver->serType = celix_utils_strdup(serType);
+ receiver->serializerHandler = serializerHandler;
receiver->admin = admin;
receiver->protocolSvcId = protocolSvcId;
receiver->protocol = protocol;
@@ -269,7 +268,6 @@
}
}
hashMap_destroy(receiver->subscribers.map, false, false);
-
celixThreadMutex_unlock(&receiver->subscribers.mutex);
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -299,7 +297,6 @@
free(receiver->scope);
}
free(receiver->topic);
- free(receiver->serType);
}
free(receiver);
}
@@ -313,7 +310,7 @@
}
const char *pubsub_tcpTopicReceiver_serializerType(pubsub_tcp_topic_receiver_t *receiver) {
- return receiver->serType;
+ return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler);
}
long pubsub_tcpTopicReceiver_protocolSvcId(pubsub_tcp_topic_receiver_t *receiver) {
@@ -460,47 +457,43 @@
processMsgForSubscriberEntry(pubsub_tcp_topic_receiver_t *receiver, psa_tcp_subscriber_entry_t *entry,
const pubsub_protocol_message_t *message, bool *releaseMsg, struct timespec *receiveTime __attribute__((unused))) {
//NOTE receiver->subscribers.mutex locked
- psa_tcp_serializer_entry_t *msgSer = pubsub_tcpAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, message->header.msgId);
- if(msgSer == NULL) {
- pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
- L_WARN("[PSA_TCP_TR] Cannot find serializer for type id 0x%X. Received payload size is %u.", message->header.msgId, message->payload.length);
+ const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId);
+ if (msgFqn == NULL) {
+ L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
return;
}
void *deSerializedMsg = NULL;
- celix_version_t *version = celix_version_createVersionFromString(msgSer->version);
- bool validVersion = psa_tcp_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion);
- celix_version_destroy(version);
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion);
if (validVersion) {
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = message->payload.payload;
deSerializeBuffer.iov_len = message->payload.length;
- celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
// When received payload pointer is the same as deserializedMsg, set ownership of pointer to topic receiver
if (message->payload.payload == deSerializedMsg) {
*releaseMsg = true;
}
- const char *msgType = msgSer->fqn;
if (status == CELIX_SUCCESS) {
uint32_t msgId = message->header.msgId;
celix_properties_t *metadata = message->metadata.metadata;
- bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, &metadata);
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, &metadata);
bool release = true;
if (cont) {
hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
while (hashMapIterator_hasNext(&iter)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle,msgType, msgId, deSerializedMsg, message->metadata.metadata, &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgType, msgId, deSerializedMsg, metadata);
+ svc->receive(svc->handle, msgFqn, msgId, deSerializedMsg, message->metadata.metadata, &release);
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deSerializedMsg, metadata);
if (!release && hashMapIterator_hasNext(&iter)) {
//receive function has taken ownership and still more receive function to come ..
//deserialize again for new message
- status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 1, &deSerializedMsg);
+ status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 1, &deSerializedMsg);
if (status != CELIX_SUCCESS) {
L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s",
- msgType,
+ msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope,
receiver->topic);
break;
@@ -509,19 +502,25 @@
}
}
if (release) {
- msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deSerializedMsg);
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deSerializedMsg);
}
if (message->metadata.metadata) {
celix_properties_destroy(message->metadata.metadata);
}
}
} else {
- L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgType,
+ L_WARN("[PSA_TCP_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn,
receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
+ } else {
+ L_WARN("[PSA_TCP_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
+ msgFqn,
+ pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+ (int)message->header.msgMajorVersion,
+ (int)message->header.msgMinorVersion,
+ pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
+ pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
}
-
- pubsub_tcpAdmin_releaseSerializer(receiver->admin, msgSer);
}
static void
@@ -664,24 +663,3 @@
}
celixThreadMutex_unlock(&receiver->subscribers.mutex);
}
-
-static bool psa_tcp_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
- bool check = false;
-
- if (major == 0 && minor == 0) {
- //no check
- return true;
- }
-
- int versionMajor;
- int versionMinor;
- if (msgVersion!=NULL) {
- version_getMajor(msgVersion, &versionMajor);
- version_getMinor(msgVersion, &versionMinor);
- if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
- check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
- }
- }
-
- return check;
-}
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
index a7de405..35c14c6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_receiver.h
@@ -20,10 +20,11 @@
#ifndef CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
#define CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
-#include <pubsub_admin_metrics.h>
+#include "pubsub_admin_metrics.h"
#include "celix_bundle_context.h"
-#include <pubsub_protocol.h>
+#include "pubsub_protocol.h"
#include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_tcp_topic_receiver pubsub_tcp_topic_receiver_t;
@@ -31,7 +32,7 @@
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -52,6 +53,4 @@
void pubsub_tcpTopicReceiver_connectTo(pubsub_tcp_topic_receiver_t *receiver, const char *url);
void pubsub_tcpTopicReceiver_disconnectFrom(pubsub_tcp_topic_receiver_t *receiver, const char *url);
-pubsub_admin_receiver_metrics_t *pubsub_tcpTopicReceiver_metrics(pubsub_tcp_topic_receiver_t *receiver);
-
#endif //CELIX_PUBSUB_TCP_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
index 847d538..2c8daf4 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.c
@@ -57,16 +57,17 @@
pubsub_tcpHandler_t *socketHandler;
pubsub_tcpHandler_t *sharedSocketHandler;
pubsub_interceptors_handler_t *interceptorsHandler;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
char *scope;
char *topic;
char *url;
- char *serializerType;
bool isStatic;
bool isPassive;
bool verbose;
unsigned long send_delay;
+ int seqNr; //atomic
struct {
long svcId;
@@ -79,24 +80,10 @@
} boundedServices;
};
-typedef struct psa_tcp_send_msg_entry {
- uint32_t type; //msg type id (hash of fqn)
- const char *fqn;
- uint8_t major;
- uint8_t minor;
- unsigned char originUUID[16];
-// pubsub_msg_serializer_t *msgSer;
- pubsub_protocol_service_t *protSer;
- struct iovec *serializedIoVecOutput;
- size_t serializedIoVecOutputLen;
- unsigned int seqNr;
-} psa_tcp_send_msg_entry_t;
-
typedef struct psa_tcp_bounded_service_entry {
pubsub_tcp_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
- hash_map_t *msgEntries; //key = msg type id, value = psa_tcp_send_msg_entry_t
int getCount;
} psa_tcp_bounded_service_entry_t;
@@ -118,7 +105,7 @@
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -127,7 +114,7 @@
pubsub_tcp_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serializerType = celix_utils_strdup(serializerType);
+ sender->serializerHandler = serializerHandler;
sender->admin = admin;
sender->protocolSvcId = protocolSvcId;
sender->protocol = protocol;
@@ -189,7 +176,7 @@
pubsub_tcpHandler_setSendRetryCnt(sender->socketHandler, (unsigned int) retryCnt);
pubsub_tcpHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
pubsub_tcpHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) maxMsgSize);
- // Hhen passiveKey is specified, enable receive event for full-duplex connection using key.
+ // When passiveKey is specified, enable receive event for full-duplex connection using key.
// Because the topic receiver is already started, enable the receive event.
pubsub_tcpHandler_enableReceiveEvent(sender->socketHandler, (passiveKey) ? true : false);
pubsub_tcpHandler_setTimeout(sender->socketHandler, (unsigned int) timeout);
@@ -273,18 +260,7 @@
hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
while (hashMapIterator_hasNext(&iter)) {
psa_tcp_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
- if (msgEntry->serializedIoVecOutput)
- free(msgEntry->serializedIoVecOutput);
- msgEntry->serializedIoVecOutput = NULL;
- free(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
- free(entry);
- }
+ free(entry);
}
hashMap_destroy(sender->boundedServices.map, false, false);
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -301,7 +277,6 @@
}
free(sender->topic);
free(sender->url);
- free(sender->serializerType);
free(sender);
}
}
@@ -319,7 +294,7 @@
}
const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender) {
- return sender->serializerType;
+ return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
}
const char *pubsub_tcpTopicSender_url(pubsub_tcp_topic_sender_t *sender) {
@@ -337,15 +312,6 @@
return sender->isPassive;
}
-static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) {
- psa_tcp_bounded_service_entry_t *entry = (psa_tcp_bounded_service_entry_t *) handle;
- int64_t rc = pubsub_tcpAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType);
- if(rc >= 0) {
- *msgTypeId = (unsigned int)rc;
- }
- return 0;
-}
-
static void *psa_tcp_getPublisherService(void *handle, const celix_bundle_t *requestingBundle,
const celix_properties_t *svcProperties __attribute__((unused))) {
pubsub_tcp_topic_sender_t *sender = handle;
@@ -360,7 +326,6 @@
entry->getCount = 1;
entry->parent = sender;
entry->bndId = bndId;
- entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_tcp_localMsgTypeIdForMsgType;
entry->service.send = psa_tcp_topicPublicationSend;
@@ -384,16 +349,6 @@
if (entry != NULL && entry->getCount == 0) {
//free entry
hashMap_remove(sender->boundedServices.map, (void *) bndId);
-
- hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter)) {
- psa_tcp_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
- if (msgEntry->serializedIoVecOutput)
- free(msgEntry->serializedIoVecOutput);
- msgEntry->serializedIoVecOutput = NULL;
- free(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -402,91 +357,75 @@
static int
psa_tcp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
- int status = CELIX_SUCCESS;
psa_tcp_bounded_service_entry_t *bound = handle;
pubsub_tcp_topic_sender_t *sender = bound->parent;
+ const char* msgFqn;
+ int majorVersion;
+ int minorVersion;
+ celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion);
- psa_tcp_serializer_entry_t *serializer = pubsub_tcpAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId);
-
- if(serializer == NULL) {
- pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
- L_WARN("[PSA_TCP_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- return CELIX_SERVICE_EXCEPTION;
+ if (status != CELIX_SUCCESS) {
+ L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+ pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+ return status;
}
- psa_tcp_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
+ bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+ if (!cont) {
+ L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+ return status;
+ }
- if(entry == NULL) {
- entry = calloc(1, sizeof(psa_tcp_send_msg_entry_t));
- entry->protSer = sender->protocol;
- entry->type = msgTypeId;
- entry->fqn = serializer->fqn;
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- entry->major = (uint8_t)celix_version_getMajor(version);
- entry->minor = (uint8_t)celix_version_getMinor(version);
- celix_version_destroy(version);
- uuid_copy(entry->originUUID, sender->fwUUID);
- hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+ size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
+ struct iovec *serializedIoVecOutput = NULL;
+ status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn,
+ sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ return status;
}
delay_first_send_for_late_joiners(sender);
- size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
- struct iovec *serializedIoVecOutput = NULL;
- status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
- entry->serializedIoVecOutputLen = MAX(serializedIoVecOutputLen, entry->serializedIoVecOutputLen);
-
- bool cont = false;
- if (status == CELIX_SUCCESS) /*ser ok*/ {
- cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata);
+ pubsub_protocol_message_t message;
+ message.metadata.metadata = NULL;
+ message.payload.payload = NULL;
+ message.payload.length = 0;
+ if (serializedIoVecOutput) {
+ message.payload.payload = serializedIoVecOutput->iov_base;
+ message.payload.length = serializedIoVecOutput->iov_len;
}
- if (cont) {
- pubsub_protocol_message_t message;
- message.metadata.metadata = NULL;
- message.payload.payload = NULL;
- message.payload.length = 0;
+ message.header.msgId = msgTypeId;
+ message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
+ message.header.msgMajorVersion = (uint16_t)majorVersion;
+ message.header.msgMinorVersion = (uint16_t)minorVersion;
+ message.header.payloadSize = 0;
+ message.header.payloadPartSize = 0;
+ message.header.payloadOffset = 0;
+ message.header.metadataSize = 0;
+ if (metadata != NULL) {
+ message.metadata.metadata = metadata;
+ }
+ bool sendOk = true;
+ {
+ int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
+ if (rc < 0) {
+ status = -1;
+ sendOk = false;
+ }
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
+ if (message.metadata.metadata) {
+ celix_properties_destroy(message.metadata.metadata);
+ }
if (serializedIoVecOutput) {
- message.payload.payload = serializedIoVecOutput->iov_base;
- message.payload.length = serializedIoVecOutput->iov_len;
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
+ serializedIoVecOutput = NULL;
}
- message.header.msgId = msgTypeId;
- message.header.seqNr = entry->seqNr;
- message.header.msgMajorVersion = entry->major;
- message.header.msgMinorVersion = entry->minor;
- message.header.payloadSize = 0;
- message.header.payloadPartSize = 0;
- message.header.payloadOffset = 0;
- message.header.metadataSize = 0;
- if (metadata != NULL) {
- message.metadata.metadata = metadata;
- }
- entry->seqNr++;
- bool sendOk = true;
- {
- int rc = pubsub_tcpHandler_write(sender->socketHandler, &message, serializedIoVecOutput, serializedIoVecOutputLen, 0);
- if (rc < 0) {
- status = -1;
- sendOk = false;
- }
- pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata);
- if (message.metadata.metadata) {
- celix_properties_destroy(message.metadata.metadata);
- }
- if (serializedIoVecOutput) {
- serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedIoVecOutput, serializedIoVecOutputLen);
- serializedIoVecOutput = NULL;
- }
- }
-
- if (!sendOk) {
- L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
- }
- } else {
- L_WARN("[PSA_TCP_V2_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn,
- sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
}
- pubsub_tcpAdmin_releaseSerializer(sender->admin, serializer);
+ if (!sendOk) {
+ L_WARN("[PSA_TCP_V2_TS] Error sending msg. %s", strerror(errno));
+ }
return status;
}
@@ -502,4 +441,11 @@
usleep(sender->send_delay * 1000);
firstSend = false;
}
+}
+
+static int psa_tcp_localMsgTypeIdForMsgType(void *handle, const char *msgType, unsigned int *msgTypeId) { //publisher-service callback: resolves a message type name to its numeric message id
+ psa_tcp_bounded_service_entry_t* entry = handle; //handle is the per-bundle bounded service entry
+ uint32_t msgId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType); //the lookup is delegated to the sender's serializer handler
+ *msgTypeId = (unsigned int)msgId; //NOTE(review): an unknown type presumably yields id 0 (the websocket receiver treats 0 as "not found") - confirm
+ return 0; //always reports success, whether or not the type was found
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
index dfb5014..57b13a6 100644
--- a/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_tcp/v2/src/pubsub_tcp_topic_sender.h
@@ -24,6 +24,7 @@
#include "pubsub_admin_metrics.h"
#include "pubsub_protocol.h"
#include "pubsub_tcp_common.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_tcp_topic_sender pubsub_tcp_topic_sender_t;
@@ -32,7 +33,7 @@
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
const celix_properties_t *topicProperties,
pubsub_tcp_endPointStore_t *handlerStore,
@@ -46,12 +47,6 @@
const char* pubsub_tcpTopicSender_serializerType(pubsub_tcp_topic_sender_t *sender);
bool pubsub_tcpTopicSender_isStatic(pubsub_tcp_topic_sender_t *sender);
bool pubsub_tcpTopicSender_isPassive(pubsub_tcp_topic_sender_t *sender);
-long pubsub_tcpTopicSender_serializerSvcId(pubsub_tcp_topic_sender_t *sender);
long pubsub_tcpTopicSender_protocolSvcId(pubsub_tcp_topic_sender_t *sender);
-/**
- * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
- */
-pubsub_admin_sender_metrics_t *pubsub_tcpTopicSender_metrics(pubsub_tcp_topic_sender_t *sender);
-
#endif //CELIX_PUBSUB_TCP_TOPIC_SENDER_H
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
index 159d8ed..33cc86f 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/psa_activator.c
@@ -53,17 +53,6 @@
act->admin = pubsub_websocketAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
- //track serializers (only json)
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_websocketAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_websocketAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//register pubsub admin service
if (status == CELIX_SUCCESS) {
pubsub_admin_service_t *psaSvc = &act->adminService;
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
index 8950fda..2a103ee 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_admin.c
@@ -18,18 +18,18 @@
*/
#include <memory.h>
-#include <pubsub_endpoint.h>
-#include <pubsub_serializer.h>
#include <ip_utils.h>
-#include <pubsub_message_serialization_service.h>
-#include <pubsub_matching.h>
+#include "pubsub_endpoint.h"
+#include "pubsub_serializer.h"
+#include "pubsub_matching.h"
#include "pubsub_utils.h"
#include "pubsub_websocket_admin.h"
#include "pubsub_psa_websocket_constants.h"
#include "pubsub_websocket_topic_sender.h"
#include "pubsub_websocket_topic_receiver.h"
#include "pubsub_websocket_common.h"
+#include "pubsub_serializer_handler.h"
#define L_DEBUG(...) \
celix_logHelper_log(psa->log, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
@@ -52,11 +52,6 @@
bool verbose;
struct {
- celix_thread_rwlock_t mutex;
- hash_map_t *map; //key = svcId, value = psa_websocket_serializer_entry_t*
- } serializers;
-
- struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = scope:topic key, value = pubsub_websocket_topic_sender_t*
} topicSenders;
@@ -71,17 +66,15 @@
hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints;
+ struct {
+ celix_thread_mutex_t mutex; //protects map
+ hash_map_t *map; //key = pubsub message serialization marker svc id (long), value = pubsub_serializer_handler_t*
+ } serializationHandlers;
};
static celix_status_t pubsub_websocketAdmin_connectEndpointToReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint);
static celix_status_t pubsub_websocketAdmin_disconnectEndpointFromReceiver(pubsub_websocket_admin_t* psa, pubsub_websocket_topic_receiver_t *receiver, const celix_properties_t *endpoint);
-
-static void pubsub_websocketAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
- const char** out = handle;
- *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
pubsub_websocket_admin_t* pubsub_websocketAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper) {
pubsub_websocket_admin_t *psa = calloc(1, sizeof(*psa));
psa->ctx = ctx;
@@ -93,9 +86,6 @@
psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_SAMPLE_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_SAMPLE_SCORE);
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_WEBSOCKET_QOS_CONTROL_SCORE_KEY, PSA_WEBSOCKET_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadRwlock_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
celixThreadMutex_create(&psa->topicSenders.mutex, NULL);
psa->topicSenders.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
@@ -105,6 +95,9 @@
celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+ psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
return psa;
}
@@ -138,13 +131,13 @@
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- iter = hashMapIterator_construct(psa->serializers.map);
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ iter = hashMapIterator_construct(psa->serializationHandlers.map);
while (hashMapIterator_hasNext(&iter)) {
- psa_websocket_serializer_entry_t *entry = hashMapIterator_nextValue(&iter);
- free(entry);
+ pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+ pubsub_serializerHandler_destroy(entry);
}
- celixThreadRwlock_unlock(&psa->serializers.mutex);
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
@@ -155,112 +148,12 @@
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, false, false);
- celixThreadRwlock_destroy(&psa->serializers.mutex);
- hashMap_destroy(psa->serializers.map, false, false);
+ celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+ hashMap_destroy(psa->serializationHandlers.map, false, false);
free(psa);
}
-void pubsub_websocketAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_websocket_admin_t *psa = handle;
-
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
- const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
- const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
- if (serType == NULL || msgId == -1L || msgFqn == NULL) {
- L_INFO("[PSA_WEBSOCKET_V2] Ignoring serializer service without one of the following properties: %s or %s or %s",
- PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
- L_INFO("[PSA_WEBSOCKET_V2] Ignored serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
- return;
- }
- L_INFO("[PSA_WEBSOCKET_V2] Adding serializer type %s msgId %li fqn %s", serType, msgId, msgFqn);
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries == NULL) {
- typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
- hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
- L_INFO("[PSA_WEBSOCKET_V2] typeEntries added %p %s", psa->serializers.map, serType);
- }
- psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
- if (entry == NULL) {
- entry = calloc(1, sizeof(psa_websocket_serializer_entry_t));
- entry->svc = svc;
- entry->fqn = celix_utils_strdup(msgFqn);
- entry->version = celix_utils_strdup(msgVersion);
- hashMap_put(typeEntries, (void*)msgId, entry);
- L_INFO("[PSA_WEBSOCKET_V2] entry added");
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_websocketAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_websocket_admin_t *psa = handle;
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
- if(serType == NULL || msgId == -1) {
- L_ERROR("[PSA_WEBSOCKET_V2] Error removing serializer svc %s %i", serType, msgId);
- return;
- }
-
- //remove serializer
- // 1) First find entry and
- // 2) loop and destroy all topic sender using the serializer and
- // 3) loop and destroy all topic receivers using the serializer
- // Note that it is the responsibility of the topology manager to create new topic senders/receivers
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries != NULL) {
- psa_websocket_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
- free((void*)entry->fqn);
- free((void*)entry->version);
- free(entry);
-
- // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
- if(hashMap_size(typeEntries) == 0) {
- hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_websocket_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
- if (sender != NULL && strncmp(serType, pubsub_websocketTopicSender_serializerType(sender), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_websocketTopicSender_destroy(sender);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_websocket_topic_receiver_t *receiver = hashMapEntry_getValue(senderEntry);
- if (receiver != NULL && strncmp(serType, pubsub_websocketTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_websocketTopicReceiver_destroy(receiver);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
-}
-
celix_status_t pubsub_websocketAdmin_matchPublisher(void *handle, long svcRequesterBndId, const celix_filter_t *svcFilter, celix_properties_t **topicProperties, double *outScore, long *outSerializerSvcId, long *outProtocolSvcId) {
pubsub_websocket_admin_t *psa = handle;
L_DEBUG("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_matchPublisher");
@@ -297,38 +190,49 @@
return status;
}
+static pubsub_serializer_handler_t* pubsub_websocketAdmin_getSerializationHandler(pubsub_websocket_admin_t* psa, long msgSerializationMarkerSvcId) { //returns the handler cached for the given marker service id, creating and caching it on first use; NULL when no handler could be created
+ pubsub_serializer_handler_t* handler = NULL;
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex); //lookup and insert share one critical section, so two callers cannot both create a handler for the same id
+ handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId); //the service id itself is the map key
+ if (handler == NULL) {
+ handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+ if (handler != NULL) {
+ hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler); //only successfully created handlers are cached, so a failed creation is retried on the next call
+ }
+ }
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+ return handler;
+}
+
+
celix_status_t pubsub_websocketAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_websocket_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
- //1) Create TopicSender
- //2) Store TopicSender
- //3) Connect existing endpoints
- //4) set outPublisherEndpoint
+ //1) Get serialization handler
+ //2) Create TopicSender
+ //3) Store TopicSender
+ //4) Connect existing endpoints
+ //5) set outPublisherEndpoint
+
+ pubsub_serializer_handler_t* handler = pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic sender without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
celix_properties_t *newEndpoint = NULL;
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_websocketAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
celixThreadMutex_lock(&psa->topicSenders.mutex);
pubsub_websocket_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
if (sender == NULL) {
- sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, serType, psa);
+ sender = pubsub_websocketTopicSender_create(psa->ctx, psa->log, scope, topic, handler, psa);
if (sender != NULL) {
const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, NULL, NULL);
+ pubsub_serializerHandler_getSerializationType(handler), NULL, NULL);
//Set endpoint visibility to local because the http server handles discovery
celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
@@ -382,29 +286,25 @@
celix_status_t pubsub_websocketAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_websocket_admin_t *psa = handle;
-
celix_properties_t *newEndpoint = NULL;
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_websocketAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+ pubsub_serializer_handler_t* handler = pubsub_websocketAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic receiver without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
+
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
pubsub_websocket_topic_receiver_t *receiver = hashMap_get(psa->topicReceivers.map, key);
if (receiver == NULL) {
- receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serType, psa);
+ receiver = pubsub_websocketTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, handler, psa);
if (receiver != NULL) {
const char *psaType = PUBSUB_WEBSOCKET_ADMIN_TYPE;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, NULL, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+ pubsub_serializerHandler_getSerializationType(handler), NULL, NULL);
//Set endpoint visibility to local because the http server handles discovery
celix_properties_set(newEndpoint, PUBSUB_ENDPOINT_VISIBILITY, PUBSUB_ENDPOINT_LOCAL_VISIBILITY);
@@ -578,57 +478,11 @@
return status;
}
-psa_websocket_serializer_entry_t* pubsub_websocketAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
- pubsub_websocket_admin_t *psa = handle;
- psa_websocket_serializer_entry_t *serializer = NULL;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
- }
-
- return serializer;
-}
-
-void pubsub_websocketAdmin_releaseSerializer(void *handle, psa_websocket_serializer_entry_t* serializer) {
- pubsub_websocket_admin_t *psa = handle;
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_websocketAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
- pubsub_websocket_admin_t *psa = handle;
- int64_t id = -1L;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
- while(hashMapIterator_hasNext(&iterator)) {
- void *key = hashMapIterator_nextKey(&iterator);
- psa_websocket_serializer_entry_t *entry = hashMap_get(typeEntries, key);
- L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn key fqn %s %s", entry->fqn, fqn);
- if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
- id = (uint32_t)(uintptr_t)key;
- break;
- }
- }
- } else {
- L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn typeEntries == NULL %s %s", serializationType, fqn);
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- L_WARN("[PSA_WEBSOCKET_V2] pubsub_websocketAdmin_getMessageIdForMessageFqn %p %s %s = %i", psa->serializers.map, serializationType, fqn, id);
-
- return id;
-}
-
bool pubsub_websocketAdmin_executeCommand(void *handle, const char *commandLine __attribute__((unused)), FILE *out, FILE *errStream __attribute__((unused))) {
pubsub_websocket_admin_t *psa = handle;
fprintf(out, "\n");
fprintf(out, "Topic Senders:\n");
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -642,11 +496,9 @@
fprintf(out, " |- url = %s\n", url);
}
celixThreadMutex_unlock(&psa->topicSenders.mutex);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
fprintf(out, "\nTopic Receivers:\n");
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
iter = hashMapIterator_construct(psa->topicReceivers.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -677,7 +529,6 @@
celix_arrayList_destroy(unconnected);
}
celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
fprintf(out, "\n");
return true;
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
index c93f078..ea997e9 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.c
@@ -68,10 +68,11 @@
void *admin;
char *scope;
char *topic;
- char *serType;
char scopeAndTopicFilter[5];
char *uri;
+ pubsub_serializer_handler_t* serializerHandler;
+
celix_websocket_service_t sockSvc;
long svcId;
@@ -131,12 +132,12 @@
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin) {
pubsub_websocket_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
- receiver->serType = celix_utils_strdup(serType);
+ receiver->serializerHandler = serializerHandler;
receiver->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
receiver->topic = strndup(topic, 1024 * 1024);
receiver->admin = admin;
@@ -309,7 +310,6 @@
free(receiver->uri);
free(receiver->scope);
free(receiver->topic);
- free(receiver->serType);
}
free(receiver);
}
@@ -325,7 +325,7 @@
}
const char *pubsub_websocketTopicReceiver_serializerType(pubsub_websocket_topic_receiver_t *receiver) {
- return receiver->serType;
+ return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler); //the receiver no longer keeps its own serType copy; the handler passed at creation supplies the type name
}
void pubsub_websocketTopicReceiver_listConnections(pubsub_websocket_topic_receiver_t *receiver, celix_array_list_t *connectedUrls, celix_array_list_t *unconnectedUrls) {
@@ -451,58 +451,52 @@
static inline void processMsgForSubscriberEntry(pubsub_websocket_topic_receiver_t *receiver, psa_websocket_subscriber_entry_t* entry, pubsub_websocket_msg_header_t *hdr, const char* payload, size_t payloadSize) {
//NOTE receiver->subscribers.mutex locked
- int64_t msgTypeId = pubsub_websocketAdmin_getMessageIdForMessageFqn(receiver->admin, receiver->serType, hdr->id);
+ uint32_t msgId = pubsub_serializerHandler_getMsgId(receiver->serializerHandler, hdr->id); //deserializes one received payload and delivers it to every subscriber service of entry; hdr->id holds the message fqn, which the handler maps to a numeric id
- if(msgTypeId < 0) {
- L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
- return;
- }
-
- psa_websocket_serializer_entry_t *serializer = pubsub_websocketAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serType, msgTypeId);
-
- if(serializer == NULL) {
- pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
- L_WARN("[PSA_WEBSOCKET_V2_TR] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", receiver->serType, msgTypeId, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ if (msgId == 0) { //id 0 is handled as "unknown message type": the message is dropped
+ L_WARN("Cannot find msg id for msg fqn %s", hdr->id);
return;
}
void *deSerializedMsg = NULL;
-
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- bool validVersion = psa_websocket_checkVersion(version, hdr);
- celix_version_destroy(version);
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, msgId, hdr->major, hdr->minor); //the version check is delegated to the handler, using the major/minor from the received header
if (validVersion) {
struct iovec deSerializeBuffer;
deSerializeBuffer.iov_base = (void *)payload;
deSerializeBuffer.iov_len = payloadSize;
- celix_status_t status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg);
-
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
if (status == CELIX_SUCCESS) {
hash_map_iterator_t iter = hashMapIterator_construct(entry->subscriberServices);
bool release = true;
while (hashMapIterator_hasNext(&iter)) {
pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter);
- svc->receive(svc->handle, serializer->fqn, msgTypeId, deSerializedMsg, NULL, &release);
+ svc->receive(svc->handle, hdr->id, msgId, deSerializedMsg, NULL, &release); //the received fqn (hdr->id) is passed on as the message type name
if (!release && hashMapIterator_hasNext(&iter)) {
//receive function has taken ownership and still more receive function to come ..
//deserialize again for new message
- status = serializer->svc->deserialize(serializer->svc->handle, &deSerializeBuffer, 0, &deSerializedMsg);
+ status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, msgId, hdr->major, hdr->minor, &deSerializeBuffer, 0, &deSerializedMsg);
if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
break;
}
release = true;
}
}
if (release) {
- serializer->svc->freeDeserializedMsg(serializer->svc->handle, deSerializedMsg);
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, msgId, deSerializedMsg); //no subscriber kept the last instance, so it is handed back to the serializer handler
}
} else {
- L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", serializer->fqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize msg type %s for scope/topic %s/%s", hdr->id, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
}
+ } else { //the header version is not supported for this message id: nothing is delivered and both versions are logged
+ L_WARN("[PSA_WEBSOCKET_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version send: %i.%i.x",
+ hdr->id,
+ pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+ (int)hdr->major,
+ (int)hdr->minor,
+ pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, msgId),
+ pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, msgId));
}
-
- pubsub_websocketAdmin_releaseSerializer(receiver->admin, serializer);
}
static inline void processMsg(pubsub_websocket_topic_receiver_t *receiver, const char *msg, size_t msgSize) {
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
index 55d5255..f5edda5 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_receiver.h
@@ -20,8 +20,9 @@
#ifndef CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
#define CELIX_PUBSUB_WEBSOCKET_TOPIC_RECEIVER_H
-#include <pubsub_admin_metrics.h>
+#include "pubsub_admin_metrics.h"
#include "celix_bundle_context.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_websocket_topic_receiver pubsub_websocket_topic_receiver_t;
@@ -30,7 +31,7 @@
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin);
void pubsub_websocketTopicReceiver_destroy(pubsub_websocket_topic_receiver_t *receiver);
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
index 98a1ad7..adc5ffe 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.c
@@ -52,10 +52,13 @@
void *admin;
char *scope;
char *topic;
- char *serType;
char scopeAndTopicFilter[5];
char *uri;
+ pubsub_serializer_handler_t* serializerHandler;
+
+ int seqNr; //atomic
+
celix_websocket_service_t websockSvc;
long websockSvcId;
struct mg_connection *sockConnection;
@@ -71,16 +74,10 @@
} boundedServices;
};
-typedef struct psa_websocket_send_msg_entry {
- pubsub_websocket_msg_header_t header; //partially filled header (only seqnr and time needs to be updated per send)
- uint32_t type; //msg type id (hash of fqn)
-} psa_websocket_send_msg_entry_t;
-
typedef struct psa_websocket_bounded_service_entry {
pubsub_websocket_topic_sender_t *parent;
pubsub_publisher_t service;
long bndId;
- hash_map_t *msgEntries; //key = msg type id, value = psa_websocket_send_msg_entry_t
int getCount;
} psa_websocket_bounded_service_entry_t;
@@ -99,18 +96,12 @@
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin) {
pubsub_websocket_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serType = celix_utils_strdup(serType);
-
- if(sender->serType == NULL) {
- L_ERROR("[PSA_WEBSOCKET_V2_TS] Error getting serType");
- free(sender);
- return NULL;
- }
+ sender->serializerHandler = serializerHandler;
psa_websocket_setScopeAndTopicFilter(scope, topic, sender->scopeAndTopicFilter);
sender->uri = psa_websocket_createURI(scope, topic);
@@ -174,17 +165,7 @@
hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
while (hashMapIterator_hasNext(&iter)) {
psa_websocket_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
- free(msgEntry);
-
- }
- hashMap_destroy(entry->msgEntries, false, false);
-
- free(entry);
- }
+ free(entry);
}
hashMap_destroy(sender->boundedServices.map, false, false);
celixThreadMutex_unlock(&sender->boundedServices.mutex);
@@ -198,7 +179,6 @@
}
free(sender->topic);
free(sender->uri);
- free(sender->serType);
free(sender);
}
}
@@ -216,16 +196,17 @@
}
const char* pubsub_websocketTopicSender_serializerType(pubsub_websocket_topic_sender_t *sender) {
- return sender->serType;
+ return pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
}
static int psa_websocket_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) {
psa_websocket_bounded_service_entry_t *entry = (psa_websocket_bounded_service_entry_t *) handle;
- int64_t rc = pubsub_websocketAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serType, msgType);
- if(rc >= 0) {
- *msgTypeId = (unsigned int)rc;
+ uint32_t msgId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType);
+ if (msgId != 0) {
+ *msgTypeId = msgId;
+ return 0;
}
- return 0;
+ return -1;
}
static void* psa_websocket_getPublisherService(void *handle, const celix_bundle_t *requestingBundle, const celix_properties_t *svcProperties __attribute__((unused))) {
@@ -241,7 +222,6 @@
entry->getCount = 1;
entry->parent = sender;
entry->bndId = bndId;
- entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_websocket_localMsgTypeIdForMsgType;
entry->service.send = psa_websocket_topicPublicationSend;
@@ -264,60 +244,39 @@
if (entry != NULL && entry->getCount == 0) {
//free entry
hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
- hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter)) {
- psa_websocket_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
- free(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
-
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
}
static int psa_websocket_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
- int status = CELIX_SERVICE_EXCEPTION;
psa_websocket_bounded_service_entry_t *bound = handle;
pubsub_websocket_topic_sender_t *sender = bound->parent;
- psa_websocket_serializer_entry_t *serializer = pubsub_websocketAdmin_acquireSerializerForMessageId(sender->admin, sender->serType, msgTypeId);
-
- if(serializer == NULL) {
- pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
- L_WARN("[PSA_WEBSOCKET_V2_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- return CELIX_SERVICE_EXCEPTION;
+ const char* msgFqn;
+ int majorVersion;
+ int minorVersion;
+ celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorVersion);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+ pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+ return status;
}
-
- psa_websocket_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void *) (uintptr_t) (msgTypeId));
- if(entry == NULL) {
- entry = calloc(1, sizeof(psa_websocket_send_msg_entry_t));
- entry->type = msgTypeId;
- entry->header.id = serializer->fqn;
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- entry->header.major = (uint8_t)celix_version_getMajor(version);
- entry->header.minor = (uint8_t)celix_version_getMinor(version);
- entry->header.seqNr = 0;
- celix_version_destroy(version);
- hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
- }
if (sender->sockConnection != NULL) {
delay_first_send_for_late_joiners(sender);
size_t serializedOutputLen = 0;
struct iovec* serializedOutput = NULL;
- status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedOutput, &serializedOutputLen);
-
+ status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedOutput, &serializedOutputLen);
if (status == CELIX_SUCCESS /*ser ok*/) {
json_error_t jsError;
json_t *jsMsg = json_object();
- json_object_set_new_nocheck(jsMsg, "id", json_string(entry->header.id));
- json_object_set_new_nocheck(jsMsg, "major", json_integer(entry->header.major));
- json_object_set_new_nocheck(jsMsg, "minor", json_integer(entry->header.minor));
- uint32_t seqNr = __atomic_fetch_add(&entry->header.seqNr, 1, __ATOMIC_RELAXED);
+ json_object_set_new_nocheck(jsMsg, "id", json_string(msgFqn));
+ json_object_set_new_nocheck(jsMsg, "major", json_integer(majorVersion));
+ json_object_set_new_nocheck(jsMsg, "minor", json_integer(minorVersion));
+ uint32_t seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
json_object_set_new_nocheck(jsMsg, "seqNr", json_integer(seqNr));
json_t *jsData;
@@ -338,17 +297,15 @@
}
json_decref(jsMsg); //Decrease ref count means freeing the object
- serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedOutput, serializedOutputLen);
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedOutput, serializedOutputLen);
} else {
- L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %s for scope/topic %s/%s",
- entry->header.id, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ L_WARN("[PSA_WEBSOCKET_TS] Error serialize message of type %u for scope/topic %s/%s",
+ msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
}
} else { // when (sender->sockConnection == NULL) we dont have a client, but we do have a valid entry
status = CELIX_SUCCESS; // Not an error, just nothing to do
}
- pubsub_websocketAdmin_releaseSerializer(sender->admin, serializer);
-
return status;
}
diff --git a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
index 8f8cebf..6b42500 100644
--- a/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_websocket/v2/src/pubsub_websocket_topic_sender.h
@@ -22,6 +22,7 @@
#include "celix_bundle_context.h"
#include "pubsub_admin_metrics.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_websocket_topic_sender pubsub_websocket_topic_sender_t;
@@ -30,7 +31,7 @@
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char *serType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin);
void pubsub_websocketTopicSender_destroy(pubsub_websocket_topic_sender_t *sender);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
index 014401e..7aaee4d 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/psa_activator.c
@@ -41,9 +41,6 @@
pubsub_admin_service_t adminService;
long adminSvcId;
- pubsub_admin_metrics_service_t adminMetricsService;
- long adminMetricsSvcId;
-
celix_shell_command_t cmdSvc;
long cmdSvcId;
} psa_zmq_activator_t;
@@ -59,17 +56,6 @@
act->admin = pubsub_zmqAdmin_create(ctx, act->logHelper);
celix_status_t status = act->admin != NULL ? CELIX_SUCCESS : CELIX_BUNDLE_EXCEPTION;
- //track serializers
- if (status == CELIX_SUCCESS) {
- celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.filter.ignoreServiceLanguage = true;
- opts.callbackHandle = act->admin;
- opts.addWithProperties = pubsub_zmqAdmin_addSerializerSvc;
- opts.removeWithProperties = pubsub_zmqAdmin_removeSerializerSvc;
- act->serializersTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- }
-
//track protocols
if (status == CELIX_SUCCESS) {
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
@@ -101,16 +87,6 @@
act->adminSvcId = celix_bundleContext_registerService(ctx, psaSvc, PUBSUB_ADMIN_SERVICE_NAME, props);
}
- if (status == CELIX_SUCCESS) {
- act->adminMetricsService.handle = act->admin;
- act->adminMetricsService.metrics = pubsub_zmqAdmin_metrics;
-
- celix_properties_t *props = celix_properties_create();
- celix_properties_set(props, PUBSUB_ADMIN_SERVICE_TYPE, PUBSUB_ZMQ_ADMIN_TYPE);
-
- act->adminMetricsSvcId = celix_bundleContext_registerService(ctx, &act->adminMetricsService, PUBSUB_ADMIN_METRICS_SERVICE_NAME, props);
- }
-
//register shell command service
{
act->cmdSvc.handle = act->admin;
@@ -128,7 +104,6 @@
int psa_zmq_stop(psa_zmq_activator_t *act, celix_bundle_context_t *ctx) {
celix_bundleContext_unregisterService(ctx, act->adminSvcId);
celix_bundleContext_unregisterService(ctx, act->cmdSvcId);
- celix_bundleContext_unregisterService(ctx, act->adminMetricsSvcId);
celix_bundleContext_stopTracker(ctx, act->serializersTrackerId);
celix_bundleContext_stopTracker(ctx, act->protocolsTrackerId);
pubsub_zmqAdmin_destroy(act->admin);
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
index c50006a..7f1d891 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_psa_zmq_constants.h
@@ -35,10 +35,6 @@
#define PSA_ZMQ_QOS_CONTROL_SCORE_KEY "PSA_ZMQ_QOS_CONTROL_SCORE"
#define PSA_ZMQ_DEFAULT_SCORE_KEY "PSA_ZMQ_DEFAULT_SCORE"
-
-#define PSA_ZMQ_METRICS_ENABLED "PSA_ZMQ_METRICS_ENABLED"
-#define PSA_ZMQ_DEFAULT_METRICS_ENABLED true
-
#define PSA_ZMQ_ZEROCOPY_ENABLED "PSA_ZMQ_ZEROCOPY_ENABLED"
#define PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED false
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
index 5d396e5..f842f01 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.c
@@ -23,15 +23,17 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <ifaddrs.h>
-#include <pubsub_endpoint.h>
#include <czmq.h>
-#include <pubsub_serializer.h>
-#include <pubsub_protocol.h>
#include <ip_utils.h>
-#include <pubsub_matching.h>
-#include <pubsub_utils.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_serializer_handler.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_serializer.h"
+#include "pubsub_protocol.h"
+#include "pubsub_matching.h"
+#include "pubsub_utils.h"
+#include "pubsub_message_serialization_service.h"
+#include "pubsub_message_serialization_marker.h"
#include "pubsub_zmq_admin.h"
#include "pubsub_psa_zmq_constants.h"
#include "pubsub_zmq_topic_sender.h"
@@ -64,11 +66,6 @@
bool verbose;
struct {
- celix_thread_rwlock_t mutex;
- hash_map_t *map; //key = svcId, value = psa_zmq_serializer_entry_t*
- } serializers;
-
- struct {
celix_thread_mutex_t mutex;
hash_map_t *map; //key = svcId, value = psa_zmq_protocol_entry_t*
} protocols;
@@ -88,6 +85,10 @@
hash_map_t *map; //key = endpoint uuid, value = celix_properties_t* (endpoint)
} discoveredEndpoints;
+ struct {
+ celix_thread_mutex_t mutex;
+ hash_map_t *map; //key = pubsub message serialization marker svc id (long), value = pubsub_serializer_handler_t*.
+ } serializationHandlers;
};
typedef struct psa_zmq_protocol_entry {
@@ -99,6 +100,7 @@
static celix_status_t zmq_getIpAddress(const char* interface, char** ip);
static celix_status_t pubsub_zmqAdmin_connectEndpointToReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
static celix_status_t pubsub_zmqAdmin_disconnectEndpointFromReceiver(pubsub_zmq_admin_t* psa, pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *endpoint);
+static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId);
static bool pubsub_zmqAdmin_endpointIsPublisher(const celix_properties_t *endpoint) {
const char *type = celix_properties_get(endpoint, PUBSUB_ENDPOINT_TYPE, NULL);
@@ -182,9 +184,6 @@
psa->qosSampleScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_SAMPLE_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_SAMPLE_SCORE);
psa->qosControlScore = celix_bundleContext_getPropertyAsDouble(ctx, PSA_ZMQ_QOS_CONTROL_SCORE_KEY, PSA_ZMQ_DEFAULT_QOS_CONTROL_SCORE);
- celixThreadRwlock_create(&psa->serializers.mutex, NULL);
- psa->serializers.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
-
celixThreadMutex_create(&psa->protocols.mutex, NULL);
psa->protocols.map = hashMap_create(NULL, NULL, NULL, NULL);
@@ -197,6 +196,9 @@
celixThreadMutex_create(&psa->discoveredEndpoints.mutex, NULL);
psa->discoveredEndpoints.map = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
+ celixThreadMutex_create(&psa->serializationHandlers.mutex, NULL);
+ psa->serializationHandlers.map = hashMap_create(NULL, NULL, NULL, NULL);
+
return psa;
}
@@ -231,15 +233,6 @@
}
celixThreadMutex_unlock(&psa->discoveredEndpoints.mutex);
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- iter = hashMapIterator_construct(psa->serializers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_t *entry = hashMapIterator_nextValue(&iter);
- hashMap_destroy(entry, false, true);
- }
- hashMap_clear(psa->serializers.map, false, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
celixThreadMutex_lock(&psa->protocols.mutex);
iter = hashMapIterator_construct(psa->protocols.map);
while (hashMapIterator_hasNext(&iter)) {
@@ -248,6 +241,14 @@
}
celixThreadMutex_unlock(&psa->protocols.mutex);
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex);
+ iter = hashMapIterator_construct(psa->serializationHandlers.map);
+ while (hashMapIterator_hasNext(&iter)) {
+ pubsub_serializer_handler_t* entry = hashMapIterator_nextValue(&iter);
+ pubsub_serializerHandler_destroy(entry);
+ }
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+
celixThreadMutex_destroy(&psa->topicSenders.mutex);
hashMap_destroy(psa->topicSenders.map, true, false);
@@ -257,12 +258,12 @@
celixThreadMutex_destroy(&psa->discoveredEndpoints.mutex);
hashMap_destroy(psa->discoveredEndpoints.map, false, false);
- celixThreadRwlock_destroy(&psa->serializers.mutex);
- hashMap_destroy(psa->serializers.map, false, false);
-
celixThreadMutex_destroy(&psa->protocols.mutex);
hashMap_destroy(psa->protocols.map, false, false);
+ celixThreadMutex_destroy(&psa->serializationHandlers.mutex);
+ hashMap_destroy(psa->serializationHandlers.map, false, false);
+
if (psa->zmq_auth != NULL) {
zactor_destroy(&psa->zmq_auth);
}
@@ -272,92 +273,6 @@
free(psa);
}
-void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_zmq_admin_t *psa = handle;
-
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
- const char *msgFqn = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, NULL);
- const char *msgVersion = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, "0.0.0");
-
- if (serType == NULL || msgId == -1L || msgFqn == NULL) {
- L_INFO("[PSA_ZMQ] Ignoring serializer service without one of the following properties: %s or %s or %s",
- PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY);
-
- L_INFO("[PSA_ZMQ] Ignored serializer type %s msgId %li fqn %s\n", serType, msgId, msgFqn);
- return;
- }
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries == NULL) {
- typeEntries = hashMap_create(NULL, NULL, NULL, NULL);
- hashMap_put(psa->serializers.map, (void*)celix_utils_strdup(serType), typeEntries);
- }
- psa_zmq_serializer_entry_t *entry = hashMap_get(typeEntries, (void*)msgId);
- if (entry == NULL) {
- entry = calloc(1, sizeof(psa_zmq_serializer_entry_t));
- entry->svc = svc;
- entry->fqn = celix_utils_strdup(msgFqn);
- entry->version = celix_utils_strdup(msgVersion);
- hashMap_put(typeEntries, (void*)msgId, entry);
- }
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props) {
- pubsub_zmq_admin_t *psa = handle;
- const char *serType = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
- long msgId = celix_properties_getAsLong(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, -1L);
-
- celixThreadRwlock_writeLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serType);
- if(typeEntries != NULL) {
- psa_zmq_serializer_entry_t *entry = hashMap_remove(typeEntries, (void*)msgId);
- free((void*)entry->fqn);
- free((void*)entry->version);
- free(entry);
-
- // check if there are no remaining serializers for the given type. If not, remove all senders and receivers for this type.
- if(hashMap_size(typeEntries) == 0) {
- hashMap_destroy(hashMap_removeFreeKey(psa->serializers.map, serType), true, false);
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *senderEntry = hashMapIterator_nextEntry(&iter);
- pubsub_zmq_topic_sender_t *sender = hashMapEntry_getValue(senderEntry);
- if (sender != NULL && strncmp(serType, pubsub_zmqTopicSender_serializerType(sender), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(senderEntry);
- hashMapIterator_remove(&iter);
- pubsub_zmqTopicSender_destroy(sender);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- hash_map_entry_t *receiverEntry = hashMapIterator_nextEntry(&iter);
- pubsub_zmq_topic_receiver_t *receiver = hashMapEntry_getValue(receiverEntry);
- if (receiver != NULL && strncmp(serType, pubsub_zmqTopicReceiver_serializerType(receiver), 1024 * 1024) == 0) {
- char *key = hashMapEntry_getKey(receiverEntry);
- hashMapIterator_remove(&iter);
- pubsub_zmqTopicReceiver_destroy(receiver);
- free(key);
- }
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
- } else {
- celixThreadRwlock_unlock(&psa->serializers.mutex);
- }
-}
-
void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props) {
pubsub_zmq_admin_t *psa = handle;
@@ -462,20 +377,21 @@
return status;
}
-static void pubsub_zmqAdmin_getSerType(void *handle, void *svc __attribute__((unused)), const celix_properties_t* props) {
- const char** out = handle;
- *out = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, NULL);
-}
-
-
-celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId __attribute__((unused)), long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
+celix_status_t pubsub_zmqAdmin_setupTopicSender(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId, long protocolSvcId, celix_properties_t **outPublisherEndpoint) {
pubsub_zmq_admin_t *psa = handle;
celix_status_t status = CELIX_SUCCESS;
- //1) Create TopicSender
- //2) Store TopicSender
- //3) Connect existing endpoints
- //4) set outPublisherEndpoint
+ //1) Get serialization handler
+ //2) Create TopicSender
+ //3) Store TopicSender
+ //4) Connect existing endpoints
+ //5) set outPublisherEndpoint
+
+ pubsub_serializer_handler_t* handler = pubsub_zmqAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic sender without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
celix_properties_t *newEndpoint = NULL;
@@ -485,17 +401,6 @@
}
char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_zmqAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
-
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicSenders.mutex);
pubsub_zmq_topic_sender_t *sender = hashMap_get(psa->topicSenders.map, key);
@@ -503,14 +408,14 @@
if (sender == NULL) {
psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
if (protEntry != NULL) {
- sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, serType, handle,
+ sender = pubsub_zmqTopicSender_create(psa->ctx, psa->log, scope, topic, handler, handle,
protocolSvcId, protEntry->svc, psa->ipAddress, staticBindUrl, psa->basePort, psa->maxPort);
}
if (sender != NULL) {
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic, PUBSUB_PUBLISHER_ENDPOINT_TYPE, psaType,
- serType, protType, NULL);
+ pubsub_zmqTopicSender_serializerType(sender), protType, NULL);
celix_properties_set(newEndpoint, PUBSUB_ZMQ_URL_KEY, pubsub_zmqTopicSender_url(sender));
//if configured use a static discover url
@@ -546,10 +451,6 @@
}
celixThreadMutex_unlock(&psa->protocols.mutex);
- if (sender != NULL && newEndpoint != NULL) {
- //TODO connect endpoints to sender, NOTE is this needed for a zmq topic sender?
- }
-
if (newEndpoint != NULL && outPublisherEndpoint != NULL) {
*outPublisherEndpoint = newEndpoint;
}
@@ -572,7 +473,6 @@
pubsub_zmq_topic_sender_t *sender = hashMap_remove(psa->topicSenders.map, key);
celixThreadMutex_unlock(&psa->topicSenders.mutex);
free(mapKey);
- //TODO disconnect endpoints to sender. note is this needed for a zmq topic sender?
pubsub_zmqTopicSender_destroy(sender);
} else {
celixThreadMutex_unlock(&psa->topicSenders.mutex);
@@ -585,21 +485,15 @@
celix_status_t pubsub_zmqAdmin_setupTopicReceiver(void *handle, const char *scope, const char *topic, const celix_properties_t *topicProperties, long serializerSvcId __attribute__((unused)), long protocolSvcId, celix_properties_t **outSubscriberEndpoint) {
pubsub_zmq_admin_t *psa = handle;
-
celix_properties_t *newEndpoint = NULL;
- char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
+ pubsub_serializer_handler_t* handler = pubsub_zmqAdmin_getSerializationHandler(psa, serializerSvcId);
+ if (handler == NULL) {
+ L_ERROR("Cannot create topic receiver without serialization handler");
+ return CELIX_ILLEGAL_STATE;
+ }
- //get serializer type
- const char *serType = NULL;
- celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
- opts.callbackHandle = &serType;
- opts.useWithProperties = pubsub_zmqAdmin_getSerType;
- opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- char filter[32];
- snprintf(filter, 32, "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, serializerSvcId);
- opts.filter.filter = filter;
- celix_bundleContext_useServiceWithOptions(psa->ctx, &opts);
+ char *key = pubsubEndpoint_createScopeTopicKey(scope, topic);
celixThreadMutex_lock(&psa->protocols.mutex);
celixThreadMutex_lock(&psa->topicReceivers.mutex);
@@ -608,7 +502,7 @@
if (receiver == NULL) {
psa_zmq_protocol_entry_t *protEntry = hashMap_get(psa->protocols.map, (void*)protocolSvcId);
if (protEntry != NULL) {
- receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, serType, handle, protocolSvcId, protEntry->svc);
+ receiver = pubsub_zmqTopicReceiver_create(psa->ctx, psa->log, scope, topic, topicProperties, handler, handle, protocolSvcId, protEntry->svc);
} else {
L_ERROR("[PSA_ZMQ] Cannot find serializer or protocol for TopicSender %s/%s", scope == NULL ? "(null)" : scope, topic);
}
@@ -616,7 +510,8 @@
const char *psaType = PUBSUB_ZMQ_ADMIN_TYPE;
const char *protType = protEntry->protType;
newEndpoint = pubsubEndpoint_create(psa->fwUUID, scope, topic,
- PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType, serType, protType, NULL);
+ PUBSUB_SUBSCRIBER_ENDPOINT_TYPE, psaType,
+ pubsub_zmqTopicReceiver_serializerType(receiver), protType, NULL);
//if available also set container name
const char *cn = celix_bundleContext_getProperty(psa->ctx, "CELIX_CONTAINER_NAME", NULL);
if (cn != NULL) {
@@ -856,72 +751,18 @@
return status;
}
-psa_zmq_serializer_entry_t* pubsub_zmqAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId) {
- pubsub_zmq_admin_t *psa = handle;
- psa_zmq_serializer_entry_t *serializer = NULL;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- serializer = hashMap_get(typeEntries, (void*)(uintptr_t)msgId);
- }
-
- return serializer;
-}
-
-void pubsub_zmqAdmin_releaseSerializer(void *handle, psa_zmq_serializer_entry_t* serializer __attribute__((unused))) {
- pubsub_zmq_admin_t *psa = handle;
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-}
-
-int64_t pubsub_zmqAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn) {
- pubsub_zmq_admin_t *psa = handle;
- int64_t id = -1L;
-
- celixThreadRwlock_readLock(&psa->serializers.mutex);
- hash_map_t *typeEntries = hashMap_get(psa->serializers.map, serializationType);
- if(typeEntries != NULL) {
- hash_map_iterator_t iterator = hashMapIterator_construct(typeEntries);
- while(hashMapIterator_hasNext(&iterator)) {
- void *key = hashMapIterator_nextKey(&iterator);
- psa_zmq_serializer_entry_t *entry = hashMap_get(typeEntries, key);
- if(strncmp(fqn, entry->fqn, 1024*1024) == 0) {
- id = (uint32_t)(uintptr_t)key;
- break;
- }
+static pubsub_serializer_handler_t* pubsub_zmqAdmin_getSerializationHandler(pubsub_zmq_admin_t* psa, long msgSerializationMarkerSvcId) { // returns the serializer handler stored for this marker service id, creating and storing one on first use; NULL if none could be created
+ pubsub_serializer_handler_t* handler = NULL;
+ celixThreadMutex_lock(&psa->serializationHandlers.mutex); // lookup-or-create below runs with serializationHandlers.mutex held
+ handler = hashMap_get(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId); // map key is the marker service id cast to a pointer
+ if (handler == NULL) {
+ handler = pubsub_serializerHandler_createForMarkerService(psa->ctx, msgSerializationMarkerSvcId, psa->log);
+ if (handler != NULL) { // only a successfully created handler is stored in the map
+ hashMap_put(psa->serializationHandlers.map, (void*)msgSerializationMarkerSvcId, handler);
}
}
- celixThreadRwlock_unlock(&psa->serializers.mutex);
-
- return id;
-}
-
-pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle) {
- pubsub_zmq_admin_t *psa = handle;
- pubsub_admin_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->psaType, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", PUBSUB_ZMQ_ADMIN_TYPE);
- result->senders = celix_arrayList_create();
- result->receivers = celix_arrayList_create();
-
- celixThreadMutex_lock(&psa->topicSenders.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(psa->topicSenders.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_zmq_topic_sender_t *sender = hashMapIterator_nextValue(&iter);
- pubsub_admin_sender_metrics_t *metrics = pubsub_zmqTopicSender_metrics(sender);
- celix_arrayList_add(result->senders, metrics);
- }
- celixThreadMutex_unlock(&psa->topicSenders.mutex);
-
- celixThreadMutex_lock(&psa->topicReceivers.mutex);
- iter = hashMapIterator_construct(psa->topicReceivers.map);
- while (hashMapIterator_hasNext(&iter)) {
- pubsub_zmq_topic_receiver_t *receiver = hashMapIterator_nextValue(&iter);
- pubsub_admin_receiver_metrics_t *metrics = pubsub_zmqTopicReceiver_metrics(receiver);
- celix_arrayList_add(result->receivers, metrics);
- }
- celixThreadMutex_unlock(&psa->topicReceivers.mutex);
-
- return result;
+ celixThreadMutex_unlock(&psa->serializationHandlers.mutex);
+ return handler; // may still be NULL when no handler was stored and creation failed
}
#ifndef ANDROID
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
index f99ee08..6a8ba97 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_admin.h
@@ -27,12 +27,6 @@
typedef struct pubsub_zmq_admin pubsub_zmq_admin_t;
-typedef struct psa_zmq_serializer_entry {
- const char *fqn;
- const char *version;
- pubsub_message_serialization_service_t *svc;
-} psa_zmq_serializer_entry_t;
-
pubsub_zmq_admin_t* pubsub_zmqAdmin_create(celix_bundle_context_t *ctx, celix_log_helper_t *logHelper);
void pubsub_zmqAdmin_destroy(pubsub_zmq_admin_t *psa);
@@ -49,19 +43,10 @@
celix_status_t pubsub_zmqAdmin_addDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
celix_status_t pubsub_zmqAdmin_removeDiscoveredEndpoint(void *handle, const celix_properties_t *endpoint);
-void pubsub_zmqAdmin_addSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-void pubsub_zmqAdmin_removeSerializerSvc(void *handle, void *svc, const celix_properties_t *props);
-
void pubsub_zmqAdmin_addProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
void pubsub_zmqAdmin_removeProtocolSvc(void *handle, void *svc, const celix_properties_t *props);
bool pubsub_zmqAdmin_executeCommand(void *handle, const char *commandLine, FILE *outStream, FILE *errStream);
-psa_zmq_serializer_entry_t* pubsub_zmqAdmin_acquireSerializerForMessageId(void *handle, const char *serializationType, uint32_t msgId);
-void pubsub_zmqAdmin_releaseSerializer(void *handle, psa_zmq_serializer_entry_t* serializer);
-int64_t pubsub_zmqAdmin_getMessageIdForMessageFqn(void *handle, const char *serializationType, const char *fqn);
-
-pubsub_admin_metrics_t* pubsub_zmqAdmin_metrics(void *handle);
-
#endif //CELIX_PUBSUB_ZMQ_ADMIN_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
index 0568834..90e9510 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.c
@@ -26,22 +26,22 @@
#if !defined(__APPLE__)
#include <sys/epoll.h>
#endif
+
#include <assert.h>
-#include <pubsub_endpoint.h>
#include <arpa/inet.h>
#include <czmq.h>
-#include <celix_log_helper.h>
+#include <uuid/uuid.h>
+
+#include "pubsub_endpoint.h"
+#include "celix_log_helper.h"
#include "pubsub_zmq_topic_receiver.h"
#include "pubsub_psa_zmq_constants.h"
-
-#include <uuid/uuid.h>
-#include <pubsub_admin_metrics.h>
-#include <pubsub_utils.h>
-#include <celix_api.h>
-#include <celix_version.h>
-
+#include "pubsub_admin_metrics.h"
+#include "pubsub_utils.h"
+#include "celix_api.h"
+#include "celix_version.h"
+#include "pubsub_serializer_handler.h"
#include "pubsub_interceptors_handler.h"
-
#include "celix_utils_api.h"
#include "pubsub_zmq_admin.h"
@@ -64,13 +64,12 @@
struct pubsub_zmq_topic_receiver {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
- const char *serializerType;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
char *scope;
char *topic;
- bool metricsEnabled;
pubsub_interceptors_handler_t *interceptorsHandler;
@@ -118,7 +117,6 @@
static void psa_zmq_initializeAllSubscribers(pubsub_zmq_topic_receiver_t *receiver);
static void psa_zmq_setupZmqContext(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
static void psa_zmq_setupZmqSocket(pubsub_zmq_topic_receiver_t *receiver, const celix_properties_t *topicProperties);
-static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor);
pubsub_zmq_topic_receiver_t* pubsub_zmqTopicReceiver_create(celix_bundle_context_t *ctx,
@@ -126,20 +124,19 @@
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char* serializerType,
+ pubsub_serializer_handler_t* serHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *protocol) {
pubsub_zmq_topic_receiver_t *receiver = calloc(1, sizeof(*receiver));
receiver->ctx = ctx;
receiver->logHelper = logHelper;
- receiver->serializerType = celix_utils_strdup(serializerType);
+ receiver->serializerHandler = serHandler;
receiver->admin = admin;
receiver->protocolSvcId = protocolSvcId;
receiver->protocol = protocol;
receiver->scope = scope == NULL ? NULL : celix_utils_strdup(scope);
receiver->topic = celix_utils_strdup(topic);
- receiver->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
pubsubInterceptorsHandler_create(ctx, scope, topic, &receiver->interceptorsHandler);
@@ -312,7 +309,6 @@
free(receiver->scope);
free(receiver->topic);
- free((void*)receiver->serializerType);
}
free(receiver);
}
@@ -325,7 +321,7 @@
}
const char* pubsub_zmqTopicReceiver_serializerType(pubsub_zmq_topic_receiver_t *receiver) { // accessor for the serialization type used by this receiver
- return receiver->serializerType;
+ return pubsub_serializerHandler_getSerializationType(receiver->serializerHandler); // type is queried from the receiver's serializer handler instead of a string stored on the receiver
}
long pubsub_zmqTopicReceiver_protocolSvcId(pubsub_zmq_topic_receiver_t *receiver) {
@@ -444,70 +440,57 @@
}
static inline void processMsgForSubscriberEntry(pubsub_zmq_topic_receiver_t *receiver, psa_zmq_subscriber_entry_t* entry, pubsub_protocol_message_t *message, struct timespec *receiveTime) { // deserializes one received protocol message and hands it to every subscriber service of the given entry
- //NOTE receiver->subscribers.mutex locked
- bool monitor = receiver->metricsEnabled;
- psa_zmq_serializer_entry_t *msgSer = pubsub_zmqAdmin_acquireSerializerForMessageId(receiver->admin, receiver->serializerType, message->header.msgId);
-
- //monitoring
- struct timespec beginSer;
- struct timespec endSer;
- int updateReceiveCount = 0;
- int updateSerError = 0;
-
- if (msgSer!= NULL) {
- void *deserializedMsg = NULL;
- const char *msgFqn = msgSer->fqn;
- celix_version_t *version = celix_version_createVersionFromString(msgSer->version);
- bool validVersion = psa_zmq_checkVersion(version, message->header.msgMajorVersion, message->header.msgMinorVersion);
- celix_version_destroy(version);
- if (validVersion) {
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &beginSer);
- }
- struct iovec deSerializeBuffer;
- deSerializeBuffer.iov_base = message->payload.payload;
- deSerializeBuffer.iov_len = message->payload.length;
- celix_status_t status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 0, &deserializedMsg);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &endSer);
- }
- if (status == CELIX_SUCCESS) {
- uint32_t msgId = message->header.msgId;
- celix_properties_t *metadata = message->metadata.metadata;
- bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata);
- bool release = true;
- if (cont) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
- while (hashMapIterator_hasNext(&iter2)) {
- pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
- svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release);
- pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata);
- if (!release && hashMapIterator_hasNext(&iter2)) {
- //receive function has taken ownership and still more receive function to come ..
- //deserialize again for new message
- status = msgSer->svc->deserialize(msgSer->svc->handle, &deSerializeBuffer, 0, &deserializedMsg);
- if (status != CELIX_SUCCESS) {
- L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
- break;
- }
- release = true;
- }
- }
- if (release) {
- msgSer->svc->freeDeserializedMsg(msgSer->svc->handle, deserializedMsg);
- }
- updateReceiveCount += 1;
- }
- } else {
- updateSerError += 1;
- L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
- }
- }
- } else {
- L_WARN("[PSA_ZMQ_TR] Cannot find serializer for type id 0x%X", message->header.msgId);
+ const char* msgFqn = pubsub_serializerHandler_getMsgFqn(receiver->serializerHandler, message->header.msgId); // fqn lookup for the received msg id; note that receiveTime is not referenced anywhere in the added body
+ if (msgFqn == NULL) {
+ L_WARN("Cannot find msg fqn for msg id %u", message->header.msgId);
+ return; // unknown msg id: the message is dropped before anything is deserialized
}
- pubsub_zmqAdmin_releaseSerializer(receiver->admin, msgSer);
+ void *deserializedMsg = NULL;
+ bool validVersion = pubsub_serializerHandler_isMessageSupported(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion); // version check is delegated to the serializer handler
+ if (validVersion) {
+ struct iovec deSerializeBuffer; // single iovec wrapping the received payload
+ deSerializeBuffer.iov_base = message->payload.payload;
+ deSerializeBuffer.iov_len = message->payload.length;
+ celix_status_t status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+ if (status == CELIX_SUCCESS) {
+ uint32_t msgId = message->header.msgId;
+ celix_properties_t *metadata = message->metadata.metadata;
+ bool cont = pubsubInterceptorHandler_invokePreReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, &metadata); // metadata is passed by address; the result decides whether subscribers are called
+ bool release = true;
+ if (cont) { // NOTE(review): when cont is false nothing in this function frees deserializedMsg; looks like a leak, confirm
+ hash_map_iterator_t iter2 = hashMapIterator_construct(entry->subscriberServices);
+ while (hashMapIterator_hasNext(&iter2)) {
+ pubsub_subscriber_t *svc = hashMapIterator_nextValue(&iter2);
+ svc->receive(svc->handle, msgFqn, message->header.msgId, deserializedMsg, metadata, &release); // callback may set release to false
+ pubsubInterceptorHandler_invokePostReceive(receiver->interceptorsHandler, msgFqn, msgId, deserializedMsg, metadata); // invoked once per subscriber service, inside the loop
+ if (!release && hashMapIterator_hasNext(&iter2)) {
+ //receive function has taken ownership and still more receive function to come ..
+ //deserialize again for new message
+ status = pubsub_serializerHandler_deserialize(receiver->serializerHandler, message->header.msgId, message->header.msgMajorVersion, message->header.msgMinorVersion, &deSerializeBuffer, 0, &deserializedMsg);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ break; // release is still false here, so the free below is skipped
+ }
+ release = true;
+ }
+ }
+ if (release) { // free only when release is still true after the loop
+ pubsub_serializerHandler_freeDeserializedMsg(receiver->serializerHandler, message->header.msgId, deserializedMsg);
+ }
+ }
+ } else {
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize msg type %s for scope/topic %s/%s", msgFqn, receiver->scope == NULL ? "(null)" : receiver->scope, receiver->topic);
+ }
+ } else {
+ L_WARN("[PSA_ZMQ_TR] Cannot deserialize message '%s' using %s, version mismatch. Version received: %i.%i.x, version local: %i.%i.x",
+ msgFqn,
+ pubsub_serializerHandler_getSerializationType(receiver->serializerHandler),
+ (int)message->header.msgMajorVersion,
+ (int)message->header.msgMinorVersion,
+ pubsub_serializerHandler_getMsgMajorVersion(receiver->serializerHandler, message->header.msgId),
+ pubsub_serializerHandler_getMsgMinorVersion(receiver->serializerHandler, message->header.msgId));
+ }
}
static inline void processMsg(pubsub_zmq_topic_receiver_t *receiver, pubsub_protocol_message_t *message, struct timespec *receiveTime) {
@@ -614,13 +597,6 @@
return NULL;
}
-pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver) {
- pubsub_admin_receiver_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : receiver->scope);
- snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", receiver->topic);
- return result;
-}
-
static void psa_zmq_connectToAllRequestedConnections(pubsub_zmq_topic_receiver_t *receiver) {
celixThreadMutex_lock(&receiver->requestedConnections.mutex);
@@ -744,24 +720,3 @@
ts->zmq_pub_cert = pub_cert;
#endif
}
-
-static bool psa_zmq_checkVersion(version_pt msgVersion, uint16_t major, uint16_t minor) {
- bool check=false;
-
- if (major == 0 && minor == 0) {
- //no check
- return true;
- }
-
- int versionMajor;
- int versionMinor;
- if (msgVersion!=NULL) {
- version_getMajor(msgVersion, &versionMajor);
- version_getMinor(msgVersion, &versionMinor);
- if (major==((unsigned char)versionMajor)) { /* Different major means incompatible */
- check = (minor>=((unsigned char)versionMinor)); /* Compatible only if the provider has a minor equals or greater (means compatible update) */
- }
- }
-
- return check;
-}
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
index 67ba6d4..f47ce37 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_receiver.h
@@ -20,9 +20,10 @@
#ifndef CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
#define CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
-#include <pubsub_admin_metrics.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_admin_metrics.h"
+#include "pubsub_message_serialization_service.h"
#include "celix_bundle_context.h"
+#include "pubsub_serializer_handler.h"
typedef struct pubsub_zmq_topic_receiver pubsub_zmq_topic_receiver_t;
@@ -31,7 +32,7 @@
const char *scope,
const char *topic,
const celix_properties_t *topicProperties,
- const char* serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *protocol);
@@ -48,7 +49,5 @@
void pubsub_zmqTopicReceiver_disconnectFrom(pubsub_zmq_topic_receiver_t *receiver, const char *url);
-pubsub_admin_receiver_metrics_t* pubsub_zmqTopicReceiver_metrics(pubsub_zmq_topic_receiver_t *receiver);
-
#endif //CELIX_PUBSUB_ZMQ_TOPIC_RECEIVER_H
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
index f969f4c..7d9e750 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.c
@@ -21,17 +21,18 @@
#include <pubsub_protocol.h>
#include <stdlib.h>
#include <memory.h>
-#include <pubsub_constants.h>
-#include <pubsub/publisher.h>
-#include <utils.h>
-#include <zconf.h>
#include <arpa/inet.h>
#include <czmq.h>
-#include <celix_log_helper.h>
+#include <uuid/uuid.h>
+
+#include "celix_utils.h"
+#include "pubsub_constants.h"
+#include "pubsub/publisher.h"
+#include "celix_log_helper.h"
#include "pubsub_zmq_topic_sender.h"
#include "pubsub_psa_zmq_constants.h"
-#include <uuid/uuid.h>
-#include <celix_version.h>
+#include "celix_version.h"
+#include "pubsub_serializer_handler.h"
#include "celix_constants.h"
#include "pubsub_interceptors_handler.h"
#include "pubsub_zmq_admin.h"
@@ -51,12 +52,11 @@
struct pubsub_zmq_topic_sender {
celix_bundle_context_t *ctx;
celix_log_helper_t *logHelper;
- const char *serializerType;
+ pubsub_serializer_handler_t* serializerHandler;
void *admin;
long protocolSvcId;
pubsub_protocol_service_t *protocol;
uuid_t fwUUID;
- bool metricsEnabled;
bool zeroCopyEnabled;
pubsub_interceptors_handler_t *interceptorsHandler;
@@ -66,6 +66,18 @@
char *url;
bool isStatic;
+ long seqNr; //atomic
+
+ struct {
+ bool dataLock; //atomic, protects below and protect zmq internal data
+ void *headerBuffer;
+ size_t headerBufferSize;
+ void *metadataBuffer;
+ size_t metadataBufferSize;
+ void *footerBuffer;
+ size_t footerBufferSize;
+ } zmqBuffers;
+
struct {
zsock_t *socket;
zcert_t *cert;
@@ -82,42 +94,16 @@
} boundedServices;
};
-typedef struct psa_zmq_send_msg_entry {
- uint32_t type; //msg type id (hash of fqn)
- const char *fqn;
- uint8_t major;
- uint8_t minor;
- unsigned char originUUID[16];
- pubsub_protocol_service_t *protSer;
- unsigned int seqNr;
- void *headerBuffer;
- size_t headerBufferSize;
- void *metadataBuffer;
- size_t metadataBufferSize;
- void *footerBuffer;
- size_t footerBufferSize;
- bool dataLocked; // protected ZMQ functions and seqNr
- struct {
- celix_thread_mutex_t mutex; //protects entries in struct
- unsigned long nrOfMessagesSend;
- unsigned long nrOfMessagesSendFailed;
- unsigned long nrOfSerializationErrors;
- struct timespec lastMessageSend;
- double averageTimeBetweenMessagesInSeconds;
- double averageSerializationTimeInSeconds;
- } metrics;
-} psa_zmq_send_msg_entry_t;
-
typedef struct psa_zmq_bounded_service_entry { // one entry per bndId key in the sender's boundedServices map
pubsub_zmq_topic_sender_t *parent; // topic sender this entry belongs to; the publisher callbacks reach sender state through it
pubsub_publisher_t service; // publisher service struct whose handle is set to this entry
long bndId; // key under which this entry is stored in boundedServices.map (presumably the requesting bundle id; confirm)
- hash_map_t *msgEntries; //key = msg type id, value = psa_zmq_send_msg_entry_t
int getCount; // initialised to 1 when the entry is set up; the entry is freed once it is 0
} psa_zmq_bounded_service_entry_t;
typedef struct psa_zmq_zerocopy_free_entry { // hint object consumed (and freed) by psa_zmq_freeMsg
- psa_zmq_serializer_entry_t *msgSer;
+ uint32_t msgId; // msg id handed to pubsub_serializerHandler_freeSerializedMsg
+ pubsub_serializer_handler_t *serHandler; // handler used to free serializedOutput
struct iovec *serializedOutput; // serialized output to be released
size_t serializedOutputLen; // passed along with serializedOutput (presumably the number of iovec entries; confirm)
} psa_zmq_zerocopy_free_entry;
@@ -135,7 +121,7 @@
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char* serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *prot,
@@ -146,7 +132,7 @@
pubsub_zmq_topic_sender_t *sender = calloc(1, sizeof(*sender));
sender->ctx = ctx;
sender->logHelper = logHelper;
- sender->serializerType = celix_utils_strdup(serializerType);
+ sender->serializerHandler = serializerHandler;
sender->admin = admin;
sender->protocolSvcId = protocolSvcId;
sender->protocol = prot;
@@ -154,7 +140,6 @@
if (uuid != NULL) {
uuid_parse(uuid, sender->fwUUID);
}
- sender->metricsEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_METRICS_ENABLED, PSA_ZMQ_DEFAULT_METRICS_ENABLED);
sender->zeroCopyEnabled = celix_bundleContext_getPropertyAsBool(ctx, PSA_ZMQ_ZEROCOPY_ENABLED, PSA_ZMQ_DEFAULT_ZEROCOPY_ENABLED);
pubsubInterceptorsHandler_create(ctx, scope, topic, &sender->interceptorsHandler);
@@ -305,20 +290,6 @@
return sender;
}
-static void pubsub_zmqTopicSender_destroyEntry(psa_zmq_send_msg_entry_t *msgEntry) {
- celixThreadMutex_destroy(&msgEntry->metrics.mutex);
- if(msgEntry->headerBuffer != NULL) {
- free(msgEntry->headerBuffer);
- }
- if(msgEntry->metadataBuffer != NULL) {
- free(msgEntry->metadataBuffer);
- }
- if(msgEntry->footerBuffer != NULL) {
- free(msgEntry->footerBuffer);
- }
- free(msgEntry);
-}
-
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender) {
if (sender != NULL) {
celix_bundleContext_unregisterService(sender->ctx, sender->publisher.svcId);
@@ -326,21 +297,7 @@
zsock_destroy(&sender->zmq.socket);
celixThreadMutex_lock(&sender->boundedServices.mutex);
- hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- if (entry != NULL) {
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_zmq_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter2);
- pubsub_zmqTopicSender_destroyEntry(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
-
- free(entry);
- }
- }
- hashMap_destroy(sender->boundedServices.map, false, false);
+ hashMap_destroy(sender->boundedServices.map, false, true);
celixThreadMutex_unlock(&sender->boundedServices.mutex);
celixThreadMutex_destroy(&sender->boundedServices.mutex);
@@ -352,13 +309,15 @@
}
free(sender->topic);
free(sender->url);
- free((void*)sender->serializerType);
+ free(sender->zmqBuffers.headerBuffer);
+ free(sender->zmqBuffers.metadataBuffer);
+ free(sender->zmqBuffers.footerBuffer);
free(sender);
}
}
const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender) { // accessor for the serialization type used by this sender
- return sender->serializerType;
+ return pubsub_serializerHandler_getSerializationType(sender->serializerHandler); // type is queried from the sender's serializer handler instead of a string stored on the sender
}
long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender) {
@@ -383,10 +342,7 @@
static int psa_zmq_localMsgTypeIdForMsgType(void* handle, const char* msgType, unsigned int* msgTypeId) { // publisher callback: resolves a message type name to its msg id
psa_zmq_bounded_service_entry_t *entry = (psa_zmq_bounded_service_entry_t *) handle; // handle is the bounded service entry
- int64_t rc = pubsub_zmqAdmin_getMessageIdForMessageFqn(entry->parent->admin, entry->parent->serializerType, msgType);
- if(rc >= 0) {
- *msgTypeId = (unsigned int)rc;
- }
+ *msgTypeId = pubsub_serializerHandler_getMsgId(entry->parent->serializerHandler, msgType); // written unconditionally (the removed code only wrote it for rc >= 0); NOTE(review): result for an unknown type is not visible here, confirm
return 0; // always returns 0
}
@@ -403,7 +359,6 @@
entry->getCount = 1;
entry->parent = sender;
entry->bndId = bndId;
- entry->msgEntries = hashMap_create(NULL, NULL, NULL, NULL);
entry->service.handle = entry;
entry->service.localMsgTypeIdForMsgType = psa_zmq_localMsgTypeIdForMsgType;
entry->service.send = psa_zmq_topicPublicationSend;
@@ -426,297 +381,175 @@
if (entry != NULL && entry->getCount == 0) {
//free entry
hashMap_remove(sender->boundedServices.map, (void*)bndId);
-
- hash_map_iterator_t iter = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_send_msg_entry_t *msgEntry = hashMapIterator_nextValue(&iter);
- pubsub_zmqTopicSender_destroyEntry(msgEntry);
- }
- hashMap_destroy(entry->msgEntries, false, false);
free(entry);
}
celixThreadMutex_unlock(&sender->boundedServices.mutex);
}
-pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender) {
- pubsub_admin_sender_metrics_t *result = calloc(1, sizeof(*result));
- snprintf(result->scope, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->scope == NULL ? PUBSUB_DEFAULT_ENDPOINT_SCOPE : sender->scope);
- snprintf(result->topic, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", sender->topic);
- celixThreadMutex_lock(&sender->boundedServices.mutex);
- size_t count = 0;
- hash_map_iterator_t iter = hashMapIterator_construct(sender->boundedServices.map);
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- hashMapIterator_nextValue(&iter2);
- count += 1;
- }
- }
-
- result->msgMetrics = calloc(count, sizeof(*result));
-
- iter = hashMapIterator_construct(sender->boundedServices.map);
- int i = 0;
- while (hashMapIterator_hasNext(&iter)) {
- psa_zmq_bounded_service_entry_t *entry = hashMapIterator_nextValue(&iter);
- hash_map_iterator_t iter2 = hashMapIterator_construct(entry->msgEntries);
- while (hashMapIterator_hasNext(&iter2)) {
- psa_zmq_send_msg_entry_t *mEntry = hashMapIterator_nextValue(&iter2);
- celixThreadMutex_lock(&mEntry->metrics.mutex);
- result->msgMetrics[i].nrOfMessagesSend = mEntry->metrics.nrOfMessagesSend;
- result->msgMetrics[i].nrOfMessagesSendFailed = mEntry->metrics.nrOfMessagesSendFailed;
- result->msgMetrics[i].nrOfSerializationErrors = mEntry->metrics.nrOfSerializationErrors;
- result->msgMetrics[i].averageSerializationTimeInSeconds = mEntry->metrics.averageSerializationTimeInSeconds;
- result->msgMetrics[i].averageTimeBetweenMessagesInSeconds = mEntry->metrics.averageTimeBetweenMessagesInSeconds;
- result->msgMetrics[i].lastMessageSend = mEntry->metrics.lastMessageSend;
- result->msgMetrics[i].bndId = entry->bndId;
- result->msgMetrics[i].typeId = mEntry->type;
- snprintf(result->msgMetrics[i].typeFqn, PUBSUB_AMDIN_METRICS_NAME_MAX, "%s", mEntry->fqn);
- i += 1;
- celixThreadMutex_unlock(&mEntry->metrics.mutex);
- }
- }
-
- celixThreadMutex_unlock(&sender->boundedServices.mutex);
- result->nrOfmsgMetrics = (int)count;
- return result;
-}
-
static void psa_zmq_freeMsg(void *msg, void *hint) { // NOTE(review): presumably the zmq zero-copy free callback; msg itself is not referenced here, confirm against the caller
psa_zmq_zerocopy_free_entry *entry = hint; // hint carries the psa_zmq_zerocopy_free_entry describing what to free
- entry->msgSer->svc->freeSerializedMsg(entry->msgSer->svc->handle, entry->serializedOutput, entry->serializedOutputLen);
+ pubsub_serializerHandler_freeSerializedMsg(entry->serHandler, entry->msgId, entry->serializedOutput, entry->serializedOutputLen); // serialized output is released through the serializer handler
free(entry); // the hint entry itself is released as well
}
-static void psa_zmq_unlockData(void *unused __attribute__((unused)), void *hint) {
- psa_zmq_send_msg_entry_t *entry = hint;
- __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
-}
-
static int psa_zmq_topicPublicationSend(void* handle, unsigned int msgTypeId, const void *inMsg, celix_properties_t *metadata) {
- int status = CELIX_SUCCESS;
psa_zmq_bounded_service_entry_t *bound = handle;
pubsub_zmq_topic_sender_t *sender = bound->parent;
- bool monitor = sender->metricsEnabled;
- psa_zmq_serializer_entry_t *serializer = pubsub_zmqAdmin_acquireSerializerForMessageId(sender->admin, sender->serializerType, msgTypeId);
-
- if(serializer == NULL) {
- pubsub_zmqAdmin_releaseSerializer(sender->admin, serializer);
- L_WARN("[PSA_ZMQ_TS] Error cannot serialize message with serType %s msg type id %i for scope/topic %s/%s", sender->serializerType, msgTypeId, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
- return CELIX_SERVICE_EXCEPTION;
+ const char* msgFqn;
+ int majorVersion;
+ int minorversion;
+ celix_status_t status = pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, &msgFqn, &majorVersion, &minorversion);
+ if (status != CELIX_SUCCESS) {
+ L_WARN("Cannot find serializer for msg id %u for serializer %s", msgTypeId,
+ pubsub_serializerHandler_getSerializationType(sender->serializerHandler));
+ return status;
}
- psa_zmq_send_msg_entry_t *entry = hashMap_get(bound->msgEntries, (void*)(uintptr_t)(msgTypeId));
+ bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, &metadata);
+ if (!cont) {
+ L_DEBUG("Cancel send based on pubsub interceptor cancel return");
+ return status;
+ }
- //metrics updates
- struct timespec sendTime = { 0, 0 };
- struct timespec serializationStart;
- struct timespec serializationEnd;
- //int unknownMessageCountUpdate = 0;
- int sendErrorUpdate = 0;
- int serializationErrorUpdate = 0;
- int sendCountUpdate = 0;
+ size_t serializedIoVecOutputLen = 0; //entry->serializedIoVecOutputLen;
+ struct iovec *serializedIoVecOutput = NULL;
+ status = pubsub_serializerHandler_serialize(sender->serializerHandler, msgTypeId, inMsg, &serializedIoVecOutput, &serializedIoVecOutputLen);
- if(entry == NULL) {
- entry = calloc(1, sizeof(psa_zmq_send_msg_entry_t));
- entry->protSer = sender->protocol;
- entry->type = msgTypeId;
- entry->fqn = serializer->fqn;
- celix_version_t* version = celix_version_createVersionFromString(serializer->version);
- entry->major = (uint8_t)celix_version_getMajor(version);
- entry->minor = (uint8_t)celix_version_getMinor(version);
- celix_version_destroy(version);
- uuid_copy(entry->originUUID, sender->fwUUID);
- celixThreadMutex_create(&entry->metrics.mutex, NULL);
- hashMap_put(bound->msgEntries, (void*)(uintptr_t)msgTypeId, entry);
+ if (status != CELIX_SUCCESS /*serialization not ok*/) {
+ L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", msgFqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ return status;
}
delay_first_send_for_late_joiners(sender);
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &serializationStart);
- }
- size_t serializedOutputLen = 0;
- struct iovec *serializedOutput = NULL;
- status = serializer->svc->serialize(serializer->svc->handle, inMsg, &serializedOutput, &serializedOutputLen);
-
- if (monitor) {
- clock_gettime(CLOCK_REALTIME, &serializationEnd);
+ // Some ZMQ functions are not thread-safe, but this atomic compare exchange ensures one access at a time.
+ // Also protects sender->zmqBuffers (header, meta and footer)
+ bool expected = false;
+ while(!__atomic_compare_exchange_n(&sender->zmqBuffers.dataLock, &expected, true, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
+ expected = false;
+ usleep(5);
}
- if (status == CELIX_SUCCESS /*ser ok*/) {
- // Some ZMQ functions are not thread-safe, but this atomic compare exchange ensures one access at a time.
- bool expected = false;
- while(!__atomic_compare_exchange_n(&entry->dataLocked, &expected, true, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)) {
- expected = false;
- usleep(500);
- }
+ pubsub_protocol_message_t message;
+ message.payload.payload = serializedIoVecOutput->iov_base;
+ message.payload.length = serializedIoVecOutput->iov_len;
- bool cont = pubsubInterceptorHandler_invokePreSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, &metadata);
- if (cont) {
+ void *payloadData = NULL;
+ size_t payloadLength = 0;
+ sender->protocol->encodePayload(sender->protocol->handle, &message, &payloadData, &payloadLength);
- pubsub_protocol_message_t message;
- message.payload.payload = serializedOutput->iov_base;
- message.payload.length = serializedOutput->iov_len;
-
- void *payloadData = NULL;
- size_t payloadLength = 0;
- entry->protSer->encodePayload(entry->protSer->handle, &message, &payloadData, &payloadLength);
-
- if (metadata != NULL) {
- message.metadata.metadata = metadata;
- entry->protSer->encodeMetadata(entry->protSer->handle, &message, &entry->metadataBuffer, &entry->metadataBufferSize);
- } else {
- message.metadata.metadata = NULL;
- }
-
- entry->protSer->encodeFooter(entry->protSer->handle, &message, &entry->footerBuffer, &entry->footerBufferSize);
-
- message.header.msgId = msgTypeId;
- message.header.seqNr = entry->seqNr;
- message.header.msgMajorVersion = 0;
- message.header.msgMinorVersion = 0;
- message.header.payloadSize = payloadLength;
- message.header.metadataSize = entry->metadataBufferSize;
- message.header.payloadPartSize = payloadLength;
- message.header.payloadOffset = 0;
- message.header.isLastSegment = 1;
- message.header.convertEndianess = 0;
-
- // increase seqNr
- entry->seqNr++;
-
- entry->protSer->encodeHeader(entry->protSer->handle, &message, &entry->headerBuffer, &entry->headerBufferSize);
-
- errno = 0;
- bool sendOk;
-
- if (bound->parent->zeroCopyEnabled) {
-
- zmq_msg_t msg1; // Header
- zmq_msg_t msg2; // Payload
- zmq_msg_t msg3; // Metadata
- zmq_msg_t msg4; // Footer
- void *socket = zsock_resolve(sender->zmq.socket);
- psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry));
- freeMsgEntry->msgSer = serializer;
- freeMsgEntry->serializedOutput = serializedOutput;
- freeMsgEntry->serializedOutputLen = serializedOutputLen;
-
- zmq_msg_init_data(&msg1, entry->headerBuffer, entry->headerBufferSize, psa_zmq_unlockData, entry);
- //send header
- int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
- if (rc == -1) {
- L_WARN("Error sending header msg. %s", strerror(errno));
- zmq_msg_close(&msg1);
- }
-
- //send Payload
- if (rc > 0) {
- int flag = ((entry->metadataBufferSize > 0) || (entry->footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
- zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
- rc = zmq_msg_send(&msg2, socket, flag);
- if (rc == -1) {
- L_WARN("Error sending payload msg. %s", strerror(errno));
- zmq_msg_close(&msg2);
- }
- }
-
- //send MetaData
- if (rc > 0 && entry->metadataBufferSize > 0) {
- int flag = (entry->footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
- zmq_msg_init_data(&msg3, entry->metadataBuffer, entry->metadataBufferSize, NULL, NULL);
- rc = zmq_msg_send(&msg3, socket, flag);
- if (rc == -1) {
- L_WARN("Error sending metadata msg. %s", strerror(errno));
- zmq_msg_close(&msg3);
- }
- }
-
- //send Footer
- if (rc > 0 && entry->footerBufferSize > 0) {
- zmq_msg_init_data(&msg4, entry->footerBuffer, entry->footerBufferSize, NULL, NULL);
- rc = zmq_msg_send(&msg4, socket, 0);
- if (rc == -1) {
- L_WARN("Error sending footer msg. %s", strerror(errno));
- zmq_msg_close(&msg4);
- }
- }
-
- sendOk = rc > 0;
- } else {
- //no zero copy
- zmsg_t *msg = zmsg_new();
- zmsg_addmem(msg, entry->headerBuffer, entry->headerBufferSize);
- zmsg_addmem(msg, payloadData, payloadLength);
- if (entry->metadataBufferSize > 0) {
- zmsg_addmem(msg, entry->metadataBuffer, entry->metadataBufferSize);
- }
- if (entry->footerBufferSize > 0) {
- zmsg_addmem(msg, entry->footerBuffer, entry->footerBufferSize);
- }
- int rc = zmsg_send(&msg, sender->zmq.socket);
- sendOk = rc == 0;
-
- if (!sendOk) {
- zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
- }
-
- // Note: serialized Payload is deleted by serializer
- if (payloadData && (payloadData != message.payload.payload)) {
- free(payloadData);
- }
-
- __atomic_store_n(&entry->dataLocked, false, __ATOMIC_RELEASE);
- }
- pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, serializer->fqn, msgTypeId, inMsg, metadata);
-
- if (message.metadata.metadata) {
- celix_properties_destroy(message.metadata.metadata);
- }
- if (!bound->parent->zeroCopyEnabled && serializedOutput) {
- serializer->svc->freeSerializedMsg(serializer->svc->handle, serializedOutput, serializedOutputLen);
- }
-
- if (sendOk) {
- sendCountUpdate = 1;
- } else {
- sendErrorUpdate = 1;
- L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
- }
- } else {
- L_WARN("no continue");
- }
+ if (metadata != NULL) {
+ message.metadata.metadata = metadata;
+ sender->protocol->encodeMetadata(sender->protocol->handle, &message, &sender->zmqBuffers.metadataBuffer, &sender->zmqBuffers.metadataBufferSize);
} else {
- serializationErrorUpdate = 1;
- L_WARN("[PSA_ZMQ_TS] Error serialize message of type %s for scope/topic %s/%s", serializer->fqn, sender->scope == NULL ? "(null)" : sender->scope, sender->topic);
+ message.metadata.metadata = NULL;
}
- pubsub_zmqAdmin_releaseSerializer(sender->admin, serializer);
+ sender->protocol->encodeFooter(sender->protocol->handle, &message, &sender->zmqBuffers.footerBuffer, &sender->zmqBuffers.footerBufferSize);
- if (monitor && entry != NULL) {
- celixThreadMutex_lock(&entry->metrics.mutex);
+ message.header.msgId = msgTypeId;
+ message.header.seqNr = __atomic_fetch_add(&sender->seqNr, 1, __ATOMIC_RELAXED);
+ message.header.msgMajorVersion = majorVersion;
+ message.header.msgMinorVersion = minorversion;
+ message.header.payloadSize = payloadLength;
+ message.header.metadataSize = sender->zmqBuffers.metadataBufferSize;
+ message.header.payloadPartSize = payloadLength;
+ message.header.payloadOffset = 0;
+ message.header.isLastSegment = 1;
+ message.header.convertEndianess = 0;
- long n = entry->metrics.nrOfMessagesSend + entry->metrics.nrOfMessagesSendFailed;
- double diff = celix_difftime(&serializationStart, &serializationEnd);
- double average = (entry->metrics.averageSerializationTimeInSeconds * n + diff) / (n+1);
- entry->metrics.averageSerializationTimeInSeconds = average;
+ sender->protocol->encodeHeader(sender->protocol->handle, &message, &sender->zmqBuffers.headerBuffer, &sender->zmqBuffers.headerBufferSize);
- if (entry->metrics.nrOfMessagesSend > 2) {
- diff = celix_difftime(&entry->metrics.lastMessageSend, &sendTime);
- n = entry->metrics.nrOfMessagesSend;
- average = (entry->metrics.averageTimeBetweenMessagesInSeconds * n + diff) / (n+1);
- entry->metrics.averageTimeBetweenMessagesInSeconds = average;
+ errno = 0;
+ bool sendOk;
+ if (bound->parent->zeroCopyEnabled) {
+ zmq_msg_t msg1; // Header
+ zmq_msg_t msg2; // Payload
+ zmq_msg_t msg3; // Metadata
+ zmq_msg_t msg4; // Footer
+ void *socket = zsock_resolve(sender->zmq.socket);
+ psa_zmq_zerocopy_free_entry *freeMsgEntry = malloc(sizeof(psa_zmq_zerocopy_free_entry)); //NOTE should be improved. Not really zero copy
+ freeMsgEntry->serHandler = sender->serializerHandler;
+ freeMsgEntry->msgId = msgTypeId;
+ freeMsgEntry->serializedOutput = serializedIoVecOutput;
+ freeMsgEntry->serializedOutputLen = serializedIoVecOutputLen;
+
+ zmq_msg_init_data(&msg1, sender->zmqBuffers.headerBuffer, sender->zmqBuffers.headerBufferSize, NULL, NULL);
+ //send header
+ int rc = zmq_msg_send(&msg1, socket, ZMQ_SNDMORE);
+ if (rc == -1) {
+ L_WARN("Error sending header msg. %s", strerror(errno));
+ zmq_msg_close(&msg1);
}
- entry->metrics.lastMessageSend = sendTime;
- entry->metrics.nrOfMessagesSend += sendCountUpdate;
- entry->metrics.nrOfMessagesSendFailed += sendErrorUpdate;
- entry->metrics.nrOfSerializationErrors += serializationErrorUpdate;
+ //send Payload
+ if (rc > 0) {
+ int flag = ((sender->zmqBuffers.metadataBufferSize > 0) || (sender->zmqBuffers.footerBufferSize > 0)) ? ZMQ_SNDMORE : 0;
+ zmq_msg_init_data(&msg2, payloadData, payloadLength, psa_zmq_freeMsg, freeMsgEntry);
+ rc = zmq_msg_send(&msg2, socket, flag);
+ if (rc == -1) {
+ L_WARN("Error sending payload msg. %s", strerror(errno));
+ zmq_msg_close(&msg2);
+ }
+ }
- celixThreadMutex_unlock(&entry->metrics.mutex);
+ //send MetaData
+ if (rc > 0 && sender->zmqBuffers.metadataBufferSize > 0) {
+ int flag = (sender->zmqBuffers.footerBufferSize > 0 ) ? ZMQ_SNDMORE : 0;
+ zmq_msg_init_data(&msg3, sender->zmqBuffers.metadataBuffer, sender->zmqBuffers.metadataBufferSize, NULL, NULL);
+ rc = zmq_msg_send(&msg3, socket, flag);
+ if (rc == -1) {
+ L_WARN("Error sending metadata msg. %s", strerror(errno));
+ zmq_msg_close(&msg3);
+ }
+ }
+
+ //send Footer
+ if (rc > 0 && sender->zmqBuffers.footerBufferSize > 0) {
+ zmq_msg_init_data(&msg4, sender->zmqBuffers.footerBuffer, sender->zmqBuffers.footerBufferSize, NULL, NULL);
+ rc = zmq_msg_send(&msg4, socket, 0);
+ if (rc == -1) {
+ L_WARN("Error sending footer msg. %s", strerror(errno));
+ zmq_msg_close(&msg4);
+ }
+ }
+ sendOk = rc > 0;
+ } else {
+ //no zero copy
+ zmsg_t *msg = zmsg_new();
+ zmsg_addmem(msg, sender->zmqBuffers.headerBuffer, sender->zmqBuffers.headerBufferSize);
+ zmsg_addmem(msg, payloadData, payloadLength);
+ if (sender->zmqBuffers.metadataBufferSize > 0) {
+ zmsg_addmem(msg, sender->zmqBuffers.metadataBuffer, sender->zmqBuffers.metadataBufferSize);
+ }
+ if (sender->zmqBuffers.footerBufferSize > 0) {
+ zmsg_addmem(msg, sender->zmqBuffers.footerBuffer, sender->zmqBuffers.footerBufferSize);
+ }
+ int rc = zmsg_send(&msg, sender->zmq.socket);
+ sendOk = rc == 0;
+
+ if (!sendOk) {
+ zmsg_destroy(&msg); //if send was not ok, no owner change -> destroy msg
+ }
+
+ // Note: serialized Payload is deleted by serializer
+ if (payloadData && (payloadData != message.payload.payload)) {
+ free(payloadData);
+ }
+ }
+ __atomic_store_n(&sender->zmqBuffers.dataLock, false, __ATOMIC_RELEASE);
+ pubsubInterceptorHandler_invokePostSend(sender->interceptorsHandler, msgFqn, msgTypeId, inMsg, metadata);
+
+ if (message.metadata.metadata) {
+ celix_properties_destroy(message.metadata.metadata);
+ }
+ if (!bound->parent->zeroCopyEnabled && serializedIoVecOutput) {
+ pubsub_serializerHandler_freeSerializedMsg(sender->serializerHandler, msgTypeId, serializedIoVecOutput, serializedIoVecOutputLen);
+ }
+
+ if (!sendOk) {
+ L_WARN("[PSA_ZMQ_TS] Error sending zmg. %s", strerror(errno));
}
return status;
diff --git a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
index 744f653..bb49a2a 100644
--- a/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
+++ b/bundles/pubsub/pubsub_admin_zmq/v2/src/pubsub_zmq_topic_sender.h
@@ -20,7 +20,7 @@
#ifndef CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
#define CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_serializer_handler.h"
#include "celix_bundle_context.h"
#include "pubsub_admin_metrics.h"
#include "celix_log_helper.h"
@@ -32,7 +32,7 @@
celix_log_helper_t *logHelper,
const char *scope,
const char *topic,
- const char* serializerType,
+ pubsub_serializer_handler_t* serializerHandler,
void *admin,
long protocolSvcId,
pubsub_protocol_service_t *prot,
@@ -40,6 +40,7 @@
const char *staticBindUrl,
unsigned int basePort,
unsigned int maxPort);
+
void pubsub_zmqTopicSender_destroy(pubsub_zmq_topic_sender_t *sender);
const char* pubsub_zmqTopicSender_scope(pubsub_zmq_topic_sender_t *sender);
@@ -50,9 +51,5 @@
const char* pubsub_zmqTopicSender_serializerType(pubsub_zmq_topic_sender_t *sender);
long pubsub_zmqTopicSender_protocolSvcId(pubsub_zmq_topic_sender_t *sender);
-/**
- * Returns a array of pubsub_admin_sender_msg_type_metrics_t entries for every msg_type/bundle send with the topic sender.
- */
-pubsub_admin_sender_metrics_t* pubsub_zmqTopicSender_metrics(pubsub_zmq_topic_sender_t *sender);
#endif //CELIX_PUBSUB_ZMQ_TOPIC_SENDER_H
diff --git a/bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt
index 78ae8e1..59e4ca5 100644
--- a/bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_serializer_avrobin/gtest/CMakeLists.txt
@@ -24,7 +24,7 @@
add_executable(test_pubsub_serializer_avrobin
src/PubSubAvrobinSerializationProviderTestSuite.cc
)
-target_link_libraries(test_pubsub_serializer_avrobin PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main)
+target_link_libraries(test_pubsub_serializer_avrobin PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main Celix::pubsub_spi)
target_compile_options(test_pubsub_serializer_avrobin PRIVATE -std=c++14) #Note test code is allowed to be C++14
add_dependencies(test_pubsub_serializer_avrobin celix_pubsub_serializer_avrobin_bundle pubsub_avrobin_serialization_descriptor_bundle)
diff --git a/bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c b/bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
index 9c81e05..2008613 100644
--- a/bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
+++ b/bundles/pubsub/pubsub_serializer_avrobin/src/pubsub_avrobin_serialization_provider.c
@@ -110,7 +110,7 @@
}
pubsub_serialization_provider_t* pubsub_avrobinSerializationProvider_create(celix_bundle_context_t* ctx) {
- pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "avrobin", 0, pubsub_avrobinSerializationProvider_serialize, pubsub_avrobinSerializationProvider_freeSerializeMsg, pubsub_avrobinSerializationProvider_deserialize, pubsub_avrobinSerializationProvider_freeDeserializeMsg);
+ pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "avrobin", false, 0, pubsub_avrobinSerializationProvider_serialize, pubsub_avrobinSerializationProvider_freeSerializeMsg, pubsub_avrobinSerializationProvider_deserialize, pubsub_avrobinSerializationProvider_freeDeserializeMsg);
avrobinSerializer_logSetup(dfi_log, pubsub_serializationProvider_getLogHelper(provider), 1);
return provider;
}
diff --git a/bundles/pubsub/pubsub_serializer_json/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_serializer_json/gtest/CMakeLists.txt
index a065ebe..9a07c94 100644
--- a/bundles/pubsub/pubsub_serializer_json/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_serializer_json/gtest/CMakeLists.txt
@@ -24,7 +24,7 @@
add_executable(test_pubsub_serializer_json
src/PubSubJsonSerializationProviderTestSuite.cc
)
-target_link_libraries(test_pubsub_serializer_json PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main)
+target_link_libraries(test_pubsub_serializer_json PRIVATE Celix::framework Celix::dfi Celix::pubsub_utils GTest::gtest GTest::gtest_main Celix::pubsub_spi)
target_compile_options(test_pubsub_serializer_json PRIVATE -std=c++14) #Note test code is allowed to be C++14
add_dependencies(test_pubsub_serializer_json celix_pubsub_serializer_json_bundle pubsub_json_serialization_descriptor_bundle)
diff --git a/bundles/pubsub/pubsub_serializer_json/src/pubsub_json_serialization_provider.c b/bundles/pubsub/pubsub_serializer_json/src/pubsub_json_serialization_provider.c
index 605d108..a43f6b5 100644
--- a/bundles/pubsub/pubsub_serializer_json/src/pubsub_json_serialization_provider.c
+++ b/bundles/pubsub/pubsub_serializer_json/src/pubsub_json_serialization_provider.c
@@ -106,7 +106,7 @@
}
pubsub_serialization_provider_t* pubsub_jsonSerializationProvider_create(celix_bundle_context_t* ctx) {
- pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "json", 0, pubsub_jsonSerializationProvider_serialize, pubsub_jsonSerializationProvider_freeSerializeMsg, pubsub_jsonSerializationProvider_deserialize, pubsub_jsonSerializationProvider_freeDeserializeMsg);
+ pubsub_serialization_provider_t* provider = pubsub_serializationProvider_create(ctx, "json", true, 0, pubsub_jsonSerializationProvider_serialize, pubsub_jsonSerializationProvider_freeSerializeMsg, pubsub_jsonSerializationProvider_deserialize, pubsub_jsonSerializationProvider_freeDeserializeMsg);
jsonSerializer_logSetup(dfi_log, pubsub_serializationProvider_getLogHelper(provider), 1);;
return provider;
}
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
new file mode 100644
index 0000000..a4ff06b
--- /dev/null
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_marker.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_
+
+#include "hash_map.h"
+#include "version.h"
+#include "celix_bundle.h"
+#include "sys/uio.h"
+
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME "pubsub_message_serialization_marker"
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION "1.0.0"
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_RANGE "[1,2)"
+
+/**
+ * @brief Service property (named "serialization.type") identifying the serialization type (e.g. json, avrobin, etc.)
+ */
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY "serialization.type"
+
+/**
+ * @brief Service property (named "serialization.backwards.compatible") identifying whether the serialization is
+ * backwards compatible (e.g. for json an extra - new - field can be safely ignored and is thus backwards compatible).
+ *
+ * Type is boolean. If the service property is not present, false will be used.
+ */
+#define PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE "serialization.backwards.compatible"
+
+/**
+ * @brief Marker interface - interface with no methods - to indicate that a serialization.type is available.
+ *
+ * This marker interface is used to indicate that a serialization type is available without the need to rely on
+ * pubsub message serialization service per msg type.
+ * The service.ranking of this marker interface must be used to select a serialization type if no serialization type is
+ * configured. The service.ranking of individual pubsub_message_serialization_service is used to override serializers per
+ * type.
+ *
+ * The property serialization.type is mandatory.
+ */
+typedef struct pubsub_message_serialization_marker {
+ void* handle;
+} pubsub_message_serialization_marker_t;
+
+#endif /* PUBSUB_MESSAGE_SERIALIZATION_MARKER_H_ */
diff --git a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h
index 4464cb3..09df5a5 100644
--- a/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h
+++ b/bundles/pubsub/pubsub_spi/include/pubsub_message_serialization_service.h
@@ -25,15 +25,6 @@
#include "celix_bundle.h"
#include "sys/uio.h"
-/**
- * There should be a pubsub_serializer_t
- * per msg type (msg id) per bundle
- *
- * The pubsub_serializer_service can create
- * a serializer_map per bundle. Potentially using
- * the extender pattern.
- */
-
#define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME "pubsub_message_serialization_service"
#define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_VERSION "1.0.0"
#define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE "[1,2)"
@@ -44,7 +35,7 @@
#define PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY "msg.id"
/**
- * A message serialization service for a serialization type (e.g. json) and
+ * @brief A message serialization service for a serialization type (e.g. json) and
* for a specific msg type (based on the fully qualified name) and version.
*
* The properties serialization.type, msg,fqn, msg.version and msg.id are mandatory
@@ -53,7 +44,7 @@
void* handle;
/**
- * Serialize a message into iovec structs (set of structures with buffer pointer and length)
+ * @brief Serialize a message into iovec structs (set of structures with buffer pointer and length)
*
* The correct message serialization services will be selected based on the provided msgId.
*
@@ -67,12 +58,12 @@
celix_status_t (*serialize)(void* handle, const void* input, struct iovec** output, size_t* outputIovLen);
/**
- * Free the memory of for the serialized msg.
+ * @brief Free the memory of the serialized msg.
*/
void (*freeSerializedMsg)(void* handle, struct iovec* input, size_t inputIovLen);
/**
- * Deserialize a message using the provided iovec buffers.
+ * @brief Deserialize a message using the provided iovec buffers.
*
* The deserialize function will also check if the target major/minor version of the message is valid with the version
* of the serialized data.
@@ -95,7 +86,7 @@
celix_status_t (*deserialize)(void* handle, const struct iovec* input, size_t inputIovLen, void** out); //note inputLen can be 0 if predefined size is not needed
/**
- * Free the memory for the deserialized message.
+ * @brief Free the memory for the deserialized message.
*/
void (*freeDeserializedMsg)(void* handle, void* msg);
diff --git a/bundles/pubsub/pubsub_utils/CMakeLists.txt b/bundles/pubsub/pubsub_utils/CMakeLists.txt
index 8426737..0c285aa 100644
--- a/bundles/pubsub/pubsub_utils/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_utils/CMakeLists.txt
@@ -28,7 +28,8 @@
$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include>
$<INSTALL_INTERFACE:include/celix/pubsub_utils>
)
-target_link_libraries(pubsub_utils PUBLIC Celix::framework Celix::pubsub_api Celix::pubsub_spi Celix::log_helper Celix::shell_api)
+target_link_libraries(pubsub_utils PUBLIC Celix::framework Celix::pubsub_api Celix::log_helper Celix::shell_api)
+target_link_libraries(pubsub_utils PRIVATE Celix::pubsub_spi)
add_library(Celix::pubsub_utils ALIAS pubsub_utils)
diff --git a/bundles/pubsub/pubsub_utils/gtest/CMakeLists.txt b/bundles/pubsub/pubsub_utils/gtest/CMakeLists.txt
index ad7cce4..a883776 100644
--- a/bundles/pubsub/pubsub_utils/gtest/CMakeLists.txt
+++ b/bundles/pubsub/pubsub_utils/gtest/CMakeLists.txt
@@ -51,7 +51,7 @@
src/PubSubSerializationProviderTestSuite.cc
src/PubSubMatchingTestSuite.cpp
)
-target_link_libraries(test_pubsub_utils PRIVATE Celix::framework Celix::pubsub_utils GTest::gtest GTest::gtest_main)
+target_link_libraries(test_pubsub_utils PRIVATE Celix::framework Celix::pubsub_utils GTest::gtest GTest::gtest_main Celix::pubsub_spi)
target_compile_options(test_pubsub_utils PRIVATE -std=c++14) #Note test code is allowed to be C++14
add_test(NAME test_pubsub_utils COMMAND test_pubsub_utils)
setup_target_for_coverage(test_pubsub_utils SCAN_DIR ..)
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp b/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
index ff26f25..10555a3 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubMatchingTestSuite.cpp
@@ -20,16 +20,17 @@
#include "gtest/gtest.h"
#include <memory>
-
-#include <celix_api.h>
-#include "pubsub_serializer_handler.h"
-#include "dyn_message.h"
#include <cstdarg>
-#include <pubsub_protocol.h>
-#include <pubsub_constants.h>
-#include <pubsub_matching.h>
-#include <pubsub/api.h>
-#include <pubsub_endpoint.h>
+
+#include "celix_api.h"
+#include "pubsub_message_serialization_service.h"
+#include "dyn_message.h"
+#include "pubsub_protocol.h"
+#include "pubsub_constants.h"
+#include "pubsub_matching.h"
+#include "pubsub/api.h"
+#include "pubsub_endpoint.h"
+#include "pubsub_message_serialization_marker.h"
static void stdLog(void*, int level, const char *file, int line, const char *msg, ...) {
va_list ap;
@@ -54,70 +55,46 @@
bndId = celix_bundleContext_installBundle(ctx.get(), MATCHING_BUNDLE, true);
dynMessage_logSetup(stdLog, NULL, 1);
-
- msgSerSvc.handle = this;
- msgSerSvc.serialize = [](void*, const void*, struct iovec**, size_t*) -> celix_status_t {
- return CELIX_SUCCESS;
- };
- msgSerSvc.freeSerializedMsg = [](void*, struct iovec* , size_t) {
- };
- msgSerSvc.deserialize = [](void*, const struct iovec*, size_t, void**) -> celix_status_t {
- return CELIX_SUCCESS;
- };
- msgSerSvc.freeDeserializedMsg = [](void*, void*) {
- };
}
~PubSubMatchingTestSuite() override {
celix_bundleContext_uninstallBundle(ctx.get(), bndId);
}
- long registerSerSvc(const char* type, uint32_t msgId, const char* msgFqn, const char* msgVersion, long ranking) {
+ long registerMarkerSerSvc(const char* type) {
auto* p = celix_properties_create();
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, type);
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_ID_PROPERTY, std::to_string(msgId).c_str());
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_FQN_PROPERTY, msgFqn);
- celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_SERVICE_MSG_VERSION_PROPERTY, msgVersion);
- celix_properties_setLong(p, OSGI_FRAMEWORK_SERVICE_RANKING, ranking);
+ celix_properties_set(p, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, type);
celix_service_registration_options_t opts{};
- opts.svc = static_cast<void*>(&msgSerSvc);
+ opts.svc = static_cast<void*>(&serMarkerSvc);
opts.properties = p;
- opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_VERSION;
+ opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+ opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION;
return celix_bundleContext_registerServiceWithOptions(ctx.get(), &opts);
}
std::shared_ptr<celix_framework_t> fw{};
std::shared_ptr<celix_bundle_context_t> ctx{};
- pubsub_message_serialization_service_t msgSerSvc{};
+ pubsub_message_serialization_marker_t serMarkerSvc{};
pubsub_protocol_service_t protocolSvc{};
long bndId{};
};
TEST_F(PubSubMatchingTestSuite, MatchPublisherSimple) {
- auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
-
+ auto serMarkerId = registerMarkerSerSvc("fiets");
long foundSvcId = -1;
-
pubsub_utils_matchPublisher(ctx.get(), bndId, "(&(objectClass=pubsub.publisher)(service.lang=C)(topic=fiets))", "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
- EXPECT_EQ(foundSvcId, serId);
-
- celix_bundleContext_unregisterService(ctx.get(), serId);
+ EXPECT_EQ(foundSvcId, serMarkerId);
+ celix_bundleContext_unregisterService(ctx.get(), serMarkerId);
}
TEST_F(PubSubMatchingTestSuite, MatchPublisherMultiple) {
- auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
- auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
- auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
- auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
-
+ auto serFietsId = registerMarkerSerSvc("fiets");
+ auto serFiets2Id = registerMarkerSerSvc("fiets");
+ auto serAutoId = registerMarkerSerSvc("auto");
+ auto serBelId = registerMarkerSerSvc("bel");
long foundSvcId = -1;
-
pubsub_utils_matchPublisher(ctx.get(), bndId, "(&(objectClass=pubsub.publisher)(service.lang=C)(topic=fiets))", "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
- EXPECT_EQ(foundSvcId, serFiets2Id);
-
+ EXPECT_EQ(foundSvcId, serFietsId); //older services are ranked higher
celix_bundleContext_unregisterService(ctx.get(), serFietsId);
celix_bundleContext_unregisterService(ctx.get(), serFiets2Id);
celix_bundleContext_unregisterService(ctx.get(), serAutoId);
@@ -125,26 +102,23 @@
}
TEST_F(PubSubMatchingTestSuite, MatchSubscriberSimple) {
- auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
+ auto serId = registerMarkerSerSvc("fiets");
long foundSvcId = -1;
auto* p = celix_properties_create();
celix_properties_set(p, PUBSUB_SUBSCRIBER_SCOPE, "scope");
celix_properties_set(p, PUBSUB_SUBSCRIBER_TOPIC, "fiets");
-
pubsub_utils_matchSubscriber(ctx.get(), bndId, p, "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
-
EXPECT_EQ(foundSvcId, serId);
-
celix_properties_destroy(p);
celix_bundleContext_unregisterService(ctx.get(), serId);
}
TEST_F(PubSubMatchingTestSuite, MatchSubscriberMultiple) {
- auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
- auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
- auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
- auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
+ auto serFietsId = registerMarkerSerSvc("fiets");
+ auto serFiets2Id = registerMarkerSerSvc("fiets");
+ auto serAutoId = registerMarkerSerSvc("auto");
+ auto serBelId = registerMarkerSerSvc("bel");
long foundSvcId = -1;
@@ -154,7 +128,7 @@
pubsub_utils_matchSubscriber(ctx.get(), bndId, p, "admin?", 0, 0, 0, false, NULL, &foundSvcId, NULL);
- EXPECT_EQ(foundSvcId, serFiets2Id);
+ EXPECT_EQ(foundSvcId, serFietsId);
celix_properties_destroy(p);
celix_bundleContext_unregisterService(ctx.get(), serFietsId);
@@ -164,7 +138,7 @@
}
TEST_F(PubSubMatchingTestSuite, MatchEndpointSimple) {
- auto serId = registerSerSvc("fiets", 1, "fiets", "1", 0);
+ auto serId = registerMarkerSerSvc("fiets");
long foundSvcId = -1;
@@ -183,10 +157,10 @@
}
TEST_F(PubSubMatchingTestSuite, MatchEndpointMultiple) {
- auto serFietsId = registerSerSvc("fiets", 1, "fiets", "1", 0);
- auto serFiets2Id = registerSerSvc("fiets", 1, "fiets", "1", 8);
- auto serAutoId = registerSerSvc("auto", 2, "auto", "1", 5);
- auto serBelId = registerSerSvc("bel", 3, "bel", "1", 10);
+ auto serFietsId = registerMarkerSerSvc("fiets");
+ auto serFiets2Id = registerMarkerSerSvc("fiets");
+ auto serAutoId = registerMarkerSerSvc("auto");
+ auto serBelId = registerMarkerSerSvc("bel");
long foundSvcId = -1;
@@ -197,7 +171,7 @@
pubsub_utils_matchEndpoint(ctx.get(), logHelper, ep, "admin?", false, &foundSvcId, NULL);
- EXPECT_EQ(foundSvcId, serFiets2Id);
+ EXPECT_EQ(foundSvcId, serFietsId);
celix_properties_destroy(ep);
celix_logHelper_destroy(logHelper);
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
index ce7948f..77a2fa0 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationHandlerTestSuite.cc
@@ -20,11 +20,13 @@
#include "gtest/gtest.h"
#include <memory>
+#include <cstdarg>
-#include <celix_api.h>
+#include "celix_api.h"
+#include "pubsub_message_serialization_service.h"
#include "pubsub_serializer_handler.h"
#include "dyn_message.h"
-#include <cstdarg>
+#include "pubsub_message_serialization_marker.h"
static void stdLog(void*, int level, const char *file, int line, const char *msg, ...) {
va_list ap;
@@ -97,6 +99,7 @@
TEST_F(PubSubSerializationHandlerTestSuite, CreateDestroy) {
auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
ASSERT_TRUE(handler != nullptr);
+ ASSERT_STREQ("json", pubsub_serializerHandler_getSerializationType(handler));
pubsub_serializerHandler_destroy(handler);
}
@@ -108,7 +111,6 @@
EXPECT_EQ(42, pubsub_serializerHandler_getMsgId(handler, "example::Msg"));
auto *fqn = pubsub_serializerHandler_getMsgFqn(handler, 42);
EXPECT_STREQ("example::Msg", fqn);
- free(fqn);
EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 0));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
@@ -175,6 +177,8 @@
EXPECT_TRUE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 14));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 1));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMajorVersion(handler, 42), 1);
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMinorVersion(handler, 42), 0);
celix_bundleContext_unregisterService(ctx.get(), svcId1);
celix_bundleContext_unregisterService(ctx.get(), svcId2);
@@ -193,6 +197,8 @@
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 1, 14));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 1));
EXPECT_FALSE(pubsub_serializerHandler_isMessageSupported(handler, 42, 2, 0));
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMajorVersion(handler, 42), 1);
+ EXPECT_EQ(pubsub_serializerHandler_getMsgMinorVersion(handler, 42), 0);
celix_bundleContext_unregisterService(ctx.get(), svcId1);
pubsub_serializerHandler_destroy(handler);
@@ -260,4 +266,51 @@
celix_bundleContext_unregisterService(ctx.get(), svcId1);
pubsub_serializerHandler_destroy(handler);
+}
+
+TEST_F(PubSubSerializationHandlerTestSuite, CreateHandlerFromMarker) {
+ auto* logHelper = celix_logHelper_create(ctx.get(), "test");
+ auto* marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), 1032 /*invalid*/, logHelper);
+ EXPECT_FALSE(marker); //non existing svc
+
+ pubsub_message_serialization_marker_t markerSvc;
+ long svcId = celix_bundleContext_registerService(ctx.get(), &markerSvc, PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, NULL);
+ EXPECT_GE(svcId, 0);
+ marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), svcId, logHelper);
+ EXPECT_FALSE(marker); //missing ser type service property
+ celix_bundleContext_unregisterService(ctx.get(), svcId);
+
+ auto* props = celix_properties_create();
+ celix_properties_set(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, "test");
+ svcId = celix_bundleContext_registerService(ctx.get(), &markerSvc, PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, props);
+ EXPECT_GE(svcId, 0);
+ marker = pubsub_serializerHandler_createForMarkerService(ctx.get(), svcId, logHelper);
+ EXPECT_TRUE(marker);
+ EXPECT_STREQ("test", pubsub_serializerHandler_getSerializationType(marker));
+ celix_bundleContext_unregisterService(ctx.get(), svcId);
+ pubsub_serializerHandler_destroy(marker);
+
+ celix_logHelper_destroy(logHelper);
+}
+
+TEST_F(PubSubSerializationHandlerTestSuite, GetMsgInfo) {
+ auto *handler = pubsub_serializerHandler_create(ctx.get(), "json", true);
+ EXPECT_FALSE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42));
+ EXPECT_EQ(CELIX_ILLEGAL_ARGUMENT, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr));
+
+
+ long svcId1 = registerSerSvc("json", 42, "example::Msg1", "1.0.0");
+ EXPECT_TRUE(pubsub_serializerHandler_isMessageSerializationServiceAvailable(handler, 42));
+ EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, nullptr, nullptr, nullptr));
+
+ const char* msgFqn;
+ int major;
+ int minor;
+ EXPECT_EQ(CELIX_SUCCESS, pubsub_serializerHandler_getMsgInfo(handler, 42, &msgFqn, &major, &minor));
+ EXPECT_STREQ("example::Msg1", msgFqn);
+ EXPECT_EQ(1, major);
+ EXPECT_EQ(0, minor);
+
+ celix_bundleContext_unregisterService(ctx.get(), svcId1);
+ pubsub_serializerHandler_destroy(handler);
}
\ No newline at end of file
diff --git a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc
index a7f660c..e9ec48d 100644
--- a/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc
+++ b/bundles/pubsub/pubsub_utils/gtest/src/PubSubSerializationProviderTestSuite.cc
@@ -21,7 +21,8 @@
#include <memory>
-#include <celix_api.h>
+#include "celix_api.h"
+#include "pubsub_message_serialization_marker.h"
#include "pubsub_serialization_provider.h"
class PubSubSerializationProviderTestSuite : public ::testing::Test {
@@ -47,12 +48,14 @@
TEST_F(PubSubSerializationProviderTestSuite, CreateDestroy) {
//checks if the bundles are started and stopped correctly (no mem leaks).
- auto* provider = pubsub_serializationProvider_create(ctx.get(), "test", 0, nullptr, nullptr, nullptr, nullptr);
+ auto* provider = pubsub_serializationProvider_create(ctx.get(), "test", false, 0, nullptr, nullptr, nullptr, nullptr);
+ auto count = celix_bundleContext_useService(ctx.get(), PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME, nullptr, nullptr);
+ EXPECT_EQ(1, count);
pubsub_serializationProvider_destroy(provider);
}
TEST_F(PubSubSerializationProviderTestSuite, FindSerializationServices) {
- auto* provider = pubsub_serializationProvider_create(ctx.get(), "test", 0, nullptr, nullptr, nullptr, nullptr);
+ auto* provider = pubsub_serializationProvider_create(ctx.get(), "test", false, 0, nullptr, nullptr, nullptr, nullptr);
size_t nrEntries = pubsub_serializationProvider_nrOfEntries(provider);
EXPECT_EQ(5, nrEntries);
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serialization_provider.h b/bundles/pubsub/pubsub_utils/include/pubsub_serialization_provider.h
index 030c0bb..c5bfab2 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serialization_provider.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serialization_provider.h
@@ -49,10 +49,14 @@
bool valid;
const char* invalidReason;
+
+ //custom user data, will be initialized to NULL. If freeUserData is set, it will be called with userData during destruction of the entry.
+ void* userData;
+ void (*freeUserData)(void* userData);
} pubsub_serialization_entry_t;
/**
- * Creates A (descriptor based) Serialization Provider.
+ * @brief Creates A (descriptor based) Serialization Provider.
*
* The provider monitors bundles and creates pubsub message serialization services for every unique descriptor found.
*
@@ -73,6 +77,8 @@
*
* @param ctx The bundle context
* @param serializationType The serialization type (e.g. 'json')
+ * @param backwardsCompatible Whether the serializer can deserialize data if the minor version is higher. (note true for JSON)
+ * Will be used to set the 'serialization.backwards.compatible' service property for the pubsub_message_serialization_marker service.
* @param serializationServiceRanking The service raking used for the serialization marker service.
* @param serialize The serialize function to use
* @param freeSerializeMsg The freeSerializeMsg function to use
@@ -84,6 +90,7 @@
pubsub_serialization_provider_t *pubsub_serializationProvider_create(
celix_bundle_context_t *ctx,
const char* serializationType,
+ bool backwardsCompatible,
long serializationServiceRanking,
celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen),
void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen),
@@ -97,17 +104,17 @@
/**
- * Returns the number of valid entries.
+ * @brief Returns the number of valid entries.
*/
size_t pubsub_serializationProvider_nrOfEntries(pubsub_serialization_provider_t *provider);
/**
- * Returns the number of invalid entries.
+ * @brief Returns the number of invalid entries.
*/
size_t pubsub_serializationProvider_nrOfInvalidEntries(pubsub_serialization_provider_t *provider);
/**
- * Returns the log helper of the serialization provider.
+ * @brief Returns the log helper of the serialization provider.
*/
celix_log_helper_t* pubsub_serializationProvider_getLogHelper(pubsub_serialization_provider_t *provider);
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
index b873901..f2c58ac 100644
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
+++ b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_handler.h
@@ -21,10 +21,10 @@
#define CELIX_PUBSUB_SERIALIZER_HANDLER_H
#include <stdint.h>
+#include <sys/uio.h>
+#include "celix_log_helper.h"
#include "celix_api.h"
-#include "pubsub_message_serialization_service.h"
-
#ifdef __cplusplus
extern "C" {
@@ -34,8 +34,10 @@
/**
- * Creates a handler which track pubsub_custom_msg_serialization_service services with a (serialization.type=<serializerType)) filter.
- * If multiple pubsub_message_serialization_service for the same msg fqn (targeted.msg.fqn property) the highest ranking service will be used.
+ * @brief Creates a pubsub serializer handler which tracks pubsub_message_serialization_service services using the provided serialization type.
+ *
+ * If there are multiple pubsub_message_serialization_service services for the same msg fqn
+ * (targeted.msg.fqn property) the highest ranking service will be used.
*
* The message handler assumes (and checks) that all provided serialization services do not clash in message ids (so every msgId should have its own msgFqn)
* and that only one version for a message serialization is registered.
@@ -53,14 +55,36 @@
*/
pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible);
-
-void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler);
-
-void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
-void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
+/**
+ * @brief Creates a pubsub serializer handler which tracks pubsub_message_serialization_service services using the serialization type of the
+ * marker service identified by the provided service.id.
+ *
+ * If there are multiple pubsub_message_serialization_service services for the same msg fqn
+ * (targeted.msg.fqn property) the highest ranking service will be used.
+ *
+ * The message handler assumes (and checks) that all provided serialization services do not clash in message ids (so every msgId should have its own msgFqn)
+ * and that only one version for a message serialization is registered.
+ * This means that all bundles in a single celix container (a single process) should all use the same version of messages.
+ *
+ * If backwards compatibility is supported, serialized messages with a higher minor version than the version available in the serializer handler
+ * can still be deserialized. This could be supported for serialization types like json.
+ * So when a json message of version 1.1.x with content {"field1":"value1", "field2":"value2"} is deserialized to a version 1.0 (which only has field1),
+ * the message can and will be deserialized.
+ *
+ * @param ctx The bundle context.
+ * @param pubsubSerializerMarkerSvcId The service.id of the pubsub_message_serialization_marker to use for deriving the serializationType and backwardsCompatible.
+ * @param logHelper Optional log helper. If provided will be used to log issues with creating a serializer handler for the provided marker svc id.
+ * @return A newly created pubsub serializer handler, or NULL if the marker service cannot be found or lacks the serialization type property.
+ */
+pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(celix_bundle_context_t* ctx, long pubsubSerializerMarkerSvcId, celix_log_helper_t* logHelper);
/**
- * Serialize a message into iovec structs (set of structures with buffer pointer and length)
+ * @brief Destroy the pubsub_serializer_handler and free the used memory.
+ */
+void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler);
+
+/**
+ * @brief Serialize a message into iovec structs (set of structures with buffer pointer and length)
*
* The correct message serialization services will be selected based on the provided msgId.
*
@@ -74,12 +98,12 @@
celix_status_t pubsub_serializerHandler_serialize(pubsub_serializer_handler_t* handler, uint32_t msgId, const void* input, struct iovec** output, size_t* outputIovLen);
/**
- * Free the memory of for the serialized msg.
+ * @brief Free the memory of the serialized msg.
*/
celix_status_t pubsub_serializerHandler_freeSerializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, struct iovec* input, size_t inputIovLen);
/**
- * Deserialize a message using the provided iovec buffers.
+ * @brief Deserialize a message using the provided iovec buffers.
*
* The deserialize function will also check if the target major/minor version of the message is valid with the version
* of the serialized data.
@@ -102,34 +126,79 @@
celix_status_t pubsub_serializerHandler_deserialize(pubsub_serializer_handler_t* handler, uint32_t msgId, int serializedMajorVersion, int serializedMinorVersion, const struct iovec* input, size_t inputIovLen, void** out);
/**
- * Free the memory for the deserialized message.
+ * @brief Free the memory for the deserialized message.
*/
celix_status_t pubsub_serializerHandler_freeDeserializedMsg(pubsub_serializer_handler_t* handler, uint32_t msgId, void* msg);
/**
- * Whether the msg is support. More specifically:
+ * @brief Whether the msg is supported. More specifically:
* - msg id is known and
* - a serialized msg with the provided major and minor version can be deserialized.
*/
bool pubsub_serializerHandler_isMessageSupported(pubsub_serializer_handler_t* handler, uint32_t msgId, int majorVersion, int minorVersion);
/**
- * Get msg fqn from a msg id.
- * @return msg fqn or NULL if msg id is not known.
+ * @brief Whether the serializer handler has found 1 or more pubsub_message_serialization_service for the provided msg id.
*/
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId);
/**
- * Get a msg id from a msgFqn.
+ * @brief Get msg fqn from a msg id.
+ * @return msg fqn or NULL if msg id is not known. msg fqn is valid as long as the handler exists.
+ */
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
+ * @brief Get a msg id from a msgFqn.
* @return msg id or 0 if msg fqn is not known.
*/
uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler, const char* msgFqn);
/**
- * nr of serialization services found.
+ * @brief nr of serialization services found.
*/
size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializer_handler_t* handler);
+
+/**
+ * @brief Get the serializer type for this handler.
+ *
+ * Valid as long as the handler exists.
+ */
+const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler);
+
+/**
+ * @brief Returns the minor version part of a message version.
+ *
+ * Returns -1 if message cannot be found.
+ */
+int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+/**
+ * @brief Returns the major version part of a message version.
+ *
+ * Returns -1 if message cannot be found.
+ */
+int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId);
+
+
+/**
+ * @brief Returns msg info (fqn, major version, minor version) in 1 call.
+ *
+ * @param handler The serializer handler
+ * @param msgId The msg id where to get the info for
+ * @param msgFqnOut If not NULL will be set to the msgFqn (valid as long as the serializer handler is valid)
+ * @param msgMajorVersionOut If not NULL will be set to the msg major version
+ * @param msgMinorVersionOut If not NULL will be set to the msg minor version
+ * @return CELIX_SUCCESS on success, CELIX_ILLEGAL_ARGUMENT if the message for the provided msg id cannot be found.
+ */
+celix_status_t pubsub_serializerHandler_getMsgInfo(
+ pubsub_serializer_handler_t* handler,
+ uint32_t msgId,
+ const char** msgFqnOut,
+ int* msgMajorVersionOut,
+ int* msgMinorVersionOut);
+
#ifdef __cplusplus
}
#endif
diff --git a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h b/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
deleted file mode 100644
index c64e62e..0000000
--- a/bundles/pubsub/pubsub_utils/include/pubsub_serializer_provider.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-#ifndef CELIX_PUBSUB_SERIALIZER_PROVIDER_H
-#define CELIX_PUBSUB_SERIALIZER_PROVIDER_H
-
-typedef struct pubsub_serializer_provider pubsub_serializer_provider_t; //opaque type
-
-
-/**
- * Creates a handler which track bundles and registers pubsub_custom_msg_serialization_service
- * for every descriptor file found for json serialization.
- *
- * Added properties:
- * serialization.type=json
- * targeted.msg.fqn=<descriptor fqn>
- * targeted.msg.id=<msg fqn hash or msg id annotated in the descriptor>
- * targeted.msg.version=<msg version in descriptor if present> (optional)
- * service.ranking=0
- *
- * For descriptor found multiple times (same fqn and version) only the first one is registered
- *
- */
-pubsub_serializer_provider_t* pubsub_providerHandler_create(celix_bundle_context_t* ctx, const char *serializerType /* i.e. json */);
-
-void pubsub_providerHandler_destroy(pubsub_serializer_provider_t* handler);
-
-void pubsub_providerHandler_addBundle(pubsub_serializer_provider_t* handler, const celix_bundle_t *bnd);
-void pubsub_providerHandler_removeBundle(pubsub_serializer_provider_t* handler, const celix_bundle_t *bnd);
-
-//note can be used for shell commands
-void pubsub_providerHandler_printRegisteredSerializer(pubsub_serializer_provider_t* handler, FILE *stream);
-
-#endif //CELIX_PUBSUB_SERIALIZER_PROVIDER_H
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
index b2a78df..6c90743 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_matching.c
@@ -7,10 +7,10 @@
#include "celix_bundle.h"
-#include <pubsub_endpoint.h>
-#include <pubsub_admin.h>
-#include <pubsub_protocol.h>
-#include <pubsub_message_serialization_service.h>
+#include "pubsub_endpoint.h"
+#include "pubsub_protocol.h"
+#include "pubsub_admin.h"
+#include "pubsub_message_serialization_marker.h"
struct ps_utils_serializer_selection_data {
@@ -36,22 +36,19 @@
static long getPSSerializer(celix_bundle_context_t *ctx, const char *requested_serializer) {
long svcId = -1L;
+ celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
+ opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+
if (requested_serializer != NULL) {
char filter[512];
- int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, requested_serializer);
+ int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, requested_serializer);
if (written > 512) {
fprintf(stderr, "Cannot create serializer filter. need more than 512 char array\n");
} else {
- celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
- opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
opts.filter = filter;
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
} else {
- celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
- opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
- opts.ignoreServiceLanguage = true;
-
//note findService will automatically return the highest ranking service id
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
index d008191..b8b8503 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serialization_provider.c
@@ -24,6 +24,7 @@
#include <dirent.h>
#include <string.h>
+#include "pubsub_message_serialization_marker.h"
#include "celix_constants.h"
#include "dyn_function.h"
#include "celix_version.h"
@@ -71,6 +72,9 @@
celix_shell_command_t cmdSvc;
long cmdSvcId;
+ pubsub_message_serialization_marker_t markerSvc;
+ long markerSvcId;
+
celix_thread_mutex_t mutex; //protects below
celix_array_list_t *serializationSvcEntries; //key = pubsub_serialization_entry;
};
@@ -586,6 +590,7 @@
pubsub_serialization_provider_t *pubsub_serializationProvider_create(
celix_bundle_context_t *ctx,
const char* serializationType,
+ bool backwardsCompatible,
long serializationServiceRanking,
celix_status_t (*serialize)(pubsub_serialization_entry_t* entry, const void* msg, struct iovec** output, size_t* outputIovLen),
void (*freeSerializeMsg)(pubsub_serialization_entry_t* entry, struct iovec* input, size_t inputIovLen),
@@ -608,6 +613,7 @@
dynCommon_logSetup(dfi_log, provider, 1);
{
+ //Start bundle tracker and register pubsub_message_serialization services
celix_bundle_tracking_options_t opts = CELIX_EMPTY_BUNDLE_TRACKING_OPTIONS;
opts.callbackHandle = provider;
opts.onInstalled = pubsub_serializationProvider_onInstalledBundle;
@@ -616,13 +622,13 @@
}
{
+ //Register shell command to query serializers
provider->cmdSvc.handle = provider;
provider->cmdSvc.executeCommand = pubsub_serializationProvider_executeCommand;
char *name = NULL;
asprintf(&name,"celix::%s_message_serialization", provider->serializationType);
char *usage = NULL;
- //TODO add support for listing invalid entries
asprintf(&usage,"celix::%s_message_serialization [verbose | invalids | <msg id> | <msg fqn>]", provider->serializationType);
celix_properties_t* props = celix_properties_create();
@@ -640,11 +646,28 @@
free(name);
free(usage);
}
+
+ {
+ //Register pubsub_message_serialization_marker service to indicate the availability of this message serialization type.
+ celix_properties_t* props = celix_properties_create();
+ provider->markerSvc.handle = provider;
+ celix_properties_set(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, provider->serializationType);
+ celix_properties_setBool(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE, backwardsCompatible);
+ celix_service_registration_options_t opts = CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+ opts.svc = &provider->markerSvc;
+ opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+ opts.serviceVersion = PUBSUB_MESSAGE_SERIALIZATION_MARKER_VERSION;
+ opts.properties = props;
+ provider->markerSvcId = celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+ }
+
return provider;
}
void pubsub_serializationProvider_destroy(pubsub_serialization_provider_t* provider) {
if (provider != NULL) {
+ celix_bundleContext_unregisterService(provider->ctx, provider->markerSvcId);
+
celix_bundleContext_stopTracker(provider->ctx, provider->bundleTrackerId);
celix_bundleContext_unregisterService(provider->ctx, provider->cmdSvcId);
@@ -652,6 +675,9 @@
for (int i = 0; i < celix_arrayList_size(provider->serializationSvcEntries); ++i) {
pubsub_serialization_entry_t *entry = celix_arrayList_get(provider->serializationSvcEntries, i);
celix_bundleContext_unregisterService(provider->ctx, entry->svcId);
+ if (entry->freeUserData) {
+ entry->freeUserData(entry->userData);
+ }
free(entry->descriptorContent);
free(entry->readFromEntryPath);
free(entry->msgVersionStr);
diff --git a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
index f5962f3..e339795 100644
--- a/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
+++ b/bundles/pubsub/pubsub_utils/src/pubsub_serializer_handler.c
@@ -22,9 +22,11 @@
#include <string.h>
+#include "pubsub_message_serialization_marker.h"
#include "celix_version.h"
#include "pubsub_message_serialization_service.h"
#include "celix_log_helper.h"
+#include "celix_constants.h"
#define L_DEBUG(...) \
celix_logHelper_debug(handler->logHelper, __VA_ARGS__)
@@ -40,20 +42,26 @@
const celix_properties_t *properties;
uint32_t msgId;
celix_version_t* msgVersion;
- char* msgFqn;
+ const char* msgFqn;
pubsub_message_serialization_service_t* svc;
} pubsub_serialization_service_entry_t;
struct pubsub_serializer_handler {
celix_bundle_context_t* ctx;
+ char* filter;
+ char* serType;
bool backwardCompatible;
long serializationSvcTrackerId;
celix_log_helper_t *logHelper;
celix_thread_rwlock_t lock;
hash_map_t *serializationServices; //key = msg id, value = sorted array list with pubsub_serialization_service_entry_t*
+ hash_map_t *msgFullyQualifiedNames; //key = msg id, value = msg fqn. Non destructive map with msg fqn
};
+static void pubsub_serializerHandler_addSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
+static void pubsub_serializerHandler_removeSerializationService(pubsub_serializer_handler_t* handler, pubsub_message_serialization_service_t* svc, const celix_properties_t* svcProperties);
+
static void addSerializationService(void *handle, void* svc, const celix_properties_t *props) {
pubsub_serializer_handler_t* handler = handle;
pubsub_message_serialization_service_t* serSvc = svc;
@@ -66,7 +74,7 @@
pubsub_serializerHandler_removeSerializationService(handler, serSvc, props);
}
-int compareEntries(const void *a, const void *b) {
+static int compareEntries(const void *a, const void *b) {
const pubsub_serialization_service_entry_t* aEntry = a;
const pubsub_serialization_service_entry_t* bEntry = b;
@@ -98,61 +106,101 @@
return compatible;
}
-static const char* getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
- //NOTE assumes mutex is locked
- const char *result = NULL;
- celix_array_list_t* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
- if (entries != NULL) {
- pubsub_serialization_service_entry_t *entry = celix_arrayList_get(entries, 0); //NOTE if an entries exists, there is at least 1 entry.
- result = entry->msgFqn;
- }
- return result;
-}
-
pubsub_serializer_handler_t* pubsub_serializerHandler_create(celix_bundle_context_t* ctx, const char* serializerType, bool backwardCompatible) {
pubsub_serializer_handler_t* handler = calloc(1, sizeof(*handler));
handler->ctx = ctx;
+ handler->serType = celix_utils_strdup(serializerType); //owned copy; released in the destroy callback
handler->backwardCompatible = backwardCompatible;
handler->logHelper = celix_logHelper_create(ctx, "celix_pubsub_serialization_handler");
celixThreadRwlock_create(&handler->lock, NULL);
handler->serializationServices = hashMap_create(NULL, NULL, NULL, NULL);
+ handler->msgFullyQualifiedNames = hashMap_create(NULL, NULL, NULL, NULL); //msg id -> msg fqn; filled when serialization services are added
- char *filter = NULL;
- asprintf(&filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType);
+ asprintf(&handler->filter, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_SERVICE_SERIALIZATION_TYPE_PROPERTY, serializerType); //filter now lives on the handler instead of a local; NOTE(review): the asprintf result is not checked
celix_service_tracking_options_t opts = CELIX_EMPTY_SERVICE_TRACKING_OPTIONS;
opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_NAME;
opts.filter.versionRange = PUBSUB_MESSAGE_SERIALIZATION_SERVICE_RANGE;
- opts.filter.filter = filter;
+ opts.filter.filter = handler->filter; //NOTE(review): presumably kept alive because the async tracker may read it after this function returns - confirm
opts.callbackHandle = handler;
opts.addWithProperties = addSerializationService;
opts.removeWithProperties = removeSerializationService;
- handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptions(ctx, &opts);
- free(filter);
+ handler->serializationSvcTrackerId = celix_bundleContext_trackServicesWithOptionsAsync(ctx, &opts); //async variant of the tracker creation; the filter is no longer freed here
+
return handler;
}
+struct pubsub_serializerHandler_callback_data { //in/out data passed as callback handle when using the marker service
+ celix_bundle_context_t* ctx; //in: bundle context used to create the handler
+ celix_log_helper_t* logHelper; //in: optional (may be NULL) log helper for error reporting
+ pubsub_serializer_handler_t* handler; //out: handler created by the use-service callback; left untouched when no handler is created
+};
+
+/**
+ * Use-service callback for the pubsub message serialization marker service.
+ *
+ * Reads the serialization type and the backwards-compatible flag from the marker
+ * service properties. When a serialization type is present a serializer handler is
+ * created and stored in the callback data; otherwise an error is logged, provided
+ * a log helper was supplied.
+ *
+ * @param handle Pointer to a struct pubsub_serializerHandler_callback_data.
+ * @param svc    The marker service (unused).
+ * @param props  The service properties of the marker service.
+ */
+static void pubsub_serializerHandler_useMarkerSvcCallback(void *handle, void* svc __attribute__((unused)), const celix_properties_t* props) {
+    struct pubsub_serializerHandler_callback_data* cbData = handle;
+    const char* type = celix_properties_get(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, NULL);
+    bool compat = celix_properties_getAsBool(props, PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_BACKWARDS_COMPATIBLE, false);
+
+    if (type != NULL) {
+        cbData->handler = pubsub_serializerHandler_create(cbData->ctx, type, compat);
+        return;
+    }
+
+    if (cbData->logHelper == NULL) {
+        return; //no log helper, so the missing property cannot be reported
+    }
+    celix_logHelper_error(
+            cbData->logHelper,
+            "Cannot create serializer handler because service %s does not have a %s service property",
+            PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME,
+            PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY);
+}
+
+/**
+ * Creates a serializer handler based on a pubsub message serialization marker service.
+ *
+ * The marker service with the provided service id is looked up and its service
+ * properties (serialization type and backwards-compatible flag) are used to create
+ * the handler.
+ *
+ * @param ctx                         The bundle context used to look up the marker service.
+ * @param pubsubSerializerMarkerSvcId The service id of the marker service.
+ * @param logHelper                   Optional log helper (may be NULL) used to report errors.
+ * @return The created serializer handler, or NULL when the filter cannot be built, the
+ *         marker service is not found or no handler was created by the callback.
+ */
+pubsub_serializer_handler_t* pubsub_serializerHandler_createForMarkerService(celix_bundle_context_t* ctx, long pubsubSerializerMarkerSvcId, celix_log_helper_t* logHelper) {
+    struct pubsub_serializerHandler_callback_data data;
+    memset(&data, 0, sizeof(data));
+    data.ctx = ctx;
+    data.logHelper = logHelper;
+
+    //A long can need up to 20 characters, so leave ample room for the property name and the
+    //parentheses and never perform the lookup with a truncated (malformed) filter.
+    char filter[64];
+    int written = snprintf(filter, sizeof(filter), "(%s=%li)", OSGI_FRAMEWORK_SERVICE_ID, pubsubSerializerMarkerSvcId);
+    if (written < 0 || (size_t)written >= sizeof(filter)) {
+        if (logHelper != NULL) {
+            celix_logHelper_error(
+                    logHelper,
+                    "Cannot create filter for %s service with service id %li",
+                    PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME,
+                    pubsubSerializerMarkerSvcId);
+        }
+        return NULL;
+    }
+
+    celix_service_use_options_t opts = CELIX_EMPTY_SERVICE_USE_OPTIONS;
+    opts.filter.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
+    opts.filter.filter = filter;
+    opts.callbackHandle = &data;
+    opts.useWithProperties = pubsub_serializerHandler_useMarkerSvcCallback;
+    bool called = celix_bundleContext_useServiceWithOptions(ctx, &opts);
+    if (!called && logHelper != NULL) {
+        celix_logHelper_error(
+                logHelper,
+                "Cannot find %s service for service id %li",
+                PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME,
+                pubsubSerializerMarkerSvcId);
+    }
+    return data.handler; //still NULL (from the memset) when the callback did not create a handler
+}
+
+/**
+ * Releases all resources owned by a serializer handler, including the handler itself.
+ * This function is handed to the asynchronous tracker stop as its callback.
+ *
+ * @param data The pubsub_serializer_handler_t to free.
+ */
+static void pubsub_serializerHandler_destroyCallback(void* data) {
+    pubsub_serializer_handler_t* self = data;
+    celixThreadRwlock_destroy(&self->lock);
+
+    //Free the service entries per message id. The msg fqn strings are not freed here,
+    //the entries only borrow them from the fqn map.
+    hash_map_iterator_t it = hashMapIterator_construct(self->serializationServices);
+    while (hashMapIterator_hasNext(&it)) {
+        celix_array_list_t* list = hashMapIterator_nextValue(&it);
+        int idx = 0;
+        while (idx < celix_arrayList_size(list)) {
+            pubsub_serialization_service_entry_t* svcEntry = celix_arrayList_get(list, idx);
+            celix_version_destroy(svcEntry->msgVersion);
+            free(svcEntry);
+            idx += 1;
+        }
+        celix_arrayList_destroy(list);
+    }
+    hashMap_destroy(self->serializationServices, false, false);
+    hashMap_destroy(self->msgFullyQualifiedNames, false, true); //NOTE(review): presumably the trailing true also frees the stored fqn strings - confirm
+
+    celix_logHelper_destroy(self->logHelper);
+    free(self->filter);
+    free(self->serType);
+    free(self);
+}
void pubsub_serializerHandler_destroy(pubsub_serializer_handler_t* handler) {
if (handler != NULL) {
- celix_bundleContext_stopTracker(handler->ctx, handler->serializationSvcTrackerId);
- celixThreadRwlock_destroy(&handler->lock);
- hash_map_iterator_t iter = hashMapIterator_construct(handler->serializationServices);
- while (hashMapIterator_hasNext(&iter)) {
- celix_array_list_t *entries = hashMapIterator_nextValue(&iter);
- for (int i = 0; i < celix_arrayList_size(entries); ++i) {
- pubsub_serialization_service_entry_t* entry = celix_arrayList_get(entries, i);
- free(entry->msgFqn);
- celix_version_destroy(entry->msgVersion);
- free(entry);
- }
- celix_arrayList_destroy(entries);
- }
- hashMap_destroy(handler->serializationServices, false, false);
- celix_logHelper_destroy(handler->logHelper);
- free(handler);
+ celix_bundleContext_stopTrackerAsync(handler->ctx, handler->serializationSvcTrackerId, handler, pubsub_serializerHandler_destroyCallback); //the actual cleanup moved to the destroy callback, which receives the handler; NOTE(review): presumably invoked once the tracker is stopped, so the handler may outlive this call - confirm
}
}
@@ -190,6 +238,12 @@
}
if (valid) {
+ char* fqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId);
+ if (fqn == NULL) {
+ fqn = celix_utils_strdup(msgFqn);
+ hashMap_put(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId, fqn);
+ }
+
celix_array_list_t *entries = hashMap_get(handler->serializationServices, (void *) (uintptr_t) msgId);
if (entries == NULL) {
entries = celix_arrayList_create();
@@ -197,17 +251,18 @@
pubsub_serialization_service_entry_t *entry = calloc(1, sizeof(*entry));
entry->svcId = svcId;
entry->properties = svcProperties;
- entry->msgFqn = celix_utils_strdup(msgFqn);
+ entry->msgFqn = fqn;
entry->msgId = msgId;
entry->msgVersion = msgVersion;
entry->svc = svc;
celix_arrayList_add(entries, entry);
celix_arrayList_sort(entries, compareEntries);
- hashMap_put(handler->serializationServices, (void *) (uintptr_t) msgId, entries);
+ hashMap_put(handler->serializationServices, (void*)(uintptr_t)msgId, entries);
} else {
celix_version_destroy(msgVersion);
}
+
celixThreadRwlock_unlock(&handler->lock);
}
@@ -233,7 +288,6 @@
}
}
if (found != NULL) {
- free(found->msgFqn);
celix_version_destroy(found->msgVersion);
free(found);
}
@@ -322,12 +376,18 @@
return compatible;
}
-char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+bool pubsub_serializerHandler_isMessageSerializationServiceAvailable(pubsub_serializer_handler_t* handler, uint32_t msgId) { //true when an entries list is present for msgId
celixThreadRwlock_readLock(&handler->lock);
- char *msgFqn = celix_utils_strdup(getMsgFqn(handler, msgId));
+ void* entries = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId); //only the presence is checked, the list itself is not inspected
+ celixThreadRwlock_unlock(&handler->lock);
+ return entries != NULL;
+}
+
+const char* pubsub_serializerHandler_getMsgFqn(pubsub_serializer_handler_t* handler, uint32_t msgId) { //returns the string stored in the handler's fqn map (no copy is made), so the caller must not free it
+ celixThreadRwlock_readLock(&handler->lock);
+ char *msgFqn = hashMap_get(handler->msgFullyQualifiedNames, (void*)(uintptr_t)msgId); //looked up in the fqn map instead of in the service entries
celixThreadRwlock_unlock(&handler->lock);
return msgFqn;
-
}
uint32_t pubsub_serializerHandler_getMsgId(pubsub_serializer_handler_t* handler, const char* msgFqn) {
@@ -344,6 +404,29 @@
celixThreadRwlock_unlock(&handler->lock);
return result;
}
+/**
+ * Returns the major version of the message with the provided msg id.
+ * The version of the first serialization service entry for the msg id is used.
+ *
+ * @return The major version, or -1 when no entries exist for the msg id.
+ */
+int pubsub_serializerHandler_getMsgMajorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celixThreadRwlock_readLock(&handler->lock);
+    celix_array_list_t* list = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (list == NULL) {
+        celixThreadRwlock_unlock(&handler->lock);
+        return -1;
+    }
+    //NOTE an existing entries list always holds at least one entry.
+    pubsub_serialization_service_entry_t* first = celix_arrayList_get(list, 0);
+    int majorVersion = celix_version_getMajor(first->msgVersion);
+    celixThreadRwlock_unlock(&handler->lock);
+    return majorVersion;
+}
+
+/**
+ * Returns the minor version of the message with the provided msg id.
+ * The version of the first serialization service entry for the msg id is used.
+ *
+ * @return The minor version, or -1 when no entries exist for the msg id.
+ */
+int pubsub_serializerHandler_getMsgMinorVersion(pubsub_serializer_handler_t* handler, uint32_t msgId) {
+    celixThreadRwlock_readLock(&handler->lock);
+    celix_array_list_t* list = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (list == NULL) {
+        celixThreadRwlock_unlock(&handler->lock);
+        return -1;
+    }
+    //NOTE an existing entries list always holds at least one entry.
+    pubsub_serialization_service_entry_t* first = celix_arrayList_get(list, 0);
+    int minorVersion = celix_version_getMinor(first->msgVersion);
+    celixThreadRwlock_unlock(&handler->lock);
+    return minorVersion;
+}
size_t pubsub_serializerHandler_messageSerializationServiceCount(pubsub_serializer_handler_t* handler) {
size_t count = 0;
@@ -355,4 +438,35 @@
}
celixThreadRwlock_unlock(&handler->lock);
return count;
+}
+
+/**
+ * Returns the serialization type of the handler.
+ * The returned string is the handler's own copy, so callers must not free it.
+ */
+const char* pubsub_serializerHandler_getSerializationType(pubsub_serializer_handler_t* handler) {
+    const char* type = handler->serType;
+    return type;
+}
+
+/**
+ * Retrieves the fully qualified name and the major/minor version for a msg id in one call.
+ * The info of the first serialization service entry for the msg id is used.
+ *
+ * @param handler            The serializer handler.
+ * @param msgId              The msg id to look up.
+ * @param msgFqnOut          Optional output for the msg fqn; the string is not copied.
+ * @param msgMajorVersionOut Optional output for the major version.
+ * @param msgMinorVersionOut Optional output for the minor version.
+ * @return CELIX_SUCCESS when entries exist for the msg id, CELIX_ILLEGAL_ARGUMENT otherwise
+ *         (the output arguments are then left untouched).
+ */
+int pubsub_serializerHandler_getMsgInfo(pubsub_serializer_handler_t* handler, uint32_t msgId, const char** msgFqnOut, int* msgMajorVersionOut, int* msgMinorVersionOut) {
+    celixThreadRwlock_readLock(&handler->lock);
+    celix_array_list_t* list = hashMap_get(handler->serializationServices, (void*)(uintptr_t)msgId);
+    if (list == NULL) {
+        celixThreadRwlock_unlock(&handler->lock);
+        return CELIX_ILLEGAL_ARGUMENT;
+    }
+
+    //NOTE an existing entries list always holds at least one entry.
+    pubsub_serialization_service_entry_t* first = celix_arrayList_get(list, 0);
+    if (msgFqnOut != NULL) {
+        *msgFqnOut = first->msgFqn;
+    }
+    if (msgMajorVersionOut != NULL) {
+        *msgMajorVersionOut = celix_version_getMajor(first->msgVersion);
+    }
+    if (msgMinorVersionOut != NULL) {
+        *msgMinorVersionOut = celix_version_getMinor(first->msgVersion);
+    }
+    celixThreadRwlock_unlock(&handler->lock);
+    return CELIX_SUCCESS;
+}
\ No newline at end of file
diff --git a/cmake/cmake_celix/Generic.cmake b/cmake/cmake_celix/Generic.cmake
index ae82881..e44d3fa 100644
--- a/cmake/cmake_celix/Generic.cmake
+++ b/cmake/cmake_celix/Generic.cmake
@@ -30,7 +30,6 @@
function(add_celix_bundle_dependencies)
list(GET ARGN 0 TARGET)
list(REMOVE_AT ARGN 0)
- message("TEST")
foreach(BUNDLE_TARGET IN LISTS ARGN)
if (TARGET ${BUNDLE_TARGET})
get_target_property(IMPORT ${BUNDLE_TARGET} BUNDLE_IMPORTED)
diff --git a/libs/framework/src/service_registry.c b/libs/framework/src/service_registry.c
index 4e2b0c4..f73e48d 100644
--- a/libs/framework/src/service_registry.c
+++ b/libs/framework/src/service_registry.c
@@ -591,15 +591,17 @@
if (valid) {
serviceRegistration_retain(registration);
serviceReference_increaseUsage(reference, &count);
- } else {
- *out = NULL; //invalid service registration
+ if (count == 1) {
+ serviceRegistration_getService(registration, bundle, &service);
+ serviceReference_setService(reference, service);
+ }
}
celixThreadRwlock_unlock(&registry->lock);
-
- if (count == 1) {
- serviceRegistration_getService(registration, bundle, &service);
- serviceReference_setService(reference, service);
+ if(!valid) {
+ *out = NULL;
+ return status;
}
+
serviceRegistration_release(registration);
/* NOTE the out argument of sr_getService should be 'const void**'
@@ -1313,4 +1315,4 @@
} else {
fw_log(registry->framework->logger, CELIX_LOG_LEVEL_ERROR, "Cannot unregister service for service id %li. This id is not present or owned by the provided bundle (bnd id %li)", serviceId, celix_bundle_getId(bnd));
}
-}
\ No newline at end of file
+}
diff --git a/libs/utils/include/celix_byteswap.h b/libs/utils/include/celix_byteswap.h
index 47c413a..aa60e09 100644
--- a/libs/utils/include/celix_byteswap.h
+++ b/libs/utils/include/celix_byteswap.h
@@ -24,7 +24,7 @@
#if defined(__APPLE__)
/* Swap bytes in 16 bit value. */
#define bswap_16(x) \
- ((((x) & 0xff00) << 8) | (((x) & 0x00ff) >> 8))
+ ((((x) & 0xff00) >> 8) | (((x) & 0x00ff) << 8))
#define __bswap_16 bswap_16
/* Swap bytes in 32 bit value. */