blob: e4bbddf62af7e60d63063ff89fea540780180f4c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "postgres.h"
#include <stdarg.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include "access/fileam.h"
#include "access/filesplit.h"
#include "access/heapam.h"
#include "access/valid.h"
#include "catalog/pg_extprotocol.h"
#include "catalog/namespace.h"
#include "commands/copy.h"
#include "commands/dbcommands.h"
#include "libpq/libpq-be.h"
#include "libpq/pqsignal.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "port.h"
#include "postmaster/postmaster.h" /* postmaster port*/
#include "utils/relcache.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/uri.h"
#include "utils/guc.h"
#include "utils/builtins.h"
#include <arpa/inet.h>
#include <fstream/gfile.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <stdint.h>
#include <curl/curl.h>
#include <fstream/fstream.h>
#include "cdb/cdbsreh.h"
#include "cdb/cdbtimer.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "access/url.h"
#include "tcop/tcopprot.h"
#ifndef FALSE
#define FALSE 0
#endif
#ifndef TRUE
#define TRUE 1
#endif
#define HOST_NAME_SIZE 100
static char *interpretError(int exitCode, char *buf, size_t buflen, char *err, size_t errlen);
static const char *getSignalNameFromCode(int signo);
static int popen_with_stderr(int *rwepipe, const char *exe, bool forwrite);
static int pclose_with_stderr(int pid, int *rwepipe, char *buf, int len);
static void gp_proto0_write_done(URL_FILE *file);
static int32 InvokeExtProtocol(void *ptr,
size_t nbytes,
URL_FILE *file,
CopyState pstate,
bool last_call,
ExternalSelectDesc desc,
List **psplits);
void extract_http_domain(char* i_path, char* o_domain, int dlen);
/* we use a global one for convenience */
CURLM *multi_handle = 0;
/*
* header_callback
*
* when a header arrives from the server curl calls this routine. In here we
* extract the information we are interested in from the header, and store it
* in the passed in callback argument (URL_FILE *) which lives in our
* application.
*/
static size_t
header_callback(void *ptr_, size_t size, size_t nmemb, void *userp)
{
URL_FILE* url = (URL_FILE *)userp;
char* ptr = ptr_;
int len = size * nmemb;
int i;
char buf[20];
Assert(size == 1);
/*
* parse the http response line (code and message) from
* the http header that we get. Basically it's the whole
* first line (e.g: "HTTP/1.0 400 time out"). We do this
* in order to capture any error message that comes from
* gpfdist, and later use it to report the error string in
* check_response() to the database user.
*/
if (url->u.curl.http_response == 0)
{
int n = nmemb;
char* p;
if (n > 0 && 0 != (p = palloc(n+1)))
{
memcpy(p, ptr, n);
p[n] = 0;
if (n > 0 && (p[n-1] == '\r' || p[n-1] == '\n'))
p[--n] = 0;
if (n > 0 && (p[n-1] == '\r' || p[n-1] == '\n'))
p[--n] = 0;
url->u.curl.http_response = p;
}
}
/*
* extract the GP-PROTO value from the HTTP header.
*/
if (len > 10 && *ptr == 'X' && 0 == strncmp("X-GP-PROTO", ptr, 10))
{
ptr += 10;
len -= 10;
while (len > 0 && (*ptr == ' ' || *ptr == '\t'))
{
ptr++;
len--;
}
if (len > 0 && *ptr == ':')
{
ptr++;
len--;
while (len > 0 && (*ptr == ' ' || *ptr == '\t'))
{
ptr++;
len--;
}
for (i = 0; i < sizeof(buf) - 1 && i < len; i++)
buf[i] = ptr[i];
buf[i] = 0;
url->u.curl.gp_proto = strtol(buf, 0, 0);
// elog(NOTICE, "X-GP-PROTO: %s (%d)", buf, url->u.curl.gp_proto);
}
}
return size * nmemb;
}
/*
* write_callback
*
* when data arrives from gpfdist server and curl is ready to write it
* to our application, it calls this routine. In here we will store the
* data in the application variable (URL_FILE *)file which is the passed
* in the fourth argument as a part of the callback settings.
*
* we return the number of bytes written to the application buffer
*/
static size_t
write_callback(char *buffer,
size_t size,
size_t nitems,
void *userp)
{
URL_FILE* file = (URL_FILE *)userp;
curlctl_t* curl = &file->u.curl;
const int nbytes = size * nitems;
int n;
//elog(NOTICE, "write_callback %d", nbytes);
/*
* if insufficient space in buffer make more space
*/
if (curl->in.top + nbytes >= curl->in.max)
{
/* compact ? */
if (curl->in.bot)
{
n = curl->in.top - curl->in.bot;
memmove(curl->in.ptr, curl->in.ptr + curl->in.bot, n);
curl->in.bot = 0;
curl->in.top = n;
}
/* if still insufficient space in buffer, then do realloc */
if (curl->in.top + nbytes >= curl->in.max)
{
char *newbuf;
n = curl->in.top - curl->in.bot + nbytes + 1024;
newbuf = realloc(curl->in.ptr, n);
if (!newbuf)
{
elog(ERROR, "out of memory (curl write_callback)");
}
curl->in.ptr = newbuf;
curl->in.max = n;
// elog(NOTICE, "max now at %d", n);
Assert(curl->in.top + nbytes < curl->in.max);
}
}
/* enough space. copy buffer into curl->buf */
memcpy(curl->in.ptr + curl->in.top, buffer, nbytes);
curl->in.top += nbytes;
return nbytes;
}
/*
* check_response
*
* If got an HTTP response with an error code from the server (gpfdist), report
* the error code and message it to the database user and abort operation.
*/
static int
check_response(URL_FILE *file, int *rc, const char **response_string)
{
long response_code;
char* effective_url = NULL;
CURL* curl = file->u.curl.handle;
static char buffer[30];
/* get the response code from curl */
if (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code) != CURLE_OK)
{
*rc = 500;
*response_string = "curl_easy_getinfo failed";
return -1;
}
*rc = response_code;
snprintf(buffer, sizeof buffer, "Response Code=%d", (int)response_code);
*response_string = buffer;
if (curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &effective_url) != CURLE_OK)
return -1;
if (! (200 <= response_code && response_code < 300))
{
if (response_code == 0)
{
long oserrno = 0;
static char connmsg[64];
/* get the os level errno, and string representation of it */
if (curl_easy_getinfo(curl, CURLINFO_OS_ERRNO, &oserrno) == CURLE_OK)
{
if (oserrno != 0)
snprintf(connmsg, sizeof connmsg, "error code = %d (%s)",
(int) oserrno, strerror((int)oserrno));
}
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection with gpfdist failed for %s. "
"effective url: %s. %s", file->url, effective_url,
(oserrno != 0 ? connmsg : "")),
errOmitLocation(true)));
}
else
{
/* we need to sleep 1 sec to avoid this condition:
1- seg X gets an error message from gpfdist
2- seg Y gets a 500 error
3- seg Y report error before seg X, and error message
in seg X is thrown away.
*/
pg_usleep(1000000);
elog(ERROR, "http response code %ld from gpfdist (%s): %s",
response_code, file->url,
file->u.curl.http_response ? file->u.curl.http_response : "?");
}
}
return 0;
}
/*
* fill_buffer
*
* Attempt to fill the read buffer up to requested number of bytes.
* We first check if we already have the number of bytes that we
* want already in the buffer (from write_callback), and we do
* a select on the socket only if we don't have enough.
*
* return 0 if successful; raises ERROR otherwise.
*/
static int
fill_buffer(URL_FILE *file, int want)
{
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
int maxfd;
struct timeval timeout;
int nfds, e;
curlctl_t* curl = &file->u.curl;
/* elog(NOTICE, "= still_running %d, bot %d, top %d, want %d",
file->u.curl.still_running, curl->in.bot, curl->in.top, want);
*/
/* attempt to fill buffer */
while (curl->still_running && curl->in.top - curl->in.bot < want)
{
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep);
CHECK_FOR_INTERRUPTS();
/* set a suitable timeout to fail on */
timeout.tv_sec = 5;
timeout.tv_usec = 0;
/* get file descriptors from the transfers */
if (0 != (e = curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd)))
{
elog(ERROR, "internal error: curl_multi_fdset failed (%d - %s)",
e, curl_easy_strerror(e));
}
if (maxfd == -1)
{
/* fall through to curl_multi_perform directly */
pg_usleep(100);
nfds = 1;
}
else if (-1 == (nfds = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout)))
{
if (errno == EINTR || errno == EAGAIN)
continue;
elog(ERROR, "internal error: select failed on curl_multi_fdset (maxfd %d) (%d - %s)",
maxfd, errno, strerror(errno));
}
if (nfds > 0)
{
/* timeout or readable/writable sockets */
/* note we *could* be more efficient and not wait for
* CURLM_CALL_MULTI_PERFORM to clear here and check it on re-entry
* but that gets messy */
while (CURLM_CALL_MULTI_PERFORM ==
(e = curl_multi_perform(multi_handle, &file->u.curl.still_running)));
if (e != 0)
{
elog(ERROR, "internal error: curl_multi_perform failed (%d - %s)",
e, curl_easy_strerror(e));
}
}
/* elog(NOTICE, "- still_running %d, bot %d, top %d, want %d",
file->u.curl.still_running, curl->in.bot, curl->in.top, want);
*/
}
return 0;
}
static int
set_httpheader(URL_FILE *fcurl, const char *name, const char *value)
{
char tmp[1024];
if (strlen(name) + strlen(value) + 5 > sizeof(tmp))
{
elog(ERROR, "set_httpheader name/value is too long. name = %s, value=%s", name, value);
}
sprintf(tmp, "%s: %s", name, value);
fcurl->u.curl.x_httpheader = curl_slist_append(fcurl->u.curl.x_httpheader, tmp);
return 0;
}
static int
make_export(char *name, const char *value, char *buf)
{
char *p = buf;
int ch;
if (buf)
strcpy(p, name);
p += strlen(name);
if (buf)
strcpy(p, "='");
p += 2;
for ( ; 0 != (ch = *value); value++)
{
if (ch == '\'' || ch == '\\')
{
if (buf)
*p = '\\';
p++;
}
if (buf)
*p = ch;
p++;
}
if (buf)
strcpy(p, "' && export ");
p += strlen("' && export ");
if (buf)
strcpy(p, name);
p += strlen(name);
if (buf)
strcpy(p, " && ");
p += 4;
return p - buf;
}
static int
make_command(const char *cmd, extvar_t * ev, char *buf)
{
int sz = 0;
sz += make_export("GP_MASTER_HOST", ev->GP_MASTER_HOST, buf ? buf + sz : 0);
sz += make_export("GP_MASTER_PORT", ev->GP_MASTER_PORT, buf ? buf + sz : 0);
sz += make_export("GP_SEG_PG_CONF", ev->GP_SEG_PG_CONF, buf ? buf + sz : 0);
sz += make_export("GP_SEG_DATADIR", ev->GP_SEG_DATADIR, buf ? buf + sz : 0);
sz += make_export("GP_DATABASE", ev->GP_DATABASE, buf ? buf + sz : 0);
sz += make_export("GP_USER", ev->GP_USER, buf ? buf + sz : 0);
sz += make_export("GP_DATE", ev->GP_DATE, buf ? buf + sz : 0);
sz += make_export("GP_TIME", ev->GP_TIME, buf ? buf + sz : 0);
sz += make_export("GP_XID", ev->GP_XID, buf ? buf + sz : 0);
sz += make_export("GP_CID", ev->GP_CID, buf ? buf + sz : 0);
sz += make_export("GP_SN", ev->GP_SN, buf ? buf + sz : 0);
sz += make_export("GP_SEGMENT_ID", ev->GP_SEGMENT_ID, buf ? buf + sz : 0);
sz += make_export("GP_SEG_PORT", ev->GP_SEG_PORT, buf ? buf + sz : 0);
sz += make_export("GP_SESSION_ID", ev->GP_SESSION_ID, buf ? buf + sz : 0);
sz += make_export("GP_SEGMENT_COUNT", ev->GP_SEGMENT_COUNT, buf ? buf + sz : 0);
if (buf)
strcpy(buf + sz, cmd);
sz += strlen(cmd);
return sz + 1; /* add NUL terminator */
}
static char *
local_strstr(const char *str1, const char *str2)
{
char *cp = (char *) str1;
char *s1, *s2;
if ( !*str2 )
return((char *)str1);
while (*cp)
{
s1 = cp;
s2 = (char *) str2;
while (*s1 && (*s1==*s2))
s1++, s2++;
if (!*s2)
return(cp);
cp++;
}
return(NULL);
}
/*
* This function purpose is to make sure that the URL string contains a numerical IP address.
* The input URL is in the parameter url. The output result URL is in the output parameter - buf.
* When parameter - url already contains a numerical ip, then output parameter - buf will be a copy of url.
* For this case calling getDnsAddress method inside make_url, will serve the purpose of IP validation.
* But when parameter - url will contain a domain name, then the domain name substring will be changed to a numerical
* ip address in the buf output parameter.
*/
static int
make_url(const char *url, char *buf, bool is_ipv6)
{
char *authority_start = local_strstr(url, "//");
char *authority_end;
char *hostname_start;
char *hostname_end;
char hostname[HOST_NAME_SIZE];
char *hostip = NULL;
char portstr[9];
int len;
char *p;
int port = 80; /* default for http */
bool domain_resolved_to_ipv6 = false;
if (! authority_start)
{
elog(ERROR, "illegal url '%s'", url);
}
authority_start += 2;
authority_end = strchr(authority_start, '/');
if (! authority_end)
authority_end = authority_start + strlen(authority_start);
hostname_start = strchr(authority_start, '@');
if (! (hostname_start && hostname_start < authority_end))
hostname_start = authority_start;
if ( is_ipv6 ) /* IPV6 */
{
int len;
hostname_end = strchr(hostname_start, ']');
hostname_end += 1;
/* port number exists in this url. get it */
len = authority_end - hostname_end;
if (len > 8)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("<port> substring size must not exceed %d characters", 8)));
memcpy(portstr, hostname_end + 1, len);
portstr[len] = 0;
port = atoi(portstr);
/* skippping the brackets */
hostname_end -=1;
hostname_start +=1;
}
else
{
hostname_end = strchr(hostname_start, ':');
if (! (hostname_end && hostname_end < authority_end))
{
hostname_end = authority_end;
}
else
{
/* port number exists in this url. get it */
int len = authority_end - hostname_end;
if (len > 8)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("<port> substring size must not exceed %d characters", 8)));
memcpy(portstr, hostname_end + 1, len);
portstr[len] = 0;
port = atoi(portstr);
}
}
if ( !port )
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("<port> substring must contain only digits")));
if (hostname_end - hostname_start >= sizeof(hostname))
{
elog(ERROR, "hostname too long for url '%s'", url);
}
memcpy(hostname, hostname_start, hostname_end - hostname_start);
hostname[hostname_end - hostname_start] = 0;
hostip = getDnsAddress(hostname, port, ERROR);
/*
* test for the case where the URL originaly contained a domain name (so is_ipv6 was set to false)
* but the DNS resolution in getDnsAddress returned an IPv6 address so know we also have to put the
* square brackets [..] in the URL string.
*/
if ( strchr(hostip, ':') != NULL && !is_ipv6 ) /* hit the case where a domain name returned an IPv6 address */
domain_resolved_to_ipv6 = true;
if (!buf)
{
int len = strlen(url) + 1 - strlen(hostname) + strlen(hostip);
if ( domain_resolved_to_ipv6 )
len += 2; /* for the square brackets */
return len;
}
p = buf;
len = hostname_start - url;
strncpy(p, url, len);
p += len;
url += len;
len = strlen(hostname);
url += len;
len = strlen(hostip);
if ( domain_resolved_to_ipv6 )
{
*p = '[';
p++;
}
strncpy(p, hostip, len);
p += len;
if ( domain_resolved_to_ipv6 )
{
*p = ']';
p++;
}
strcpy(p, url);
return p - buf;
}
/*
* extract_http_domain
*
* extracts the domain string from a http url
*/
void extract_http_domain(char* i_path, char* o_domain, int dlen)
{
int domsz, cpsz;
char* p_st = (char*)local_strstr(i_path, "//");
p_st = p_st + 2;
char* p_en = strchr(p_st, '/');
domsz = p_en - p_st;
cpsz = ( domsz < dlen ) ? domsz : dlen;
memcpy(o_domain, p_st, cpsz);
}
/**
* execute_fopen()
*
* refactor the fopen code for execute into this routine
*/
URL_FILE *
url_execute_fopen(char* url, char *cmd, bool forwrite, extvar_t *ev)
{
URL_FILE* file;
int save_errno;
struct itimers savetimers;
pqsigfunc save_SIGPIPE;
int sz;
char* shexec;
if (! (file = (URL_FILE *)malloc(sizeof(URL_FILE) + strlen(url) + 1)))
{
elog(ERROR, "out of memory");
}
memset(file, 0, sizeof(URL_FILE));
file->url = ((char *) file) + sizeof(URL_FILE);
strcpy(file->url, url);
/* Execute command */
sz = make_command(cmd, ev, 0);
shexec = palloc(sz);
make_command(cmd, ev, shexec);
file->type = CFTYPE_EXEC; /* marked as a EXEC */
file->u.exec.shexec = shexec;
/* Clear process interval timers */
resetTimers(&savetimers);
/*
* Preserve the SIGPIPE handler and set to default handling. This
* allows "normal" SIGPIPE handling in the command pipeline. Normal
* for PG is to *ignore* SIGPIPE.
*/
save_SIGPIPE = pqsignal(SIGPIPE, SIG_DFL);
/* execute the user command */
file->u.exec.pid = popen_with_stderr(file->u.exec.pipes, shexec, forwrite);
save_errno = errno;
/* Restore the SIGPIPE handler */
pqsignal(SIGPIPE, save_SIGPIPE);
/* Restore process interval timers */
restoreTimers(&savetimers);
if (file->u.exec.pid == -1)
{
errno = save_errno;
free(file);
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cannot start external table command: %m"),
errdetail("Command: %s", cmd),
errOmitLocation(true)));
}
return file;
}
static bool
url_has_ipv6_format (char *url)
{
bool is6 = false;
char *ipv6 = local_strstr(url, "://[");
if ( ipv6 )
ipv6 = strchr(ipv6, ']');
if ( ipv6 )
is6 = true;
return is6;
}
/*
* url_fopen
*
* checks for URLs or types in the 'url' and basically use the real fopen() for
* standard files, or if the url happens to be a command to execute it uses
* popen to execute it.
*
* Note that we need to clean up (with url_fclose) upon an error in this function.
* This is the only function that should use url_fclose() directly. Run time errors
* (e.g in url_fread) will invoke a cleanup via the ABORT handler (AtAbort_ExtTable),
* which will internally call url_fclose().
*/
URL_FILE *
url_fopen(char *url,
bool forwrite,
extvar_t *ev,
CopyState pstate,
List *scanquals,
int *response_code,
const char **response_string)
{
/* an EXECUTE string will always be prefixed like this */
const char* exec_prefix = "execute:";
URL_FILE* file;
int e;
int ip_mode;
/*
* if 'url' starts with "execute:" then it's a command to execute and
* not a url (the command specified in CREATE EXTERNAL TABLE .. EXECUTE)
*/
if (pg_strncasecmp(url, exec_prefix, strlen(exec_prefix)) == 0)
{
char* cmd;
/* Execute command */
cmd = url + strlen(exec_prefix);
return url_execute_fopen(url, cmd, forwrite, ev);
}
if (!(file = (URL_FILE *)malloc(sizeof(URL_FILE) + strlen(url) + 1)))
{
elog(ERROR, "out of memory");
}
memset(file, 0, sizeof(URL_FILE));
file->url = ((char *) file) + sizeof(URL_FILE);
strcpy(file->url, url);
if (IS_FILE_URI(url))
{
char* path = strchr(url + strlen(PROTOCOL_FILE), '/');
struct fstream_options fo;
Insist(!forwrite);
memset(&fo, 0, sizeof fo);
if (!path)
{
free(file);
elog(ERROR, "External Table error opening file: '%s', invalid "
"file path", url);
}
file->type = CFTYPE_FILE; /* marked as local FILE */
fo.is_csv = pstate->csv_mode;
fo.quote = pstate->quote ? *pstate->quote : 0;
fo.escape = pstate->escape ? *pstate->escape : 0;
fo.header = pstate->header_line;
fo.bufsize = 32 * 1024;
pstate->header_line = 0;
/*
* Open the file stream. This includes opening the first file to be read
* and finding and preparing any other files to be opened later (if a
* wildcard or directory is used). In addition we check ahead of time
* that all the involved files exists and have proper read permissions.
*/
file->u.file.fp = fstream_open(path, &fo, response_code, response_string);
/* couldn't open local file. return NULL to display proper error */
if (!file->u.file.fp)
{
free(file);
return NULL;
}
}
else if (IS_HTTP_URI(url) || IS_GPFDIST_URI(url))
{
int sz;
bool is_ipv6 = url_has_ipv6_format(file->url);
sz = make_url(file->url, 0, is_ipv6);
file->type = CFTYPE_CURL; /* marked as URL */
if (sz < 0)
{
const char* url_cpy = pstrdup(file->url);
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "illegal URL: %s", url_cpy);
}
file->u.curl.curl_url = palloc(sz);
memset(file->u.curl.curl_url, 0, sz);
file->u.curl.for_write = forwrite;
make_url(file->url, file->u.curl.curl_url, is_ipv6);
/*
* We need to call is_url_ipv6 for the case where inside make_url function
* a domain name was transformed to an IPv6 address.
*/
if ( !is_ipv6 )
is_ipv6 = url_has_ipv6_format( file->u.curl.curl_url);
if (IS_GPFDIST_URI(file->u.curl.curl_url))
{
/* replace gpfdist:// with http:// */
file->u.curl.curl_url += 3;
memcpy(file->u.curl.curl_url, "http", 4);
pstate->header_line = 0;
}
/* initialize a curl session and get a libcurl handle for it */
if (! (file->u.curl.handle = curl_easy_init()))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_init failed");
}
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_URL, file->u.curl.curl_url)))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_URL error (%d - %s)",
e, curl_easy_strerror(e));
}
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_VERBOSE, FALSE)))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_VERBOSE error (%d - %s)",
e, curl_easy_strerror(e));
}
/* set callback for each header received from server */
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_HEADERFUNCTION, header_callback)))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_HEADERFUNCTION error (%d - %s)",
e, curl_easy_strerror(e));
}
/* 'file' is the application variable that gets passed to header_callback */
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_WRITEHEADER, file)))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_WRITEHEADER error (%d - %s)",
e, curl_easy_strerror(e));
}
/* set callback for each data block arriving from server to be written to application */
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_WRITEFUNCTION, write_callback)))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_WRITEFUNCTION error (%d - %s)",
e, curl_easy_strerror(e));
}
/* 'file' is the application variable that gets passed to write_callback */
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_WRITEDATA, file)))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_WRITEDATA error (%d - %s)",
e, curl_easy_strerror(e));
}
if ( !is_ipv6 )
ip_mode = CURL_IPRESOLVE_V4;
else
ip_mode = CURL_IPRESOLVE_V6;
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_IPRESOLVE, ip_mode)))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_IPRESOLVE error (%d - %s)",
e, curl_easy_strerror(e));
}
/*
* set up a linked list of http headers. start with common headers
* needed for read and write operations, and continue below with
* more specifics
*/
file->u.curl.x_httpheader = NULL;
/*
* support multihomed http use cases. see MPP-11874
*/
if (IS_HTTP_URI(url))
{
char domain[HOST_NAME_SIZE] = {0};
extract_http_domain(file->url, domain, HOST_NAME_SIZE);
set_httpheader(file, "Host", domain);
}
if (set_httpheader(file, "X-GP-XID", ev->GP_XID) ||
set_httpheader(file, "X-GP-CID", ev->GP_CID) ||
set_httpheader(file, "X-GP-SN", ev->GP_SN) ||
set_httpheader(file, "X-GP-SEGMENT-ID", ev->GP_SEGMENT_ID) ||
set_httpheader(file, "X-GP-SEGMENT-COUNT", ev->GP_SEGMENT_COUNT))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: some header value is too long");
}
if(forwrite)
{
/* write specific headers */
if(set_httpheader(file, "X-GP-PROTO", "0") ||
set_httpheader(file, "Content-Type", "text/xml"))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: some header value is too long");
}
}
else
{
/* read specific - (TODO: unclear why some of these are needed) */
if(set_httpheader(file, "X-GP-PROTO", "1") ||
set_httpheader(file, "X-GP-MASTER_HOST", ev->GP_MASTER_HOST) ||
set_httpheader(file, "X-GP-MASTER_PORT", ev->GP_MASTER_PORT) ||
set_httpheader(file, "X-GP-CSVOPT", ev->GP_CSVOPT) ||
set_httpheader(file, "X-GP_SEG_PG_CONF", ev->GP_SEG_PG_CONF) ||
set_httpheader(file, "X-GP_SEG_DATADIR", ev->GP_SEG_DATADIR) ||
set_httpheader(file, "X-GP-DATABASE", ev->GP_DATABASE) ||
set_httpheader(file, "X-GP-USER", ev->GP_USER) ||
set_httpheader(file, "X-GP-SEG-PORT", ev->GP_SEG_PORT) ||
set_httpheader(file, "X-GP-SESSION-ID", ev->GP_SESSION_ID))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: some header value is too long");
}
}
{
/*
* MPP-13031
* copy #transform fragment, if present, into X-GP-TRANSFORM header
*/
char* p = local_strstr(file->url, "#transform=");
if (p && p[11])
{
if (set_httpheader(file, "X-GP-TRANSFORM", p+11))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: X-GP-TRANSFORM header value is too long");
}
}
}
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_HTTPHEADER, file->u.curl.x_httpheader)))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_HTTPHEADER error (%d - %s)",
e, curl_easy_strerror(e));
}
if (!multi_handle)
{
if (! (multi_handle = curl_multi_init()))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_multi_init failed");
}
}
/*
* lets check our connection.
* start the fetch if we're SELECTing (GET request), or write an
* empty message if we're INSERTing (POST request)
*/
if (!forwrite)
{
if (CURLE_OK != (e = curl_multi_add_handle(multi_handle, file->u.curl.handle)))
{
if (CURLM_CALL_MULTI_PERFORM != e)
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_multi_add_handle failed (%d - %s)",
e, curl_easy_strerror(e));
}
}
while (CURLM_CALL_MULTI_PERFORM ==
(e = curl_multi_perform(multi_handle, &file->u.curl.still_running)));
if (e != CURLE_OK)
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_multi_perform failed (%d - %s)",
e, curl_easy_strerror(e));
}
/* read some bytes to make sure the connection is established */
fill_buffer(file, 1);
}
else
{
/* use empty message */
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_POSTFIELDS, "")))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_POSTFIELDS error (%d - %s)",
e, curl_easy_strerror(e));
}
/* post away! */
if (CURLE_OK != (e = curl_easy_perform(file->u.curl.handle)))
{
url_fclose(file, false, pstate->cur_relname);
elog(ERROR, "error: %s",
curl_easy_strerror(e));
}
}
/* check the connection */
if (check_response(file, response_code, response_string))
{
url_fclose(file, false, pstate->cur_relname);
return NULL;
}
}
else
{
/* we're using a custom protocol */
MemoryContext oldcontext;
ExtPtcFuncType ftype;
Oid procOid;
char *prot_name = pstrdup(file->url);
int url_len = strlen(file->url);
int i = 0;
file->type = CFTYPE_CUSTOM;
ftype = (forwrite ? EXTPTC_FUNC_WRITER : EXTPTC_FUNC_READER);
/* extract protocol name from url string */
while (file->url[i] != ':' && i < url_len - 1)
i++;
prot_name[i] = '\0';
procOid = LookupExtProtocolFunction(prot_name, ftype, true);
/*
* Create a memory context to store all custom UDF private
* memory. We do this in order to allow resource cleanup in
* cases of query abort. We use TopTransactionContext as a
* parent context so that it lives longer than Portal context.
* Note that we always Delete our new context, in normal execution
* and in abort (see url_fclose()).
*/
file->u.custom.protcxt = AllocSetContextCreate(TopTransactionContext,
"CustomProtocolMemCxt",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
oldcontext = MemoryContextSwitchTo(file->u.custom.protcxt);
file->u.custom.protocol_udf = palloc(sizeof(FmgrInfo));
file->u.custom.extprotocol = (ExtProtocolData *) palloc (sizeof(ExtProtocolData));
/* we found our function. set it in custom file handler */
fmgr_info(procOid, file->u.custom.protocol_udf);
/* have the custom file handler point at the qual list */
file->u.custom.scanquals = scanquals;
MemoryContextSwitchTo(oldcontext);
file->u.custom.extprotocol->prot_user_ctx = NULL;
file->u.custom.extprotocol->prot_last_call = false;
file->u.custom.extprotocol->prot_url = NULL;
file->u.custom.extprotocol->prot_databuf = NULL;
file->u.custom.extprotocol->prot_scanquals = NIL;
pfree(prot_name);
}
return file;
}
/*
* url_fclose: Disposes of resources associated with this external web table.
*
* If failOnError is true, errors encountered while closing the resource results
* in raising an ERROR. This is particularly true for "execute:" resources where
* command termination is not reflected until close is called. If failOnClose is
* false, close errors are just logged. failOnClose should be false when closure
* is due to LIMIT clause satisfaction.
*
* relname is passed in for being available in data messages only.
*/
int
url_fclose(URL_FILE *file, bool failOnError, const char *relname)
{
enum fcurl_type_e type = file->type;
int ret = 0;/* default is good return */
int errlen = 512;
char err[512];
char* url = pstrdup(file->url);
switch (type)
{
case CFTYPE_FILE:
fstream_close(file->u.file.fp);
/* fstream_close() returns no error indication. */
break;
case CFTYPE_EXEC:
/* close the child process and related pipes */
ret = pclose_with_stderr(file->u.exec.pid, file->u.exec.pipes, err, errlen);
if (ret == 0)
{
/* pclose() ended successfully; no errors to reflect */
;
}
else if (ret == -1)
{
/* pclose()/wait4() ended with an error; errno should be valid */
if (failOnError) free(file);
ereport( (failOnError ? ERROR : LOG),
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cannot close external table %s command: %m",
(relname ? relname : "")),
errdetail("command: %s", url),
errOmitLocation(true)));
}
else
{
/*
* pclose() returned the process termination state. The interpretExitCode() function
* generates a descriptive message from the exit code.
*/
char buf[512];
/* change linefeeds to comma for better error display */
/* added to black list of ARD 'to-revert' items */
int i = 0;
int len = strlen(err);
for(i = 0; i < len - 1; i++)
{
if (err[i] == '\n' || err[i] == '\r')
err[i] = ',';
}
if (failOnError) free(file);
ereport( (failOnError ? ERROR : LOG),
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("external table %s command ended with %s",
(relname ? relname : ""),
interpretError(ret, buf, sizeof(buf)/sizeof(char), err, strlen(err))),
errdetail("Command: %s", url),
errOmitLocation(true)));
}
break;
case CFTYPE_CURL:
/*
* if WET, send a final "I'm done" request from this segment.
*/
if(file->u.curl.for_write)
gp_proto0_write_done(file);
if (file->u.curl.x_httpheader)
{
curl_slist_free_all(file->u.curl.x_httpheader);
file->u.curl.x_httpheader = NULL;
}
/* make sure the easy handle is not in the multi handle anymore */
if (file->u.curl.handle)
{
curl_multi_remove_handle(multi_handle, file->u.curl.handle);
/* cleanup */
curl_easy_cleanup(file->u.curl.handle);
file->u.curl.handle = NULL;
}
/* free any allocated buffer space */
if (file->u.curl.in.ptr)
{
free(file->u.curl.in.ptr);
file->u.curl.in.ptr = NULL;
}
if (file->u.curl.out.ptr)
{
Assert(file->u.curl.for_write);
pfree(file->u.curl.out.ptr);
file->u.curl.out.ptr = NULL;
}
file->u.curl.gp_proto = 0;
file->u.curl.error = file->u.curl.eof = 0;
memset(&file->u.curl.in, 0, sizeof(file->u.curl.in));
memset(&file->u.curl.block, 0, sizeof(file->u.curl.block));
break;
case CFTYPE_CUSTOM:
/* last call. let the user close custom resources */
if(file->u.custom.protocol_udf)
(void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL, NULL);
/* now clean up everything not cleaned by user */
MemoryContextDelete(file->u.custom.protcxt);
break;
default: /* unknown or supported type - oh dear */
free(file);
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg_internal("external table type not implemented: %d", type)));
break;
}
free(file);
pfree(url);
return ret;
}
bool
url_feof(URL_FILE *file, int bytesread)
{
bool ret = false;
switch (file->type)
{
case CFTYPE_FILE:
ret = (fstream_eof(file->u.file.fp) != 0);
break;
case CFTYPE_EXEC:
ret = (bytesread == 0);
break;
case CFTYPE_CURL:
ret = (file->u.curl.eof != 0);
break;
case CFTYPE_CUSTOM:
ret = (bytesread == 0);
break;
default: /* unknown or supported type - oh dear */
ret = true;
errno = EBADF;
break;
}
return ret;
}
bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen)
{
bool ret = 0;
int nread = 0;
switch (file->type)
{
case CFTYPE_FILE:
ret = fstream_get_error(file->u.file.fp) != 0;
break;
case CFTYPE_EXEC:
ret = (bytesread == -1);
if(ret == true && ebuflen > 0 && ebuf != NULL)
{
nread = piperead(file->u.exec.pipes[EXEC_ERR_P], ebuf, ebuflen);
if(nread != -1)
ebuf[nread] = 0;
else
strncpy(ebuf,"error string unavailable due to read error",ebuflen-1);
}
break;
case CFTYPE_CURL:
ret = (file->u.curl.error != 0);
break;
case CFTYPE_CUSTOM:
ret = (bytesread == -1);
break;
default: /* unknown or supported type - oh dear */
ret = true;
errno = EBADF;
break;
}
return ret;
}
/*
* gp_proto0_read
*
* get data from the server and handle it according to PROTO 0. In PROTO 0 we
* expect the content of the file without any kind of meta info. Simple.
*/
static size_t gp_proto0_read(char *buf, int bufsz, URL_FILE* file)
{
int n = 0;
curlctl_t* curl = &file->u.curl;
fill_buffer(file, bufsz);
/* check if there's data in the buffer - if not fill_buffer()
* either errored or EOF. For proto0, we cannot distinguish
* between error and EOF. */
n = curl->in.top - curl->in.bot;
if (n == 0 && !curl->still_running)
curl->eof = 1;
if (n > bufsz)
n = bufsz;
/* xfer data to caller */
memcpy(buf, curl->in.ptr, n);
curl->in.bot += n;
return n;
}
/*
* gp_proto1_read
*
* get data from the server and handle it according to PROTO 1. In this protocol
* each data block is tagged by meta info like this:
* byte 0: type (can be 'F'ilename, 'O'ffset, 'D'ata, 'E'rror, 'L'inenumber)
* byte 1-4: length. # bytes of following data block. in network-order.
* byte 5-X: the block itself.
*/
static size_t gp_proto1_read(char *buf, int bufsz, URL_FILE *file, CopyState pstate, char *buf2)
{
char type;
int n, len;
curlctl_t* curl = &file->u.curl;
/*
* Loop through and get all types of messages, until we get actual data,
* or until there's no more data. Then quit the loop to process it and
* return it.
*/
while (curl->block.datalen == 0 && !curl->eof)
{
/* need 5 bytes, 1 byte type + 4 bytes length */
fill_buffer(file, 5);
n = curl->in.top - curl->in.bot;
if (n == 0)
{
elog(ERROR, "gpfdist error: server closed connection.\n");
return -1;
}
if (n < 5)
{
elog(ERROR, "gpfdist error: incomplete packet - packet len %d\n", n);
return -1;
}
/* read type */
type = curl->in.ptr[curl->in.bot++];
/* read len */
memcpy(&len, &curl->in.ptr[curl->in.bot], 4);
len = ntohl(len); /* change order */
curl->in.bot += 4;
if (len < 0)
{
elog(ERROR, "gpfdist error: bad packet type %d len %d",
type, len);
return -1;
}
/* elog(NOTICE, "HEADER %c %d, bot %d, top %d", type, len,
curl->in.bot, curl->in.top);
*/
/* Error */
if (type == 'E')
{
fill_buffer(file, len);
n = curl->in.top - curl->in.bot;
if (n > len)
n = len;
if (n > 0)
{
/*
* cheat a little. swap last char and
* NUL-terminator. then print string (without last
* char) and print last char artificially
*/
char x = curl->in.ptr[curl->in.bot + n - 1];
curl->in.ptr[curl->in.bot + n - 1] = 0;
elog(ERROR, "gpfdist error - %s%c", &curl->in.ptr[curl->in.bot], x);
return -1;
}
elog(ERROR, "gpfdist error: please check gpfdist log messages.");
return -1;
}
/* Filename */
if (type == 'F')
{
if (buf != buf2)
{
curl->in.bot -= 5;
return 0;
}
if (len > 256)
{
elog(ERROR, "gpfdist error: filename too long (%d)", len);
return -1;
}
if (-1 == fill_buffer(file, len))
{
elog(ERROR, "gpfdist error: stream ends suddenly");
return -1;
}
/*
* If SREH is used we now update it with the actual file that the
* gpfdist server is reading. This is because SREH (or the client
* in general) doesn't know which file gpfdist is reading, since
* the original URL may include a wildcard or a directory listing.
*/
if (pstate->cdbsreh)
{
char fname[257];
memcpy(fname, curl->in.ptr + curl->in.bot, len);
fname[len] = 0;
snprintf(pstate->cdbsreh->filename, sizeof pstate->cdbsreh->filename,"%s [%s]", pstate->filename, fname);
}
curl->in.bot += len;
Assert(curl->in.bot <= curl->in.top);
continue;
}
/* Offset */
if (type == 'O')
{
if (len != 8)
{
elog(ERROR, "gpfdist error: offset not of length 8 (%d)", len);
return -1;
}
if (-1 == fill_buffer(file, len))
{
elog(ERROR, "gpfdist error: stream ends suddenly");
return -1;
}
curl->in.bot += 8;
Assert(curl->in.bot <= curl->in.top);
continue;
}
/* Line number */
if (type == 'L')
{
int64 line_number;
if (len != 8)
{
elog(ERROR, "gpfdist error: line number not of length 8 (%d)", len);
return -1;
}
if (-1 == fill_buffer(file, len))
{
elog(ERROR, "gpfdist error: stream ends suddenly");
return -1;
}
/*
* update the line number of the first line we're about to get from
* gpfdist. pstate will update the following lines when processing
* the data
*/
memcpy(&line_number, curl->in.ptr + curl->in.bot, len);
line_number = local_ntohll(line_number);
pstate->cur_lineno = line_number ? line_number - 1 : INT64_MIN;
curl->in.bot += 8;
Assert(curl->in.bot <= curl->in.top);
continue;
}
/* Data */
if (type == 'D')
{
curl->block.datalen = len;
curl->eof = (len == 0);
// elog(NOTICE, "D %d", curl->block.datalen);
break;
}
elog(ERROR, "gpfdist error: unknown meta type %d", type);
return -1;
}
/* read data block */
if (bufsz > curl->block.datalen)
bufsz = curl->block.datalen;
fill_buffer(file, bufsz);
n = curl->in.top - curl->in.bot;
/* if gpfdist closed connection prematurely or died catch it here */
if (n == 0 && !curl->eof)
{
curl->error = 1;
if(!curl->still_running)
elog(ERROR, "gpfdist server closed connection.\n");
}
if (n > bufsz)
n = bufsz;
memcpy(buf, curl->in.ptr + curl->in.bot, n);
curl->in.bot += n;
curl->block.datalen -= n;
// elog(NOTICE, "returning %d bytes, %d bytes left \n", n, curl->block.datalen);
return n;
}
/*
* gp_proto0_write
*
* use curl to write data to a the remote gpfdist server. We use
* a push model with a POST request.
*
*/
static void gp_proto0_write(URL_FILE *file, CopyState pstate)
{
curlctl_t* curl = &file->u.curl;
char* buf = curl->out.ptr;
int nbytes = curl->out.top;
int e;
int response_code;
const char* response_string;
if (nbytes == 0)
return;
/* post binary data */
if (CURLE_OK != (e = curl_easy_setopt(curl->handle, CURLOPT_POSTFIELDS, buf)))
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_POSTFIELDS error (%d - %s)",
e, curl_easy_strerror(e));
/* set the size of the postfields data */
if (CURLE_OK != (e = curl_easy_setopt(curl->handle, CURLOPT_POSTFIELDSIZE, nbytes)))
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_POSTFIELDSIZE error (%d - %s)",
e, curl_easy_strerror(e));
/* post away! */
if (CURLE_OK != (e = curl_easy_perform(curl->handle)))
elog(ERROR, "%s error (%d - %s)",
file->u.curl.curl_url,
e, curl_easy_strerror(e));
/* check the response from server */
if (check_response(file, &response_code, &response_string))
elog(ERROR, "error while writing data to gpfdist on %s (code %d, msg %s)",
file->u.curl.curl_url, response_code, response_string);
}
/*
* Send an empty POST request, with an added X-GP-DONE header.
*/
static void gp_proto0_write_done(URL_FILE *file)
{
int e;
set_httpheader(file, "X-GP-DONE", "1");
/* use empty message */
if (CURLE_OK != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_POSTFIELDS, "")))
{
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_POSTFIELDS %s error (%d - %s)",
file->u.curl.curl_url,
e, curl_easy_strerror(e));
}
/* post away! */
if (CURLE_OK != (e = curl_easy_perform(file->u.curl.handle)))
{
elog(ERROR, "%s error: %s",
file->u.curl.curl_url,
curl_easy_strerror(e));
}
}
static size_t curl_fread(char *buf, int bufsz, URL_FILE* file, CopyState pstate)
{
curlctl_t* curl = &file->u.curl;
char* p = buf;
char* q = buf + bufsz;
int n;
const int gp_proto = curl->gp_proto;
if (gp_proto != 0 && gp_proto != 1)
{
elog(ERROR, "unknown gp protocol %d", curl->gp_proto);
return 0;
}
for (; p < q; p += n)
{
if (gp_proto == 0)
n = gp_proto0_read(p, q - p, file);
else
n = gp_proto1_read(p, q - p, file, pstate, buf);
//elog(NOTICE, "curl_fread %d bytes", n);
if (n <= 0)
break;
}
return p - buf;
}
static size_t curl_fwrite(char *buf, int nbytes, URL_FILE* file, CopyState pstate)
{
curlctl_t* curl = &file->u.curl;
if (curl->gp_proto != 0 && curl->gp_proto != 1)
{
elog(ERROR, "unknown gp protocol %d", curl->gp_proto);
return 0;
}
/*
* allocate data buffer if not done already
*/
if(!curl->out.ptr)
{
const int bufsize = 64 * 1024 * sizeof(char);
MemoryContext oldcontext = CurrentMemoryContext;
MemoryContextSwitchTo(CurTransactionContext); /* TODO: is there a better cxt to use? */
curl->out.ptr = (char *)palloc(bufsize);
curl->out.max = bufsize;
curl->out.bot = curl->out.top = 0;
MemoryContextSwitchTo(oldcontext);
}
/*
* if buffer is full (current item can't fit) - write it out to
* the server. if item still doesn't fit after we emptied the
* buffer, make more room.
*/
if (curl->out.top + nbytes >= curl->out.max)
{
/* item doesn't fit */
if (curl->out.top > 0)
{
/* write out existing data, empty the buffer */
gp_proto0_write(file, pstate);
curl->out.top = 0;
}
/* does it still not fit? enlarge buffer */
if (curl->out.top + nbytes >= curl->out.max)
{
int n = nbytes + 1024;
char* newbuf;
MemoryContext oldcontext = CurrentMemoryContext;
MemoryContextSwitchTo(CurTransactionContext); /* TODO: is there a better cxt to use? */
newbuf = repalloc(curl->out.ptr, n);
MemoryContextSwitchTo(oldcontext);
if (!newbuf)
elog(ERROR, "out of memory (curl_fwrite)");
curl->out.ptr = newbuf;
curl->out.max = n;
Assert(nbytes < curl->out.max);
}
}
/* copy buffer into curl->buf */
memcpy(curl->out.ptr + curl->out.top, buf, nbytes);
curl->out.top += nbytes;
return nbytes;
}
size_t
url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc, List **splits)
{
size_t want;
int n;
switch (file->type)
{
case CFTYPE_FILE:
{
struct fstream_filename_and_offset fo;
const int whole_rows = 0; /* get as much data as possible */
assert(size == 1);
/* FIXME: GP FUSION: ??? */
n = fstream_read(file->u.file.fp, ptr, nmemb, &fo, whole_rows, 0, 0);
want = n > 0 ? n : 0;
if (n > 0 && fo.line_number)
{
pstate->cur_lineno = fo.line_number - 1;
if (pstate->cdbsreh)
snprintf(pstate->cdbsreh->filename,
sizeof pstate->cdbsreh->filename,
"%s [%s]", pstate->filename, fo.fname);
}
break;
}
case CFTYPE_EXEC:
want = piperead(file->u.exec.pipes[EXEC_DATA_P], ptr, nmemb * size);
break;
case CFTYPE_CURL:
/* get data (up to nmemb * size) from the http/gpfdist server */
n = curl_fread(ptr, nmemb * size, file, pstate);
/* number of items - nb correct op - checked with glibc code*/
want = n / size;
/*printf("(fread) return %d bytes %d left\n", want,file->u.curl.buffer_pos);*/
break;
case CFTYPE_CUSTOM:
want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc, splits);
break;
default: /* unknown or supported type */
want = 0;
errno = EBADF;
break;
}
return want;
}
size_t
url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate)
{
size_t want = 0, n = 0;
switch (file->type)
{
case CFTYPE_FILE:
elog(ERROR, "CFTYPE_FILE not yet supported in url.c");
break;
case CFTYPE_EXEC:
want = pipewrite(file->u.exec.pipes[EXEC_DATA_P], ptr, nmemb * size);
break;
case CFTYPE_CURL:
/* write data to the gpfdist server via curl */
n = curl_fwrite(ptr, nmemb * size, file, pstate);
want = n / size;
break;
case CFTYPE_CUSTOM:
want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL, NULL);
break;
default: /* unknown or unsupported type */
want = 0;
errno = EBADF;
break;
}
return want;
}
/*
* flush all remaining buffered data waiting to be written out to external source
*/
void
url_fflush(URL_FILE *file, CopyState pstate)
{
switch (file->type)
{
case CFTYPE_FILE:
{
elog(ERROR, "CFTYPE_FILE not yet supported in url.c");
break;
}
case CFTYPE_EXEC:
case CFTYPE_CUSTOM:
/* data isn't buffered on app level. no op */
break;
case CFTYPE_CURL:
gp_proto0_write(file, pstate);
break;
default: /* unknown or unsupported type */
break;
}
}
void
url_rewind(URL_FILE *file, const char *relname)
{
char *url = file->url;
switch(file->type)
{
case CFTYPE_FILE:
fstream_rewind(file->u.file.fp);
break;
case CFTYPE_EXEC:
/* we'll need to execute the command again */
assert(0); /* There is no way the following code is right. */
url_fclose(file, true, relname);
/* most of these are null fo us. */
url_fopen(url, false, NULL, NULL, NIL, NULL, 0);
break;
case CFTYPE_CURL:
/* halt transaction */
curl_multi_remove_handle(multi_handle, file->u.curl.handle);
/* restart */
curl_multi_add_handle(multi_handle, file->u.curl.handle);
/* ditch buffer - write will recreate - resets stream pos*/
if (file->u.curl.in.ptr)
free(file->u.curl.in.ptr);
file->u.curl.gp_proto = 0;
file->u.curl.error = file->u.curl.eof = 0;
memset(&file->u.curl.in, 0, sizeof(file->u.curl.in));
memset(&file->u.curl.block, 0, sizeof(file->u.curl.block));
break;
case CFTYPE_CUSTOM:
elog(ERROR, "rewind support not yet implemented in custom protocol");
break;
default: /* unknown or supported type - oh dear */
break;
}
}
/*
* interpretError - formats a brief message and/or the exit code from pclose()
* (or wait4()).
*/
static char *
interpretError(int rc, char *buf, size_t buflen, char *err, size_t errlen)
{
if (WIFEXITED(rc))
{
int exitCode = WEXITSTATUS(rc);
if (exitCode >= 128)
{
/*
* If the exit code has the 128-bit set, the exit code represents
* a shell exited by signal where the signal number is exitCode - 128.
*/
exitCode -= 128;
snprintf(buf, buflen, "SHELL TERMINATED by signal %s (%d)", getSignalNameFromCode(exitCode), exitCode);
}
else if (exitCode == 0)
{
snprintf(buf, buflen, "EXITED; rc=%d", exitCode);
}
else
{
/* Exit codes from commands rarely map to strerror() strings. In here
* we show the error string returned from pclose, and omit the non
* friendly exit code interpretation */
snprintf(buf, buflen, "error. %s", err);
}
}
else if (WIFSIGNALED(rc))
{
int signalCode = WTERMSIG(rc);
snprintf(buf, buflen, "TERMINATED by signal %s (%d)", getSignalNameFromCode(signalCode), signalCode);
}
#ifndef WIN32
else if (WIFSTOPPED(rc))
{
int signalCode = WSTOPSIG(rc);
snprintf(buf, buflen, "STOPPED by signal %s (%d)", getSignalNameFromCode(signalCode), signalCode);
}
#endif
else
{
snprintf(buf, buflen, "UNRECOGNIZED termination; rc=%#x", rc);
}
return buf;
}
struct signalDef {
const int signalCode;
const char *signalName;
};
/*
* Table mapping signal numbers to signal identifiers (names).
*/
struct signalDef signals[] = {
#ifdef SIGHUP
{ SIGHUP, "SIGHUP" },
#endif
#ifdef SIGINT
{ SIGINT, "SIGINT" },
#endif
#ifdef SIGQUIT
{ SIGQUIT, "SIGQUIT" },
#endif
#ifdef SIGILL
{ SIGILL, "SIGILL" },
#endif
#ifdef SIGTRAP
{ SIGTRAP, "SIGTRAP" },
#endif
#ifdef SIGABRT
{ SIGABRT, "SIGABRT" },
#endif
#ifdef SIGEMT
{ SIGEMT, "SIGEMT" },
#endif
#ifdef SIGFPE
{ SIGFPE, "SIGFPE" },
#endif
#ifdef SIGKILL
{ SIGKILL, "SIGKILL" },
#endif
#ifdef SIGBUS
{ SIGBUS, "SIGBUS" },
#endif
#ifdef SIGSEGV
{ SIGSEGV, "SIGSEGV" },
#endif
#ifdef SIGSYS
{ SIGSYS, "SIGSYS" },
#endif
#ifdef SIGPIPE
{ SIGPIPE, "SIGPIPE" },
#endif
#ifdef SIGALRM
{ SIGALRM, "SIGALRM" },
#endif
#ifdef SIGTERM
{ SIGTERM, "SIGTERM" },
#endif
#ifdef SIGURG
{ SIGURG, "SIGURG" },
#endif
#ifdef SIGSTOP
{ SIGSTOP, "SIGSTOP" },
#endif
#ifdef SIGTSTP
{ SIGTSTP, "SIGTSTP" },
#endif
#ifdef SIGCONT
{ SIGCONT, "SIGCONT" },
#endif
#ifdef SIGCHLD
{ SIGCHLD, "SIGCHLD" },
#endif
#ifdef SIGTTIN
{ SIGTTIN, "SIGTTIN" },
#endif
#ifdef SIGTTOU
{ SIGTTOU, "SIGTTOU" },
#endif
#ifdef SIGIO
{ SIGIO, "SIGIO" },
#endif
#ifdef SIGXCPU
{ SIGXCPU, "SIGXCPU" },
#endif
#ifdef SIGXFSZ
{ SIGXFSZ, "SIGXFSZ" },
#endif
#ifdef SIGVTALRM
{ SIGVTALRM, "SIGVTALRM" },
#endif
#ifdef SIGPROF
{ SIGPROF, "SIGPROF" },
#endif
#ifdef SIGWINCH
{ SIGWINCH, "SIGWINCH" },
#endif
#ifdef SIGINFO
{ SIGINFO, "SIGINFO" },
#endif
#ifdef SIGUSR1
{ SIGUSR1, "SIGUSR1" },
#endif
#ifdef SIGUSR2
{ SIGUSR2, "SIGUSR2" },
#endif
{ -1, "" }
};
/*
* getSignalNameFromCode - gets the signal name given the signal number.
*/
static const char *
getSignalNameFromCode(int signo)
{
int i;
for (i = 0; signals[i].signalCode != -1; i++)
{
if (signals[i].signalCode == signo)
return signals[i].signalName;
}
return "UNRECOGNIZED";
}
/*
* popen_with_stderr
*
* standard popen doesn't redirect stderr from the child process.
* we need stderr in order to display the error that child process
* encountered and show it to the user. This is, therefore, a wrapper
* around a set of file descriptor redirections and a fork.
*
* if 'forwrite' is set then we set the data pipe write side on the
* parent. otherwise, we set the read side on the parent.
*/
static int popen_with_stderr(int *pipes, const char *exe, bool forwrite)
{
int data[2]; /* pipe to send data child <--> parent */
int err[2]; /* pipe to send errors child --> parent */
int pid = -1;
const int READ = 0;
const int WRITE = 1;
if (pgpipe(data) < 0)
return -1;
if (pgpipe(err) < 0)
{
close(data[READ]);
close(data[WRITE]);
return -1;
}
#ifndef WIN32
pid = fork();
if (pid > 0) /* parent */
{
if (forwrite)
{
/* parent writes to child */
close(data[READ]);
pipes[EXEC_DATA_P] = data[WRITE];
}
else
{
/* parent reads from child */
close(data[WRITE]);
pipes[EXEC_DATA_P] = data[READ];
}
close(err[WRITE]);
pipes[EXEC_ERR_P] = err[READ];
return pid;
}
else if (pid == 0) /* child */
{
/*
* set up the data pipe
*/
if (forwrite)
{
close(data[WRITE]);
close(fileno(stdin));
/* assign pipes to parent to stdin */
if (dup2(data[READ], fileno(stdin)) < 0)
{
perror("dup2 error");
exit(EXIT_FAILURE);
}
/* no longer needed after the duplication */
close(data[READ]);
}
else
{
close(data[READ]);
close(fileno(stdout));
/* assign pipes to parent to stdout */
if (dup2(data[WRITE], fileno(stdout)) < 0)
{
perror("dup2 error");
exit(EXIT_FAILURE);
}
/* no longer needed after the duplication */
close(data[WRITE]);
}
/*
* now set up the error pipe
*/
close(err[READ]);
close(fileno(stderr));
if (dup2(err[WRITE], fileno(stderr)) < 0)
{
if(forwrite)
close(data[WRITE]);
else
close(data[READ]);
perror("dup2 error");
exit(EXIT_FAILURE);
}
close(err[WRITE]);
/* go ahead and execute the user command */
execl("/bin/sh", "sh", "-c", exe, NULL);
/* if we're here an error occurred */
exit(EXIT_FAILURE);
}
else
{
if(forwrite)
{
close(data[WRITE]);
close(data[READ]);
}
else
{
close(data[READ]);
close(data[WRITE]);
}
close(err[READ]);
close(err[WRITE]);
return -1;
}
#endif
return pid;
}
/*
* pclose_with_stderr
*
* close our data and error pipes and return the child process
* termination status. if child terminated with error, 'buf' will
* point to the error string retrieved from child's stderr.
*/
static int pclose_with_stderr(int pid, int *pipes, char *buf, int len)
{
int status, nread;
/* close the data pipe. we can now read from error pipe without being blocked */
close(pipes[EXEC_DATA_P]);
nread = piperead(pipes[EXEC_ERR_P], buf, len); /* read from stderr */
if(nread != -1)
buf[nread] = 0;
else
strcpy(buf,"error string unavailable due to read error");
close(pipes[EXEC_ERR_P]);
#ifndef WIN32
waitpid(pid, &status, 0);
#else
status = -1;
#endif
return status;
}
static int32
InvokeExtProtocol(void *ptr,
size_t nbytes,
URL_FILE *file,
CopyState pstate,
bool last_call,
ExternalSelectDesc desc,
List **psplits)
{
FunctionCallInfoData fcinfo;
ExtProtocolData* extprotocol = file->u.custom.extprotocol;
FmgrInfo* extprotocol_udf = file->u.custom.protocol_udf;
Datum d;
MemoryContext oldcontext;
/* must have been created during url_fopen() */
Assert(extprotocol);
extprotocol->type = T_ExtProtocolData;
extprotocol->prot_url = file->url;
extprotocol->prot_relation = (last_call ? NULL : pstate->rel);
extprotocol->prot_databuf = (last_call ? NULL : (char *)ptr);
extprotocol->prot_maxbytes = nbytes;
extprotocol->prot_scanquals = file->u.custom.scanquals;
extprotocol->prot_last_call = last_call;
extprotocol->desc = desc;
extprotocol->splits = NULL;
if (psplits != NULL && *psplits != NULL) {
/*
* We move to read splits from arg to this context structure, so that
* means we passed split data only the first time this is called.
*/
while( list_length(*psplits)>0 )
{
FileSplit split = (FileSplit)lfirst(list_head(*psplits));
elog(LOG, "split %s:" INT64_FORMAT ", " INT64_FORMAT,
split->ext_file_uri_string,
split->offsets, split->lengths);
extprotocol->splits = lappend(extprotocol->splits, split);
*psplits = list_delete_first(*psplits);
}
}
InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
/* FmgrInfo */ extprotocol_udf,
/* nArgs */ 0,
/* Call Context */ (Node *) extprotocol,
/* ResultSetInfo */ NULL);
/* invoke the protocol within a designated memory context */
oldcontext = MemoryContextSwitchTo(file->u.custom.protcxt);
d = FunctionCallInvoke(&fcinfo);
MemoryContextSwitchTo(oldcontext);
/* We do not expect a null result */
if (fcinfo.isnull)
elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
return DatumGetInt32(d);
}