/* blob: 595280cef6b558e2089e8532c2d2e77857f8724a [file] [log] [blame] */
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "access/pxfuriparser.h"
#include "access/pxfutils.h"
#include "catalog/pg_exttable.h"
#include "utils/guc.h"
#include "utils/formatting.h"
#include "utils/uri.h"
/* Marker that introduces the segwork section inside a URI string. */
static const char* segwork_substring = "segwork=";
/* Separator used after a fragment's size prefix and between a fragment's fields. */
static const char segwork_separator = '@';
/*
 * Compared against the length computed in GPHDUri_parse_option to detect
 * an option that has nothing after its '='.
 */
static const int EMPTY_VALUE_LEN = 2;
/* Forward declarations of the file-local parsing, cleanup and debug helpers. */
static void GPHDUri_parse_protocol(GPHDUri *uri, char **cursor);
static void GPHDUri_parse_authority(GPHDUri *uri, char **cursor);
static void GPHDUri_parse_data(GPHDUri *uri, char **cursor);
static void GPHDUri_parse_options(GPHDUri *uri, char **cursor);
static List* GPHDUri_parse_option(char* pair, GPHDUri *uri);
static void GPHDUri_free_options(GPHDUri *uri);
static void GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str);
static List* GPHDUri_parse_fragment(char* fragment, List* fragments);
static void GPHDUri_free_fragments(GPHDUri *uri);
static void GPHDUri_debug_print_options(GPHDUri *uri);
static void GPHDUri_debug_print_segwork(GPHDUri *uri);
static void GPHDUri_fetch_authority_from_ha_nn(GPHDUri *uri, char *nameservice);
/* Not static: returns "X-GP-OPTIONS-" followed by the upper-cased key. */
char* normalize_key_name(const char* key);
/*
 * parseGPHDUri
 *
 * Split a URI string into its components, checking the structure along
 * the way. Each stage reports an error on malformed input.
 *
 * Expected layout:
 *
 *   <protocol name>://<authority>/<data>?<option>&<option>&<...>&segwork=<segwork>
 *
 *   protocol name - must be 'pxf'
 *   authority     - host:port
 *   data          - data path (directory name/table name/etc., depending
 *                   on the target)
 *   options       - key=value pairs; which ones are valid depends on the
 *                   protocol
 *   segwork       - not a user option but one added by the gp master. The
 *                   URI text kept in the result has it stripped, so error
 *                   messages shown to the user do not include it.
 *
 * inputs:
 *   'uri_str' - the raw URI string
 *
 * returns:
 *   the parsed URI as a newly allocated GPHDUri, or reports a format error.
 */
GPHDUri *
parseGPHDUri(const char *uri_str)
{
	GPHDUri    *parsed = (GPHDUri *) palloc0(sizeof(GPHDUri));
	char	   *pos;

	/* Working copy without the segwork section; also used in error text. */
	parsed->uri = GPHDUri_dup_without_segwork(uri_str);
	pos = parsed->uri;

	/* Fragments are read from the raw string, which still carries segwork. */
	GPHDUri_parse_segwork(parsed, uri_str);

	/* The remaining sections are consumed left to right from the copy. */
	GPHDUri_parse_protocol(parsed, &pos);
	GPHDUri_parse_authority(parsed, &pos);
	GPHDUri_parse_data(parsed, &pos);
	GPHDUri_parse_options(parsed, &pos);

	return parsed;
}
/*
 * parseGPHDUriForMetadata
 *
 * Build a GPHDUri from a string that holds only an authority section.
 * Only GPHDUri_parse_authority is run; protocol, data and options are
 * not parsed.
 *
 * Expected layout:
 *
 *   <authority>/
 *
 *   authority - host:port
 *
 * inputs:
 *   'uri_str' - the raw string; a copy of it is stored in the result
 *
 * returns:
 *   the parsed URI as a newly allocated GPHDUri, or reports a format error.
 */
