blob: ed54f390ecad36a0fda0317456e29a8c8668f16b [file] [log] [blame]
/**
*Licensed to the Apache Software Foundation (ASF) under one
*or more contributor license agreements. See the NOTICE file
*distributed with this work for additional information
*regarding copyright ownership. The ASF licenses this file
*to you under the Apache License, Version 2.0 (the
*"License"); you may not use this file except in compliance
*with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*Unless required by applicable law or agreed to in writing,
*software distributed under the License is distributed on an
*"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
*specific language governing permissions and limitations
*under the License.
*/
/*
* event_admin_impl.c
*
* Created on: Jul 24, 2013
* \author <a href="mailto:dev@celix.apache.org">Apache Celix Project Team</a>
* \copyright Apache License, Version 2.0
*/
#include <stdlib.h>
#include <unistd.h>
#include "event_admin.h"
#include "event_admin_impl.h"
#include "event_handler.h"
#include "hash_map.h"
#include "utils.h"
#include "celix_log.h"
/**
 * Allocates an event admin instance together with its channel map, pending
 * event list and handler list.
 *
 * The instance is created in the stopped state: eventAdminRunning is false
 * and eventListProcessor is set to celix_thread_default.
 *
 * @param context      Bundle context stored in the new instance.
 * @param event_admin  Out parameter; receives the new instance on success and
 *                     NULL on failure.
 * @return CELIX_SUCCESS on success, CELIX_ENOMEM when an allocation fails, or
 *         the status reported by linkedList_create()/arrayList_create().
 */
celix_status_t eventAdmin_create(bundle_context_pt context, event_admin_pt *event_admin) {
    celix_status_t status = CELIX_SUCCESS;
    event_admin_pt admin = calloc(1, sizeof(*admin));

    *event_admin = NULL;
    if (admin == NULL) {
        return CELIX_ENOMEM;
    }

    admin->context = context;
    admin->eventListProcessor = celix_thread_default;
    admin->eventAdminRunning = false;

    /* Channels are looked up by topic string. */
    admin->channels = hashMap_create(utils_stringHash, utils_stringHash, utils_stringEquals, utils_stringEquals);
    if (admin->channels == NULL) {
        status = CELIX_ENOMEM;
    }
    if (status == CELIX_SUCCESS) {
        status = linkedList_create(&admin->eventList);
    }
    if (status == CELIX_SUCCESS) {
        status = arrayList_create(&admin->event_handlers);
    }

    if (status != CELIX_SUCCESS) {
        /* Undo whatever was built so the caller never sees a half-initialized
         * instance. The struct was zeroed by calloc, so a member that was not
         * reached is still NULL. */
        if (admin->eventList != NULL) {
            linkedList_destroy(admin->eventList);
        }
        if (admin->channels != NULL) {
            hashMap_destroy(admin->channels, false, false);
        }
        free(admin);
        return status;
    }

    *event_admin = admin;
    return status;
}
/**
 * Starts the event admin: creates its two mutexes and launches the thread
 * that runs eventProcessor().
 *
 * @param event_admin  Instance to start.
 * @return CELIX_SUCCESS, or the first failing status from mutex or thread
 *         creation. On failure eventAdminRunning is false.
 */
celix_status_t eventAdmin_start(event_admin_pt event_admin) {
    celix_status_t status = celixThreadMutex_create(&event_admin->eventListLock, NULL);

    if (status == CELIX_SUCCESS) {
        status = celixThreadMutex_create(&event_admin->topicLock, NULL);
    }

    if (status == CELIX_SUCCESS) {
        /* The processor loops while eventAdminRunning is true and takes
         * eventListLock right away, so both must be ready before the thread
         * exists. Otherwise it can exit immediately or touch an
         * uninitialized mutex. */
        event_admin->eventAdminRunning = true;
        status = celixThread_create(&event_admin->eventListProcessor, NULL, eventProcessor, event_admin);
        if (status != CELIX_SUCCESS) {
            /* No processor thread: do not accept posted events. The mutexes
             * stay initialized because topicLock is also used by the channel
             * registration code, independent of the processor. */
            event_admin->eventAdminRunning = false;
        }
    }

    return status;
}
/**
 * Requests a stop by clearing the running flag.
 *
 * Both the eventProcessor() loop and the eventAdmin_postEvent() loop test
 * this flag; this function does not wait for the processor thread itself.
 *
 * @param event_admin  Instance to stop.
 * @return Always CELIX_SUCCESS.
 */
