| /*------------------------------------------------------------------------- |
| * |
| * copyfrom.c |
| * COPY <table> FROM file/program/client |
| * |
| * This file contains routines needed to efficiently load tuples into a |
| * table. That includes looking up the correct partition, firing triggers, |
| * calling the table AM function to insert the data, and updating indexes. |
| * Reading data from the input file or client and parsing it into Datums |
| * is handled in copyfromparse.c. |
| * |
| * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/commands/copyfrom.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include <ctype.h> |
| #include <unistd.h> |
| #include <sys/stat.h> |
| |
| #include "access/heapam.h" |
| #include "access/htup_details.h" |
| #include "access/tableam.h" |
| #include "access/xact.h" |
| #include "access/xlog.h" |
| #include "catalog/namespace.h" |
| #include "catalog/pg_directory_table.h" |
| #include "catalog/storage_directory_table.h" |
| #include "cdb/cdbaocsam.h" |
| #include "cdb/cdbappendonlyam.h" |
| #include "cdb/cdbdisp.h" |
| #include "cdb/cdbhash.h" |
| #include "cdb/cdbutil.h" |
| #include "cdb/cdbvars.h" |
| #include "commands/copy.h" |
| #include "commands/copyfrom_internal.h" |
| #include "commands/progress.h" |
| #include "commands/tablespace.h" |
| #include "commands/trigger.h" |
| #include "common/base64.h" |
| #include "common/cryptohash.h" |
| #include "common/md5.h" |
| #include "executor/execPartition.h" |
| #include "executor/executor.h" |
| #include "executor/nodeModifyTable.h" |
| #include "executor/tuptable.h" |
| #include "foreign/fdwapi.h" |
| #include "libpq/libpq.h" |
| #include "libpq/pqformat.h" |
| #include "miscadmin.h" |
| #include "optimizer/optimizer.h" |
| #include "pgstat.h" |
| #include "rewrite/rewriteHandler.h" |
| #include "storage/fd.h" |
| #include "storage/ufile.h" |
| #include "tcop/tcopprot.h" |
| #include "utils/builtins.h" |
| #include "utils/lsyscache.h" |
| #include "utils/memutils.h" |
| #include "utils/portal.h" |
| #include "utils/rel.h" |
| #include "utils/snapmgr.h" |
| |
| /* |
| * When doing a COPY FROM through the dispatcher, the QD reads the input from |
| * the input file (or stdin or program), and forwards the data to the QE nodes, |
| * where they will actually be inserted. |
| * |
| * Ideally, the QD would just pass through each line to the QE as is, and let |
| * the QEs do all the processing, because the more processing the QD has |
| * to do, the more likely it is to become a bottleneck. |
| * |
| * However, the QD needs to figure out which QE to send each row to. For that, |
| * it needs to at least parse the distribution key. The distribution key might |
| * also be a DEFAULTed column, in which case the DEFAULT value needs to be |
| * evaluated in the QD. In that case, the QD must send the computed value |
| * to the QE - we cannot assume that the QE can re-evaluate the expression and |
| * arrive at the same value, at least not if the DEFAULT expression is volatile. |
| * |
| * Therefore, we need a flexible format between the QD and QE, where the QD |
| * processes just enough of each input line to figure out where to send it. |
| * It must send the values it had to parse and evaluate to the QE, as well |
| * as the rest of the original input line, so that the QE can parse the rest |
| * of it. |
| * |
| * The 'copy_from_dispatch_*' structs are used in the QD->QE stream. For each |
| * input line, the QD constructs a 'copy_from_dispatch_row' struct, and sends |
| * it to the QE. Before any rows, a QDtoQESignature is sent first, followed by |
| * a 'copy_from_dispatch_header'. When the QD encounters a recoverable error |
| * that needs to be logged in the error log (LOG ERRORS SEGMENT REJECT LIMIT), |
| * it sends the erroneous row to a QE, in a 'copy_from_dispatch_error' struct. |
| * |
| * |
| * COPY TO is simpler: The QEs form the output rows in the final form, and the QD |
| * just collects and forwards them to the client. The QD doesn't need to parse |
| * the rows at all. |
| */ |
| extern CopyStmt *glob_copystmt; |
| |
| extern bool Test_copy_qd_qe_split; |
| |
| /* |
| * Header of the QD->QE stream. It contains information that applies to all |
| * the rows that follow; per the overview comment in this file it is sent |
| * once, after the QDtoQESignature and before any row. |
| */ |
| typedef struct |
| { |
| /* |
| * First field that should be processed in the QE. Any fields before |
| * this will be included as Datums in the rows that follow. |
| */ |
| int16 first_qe_processed_field; |
| } copy_from_dispatch_header; |
| |
| typedef struct |
| { |
| /* |
| * Information about this input line. |
| * |
| * 'relid' is the target relation's OID. Normally, the same as |
| * cstate->relid, but for a partitioned relation, it indicates the target |
| * partition. Note: this must be the first field, because InvalidOid means |
| * that this is actually a 'copy_from_dispatch_error' struct. |
| * |
| * NOTE(review): the "must be the first field" remark above looks stale. |
| * In the layout below 'lineno' is the first field, and |
| * copy_from_dispatch_error begins with an int64 'error_marker' (constant |
| * -1) at that same position. Confirm how the receiver tells the two |
| * frame types apart and update this comment accordingly. |
| * |
| * 'lineno' is the input line number, for error reporting. |
| */ |
| int64 lineno; |
| Oid relid; |
| |
| uint32 line_len; /* size of the included input line */ |
| uint32 residual_off; /* offset in the line, where QE should |
| * process remaining fields */ |
| bool delim_seen_at_end; /* conveys to QE if QD saw a delim at end |
| * of its processing */ |
| uint16 fld_count; /* # of fields that were processed in the |
| * QD. */ |
| |
| /* The input line follows. */ |
| |
| /* |
| * For each field that was parsed in the QD already, the following data follows: |
| * |
| * int16 fieldnum; |
| * <data> |
| * |
| * NULL values are not included, any attributes that are not included in |
| * the message are implicitly NULL. |
| * |
| * For pass-by-value datatypes, the <data> is the raw Datum. For |
| * simplicity, it is always sent as a full-width 8-byte Datum, regardless |
| * of the datatype's length. |
| * |
| * For other fixed width datatypes, <data> is the datatype's value. |
| * |
| * For variable-length datatypes, <data> begins with a 4-byte length field, |
| * followed by the data. Cstrings (typlen = -2) are also sent in this |
| * format. |
| */ |
| } copy_from_dispatch_row; |
| |
| /* |
| * Size of the struct, without padding at the end: the offset of the last |
| * member ('fld_count') plus that member's size. |
| */ |
| #define SizeOfCopyFromDispatchRow (offsetof(copy_from_dispatch_row, fld_count) + sizeof(uint16)) |
| |
| typedef struct |
| { |
| int64 error_marker; /* constant -1, to mark that this is an error |
| * frame rather than 'copy_from_dispatch_row' */ |
| int64 lineno; /* input line number the error refers to */ |
| uint32 errmsg_len; /* length of the 'errmsg' payload that follows */ |
| uint32 line_len; /* length of the 'line' payload that follows */ |
| |
| /* 'errmsg' follows */ |
| /* 'line' follows */ |
| } copy_from_dispatch_error; |
| |
| /* |
| * Size of the struct, without padding at the end: the offset of the last |
| * member ('line_len') plus that member's size. |
| */ |
| /* |
| * GPDB_114_MERGE_FIXME: the last field of copy_from_dispatch_error has |
| * changed. Check whether line_buf_converted is still needed. |
| */ |
| #define SizeOfCopyFromDispatchError (offsetof(copy_from_dispatch_error, line_len) + sizeof(uint32)) |
| |
| /* Low-level communications functions */ |
| static void |
| SendCopyFromForwardedTuple(CopyFromState cstate, |
| CdbCopy *cdbCopy, |
| bool toAll, |
| int target_seg, |
| Relation rel, |
| int64 lineno, |
| char *line, |
| int line_len, |
| Datum *values, |
| bool *nulls, |
| bool is_directory_table); |
| static void SendCopyFromForwardedHeader(CopyFromState cstate, CdbCopy *cdbCopy); |
| static void SendCopyFromForwardedError(CopyFromState cstate, CdbCopy *cdbCopy, char *errmsg); |
| |
| static bool NextCopyFromDispatch(CopyFromState cstate, ExprContext *econtext, |
| Datum *values, bool *nulls); |
| static bool NextCopyFromExecute(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls, bool is_directory_table); |
| static bool NextCopyFromRawFieldsX(CopyFromState cstate, char ***fields, int *nfields, |
| int stop_processing_at_field); |
| static bool NextCopyFromX(CopyFromState cstate, ExprContext *econtext, |
| Datum *values, bool *nulls); |
| static void HandleQDErrorFrame(CopyFromState cstate, char *p); |
| |
| static void CopyInitDataParser(CopyFromState cstate); |
| |
| static GpDistributionData *InitDistributionData(CopyFromState cstate, EState *estate); |
| static void FreeDistributionData(GpDistributionData *distData); |
| static void InitCopyFromDispatchSplit(CopyFromState cstate, GpDistributionData *distData, EState *estate); |
| static unsigned int GetTargetSeg(GpDistributionData *distData, TupleTableSlot *slot); |
| |
| static uint64 CopyFromDirectoryTable(CopyFromState cstate); |
| static CopyFromState BeginCopyFromDirectoryTable(ParseState *pstate, Relation rel, |
| const char *filename, bool is_program, List *options); |
| |
| /* |
| * No more than this many tuples per CopyMultiInsertBuffer |
| * |
| * Caution: Don't make this too big, as we could end up with this many |
| * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's |
| * multiInsertBuffers list. Increasing this can cause quadratic growth in |
| * memory requirements during copies into partitioned tables with a large |
| * number of partitions. |
| * |
| * This value also dimensions the 'slots' and 'linenos' arrays embedded in |
| * every CopyMultiInsertBuffer, and CopyMultiInsertInfoIsFull() compares it |
| * against the tuple count summed over all buffers. |
| */ |
| #define MAX_BUFFERED_TUPLES 1000 |
| |
| /* |
| * Flush buffers if there are >= this many bytes, as counted by the input |
| * size, of tuples stored. |
| */ |
| #define MAX_BUFFERED_BYTES 65535 |
| |
| /* Trim the list of buffers back down to this number after flushing */ |
| #define MAX_PARTITION_BUFFERS 32 |
| |
| /* |
| * Size, in bytes, of the chunk buffer used when reading directory table |
| * file data in CopyFromDirectoryTable(). |
| */ |
| #define DIR_FILE_BUFF_SIZE 8192 |
| |
| /* |
| * Stores multi-insert data related to a single relation in CopyFrom. |
| * |
| * Entries of 'slots' start out NULL and are created on demand by |
| * CopyMultiInsertInfoNextFreeSlot(). 'linenos[i]' is the input line number |
| * recorded for the tuple held in 'slots[i]'. |
| */ |
| typedef struct CopyMultiInsertBuffer |
| { |
| TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ |
| ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ |
| BulkInsertState bistate; /* BulkInsertState for this rel */ |
| int nused; /* number of 'slots' containing tuples */ |
| uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy |
| * stream */ |
| } CopyMultiInsertBuffer; |
| |
| /* |
| * Stores one or many CopyMultiInsertBuffers and details about the size and |
| * number of tuples which are stored in them. This allows multiple buffers to |
| * exist at once when COPYing into a partitioned table. |
| * |
| * 'bufferedTuples' and 'bufferedBytes' are totals across all tracked buffers; |
| * they are incremented by CopyMultiInsertInfoStore() and reset to zero by |
| * CopyMultiInsertInfoFlush(). |
| */ |
| typedef struct CopyMultiInsertInfo |
| { |
| List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */ |
| int bufferedTuples; /* number of tuples buffered over all buffers */ |
| int bufferedBytes; /* number of bytes from all buffered tuples */ |
| CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */ |
| EState *estate; /* Executor state used for COPY */ |
| CommandId mycid; /* Command Id used for COPY */ |
| int ti_options; /* table insert options */ |
| } CopyMultiInsertInfo; |
| |
| static const char QDtoQESignature[] = "PGCOPY-QD-TO-QE\n\377\r\n"; /* per the overview comment in this file: sent first on the QD->QE stream, ahead of the dispatch header and any rows */ |
| |
| /* |
| * error context callback for COPY FROM |
| * |
| * The argument for the error context must be CopyFromState. |
| */ |
| void |
| CopyFromErrorCallback(void *arg) |
| { |
| CopyFromState cstate = (CopyFromState) arg; |
| char curlineno_str[32]; |
| |
| snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT, |
| cstate->cur_lineno); |
| |
| if (cstate->opts.binary) |
| { |
| /* can't usefully display the data */ |
| if (cstate->cur_attname) |
| errcontext("COPY %s, line %s, column %s", |
| cstate->cur_relname, curlineno_str, |
| cstate->cur_attname); |
| else |
| errcontext("COPY %s, line %s", |
| cstate->cur_relname, curlineno_str); |
| } |
| else |
| { |
| if (cstate->cur_attname && cstate->cur_attval) |
| { |
| /* error is relevant to a particular column */ |
| char *attval; |
| |
| attval = limit_printout_length(cstate->cur_attval); |
| errcontext("COPY %s, line %s, column %s: \"%s\"", |
| cstate->cur_relname, curlineno_str, |
| cstate->cur_attname, attval); |
| pfree(attval); |
| } |
| else if (cstate->cur_attname) |
| { |
| /* error is relevant to a particular column, value is NULL */ |
| errcontext("COPY %s, line %s, column %s: null input", |
| cstate->cur_relname, curlineno_str, |
| cstate->cur_attname); |
| } |
| else |
| { |
| /* |
| * Error is relevant to a particular line. |
| * |
| * If line_buf still contains the correct line, print it. |
| */ |
| if (cstate->line_buf_valid) |
| { |
| char *lineval; |
| |
| lineval = limit_printout_length(cstate->line_buf.data); |
| errcontext("COPY %s, line %s: \"%s\"", |
| cstate->cur_relname, curlineno_str, lineval); |
| pfree(lineval); |
| } |
| else |
| { |
| errcontext("COPY %s, line %s", |
| cstate->cur_relname, curlineno_str); |
| } |
| } |
| } |
| } |
| |
/*
 * limit_printout_length
 *		Keep COPY data shown in a message down to a reasonable size.
 *
 * Returns a freshly allocated string.  Inputs of at most
 * MAX_COPY_DATA_DISPLAY bytes are duplicated unchanged; longer inputs are
 * clipped with pg_mbcliplen() and get "..." appended to show the truncation.
 */
char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			total = strlen(str);
	int			keep;
	char	   *clipped;

	if (total > MAX_COPY_DATA_DISPLAY)
	{
		/* Clip in an encoding-dependent way */
		keep = pg_mbcliplen(str, total, MAX_COPY_DATA_DISPLAY);

		/* Room for the kept bytes, the three dots and the terminator */
		clipped = (char *) palloc(keep + 4);
		memcpy(clipped, str, keep);
		memcpy(clipped + keep, "...", 4);

		return clipped;
	}

	/* Short enough: hand back a plain copy */
	return pstrdup(str);
}
| |
| /* |
| * Allocate memory and initialize a new CopyMultiInsertBuffer for this |
| * ResultRelInfo. |
| */ |
| static CopyMultiInsertBuffer * |
| CopyMultiInsertBufferInit(ResultRelInfo *rri) |
| { |
| CopyMultiInsertBuffer *buffer; |
| |
| buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); |
| memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); |
| buffer->resultRelInfo = rri; |
| buffer->bistate = GetBulkInsertState(); |
| buffer->nused = 0; |
| |
| return buffer; |
| } |
| |
| /* |
| * Make a new buffer for this ResultRelInfo. |
| */ |
| static inline void |
| CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, |
| ResultRelInfo *rri) |
| { |
| CopyMultiInsertBuffer *buffer; |
| |
| buffer = CopyMultiInsertBufferInit(rri); |
| |
| /* Setup back-link so we can easily find this buffer again */ |
| rri->ri_CopyMultiInsertBuffer = buffer; |
| /* Record that we're tracking this buffer */ |
| miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); |
| } |
| |
| /* |
| * Initialize an already allocated CopyMultiInsertInfo. |
| * |
| * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up |
| * for that table. |
| */ |
| static void |
| CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, |
| CopyFromState cstate, EState *estate, CommandId mycid, |
| int ti_options) |
| { |
| miinfo->multiInsertBuffers = NIL; |
| miinfo->bufferedTuples = 0; |
| miinfo->bufferedBytes = 0; |
| miinfo->cstate = cstate; |
| miinfo->estate = estate; |
| miinfo->mycid = mycid; |
| miinfo->ti_options = ti_options; |
| |
| /* |
| * Only setup the buffer when not dealing with a partitioned table. |
| * Buffers for partitioned tables will just be setup when we need to send |
| * tuples their way for the first time. |
| */ |
| if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) |
| CopyMultiInsertInfoSetupBuffer(miinfo, rri); |
| } |
| |
| /* |
| * Returns true if the buffers are full |
| */ |
| static inline bool |
| CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo) |
| { |
| if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES || |
| miinfo->bufferedBytes >= MAX_BUFFERED_BYTES) |
| return true; |
| return false; |
| } |
| |
| /* |
| * Returns true if we have no buffered tuples |
| */ |
| static inline bool |
| CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo) |
| { |
| return miinfo->bufferedTuples == 0; |
| } |
| |
| /* |
| * Write the tuples stored in 'buffer' out to the table. |
| */ |
| static inline void |
| CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, |
| CopyMultiInsertBuffer *buffer) |
| { |
| MemoryContext oldcontext; |
| int i; |
| uint64 save_cur_lineno; |
| CopyFromState cstate = miinfo->cstate; |
| EState *estate = miinfo->estate; |
| CommandId mycid = miinfo->mycid; |
| int ti_options = miinfo->ti_options; |
| bool line_buf_valid = cstate->line_buf_valid; |
| int nused = buffer->nused; |
| ResultRelInfo *resultRelInfo = buffer->resultRelInfo; |
| TupleTableSlot **slots = buffer->slots; |
| |
| /* |
| * Print error context information correctly, if one of the operations |
| * below fails. |
| */ |
| cstate->line_buf_valid = false; |
| save_cur_lineno = cstate->cur_lineno; |
| |
| /* |
| * table_multi_insert may leak memory, so switch to short-lived memory |
| * context before calling it. |
| */ |
| oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| table_multi_insert(resultRelInfo->ri_RelationDesc, |
| slots, |
| nused, |
| mycid, |
| ti_options, |
| buffer->bistate); |
| MemoryContextSwitchTo(oldcontext); |
| |
| for (i = 0; i < nused; i++) |
| { |
| /* |
| * If there are any indexes, update them for all the inserted tuples, |
| * and run AFTER ROW INSERT triggers. |
| */ |
| if (resultRelInfo->ri_NumIndices > 0) |
| { |
| List *recheckIndexes; |
| |
| cstate->cur_lineno = buffer->linenos[i]; |
| recheckIndexes = |
| ExecInsertIndexTuples(resultRelInfo, |
| buffer->slots[i], estate, false, false, |
| NULL, NIL); |
| ExecARInsertTriggers(estate, resultRelInfo, |
| slots[i], recheckIndexes, |
| cstate->transition_capture); |
| list_free(recheckIndexes); |
| } |
| |
| /* |
| * There's no indexes, but see if we need to run AFTER ROW INSERT |
| * triggers anyway. |
| */ |
| else if (resultRelInfo->ri_TrigDesc != NULL && |
| (resultRelInfo->ri_TrigDesc->trig_insert_after_row || |
| resultRelInfo->ri_TrigDesc->trig_insert_new_table)) |
| { |
| cstate->cur_lineno = buffer->linenos[i]; |
| ExecARInsertTriggers(estate, resultRelInfo, |
| slots[i], NIL, cstate->transition_capture); |
| } |
| |
| ExecClearTuple(slots[i]); |
| } |
| |
| /* Mark that all slots are free */ |
| buffer->nused = 0; |
| |
| /* reset cur_lineno and line_buf_valid to what they were */ |
| cstate->line_buf_valid = line_buf_valid; |
| cstate->cur_lineno = save_cur_lineno; |
| } |
| |
| /* |
| * Drop used slots and free member for this buffer. |
| * |
| * The buffer must be flushed before cleanup. |
| */ |
| static inline void |
| CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, |
| CopyMultiInsertBuffer *buffer) |
| { |
| int i; |
| |
| /* Ensure buffer was flushed */ |
| Assert(buffer->nused == 0); |
| |
| /* Remove back-link to ourself */ |
| buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL; |
| |
| FreeBulkInsertState(buffer->bistate); |
| |
| /* Since we only create slots on demand, just drop the non-null ones. */ |
| for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) |
| ExecDropSingleTupleTableSlot(buffer->slots[i]); |
| |
| if (RelationIsNonblockRelation(buffer->resultRelInfo->ri_RelationDesc)) |
| { |
| /* |
| * CBDB: do not call table_finish_bulk_insert for AO/AOCO or PAX tables. |
| * https://github.com/apache/cloudberry/issues/547 |
| * Do not clean up context or resource here, table_finish_bulk_insert |
| * routine will be called more than once during COPY FROM if |
| * the partition buffer is flushed but COPY is not finished. |
| */ |
| pfree(buffer); |
| return; |
| } |
| |
| table_finish_bulk_insert(buffer->resultRelInfo->ri_RelationDesc, |
| miinfo->ti_options); |
| |
| pfree(buffer); |
| } |
| |
| /* |
| * Write out all stored tuples in all buffers out to the tables. |
| * |
| * Once flushed we also trim the tracked buffers list down to size by removing |
| * the buffers created earliest first. |
| * |
| * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being |
| * used. When cleaning up old buffers we'll never remove the one for |
| * 'curr_rri'. |
| */ |
| static inline void |
| CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri) |
| { |
| ListCell *lc; |
| |
| foreach(lc, miinfo->multiInsertBuffers) |
| { |
| CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc); |
| |
| CopyMultiInsertBufferFlush(miinfo, buffer); |
| } |
| |
| miinfo->bufferedTuples = 0; |
| miinfo->bufferedBytes = 0; |
| |
| /* |
| * Trim the list of tracked buffers down if it exceeds the limit. Here we |
| * remove buffers starting with the ones we created first. It seems less |
| * likely that these older ones will be needed than the ones that were |
| * just created. |
| */ |
| while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS) |
| { |
| CopyMultiInsertBuffer *buffer; |
| |
| buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); |
| |
| /* |
| * We never want to remove the buffer that's currently being used, so |
| * if we happen to find that then move it to the end of the list. |
| */ |
| if (buffer->resultRelInfo == curr_rri) |
| { |
| miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); |
| miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer); |
| buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers); |
| } |
| |
| CopyMultiInsertBufferCleanup(miinfo, buffer); |
| miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers); |
| } |
| } |
| |
| /* |
| * Cleanup allocated buffers and free memory |
| */ |
| static inline void |
| CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo) |
| { |
| ListCell *lc; |
| |
| foreach(lc, miinfo->multiInsertBuffers) |
| CopyMultiInsertBufferCleanup(miinfo, lfirst(lc)); |
| |
| list_free(miinfo->multiInsertBuffers); |
| } |
| |
| /* |
| * Get the next TupleTableSlot that the next tuple should be stored in. |
| * |
| * Callers must ensure that the buffer is not full. |
| * |
| * Note: 'miinfo' is unused but has been included for consistency with the |
| * other functions in this area. |
| */ |
| static inline TupleTableSlot * |
| CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, |
| ResultRelInfo *rri) |
| { |
| CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; |
| int nused = buffer->nused; |
| |
| Assert(buffer != NULL); |
| Assert(nused < MAX_BUFFERED_TUPLES); |
| |
| if (buffer->slots[nused] == NULL) |
| buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); |
| return buffer->slots[nused]; |
| } |
| |
| /* |
| * Record the previously reserved TupleTableSlot that was reserved by |
| * CopyMultiInsertInfoNextFreeSlot as being consumed. |
| */ |
| static inline void |
| CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, |
| TupleTableSlot *slot, int tuplen, uint64 lineno) |
| { |
| CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; |
| |
| Assert(buffer != NULL); |
| Assert(slot == buffer->slots[buffer->nused]); |
| |
| /* Store the line number so we can properly report any errors later */ |
| buffer->linenos[buffer->nused] = lineno; |
| |
| /* Record this slot as being used */ |
| buffer->nused++; |
| |
| /* Update how many tuples are stored and their size */ |
| miinfo->bufferedTuples++; |
| miinfo->bufferedBytes += tuplen; |
| } |
| |
| static CopyStmt * |
| convertToCopyTextStmt(CopyStmt *stmt) |
| { |
| CopyStmt *copiedStmt = copyObject(stmt); |
| |
| copiedStmt->options = NIL; |
| |
| return copiedStmt; |
| } |
| |
/*
 * trimFilePath
 *		Strip leading and trailing occurrences of 'c' from 'filePath'.
 *
 * The string is modified in place: trailing occurrences are cut off by
 * writing a terminator, and the returned pointer is advanced past leading
 * occurrences (it points into the caller's buffer, nothing is allocated).
 *
 * A NULL 'filePath' or a 'c' of '\0' is returned unchanged.  An input that
 * is empty or consists only of 'c' yields an empty string.  Lengths are
 * tracked as an index so that no pointer before the start of the buffer is
 * ever formed.
 */
static char *
trimFilePath(char *filePath, char c)
{
	size_t		len;

	/* Nothing sensible to trim; also keeps the scan below in bounds */
	if (filePath == NULL || c == '\0')
		return filePath;

	/* Skip leading occurrences */
	while (*filePath == c)
		filePath++;

	/*
	 * Drop trailing occurrences.  The first remaining character is never
	 * 'c', so stopping at length 1 is sufficient.
	 */
	len = strlen(filePath);
	while (len > 1 && filePath[len - 1] == c)
		len--;

	filePath[len] = '\0';

	return filePath;
}
| |
| static void |
| formDirTableSlot(CopyFromState cstate, |
| Oid spcId, |
| char *relativePath, |
| int64 fileSize, |
| char *md5sum, |
| char *tags, |
| Datum *values, |
| bool *nulls) |
| { |
| TupleDesc tupDesc; |
| AttrNumber num_phys_attrs; |
| ListCell *cur; |
| char *field[5]; |
| FmgrInfo *in_functions = cstate->in_functions; |
| Oid *typioparams = cstate->typioparams; |
| List *attnumlist = cstate->attnumlist; |
| pg_time_t stampTime = (pg_time_t) time(NULL); |
| char lastModified[128]; |
| |
| pg_strftime(lastModified, sizeof(lastModified), |
| "%Y-%m-%d %H:%M:%S", |
| pg_localtime(&stampTime, log_timezone)); |
| |
| tupDesc = RelationGetDescr(cstate->rel); |
| num_phys_attrs = tupDesc->natts; |
| |
| MemSet(values, 0, num_phys_attrs * sizeof(Datum)); |
| MemSet(nulls, false, num_phys_attrs * sizeof(bool)); |
| |
| field[0] = relativePath; /* relative_path */ |
| field[1] = psprintf(INT64_FORMAT, fileSize); /* size */ |
| field[2] = lastModified; /* last_modified */ |
| field[3] = md5sum; /* md5sum */ |
| if (md5sum == NULL) |
| nulls[3] = true; |
| field[4] = tags; /* tags */ |
| if (tags == NULL) |
| nulls[4] = true; |
| |
| /* Loop to read the user attributes on the line. */ |
| foreach(cur, attnumlist) |
| { |
| int attnum = lfirst_int(cur); |
| int m = attnum - 1; |
| char *value; |
| Form_pg_attribute att = TupleDescAttr(tupDesc, m); |
| |
| value = field[m]; |
| |
| values[m] = InputFunctionCall(&in_functions[m], |
| value, |
| typioparams[m], |
| att->atttypmod); |
| } |
| } |
| |
| /* |
| * Copy From file to directory table. |
| */ |
| static uint64 |
| CopyFromDirectoryTable(CopyFromState cstate) |
| { |
| ResultRelInfo *resultRelInfo; |
| ResultRelInfo *target_resultRelInfo; |
| EState *estate = CreateExecutorState(); |
| TupleTableSlot *myslot = NULL; |
| ExprContext *econtext; |
| int bytesRead; |
| char hexMd5Sum[256]; |
| char buffer[DIR_FILE_BUFF_SIZE]; |
| int64 processed = 0; |
| int64 fileSize = 0; |
| UFile *file; |
| CdbCopy *cdbCopy = NULL; |
| char *dirTablePath; |
| char *orgiFileName; |
| char *relaFileDirPointer; |
| char relaFileDir[MAXPGPATH] = {0}; |
| char *orgiFileDir = NULL; |
| char *relaFileName; |
| TupleDesc tupdesc; |
| unsigned int targetSeg; |
| DirectoryTable *dirTable; |
| pg_cryptohash_ctx *hashCtx; |
| uint8 md5Sum[MD5_DIGEST_LENGTH]; |
| char errorMessage[256]; |
| GpDistributionData *distData = NULL; /* distribution data used to compute target seg */ |
| |
| /* |
| * We need a ResultRelInfo so we can use the regular executor's |
| * index-entry-making machinery. (There used to be a huge amount of code |
| * here that basically duplicated execUtils.c ...) |
| */ |
| ExecInitRangeTable(estate, cstate->range_table); |
| resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo); |
| ExecInitResultRelation(estate, resultRelInfo, 1); |
| |
| ExecOpenIndices(resultRelInfo, false); |
| |
| /* Prepare to catch AFTER triggers. */ |
| AfterTriggerBeginQuery(); |
| |
| /* |
| * If there are any triggers with transition tables on the named relation, |
| * we need to be prepared to capture transition tuples. |
| */ |
| cstate->transition_capture = MakeTransitionCaptureState(cstate->rel->trigdesc, |
| RelationGetRelid(cstate->rel), |
| CMD_INSERT); |
| |
| /* |
| * Check BEFORE STATEMENT insertion triggers. It's debatable whether we |
| * should do this for COPY, since it's not really an "INSERT" statement as |
| * such. However, executing these triggers maintains consistency with the |
| * EACH ROW triggers that we already fire on COPY. |
| */ |
| ExecBSInsertTriggers(estate, resultRelInfo); |
| |
| /* Assemble directory table file location. */ |
| relaFileName = trimFilePath(glob_copystmt->dirfilename, '/'); |
| relaFileDirPointer = strrchr(relaFileName, '/'); |
| if (relaFileDirPointer) |
| { |
| memcpy(relaFileDir, relaFileName, relaFileDirPointer - relaFileName); |
| } |
| |
| dirTable = GetDirectoryTable(RelationGetRelid(cstate->rel)); |
| dirTablePath = dirTable->location; |
| orgiFileName = psprintf("%s/%s", dirTablePath, relaFileName); |
| if (relaFileDir[0] != '\0') |
| orgiFileDir = psprintf("%s/%s", dirTablePath, relaFileDir); |
| |
| /* |
| * build tupledesc and slot for copy from |
| */ |
| tupdesc = CreateTemplateTupleDesc(5); |
| TupleDescInitEntry(tupdesc, (AttrNumber) 1, "relative_path", |
| TEXTOID, -1, 0); |
| TupleDescInitEntry(tupdesc, (AttrNumber) 2, "size", |
| INT8OID, -1, 0); |
| TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_modified", |
| TIMESTAMPTZOID, -1, 0); |
| TupleDescInitEntry(tupdesc, (AttrNumber) 4, "md5", |
| TEXTOID, -1, 0); |
| TupleDescInitEntry(tupdesc, (AttrNumber) 5, "tag", |
| TEXTOID, -1 ,0); |
| |
| myslot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual); |
| |
| if (cstate->dispatch_mode == COPY_DISPATCH) |
| { |
| /* |
| * Initialize information about distribution keys, needed to compute target |
| * segment for each row. |
| */ |
| distData = InitDistributionData(cstate, estate); |
| |
| /* Determine which fields we need to parse in the QD. */ |
| InitCopyFromDispatchSplit(cstate, distData, estate); |
| } |
| |
| if (cstate->dispatch_mode == COPY_DISPATCH || |
| cstate->dispatch_mode == COPY_EXECUTOR) |
| { |
| /* |
| * Now split the attnumlist into the parts that are parsed in the QD, and |
| * in QE. |
| */ |
| ListCell *lc; |
| int i = 0; |
| List *qd_attnumlist = NIL; |
| List *qe_attnumlist = NIL; |
| int first_qe_processed_field; |
| |
| first_qe_processed_field = cstate->first_qe_processed_field; |
| |
| foreach(lc, cstate->attnumlist) |
| { |
| int attnum = lfirst_int(lc); |
| |
| if (i < first_qe_processed_field) |
| qd_attnumlist = lappend_int(qd_attnumlist, attnum); |
| else |
| qe_attnumlist = lappend_int(qe_attnumlist, attnum); |
| i++; |
| } |
| cstate->qd_attnumlist = qd_attnumlist; |
| cstate->qe_attnumlist = qe_attnumlist; |
| } |
| |
| if (cstate->dispatch_mode == COPY_DISPATCH) |
| { |
| /* |
| * We are the QD node, and we are receiving rows from client, or |
| * reading them from a file. We are not writing any data locally, |
| * instead, we determine the correct target segment for row, |
| * and forward each to the correct segment. |
| */ |
| |
| /* |
| * pre-allocate buffer for constructing a message. |
| */ |
| cstate->dispatch_msgbuf = makeStringInfo(); |
| enlargeStringInfo(cstate->dispatch_msgbuf, SizeOfCopyFromDispatchRow); |
| |
| formDirTableSlot(cstate, |
| dirTable->spcId, |
| relaFileName, |
| 0, |
| NULL, |
| cstate->opts.tags, |
| myslot->tts_values, |
| myslot->tts_isnull); |
| ExecStoreVirtualTuple(myslot); |
| |
| targetSeg = GetTargetSeg(distData, myslot); |
| |
| /* |
| * prepare to COPY data into segDBs: |
| */ |
| cdbCopy = makeCdbCopyFrom(cstate); |
| |
| /* |
| * Dispatch the COPY command. |
| */ |
| elog(DEBUG5, "COPY command sent to segdbs"); |
| |
| cdbCopyStart(cdbCopy, convertToCopyTextStmt(glob_copystmt), cstate->file_encoding); |
| |
| /* |
| * Header processing. |
| */ |
| SendCopyFromForwardedHeader(cstate, cdbCopy); |
| |
| /* in the QD, forward the row to the correct segment(s). */ |
| SendCopyFromForwardedTuple(cstate, cdbCopy, false, |
| targetSeg, |
| resultRelInfo->ri_RelationDesc, |
| cstate->cur_lineno, |
| cstate->line_buf.data, |
| cstate->line_buf.len, |
| myslot->tts_values, |
| myslot->tts_isnull, |
| true); |
| |
| for (;;) |
| { |
| CHECK_FOR_INTERRUPTS(); |
| |
| bytesRead = CopyReadBinaryData(cstate, buffer, DIR_FILE_BUFF_SIZE); |
| |
| if (bytesRead > 0) |
| { |
| cdbCopySendData(cdbCopy, targetSeg, buffer, bytesRead); |
| } |
| |
| if (bytesRead != DIR_FILE_BUFF_SIZE) |
| { |
| Assert(cstate->raw_reached_eof == true); |
| break; |
| } |
| } |
| |
| { |
| int64 total_completed_from_qes; |
| int64 total_rejected_from_qes; |
| |
| cdbCopyEnd(cdbCopy, |
| &total_completed_from_qes, |
| &total_rejected_from_qes); |
| |
| processed = total_completed_from_qes; |
| } |
| } |
| else if (cstate->dispatch_mode == COPY_EXECUTOR) |
| { |
| #define DIRECTORY_TABLE_COLUMNS 5 |
| List *recheckIndexes = NIL; |
| CommandId mycid = GetCurrentCommandId(true); |
| MemoryContext oldcontext = CurrentMemoryContext; |
| bool has_tuple = false; |
| bool update_indexes; |
| |
| econtext = GetPerTupleExprContext(estate); |
| |
| if (NextCopyFromExecute(cstate, econtext, myslot->tts_values, myslot->tts_isnull, true)) |
| { |
| has_tuple = true; |
| |
| /* |
| * Reset the per-tuple exprcontext. We do this after every tuple, to |
| * clean-up after expression evaluations etc. |
| */ |
| ResetPerTupleExprContext(estate); |
| |
| /* |
| * Switch to per-tuple context before calling NextCopyFrom, which does |
| * evaluate default expressions etc. and requires per-tuple context. |
| */ |
| MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| |
| /* |
| * NextCopyFromExecute set up estate->es_result_relation_info, |
| * and stored the tuple in the correct slot. |
| */ |
| resultRelInfo = estate->es_result_relations[0]; |
| |
| ExecStoreVirtualTuple(myslot); |
| |
| /* |
| * Constraints and where clause might reference the tableoid column, |
| * so (re-)initialize tts_tableOid before evaluating them. |
| */ |
| myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc); |
| |
| /* Triggers and stuff need to be invoked in query context. */ |
| MemoryContextSwitchTo(oldcontext); |
| |
| /* OK, store the tuple and create index entries for it */ |
| table_tuple_insert(resultRelInfo->ri_RelationDesc, |
| myslot, mycid, 0, NULL); |
| |
| recheckIndexes = ExecInsertIndexTuples(resultRelInfo, |
| myslot, |
| estate, |
| false, |
| false, |
| NULL, |
| NIL); |
| |
| /* AFTER ROW INSERT Triggers */ |
| ExecARInsertTriggers(estate, resultRelInfo, myslot, |
| recheckIndexes, cstate->transition_capture); |
| |
| list_free(recheckIndexes); |
| |
| CommandCounterIncrement(); |
| } |
| |
| if (has_tuple) |
| { |
| if (UFileExists(dirTable->spcId, orgiFileName)) |
| { |
| UFileUnlink(dirTable->spcId, orgiFileName); |
| } |
| |
| if (orgiFileDir) |
| UFileEnsurePath(dirTable->spcId, orgiFileDir); |
| |
| file = UFileOpen(dirTable->spcId, |
| orgiFileName, |
| O_CREAT | O_WRONLY, |
| errorMessage, |
| sizeof(errorMessage)); |
| |
| if (file == NULL) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("failed to open file \"%s\": %s", orgiFileName, errorMessage))); |
| |
| /* Delete uploaded file when the transaction fails */ |
| UFileAddPendingDelete(cstate->rel, dirTable->spcId, orgiFileName, false); |
| |
| /* FIXME: |
| * |
| * Even if we use FileExist function, writing to the same file in two |
| * concurrent sessions can still cause the file content to be corrupted. |
| * |
| * 1. Some DFS don't support renaming files, which means we can't use |
| * the common technique of generating a random filename for upload and |
| * then renaming it to the final name once it's complete. |
| * |
| * 2. Seems like unique index could be a good solution to fix the issue, |
| * But unique indexes can cause concurrent sessions to wait for each |
| * other(see comments in _bt_doinsert), and uploads can take a long time, |
| * so transactions waiting for each other for a long time without releasing |
| * resources is also not ideal(I know that we can set lock timeout). |
| * |
| * Another reason for not using unique index is that the file is uploading |
| * in the copy context, based on the current copy protocol, we are not able |
| * to insert a record first, then, once the file is uploaded, we could update |
| * the record. |
| * |
| * 3. We should probably create a file status table in catalog service |
| * to keep track of files currently being uploaded. |
| */ |
| hashCtx = pg_cryptohash_create(PG_MD5); |
| if (hashCtx == NULL) |
| ereport(ERROR, |
| (errcode(ERRCODE_OUT_OF_MEMORY), |
| errmsg("failed to create md5hash context: out of memory"))); |
| pg_cryptohash_init(hashCtx); |
| |
| for (;;) |
| { |
| CHECK_FOR_INTERRUPTS(); |
| |
| bytesRead = CopyGetData(cstate, buffer, DIR_FILE_BUFF_SIZE, DIR_FILE_BUFF_SIZE); |
| |
| if (bytesRead > 0) |
| { |
| if (UFileWrite(file, buffer, bytesRead) == -1) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("failed to write file \"%s\": %s", orgiFileName, UFileGetLastError(file)))); |
| |
| fileSize += bytesRead; |
| pg_cryptohash_update(hashCtx, (const uint8 *) buffer, bytesRead); |
| } |
| |
| if (bytesRead != DIR_FILE_BUFF_SIZE) |
| { |
| break; |
| } |
| } |
| |
| if (UFileSync(file) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("unable to sync the file \"%s\": %s", glob_copystmt->dirfilename, UFileGetLastError(file)))); |
| |
| if (UFileClose(file) < 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("failed to close the file \"%s\": %s", glob_copystmt->dirfilename, UFileGetLastError(file)))); |
| |
| pfree(file); |
| |
| pg_cryptohash_final(hashCtx, md5Sum, sizeof(md5Sum)); |
| pg_cryptohash_free(hashCtx); |
| bytesToHex(md5Sum, hexMd5Sum); |
| |
| myslot->tts_values[1] = Int64GetDatum(fileSize); |
| myslot->tts_values[3] = CStringGetTextDatum((char *) hexMd5Sum); |
| myslot->tts_isnull[3] = false; |
| |
| simple_table_tuple_update(resultRelInfo->ri_RelationDesc, &myslot->tts_tid, myslot, |
| estate->es_snapshot, &update_indexes); |
| |
| ExecClearTuple(myslot); |
| |
| /* |
| * We count only tuples not suppressed by a BEFORE INSERT trigger |
| * or FDW; this is the same definition used by nodeModifyTable.c |
| * for counting tuples inserted by an INSERT command. Update |
| * progress of the COPY command as well. |
| * |
| * MPP: incrementing this counter here only matters for utility |
| * mode. in dispatch mode only the dispatcher COPY collects row |
| * count, so this counter is meaningless. |
| */ |
| pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
| ++processed); |
| |
| } |
| } |
| else |
| { |
| List *recheckIndexes = NIL; |
| CommandId mycid = GetCurrentCommandId(true); |
| bool update_indexes; |
| |
| formDirTableSlot(cstate, |
| dirTable->spcId, |
| relaFileName, |
| 0, |
| NULL, |
| cstate->opts.tags, |
| myslot->tts_values, |
| myslot->tts_isnull); |
| ExecStoreVirtualTuple(myslot); |
| |
| /* OK, store the tuple and create index entries for it */ |
| table_tuple_insert(resultRelInfo->ri_RelationDesc, |
| myslot, mycid, 0, NULL); |
| |
| recheckIndexes = ExecInsertIndexTuples(resultRelInfo, |
| myslot, |
| estate, |
| false, |
| false, |
| NULL, |
| NIL); |
| |
| /* AFTER ROW INSERT Triggers */ |
| ExecARInsertTriggers(estate, resultRelInfo, myslot, |
| recheckIndexes, cstate->transition_capture); |
| |
| list_free(recheckIndexes); |
| |
| CommandCounterIncrement(); |
| |
| if (UFileExists(dirTable->spcId, orgiFileName)) |
| { |
| UFileUnlink(dirTable->spcId, orgiFileName); |
| } |
| |
| if (orgiFileDir) |
| UFileEnsurePath(dirTable->spcId, orgiFileDir); |
| |
| file = UFileOpen(dirTable->spcId, |
| orgiFileName, |
| O_CREAT | O_WRONLY, |
| errorMessage, |
| sizeof(errorMessage)); |
| |
| if (file == NULL) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("failed to open file \"%s\": %s", orgiFileName, errorMessage))); |
| |
| /* Delete uploaded file when the transaction fails */ |
| UFileAddPendingDelete(cstate->rel, dirTable->spcId, orgiFileName, false); |
| |
| hashCtx = pg_cryptohash_create(PG_MD5); |
| if (hashCtx == NULL) |
| ereport(ERROR, |
| (errcode(ERRCODE_OUT_OF_MEMORY), |
| errmsg("failed to create md5hash context: out of memory"))); |
| pg_cryptohash_init(hashCtx); |
| |
| for (;;) |
| { |
| CHECK_FOR_INTERRUPTS(); |
| |
| bytesRead = CopyReadBinaryData(cstate, buffer, DIR_FILE_BUFF_SIZE); |
| |
| if (bytesRead > 0) |
| { |
| if (UFileWrite(file, buffer, bytesRead) == -1) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("failed to write file \"%s\": %s", orgiFileName, UFileGetLastError(file)))); |
| |
| fileSize += bytesRead; |
| pg_cryptohash_update(hashCtx, (const uint8 *) buffer, bytesRead); |
| } |
| |
| if (bytesRead != DIR_FILE_BUFF_SIZE) |
| { |
| break; |
| } |
| } |
| |
| if (UFileSync(file) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("unable to sync file \"%s\": %s", glob_copystmt->dirfilename, UFileGetLastError(file)))); |
| |
| if (UFileClose(file) < 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("failed to close the file \"%s\": %s", glob_copystmt->dirfilename, UFileGetLastError(file)))); |
| |
| pfree(file); |
| |
| pg_cryptohash_final(hashCtx, md5Sum, sizeof(md5Sum)); |
| pg_cryptohash_free(hashCtx); |
| bytesToHex(md5Sum, hexMd5Sum); |
| |
| myslot->tts_values[1] = Int64GetDatum(fileSize); |
| myslot->tts_values[3] = CStringGetTextDatum((char *) hexMd5Sum); |
| myslot->tts_isnull[3] = false; |
| |
| simple_table_tuple_update(resultRelInfo->ri_RelationDesc, &myslot->tts_tid, myslot, |
| estate->es_snapshot, &update_indexes); |
| |
| ExecClearTuple(myslot); |
| |
| /* |
| * We count only tuples not suppressed by a BEFORE INSERT trigger |
| * or FDW; this is the same definition used by nodeModifyTable.c |
| * for counting tuples inserted by an INSERT command. Update |
| * progress of the COPY command as well. |
| * |
| * MPP: incrementing this counter here only matters for utility |
| * mode. in dispatch mode only the dispatcher COPY collects row |
| * count, so this counter is meaningless. |
| */ |
| pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
| ++processed); |
| } |
| |
| cstate->filename = NULL; |
| |
| /* Execute AFTER STATEMENT insertion triggers */ |
| ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture); |
| |
| /* Handle queued AFTER triggers */ |
| AfterTriggerEndQuery(estate); |
| |
| /* |
| * In QE, send the number of rejected rows to the client (QD COPY) if |
| * SREH is on, always send the number of completed rows. |
| */ |
| if (Gp_role == GP_ROLE_EXECUTE) |
| { |
| SendNumRows((cstate->errMode != ALL_OR_NOTHING) ? cstate->cdbsreh->rejectcount : 0, processed); |
| } |
| |
| ExecResetTupleTable(estate->es_tupleTable, false); |
| |
| /* Close the result relations, including any trigger target relations */ |
| ExecCloseResultRelations(estate); |
| ExecCloseRangeTableRelations(estate); |
| |
| FreeDistributionData(distData); |
| FreeExecutorState(estate); |
| |
| return processed; |
| } |
| |
| /* |
| * Setup to read tuple from a file for COPY FROM. |
| * |
| * 'rel': Used as a template for the tuples |
| * 'options': List of DefElem. See copy_opt_item in gram.y for selections. |
| * |
| * Returns a CopyFromState, to be passed to NextCopyFrom and related functions. |
| */ |
| static CopyFromState |
| BeginCopyFromDirectoryTable(ParseState *pstate, |
| Relation rel, |
| const char *filename, |
| bool is_program, |
| List *options) |
| { |
| CopyFromState cstate; |
| bool pipe; |
| MemoryContext oldcontext; |
| TupleDesc tupDesc; |
| AttrNumber num_phys_attrs; |
| FmgrInfo *in_functions; |
| Oid *typioparams; |
| int attnum; |
| Oid in_func_oid; |
| const int progress_cols[] = { |
| PROGRESS_COPY_COMMAND, |
| PROGRESS_COPY_TYPE, |
| PROGRESS_COPY_BYTES_TOTAL |
| }; |
| int64 progress_vals[] = { |
| PROGRESS_COPY_COMMAND_FROM, |
| 0, |
| 0 |
| }; |
| |
| if (!glob_copystmt->dirfilename) |
| ereport(ERROR, |
| (errcode(ERRCODE_UNDEFINED_OBJECT), |
| errmsg("Copy from directory table file name can't be null."))); |
| |
| /* Allocate workspace and zero all fields */ |
| cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData)); |
| |
| /* |
| * We allocate everything used by a cstate in a new memory context. This |
| * avoids memory leaks during repeated use of COPY in a query. |
| */ |
| cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, |
| "COPY", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
| |
| /* Process the target relation */ |
| cstate->rel = rel; |
| |
| /* Check whether copy directory table options allowed */ |
| ProcessCopyDirectoryTableOptions(pstate, &cstate->opts, true, options, rel->rd_id); |
| |
| if (Gp_role == GP_ROLE_DISPATCH && !cstate->opts.binary) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("Only support copy binary from directory table."))); |
| |
| cstate->copy_src = COPY_FILE; /* default */ |
| |
| /* |
| * Determine the mode |
| */ |
| if (Gp_role == GP_ROLE_DISPATCH && |
| cstate->rel && cstate->rel->rd_cdbpolicy && |
| cstate->rel->rd_cdbpolicy->ptype != POLICYTYPE_ENTRY) |
| cstate->dispatch_mode = COPY_DISPATCH; |
| /* |
| * Handle case where fdw executes on coordinator while it's acting as a segment |
| * This occurs when fdw is under a redistribute on the coordinator |
| */ |
| else if (Gp_role == GP_ROLE_EXECUTE && |
| cstate->rel && cstate->rel->rd_cdbpolicy && |
| cstate->rel->rd_cdbpolicy->ptype == POLICYTYPE_ENTRY) |
| cstate->dispatch_mode = COPY_DIRECT; |
| else if (Gp_role == GP_ROLE_EXECUTE) |
| cstate->dispatch_mode = COPY_EXECUTOR; |
| else |
| cstate->dispatch_mode = COPY_DIRECT; |
| |
| cstate->cur_relname = RelationGetRelationName(cstate->rel); |
| cstate->cur_lineno = 0; |
| cstate->cur_attname = NULL; |
| cstate->cur_attval = NULL; |
| if (filename) |
| cstate->filename = pstrdup(filename); |
| cstate->file_encoding = GetDatabaseEncoding(); |
| |
| /* |
| * Allocate buffers for the input pipeline. |
| */ |
| cstate->raw_buf = palloc(RAW_BUF_SIZE + 1); |
| cstate->raw_buf_index = cstate->raw_buf_len = 0; |
| MemSet(cstate->raw_buf, ' ', RAW_BUF_SIZE * sizeof(char)); |
| cstate->raw_buf[RAW_BUF_SIZE] = '\0'; |
| cstate->raw_reached_eof = false; |
| |
| initStringInfo(&cstate->line_buf); |
| |
| /* Assign range table, we'll need it in CopyFrom. */ |
| if (pstate) |
| cstate->range_table = pstate->p_rtable; |
| |
| /* |
| * build tupledesc and slot for copy from |
| */ |
| tupDesc = CreateTemplateTupleDesc(5); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 1, "relative_path", |
| TEXTOID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 2, "size", |
| INT8OID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 3, "last_modified", |
| TIMESTAMPTZOID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 4, "md5", |
| TEXTOID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 5, "tag", |
| TEXTOID, -1 ,0); |
| |
| num_phys_attrs = tupDesc->natts; |
| |
| cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, NIL); |
| |
| /* |
| * Pick up the required catalog information for each attribute in the |
| * relation, including the input function, the element type (to pass to |
| * the input function), and info about defaults and constraints. (Which |
| * input function we use depends on text/binary format choice.) |
| */ |
| in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); |
| typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); |
| |
| for (attnum = 1; attnum <= num_phys_attrs; attnum++) |
| { |
| Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); |
| |
| /* Fetch the input function and typioparam info */ |
| getTypeInputInfo(att->atttypid, |
| &in_func_oid, &typioparams[attnum - 1]); |
| fmgr_info(in_func_oid, &in_functions[attnum - 1]); |
| } |
| |
| /* initialize progress */ |
| pgstat_progress_start_command(PROGRESS_COMMAND_COPY, |
| cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); |
| cstate->bytes_processed = 0; |
| |
| /* We keep those variables in cstate. */ |
| cstate->in_functions = in_functions; |
| cstate->typioparams = typioparams; |
| cstate->is_program = is_program; |
| |
| pipe = (filename == NULL || cstate->dispatch_mode == COPY_EXECUTOR); |
| |
| if (pipe) |
| { |
| progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; |
| Assert(!is_program || cstate->dispatch_mode == COPY_EXECUTOR); /* the grammar does not allow this */ |
| if (whereToSendOutput == DestRemote) |
| ReceiveCopyBegin(cstate); |
| else |
| cstate->copy_file = stdin; |
| } |
| else |
| { |
| cstate->filename = pstrdup(filename); |
| |
| if (cstate->is_program) |
| { |
| progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; |
| cstate->program_pipes = open_program_pipes(cstate->filename, false); |
| cstate->copy_file = fdopen(cstate->program_pipes->pipes[0], PG_BINARY_R); |
| if (cstate->copy_file == NULL) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not execute command \"%s\": %m", |
| cstate->filename))); |
| } |
| else |
| { |
| struct stat st; |
| |
| progress_vals[1] = PROGRESS_COPY_TYPE_FILE; |
| cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); |
| if (cstate->copy_file == NULL) |
| { |
| /* copy errno because ereport subfunctions might change it */ |
| int save_error = errno; |
| |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not open file \"%s\" for reading: %m", |
| cstate->filename), |
| (save_error == ENOENT || save_error == EACCES) ? |
| errhint("COPY FROM instructs the PostgreSQL server process to read a file. " |
| "You may want a client-side facility such as psql's \\copy.") : 0)); |
| } |
| |
| // Increase buffer size to improve performance (cmcdevitt) |
| /* GPDB_14_MERGE_FIXME: Ret value process. */ |
| setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes |
| |
| if (fstat(fileno(cstate->copy_file), &st)) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not stat file \"%s\": %m", |
| cstate->filename))); |
| |
| if (S_ISDIR(st.st_mode)) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("\"%s\" is a directory", cstate->filename))); |
| |
| progress_vals[2] = st.st_size; |
| } |
| } |
| |
| pgstat_progress_update_multi_param(3, progress_cols, progress_vals); |
| |
| if (cstate->dispatch_mode == COPY_EXECUTOR && cstate->copy_src != COPY_CALLBACK) |
| { |
| /* Read special header from QD */ |
| char readSig[sizeof(QDtoQESignature)]; |
| copy_from_dispatch_header header_frame; |
| |
| if (CopyGetData(cstate, &readSig, sizeof(QDtoQESignature), sizeof(QDtoQESignature)) != sizeof(QDtoQESignature) || |
| memcmp(readSig, QDtoQESignature, sizeof(QDtoQESignature)) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_INTERNAL_ERROR), |
| errmsg("QD->QE COPY communication signature not recognized"))); |
| |
| if (CopyGetData(cstate, &header_frame, sizeof(header_frame), sizeof(header_frame)) != sizeof(header_frame)) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("invalid QD->QD COPY communication header"))); |
| |
| cstate->first_qe_processed_field = header_frame.first_qe_processed_field; |
| } |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| return cstate; |
| } |
| |
| /* |
| * Copy FROM file to relation. |
| */ |
| uint64 |
| CopyFrom(CopyFromState cstate) |
| { |
| ResultRelInfo *resultRelInfo; |
| ResultRelInfo *target_resultRelInfo; |
| ResultRelInfo *prevResultRelInfo = NULL; |
| EState *estate = CreateExecutorState(); /* for ExecConstraints() */ |
| ModifyTableState *mtstate; |
| ExprContext *econtext; |
| TupleTableSlot *singleslot = NULL; |
| MemoryContext oldcontext = CurrentMemoryContext; |
| |
| PartitionTupleRouting *proute = NULL; |
| ErrorContextCallback errcallback; |
| CommandId mycid = GetCurrentCommandId(true); |
| int ti_options = 0; /* start with default options for insert */ |
| BulkInsertState bistate = NULL; |
| CopyInsertMethod insertMethod; |
| CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */ |
| int64 processed = 0; |
| int64 excluded = 0; |
| bool has_before_insert_row_trig; |
| bool has_instead_insert_row_trig; |
| bool leafpart_use_multi_insert = false; |
| |
| CdbCopy *cdbCopy = NULL; |
| bool is_check_distkey; |
| GpDistributionData *distData = NULL; /* distribution data used to compute target seg */ |
| |
| Assert(cstate->rel); |
| Assert(list_length(cstate->range_table) == 1); |
| |
| /* |
| * The target must be a plain, foreign, or partitioned relation, or have |
| * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only |
| * allowed on views, so we only hint about them in the view case.) |
| */ |
| if (cstate->rel->rd_rel->relkind != RELKIND_RELATION && |
| cstate->rel->rd_rel->relkind != RELKIND_DIRECTORY_TABLE && |
| cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE && |
| cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE && |
| !(cstate->rel->trigdesc && |
| cstate->rel->trigdesc->trig_insert_instead_row)) |
| { |
| if (cstate->rel->rd_rel->relkind == RELKIND_VIEW) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot copy to view \"%s\"", |
| RelationGetRelationName(cstate->rel)), |
| errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger."))); |
| else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot copy to materialized view \"%s\"", |
| RelationGetRelationName(cstate->rel)))); |
| else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot copy to sequence \"%s\"", |
| RelationGetRelationName(cstate->rel)))); |
| else |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("cannot copy to non-table relation \"%s\"", |
| RelationGetRelationName(cstate->rel)))); |
| } |
| |
| if (cstate->rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE) |
| return CopyFromDirectoryTable(cstate); |
| |
| /* |
| * If the target file is new-in-transaction, we assume that checking FSM |
| * for free space is a waste of time. This could possibly be wrong, but |
| * it's unlikely. |
| */ |
| if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) && |
| (cstate->rel->rd_createSubid != InvalidSubTransactionId || |
| cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId)) |
| ti_options |= TABLE_INSERT_SKIP_FSM; |
| |
| /* |
| * Optimize if new relfilenode was created in this subxact or one of its |
| * committed children and we won't see those rows later as part of an |
| * earlier scan or command. The subxact test ensures that if this subxact |
| * aborts then the frozen rows won't be visible after xact cleanup. Note |
| * that the stronger test of exactly which subtransaction created it is |
| * crucial for correctness of this optimization. The test for an earlier |
| * scan or command tolerates false negatives. FREEZE causes other sessions |
| * to see rows they would not see under MVCC, and a false negative merely |
| * spreads that anomaly to the current session. |
| */ |
| if (cstate->opts.freeze) |
| { |
| /* |
| * We currently disallow COPY FREEZE on partitioned tables. The |
| * reason for this is that we've simply not yet opened the partitions |
| * to determine if the optimization can be applied to them. We could |
| * go and open them all here, but doing so may be quite a costly |
| * overhead for small copies. In any case, we may just end up routing |
| * tuples to a small number of partitions. It seems better just to |
| * raise an ERROR for partitioned tables. |
| */ |
| if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("cannot perform COPY FREEZE on a partitioned table"))); |
| } |
| |
| /* |
| * Tolerate one registration for the benefit of FirstXactSnapshot. |
| * Scan-bearing queries generally create at least two registrations, |
| * though relying on that is fragile, as is ignoring ActiveSnapshot. |
| * Clear CatalogSnapshot to avoid counting its registration. We'll |
| * still detect ongoing catalog scans, each of which separately |
| * registers the snapshot it uses. |
| */ |
| InvalidateCatalogSnapshot(); |
| if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals()) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_TRANSACTION_STATE), |
| errmsg("cannot perform COPY FREEZE because of prior transaction activity"))); |
| |
| if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() && |
| cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId()) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction"))); |
| |
| ti_options |= TABLE_INSERT_FROZEN; |
| } |
| |
| /* |
| * We need a ResultRelInfo so we can use the regular executor's |
| * index-entry-making machinery. (There used to be a huge amount of code |
| * here that basically duplicated execUtils.c ...) |
| */ |
| ExecInitRangeTable(estate, cstate->range_table); |
| resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo); |
| ExecInitResultRelation(estate, resultRelInfo, 1); |
| |
| /* Verify the named relation is a valid target for INSERT */ |
| if (!(cstate->rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE)) |
| CheckValidResultRel(resultRelInfo, CMD_INSERT, NULL); |
| |
| ExecOpenIndices(resultRelInfo, false); |
| |
| /* |
| * Set up a ModifyTableState so we can let FDW(s) init themselves for |
| * foreign-table result relation(s). |
| */ |
| mtstate = makeNode(ModifyTableState); |
| mtstate->ps.plan = NULL; |
| mtstate->ps.state = estate; |
| mtstate->operation = CMD_INSERT; |
| mtstate->mt_nrels = 1; |
| mtstate->resultRelInfo = resultRelInfo; |
| mtstate->rootResultRelInfo = resultRelInfo; |
| |
| if (resultRelInfo->ri_FdwRoutine != NULL && |
| resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) |
| resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, |
| resultRelInfo); |
| |
| /* Prepare to catch AFTER triggers. */ |
| AfterTriggerBeginQuery(); |
| |
| /* |
| * If there are any triggers with transition tables on the named relation, |
| * we need to be prepared to capture transition tuples. |
| * |
| * Because partition tuple routing would like to know about whether |
| * transition capture is active, we also set it in mtstate, which is |
| * passed to ExecFindPartition() below. |
| */ |
| cstate->transition_capture = mtstate->mt_transition_capture = |
| MakeTransitionCaptureState(cstate->rel->trigdesc, |
| RelationGetRelid(cstate->rel), |
| CMD_INSERT); |
| |
| /* |
| * If the named relation is a partitioned table, initialize state for |
| * CopyFrom tuple routing. |
| */ |
| if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) |
| proute = ExecSetupPartitionTupleRouting(estate, cstate->rel); |
| |
| if (cstate->whereClause) |
| cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause), |
| &mtstate->ps); |
| |
| /* |
| * It's generally more efficient to prepare a bunch of tuples for |
| * insertion, and insert them in one table_multi_insert() call, than call |
| * table_tuple_insert() separately for every tuple. However, there are a |
| * number of reasons why we might not be able to do this. These are |
| * explained below. |
| */ |
| if (resultRelInfo->ri_TrigDesc != NULL && |
| (resultRelInfo->ri_TrigDesc->trig_insert_before_row || |
| resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) |
| { |
| /* |
| * Can't support multi-inserts when there are any BEFORE/INSTEAD OF |
| * triggers on the table. Such triggers might query the table we're |
| * inserting into and act differently if the tuples that have already |
| * been processed and prepared for insertion are not there. |
| */ |
| insertMethod = CIM_SINGLE; |
| } |
| else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL && |
| resultRelInfo->ri_TrigDesc->trig_insert_new_table) |
| { |
| /* |
| * For partitioned tables we can't support multi-inserts when there |
| * are any statement level insert triggers. It might be possible to |
| * allow partitioned tables with such triggers in the future, but for |
| * now, CopyMultiInsertInfoFlush expects that any before row insert |
| * and statement level insert triggers are on the same relation. |
| */ |
| insertMethod = CIM_SINGLE; |
| } |
| else if (resultRelInfo->ri_FdwRoutine != NULL || |
| cstate->volatile_defexprs) |
| { |
| /* |
| * Can't support multi-inserts to foreign tables or if there are any |
| * volatile default expressions in the table. Similarly to the |
| * trigger case above, such expressions may query the table we're |
| * inserting into. |
| * |
| * Note: It does not matter if any partitions have any volatile |
| * default expressions as we use the defaults from the target of the |
| * COPY command. |
| */ |
| insertMethod = CIM_SINGLE; |
| } |
| else if (contain_volatile_functions(cstate->whereClause)) |
| { |
| /* |
| * Can't support multi-inserts if there are any volatile function |
| * expressions in WHERE clause. Similarly to the trigger case above, |
| * such expressions may query the table we're inserting into. |
| */ |
| insertMethod = CIM_SINGLE; |
| } |
| else |
| { |
| /* |
| * For partitioned tables, we may still be able to perform bulk |
| * inserts. However, the possibility of this depends on which types |
| * of triggers exist on the partition. We must disable bulk inserts |
| * if the partition is a foreign table or it has any before row insert |
| * or insert instead triggers (same as we checked above for the parent |
| * table). Since the partition's resultRelInfos are initialized only |
| * when we actually need to insert the first tuple into them, we must |
| * have the intermediate insert method of CIM_MULTI_CONDITIONAL to |
| * flag that we must later determine if we can use bulk-inserts for |
| * the partition being inserted into. |
| */ |
| if (proute) |
| insertMethod = CIM_MULTI_CONDITIONAL; |
| else |
| insertMethod = CIM_MULTI; |
| |
| CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate, |
| estate, mycid, ti_options); |
| } |
| |
| /* |
| * If not using batch mode (which allocates slots as needed) set up a |
| * tuple slot too. When inserting into a partitioned table, we also need |
| * one, even if we might batch insert, to read the tuple in the root |
| * partition's form. |
| */ |
| if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL) |
| { |
| singleslot = table_slot_create(resultRelInfo->ri_RelationDesc, |
| &estate->es_tupleTable); |
| bistate = GetBulkInsertState(); |
| } |
| |
| has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && |
| resultRelInfo->ri_TrigDesc->trig_insert_before_row); |
| |
| has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && |
| resultRelInfo->ri_TrigDesc->trig_insert_instead_row); |
| |
| /* |
| * Check BEFORE STATEMENT insertion triggers. It's debatable whether we |
| * should do this for COPY, since it's not really an "INSERT" statement as |
| * such. However, executing these triggers maintains consistency with the |
| * EACH ROW triggers that we already fire on COPY. |
| */ |
| ExecBSInsertTriggers(estate, resultRelInfo); |
| |
| econtext = GetPerTupleExprContext(estate); |
| |
| /* Set up callback to identify error line number */ |
| errcallback.callback = CopyFromErrorCallback; |
| errcallback.arg = (void *) cstate; |
| errcallback.previous = error_context_stack; |
| error_context_stack = &errcallback; |
| |
| /* |
| * Do we need to check the distribution keys? Normally, the QD computes the |
| * target segment and sends the data to the correct segment. We don't need to |
| * verify that in the QE anymore. But in ON SEGMENT, we're reading data |
| * directly from a file, and there's no guarantee on what it contains, so we |
| * need to do the checking in the QE. |
| */ |
| is_check_distkey = (cstate->opts.on_segment && Gp_role == GP_ROLE_EXECUTE && gp_enable_segment_copy_checking) ? true : false; |
| |
| /* |
| * Initialize information about distribution keys, needed to compute target |
| * segment for each row. |
| */ |
| if (cstate->dispatch_mode == COPY_DISPATCH || is_check_distkey) |
| { |
| distData = InitDistributionData(cstate, estate); |
| |
| /* |
| * If this table is distributed randomly, there is nothing to check, |
| * after all. |
| */ |
| if (distData->policy == NULL || distData->policy->nattrs == 0) |
| is_check_distkey = false; |
| } |
| |
| /* Determine which fields we need to parse in the QD. */ |
| if (cstate->dispatch_mode == COPY_DISPATCH) |
| InitCopyFromDispatchSplit(cstate, distData, estate); |
| |
| if (cstate->dispatch_mode == COPY_DISPATCH || |
| cstate->dispatch_mode == COPY_EXECUTOR) |
| { |
| /* |
| * Now split the attnumlist into the parts that are parsed in the QD, and |
| * in QE. |
| */ |
| ListCell *lc; |
| int i = 0; |
| List *qd_attnumlist = NIL; |
| List *qe_attnumlist = NIL; |
| int first_qe_processed_field; |
| |
| first_qe_processed_field = cstate->first_qe_processed_field; |
| |
| foreach(lc, cstate->attnumlist) |
| { |
| int attnum = lfirst_int(lc); |
| |
| if (i < first_qe_processed_field) |
| qd_attnumlist = lappend_int(qd_attnumlist, attnum); |
| else |
| qe_attnumlist = lappend_int(qe_attnumlist, attnum); |
| i++; |
| } |
| cstate->qd_attnumlist = qd_attnumlist; |
| cstate->qe_attnumlist = qe_attnumlist; |
| } |
| |
| if (cstate->dispatch_mode == COPY_DISPATCH) |
| { |
| /* |
| * We are the QD node, and we are receiving rows from client, or |
| * reading them from a file. We are not writing any data locally, |
| * instead, we determine the correct target segment for row, |
| * and forward each to the correct segment. |
| */ |
| |
| /* |
| * pre-allocate buffer for constructing a message. |
| */ |
| cstate->dispatch_msgbuf = makeStringInfo(); |
| enlargeStringInfo(cstate->dispatch_msgbuf, SizeOfCopyFromDispatchRow); |
| |
| /* |
| * prepare to COPY data into segDBs: |
| * - set table partitioning information |
| * - set append only table relevant info for dispatch. |
| * - get the distribution policy for this table. |
| * - build a COPY command to dispatch to segdbs. |
| * - dispatch the modified COPY command to all segment databases. |
| * - prepare cdbhash for hashing on row values. |
| */ |
| cdbCopy = makeCdbCopyFrom(cstate); |
| |
| /* |
| * Dispatch the COPY command. |
| * |
| * From this point in the code we need to be extra careful about error |
| * handling. ereport() must not be called until the COPY command sessions |
| * are closed on the executors. Calling ereport() will leave the executors |
| * hanging in COPY state. |
| * |
| * For errors detected by the dispatcher, we save the error message in |
| * cdbcopy_err StringInfo, move on to closing all COPY sessions on the |
| * executors and only then raise an error. We need to make sure to TRY/CATCH |
| * all other errors that may be raised from elsewhere in the backend. All |
| * errors during COPY on the executors will be detected only when we end the |
| * COPY session there, so we are fine there. |
| */ |
| elog(DEBUG5, "COPY command sent to segdbs"); |
| |
| cdbCopyStart(cdbCopy, glob_copystmt, cstate->file_encoding); |
| |
| /* |
| * Skip header processing if a dummy file was received from the master for |
| * COPY FROM ON SEGMENT. |
| */ |
| if (!cstate->opts.on_segment) |
| { |
| SendCopyFromForwardedHeader(cstate, cdbCopy); |
| } |
| } |
| |
| CopyInitDataParser(cstate); |
| |
| table_dml_init(resultRelInfo->ri_RelationDesc, CMD_INSERT); |
| |
| for (;;) |
| { |
| TupleTableSlot *myslot; |
| bool skip_tuple; |
| unsigned int target_seg = 0; /* result segment of cdbhash */ |
| |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* |
| * Reset the per-tuple exprcontext. We do this after every tuple, to |
| * clean-up after expression evaluations etc. |
| */ |
| ResetPerTupleExprContext(estate); |
| |
| /* select slot to (initially) load row into */ |
| if (insertMethod == CIM_SINGLE || proute) |
| { |
| myslot = singleslot; |
| Assert(myslot != NULL); |
| } |
| else |
| { |
| Assert(resultRelInfo == target_resultRelInfo); |
| Assert(insertMethod == CIM_MULTI); |
| |
| myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, |
| resultRelInfo); |
| } |
| |
| /* |
| * Switch to per-tuple context before calling NextCopyFrom, which does |
| * evaluate default expressions etc. and requires per-tuple context. |
| */ |
| MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| |
| ExecClearTuple(myslot); |
| |
| if (cstate->dispatch_mode == COPY_EXECUTOR) |
| { |
| if (!NextCopyFromExecute(cstate, econtext, myslot->tts_values, myslot->tts_isnull, false)) |
| break; |
| |
| /* |
| * NextCopyFromExecute set up estate->es_result_relation_info, |
| * and stored the tuple in the correct slot. |
| */ |
| resultRelInfo = estate->es_result_relations[0]; |
| } |
| else |
| { |
| /* Directly store the values/nulls array in the slot */ |
| if (cstate->dispatch_mode == COPY_DISPATCH) |
| { |
| if (!NextCopyFromDispatch(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) |
| break; |
| } |
| else |
| { |
| if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) |
| break; |
| } |
| } |
| |
| ExecStoreVirtualTuple(myslot); |
| |
| /* |
| * Constraints and where clause might reference the tableoid column, |
| * so (re-)initialize tts_tableOid before evaluating them. |
| */ |
| myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc); |
| |
| /* Triggers and stuff need to be invoked in query context. */ |
| MemoryContextSwitchTo(oldcontext); |
| |
| if (cstate->whereClause) |
| { |
| econtext->ecxt_scantuple = myslot; |
| /* Skip items that don't match COPY's WHERE clause */ |
| if (!ExecQual(cstate->qualexpr, econtext)) |
| { |
| /* |
| * Report that this tuple was filtered out by the WHERE |
| * clause. |
| */ |
| pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, |
| ++excluded); |
| continue; |
| } |
| } |
| |
| if (cstate->dispatch_mode != COPY_DISPATCH && is_check_distkey) |
| { |
| /* |
| * In COPY FROM ON SEGMENT, check the distribution key in the |
| * QE. |
| * Note: For partitioned tables, the order of the root table's columns can be |
| * inconsistent with the order of the partition's columns and Greenplum/PostgreSQL |
| * allows such behavior. When they have different orders, we need to re-order the |
| * TupleTableSlot (myslot) to make it match the partition's columns (see execute_attr_map_slot() |
| * for details). We must perform this check before the re-ordering of TupleTableslot, |
| * or the value of target_seg will be incorrect. |
| */ |
| if (distData->policy->nattrs != 0) |
| { |
| target_seg = GetTargetSeg(distData, myslot); |
| if (GpIdentity.segindex != target_seg) |
| { |
| PG_TRY(); |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), |
| errmsg("value of distribution key doesn't belong to segment with ID %d, it belongs to segment with ID %d", |
| GpIdentity.segindex, target_seg))); |
| } |
| PG_CATCH(); |
| { |
| HandleCopyError(cstate); |
| } |
| PG_END_TRY(); |
| } |
| } |
| } |
| /* Determine the partition to insert the tuple into */ |
| if (proute && cstate->dispatch_mode != COPY_DISPATCH) |
| { |
| TupleConversionMap *map; |
| bool got_error = false; |
| |
| /* |
| * Attempt to find a partition suitable for this tuple. |
| * ExecFindPartition() will raise an error if none can be found or |
| * if the found partition is not suitable for INSERTs. |
| */ |
| PG_TRY(); |
| { |
| resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo, |
| proute, myslot, estate); |
| } |
| PG_CATCH(); |
| { |
| /* after all the prep work let cdbsreh do the real work */ |
| HandleCopyError(cstate); |
| got_error = true; |
| MemoryContextSwitchTo(oldcontext); |
| } |
| PG_END_TRY(); |
| |
| if (got_error) |
| continue; |
| |
| if (prevResultRelInfo != resultRelInfo) |
| { |
| /* Determine which triggers exist on this partition */ |
| has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc && |
| resultRelInfo->ri_TrigDesc->trig_insert_before_row); |
| |
| has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc && |
| resultRelInfo->ri_TrigDesc->trig_insert_instead_row); |
| |
| /* |
| * Disable multi-inserts when the partition has BEFORE/INSTEAD |
| * OF triggers, or if the partition is a foreign partition. |
| */ |
| leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL && |
| !has_before_insert_row_trig && |
| !has_instead_insert_row_trig && |
| resultRelInfo->ri_FdwRoutine == NULL; |
| |
| /* Set the multi-insert buffer to use for this partition. */ |
| if (leafpart_use_multi_insert) |
| { |
| if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL) |
| CopyMultiInsertInfoSetupBuffer(&multiInsertInfo, |
| resultRelInfo); |
| } |
| else if (insertMethod == CIM_MULTI_CONDITIONAL && |
| !CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) |
| { |
| /* |
| * Flush pending inserts if this partition can't use |
| * batching, so rows are visible to triggers etc. |
| */ |
| CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); |
| } |
| |
| if (bistate != NULL) |
| ReleaseBulkInsertStatePin(bistate); |
| prevResultRelInfo = resultRelInfo; |
| } |
| |
| /* |
| * If we're capturing transition tuples, we might need to convert |
| * from the partition rowtype to root rowtype. But if there are no |
| * BEFORE triggers on the partition that could change the tuple, |
| * we can just remember the original unconverted tuple to avoid a |
| * needless round trip conversion. |
| */ |
| if (cstate->transition_capture != NULL) |
| cstate->transition_capture->tcs_original_insert_tuple = |
| !has_before_insert_row_trig ? myslot : NULL; |
| |
| /* |
| * We might need to convert from the root rowtype to the partition |
| * rowtype. |
| */ |
| map = resultRelInfo->ri_RootToPartitionMap; |
| if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert) |
| { |
| /* non batch insert */ |
| if (map != NULL) |
| { |
| TupleTableSlot *new_slot; |
| |
| new_slot = resultRelInfo->ri_PartitionTupleSlot; |
| myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot); |
| } |
| } |
| else |
| { |
| /* |
| * Prepare to queue up tuple for later batch insert into |
| * current partition. |
| */ |
| TupleTableSlot *batchslot; |
| |
| /* no other path available for partitioned table */ |
| Assert(insertMethod == CIM_MULTI_CONDITIONAL); |
| |
| batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo, |
| resultRelInfo); |
| |
| if (map != NULL) |
| myslot = execute_attr_map_slot(map->attrMap, myslot, |
| batchslot); |
| else |
| { |
| /* |
| * This looks more expensive than it is (Believe me, I |
| * optimized it away. Twice.). The input is in virtual |
| * form, and we'll materialize the slot below - for most |
| * slot types the copy performs the work materialization |
| * would later require anyway. |
| */ |
| ExecCopySlot(batchslot, myslot); |
| myslot = batchslot; |
| } |
| } |
| |
| /* ensure that triggers etc see the right relation */ |
| myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| } |
| |
| skip_tuple = false; |
| |
| /* |
| * Compute which segment this row belongs to. |
| */ |
| if (cstate->dispatch_mode == COPY_DISPATCH) |
| { |
| /* In QD, compute the target segment to send this row to. */ |
| target_seg = GetTargetSeg(distData, myslot); |
| bool send_to_all = distData && |
| GpPolicyIsReplicated(distData->policy); |
| |
| /* in the QD, forward the row to the correct segment(s). */ |
| SendCopyFromForwardedTuple(cstate, cdbCopy, send_to_all, |
| send_to_all ? 0 : target_seg, |
| resultRelInfo->ri_RelationDesc, |
| cstate->cur_lineno, |
| cstate->line_buf.data, |
| cstate->line_buf.len, |
| myslot->tts_values, |
| myslot->tts_isnull, |
| false); |
| skip_tuple = true; |
| processed++; |
| } |
| |
| /* BEFORE ROW INSERT Triggers */ |
| if (has_before_insert_row_trig) |
| { |
| if (!skip_tuple && !ExecBRInsertTriggers(estate, resultRelInfo, myslot)) |
| skip_tuple = true; /* "do nothing" */ |
| } |
| |
| if (!skip_tuple) |
| { |
| /* |
| * If there is an INSTEAD OF INSERT ROW trigger, let it handle the |
| * tuple. Otherwise, proceed with inserting the tuple into the |
| * table or foreign table. |
| */ |
| if (has_instead_insert_row_trig) |
| { |
| ExecIRInsertTriggers(estate, resultRelInfo, myslot); |
| } |
| else |
| { |
| /* Compute stored generated columns */ |
| if (resultRelInfo->ri_RelationDesc->rd_att->constr && |
| resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored) |
| ExecComputeStoredGenerated(resultRelInfo, estate, myslot, |
| CMD_INSERT); |
| |
| /* |
| * If the target is a plain table, check the constraints of |
| * the tuple. |
| */ |
| if (resultRelInfo->ri_FdwRoutine == NULL && |
| resultRelInfo->ri_RelationDesc->rd_att->constr) |
| ExecConstraints(resultRelInfo, myslot, estate); |
| |
| /* |
| * Also check the tuple against the partition constraint, if |
| * there is one; except that if we got here via tuple-routing, |
| * we don't need to if there's no BR trigger defined on the |
| * partition. |
| */ |
| if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition && |
| (proute == NULL || has_before_insert_row_trig)) |
| ExecPartitionCheck(resultRelInfo, myslot, estate, true); |
| |
| /* Store the slot in the multi-insert buffer, when enabled. */ |
| if (insertMethod == CIM_MULTI || leafpart_use_multi_insert) |
| { |
| /* |
| * The slot previously might point into the per-tuple |
| * context. For batching it needs to be longer lived. |
| */ |
| ExecMaterializeSlot(myslot); |
| |
| /* Add this tuple to the tuple buffer */ |
| CopyMultiInsertInfoStore(&multiInsertInfo, |
| resultRelInfo, myslot, |
| cstate->line_buf.len, |
| cstate->cur_lineno); |
| |
| /* |
| * If enough inserts have queued up, then flush all |
| * buffers out to their tables. |
| */ |
| if (CopyMultiInsertInfoIsFull(&multiInsertInfo)) |
| CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo); |
| } |
| else |
| { |
| List *recheckIndexes = NIL; |
| |
| /* OK, store the tuple */ |
| if (resultRelInfo->ri_FdwRoutine != NULL) |
| { |
| myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate, |
| resultRelInfo, |
| myslot, |
| NULL); |
| |
| if (myslot == NULL) /* "do nothing" */ |
| continue; /* next tuple please */ |
| |
| /* |
| * AFTER ROW Triggers might reference the tableoid |
| * column, so (re-)initialize tts_tableOid before |
| * evaluating them. |
| */ |
| myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); |
| } |
| else |
| { |
| /* OK, store the tuple and create index entries for it */ |
| table_tuple_insert(resultRelInfo->ri_RelationDesc, |
| myslot, mycid, ti_options, bistate); |
| |
| if (resultRelInfo->ri_NumIndices > 0) |
| recheckIndexes = ExecInsertIndexTuples(resultRelInfo, |
| myslot, |
| estate, |
| false, |
| false, |
| NULL, |
| NIL); |
| } |
| |
| /* AFTER ROW INSERT Triggers */ |
| ExecARInsertTriggers(estate, resultRelInfo, myslot, |
| recheckIndexes, cstate->transition_capture); |
| |
| list_free(recheckIndexes); |
| } |
| } |
| |
| /* |
| * We count only tuples not suppressed by a BEFORE INSERT trigger |
| * or FDW; this is the same definition used by nodeModifyTable.c |
| * for counting tuples inserted by an INSERT command. Update |
| * progress of the COPY command as well. |
| * |
| * MPP: incrementing this counter here only matters for utility |
| * mode. in dispatch mode only the dispatcher COPY collects row |
| * count, so this counter is meaningless. |
| */ |
| pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
| ++processed); |
| if (cstate->cdbsreh) |
| cstate->cdbsreh->processed++; |
| } |
| } |
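| /* |
| * After processing the data from the QD, which is empty and only drives the |
| * workflow, prepare to process the data on the segment. This is done only |
| * once, if cstate->opts.on_segment && Gp_role == GP_ROLE_EXECUTE. |
| */ |
| /* |
| * XXX: this comment was previously flagged as wrong (it named |
| * GP_ROLE_DISPATCH, while the condition below tests GP_ROLE_EXECUTE); |
| * confirm that the description still matches the code. |
| */ |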
| if (cstate->opts.on_segment && Gp_role == GP_ROLE_EXECUTE) |
| { |
| CopyInitDataParser(cstate); |
| } |
| elog(DEBUG1, "Segment %u, Copied %lu rows.", GpIdentity.segindex, processed); |
| |
| /* Flush any remaining buffered tuples */ |
| if (insertMethod != CIM_SINGLE) |
| { |
| if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo)) |
| CopyMultiInsertInfoFlush(&multiInsertInfo, NULL); |
| } |
| |
| /* Done, clean up */ |
| error_context_stack = errcallback.previous; |
| |
| if (bistate != NULL) |
| FreeBulkInsertState(bistate); |
| |
| table_dml_fini(resultRelInfo->ri_RelationDesc, CMD_INSERT); |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| /* |
| * Done reading input data and sending it off to the segment |
| * databases Now we would like to end the copy command on |
| * all segment databases across the cluster. |
| */ |
| if (cstate->dispatch_mode == COPY_DISPATCH) |
| { |
| int64 total_completed_from_qes; |
| int64 total_rejected_from_qes; |
| |
| cdbCopyEnd(cdbCopy, |
| &total_completed_from_qes, |
| &total_rejected_from_qes); |
| |
| /* |
| * Reset returned processed to total_completed_from_qes. |
| * |
| * processed above excludes only rejected rows on QD, it |
| * should also exclude rejected rows on QEs. |
| * |
| * NOTE: |
| * total_completed_from_qes + total_rejected_from_qes <= # of |
| * input file rows |
| * |
| * total_rejected_from_qes includes only rows rejected by |
| * SREH; however, total_completed_from_qes excludes both |
| * SREH-rejected rows and TRIGGER-rejected rows. |
| */ |
| processed = total_completed_from_qes; |
| |
| if (cstate->cdbsreh) |
| { |
| /* emit a NOTICE with number of rejected rows */ |
| uint64 total_rejected = 0; |
| uint64 total_rejected_from_qd = cstate->cdbsreh->rejectcount; |
| |
| /* |
| * If error log has been requested, then we send the row to the segment |
| * so that it can be written in the error log file. The segment process |
| * counts it again as a rejected row. So we ignore the reject count |
| * from the master and only consider the reject count from segments. |
| */ |
| if (IS_LOG_TO_FILE(cstate->cdbsreh->logerrors)) |
| total_rejected_from_qd = 0; |
| |
| total_rejected = total_rejected_from_qd + total_rejected_from_qes; |
| |
| ReportSrehResults(cstate->cdbsreh, total_rejected); |
| } |
| } |
| /* In dispatcher, we have the report for rejected count. We should have it in singlenode too. */ |
| else if (IS_SINGLENODE() && cstate->cdbsreh) |
| { |
| ReportSrehResults(cstate->cdbsreh, cstate->cdbsreh->rejectcount); |
| } |
| |
| /* Execute AFTER STATEMENT insertion triggers */ |
| ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture); |
| |
| /* Handle queued AFTER triggers */ |
| AfterTriggerEndQuery(estate); |
| |
| /* |
| * In QE, send the number of rejected rows to the client (QD COPY) if |
| * SREH is on, always send the number of completed rows. |
| */ |
| if (Gp_role == GP_ROLE_EXECUTE) |
| { |
| SendNumRows((cstate->errMode != ALL_OR_NOTHING) ? cstate->cdbsreh->rejectcount : 0, processed); |
| } |
| |
| ExecResetTupleTable(estate->es_tupleTable, false); |
| |
| /* Allow the FDW to shut down */ |
| if (target_resultRelInfo->ri_FdwRoutine != NULL && |
| target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) |
| target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate, |
| target_resultRelInfo); |
| |
| /* Tear down the multi-insert buffer data */ |
| if (insertMethod != CIM_SINGLE) |
| CopyMultiInsertInfoCleanup(&multiInsertInfo); |
| |
| /* Close all the partitioned tables, leaf partitions, and their indices */ |
| if (proute) |
| ExecCleanupTupleRouting(mtstate, proute); |
| |
| /* Close the result relations, including any trigger target relations */ |
| ExecCloseResultRelations(estate); |
| ExecCloseRangeTableRelations(estate); |
| |
| FreeDistributionData(distData); |
| |
| FreeExecutorState(estate); |
| |
| return processed; |
| } |
| |
| /* |
| * Setup to read tuples from a file for COPY FROM. |
| * |
| * 'pstate': Parse state, passed on to option processing. If not NULL, its |
| * range table is saved for later use by CopyFrom. |
| * 'rel': Used as a template for the tuples |
| * 'whereClause': WHERE clause from the COPY FROM command |
| * 'filename': Name of server-local file to read, NULL for STDIN |
| * 'is_program': true if 'filename' is a program to execute |
| * 'data_source_cb': callback that provides the input data |
| * 'data_source_cb_extra': extra pointer saved together with 'data_source_cb' |
| * 'attnamelist': List of char *, columns to include. NIL selects all cols. |
| * 'options': List of DefElem. See copy_opt_item in gram.y for selections. |
| * |
| * Returns a CopyFromState, to be passed to NextCopyFrom and related functions. |
| */ |
| CopyFromState |
| BeginCopyFrom(ParseState *pstate, |
| Relation rel, |
| Node *whereClause, |
| const char *filename, |
| bool is_program, |
| copy_data_source_cb data_source_cb, |
| void *data_source_cb_extra, |
| List *attnamelist, |
| List *options) |
| { |
| CopyFromState cstate; |
| bool pipe; |
| TupleDesc tupDesc; |
| AttrNumber num_phys_attrs, |
| num_defaults; |
| FmgrInfo *in_functions; |
| Oid *typioparams; |
| int attnum; |
| Oid in_func_oid; |
| int *defmap; |
| ExprState **defexprs; |
| MemoryContext oldcontext; |
| bool volatile_defexprs; |
| const int progress_cols[] = { |
| PROGRESS_COPY_COMMAND, |
| PROGRESS_COPY_TYPE, |
| PROGRESS_COPY_BYTES_TOTAL |
| }; |
| int64 progress_vals[] = { |
| PROGRESS_COPY_COMMAND_FROM, |
| 0, |
| 0 |
| }; |
| |
| if (rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE) |
| return BeginCopyFromDirectoryTable(pstate, rel, filename, is_program, options); |
| |
| /* Allocate workspace and zero all fields */ |
| cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData)); |
| |
| /* |
| * We allocate everything used by a cstate in a new memory context. This |
| * avoids memory leaks during repeated use of COPY in a query. |
| */ |
| cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, |
| "COPY", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
| |
| /* Process the target relation */ |
| cstate->rel = rel; |
| cstate->escape_off = false; |
| |
| /* Extract options from the statement node tree */ |
| ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options, rel->rd_id); |
| |
| if (cstate->opts.escape != NULL && pg_strcasecmp(cstate->opts.escape, "off") == 0) |
| { |
| cstate->escape_off = true; |
| } |
| |
| if (cstate->opts.delim_off && !rel_is_external_table(rel->rd_id)) |
| { |
| /* |
| * We don't support delimiter 'off' for COPY because the QD COPY |
| * sometimes internally adds columns to the data that it sends to |
| * the QE COPY modules, and it uses the delimiter for it. There |
| * are ways to work around this but for now it's not important and |
| * we simply don't support it. |
| */ |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("using no delimiter is only supported for external tables"))); |
| } |
| |
| tupDesc = RelationGetDescr(cstate->rel); |
| |
| /* process common options or initialization */ |
| |
| /* Generate or convert list of attributes to process */ |
| cstate->attnamelist = attnamelist; |
| cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); |
| |
| num_phys_attrs = tupDesc->natts; |
| |
| /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */ |
| cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
| if (cstate->opts.force_notnull) |
| { |
| List *attnums; |
| ListCell *cur; |
| |
| attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull); |
| |
| foreach(cur, attnums) |
| { |
| int attnum = lfirst_int(cur); |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (!list_member_int(cstate->attnumlist, attnum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
| errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY", |
| NameStr(attr->attname)))); |
| cstate->opts.force_notnull_flags[attnum - 1] = true; |
| } |
| } |
| |
| /* Convert FORCE_NULL name list to per-column flags, check validity */ |
| cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
| if (cstate->opts.force_null) |
| { |
| List *attnums; |
| ListCell *cur; |
| |
| attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null); |
| |
| foreach(cur, attnums) |
| { |
| int attnum = lfirst_int(cur); |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (!list_member_int(cstate->attnumlist, attnum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
| errmsg("FORCE_NULL column \"%s\" not referenced by COPY", |
| NameStr(attr->attname)))); |
| cstate->opts.force_null_flags[attnum - 1] = true; |
| } |
| } |
| |
| /* Convert convert_selectively name list to per-column flags */ |
| if (cstate->opts.convert_selectively) |
| { |
| List *attnums; |
| ListCell *cur; |
| |
| cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
| |
| attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select); |
| |
| foreach(cur, attnums) |
| { |
| int attnum = lfirst_int(cur); |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (!list_member_int(cstate->attnumlist, attnum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
| errmsg_internal("selected column \"%s\" not referenced by COPY", |
| NameStr(attr->attname)))); |
| cstate->convert_select_flags[attnum - 1] = true; |
| } |
| } |
| |
| /* Use client encoding when ENCODING option is not specified. */ |
| if (cstate->opts.file_encoding < 0) |
| cstate->file_encoding = pg_get_client_encoding(); |
| else |
| cstate->file_encoding = cstate->opts.file_encoding; |
| |
| /* |
| * Look up encoding conversion function. |
| */ |
| if (cstate->file_encoding == GetDatabaseEncoding() || |
| cstate->file_encoding == PG_SQL_ASCII || |
| GetDatabaseEncoding() == PG_SQL_ASCII) |
| { |
| cstate->need_transcoding = false; |
| } |
| else |
| { |
| cstate->need_transcoding = true; |
| cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding, GetDatabaseEncoding()); |
| } |
| |
| /* |
| * Set up encoding conversion info. Even if the file and server encodings |
| * are the same, we must apply pg_any_to_server() to validate data in |
| * multibyte encodings. |
| * |
| * In COPY_EXECUTE mode, the dispatcher has already done the conversion. |
| */ |
| if (cstate->dispatch_mode == COPY_DISPATCH) |
| cstate->need_transcoding = false; |
| |
| cstate->copy_src = COPY_FILE; /* default */ |
| |
| cstate->whereClause = whereClause; |
| |
| /* |
| * Determine the mode |
| */ |
| if (cstate->opts.on_segment || data_source_cb) |
| cstate->dispatch_mode = COPY_DIRECT; |
| else if (Gp_role == GP_ROLE_DISPATCH && |
| cstate->rel && cstate->rel->rd_cdbpolicy && |
| cstate->rel->rd_cdbpolicy->ptype != POLICYTYPE_ENTRY) |
| cstate->dispatch_mode = COPY_DISPATCH; |
| /* |
| * Handle case where fdw executes on coordinator while it's acting as a segment |
| * This occurs when fdw is under a redistribute on the coordinator |
| */ |
| else if (Gp_role == GP_ROLE_EXECUTE && |
| cstate->rel && cstate->rel->rd_cdbpolicy && |
| cstate->rel->rd_cdbpolicy->ptype == POLICYTYPE_ENTRY) |
| cstate->dispatch_mode = COPY_DIRECT; |
| else if (Gp_role == GP_ROLE_EXECUTE) |
| cstate->dispatch_mode = COPY_EXECUTOR; |
| else |
| cstate->dispatch_mode = COPY_DIRECT; |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
| |
| /* Initialize state variables */ |
| // cstate->eol_type = EOL_UNKNOWN; /* GPDB: don't overwrite value set in ProcessCopyOptions */ |
| cstate->cur_relname = RelationGetRelationName(cstate->rel); |
| cstate->cur_lineno = 0; |
| cstate->cur_attname = NULL; |
| cstate->cur_attval = NULL; |
| |
| /* |
| * Allocate buffers for the input pipeline. |
| * |
| * attribute_buf and raw_buf are used in both text and binary modes, but |
| * input_buf and line_buf only in text mode. |
| */ |
| cstate->raw_buf = palloc(RAW_BUF_SIZE + 1); |
| cstate->raw_buf_index = cstate->raw_buf_len = 0; |
| MemSet(cstate->raw_buf, ' ', RAW_BUF_SIZE * sizeof(char)); |
| cstate->raw_buf[RAW_BUF_SIZE] = '\0'; |
| cstate->raw_reached_eof = false; |
| |
| if (!cstate->opts.binary) |
| { |
| /* |
| * If encoding conversion is needed, we need another buffer to hold |
| * the converted input data. Otherwise, we can just point input_buf |
| * to the same buffer as raw_buf. |
| */ |
| if (cstate->need_transcoding) |
| { |
| cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); |
| cstate->input_buf_index = cstate->input_buf_len = 0; |
| } |
| else |
| cstate->input_buf = cstate->raw_buf; |
| cstate->input_reached_eof = false; |
| |
| initStringInfo(&cstate->line_buf); |
| } |
| |
| initStringInfo(&cstate->attribute_buf); |
| initStringInfo(&cstate->line_buf); |
| |
| /* Assign range table, we'll need it in CopyFrom. */ |
| if (pstate) |
| cstate->range_table = pstate->p_rtable; |
| |
| tupDesc = RelationGetDescr(cstate->rel); |
| num_phys_attrs = tupDesc->natts; |
| num_defaults = 0; |
| volatile_defexprs = false; |
| |
| /* |
| * Pick up the required catalog information for each attribute in the |
| * relation, including the input function, the element type (to pass to |
| * the input function), and info about defaults and constraints. (Which |
| * input function we use depends on text/binary format choice.) |
| */ |
| in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); |
| typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid)); |
| defmap = (int *) palloc(num_phys_attrs * sizeof(int)); |
| defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *)); |
| |
| for (attnum = 1; attnum <= num_phys_attrs; attnum++) |
| { |
| Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1); |
| |
| /* We don't need info for dropped attributes */ |
| if (att->attisdropped) |
| continue; |
| |
| /* Fetch the input function and typioparam info */ |
| if (cstate->opts.binary) |
| getTypeBinaryInputInfo(att->atttypid, |
| &in_func_oid, &typioparams[attnum - 1]); |
| else |
| getTypeInputInfo(att->atttypid, |
| &in_func_oid, &typioparams[attnum - 1]); |
| fmgr_info(in_func_oid, &in_functions[attnum - 1]); |
| |
| /* Get default info if needed */ |
| if (!list_member_int(cstate->attnumlist, attnum) && !att->attgenerated) |
| { |
| /* attribute is NOT to be copied from input */ |
| /* use default value if one exists */ |
| Expr *defexpr = (Expr *) build_column_default(cstate->rel, |
| attnum); |
| |
| if (defexpr != NULL) |
| { |
| /* Run the expression through planner */ |
| defexpr = expression_planner(defexpr); |
| |
| /* Initialize executable expression in copycontext */ |
| defexprs[num_defaults] = ExecInitExpr(defexpr, NULL); |
| defmap[num_defaults] = attnum - 1; |
| num_defaults++; |
| |
| /* |
| * If a default expression looks at the table being loaded, |
| * then it could give the wrong answer when using |
| * multi-insert. Since database access can be dynamic this is |
| * hard to test for exactly, so we use the much wider test of |
| * whether the default expression is volatile. We allow for |
| * the special case of when the default expression is the |
| * nextval() of a sequence which in this specific case is |
| * known to be safe for use with the multi-insert |
| * optimization. Hence we use this special case function |
| * checker rather than the standard check for |
| * contain_volatile_functions(). |
| */ |
| if (!volatile_defexprs) |
| volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr); |
| } |
| } |
| } |
| |
| |
| /* initialize progress */ |
| pgstat_progress_start_command(PROGRESS_COMMAND_COPY, |
| cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); |
| cstate->bytes_processed = 0; |
| |
| /* We keep those variables in cstate. */ |
| cstate->in_functions = in_functions; |
| cstate->typioparams = typioparams; |
| cstate->defmap = defmap; |
| cstate->defexprs = defexprs; |
| cstate->volatile_defexprs = volatile_defexprs; |
| cstate->num_defaults = num_defaults; |
| cstate->is_program = is_program; |
| |
| pipe = (filename == NULL || cstate->dispatch_mode == COPY_EXECUTOR); |
| |
| if (cstate->opts.on_segment && Gp_role == GP_ROLE_DISPATCH) |
| { |
| /* open nothing */ |
| |
| if (filename == NULL) |
| ereport(ERROR, |
| (errcode(ERRCODE_SYNTAX_ERROR), |
| errmsg("STDIN is not supported by 'COPY ON SEGMENT'"))); |
| } |
| else if (data_source_cb) |
| { |
| progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; |
| cstate->copy_src = COPY_CALLBACK; |
| cstate->data_source_cb = data_source_cb; |
| cstate->data_source_cb_extra = data_source_cb_extra; |
| } |
| else if (pipe) |
| { |
| progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; |
| Assert(!is_program || cstate->dispatch_mode == COPY_EXECUTOR); /* the grammar does not allow this */ |
| if (whereToSendOutput == DestRemote) |
| ReceiveCopyBegin(cstate); |
| else |
| cstate->copy_file = stdin; |
| } |
| else |
| { |
| cstate->filename = pstrdup(filename); |
| |
| if (cstate->opts.on_segment) |
| MangleCopyFileName(&cstate->filename, cstate->cdbsreh); |
| |
| if (cstate->is_program) |
| { |
| progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; |
| cstate->program_pipes = open_program_pipes(cstate->filename, false); |
| cstate->copy_file = fdopen(cstate->program_pipes->pipes[0], PG_BINARY_R); |
| if (cstate->copy_file == NULL) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not execute command \"%s\": %m", |
| cstate->filename))); |
| } |
| else |
| { |
| struct stat st; |
| |
| progress_vals[1] = PROGRESS_COPY_TYPE_FILE; |
| cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R); |
| if (cstate->copy_file == NULL) |
| { |
| /* copy errno because ereport subfunctions might change it */ |
| int save_errno = errno; |
| |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not open file \"%s\" for reading: %m", |
| cstate->filename), |
| (save_errno == ENOENT || save_errno == EACCES) ? |
| errhint("COPY FROM instructs the PostgreSQL server process to read a file. " |
| "You may want a client-side facility such as psql's \\copy.") : 0)); |
| } |
| |
| // Increase buffer size to improve performance (cmcdevitt) |
| /* GPDB_14_MERGE_FIXME: Ret value process. */ |
| setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes |
| |
| if (fstat(fileno(cstate->copy_file), &st)) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not stat file \"%s\": %m", |
| cstate->filename))); |
| |
| if (S_ISDIR(st.st_mode)) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("\"%s\" is a directory", cstate->filename))); |
| |
| progress_vals[2] = st.st_size; |
| } |
| } |
| |
| pgstat_progress_update_multi_param(3, progress_cols, progress_vals); |
| |
| if (cstate->opts.on_segment && Gp_role == GP_ROLE_DISPATCH) |
| { |
| /* nothing to do */ |
| } |
| else if (cstate->dispatch_mode == COPY_EXECUTOR && cstate->copy_src != COPY_CALLBACK) |
| { |
| /* Read special header from QD */ |
| char readSig[sizeof(QDtoQESignature)]; |
| copy_from_dispatch_header header_frame; |
| |
| if (CopyGetData(cstate, &readSig, sizeof(QDtoQESignature), sizeof(QDtoQESignature)) != sizeof(QDtoQESignature) || |
| memcmp(readSig, QDtoQESignature, sizeof(QDtoQESignature)) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_INTERNAL_ERROR), |
| errmsg("QD->QE COPY communication signature not recognized"))); |
| |
| if (CopyGetData(cstate, &header_frame, sizeof(header_frame), sizeof(header_frame)) != sizeof(header_frame)) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("invalid QD->QD COPY communication header"))); |
| |
| cstate->first_qe_processed_field = header_frame.first_qe_processed_field; |
| } |
| else if (cstate->opts.binary) |
| { |
| if (cstate->rel->rd_rel->relkind != RELKIND_DIRECTORY_TABLE) |
| { |
| /* Read and verify binary header */ |
| ReceiveCopyBinaryHeader(cstate); |
| } |
| } |
| |
| /* create workspace for CopyReadAttributes results */ |
| if (!cstate->opts.binary) |
| { |
| AttrNumber attr_count = list_length(cstate->attnumlist); |
| |
| cstate->max_fields = attr_count; |
| cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); |
| } |
| |
| cstate->find_eol_with_rawreading = false; |
| MemoryContextSwitchTo(oldcontext); |
| |
| return cstate; |
| } |
| |
| /* |
| * Read raw fields in the next line for COPY FROM in text or csv mode. |
| * Return false if no more lines. |
| * |
| * An internal temporary buffer is returned via 'fields'. It is valid until |
| * the next call of the function. Since the function returns all raw fields |
| * in the input file, 'nfields' could be different from the number of columns |
| * in the relation. |
| * |
| * NOTE: force_not_null option are not applied to the returned fields. |
| */ |
| bool |
| NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) |
| { |
| return NextCopyFromRawFieldsX(cstate, fields, nfields, -1); |
| } |
| |
| static bool |
| NextCopyFromRawFieldsX(CopyFromState cstate, char ***fields, int *nfields, |
| int stop_processing_at_field) |
| { |
| int fldct; |
| bool done; |
| |
| /* only available for text or csv input */ |
| Assert(!cstate->opts.binary); |
| |
| /* on input just throw the header line away */ |
| if (cstate->cur_lineno == 0 && cstate->opts.header_line) |
| { |
| cstate->cur_lineno++; |
| if (CopyReadLine(cstate)) |
| return false; /* done */ |
| } |
| |
| cstate->cur_lineno++; |
| |
| /* Actually read the line into memory here */ |
| done = CopyReadLine(cstate); |
| |
| /* |
| * EOF at start of line means we're done. If we see EOF after some |
| * characters, we act as though it was newline followed by EOF, ie, |
| * process the line and then exit loop on next iteration. |
| */ |
| if (done && cstate->line_buf.len == 0) |
| return false; |
| |
| /* Parse the line into de-escaped field values */ |
| if (cstate->opts.csv_mode) |
| fldct = CopyReadAttributesCSV(cstate, stop_processing_at_field); |
| else |
| fldct = CopyReadAttributesText(cstate, stop_processing_at_field); |
| |
| *fields = cstate->raw_fields; |
| *nfields = fldct; |
| return true; |
| } |
| |
| bool |
| NextCopyFrom(CopyFromState cstate, ExprContext *econtext, |
| Datum *values, bool *nulls) |
| { |
| if (!cstate->cdbsreh) |
| return NextCopyFromX(cstate, econtext, values, nulls); |
| else |
| { |
| MemoryContext oldcontext = CurrentMemoryContext; |
| |
| for (;;) |
| { |
| bool got_error = false; |
| bool result = false; |
| |
| PG_TRY(); |
| { |
| result = NextCopyFromX(cstate, econtext, values, nulls); |
| } |
| PG_CATCH(); |
| { |
| HandleCopyError(cstate); /* cdbsreh->processed is updated inside here */ |
| got_error = true; |
| MemoryContextSwitchTo(oldcontext); |
| } |
| PG_END_TRY(); |
| |
| if (result) |
| cstate->cdbsreh->processed++; |
| |
| if (!got_error) |
| return result; |
| } |
| } |
| } |
| |
| /* |
| * A data error happened. This code block will always be inside a PG_CATCH() |
| * block right when a higher stack level produced an error. We handle the error |
| * by checking which error mode is set (SREH or all-or-nothing) and do the right |
| * thing accordingly. Note that we MUST have this code in a macro (as opposed |
| * to a function) as elog_dismiss() has to be inlined with PG_CATCH in order to |
| * access local error state variables. |
| * |
| * changing me? take a look at FILEAM_HANDLE_ERROR in fileam.c as well. |
| */ |
| void |
| HandleCopyError(CopyFromState cstate) |
| { |
| if (cstate->errMode == ALL_OR_NOTHING) |
| { |
| /* re-throw error and abort */ |
| PG_RE_THROW(); |
| } |
| /* SREH must only handle data errors. all other errors must not be caught */ |
| if (ERRCODE_TO_CATEGORY(elog_geterrcode()) != ERRCODE_DATA_EXCEPTION) |
| { |
| /* re-throw error and abort */ |
| PG_RE_THROW(); |
| } |
| else |
| { |
| /* SREH - release error state and handle error */ |
| MemoryContext oldcontext; |
| ErrorData *edata; |
| char *errormsg; |
| CdbSreh *cdbsreh = cstate->cdbsreh; |
| |
| cdbsreh->processed++; |
| |
| oldcontext = MemoryContextSwitchTo(cstate->cdbsreh->badrowcontext); |
| |
| /* save a copy of the error info */ |
| edata = CopyErrorData(); |
| |
| FlushErrorState(); |
| |
| /* |
| * set the error message. Use original msg and add column name if available. |
| * We do this even if we're not logging the errors, because |
| * ErrorIfRejectLimit() below will use this information in the error message, |
| * if the error count is reached. |
| */ |
| cdbsreh->rawdata->cursor = 0; |
| cdbsreh->rawdata->data = cstate->line_buf.data; |
| cdbsreh->rawdata->len = cstate->line_buf.len; |
| |
| cdbsreh->is_server_enc = true; |
| cdbsreh->linenumber = cstate->cur_lineno; |
| if (cstate->cur_attname) |
| { |
| errormsg = psprintf("%s, column %s", |
| edata->message, cstate->cur_attname); |
| } |
| else |
| { |
| errormsg = edata->message; |
| } |
| cstate->cdbsreh->errmsg = errormsg; |
| |
| if (IS_LOG_TO_FILE(cstate->cdbsreh->logerrors)) |
| { |
| if (Gp_role == GP_ROLE_DISPATCH && !cstate->opts.on_segment) |
| { |
| cstate->cdbsreh->rejectcount++; |
| |
| SendCopyFromForwardedError(cstate, cstate->cdbCopy, errormsg); |
| } |
| else |
| { |
| /* after all the prep work let cdbsreh do the real work */ |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| cstate->cdbsreh->rejectcount++; |
| } |
| else |
| { |
| HandleSingleRowError(cstate->cdbsreh); |
| } |
| } |
| } |
| else |
| cstate->cdbsreh->rejectcount++; |
| |
| ErrorIfRejectLimitReached(cstate->cdbsreh); |
| |
| MemoryContextSwitchTo(oldcontext); |
| MemoryContextReset(cstate->cdbsreh->badrowcontext); |
| } |
| } |
| |
| |
| /* |
| * Read next tuple from file for COPY FROM. Return false if no more tuples. |
| * |
| * 'econtext' is used to evaluate default expression for each columns not |
| * read from the file. It can be NULL when no default values are used, i.e. |
| * when all columns are read from the file. |
| * |
| * 'values' and 'nulls' arrays must be the same length as columns of the |
| * relation passed to BeginCopyFrom. This function fills the arrays. |
| */ |
| bool |
| NextCopyFromX(CopyFromState cstate, ExprContext *econtext, |
| Datum *values, bool *nulls) |
| { |
| TupleDesc tupDesc; |
| AttrNumber num_phys_attrs, |
| attr_count, |
| num_defaults = cstate->num_defaults; |
| FmgrInfo *in_functions = cstate->in_functions; |
| Oid *typioparams = cstate->typioparams; |
| int i; |
| int *defmap = cstate->defmap; |
| ExprState **defexprs = cstate->defexprs; |
| List *attnumlist; |
| int stop_processing_at_field; |
| |
| /* |
| * Figure out what fields we're going to process in this process. |
| * |
| * In the QD, set 'stop_processing_at_field' so that we only those |
| * fields that are needed in the QD. |
| */ |
| switch (cstate->dispatch_mode) |
| { |
| case COPY_DIRECT: |
| stop_processing_at_field = -1; |
| attnumlist = cstate->attnumlist; |
| break; |
| |
| case COPY_DISPATCH: |
| stop_processing_at_field = cstate->first_qe_processed_field; |
| attnumlist = cstate->qd_attnumlist; |
| break; |
| |
| case COPY_EXECUTOR: |
| stop_processing_at_field = -1; |
| attnumlist = cstate->qe_attnumlist; |
| break; |
| |
| default: |
| elog(ERROR, "unexpected COPY dispatch mode %d", cstate->dispatch_mode); |
| } |
| |
| tupDesc = RelationGetDescr(cstate->rel); |
| num_phys_attrs = tupDesc->natts; |
| attr_count = list_length(attnumlist); |
| |
| /* Initialize all values for row to NULL */ |
| MemSet(values, 0, num_phys_attrs * sizeof(Datum)); |
| MemSet(nulls, true, num_phys_attrs * sizeof(bool)); |
| |
| if (!cstate->opts.binary) |
| { |
| char **field_strings; |
| ListCell *cur; |
| int fldct; |
| int fieldno; |
| char *string; |
| |
| /* read raw fields in the next line */ |
| if (cstate->dispatch_mode != COPY_EXECUTOR) |
| { |
| if (!NextCopyFromRawFieldsX(cstate, &field_strings, &fldct, |
| stop_processing_at_field)) |
| return false; |
| } |
| else |
| { |
| /* |
| * We have received the raw line from the QD, and we just |
| * need to split it into raw fields. |
| */ |
| if (cstate->stopped_processing_at_delim && |
| cstate->line_buf.cursor <= cstate->line_buf.len) |
| { |
| if (cstate->opts.csv_mode) |
| fldct = CopyReadAttributesCSV(cstate, -1); |
| else |
| fldct = CopyReadAttributesText(cstate, -1); |
| } |
| else |
| fldct = 0; |
| field_strings = cstate->raw_fields; |
| } |
| /* |
| * Check for overflowing fields. |
| * GPDB: Change below condition compared to upstream to |
| * greater than or equal to 0 as in QE, |
| * attr_count may be equal to 0, |
| * when all fields are processed in the QD. |
| */ |
| if (fldct > attr_count) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("extra data after last expected column"))); |
| |
| /* |
| * A completely empty line is not allowed with FILL MISSING FIELDS. Without |
| * FILL MISSING FIELDS, it's almost surely an error, but not always: |
| * a table with a single text column, for example, needs to accept empty |
| * lines. |
| */ |
| if (cstate->line_buf.len == 0 && |
| cstate->opts.fill_missing && |
| list_length(cstate->attnumlist) > 1) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("missing data for column \"%s\", found empty data line", |
| NameStr(TupleDescAttr(tupDesc, 1)->attname)))); |
| } |
| |
| fieldno = 0; |
| |
| /* Loop to read the user attributes on the line. */ |
| foreach(cur, attnumlist) |
| { |
| int attnum = lfirst_int(cur); |
| int m = attnum - 1; |
| Form_pg_attribute att = TupleDescAttr(tupDesc, m); |
| |
| if (fieldno >= fldct) |
| { |
| /* |
| * Some attributes are missing. In FILL MISSING FIELDS mode, |
| * treat them as NULLs. |
| */ |
| if (!cstate->opts.fill_missing) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("missing data for column \"%s\"", |
| NameStr(att->attname)))); |
| fieldno++; |
| string = NULL; |
| } |
| else |
| string = field_strings[fieldno++]; |
| |
| if (cstate->convert_select_flags && |
| !cstate->convert_select_flags[m]) |
| { |
| /* ignore input field, leaving column as NULL */ |
| continue; |
| } |
| |
| if (cstate->opts.csv_mode) |
| { |
| if (string == NULL && |
| cstate->opts.force_notnull_flags[m]) |
| { |
| /* |
| * FORCE_NOT_NULL option is set and column is NULL - |
| * convert it to the NULL string. |
| */ |
| string = cstate->opts.null_print; |
| } |
| else if (string != NULL && cstate->opts.force_null_flags[m] |
| && strcmp(string, cstate->opts.null_print) == 0) |
| { |
| /* |
| * FORCE_NULL option is set and column matches the NULL |
| * string. It must have been quoted, or otherwise the |
| * string would already have been set to NULL. Convert it |
| * to NULL as specified. |
| */ |
| string = NULL; |
| } |
| } |
| |
| cstate->cur_attname = NameStr(att->attname); |
| cstate->cur_attval = string; |
| values[m] = InputFunctionCall(&in_functions[m], |
| string, |
| typioparams[m], |
| att->atttypmod); |
| if (string != NULL) |
| nulls[m] = false; |
| cstate->cur_attname = NULL; |
| cstate->cur_attval = NULL; |
| } |
| |
| Assert(fieldno == attr_count); |
| } |
| else if (attr_count) |
| { |
| /* binary */ |
| int16 fld_count; |
| ListCell *cur; |
| |
| cstate->cur_lineno++; |
| |
| if (!CopyGetInt16(cstate, &fld_count)) |
| { |
| /* EOF detected (end of file, or protocol-level EOF) */ |
| return false; |
| } |
| |
| if (fld_count == -1) |
| { |
| /* |
| * Received EOF marker. In a V3-protocol copy, wait for the |
| * protocol-level EOF, and complain if it doesn't come |
| * immediately. This ensures that we correctly handle CopyFail, |
| * if client chooses to send that now. |
| * |
| * Note that we MUST NOT try to read more data in an old-protocol |
| * copy, since there is no protocol-level EOF marker then. We |
| * could go either way for copy from file, but choose to throw |
| * error if there's data after the EOF marker, for consistency |
| * with the new-protocol case. |
| */ |
| char dummy; |
| |
| if (cstate->copy_src != COPY_FRONTEND && |
| CopyGetData(cstate, &dummy, 1, 1) > 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("received copy data after EOF marker"))); |
| return false; |
| } |
| |
| if (fld_count != attr_count) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("row field count is %d, expected %d", |
| (int) fld_count, attr_count))); |
| |
| i = 0; |
| foreach(cur, attnumlist) |
| { |
| int attnum = lfirst_int(cur); |
| int m = attnum - 1; |
| Form_pg_attribute att = TupleDescAttr(tupDesc, m); |
| |
| cstate->cur_attname = NameStr(att->attname); |
| i++; |
| values[m] = CopyReadBinaryAttribute(cstate, |
| &in_functions[m], |
| typioparams[m], |
| att->atttypmod, |
| &nulls[m]); |
| cstate->cur_attname = NULL; |
| } |
| } |
| |
| /* |
| * Now compute and insert any defaults available for the columns not |
| * provided by the input data. Anything not processed here or above will |
| * remain NULL. |
| * |
| * GPDB: The defaults are always computed in the QD, and are included |
| * in the QD->QE stream as pre-computed Datums. Funny indentation, to |
| * keep the indentation of the code inside the same as in upstream. |
| * (We could improve this, and compute immutable defaults that don't |
| * affect which segment the row belongs to, in the QE.) |
| */ |
| if (cstate->dispatch_mode != COPY_EXECUTOR) |
| { |
| for (i = 0; i < num_defaults; i++) |
| { |
| /* |
| * The caller must supply econtext and have switched into the |
| * per-tuple memory context in it. |
| */ |
| Assert(econtext != NULL); |
| Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); |
| |
| values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext, |
| &nulls[defmap[i]]); |
| } |
| } |
| |
| return true; |
| } |
| |
| |
| /* |
| * Like NextCopyFrom(), but used in the QD, when we want to parse the |
| * input line only partially. We only want to parse enough fields needed |
| * to determine which target segment to forward the row to. |
| * |
| * (The logic is actually within NextCopyFrom(). This is a separate |
| * function just for symmetry with NextCopyFromExecute()). |
| */ |
| static bool |
| NextCopyFromDispatch(CopyFromState cstate, ExprContext *econtext, |
| Datum *values, bool *nulls) |
| { |
| return NextCopyFrom(cstate, econtext, values, nulls); |
| } |
| |
| /* |
| * Like NextCopyFrom(), but used in the QE, when we're reading pre-processed |
| * rows from the QD. |
| */ |
| static bool |
| NextCopyFromExecute(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls, bool is_directory_table) |
| { |
| TupleDesc tupDesc; |
| AttrNumber num_phys_attrs, |
| attr_count; |
| FormData_pg_attribute *attr; |
| int i; |
| copy_from_dispatch_row frame; |
| int r; |
| bool got_error; |
| |
| if (!is_directory_table) |
| { |
| tupDesc = RelationGetDescr(cstate->rel); |
| } |
| else |
| { |
| tupDesc = CreateTemplateTupleDesc(5); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 1, "relative_path", |
| TEXTOID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 2, "size", |
| INT8OID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 3, "last_modified", |
| TIMESTAMPTZOID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 4, "md5", |
| TEXTOID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 5, "tag", |
| TEXTOID, -1 ,0); |
| } |
| num_phys_attrs = tupDesc->natts; |
| attr_count = list_length(cstate->attnumlist); |
| |
| /* |
| * The code below reads the 'copy_from_dispatch_row' struct, and only |
| * then checks if it was actually a 'copy_from_dispatch_error' struct. |
| * That only works when 'copy_from_dispatch_error' is larger than |
| *'copy_from_dispatch_row'. |
| */ |
| StaticAssertStmt(SizeOfCopyFromDispatchError >= SizeOfCopyFromDispatchRow, |
| "copy_from_dispatch_error must be larger than copy_from_dispatch_row"); |
| |
| /* |
| * If we encounter an error while parsing the row (or we receive a row from |
| * the QD that was already marked as an erroneous row), we loop back here |
| * until we get a good row. |
| */ |
| retry: |
| got_error = false; |
| |
| r = CopyGetData(cstate, (char *) &frame, SizeOfCopyFromDispatchRow, SizeOfCopyFromDispatchRow); |
| if (r == 0) |
| return false; |
| if (r != SizeOfCopyFromDispatchRow) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| if (frame.lineno == -1) |
| { |
| HandleQDErrorFrame(cstate, (char *) &frame); |
| goto retry; |
| } |
| |
| /* Prepare for parsing the input line */ |
| attr = tupDesc->attrs; |
| num_phys_attrs = tupDesc->natts; |
| |
| /* Initialize all values for row to NULL */ |
| MemSet(values, 0, num_phys_attrs * sizeof(Datum)); |
| MemSet(nulls, true, num_phys_attrs * sizeof(bool)); |
| |
| /* check for overflowing fields */ |
| if (frame.fld_count < 0 || frame.fld_count > num_phys_attrs) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("extra data after last expected column"))); |
| |
| /* |
| * Read the input line into 'line_buf'. |
| */ |
| resetStringInfo(&cstate->line_buf); |
| enlargeStringInfo(&cstate->line_buf, frame.line_len); |
| if (CopyGetData(cstate, cstate->line_buf.data, frame.line_len, frame.line_len) != frame.line_len) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| cstate->line_buf.data[frame.line_len] = '\0'; |
| cstate->line_buf.len = frame.line_len; |
| cstate->line_buf.cursor = frame.residual_off; |
| cstate->line_buf_valid = true; |
| cstate->cur_lineno = frame.lineno; |
| cstate->stopped_processing_at_delim = frame.delim_seen_at_end; |
| |
| /* |
| * Parse any fields from the input line that were not processed in the |
| * QD already. |
| */ |
| if (!cstate->cdbsreh) |
| { |
| if (!NextCopyFromX(cstate, econtext, values, nulls)) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| } |
| } |
| else |
| { |
| MemoryContext oldcontext = CurrentMemoryContext; |
| bool result; |
| |
| PG_TRY(); |
| { |
| result = NextCopyFromX(cstate, econtext, values, nulls); |
| |
| if (!result) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| } |
| PG_CATCH(); |
| { |
| HandleCopyError(cstate); |
| got_error = true; |
| MemoryContextSwitchTo(oldcontext); |
| } |
| PG_END_TRY(); |
| } |
| |
| /* |
| * Read any attributes that were processed in the QD already. The attribute |
| * numbers in the message are already in terms of the target partition, so |
| * we do this after remapping and switching to the partition slot. |
| */ |
| for (i = 0; i < frame.fld_count; i++) |
| { |
| int16 attnum; |
| int m; |
| int32 len; |
| Datum value; |
| |
| if (CopyGetData(cstate, &attnum, sizeof(attnum), sizeof(attnum)) != sizeof(attnum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| |
| if (attnum < 1 || attnum > num_phys_attrs) |
| elog(ERROR, "invalid attnum received from QD: %d (num physical attributes: %d)", |
| attnum, num_phys_attrs); |
| m = attnum - 1; |
| |
| cstate->cur_attname = NameStr(attr[m].attname); |
| |
| if (attr[attnum - 1].attbyval) |
| { |
| if (CopyGetData(cstate, &value, sizeof(Datum), sizeof(Datum)) != sizeof(Datum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| } |
| else |
| { |
| char *p; |
| |
| if (attr[attnum - 1].attlen > 0) |
| { |
| len = attr[attnum - 1].attlen; |
| |
| p = palloc(len); |
| if (CopyGetData(cstate, p, len, len) != len) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| } |
| else if (attr[attnum - 1].attlen == -1) |
| { |
| /* For simplicity, varlen's are always transmitted in "long" format */ |
| if (CopyGetData(cstate, &len, sizeof(len), sizeof(len)) != sizeof(len)) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| if (len < VARHDRSZ) |
| elog(ERROR, "invalid varlen length received from QD: %d", len); |
| p = palloc(len); |
| SET_VARSIZE(p, len); |
| if (CopyGetData(cstate, p + VARHDRSZ, len - VARHDRSZ, len - VARHDRSZ) != len - VARHDRSZ) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| } |
| else if (attr[attnum - 1].attlen == -2) |
| { |
| /* |
| * Like the varlen case above, cstrings are sent with a length |
| * prefix and no terminator, so we have to NULL-terminate in |
| * memory after reading them in. |
| */ |
| if (CopyGetData(cstate, &len, sizeof(len), sizeof(len)) != sizeof(len)) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| p = palloc(len + 1); |
| if (CopyGetData(cstate, p, len, len) != len) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| p[len] = '\0'; |
| } |
| else |
| { |
| elog(ERROR, "attribute %d has invalid length %d", |
| attnum, attr[attnum - 1].attlen); |
| } |
| value = PointerGetDatum(p); |
| } |
| |
| cstate->cur_attname = NULL; |
| |
| values[m] = value; |
| nulls[m] = false; |
| } |
| |
| if (got_error) |
| goto retry; |
| |
| /* |
| * Here we should compute defaults for any columns for which we didn't |
| * get a default from the QD. But at the moment, all defaults are evaluated |
| * in the QD. |
| */ |
| return true; |
| } |
| |
| /* |
| * Parse and handle an "error frame" from QD. |
| * |
| * The caller has already read part of the frame; 'p' points to that part, |
| * of length 'len'. |
| */ |
| static void |
| HandleQDErrorFrame(CopyFromState cstate, char *p) |
| { |
| CdbSreh *cdbsreh = cstate->cdbsreh; |
| MemoryContext oldcontext; |
| copy_from_dispatch_error errframe; |
| char *errormsg; |
| char *line; |
| int r; |
| |
| Assert(Gp_role == GP_ROLE_EXECUTE); |
| |
| oldcontext = MemoryContextSwitchTo(cdbsreh->badrowcontext); |
| |
| /* |
| * Copy the part of the struct that the caller had already read, and |
| * read the rest. |
| */ |
| memcpy(&errframe, p, SizeOfCopyFromDispatchRow); |
| |
| size_t len = SizeOfCopyFromDispatchRow; |
| r = CopyGetData(cstate, ((char *) &errframe) + len,SizeOfCopyFromDispatchError - len, SizeOfCopyFromDispatchError - len); |
| if (r != SizeOfCopyFromDispatchError - len) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| |
| errormsg = palloc(errframe.errmsg_len + 1); |
| line = palloc(errframe.line_len + 1); |
| |
| r = CopyGetData(cstate, errormsg, errframe.errmsg_len, errframe.errmsg_len); |
| if (r != errframe.errmsg_len) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| errormsg[errframe.errmsg_len] = '\0'; |
| |
| r = CopyGetData(cstate, line, errframe.line_len, errframe.line_len); |
| if (r != errframe.line_len) |
| ereport(ERROR, |
| (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), |
| errmsg("unexpected EOF in COPY data"))); |
| line[errframe.line_len] = '\0'; |
| |
| cdbsreh->linenumber = errframe.lineno; |
| cdbsreh->rawdata->cursor = 0; |
| cdbsreh->rawdata->data = line; |
| cdbsreh->rawdata->len = strlen(line); |
| cdbsreh->errmsg = errormsg; |
| |
| HandleSingleRowError(cdbsreh); |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| } |
| |
/*
 * Inlined versions of appendBinaryStringInfo and enlargeStringInfo, for
 * speed.
 *
 * ENLARGE_MSGBUF        makes sure 'needed' more bytes fit into the buffer.
 * APPEND_MSGBUF_NOCHECK copies 'datalen' bytes to the end of the buffer,
 *                       relying on the caller to have reserved the space.
 * APPEND_MSGBUF         does both.
 *
 * NOTE: These versions don't NULL-terminate the string. We don't need
 * it here.  Each macro evaluates its arguments more than once.
 */
#define ENLARGE_MSGBUF(buf, needed) \
	do { \
		if ((buf)->len + (needed) >= (buf)->maxlen) \
			enlargeStringInfo((buf), (needed)); \
	} while(0)

#define APPEND_MSGBUF_NOCHECK(buf, ptr, datalen) \
	do { \
		memcpy((buf)->data + (buf)->len, ptr, (datalen)); \
		(buf)->len += (datalen); \
	} while(0)

#define APPEND_MSGBUF(buf, ptr, datalen) \
	do { \
		ENLARGE_MSGBUF(buf, datalen); \
		APPEND_MSGBUF_NOCHECK(buf, ptr, datalen); \
	} while(0)
| |
| /* |
| * This is the sending counterpart of NextCopyFromExecute. Used in the QD, |
| * to send a row to a QE. |
| */ |
| static void |
| SendCopyFromForwardedTuple(CopyFromState cstate, |
| CdbCopy *cdbCopy, |
| bool toAll, |
| int target_seg, |
| Relation rel, |
| int64 lineno, |
| char *line, |
| int line_len, |
| Datum *values, |
| bool *nulls, |
| bool is_directory_table) |
| { |
| TupleDesc tupDesc; |
| FormData_pg_attribute *attr; |
| copy_from_dispatch_row *frame; |
| StringInfo msgbuf; |
| int num_sent_fields; |
| AttrNumber num_phys_attrs; |
| int i; |
| |
| if (!OidIsValid(RelationGetRelid(rel))) |
| elog(ERROR, "invalid target table OID in COPY"); |
| |
| if (!is_directory_table) |
| { |
| tupDesc = RelationGetDescr(rel); |
| } |
| else |
| { |
| tupDesc = CreateTemplateTupleDesc(5); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 1, "relative_path", |
| TEXTOID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 2, "size", |
| INT8OID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 3, "last_modified", |
| TIMESTAMPTZOID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 4, "md5", |
| TEXTOID, -1, 0); |
| TupleDescInitEntry(tupDesc, (AttrNumber) 5, "tag", |
| TEXTOID, -1 ,0); |
| } |
| attr = tupDesc->attrs; |
| num_phys_attrs = tupDesc->natts; |
| |
| /* |
| * Reset the message buffer, and reserve enough space for the header, |
| * the OID if any, and the residual line. |
| */ |
| msgbuf = cstate->dispatch_msgbuf; |
| ENLARGE_MSGBUF(msgbuf, SizeOfCopyFromDispatchRow + sizeof(Oid) + cstate->line_buf.len); |
| |
| /* the header goes to the beginning of the struct, but it will be filled in later. */ |
| msgbuf->len = SizeOfCopyFromDispatchRow; |
| |
| /* |
| * Next, any residual text that we didn't process in the QD. |
| */ |
| APPEND_MSGBUF_NOCHECK(msgbuf, cstate->line_buf.data, cstate->line_buf.len); |
| |
| /* |
| * Append attributes to the buffer. |
| */ |
| num_sent_fields = 0; |
| for (i = 0; i < num_phys_attrs; i++) |
| { |
| int16 attnum = i + 1; |
| |
| /* NULLs are simply left out of the message. */ |
| if (nulls[i]) |
| continue; |
| |
| /* |
| * Make sure we have room for the attribute number. While we're at it, |
| * also reserve room for the Datum, if it's a by-value datatype, or for |
| * the length field, if it's a varlena. Allocating both in one call |
| * saves one size-check. |
| */ |
| ENLARGE_MSGBUF(msgbuf, sizeof(int16) + sizeof(Datum)); |
| |
| /* attribute number comes first */ |
| APPEND_MSGBUF_NOCHECK(msgbuf, &attnum, sizeof(int16)); |
| |
| if (attr[i].attbyval) |
| { |
| /* we already reserved space for this above, so we can just memcpy */ |
| APPEND_MSGBUF_NOCHECK(msgbuf, &values[i], sizeof(Datum)); |
| } |
| else |
| { |
| if (attr[i].attlen > 0) |
| { |
| APPEND_MSGBUF(msgbuf, DatumGetPointer(values[i]), attr[i].attlen); |
| } |
| else if (attr[i].attlen == -1) |
| { |
| int32 len; |
| char *ptr; |
| |
| /* For simplicity, varlen's are always transmitted in "long" format */ |
| Assert(!VARATT_IS_SHORT(values[i])); |
| len = VARSIZE(values[i]); |
| ptr = VARDATA(values[i]); |
| |
| /* we already reserved space for this int */ |
| APPEND_MSGBUF_NOCHECK(msgbuf, &len, sizeof(int32)); |
| APPEND_MSGBUF(msgbuf, ptr, len - VARHDRSZ); |
| } |
| else if (attr[i].attlen == -2) |
| { |
| /* |
| * These attrs are NULL-terminated in memory, but we send |
| * them length-prefixed (like the varlen case above) so that |
| * the receiver can preallocate a data buffer. |
| */ |
| int32 len; |
| size_t slen; |
| char *ptr; |
| |
| ptr = DatumGetPointer(values[i]); |
| slen = strlen(ptr); |
| |
| if (slen > PG_INT32_MAX) |
| { |
| elog(ERROR, "attribute %d is too long (%lld bytes)", |
| attnum, (long long) slen); |
| } |
| |
| len = (int32) slen; |
| |
| APPEND_MSGBUF_NOCHECK(msgbuf, &len, sizeof(int32)); |
| APPEND_MSGBUF(msgbuf, ptr, len); |
| } |
| else |
| { |
| elog(ERROR, "attribute %d has invalid length %d", |
| attnum, attr[i].attlen); |
| } |
| } |
| |
| num_sent_fields++; |
| } |
| |
| /* |
| * Fill in the header. We reserved room for this at the beginning of the |
| * buffer. |
| */ |
| frame = (copy_from_dispatch_row *) msgbuf->data; |
| frame->lineno = lineno; |
| frame->relid = RelationGetRelid(rel); |
| frame->line_len = cstate->line_buf.len; |
| frame->residual_off = cstate->line_buf.cursor; |
| frame->fld_count = num_sent_fields; |
| frame->delim_seen_at_end = cstate->stopped_processing_at_delim; |
| |
| if (toAll) |
| cdbCopySendDataToAll(cdbCopy, msgbuf->data, msgbuf->len); |
| else |
| cdbCopySendData(cdbCopy, target_seg, msgbuf->data, msgbuf->len); |
| } |
| |
| static void |
| SendCopyFromForwardedHeader(CopyFromState cstate, CdbCopy *cdbCopy) |
| { |
| copy_from_dispatch_header header_frame; |
| |
| cdbCopySendDataToAll(cdbCopy, QDtoQESignature, sizeof(QDtoQESignature)); |
| |
| memset(&header_frame, 0, sizeof(header_frame)); |
| header_frame.first_qe_processed_field = cstate->first_qe_processed_field; |
| |
| cdbCopySendDataToAll(cdbCopy, (char *) &header_frame, sizeof(header_frame)); |
| } |
| |
| static void |
| SendCopyFromForwardedError(CopyFromState cstate, CdbCopy *cdbCopy, char *errormsg) |
| { |
| copy_from_dispatch_error *errframe; |
| StringInfo msgbuf; |
| int target_seg; |
| int errormsg_len = strlen(errormsg); |
| |
| msgbuf = cstate->dispatch_msgbuf; |
| resetStringInfo(msgbuf); |
| enlargeStringInfo(msgbuf, SizeOfCopyFromDispatchError); |
| /* allocate space for the header (we'll fill it in last). */ |
| msgbuf->len = SizeOfCopyFromDispatchError; |
| |
| appendBinaryStringInfo(msgbuf, errormsg, errormsg_len); |
| appendBinaryStringInfo(msgbuf, cstate->line_buf.data, cstate->line_buf.len); |
| |
| errframe = (copy_from_dispatch_error *) msgbuf->data; |
| |
| errframe->error_marker = -1; |
| errframe->lineno = cstate->cur_lineno; |
| errframe->line_len = cstate->line_buf.len; |
| errframe->errmsg_len = errormsg_len; |
| |
| /* send the bad data row to a random QE (via roundrobin) */ |
| if (cstate->lastsegid == cdbCopy->total_segs) |
| cstate->lastsegid = 0; /* start over from first segid */ |
| |
| target_seg = (cstate->lastsegid++ % cdbCopy->total_segs); |
| |
| cdbCopySendData(cdbCopy, target_seg, msgbuf->data, msgbuf->len); |
| } |
| |
| static void |
| EndCopyFromDirectoryTable(CopyFromState cstate) |
| { |
| /* No COPY FROM related resources except memory. */ |
| if (cstate->is_program) |
| { |
| close_program_pipes(cstate->program_pipes, true); |
| } |
| else |
| { |
| if (cstate->copy_file && FreeFile(cstate->copy_file)) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not close file \"%s\": %m", |
| cstate->filename))); |
| } |
| } |
| |
| /* Clean up single row error handling related memory */ |
| if (cstate->cdbsreh) |
| destroyCdbSreh(cstate->cdbsreh); |
| |
| pgstat_progress_end_command(); |
| |
| MemoryContextDelete(cstate->copycontext); |
| pfree(cstate); |
| } |
| |
| /* |
| * Clean up storage and release resources for COPY FROM. |
| */ |
| void |
| EndCopyFrom(CopyFromState cstate) |
| { |
| if (cstate->rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE) |
| return EndCopyFromDirectoryTable(cstate); |
| |
| /* No COPY FROM related resources except memory. */ |
| if (cstate->is_program) |
| { |
| close_program_pipes(cstate->program_pipes, true); |
| } |
| else |
| { |
| if (cstate->filename != NULL && FreeFile(cstate->copy_file)) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not close file \"%s\": %m", |
| cstate->filename))); |
| } |
| |
| /* Clean up single row error handling related memory */ |
| if (cstate->cdbsreh) |
| destroyCdbSreh(cstate->cdbsreh); |
| |
| pgstat_progress_end_command(); |
| |
| MemoryContextDelete(cstate->copycontext); |
| pfree(cstate); |
| } |
| |
| /* |
| * Initialize data loader parsing state |
| */ |
| /* |
| * GPDB_14_MERGE_FIXME: |
| * Do we need this function ? |
| * Check comments in this function. |
| */ |
| static void CopyInitDataParser(CopyFromState cstate) |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
| |
| // cstate->eol_type = EOL_UNKNOWN; /* never set this, see comments in BeginCopyFrom */ |
| cstate->cur_relname = RelationGetRelationName(cstate->rel); |
| cstate->cur_lineno = 0; |
| cstate->cur_attname = NULL; |
| cstate->cur_attval = NULL; |
| cstate->opts.null_print_len = strlen(cstate->opts.null_print); |
| |
| if (!cstate->raw_buf) |
| { |
| /* Set up data buffer to hold a chunk of data */ |
| cstate->raw_buf = palloc(RAW_BUF_SIZE + 1); |
| cstate->raw_buf_index = cstate->raw_buf_len = 0; |
| MemSet(cstate->raw_buf, ' ', RAW_BUF_SIZE * sizeof(char)); |
| cstate->raw_buf[RAW_BUF_SIZE] = '\0'; |
| cstate->raw_reached_eof = false; |
| |
| if (!cstate->opts.binary) |
| { |
| if (cstate->need_transcoding) |
| { |
| cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); |
| cstate->input_buf_index = cstate->input_buf_len = 0; |
| } |
| else |
| cstate->input_buf = cstate->raw_buf; |
| cstate->input_reached_eof = false; |
| |
| initStringInfo(&cstate->line_buf); |
| } |
| } |
| |
| // to keep the same with upstream |
| initStringInfo(&cstate->attribute_buf); |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| static GpDistributionData * |
| InitDistributionData(CopyFromState cstate, EState *estate) |
| { |
| GpDistributionData *distData; |
| GpPolicy *policy; |
| CdbHash *cdbHash; |
| |
| /* |
| * A non-partitioned table, or all the partitions have identical |
| * distribution policies. |
| */ |
| policy = GpPolicyCopy(cstate->rel->rd_cdbpolicy); |
| cdbHash = makeCdbHashForRelation(cstate->rel); |
| |
| distData = palloc(sizeof(GpDistributionData)); |
| distData->policy = policy; |
| distData->cdbHash = cdbHash; |
| |
| return distData; |
| } |
| |
| static void |
| FreeDistributionData(GpDistributionData *distData) |
| { |
| if (distData) |
| { |
| if (distData->policy) |
| pfree(distData->policy); |
| if (distData->cdbHash) |
| pfree(distData->cdbHash); |
| pfree(distData); |
| } |
| } |
| |
| /* |
| * Compute which fields need to be processed in the QD, and which ones can |
| * be delayed to the QE. |
| */ |
| static void |
| InitCopyFromDispatchSplit(CopyFromState cstate, GpDistributionData *distData, |
| EState *estate) |
| { |
| int first_qe_processed_field = 0; |
| Bitmapset *needed_cols = NULL; |
| ListCell *lc; |
| |
| if (cstate->opts.binary) |
| { |
| foreach(lc, cstate->attnumlist) |
| { |
| AttrNumber attnum = lfirst_int(lc); |
| needed_cols = bms_add_member(needed_cols, attnum); |
| first_qe_processed_field++; |
| } |
| } |
| else |
| { |
| int fieldno; |
| /* |
| * We need all the columns that form the distribution key. |
| */ |
| if (distData->policy) |
| { |
| for (int i = 0; i < distData->policy->nattrs; i++) |
| needed_cols = bms_add_member(needed_cols, distData->policy->attrs[i]); |
| } |
| |
| /* Get the max fieldno that contains one of the needed attributes. */ |
| fieldno = 0; |
| foreach(lc, cstate->attnumlist) |
| { |
| AttrNumber attnum = lfirst_int(lc); |
| |
| if (bms_is_member(attnum, needed_cols)) |
| first_qe_processed_field = fieldno + 1; |
| fieldno++; |
| } |
| } |
| |
| cstate->first_qe_processed_field = first_qe_processed_field; |
| |
| if (Test_copy_qd_qe_split) |
| { |
| if (first_qe_processed_field == list_length(cstate->attnumlist)) |
| elog(INFO, "all fields will be processed in the QD"); |
| else |
| elog(INFO, "first field processed in the QE: %d", first_qe_processed_field); |
| } |
| } |
| |
| static unsigned int |
| GetTargetSeg(GpDistributionData *distData, TupleTableSlot *slot) |
| { |
| unsigned int target_seg; |
| CdbHash *cdbHash = distData->cdbHash; |
| GpPolicy *policy = distData->policy; /* the partitioning policy for this table */ |
| AttrNumber p_nattrs; /* num of attributes in the distribution policy */ |
| |
| /* |
| * These might be NULL, if we're called with a "main" GpDistributionData, |
| * for a partitioned table with heterogenous partitions. The caller |
| * should've used GetDistributionPolicyForPartition() to get the right |
| * distdata object for the partition. |
| */ |
| if (!policy) |
| elog(ERROR, "missing distribution policy."); |
| if (!cdbHash) |
| elog(ERROR, "missing cdbhash"); |
| |
| /* |
| * At this point in the code, baseValues[x] is final for this |
| * data row -- either the input data, a null or a default |
| * value is in there, and constraints applied. |
| * |
| * Perform a cdbhash on this data row. Perform a hash operation |
| * on each attribute. |
| */ |
| p_nattrs = policy->nattrs; |
| if (p_nattrs > 0) |
| { |
| cdbhashinit(cdbHash); |
| |
| for (int i = 0; i < p_nattrs; i++) |
| { |
| /* current attno from the policy */ |
| AttrNumber h_attnum = policy->attrs[i]; |
| Datum d; |
| bool isnull; |
| |
| d = slot_getattr(slot, h_attnum, &isnull); |
| |
| cdbhash(cdbHash, i + 1, d, isnull); |
| } |
| |
| target_seg = cdbhashreduce(cdbHash); /* hash result segment */ |
| } |
| else if (gp_random_insert_segments > 0 && |
| gp_random_insert_segments < policy->numsegments) |
| { |
| /* Select limited random segments for data insertion. */ |
| target_seg = cdbhashrandomseg(gp_random_insert_segments); |
| } |
| else |
| { |
| /* |
| * Randomly distributed. Pick a segment at random. |
| */ |
| target_seg = cdbhashrandomseg(policy->numsegments); |
| } |
| |
| return target_seg; |
| } |