GPHDUri *
parseGPHDUriForMetadata(char *uri_str)
{
	GPHDUri    *parsed;
	char	   *pos = uri_str;

	parsed = (GPHDUri *) palloc0(sizeof(GPHDUri));
	parsed->uri = pstrdup(uri_str);

	/* The authority parser advances a local cursor, not the caller's pointer. */
	GPHDUri_parse_authority(parsed, &pos);

	return parsed;
}
/*
 * freeGPHDUri
 *
 * Release a GPHDUri built by parseGPHDUri together with everything it
 * owns: the stored URI text, protocol, fragments, host, port, data,
 * profile (if any), options and the HA node information (if any).
 *
 * 'uri' must not be used after this call.
 */
void
freeGPHDUri(GPHDUri *uri)
{
	/* The parser stores its own copy of the URI text; release it as well. */
	if (uri->uri)
		pfree(uri->uri);
	pfree(uri->protocol);
	GPHDUri_free_fragments(uri);
	pfree(uri->host);
	pfree(uri->port);
	pfree(uri->data);
	/* profile is only set when a PROFILE option was present */
	if (uri->profile)
		pfree(uri->profile);
	GPHDUri_free_options(uri);
	/* ha_nodes is only set when the authority was a nameservice */
	if (uri->ha_nodes)
		GPHD_HA_release_nodes(uri->ha_nodes);
	pfree(uri);
}
/*
 * freeGPHDUriForMetadata
 *
 * Release a GPHDUri built by parseGPHDUriForMetadata: the stored URI text,
 * host, port, profile (if any) and the HA node information that the
 * authority parser loads when the authority is a nameservice.
 *
 * 'uri' must not be used after this call.
 */
void
freeGPHDUriForMetadata(GPHDUri *uri)
{
	/* copy of the input string made by parseGPHDUriForMetadata */
	if (uri->uri)
		pfree(uri->uri);
	pfree(uri->host);
	pfree(uri->port);
	if (uri->profile)
		pfree(uri->profile);
	/* released the same way freeGPHDUri does, so the HA case does not leak */
	if (uri->ha_nodes)
		GPHD_HA_release_nodes(uri->ha_nodes);
	pfree(uri);
}
/*
 * GPHDUri_get_value_for_opt
 *
 * Look up the option named 'key' (compared case-insensitively) and store
 * a pointer to its value in '*val'. The first matching option wins.
 *
 * When 'emit_error' is set, an error is reported if the key is absent or
 * if its value is NULL.
 *
 * Returns 0 when the key was found, -1 otherwise. '*val' is left untouched
 * when the key is not found.
 */
int
GPHDUri_get_value_for_opt(GPHDUri *uri, char *key, char **val, bool emit_error)
{
	ListCell   *cell;

	foreach(cell, uri->options)
	{
		OptionData *opt = (OptionData *) lfirst(cell);

		if (pg_strcasecmp(opt->key, key) != 0)
			continue;

		*val = opt->value;
		if (*val == NULL && emit_error)
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("No value assigned to the %s option in %s",
							key, uri->uri)));
		return 0;
	}

	/* no option carries this key */
	if (emit_error)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Missing %s option in %s", key, uri->uri)));

	return -1;
}
/*
 * GPHDUri_verify_no_duplicate_options
 *
 * Check that no option key occurs more than once. Keys are upper-cased
 * before comparison, so the check is case insensitive. If duplicates
 * exist, a single error is reported that lists each duplicated key once.
 */