celix_status_t eventAdmin_stop(event_admin_pt event_admin) {
    celix_status_t result = CELIX_SUCCESS;

    event_admin->eventAdminRunning = false;

    return result;
}
/**
 * Waits for the processor thread to finish and releases the event admin.
 *
 * @param event_admin  Instance to destroy; must not be used afterwards.
 * @return Status reported by celixThread_join().
 *
 * NOTE(review): this function does not release the channel objects stored in
 * the channel map (topic string, handler map, channel lock), entries still
 * waiting in eventList, or the eventListLock/topicLock mutexes. Confirm
 * whether they are cleaned up elsewhere.
 */
celix_status_t eventAdmin_destroy(event_admin_pt event_admin) {
    celix_status_t status;

    /* The processor only leaves its loop once the running flag is false.
     * Clear it here so the join below cannot block forever when the caller
     * did not call eventAdmin_stop() first. */
    event_admin->eventAdminRunning = false;
    status = celixThread_join(event_admin->eventListProcessor, NULL);

    hashMap_destroy(event_admin->channels, false, false);
    arrayList_destroy(event_admin->event_handlers);
    linkedList_destroy(event_admin->eventList);
    free(event_admin);

    return status;
}
/**
 * Queues an event for asynchronous delivery by the processor thread.
 *
 * The handlers for the event's topic are resolved at post time through
 * eventAdmin_findHandlersByTopic(), which also takes the topic's channel
 * lock. The event and its handler list are stored together as a two-element
 * array list (index 0: event, index 1: handler list) appended to eventList.
 *
 * @param event_admin  Event admin instance.
 * @param event        Event to deliver. It is not copied.
 * @return CELIX_SUCCESS when the event was queued, or when the admin is not
 *         running (in which case nothing is queued). Otherwise the failing
 *         status, e.g. CELIX_ILLEGAL_STATE when the topic's channel lock is
 *         currently held; the event is then NOT queued and the caller may
 *         retry.
 */
celix_status_t eventAdmin_postEvent(event_admin_pt event_admin, event_pt event) {
    celix_status_t status = CELIX_SUCCESS;
    bool queued = false;

    while (event_admin->eventAdminRunning && !queued && status == CELIX_SUCCESS) {
        if (celixThreadMutex_tryLock(&event_admin->eventListLock) != 0) {
            /* List is busy; try again. */
            continue;
        }

        array_list_pt event_handlers = NULL;
        array_list_pt event_and_handlers = NULL;
        const char *topic = NULL;

        eventAdmin_getTopic(event, &topic);

        status = arrayList_create(&event_and_handlers);
        if (status == CELIX_SUCCESS) {
            status = arrayList_create(&event_handlers);
        }
        if (status == CELIX_SUCCESS) {
            status = eventAdmin_findHandlersByTopic(event_admin, topic, event_handlers);
        }

        if (status == CELIX_SUCCESS) {
            arrayList_addIndex(event_and_handlers, 0, event);
            arrayList_addIndex(event_and_handlers, 1, event_handlers);
            linkedList_addLast(event_admin->eventList, event_and_handlers);
            queued = true;
        } else {
            /* Do not queue an event whose handlers could not be resolved.
             * It would be delivered to nobody, and the processor would
             * afterwards unlock a topic lock this call never acquired. */
            if (event_handlers != NULL) {
                arrayList_destroy(event_handlers);
            }
            if (event_and_handlers != NULL) {
                arrayList_destroy(event_and_handlers);
            }
        }

        celixThreadMutex_unlock(&event_admin->eventListLock);
    }

    return status;
}
/**
 * Thread entry point that delivers queued events to their handlers.
 *
 * While eventAdminRunning is true, the loop tries to take one entry from
 * eventList, calls every handler stored with it, unlocks the event's topic
 * and sleeps. The sleep grows from 1 ms to 10 ms while no entry could be
 * taken (list empty or list lock busy), then wraps back to 1 ms; it resets to
 * 1 ms whenever an entry is dequeued.
 *
 * Each entry is a two-element array list: index 0 holds the event, index 1
 * the array list of handler services.
 *
 * @param handle  The event_admin_pt this thread serves.
 * @return Always NULL.
 */
