blob: 8c711424d5480e1ef39b948d69cdbae2600bba98 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <stdbool.h>
#include <unistd.h>
#include <assert.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <hiredis/hiredis.h>
#include "../service/fsof_mq.h"
#include "../service/mt_array.h"
#include "../service/fsof_zookeeper.h"
#include "../common/fsof_global.h"
#include "../common/fsof_util.h"
#include "../service/fsof_redis.h"
#include "../service/fsof_log.h"
#include "../common/fsof_url.h"
#include <sys/fcntl.h>
#include <signal.h>
//redis thread handle
pthread_t g_redis_pid;
//message queue ,store zookeeper node info
struct fsof_queue *g_message_queue;
//current zookeeper config path
static char g_zookeeper_conf[MAX_CONFIG_PATH_LEN] = {0};
//zookeeper url list
static char g_zk_list[512] = {0};
char g_str_localip[INET_ADDRSTRLEN] = {0};
void init_zookeeper_env();
static void set_all_providers_watcher(struct String_vector *list); //all providers add watcher
static void enum_provider_list(struct fsof_message *message);
static void get_zk_root_children();
//deal with signo such as SIGALARM
void sigroutine(int signo) {
//need to get zookeeper node info from zk server manually
if (signo == SIGALRM) {
fsof_log_info(ERROR,"Now recvied alarm signal\n");
get_zk_root_children();
}
}
//create thread
static void create_thread(pthread_t *thread,void *(*start_routine) (void *),void *arg) {
if (pthread_create(thread,NULL,start_routine,arg)) {
//thread create failed
fsof_log_info(INFO,"create thread function called failed!\n");
exit(-1);
}
}
//redis worker thread ,main job is getting message from g_message_queue ,then write into redis
static void * thread_redis_work(void *p) {
int ret = 0;
int i = 0;
struct fsof_message message = {0};
struct timeval start = {0};
struct timeval end = {0};
int exec_time = 0;
while(true) {
//pop message from g_message_queue
ret = fsof_mq_pop(g_message_queue,&message);
if (ret) {
//deal with message
if (message.count > 0) {
gettimeofday(&start,NULL);
ret = compare_and_set_val(&message);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); //calculate execute time
if (ret == 0) {
//insert succeed
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|set redis data succeed!|set_redis|%d",g_str_localip,0,exec_time);
}
} else {
gettimeofday(&start,NULL);
fsof_redis_delete(message.key);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); //calculate execute time
fsof_log_info(DEBUG,"%s|FSOF_AGENT|%d|delete redis data key succeed!|delete_redis|%d",g_str_localip,0,exec_time);
}
//enum every provider value and key
enum_provider_list(&message);
//free message content, message itself free by g_message_queue
for (i = 0; i < message.count; i++) {
if (message.value[i] != NULL) {
free(message.value[i]);
}
}
free(message.key);
if (message.value != NULL) {
free(message.value);
}
} else {
//0.1 seconds
usleep(100000);
}
}
return NULL;
}
static void parse_provider_key(struct String_vector *list,const char *path) {
int i ;
struct fsof_message message = {0};
char url_encode_data[URL_ENCODE_BUF_LEN] = {0};
int nlen = 0;
int service_len = 0;
int provider_index = 0;
char *provider_str = NULL;
//get service key
provider_str = strstr(path,PROVIDER_NAME);
provider_index = provider_str - path;
service_len = provider_index - sizeof(FSOF_ROOT_NAME);
if (service_len <= 0) {
return;
}
message.key = malloc(service_len + 1);
assert(message.key);
memcpy(message.key,path + sizeof(FSOF_ROOT_NAME),service_len);
message.key[service_len] = '\0';
if (list->count > 0) {
message.value = malloc(sizeof(char*) * list->count);
assert(message.value);
memset(message.value,0,sizeof(char*) * list->count);
int val_index = 0;
for (i = 0; i < list->count; i++) {
memcpy(url_encode_data,list->data[i],strlen(list->data[i]));
nlen = fsof_url_decode(url_encode_data,strlen(url_encode_data));
fsof_log_info(INFO,"key: %s,value: %s,nlen is %d\n",message.key,url_encode_data,nlen);
message.value[val_index] = malloc(nlen + 1);
assert(message.value[val_index]);
strcpy(message.value[val_index],url_encode_data);
message.value[val_index][nlen] = '\0';
val_index++;
memset(url_encode_data,0,URL_ENCODE_BUF_LEN);
}
message.count = val_index;
}else {
fsof_log_info(INFO,"parse key child is zero,need to delete redis key %s\n",path);
//means delete provider key
message.count = 0;
}
fsof_mq_push(g_message_queue,&message);
}
//enum provider list
static void enum_provider_list(struct fsof_message *message) {
int i ;
for (i = 0; i < message->count; i++) {
fsof_log_info(INFO,"Enum provider key value is %s,path is %s\n",message->value[i],message->key);
}
}
static void fsof_provider_node_watcher(zhandle_t* zh, int type, int state,
const char* path, void* watcherCtx) {
struct String_vector *str_list = NULL;
struct timeval start = {0};
struct timeval end = {0};
int exec_time = 0;
if (type == ZOO_CHILD_EVENT) {
fsof_log_info(INFO,"%s children node has been changed ,need to get new children list\n",path);
if (strcmp(path,FSOF_ROOT_NAME) == 0) {
gettimeofday(&start,NULL);
str_list = fsof_zk_get_children(FSOF_ROOT_NAME);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); //calculate execute time
if (str_list != NULL) {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data succeed!|get_zookeeper|%d",g_str_localip,0,exec_time);
set_all_providers_watcher(str_list);
} else {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data failed!|get_zookeeper|%d",g_str_localip,-1,exec_time);
}
} else {
gettimeofday(&start,NULL);
str_list = fsof_zk_get_children(path);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); //calculate execute time
if (str_list != NULL) {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data succeed!|get_zookeeper|%d",g_str_localip,0,exec_time);
parse_provider_key(str_list,path);
} else {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data failed!|get_zookeeper|%d",g_str_localip,-1,exec_time);
}
}
if (str_list != NULL) {
deallocate_String_vector(str_list);
free(str_list);
}
}
}
//watcher service node ,when the service node children has been changed ,need to set provider watcher
static void fsof_service_node_watcher(zhandle_t* zh, int type, int state,
const char* path, void* watcherCtx) {
struct String_vector *str_list = NULL;
struct String_vector *provider_list = NULL;
struct timeval start = {0};
struct timeval end = {0};
int exec_time = 0;
char provider_key[512] = {0};
int i;
if (type == ZOO_CHILD_EVENT) {
fsof_log_info(INFO,"service node watcher entered %s\n",path);
gettimeofday(&start,NULL);
str_list = fsof_zk_get_children(path);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); //calculate execute time
if (str_list != NULL) {
sprintf(provider_key,"%s%s",path,PROVIDER_NAME);
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data succeed!|get_zookeeper|%d",g_str_localip,0,exec_time);
for (i = 0; i < str_list->count; i++) {
if (strcmp(str_list->data[i],PROVIDER_NODE_NAME) == 0) {
fsof_zk_add_listener(provider_key,fsof_provider_node_watcher);
gettimeofday(&start,NULL);
provider_list = fsof_zk_get_children(provider_key);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); //calculate execute time
if (provider_list != NULL) { //deal
//enum_provider_list(provider_list,provider_key);
parse_provider_key(provider_list,provider_key);
deallocate_String_vector(provider_list);
free(provider_list);
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data succeed!|get_zookeeper|%d",g_str_localip,0,exec_time);
} else {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data failed!|get_zookeeper|%d",g_str_localip,-1,exec_time);
}
break;
}
}
deallocate_String_vector(str_list);
free(str_list);
} else {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data failed!|get_zookeeper|%d",g_str_localip,-1,exec_time);
}
}
}
static void set_all_providers_watcher(struct String_vector *list) {
struct String_vector *provider_list = NULL;
char provider_key[512] = {0};
struct timeval start = {0};
struct timeval end = {0};
int exec_time = 0;
int i;
for (i = 0; i < list->count; i++) {
sprintf(provider_key,"%s/%s%s",FSOF_ROOT_NAME,list->data[i],PROVIDER_NAME);
fsof_zk_add_listener(provider_key,fsof_provider_node_watcher);
gettimeofday(&start,NULL);
provider_list = fsof_zk_get_children(provider_key);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); //calculate execute time
if (provider_list == NULL) {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data failed!|get_zookeeper|%d",g_str_localip,-1,exec_time);
memset(provider_key,0,sizeof(provider_key));
sprintf(provider_key,"%s/%s",FSOF_ROOT_NAME,list->data[i]);
fsof_zk_add_listener(provider_key,fsof_service_node_watcher);
gettimeofday(&start,NULL);
provider_list = fsof_zk_get_children(provider_key);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); //calculate execute time
if (provider_list == NULL) {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data failed!|get_zookeeper|%d",g_str_localip,-1,exec_time);
} else {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data succeed!|get_zookeeper|%d",g_str_localip,0,exec_time);
}
}else {
//enum_provider_list(provider_list,provider_key);
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data succeed!|get_zookeeper|%d",g_str_localip,0,exec_time);
parse_provider_key(provider_list,provider_key);
}
memset(provider_key,0,sizeof(provider_key));
if (provider_list != NULL) {
deallocate_String_vector(provider_list);
free(provider_list);
}
}
}
static void get_zk_root_children() {
struct timeval start = {0};
struct timeval end = {0};
int exec_time = 0;
struct String_vector *str_list = NULL;
fsof_zk_add_listener(FSOF_ROOT_NAME,fsof_provider_node_watcher);
gettimeofday(&start,NULL);
str_list = fsof_zk_get_children(FSOF_ROOT_NAME);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); //calculate execute time
if (str_list != NULL) {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data succeed!|get_zookeeper|%d",g_str_localip,0,exec_time);
set_all_providers_watcher(str_list);
}else {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|get zookeeper data failed!|get_zookeeper|%d",g_str_localip,-1,exec_time);
fsof_log_info(INFO,"can't get children of key %s\n",FSOF_ROOT_NAME);
}
if (str_list != NULL) {
deallocate_String_vector(str_list);
free(str_list);
}
}
//watcher fsof root node main event :ZOO_CONNECTED_STATE ZOO_EXPIRED_SESSION_STATE
static void fsof_root_node_watcher(zhandle_t* zh, int type, int state,
const char* path, void* watcherCtx) {
if (type == ZOO_SESSION_EVENT) {
if(state == ZOO_CONNECTED_STATE) {
//begin get children from root node
get_zk_root_children();
} else if (state == ZOO_EXPIRED_SESSION_STATE || state == ZOO_CONNECTING_STATE || state == ZOO_AUTH_FAILED_STATE) {
//log info sesstion expired need rejoin
fsof_log_info(ERROR,"zookeeper session expired,need rejoin zookeeper\n");
fsof_zk_close();
init_zookeeper_env();
}
}
}
void parse_zk_config(char *src,char *dest) {
assert(src);
assert(dest);
char *ptr = NULL;
char *tail_ptr = src + strlen(src);
while ((ptr = strstr(src,"http://")) != 0) {
if ((ptr - src) == 0) {
src += strlen("http://");
}
else if ((ptr - src) > 0) {
memcpy(dest,src,ptr - src);
dest += ptr - src;
src += ptr - src + strlen("http://");
}
}
if (ptr == NULL && src != tail_ptr) {
memcpy(dest,src,tail_ptr - src);
}
}
void init_zookeeper_env() {
int ret = 0;
char *zk_str = NULL;
struct timeval start = {0};
struct timeval end = {0};
int exec_time = 0;
if (*g_zookeeper_conf != '\0') {
//parse zookeeper config
fsof_log_info(INFO,"Now begin start parse zookeeper config file %s!\n",g_zookeeper_conf);
zk_str = get_ini_key_string("fsof_setting","zk_url_list",g_zookeeper_conf);
parse_zk_config(zk_str,g_zk_list);
fsof_log_info(INFO,"Now end start parse zookeeper config file %s,zk list str is %s!\n",g_zookeeper_conf,g_zk_list);
}
//set root node watcher
fsof_zk_add_state_listener(fsof_root_node_watcher);
do{
gettimeofday(&start,NULL);
ret = fsof_zk_init(g_zk_list);
gettimeofday(&end,NULL);
exec_time = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_sec - start.tv_sec); //calculate execute time
if (ret > 0) {
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|connect zookeeper timeout!|connect_zookeeper|%d",g_str_localip,ret,exec_time);
sleep(300);
}
}while(ret);
fsof_log_info(INFO,"%s|FSOF_AGENT|%d|connect zookeeper succeed!|connect_zookeeper|%d",g_str_localip,ret,exec_time);
}
static void get_local_ip() {
struct ifaddrs * ifAddrStruct = NULL;
void * tmpAddrPtr = NULL;
getifaddrs(&ifAddrStruct);
while (ifAddrStruct != NULL) {
if (ifAddrStruct->ifa_addr->sa_family == AF_INET) {
tmpAddrPtr = &((struct sockaddr_in *)ifAddrStruct->ifa_addr)->sin_addr;
inet_ntop(AF_INET, tmpAddrPtr, g_str_localip, INET_ADDRSTRLEN);
}
ifAddrStruct = ifAddrStruct->ifa_next;
memset(g_str_localip,0,INET_ADDRSTRLEN);
}
}
void cleanup_env() {
fsof_zk_close();
fsof_redis_close();
fsof_log_close();
}
//set process to daemon
void daemonize(void) {
int fd;
/* parent exits */
if (fork() != 0) {
exit(0);
}
/* create a new session */
setsid();
/* Every output goes to /dev/null. If Redis is daemonized but
* the 'logfile' is set to 'stdout' in the configuration file
* it will not log at all. */
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd , STDERR_FILENO);
close(fd);
}
}
int main(int argc,char **argv) {
int err = 0;
struct itimerval timer_setting,old_val;
int timer_val = 0;
if (argc < 2){
fprintf(stderr,"parameters contains the timer value of zookeeper unit:second and the URL config of zookeeper!\n");
fprintf(stderr,"agent time conf/fsof.ini\n");
exit(0);
}
//parameter contains the timer value of zookeeper unit:second
timer_val = atoi(argv[1]);
//dubbo-php-framework/config/global/conf/fsof.ini
strncpy(g_zookeeper_conf, argv[2], sizeof(g_zookeeper_conf) - 1 > strlen(argv[2]) ? strlen(argv[2]) : sizeof(g_zookeeper_conf) - 1);
if (access(g_zookeeper_conf, F_OK) == -1){
fprintf(stderr,"can not find conf/fsof.ini\n");
exit(0);
}
//set process to daemon ,if failed ,directly exit
daemonize();
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
signal(SIGALRM, sigroutine);
if (timer_val > 0) {
timer_setting.it_value.tv_sec = timer_val;
timer_setting.it_value.tv_usec = 0;
timer_setting.it_interval.tv_sec = timer_val;
timer_setting.it_interval.tv_usec = 0;
setitimer(ITIMER_REAL, &timer_setting, &old_val);
}
err = fsof_log_init();
if (err > 0) {
fprintf(stderr,"agent log init failed,exit!\n");
exit(-1);
}
get_local_ip();
g_message_queue = fsof_mq_create();
create_thread(&g_redis_pid,thread_redis_work,NULL);
fsof_redis_init(REDIS_UNIX_SOCK);
init_zookeeper_env();
pthread_join(g_redis_pid,NULL);
fsof_mq_destroy(g_message_queue);
fsof_log_info(INFO,"fsof agent redis thread exit,need cleanup resource!\n");
//cleanup
cleanup_env();
return 0;
}