/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*-------------------------------------------------------------------------
*
* copy.c
* Implements the COPY utility command
*
* Portions Copyright (c) 2005-2008, Greenplum inc
* Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.273 2006/10/06 17:13:58 petere Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/file.h>
#include "access/fileam.h"
#include "access/filesplit.h"
#include "access/heapam.h"
#include "access/aosegfiles.h"
#include "access/appendonlywriter.h"
#include "access/xact.h"
#include "access/plugstorage.h"
#include "catalog/gp_policy.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "catalog/catalog.h"
#include "catalog/pg_exttable.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbparquetam.h"
#include "cdb/cdbpartition.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbsharedstorageop.h"
#include "commands/copy.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/planner.h"
#include "parser/parse_relation.h"
#include "postmaster/identity.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/builtins.h"
#include "cdb/cdbvars.h"
#include "cdb/cdblink.h"
#include "cdb/cdbcopy.h"
#include "cdb/cdbhash.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbrelsize.h"
#include "cdb/cdbvarblock.h"
#include "cdb/cdbbufferedappend.h"
#include "commands/vacuum.h"
#include "utils/lsyscache.h"
#include "nodes/makefuncs.h"
#include "postmaster/autovacuum.h"
#include "cdb/dispatcher.h"
/*
* in dbsize.c
*/
extern int64 calculate_relation_size(Relation rel);
/* DestReceiver for COPY (SELECT) TO */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
CopyState cstate; /* CopyStateData for the command */
} DR_copy;
/* non-export function prototypes */
static void DoCopyTo(CopyState cstate);
extern void CopyToDispatch(CopyState cstate);
static void CopyTo(CopyState cstate);
extern void CopyFromDispatch(CopyState cstate, List *err_segnos);
static void CopyFrom(CopyState cstate);
static char *CopyReadOidAttr(CopyState cstate, bool *isnull);
static void CopyAttributeOutText(CopyState cstate, char *string);
static void CopyAttributeOutCSV(CopyState cstate, char *string,
bool use_quote, bool single_attr);
static bool DetectLineEnd(CopyState cstate, size_t bytesread __attribute__((unused)));
static void CopyReadAttributesTextNoDelim(CopyState cstate, bool *nulls,
int num_phys_attrs, int attnum);
/* Low-level communications functions */
static void SendCopyBegin(CopyState cstate);
static void ReceiveCopyBegin(CopyState cstate);
static void SendCopyEnd(CopyState cstate);
static void CopySendData(CopyState cstate, const void *databuf, int datasize);
static void CopySendString(CopyState cstate, const char *str);
static void CopySendChar(CopyState cstate, char c);
static int CopyGetData(CopyState cstate, void *databuf, int datasize);
/* byte scanning utils */
static char *scanTextLine(CopyState cstate, const char *s, char c, size_t len);
static char *scanCSVLine(CopyState cstate, const char *s, char c1, char c2, char c3, size_t len);
static void CopyExtractRowMetaData(CopyState cstate);
static void preProcessDataLine(CopyState cstate);
static void concatenateEol(CopyState cstate);
static char *escape_quotes(const char *src);
static void attr_get_key(CopyState cstate, CdbCopy *cdbCopy, int original_lineno_for_qe,
unsigned int target_seg, AttrNumber p_nattrs, AttrNumber *attrs,
Form_pg_attribute *attr_descs, int *attr_offsets, bool *attr_nulls,
FmgrInfo *in_functions, Oid *typioparams, Datum *values);
static void copy_in_error_callback(void *arg);
static void CopyInitPartitioningState(EState *estate);
static void CopyInitDataParser(CopyState cstate);
static bool CopyCheckIsLastLine(CopyState cstate);
static int calculate_virtual_segment_number(List* candidateRelations);
/* ==========================================================================
* The following macros aid in major refactoring of data processing code (in
* CopyFrom(+Dispatch)). We use macros because in some cases the code must be in
* line in order to work (for example elog_dismiss() in PG_CATCH) while in
* other cases we'd like to inline the code for performance reasons.
*
* NOTE that an almost identical set of macros exists in fileam.c. If you make
* changes here you may want to consider taking a look there as well.
* ==========================================================================
*/
#define RESET_LINEBUF \
cstate->line_buf.len = 0; \
cstate->line_buf.data[0] = '\0'; \
cstate->line_buf.cursor = 0;
#define RESET_ATTRBUF \
cstate->attribute_buf.len = 0; \
cstate->attribute_buf.data[0] = '\0'; \
cstate->attribute_buf.cursor = 0;
#define RESET_LINEBUF_WITH_LINENO \
line_buf_with_lineno.len = 0; \
line_buf_with_lineno.data[0] = '\0'; \
line_buf_with_lineno.cursor = 0;
/*
* A data error happened. This code block will always be inside a PG_CATCH()
* block right when a higher stack level produced an error. We handle the error
* by checking which error mode is set (SREH or all-or-nothing) and do the right
* thing accordingly. Note that we MUST have this code in a macro (as opposed
* to a function) as elog_dismiss() has to be inlined with PG_CATCH in order to
* access local error state variables.
*
* changing me? take a look at FILEAM_HANDLE_ERROR in fileam.c as well.
*/
#define COPY_HANDLE_ERROR \
if (cstate->errMode == ALL_OR_NOTHING) \
{ \
/* re-throw error and abort */ \
if (Gp_role == GP_ROLE_DISPATCH) \
cdbCopyEnd(cdbCopy); \
PG_RE_THROW(); \
} \
else \
{ \
/* SREH - release error state and handle error */ \
\
ErrorData *edata; \
MemoryContext oldcontext;\
bool rawdata_is_a_copy = false; \
cur_row_rejected = true; \
\
/* SREH must only handle data errors. all other errors must not be caught */\
if (ERRCODE_TO_CATEGORY(elog_geterrcode()) != ERRCODE_DATA_EXCEPTION)\
{\
/* re-throw error and abort */ \
if (Gp_role == GP_ROLE_DISPATCH) \
cdbCopyEnd(cdbCopy); \
PG_RE_THROW(); \
}\
\
/* save a copy of the error info */ \
oldcontext = MemoryContextSwitchTo(cstate->cdbsreh->badrowcontext);\
edata = CopyErrorData();\
MemoryContextSwitchTo(oldcontext);\
\
if (!elog_dismiss(DEBUG5)) \
PG_RE_THROW(); /* <-- hope to never get here! */ \
\
if (Gp_role == GP_ROLE_DISPATCH)\
{\
Insist(cstate->err_loc_type == ROWNUM_ORIGINAL);\
cstate->cdbsreh->rawdata = (char *) palloc(strlen(cstate->line_buf.data) * \
sizeof(char) + 1 + 24); \
\
rawdata_is_a_copy = true; \
sprintf(cstate->cdbsreh->rawdata, "%d%c%d%c%s", \
original_lineno_for_qe, \
COPY_METADATA_DELIM, \
cstate->line_buf_converted, \
COPY_METADATA_DELIM, \
cstate->line_buf.data); \
}\
else\
{\
/* truncate trailing eol chars if we need to store this row in errtbl */ \
if (cstate->cdbsreh->errtbl) \
truncateEol(&cstate->line_buf, cstate->eol_type); \
\
if (Gp_role == GP_ROLE_EXECUTE)\
{\
/* if line has embedded rownum, update the cursor to the pos right after */ \
Insist(cstate->err_loc_type == ROWNUM_EMBEDDED);\
cstate->line_buf.cursor = 0;\
if(!cstate->md_error) \
CopyExtractRowMetaData(cstate); \
}\
\
cstate->cdbsreh->rawdata = cstate->line_buf.data + cstate->line_buf.cursor; \
}\
\
cstate->cdbsreh->is_server_enc = cstate->line_buf_converted; \
cstate->cdbsreh->linenumber = cstate->cur_lineno; \
cstate->cdbsreh->processed = ++cstate->processed; \
cstate->cdbsreh->consec_csv_err = cstate->num_consec_csv_err; \
\
/* set the error message. Use original msg and add column name if available */ \
if (cstate->cur_attname)\
{\
cstate->cdbsreh->errmsg = (char *) palloc((strlen(edata->message) + \
strlen(cstate->cur_attname) + \
10 + 1) * sizeof(char)); \
sprintf(cstate->cdbsreh->errmsg, "%s, column %s", \
edata->message, \
cstate->cur_attname); \
}\
else\
{\
cstate->cdbsreh->errmsg = pstrdup(edata->message); \
}\
\
/* after all the prep work let cdbsreh do the real work */ \
HandleSingleRowError(cstate->cdbsreh); \
\
/* cleanup any extra memory copies we made */\
if (rawdata_is_a_copy) \
pfree(cstate->cdbsreh->rawdata); \
if (!IsRejectLimitReached(cstate->cdbsreh)) \
pfree(cstate->cdbsreh->errmsg); \
\
MemoryContextReset(cstate->cdbsreh->badrowcontext);\
\
}
/*
* if in SREH mode and data error occured it was already handled in
* COPY_HANDLE_ERROR. Therefore, skip to the next row before attempting
* to do any further processing on this one. There's a QE and QD versions
* since the QE doesn't have a linebuf_with_lineno stringInfo.
*/
#define QD_GOTO_NEXT_ROW \
RESET_LINEBUF_WITH_LINENO; \
RESET_LINEBUF; \
cur_row_rejected = false; /* reset for next run */ \
continue; /* move on to the next data line */
#define QE_GOTO_NEXT_ROW \
RESET_LINEBUF; \
cur_row_rejected = false; /* reset for next run */ \
cstate->cur_attname = NULL;\
continue; /* move on to the next data line */
#define IF_REJECT_LIMIT_REACHED_ABORT \
if (IsRejectLimitReached(cstate->cdbsreh)) \
{\
char *rejectmsg_normal = "Segment reject limit reached. Aborting operation. Last error was:";\
char *rejectmsg_csv_unparsable = "Input includes invalid CSV data that corrupts the ability to parse data rows. This usually means several unescaped embedded QUOTE characters. Data is not parsable. Last error was:";\
char *finalmsg;\
\
if (CSV_IS_UNPARSABLE(cstate->cdbsreh))\
{\
/* the special "csv un-parsable" case */\
finalmsg = (char *) palloc((strlen(cstate->cdbsreh->errmsg) + \
strlen(rejectmsg_csv_unparsable) + 12 + 1)\
* sizeof(char)); \
sprintf(finalmsg, "%s %s", rejectmsg_csv_unparsable, cstate->cdbsreh->errmsg);\
}\
else\
{\
/* the normal case */\
finalmsg = (char *) palloc((strlen(cstate->cdbsreh->errmsg) + \
strlen(rejectmsg_normal) + 12 + 1)\
* sizeof(char)); \
sprintf(finalmsg, "%s %s", rejectmsg_normal, cstate->cdbsreh->errmsg);\
}\
\
if (Gp_role == GP_ROLE_DISPATCH) \
cdbCopyEnd(cdbCopy); \
\
ereport(ERROR, \
(errcode(ERRCODE_T_R_GP_REJECT_LIMIT_REACHED), \
(errmsg("%s", finalmsg) \
),errOmitLocation(true))); \
}
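/*
 * A sketch of how the macros above are typically combined inside the
 * CopyFrom()/CopyFromDispatch() row loop (illustrative only; the real
 * loops below carry more state):
 *
 *   PG_TRY();
 *   {
 *       ... parse attributes and form/insert the tuple ...
 *   }
 *   PG_CATCH();
 *   {
 *       COPY_HANDLE_ERROR;
 *   }
 *   PG_END_TRY();
 *
 *   if (cur_row_rejected)
 *   {
 *       IF_REJECT_LIMIT_REACHED_ABORT;
 *       QE_GOTO_NEXT_ROW;   (or QD_GOTO_NEXT_ROW on the dispatcher)
 *   }
 */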
/*
* Send copy start/stop messages for frontend copies. These have changed
* in past protocol redesigns.
*/
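/*
 * For reference, the body of the 3.0-protocol CopyOutResponse ('H') and
 * CopyInResponse ('G') messages built below is laid out as:
 *   int8    overall copy format (always 0 here, i.e. text)
 *   int16   number of columns
 *   int16[] per-column format codes (all 0, since this COPY is text/csv only)
 */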
static void
SendCopyBegin(CopyState cstate)
{
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
{
/* new way */
StringInfoData buf;
int natts = list_length(cstate->attnumlist);
int16 format = 0;
int i;
pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, format); /* overall format */
pq_sendint(&buf, natts, 2);
for (i = 0; i < natts; i++)
pq_sendint(&buf, format, 2); /* per-column formats */
pq_endmessage(&buf);
cstate->copy_dest = COPY_NEW_FE;
}
else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
{
/* old way */
pq_putemptymessage('H');
/* grottiness needed for old COPY OUT protocol */
pq_startcopyout();
cstate->copy_dest = COPY_OLD_FE;
}
else
{
/* very old way */
pq_putemptymessage('B');
/* grottiness needed for old COPY OUT protocol */
pq_startcopyout();
cstate->copy_dest = COPY_OLD_FE;
}
}
static void
ReceiveCopyBegin(CopyState cstate)
{
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
{
/* new way */
StringInfoData buf;
int natts = list_length(cstate->attnumlist);
int16 format = 0;
int i;
pq_beginmessage(&buf, 'G');
pq_sendbyte(&buf, format); /* overall format */
pq_sendint(&buf, natts, 2);
for (i = 0; i < natts; i++)
pq_sendint(&buf, format, 2); /* per-column formats */
pq_endmessage(&buf);
cstate->copy_dest = COPY_NEW_FE;
cstate->fe_msgbuf = makeStringInfo();
}
else if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 2)
{
/* old way */
pq_putemptymessage('G');
cstate->copy_dest = COPY_OLD_FE;
}
else
{
/* very old way */
pq_putemptymessage('D');
cstate->copy_dest = COPY_OLD_FE;
}
/* We *must* flush here to ensure FE knows it can send. */
pq_flush();
}
static void
SendCopyEnd(CopyState cstate)
{
if (cstate->copy_dest == COPY_NEW_FE)
{
/* Shouldn't have any unsent data */
Assert(cstate->fe_msgbuf->len == 0);
/* Send Copy Done message */
pq_putemptymessage('c');
}
else
{
CopySendData(cstate, "\\.", 2);
/* Need to flush out the trailer (this also appends a newline) */
CopySendEndOfRow(cstate);
pq_endcopyout(false);
}
}
/*----------
* CopySendData sends output data to the destination (file or frontend)
* CopySendString does the same for null-terminated strings
* CopySendChar does the same for single characters
* CopySendEndOfRow does the appropriate thing at end of each data row
* (data is not actually flushed except by CopySendEndOfRow)
*
* NB: no data conversion is applied by these functions
*----------
*/
static void
CopySendData(CopyState cstate, const void *databuf, int datasize)
{
if (!cstate->is_copy_in) /* copy out */
{
appendBinaryStringInfo(cstate->fe_msgbuf, (char *) databuf, datasize);
}
else /* hack for: copy in */
{
/* We call CopySendData in copy-in to handle results
* of default functions that we wish to send from the
* dispatcher to the executor primary and mirror segments.
* We do so by concatenating the results to the line buffer.
*/
appendBinaryStringInfo(&cstate->line_buf, (char *) databuf, datasize);
}
}
static void
CopySendString(CopyState cstate, const char *str)
{
CopySendData(cstate, (void *) str, strlen(str));
}
static void
CopySendChar(CopyState cstate, char c)
{
CopySendData(cstate, &c, 1);
}
/* AXG: Note that this will both add a newline AND flush the data.
* For the dispatcher COPY TO we don't want to use this method since
* our newlines already exist. We use another new method similar to
* this one to flush the data
*/
void
CopySendEndOfRow(CopyState cstate)
{
StringInfo fe_msgbuf = cstate->fe_msgbuf;
switch (cstate->copy_dest)
{
case COPY_FILE:
/* Default line termination depends on platform */
#ifndef WIN32
CopySendChar(cstate, '\n');
#else
CopySendString(cstate, "\r\n");
#endif
(void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
1, cstate->copy_file);
if (ferror(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to COPY file: %m")));
break;
case COPY_OLD_FE:
/* The FE/BE protocol uses \n as newline for all platforms */
CopySendChar(cstate, '\n');
if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
{
/* no hope of recovering connection sync, so FATAL */
ereport(FATAL,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection lost during COPY to stdout")));
}
break;
case COPY_NEW_FE:
/* The FE/BE protocol uses \n as newline for all platforms */
CopySendChar(cstate, '\n');
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
case COPY_EXTERNAL_SOURCE:
/* we don't actually do the write here, we let the caller do it */
#ifndef WIN32
CopySendChar(cstate, '\n');
#else
CopySendString(cstate, "\r\n");
#endif
return; /* don't want to reset msgbuf quite yet */
}
/* Reset fe_msgbuf to empty */
fe_msgbuf->len = 0;
fe_msgbuf->data[0] = '\0';
}
/*
* AXG: This one is equivalent to CopySendEndOfRow() except that
* it doesn't send the end of row - it just flushes the data. We need
* this method for the dispatcher COPY TO since it already has data
* with newlines (from the executors).
*/
static void
CopyToDispatchFlush(CopyState cstate)
{
StringInfo fe_msgbuf = cstate->fe_msgbuf;
switch (cstate->copy_dest)
{
case COPY_FILE:
(void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
1, cstate->copy_file);
if (ferror(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to COPY file: %m")));
break;
case COPY_OLD_FE:
if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
{
/* no hope of recovering connection sync, so FATAL */
ereport(FATAL,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("connection lost during COPY to stdout")));
}
break;
case COPY_NEW_FE:
/* Dump the accumulated row as one CopyData message */
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
case COPY_EXTERNAL_SOURCE:
Insist(false); /* internal error */
break;
}
/* Reset fe_msgbuf to empty */
fe_msgbuf->len = 0;
fe_msgbuf->data[0] = '\0';
}
/*
* CopyGetData reads data from the source (file or frontend)
* CopyGetChar does the same for single characters
*
* CopyGetEof checks if EOF was detected by previous Get operation.
*
* Note: when copying from the frontend, we expect a proper EOF mark per
* protocol; if the frontend simply drops the connection, we raise error.
* It seems unwise to allow the COPY IN to complete normally in that case.
*
* NB: no data conversion is applied by these functions
*
* Returns: the number of bytes that were successfully read
* into the data buffer.
*/
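/*
 * A typical caller drains this interface roughly like this (sketch only,
 * assuming a local char buf[] of some reasonable size):
 *
 *   do
 *   {
 *       bytesread = CopyGetData(cstate, buf, sizeof(buf));
 *       ... append the bytes read to the raw input buffer ...
 *   } while (bytesread > 0 && !cstate->fe_eof);
 */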
static int
CopyGetData(CopyState cstate, void *databuf, int datasize)
{
size_t bytesread = 0;
switch (cstate->copy_dest)
{
case COPY_FILE:
bytesread = fread(databuf, 1, datasize, cstate->copy_file);
if (feof(cstate->copy_file))
cstate->fe_eof = true;
break;
case COPY_OLD_FE:
if (pq_getbytes((char *) databuf, datasize))
{
/* Only a \. terminator is legal EOF in old protocol */
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection")));
}
bytesread += datasize; /* update the count of bytes that were
* read so far */
break;
case COPY_NEW_FE:
while (datasize > 0 && !cstate->fe_eof)
{
int avail;
while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
{
/* Try to receive another message */
int mtype;
readmessage:
mtype = pq_getbyte();
if (mtype == EOF)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection")));
if (pq_getmessage(cstate->fe_msgbuf, 0))
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on client connection")));
switch (mtype)
{
case 'd': /* CopyData */
break;
case 'c': /* CopyDone */
/* COPY IN correctly terminated by frontend */
cstate->fe_eof = true;
return bytesread;
case 'f': /* CopyFail */
ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED),
errmsg("COPY from stdin failed: %s",
pq_getmsgstring(cstate->fe_msgbuf))));
break;
case 'H': /* Flush */
case 'S': /* Sync */
/*
* Ignore Flush/Sync for the convenience of
* client libraries (such as libpq) that may
* send those without noticing that the
* command they just sent was COPY.
*/
goto readmessage;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("unexpected message type 0x%02X during COPY from stdin",
mtype)));
break;
}
}
avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
if (avail > datasize)
avail = datasize;
pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
databuf = (void *) ((char *) databuf + avail);
bytesread += avail; /* update the count of bytes that were
* read so far */
datasize -= avail;
}
break;
case COPY_EXTERNAL_SOURCE:
Insist(false); /* RET read their own data with external_senddata() */
break;
}
return bytesread;
}
/*
* ValidateControlChars
*
* This routine is common to COPY and external tables. It validates the
* control characters (delimiter, quote, etc..) and enforces the given rules.
*
* bool copy
* - pass true if you're COPY
* - pass false if you're an exttab
*
* bool load
* - pass true for inbound data (COPY FROM, SELECT FROM exttab)
* - pass false for outbound data (COPY TO, INSERT INTO exttab)
*/
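/*
 * For example (illustrative values only), the CSV defaults pass cleanly:
 *
 *   ValidateControlChars(true, true, true, ",", "", "\"", "\"",
 *                        NIL, NIL, false, false, NULL, 0);
 *
 * while delim = "," together with null_print = ",N" is rejected with
 * "delimiter must not appear in the NULL specification".
 */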
void ValidateControlChars(bool copy, bool load, bool csv_mode, char *delim,
char *null_print, char *quote, char *escape,
List *force_quote, List *force_notnull,
bool header_line, bool fill_missing, char *newline,
int num_columns)
{
bool delim_off = (pg_strcasecmp(delim, "off") == 0);
/*
* DELIMITER
*
* Only single-byte delimiter strings are supported. In addition, if the
* server encoding is a multibyte character encoding we only allow the
* delimiter to be an ASCII character (like PostgreSQL; for more info
* on this see the discussion and comments in MPP-3756).
*/
if (pg_database_encoding_max_length() == 1)
{
/* single byte encoding such as ASCII, the LATINx family and others */
if (strlen(delim) != 1 && !delim_off)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("delimiter must be a single byte character, or \'off\'")));
}
else
{
/* multi byte encoding such as utf8 */
if ((strlen(delim) != 1 || IS_HIGHBIT_SET(delim[0])) && !delim_off )
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("delimiter must be a single ASCII character, or \'off\'")));
}
if (strchr(delim, '\r') != NULL ||
strchr(delim, '\n') != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("delimiter cannot be newline or carriage return")));
if (strchr(null_print, '\r') != NULL ||
strchr(null_print, '\n') != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("null representation cannot use newline or carriage return")));
if (!csv_mode && strchr(delim, '\\') != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("delimiter cannot be backslash")));
if (strchr(null_print, delim[0]) != NULL && !delim_off)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("delimiter must not appear in the NULL specification")));
/*
* Disallow unsafe delimiter characters in non-CSV mode. We can't allow
* backslash because it would be ambiguous. We can't allow the other
* cases because data characters matching the delimiter must be
* backslashed, and certain backslash combinations are interpreted
* non-literally by COPY IN. Disallowing all lower case ASCII letters
* is more than strictly necessary, but seems best for consistency and
* future-proofing. Likewise we disallow all digits though only octal
* digits are actually dangerous.
*/
if (!csv_mode && !delim_off &&
strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789", delim[0]) != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("delimiter cannot be \"%s\"", delim)));
if (delim_off)
{
/*
* We don't support delimiter 'off' for COPY because the QD COPY
* sometimes internally adds columns to the data that it sends to
* the QE COPY modules, and it uses the delimiter for it. There
* are ways to work around this but for now it's not important and
* we simply don't support it.
*/
if (copy)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Using no delimiter is only supported for external tables")));
if (num_columns != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Using no delimiter is only possible for a single column table")));
}
/*
* HEADER
*/
if(header_line)
{
if(!copy && Gp_role == GP_ROLE_DISPATCH)
{
/* (exttab) */
if(load)
{
/* RET */
ereport(NOTICE,
(errmsg("HEADER means that each one of the data files "
"has a header row.")));
}
else
{
/* WET */
ereport(ERROR,
(errcode(ERRCODE_GP_FEATURE_NOT_YET),
errmsg("HEADER is not yet supported for writable external tables")));
}
}
}
/*
* QUOTE
*/
if (!csv_mode && quote != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("quote available only in CSV mode"),
errOmitLocation(true)));
if (csv_mode && strlen(quote) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("quote must be a single character"),
errOmitLocation(true)));
if (csv_mode && strchr(null_print, quote[0]) != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CSV quote character must not appear in the NULL specification")));
if (csv_mode && delim[0] == quote[0])
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("delimiter and quote must be different")));
/*
* ESCAPE
*/
if (csv_mode && strlen(escape) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("escape in CSV format must be a single character"),
errOmitLocation(true)));
if (!csv_mode &&
(strchr(escape, '\r') != NULL ||
strchr(escape, '\n') != NULL))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("escape representation in text format cannot use newline or carriage return")));
if (!csv_mode && strlen(escape) != 1)
{
if (pg_strcasecmp(escape, "off"))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("escape must be a single character, or [OFF/off] to disable escapes")));
}
/*
* FORCE QUOTE
*/
if (!csv_mode && force_quote != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("force quote available only in CSV mode")));
if (force_quote != NIL && load)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("force quote only available for data unloading, not loading")));
/*
* FORCE NOT NULL
*/
if (!csv_mode && force_notnull != NIL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("force not null available only in CSV mode")));
if (force_notnull != NIL && !load)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("force not null only available for data loading, not unloading")));
if (fill_missing && !load)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("fill missing fields only available for data loading, not unloading")));
/*
* NEWLINE
*/
if (newline)
{
if (!load)
{
ereport(ERROR,
(errcode(ERRCODE_GP_FEATURE_NOT_YET),
errmsg("newline currently available for data loading only, not unloading")));
}
else
{
if(pg_strcasecmp(newline, "lf") != 0 &&
pg_strcasecmp(newline, "cr") != 0 &&
pg_strcasecmp(newline, "crlf") != 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("invalid value for NEWLINE (%s)", newline),
errhint("valid options are: 'LF', 'CRLF', 'CR'")));
}
}
}
/*
* DoCopy executes the SQL COPY statement
*
* Either unload or reload contents of table <relation>, depending on <from>.
* (<from> = TRUE means we are inserting into the table.) In the "TO" case
* we also support copying the output of an arbitrary SELECT query.
*
* If <pipe> is false, transfer is between the table and the file named
* <filename>. Otherwise, transfer is between the table and our regular
* input/output stream. The latter could be either stdin/stdout or a
* socket, depending on whether we're running under Postmaster control.
*
* Iff <oids>, unload or reload the format that includes OID information.
* On input, we accept OIDs whether or not the table has an OID column,
* but silently drop them if it does not. On output, we report an error
* if the user asks for OIDs in a table that has none (not providing an
* OID column might seem friendlier, but could seriously confuse programs).
*
* If in the text format, delimit columns with delimiter <delim> and print
* NULL values as <null_print>.
*
* When loading in the text format from an input stream (as opposed to
* a file), recognize a "\." on a line by itself as EOF. Also recognize
* a stream EOF. When unloading in the text format to an output stream,
* write a "\." on a line by itself at the end of the data.
*
* Do not allow a Postgres user without superuser privilege to read from
* or write to a file.
*
* Do not allow the copy if user doesn't have proper permission to access
* the table.
*/
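/*
 * For example (illustrative statements), all of the following arrive here
 * as a CopyStmt with the options unpacked from stmt->options:
 *
 *   COPY sales FROM '/data/sales.txt' WITH DELIMITER E'\t' NULL E'\\N';
 *   COPY sales TO STDOUT WITH CSV HEADER;
 *   COPY sales FROM STDIN WITH CSV
 *        LOG ERRORS INTO sales_errs SEGMENT REJECT LIMIT 10 ROWS;
 */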
uint64
DoCopy(const CopyStmt *stmt, const char *queryString)
{
CopyState cstate;
bool is_from = stmt->is_from;
bool pipe = (stmt->filename == NULL || Gp_role == GP_ROLE_EXECUTE);
List *attnamelist = stmt->attlist;
List *force_quote = NIL;
List *force_notnull = NIL;
List *err_segnos = NIL;
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
AclResult aclresult;
ListCell *option;
TupleDesc tupDesc;
int num_phys_attrs;
uint64 processed;
bool qe_copy_from = (is_from && (Gp_role == GP_ROLE_EXECUTE));
/* save relationOid for auto-stats */
Oid relationOid = InvalidOid;
int savedSegNum = -1;
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
/* Extract options from the statement node tree */
foreach(option, stmt->options)
{
DefElem *defel = (DefElem *) lfirst(option);
if (strcmp(defel->defname, "binary") == 0)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("BINARY is not supported")));
}
else if (strcmp(defel->defname, "oids") == 0)
{
if (cstate->oids)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->oids = intVal(defel->arg);
}
else if (strcmp(defel->defname, "delimiter") == 0)
{
if (cstate->delim)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->delim = strVal(defel->arg);
}
else if (strcmp(defel->defname, "null") == 0)
{
if (cstate->null_print)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->null_print = strVal(defel->arg);
/*
* MPP-2010: unfortunately the serialization function doesn't
* distinguish between 0x0 and empty string. Therefore we
* must assume that if NULL AS was indicated and has no value
* the actual value is an empty string.
*/
if(!cstate->null_print)
cstate->null_print = "";
}
else if (strcmp(defel->defname, "csv") == 0)
{
if (cstate->csv_mode)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->csv_mode = intVal(defel->arg);
}
else if (strcmp(defel->defname, "header") == 0)
{
if (cstate->header_line)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->header_line = intVal(defel->arg);
}
else if (strcmp(defel->defname, "quote") == 0)
{
if (cstate->quote)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->quote = strVal(defel->arg);
}
else if (strcmp(defel->defname, "escape") == 0)
{
if (cstate->escape)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->escape = strVal(defel->arg);
}
else if (strcmp(defel->defname, "force_quote") == 0)
{
if (force_quote)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
force_quote = (List *) defel->arg;
}
else if (strcmp(defel->defname, "force_notnull") == 0)
{
if (force_notnull)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
force_notnull = (List *) defel->arg;
}
else if (strcmp(defel->defname, "fill_missing_fields") == 0)
{
if (cstate->fill_missing)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->fill_missing = intVal(defel->arg);
}
else if (strcmp(defel->defname, "newline") == 0)
{
if (cstate->eol_str)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
cstate->eol_str = strVal(defel->arg);
}
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
}
/* Set defaults */
cstate->err_loc_type = ROWNUM_ORIGINAL;
cstate->eol_type = EOL_UNKNOWN;
cstate->escape_off = false;
if (!cstate->delim)
cstate->delim = cstate->csv_mode ? "," : "\t";
if (!cstate->null_print)
cstate->null_print = cstate->csv_mode ? "" : "\\N";
if (cstate->csv_mode)
{
if (!cstate->quote)
cstate->quote = "\"";
if (!cstate->escape)
cstate->escape = cstate->quote;
}
if (!cstate->csv_mode && !cstate->escape)
cstate->escape = "\\"; /* default escape for text mode */
/*
* Error handling setup
*/
if(stmt->sreh)
{
/* Single row error handling requested */
SingleRowErrorDesc *sreh;
sreh = (SingleRowErrorDesc *)stmt->sreh;
if (!is_from)
ereport(ERROR,
(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
errmsg("COPY single row error handling only available using COPY FROM")));
if (sreh->errtable)
{
cstate->errMode = SREH_LOG;
}
else
{
cstate->errMode = SREH_IGNORE;
if (sreh->is_keep)
ereport(ERROR,
(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
errmsg("KEEP may only be specified with a LOG INTO errortable clause")));
}
cstate->cdbsreh = makeCdbSreh(sreh->is_keep,
sreh->reusing_existing_errtable,
sreh->rejectlimit,
sreh->is_limit_in_rows,
sreh->errtable,
NULL, /* assign a value later */
stmt->filename,
stmt->relation->relname);
/* if necessary warn the user of the risk of table getting dropped */
if(sreh->errtable && Gp_role == GP_ROLE_DISPATCH && !sreh->reusing_existing_errtable)
emitSameTxnWarning();
}
else
{
/* No single row error handling requested. Use "all or nothing" */
cstate->cdbsreh = NULL; /* default - no SREH */
cstate->errMode = ALL_OR_NOTHING; /* default */
}
/* We must be a QE if we received the partitioning config */
if (stmt->partitions)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
cstate->partitions = stmt->partitions;
}
/*
* Validate our control characters and their combination
*/
ValidateControlChars(true,
is_from,
cstate->csv_mode,
cstate->delim,
cstate->null_print,
cstate->quote,
cstate->escape,
force_quote,
force_notnull,
cstate->header_line,
cstate->fill_missing,
cstate->eol_str,
0 /* pass correct value when COPY supports no delim */);
if (!pg_strcasecmp(cstate->escape, "off"))
cstate->escape_off = true;
/* set end of line type if NEWLINE keyword was specified */
if (cstate->eol_str)
CopyEolStrToType(cstate);
/* Disallow file COPY except to superusers. */
if (!pipe && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to COPY to or from a file"),
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
cstate->copy_dest = COPY_FILE; /* default */
cstate->filename = (Gp_role == GP_ROLE_EXECUTE ? NULL : stmt->filename); /* QE COPY always uses STDIN */
cstate->copy_file = NULL;
cstate->fe_msgbuf = NULL;
cstate->fe_eof = false;
cstate->missing_bytes = 0;
if(!is_from)
{
if (pipe)
{
if (whereToSendOutput == DestRemote)
cstate->fe_copy = true;
else
cstate->copy_file = stdout;
}
else
{
mode_t oumask; /* Pre-existing umask value */
struct stat st;
/*
* Prevent write to relative path ... too easy to shoot oneself in the
* foot by overwriting a database file ...
*/
if (!is_absolute_path(cstate->filename))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("relative path not allowed for COPY to file")));
oumask = umask((mode_t) 022);
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
umask(oumask);
if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for writing: %m",
cstate->filename),
errOmitLocation(true)));
// Increase buffer size to improve performance (cmcdevitt)
setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes
fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
{
FreeFile(cstate->copy_file);
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
}
}
}
elog(DEBUG1,"DoCopy starting");
if (stmt->relation)
{
LOCKMODE lockmode = (is_from ? RowExclusiveLock : AccessShareLock);
Assert(!stmt->query);
cstate->queryDesc = NULL;
/* Open and lock the relation, using the appropriate lock type. */
cstate->rel = heap_openrv(stmt->relation, lockmode);
/* save relation oid for auto-stats call later */
relationOid = RelationGetRelid(cstate->rel);
/* Check relation permissions. */
aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel),
GetUserId(),
required_access);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS,
RelationGetRelationName(cstate->rel));
/* check read-only transaction */
if (XactReadOnly && is_from &&
!isTempNamespace(RelationGetNamespace(cstate->rel)))
ereport(ERROR,
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
errmsg("transaction is read-only")));
/* Don't allow COPY w/ OIDs to or from a table without them */
if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("table \"%s\" does not have OIDs",
RelationGetRelationName(cstate->rel))));
tupDesc = RelationGetDescr(cstate->rel);
}
else
{
Query *query = stmt->query;
List *rewritten;
PlannedStmt *plannedstmt;
DestReceiver *dest;
Assert(query);
Assert(!is_from);
cstate->rel = NULL;
/* Don't allow COPY w/ OIDs from a select */
if (cstate->oids)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY (SELECT) WITH OIDS is not supported"),
errOmitLocation(true)));
if (query->intoClause)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY (SELECT INTO) is not supported"),
errOmitLocation(true)));
/*
* The query has already been through parse analysis, but not
* rewriting or planning. Do that now.
*
* Because the planner is not cool about not scribbling on its input,
* we make a preliminary copy of the source querytree. This prevents
* problems in the case that the COPY is in a portal or plpgsql
* function and is executed repeatedly. (See also the same hack in
* EXPLAIN, DECLARE CURSOR and PREPARE.) XXX the planner really
* shouldn't modify its input ... FIXME someday.
*/
query = copyObject(query);
Assert(query->commandType == CMD_SELECT);
/*
* Must acquire locks in case we didn't come fresh from the parser.
* XXX this also scribbles on query, another reason for copyObject
*/
AcquireRewriteLocks(query);
/* Rewrite through rule system */
rewritten = QueryRewrite(query);
/* We don't expect more or less than one result query */
if (list_length(rewritten) != 1)
elog(ERROR, "unexpected rewrite result");
query = (Query *) linitial(rewritten);
Assert(query->commandType == CMD_SELECT);
/* plan the query */
plannedstmt = planner(query, 0, NULL, QRL_ONCE);
/*
* Update snapshot command ID to ensure this query sees results of any
* previously executed queries. (It's a bit cheesy to modify
* ActiveSnapshot without making a copy, but for the limited ways in
* which COPY can be invoked, I think it's OK, because the active
* snapshot shouldn't be shared with anything else anyway.)
*/
ActiveSnapshot->curcid = GetCurrentCommandId();
/* Create dest receiver for COPY OUT */
dest = CreateDestReceiver(DestCopyOut, NULL);
((DR_copy *) dest)->cstate = cstate;
/* Create a QueryDesc requesting no output */
cstate->queryDesc = CreateQueryDesc(plannedstmt, queryString,
ActiveSnapshot, InvalidSnapshot,
dest, NULL, false);
/*
* Call ExecutorStart to prepare the plan for execution.
*
* ExecutorStart computes a result tupdesc for us
*/
ExecutorStart(cstate->queryDesc, 0);
tupDesc = cstate->queryDesc->tupDesc;
}
cstate->attnamelist = attnamelist;
/* Generate or convert list of attributes to process */
cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
num_phys_attrs = tupDesc->natts;
/* Convert FORCE QUOTE name list to per-column flags, check validity */
cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (force_quote)
{
List *attnums;
ListCell *cur;
attnums = CopyGetAttnums(tupDesc, cstate->rel, force_quote);
foreach(cur, attnums)
{
int attnum = lfirst_int(cur);
if (!list_member_int(cstate->attnumlist, attnum))
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
NameStr(tupDesc->attrs[attnum - 1]->attname))));
cstate->force_quote_flags[attnum - 1] = true;
}
}
/* Convert FORCE NOT NULL name list to per-column flags, check validity */
cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (force_notnull)
{
List *attnums;
ListCell *cur;
attnums = CopyGetAttnums(tupDesc, cstate->rel, force_notnull);
foreach(cur, attnums)
{
int attnum = lfirst_int(cur);
if (!list_member_int(cstate->attnumlist, attnum))
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
NameStr(tupDesc->attrs[attnum - 1]->attname))));
cstate->force_notnull_flags[attnum - 1] = true;
}
/* keep the raw version too, we will need it later */
cstate->force_notnull = force_notnull;
}
/*
* Set up variables to avoid per-attribute overhead.
*/
initStringInfo(&cstate->attribute_buf);
initStringInfo(&cstate->line_buf);
cstate->processed = 0;
/*
* Set up encoding conversion info. Even if the client and server
* encodings are the same, we must apply pg_client_to_server() to
* validate data in multibyte encodings. However, transcoding must
* be skipped for COPY FROM in executor mode since data already arrived
* in server encoding (was validated and transcoded by dispatcher mode
* COPY). For this same reason encoding_embeds_ascii can never be true
* for COPY FROM in executor mode.
*/
cstate->client_encoding = pg_get_client_encoding();
cstate->need_transcoding =
((cstate->client_encoding != GetDatabaseEncoding() ||
pg_database_encoding_max_length() > 1) && !qe_copy_from);
cstate->encoding_embeds_ascii = (qe_copy_from ? false : PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding));
cstate->line_buf_converted = (Gp_role == GP_ROLE_EXECUTE ? true : false);
setEncodingConversionProc(cstate, pg_get_client_encoding(), !is_from);
/*
* some greenplum db specific vars
*/
cstate->is_copy_in = (is_from ? true : false);
if (is_from)
{
cstate->error_on_executor = false;
initStringInfo(&(cstate->executor_err_context));
}
if (is_from) /* copy from file to database */
{
bool pipe = (cstate->filename == NULL);
bool shouldDispatch = (Gp_role == GP_ROLE_DISPATCH &&
cstate->rel->rd_cdbpolicy != NULL);
char relkind;
Assert(cstate->rel);
relkind = cstate->rel->rd_rel->relkind;
if (relkind != RELKIND_RELATION)
{
if (relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to view \"%s\"",
RelationGetRelationName(cstate->rel))));
else if (relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to sequence \"%s\"",
RelationGetRelationName(cstate->rel))));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to non-table relation \"%s\"",
RelationGetRelationName(cstate->rel))));
}
if(stmt->sreh && Gp_role != GP_ROLE_EXECUTE && !cstate->rel->rd_cdbpolicy)
ereport(ERROR,
(errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED),
errmsg("COPY single row error handling only available for distributed user tables")));
if (pipe)
{
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
else
cstate->copy_file = stdin;
}
else
{
struct stat st;
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m",
cstate->filename),
errOmitLocation(true)));
// Increase buffer size to improve performance (cmcdevitt)
setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes
fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
{
FreeFile(cstate->copy_file);
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
}
}
/*
* Append Only Tables.
*
* If QD, build a list of all the relations (relids) that may get data
* inserted into them as a part of this operation. This includes
* the relation specified in the COPY command, plus any partitions
* that it may have. Then, call assignPerRelSegno to assign a segfile
* number to insert into each of the Append Only relations that exists
* in this global list. We generate the list now and save it in cstate.
*
* If QE - get the QD generated list from CopyStmt and each relation can
* find its assigned segno by looking at it (during CopyFrom).
*
* Utility mode always builds a single mapping.
*/
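/*
 * Conceptually, the resulting cstate->ao_segnos is a list of SegfileMapNode
 * entries, one per affected relation, e.g. (illustrative):
 *
 *   (SegfileMapNode) { relid = <parent oid>,         segnos = (1) }
 *   (SegfileMapNode) { relid = <leaf partition oid>, segnos = (1) }
 *
 * so that each relation can later look up the segfile number(s) assigned
 * to it for this load.
 */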
if(shouldDispatch)
{
Oid relid = RelationGetRelid(cstate->rel);
List *all_relids = NIL;
GpPolicy *target_policy = NULL;
int min_target_segment_num = 0;
int max_target_segment_num = 0;
QueryResource *savedResource = NULL;
target_policy = GpPolicyFetch(CurrentMemoryContext, relid);
Assert(target_policy);
/*
* For a hash-distributed table we use the table's bucket number to request vsegs.
* For a randomly distributed table we use a fixed GUC value to request vsegs.
*/
if(target_policy->nattrs > 0){
min_target_segment_num = target_policy->bucketnum;
max_target_segment_num = target_policy->bucketnum;
}
else{
min_target_segment_num = 1;
max_target_segment_num = hawq_rm_nvseg_for_copy_from_perquery;
}
pfree(target_policy);
cstate->resource = AllocateResource(QRL_ONCE, 1, 1, max_target_segment_num, min_target_segment_num,NULL,0);
savedResource = GetActiveQueryResource();
SetActiveQueryResource(cstate->resource);
all_relids = lappend_oid(all_relids, relid);
if (rel_is_partitioned(relid))
{
PartitionNode *pn = RelationBuildPartitionDesc(cstate->rel, false);
all_relids = list_concat(all_relids, all_partition_relids(pn));
}
cstate->ao_segnos = assignPerRelSegno(all_relids, list_length(cstate->resource->segments));
ListCell *cell;
foreach(cell, all_relids)
{
Oid relid = lfirst_oid(cell);
CreateAppendOnlyParquetSegFileOnMaster(relid, cstate->ao_segnos);
}
/* allocate segno for error table */
if (stmt->sreh && cstate->cdbsreh->errtbl)
{
Oid relid = RelationGetRelid(cstate->cdbsreh->errtbl);
Assert(!rel_is_partitioned(relid));
err_segnos = SetSegnoForWrite(NIL, relid, list_length(cstate->resource->segments), false, true, true);
if (Gp_role == GP_ROLE_DISPATCH)
CreateAppendOnlyParquetSegFileForRelationOnMaster(
cstate->cdbsreh->errtbl,
err_segnos);
}
SetActiveQueryResource(savedResource);
}
else
{
if (cstate->cdbsreh)
{
int segno = 0;
ResultRelSegFileInfo *segfileinfo = NULL;
if (stmt->err_aosegnos != NIL)
{
segno = list_nth_int(stmt->err_aosegnos, GetQEIndex());
segfileinfo = (ResultRelSegFileInfo *)list_nth(stmt->err_aosegfileinfos, GetQEIndex());
}
cstate->cdbsreh->err_aosegno = segno;
cstate->cdbsreh->err_aosegfileinfo = segfileinfo;
}
if (stmt->ao_segnos)
{
/* We must be a QE if we received the aosegnos config */
Assert(Gp_role == GP_ROLE_EXECUTE);
cstate->ao_segnos = stmt->ao_segnos;
cstate->ao_segfileinfos = stmt->ao_segfileinfos;
}
else
{
/*
* utility mode (or dispatch mode for a table with no policy).
* create a one-entry map for our one and only relation
*/
if (RelationIsAoRows(cstate->rel) || RelationIsParquet(cstate->rel))
{
SegfileMapNode *n = makeNode(SegfileMapNode);
n->relid = RelationGetRelid(cstate->rel);
n->segnos = SetSegnoForWrite(NIL, n->relid, 1, false, true, true);
cstate->ao_segnos = lappend(cstate->ao_segnos, n);
}
}
}
/*
* Set up is done. Get to work!
*/
if (shouldDispatch)
{
/* data needs to get dispatched to segment databases */
CopyFromDispatch(cstate, err_segnos);
}
else
{
/* data needs to get inserted locally */
CopyFrom(cstate);
}
if (!pipe)
{
if (FreeFile(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to file \"%s\": %m",
cstate->filename)));
}
}
else /* copy from database to file */
{
if (stmt->scantable_splits)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
if (RelationIsAoRows(cstate->rel) || RelationIsParquet(cstate->rel))
{
cstate->splits = stmt->scantable_splits;
}
else
{
cstate->splits = NIL;
}
}
cstate->resource = NULL;
DoCopyTo(cstate);
}
/*
* Close the relation or query. If reading, we can release the
* AccessShareLock we got; if writing, we should hold the lock until end
* of transaction to ensure that updates will be committed before lock is
* released.
*/
if (cstate->rel)
heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
if (cstate->queryDesc)
{
/* Close down the query and free resources. */
if (Gp_role == GP_ROLE_DISPATCH && cstate->queryDesc->resource != NULL)
{
savedSegNum = list_length(cstate->queryDesc->resource->segments);
}
ExecutorEnd(cstate->queryDesc);
FreeQueryDesc(cstate->queryDesc);
}
/* Clean up single row error handling related memory */
if(cstate->cdbsreh)
destroyCdbSreh(cstate->cdbsreh);
/**
* Query resource allocation for COPY probably contains four parts:
* 1) query resource for COPY itself
* 2) query resource for AO/Parquet segment file on HDFS
* 3) query resource for ANALYZE
* 4) query resource for several SPI for ANALYZE
* Query resource in 2 inherits from 1, and that in 4 inherits from 3.
*
* To prevent a query resource "deadlock" in which many concurrent
* COPY transactions hold the query resources of 1 and 2 and thus
* leave other COPY transactions waiting for the query resources
* of 3 and 4, we return the query resources of 1 and 2 as
* soon as possible.
*/
if (cstate->resource)
{
FreeResource(cstate->resource);
cstate->resource = NULL;
}
/* Clean up storage (probably not really necessary) */
processed = cstate->processed;
/* MPP-4407. Logging number of tuples copied */
if (Gp_role == GP_ROLE_DISPATCH
&& is_from
&& relationOid != InvalidOid
&& GetCommandLogLevel((Node *) stmt) <= log_statement)
{
elog(DEBUG1, "type_of_statement = %s dboid = %d tableoid = %d num_tuples_modified = %u",
autostats_cmdtype_to_string(AUTOSTATS_CMDTYPE_COPY),
MyDatabaseId,
relationOid,
(unsigned int) processed);
}
/* Fix for MPP-4082. Issue automatic ANALYZE if conditions are satisfied. */
if (Gp_role == GP_ROLE_DISPATCH && is_from)
{
auto_stats(AUTOSTATS_CMDTYPE_COPY, relationOid, processed, false /* inFunction */, savedSegNum);
} /*end auto-stats block*/
if(cstate->force_quote_flags)
pfree(cstate->force_quote_flags);
if(cstate->force_notnull_flags)
pfree(cstate->force_notnull_flags);
pfree(cstate->attribute_buf.data);
pfree(cstate->line_buf.data);
pfree(cstate);
return processed;
}
/*
* Calculate the virtual segment number for a COPY statement.
* If any hash-distributed relation exists, use the max bucket number.
* If all relations are randomly distributed, use the data size to determine the vseg number.
*/
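/*
 * Worked example (illustrative numbers): copying from three randomly
 * distributed relations totalling ~300MB gives 300MB >> 27 = 2, hence
 * 2 + 1 = 3 virtual segments, subject to the GetQueryVsegNum() and
 * maxSegno caps applied below. If any hash-distributed relation is
 * involved, its bucket number is used instead.
 */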
static int calculate_virtual_segment_number(List* candidateOids) {
ListCell* le1;
int vsegNumber = 1;
int64 totalDataSize = 0;
bool isHashRelationExist = false;
int maxHashBucketNumber = 0;
int maxSegno = 0;
foreach (le1, candidateOids)
{
Oid candidateOid = InvalidOid;
candidateOid = lfirst_oid(le1);
//Relation rel = (Relation)lfirst(le1);
Relation rel = relation_open(candidateOid, AccessShareLock);
if (candidateOid > 0 ) {
GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext,
candidateOid);
if(targetPolicy == NULL){
return GetQueryVsegNum();
}
if (targetPolicy->nattrs > 0) {
isHashRelationExist = true;
if(maxHashBucketNumber < targetPolicy->bucketnum){
maxHashBucketNumber = targetPolicy->bucketnum;
}
}
/*
* if no hash relation, we calculate the data size of all the relations.
*/
if (!isHashRelationExist) {
totalDataSize += calculate_relation_size(rel);
}
// track the max number of segment files across the relations.
if(RelationIsAoRows(rel))
{
FileSegTotals *fstotal = GetSegFilesTotals(rel, SnapshotNow);
if(fstotal){
if(maxSegno < fstotal->totalfilesegs){
maxSegno = fstotal->totalfilesegs;
}
pfree(fstotal);
}
}
else if(RelationIsParquet(rel))
{
ParquetFileSegTotals *fstotal = GetParquetSegFilesTotals(rel, SnapshotNow);
if(fstotal){
if (maxSegno < fstotal->totalfilesegs) {
maxSegno = fstotal->totalfilesegs;
}
pfree(fstotal);
}
}
}
relation_close(rel, AccessShareLock);
}
if (isHashRelationExist) {
vsegNumber = maxHashBucketNumber;
} else {
/* we allocate one virtual segment for every 128MB of data */
totalDataSize >>= 27;
vsegNumber = totalDataSize + 1;
}
Assert(vsegNumber > 0);
/* vsegNumber should not exceed GetQueryVsegNum() */
if(vsegNumber > GetQueryVsegNum()){
vsegNumber = GetQueryVsegNum();
}
// if vsegNumber is bigger than maxSegno, the extra QEs would be idle, so cap it
if(vsegNumber > maxSegno && maxSegno > 0){
vsegNumber = maxSegno;
}
return vsegNumber;
}
/*
* This intermediate routine exists mainly to localize the effects of setjmp
* so we don't need to plaster a lot of variables with "volatile".
*/
static void
DoCopyTo(CopyState cstate)
{
bool pipe = (cstate->filename == NULL);
if (cstate->rel)
{
char relkind = cstate->rel->rd_rel->relkind;
if (relkind != RELKIND_RELATION)
{
if (relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from view \"%s\"",
RelationGetRelationName(cstate->rel)),
errhint("Try the COPY (SELECT ...) TO variant."),
errOmitLocation(true)));
else if (relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from sequence \"%s\"",
RelationGetRelationName(cstate->rel)),
errOmitLocation(true)));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from non-table relation \"%s\"",
RelationGetRelationName(cstate->rel)),
errOmitLocation(true)));
}
else if (RelationIsExternal(cstate->rel))
{
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from external relation \"%s\"",
RelationGetRelationName(cstate->rel)),
errhint("Try the COPY (SELECT ...) TO variant."),
errOmitLocation(true)));
}
}
PG_TRY();
{
if (cstate->fe_copy)
SendCopyBegin(cstate);
/*
* We want to dispatch COPY TO commands only in the case that
* we are the dispatcher and we are copying from a user relation
* (a relation where data is distributed in the segment databases).
* Otherwise, if we are not the dispatcher *or* if we are
* doing COPY (SELECT) we just go straight to work, without
* dispatching COPY commands to executors.
*/
if (Gp_role == GP_ROLE_DISPATCH && cstate->rel && cstate->rel->rd_cdbpolicy)
{
int target_segment_num = 0;
/*
* copying a hash-distributed table uses the table's bucket number;
* copying a randomly distributed table uses the table size.
*/
PartitionNode *pn = get_parts(cstate->rel->rd_id, 0 /*level*/ ,
0 /*parent*/, false /* inctemplate */, CurrentMemoryContext, true /*includesubparts*/);
List *lFullRelOids = NIL;
if(pn){
lFullRelOids = all_leaf_partition_relids(pn);
lFullRelOids = list_concat(lFullRelOids, all_interior_partition_relids(pn)); /* interior partitions */
}
lFullRelOids = lappend_oid(lFullRelOids, cstate->rel->rd_id);
target_segment_num = calculate_virtual_segment_number(lFullRelOids);
elog(LOG, "virtual segment number of copy to is: %d\n", target_segment_num);
cstate->resource = AllocateResource(QRL_ONCE, 1, 1, target_segment_num, target_segment_num,NULL,0);
CopyToDispatch(cstate);
}
else
CopyTo(cstate);
if (cstate->fe_copy)
SendCopyEnd(cstate);
}
PG_CATCH();
{
/*
* Make sure we turn off old-style COPY OUT mode upon error. It is
* okay to do this in all cases, since it does nothing if the mode is
* not on.
*/
pq_endcopyout(true);
PG_RE_THROW();
}
PG_END_TRY();
if (!pipe)
{
if (FreeFile(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to file \"%s\": %m",
cstate->filename)));
}
}
/*
* CopyToCreateDispatchCommand
*
* Create the COPY command that will get dispatched to the QE's.
*/
static void CopyToCreateDispatchCommand(CopyState cstate,
StringInfo cdbcopy_cmd,
AttrNumber num_phys_attrs,
Form_pg_attribute *attr)
{
ListCell *cur;
bool is_first_col = true;
/* append schema and tablename */
appendStringInfo(cdbcopy_cmd, "COPY %s.%s",
quote_identifier(get_namespace_name(RelationGetNamespace(cstate->rel))),
quote_identifier(RelationGetRelationName(cstate->rel)));
/*
* append column list. NOTE: if not specified originally, attnumlist will
* include all non-dropped columns of the table by default
*/
if(num_phys_attrs > 0) /* don't append anything for zero column table */
{
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
/* We don't add dropped attributes */
if (attr[m]->attisdropped)
continue;
/* append column string. quote it if needed */
appendStringInfo(cdbcopy_cmd, (is_first_col ? "(%s" : ",%s"),
quote_identifier(NameStr(attr[m]->attname)));
is_first_col = false;
}
if (!is_first_col)
appendStringInfo(cdbcopy_cmd, ")");
}
appendStringInfo(cdbcopy_cmd, " TO STDOUT WITH");
if (cstate->oids)
appendStringInfo(cdbcopy_cmd, " OIDS");
appendStringInfo(cdbcopy_cmd, " DELIMITER AS E'%s'", cstate->delim);
appendStringInfo(cdbcopy_cmd, " NULL AS E'%s'", escape_quotes(cstate->null_print));
/* if default escape in text format ("\") leave expression out */
if (!cstate->csv_mode && strcmp(cstate->escape, "\\") != 0)
appendStringInfo(cdbcopy_cmd, " ESCAPE AS E'%s'", cstate->escape);
if (cstate->csv_mode)
{
appendStringInfo(cdbcopy_cmd, " CSV");
appendStringInfo(cdbcopy_cmd, " QUOTE AS E'%s'", escape_quotes(cstate->quote));
appendStringInfo(cdbcopy_cmd, " ESCAPE AS E'%s'", escape_quotes(cstate->escape));
/* do NOT include HEADER. Header row is created by dispatcher COPY */
}
}
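/*
 * For a CSV copy of a two-column table the command built above looks
 * roughly like (illustrative):
 *
 *   COPY public.sales(id,amount) TO STDOUT WITH DELIMITER AS E','
 *        NULL AS E'' CSV QUOTE AS E'"' ESCAPE AS E'"'
 */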
/*
* Copy from relation TO file. Starts a COPY TO command on each of
* the executors and gathers all the results and writes it out.
*/
void
CopyToDispatch(CopyState cstate)
{
TupleDesc tupDesc;
int num_phys_attrs;
int attr_count;
Form_pg_attribute *attr;
CdbCopy *cdbCopy;
StringInfoData cdbcopy_err;
StringInfoData cdbcopy_cmd;
tupDesc = cstate->rel->rd_att;
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
/*
* prepare to get COPY data from segDBs:
* 1 - re-construct the original COPY command sent from the client.
* 2 - execute a BEGIN DTM transaction.
* 3 - send the COPY command to all segment databases.
*/
cdbCopy = makeCdbCopy(false, cstate->resource);
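/*
* Attach the table's partition hierarchy to the CdbCopy state so the
* dispatched COPY TO covers every child partition as well.
*/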
cdbCopy->partitions = RelationBuildPartitionDesc(cstate->rel, false);
/* XXX: lock all partitions */
/* allocate memory for error and copy strings */
initStringInfo(&cdbcopy_err);
initStringInfo(&cdbcopy_cmd);
/* create the command to send to QE's and store it in cdbcopy_cmd */
CopyToCreateDispatchCommand(cstate,
&cdbcopy_cmd,
num_phys_attrs,
attr);
/*
* Start a COPY command in every db of every segment in Greenplum Database.
*
* From this point in the code we need to be extra careful
* about error handling. ereport() must not be called until
* the COPY command sessions are closed on the executors.
* Calling ereport() will leave the executors hanging in
* COPY state.
*/
elog(DEBUG5, "COPY command sent to segdbs: %s", cdbcopy_cmd.data);
PG_TRY();
{
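/*
* Start the COPY TO session on every QE. There is no error table or AO
* segno list to pass for COPY OUT, hence InvalidOid/NIL (reading of the
* arguments at this call site; not verified against cdbCopyStart itself).
*/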
cdbCopyStart(cdbCopy, cdbcopy_cmd.data, RelationGetRelid(cstate->rel), InvalidOid, NIL);
}
PG_CATCH();
{
/* get error message from CopyStart */
appendBinaryStringInfo(&cdbcopy_err, cdbCopy->err_msg.data, cdbCopy->err_msg.len);
/* TODO: end COPY in all the segdbs in progress */
cdbCopyEnd(cdbCopy);
ereport(LOG,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("%s", cdbcopy_err.data)));
PG_RE_THROW();
}
PG_END_TRY();
PG_TRY();
{
/* if a header has been requested send the line */
if (cstate->header_line)
{
ListCell *cur;
bool hdr_delim = false;
/*
* For non-binary copy, we need to convert null_print to client
* encoding, because it will be sent directly with CopySendString.
*
* MPP: in here we only care about this if we need to print the
* header. We rely on the segdb server copy out to do the conversion
* before sending the data rows out. We don't need to repeat it here
*/
if (cstate->need_transcoding)
cstate->null_print = (char *)
pg_server_to_custom(cstate->null_print,
strlen(cstate->null_print),
cstate->client_encoding,
cstate->enc_conversion_proc);
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
char *colname;
if (hdr_delim)
CopySendChar(cstate, cstate->delim[0]);
hdr_delim = true;
colname = NameStr(attr[attnum - 1]->attname);
CopyAttributeOutCSV(cstate, colname, false,
list_length(cstate->attnumlist) == 1);
}
/* add a newline and flush the data */
CopySendEndOfRow(cstate);
}
/*
* This is the main work-loop. In here we keep collecting data from the
* COPY commands on the segdbs, until no more data is available. We
* keep writing data out a chunk at a time.
*/
while(true)
{
bool done;
bool copy_cancel = (QueryCancelPending ? true : false);
/* get a chunk of data rows from the QE's */
done = cdbCopyGetData(cdbCopy, copy_cancel, &cstate->processed);
/* send the chunk of data rows to destination (file or stdout) */
if(cdbCopy->copy_out_buf.len > 0) /* conditional is important! */
{
/*
* in the dispatcher we receive chunks of whole rows with row endings.
* We don't want to use CopySendEndOfRow() b/c it adds row endings and
* also b/c it's intended for a single row at a time. Therefore we need
* to fill in the out buffer and just flush it instead.
*/
CopySendData(cstate, (void *) cdbCopy->copy_out_buf.data, cdbCopy->copy_out_buf.len);
CopyToDispatchFlush(cstate);
}
if(done)
{
if(cdbCopy->remote_data_err || cdbCopy->io_errors)
appendBinaryStringInfo(&cdbcopy_err, cdbCopy->err_msg.data, cdbCopy->err_msg.len);
break;
}
}
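/* all data has been drained from the QEs; release the dispatch results */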
dispatch_free_result(&cdbCopy->executors);
}
PG_CATCH();
{
dispatch_free_result(&cdbCopy->executors);
PG_RE_THROW();
}
PG_END_TRY();
/* we can throw the error now if QueryCancelPending was set previously */
CHECK_FOR_INTERRUPTS();
/*
* report all accumulated errors back to the client.
*/
if (cdbCopy->remote_data_err)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("%s", cdbcopy_err.data)));
if (cdbCopy->io_errors)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("%s", cdbcopy_err.data)));
pfree(cdbcopy_cmd.data);
pfree(cdbcopy_err.data);
pfree(cdbCopy);
}
/*
* Copy from relation or query TO file.
*/
static void
CopyTo(CopyState cstate)
{
TupleDesc tupDesc;
int num_phys_attrs;
Form_pg_attribute *attr;
ListCell *cur;
List *target_rels = NIL;
ListCell *lc;
if (cstate->rel)
{
if (cstate->partitions)
{
ListCell *lc;
List *relids = all_partition_relids(cstate->partitions);
foreach(lc, relids)
{
Oid relid = lfirst_oid(lc);
Relation rel = heap_open(relid, AccessShareLock);
target_rels = lappend(target_rels, rel);
}
}
else
target_rels = lappend(target_rels, cstate->rel);
tupDesc = RelationGetDescr(cstate->rel);
}
else
tupDesc = cstate->queryDesc->tupDesc;
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
cstate->null_print_client = cstate->null_print; /* default */
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
cstate->out_functions =
(FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
/* Get info about the columns we need to process. */
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
Oid out_func_oid;
bool isvarlena;
getTypeOutputInfo(attr[attnum - 1]->atttypid,
&out_func_oid,
&isvarlena);
fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
}
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
* datatype output routines, and should be faster than retail pfree's
* anyway. (We don't need a whole econtext as CopyFrom does.)
*/
cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
"COPY TO",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/*
* we need to convert null_print to client
* encoding, because it will be sent directly with CopySendString.
*/
if (cstate->need_transcoding)
cstate->null_print_client = pg_server_to_custom(cstate->null_print,
cstate->null_print_len,
cstate->client_encoding,
cstate->enc_conversion_proc);
/* if a header has been requested send the line */
if (cstate->header_line)
{
/* header should not be printed in execute mode. */
if (Gp_role != GP_ROLE_EXECUTE)
{
bool hdr_delim = false;
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
char *colname;
if (hdr_delim)
CopySendChar(cstate, cstate->delim[0]);
hdr_delim = true;
colname = NameStr(attr[attnum - 1]->attname);
CopyAttributeOutCSV(cstate, colname, false,
list_length(cstate->attnumlist) == 1);
}
CopySendEndOfRow(cstate);
}
}
if (cstate->rel)
{
foreach(lc, target_rels)
{
Relation rel = lfirst(lc);
Datum *values;
bool *nulls;
HeapScanDesc scandesc = NULL; /* used if heap table */
AppendOnlyScanDesc aoscandesc = NULL; /* append only table */
tupDesc = RelationGetDescr(rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
/*
* We need to update attnumlist because different partition
* entries might have dropped columns.
*/
cstate->attnumlist =
CopyGetAttnums(tupDesc, rel, cstate->attnamelist);
pfree(cstate->out_functions);
cstate->out_functions =
(FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
/* Get info about the columns we need to process. */
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
Oid out_func_oid;
bool isvarlena;
getTypeOutputInfo(attr[attnum - 1]->atttypid,
&out_func_oid,
&isvarlena);
fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
}
values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
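/*
* Scan and emit the rows. The scan method depends on the storage type of
* this relation (or partition child): plain heap, append-only row, or
* Parquet. Column-oriented (Parquet) tables cannot carry OIDs.
*/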
if (RelationIsHeap(rel))
{
HeapTuple tuple;
scandesc = heap_beginscan(rel, ActiveSnapshot, 0, NULL);
while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
{
CHECK_FOR_INTERRUPTS();
/* Deconstruct the tuple ... faster than repeated heap_getattr */
heap_deform_tuple(tuple, tupDesc, values, nulls);
/* Format and send the data */
CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
}
heap_endscan(scandesc);
}
else if(RelationIsAoRows(rel))
{
MemTuple tuple;
TupleTableSlot *slot = MakeSingleTupleTableSlot(tupDesc);
MemTupleBinding *mt_bind = create_memtuple_binding(tupDesc);
aoscandesc = appendonly_beginscan(rel, ActiveSnapshot, 0, NULL);
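/* restrict the append-only scan to the file splits assigned to this QE */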
aoscandesc->splits = GetFileSplitsOfSegment(cstate->splits,rel->rd_id, GetQEIndex());
while ((tuple = appendonly_getnext(aoscandesc, ForwardScanDirection, slot)) != NULL)
{
CHECK_FOR_INTERRUPTS();
/* Extract all the values of the tuple */
slot_getallattrs(slot);
values = slot_get_values(slot);
nulls = slot_get_isnull(slot);
/* Format and send the data */
CopyOneRowTo(cstate, MemTupleGetOid(tuple, mt_bind), values, nulls);
}
ExecDropSingleTupleTableSlot(slot);
appendonly_endscan(aoscandesc);
}
else if(RelationIsParquet(rel))
{
ParquetScanDesc scan = NULL;
TupleTableSlot *slot = MakeSingleTupleTableSlot(tupDesc);
bool *proj = NULL;
int nvp = tupDesc->natts;
int i;
if (tupDesc->tdhasoid)
{
elog(ERROR, "OIDS=TRUE is not allowed on tables that use column-oriented storage. Use OIDS=FALSE");
}
proj = palloc(sizeof(bool) * nvp);
for(i=0; i<nvp; ++i)
proj[i] = true;
scan = parquet_beginscan(rel, ActiveSnapshot, 0, proj);
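/* likewise, restrict the Parquet scan to this QE's file splits */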
scan->splits = GetFileSplitsOfSegment(cstate->splits, rel->rd_id, GetQEIndex());
for(;;)
{
parquet_getnext(scan, ForwardScanDirection, slot);
if (TupIsNull(slot))
break;
/* Extract all the values of the tuple */
slot_getallattrs(slot);
values = slot_get_values(slot);
nulls = slot_get_isnull(slot);
/* Format and send the data */
CopyOneRowTo(cstate, InvalidOid, values, nulls);
}
ExecDropSingleTupleTableSlot(slot);
parquet_endscan(scan);
}
else
{
/* should never get here */
Assert(false);
}
/* partition table, so close */
if (cstate->partitions)
heap_close(rel, NoLock);
}
}
else
{
Assert(Gp_role != GP_ROLE_EXECUTE);
/* run the plan --- the dest receiver will send tuples */
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
}
MemoryContextDelete(cstate->rowcontext);
}
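/*
* CopyOneCustomRowTo
*
* Append an already-formatted row (a bytea, e.g. produced by a custom
* formatter) verbatim to the per-row output buffer.
*/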
void
CopyOneCustomRowTo(CopyState cstate, bytea *value)
{
appendBinaryStringInfo(cstate->fe_msgbuf,
VARDATA_ANY((void *) value),
VARSIZE_ANY_EXHDR((void *) value));
}
/*
* Emit one row during CopyTo().
*/
void
CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *isnulls)
{
bool need_delim = false;
FmgrInfo *out_functions = cstate->out_functions;
MemoryContext oldcontext;
ListCell *cur;
char *string;
MemoryContextReset(cstate->rowcontext);
oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
/* Text format has no per-tuple header, but send OID if wanted */
/* Assume digits don't need any quoting or encoding conversion */
if (cstate->oids)
{
string = DatumGetCString(DirectFunctionCall1(oidout,
ObjectIdGetDatum(tupleOid)));
CopySendString(cstate, string);
need_delim = true;
}
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
bool isnull = isnulls[attnum-1];
Datum value = values[attnum-1];
if (need_delim)
CopySendChar(cstate, cstate->delim[0]);
need_delim = true;
if (isnull)
{
CopySendString(cstate, cstate->null_print_client);
}
else
{
/* int2out or int4out ? */
if (out_functions[attnum -1].fn_oid == 39 || /* int2out or int4out */
out_functions[attnum -1].fn_oid == 43 )
{
char tmp[INT32_CHAR_SIZE];
/*
* The standard postgres way is to call the output function, but that involves one or more pallocs,
* and a call to sprintf, followed by a conversion to client charset.
* Do a fast conversion to string instead.
*/
if (out_functions[attnum -1].fn_oid == 39)
pg_itoa(DatumGetInt16(value),tmp);
else
pg_ltoa(DatumGetInt32(value),tmp);
/*
* integers shouldn't need quoting, and shouldn't need transcoding to client char set
*/
CopySendData(cstate, tmp, strlen(tmp));
}
else if (out_functions[attnum -1].fn_oid == 1702) /* numeric_out */
{
string = OutputFunctionCall(&out_functions[attnum - 1],
value);
/*
* numeric shouldn't need quoting, and shouldn't need transcoding to client char set
*/
CopySendData(cstate, string, strlen(string));
}
else
{
string = OutputFunctionCall(&out_functions[attnum - 1],
value);
if (cstate->csv_mode)
CopyAttributeOutCSV(cstate, string,
cstate->force_quote_flags[attnum - 1],
list_length(cstate->attnumlist) == 1);
else
CopyAttributeOutText(cstate, string);
}
}
}
/*
* Finish off the row: write it to the destination, and update the count.
* However, if we're in the context of a writable external table, we let
* the caller do it - send the data to its local external source (see
* external_insert() ).
*/
if(cstate->copy_dest != COPY_EXTERNAL_SOURCE)
{
CopySendEndOfRow(cstate);
cstate->processed++;
}
MemoryContextSwitchTo(oldcontext);
}
/*
* CopyFromCreateDispatchCommand
*
* The COPY command that needs to get dispatched to the QE's isn't necessarily
* the same command that arrived from the parser to the QD. For example, we
* always change filename to STDIN, we may pre-evaluate constant values or
* functions on the QD and send them to the QE with an extended column list.
*/
static void CopyFromCreateDispatchCommand(CopyState cstate,
StringInfo cdbcopy_cmd,
GpPolicy *policy,
AttrNumber num_phys_attrs,
AttrNumber num_defaults,
AttrNumber p_nattrs,
AttrNumber h_attnum,
int *defmap,
ExprState **defexprs,
Form_pg_attribute *attr)
{
ListCell *cur;
bool is_first_col;
int i,
p_index = 0;
AttrNumber extra_attr_count = 0; /* count of extra attributes we add in the dispatcher COPY,
usually non-constant defaults we pre-evaluate here */
Assert(Gp_role == GP_ROLE_DISPATCH);
/* append schema and tablename */
appendStringInfo(cdbcopy_cmd, "COPY %s.%s",
quote_identifier(get_namespace_name(RelationGetNamespace(cstate->rel))),
quote_identifier(RelationGetRelationName(cstate->rel)));
/*
* append column list. NOTE: if not specified originally, attnumlist will
* include all non-dropped columns of the table by default
*/
if(num_phys_attrs > 0) /* don't append anything for zero column table */
{
is_first_col = true;
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
/* We don't add dropped attributes */
if (attr[m]->attisdropped)
continue;
/* append column string. quote it if needed */
appendStringInfo(cdbcopy_cmd, (is_first_col ? "(%s" : ",%s"),
quote_identifier(NameStr(attr[m]->attname)));
is_first_col = false;
}
/*
* In order to maintain consistency between the primary and mirror segment data, we
* want to evaluate all table columns that are not participating in this COPY command
* and have non-constant default values on the dispatcher. If we let them be evaluated
* on the primary and mirror executors separately - they will get different values.
* Also, if the distribution column is not participating and it has any default value,
* we have to evaluate it on the dispatcher as well, so that it isn't hashed as a null
* and inserted as a default value on the segment databases.
*
* Therefore, we include these columns in the column list for the executor COPY.
* The default values will be evaluated on the dispatcher COPY and the results for
* the added columns will be appended to each data row that is shipped to the segments.
*/
extra_attr_count = 0;
for (i = 0; i < num_defaults; i++)
{
bool add_to_list = false;
/* check 1: is this default for a distribution column? */
for (p_index = 0; p_index < p_nattrs; p_index++)
{
h_attnum = policy->attrs[p_index];
if(h_attnum - 1 == defmap[i])
add_to_list = true;
}
/* check 2: is this a non constant default? */
if(defexprs[i]->expr->type != T_Const)
add_to_list = true;
if(add_to_list)
{
/* We don't add dropped attributes */
if (attr[defmap[i]]->attisdropped)
continue;
/* append column string. quote it if needed */
appendStringInfo(cdbcopy_cmd, (is_first_col ? "(%s" : ",%s"),
quote_identifier(NameStr(attr[defmap[i]]->attname)));
extra_attr_count++;
is_first_col = false;
}
}
if (!is_first_col)
appendStringInfo(cdbcopy_cmd, ")");
}
/*
* NOTE: we used to always pass STDIN here to the QEs. But since we want
* the QEs to know the original file name for recording it in an error table
* (if they use one) we actually pass the filename here, and in the QE COPY
* we get it, save it, and then always revert back to actually using STDIN.
* (if we originally use STDIN we just pass it along and record that in the
* error table).
*/
if(cstate->filename)
appendStringInfo(cdbcopy_cmd, " FROM '%s' WITH", cstate->filename);
else
appendStringInfo(cdbcopy_cmd, " FROM STDIN WITH");
if (cstate->oids)
appendStringInfo(cdbcopy_cmd, " OIDS");
appendStringInfo(cdbcopy_cmd, " DELIMITER AS E'%s'", cstate->delim);
appendStringInfo(cdbcopy_cmd, " NULL AS E'%s'", escape_quotes(cstate->null_print));
/* if default escape in text format ("\") leave expression out */
if (!cstate->csv_mode && strcmp(cstate->escape, "\\") != 0)
appendStringInfo(cdbcopy_cmd, " ESCAPE AS E'%s'", cstate->escape);
/* if EOL is already defined it means that NEWLINE was declared. pass it along */
if (cstate->eol_type != EOL_UNKNOWN)
{
Assert(cstate->eol_str);
appendStringInfo(cdbcopy_cmd, " NEWLINE AS '%s'", cstate->eol_str);
}
if (cstate->csv_mode)
{
appendStringInfo(cdbcopy_cmd, " CSV");
appendStringInfo(cdbcopy_cmd, " QUOTE AS E'%s'", escape_quotes(cstate->quote));
appendStringInfo(cdbcopy_cmd, " ESCAPE AS E'%s'", escape_quotes(cstate->escape));
if(cstate->force_notnull)
{
ListCell *l;
is_first_col = true;
appendStringInfo(cdbcopy_cmd, " FORCE NOT NULL");
foreach(l, cstate->force_notnull)
{
const char *col_name = strVal(lfirst(l));
appendStringInfo(cdbcopy_cmd, (is_first_col ? " %s" : ",%s"),
quote_identifier(col_name));
is_first_col = false;
}
}
/* do NOT include HEADER. Header row is "swallowed" by dispatcher COPY */
}
if (cstate->fill_missing)
appendStringInfo(cdbcopy_cmd, " FILL MISSING FIELDS");
/* add single row error handling clauses if necessary */
if (cstate->errMode != ALL_OR_NOTHING)
{
if (cstate->errMode == SREH_LOG)
{
appendStringInfo(cdbcopy_cmd, " LOG ERRORS INTO %s.%s",
quote_identifier(get_namespace_name(RelationGetNamespace(cstate->cdbsreh->errtbl))),
quote_identifier(RelationGetRelationName(cstate->cdbsreh->errtbl)));
}
appendStringInfo(cdbcopy_cmd, " SEGMENT REJECT LIMIT %d %s",
cstate->cdbsreh->rejectlimit, (cstate->cdbsreh->is_limit_in_rows ? "ROWS" : "PERCENT"));
}
}
/*
* Copy FROM file to relation.
*/
void
CopyFromDispatch(CopyState cstate, List *err_segnos)
{
TupleDesc tupDesc;
Form_pg_attribute *attr;
AttrNumber num_phys_attrs,
attr_count,
num_defaults;
FmgrInfo *in_functions;
FmgrInfo *out_functions; /* for handling defaults in Greenplum Database */
Oid *typioparams;
int attnum;
int i;
int p_index;
Oid in_func_oid;
Oid out_func_oid;
Oid relerror = InvalidOid;
Datum *values;
bool *nulls;
int *attr_offsets;
int total_rejected_from_qes = 0;
bool isnull;
bool *isvarlena;
ResultRelInfo *resultRelInfo;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
bool file_has_oids = false;
int *defmap;
ExprState **defexprs; /* array of default att expressions */
ExprContext *econtext; /* used for ExecEvalExpr for default atts */
MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcontext;
bool no_more_data = false;
ListCell *cur;
bool cur_row_rejected = false;
CdbCopy *cdbCopy;
/*
* This stringInfo will contain 2 types of error messages:
*
* 1) Data errors refer to errors that are a result of inappropriate
* input data or constraint violations. All data error messages
* from the segment databases will be added to this variable and
* reported back to the client at the end of the copy command
* execution on the dispatcher.
* 2) Any command execution error that occurs during this COPY session.
* Such errors will usually be failure to send data over the network,
* a COPY command that was rejected by the segment databases or any I/O
* error.
*/
StringInfoData cdbcopy_err;
/*
* a reconstructed and modified COPY command that is dispatched to segments.
*/
StringInfoData cdbcopy_cmd;
/*
* Variables for cdbpolicy
*/
GpPolicy *policy; /* the partitioning policy for this table */
AttrNumber p_nattrs; /* num of attributes in the distribution policy */
Oid *p_attr_types; /* types for each policy attribute */
/* variables for partitioning */
Datum *part_values = NULL;
Oid *part_attr_types = NULL; /* types for partitioning */
Oid *part_typio = NULL;
FmgrInfo *part_infuncs = NULL;
AttrNumber *part_attnum = NULL;
int part_attnums = 0;
/*
* Variables for original row number tracking
*/
StringInfoData line_buf_with_lineno;
int original_lineno_for_qe;
/*
* Variables for cdbhash
*/
/*
* In the case of partitioned tables with children that have different
* distribution policies, we maintain a hash table of CdbHashs and
* GpPolicies for each child table. We lazily add them to the hash --
* when a partition is returned which we haven't seen before, we call makeCdbHash
* and copy the policy over.
*/
typedef struct
{
Oid relid;
CdbHash *cdbHash; /* a CdbHash API object */
GpPolicy *policy; /* policy for this cdb hash */
} cdbhashdata;
/* The actual hash table. Only initialised if we need it. */
HTAB *hashmap = NULL;
CdbHash *cdbHash = NULL;
AttrNumber h_attnum; /* hash key attribute number */
Datum h_key; /* hash key value */
unsigned int target_seg = 0; /* result segment of cdbhash */
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
num_defaults = 0;
h_attnum = 0;
/*
* Init original row number tracking vars
*/
initStringInfo(&line_buf_with_lineno);
original_lineno_for_qe = 1;
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of
* code here that basically duplicated execUtils.c ...)
*/
resultRelInfo = makeNode(ResultRelInfo);
resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
resultRelInfo->ri_RelationDesc = cstate->rel;
resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
if (resultRelInfo->ri_TrigDesc)
resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
resultRelInfo->ri_TrigInstrument = NULL;
ResultRelInfoSetSegno(resultRelInfo, cstate->ao_segnos);
ExecOpenIndices(resultRelInfo);
estate->es_result_relations = resultRelInfo;
estate->es_num_result_relations = 1;
estate->es_result_relation_info = resultRelInfo;
econtext = GetPerTupleExprContext(estate);
/*
* Pick up the required catalog information for each attribute in the
* relation, including the input function, the element type (to pass
* to the input function), and info about defaults and constraints.
*/
in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
isvarlena = (bool *) palloc(num_phys_attrs * sizeof(bool));
for (attnum = 1; attnum <= num_phys_attrs; attnum++)
{
/* We don't need info for dropped attributes */
if (attr[attnum - 1]->attisdropped)
continue;
/* Fetch the input function and typioparam info */
getTypeInputInfo(attr[attnum - 1]->atttypid,
&in_func_oid, &typioparams[attnum - 1]);
fmgr_info(in_func_oid, &in_functions[attnum - 1]);
/*
* Fetch the output function and typioparam info. We need it
* for handling default functions on the dispatcher COPY, if
* there are any.
*/
getTypeOutputInfo(attr[attnum - 1]->atttypid,
&out_func_oid,
&isvarlena[attnum - 1]);
fmgr_info(out_func_oid, &out_functions[attnum - 1]);
/* TODO: is force quote array necessary for default conversion */
/* Get default info if needed */
if (!list_member_int(cstate->attnumlist, attnum))
{
/* attribute is NOT to be copied from input */
/* use default value if one exists */
Node *defexpr = build_column_default(cstate->rel, attnum);
if (defexpr != NULL)
{
defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
estate);
defmap[num_defaults] = attnum - 1;
num_defaults++;
}
}
}
/*
* prepare to COPY data into segDBs:
* - set table partitioning information
* - set append only table relevant info for dispatch.
* - get the distribution policy for this table.
* - build a COPY command to dispatch to segdbs.
* - dispatch the modified COPY command to all segment databases.
* - prepare cdbhash for hashing on row values.
*/
cdbCopy = makeCdbCopy(true, cstate->resource);
estate->es_result_partitions = cdbCopy->partitions =
RelationBuildPartitionDesc(cstate->rel, false);
CopyInitPartitioningState(estate);
if (list_length(cstate->ao_segnos) > 0)
cdbCopy->ao_segnos = cstate->ao_segnos;
/* add cdbCopy reference to cdbSreh (if needed) */
if (cstate->errMode != ALL_OR_NOTHING)
cstate->cdbsreh->cdbcopy = cdbCopy;
/* get the CDB policy for this table and prepare for hashing */
if (estate->es_result_partitions &&
!partition_policies_equal(cstate->rel->rd_cdbpolicy,
estate->es_result_partitions))
{
/*
* This is a partitioned table that has multiple, different
* distribution policies.
*
* We build up a fake policy comprising the set of all columns used
* to distribute all children in the partition configuration. That way
* we're sure to parse all necessary columns in the input data and we
* have all column types handy.
*/
List *cols = NIL;
ListCell *lc;
HASHCTL hash_ctl;
partition_get_policies_attrs(estate->es_result_partitions,
cstate->rel->rd_cdbpolicy,
&cols);
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(cdbhashdata);
hash_ctl.hash = oid_hash;
hash_ctl.hcxt = CurrentMemoryContext;
hashmap = hash_create("partition cdb hash map",
100 /* XXX: need a better value, but what? */,
&hash_ctl,
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
p_nattrs = list_length(cols);
policy = palloc(sizeof(GpPolicy) + sizeof(AttrNumber) * p_nattrs);
i = 0;
foreach(lc, cols)
policy->attrs[i++] = lfirst_int(lc);
}
else
{
policy = GpPolicyCopy(CurrentMemoryContext, cstate->rel->rd_cdbpolicy);
if (policy)
p_nattrs = policy->nattrs; /* number of partitioning keys */
else
p_nattrs = 0;
/* Create hash API reference */
cdbHash = makeCdbHash(cdbCopy->partition_num, HASH_FNV_1);
}
/*
* Extract types for each partition key from the tuple descriptor,
* and convert them when necessary. We don't want to do this
* for each tuple since get_typtype() is quite expensive when called
* lots of times.
*
* The array key for p_attr_types is the attribute number of the attribute
* in question.
*/
p_attr_types = (Oid *)palloc0(num_phys_attrs * sizeof(Oid));
for (i = 0; i < p_nattrs; i++)
{
h_attnum = policy->attrs[i];
/*
* get the data type of this attribute. If it's an
* array type use anyarray, or else just use as is.
*/
if (attr[h_attnum - 1]->attndims > 0)
p_attr_types[h_attnum - 1] = ANYARRAYOID;
else
{
/* If this type is a domain type, get its base type. */
p_attr_types[h_attnum - 1] = attr[h_attnum - 1]->atttypid;
if (get_typtype(p_attr_types[h_attnum - 1]) == 'd')
p_attr_types[h_attnum - 1] =
getBaseType(p_attr_types[h_attnum - 1]);
}
}
/* allocate memory for error and copy strings */
initStringInfo(&cdbcopy_err);
initStringInfo(&cdbcopy_cmd);
/* store the COPY command string in cdbcopy_cmd */
CopyFromCreateDispatchCommand(cstate,
&cdbcopy_cmd,
policy,
num_phys_attrs,
num_defaults,
p_nattrs,
h_attnum,
defmap,
defexprs,
attr);
/*
* for optimized parsing - get the last field number in the
* file that we need to parse to have all values for the hash keys.
* (If the table has an empty distribution policy, then we don't need
* to parse any attributes really... just send the row away using
* a special cdbhash function designed for this purpose).
*/
cstate->last_hash_field = 0;
for (p_index = 0; p_index < p_nattrs; p_index++)
{
i = 1;
/*
* for this partitioning key, search for its location in the attr list.
* (note that fields may be out of order).
*/
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
if (attnum == policy->attrs[p_index])
{
if (i > cstate->last_hash_field)
cstate->last_hash_field = i;
}
if (estate->es_result_partitions)
{
if (attnum == estate->es_partition_state->max_partition_attr)
{
if (i > cstate->last_hash_field)
cstate->last_hash_field = i;
}
}
i++;
}
}
/*
* Dispatch the COPY command.
*
* From this point in the code we need to be extra careful about error
* handling. ereport() must not be called until the COPY command sessions
* are closed on the executors. Calling ereport() will leave the executors
* hanging in COPY state.
*
* For errors detected by the dispatcher, we save the error message in
* cdbcopy_err StringInfo, move on to closing all COPY sessions on the
* executors and only then raise an error. We need to make sure to TRY/CATCH
* all other errors that may be raised from elsewhere in the backend. All
* error during COPY on the executors will be detected only when we end the
* COPY session there, so we are fine there.
*/
elog(DEBUG5, "COPY command sent to segdbs: %s", cdbcopy_cmd.data);
PG_TRY();
{
if (cstate->cdbsreh && cstate->cdbsreh->errtbl)
relerror = RelationGetRelid(cstate->cdbsreh->errtbl);
cdbCopyStart(cdbCopy, cdbcopy_cmd.data,
RelationGetRelid(cstate->rel), relerror,
err_segnos);
}
PG_CATCH();
{
/* get error message from CopyStart */
appendBinaryStringInfo(&cdbcopy_err, cdbCopy->err_msg.data, cdbCopy->err_msg.len);
/* end COPY in all the segdbs in progress */
cdbCopyEnd(cdbCopy);
/* get error message from CopyEnd */
appendBinaryStringInfo(&cdbcopy_err, cdbCopy->err_msg.data, cdbCopy->err_msg.len);
ereport(LOG,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("%s", cdbcopy_err.data)));
PG_RE_THROW();
}
PG_END_TRY();
/*
* Prepare to catch AFTER triggers.
*/
//AfterTriggerBeginQuery();
/*
* Check BEFORE STATEMENT insertion triggers. It's debatable whether
* we should do this for COPY, since it's not really an "INSERT"
* statement as such. However, executing these triggers maintains
* consistency with the EACH ROW triggers that we already fire on
* COPY.
*/
//ExecBSInsertTriggers(estate, resultRelInfo);
file_has_oids = cstate->oids; /* must rely on user to tell us this... */
values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
attr_offsets = (int *) palloc(num_phys_attrs * sizeof(int));
/* Set up callback to identify error line number */
errcontext.callback = copy_in_error_callback;
errcontext.arg = (void *) cstate;
errcontext.previous = error_context_stack;
error_context_stack = &errcontext;
cstate->err_loc_type = ROWNUM_ORIGINAL;
CopyInitDataParser(cstate);
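/*
* Main dispatch loop: read the input in RAW_BUF_SIZE chunks, split each
* chunk into data lines, compute the target segment for every line via
* cdbhash over the distribution key, and forward the line (prefixed with
* its original line number) to that segment. No rows are inserted locally
* on the QD.
*/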
do
{
size_t bytesread = 0;
/* read a chunk of data into the buffer */
PG_TRY();
{
bytesread = CopyGetData(cstate, cstate->raw_buf, RAW_BUF_SIZE);
}
PG_CATCH();
{
/*
* If we are here, we got some kind of communication error
* with the client or a bad protocol message. clean up and
* re-throw error. Note that we don't handle this error in
* any special way in SREH mode as it's not a data error.
*/
cdbCopyEnd(cdbCopy);
PG_RE_THROW();
}
PG_END_TRY();
cstate->raw_buf_done = false;
/* set buffer pointers to beginning of the buffer */
cstate->begloc = cstate->raw_buf;
cstate->raw_buf_index = 0;
/*
* continue if some bytes were read or if we didn't reach EOF. if we
* both reached EOF _and_ no bytes were read, quit the loop; we are
* done
*/
if (bytesread > 0 || !cstate->fe_eof)
{
/* on first time around just throw the header line away */
if (cstate->header_line)
{
PG_TRY();
{
cstate->line_done = cstate->csv_mode ?
CopyReadLineCSV(cstate, bytesread) :
CopyReadLineText(cstate, bytesread);
}
PG_CATCH();
{
/*
* TODO: use COPY_HANDLE_ERROR here, but make sure to
* ignore this error per the "note:" below.
*/
/*
* got here? encoding conversion error occurred on the
* header line (first row).
*/
if(cstate->errMode == ALL_OR_NOTHING)
{
/* re-throw error and abort */
cdbCopyEnd(cdbCopy);
PG_RE_THROW();
}
else
{
/* SREH - release error state */
if(!elog_dismiss(DEBUG5))
PG_RE_THROW(); /* hope to never get here! */
/*
* note: we don't bother doing anything special here.
* we are never interested in logging a header line
* error. just continue the workflow.
*/
}
}
PG_END_TRY();
cstate->cur_lineno++;
RESET_LINEBUF;
cstate->header_line = false;
}
while (!cstate->raw_buf_done)
{
Oid loaded_oid = InvalidOid;
GpPolicy *part_policy = NULL; /* policy for specific part */
AttrNumber part_p_nattrs = 0; /* partition policy max attno */
CdbHash *part_hash = NULL; /* hash for the part policy */
if (QueryCancelPending)
{
/* quit processing loop */
no_more_data = true;
break;
}
/* Reset the per-tuple exprcontext */
ResetPerTupleExprContext(estate);
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
MemSet(attr_offsets, 0, num_phys_attrs * sizeof(int));
/* Get the line number of the first line of this data row */
original_lineno_for_qe = cstate->cur_lineno + 1;
PG_TRY();
{
/* Actually read the line into memory here */
cstate->line_done = cstate->csv_mode ?
CopyReadLineCSV(cstate, bytesread) :
CopyReadLineText(cstate, bytesread);
}
PG_CATCH();
{
/* got here? encoding conversion/check error occurred */
COPY_HANDLE_ERROR;
}
PG_END_TRY();
if(cur_row_rejected)
{
IF_REJECT_LIMIT_REACHED_ABORT;
QD_GOTO_NEXT_ROW;
}
if(!cstate->line_done)
{
/*
* We did not finish reading a complete data line.
*
* If eof is not yet reached, we skip att parsing
* and read more data. But if eof _was_ reached it means
* that the original last data line is defective and
* we want to catch that error later on.
*/
if (!cstate->fe_eof || cstate->end_marker)
break;
}
if (file_has_oids)
{
char *oid_string;
/* can't be in CSV mode here */
oid_string = CopyReadOidAttr(cstate, &isnull);
if (isnull)
{
/* got here? null in OID column error */
if(cstate->errMode == ALL_OR_NOTHING)
{
/* report error and abort */
cdbCopyEnd(cdbCopy);
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("null OID in COPY data.")));
}
else
{
/* SREH */
cstate->cdbsreh->rejectcount++;
cur_row_rejected = true;
}
}
else
{
PG_TRY();
{
cstate->cur_attname = "oid";
loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
CStringGetDatum(oid_string)));
}
PG_CATCH();
{
/* got here? oid column conversion failed */
COPY_HANDLE_ERROR;
}
PG_END_TRY();
if (loaded_oid == InvalidOid)
{
if(cstate->errMode == ALL_OR_NOTHING)
{
/* report error and abort */
cdbCopyEnd(cdbCopy);
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid OID in COPY data.")));
}
else
{
/* SREH */
cstate->cdbsreh->rejectcount++;
cur_row_rejected = true;
}
}
cstate->cur_attname = NULL;
}
if(cur_row_rejected)
{
IF_REJECT_LIMIT_REACHED_ABORT;
QD_GOTO_NEXT_ROW;
}
}
PG_TRY();
{
/*
* parse and convert the data line attributes.
*/
if (cstate->csv_mode)
CopyReadAttributesCSV(cstate, nulls, attr_offsets, num_phys_attrs, attr);
else
CopyReadAttributesText(cstate, nulls, attr_offsets, num_phys_attrs, attr);
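/*
* Convert only the attributes needed for hashing (the distribution key
* columns) into values[]; the QEs re-parse the full data line themselves.
* (Interpretation of attr_get_key() based on its arguments here.)
*/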
attr_get_key(cstate, cdbCopy,
original_lineno_for_qe,
target_seg,
p_nattrs, policy->attrs,
attr, attr_offsets, nulls,
in_functions, typioparams,
values);
/*
* Now compute defaults for only:
* 1 - the distribution column,
* 2 - any other column with a non-constant default expression
* (such as a function) that is, of course, if these columns
* not provided by the input data.
* Anything not processed here or above will remain NULL.
*/
for (i = 0; i < num_defaults; i++)
{
bool compute_default = false;
/* check 1: is this default for a distribution column? */
for (p_index = 0; p_index < p_nattrs; p_index++)
{
h_attnum = policy->attrs[p_index];
if(h_attnum - 1 == defmap[i])
compute_default = true;
}
/* check 2: is this a default function? (non-constant default) */
if(defexprs[i]->expr->type != T_Const)
compute_default = true;
if(compute_default)
{
char *string;
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
&isnull, NULL);
/*
* prepare to concatenate next value:
* remove eol characters from end of line buf
*/
truncateEol(&cstate->line_buf, cstate->eol_type);
if (isnull)
{
appendStringInfo(&cstate->line_buf, "%c%s", cstate->delim[0], cstate->null_print);
}
else
{
nulls[defmap[i]] = false;
appendStringInfo(&cstate->line_buf, "%c", cstate->delim[0]); /* write the delimiter */
string = DatumGetCString(FunctionCall3(&out_functions[defmap[i]],
values[defmap[i]],
ObjectIdGetDatum(typioparams[defmap[i]]),
Int32GetDatum(attr[defmap[i]]->atttypmod)));
if (cstate->csv_mode)
{
CopyAttributeOutCSV(cstate, string,
false, /*force_quote[attnum - 1],*/
list_length(cstate->attnumlist) == 1);
}
else
CopyAttributeOutText(cstate, string);
}
/* re-add the eol characters */
concatenateEol(cstate);
}
}
/* lock partition */
if (estate->es_result_partitions)
{
PartitionNode *n = estate->es_result_partitions;
MemoryContext cxt_save;
/* lazily initialised */
if (part_values == NULL)
{
List *pattnums = get_partition_attrs(n);
ListCell *lc;
int ii = 0;
cxt_save = MemoryContextSwitchTo(oldcontext);
part_values = palloc0(num_phys_attrs * sizeof(Datum));
part_attr_types = palloc(num_phys_attrs * sizeof(Oid));
part_typio = palloc(num_phys_attrs * sizeof(Oid));
part_infuncs =
palloc(num_phys_attrs * sizeof(FmgrInfo));
part_attnum = palloc(num_phys_attrs *
sizeof(AttrNumber));
part_attnums = list_length(pattnums);
MemoryContextSwitchTo(cxt_save);
foreach(lc, pattnums)
{
AttrNumber attnum = (AttrNumber)lfirst_int(lc);
Oid in_func_oid;
getTypeInputInfo(attr[attnum - 1]->atttypid,
&in_func_oid,
&part_typio[attnum - 1]);
fmgr_info(in_func_oid, &part_infuncs[attnum - 1]);
part_attnum[ii++] = attnum;
}
}
MemSet(part_values, 0, num_phys_attrs * sizeof(Datum));
attr_get_key(cstate, cdbCopy,
original_lineno_for_qe,
target_seg,
part_attnums,
part_attnum,
attr, attr_offsets, nulls,
part_infuncs, part_typio,
part_values);
/* values_get_partition() calls palloc() */
cxt_save = MemoryContextSwitchTo(oldcontext);
resultRelInfo = values_get_partition(part_values,
nulls,
tupDesc, estate);
MemoryContextSwitchTo(cxt_save);
/*
* If we have a partition set with differing policies,
* get the policy for this particular child partition.
*/
if (hashmap)
{
bool found;
cdbhashdata *d;
Oid relid = resultRelInfo->ri_RelationDesc->rd_id;
d = hash_search(hashmap, &(relid), HASH_ENTER,
&found);
if (found)
{
part_policy = d->policy;
part_p_nattrs = part_policy->nattrs;
part_hash = d->cdbHash;
}
else
{
Relation rel = heap_open(relid, NoLock);
MemoryContext save_cxt;
/*
* Make sure all of this persists beyond the
* current iteration.
*/
save_cxt = MemoryContextSwitchTo(oldcontext);
d->relid = relid;
part_hash = d->cdbHash =
makeCdbHash(cdbCopy->partition_num, HASH_FNV_1);
part_policy = d->policy =
GpPolicyCopy(oldcontext,
rel->rd_cdbpolicy);
part_p_nattrs = part_policy->nattrs;
heap_close(rel, NoLock);
MemoryContextSwitchTo(save_cxt);
}
}
}
/*
* The usual case, or a partitioned table
* with non-divergent child table policies.
*/
if (!part_hash)
{
part_hash = cdbHash;
part_policy = policy;
part_p_nattrs = p_nattrs;
}
/*
* policy should be PARTITIONED (normal tables) or
* ENTRY
*/
if (!part_policy || part_policy->ptype == POLICYTYPE_UNDEFINED)
{
elog(FATAL, "Bad or undefined policy. (%p)", part_policy);
}
}
PG_CATCH();
{
COPY_HANDLE_ERROR;
}
PG_END_TRY();
if(cur_row_rejected)
{
IF_REJECT_LIMIT_REACHED_ABORT;
QD_GOTO_NEXT_ROW;
}
/*
* At this point in the code, values[x] is final for this
* data row -- either the input data, a null or a default
* value is in there, and constraints applied.
*
* Perform a cdbhash on this data row. Perform a hash operation
* on each attribute that is included in CDB policy (partitioning
* key columns). Send COPY data line to the target segment
* database executors. Data row will not be inserted locally.
*/
Assert(PointerIsValid(part_hash));
cdbhashinit(part_hash);
for (i = 0; i < part_p_nattrs; i++)
{
/* current attno from the policy */
h_attnum = part_policy->attrs[i];
h_key = values[h_attnum - 1]; /* value of this attr */
if (!nulls[h_attnum - 1])
cdbhash(part_hash, h_key, p_attr_types[h_attnum - 1]);
else
cdbhashnull(part_hash);
}
/*
* If this is a relation with an empty policy, there is no
* hash key to use, therefore use cdbhashnokey() to pick a
* hash value for us.
*/
if (part_p_nattrs == 0)
cdbhashnokey(part_hash);
target_seg = cdbhashreduce(part_hash); /* hash result segment */
/*
* Send data row to all databases for this segment.
* Also send the original row number with the data.
* modify the data to look like:
* "<lineno>^<linebuf_converted>^<data>"
*/
appendStringInfo(&line_buf_with_lineno, "%d%c%d%c%s",
original_lineno_for_qe,
COPY_METADATA_DELIM,
cstate->line_buf_converted,
COPY_METADATA_DELIM,
cstate->line_buf.data);
/* send modified data */
cdbCopySendData(cdbCopy,
target_seg,
line_buf_with_lineno.data,
line_buf_with_lineno.len);
RESET_LINEBUF_WITH_LINENO;
cstate->processed++;
if (estate->es_result_partitions)
resultRelInfo->ri_aoprocessed++;
if (cdbCopy->io_errors)
{
appendBinaryStringInfo(&cdbcopy_err, cdbCopy->err_msg.data, cdbCopy->err_msg.len);
no_more_data = true;
break;
}
RESET_LINEBUF;
} /* end while(!raw_buf_done) */
} /* end if (bytesread > 0 || !cstate->fe_eof) */
else
/* no bytes read, end of data */
{
no_more_data = true;
}
} while (!no_more_data);
/* Free p_attr_types */
pfree(p_attr_types);
/*
* Done reading input data and sending it off to the segment
* databases. Now we would like to end the copy command on
* all segment databases across the cluster.
*/
total_rejected_from_qes = cdbCopyEnd(cdbCopy);
/*
* If we quit the processing loop earlier due to a
* cancel query signal, we now throw an error.
* (Safe to do only after cdbCopyEnd).
*/
CHECK_FOR_INTERRUPTS();
if (cdbCopy->remote_data_err || cdbCopy->io_errors)
appendBinaryStringInfo(&cdbcopy_err, cdbCopy->err_msg.data, cdbCopy->err_msg.len);
if (cdbCopy->remote_data_err)
{
cstate->error_on_executor = true;
if(cdbCopy->err_context.len > 0)
appendBinaryStringInfo(&cstate->executor_err_context, cdbCopy->err_context.data, cdbCopy->err_context.len);
}
/*
* report all accumulated errors back to the client. We get here if an error
* happened in all-or-nothing error handling mode or if reject limit was
* reached in single-row error handling mode.
*/
if (cdbCopy->remote_data_err)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("%s", cdbcopy_err.data),
errOmitLocation(true)));
if (cdbCopy->io_errors)
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("%s", cdbcopy_err.data),
errOmitLocation(true)));
/*
* switch back away from COPY error context callback. don't want line
* error information anymore
*/
error_context_stack = errcontext.previous;
/*
* If we got here it means that either all the data was loaded or some rows
* were rejected in SREH mode. In other words - nothing caused an abort.
* We now want to report the actual number of rows loaded and rejected.
* If any rows were rejected from the QE COPY processes subtract this number
* from the number of rows that were successfully processed on the QD COPY
* so that we can report the correct number.
*/
if(cstate->cdbsreh)
{
int total_rejected = 0;
int total_rejected_from_qd = cstate->cdbsreh->rejectcount;
/* if used errtable, QD bad rows were sent to QEs and counted there. ignore QD count */
if (cstate->cdbsreh->errtbl)
total_rejected_from_qd = 0;
total_rejected = total_rejected_from_qd + total_rejected_from_qes;
cstate->processed -= total_rejected;
/* emit a NOTICE with number of rejected rows */
ReportSrehResults(cstate->cdbsreh, total_rejected);
/* See if we want to DROP error table when destroying cdbsreh */
if(cstate->cdbsreh->errtbl)
SetErrorTableVerdict(cstate->cdbsreh, total_rejected);
}
/*
* Done, clean up
*/
MemoryContextSwitchTo(oldcontext);
/*
* Execute AFTER STATEMENT insertion triggers
*/
//ExecASInsertTriggers(estate, resultRelInfo);
/*
* Handle queued AFTER triggers
*/
//AfterTriggerEndQuery(estate);
resultRelInfo = estate->es_result_relations;
for (i = estate->es_num_result_relations; i > 0; i--)
{
/* update AO tuple counts */
char relstorage = RelinfoGetStorage(resultRelInfo);
if (relstorage_is_ao(relstorage))
{
if (cdbCopy->aotupcounts)
{
HTAB *ht = cdbCopy->aotupcounts;
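/*
* Local mirror of the aotupcounts hash entry layout (relid, tupcount);
* assumed to match the entry type used when the hash table was built.
*/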
struct {
Oid relid;
int64 tupcount;
} *ao;
bool found;
Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
ao = hash_search(ht, &relid, HASH_FIND, &found);
if (found)
{
/* find out which segnos the result rels in the QE's used */
ResultRelInfoSetSegno(resultRelInfo, cstate->ao_segnos);
/* The total tuple count is not necessary in HAWQ 2.0 */
/*
UpdateMasterAosegTotals(resultRelInfo->ri_RelationDesc,
resultRelInfo->ri_aosegno,
ao->tupcount);
*/
}
}
else
{
ResultRelInfoSetSegno(resultRelInfo, cstate->ao_segnos);
/* The total tuple count is not necessary in HAWQ 2.0 */
/*
UpdateMasterAosegTotals(resultRelInfo->ri_RelationDesc,
resultRelInfo->ri_aosegno,
cstate->processed);
*/
}
}
/* Close indices and then the relation itself */
ExecCloseIndices(resultRelInfo);
heap_close(resultRelInfo->ri_RelationDesc, NoLock);
resultRelInfo++;
}
/*
* free all resources besides ones that are needed for error reporting
*/
if (cdbHash)
pfree(cdbHash);
pfree(values);
pfree(nulls);
pfree(attr_offsets);
pfree(in_functions);
pfree(out_functions);
pfree(isvarlena);
pfree(typioparams);
pfree(defmap);
pfree(defexprs);
pfree(cdbcopy_cmd.data);
pfree(cdbcopy_err.data);
pfree(line_buf_with_lineno.data);
pfree(cdbCopy);
if (policy)
pfree(policy);
/* free the hash table allocated by values_get_partition(), if any */
if(estate->es_result_partitions && estate->es_partition_state->result_partition_hash != NULL)
hash_destroy(estate->es_partition_state->result_partition_hash);
/*
* Don't worry about the partition table hash map, that will be
* freed when our current memory context is freed. And that will be
* quite soon.
*/
cstate->rel = NULL; /* closed above */
FreeExecutorState(estate);
}
/*
* Copy FROM file to relation.
*/
static void
CopyFrom(CopyState cstate)
{
void *tuple;
TupleDesc tupDesc;
ExternalInsertDesc extInsertDesc;
Form_pg_attribute *attr;
AttrNumber num_phys_attrs,
attr_count,
num_defaults;
FmgrInfo *in_functions;
Oid *typioparams;
int attnum;
int i;
Oid in_func_oid;
Datum *values;
bool *nulls;
bool isnull;
ResultRelInfo *resultRelInfo;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
TupleTableSlot *slot;
bool file_has_oids;
int *defmap;
ExprState **defexprs; /* array of default att expressions */
ExprContext *econtext; /* used for ExecEvalExpr for default atts */
MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcontext;
int *attr_offsets;
bool no_more_data = false;
ListCell *cur;
bool cur_row_rejected = false;
int original_lineno_for_qe = 0; /* keep compiler happy (var referenced by macro) */
CdbCopy *cdbCopy = NULL; /* never used... for compiling COPY_HANDLE_ERROR */
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
num_defaults = 0;
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of
* code here that basically duplicated execUtils.c ...)
*/
resultRelInfo = makeNode(ResultRelInfo);
resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
resultRelInfo->ri_RelationDesc = cstate->rel;
resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
if (resultRelInfo->ri_TrigDesc)
resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
resultRelInfo->ri_TrigInstrument = NULL;
ResultRelInfoSetSegno(resultRelInfo, cstate->ao_segnos);
ExecOpenIndices(resultRelInfo);
estate->es_result_relations = resultRelInfo;
estate->es_num_result_relations = 1;
estate->es_result_relation_info = resultRelInfo;
estate->es_result_partitions = cstate->partitions;
CopyInitPartitioningState(estate);
/* Set up a tuple slot too */
slot = MakeSingleTupleTableSlot(tupDesc);
econtext = GetPerTupleExprContext(estate);
/*
* Pick up the required catalog information for each attribute in the
* relation, including the input function, the element type (to pass
* to the input function), and info about defaults and constraints.
*/
in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
for (attnum = 1; attnum <= num_phys_attrs; attnum++)
{
/* We don't need info for dropped attributes */
if (attr[attnum - 1]->attisdropped)
continue;
/* Fetch the input function and typioparam info */
getTypeInputInfo(attr[attnum - 1]->atttypid,
&in_func_oid, &typioparams[attnum - 1]);
fmgr_info(in_func_oid, &in_functions[attnum - 1]);
/* Get default info if needed */
if (!list_member_int(cstate->attnumlist, attnum))
{
/* attribute is NOT to be copied from input */
/* use default value if one exists */
Node *defexpr = build_column_default(cstate->rel, attnum);
if (defexpr != NULL)
{
defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
estate);
defmap[num_defaults] = attnum - 1;
num_defaults++;
}
}
}
/*
* Prepare to catch AFTER triggers.
*/
AfterTriggerBeginQuery();
/*
* Check BEFORE STATEMENT insertion triggers. It's debatable whether
* we should do this for COPY, since it's not really an "INSERT"
* statement as such. However, executing these triggers maintains
* consistency with the EACH ROW triggers that we already fire on
* COPY.
*/
ExecBSInsertTriggers(estate, resultRelInfo);
file_has_oids = cstate->oids; /* must rely on user to tell us this... */
values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
attr_offsets = (int *) palloc(num_phys_attrs * sizeof(int));
/* Set up callback to identify error line number */
errcontext.callback = copy_in_error_callback;
errcontext.arg = (void *) cstate;
errcontext.previous = error_context_stack;
error_context_stack = &errcontext;
if (Gp_role == GP_ROLE_EXECUTE)
cstate->err_loc_type = ROWNUM_EMBEDDED; /* get original row num from QD COPY */
else
cstate->err_loc_type = ROWNUM_ORIGINAL; /* we can count rows by ourselves */
CopyInitDataParser(cstate);
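/*
* Main QE-side loop: read the (re-dispatched) input in RAW_BUF_SIZE chunks,
* parse each data line into Datums, route the row to its target partition,
* and insert it with the storage-specific insert routine.
*/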
do
{
size_t bytesread = 0;
/* read a chunk of data into the buffer */
bytesread = CopyGetData(cstate, cstate->raw_buf, RAW_BUF_SIZE);
cstate->raw_buf_done = false;
/* set buffer pointers to beginning of the buffer */
cstate->begloc = cstate->raw_buf;
cstate->raw_buf_index = 0;
/*
* continue if some bytes were read or if we didn't reach EOF. if we
* both reached EOF _and_ no bytes were read, quit the loop; we are
* done
*/
if (bytesread > 0 || !cstate->fe_eof)
{
/* handle HEADER, but only if we're in utility mode */
if (cstate->header_line)
{
cstate->line_done = cstate->csv_mode ?
CopyReadLineCSV(cstate, bytesread) :
CopyReadLineText(cstate, bytesread);
cstate->cur_lineno++;
cstate->header_line = false;
RESET_LINEBUF;
}
while (!cstate->raw_buf_done)
{
bool skip_tuple;
Oid loaded_oid = InvalidOid;
char relstorage;
CHECK_FOR_INTERRUPTS();
/* Reset the per-tuple exprcontext */
ResetPerTupleExprContext(estate);
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
/* reset attribute pointers */
MemSet(attr_offsets, 0, num_phys_attrs * sizeof(int));
PG_TRY();
{
/* Actually read the line into memory here */
cstate->line_done = cstate->csv_mode ?
CopyReadLineCSV(cstate, bytesread) :
CopyReadLineText(cstate, bytesread);
}
PG_CATCH();
{
/* got here? encoding conversion/check error occurred */
COPY_HANDLE_ERROR;
}
PG_END_TRY();
if(cur_row_rejected)
{
IF_REJECT_LIMIT_REACHED_ABORT;
QE_GOTO_NEXT_ROW;
}
if(!cstate->line_done)
{
/*
* We did not finish reading a complete data line.
*
* If eof is not yet reached, we skip att parsing
* and read more data. But if eof _was_ reached it means
* that the original last data line is defective and
* we want to catch that error later on.
*/
if (!cstate->fe_eof || cstate->end_marker)
break;
}
if (file_has_oids)
{
char *oid_string;
/* can't be in CSV mode here */
oid_string = CopyReadOidAttr(cstate, &isnull);
if (isnull)
{
/* got here? null in OID column error */
if(cstate->errMode == ALL_OR_NOTHING)
{
/* report error and abort */
cdbCopyEnd(cdbCopy);
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("null OID in COPY data.")));
}
else
{
/* SREH */
cstate->cdbsreh->rejectcount++;
cur_row_rejected = true;
}
}
else
{
PG_TRY();
{
cstate->cur_attname = "oid";
loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
CStringGetDatum(oid_string)));
}
PG_CATCH();
{
/* got here? oid column conversion failed */
COPY_HANDLE_ERROR;
}
PG_END_TRY();
if (loaded_oid == InvalidOid)
{
if(cstate->errMode == ALL_OR_NOTHING)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid OID in COPY data.")));
else /* SREH */
{
cstate->cdbsreh->rejectcount++;
cur_row_rejected = true;
}
}
cstate->cur_attname = NULL;
}
if(cur_row_rejected)
{
IF_REJECT_LIMIT_REACHED_ABORT;
QE_GOTO_NEXT_ROW;
}
}
PG_TRY();
{
if (cstate->csv_mode)
CopyReadAttributesCSV(cstate, nulls, attr_offsets, num_phys_attrs, attr);
else
CopyReadAttributesText(cstate, nulls, attr_offsets, num_phys_attrs, attr);
/*
* Loop to read the user attributes on the line.
*/
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
char *string;
string = cstate->attribute_buf.data + attr_offsets[m];
if (nulls[m])
isnull = true;
else
isnull = false;
if (cstate->csv_mode && isnull && cstate->force_notnull_flags[m])
{
string = cstate->null_print; /* set to NULL string */
isnull = false;
}
/* we read an SQL NULL, no need to do anything */
if (!isnull)
{
cstate->cur_attname = NameStr(attr[m]->attname);
values[m] = InputFunctionCall(&in_functions[m],
string,
typioparams[m],
attr[m]->atttypmod);
nulls[m] = false;
cstate->cur_attname = NULL;
}
}
/*
* Now compute and insert any defaults available for the columns
* not provided by the input data. Anything not processed here or
* above will remain NULL.
*/
for (i = 0; i < num_defaults; i++)
{
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
&isnull, NULL);
if (!isnull)
nulls[defmap[i]] = false;
}
}
PG_CATCH();
{
COPY_HANDLE_ERROR; /* SREH */
}
PG_END_TRY();
if(cur_row_rejected)
{
IF_REJECT_LIMIT_REACHED_ABORT;
QE_GOTO_NEXT_ROW;
}
/*
* We might create a ResultRelInfo which needs to persist
* the per tuple context.
*/
PG_TRY();
{
MemoryContextSwitchTo(oldcontext);
if (estate->es_result_partitions)
{
resultRelInfo = values_get_partition(values, nulls,
tupDesc, estate);
estate->es_result_relation_info = resultRelInfo;
}
}
PG_CATCH();
{
COPY_HANDLE_ERROR;
}
PG_END_TRY();
if (cur_row_rejected)
{
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
IF_REJECT_LIMIT_REACHED_ABORT;
QE_GOTO_NEXT_ROW;
}
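/*
* Lazily create the insert descriptor for the partition this row maps to,
* based on its storage type: append-only row, Parquet, or external (either
* the built-in external writer or a pluggable-storage formatter).
*/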
relstorage = RelinfoGetStorage(resultRelInfo);
if (relstorage == RELSTORAGE_AOROWS &&
resultRelInfo->ri_aoInsertDesc == NULL)
{
ResultRelSegFileInfo *segfileinfo = NULL;
ResultRelInfoSetSegFileInfo(resultRelInfo, cstate->ao_segfileinfos);
segfileinfo = (ResultRelSegFileInfo *)list_nth(resultRelInfo->ri_aosegfileinfos, GetQEIndex());
resultRelInfo->ri_aoInsertDesc =
appendonly_insert_init(resultRelInfo->ri_RelationDesc,
segfileinfo);
}
else if (relstorage == RELSTORAGE_PARQUET &&
resultRelInfo->ri_parquetInsertDesc == NULL)
{
ResultRelSegFileInfo *segfileinfo = NULL;
ResultRelInfoSetSegFileInfo(resultRelInfo, cstate->ao_segfileinfos);
segfileinfo = (ResultRelSegFileInfo *)list_nth(resultRelInfo->ri_aosegfileinfos, GetQEIndex());
resultRelInfo->ri_parquetInsertDesc =
parquet_insert_init(resultRelInfo->ri_RelationDesc,
segfileinfo);
}
else if (relstorage == RELSTORAGE_EXTERNAL &&
resultRelInfo->ri_extInsertDesc == NULL)
{
Relation relation = resultRelInfo->ri_RelationDesc;
ExtTableEntry *extEntry = GetExtTableEntry(RelationGetRelid(relation));
ExternalTableType formatterType = ExternalTableType_Invalid;
char *formatterName = NULL;
getExternalTableTypeInStr(extEntry->fmtcode, extEntry->fmtopts,
&formatterType, &formatterName);
if (formatterType == ExternalTableType_Invalid)
{
elog(ERROR, "invalid formatter type for external table: %s", __func__);
}
else if (formatterType != ExternalTableType_PLUG)
{
resultRelInfo->ri_extInsertDesc =
external_insert_init(resultRelInfo->ri_RelationDesc,
0, formatterType, formatterName, NULL);
}
else
{
Assert(formatterName);
Oid procOid = LookupPlugStorageValidatorFunc(formatterName,
"insert_init");
if (OidIsValid(procOid))
{
FmgrInfo *insertInitFunc = (FmgrInfo *)palloc(sizeof(FmgrInfo));
fmgr_info(procOid, insertInitFunc);
ResultRelSegFileInfo *segfileinfo = NULL;
ResultRelInfoSetSegFileInfo(resultRelInfo,
cstate->ao_segfileinfos);
segfileinfo = (ResultRelSegFileInfo *) list_nth(
resultRelInfo->ri_aosegfileinfos,
GetQEIndex());
resultRelInfo->ri_extInsertDesc =
InvokePlugStorageFormatInsertInit(insertInitFunc,
resultRelInfo->ri_RelationDesc,
formatterType,
formatterName,
NULL,
segfileinfo->segno);
pfree(insertInitFunc);
}
else
{
elog(ERROR, "%s_insert function was not found", formatterName);
}
}
}
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
/*
* And now we can form the input tuple.
*/
if (relstorage == RELSTORAGE_AOROWS)
{
/* form a mem tuple */
tuple = (MemTuple)
memtuple_form_to(resultRelInfo->ri_aoInsertDesc->mt_bind,
values, nulls,
NULL, NULL, false);
if (cstate->oids && file_has_oids)
MemTupleSetOid(tuple, resultRelInfo->ri_aoInsertDesc->mt_bind, loaded_oid);
}
else if ((relstorage == RELSTORAGE_PARQUET) ||
((relstorage == RELSTORAGE_EXTERNAL) &&
(resultRelInfo->ri_extInsertDesc->ext_formatter_type ==
ExternalTableType_PLUG)))
{
tuple = NULL;
}
else
{
/* form a regular heap tuple */
tuple = (HeapTuple) heap_form_tuple(tupDesc, values, nulls);
if (cstate->oids && file_has_oids)
HeapTupleSetOid((HeapTuple)tuple, loaded_oid);
}
/*
* Triggers and stuff need to be invoked in query context.
*/
MemoryContextSwitchTo(oldcontext);
/* Partitions don't support triggers yet */
Assert(!(estate->es_result_partitions &&
resultRelInfo->ri_TrigDesc));
skip_tuple = false;
/* BEFORE ROW INSERT Triggers */
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->n_before_row[TRIGGER_EVENT_INSERT] > 0)
{
HeapTuple newtuple;
if(relstorage == RELSTORAGE_PARQUET)
{
Assert(!tuple);
elog(ERROR, "triggers are not supported on tables that use column-oriented storage");
}
Assert(resultRelInfo->ri_TrigFunctions != NULL);
newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
if (newtuple == NULL) /* "do nothing" */
skip_tuple = true;
else if (newtuple != tuple) /* modified by Trigger(s) */
{
heap_freetuple(tuple);
tuple = newtuple;
}
}
if (!skip_tuple)
{
char relstorage = RelinfoGetStorage(resultRelInfo);
if ((relstorage == RELSTORAGE_PARQUET) ||
((relstorage == RELSTORAGE_EXTERNAL) &&
(resultRelInfo->ri_extInsertDesc->ext_formatter_type ==
ExternalTableType_PLUG)))
{
ExecClearTuple(slot);
slot->PRIVATE_tts_values = values;
slot->PRIVATE_tts_isnull = nulls;
ExecStoreVirtualTuple(slot);
}
else
{
/* Place tuple in tuple slot */
ExecStoreGenericTuple(tuple, slot, false);
}
/*
* Check the constraints of the tuple
*/
if (resultRelInfo->ri_RelationDesc->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
/*
* OK, store the tuple and create index entries for it
*/
if (relstorage == RELSTORAGE_AOROWS)
{
Oid tupleOid;
AOTupleId aoTupleId;
/* inserting into an append only relation */
appendonly_insert(resultRelInfo->ri_aoInsertDesc, tuple, &tupleOid, &aoTupleId);
if (resultRelInfo->ri_NumIndices > 0)
ExecInsertIndexTuples(slot, (ItemPointer)&aoTupleId, estate, false);
}
else if (relstorage == RELSTORAGE_PARQUET)
{
AOTupleId aoTupleId;
parquet_insert_values(resultRelInfo->ri_parquetInsertDesc, values, nulls, &aoTupleId);
if (resultRelInfo->ri_NumIndices > 0)
ExecInsertIndexTuples(slot, (ItemPointer)&aoTupleId, estate, false);
}
else if (relstorage == RELSTORAGE_EXTERNAL)
{
extInsertDesc = resultRelInfo->ri_extInsertDesc;
if (extInsertDesc->ext_formatter_type == ExternalTableType_Invalid)
{
elog(ERROR, "invalid formatter type for external table: %s", __func__);
}
else if (extInsertDesc->ext_formatter_type != ExternalTableType_PLUG)
{
external_insert(extInsertDesc, slot);
}
else
{
Assert(extInsertDesc->ext_formatter_name);
FmgrInfo *insertFunc =
extInsertDesc->ext_ps_insert_funcs.insert;
if (insertFunc)
{
InvokePlugStorageFormatInsert(insertFunc,
extInsertDesc,
slot);
}
else
{
elog(ERROR, "%s_insert function was not found",
extInsertDesc->ext_formatter_name);
}
}
}
else
{
simple_heap_insert(resultRelInfo->ri_RelationDesc, tuple);
if (resultRelInfo->ri_NumIndices > 0)
ExecInsertIndexTuples(slot, &(((HeapTuple)tuple)->t_self), estate, false);
}
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, tuple);
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
* this is the same definition used by execMain.c for counting
* tuples inserted by an INSERT command.
*
* MPP: incrementing this counter here only matters for utility
* mode. In dispatch mode only the dispatcher COPY collects the row
* count, so this counter is meaningless.
*/
cstate->processed++;
if (relstorage_is_ao(relstorage))
resultRelInfo->ri_aoprocessed++;
}
RESET_LINEBUF;
} /* end while(!raw_buf_done) */
} /* end if (bytesread > 0 || !cstate->fe_eof) */
else
/* no bytes read, end of data */
{
no_more_data = true;
}
} while (!no_more_data);
/*
* Done, clean up
*/
error_context_stack = errcontext.previous;
MemoryContextSwitchTo(oldcontext);
/*
* Execute AFTER STATEMENT insertion triggers
*/
ExecASInsertTriggers(estate, resultRelInfo);
/*
* Handle queued AFTER triggers
*/
AfterTriggerEndQuery(estate);
/*
* If SREH and in executor mode send the number of rejected
* rows to the client (QD COPY).
*/
if(cstate->errMode != ALL_OR_NOTHING && Gp_role == GP_ROLE_EXECUTE)
SendNumRowsRejected(cstate->cdbsreh->rejectcount);
if (estate->es_result_partitions && Gp_role == GP_ROLE_EXECUTE)
SendAOTupCounts(estate);
/* free the hash table allocated by values_get_partition(), if any */
if(estate->es_result_partitions && estate->es_partition_state->result_partition_hash != NULL)
hash_destroy(estate->es_partition_state->result_partition_hash);
pfree(attr_offsets);
pfree(in_functions);
pfree(typioparams);
pfree(defmap);
pfree(defexprs);
ExecDropSingleTupleTableSlot(slot);
StringInfo buf = NULL;
int aocount = 0;
resultRelInfo = estate->es_result_relations;
for (i = 0; i < estate->es_num_result_relations; i++)
{
if (resultRelInfo->ri_aoInsertDesc)
++aocount;
if (resultRelInfo->ri_parquetInsertDesc)
++aocount;
resultRelInfo++;
}
if (Gp_role == GP_ROLE_EXECUTE && aocount > 0)
buf = PreSendbackChangedCatalog(aocount);
/*
* Finalize appends and close relations we opened.
*/
resultRelInfo = estate->es_result_relations;
for (i = estate->es_num_result_relations; i > 0; i--)
{
QueryContextDispatchingSendBack sendback = NULL;
if (resultRelInfo->ri_aoInsertDesc)
{
sendback = CreateQueryContextDispatchingSendBack(1);
resultRelInfo->ri_aoInsertDesc->sendback = sendback;
sendback->relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
appendonly_insert_finish(resultRelInfo->ri_aoInsertDesc);
}
if (resultRelInfo->ri_parquetInsertDesc)
{
sendback = CreateQueryContextDispatchingSendBack(
resultRelInfo->ri_parquetInsertDesc->parquet_rel->rd_att->natts);
resultRelInfo->ri_parquetInsertDesc->sendback = sendback;
sendback->relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
parquet_insert_finish(resultRelInfo->ri_parquetInsertDesc);
}
if (resultRelInfo->ri_extInsertDesc)
{
ExternalInsertDesc extInsertDesc = resultRelInfo->ri_extInsertDesc;
if (extInsertDesc->ext_formatter_type == ExternalTableType_Invalid)
{
elog(ERROR, "invalid formatter type for external table: %s", __func__);
}
else if (extInsertDesc->ext_formatter_type != ExternalTableType_PLUG)
{
external_insert_finish(extInsertDesc);
}
else
{
FmgrInfo *insertFinishFunc =
extInsertDesc->ext_ps_insert_funcs.insert_finish;
if (insertFinishFunc)
{
InvokePlugStorageFormatInsertFinish(insertFinishFunc,
extInsertDesc);
}
else
{
elog(ERROR, "%s_insert_finish function was not found",
extInsertDesc->ext_formatter_name);
}
}
}
if (sendback && relstorage_is_ao(RelinfoGetStorage(resultRelInfo)) && Gp_role == GP_ROLE_EXECUTE)
AddSendbackChangedCatalogContent(buf, sendback);
DropQueryContextDispatchingSendBack(sendback);
/* Close indices and then the relation itself */
ExecCloseIndices(resultRelInfo);
heap_close(resultRelInfo->ri_RelationDesc, NoLock);
resultRelInfo++;
}
if (Gp_role == GP_ROLE_EXECUTE && aocount > 0)
FinishSendbackChangedCatalog(buf);
cstate->rel = NULL; /* closed above */
FreeExecutorState(estate);
}
/*
* Finds the next TEXT line in the input buffer and loads it into
* line_buf. Returns true if the line that was read is complete (an
* unescaped line end was encountered); returns false if the end of the
* buffer was reached before the whole line could be written into the
* line buffer.
*/
bool
CopyReadLineText(CopyState cstate, size_t bytesread)
{
int linesize;
char escapec = '\0';
/* mark that encoding conversion hasn't occurred yet */
cstate->line_buf_converted = false;
/*
* set the escape char for text format ('\\' by default).
*/
escapec = cstate->escape[0];
/*
* Detect end of line type if not already detected.
*/
if (cstate->eol_type == EOL_UNKNOWN)
{
cstate->quote = NULL;
if (!DetectLineEnd(cstate, bytesread))
{
/* load entire input buffer into line buf, and quit */
appendBinaryStringInfo(&cstate->line_buf, cstate->raw_buf, bytesread);
cstate->raw_buf_done = true;
cstate->line_done = CopyCheckIsLastLine(cstate);
if (cstate->line_done)
preProcessDataLine(cstate);
return cstate->line_done;
}
}
/*
* Special case: eol is CRNL, last byte of previous buffer was an
* unescaped CR and 1st byte of current buffer is NL. We check for
* that here.
*/
if (cstate->eol_type == EOL_CRLF)
{
/* if we started scanning from the 1st byte of the buffer */
if (cstate->begloc == cstate->raw_buf)
{
/* and had a CR in last byte of prev buf */
if (cstate->cr_in_prevbuf)
{
/*
* if this 1st byte in buffer is 2nd byte of line end sequence
* (linefeed)
*/
if (*(cstate->begloc) == cstate->eol_ch[1])
{
/*
* load that one linefeed byte and indicate we are done
* with the data line
*/
appendBinaryStringInfo(&cstate->line_buf, cstate->begloc, 1);
cstate->raw_buf_index++;
cstate->begloc++;
cstate->cr_in_prevbuf = false;
preProcessDataLine(cstate);
return true;
}
}
cstate->cr_in_prevbuf = false;
}
}
/*
* (we need a loop so that if eol_ch is found, but prev ch is backslash,
* we can search for the next eol_ch)
*/
while (true)
{
/* reached end of buffer */
if ((cstate->endloc = scanTextLine(cstate, cstate->begloc, cstate->eol_ch[0], bytesread - cstate->raw_buf_index)) == NULL)
{
linesize = bytesread - (cstate->begloc - cstate->raw_buf);
appendBinaryStringInfo(&cstate->line_buf, cstate->begloc, linesize);
if (cstate->eol_type == EOL_CRLF && cstate->line_buf.len > 1)
{
char *last_ch = cstate->line_buf.data + cstate->line_buf.len - 1; /* before terminating \0 */
if (*last_ch == '\r')
cstate->cr_in_prevbuf = true;
}
cstate->line_done = CopyCheckIsLastLine(cstate);
cstate->raw_buf_done = true;
break;
}
else
/* found the 1st eol ch in raw_buf. */
{
bool eol_found = true;
/*
* Load that piece of data (potentially a data line) into the line buffer,
* and update the pointers for the next scan.
*/
linesize = cstate->endloc - cstate->begloc + 1;
appendBinaryStringInfo(&cstate->line_buf, cstate->begloc, linesize);
cstate->raw_buf_index += linesize;
cstate->begloc = cstate->endloc + 1;
if (cstate->eol_type == EOL_CRLF)
{
/* check if there is a '\n' after the '\r' */
if (*(cstate->endloc + 1) == '\n')
{
/* this is a line end */
appendBinaryStringInfo(&cstate->line_buf, cstate->begloc, 1); /* load that '\n' */
cstate->raw_buf_index++;
cstate->begloc++;
}
else
/* just a CR, not a line end */
eol_found = false;
}
/*
* in some cases, this end of line char happens to be the
* last character in the buffer. we need to catch that.
*/
if (cstate->raw_buf_index >= bytesread)
cstate->raw_buf_done = true;
/*
* if eol was found, and it isn't escaped, line is done
*/
if (eol_found)
{
cstate->line_done = true;
break;
}
else
{
/* stay in the loop and process some more data. */
cstate->line_done = false;
if (eol_found)
cstate->cur_lineno++; /* increase line index for error
* reporting */
}
} /* end of found eol_ch */
}
/* Done reading a complete line. Do preprocessing of the raw input data */
if (cstate->line_done)
preProcessDataLine(cstate);
/*
* check if this line is an end marker -- "\."
*/
cstate->end_marker = false;
switch (cstate->eol_type)
{
case EOL_LF:
if (!strcmp(cstate->line_buf.data, "\\.\n"))
cstate->end_marker = true;
break;
case EOL_CR:
if (!strcmp(cstate->line_buf.data, "\\.\r"))
cstate->end_marker = true;
break;
case EOL_CRLF:
if (!strcmp(cstate->line_buf.data, "\\.\r\n"))
cstate->end_marker = true;
break;
case EOL_UNKNOWN:
break;
}
if (cstate->end_marker)
{
/*
* Reached end marker. In protocol version 3 we
* should ignore anything after \. up to protocol
* end of copy data.
*/
if (cstate->copy_dest == COPY_NEW_FE)
{
while (!cstate->fe_eof)
{
CopyGetData(cstate, cstate->raw_buf, RAW_BUF_SIZE); /* eat data */
}
}
cstate->fe_eof = true;
/* we don't want to process a \. as data line, want to quit. */
cstate->line_done = false;
cstate->raw_buf_done = true;
}
return cstate->line_done;
}
/*
* Finds the next CSV line in the input buffer and loads it into
* line_buf. Returns true if the line that was read is complete (an
* unescaped line end was encountered); returns false if the end of the
* buffer was reached before the whole line could be written into the
* line buffer.
*/
bool
CopyReadLineCSV(CopyState cstate, size_t bytesread)
{
int linesize;
char quotec = '\0',
escapec = '\0';
bool csv_is_invalid = false;
/* mark that encoding conversion hasn't occurred yet */
cstate->line_buf_converted = false;
escapec = cstate->escape[0];
quotec = cstate->quote[0];
/* ignore special escape processing if it's the same as quotec */
if (quotec == escapec)
escapec = '\0';
/*
* Detect end of line type if not already detected.
*/
if (cstate->eol_type == EOL_UNKNOWN)
{
if (!DetectLineEnd(cstate, bytesread))
{
/* EOL not found. load entire input buffer into line buf, and return */
appendBinaryStringInfo(&cstate->line_buf, cstate->raw_buf, bytesread);
cstate->line_done = CopyCheckIsLastLine(cstate);
cstate->raw_buf_done = true;
if (cstate->line_done)
preProcessDataLine(cstate);
return cstate->line_done;
}
}
/*
* Special case: eol is CRNL, last byte of previous buffer was an
* unescaped CR and 1st byte of current buffer is NL. We check for
* that here.
*/
if (cstate->eol_type == EOL_CRLF)
{
/* if we started scanning from the 1st byte of the buffer */
if (cstate->begloc == cstate->raw_buf)
{
/* and had a CR in last byte of prev buf */
if (cstate->cr_in_prevbuf)
{
/*
* if this 1st byte in buffer is 2nd byte of line end sequence
* (linefeed)
*/
if (*(cstate->begloc) == cstate->eol_ch[1])
{
/*
* load that one linefeed byte and indicate we are done
* with the data line
*/
appendBinaryStringInfo(&cstate->line_buf, cstate->begloc, 1);
cstate->raw_buf_index++;
cstate->begloc++;
cstate->line_done = true;
preProcessDataLine(cstate);
cstate->cr_in_prevbuf = false;
return true;
}
}
cstate->cr_in_prevbuf = false;
}
}
/*
* (we need a loop so that if eol_ch is found, but we are in quotes,
* we can search for the next eol_ch)
*/
while (true)
{
/* reached end of buffer */
if ((cstate->endloc = scanCSVLine(cstate, cstate->begloc, cstate->eol_ch[0], escapec, quotec, bytesread - cstate->raw_buf_index)) == NULL)
{
linesize = bytesread - (cstate->begloc - cstate->raw_buf);
appendBinaryStringInfo(&cstate->line_buf, cstate->begloc, linesize);
if (cstate->line_buf.len > 1)
{
char *last_ch = cstate->line_buf.data + cstate->line_buf.len - 1; /* before terminating \0 */
if (*last_ch == '\r')
{
if (cstate->eol_type == EOL_CRLF)
cstate->cr_in_prevbuf = true;
}
}
cstate->line_done = CopyCheckIsLastLine(cstate);
cstate->raw_buf_done = true;
break;
}
else
/* found 1st eol char in raw_buf. */
{
bool eol_found = true;
/*
* Load that piece of data (potentially a data line) into the line buffer,
* and update the pointers for the next scan.
*/
linesize = cstate->endloc - cstate->begloc + 1;
appendBinaryStringInfo(&cstate->line_buf, cstate->begloc, linesize);
cstate->raw_buf_index += linesize;
cstate->begloc = cstate->endloc + 1;
/* end of line only if not in quotes */
if (cstate->in_quote)
{
/* buf done, but still in quote */
if (cstate->raw_buf_index >= bytesread)
cstate->raw_buf_done = true;
cstate->line_done = false;
/* update file line for error message */
/*
* TODO: for dos line end we need to do check before
* incrementing!
*/
cstate->cur_lineno++;
/*
* If we are still in quotes and linebuf len is extremely large
* then this file has bad csv and we have to stop the rolling
* snowball from getting bigger.
*/
if(cstate->line_buf.len >= gp_max_csv_line_length)
{
csv_is_invalid = true;
cstate->in_quote = false;
cstate->line_done = true;
cstate->num_consec_csv_err++;
break;
}
if (cstate->raw_buf_done)
break;
}
else
{
/* if dos eol, check for '\n' after the '\r' */
if (cstate->eol_type == EOL_CRLF)
{
if (*(cstate->endloc + 1) == '\n')
{
/* this is a line end */
appendBinaryStringInfo(&cstate->line_buf, cstate->begloc, 1); /* load that '\n' */
cstate->raw_buf_index++;
cstate->begloc++;
}
else
/* just a CR, not a line end */
eol_found = false;
}
/*
* in some cases, this end of line char happens to be the
* last character in the buffer. we need to catch that.
*/
if (cstate->raw_buf_index >= bytesread)
cstate->raw_buf_done = true;
/*
* if eol was found line is done
*/
if (eol_found)
{
cstate->line_done = true;
break;
}
}
} /* end of found eol_ch */
}
/* Done reading a complete line. Do preprocessing of the raw input data */
if (cstate->line_done)
preProcessDataLine(cstate);
/*
* We have a corrupted csv format case. It is already converted to server
* encoding, *which is necessary*. Ok, we can report an error now.
*/
if(csv_is_invalid)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("data line too long. likely due to invalid csv data")));
else
cstate->num_consec_csv_err = 0; /* reset consecutive count */
/*
* check if this line is an end marker -- "\."
*/
cstate->end_marker = false;
switch (cstate->eol_type)
{
case EOL_LF:
if (!strcmp(cstate->line_buf.data, "\\.\n"))
cstate->end_marker = true;
break;
case EOL_CR:
if (!strcmp(cstate->line_buf.data, "\\.\r"))
cstate->end_marker = true;
break;
case EOL_CRLF:
if (!strcmp(cstate->line_buf.data, "\\.\r\n"))
cstate->end_marker = true;
break;
case EOL_UNKNOWN:
break;
}
if (cstate->end_marker)
{
/*
* Reached end marker. In protocol version 3 we
* should ignore anything after \. up to protocol
* end of copy data.
*/
if (cstate->copy_dest == COPY_NEW_FE)
{
while (!cstate->fe_eof)
{
CopyGetData(cstate, cstate->raw_buf, RAW_BUF_SIZE); /* eat data */
}
}
cstate->fe_eof = true;
/* we don't want to process a \. as data line, want to quit. */
cstate->line_done = false;
cstate->raw_buf_done = true;
}
return cstate->line_done;
}
/*
* Detect the eol type by looking at the first data row.
* Possible eol types are NL, CR, or CRNL. If the eol type was
* detected, it is set and a boolean true is returned to
* indicate that detection was successful. If the first data row
* is longer than the input buffer, we return false and will
* try again in the next buffer.
*/
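/*
* For example, if the first data row arrives as "1\t2\r\n..." the eol type
* is set to EOL_CRLF and eol_ch to {'\r','\n'}; "1\t2\n..." yields EOL_LF.
* In CSV mode a line end inside a quoted field does not stop the search.
*/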
static bool
DetectLineEnd(CopyState cstate, size_t bytesread __attribute__((unused)))
{
int index = 0;
int lineno = 0;
char c;
char quotec = '\0',
escapec = '\0';
bool csv = false;
/*
* CSV special case. See MPP-7819.
*
* this function may change the in_quote value while processing.
* this is ok as we need to keep state in case we don't find EOL
* in this buffer and need to be called again to continue searching.
* BUT if EOL *was* found we must reset to the state we had since
* we are about to reprocess this buffer again in CopyReadLineCSV
* from the same starting point as we are in right now.
*/
bool save_inquote = cstate->in_quote;
bool save_lastwas = cstate->last_was_esc;
/* if user specified NEWLINE we should never be here */
Assert(!cstate->eol_str);
if (cstate->quote) /* CSV format */
{
csv = true;
quotec = cstate->quote[0];
escapec = cstate->escape[0];
/* ignore special escape processing if it's the same as quotec */
if (quotec == escapec)
escapec = '\0';
}
while (index < RAW_BUF_SIZE)
{
c = cstate->raw_buf[index];
if (csv)
{
if (cstate->in_quote && c == escapec)
cstate->last_was_esc = !cstate->last_was_esc;
if (c == quotec && !cstate->last_was_esc)
cstate->in_quote = !cstate->in_quote;
if (c != escapec)
cstate->last_was_esc = false;
}
if (c == '\n')
{
lineno++;
if (!csv || (csv && !cstate->in_quote))
{
cstate->eol_type = EOL_LF;
cstate->eol_ch[0] = '\n';
cstate->eol_ch[1] = '\0';
cstate->in_quote = save_inquote; /* see comment at declaration */
cstate->last_was_esc = save_lastwas;
return true;
}
else if(csv && cstate->in_quote && cstate->line_buf.len + index >= gp_max_csv_line_length)
{
/* we do a "line too long" CSV check for the first row as well (MPP-7869) */
cstate->in_quote = false;
cstate->line_done = true;
cstate->num_consec_csv_err++;
cstate->cur_lineno += lineno;
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("data line too long. likely due to invalid csv data")));
}
}
if (c == '\r')
{
lineno++;
if (!csv || (csv && !cstate->in_quote))
{
if (cstate->raw_buf[index + 1] == '\n') /* always safe */
{
cstate->eol_type = EOL_CRLF;
cstate->eol_ch[0] = '\r';
cstate->eol_ch[1] = '\n';
}
else
{
cstate->eol_type = EOL_CR;
cstate->eol_ch[0] = '\r';
cstate->eol_ch[1] = '\0';
}
cstate->in_quote = save_inquote; /* see comment at declaration */
cstate->last_was_esc = save_lastwas;
return true;
}
}
index++;
}
/* since we have yet to find the EOL, this buffer will never be
* re-processed, so add the number of rows we found so we don't lose them */
cstate->cur_lineno += lineno;
return false;
}
/*
* Return decimal value for a hexadecimal digit
*/
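/*
* For example, GetDecimalFromHex('7') yields 7 and GetDecimalFromHex('c')
* yields 12. Callers are expected to validate the character with isxdigit()
* first, as the escape parsing in CopyReadAttributesText does.
*/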
static
int GetDecimalFromHex(char hex)
{
if (isdigit((unsigned char)hex))
return hex - '0';
else
return tolower((unsigned char)hex) - 'a' + 10;
}
/*
* Read all TEXT attributes. Attributes are parsed from line_buf and
* inserted (all at once) to attribute_buf, while saving pointers to
* each attribute's starting position.
*
* When this routine finishes execution both the nulls array and
* the attr_offsets array are updated. The attr_offsets will include
* the offset from the beginning of the attribute array of which
* each attribute begins. If a specific attribute is not used for this
* COPY command (omitted from the column list), a value of 0 will be assigned.
* For example: for table foo(a,b,c,d,e) and COPY foo(a,b,e)
* attr_offsets may look something like this after this routine
* returns: [0,20,0,0,55]. That means that column "a"'s value starts
* at byte offset 0, "b"'s at 20 and "e"'s at 55, in attribute_buf.
*
* In the attribute buffer (attribute_buf) each attribute
* is terminated with a '\0', and therefore by using the attr_offsets
* array we could point to a beginning of an attribute and have it
* behave as a C string, much like previously done in COPY.
*
* Another aspect of improving performance is reducing the frequency
* of data load into buffers. The original COPY read attribute code
* loaded a character at a time. In here we try to load a chunk of data
* at a time. Usually a chunk will include a full data row
* (unless we have an escaped delim). That effectively reduces the number of
* loads by a factor of the number of bytes per row. This improves performance
* greatly; unfortunately, it adds more complexity to the code.
*
* Global participants in parsing logic:
*
* line_buf.cursor -- an offset from beginning of the line buffer
* that indicates where we are about to begin the next scan. Note that
* if we have WITH OIDS or if we ran CopyExtractRowMetaData this cursor is
* already shifted and is not in the beginning of line buf anymore.
*
* attribute_buf.cursor -- an offset from the beginning of the
* attribute buffer that indicates where the current attribute begins.
*/
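/*
* The two helpers below use the classic SWAR "has zero byte" trick:
* (w - 0x0101010101010101) & ~w & 0x8080808080808080 is nonzero iff at
* least one byte of w is zero. uint64_has_byte() reduces the general case
* to that test by XORing the byte of interest into every byte position,
* so e.g. uint64_has_byte(w, ',') fires iff one of the eight packed bytes
* is a comma.
*/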
static inline uint64 uint64_has_nullbyte(uint64 w) {
return ((w - 0x0101010101010101ull) & ~w & 0x8080808080808080ull); }
static inline uint64 uint64_has_byte(uint64 w, unsigned char b) {
w ^= b * 0x0101010101010101ull;
return ((w - 0x0101010101010101ull) & ~w & 0x8080808080808080ull);}
void
CopyReadAttributesText(CopyState cstate, bool * __restrict nulls,
int * __restrict attr_offsets, int num_phys_attrs, Form_pg_attribute * __restrict attr)
{
char delimc = cstate->delim[0]; /* delimiter character */
char escapec = cstate->escape[0]; /* escape character */
char *scan_start; /* pointer to line buffer for scan start. */
char *scan_end; /* pointer to line buffer where char was found */
char *stop;
char *scanner;
int attr_pre_len = 0;/* attr raw len, before processing escapes */
int attr_post_len = 0;/* current attr len after escaping */
int m; /* attribute index being parsed */
int bytes_remaining;/* num bytes remaining to be scanned in line
* buf */
int chunk_start; /* offset to beginning of line chunk to load */
int chunk_len = 0; /* length of chunk of data to load to attr buf */
int oct_val; /* byte value for octal escapes */
int hex_val;
int attnum = 0; /* attribute number being parsed */
int attribute = 1;
bool saw_high_bit = false;
ListCell *cur; /* cursor to attribute list used for this COPY */
/* init variables for attribute scan */
RESET_ATTRBUF;
/* cursor is now > 0 if we copy WITH OIDS */
scan_start = cstate->line_buf.data + cstate->line_buf.cursor;
chunk_start = cstate->line_buf.cursor;
cur = list_head(cstate->attnumlist);
/* check for zero column table case */
if(num_phys_attrs > 0)
{
attnum = lfirst_int(cur);
m = attnum - 1;
}
if (cstate->escape_off)
escapec = delimc; /* look only for delimiters, escapes are
* disabled */
/* have a single column only and no delim specified? take the fast track */
if (cstate->delimiter_off)
{
CopyReadAttributesTextNoDelim(cstate, nulls, num_phys_attrs,
attnum);
return;
}
/*
* Scan through the line buffer to read all attributes data
*/
while (cstate->line_buf.cursor < cstate->line_buf.len)
{
bytes_remaining = cstate->line_buf.len - cstate->line_buf.cursor;
stop = scan_start + bytes_remaining;
/*
* We can eliminate one test (for length) in the loop by replacing the
* last byte with the delimiter. We need to remember what it was so we
* can replace it later.
*/
char endchar = *(stop-1);
*(stop-1) = delimc;
/* Find the next of: delimiter, or escape, or end of buffer */
for (scanner = scan_start; *scanner != delimc && *scanner != escapec; scanner++)
;
if (scanner == (stop-1) && endchar != delimc)
{
if (endchar != escapec)
scanner++;
}
*(stop-1) = endchar;
scan_end = (*scanner != '\0' ? (char *) scanner : NULL);
if (scan_end == NULL)
{
/* GOT TO END OF LINE BUFFER */
if (cur == NULL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column"),
errOmitLocation(true)));
attnum = lfirst_int(cur);
m = attnum - 1;
/* don't count eol char(s) in attr and chunk len calculation */
if (cstate->eol_type == EOL_CRLF)
{
attr_pre_len += bytes_remaining - 2;
chunk_len = cstate->line_buf.len - chunk_start - 2;
}
else
{
attr_pre_len += bytes_remaining - 1;
chunk_len = cstate->line_buf.len - chunk_start - 1;
}
/* check if this is a NULL value or data value (assumed NULL) */
if (attr_pre_len == cstate->null_print_len
&&
strncmp(cstate->line_buf.data + cstate->line_buf.len - attr_pre_len - 1, cstate->null_print, attr_pre_len)
== 0)
nulls[m] = true;
else
nulls[m] = false;
attr_offsets[m] = cstate->attribute_buf.cursor;
/* load the last chunk, the whole buffer in most cases */
appendBinaryStringInfo(&cstate->attribute_buf, cstate->line_buf.data + chunk_start, chunk_len);
cstate->line_buf.cursor += attr_pre_len + 2; /* skip eol char and
* '\0' to exit loop */
/*
* line is done, but do we have more attributes to process?
*
* normally, remaining attributes that have no data means ERROR,
* however, with FILL MISSING FIELDS remaining attributes become
* NULL. since attrs are null by default we leave unchanged and
* avoid throwing an error, with the exception of empty data lines
* for multiple attributes, which we intentionally don't support.
*/
if (lnext(cur) != NULL)
{
if (!cstate->fill_missing)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("missing data for column \"%s\"",
NameStr(attr[lfirst_int(lnext(cur)) - 1]->attname)),
errOmitLocation(true)));
else if (attribute == 1 && attr_pre_len == 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("missing data for column \"%s\", found empty data line",
NameStr(attr[lfirst_int(lnext(cur)) - 1]->attname)),
errOmitLocation(true)));
}
}
else
/* FOUND A DELIMITER OR ESCAPE */
{
if (cur == NULL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column"),
errOmitLocation(true)));
if (*scan_end == delimc) /* found a delimiter */
{
attnum = lfirst_int(cur);
m = attnum - 1;
/* (we don't include the delimiter ch in length) */
attr_pre_len += scan_end - scan_start;
attr_post_len += scan_end - scan_start;
/* check if this is a null print or data (assumed NULL) */
if (attr_pre_len == cstate->null_print_len &&
strncmp(scan_end - attr_pre_len, cstate->null_print, attr_pre_len) == 0)
nulls[m] = true;
else
nulls[m] = false;
/* set the pointer to next attribute position */
attr_offsets[m] = cstate->attribute_buf.cursor;
/*
* update buffer cursors to our current location, +1 to skip
* the delimc
*/
cstate->line_buf.cursor = scan_end - cstate->line_buf.data + 1;
cstate->attribute_buf.cursor += attr_post_len + 1;
/* prepare scan for next attr */
scan_start = cstate->line_buf.data + cstate->line_buf.cursor;
cur = lnext(cur);
attr_pre_len = 0;
attr_post_len = 0;
/*
* for the dispatcher - stop parsing once we have
* all the hash field values. We don't need the rest.
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
if (attribute == cstate->last_hash_field)
{
/*
* load the chunk from chunk_start to end of current
* attribute, not including delimiter
*/
chunk_len = cstate->line_buf.cursor - chunk_start - 1;
appendBinaryStringInfo(&cstate->attribute_buf, cstate->line_buf.data + chunk_start, chunk_len);
break;
}
}
attribute++;
}
else
/* found an escape character */
{
char nextc = *(scan_end + 1);
char newc;
int skip = 2;
chunk_len = (scan_end - cstate->line_buf.data) - chunk_start + 1;
/* load a chunk of data */
appendBinaryStringInfo(&cstate->attribute_buf, cstate->line_buf.data + chunk_start, chunk_len);
switch (nextc)
{
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
/* handle \013 */
oct_val = OCTVALUE(nextc);
nextc = *(scan_end + 2);
/*
* (no need for an out-of-bounds access check since the line is
* buffered)
*/
if (ISOCTAL(nextc))
{
skip++;
oct_val = (oct_val << 3) + OCTVALUE(nextc);
nextc = *(scan_end + 3);
if (ISOCTAL(nextc))
{
skip++;
oct_val = (oct_val << 3) + OCTVALUE(nextc);
}
}
newc = oct_val & 0377; /* the escaped byte value */
if (IS_HIGHBIT_SET(newc))
saw_high_bit = true;
break;
case 'x':
/* Handle \x3F */
hex_val = 0; /* init */
nextc = *(scan_end + 2); /* get char after 'x' */
if (isxdigit((unsigned char)nextc))
{
skip++;
hex_val = GetDecimalFromHex(nextc);
nextc = *(scan_end + 3); /* get second char */
if (isxdigit((unsigned char)nextc))
{
skip++;
hex_val = (hex_val << 4) + GetDecimalFromHex(nextc);
}
newc = hex_val & 0xff;
if (IS_HIGHBIT_SET(newc))
saw_high_bit = true;
}
else
{
newc = 'x';
}
break;
case 'b':
newc = '\b';
break;
case 'f':
newc = '\f';
break;
case 'n':
newc = '\n';
break;
case 'r':
newc = '\r';
break;
case 't':
newc = '\t';
break;
case 'v':
newc = '\v';
break;
default:
if (nextc == delimc)
newc = delimc;
else if (nextc == escapec)
newc = escapec;
else
{
/* no escape sequence found. it's a lone escape */
bool next_is_eol = ((nextc == '\n' && cstate->eol_type == EOL_LF) ||
(nextc == '\r' && (cstate->eol_type == EOL_CR ||
cstate->eol_type == EOL_CRLF)));
if(!next_is_eol)
{
/* take next char literally */
newc = nextc;
}
else
{
/* there isn't a next char (end of data in line). we keep the
* backslash as a literal character. We don't skip over the EOL,
* since we don't support escaping it anymore (unlike PG).
*/
newc = escapec;
skip--;
}
}
break;
}
/* update to current length, add escape and escaped chars */
attr_pre_len += scan_end - scan_start + 2;
/* update to current length, escaped char */
attr_post_len += scan_end - scan_start + 1;
/*
* Need to get rid of the escape character. This is done by
* loading the chunk up to and including the escape character
* into the attribute buffer, then overwriting the escape char
* with the escaped sequence or char, and continuing to scan
* from *after* the char that follows the escape in line_buf.
*/
*(cstate->attribute_buf.data + cstate->attribute_buf.len - 1) = newc;
cstate->line_buf.cursor = scan_end - cstate->line_buf.data + skip;
scan_start = scan_end + skip;
chunk_start = cstate->line_buf.cursor;
chunk_len = 0;
}
} /* end delimiter/backslash */
} /* end line buffer scan. */
/*
* Replace all delimiters with NULL for string termination.
* NOTE: only delimiters (NOT necessarily all delimc) are replaced.
* Example (delimc = '|'):
* - Before: f 1 | f \| 2 | f 3
* - After : f 1 \0 f | 2 \0 f 3
*/
for (attribute = 0; attribute < num_phys_attrs; attribute++)
{
if (attr_offsets[attribute] != 0)
*(cstate->attribute_buf.data + attr_offsets[attribute] - 1) = '\0';
}
/*
* MPP-6816
* If any attribute has a de-escaped octal or hex sequence with a
* high bit set, we check that the changed attribute text is still
* valid WRT encoding. We run the check on all attributes since
* such octal sequences are so rare in client data that it wouldn't
* affect performance at all anyway.
*/
if(saw_high_bit)
{
for (attribute = 0; attribute < num_phys_attrs; attribute++)
{
char *fld = cstate->attribute_buf.data + attr_offsets[attribute];
pg_verifymbstr(fld, strlen(fld), false);
}
}
}
/*
* Read all the attributes of the data line in CSV mode,
* performing de-escaping as needed. Escaping does not follow the normal
* PostgreSQL text mode, but instead "standard" (i.e. common) CSV usage.
*
* Quoted fields can span lines, in which case the line end is embedded
* in the returned string.
*
* null_print is the null marker string. Note that this is compared to
* the pre-de-escaped input string (thus if it is quoted it is not a NULL).
*----------
*/
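/*
* A short sketch of the parsing rules, assuming DELIMITER ',', the usual
* CSV QUOTE/ESCAPE of a double quote, and the default empty NULL marker:
* the input line
*
* 1,"a,""b",
*
* yields three attributes: 1, then a,"b (the doubled quote is de-escaped
* to a single one), and finally an empty unquoted field, which matches
* the NULL marker and is therefore read as NULL.
*/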
void
CopyReadAttributesCSV(CopyState cstate, bool *nulls, int *attr_offsets,
int num_phys_attrs, Form_pg_attribute *attr)
{
char delimc = cstate->delim[0];
char quotec = cstate->quote[0];
char escapec = cstate->escape[0];
char c;
int start_cursor = cstate->line_buf.cursor;
int end_cursor = start_cursor;
int input_len = 0;
int attnum; /* attribute number being parsed */
int m = 0; /* attribute index being parsed */
int attribute = 1;
bool in_quote = false;
bool saw_quote = false;
ListCell *cur; /* cursor to attribute list used for this COPY */
/* init variables for attribute scan */
RESET_ATTRBUF;
cur = list_head(cstate->attnumlist);
if(num_phys_attrs > 0)
{
attnum = lfirst_int(cur);
m = attnum - 1;
}
for (;;)
{
end_cursor = cstate->line_buf.cursor;
/* finished processing attributes in line */
if (cstate->line_buf.cursor >= cstate->line_buf.len - 1)
{
input_len = end_cursor - start_cursor;
if (cstate->eol_type == EOL_CRLF)
{
/* ignore the leftover CR */
input_len--;
cstate->attribute_buf.data[cstate->attribute_buf.cursor - 1] = '\0';
}
/* check whether raw input matched null marker */
if(num_phys_attrs > 0)
{
if (!saw_quote && input_len == cstate->null_print_len &&
strncmp(&cstate->line_buf.data[start_cursor], cstate->null_print, input_len) == 0)
nulls[m] = true;
else
nulls[m] = false;
}
/* if zero column table and data is trying to get in */
if(num_phys_attrs == 0 && input_len > 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column"),
errOmitLocation(true)));
if(cur == NULL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column"),
errOmitLocation(true)));
if (in_quote)
{
/* next c will usually be LF, but it could also be a quote
* char if the last line of the file has no LF, and we don't
* want to error out in this case.
*/
c = cstate->line_buf.data[cstate->line_buf.cursor];
if(c != quotec)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("unterminated CSV quoted field"),
errOmitLocation(true)));
}
/*
* line is done, but do we have more attributes to process?
*
* normally, remaining attributes that have no data means ERROR,
* however, with FILL MISSING FIELDS remaining attributes become
* NULL. since attrs are null by default we leave unchanged and
* avoid throwing an error, with the exception of empty data lines
* for multiple attributes, which we intentionally don't support.
*/
if (lnext(cur) != NULL)
{
if (!cstate->fill_missing)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("missing data for column \"%s\"",
NameStr(attr[m + 1]->attname)),
errOmitLocation(true)));
else if (attribute == 1 && input_len == 0)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("missing data for column \"%s\", found empty data line",
NameStr(attr[m + 1]->attname)),
errOmitLocation(true)));
}
break;
}
c = cstate->line_buf.data[cstate->line_buf.cursor++];
/* unquoted field delimiter */
if (!in_quote && c == delimc && !cstate->delimiter_off)
{
/* check whether raw input matched null marker */
input_len = end_cursor - start_cursor;
if (cur == NULL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column"),
errOmitLocation(true)));
if(num_phys_attrs > 0)
{
if (!saw_quote && input_len == cstate->null_print_len &&
strncmp(&cstate->line_buf.data[start_cursor], cstate->null_print, input_len) == 0)
nulls[m] = true;
else
nulls[m] = false;
}
/* terminate attr string with '\0' */
appendStringInfoCharMacro(&cstate->attribute_buf, '\0');
cstate->attribute_buf.cursor++;
/* setup next attribute scan */
cur = lnext(cur);
if (cur == NULL)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column"),
errOmitLocation(true)));
saw_quote = false;
if(num_phys_attrs > 0)
{
attnum = lfirst_int(cur);
m = attnum - 1;
attr_offsets[m] = cstate->attribute_buf.cursor;
}
start_cursor = cstate->line_buf.cursor;
/*
* for the dispatcher - stop parsing once we have
* all the hash field values. We don't need the rest.
*/
if (Gp_role == GP_ROLE_DISPATCH)
{
if (attribute == cstate->last_hash_field)
break;
}
attribute++;
continue;
}
/* start of quoted field (or part of field) */
if (!in_quote && c == quotec)
{
saw_quote = true;
in_quote = true;
continue;
}
/* escape within a quoted field */
if (in_quote && c == escapec)
{
/*
* peek at the next char if available, and escape it if it is
* an escape char or a quote char
*/
if (cstate->line_buf.cursor <= cstate->line_buf.len)
{
char nextc = cstate->line_buf.data[cstate->line_buf.cursor];
if (nextc == escapec || nextc == quotec)
{
appendStringInfoCharMacro(&cstate->attribute_buf, nextc);
cstate->line_buf.cursor++;
cstate->attribute_buf.cursor++;
continue;
}
}
}
/*
* end of quoted field. Must do this test after testing for escape
* in case quote char and escape char are the same (which is the
* common case).
*/
if (in_quote && c == quotec)
{
in_quote = false;
continue;
}
appendStringInfoCharMacro(&cstate->attribute_buf, c);
cstate->attribute_buf.cursor++;
}
}
/*
* Read a single attribute line when delimiter is 'off'. This is a fast track -
* we copy the entire line buf into the attribute buf, check for null value,
* and we're done.
*
* Note that no equivalent function exists for CSV, as in CSV we still may
* need to parse quotes etc., so the functionality of delimiter_off is inlined
* inside CopyReadAttributesCSV.
*/
static void
CopyReadAttributesTextNoDelim(CopyState cstate, bool *nulls, int num_phys_attrs,
int attnum)
{
int len = 0;
Assert(num_phys_attrs == 1);
/* don't count eol char(s) in attr len calculation */
len = cstate->line_buf.len - 1;
if (cstate->eol_type == EOL_CRLF)
len--;
/* check if this is a NULL value or data value (assumed NULL) */
if (len == cstate->null_print_len &&
strncmp(cstate->line_buf.data, cstate->null_print, len) == 0)
nulls[attnum - 1] = true;
else
nulls[attnum - 1] = false;
appendBinaryStringInfo(&cstate->attribute_buf, cstate->line_buf.data, len);
}
/*
* Read the first attribute. This is mainly used to maintain support
* for an OID column. All the rest of the columns will be read at once with
* CopyReadAttributesText.
*/
static char *
CopyReadOidAttr(CopyState cstate, bool *isnull)
{
char delimc = cstate->delim[0];
char *start_loc = cstate->line_buf.data + cstate->line_buf.cursor;
char *end_loc;
int attr_len = 0;
int bytes_remaining;
/* reset attribute buf to empty */
RESET_ATTRBUF;
/* # of bytes that were not yet processed in this line */
bytes_remaining = cstate->line_buf.len - cstate->line_buf.cursor;
/* got to end of line */
if ((end_loc = scanTextLine(cstate, start_loc, delimc, bytes_remaining)) == NULL)
{
attr_len = bytes_remaining - 1; /* don't count '\n' in len calculation */
appendBinaryStringInfo(&cstate->attribute_buf, start_loc, attr_len);
cstate->line_buf.cursor += attr_len + 2; /* skip '\n' and '\0' */
}
else
/* found a delimiter */
{
/*
* (we don't care if delim was preceded with a backslash, because it's
* an invalid OID anyway)
*/
attr_len = end_loc - start_loc; /* we don't include the delimiter ch */
appendBinaryStringInfo(&cstate->attribute_buf, start_loc, attr_len);
cstate->line_buf.cursor += attr_len + 1;
}
/* check whether raw input matched null marker */
if (attr_len == cstate->null_print_len && strncmp(start_loc, cstate->null_print, attr_len) == 0)
*isnull = true;
else
*isnull = false;
return cstate->attribute_buf.data;
}
/*
* Flush the literal run accumulated between 'start' and 'ptr'
* (helper for the attribute output routines below).
*/
#define DUMPSOFAR() \
do { \
if (ptr > start) \
CopySendData(cstate, start, ptr - start); \
} while (0)
/*
* Send text representation of one attribute, with conversion and escaping
*/
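/*
* A short example, assuming the usual text-format defaults of a tab
* DELIMITER and backslash ESCAPE: an embedded tab is sent as "\t", an
* embedded newline as "\n", and a literal backslash is doubled to "\\";
* all other bytes are passed through unchanged (subject to encoding
* conversion). With escape processing turned off the value is sent
* verbatim.
*/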
static void
CopyAttributeOutText(CopyState cstate, char *string)
{
char *ptr;
char *start;
char c;
char delimc = cstate->delim[0];
char escapec = cstate->escape[0];
if (cstate->need_transcoding)
ptr = pg_server_to_custom(string,
strlen(string),
cstate->client_encoding,
cstate->enc_conversion_proc);
else
ptr = string;
if (cstate->escape_off)
{
CopySendData(cstate, ptr, strlen(ptr));
return;
}
/*
* We have to grovel through the string searching for control characters
* and instances of the delimiter character. In most cases, though, these
* are infrequent. To avoid overhead from calling CopySendData once per
* character, we dump out all characters between escaped characters in a
* single call. The loop invariant is that the data from "start" to "ptr"
* can be sent literally, but hasn't yet been.
*
* We can skip pg_encoding_mblen() overhead when encoding is safe, because
* in valid backend encodings, extra bytes of a multibyte character never
* look like ASCII. This loop is sufficiently performance-critical that
* it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
* of the normal safe-encoding path.
*/
if (cstate->encoding_embeds_ascii)
{
start = ptr;
while ((c = *ptr) != '\0')
{
if ((unsigned char) c < (unsigned char) 0x20)
{
/*
* \r and \n must be escaped, the others are traditional. We
* prefer to dump these using the C-like notation, rather than
* a backslash and the literal character, because it makes the
* dump file a bit more proof against Microsoftish data
* mangling.
*/
switch (c)
{
case '\b':
c = 'b';
break;
case '\f':
c = 'f';
break;
case '\n':
c = 'n';
break;
case '\r':
c = 'r';
break;
case '\t':
c = 't';
break;
case '\v':
c = 'v';
break;
default:
/* If it's the delimiter, must backslash it */
if (c == delimc)
break;
/* All ASCII control chars are length 1 */
ptr++;
continue; /* fall to end of loop */
}
/* if we get here, we need to convert the control char */
DUMPSOFAR();
CopySendChar(cstate, escapec);
CopySendChar(cstate, c);
start = ++ptr; /* do not include char in next run */
}
else if (c == escapec || c == delimc)
{
DUMPSOFAR();
CopySendChar(cstate, escapec);
start = ptr++; /* we include char in next run */
}
else if (IS_HIGHBIT_SET(c))
ptr += pg_encoding_mblen(cstate->client_encoding, ptr);
else
ptr++;
}
}
else
{
start = ptr;
while ((c = *ptr) != '\0')
{
if ((unsigned char) c < (unsigned char) 0x20)
{
/*
* \r and \n must be escaped, the others are traditional. We
* prefer to dump these using the C-like notation, rather than
* a backslash and the literal character, because it makes the
* dump file a bit more proof against Microsoftish data
* mangling.
*/
switch (c)
{
case '\b':
c = 'b';
break;
case '\f':
c = 'f';
break;
case '\n':
c = 'n';
break;
case '\r':
c = 'r';
break;
case '\t':
c = 't';
break;
case '\v':
c = 'v';
break;
default:
/* If it's the delimiter, must backslash it */
if (c == delimc)
break;
/* All ASCII control chars are length 1 */
ptr++;
continue; /* fall to end of loop */
}
/* if we get here, we need to convert the control char */
DUMPSOFAR();
CopySendChar(cstate, escapec);
CopySendChar(cstate, c);
start = ++ptr; /* do not include char in next run */
}
else if (c == escapec || c == delimc)
{
DUMPSOFAR();
CopySendChar(cstate, escapec);
start = ptr++; /* we include char in next run */
}
else
ptr++;
}
}
DUMPSOFAR();
}
/*
* Send text representation of one attribute, with conversion and
* CSV-style escaping
*/
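/*
* For example, with the CSV defaults of QUOTE '"' and ESCAPE '"', the
* value a"b is emitted as "a""b", and a value that happens to spell the
* NULL marker is force-quoted so that it is not read back as NULL.
*/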
static void
CopyAttributeOutCSV(CopyState cstate, char *string,
bool use_quote, bool single_attr)
{
char *ptr;
char *start;
char c;
char delimc = cstate->delim[0];
char quotec;
char escapec = cstate->escape[0];
/*
* MPP-8075. We may get called with cstate->quote == NULL.
*/
if (cstate->quote == NULL)
{
quotec = '"';
}
else
{
quotec = cstate->quote[0];
}
/* force quoting if it matches null_print (before conversion!) */
if (!use_quote && strcmp(string, cstate->null_print) == 0)
use_quote = true;
if (cstate->need_transcoding)
ptr = pg_server_to_custom(string,
strlen(string),
cstate->client_encoding,
cstate->enc_conversion_proc);
else
ptr = string;
/*
* Make a preliminary pass to discover if it needs quoting
*/
if (!use_quote)
{
/*
* Because '\.' can be a data value, quote it if it appears alone on a
* line so it is not interpreted as the end-of-data marker.
*/
if (single_attr && strcmp(ptr, "\\.") == 0)
use_quote = true;
else
{
char *tptr = ptr;
while ((c = *tptr) != '\0')
{
if (c == delimc || c == quotec || c == '\n' || c == '\r')
{
use_quote = true;
break;
}
if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
tptr += pg_encoding_mblen(cstate->client_encoding, tptr);
else
tptr++;
}
}
}
if (use_quote)
{
CopySendChar(cstate, quotec);
/*
* We adopt the same optimization strategy as in CopyAttributeOutText
*/
start = ptr;
while ((c = *ptr) != '\0')
{
if (c == quotec || c == escapec)
{
DUMPSOFAR();
CopySendChar(cstate, escapec);
start = ptr; /* we include char in next run */
}
if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
ptr += pg_encoding_mblen(cstate->client_encoding, ptr);
else
ptr++;
}
DUMPSOFAR();
CopySendChar(cstate, quotec);
}
else
{
/* If it doesn't need quoting, we can just dump it as-is */
CopySendString(cstate, ptr);
}
}
/*
* CopyGetAttnums - build an integer list of attnums to be copied
*
* The input attnamelist is either the user-specified column list,
* or NIL if there was none (in which case we want all the non-dropped
* columns).
*
* rel can be NULL ... it's only used for error reports.
*/
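/*
* For example, for a table foo(a,b,c,d,e) the column list in
* COPY foo(a,b,e) produces the list [1, 2, 5], while omitting the column
* list produces [1, 2, 3, 4, 5] minus any dropped columns.
*/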
List *
CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
{
List *attnums = NIL;
if (attnamelist == NIL)
{
/* Generate default column list */
Form_pg_attribute *attr = tupDesc->attrs;
int attr_count = tupDesc->natts;
int i;
for (i = 0; i < attr_count; i++)
{
if (attr[i]->attisdropped)
continue;
attnums = lappend_int(attnums, i + 1);
}
}
else
{
/* Validate the user-supplied list and extract attnums */
ListCell *l;
foreach(l, attnamelist)
{
char *name = strVal(lfirst(l));
int attnum;
int i;
/* Lookup column name */
attnum = InvalidAttrNumber;
for (i = 0; i < tupDesc->natts; i++)
{
if (tupDesc->attrs[i]->attisdropped)
continue;
if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
{
attnum = tupDesc->attrs[i]->attnum;
break;
}
}
if (attnum == InvalidAttrNumber)
{
if (rel != NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
name, RelationGetRelationName(rel)),
errOmitLocation(true)));
else
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" does not exist",
name),
errOmitLocation(true)));
}
/* Check for duplicates */
if (list_member_int(attnums, attnum))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" specified more than once",
name)));
attnums = lappend_int(attnums, attnum);
}
}
return attnums;
}
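/*
* COPY_FIND_MD_DELIM: locate the next COPY_METADATA_DELIM within the first
* 32 bytes starting at line_start. On success (a delimiter found past the
* first byte) it sets value_len to the field length including the delimiter
* and NUL-terminates the field in place; otherwise it sets cstate->md_error.
*/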
#define COPY_FIND_MD_DELIM \
md_delim = memchr(line_start, COPY_METADATA_DELIM, Min(32, cstate->line_buf.len)); \
if(md_delim && (md_delim != line_start)) \
{ \
value_len = md_delim - line_start + 1; \
*md_delim = '\0'; \
} \
else \
{ \
cstate->md_error = true; \
}
/*
* CopyExtractRowMetaData - extract embedded row number from data.
*
* If data is being parsed in execute mode the parser (QE) doesn't
* know the original line number (in the original file) of the current
* row. Therefore the QD sends this information along with the data.
* Other metadata that the QD sends includes whether the data was
* converted to server encoding (should always be the case, unless
* encoding error happened and we're in error table mode).
*
* in:
* line_buf: <original_num>^<buf_converted>^<data for this row>
* lineno: ?
* line_buf_converted: ?
*
* out:
* line_buf: <data for this row>
* lineno: <original_num>
* line_buf_converted: <t/f>
*/
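/*
* As a sketch, writing <d> for COPY_METADATA_DELIM: a row that was line 42
* of the client file and has already been converted to the server encoding
* arrives as "42<d>1<d><data>". After this routine cur_lineno is 42,
* line_buf_converted is true, and line_buf.cursor points at <data>.
*/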
static
void CopyExtractRowMetaData(CopyState cstate)
{
char *md_delim = NULL; /* position of the metadata delimiter */
/*
* Line_buf may have already skipped an OID column if WITH OIDS defined,
* so we need to start from cursor not always from beginning of linebuf.
*/
char *line_start = cstate->line_buf.data + cstate->line_buf.cursor;
int value_len = 0;
cstate->md_error = false;
/* look for the first delimiter, and extract lineno */
COPY_FIND_MD_DELIM;
/*
* make sure MD exists. that should always be the case
* unless we run into an edge case - see MPP-8052. if that
* happens md_error is now set. we raise an error.
*/
if(cstate->md_error)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("COPY metadata not found. This probably means that there is a "
"mixture of newline types in the data. Use the NEWLINE keyword "
"in order to resolve this reliably.")));
cstate->cur_lineno = atoi(line_start);
*md_delim = COPY_METADATA_DELIM; /* restore the line_buf byte after setting it to \0 */
/* reposition line buf cursor to see next metadata value (skip lineno) */
cstate->line_buf.cursor += value_len;
line_start = cstate->line_buf.data + cstate->line_buf.cursor;
/* look for the second delimiter, and extract line_buf_converted */
COPY_FIND_MD_DELIM;
Assert(*line_start == '0' || *line_start == '1');
cstate->line_buf_converted = atoi(line_start);
*md_delim = COPY_METADATA_DELIM;
cstate->line_buf.cursor += value_len;
}
/*
* error context callback for COPY FROM
*/
static void
copy_in_error_callback(void *arg)
{
void *buff_orig;
CopyState cstate = (CopyState) arg;
/*
* Make sure not to modify the actual line_buf here (for example
* in limit_printout_length()), as we may have gotten here not because of
* a data error but because of an innocent elog(NOTICE) for example
* somewhere in the code. We make a copy of line_buf and use it for
* the error context purposes so that a non-data related elog call
* won't corrupt our data in line_buf.
*/
StringInfoData copy_of_line_buf;
char buffer[20];
initStringInfo(&copy_of_line_buf);
appendStringInfoString(&copy_of_line_buf, cstate->line_buf.data);
/* NB: be sure not to add any more appendStringInfo() calls between here
* and the pfree() at the bottom.
*/
buff_orig = copy_of_line_buf.data;
/*
* If our (copy of) linebuf has the embedded original row number and other
* row-specific metadata, remove it. It is not part of the actual data, and
* should not be displayed.
*
* we skip this step, however, if md_error was previously set by
* CopyExtractRowMetaData. That should rarely happen, though.
*/
if(cstate->err_loc_type == ROWNUM_EMBEDDED && !cstate->md_error)
{
/* the following is a compacted mod of CopyExtractRowMetaData */
int value_len = 0;
char *line_start = copy_of_line_buf.data;
char *lineno_delim = memchr(line_start, COPY_METADATA_DELIM, Min(32, cstate->line_buf.len));
if (lineno_delim && (lineno_delim != line_start))
{
/*
* we only continue parsing metadata if the first extraction above
* succeeded. there are some edge cases where we may not have a line
* with MD to parse, for example if some non-copy related error
* propagated here and we don't yet have a proper data line.
* see MPP-11328
*/
value_len = lineno_delim - line_start + 1;
copy_of_line_buf.data += value_len; /* advance beyond original lineno */
copy_of_line_buf.len -= value_len;
line_start = copy_of_line_buf.data;
lineno_delim = memchr(line_start, COPY_METADATA_DELIM, Min(32, cstate->line_buf.len));
if (lineno_delim)
{
value_len = lineno_delim - line_start + 1;
copy_of_line_buf.data += value_len; /* advance beyond line_buf_converted */
copy_of_line_buf.len -= value_len;
}
}
}
/*
* If we saved the error context from a QE in cdbcopy.c append it here.
*/
if (Gp_role == GP_ROLE_DISPATCH && cstate->executor_err_context.len > 0)
{
errcontext("%s", cstate->executor_err_context.data);
pfree(buff_orig);
return;
}
if (cstate->cur_attname)
{
/* error is relevant to a particular column */
limit_printout_length(&cstate->attribute_buf);
if(!cstate->error_on_executor) /* don't print out context if error wasn't local */
errcontext("COPY %s, line %s, column %s",
cstate->cur_relname,linenumber_atoi(buffer,cstate->cur_lineno), cstate->cur_attname);
}
else
{
/* error is relevant to a particular line */
if (cstate->line_buf_converted || !cstate->need_transcoding)
{
truncateEol(&copy_of_line_buf, cstate->eol_type);
limit_printout_length(&copy_of_line_buf);
if(!cstate->error_on_executor) /* don't print out context if error wasn't local */
errcontext("COPY %s, line %s: \"%s\"", cstate->cur_relname,linenumber_atoi(buffer,cstate->cur_lineno), copy_of_line_buf.data);
}
else
{
/*
* Here, the line buffer is still in a foreign encoding,
* and indeed it's quite likely that the error is precisely
* a failure to do encoding conversion (ie, bad data). We
* dare not try to convert it, and at present there's no way
* to regurgitate it without conversion. So we have to punt
* and just report the line number.
*/
if(!cstate->error_on_executor)
errcontext("COPY %s, line %s", cstate->cur_relname,linenumber_atoi(buffer,cstate->cur_lineno));
}
}
pfree(buff_orig);
}
/*
* Make sure we don't print an unreasonable amount of COPY data in a message.
*
* It would seem a lot easier to just use the sprintf "precision" limit to
* truncate the string. However, some versions of glibc have a bug/misfeature
* that vsnprintf will always fail (return -1) if it is asked to truncate
* a string that contains invalid byte sequences for the current encoding.
* So, do our own truncation. We assume we can alter the StringInfo buffer
* holding the input data.
*/
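/*
* For example, a 150-byte line in a single-byte encoding is clipped to its
* first 100 bytes (MAX_COPY_DATA_DISPLAY) and "..." is appended;
* pg_mbcliplen() keeps the cut on a character boundary for multibyte
* encodings.
*/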
void
limit_printout_length(StringInfo buf)
{
#define MAX_COPY_DATA_DISPLAY 100
int len;
/* Fast path if definitely okay */
if (buf->len <= MAX_COPY_DATA_DISPLAY)
return;
/* Apply encoding-dependent truncation */
len = pg_mbcliplen(buf->data, buf->len, MAX_COPY_DATA_DISPLAY);
if (buf->len <= len)
return; /* no need to truncate */
buf->len = len;
buf->data[len] = '\0';
/* Add "..." to show we truncated the input */
appendStringInfoString(buf, "...");
}
static void
attr_get_key(CopyState cstate, CdbCopy *cdbCopy, int original_lineno_for_qe,
unsigned int target_seg,
AttrNumber p_nattrs, AttrNumber *attrs,
Form_pg_attribute *attr_descs, int *attr_offsets, bool *attr_nulls,
FmgrInfo *in_functions, Oid *typioparams, Datum *values)
{
AttrNumber p_index;
/*
* Since we only need the internal format of values that
* we want to hash on (partitioning keys only), we want to
* skip converting the other values so we can run faster.
*/
for (p_index = 0; p_index < p_nattrs; p_index++)
{
ListCell *cur;
/*
* For this partitioning key, search for its location in the attr list.
* (note that fields may be out of order, so this is necessary).
*/
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
int m = attnum - 1;
char *string;
bool isnull;
if (attnum == attrs[p_index])
{
string = cstate->attribute_buf.data + attr_offsets[m];
if (attr_nulls[m])
isnull = true;
else
isnull = false;
if (cstate->csv_mode && isnull &&
cstate->force_notnull_flags[m])
{
string = cstate->null_print; /* set to NULL string */
isnull = false;
}
/* we read an SQL NULL, no need to do anything */
if (!isnull)
{
cstate->cur_attname = NameStr(attr_descs[m]->attname);
values[m] = InputFunctionCall(&in_functions[m],
string,
typioparams[m],
attr_descs[m]->atttypmod);
attr_nulls[m] = false;
cstate->cur_attname = NULL;
} /* end if (!isnull) */
break; /* go to next partitioning key
* attribute */
}
} /* end foreach */
} /* end for partitioning indexes */
}
/*
* The following are custom versions of the string function strchr().
* As opposed to the original strchr, which searches through
* a string until the target character is found or a NUL is
* found, these versions will not return when a NUL is found.
* Instead they will search through a pre-defined length of
* bytes and will return only when the target character is reached.
*
* If our client encoding is not a supported server encoding, we
* know that it is not safe to look at each character as trailing
* byte in a multibyte character may be a 7-bit ASCII equivalent.
* Therefore we use pg_encoding_mblen to skip to the end of the
* character.
*
* returns:
* pointer to c - if c is located within the string.
* NULL - if c was not found in specified length of search. Note:
* this DOESN'T mean that a '\0' was reached.
*/
char *
scanTextLine(CopyState cstate, const char *s, char eol, size_t len)
{
if (cstate->encoding_embeds_ascii && !cstate->line_buf_converted)
{
int mblen;
const char *end = s + len;
/* we may need to skip the end of a multibyte char from the previous buffer */
s += cstate->missing_bytes;
mblen = pg_encoding_mblen(cstate->client_encoding, s);
for (; *s != eol && s < end; s += mblen)
mblen = pg_encoding_mblen(cstate->client_encoding, s);
/*
* MPP-10802
* If the last char is a partial multibyte char (the rest of its bytes are
* in the next buffer), save the number of missing bytes for this char and
* skip them next time around.
*/
cstate->missing_bytes = (s > end ? s - end : 0);
return ((*s == eol) ? (char *) s : NULL);
}
else
return memchr(s, eol, len);
}
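/*
* Illustrative example (hypothetical data): with eol = '\n' and the buffer
* "col1\tcol2\n...", the already-converted / non-ASCII-embedding case reduces
* to memchr() and returns a pointer to the '\n' at offset 9. When
* encoding_embeds_ascii is set and the buffer is still in the client encoding
* (e.g. SJIS or BIG5 data), the loop above instead advances one whole
* character at a time via pg_encoding_mblen(), so a byte inside a multibyte
* character is never mistaken for the terminator.
*/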
char *
scanCSVLine(CopyState cstate, const char *s, char eol, char escapec, char quotec, size_t len)
{
const char *start = s;
const char *end = start + len;
if (cstate->encoding_embeds_ascii && !cstate->line_buf_converted)
{
int mblen;
/* we may need to skip the end of a multibyte char from the previous buffer */
s += cstate->missing_bytes;
mblen = pg_encoding_mblen(cstate->client_encoding, s);
for ( ; *s != eol && s < end ; s += mblen)
{
if (cstate->in_quote && *s == escapec)
cstate->last_was_esc = !cstate->last_was_esc;
if (*s == quotec && !cstate->last_was_esc)
cstate->in_quote = !cstate->in_quote;
if (*s != escapec)
cstate->last_was_esc = false;
mblen = pg_encoding_mblen(cstate->client_encoding, s);
}
/*
* MPP-10802
* If the last char is a partial multibyte char (the rest of its bytes are
* in the next buffer), save the number of missing bytes for this char and
* skip them next time around.
*/
cstate->missing_bytes = (s > end ? s - end : 0);
}
else
/* safe to scan byte by byte */
{
for ( ; *s != eol && s < end ; s++)
{
if (cstate->in_quote && *s == escapec)
cstate->last_was_esc = !cstate->last_was_esc;
if (*s == quotec && !cstate->last_was_esc)
cstate->in_quote = !cstate->in_quote;
if (*s != escapec)
cstate->last_was_esc = false;
}
}
if (s == end)
return NULL;
if (*s == eol)
cstate->last_was_esc = false;
return ((*s == eol) ? (char *) s : NULL);
}
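/*
* Illustrative example (hypothetical data, QUOTE '"' with a distinct
* ESCAPE such as '\'): scanning the fragment
*   "a","b",c<LF>more...
* returns a pointer to the <LF> with cstate->in_quote left false, so the
* caller can treat it as a real end of line. If the <LF> instead appeared
* inside an open quoted field, this routine would still stop at it, but the
* persisted in_quote = true state lets the caller keep accumulating the same
* logical line on the next call. The multibyte branch matters for client-only
* encodings such as SJIS, where a trailing byte of a multibyte character can
* be 0x5C (ASCII backslash) and must not be interpreted as an escape.
*/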
/* remove end of line chars from end of a buffer */
void truncateEol(StringInfo buf, EolType eol_type)
{
int one_back = buf->len - 1;
int two_back = buf->len - 2;
if(eol_type == EOL_CRLF)
{
if(buf->len < 2)
return;
if(buf->data[two_back] == '\r' &&
buf->data[one_back] == '\n')
{
buf->data[two_back] = '\0';
buf->data[one_back] = '\0';
buf->len -= 2;
}
}
else
{
if(buf->len < 1)
return;
if(buf->data[one_back] == '\r' ||
buf->data[one_back] == '\n')
{
buf->data[one_back] = '\0';
buf->len--;
}
}
}
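/*
* Illustrative example (hypothetical data): with eol_type == EOL_CRLF a
* buffer holding "42\tfoo\r\n" (len 8) becomes "42\tfoo" (len 6); for EOL_LF
* or EOL_CR a single trailing '\n' or '\r' is stripped. A buffer without a
* trailing EOL is left untouched, so calling this twice on the same line is
* harmless.
*/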
/*
* concatenateEol
*
* add end-of-line chars to the end of the line buf.
*
*/
static void concatenateEol(CopyState cstate)
{
switch (cstate->eol_type)
{
case EOL_LF:
appendStringInfo(&cstate->line_buf, "\n");
break;
case EOL_CR:
appendStringInfo(&cstate->line_buf, "\r");
break;
case EOL_CRLF:
appendStringInfo(&cstate->line_buf, "\r\n");
break;
case EOL_UNKNOWN:
appendStringInfo(&cstate->line_buf, "\n");
break;
}
}
/*
* Escape any single quotes or backslashes in given string (from initdb.c)
*/
static char *
escape_quotes(const char *src)
{
int len = strlen(src),
i,
j;
char *result = palloc(len * 2 + 1);
for (i = 0, j = 0; i < len; i++)
{
if ((src[i]) == '\'' || (src[i]) == '\\')
result[j++] = src[i];
result[j++] = src[i];
}
result[j] = '\0';
return result;
}
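/*
* Illustrative example: escape_quotes() turns the input   don't \ panic
* into   don''t \\ panic  , i.e. every single quote and backslash is
* doubled so the result can be embedded safely inside a single-quoted
* SQL string literal.
*/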
/*
* copy_dest_startup --- executor startup
*/
static void
copy_dest_startup(DestReceiver *self __attribute__((unused)), int operation __attribute__((unused)), TupleDesc typeinfo __attribute__((unused)))
{
/* no-op */
}
/*
* copy_dest_receive --- receive one tuple
*/
static void
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_copy *myState = (DR_copy *) self;
CopyState cstate = myState->cstate;
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
/* And send the data */
CopyOneRowTo(cstate, InvalidOid, slot_get_values(slot), slot_get_isnull(slot));
}
/*
* copy_dest_shutdown --- executor end
*/
static void
copy_dest_shutdown(DestReceiver *self __attribute__((unused)))
{
/* no-op */
}
/*
* copy_dest_destroy --- release DestReceiver object
*/
static void
copy_dest_destroy(DestReceiver *self)
{
pfree(self);
}
/*
* CreateCopyDestReceiver -- create a suitable DestReceiver object
*/
DestReceiver *
CreateCopyDestReceiver(void)
{
DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
self->pub.receiveSlot = copy_dest_receive;
self->pub.rStartup = copy_dest_startup;
self->pub.rShutdown = copy_dest_shutdown;
self->pub.rDestroy = copy_dest_destroy;
self->pub.mydest = DestCopyOut;
self->cstate = NULL; /* will be set later */
return (DestReceiver *) self;
}
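/*
* Minimal usage sketch (hypothetical caller; the real hookup is done by the
* COPY (SELECT ...) TO code path): create the receiver, attach the CopyState,
* let the executor push tuples through copy_dest_receive(), then destroy it.
*
*   DestReceiver *dest = CreateCopyDestReceiver();
*
*   ((DR_copy *) dest)->cstate = cstate;    attach the COPY state
*   ... run the query with dest as its DestReceiver ...
*   (*dest->rDestroy) (dest);               frees the DR_copy
*/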
static void CopyInitPartitioningState(EState *estate)
{
if (estate->es_result_partitions)
{
estate->es_partition_state =
createPartitionState(estate->es_result_partitions,
estate->es_num_result_relations);
}
}
/*
* Initialize data loader parsing state
*/
static void CopyInitDataParser(CopyState cstate)
{
cstate->fe_eof = false;
cstate->cur_relname = RelationGetRelationName(cstate->rel);
cstate->cur_lineno = 0;
cstate->cur_attname = NULL;
cstate->null_print_len = strlen(cstate->null_print);
if (cstate->csv_mode)
{
cstate->in_quote = false;
cstate->last_was_esc = false;
cstate->num_consec_csv_err = 0;
}
/* Set up data buffer to hold a chunk of data */
MemSet(cstate->raw_buf, ' ', RAW_BUF_SIZE * sizeof(char));
cstate->raw_buf[RAW_BUF_SIZE] = '\0';
cstate->line_done = true;
cstate->raw_buf_done = false;
}
/*
* CopyCheckIsLastLine
*
* This routine checks if the line being looked at is the last line of data.
* If it is, it makes sure that this line is terminated with an EOL. We must
* do this check in order to support files that don't end with an EOL before EOF,
* because we want to treat that last line as normal - and be able to preprocess
* it like the other lines (remove metadata chars, encoding conversion).
*
* See MPP-4406 for an example of why this is needed.
*/
static bool CopyCheckIsLastLine(CopyState cstate)
{
if (cstate->fe_eof)
{
concatenateEol(cstate);
return true;
}
return false;
}
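/*
* Usage sketch (hypothetical; the real call sites are in the data-reading
* loops): when the frontend reports EOF while a final unterminated line is
* still in line_buf, appending the expected EOL here lets that line flow
* through the same stripping/transcoding code as every other line.
*
*   if (CopyCheckIsLastLine(cstate))
*       ... EOL was appended; process line_buf normally ...
*/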
/*
* setEncodingConversionProc
*
* COPY and External tables use a custom path to the encoding conversion
* API because external tables have their own encoding (which is not
* necessarily client_encoding). We therefore have to set the correct
* encoding conversion function pointer ourselves, to be later used in
* the conversion engine.
*
* The code here mimics a part of SetClientEncoding() in mbutils.c
*/
void setEncodingConversionProc(CopyState cstate, int client_encoding, bool iswritable)
{
Oid conversion_proc;
/*
* COPY FROM and readable external tables (RET): convert from client to server
* COPY TO and writable external tables (WET): convert from server to client
*/
if (iswritable)
conversion_proc = FindDefaultConversionProc(GetDatabaseEncoding(),
client_encoding);
else
conversion_proc = FindDefaultConversionProc(client_encoding,
GetDatabaseEncoding());
if (OidIsValid(conversion_proc))
{
/* conversion proc found */
cstate->enc_conversion_proc = palloc(sizeof(FmgrInfo));
fmgr_info(conversion_proc, cstate->enc_conversion_proc);
}
else
{
/* no conversion function (both encodings are probably the same) */
cstate->enc_conversion_proc = NULL;
}
}
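/*
* Illustrative example (hypothetical setup): for a readable external table
* declared with ENCODING 'LATIN1' on a UTF8 database, this caches the
* LATIN1 -> UTF8 default conversion in cstate->enc_conversion_proc; for a
* writable external table the direction is reversed (UTF8 -> LATIN1). When
* no default conversion exists (typically because both encodings are the
* same), enc_conversion_proc is left NULL.
*/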
/*
* preProcessDataLine
*
* Once a complete data line has been read, set the input row number for error
* reporting purposes (this also removes any metadata that the QD concatenated
* to the data during COPY) and convert the line to server encoding if
* transcoding is needed.
*/
static
void preProcessDataLine(CopyState cstate)
{
char *cvt;
bool force_transcoding = false;
/*
* Increment line count by 1 if we have access to all the original
* data rows and can count them reliably (ROWNUM_ORIGINAL). However
* if we have ROWNUM_EMBEDDED the original row number for this row
* was sent to us with the data (courtesy of the data distributor), so
* get that number instead.
*/
if(cstate->err_loc_type == ROWNUM_ORIGINAL)
{
cstate->cur_lineno++;
}
else if(cstate->err_loc_type == ROWNUM_EMBEDDED)
{
Assert(Gp_role == GP_ROLE_EXECUTE);
/*
* Extract various metadata sent to us from the QD COPY about this row:
* 1) the original line number of the row.
* 2) if the row was converted to server encoding or not
*/
CopyExtractRowMetaData(cstate); /* sets cur_lineno internally */
/*
* Check if the QD sent us a badly encoded row, still in client_encoding,
* so that we can catch the encoding error ourselves. If line_buf_converted
* is false after CopyExtractRowMetaData then we must transcode and catch
* the error. Verify that we are indeed in SREH error table mode; that is
* the only valid path for receiving an unconverted data row.
*/
if (!cstate->line_buf_converted)
{
Assert(cstate->errMode == SREH_LOG);
force_transcoding = true;
}
}
else
{
Assert(false); /* byte offset not yet supported */
}
if (cstate->need_transcoding || force_transcoding)
{
cvt = (char *) pg_custom_to_server(cstate->line_buf.data,
cstate->line_buf.len,
cstate->client_encoding,
cstate->enc_conversion_proc);
Assert(!force_transcoding); /* if force_transcoding were true, the conversion above should have already raised an error */
if (cvt != cstate->line_buf.data)
{
/* transfer converted data back to line_buf */
RESET_LINEBUF;
appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
pfree(cvt);
}
}
/* indicate that line buf is in server encoding */
cstate->line_buf_converted = true;
}
void CopyEolStrToType(CopyState cstate)
{
if (pg_strcasecmp(cstate->eol_str, "lf") == 0)
{
cstate->eol_type = EOL_LF;
cstate->eol_ch[0] = '\n';
cstate->eol_ch[1] = '\0';
}
else if (pg_strcasecmp(cstate->eol_str, "cr") == 0)
{
cstate->eol_type = EOL_CR;
cstate->eol_ch[0] = '\r';
cstate->eol_ch[1] = '\0';
}
else if (pg_strcasecmp(cstate->eol_str, "crlf") == 0)
{
cstate->eol_type = EOL_CRLF;
cstate->eol_ch[0] = '\r';
cstate->eol_ch[1] = '\n';
}
else /* error: should have been validated in CopyValidateControlChars() */
ereport(ERROR,
(errcode(ERRCODE_CDB_INTERNAL_ERROR),
errmsg("internal error in CopyEolStrToType. Trying to set NEWLINE %s",
cstate->eol_str)));
}
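/*
* Illustrative example: NEWLINE 'CRLF' (matched case-insensitively) yields
* eol_type = EOL_CRLF with eol_ch = {'\r', '\n'}, while 'LF' and 'CR' set a
* single terminator character and eol_ch[1] = '\0'. Any other value should
* have been rejected earlier by CopyValidateControlChars(), so falling into
* the final branch is reported as an internal error.
*/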