void* eventProcessor(void *handle) {
    event_admin_pt eventAdminPt = handle;
    int waitcounter = 1;

    while (eventAdminPt->eventAdminRunning) {
        /* Scoped to a single iteration so that an entry is dispatched exactly
         * once; a leftover pointer from the previous round would be delivered
         * again on every pass. */
        array_list_pt event_and_handlers = NULL;

        if (celixThreadMutex_tryLock(&eventAdminPt->eventListLock) == 0) {
            if (!linkedList_isEmpty(eventAdminPt->eventList)) {
                event_and_handlers = linkedList_removeFirst(eventAdminPt->eventList);
            }
            celixThreadMutex_unlock(&eventAdminPt->eventListLock);
        }

        if (event_and_handlers == NULL) {
            /* Nothing to do this round: back off, wrapping after 10. */
            waitcounter = (waitcounter < 10) ? waitcounter + 1 : 1;
        } else {
            event_pt event = arrayList_get(event_and_handlers, 0);
            array_list_pt event_handlers = arrayList_get(event_and_handlers, 1);
            const char *topic = NULL;
            unsigned int i;

            waitcounter = 1;

            for (i = 0; i < arrayList_size(event_handlers); i += 1) {
                event_handler_service_pt event_handler_service =
                        (event_handler_service_pt) arrayList_get(event_handlers, i);
                if (event_handler_service != NULL) {
                    event_handler_service->handle_event(&event_handler_service->event_handler, event);
                }
            }

            /* Release the topic once delivery is done. */
            eventAdmin_getTopic(event, &topic);
            eventAdmin_unlockTopic(eventAdminPt, topic);

            /* Both lists were created for this one entry by the poster.
             * NOTE(review): assumes arrayList_destroy frees only the list
             * container, not the stored event or handler services - confirm.
             * The event itself is not released here. */
            arrayList_destroy(event_handlers);
            arrayList_destroy(event_and_handlers);
        }

        usleep(waitcounter * 1000);
    }

    return NULL;
}
/**
 * Delivers an event synchronously to every handler registered for its topic.
 *
 * Handlers are collected with eventAdmin_findHandlersByTopic(). When that
 * succeeds for an existing channel, the channel lock is left held; this
 * function does not unlock it.
 *
 * @param event_admin  Event admin instance.
 * @param event        Event to deliver.
 * @return CELIX_SUCCESS when the handlers were resolved and called, otherwise
 *         the status of arrayList_create() or eventAdmin_findHandlersByTopic()
 *         (no handler is called in that case).
 */
celix_status_t processEvent(event_admin_pt event_admin, event_pt event) {
    celix_status_t status;
    const char *topic = NULL;
    array_list_pt event_handlers = NULL;

    eventAdmin_getTopic(event, &topic);

    status = arrayList_create(&event_handlers);
    if (status != CELIX_SUCCESS) {
        return status;
    }

    status = eventAdmin_findHandlersByTopic(event_admin, topic, event_handlers);
    if (status == CELIX_SUCCESS) {
        /* Index loop instead of an iterator: nothing extra to allocate or to
         * remember to destroy. */
        unsigned int i;
        for (i = 0; i < arrayList_size(event_handlers); i += 1) {
            event_handler_service_pt event_handler_service =
                    (event_handler_service_pt) arrayList_get(event_handlers, i);
            if (event_handler_service != NULL) {
                event_handler_service->handle_event(&event_handler_service->event_handler, event);
            }
        }
    }

    arrayList_destroy(event_handlers);
    return status;
}
/**
 * Delivers an event synchronously and then releases the event's topic.
 *
 * @param event_admin  Event admin instance.
 * @param event        Event to deliver.
 * @return The failing status of processEvent() if delivery failed, otherwise
 *         the status of eventAdmin_unlockTopic().
 */
celix_status_t eventAdmin_sendEvent(event_admin_pt event_admin, event_pt event) {
    celix_status_t status = processEvent(event_admin, event);

    /* Only unlock when delivery succeeded. A failed processEvent() means the
     * topic lock was not acquired by this call, so unlocking would release a
     * lock held on behalf of another event and hide the delivery error. */
    if (status == CELIX_SUCCESS) {
        const char *topic = NULL;
        eventAdmin_getTopic(event, &topic);
        status = eventAdmin_unlockTopic(event_admin, topic);
    }

    return status;
}
/**
 * Collects the handler services registered for a topic.
 *
 * When a channel exists for the topic, its channel lock is acquired with a
 * non-blocking try-lock and is still held when this function returns
 * successfully; it is released via eventAdmin_unlockTopic().
 *
 * @param event_admin     Event admin instance.
 * @param topic           Topic to look up in the channel map.
 * @param event_handlers  Existing list that receives the handler services.
 * @return CELIX_SUCCESS when there is no channel for the topic (list left
 *         untouched), the try-lock result (0) when the channel was locked,
 *         or CELIX_ILLEGAL_STATE when the channel lock could not be taken.
 */
