blob: 6b0eb9bff45af02548ca64db30e2a645eb0bfb3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "postgres.h"
#include <stdarg.h>
#include <sys/stat.h>
#include "access/fileam.h"
#include "access/heapam.h"
#include "access/valid.h"
#include "commands/dbcommands.h"
#include "catalog/namespace.h"
#include "cdb/cdbutil.h"
#include "libpq/libpq-be.h"
#include "pgstat.h"
#include "miscadmin.h"
#include "utils/relcache.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/uri.h"
#include "mb/pg_wchar.h"
#include "commands/copy.h"
#include "utils/guc.h"
#include "utils/builtins.h"
#include "nodes/makefuncs.h"
#include "cdb/cdbvars.h"
#include "postmaster/postmaster.h" /* postmaster port*/
#include <arpa/inet.h>
#include <fstream/gfile.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <curl/curl.h>
#include <fstream/fstream.h>
#include "cdb/url.h"
#ifndef FALSE
#define FALSE 0
#endif
#ifndef TRUE
#define TRUE 1
#endif
/* we use a global one for convenience */
CURLM *multi_handle = 0;
/* curl calls this routine to return HTTP header from server */
static size_t
header_callback(void *ptr_, size_t size, size_t nmemb, void *userp)
{
URL_FILE *url = (URL_FILE *)userp;
char *ptr = ptr_;
int len = size * nmemb;
int i;
char buf[20];
if (len > 10 && *ptr == 'X' && 0 == strncmp("X-GP-PROTO", ptr, 10))
{
ptr += 10, len -= 10;
while (len > 0 && (*ptr == ' ' || *ptr == '\t'))
ptr++, len--;
if (len > 0 && *ptr == ':')
{
ptr++, len--;
while (len > 0 && (*ptr == ' ' || *ptr == '\t'))
ptr++, len--;
for (i = 0; i < sizeof(buf) - 1 && i < len; i++)
buf[i] = ptr[i];
buf[i] = 0;
url->u.curl.gp_proto = strtol(buf, 0, 0);
// elog(NOTICE, "X-GP-PROTO: %s (%d)", buf, url->u.curl.gp_proto);
}
}
return size*nmemb;
}
/* curl calls this routine to get more data */
static size_t
write_callback(char *buffer,
size_t size,
size_t nitems,
void *userp)
{
URL_FILE *file = (URL_FILE *)userp;
curlctl_t *curl = &file->u.curl;
const int nbytes = size * nitems;
int n;
// elog(NOTICE, "write_callback %d", nbytes);
/* if insufficient space in buffer ... */
if (curl->buf.top + nbytes >= curl->buf.max)
{
/* compact ? */
if (curl->buf.bot)
{
n = curl->buf.top - curl->buf.bot;
memmove(curl->buf.ptr, curl->buf.ptr + curl->buf.bot, n);
curl->buf.bot = 0;
curl->buf.top = n;
}
/* if still insufficient space in buffer, then do realloc */
if (curl->buf.top + nbytes >= curl->buf.max)
{
char *newbuf;
n = curl->buf.top - curl->buf.bot + nbytes + 1024;
newbuf = realloc(curl->buf.ptr, n);
if (!newbuf)
{
elog(ERROR, "out of memory");
return -1;
}
curl->buf.ptr = newbuf;
curl->buf.max = n;
// elog(NOTICE, "max now at %d", n);
Assert(curl->buf.top + nbytes < curl->buf.max);
}
}
/* enough space. copy buffer into curl->buf */
memcpy(curl->buf.ptr + curl->buf.top, buffer, nbytes);
curl->buf.top += nbytes;
return nbytes;
}
static int
check_response(URL_FILE *file, int *rc, const char **response_string)
{
long response_code;
char *effective_url;
int e;
static char buffer[30];
CURL* curl = file->u.curl.handle;
if (0 != (e = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code)))
{
*rc = 500;
*response_string = "curl_easy_getinfo failed";
return -1;
}
*rc = response_code;
snprintf(buffer, sizeof buffer, "Response Code=%d", (int)response_code);
*response_string = buffer;
if (0 != (e = curl_easy_getinfo(curl, CURLINFO_EFFECTIVE_URL, &effective_url)))
return -1;
if (! (200 <= response_code && response_code < 300))
{
if (response_code == 0)
elog(ERROR, "connection failed while %s", file->url);
else
{
/* we need to sleep 1 sec to avoid this condition:
1- seg X gets an error message from gpfdist
2- I gets a 500 error
3- I report error before seg X, and error message
in seg X is thrown away.
*/
sleep(1);
elog(ERROR, "http response code %ld while %s", response_code, file->url);
}
return -1;
}
return 0;
}
/* use to attempt to fill the read buffer up to requested number of bytes */
/* return 0 if successful, -1 otherwise */
static int
fill_buffer(URL_FILE *file,int want)
{
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
int maxfd;
struct timeval timeout;
int nfds, e;
curlctl_t* curl = &file->u.curl;
/* elog(NOTICE, "= still_running %d, bot %d, top %d, want %d",
file->u.curl.still_running, curl->buf.bot, curl->buf.top, want);
*/
/* attempt to fill buffer */
while (curl->still_running && curl->buf.top - curl->buf.bot < want)
{
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep);
/* set a suitable timeout to fail on */
timeout.tv_sec = 60; /* 1 minute */
timeout.tv_usec = 0;
/* get file descriptors from the transfers */
if (0 != (e = curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd)))
{
elog(ERROR, "internal error: curl_multi_fdset failed (%d - %s)",
e, curl_easy_strerror(e));
return -1;
}
if (maxfd <= 0)
{
curl->still_running = 0;
break;
}
if (-1 == (nfds = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout)))
{
if (errno == EINTR || errno == EAGAIN)
continue;
elog(ERROR, "internal error: select failed on curl_multi_fdset (maxfd %d) (%d - %s)",
maxfd, errno, strerror(errno));
return -1;
}
if (nfds > 0)
{
/* timeout or readable/writable sockets */
/* note we *could* be more efficient and not wait for
* CURLM_CALL_MULTI_PERFORM to clear here and check it on re-entry
* but that gets messy */
while (CURLM_CALL_MULTI_PERFORM ==
(e = curl_multi_perform(multi_handle, &file->u.curl.still_running)));
if (e != 0)
{
elog(ERROR, "internal error: curl_multi_perform failed (%d - %s)",
e, curl_easy_strerror(e));
return -1;
}
}
/* elog(NOTICE, "- still_running %d, bot %d, top %d, want %d",
file->u.curl.still_running, curl->buf.bot, curl->buf.top, want);
*/
}
return 0;
}
static int
set_httpheader(URL_FILE *fcurl, const char *name, const char *value)
{
char tmp[1024];
if (strlen(name) + strlen(value) + 5 > sizeof(tmp))
{
elog(ERROR, "set_httpheader name/value is too long. name = %s, value=%s", name, value);
return -1;
}
sprintf(tmp, "%s: %s", name, value);
fcurl->u.curl.x_httpheader = curl_slist_append(fcurl->u.curl.x_httpheader, tmp);
return 0;
}
static int
make_export(char *name, const char *value, char *buf)
{
char *p = buf;
int ch;
if (buf)
strcpy(p, name);
p += strlen(name);
if (buf)
strcpy(p, "='");
p += 2;
for ( ; 0 != (ch = *value); value++)
{
if (ch == '\'' || ch == '\\')
{
if (buf)
*p = '\\';
p++;
}
if (buf)
*p = ch;
p++;
}
if (buf)
strcpy(p, "' && export ");
p += strlen("' && export ");
if (buf)
strcpy(p, name);
p += strlen(name);
if (buf)
strcpy(p, " && ");
p += 4;
return p - buf;
}
static int
make_command(const char *cmd, extvar_t * ev, char *buf)
{
int sz = 0;
sz += make_export("GP_MASTER_HOST", ev->GP_MASTER_HOST, buf ? buf + sz : 0);
sz += make_export("GP_MASTER_PORT", ev->GP_MASTER_PORT, buf ? buf + sz : 0);
sz += make_export("GP_SEG_PG_CONF", ev->GP_SEG_PG_CONF, buf ? buf + sz : 0);
sz += make_export("GP_SEG_DATADIR", ev->GP_SEG_DATADIR, buf ? buf + sz : 0);
sz += make_export("GP_DATABASE", ev->GP_DATABASE, buf ? buf + sz : 0);
sz += make_export("GP_USER", ev->GP_USER, buf ? buf + sz : 0);
sz += make_export("GP_DATE", ev->GP_DATE, buf ? buf + sz : 0);
sz += make_export("GP_TIME", ev->GP_TIME, buf ? buf + sz : 0);
sz += make_export("GP_XID", ev->GP_XID, buf ? buf + sz : 0);
sz += make_export("GP_CID", ev->GP_CID, buf ? buf + sz : 0);
sz += make_export("GP_SN", ev->GP_SN, buf ? buf + sz : 0);
sz += make_export("GP_SEGMENT_ID", ev->GP_SEGMENT_ID, buf ? buf + sz : 0);
sz += make_export("GP_SEG_PORT", ev->GP_SEG_PORT, buf ? buf + sz : 0);
sz += make_export("GP_SESSION_ID", ev->GP_SESSION_ID, buf ? buf + sz : 0);
sz += make_export("GP_SEGMENT_COUNT", ev->GP_SEGMENT_COUNT, buf ? buf + sz : 0);
if (buf)
strcpy(buf + sz, cmd);
sz += strlen(cmd);
return sz + 1; /* add NUL terminator */
}
static char *
local_strstr(const char *str1, const char *str2)
{
char *cp = (char *) str1;
char *s1, *s2;
if ( !*str2 )
return((char *)str1);
while (*cp)
{
s1 = cp;
s2 = (char *) str2;
while (*s1 && (*s1==*s2))
s1++, s2++;
if (!*s2)
return(cp);
cp++;
}
return(NULL);
}
static int
make_url(const char *url, extvar_t *ev, char *buf)
{
char *authority_start = local_strstr(url, "//");
char *authority_end;
char *hostname_start;
char *hostname_end;
char hostname[100];
char hostip[20];
struct hostent * he;
int len;
char *p;
if (! authority_start)
{
elog(ERROR, "illegal url '%s'", url);
return -1;
}
authority_start += 2;
authority_end = strchr(authority_start, '/');
if (! authority_end)
authority_end = authority_start + strlen(authority_start);
hostname_start = strchr(authority_start, '@');
if (! (hostname_start && hostname_start < authority_end))
hostname_start = authority_start;
hostname_end = strchr(hostname_start, ':');
if (! (hostname_end && hostname_end < authority_end))
hostname_end = authority_end;
if (hostname_end - hostname_start >= sizeof(hostname))
{
elog(ERROR, "hostname too long for url '%s'", url);
return -1;
}
memcpy(hostname, hostname_start, hostname_end - hostname_start);
hostname[hostname_end - hostname_start] = 0;
he = gethostbyname(hostname);
if (he)
{
strcpy(hostip, inet_ntoa(*(struct in_addr*)he->h_addr));
/* elog(NOTICE, "IP: %s", hostip); */
}
else
{
#if 0
char line[1024];
FILE* fp;
int a, b, c, d;
snprintf(line, sizeof(line),
"python -c 'from socket import *; print gethostbyname(\"%s\") ' 2> /tmp/tt",
hostname);
line[sizeof(line)-1] = 0;
if (! (fp = popen(line, "r"))) {
line[0] = 0;
}
else {
if (0 == fgets(line, sizeof(line), fp))
line[0] = 0;
pclose(fp);
line[sizeof(line)-1] = 0;
}
if (4 != sscanf(line, "%d.%d.%d.%d", &a, &b, &c, &d)) {
elog(ERROR, "unable to resolve host '%s' in url '%s'", hostname, url);
return -1;
}
sprintf(hostip, "%d.%d.%d.%d", a, b, c, d);
/* elog(NOTICE, "IP: %s", hostip); */
#else
elog(ERROR, "unable to resolve host '%s' in url '%s'", hostname, url);
return -1;
#endif
}
if (!buf)
return strlen(url) + 1 - strlen(hostname) + strlen(hostip);
p = buf;
len = hostname_start - url;
strncpy(p, url, len); p += len; url += len;
len = strlen(hostname);
url += len;
len = strlen(hostip);
strncpy(p, hostip, len); p += len;
strcpy(p, url);
return p - buf;
}
URL_FILE *
url_fopen(char *url, const char *operation, extvar_t *ev, CopyState pstate, int *response_code, const char **response_string)
{
/* this code checks for URLs or types in the 'url' and
* basicly use the real fopen() for standard files, or
* if the url happens to be a command to execute it uses
* popen to execute it.
*/
/* an EXECUTE string will always be prefixed like this */
const char *exec_prefix = "execute:";
/*char sessionIDBuf[80];*/
URL_FILE *file;
int e;
(void)operation;
if (! (file = (URL_FILE *)malloc(sizeof(URL_FILE) + strlen(url) + 1)))
{
elog(ERROR, "out of memory");
*response_code = 500;
*response_string = "Out of memory";
return NULL;
}
memset(file, 0, sizeof(URL_FILE));
file->url = ((char *) file) + sizeof(URL_FILE);
strcpy(file->url, url);
/*
* if 'url' starts with "execute:" then it's a command to execute and
* not a url (the command specified in CREATE EXTERNAL TABLE .. EXECUTE)
*/
if (pg_strncasecmp(url, exec_prefix, strlen(exec_prefix)) == 0)
{
char *cmd = url + strlen(exec_prefix);
int sz = make_command(cmd, ev, 0);
char *shexec = palloc(sz);
make_command(cmd, ev, shexec);
file->type = CFTYPE_EXEC; /* marked as a EXEC */
file->u.exec.fp = popen(shexec, "r");
file->u.exec.shexec = shexec;
if (file->u.exec.fp == NULL)
elog( ERROR, "External Table error executing cmd: '%s'", cmd );
}
else if (IS_FILE_URI(url))
{
char *path = strchr(url + strlen(PROTOCOL_FILE), '/');
struct fstream_options fo;
memset(&fo,0,sizeof fo);
if (!path)
{
elog(ERROR, "External Table error opening file: '%s'", url);
*response_code = 400;
*response_string = "Bad Request";
return NULL;
}
file->type = CFTYPE_FILE; /* marked as local FILE */
fo.is_csv = pstate->csv_mode;
fo.quote = pstate->quote?*pstate->quote:0;
fo.escape = pstate->escape?*pstate->escape:0;
fo.header = pstate->header_line;
pstate->header_line = 0;
file->u.file.fp = fstream_open(path,&fo,response_code,response_string);
/* couldn't open local file. return NULL to display proper error */
if (!file->u.file.fp)
{
free(file);
return NULL;
}
}
else if (IS_HTTP_URI(url) || IS_GPFDIST_URI(url) || IS_GPFDISTS_URI(url))
{
int sz = make_url(file->url, ev, 0);
file->type = CFTYPE_CURL; /* marked as URL */
if (sz < 0)
{
elog(ERROR, "illegal URL: %s", file->url);
url_fclose(file);
*response_code = 400;
*response_string = "Bad Request";
return NULL;
}
file->u.curl.curl_url = palloc(sz);
make_url(file->url, ev, file->u.curl.curl_url);
if (IS_GPFDIST_URI(file->u.curl.curl_url) || IS_GPFDISTS_URI(file->u.curl.curl_url))
{
/* replace gpfdist:// with http:// */
file->u.curl.curl_url += 3;
memcpy(file->u.curl.curl_url, "http", 4);
pstate->header_line = 0;
}
if (! (file->u.curl.handle = curl_easy_init()))
{
elog(ERROR, "internal error: curl_easy_init failed");
url_fclose(file);
*response_code = 500;
*response_string = "curl_easy_init failed";
return NULL;
}
/* elog(NOTICE, "URL: %s", file->u.curl.curl_url); */
if (0 != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_URL, file->u.curl.curl_url)))
{
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_URL %s error (%d - %s)",
file->u.curl.curl_url,
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_easy_setopt failed";
return NULL;
}
if (0 != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_VERBOSE, FALSE)))
{
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_VERBOSE error (%d - %s)",
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_easy_setopt failed";
return NULL;
}
/* set callback for each header received from server */
if (0 != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_HEADERFUNCTION, header_callback)))
{
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_HEADERFUNCTION error (%d - %s)",
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_easy_setopt failed";
return NULL;
}
/* file gets passed to header_callback */
if (0 != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_WRITEHEADER, file)))
{
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_WRITEHEADER error (%d - %s)",
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_easy_setopt failed";
return NULL;
}
/* set callback for each data block received from server */
if (0 != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_WRITEFUNCTION, write_callback)))
{
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_WRITEFUNCTION error (%d - %s)",
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_easy_setopt failed";
return NULL;
}
/* file gets passed to write_callback */
if (0 != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_WRITEDATA, file)))
{
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_WRITEDATA error (%d - %s)",
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_easy_setopt failed";
return NULL;
}
if (0 != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_IPRESOLVE, CURL_IPRESOLVE_V4)))
{
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_IPRESOLVE error (%d - %s)",
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_easy_setopt failed";
return NULL;
}
file->u.curl.x_httpheader = NULL;
if (set_httpheader(file, "X-GP-MASTER_HOST", ev->GP_MASTER_HOST) ||
set_httpheader(file, "X-GP-MASTER_PORT", ev->GP_MASTER_PORT) ||
set_httpheader(file, "X-GP-CSVOPT", ev->GP_CSVOPT) ||
set_httpheader(file, "X-GP_SEG_PG_CONF", ev->GP_SEG_PG_CONF) ||
set_httpheader(file, "X-GP_SEG_DATADIR", ev->GP_SEG_DATADIR) ||
set_httpheader(file, "X-GP-DATABASE", ev->GP_DATABASE) ||
set_httpheader(file, "X-GP-USER", ev->GP_USER) ||
set_httpheader(file, "X-GP-XID", ev->GP_XID) ||
set_httpheader(file, "X-GP-CID", ev->GP_CID) ||
set_httpheader(file, "X-GP-SN", ev->GP_SN) ||
set_httpheader(file, "X-GP-PROTO", "1") ||
set_httpheader(file, "X-GP-SEGMENT-ID", ev->GP_SEGMENT_ID) ||
set_httpheader(file, "X-GP-SEG-PORT", ev->GP_SEG_PORT) ||
set_httpheader(file, "X-GP-SESSION-ID", ev->GP_SESSION_ID) ||
set_httpheader(file, "X-GP-SEGMENT-COUNT", ev->GP_SEGMENT_COUNT))
{
elog(ERROR, "internal error: some variable value is too long");
url_fclose(file);
*response_code = 500;
*response_string = "Internal Server Error";
return NULL;
}
if (0 != (e = curl_easy_setopt(file->u.curl.handle, CURLOPT_HTTPHEADER, file->u.curl.x_httpheader)))
{
elog(ERROR, "internal error: curl_easy_setopt CURLOPT_WRITEDATA error (%d - %s)",
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_easy_setopt failed";
return NULL;
}
if (!multi_handle)
{
if (! (multi_handle = curl_multi_init()))
{
elog(ERROR, "internal error: curl_multi_init failed");
url_fclose(file);
*response_code = 500;
*response_string = "curl_multi_init failed";
return NULL;
}
}
if (0 != (e = curl_multi_add_handle(multi_handle, file->u.curl.handle)))
{
if (CURLM_CALL_MULTI_PERFORM != e)
{
elog(ERROR, "internal error: curl_multi_add_handle failed (%d - %s)",
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_multi_init_add_handle failed";
return NULL;
}
}
/* lets start the fetch */
while (CURLM_CALL_MULTI_PERFORM ==
(e = curl_multi_perform(multi_handle, &file->u.curl.still_running)));
if (e != 0)
{
elog(ERROR, "internal error: curl_multi_perform failed (%d - %s)",
e, curl_easy_strerror(e));
url_fclose(file);
*response_code = 500;
*response_string = "curl_multi_perform failed";
return NULL;
}
/* read some bytes to make sure the connection is established */
fill_buffer(file, 1);
/* check the connection */
if (check_response(file,response_code,response_string))
{
url_fclose(file);
return NULL;
}
}
return file;
}
int
url_fclose(URL_FILE *file)
{
int ret=0;/* default is good return */
switch (file->type)
{
case CFTYPE_FILE:
fstream_close(file->u.file.fp);
break;
case CFTYPE_EXEC:
ret = pclose(file->u.exec.fp);
break;
case CFTYPE_CURL:
if (file->u.curl.x_httpheader)
{
curl_slist_free_all(file->u.curl.x_httpheader);
file->u.curl.x_httpheader = NULL;
}
/* make sure the easy handle is not in the multi handle anymore */
if (file->u.curl.handle)
{
curl_multi_remove_handle(multi_handle, file->u.curl.handle);
/* cleanup */
curl_easy_cleanup(file->u.curl.handle);
file->u.curl.handle = NULL;
}
/* free any allocated buffer space */
if (file->u.curl.buf.ptr)
{
free(file->u.curl.buf.ptr);
file->u.curl.buf.ptr = NULL;
}
file->u.curl.gp_proto = 0;
file->u.curl.error = file->u.curl.eof = 0;
memset(&file->u.curl.buf, 0, sizeof(file->u.curl.buf));
memset(&file->u.curl.block, 0, sizeof(file->u.curl.block));
break;
default: /* unknown or supported type - oh dear */
ret=EOF;
errno=EBADF;
break;
}
free(file);
return ret;
}
int
url_feof(URL_FILE *file)
{
int ret = 0;
switch (file->type)
{
case CFTYPE_FILE:
ret = fstream_eof(file->u.file.fp);
break;
case CFTYPE_EXEC:
ret = feof(file->u.exec.fp);
break;
case CFTYPE_CURL:
ret = file->u.curl.eof;
break;
default: /* unknown or supported type - oh dear */
ret = -1;
errno = EBADF;
break;
}
return ret;
}
int url_ferror(URL_FILE *file)
{
int ret = 0;
switch (file->type)
{
case CFTYPE_FILE:
ret = fstream_get_error(file->u.file.fp)!=0;
break;
case CFTYPE_EXEC:
ret = ferror(file->u.exec.fp);
break;
case CFTYPE_CURL:
ret = file->u.curl.error ? -1 : 0;
break;
default: /* unknown or supported type - oh dear */
ret = -1;
errno = EBADF;
break;
}
return ret;
}
static size_t gp_proto0_read(char *buf, int bufsz, URL_FILE* file)
{
int n = 0;
curlctl_t* curl = &file->u.curl;
fill_buffer(file, bufsz);
/* check if there's data in the buffer - if not fill_buffer()
* either errored or EOF. For proto0, we cannot distinguish
* between error and EOF. */
n = curl->buf.top - curl->buf.bot;
if (n == 0 && !curl->still_running)
curl->eof = 1;
if (n > bufsz)
n = bufsz;
/* xfer data to caller */
memcpy(buf, curl->buf.ptr, n);
curl->buf.bot += n;
return n;
}
static size_t gp_proto1_read(char *buf, int bufsz, URL_FILE *file, CopyState pstate, char *buf2)
{
char type;
int n, len;
curlctl_t* curl = &file->u.curl;
while (curl->block.datalen == 0 && !curl->eof) {
/* need 5 bytes, 1 byte type + 4 bytes length */
fill_buffer(file, 5);
n = curl->buf.top - curl->buf.bot;
if (n == 0)
{
elog(ERROR, "gpfdist error: server closed connection.\n");
return -1;
}
if (n < 5)
{
elog(ERROR, "gpfdist error: incomplete packet - packet len %d\n", n);
return -1;
}
/* read type */
type = curl->buf.ptr[curl->buf.bot++];
/* read len */
memcpy(&len, &curl->buf.ptr[curl->buf.bot], 4);
len = ntohl(len); /* change order */
curl->buf.bot += 4;
if (len < 0)
{
elog(ERROR, "gpfdist error: bad packet type %d len %d",
type, len);
return -1;
}
/* elog(NOTICE, "HEADER %c %d, bot %d, top %d", type, len,
curl->buf.bot, curl->buf.top);
*/
if (type == 'E')
{
fill_buffer(file, len);
n = curl->buf.top - curl->buf.bot;
if (n > len)
n = len;
if (n > 0)
{
/*
* cheat a little. swap last char and
* NUL-terminator. then print string (without last
* char) and print last char artificially
*/
char x = curl->buf.ptr[curl->buf.bot + n - 1];
curl->buf.ptr[curl->buf.bot + n - 1] = 0;
elog(ERROR, "gpfdist error - %s%c", &curl->buf.ptr[curl->buf.bot], x);
return -1;
}
elog(ERROR, "gpfdist error: please check gpfdist log messages.");
return -1;
}
if (type == 'F')
{
if (buf != buf2)
{
curl->buf.bot -= 5;
return 0;
}
if (len > 256)
{
elog(ERROR, "gpfdist error: filename too long (%d)", len);
return -1;
}
if (-1 == fill_buffer(file, len))
{
elog(ERROR, "gpfdist error: stream ends suddenly");
return -1;
}
if (pstate->cdbsreh)
{
char fname[257];
memcpy(fname,curl->buf.ptr+curl->buf.bot,len);
fname[len] = 0;
snprintf(pstate->cdbsreh->filename,sizeof pstate->cdbsreh->filename,"%s [%s]",pstate->filename,fname);
}
curl->buf.bot += len;
Assert(curl->buf.bot <= curl->buf.top);
continue;
}
if (type == 'O')
{
if (len != 8)
{
elog(ERROR, "gpfdist error: offset not of length 8 (%d)", len);
return -1;
}
if (-1 == fill_buffer(file, len))
{
elog(ERROR, "gpfdist error: stream ends suddenly");
return -1;
}
curl->buf.bot += 8;
Assert(curl->buf.bot <= curl->buf.top);
continue;
}
if (type == 'L')
{
int64_t line_number;
if (len != 8)
{
elog(ERROR, "gpfdist error: line number not of length 8 (%d)", len);
return -1;
}
if (-1 == fill_buffer(file, len))
{
elog(ERROR, "gpfdist error: stream ends suddenly");
return -1;
}
memcpy(&line_number,curl->buf.ptr+curl->buf.bot,len);
line_number = local_ntohll(line_number);
pstate->cur_lineno = line_number?line_number-1:(int64_t)-1<<63;
curl->buf.bot += 8;
Assert(curl->buf.bot <= curl->buf.top);
continue;
}
if (type == 'D')
{
curl->block.datalen = len;
curl->eof = (len == 0);
// elog(NOTICE, "D %d", curl->block.datalen);
break;
}
elog(ERROR, "gpfdist error: unknown meta type %d", type);
return -1;
}
/* read data block */
if (bufsz > curl->block.datalen)
bufsz = curl->block.datalen;
n = curl->buf.top - curl->buf.bot;
// elog(NOTICE, "n = %d", n);
fill_buffer(file, bufsz);
n = curl->buf.top - curl->buf.bot;
if (n == 0 && !curl->eof)
{
//elog(NOTICE, "bufsz = %d, not eof -> error", bufsz);
curl->error = 1;
}
if (n > bufsz)
n = bufsz;
memcpy(buf, curl->buf.ptr + curl->buf.bot, n);
curl->buf.bot += n;
curl->block.datalen -= n;
// elog(NOTICE, "returning %d bytes, %d bytes left \n", n, curl->block.datalen);
return n;
}
static size_t curl_fread(char *buf, int bufsz, URL_FILE* file,CopyState pstate)
{
curlctl_t *curl = &file->u.curl;
char *p = buf;
char *q = buf + bufsz;
int n;
const int gp_proto = curl->gp_proto;
if (gp_proto != 0 && gp_proto != 1)
{
elog(ERROR, "unknown gp protocol %d", curl->gp_proto);
return 0;
}
for (; p < q; p += n)
{
if (gp_proto == 0)
n = gp_proto0_read(p, q - p, file);
else
n = gp_proto1_read(p, q - p, file,pstate,buf);
//elog(NOTICE, "curl_fread %d bytes", n);
if (n <= 0)
break;
}
return p - buf;
}
size_t
url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate)
{
size_t want;
int n;
switch (file->type)
{
case CFTYPE_FILE:
{
struct fstream_filename_and_offset fo;
assert(size==1);
n = fstream_read(file->u.file.fp,ptr,nmemb,&fo,0);
want = n > 0 ? n : 0;
if (n > 0 && fo.line_number)
{
pstate->cur_lineno = fo.line_number-1;
if (pstate->cdbsreh)
snprintf(pstate->cdbsreh->filename,sizeof pstate->cdbsreh->filename,"%s [%s]",pstate->filename,fo.fname);
}
break;
}
case CFTYPE_EXEC:
want = fread(ptr, size, nmemb, file->u.exec.fp);
break;
case CFTYPE_CURL:
n = curl_fread(ptr, nmemb * size, file,pstate);
/* number of items - nb correct op - checked
* with glibc code*/
want = n / size;
/*printf("(fread) return %d bytes %d left\n", want,file->u.curl.buffer_pos);*/
break;
default: /* unknown or supported type */
want = 0;
errno = EBADF;
break;
}
return want;
}
void
url_rewind(URL_FILE *file)
{
char *url = file->url;
switch(file->type)
{
case CFTYPE_FILE:
fstream_rewind(file->u.file.fp);
break;
case CFTYPE_EXEC:
/* we'll need to execute the command again */
assert(0); /* There is no way the following code is right. */
url_fclose(file);
/* most of these are null fo us. */
url_fopen(url, NULL, NULL,NULL,NULL,0);
break;
case CFTYPE_CURL:
/* halt transaction */
curl_multi_remove_handle(multi_handle, file->u.curl.handle);
/* restart */
curl_multi_add_handle(multi_handle, file->u.curl.handle);
/* ditch buffer - write will recreate - resets stream pos*/
if (file->u.curl.buf.ptr)
free(file->u.curl.buf.ptr);
file->u.curl.gp_proto = 0;
file->u.curl.error = file->u.curl.eof = 0;
memset(&file->u.curl.buf, 0, sizeof(file->u.curl.buf));
memset(&file->u.curl.block, 0, sizeof(file->u.curl.block));
break;
default: /* unknown or supported type - oh dear */
break;
}
}