blob: 131ec14e2a325e85582e8db679695ef8910b1711 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <stdbool.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include "celix_api.h"
#include "celix_log_helper.h"
#include "utils.h"
#include "discovery.h"
#include "discovery_impl.h"
#include "etcd.h"
#include "etcd_watcher.h"
#include "endpoint_discovery_poller.h"
/*
 * Private state of the etcd-backed discovery watcher.
 * Allocated in etcdWatcher_create() and released in etcdWatcher_destroy().
 */
struct etcd_watcher {
/* etcd client handle obtained from etcdlib_create(); used for every etcd call */
etcdlib_t *etcdlib;
/* TTL handed to etcdlib_set() for the own registration; the watcher loop
 * refreshes that registration roughly every ttl / 4 seconds */
int ttl;
/* discovery instance this watcher belongs to (source of context, poller and server) */
discovery_t *discovery;
/* points at the log helper stored in the discovery instance */
celix_log_helper_t **loghelper;
/* etcd key -> endpoint URL; both are strdup'ed copies owned by this map */
hash_map_pt entries;
/* taken around updates of 'running' in etcdWatcher_create()/etcdWatcher_destroy() */
celix_thread_mutex_t watcherLock;
/* thread executing etcdWatcher_run() */
celix_thread_t watcherThread;
/* loop condition of etcdWatcher_run(); cleared by etcdWatcher_destroy() */
volatile bool running;
};
#define MAX_ROOTNODE_LENGTH 128
#define MAX_LOCALNODE_LENGTH 4096
#define MAX_VALUE_LENGTH 256
#define CFG_ETCD_ROOT_PATH "DISCOVERY_ETCD_ROOT_PATH"
#define DEFAULT_ETCD_ROOTPATH "discovery"
#define CFG_ETCD_SERVER_IP "DISCOVERY_ETCD_SERVER_IP"
#define DEFAULT_ETCD_SERVER_IP "127.0.0.1"
#define CFG_ETCD_SERVER_PORT "DISCOVERY_ETCD_SERVER_PORT"
#define DEFAULT_ETCD_SERVER_PORT 2379
// be careful - this should be higher than the curl timeout
#define CFG_ETCD_TTL "DISCOVERY_ETCD_TTL"
#define DEFAULT_ETCD_TTL 30
/*
 * Copies the etcd root path used for discovery into rootNode.
 *
 * The path is taken from the CFG_ETCD_ROOT_PATH framework property; when the
 * property cannot be read or is unset, DEFAULT_ETCD_ROOTPATH is used instead.
 * Note that the root path should not have a leading slash.
 *
 * context  - bundle context used for the property lookup
 * rootNode - output buffer of at least MAX_ROOTNODE_LENGTH bytes; always
 *            NUL-terminated on return (longer paths are truncated)
 *
 * Returns CELIX_SUCCESS.
 */
static celix_status_t etcdWatcher_getRootPath(celix_bundle_context_t *context, char* rootNode) {
    const char* configured = NULL;
    const char* source = DEFAULT_ETCD_ROOTPATH;

    if (bundleContext_getProperty(context, CFG_ETCD_ROOT_PATH, &configured) == CELIX_SUCCESS && configured != NULL) {
        source = configured;
    }

    // snprintf, unlike strncpy, guarantees a terminating NUL even when the
    // configured path does not fit into the buffer.
    snprintf(rootNode, MAX_ROOTNODE_LENGTH, "%s", source);

    return CELIX_SUCCESS;
}
/*
 * Builds the etcd key under which this framework registers itself:
 * "<root path>/<framework uuid>".
 *
 * context       - bundle context used to look up the root path and the
 *                 CELIX_FRAMEWORK_UUID property
 * localNodePath - output buffer of at least MAX_LOCALNODE_LENGTH bytes
 *
 * Returns CELIX_SUCCESS, or CELIX_ILLEGAL_STATE when the root path or the
 * framework uuid cannot be determined (localNodePath is left untouched then).
 */