void
GPHDUri_verify_no_duplicate_options(GPHDUri *uri)
{
	StringInfoData dupNames;
	List	   *seen = NIL;			/* every distinct key met so far */
	List	   *reported = NIL;		/* keys already added to dupNames */
	ListCell   *cell = NULL;

	initStringInfo(&dupNames);

	foreach(cell, uri->options)
	{
		OptionData *opt = (OptionData *) lfirst(cell);
		Value	   *upperKey = makeString(str_toupper(opt->key, strlen(opt->key)));

		if (list_member(seen, upperKey))
		{
			/* repeated key: mention it only the first time it repeats */
			if (!list_member(reported, upperKey))
			{
				reported = lappend(reported, upperKey);
				appendStringInfo(&dupNames, "%s, ", strVal(upperKey));
			}
		}
		else
		{
			seen = lappend(seen, upperKey);
		}
	}

	if (dupNames.len > 0)
	{
		/* drop the trailing ", " left by the last append */
		truncateStringInfo(&dupNames, dupNames.len - strlen(", "));
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s: Duplicate option(s): %s", uri->uri, dupNames.data)));
	}

	list_free(reported);
	list_free(seen);
	pfree(dupNames.data);
}
/*
 * GPHDUri_verify_core_options_exist
 *
 * Check that every name in 'coreOptions' appears (case-insensitively)
 * among the URI's option keys. The list cells are used as C strings.
 * If any are missing, a single error is reported naming all of them,
 * joined by " and ".
 */
void
GPHDUri_verify_core_options_exist(GPHDUri *uri, List *coreOptions)
{
	StringInfoData absent;
	ListCell   *wanted = NULL;

	initStringInfo(&absent);

	foreach(wanted, coreOptions)
	{
		ListCell   *present = NULL;
		bool		found = false;

		/* scan the URI's options until this core option is matched */
		foreach(present, uri->options)
		{
			char	   *optKey = ((OptionData *) lfirst(present))->key;

			if (pg_strcasecmp(optKey, lfirst(wanted)) == 0)
			{
				found = true;
				break;
			}
		}

		if (!found)
			appendStringInfo(&absent, "%s and ", (char *) lfirst(wanted));
	}

	if (absent.len > 0)
	{
		/* drop the trailing " and " left by the last append */
		truncateStringInfo(&absent, absent.len - strlen(" and "));
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s: PROFILE or %s option(s) missing", uri->uri, absent.data)));
	}

	pfree(absent.data);
}
/*
 * GPHDUri_parse_protocol
 *
 * Parse the protocol section that '*cursor' points at: everything up to
 * the first "://". The protocol text is stored in uri->protocol and
 * '*cursor' is moved just past the "://".
 *
 * Reports an error when "://" is absent or when the URI fails the
 * IS_PXF_URI check.
 *
 * See the parseGPHDUri header for the URI structure description.
 */
static void
GPHDUri_parse_protocol(GPHDUri *uri, char **cursor)
{
	const char *separator = "://";
	char	   *begin = *cursor;
	char	   *sep_pos = strstr(begin, separator);

	if (sep_pos == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s", uri->uri)));

	/* protocol is the text in front of the separator */
	uri->protocol = pnstrdup(begin, sep_pos - begin);

	if (!IS_PXF_URI(uri->uri))
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s : unsupported protocol '%s'",
						uri->uri, uri->protocol)));

	/* continue right after the separator */
	*cursor = sep_pos + strlen(separator);
}
/*
 * GPHDUri_parse_authority
 *
 * Parse the authority section that '*cursor' points at: everything up to
 * the next '/'. On return '*cursor' points just past that '/'.
 *
 * The authority string can have one of two forms:
 *   host:port              - stored in uri->host / uri->port, with
 *                            uri->ha_nodes set to NULL
 *   ha_nameservice_string  - no ':' present; host and port are taken from
 *                            the HA namenode configuration
 * For a bracketed IPv6 host the port separator is looked for after ']'.
 *
 * Reports an error when the authority is empty (<ptc>:///), when there is
 * no terminating '/', or when the port is not within 1..65535.
 *
 * If pxf_isilon is set, the port taken from the URI is replaced by
 * pxf_service_port.
 *
 * See the parseGPHDUri header for the URI structure description.
 */