celix_status_t eventAdmin_findHandlersByTopic(event_admin_pt event_admin, const char *topic,
                                              array_list_pt event_handlers) {
    channel_t topic_channel = hashMap_get(event_admin->channels, topic);
    if (topic_channel == NULL) {
        return CELIX_SUCCESS;
    }

    celix_status_t lock_result = celixThreadMutex_tryLock(topic_channel->channelLock);
    if (lock_result != 0) {
        return CELIX_ILLEGAL_STATE;
    }

    if (topic_channel->eventHandlers == NULL || hashMap_isEmpty(topic_channel->eventHandlers)) {
        /* Channel is locked but has nobody to notify. */
        return lock_result;
    }

    /* Copy every registered handler into the caller's list. */
    hash_map_iterator_pt handler_it = hashMapIterator_create(topic_channel->eventHandlers);
    while (hashMapIterator_hasNext(handler_it)) {
        event_handler_service_pt registered = (event_handler_service_pt) hashMapIterator_nextValue(handler_it);
        arrayList_add(event_handlers, registered);
    }
    hashMapIterator_destroy(handler_it);

    return lock_result;
}
/**
 * Releases the channel lock that belongs to a topic.
 *
 * @param event_admin  Event admin instance.
 * @param topic        Topic whose channel lock is released.
 * @return CELIX_SUCCESS when no channel exists for the topic, otherwise the
 *         status of celixThreadMutex_unlock().
 */
celix_status_t eventAdmin_unlockTopic(event_admin_pt event_admin, const char *topic) {
    channel_t topic_channel = hashMap_get(event_admin->channels, topic);

    if (topic_channel == NULL) {
        /* Unknown topic: nothing was locked. */
        return CELIX_SUCCESS;
    }

    return celixThreadMutex_unlock(topic_channel->channelLock);
}
/**
 * Registers a handler service for a topic, creating the topic's channel on
 * first use.
 *
 * A channel owns a private copy of the topic string (also used as its key in
 * the channel map), a map of handler services and a heap-allocated lock.
 * The whole operation runs under topicLock.
 *
 * @param event_admin            Event admin instance.
 * @param topic                  Topic to subscribe to; must not be NULL.
 * @param event_handler_service  Handler service to register.
 * @return CELIX_SUCCESS, CELIX_ILLEGAL_ARGUMENT for a NULL topic,
 *         CELIX_ENOMEM when the channel could not be allocated, or the
 *         failing status of a mutex operation.
 */
celix_status_t eventAdmin_createEventChannels(event_admin_pt event_admin, const char *topic,
                                              event_handler_service_pt event_handler_service) {
    celix_status_t status = CELIX_SUCCESS;
    celix_status_t unlock_status;

    if (topic == NULL) {
        /* A NULL topic can neither be hashed as a string nor duplicated. */
        return CELIX_ILLEGAL_ARGUMENT;
    }

    /* Spin until the topic lock is ours. */
    while (celixThreadMutex_tryLock(&event_admin->topicLock) != 0) {
    }

    channel_t channel = hashMap_get(event_admin->channels, topic);
    if (channel == NULL) {
        char *channel_name = NULL;

        channel = calloc(1, sizeof(*channel));
        if (channel != NULL) {
            channel_name = strdup(topic);
            channel->topic = channel_name;
            channel->channelLock = calloc(1, sizeof(celix_thread_mutex_t));
            channel->eventHandlers = hashMap_create(NULL, NULL, NULL, NULL);
        }

        if (channel == NULL || channel_name == NULL || channel->channelLock == NULL
                || channel->eventHandlers == NULL) {
            status = CELIX_ENOMEM;
        } else {
            status = celixThreadMutex_create(channel->channelLock, NULL);
        }

        if (status == CELIX_SUCCESS) {
            hashMap_put(event_admin->channels, channel_name, channel);
        } else if (channel != NULL) {
            /* Roll back the partially built channel. */
            if (channel->eventHandlers != NULL) {
                hashMap_destroy(channel->eventHandlers, false, false);
            }
            free(channel->channelLock);
            free(channel_name);
            free(channel);
            channel = NULL;
        }
    }

    if (channel != NULL) {
        /* Key the handler by the service pointer itself. The address of the
         * parameter would be a stack slot that dangles once this function
         * returns and is the same for unrelated handlers.
         * NOTE(review): removal must look the handler up by the same key. */
        hashMap_put(channel->eventHandlers, event_handler_service, event_handler_service);
    }

    unlock_status = celixThreadMutex_unlock(&event_admin->topicLock);
    if (status == CELIX_SUCCESS) {
        /* Report an unlock failure, but never let it mask an earlier error. */
        status = unlock_status;
    }

    return status;
}
/**
 * Callback for a service that is about to be tracked: resolves the service
 * object for the given reference through the admin's bundle context.
 *
 * @param handle   The event_admin_pt that owns the bundle context.
 * @param ref      Reference of the service being added.
 * @param service  Out parameter filled in by bundleContext_getService().
 * @return Status of bundleContext_getService().
 */