static celix_status_t etcdWatcher_getLocalNodePath(celix_bundle_context_t *context, char* localNodePath) {
    char rootPath[MAX_ROOTNODE_LENGTH];
    const char* uuid = NULL;

    if (etcdWatcher_getRootPath(context, rootPath) != CELIX_SUCCESS) {
        return CELIX_ILLEGAL_STATE;
    }
    // Defensive: make sure the buffer is a valid C string before measuring it.
    rootPath[MAX_ROOTNODE_LENGTH - 1] = '\0';

    if (bundleContext_getProperty(context, CELIX_FRAMEWORK_UUID, &uuid) != CELIX_SUCCESS || uuid == NULL) {
        return CELIX_ILLEGAL_STATE;
    }

    // Only insert a separator when the root path is non-empty and does not
    // already end in one. The length check prevents indexing rootPath[-1]
    // for an empty root path, and avoids producing a key with a leading slash.
    size_t rootLen = strlen(rootPath);
    const char* separator = (rootLen > 0 && rootPath[rootLen - 1] != '/') ? "/" : "";

    snprintf(localNodePath, MAX_LOCALNODE_LENGTH, "%s%s%s", rootPath, separator, uuid);

    return CELIX_SUCCESS;
}
/*
 * Directory-listing callback handed to etcdlib_get_directory(): registers the
 * value of one etcd node as a discovery endpoint with the poller.
 *
 * key   - etcd key of the node (not used)
 * value - value of the node, passed to the poller as endpoint URL
 * arg   - the discovery_t instance owning the poller
 */
static void add_node(const char *key, const char *value, void* arg) {
    discovery_t *owner = arg;
    char *endpointUrl = (char *) value;

    (void) key;
    endpointDiscoveryPoller_addDiscoveryEndpoint(owner->poller, endpointUrl);
}
/*
 * Fetches every discovery endpoint that is already present below the etcd
 * root path and hands each of them to the poller (through add_node).
 *
 * highestModified is passed on to etcdlib_get_directory(), which reports the
 * modifiedIndex of the most recently modified discovery endpoint through it
 * (see the etcd documentation).
 *
 * Returns the status of the root path lookup if that fails,
 * CELIX_ILLEGAL_ARGUMENT if the directory listing fails, CELIX_SUCCESS otherwise.
 */
static celix_status_t etcdWatcher_addAlreadyExistingWatchpoints(etcd_watcher_t *watcher, discovery_t *discovery, long long* highestModified) {
    char etcdRoot[MAX_ROOTNODE_LENGTH];

    celix_status_t rc = etcdWatcher_getRootPath(discovery->context, etcdRoot);
    if (rc != CELIX_SUCCESS) {
        return rc;
    }

    // a non-zero result signals that the listing failed
    if (etcdlib_get_directory(watcher->etcdlib, etcdRoot, add_node, discovery, highestModified)) {
        return CELIX_ILLEGAL_ARGUMENT;
    }

    return CELIX_SUCCESS;
}
/*
 * Registers (or refreshes) this framework's own discovery URL in etcd under
 * the local node path, using the TTL configured through CFG_ETCD_TTL.
 *
 * The URL is obtained from the endpoint discovery server; if that fails a URL
 * built from the DEFAULT_SERVER_* settings is published instead.
 *
 * Registration is best effort: a failing etcd write is logged as a warning
 * and does not change the return value.
 *
 * Returns the error of the local node path lookup if that fails,
 * CELIX_SUCCESS otherwise.
 */
