blob: 6c9074302e6e642a38fc02b03c593389cbd54bd4 [file] [log] [blame]
#include <string.h>
#include "celix_constants.h"
#include "celix_filter.h"
#include "pubsub_utils.h"
#include "celix_bundle.h"
#include "pubsub_endpoint.h"
#include "pubsub_protocol.h"
#include "pubsub_admin.h"
#include "pubsub_message_serialization_marker.h"
struct ps_utils_serializer_selection_data {
const char *requested_serializer;
long matchingSvcId;
long matchingRanking;
};
struct ps_utils_protocol_selection_data {
const char *requested_protocol;
long matchingSvcId;
};
typedef struct ps_utils_retrieve_topic_properties_data {
const char *topic;
const char *scope;
bool isPublisher;
celix_properties_t *outEndpoint;
} ps_utils_retrieve_topic_properties_data_t;
static long getPSSerializer(celix_bundle_context_t *ctx, const char *requested_serializer) {
long svcId = -1L;
celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
opts.serviceName = PUBSUB_MESSAGE_SERIALIZATION_MARKER_NAME;
if (requested_serializer != NULL) {
char filter[512];
int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_MESSAGE_SERIALIZATION_MARKER_SERIALIZATION_TYPE_PROPERTY, requested_serializer);
if (written > 512) {
fprintf(stderr, "Cannot create serializer filter. need more than 512 char array\n");
} else {
opts.filter = filter;
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
} else {
//note findService will automatically return the highest ranking service id
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
return svcId;
}
static double getPSScore(const char *requested_admin, const char *request_qos, const char *adminType, double sampleScore, double controlScore, double defaultScore) {
double score;
if (requested_admin != NULL && strncmp(requested_admin, adminType, strlen(adminType)) == 0) {
/* We got precise specification on the pubsub_admin we want */
//Full match
score = PUBSUB_ADMIN_FULL_MATCH_SCORE;
} else if (requested_admin != NULL) {
//admin type requested, but no match -> do not select this psa
score = PUBSUB_ADMIN_NO_MATCH_SCORE;
} else if (request_qos != NULL && strncmp(request_qos, PUBSUB_UTILS_QOS_TYPE_SAMPLE, strlen(PUBSUB_UTILS_QOS_TYPE_SAMPLE)) == 0) {
//qos match
score = sampleScore;
} else if (request_qos != NULL && strncmp(request_qos, PUBSUB_UTILS_QOS_TYPE_CONTROL, strlen(PUBSUB_UTILS_QOS_TYPE_CONTROL)) == 0) {
//qos match
score = controlScore;
} else if (request_qos != NULL) {
//note unsupported qos -> defaultScore
score = defaultScore;
} else {
//default match
score = defaultScore;
}
return score;
}
static long getPSProtocol(celix_bundle_context_t *ctx, const char *requested_protocol) {
long svcId = -1L;
if (requested_protocol != NULL) {
char filter[512];
int written = snprintf(filter, 512, "(%s=%s)", PUBSUB_PROTOCOL_TYPE_KEY, requested_protocol);
if (written > 512) {
fprintf(stderr, "Cannot create protocol filter. need more than 512 char array\n");
} else {
celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
opts.serviceName = PUBSUB_PROTOCOL_SERVICE_NAME;
opts.filter = filter;
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
} else {
celix_service_filter_options_t opts = CELIX_EMPTY_SERVICE_FILTER_OPTIONS;
opts.serviceName = PUBSUB_PROTOCOL_SERVICE_NAME;
opts.ignoreServiceLanguage = true;
//note findService will automatically return the highest ranking service id
svcId = celix_bundleContext_findServiceWithOptions(ctx, &opts);
}
return svcId;
}
static void getTopicPropertiesCallback(void *handle, const celix_bundle_t *bnd) {
ps_utils_retrieve_topic_properties_data_t *data = handle;
data->outEndpoint = pubsub_utils_getTopicProperties(bnd, data->scope, data->topic, data->isPublisher);
}
double pubsub_utils_matchPublisher(
celix_bundle_context_t *ctx,
long bundleId,
const char *filter,
const char *adminType,
double sampleScore,
double controlScore,
double defaultScore,
bool matchProtocol,
celix_properties_t **outTopicProperties,
long *outSerializerSvcId,
long *outProtocolSvcId) {
celix_properties_t *ep = pubsubEndpoint_createFromPublisherTrackerInfo(ctx, bundleId, filter);
const char *requested_admin = NULL;
const char *requested_qos = NULL;
requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
double score = getPSScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
const char *requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
long serializerSvcId = getPSSerializer(ctx, requested_serializer);
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
}
long protocolSvcId = -1;
if (matchProtocol) {
const char *requested_protocol = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, NULL);
protocolSvcId = getPSProtocol(ctx, requested_protocol);
if (outProtocolSvcId != NULL) {
*outProtocolSvcId = protocolSvcId;
}
}
if (serializerSvcId < 0) {
score = PUBSUB_ADMIN_NO_MATCH_SCORE;
} else if (matchProtocol && protocolSvcId < 0) {
score = PUBSUB_ADMIN_NO_MATCH_SCORE;
}
if (outTopicProperties != NULL) {
*outTopicProperties = ep;
} else {
celix_properties_destroy(ep);
}
return score;
}
double pubsub_utils_matchSubscriber(
celix_bundle_context_t *ctx,
const long svcProviderBundleId,
const celix_properties_t *svcProperties,
const char *adminType,
double sampleScore,
double controlScore,
double defaultScore,
bool matchProtocol,
celix_properties_t **outTopicProperties,
long *outSerializerSvcId,
long *outProtocolSvcId) {
ps_utils_retrieve_topic_properties_data_t data;
data.isPublisher = false;
data.scope = celix_properties_get(svcProperties, PUBSUB_SUBSCRIBER_SCOPE, NULL);
data.topic = celix_properties_get(svcProperties, PUBSUB_SUBSCRIBER_TOPIC, NULL);
data.outEndpoint = NULL;
celix_bundleContext_useBundle(ctx, svcProviderBundleId, &data, getTopicPropertiesCallback);
celix_properties_t *ep = data.outEndpoint;
const char *requested_admin = NULL;
const char *requested_qos = NULL;
const char *requested_serializer = NULL;
const char *requested_protocol = NULL;
requested_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
requested_qos = celix_properties_get(ep, PUBSUB_UTILS_QOS_ATTRIBUTE_KEY, NULL);
requested_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
if (matchProtocol) {
requested_protocol = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, NULL);
}
double score = getPSScore(requested_admin, requested_qos, adminType, sampleScore, controlScore, defaultScore);
long serializerSvcId = getPSSerializer(ctx, requested_serializer);
if (serializerSvcId < 0) {
score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no serializer, no match
}
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
}
if (matchProtocol) {
long protocolSvcId = getPSProtocol(ctx, requested_protocol);
if (protocolSvcId < 0) {
score = PUBSUB_ADMIN_NO_MATCH_SCORE; //no protocol, no match
}
if (outProtocolSvcId != NULL) {
*outProtocolSvcId = protocolSvcId;
}
}
if (outTopicProperties != NULL) {
*outTopicProperties = ep;
} else {
celix_properties_destroy(ep);
}
return score;
}
bool pubsub_utils_matchEndpoint(
celix_bundle_context_t *ctx,
celix_log_helper_t *logHelper,
const celix_properties_t *ep,
const char *adminType,
bool matchProtocol,
long *outSerializerSvcId,
long *outProtocolSvcId) {
bool psaMatch = false;
const char *configured_admin = celix_properties_get(ep, PUBSUB_ENDPOINT_ADMIN_TYPE, NULL);
if (configured_admin != NULL) {
psaMatch = strncmp(configured_admin, adminType, strlen(adminType)) == 0;
}
bool serMatch = false;
long serializerSvcId = -1L;
if (psaMatch) {
const char *configured_serializer = celix_properties_get(ep, PUBSUB_ENDPOINT_SERIALIZER, NULL);
serializerSvcId = getPSSerializer(ctx, configured_serializer);
serMatch = serializerSvcId >= 0;
if(!serMatch) {
celix_logHelper_log(logHelper, CELIX_LOG_LEVEL_ERROR, "Matching endpoint for technology %s but couldn't get serializer %s", configured_admin, configured_serializer);
}
}
bool match = psaMatch && serMatch;
if (matchProtocol) {
bool protMatch = false;
long protocolSvcId = -1L;
if (psaMatch) {
const char *configured_protocol = celix_properties_get(ep, PUBSUB_ENDPOINT_PROTOCOL, NULL);
protocolSvcId = getPSProtocol(ctx, configured_protocol);
protMatch = protocolSvcId >= 0;
if(!protMatch) {
celix_logHelper_log(logHelper, CELIX_LOG_LEVEL_ERROR, "Matching endpoint for technology %s but couldn't get protocol %s", configured_admin, configured_protocol);
}
}
match = match && protMatch;
if (outProtocolSvcId != NULL) {
*outProtocolSvcId = protocolSvcId;
}
}
if (outSerializerSvcId != NULL) {
*outSerializerSvcId = serializerSvcId;
}
return match;
}