static void
GPHDUri_parse_authority(GPHDUri *uri, char **cursor)
{
	const long	max_port_number = 65535;
	char	   *hoststart = *cursor;
	char	   *end;
	char	   *hostport;
	char	   *ipv6;
	char	   *portstart;
	long		port;

	/* an empty authority (<ptc>:///) is not accepted */
	if (*hoststart == '/')
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s : missing authority section",
						uri->uri)));

	end = strchr(hoststart, '/');
	if (!end)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s : missing authority section",
						uri->uri)));

	/* private copy of the authority text */
	hostport = pnstrdup(hoststart, end - hoststart);

	/* find the port separator; for IPv6 only look after the closing ']' */
	ipv6 = strchr(hostport, ']');
	if (ipv6)
		portstart = strchr(ipv6, ':');
	else
		portstart = strchr(hostport, ':');

	if (portstart)				/* the authority is of the form host:port */
	{
		uri->port = pstrdup(portstart + 1);
		uri->host = pnstrdup(hostport, portstart - hostport);
		uri->ha_nodes = NULL;	/* signal that we are not in the HA case */
	}
	else						/* the authority is a nameservice string */
		GPHDUri_fetch_authority_from_ha_nn(uri, hostport);

	pfree(hostport);
	*cursor = end + 1;

	port = atol(uri->port);
	if (port <= 0 || port > max_port_number)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid port: %s for authority host %s",
						uri->port, uri->host)));

	/*
	 * If pxf_isilon is true, ignore the port in the uri and use
	 * pxf_service_port instead to access PXF.
	 *
	 * uri->port was allocated to fit the original port text only, so the
	 * replacement is formatted into a local buffer and stored as a fresh
	 * copy rather than printed over the old allocation.
	 */
	if (pxf_isilon)
	{
		char		portbuf[16];

		snprintf(portbuf, sizeof(portbuf), "%d", pxf_service_port);
		pfree(uri->port);
		uri->port = pstrdup(portbuf);
	}
}
/*
 * GPHDUri_fetch_authority_from_ha_nn
 *
 * Resolve the authority from a High-Availability Namenode pair.
 *
 * An authority that is a plain string rather than host:port is taken to
 * be a nameservice describing an HA Namenode couple. The node list for
 * that nameservice is loaded into uri->ha_nodes, and uri->host and
 * uri->port receive copies of the first node's entry.
 *
 * TODO:
 * This is a temporary solution that will be removed once the PXF servlet
 * stops using the HDFS namenode/datanodes as a hosting application
 * server and moves to an independent stand-alone application server.
 */
static void
GPHDUri_fetch_authority_from_ha_nn(GPHDUri *uri, char *nameservice)
{
	uri->ha_nodes = GPHD_HA_load_nodes(nameservice);

	/*
	 * Start with the first node. If it turns out not to be the active one,
	 * the failover code around call_rest() in pxfmasterapi.c tries the
	 * second node.
	 */
	uri->port = pstrdup(uri->ha_nodes->restports[0]);
	uri->host = pstrdup(uri->ha_nodes->nodes[0]);
}
/*
 * GPHDUri_parse_data
 *
 * Parse the data section that '*cursor' points at and store a copy of it
 * in uri->data. On return '*cursor' points at the end of the data
 * section.
 *
 * See the parseGPHDUri header for the URI structure description.
 */
static void
GPHDUri_parse_data(GPHDUri *uri, char **cursor)
{
	char	   *begin = *cursor;
	char	   *question_mark = strrchr(begin, '?');
	size_t		length;

	/*
	 * The data section runs up to the last '?', which opens the options
	 * section. Without a '?', it is the whole rest of the string.
	 */
	length = question_mark ? (size_t) (question_mark - begin) : strlen(begin);

	uri->data = pnstrdup(begin, length);
	*cursor = begin + length;
}
/*
 * GPHDUri_parse_options
 *
 * Parse the options section that '*cursor' points at. The section must
 * start with '?' and hold at least two more characters. It is split on
 * '&', and each piece is handed to GPHDUri_parse_option, which appends
 * it to uri->options.
 *
 * Parsing works on a private copy; '*cursor' itself is not advanced.
 *
 * See the parseGPHDUri header for the URI structure description.
 */
