| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * rangerrest.c |
| * routines to interact with Ranger REST API |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "utils/rangerrest.h" |
| #include "utils/hsearch.h" |
| #include "cdb/cdbvars.h" |
| |
| /* |
| * A mapping from AclObjectKind to string |
| */ |
| char* AclObjectKindStr[] = |
| { |
| "table", /* pg_class */ |
| "sequence", /* pg_sequence */ |
| "database", /* pg_database */ |
| "function", /* pg_proc */ |
| "operator", /* pg_operator */ |
| "type", /* pg_type */ |
| "language", /* pg_language */ |
| "namespace", /* pg_namespace */ |
| "oplass", /* pg_opclass */ |
| "conversion", /* pg_conversion */ |
| "tablespace", /* pg_tablespace */ |
| "filespace", /* pg_filespace */ |
| "filesystem", /* pg_filesystem */ |
| "fdw", /* pg_foreign_data_wrapper */ |
| "foreign_server", /* pg_foreign_server */ |
| "protocol", /* pg_extprotocol */ |
| "none" /* MUST BE LAST */ |
| }; |
| |
| static int request_id = 1; |
| |
| static void getClientIP(char *remote_host) |
| { |
| if( MyProcPort->remote_host == NULL || strlen(MyProcPort->remote_host) == 0 ) |
| { |
| snprintf(remote_host, HOST_BUFFER_SIZE, "%s", "UNKNOWN"); |
| return; |
| } |
| if (strcmp(MyProcPort->remote_host, "[local]") == 0) |
| { |
| snprintf(remote_host, HOST_BUFFER_SIZE, "%s", "127.0.0.1"); |
| } |
| else |
| { |
| snprintf(remote_host, HOST_BUFFER_SIZE, "%s", MyProcPort->remote_host); |
| } |
| } |
| |
| /* |
| * parse ranger response |
| * @param buffer ranger response |
| * @param result_list List of RangerPrivilegeResults |
| * @return 0 parse success; -1 other error |
| */ |
| static int parse_ranger_response(char* buffer, List *result_list) |
| { |
| if (buffer == NULL || strlen(buffer) == 0) |
| return -1; |
| |
| elog(RANGER_LOG, "parse ranger restful response content : %s", buffer); |
| |
| struct json_object *response = json_tokener_parse(buffer); |
| if (response == NULL) |
| { |
| elog(WARNING, "failed to parse json tokener."); |
| return -1; |
| } |
| |
| struct json_object *accessObj = NULL; |
| if (!json_object_object_get_ex(response, "access", &accessObj)) |
| { |
| elog(WARNING, "failed to get json \"access\" field."); |
| return -1; |
| } |
| |
| int arraylen = json_object_array_length(accessObj); |
| elog(RANGER_LOG, "parse ranger response result array length: %d",arraylen); |
| for (int i=0; i< arraylen; i++){ |
| struct json_object *jvalue = NULL; |
| struct json_object *jallow = NULL; |
| struct json_object *jresource = NULL; |
| struct json_object *jprivilege = NULL; |
| |
| jvalue = json_object_array_get_idx(accessObj, i); |
| if (jvalue == NULL) |
| return -1; |
| if (!json_object_object_get_ex(jvalue, "allowed", &jallow)) |
| return -1; |
| if (!json_object_object_get_ex(jvalue, "resource", &jresource)) |
| return -1; |
| if (!json_object_object_get_ex(jvalue, "privileges", &jprivilege)) |
| return -1; |
| |
| json_bool ok = json_object_get_boolean(jallow); |
| |
| const char *resource_str = json_object_get_string(jresource); |
| const char *privilege_str = json_object_get_string(jprivilege); |
| uint32 resource_sign = string_hash(resource_str, strlen(resource_str)); |
| uint32 privilege_sign = string_hash(privilege_str, strlen(privilege_str)); |
| elog(RANGER_LOG, "ranger response access sign, resource_str: %s, privilege_str: %s", |
| resource_str, privilege_str); |
| |
| ListCell *result; |
| /* get each resource result by use sign */ |
| foreach(result, result_list) { |
| /* loop find is enough for performence*/ |
| RangerPrivilegeResults *result_ptr = (RangerPrivilegeResults *) lfirst(result); |
| /* if only one access in response, no need to check sign*/ |
| if (arraylen > 1 && |
| (result_ptr->resource_sign != resource_sign || result_ptr->privilege_sign != privilege_sign) ) |
| continue; |
| |
| if (ok == 1) |
| result_ptr->result = RANGERCHECK_OK; |
| else |
| result_ptr->result = RANGERCHECK_NO_PRIV; |
| } |
| } |
| return 0; |
| } |
| |
| /** |
| * convert a string to lower |
| */ |
| static void str_tolower(char *dest, const char *src) |
| { |
| Assert(src != NULL); |
| Assert(dest != NULL); |
| int len = strlen(src); |
| for (int i = 0; i < len; i++) |
| { |
| unsigned char ch = (unsigned char) src[i]; |
| |
| if (ch >= 'A' && ch <= 'Z') |
| ch += 'a' - 'A'; |
| *(dest+i) = ch; |
| } |
| dest[len] = '\0'; |
| } |
| |
| /** |
| * Create a JSON object for Ranger request given some parameters. |
| * example: |
| * { |
| * "requestId": 1, |
| * "user": "joe", |
| * "groups": ["admin","us"], |
| * "clientIp": "123.0.0.21", |
| * "context": "SELECT * FROM sales", |
| * "access": |
| * [ |
| * { |
| * "resource": |
| * { |
| * "database": "finance" |
| * }, |
| * "privileges": ["connect"] |
| * }, |
| * { |
| * "resource": |
| * { |
| * "database": "finance", |
| * "schema": "us", |
| * "table": "sales" |
| * }, |
| * "privileges": ["select", "insert"] |
| * } |
| * ] |
| * } |
| * |
| * @param request_list List of RangerRequestJsonArgs |
| * @param result_list List of RangerPrivilegeResults |
| * @return the parsed json object |
| */ |
| static json_object *create_ranger_request_json(List *request_list, List *result_list) |
| { |
| json_object *jrequest = json_object_new_object(); |
| json_object *juser = NULL; |
| json_object *jaccess = json_object_new_array(); |
| char *user = NULL; |
| ListCell *arg; |
| |
| int j = 0; |
| foreach(arg, request_list) |
| { |
| RangerRequestJsonArgs *arg_ptr = (RangerRequestJsonArgs *) lfirst(arg); |
| if (user == NULL) |
| { |
| user = arg_ptr->user; |
| juser = json_object_new_string(user); |
| } |
| AclObjectKind kind = arg_ptr->kind; |
| char* object = arg_ptr->object; |
| Assert(user != NULL && object != NULL); |
| elog(RANGER_LOG, "build json for ranger restful request, user:%s, kind:%s, object:%s", |
| user, AclObjectKindStr[kind], object); |
| |
| json_object *jelement = json_object_new_object(); |
| json_object *jresource = json_object_new_object(); |
| json_object *jactions = json_object_new_array(); |
| switch(kind) |
| { |
| case ACL_KIND_CLASS: |
| case ACL_KIND_SEQUENCE: |
| case ACL_KIND_PROC: |
| case ACL_KIND_NAMESPACE: |
| case ACL_KIND_LANGUAGE: |
| { |
| char *ptr = NULL; |
| char *name = NULL; |
| char *first = NULL; // could be a database or protocol or tablespace |
| char *second = NULL; // could be a schema or language |
| char *third = NULL; // could be a table or sequence or function |
| int idx = 0; |
| for (name = strtok_r(object, ".", &ptr); name; |
| name = strtok_r(NULL, ".", &ptr), idx++) |
| { |
| if (idx == 0) |
| { |
| first = pstrdup(name); |
| } |
| else if (idx == 1) |
| { |
| second = pstrdup(name); |
| } |
| else |
| { |
| third = pstrdup(name); |
| } |
| } |
| |
| if (first != NULL) |
| { |
| json_object *jfirst = json_object_new_string(first); |
| json_object_object_add(jresource, "database", jfirst); |
| } |
| if (second != NULL) |
| { |
| json_object *jsecond = json_object_new_string(second); |
| json_object_object_add(jresource, |
| (kind == ACL_KIND_LANGUAGE) ? "language" : "schema", jsecond); |
| } |
| if (third != NULL) |
| { |
| json_object *jthird = json_object_new_string(third); |
| json_object_object_add(jresource, |
| (kind == ACL_KIND_CLASS) ? "table" : |
| (kind == ACL_KIND_SEQUENCE) ? "sequence" : "function", jthird); |
| } |
| |
| if (first != NULL) |
| pfree(first); |
| if (second != NULL) |
| pfree(second); |
| if (third != NULL) |
| pfree(third); |
| break; |
| } |
| case ACL_KIND_OPER: |
| case ACL_KIND_CONVERSION: |
| case ACL_KIND_DATABASE: |
| case ACL_KIND_TABLESPACE: |
| case ACL_KIND_TYPE: |
| case ACL_KIND_FILESYSTEM: |
| case ACL_KIND_FDW: |
| case ACL_KIND_FOREIGN_SERVER: |
| case ACL_KIND_EXTPROTOCOL: |
| { |
| json_object *jobject = json_object_new_string(object); |
| json_object_object_add(jresource, AclObjectKindStr[kind], jobject); |
| break; |
| } |
| default: |
| elog(ERROR, "unsupported object kind : %s", AclObjectKindStr[kind]); |
| } // switch |
| json_object_object_add(jelement, "resource", jresource); |
| |
| ListCell *cell; |
| foreach(cell, arg_ptr->actions) |
| { |
| /* need more normalization in future */ |
| char lower_action[32]; |
| str_tolower(lower_action, (char *)cell->data.ptr_value); |
| lower_action[sizeof(lower_action)-1] = '\0'; |
| |
| json_object* jaction = json_object_new_string(lower_action); |
| json_object_array_add(jactions, jaction); |
| } |
| json_object_object_add(jelement, "privileges", jactions); |
| json_object_array_add(jaccess, jelement); |
| |
| /* set access sign */ |
| RangerPrivilegeResults *result_ptr = (RangerPrivilegeResults *)list_nth(result_list, j); |
| const char *resource_str = json_object_to_json_string(jresource); |
| const char *privilege_str = json_object_to_json_string(jactions); |
| result_ptr->resource_sign = string_hash(resource_str, strlen(resource_str)); |
| result_ptr->privilege_sign = string_hash(privilege_str, strlen(privilege_str)); |
| elog(RANGER_LOG, "request access sign, resource_str:%s, privilege_str:%s", |
| resource_str, privilege_str); |
| j++; |
| } // foreach |
| char str[32]; |
| sprintf(str,"%d",request_id); |
| json_object *jreqid = json_object_new_string(str); |
| json_object_object_add(jrequest, "requestId", jreqid); |
| json_object_object_add(jrequest, "user", juser); |
| |
| char remote_host[HOST_BUFFER_SIZE]; |
| getClientIP(remote_host); |
| json_object *jclientip = json_object_new_string(remote_host); |
| json_object_object_add(jrequest, "clientIp", jclientip); |
| |
| json_object *jcontext = json_object_new_string( |
| (debug_query_string == NULL || strlen(debug_query_string) == 0) |
| ? "connect to db" : debug_query_string); |
| json_object_object_add(jrequest, "context", jcontext); |
| json_object_object_add(jrequest, "access", jaccess); |
| |
| return jrequest; |
| } |
| |
| static size_t write_callback(char *contents, size_t size, size_t nitems, |
| void *userp) |
| { |
| size_t realsize = size * nitems; |
| CURL_HANDLE curl = (CURL_HANDLE) userp; |
| Assert(curl != NULL); |
| |
| elog(RANGER_LOG, "ranger restful response size is %d. response buffer size is %d.", curl->response.response_size, curl->response.buffer_size); |
| int original_size = curl->response.buffer_size; |
| while(curl->response.response_size + realsize >= curl->response.buffer_size) |
| { |
| /* double the buffer size if the buffer is not enough.*/ |
| curl->response.buffer_size = curl->response.buffer_size * 2; |
| } |
| if(original_size < curl->response.buffer_size) |
| { |
| /* repalloc is not same as realloc, repalloc's first parameter cannot be NULL */ |
| curl->response.buffer = repalloc(curl->response.buffer, curl->response.buffer_size); |
| } |
| elog(RANGER_LOG, "ranger restful response size is %d. response buffer size is %d.", curl->response.response_size, curl->response.buffer_size); |
| if (curl->response.buffer == NULL) |
| { |
| /* allocate memory failed. probably out of memory */ |
| elog(WARNING, "cannot allocate memory for ranger response"); |
| return 0; |
| } |
| memcpy(curl->response.buffer + curl->response.response_size, contents, realsize); |
| elog(RANGER_LOG, "read from ranger restful response: %s", curl->response.buffer); |
| curl->response.response_size += realsize; |
| curl->response.buffer[curl->response.response_size] = '\0'; |
| return realsize; |
| } |
| |
| /** |
| * @return 0 curl success; -1 curl failed |
| */ |
| static int call_ranger_rest(CURL_HANDLE curl_handle, const char* request) |
| { |
| int ret = -1; |
| int retry = 2; |
| CURLcode res; |
| bool switchToMaster = false; |
| Assert(request != NULL); |
| |
| /* |
| * If master is talking with standby RPS, for every predefined interval |
| * (controlled by a GUC hawq_rps_check_local_interval) it will check if local RPS works now. |
| */ |
| if (curl_handle->talkingWithStandby) |
| { |
| uint64_t current_time = gettime_microsec(); |
| if ((current_time - curl_handle->lastCheckTimestamp) > 1000000LL * rps_check_local_interval) |
| { |
| curl_handle->talkingWithStandby = false; |
| curl_handle->lastCheckTimestamp = 0; |
| elog(RANGER_LOG, |
| "master has been talking to standby RPS for more than %d seconds, try switching to master RPS", |
| rps_check_local_interval); |
| switchToMaster = true; |
| } |
| } |
| |
| /* |
| * try to connect standby's RPS if fail in connecting master's RPS |
| */ |
| while(retry > 0 && ret != 0) |
| { |
| /* |
| * Re-initializes all options previously set on a specified CURL handle |
| * to the default values. This puts back the handle to the same state as |
| * it was in when it was just created with curl_easy_init.It does not |
| * change the following information kept in the handle: live connections, |
| * the Session ID cache, the DNS cache, the cookies and shares. |
| */ |
| curl_easy_reset(curl_handle->curl_handle); |
| /* timeout: hard-coded temporarily and maybe should be a guc in future */ |
| curl_easy_setopt(curl_handle->curl_handle, CURLOPT_TIMEOUT, 30L); |
| |
| /* specify URL to get */ |
| StringInfoData tname; |
| initStringInfo(&tname); |
| appendStringInfo(&tname, "http://"); |
| appendStringInfo(&tname, "%s", curl_handle->talkingWithStandby?standby_addr_host:master_addr_host); |
| appendStringInfo(&tname, ":"); |
| appendStringInfo(&tname, "%d", rps_addr_port); |
| appendStringInfo(&tname, "/rps"); |
| curl_easy_setopt(curl_handle->curl_handle, CURLOPT_URL, tname.data); |
| pfree(tname.data); |
| |
| struct curl_slist *headers = NULL; |
| headers = curl_slist_append(headers, "Content-Type:application/json"); |
| curl_easy_setopt(curl_handle->curl_handle, CURLOPT_HTTPHEADER, headers); |
| |
| curl_easy_setopt(curl_handle->curl_handle, CURLOPT_POSTFIELDS,request); |
| /* send all data to this function */ |
| curl_easy_setopt(curl_handle->curl_handle, CURLOPT_WRITEFUNCTION, write_callback); |
| curl_easy_setopt(curl_handle->curl_handle, CURLOPT_WRITEDATA, (void *)curl_handle); |
| |
| res = curl_easy_perform(curl_handle->curl_handle); |
| if(request_id == INT_MAX) |
| { |
| request_id = 0; |
| } |
| request_id++; |
| /* check for errors */ |
| if(res != CURLE_OK) |
| { |
| if (retry > 1) |
| { |
| /* Don't expose this warning message to client, just record in log. |
| * The value of whereToSendOutput is DestRemote, so set it to DestNone |
| * and set back after write a warning message in log file. |
| */ |
| CommandDest commandDest = whereToSendOutput; |
| whereToSendOutput = DestNone; |
| elog(WARNING, "ranger plugin service from http://%s:%d/rps is unavailable : %s, " |
| "trying ranger plugin service at http://%s:%d/rps\n", |
| curl_handle->talkingWithStandby?standby_addr_host:master_addr_host, rps_addr_port, curl_easy_strerror(res), |
| curl_handle->talkingWithStandby?master_addr_host:standby_addr_host, rps_addr_port); |
| curl_handle->talkingWithStandby = !curl_handle->talkingWithStandby; |
| whereToSendOutput = commandDest; |
| } |
| else |
| { |
| elog(ERROR, "permission is unknown due to authorization failure, " |
| "ranger plugin service is unavailable : %s.\n", curl_easy_strerror(res)); |
| } |
| } |
| else |
| { |
| if (switchToMaster && !curl_handle->talkingWithStandby) |
| { |
| /* master's RPS has recovered, switch from standby's RPS to master's RPS */ |
| elog(LOG, "switched from standby's ranger plugin service to master's."); |
| } |
| if (curl_handle->talkingWithStandby && curl_handle->lastCheckTimestamp == 0) |
| { |
| curl_handle->lastCheckTimestamp = gettime_microsec(); |
| } |
| else if (!curl_handle->talkingWithStandby && curl_handle->lastCheckTimestamp != 0) |
| { |
| curl_handle->lastCheckTimestamp = 0; |
| } |
| ret = 0; |
| elog(RANGER_LOG, "retrieved %d bytes data from ranger restful response.", |
| curl_handle->response.response_size); |
| } |
| retry--; |
| } |
| |
| return ret; |
| } |
| |
| /* |
| * check privilege(s) from ranger |
| * @param request_list List of RangerRequestJsonArgs |
| * @param result_list List of RangerPrivilegeResults |
| * @return 0 get response from ranger and parse success; -1 other error |
| */ |
| int check_privilege_from_ranger(List *request_list, List *result_list) |
| { |
| json_object* jrequest = create_ranger_request_json(request_list, result_list); |
| Assert(jrequest != NULL); |
| |
| const char *request = json_object_to_json_string(jrequest); |
| Assert(request != NULL); |
| elog(RANGER_LOG, "send json request to ranger : %s", request); |
| |
| /* call GET method to send request*/ |
| Assert(curl_context_ranger.hasInited); |
| if (call_ranger_rest(&curl_context_ranger, request) < 0) |
| { |
| return -1; |
| } |
| |
| /* free the JSON object */ |
| json_object_put(jrequest); |
| |
| /* parse the JSON-format result */ |
| int ret = parse_ranger_response(curl_context_ranger.response.buffer, result_list); |
| if (ret < 0) |
| { |
| elog(ERROR, "parse ranger response failed, ranger response content is %s", |
| curl_context_ranger.response.buffer == NULL? "empty.":curl_context_ranger.response.buffer); |
| } |
| if (curl_context_ranger.response.buffer != NULL) |
| { |
| /* reset response size to reuse the buffer. */ |
| curl_context_ranger.response.response_size = 0; |
| } |
| |
| return ret; |
| } |