blob: c5724afd8a695a23b9c46e678ba76ff2a10c38c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* rangerrest.c
* routines to interact with Ranger REST API
*
*-------------------------------------------------------------------------
*/
#include "utils/rangerrest.h"
#include "utils/hsearch.h"
#include "cdb/cdbvars.h"
/*
* A mapping from AclObjectKind to string
*/
char* AclObjectKindStr[] =
{
"table", /* pg_class */
"sequence", /* pg_sequence */
"database", /* pg_database */
"function", /* pg_proc */
"operator", /* pg_operator */
"type", /* pg_type */
"language", /* pg_language */
"namespace", /* pg_namespace */
"oplass", /* pg_opclass */
"conversion", /* pg_conversion */
"tablespace", /* pg_tablespace */
"filespace", /* pg_filespace */
"filesystem", /* pg_filesystem */
"fdw", /* pg_foreign_data_wrapper */
"foreign_server", /* pg_foreign_server */
"protocol", /* pg_extprotocol */
"none" /* MUST BE LAST */
};
static int request_id = 1;
static void getClientIP(char *remote_host)
{
if( MyProcPort->remote_host == NULL || strlen(MyProcPort->remote_host) == 0 )
{
snprintf(remote_host, HOST_BUFFER_SIZE, "%s", "UNKNOWN");
return;
}
if (strcmp(MyProcPort->remote_host, "[local]") == 0)
{
snprintf(remote_host, HOST_BUFFER_SIZE, "%s", "127.0.0.1");
}
else
{
snprintf(remote_host, HOST_BUFFER_SIZE, "%s", MyProcPort->remote_host);
}
}
/*
* parse ranger response
* @param buffer ranger response
* @param result_list List of RangerPrivilegeResults
* @return 0 parse success; -1 other error
*/
static int parse_ranger_response(char* buffer, List *result_list)
{
if (buffer == NULL || strlen(buffer) == 0)
return -1;
elog(RANGER_LOG, "parse ranger restful response content : %s", buffer);
struct json_object *response = json_tokener_parse(buffer);
if (response == NULL)
{
elog(WARNING, "failed to parse json tokener.");
return -1;
}
struct json_object *accessObj = NULL;
if (!json_object_object_get_ex(response, "access", &accessObj))
{
elog(WARNING, "failed to get json \"access\" field.");
return -1;
}
int arraylen = json_object_array_length(accessObj);
elog(RANGER_LOG, "parse ranger response result array length: %d",arraylen);
for (int i=0; i< arraylen; i++){
struct json_object *jvalue = NULL;
struct json_object *jallow = NULL;
struct json_object *jresource = NULL;
struct json_object *jprivilege = NULL;
jvalue = json_object_array_get_idx(accessObj, i);
if (jvalue == NULL)
return -1;
if (!json_object_object_get_ex(jvalue, "allowed", &jallow))
return -1;
if (!json_object_object_get_ex(jvalue, "resource", &jresource))
return -1;
if (!json_object_object_get_ex(jvalue, "privileges", &jprivilege))
return -1;
json_bool ok = json_object_get_boolean(jallow);
const char *resource_str = json_object_get_string(jresource);
const char *privilege_str = json_object_get_string(jprivilege);
uint32 resource_sign = string_hash(resource_str, strlen(resource_str));
uint32 privilege_sign = string_hash(privilege_str, strlen(privilege_str));
elog(RANGER_LOG, "ranger response access sign, resource_str: %s, privilege_str: %s",
resource_str, privilege_str);
ListCell *result;
/* get each resource result by use sign */
foreach(result, result_list) {
/* loop find is enough for performence*/
RangerPrivilegeResults *result_ptr = (RangerPrivilegeResults *) lfirst(result);
/* if only one access in response, no need to check sign*/
if (arraylen > 1 &&
(result_ptr->resource_sign != resource_sign || result_ptr->privilege_sign != privilege_sign) )
continue;
if (ok == 1)
result_ptr->result = RANGERCHECK_OK;
else
result_ptr->result = RANGERCHECK_NO_PRIV;
}
}
return 0;
}
/**
* convert a string to lower
*/
static void str_tolower(char *dest, const char *src)
{
Assert(src != NULL);
Assert(dest != NULL);
int len = strlen(src);
for (int i = 0; i < len; i++)
{
unsigned char ch = (unsigned char) src[i];
if (ch >= 'A' && ch <= 'Z')
ch += 'a' - 'A';
*(dest+i) = ch;
}
dest[len] = '\0';
}
/**
* Create a JSON object for Ranger request given some parameters.
* example:
* {
* "requestId": 1,
* "user": "joe",
* "groups": ["admin","us"],
* "clientIp": "123.0.0.21",
* "context": "SELECT * FROM sales",
* "access":
* [
* {
* "resource":
* {
* "database": "finance"
* },
* "privileges": ["connect"]
* },
* {
* "resource":
* {
* "database": "finance",
* "schema": "us",
* "table": "sales"
* },
* "privileges": ["select", "insert"]
* }
* ]
* }
*
* @param request_list List of RangerRequestJsonArgs
* @param result_list List of RangerPrivilegeResults
* @return the parsed json object
*/
static json_object *create_ranger_request_json(List *request_list, List *result_list)
{
json_object *jrequest = json_object_new_object();
json_object *juser = NULL;
json_object *jaccess = json_object_new_array();
char *user = NULL;
ListCell *arg;
int j = 0;
foreach(arg, request_list)
{
RangerRequestJsonArgs *arg_ptr = (RangerRequestJsonArgs *) lfirst(arg);
if (user == NULL)
{
user = arg_ptr->user;
juser = json_object_new_string(user);
}
AclObjectKind kind = arg_ptr->kind;
char* object = arg_ptr->object;
Assert(user != NULL && object != NULL);
elog(RANGER_LOG, "build json for ranger restful request, user:%s, kind:%s, object:%s",
user, AclObjectKindStr[kind], object);
json_object *jelement = json_object_new_object();
json_object *jresource = json_object_new_object();
json_object *jactions = json_object_new_array();
switch(kind)
{
case ACL_KIND_CLASS:
case ACL_KIND_SEQUENCE:
case ACL_KIND_PROC:
case ACL_KIND_NAMESPACE:
case ACL_KIND_LANGUAGE:
{
char *ptr = NULL;
char *name = NULL;
char *first = NULL; // could be a database or protocol or tablespace
char *second = NULL; // could be a schema or language
char *third = NULL; // could be a table or sequence or function
int idx = 0;
for (name = strtok_r(object, ".", &ptr); name;
name = strtok_r(NULL, ".", &ptr), idx++)
{
if (idx == 0)
{
first = pstrdup(name);
}
else if (idx == 1)
{
second = pstrdup(name);
}
else
{
third = pstrdup(name);
}
}
if (first != NULL)
{
json_object *jfirst = json_object_new_string(first);
json_object_object_add(jresource, "database", jfirst);
}
if (second != NULL)
{
json_object *jsecond = json_object_new_string(second);
json_object_object_add(jresource,
(kind == ACL_KIND_LANGUAGE) ? "language" : "schema", jsecond);
}
if (third != NULL)
{
json_object *jthird = json_object_new_string(third);
json_object_object_add(jresource,
(kind == ACL_KIND_CLASS) ? "table" :
(kind == ACL_KIND_SEQUENCE) ? "sequence" : "function", jthird);
}
if (first != NULL)
pfree(first);
if (second != NULL)
pfree(second);
if (third != NULL)
pfree(third);
break;
}
case ACL_KIND_OPER:
case ACL_KIND_CONVERSION:
case ACL_KIND_DATABASE:
case ACL_KIND_TABLESPACE:
case ACL_KIND_TYPE:
case ACL_KIND_FILESYSTEM:
case ACL_KIND_FDW:
case ACL_KIND_FOREIGN_SERVER:
case ACL_KIND_EXTPROTOCOL:
{
json_object *jobject = json_object_new_string(object);
json_object_object_add(jresource, AclObjectKindStr[kind], jobject);
break;
}
default:
elog(ERROR, "unsupported object kind : %s", AclObjectKindStr[kind]);
} // switch
json_object_object_add(jelement, "resource", jresource);
ListCell *cell;
foreach(cell, arg_ptr->actions)
{
/* need more normalization in future */
char lower_action[32];
str_tolower(lower_action, (char *)cell->data.ptr_value);
lower_action[sizeof(lower_action)-1] = '\0';
json_object* jaction = json_object_new_string(lower_action);
json_object_array_add(jactions, jaction);
}
json_object_object_add(jelement, "privileges", jactions);
json_object_array_add(jaccess, jelement);
/* set access sign */
RangerPrivilegeResults *result_ptr = (RangerPrivilegeResults *)list_nth(result_list, j);
const char *resource_str = json_object_to_json_string(jresource);
const char *privilege_str = json_object_to_json_string(jactions);
result_ptr->resource_sign = string_hash(resource_str, strlen(resource_str));
result_ptr->privilege_sign = string_hash(privilege_str, strlen(privilege_str));
elog(RANGER_LOG, "request access sign, resource_str:%s, privilege_str:%s",
resource_str, privilege_str);
j++;
} // foreach
char str[32];
sprintf(str,"%d",request_id);
json_object *jreqid = json_object_new_string(str);
json_object_object_add(jrequest, "requestId", jreqid);
json_object_object_add(jrequest, "user", juser);
char remote_host[HOST_BUFFER_SIZE];
getClientIP(remote_host);
json_object *jclientip = json_object_new_string(remote_host);
json_object_object_add(jrequest, "clientIp", jclientip);
json_object *jcontext = json_object_new_string(
(debug_query_string == NULL || strlen(debug_query_string) == 0)
? "connect to db" : debug_query_string);
json_object_object_add(jrequest, "context", jcontext);
json_object_object_add(jrequest, "access", jaccess);
return jrequest;
}
static size_t write_callback(char *contents, size_t size, size_t nitems,
void *userp)
{
size_t realsize = size * nitems;
CURL_HANDLE curl = (CURL_HANDLE) userp;
Assert(curl != NULL);
elog(RANGER_LOG, "ranger restful response size is %d. response buffer size is %d.", curl->response.response_size, curl->response.buffer_size);
int original_size = curl->response.buffer_size;
while(curl->response.response_size + realsize >= curl->response.buffer_size)
{
/* double the buffer size if the buffer is not enough.*/
curl->response.buffer_size = curl->response.buffer_size * 2;
}
if(original_size < curl->response.buffer_size)
{
/* repalloc is not same as realloc, repalloc's first parameter cannot be NULL */
curl->response.buffer = repalloc(curl->response.buffer, curl->response.buffer_size);
}
elog(RANGER_LOG, "ranger restful response size is %d. response buffer size is %d.", curl->response.response_size, curl->response.buffer_size);
if (curl->response.buffer == NULL)
{
/* allocate memory failed. probably out of memory */
elog(WARNING, "cannot allocate memory for ranger response");
return 0;
}
memcpy(curl->response.buffer + curl->response.response_size, contents, realsize);
elog(RANGER_LOG, "read from ranger restful response: %s", curl->response.buffer);
curl->response.response_size += realsize;
curl->response.buffer[curl->response.response_size] = '\0';
return realsize;
}
/**
* @return 0 curl success; -1 curl failed
*/
static int call_ranger_rest(CURL_HANDLE curl_handle, const char* request)
{
int ret = -1;
int retry = 2;
CURLcode res;
bool switchToMaster = false;
Assert(request != NULL);
/*
* If master is talking with standby RPS, for every predefined interval
* (controlled by a GUC hawq_rps_check_local_interval) it will check if local RPS works now.
*/
if (curl_handle->talkingWithStandby)
{
uint64_t current_time = gettime_microsec();
if ((current_time - curl_handle->lastCheckTimestamp) > 1000000LL * rps_check_local_interval)
{
curl_handle->talkingWithStandby = false;
curl_handle->lastCheckTimestamp = 0;
elog(RANGER_LOG,
"master has been talking to standby RPS for more than %d seconds, try switching to master RPS",
rps_check_local_interval);
switchToMaster = true;
}
}
/*
* try to connect standby's RPS if fail in connecting master's RPS
*/
while(retry > 0 && ret != 0)
{
/*
* Re-initializes all options previously set on a specified CURL handle
* to the default values. This puts back the handle to the same state as
* it was in when it was just created with curl_easy_init.It does not
* change the following information kept in the handle: live connections,
* the Session ID cache, the DNS cache, the cookies and shares.
*/
curl_easy_reset(curl_handle->curl_handle);
/* timeout: hard-coded temporarily and maybe should be a guc in future */
curl_easy_setopt(curl_handle->curl_handle, CURLOPT_TIMEOUT, 30L);
/* specify URL to get */
StringInfoData tname;
initStringInfo(&tname);
appendStringInfo(&tname, "http://");
appendStringInfo(&tname, "%s", curl_handle->talkingWithStandby?standby_addr_host:master_addr_host);
appendStringInfo(&tname, ":");
appendStringInfo(&tname, "%d", rps_addr_port);
appendStringInfo(&tname, "/rps");
curl_easy_setopt(curl_handle->curl_handle, CURLOPT_URL, tname.data);
pfree(tname.data);
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Content-Type:application/json");
curl_easy_setopt(curl_handle->curl_handle, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl_handle->curl_handle, CURLOPT_POSTFIELDS,request);
/* send all data to this function */
curl_easy_setopt(curl_handle->curl_handle, CURLOPT_WRITEFUNCTION, write_callback);
curl_easy_setopt(curl_handle->curl_handle, CURLOPT_WRITEDATA, (void *)curl_handle);
res = curl_easy_perform(curl_handle->curl_handle);
if(request_id == INT_MAX)
{
request_id = 0;
}
request_id++;
/* check for errors */
if(res != CURLE_OK)
{
if (retry > 1)
{
/* Don't expose this warning message to client, just record in log.
* The value of whereToSendOutput is DestRemote, so set it to DestNone
* and set back after write a warning message in log file.
*/
CommandDest commandDest = whereToSendOutput;
whereToSendOutput = DestNone;
elog(WARNING, "ranger plugin service from http://%s:%d/rps is unavailable : %s, "
"trying ranger plugin service at http://%s:%d/rps\n",
curl_handle->talkingWithStandby?standby_addr_host:master_addr_host, rps_addr_port, curl_easy_strerror(res),
curl_handle->talkingWithStandby?master_addr_host:standby_addr_host, rps_addr_port);
curl_handle->talkingWithStandby = !curl_handle->talkingWithStandby;
whereToSendOutput = commandDest;
}
else
{
elog(ERROR, "permission is unknown due to authorization failure, "
"ranger plugin service is unavailable : %s.\n", curl_easy_strerror(res));
}
}
else
{
if (switchToMaster && !curl_handle->talkingWithStandby)
{
/* master's RPS has recovered, switch from standby's RPS to master's RPS */
elog(LOG, "switched from standby's ranger plugin service to master's.");
}
if (curl_handle->talkingWithStandby && curl_handle->lastCheckTimestamp == 0)
{
curl_handle->lastCheckTimestamp = gettime_microsec();
}
else if (!curl_handle->talkingWithStandby && curl_handle->lastCheckTimestamp != 0)
{
curl_handle->lastCheckTimestamp = 0;
}
ret = 0;
elog(RANGER_LOG, "retrieved %d bytes data from ranger restful response.",
curl_handle->response.response_size);
}
retry--;
}
return ret;
}
/*
* check privilege(s) from ranger
* @param request_list List of RangerRequestJsonArgs
* @param result_list List of RangerPrivilegeResults
* @return 0 get response from ranger and parse success; -1 other error
*/
int check_privilege_from_ranger(List *request_list, List *result_list)
{
json_object* jrequest = create_ranger_request_json(request_list, result_list);
Assert(jrequest != NULL);
const char *request = json_object_to_json_string(jrequest);
Assert(request != NULL);
elog(RANGER_LOG, "send json request to ranger : %s", request);
/* call GET method to send request*/
Assert(curl_context_ranger.hasInited);
if (call_ranger_rest(&curl_context_ranger, request) < 0)
{
return -1;
}
/* free the JSON object */
json_object_put(jrequest);
/* parse the JSON-format result */
int ret = parse_ranger_response(curl_context_ranger.response.buffer, result_list);
if (ret < 0)
{
elog(ERROR, "parse ranger response failed, ranger response content is %s",
curl_context_ranger.response.buffer == NULL? "empty.":curl_context_ranger.response.buffer);
}
if (curl_context_ranger.response.buffer != NULL)
{
/* reset response size to reuse the buffer. */
curl_context_ranger.response.response_size = 0;
}
return ret;
}