static void
GPHDUri_parse_options(GPHDUri *uri, char **cursor)
{
	const char *delim = "&";
	char	   *copy = pstrdup(*cursor);
	char	   *opts = copy;
	char	   *save;
	char	   *token;

	/* the section is mandatory and must open with '?' */
	if (opts == NULL || *opts != '?')
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s: missing options section",
						uri->uri)));

	/* step over the '?' */
	++opts;

	/* sanity check: fewer than two characters cannot hold an option */
	if (opts[0] == '\0' || opts[1] == '\0')
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s: invalid option after '?'",
						uri->uri)));

	/* strtok_r writes terminators into the copy while splitting */
	token = strtok_r(opts, delim, &save);
	while (token != NULL)
	{
		uri->options = GPHDUri_parse_option(token, uri);
		token = strtok_r(NULL, delim, &save);
	}

	pfree(copy);
}
/*
 * GPHDUri_parse_option
 *
 * Parse one option of the form
 *     <key>=<value>
 * into an OptionData (key and value are separate copies) and append it to
 * uri->options.
 *
 * Reports an error when 'pair' has no '=', has more than one '=', has an
 * empty key or has an empty value.
 *
 * If the key normalizes to "X-GP-OPTIONS-PROFILE", a copy of the value is
 * also stored in uri->profile.
 *
 * Returns the options list with the new entry appended.
 */
static List*
GPHDUri_parse_option(char* pair, GPHDUri *uri)
{
	char	   *sep;
	char	   *x_gp_key;
	int			pair_len, key_len, tail_len;
	OptionData *option_data;

	pair_len = strlen(pair);

	sep = strchr(pair, '=');
	if (sep == NULL) {
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s: option '%s' missing '='", uri->uri, pair)));
	}

	if (strchr(sep + 1, '=') != NULL) {
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s: option '%s' contains duplicate '='", uri->uri, pair)));
	}

	key_len = sep - pair;
	if (key_len == 0) {
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s: option '%s' missing key before '='", uri->uri, pair)));
	}

	/*
	 * tail_len counts the '=', the value and the terminating NUL, so it
	 * equals EMPTY_VALUE_LEN exactly when nothing follows the '='.
	 */
	tail_len = pair_len - key_len + 1;
	if (tail_len == EMPTY_VALUE_LEN) {
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("Invalid URI %s: option '%s' missing value after '='", uri->uri, pair)));
	}

	/* allocate only once the pair has passed validation */
	option_data = palloc0(sizeof(OptionData));
	option_data->key = pnstrdup(pair, key_len);

	/*
	 * The value is the NUL-terminated rest of 'pair', so a plain string
	 * copy takes exactly the value. tail_len must not be used as a copy
	 * length: it is two larger than the value and would reach past the
	 * terminator.
	 */
	option_data->value = pstrdup(sep + 1);

	x_gp_key = normalize_key_name(option_data->key);
	if (strcmp(x_gp_key, "X-GP-OPTIONS-PROFILE") == 0)
	{
		/* a repeated PROFILE option replaces the earlier copy */
		if (uri->profile)
			pfree(uri->profile);
		uri->profile = pstrdup(option_data->value);
	}
	pfree(x_gp_key);

	return lappend(uri->options, option_data);
}
/*
 * GPHDUri_free_options
 *
 * Release every OptionData in uri->options (key, value and the entry
 * itself), then the list, and reset uri->options to NIL.
 */
static void
GPHDUri_free_options(GPHDUri *uri)
{
	ListCell   *cell = NULL;

	foreach(cell, uri->options)
	{
		OptionData *entry = (OptionData *) lfirst(cell);

		pfree(entry->key);
		pfree(entry->value);
		pfree(entry);
	}

	/* the list cells themselves */
	list_free(uri->options);
	uri->options = NIL;
}
/*
 * GPHDUri_debug_print
 *
 * Emit the parsed URI as NOTICE messages: the main fields, the profile
 * when one is set, then the options and the segwork fragments.
 *
 * For debugging during development only.
 */