celix_status_t eventAdmin_addingService(void* handle, service_reference_pt ref, void **service) {
    event_admin_pt admin = handle;

    return bundleContext_getService(admin->context, ref, service);
}
/**
 * Callback for a newly added event handler service: reads the EVENT_TOPIC
 * property from the service reference and registers the handler on that
 * topic's channel.
 *
 * @param handle   The event_admin_pt to register with.
 * @param ref      Reference of the added service; supplies the topic.
 * @param service  The added service, used as an event_handler_service_pt.
 * @return CELIX_ILLEGAL_ARGUMENT when the reference has no EVENT_TOPIC
 *         property, otherwise the status of
 *         eventAdmin_createEventChannels().
 */
celix_status_t eventAdmin_addedService(void* handle, service_reference_pt ref, void * service) {
    event_admin_pt event_admin = handle;
    event_handler_service_pt event_handler_service = (event_handler_service_pt) service;
    const char *topic = NULL;

    serviceReference_getProperty(ref, (const char*)EVENT_TOPIC, &topic);
    if (topic == NULL) {
        /* Without a topic there is no channel to attach the handler to, and
         * the channel code would duplicate a NULL string. */
        return CELIX_ILLEGAL_ARGUMENT;
    }

    /* Pass registration failures on instead of always reporting success. */
    return eventAdmin_createEventChannels(event_admin, topic, event_handler_service);
}
/**
 * Callback for a modified event handler service. Nothing is updated: the
 * function ignores all of its arguments.
 *
 * @param handle   Unused.
 * @param ref      Unused.
 * @param service  Unused.
 * @return Always CELIX_SUCCESS.
 */
celix_status_t eventAdmin_modifiedService(void* handle, service_reference_pt ref, void * service) {
    (void) handle;
    (void) ref;
    (void) service;

    return CELIX_SUCCESS;
}
/**
 * Callback for a removed event handler service: unregisters the handler from
 * the channel of the topic named by the reference's EVENT_TOPIC property.
 * The removal runs under topicLock.
 *
 * A missing topic property or an unknown topic is treated as "nothing to
 * remove".
 *
 * @param handle   The event_admin_pt to unregister from.
 * @param ref      Reference of the removed service; supplies the topic.
 * @param service  The removed handler service.
 * @return Status of releasing topicLock.
 */
celix_status_t eventAdmin_removedService(void* handle, service_reference_pt ref, void * service) {
    event_admin_pt event_admin = handle;
    const char *topic = NULL;

    serviceReference_getProperty(ref, (const char *) EVENT_TOPIC, &topic);

    /* Spin until the topic lock is ours. */
    while (celixThreadMutex_tryLock(&event_admin->topicLock) != 0) {
    }

    if (topic != NULL) {
        channel_t channel = hashMap_get(event_admin->channels, topic);
        if (channel != NULL && channel->eventHandlers != NULL) {
            /* Look the handler up by the service pointer. The address of the
             * local `service` parameter is a stack location of this call and
             * can never equal a key stored earlier.
             * NOTE(review): assumes handlers are keyed by their service
             * pointer at registration - confirm against
             * eventAdmin_createEventChannels. */
            hashMap_remove(channel->eventHandlers, service);
        }
    }

    return celixThreadMutex_unlock(&event_admin->topicLock);
}