static celix_status_t etcdWatcher_addOwnFramework(etcd_watcher_t *watcher)
{
    char localNodePath[MAX_LOCALNODE_LENGTH];
    char url[MAX_VALUE_LENGTH];
    char *value = NULL;
    int modIndex;
    celix_bundle_context_t *context = watcher->discovery->context;
    endpoint_discovery_server_t *server = watcher->discovery->server;

    celix_status_t status = etcdWatcher_getLocalNodePath(context, localNodePath);
    if (status != CELIX_SUCCESS) {
        return status;
    }

    if (endpointDiscoveryServer_getUrl(server, url, MAX_VALUE_LENGTH) != CELIX_SUCCESS) {
        snprintf(url, MAX_VALUE_LENGTH, "http://%s:%s/%s", DEFAULT_SERVER_IP, DEFAULT_SERVER_PORT, DEFAULT_SERVER_PATH);
    }

    watcher->ttl = celix_bundleContext_getPropertyAsLong(context, CFG_ETCD_TTL, DEFAULT_ETCD_TTL);

    // The last argument of etcdlib_set() differs depending on whether the key
    // is already present: false for a first registration, true for a refresh.
    bool alreadyRegistered = etcdlib_get(watcher->etcdlib, localNodePath, &value, &modIndex) == ETCDLIB_RC_OK;

    // Report a failed write in both cases; previously a failing first
    // registration went unnoticed.
    if (etcdlib_set(watcher->etcdlib, localNodePath, url, watcher->ttl, alreadyRegistered) != ETCDLIB_RC_OK) {
        celix_logHelper_log(*watcher->loghelper, CELIX_LOG_LEVEL_WARNING, "Cannot register local discovery");
    }

    FREE_MEM(value);

    return CELIX_SUCCESS;
}
/*
 * Records a newly seen etcd node and registers its value as a discovery
 * endpoint with the poller.
 *
 * key   - etcd key of the node
 * value - endpoint URL stored in the node
 *
 * The map stores private copies of key and value. They are allocated before
 * the poller is touched, so an allocation failure leaves no endpoint
 * registered without a matching map entry.
 *
 * Returns CELIX_ILLEGAL_ARGUMENT when key or value is NULL,
 * CELIX_BUNDLE_EXCEPTION when the key is already known, CELIX_ENOMEM when the
 * copies cannot be allocated, otherwise the status reported by the poller.
 */
static celix_status_t etcdWatcher_addEntry(etcd_watcher_t *watcher, char* key, char* value) {
    if (key == NULL || value == NULL) {
        // a watch event may lack either field; strdup/hash lookups on NULL are invalid
        return CELIX_ILLEGAL_ARGUMENT;
    }

    if (hashMap_containsKey(watcher->entries, key)) {
        return CELIX_BUNDLE_EXCEPTION;
    }

    char *keyCopy = strdup(key);
    char *valueCopy = strdup(value);
    if (keyCopy == NULL || valueCopy == NULL) {
        free(keyCopy);
        free(valueCopy);
        return CELIX_ENOMEM;
    }

    endpoint_discovery_poller_t *poller = watcher->discovery->poller;
    celix_status_t status = endpointDiscoveryPoller_addDiscoveryEndpoint(poller, value);
    if (status == CELIX_SUCCESS) {
        // ownership of both copies moves to the map
        hashMap_put(watcher->entries, keyCopy, valueCopy);
    } else {
        free(keyCopy);
        free(valueCopy);
    }

    return status;
}
/*
 * Forgets the etcd node identified by key and, when no other known node
 * refers to the same endpoint URL, removes that URL from the poller.
 *
 * key   - etcd key of the node that was deleted or expired
 * value - not used; the URL recorded for the key when it was added is used
 *
 * Returns CELIX_ILLEGAL_ARGUMENT when key is NULL, CELIX_BUNDLE_EXCEPTION when
 * the key is unknown, the poller's status when the endpoint is removed, and
 * CELIX_SUCCESS when the entry is dropped but the URL is still referenced by
 * another node.
 */