void
GPHDUri_debug_print(GPHDUri *uri)
{
	elog(NOTICE, "URI: %s, Host: %s, Port: %s, Data Path: %s",
		 uri->uri, uri->host, uri->port, uri->data);

	if (uri->profile != NULL)
		elog(NOTICE, "Profile: %s", uri->profile);

	GPHDUri_debug_print_options(uri);
	GPHDUri_debug_print_segwork(uri);
}
/*
 * GPHDUri_debug_print_options
 *
 * Emit one NOTICE per entry of uri->options, showing its position in the
 * list, its key and its value.
 *
 * For debugging during development only.
 */
static void
GPHDUri_debug_print_options(GPHDUri *uri)
{
	ListCell   *item;
	unsigned int count = 0;		/* unsigned to match the %u conversion */

	elog(NOTICE, "options section data: ");

	foreach(item, uri->options)
	{
		OptionData *data = (OptionData*)lfirst(item);

		elog(NOTICE,
			 "%u: key: %s, value: %s",
			 count, data->key, data->value);
		++count;
	}
}
/*
 * GPHDUri_debug_print_segwork
 *
 * Emit one NOTICE per entry of uri->fragments, showing its position in the
 * list, authority, index and fragment metadata ("NULL" when absent),
 * followed by user data and profile when they are set.
 *
 * For debugging during development only.
 */
static void
GPHDUri_debug_print_segwork(GPHDUri *uri)
{
	ListCell   *item;
	unsigned int count = 0;		/* unsigned to match the %u conversion */
	StringInfoData fragment_data;

	initStringInfo(&fragment_data);
	elog(NOTICE, "segwork section data: ");

	foreach(item, uri->fragments)
	{
		FragmentData *data = (FragmentData*)lfirst(item);

		appendStringInfo(&fragment_data, "%u: authority: %s, index %s",
						 count, data->authority, data->index);
		appendStringInfo(&fragment_data, ", fragment metadata: %s",
						 data->fragment_md ? data->fragment_md : "NULL");
		if (data->user_data)
			appendStringInfo(&fragment_data, ", user data : %s", data->user_data);
		if (data->profile)
			appendStringInfo(&fragment_data, ", profile : %s", data->profile);

		elog(NOTICE, "%s", fragment_data.data);
		++count;

		/* reuse the same buffer for the next fragment */
		resetStringInfo(&fragment_data);
	}

	pfree(fragment_data.data);
}
/*
 * GPHDUri_parse_segwork
 *
 * Parse the segwork section of the URI, if present:
 *     ...&segwork=<size>@<fragment><size>@<fragment>...
 * Each fragment is prefixed with its length in bytes and a separator.
 * Every fragment is handed to GPHDUri_parse_fragment and the result is
 * appended to uri->fragments.
 *
 * 'uri_str' is only read; it is never written to.
 *
 * Does nothing when 'uri_str' has no segwork section. Reports an internal
 * error when a size prefix is missing its separator, is not a
 * non-negative number, or is larger than the remaining text. These are
 * runtime checks, so they also apply in builds without assertions.
 */
static void
GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str)
{
	const char *segwork;
	int			count = 0;

	/* skip segwork= */
	segwork = strstr(uri_str, segwork_substring);
	if (segwork == NULL)
		return;
	segwork += strlen(segwork_substring);

	/* read the fragments one after another until the string ends */
	while (*segwork != '\0')
	{
		const char *size_end;
		char	   *digits_end;
		char	   *fragment;
		long		fragment_size;

		/* expect size, terminated by the separator */
		size_end = strchr(segwork, segwork_separator);
		if (size_end == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("internal error in pxfuriparser.c:GPHDUri_parse_segwork. Fragment size separator is missing.")));

		/* the digits must run exactly up to the separator */
		fragment_size = strtol(segwork, &digits_end, 10);
		if (digits_end == segwork || digits_end != size_end || fragment_size < 0)
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("internal error in pxfuriparser.c:GPHDUri_parse_segwork. Fragment size is invalid.")));

		segwork = size_end + 1; /* skip the size field */

		/* never copy or advance past the end of the input */
		if ((size_t) fragment_size > strlen(segwork))
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("internal error in pxfuriparser.c:GPHDUri_parse_segwork. Fragment size exceeds segwork length.")));

		fragment = pnstrdup(segwork, fragment_size);
		elog(DEBUG2, "GPHDUri_parse_segwork: fragment #%d, size %d, str %s", count, (int) fragment_size, fragment);
		uri->fragments = GPHDUri_parse_fragment(fragment, uri->fragments);

		segwork += fragment_size;
		++count;
		pfree(fragment);
	}
}
/*
* Parsed a fragment string in the form:
* <ip>@<port>@<index>[@user_data] - 192.168.1.1@1422@1[@user_data]
* to authority ip:port - 192.168.1.1:1422
* to index - 1
* user data is optional
*/
static List*
GPHDUri_parse_fragment(char* fragment, List* fragments)
{
if (!fragment)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment.fragment string is null.")));
}
char *dup_frag = pstrdup(fragment);
char *value_start;
char *value_end;
StringInfoData authority_formatter;
FragmentData* fragment_data;
fragment_data = palloc0(sizeof(FragmentData));
initStringInfo(&authority_formatter);
value_start = dup_frag;
/* expect ip */
value_end = strchr(value_start, segwork_separator);
if (value_end == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
}
*value_end = '\0';
appendStringInfo(&authority_formatter, "%s:", value_start);
value_start = value_end + 1;
/* expect port */
value_end = strchr(value_start, segwork_separator);
if (value_end == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
}
*value_end = '\0';
appendStringInfo(&authority_formatter, "%s", value_start);
fragment_data->authority = pstrdup(authority_formatter.data);
pfree(authority_formatter.data);
value_start = value_end + 1;
/* expect source name */
value_end = strchr(value_start, segwork_separator);
if (value_end == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
}
*value_end = '\0';
fragment_data->source_name = pstrdup(value_start);
value_start = value_end + 1;
/* expect index */
value_end = strchr(value_start, segwork_separator);
if (value_end == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
}
*value_end = '\0';
fragment_data->index = pstrdup(value_start);
value_start = value_end + 1;
/* expect fragment metadata */
Assert(value_start);
value_end = strchr(value_start, segwork_separator);
if (value_end == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
}
*value_end = '\0';
fragment_data->fragment_md = pstrdup(value_start);
value_start = value_end + 1;
/* expect user data */
Assert(value_start);
value_end = strchr(value_start, segwork_separator);
if (value_end == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
}
*value_end = '\0';
fragment_data->user_data = pstrdup(value_start);
value_start = value_end + 1;
/* expect for profile */
Assert(value_start);
value_end = strchr(value_start, segwork_separator);
if (value_end == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.")));
}
*value_end = '\0';
if (strlen(value_start) > 0)
fragment_data->profile = pstrdup(value_start);
return lappend(fragments, fragment_data);
}
/*
 * GPHDUri_free_fragment
 *
 * Release a single FragmentData: each string field that is set, then the
 * structure itself.
 */
static void
GPHDUri_free_fragment(FragmentData *data)
{
/* release a field only when it was actually populated */
#define RELEASE_IF_SET(field) \
	do { \
		if (field) \
			pfree(field); \
	} while (0)

	RELEASE_IF_SET(data->authority);
	RELEASE_IF_SET(data->fragment_md);
	RELEASE_IF_SET(data->index);
	RELEASE_IF_SET(data->profile);
	RELEASE_IF_SET(data->source_name);
	RELEASE_IF_SET(data->user_data);

#undef RELEASE_IF_SET

	pfree(data);
}
/*
 * GPHDUri_free_fragments
 *
 * Release every FragmentData in uri->fragments, then the list, and reset
 * uri->fragments to NIL.
 */