static celix_status_t etcdWatcher_removeEntry(etcd_watcher_t *watcher, char* key, char* value) {
    (void) value;

    if (key == NULL) {
        return CELIX_ILLEGAL_ARGUMENT;
    }

    hash_map_entry_pt entry = hashMap_getEntry(watcher->entries, key);
    if (entry == NULL) {
        return CELIX_BUNDLE_EXCEPTION;
    }

    // Fetch the stored key before removing the entry; the map hands back only
    // the stored value, and both were allocated when the entry was added.
    void* storedKey = hashMapEntry_getKey(entry);
    char* storedUrl = hashMap_remove(watcher->entries, key);
    free(storedKey);

    celix_status_t status = CELIX_SUCCESS;

    if (storedUrl != NULL) {
        // check if there is another entry with the same value
        bool stillReferenced = false;
        hash_map_iterator_pt iter = hashMapIterator_create(watcher->entries);
        while (!stillReferenced && hashMapIterator_hasNext(iter)) {
            const char* otherUrl = hashMapIterator_nextValue(iter);
            if (otherUrl != NULL && strcmp(storedUrl, otherUrl) == 0) {
                stillReferenced = true;
            }
        }
        hashMapIterator_destroy(iter);

        if (!stillReferenced) {
            endpoint_discovery_poller_t *poller = watcher->discovery->poller;
            status = endpointDiscoveryPoller_removeDiscoveryEndpoint(poller, storedUrl);
        }
    }

    free(storedUrl);

    return status;
}
/*
 * Thread body of the watcher.
 *
 * First loads the discovery endpoints that already exist in etcd, then keeps
 * issuing (blocking) etcdlib_watch calls on the root path for as long as
 * watcher->running is set. "set" and "update" events add an entry, "delete"
 * and "expire" events remove one; any other action is logged.
 *
 * When a watch call yields no event and at most ttl / 4 seconds have elapsed
 * since the last registration, the loop sleeps for ttl / 4 seconds. Once more
 * than ttl / 4 seconds have elapsed, the own framework registration is
 * refreshed.
 *
 * data - the etcd_watcher_t instance
 *
 * Always returns NULL.
 */
static void* etcdWatcher_run(void* data) {
    etcd_watcher_t *self = (etcd_watcher_t *) data;
    time_t lastRegistration = time(NULL);
    char watchRoot[MAX_ROOTNODE_LENGTH];
    long long lastSeenIndex = 0;
    celix_bundle_context_t *ctx = self->discovery->context;

    etcdWatcher_addAlreadyExistingWatchpoints(self, self->discovery, &lastSeenIndex);
    etcdWatcher_getRootPath(ctx, watchRoot);

    while (self->running) {
        char *eventKey = NULL;
        char *eventValue = NULL;
        char *previousValue = NULL;
        char *eventAction = NULL;
        long long eventIndex;

        if (etcdlib_watch(self->etcdlib, watchRoot, lastSeenIndex + 1, &eventAction, &previousValue, &eventValue, &eventKey, &eventIndex) == 0 && eventAction != NULL) {
            bool addsEntry = strcmp(eventAction, "set") == 0 || strcmp(eventAction, "update") == 0;
            bool dropsEntry = strcmp(eventAction, "delete") == 0 || strcmp(eventAction, "expire") == 0;

            if (addsEntry) {
                etcdWatcher_addEntry(self, eventKey, eventValue);
            } else if (dropsEntry) {
                etcdWatcher_removeEntry(self, eventKey, eventValue);
            } else {
                celix_logHelper_log(*self->loghelper, CELIX_LOG_LEVEL_INFO, "Unexpected action: %s", eventAction);
            }

            lastSeenIndex = eventIndex;
        } else if (time(NULL) - lastRegistration <= (self->ttl / 4)) {
            // nothing received; wait before trying again
            sleep(self->ttl / 4);
        }

        FREE_MEM(eventAction);
        FREE_MEM(eventValue);
        FREE_MEM(previousValue);
        FREE_MEM(eventKey);

        // refresh the own framework registration
        if (time(NULL) - lastRegistration > (self->ttl / 4)) {
            etcdWatcher_addOwnFramework(self);
            lastRegistration = time(NULL);
        }
    }

    return NULL;
}
/*
 * Creates an etcd watcher, registers the own framework in etcd and starts the
 * watcher thread.
 *
 * The watcher needs access to the endpoint discovery poller of 'discovery'
 * and therefore may only be created after that poller exists.
 *
 * discovery - discovery instance the watcher belongs to; must not be NULL
 * context   - bundle context used to read the etcd server address and port
 * watcher   - receives the newly allocated watcher
 *
 * Returns CELIX_BUNDLE_EXCEPTION when discovery is NULL or the etcd client
 * cannot be created, CELIX_ENOMEM when the watcher cannot be allocated, the
 * failing status when the mutex or thread cannot be set up, CELIX_SUCCESS
 * otherwise.
 *
 * NOTE(review): on failure after allocation the partially initialised watcher
 * stays in *watcher and is not released here - confirm how callers clean up.
 */