static void
GPHDUri_free_fragments(GPHDUri *uri)
{
	ListCell   *cell = NULL;

	foreach(cell, uri->fragments)
		GPHDUri_free_fragment((FragmentData *) lfirst(cell));

	/* the list cells themselves */
	list_free(uri->fragments);
	uri->fragments = NIL;
}
/*
 * GPHDUri_dup_without_segwork
 *
 * Return a newly allocated copy of 'uri' with the segwork section cut
 * off. The section is removed so that users do not see it when an error
 * occurs and the URI is printed.
 *
 * The copy is truncated at the character in front of "segwork=" (the '&'
 * or '?' that introduces it). When "segwork=" is not present, the result
 * is a plain copy. When the string starts with "segwork=", the result is
 * an empty string.
 */
char*
GPHDUri_dup_without_segwork(const char* uri)
{
	char	   *segwork;
	char	   *no_segwork;

	no_segwork = pstrdup(uri);
	segwork = strstr(no_segwork, segwork_substring);

	/*
	 * If segwork_substring was not found, just return a dup of the string.
	 */
	if (segwork != NULL)
	{
		/*
		 * Back 1 char to include either & or ?. Never step in front of the
		 * buffer: with "segwork=" at offset 0 there is no such character.
		 */
		if (segwork > no_segwork)
			--segwork;
		*segwork = '\0';
	}

	return no_segwork;
}
/* --------------------------------
 *		RelationIsExternalPxfReadOnly -
 *
 *		Check if a relation is a readable external PXF table.
 *
 *		Returns true when 'rel' is an external table that is not writable
 *		and has a location passing the IS_PXF_URI check; the first such
 *		location is appended to 'location'. Returns false otherwise, in
 *		which case 'location' is left untouched.
 *
 *		The external table entry fetched for the check is released on
 *		every return path.
 * --------------------------------
 */
bool
RelationIsExternalPxfReadOnly(Relation rel, StringInfo location)
{
	ExtTableEntry *tbl;
	ListCell   *cell;
	bool		is_pxf = false;

	if (!RelationIsExternal(rel))
		return false;

	tbl = GetExtTableEntry(rel->rd_id);
	Assert(tbl);

	/* Nothing to do for writable tables */
	if (tbl->iswritable)
	{
		elog(DEBUG2, "RelationIsExternalPxfReadOnly: relation %s is writable external table",
			 RelationGetRelationName(rel));
		pfree(tbl);
		return false;
	}

	foreach(cell, tbl->locations)
	{
		char	   *locsItem = strVal(lfirst(cell));

		if (!locsItem)
			continue;

		if (IS_PXF_URI(locsItem))
		{
			/* the first PXF location is enough */
			appendStringInfoString(location, locsItem);
			is_pxf = true;
			break;
		}
	}

	pfree(tbl);
	return is_pxf;
}
/*
 * normalize_key_name
 *
 * Build the full name of the header key expected by the PXF service:
 * the input key converted to upper case, prefixed with "X-GP-OPTIONS-".
 *
 * Returns a newly allocated string. Reports an internal error when 'key'
 * is NULL or empty.
 */
char*
normalize_key_name(const char* key)
{
	StringInfoData header_name;
	char	   *upper;

	if (key == NULL || key[0] == '\0')
	{
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("internal error in pxfheaders.c:normalize_key_name. Parameter key is null or empty.")));
	}

	initStringInfo(&header_name);

	/*
	 * NOTE(review): str_toupper is handed a private copy of the key here;
	 * whether that copy is needed depends on str_toupper's contract, which
	 * is not visible in this file - confirm before simplifying.
	 */
	upper = str_toupper(pstrdup(key), strlen(key));
	appendStringInfo(&header_name, "X-GP-OPTIONS-%s", upper);
	pfree(upper);

	return header_name.data;
}