celix_status_t etcdWatcher_create(discovery_t *discovery, celix_bundle_context_t *context, etcd_watcher_t **watcher)
{
    if (discovery == NULL) {
        return CELIX_BUNDLE_EXCEPTION;
    }

    (*watcher) = calloc(1, sizeof(struct etcd_watcher));
    if (!*watcher) {
        return CELIX_ENOMEM;
    }

    (*watcher)->discovery = discovery;
    (*watcher)->loghelper = &discovery->loghelper;
    (*watcher)->entries = hashMap_create(utils_stringHash, NULL, utils_stringEquals, NULL);
    (*watcher)->ttl = DEFAULT_ETCD_TTL;

    const char* etcd_server = celix_bundleContext_getProperty(context, CFG_ETCD_SERVER_IP, DEFAULT_ETCD_SERVER_IP);
    long etcd_port = celix_bundleContext_getPropertyAsLong(context, CFG_ETCD_SERVER_PORT, DEFAULT_ETCD_SERVER_PORT);

    (*watcher)->etcdlib = etcdlib_create(etcd_server, (int)etcd_port, ETCDLIB_NO_CURL_INITIALIZATION);
    if ((*watcher)->etcdlib == NULL) {
        return CELIX_BUNDLE_EXCEPTION;
    }

    etcdWatcher_addOwnFramework(*watcher);

    celix_status_t status = celixThreadMutex_create(&(*watcher)->watcherLock, NULL);
    if (status != CELIX_SUCCESS) {
        return status;
    }

    // Propagate a lock failure instead of reporting success without a thread.
    status = celixThreadMutex_lock(&(*watcher)->watcherLock);
    if (status == CELIX_SUCCESS) {
        // The flag must be set before the thread starts: etcdWatcher_run()
        // uses it as its loop condition and would otherwise be able to
        // observe 'false' and terminate immediately.
        (*watcher)->running = true;
        status = celixThread_create(&(*watcher)->watcherThread, NULL, etcdWatcher_run, *watcher);
        if (status != CELIX_SUCCESS) {
            (*watcher)->running = false;
        }
        celixThreadMutex_unlock(&(*watcher)->watcherLock);
    }

    return status;
}
/*
 * Stops the watcher thread, removes the own framework registration from etcd
 * and releases the watcher.
 *
 * watcher - watcher created by etcdWatcher_create(); invalid after this call
 *
 * Returns the status of the local node path lookup. A failure to delete the
 * registration is logged as a warning and not reflected in the return value.
 *
 * NOTE(review): watcherLock is not destroyed here - confirm whether the mutex
 * needs an explicit destroy call.
 */
celix_status_t etcdWatcher_destroy(etcd_watcher_t *watcher) {
    char localNodePath[MAX_LOCALNODE_LENGTH];

    celixThreadMutex_lock(&watcher->watcherLock);
    watcher->running = false;
    celixThreadMutex_unlock(&watcher->watcherLock);

    // wait until the watcher loop has noticed the cleared flag
    celixThread_join(watcher->watcherThread, NULL);

    // unregister own framework
    celix_status_t status = etcdWatcher_getLocalNodePath(watcher->discovery->context, localNodePath);

    // etcdlib calls report success as ETCDLIB_RC_OK (0), as for etcdlib_get /
    // etcdlib_set elsewhere in this file; comparing against 'false' would warn
    // on success and stay silent on failure.
    bool unregistered = (status == CELIX_SUCCESS) && (etcdlib_del(watcher->etcdlib, localNodePath) == ETCDLIB_RC_OK);
    if (!unregistered) {
        celix_logHelper_log(*watcher->loghelper, CELIX_LOG_LEVEL_WARNING, "Cannot remove local discovery registration.");
    }

    watcher->loghelper = NULL;

    // the map owns its keys and values, so free them along with it
    hashMap_destroy(watcher->entries, true, true);
    etcdlib_destroy(watcher->etcdlib);
    free(watcher);

    return status;
}