| /*------------------------------------------------------------------------- |
| * |
| * copyto.c |
| * COPY <table> TO file/program/client |
| * |
| * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/commands/copyto.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| #include "postgres.h" |
| |
| #include <ctype.h> |
| #include <unistd.h> |
| #include <sys/stat.h> |
| |
| #include "access/heapam.h" |
| #include "access/htup_details.h" |
| #include "access/tableam.h" |
| #include "access/xact.h" |
| #include "access/xlog.h" |
| #include "catalog/namespace.h" |
| #include "catalog/pg_directory_table.h" |
| #include "commands/copy.h" |
| #include "commands/copyto_internal.h" |
| #include "commands/progress.h" |
| #include "common/cryptohash.h" |
| #include "common/md5.h" |
| #include "executor/execdesc.h" |
| #include "executor/executor.h" |
| #include "executor/tuptable.h" |
| #include "foreign/foreign.h" |
| #include "libpq/libpq.h" |
| #include "libpq/pqformat.h" |
| #include "mb/pg_wchar.h" |
| #include "miscadmin.h" |
| #include "optimizer/optimizer.h" |
| #include "pgstat.h" |
| #include "rewrite/rewriteHandler.h" |
| #include "storage/execute_pipe.h" |
| #include "storage/fd.h" |
| #include "tcop/tcopprot.h" |
| #include "utils/builtins.h" |
| #include "utils/lsyscache.h" |
| #include "utils/memutils.h" |
| #include "utils/metrics_utils.h" |
| #include "utils/partcache.h" |
| #include "utils/rel.h" |
| #include "utils/snapmgr.h" |
| |
| #include "cdb/cdbdisp_query.h" |
| #include "cdb/cdbvars.h" |
| |
/* Index into cstate->program_pipes->pipes of the pipe the COPY data is written to (see BeginCopyTo). */
#define EXEC_DATA_P 0

/*
 * NOTE(review): not referenced in this part of the file; presumably holds the
 * COPY statement being executed and is set elsewhere — confirm before use.
 */
extern CopyStmt *glob_copystmt;

/* NOTE: there's a copy of this in copyfromparse.c */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";


/* non-export function prototypes */
static void EndCopy(CopyToState cstate);
static void CopyAttributeOutText(CopyToState cstate, char *string);
static void CopyAttributeOutCSV(CopyToState cstate, char *string,
								bool use_quote, bool single_attr);
static uint64 CopyToDispatch(CopyToState cstate);
static void CopyToDispatchFlush(CopyToState cstate);
static uint64 CopyTo(CopyToState cstate);
static void CopySendChar(CopyToState cstate, char c);
static uint64 CopyToDispatchDirectoryTable(CopyToState cstate);
static uint64 CopyToDirectoryTable(CopyToState cstate);

/* Low-level communications functions */
static void SendCopyBegin(CopyToState cstate);
static void SendCopyEnd(CopyToState cstate);
static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
static void CopySendString(CopyToState cstate, const char *str);
static void CopySendInt32(CopyToState cstate, int32 val);
static void CopySendInt16(CopyToState cstate, int16 val);
static CopyIntoClause* MakeCopyIntoClause(CopyStmt *stmt);
| |
| /* |
| * Send copy start/stop messages for frontend copies. These have changed |
| * in past protocol redesigns. |
| */ |
| static void |
| SendCopyBegin(CopyToState cstate) |
| { |
| StringInfoData buf; |
| int natts = list_length(cstate->attnumlist); |
| int16 format = (cstate->opts.binary ? 1 : 0); |
| int i; |
| |
| pq_beginmessage(&buf, 'H'); |
| pq_sendbyte(&buf, format); /* overall format */ |
| pq_sendint16(&buf, natts); |
| for (i = 0; i < natts; i++) |
| pq_sendint16(&buf, format); /* per-column formats */ |
| pq_endmessage(&buf); |
| cstate->copy_dest = COPY_FRONTEND; |
| } |
| |
| static void |
| SendCopyEnd(CopyToState cstate) |
| { |
| /* Shouldn't have any unsent data */ |
| Assert(cstate->fe_msgbuf->len == 0); |
| /* Send Copy Done message */ |
| pq_putemptymessage('c'); |
| } |
| |
| /*---------- |
| * CopySendData sends output data to the destination (file or frontend) |
| * CopySendString does the same for null-terminated strings |
| * CopySendChar does the same for single characters |
| * CopySendEndOfRow does the appropriate thing at end of each data row |
| * (data is not actually flushed except by CopySendEndOfRow) |
| * |
| * NB: no data conversion is applied by these functions |
| *---------- |
| */ |
| static void |
| CopySendData(CopyToState cstate, const void *databuf, int datasize) |
| { |
| appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize); |
| } |
| |
| static void |
| CopySendString(CopyToState cstate, const char *str) |
| { |
| appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str)); |
| } |
| |
/*
 * CopySendEndOfRow --- finish the current row and push it to its destination.
 *
 * COPY_FILE:     appends the platform line terminator (text/CSV only) and
 *                fwrite()s fe_msgbuf to cstate->copy_file, raising ERROR on
 *                a write failure.
 * COPY_FRONTEND: appends '\n' (text/CSV only) and sends fe_msgbuf as one
 *                CopyData ('d') message.
 * COPY_CALLBACK: appends the line terminator and returns WITHOUT writing or
 *                resetting fe_msgbuf; the caller consumes the buffer.
 *                NOTE(review): the terminator is appended here even in binary
 *                mode — presumably binary is never used with this
 *                destination; confirm.
 *
 * For file and frontend destinations the byte counter and COPY progress are
 * updated and fe_msgbuf is reset afterwards.
 */
void CopySendEndOfRow(CopyToState cstate)
{
	StringInfo	fe_msgbuf = cstate->fe_msgbuf;

	switch (cstate->copy_dest)
	{
		case COPY_FILE:
			if (!cstate->opts.binary)
			{
				/* Default line termination depends on platform */
#ifndef WIN32
				CopySendChar(cstate, '\n');
#else
				CopySendString(cstate, "\r\n");
#endif
			}

			if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
					   cstate->copy_file) != 1 ||
				ferror(cstate->copy_file))
			{
				if (cstate->is_program)
				{
					if (errno == EPIPE)
					{
						/*
						 * The pipe will be closed automatically on error at
						 * the end of transaction, but we might get a better
						 * error message from the subprocess' exit code than
						 * just "Broken Pipe"
						 */
						if (cstate->copy_file)
						{
							fclose(cstate->copy_file);
							cstate->copy_file = NULL;
						}
						close_program_pipes(cstate->program_pipes, true);

						/*
						 * If close_program_pipes() didn't throw an error, the
						 * program terminated normally, but closed the pipe
						 * first. Restore errno, and throw an error.
						 */
						errno = EPIPE;
					}
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not write to COPY program: %m")));
				}
				else
					ereport(ERROR,
							(errcode_for_file_access(),
							 errmsg("could not write to COPY file: %m")));
			}
			break;
		case COPY_FRONTEND:
			/* The FE/BE protocol uses \n as newline for all platforms */
			if (!cstate->opts.binary)
				CopySendChar(cstate, '\n');

			/* Dump the accumulated row as one CopyData message */
			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
			break;
		case COPY_CALLBACK:
			/* we don't actually do the write here, we let the caller do it */
#ifndef WIN32
			CopySendChar(cstate, '\n');
#else
			CopySendString(cstate, "\r\n");
#endif
			return;				/* don't want to reset msgbuf quite yet */

	}

	/* Update the progress */
	cstate->bytes_processed += fe_msgbuf->len;
	pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);

	resetStringInfo(fe_msgbuf);
}
| |
| /* |
| * These functions do apply some data conversion |
| */ |
| |
| /* |
| * CopySendInt32 sends an int32 in network byte order |
| */ |
| static inline void |
| CopySendInt32(CopyToState cstate, int32 val) |
| { |
| uint32 buf; |
| |
| buf = pg_hton32((uint32) val); |
| CopySendData(cstate, &buf, sizeof(buf)); |
| } |
| |
| /* |
| * CopySendInt16 sends an int16 in network byte order |
| */ |
| static inline void |
| CopySendInt16(CopyToState cstate, int16 val) |
| { |
| uint16 buf; |
| |
| buf = pg_hton16((uint16) val); |
| CopySendData(cstate, &buf, sizeof(buf)); |
| } |
| |
/*
 * Setup CopyToState to read tuples from a table or a query for COPY TO.
 *
 * 'rel' is the source table (NULL for COPY (query) TO) and 'raw_query' the
 * source query.  Both are passed to BeginCopy() together with 'queryRelId',
 * 'attnamelist' and 'options'.  'filename' names the target file or, when
 * 'is_program' is true, the command that receives the data.  NULL means
 * STDOUT.
 *
 * Raises ERROR in these cases:
 *   - 'rel' is not a plain table;
 *   - ON SEGMENT is combined with STDOUT on the dispatcher;
 *   - a target file path is relative;
 *   - the file or program cannot be opened or stat'ed;
 *   - the target is a directory.
 * Allocations made here are placed in cstate->copycontext.  COPY progress
 * reporting is started before returning.
 */
CopyToState
BeginCopyTo(ParseState *pstate,
			Relation rel,
			RawStmt *raw_query,
			Oid queryRelId,
			const char *filename,
			bool is_program,
			List *attnamelist,
			List *options)
{
	CopyToState cstate;
	bool		pipe;
	MemoryContext oldcontext;
	const int	progress_cols[] = {
		PROGRESS_COPY_COMMAND,
		PROGRESS_COPY_TYPE
	};
	int64		progress_vals[] = {
		PROGRESS_COPY_COMMAND_TO,
		0
	};

	/* Only plain tables can be copied directly; point users at alternatives. */
	if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
	{
		if (rel->rd_rel->relkind == RELKIND_VIEW)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy from view \"%s\"",
							RelationGetRelationName(rel)),
					 errhint("Try the COPY (SELECT ...) TO variant.")));
		else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy from materialized view \"%s\"",
							RelationGetRelationName(rel)),
					 errhint("Try the COPY (SELECT ...) TO variant.")));
		else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy from foreign table \"%s\"",
							RelationGetRelationName(rel)),
					 errhint("Try the COPY (SELECT ...) TO variant.")));
		else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy from sequence \"%s\"",
							RelationGetRelationName(rel))));
		else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy from partitioned table \"%s\"",
							RelationGetRelationName(rel)),
					 errhint("Try the COPY (SELECT ...) TO variant.")));
		else
			ereport(ERROR,
					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
					 errmsg("cannot copy from non-table relation \"%s\"",
							RelationGetRelationName(rel))));
	}


	cstate = BeginCopy(pstate, rel, raw_query, queryRelId, attnamelist,
					   options, NULL);

	/*
	 * Use the STDOUT/frontend path when no file was given.  Also use it on a
	 * QE unless this is COPY ... ON SEGMENT, in which case the QE opens its
	 * own (name-mangled) target below.
	 */
	pipe = (filename == NULL || (Gp_role == GP_ROLE_EXECUTE && !cstate->opts.on_segment));
	oldcontext = MemoryContextSwitchTo(cstate->copycontext);

	/*
	 * Determine the mode: the dispatcher fans the COPY out to the segments
	 * for a distributed table (one with a cdb policy), except in ON SEGMENT
	 * mode.
	 */
	if (Gp_role == GP_ROLE_DISPATCH && !cstate->opts.on_segment &&
		cstate->rel && cstate->rel->rd_cdbpolicy)
	{
		cstate->dispatch_mode = COPY_DISPATCH;
	}
	else
		cstate->dispatch_mode = COPY_DIRECT;

	if (cstate->opts.on_segment && Gp_role == GP_ROLE_DISPATCH)
	{
		/* in ON SEGMENT mode, we don't open anything on the dispatcher. */

		if (filename == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("STDOUT is not supported by 'COPY ON SEGMENT'")));
	}
	else if (pipe)
	{
		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;

		/* the grammar does not allow this on QD*/
		/* on QE, this could happen */
		Assert(!is_program || Gp_role == GP_ROLE_EXECUTE);
		/* Without a remote client, write straight to the backend's stdout. */
		if (whereToSendOutput != DestRemote)
			cstate->copy_file = stdout;
	}
	else
	{
		cstate->filename = pstrdup(filename);
		cstate->is_program = is_program;


		/* ON SEGMENT: rewrite the target name for this segment. */
		if (cstate->opts.on_segment)
			MangleCopyFileName(&cstate->filename, NULL);

		if (is_program)
		{
			progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
			/* Start the program and wrap its data pipe in a stdio stream. */
			cstate->program_pipes = open_program_pipes(cstate->filename, true);
			cstate->copy_file = fdopen(cstate->program_pipes->pipes[EXEC_DATA_P], PG_BINARY_W);
			if (cstate->copy_file == NULL)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not execute command \"%s\": %m",
								cstate->filename)));
		}
		else
		{
			mode_t		oumask; /* Pre-existing umask value */
			struct stat st;

			progress_vals[1] = PROGRESS_COPY_TYPE_FILE;

			/*
			 * Prevent write to relative path ... too easy to shoot oneself in
			 * the foot by overwriting a database file ...
			 */
			if (!is_absolute_path(cstate->filename))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_NAME),
						 errmsg("relative path not allowed for COPY to file")));

			/*
			 * Create the file without group/other write permission; the
			 * previous umask is restored even if AllocateFile() errors out.
			 */
			oumask = umask(S_IWGRP | S_IWOTH);
			PG_TRY();
			{
				cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
			}
			PG_FINALLY();
			{
				umask(oumask);
			}
			PG_END_TRY();
			if (cstate->copy_file == NULL)
			{
				/* copy errno because ereport subfunctions might change it */
				int			save_errno = errno;

				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\" for writing: %m",
								cstate->filename),
						 (save_errno == ENOENT || save_errno == EACCES) ?
						 errhint("COPY TO instructs the PostgreSQL server process to write a file. "
								 "You may want a client-side facility such as psql's \\copy.") : 0));
			}

			// Increase buffer size to improve performance  (cmcdevitt)
			/* GPDB_14_MERGE_FIXME: Ret value process. */
			/* NOTE(review): setvbuf()'s return value is ignored here. */
			setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes

			if (fstat(fileno(cstate->copy_file), &st))
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not stat file \"%s\": %m",
								cstate->filename)));

			if (S_ISDIR(st.st_mode))
				ereport(ERROR,
						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
						 errmsg("\"%s\" is a directory", cstate->filename)));
		}
	}
	/* initialize progress */
	pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
								  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
	pgstat_progress_update_multi_param(2, progress_cols, progress_vals);

	cstate->bytes_processed = 0;

	MemoryContextSwitchTo(oldcontext);

	return cstate;
}
| |
| /* |
| * Clean up storage and release resources for COPY TO. |
| */ |
| void |
| EndCopyTo(CopyToState cstate, uint64 *processed) |
| { |
| if (cstate->queryDesc != NULL) |
| { |
| /* Close down the query and free resources. */ |
| ExecutorFinish(cstate->queryDesc); |
| ExecutorEnd(cstate->queryDesc); |
| if (cstate->queryDesc->es_processed > 0) |
| *processed = cstate->queryDesc->es_processed; |
| FreeQueryDesc(cstate->queryDesc); |
| PopActiveSnapshot(); |
| } |
| |
| /* Clean up storage */ |
| EndCopy(cstate); |
| } |
| |
/*
 * Copy from relation or query TO file.
 *
 * This intermediate routine exists mainly to localize the effects of setjmp
 * so we don't need to plaster a lot of variables with "volatile".
 *
 * When the target is STDOUT and output goes to a remote client, the
 * CopyOutResponse/CopyDone framing is sent around the data.  Returns the
 * number of rows reported by CopyToDispatch() (dispatcher, distributed
 * table) or CopyTo() (all other cases).
 */
uint64
DoCopyTo(CopyToState cstate)
{
	bool		pipe = (cstate->filename == NULL);
	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
	uint64		processed;

#ifdef FAULT_INJECTOR
	FaultInjector_InjectFaultIfSet("DoCopyToFail", DDLNotSpecified, "", "");
#endif

	PG_TRY();
	{
		if (fe_copy)
			SendCopyBegin(cstate);

		/*
		 * We want to dispatch COPY TO commands only in the case that
		 * we are the dispatcher and we are copying from a user relation
		 * (a relation where data is distributed in the segment databases).
		 * Otherwize, if we are not the dispatcher *or* if we are
		 * doing COPY (SELECT) we just go straight to work, without
		 * dispatching COPY commands to executors.
		 */
		if (Gp_role == GP_ROLE_DISPATCH && cstate->rel && cstate->rel->rd_cdbpolicy)
			processed = CopyToDispatch(cstate);
		else
			processed = CopyTo(cstate);

		if (fe_copy)
			SendCopyEnd(cstate);
		else if (Gp_role == GP_ROLE_EXECUTE && cstate->opts.on_segment)
		{
			/*
			 * For COPY ON SEGMENT command, switch back to front end
			 * before sending copy end which is "\."
			 */
			cstate->copy_dest = COPY_FRONTEND;
			SendCopyEnd(cstate);
		}
	}
	PG_CATCH();
	{
		/*
		 * On a QE running COPY ON SEGMENT, switch copy_dest back to
		 * COPY_FRONTEND before re-throwing, as the success path does above.
		 * NOTE(review): the upstream comment here spoke of turning off
		 * "old-style COPY OUT mode", which this code no longer does;
		 * presumably the reset lets error handling talk to the frontend
		 * again — confirm.
		 */
		if (Gp_role == GP_ROLE_EXECUTE && cstate->opts.on_segment)
			cstate->copy_dest = COPY_FRONTEND;

		PG_RE_THROW();
	}
	PG_END_TRY();

	return processed;
}
| |
| /* |
| * Emit one row during DoCopyTo(). |
| */ |
| void |
| CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) |
| { |
| bool need_delim = false; |
| FmgrInfo *out_functions = cstate->out_functions; |
| MemoryContext oldcontext; |
| ListCell *cur; |
| char *string; |
| |
| MemoryContextReset(cstate->rowcontext); |
| oldcontext = MemoryContextSwitchTo(cstate->rowcontext); |
| |
| if (cstate->opts.binary) |
| { |
| /* Binary per-tuple header */ |
| CopySendInt16(cstate, list_length(cstate->attnumlist)); |
| } |
| |
| /* Make sure the tuple is fully deconstructed */ |
| slot_getallattrs(slot); |
| |
| foreach(cur, cstate->attnumlist) |
| { |
| int attnum = lfirst_int(cur); |
| Datum value = slot->tts_values[attnum - 1]; |
| bool isnull = slot->tts_isnull[attnum - 1]; |
| |
| if (!cstate->opts.binary) |
| { |
| if (need_delim) |
| CopySendChar(cstate, cstate->opts.delim[0]); |
| need_delim = true; |
| } |
| |
| if (isnull) |
| { |
| if (!cstate->opts.binary) |
| CopySendString(cstate, cstate->opts.null_print_client); |
| else |
| CopySendInt32(cstate, -1); |
| } |
| else |
| { |
| if (!cstate->opts.binary) |
| { |
| char quotec = cstate->opts.quote ? cstate->opts.quote[0] : '\0'; |
| |
| /* int2out or int4out ? */ |
| if (out_functions[attnum -1].fn_oid == 39 || /* int2out or int4out */ |
| out_functions[attnum -1].fn_oid == 43 ) |
| { |
| char tmp[33]; |
| /* |
| * The standard postgres way is to call the output function, but that involves one or more pallocs, |
| * and a call to sprintf, followed by a conversion to client charset. |
| * Do a fast conversion to string instead. |
| */ |
| |
| if (out_functions[attnum -1].fn_oid == 39) |
| pg_itoa(DatumGetInt16(value),tmp); |
| else |
| pg_ltoa(DatumGetInt32(value),tmp); |
| |
| /* |
| * Integers don't need quoting, or transcoding to client char |
| * set. We still quote them if FORCE QUOTE was used, though. |
| */ |
| if (cstate->opts.force_quote_flags[attnum - 1]) |
| CopySendChar(cstate, quotec); |
| CopySendData(cstate, tmp, strlen(tmp)); |
| if (cstate->opts.force_quote_flags[attnum - 1]) |
| CopySendChar(cstate, quotec); |
| } |
| else if (out_functions[attnum -1].fn_oid == F_NUMERIC_OUT) /* numeric_out */ |
| { |
| string = OutputFunctionCall(&out_functions[attnum - 1], |
| value); |
| /* |
| * Numerics don't need quoting, or transcoding to client char |
| * set. We still quote them if FORCE QUOTE was used, though. |
| */ |
| if (cstate->opts.force_quote_flags[attnum - 1]) |
| CopySendChar(cstate, quotec); |
| CopySendData(cstate, string, strlen(string)); |
| if (cstate->opts.force_quote_flags[attnum - 1]) |
| CopySendChar(cstate, quotec); |
| } |
| else |
| { |
| string = OutputFunctionCall(&out_functions[attnum - 1], |
| value); |
| if (cstate->opts.csv_mode) |
| CopyAttributeOutCSV(cstate, string, |
| cstate->opts.force_quote_flags[attnum - 1], |
| list_length(cstate->attnumlist) == 1); |
| else |
| CopyAttributeOutText(cstate, string); |
| } |
| } |
| else |
| { |
| bytea *outputbytes; |
| |
| outputbytes = SendFunctionCall(&out_functions[attnum - 1], |
| value); |
| CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); |
| CopySendData(cstate, VARDATA(outputbytes), |
| VARSIZE(outputbytes) - VARHDRSZ); |
| } |
| } |
| } |
| |
| /* |
| * Finish off the row: write it to the destination, and update the count. |
| * However, if we're in the context of a writable external table, we let |
| * the caller do it - send the data to its local external source (see |
| * external_insert() ). |
| */ |
| if (cstate->copy_dest != COPY_CALLBACK) |
| { |
| CopySendEndOfRow(cstate); |
| } |
| |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
/*
 * DUMPSOFAR --- send the pending literal run [start, ptr) with one
 * CopySendData() call, if it is non-empty.
 *
 * Helper for CopyAttributeOutText(), which sends the text representation of
 * one attribute with conversion and escaping, and for CopyAttributeOutCSV().
 * It expects locals named 'cstate', 'start' and 'ptr' in the calling scope.
 */
#define DUMPSOFAR() \
	do { \
		if (ptr > start) \
			CopySendData(cstate, start, ptr - start); \
	} while (0)
| |
/*
 * CopyAttributeOutText --- send one attribute in text format.
 *
 * 'string' is the output of the column's output function.  It is first
 * converted to cstate->file_encoding when cstate->need_transcoding is set.
 * With ESCAPE 'off' (cstate->escape_off) the result is sent unchanged.
 * Otherwise:
 *   - \b \f \n \r \t \v are sent as escapec followed by a letter;
 *   - backslashes and the delimiter are prefixed with escapec;
 *   - other control characters pass through unless they equal the delimiter.
 */
static void
CopyAttributeOutText(CopyToState cstate, char *string)
{
	char	   *ptr;
	char	   *start;
	char		c;
	char		delimc = cstate->opts.delim[0];
	char		escapec = cstate->opts.escape[0];

	if (cstate->need_transcoding)
		ptr = pg_server_to_any(string,
							   strlen(string),
							   cstate->file_encoding);
	else
		ptr = string;

	/* ESCAPE 'off': no escaping at all. */
	if (cstate->escape_off)
	{
		CopySendData(cstate, ptr, strlen(ptr));
		return;
	}

	/*
	 * We have to grovel through the string searching for control characters
	 * and instances of the delimiter character.  In most cases, though, these
	 * are infrequent.  To avoid overhead from calling CopySendData once per
	 * character, we dump out all characters between escaped characters in a
	 * single call.  The loop invariant is that the data from "start" to "ptr"
	 * can be sent literally, but hasn't yet been.
	 *
	 * We can skip pg_encoding_mblen() overhead when encoding is safe, because
	 * in valid backend encodings, extra bytes of a multibyte character never
	 * look like ASCII.  This loop is sufficiently performance-critical that
	 * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
	 * of the normal safe-encoding path.
	 */
	if (cstate->encoding_embeds_ascii)
	{
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional. We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/* If it's the delimiter, must backslash it */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			/*
			 * NOTE(review): this tests for a literal backslash but emits
			 * escapec as the prefix (here and in the loop below).  If a
			 * non-backslash ESCAPE can reach text mode, data containing
			 * escapec itself is not escaped — confirm.
			 */
			else if (c == '\\' || c == delimc)
			{
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				start = ptr++;	/* we include char in next run */
			}
			else if (IS_HIGHBIT_SET(c))
				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
			else
				ptr++;
		}
	}
	else
	{
		/* Same loop as above, minus the multibyte-length step. */
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if ((unsigned char) c < (unsigned char) 0x20)
			{
				/*
				 * \r and \n must be escaped, the others are traditional. We
				 * prefer to dump these using the C-like notation, rather than
				 * a backslash and the literal character, because it makes the
				 * dump file a bit more proof against Microsoftish data
				 * mangling.
				 */
				switch (c)
				{
					case '\b':
						c = 'b';
						break;
					case '\f':
						c = 'f';
						break;
					case '\n':
						c = 'n';
						break;
					case '\r':
						c = 'r';
						break;
					case '\t':
						c = 't';
						break;
					case '\v':
						c = 'v';
						break;
					default:
						/* If it's the delimiter, must backslash it */
						if (c == delimc)
							break;
						/* All ASCII control chars are length 1 */
						ptr++;
						continue;	/* fall to end of loop */
				}
				/* if we get here, we need to convert the control char */
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				CopySendChar(cstate, c);
				start = ++ptr;	/* do not include char in next run */
			}
			else if (c == '\\' || c == delimc)
			{
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				start = ptr++;	/* we include char in next run */
			}
			else
				ptr++;
		}
	}

	/* Send whatever literal tail remains. */
	DUMPSOFAR();
}
| |
/*
 * Send text representation of one attribute, with conversion and
 * CSV-style escaping
 *
 * 'use_quote' forces quoting (FORCE QUOTE).  'single_attr' is true when the
 * row has exactly one column.  The value is also quoted when any of these
 * holds:
 *   - it equals the NULL string (compared before transcoding);
 *   - it is exactly "\." in a single-column row;
 *   - it contains the delimiter, the quote character, CR or LF.
 * Inside quotes, quote and escape characters are prefixed with the escape
 * character.
 */
static void
CopyAttributeOutCSV(CopyToState cstate, char *string,
					bool use_quote, bool single_attr)
{
	char	   *ptr;
	char	   *start;
	char		c;
	char		delimc = cstate->opts.delim[0];
	char		quotec;
	char		escapec = cstate->opts.escape[0];

	/*
	 * MPP-8075. We may get called with cstate->opts.quote == NULL.
	 * Fall back to the standard CSV double quote in that case.
	 */
	if (cstate->opts.quote == NULL)
	{
		quotec = '"';
	}
	else
	{
		quotec = cstate->opts.quote[0];
	}

	/* force quoting if it matches null_print (before conversion!) */
	if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
		use_quote = true;

	if (cstate->need_transcoding)
		ptr = pg_server_to_any(string,
							   strlen(string),
							   cstate->file_encoding);
	else
		ptr = string;

	/*
	 * Make a preliminary pass to discover if it needs quoting
	 */
	if (!use_quote)
	{
		/*
		 * Because '\.' can be a data value, quote it if it appears alone on a
		 * line so it is not interpreted as the end-of-data marker.
		 */
		if (single_attr && strcmp(ptr, "\\.") == 0)
			use_quote = true;
		else
		{
			char	   *tptr = ptr;

			while ((c = *tptr) != '\0')
			{
				if (c == delimc || c == quotec || c == '\n' || c == '\r')
				{
					use_quote = true;
					break;
				}
				/* Skip whole multibyte chars whose trailing bytes may look like ASCII. */
				if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
					tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
				else
					tptr++;
			}
		}
	}

	if (use_quote)
	{
		CopySendChar(cstate, quotec);

		/*
		 * We adopt the same optimization strategy as in CopyAttributeOutText
		 */
		start = ptr;
		while ((c = *ptr) != '\0')
		{
			if (c == quotec || c == escapec)
			{
				DUMPSOFAR();
				CopySendChar(cstate, escapec);
				start = ptr;	/* we include char in next run */
			}
			if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
				ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
			else
				ptr++;
		}
		DUMPSOFAR();

		CopySendChar(cstate, quotec);
	}
	else
	{
		/* If it doesn't need quoting, we can just dump it as-is */
		CopySendString(cstate, ptr);
	}
}
| |
| /* |
| * copy_dest_startup --- executor startup |
| */ |
| static void |
| copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) |
| { |
| if (Gp_role != GP_ROLE_EXECUTE) |
| return; |
| DR_copy *myState = (DR_copy *) self; |
| myState->cstate = BeginCopyToOnSegment(myState->queryDesc); |
| } |
| |
| /* |
| * copy_dest_receive --- receive one tuple |
| */ |
| static bool |
| copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) |
| { |
| DR_copy *myState = (DR_copy *) self; |
| CopyToState cstate = myState->cstate; |
| |
| /* Send the data */ |
| CopyOneRowTo(cstate, slot); |
| |
| /* Increment the number of processed tuples, and report the progress */ |
| pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
| ++myState->processed); |
| |
| return true; |
| } |
| |
| /* |
| * copy_dest_shutdown --- executor end |
| */ |
| static void |
| copy_dest_shutdown(DestReceiver *self) |
| { |
| if (Gp_role != GP_ROLE_EXECUTE) |
| return; |
| DR_copy *myState = (DR_copy *) self; |
| EndCopyToOnSegment(myState->cstate); |
| } |
| |
| /* |
| * copy_dest_destroy --- release DestReceiver object |
| */ |
| static void |
| copy_dest_destroy(DestReceiver *self) |
| { |
| pfree(self); |
| } |
| |
| /* |
| * CreateCopyDestReceiver -- create a suitable DestReceiver object |
| */ |
| DestReceiver * |
| CreateCopyDestReceiver(void) |
| { |
| DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy)); |
| |
| self->pub.receiveSlot = copy_dest_receive; |
| self->pub.rStartup = copy_dest_startup; |
| self->pub.rShutdown = copy_dest_shutdown; |
| self->pub.rDestroy = copy_dest_destroy; |
| self->pub.mydest = DestCopyOut; |
| |
| self->cstate = NULL; /* will be set later */ |
| self->queryDesc = NULL; /* need to be set later */ |
| self->processed = 0; |
| |
| return (DestReceiver *) self; |
| } |
| |
| /* |
| * AXG: This one is equivalent to CopySendEndOfRow() besides that |
| * it doesn't send end of row - it just flushed the data. We need |
| * this method for the dispatcher COPY TO since it already has data |
| * with newlines (from the executors). |
| */ |
| static void |
| CopyToDispatchFlush(CopyToState cstate) |
| { |
| StringInfo fe_msgbuf = cstate->fe_msgbuf; |
| |
| switch (cstate->copy_dest) |
| { |
| case COPY_FILE: |
| |
| (void) fwrite(fe_msgbuf->data, fe_msgbuf->len, |
| 1, cstate->copy_file); |
| if (ferror(cstate->copy_file)) |
| { |
| if (cstate->is_program) |
| { |
| if (errno == EPIPE) |
| { |
| /* |
| * The pipe will be closed automatically on error at |
| * the end of transaction, but we might get a better |
| * error message from the subprocess' exit code than |
| * just "Broken Pipe" |
| */ |
| if (cstate->copy_file) |
| { |
| fclose(cstate->copy_file); |
| cstate->copy_file = NULL; |
| } |
| close_program_pipes(cstate->program_pipes, true); |
| |
| /* |
| * If close_program_pipes() didn't throw an error, |
| * the program terminated normally, but closed the |
| * pipe first. Restore errno, and throw an error. |
| */ |
| errno = EPIPE; |
| } |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not write to COPY program: %m"))); |
| } |
| else |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not write to COPY file: %m"))); |
| } |
| break; |
| case COPY_FRONTEND: |
| |
| /* Dump the accumulated row as one CopyData message */ |
| (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); |
| break; |
| case COPY_CALLBACK: |
| elog(ERROR, "unexpected destination COPY_CALLBACK to flush data"); |
| break; |
| } |
| |
| resetStringInfo(fe_msgbuf); |
| } |
| |
| CopyIntoClause* |
| MakeCopyIntoClause(CopyStmt *stmt) |
| { |
| CopyIntoClause *copyIntoClause; |
| copyIntoClause = makeNode(CopyIntoClause); |
| |
| copyIntoClause->is_program = stmt->is_program; |
| copyIntoClause->filename = stmt->filename; |
| copyIntoClause->options = stmt->options; |
| copyIntoClause->attlist = stmt->attlist; |
| |
| return copyIntoClause; |
| } |
| |
| /* |
| * Common setup routines used by BeginCopyTo. |
| * |
| * Iff <binary>, unload or reload in the binary format, as opposed to the |
| * more wasteful but more robust and portable text format. |
| * |
| * Iff <oids>, unload or reload the format that includes OID information. |
| * On input, we accept OIDs whether or not the table has an OID column, |
| * but silently drop them if it does not. On output, we report an error |
| * if the user asks for OIDs in a table that has none (not providing an |
| * OID column might seem friendlier, but could seriously confuse programs). |
| * |
| * If in the text format, delimit columns with delimiter <delim> and print |
| * NULL values as <null_print>. |
| */ |
| CopyToState |
| BeginCopy(ParseState *pstate, |
| Relation rel, |
| RawStmt *raw_query, |
| Oid queryRelId, |
| List *attnamelist, |
| List *options, |
| TupleDesc tupDesc) |
| { |
| CopyToState cstate; |
| int num_phys_attrs; |
| MemoryContext oldcontext; |
| |
| /* Allocate workspace and zero all fields */ |
| cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData)); |
| |
| glob_cstate = cstate; |
| |
| cstate->escape_off = false; |
| |
| /* |
| * We allocate everything used by a cstate in a new memory context. This |
| * avoids memory leaks during repeated use of COPY in a query. |
| */ |
| cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, |
| "COPY", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
| |
| /* Cloudberry needs this to detect custom protocol */ |
| if (rel) |
| cstate->rel = rel; |
| |
| ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */, options, rel ? rel->rd_id : InvalidOid); |
| |
| /* Extract options from the statement node tree */ |
| if (cstate->opts.escape != NULL && pg_strcasecmp(cstate->opts.escape, "off") == 0) |
| { |
| cstate->escape_off = true; |
| } |
| |
| /* Process the source/target relation or query */ |
| if (rel) |
| { |
| Assert(!raw_query); |
| |
| tupDesc = RelationGetDescr(cstate->rel); |
| } |
| else if(raw_query) |
| { |
| List *rewritten; |
| Query *query; |
| PlannedStmt *plan; |
| DestReceiver *dest; |
| |
| cstate->rel = NULL; |
| |
| /* |
| * Run parse analysis and rewrite. Note this also acquires sufficient |
| * locks on the source table(s). |
| * |
| * Because the parser and planner tend to scribble on their input, we |
| * make a preliminary copy of the source querytree. This prevents |
| * problems in the case that the COPY is in a portal or plpgsql |
| * function and is executed repeatedly. (See also the same hack in |
| * DECLARE CURSOR and PREPARE.) XXX FIXME someday. |
| */ |
| rewritten = pg_analyze_and_rewrite(copyObject(raw_query), |
| pstate->p_sourcetext, NULL, 0, |
| NULL); |
| |
| /* check that we got back something we can work with */ |
| if (rewritten == NIL) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("DO INSTEAD NOTHING rules are not supported for COPY"))); |
| } |
| else if (list_length(rewritten) > 1) |
| { |
| ListCell *lc; |
| |
| /* examine queries to determine which error message to issue */ |
| foreach(lc, rewritten) |
| { |
| Query *q = lfirst_node(Query, lc); |
| |
| if (q->querySource == QSRC_QUAL_INSTEAD_RULE) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("conditional DO INSTEAD rules are not supported for COPY"))); |
| if (q->querySource == QSRC_NON_INSTEAD_RULE) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("DO ALSO rules are not supported for the COPY"))); |
| } |
| |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("multi-statement DO INSTEAD rules are not supported for COPY"))); |
| } |
| |
| query = linitial_node(Query, rewritten); |
| |
| |
| if (cstate->opts.on_segment && IsA(query, Query)) |
| { |
| query->parentStmtType = PARENTSTMTTYPE_COPY; |
| } |
| /* Query mustn't use INTO, either */ |
| if (query->utilityStmt != NULL && |
| IsA(query->utilityStmt, CreateTableAsStmt)) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("COPY (SELECT INTO) is not supported"))); |
| |
| Assert(query->utilityStmt == NULL); |
| |
| /* |
| * Similarly the grammar doesn't enforce the presence of a RETURNING |
| * clause, but this is required here. |
| */ |
| if (query->commandType != CMD_SELECT && |
| query->returningList == NIL) |
| { |
| Assert(query->commandType == CMD_INSERT || |
| query->commandType == CMD_UPDATE || |
| query->commandType == CMD_DELETE); |
| |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("COPY query must have a RETURNING clause"))); |
| } |
| |
| /* plan the query */ |
| int cursorOptions = CURSOR_OPT_PARALLEL_OK; |
| |
| /* GPDB: Pass the IGNORE EXTERNAL PARTITION option to the planner. */ |
| if (cstate->opts.skip_foreign_partitions) |
| cursorOptions |= CURSOR_OPT_SKIP_FOREIGN_PARTITIONS; |
| |
| plan = pg_plan_query(query, pstate->p_sourcetext, cursorOptions, NULL); |
| |
| /* |
| * With row level security and a user using "COPY relation TO", we |
| * have to convert the "COPY relation TO" to a query-based COPY (eg: |
| * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add |
| * in any RLS clauses. |
| * |
| * When this happens, we are passed in the relid of the originally |
| * found relation (which we have locked). As the planner will look up |
| * the relation again, we double-check here to make sure it found the |
| * same one that we have locked. |
| */ |
| if (queryRelId != InvalidOid) |
| { |
| /* |
| * Note that with RLS involved there may be multiple relations, |
| * and while the one we need is almost certainly first, we don't |
| * make any guarantees of that in the planner, so check the whole |
| * list and make sure we find the original relation. |
| */ |
| if (!list_member_oid(plan->relationOids, queryRelId)) |
| ereport(ERROR, |
| (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
| errmsg("relation referenced by COPY statement has changed"))); |
| } |
| |
| /* |
| * Use a snapshot with an updated command ID to ensure this query sees |
| * results of any previously executed queries. |
| */ |
| PushCopiedSnapshot(GetActiveSnapshot()); |
| UpdateActiveSnapshotCommandId(); |
| |
| /* Create dest receiver for COPY OUT */ |
| dest = CreateDestReceiver(DestCopyOut); |
| ((DR_copy *) dest)->cstate = cstate; |
| |
| /* Create a QueryDesc requesting no output */ |
| cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext, |
| GetActiveSnapshot(), |
| InvalidSnapshot, |
| dest, NULL, NULL, |
| GP_INSTRUMENT_OPTS); |
| if (cstate->opts.on_segment) |
| cstate->queryDesc->plannedstmt->copyIntoClause = |
| MakeCopyIntoClause(glob_copystmt); |
| |
| /* GPDB hook for collecting query info */ |
| if (query_info_collect_hook) |
| (*query_info_collect_hook)(METRICS_QUERY_SUBMIT, cstate->queryDesc); |
| |
| /* |
| * Call ExecutorStart to prepare the plan for execution. |
| * |
| * ExecutorStart computes a result tupdesc for us |
| */ |
| ExecutorStart(cstate->queryDesc, 0); |
| |
| tupDesc = cstate->queryDesc->tupDesc; |
| } |
| |
| cstate->attnamelist = attnamelist; |
| /* Generate or convert list of attributes to process */ |
| cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist); |
| |
| num_phys_attrs = tupDesc->natts; |
| |
| /* Convert FORCE_QUOTE name list to per-column flags, check validity */ |
| cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
| if (cstate->opts.force_quote_all) |
| { |
| int i; |
| |
| for (i = 0; i < num_phys_attrs; i++) |
| cstate->opts.force_quote_flags[i] = true; |
| } |
| else if (cstate->opts.force_quote) |
| { |
| List *attnums; |
| ListCell *cur; |
| |
| attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote); |
| |
| foreach(cur, attnums) |
| { |
| int attnum = lfirst_int(cur); |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (!list_member_int(cstate->attnumlist, attnum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
| errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY", |
| NameStr(attr->attname)))); |
| cstate->opts.force_quote_flags[attnum - 1] = true; |
| } |
| } |
| |
| /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */ |
| cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
| if (cstate->opts.force_notnull) |
| { |
| List *attnums; |
| ListCell *cur; |
| |
| attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull); |
| |
| foreach(cur, attnums) |
| { |
| int attnum = lfirst_int(cur); |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (!list_member_int(cstate->attnumlist, attnum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
| errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY", |
| NameStr(attr->attname)))); |
| cstate->opts.force_notnull_flags[attnum - 1] = true; |
| } |
| } |
| |
| /* Convert FORCE_NULL name list to per-column flags, check validity */ |
| cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool)); |
| if (cstate->opts.force_null) |
| { |
| List *attnums; |
| ListCell *cur; |
| |
| attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null); |
| |
| foreach(cur, attnums) |
| { |
| int attnum = lfirst_int(cur); |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (!list_member_int(cstate->attnumlist, attnum)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), |
| errmsg("FORCE_NULL column \"%s\" not referenced by COPY", |
| NameStr(attr->attname)))); |
| cstate->opts.force_null_flags[attnum - 1] = true; |
| } |
| } |
| |
| /* Use client encoding when ENCODING option is not specified. */ |
| if (cstate->file_encoding < 0) |
| cstate->file_encoding = pg_get_client_encoding(); |
| else |
| cstate->file_encoding = cstate->opts.file_encoding; |
| |
| /* |
| * Set up encoding conversion info. Even if the file and server encodings |
| * are the same, we must apply pg_any_to_server() to validate data in |
| * multibyte encodings. |
| * |
| * In COPY_EXECUTE mode, the dispatcher has already done the conversion. |
| */ |
| if (cstate->dispatch_mode != COPY_DISPATCH) |
| { |
| cstate->need_transcoding = |
| ((cstate->file_encoding != GetDatabaseEncoding() || |
| pg_database_encoding_max_length() > 1)); |
| } |
| else |
| { |
| cstate->need_transcoding = false; |
| } |
| /* See Multibyte encoding comment above */ |
| cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); |
| |
| cstate->copy_dest = COPY_FILE; /* default */ |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| return cstate; |
| } |
| |
| CopyToState |
| BeginCopyToOnSegment(QueryDesc *queryDesc) |
| { |
| CopyToState cstate; |
| MemoryContext oldcontext; |
| ListCell *cur; |
| |
| TupleDesc tupDesc; |
| int num_phys_attrs; |
| char *filename; |
| CopyIntoClause *copyIntoClause; |
| const int progress_cols[] = { |
| PROGRESS_COPY_COMMAND, |
| PROGRESS_COPY_TYPE |
| }; |
| int64 progress_vals[] = { |
| PROGRESS_COPY_COMMAND_TO, |
| 0 |
| }; |
| |
| Assert(Gp_role == GP_ROLE_EXECUTE); |
| |
| copyIntoClause = queryDesc->plannedstmt->copyIntoClause; |
| tupDesc = queryDesc->tupDesc; |
| cstate = BeginCopy(NULL, NULL, NULL, InvalidOid, copyIntoClause->attlist, |
| copyIntoClause->options, tupDesc); |
| oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
| |
| cstate->opts.null_print_client = cstate->opts.null_print; /* default */ |
| |
| /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ |
| cstate->fe_msgbuf = makeStringInfo(); |
| |
| cstate->filename = pstrdup(copyIntoClause->filename); |
| cstate->is_program = copyIntoClause->is_program; |
| |
| if (cstate->opts.on_segment) |
| MangleCopyFileName(&cstate->filename, NULL); |
| filename = cstate->filename; |
| |
| if (cstate->is_program) |
| { |
| progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; |
| cstate->program_pipes = open_program_pipes(cstate->filename, true); |
| cstate->copy_file = fdopen(cstate->program_pipes->pipes[EXEC_DATA_P], PG_BINARY_W); |
| |
| if (cstate->copy_file == NULL) |
| ereport(ERROR, |
| (errmsg("could not execute command \"%s\": %m", |
| cstate->filename))); |
| } |
| else |
| { |
| mode_t oumask; /* Pre-existing umask value */ |
| struct stat st; |
| |
| progress_vals[1] = PROGRESS_COPY_TYPE_FILE; |
| /* |
| * Prevent write to relative path ... too easy to shoot oneself in |
| * the foot by overwriting a database file ... |
| */ |
| if (!is_absolute_path(filename)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_NAME), |
| errmsg("relative path not allowed for COPY to file"))); |
| |
| oumask = umask(S_IWGRP | S_IWOTH); |
| cstate->copy_file = AllocateFile(filename, PG_BINARY_W); |
| umask(oumask); |
| if (cstate->copy_file == NULL) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not open file \"%s\" for writing: %m", filename))); |
| |
| // Increase buffer size to improve performance (cmcdevitt) |
| /* GPDB_14_MERGE_FIXME: Ret value process. */ |
| setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes |
| |
| fstat(fileno(cstate->copy_file), &st); |
| if (S_ISDIR(st.st_mode)) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("\"%s\" is a directory", filename))); |
| } |
| |
| num_phys_attrs = tupDesc->natts; |
| /* Get info about the columns we need to process. */ |
| cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); |
| foreach(cur, cstate->attnumlist) |
| { |
| int attnum = lfirst_int(cur); |
| Oid out_func_oid; |
| bool isvarlena; |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (cstate->opts.binary) |
| getTypeBinaryOutputInfo(attr->atttypid, |
| &out_func_oid, |
| &isvarlena); |
| else |
| getTypeOutputInfo(attr->atttypid, |
| &out_func_oid, |
| &isvarlena); |
| fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); |
| } |
| |
| /* |
| * Create a temporary memory context that we can reset once per row to |
| * recover palloc'd memory. This avoids any problems with leaks inside |
| * datatype output routines, and should be faster than retail pfree's |
| * anyway. (We don't need a whole econtext as CopyFrom does.) |
| */ |
| cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, |
| "COPY TO", |
| ALLOCSET_DEFAULT_MINSIZE, |
| ALLOCSET_DEFAULT_INITSIZE, |
| ALLOCSET_DEFAULT_MAXSIZE); |
| |
| if (cstate->opts.binary) |
| { |
| /* Generate header for a binary copy */ |
| int32 tmp; |
| |
| /* Signature */ |
| CopySendData(cstate, BinarySignature, 11); |
| /* Flags field */ |
| tmp = 0; |
| CopySendInt32(cstate, tmp); |
| /* No header extension */ |
| tmp = 0; |
| CopySendInt32(cstate, tmp); |
| } |
| else |
| { |
| /* if a header has been requested send the line */ |
| if (cstate->opts.header_line) |
| { |
| bool hdr_delim = false; |
| |
| foreach(cur, cstate->attnumlist) |
| { |
| int attnum = lfirst_int(cur); |
| char *colname; |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (hdr_delim) |
| CopySendChar(cstate, cstate->opts.delim[0]); |
| hdr_delim = true; |
| |
| colname = NameStr(attr->attname); |
| |
| CopyAttributeOutCSV(cstate, colname, false, |
| list_length(cstate->attnumlist) == 1); |
| } |
| |
| CopySendEndOfRow(cstate); |
| } |
| } |
| |
| /* initialize progress */ |
| pgstat_progress_start_command(PROGRESS_COMMAND_COPY, |
| cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); |
| pgstat_progress_update_multi_param(2, progress_cols, progress_vals); |
| cstate->bytes_processed = 0; |
| |
| MemoryContextSwitchTo(oldcontext); |
| return cstate; |
| } |
| |
| /* |
| * Set up CopyToState for writing to a foreign or external table. |
| */ |
| CopyToState |
| BeginCopyToForeignTable(Relation forrel, List *options) |
| { |
| CopyToState cstate; |
| const int progress_cols[] = { |
| PROGRESS_COPY_COMMAND, |
| PROGRESS_COPY_TYPE |
| }; |
| int64 progress_vals[] = { |
| PROGRESS_COPY_COMMAND_TO, |
| PROGRESS_COPY_TYPE_CALLBACK |
| }; |
| |
| Assert(forrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE); |
| |
| cstate = BeginCopy(NULL, forrel, |
| NULL, /* raw_query */ |
| InvalidOid, |
| NIL, options, |
| RelationGetDescr(forrel)); |
| cstate->dispatch_mode = COPY_DIRECT; |
| |
| /* |
| * We use COPY_CALLBACK to mean that the each line should be |
| * left in fe_msgbuf. There is no actual callback! |
| */ |
| cstate->copy_dest = COPY_CALLBACK; |
| |
| /* |
| * Some more initialization, that in the normal COPY TO codepath, is done |
| * in CopyTo() itself. |
| */ |
| cstate->opts.null_print_client = cstate->opts.null_print; /* default */ |
| if (cstate->need_transcoding) |
| cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, |
| cstate->opts.null_print_len, |
| cstate->file_encoding); |
| /* initialize progress */ |
| pgstat_progress_start_command(PROGRESS_COMMAND_COPY, |
| cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid); |
| pgstat_progress_update_multi_param(2, progress_cols, progress_vals); |
| cstate->bytes_processed = 0; |
| |
| return cstate; |
| } |
| |
| void EndCopyToOnSegment(CopyToState cstate) |
| { |
| Assert(Gp_role == GP_ROLE_EXECUTE); |
| |
| if (cstate->opts.binary) |
| { |
| /* Generate trailer for a binary copy */ |
| CopySendInt16(cstate, -1); |
| |
| /* Need to flush out the trailer */ |
| CopySendEndOfRow(cstate); |
| } |
| |
| MemoryContextDelete(cstate->rowcontext); |
| |
| EndCopy(cstate); |
| } |
| |
| /* |
| * Copy FROM relation TO file, in the dispatcher. Starts a COPY TO command on |
| * each of the executors and gathers all the results and writes it out. |
| */ |
| static uint64 |
| CopyToDispatch(CopyToState cstate) |
| { |
| CopyStmt *stmt = glob_copystmt; |
| TupleDesc tupDesc; |
| int num_phys_attrs; |
| int attr_count; |
| CdbCopy *cdbCopy; |
| uint64 processed = 0; |
| |
| if (cstate->rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE) |
| return CopyToDispatchDirectoryTable(cstate); |
| |
| tupDesc = cstate->rel->rd_att; |
| num_phys_attrs = tupDesc->natts; |
| attr_count = list_length(cstate->attnumlist); |
| |
| /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ |
| cstate->fe_msgbuf = makeStringInfo(); |
| |
| cdbCopy = makeCdbCopyTo(cstate); |
| |
| /* XXX: lock all partitions */ |
| |
| /* |
| * Start a COPY command in every db of every segment in Apache Cloudberry. |
| * |
| * From this point in the code we need to be extra careful |
| * about error handling. ereport() must not be called until |
| * the COPY command sessions are closed on the executors. |
| * Calling ereport() will leave the executors hanging in |
| * COPY state. |
| */ |
| elog(DEBUG5, "COPY command sent to segdbs"); |
| |
| PG_TRY(); |
| { |
| bool done; |
| |
| cdbCopyStart(cdbCopy, stmt, cstate->file_encoding); |
| |
| if (cstate->opts.binary) |
| { |
| /* Generate header for a binary copy */ |
| int32 tmp; |
| |
| /* Signature */ |
| CopySendData(cstate, (char *) BinarySignature, 11); |
| /* Flags field */ |
| tmp = 0; |
| CopySendInt32(cstate, tmp); |
| /* No header extension */ |
| tmp = 0; |
| CopySendInt32(cstate, tmp); |
| } |
| |
| /* if a header has been requested send the line */ |
| if (cstate->opts.header_line) |
| { |
| ListCell *cur; |
| bool hdr_delim = false; |
| |
| /* |
| * For non-binary copy, we need to convert null_print to client |
| * encoding, because it will be sent directly with CopySendString. |
| * |
| * MPP: in here we only care about this if we need to print the |
| * header. We rely on the segdb server copy out to do the conversion |
| * before sending the data rows out. We don't need to repeat it here |
| */ |
| if (cstate->need_transcoding) |
| cstate->opts.null_print = (char *) |
| pg_server_to_any(cstate->opts.null_print, |
| strlen(cstate->opts.null_print), |
| cstate->file_encoding); |
| |
| foreach(cur, cstate->attnumlist) |
| { |
| int attnum = lfirst_int(cur); |
| char *colname; |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (hdr_delim) |
| CopySendChar(cstate, cstate->opts.delim[0]); |
| hdr_delim = true; |
| |
| colname = NameStr(attr->attname); |
| |
| CopyAttributeOutCSV(cstate, colname, false, |
| list_length(cstate->attnumlist) == 1); |
| } |
| |
| /* add a newline and flush the data */ |
| CopySendEndOfRow(cstate); |
| } |
| |
| /* |
| * This is the main work-loop. In here we keep collecting data from the |
| * COPY commands on the segdbs, until no more data is available. We |
| * keep writing data out a chunk at a time. |
| */ |
| do |
| { |
| bool copy_cancel = (QueryCancelPending ? true : false); |
| |
| /* get a chunk of data rows from the QE's */ |
| done = cdbCopyGetData(cdbCopy, copy_cancel, &processed); |
| |
| /* send the chunk of data rows to destination (file or stdout) */ |
| if (cdbCopy->copy_out_buf.len > 0) /* conditional is important! */ |
| { |
| /* |
| * in the dispatcher we receive chunks of whole rows with row endings. |
| * We don't want to use CopySendEndOfRow() b/c it adds row endings and |
| * also b/c it's intended for a single row at a time. Therefore we need |
| * to fill in the out buffer and just flush it instead. |
| */ |
| CopySendData(cstate, (void *) cdbCopy->copy_out_buf.data, cdbCopy->copy_out_buf.len); |
| CopyToDispatchFlush(cstate); |
| } |
| } while(!done); |
| |
| cdbCopyEnd(cdbCopy, NULL, NULL); |
| |
| /* now it's safe to destroy the whole dispatcher state */ |
| CdbDispatchCopyEnd(cdbCopy); |
| } |
| /* catch error from CopyStart, CopySendEndOfRow or CopyToDispatchFlush */ |
| PG_CATCH(); |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
| |
| cdbCopyAbort(cdbCopy); |
| |
| MemoryContextSwitchTo(oldcontext); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| if (cstate->opts.binary) |
| { |
| /* Generate trailer for a binary copy */ |
| CopySendInt16(cstate, -1); |
| /* Need to flush out the trailer */ |
| CopySendEndOfRow(cstate); |
| } |
| |
| /* we can throw the error now if QueryCancelPending was set previously */ |
| CHECK_FOR_INTERRUPTS(); |
| |
| pfree(cdbCopy); |
| |
| return processed; |
| } |
| |
| /* |
| * Copy FROM directory table TO file, in the dispatcher. Starts a COPY TO command |
| * on each of the executors and gathers all the results and writes it out. |
| */ |
| static uint64 |
| CopyToDispatchDirectoryTable(CopyToState cstate) |
| { |
| CopyStmt *stmt = glob_copystmt; |
| TupleDesc tupDesc; |
| CdbCopy *cdbCopy; |
| uint64 processed = 0; |
| |
| tupDesc = cstate->rel->rd_att; |
| |
| cstate->fe_msgbuf = makeStringInfo(); |
| cdbCopy = makeCdbCopyTo(cstate); |
| |
| /* |
| * Start a COPY command in every db of every segment in Apache Cloudberry. |
| * |
| * From this point in the code we need to be extra careful |
| * about error handling. ereport() must not be called until |
| * the COPY command sessions are closed on the executors. |
| * Calling ereport() will leave the executors hanging in |
| * COPY state. |
| */ |
| elog(DEBUG5, "COPY command sent to segdbs"); |
| |
| PG_TRY(); |
| { |
| bool done; |
| |
| cdbCopyStart(cdbCopy, stmt, cstate->file_encoding); |
| |
| if (!cstate->opts.binary) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("Only support copy binary to directory table."))); |
| |
| /* |
| * This is the main work-loop. In here we keep collecting data from the |
| * COPY commands on the segdbs, until no more data is available. We |
| * keep writing data out a chunk at a time. |
| */ |
| do |
| { |
| bool copy_cancel = (QueryCancelPending ? true : false); |
| |
| /* get a chunk of data rows from the QE's */ |
| done = cdbCopyGetData(cdbCopy, copy_cancel, &processed); |
| |
| /* send the chunk of data rows to destination (file or stdout) */ |
| if (cdbCopy->copy_out_buf.len > 0) /* conditional is important! */ |
| { |
| /* |
| * in the dispatcher we receive chunks of file and flush it. |
| */ |
| CopySendData(cstate, (void *) cdbCopy->copy_out_buf.data, cdbCopy->copy_out_buf.len); |
| CopyToDispatchFlush(cstate); |
| } |
| } while (!done); |
| |
| cdbCopyEnd(cdbCopy, NULL, NULL); |
| |
| /* now it's safe to destroy the whole dispatcher state */ |
| CdbDispatchCopyEnd(cdbCopy); |
| } |
| /* catch error */ |
| PG_CATCH(); |
| { |
| MemoryContext oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
| |
| cdbCopyAbort(cdbCopy); |
| |
| MemoryContextSwitchTo(oldcontext); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| /* we can throw the error now if QueryCancelPending was set previously */ |
| CHECK_FOR_INTERRUPTS(); |
| |
| pfree(cdbCopy); |
| |
| if (processed) |
| return 1; |
| else |
| return 0; |
| } |
| |
| /* |
| * Copy from relation or query TO file. |
| */ |
| static uint64 |
| CopyTo(CopyToState cstate) |
| { |
| TupleDesc tupDesc; |
| int num_phys_attrs; |
| ListCell *cur; |
| uint64 processed = 0; |
| |
| if (cstate->rel && cstate->rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE) |
| return CopyToDirectoryTable(cstate); |
| |
| if (cstate->rel) |
| tupDesc = RelationGetDescr(cstate->rel); |
| else |
| tupDesc = cstate->queryDesc->tupDesc; |
| num_phys_attrs = tupDesc->natts; |
| cstate->opts.null_print_client = cstate->opts.null_print; /* default */ |
| |
| /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ |
| cstate->fe_msgbuf = makeStringInfo(); |
| |
| /* Get info about the columns we need to process. */ |
| cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); |
| foreach(cur, cstate->attnumlist) |
| { |
| int attnum = lfirst_int(cur); |
| Oid out_func_oid; |
| bool isvarlena; |
| Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); |
| |
| if (cstate->opts.binary) |
| getTypeBinaryOutputInfo(attr->atttypid, |
| &out_func_oid, |
| &isvarlena); |
| else |
| getTypeOutputInfo(attr->atttypid, |
| &out_func_oid, |
| &isvarlena); |
| fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); |
| } |
| |
| /* |
| * Create a temporary memory context that we can reset once per row to |
| * recover palloc'd memory. This avoids any problems with leaks inside |
| * datatype output routines, and should be faster than retail pfree's |
| * anyway. (We don't need a whole econtext as CopyFrom does.) |
| */ |
| cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, |
| "COPY TO", |
| ALLOCSET_DEFAULT_SIZES); |
| if (!cstate->opts.binary) |
| { |
| /* |
| * For non-binary copy, we need to convert null_print to file |
| * encoding, because it will be sent directly with CopySendString. |
| */ |
| if (cstate->need_transcoding) |
| cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, |
| cstate->opts.null_print_len, |
| cstate->file_encoding); |
| } |
| |
| if (Gp_role == GP_ROLE_EXECUTE && !cstate->opts.on_segment) |
| { |
| /* header should not be printed in execute mode. */ |
| } |
| else if (cstate->opts.binary) |
| { |
| /* Generate header for a binary copy */ |
| int32 tmp; |
| |
| /* Signature */ |
| CopySendData(cstate, BinarySignature, 11); |
| /* Flags field */ |
| tmp = 0; |
| CopySendInt32(cstate, tmp); |
| /* No header extension */ |
| tmp = 0; |
| CopySendInt32(cstate, tmp); |
| } |
| else |
| { |
| /* if a header has been requested send the line */ |
| if (cstate->opts.header_line) |
| { |
| bool hdr_delim = false; |
| |
| foreach(cur, cstate->attnumlist) |
| { |
| int attnum = lfirst_int(cur); |
| char *colname; |
| |
| if (hdr_delim) |
| CopySendChar(cstate, cstate->opts.delim[0]); |
| hdr_delim = true; |
| |
| colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); |
| |
| CopyAttributeOutCSV(cstate, colname, false, |
| list_length(cstate->attnumlist) == 1); |
| } |
| |
| CopySendEndOfRow(cstate); |
| } |
| } |
| |
| if (cstate->rel) |
| { |
| TupleTableSlot *slot; |
| TableScanDesc scandesc; |
| |
| scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL); |
| slot = table_slot_create(cstate->rel, NULL); |
| |
| processed = 0; |
| while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot)) |
| { |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* Deconstruct the tuple ... */ |
| slot_getallattrs(slot); |
| |
| /* Format and send the data */ |
| CopyOneRowTo(cstate, slot); |
| |
| /* |
| * Increment the number of processed tuples, and report the |
| * progress. |
| */ |
| pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
| ++processed); |
| } |
| |
| ExecDropSingleTupleTableSlot(slot); |
| table_endscan(scandesc); |
| } |
| else |
| { |
| Assert(Gp_role != GP_ROLE_EXECUTE); |
| |
| /* run the plan --- the dest receiver will send tuples */ |
| ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true); |
| processed = ((DR_copy *) cstate->queryDesc->dest)->processed; |
| } |
| |
| if (Gp_role == GP_ROLE_EXECUTE && !cstate->opts.on_segment) |
| { |
| /* |
| * Trailer should not be printed in execute mode. The dispatcher will |
| * write it once. |
| */ |
| } |
| else if (cstate->opts.binary) |
| { |
| /* Generate trailer for a binary copy */ |
| CopySendInt16(cstate, -1); |
| |
| /* Need to flush out the trailer */ |
| CopySendEndOfRow(cstate); |
| } |
| |
| if (Gp_role == GP_ROLE_EXECUTE && cstate->opts.on_segment) |
| SendNumRows(0, processed); |
| |
| MemoryContextDelete(cstate->rowcontext); |
| |
| return processed; |
| } |
| |
| /* |
| * Copy directory table To QD. |
| */ |
| static uint64 |
| CopyToDirectoryTable(CopyToState cstate) |
| { |
| uint64 processed = 0; |
| TupleTableSlot *slot; |
| TableScanDesc scandesc; |
| ScanKeyData skey; |
| Datum datum; |
| bool isnull; |
| char *relative_path; |
| char *scopedFileUrl; |
| DirectoryTable *dirTable; |
| UFile *file; |
| int64_t file_size; |
| pg_cryptohash_ctx *hashCtx; |
| char hexMd5Sum[256]; |
| uint8 md5Sum[MD5_DIGEST_LENGTH]; |
| char *md5; |
| |
| Assert(cstate->rel); |
| |
| dirTable = GetDirectoryTable(RelationGetRelid(cstate->rel)); |
| |
| /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */ |
| cstate->fe_msgbuf = makeStringInfo(); |
| |
| /* |
| * Create a temporary memory context that we can reset once per row to |
| * recover palloc'd memory. This avoids any problems with leaks inside |
| * datatype output routines, and should be faster than retail pfree's |
| * anyway. (We don't need a whole econtext as CopyFrom does.) |
| */ |
| cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext, |
| "COPY TO", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| ScanKeyInit(&skey, |
| (AttrNumber) 1, |
| BTEqualStrategyNumber, F_TEXTEQ, |
| CStringGetTextDatum(cstate->dirfilename)); |
| scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 1, &skey); |
| slot = table_slot_create(cstate->rel, NULL); |
| |
| processed = 0; |
| if (table_scan_getnextslot(scandesc, ForwardScanDirection, slot)) |
| { |
| char buffer[4096]; |
| char errorMessage[256]; |
| int bytesRead; |
| |
| /* Deconstruct the tuple ... */ |
| slot_getallattrs(slot); |
| |
| datum = slot_getattr(slot, 1, &isnull); |
| Assert(isnull == false); |
| relative_path = TextDatumGetCString(datum); |
| scopedFileUrl = psprintf("%s/%s", dirTable->location, relative_path); |
| |
| hashCtx = pg_cryptohash_create(PG_MD5); |
| if (hashCtx == NULL) |
| ereport(ERROR, |
| (errcode(ERRCODE_OUT_OF_MEMORY), |
| errmsg("failed to create md5hash context: out of memory"))); |
| pg_cryptohash_init(hashCtx); |
| |
| file = UFileOpen(dirTable->spcId, |
| scopedFileUrl, |
| O_RDONLY, |
| errorMessage, |
| sizeof(errorMessage)); |
| if (file == NULL) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("failed to open file \"%s\": %s", |
| scopedFileUrl, errorMessage))); |
| |
| file_size = UFileSize(file); |
| |
| while (true) |
| { |
| bytesRead = UFileRead(file, buffer, sizeof(buffer)); |
| if (bytesRead == -1) |
| ereport(ERROR, |
| (errcode(ERRCODE_INTERNAL_ERROR), |
| errmsg("failed to read file \"%s\": %s", scopedFileUrl, UFileGetLastError(file)))); |
| |
| if (bytesRead == 0) |
| break; |
| |
| CopySendData(cstate, buffer, bytesRead); |
| pg_cryptohash_update(hashCtx, (const uint8 *) buffer, bytesRead); |
| CopySendEndOfRow(cstate); |
| } |
| |
| pg_cryptohash_final(hashCtx, md5Sum, sizeof(md5Sum)); |
| pg_cryptohash_free(hashCtx); |
| bytesToHex(md5Sum, hexMd5Sum); |
| |
| /* Get md5 from schema table */ |
| datum = slot_getattr(slot, 4, &isnull); |
| Assert(isnull == false); |
| md5 = TextDatumGetCString(datum); |
| |
| if (strcmp(md5, hexMd5Sum) != 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("Copy directory table to file failed, as file content is not consistent."))); |
| |
| if (UFileClose(file) < 0) |
| ereport(ERROR, |
| (errcode(ERRCODE_IO_ERROR), |
| errmsg("failed to close the file \"%s\": %s", |
| scopedFileUrl, UFileGetLastError(file)))); |
| |
| pfree(file); |
| |
| /* |
| * Increment the number of processed tuples, and report the |
| * progress. |
| */ |
| pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, |
| ++processed); |
| } |
| |
| ExecDropSingleTupleTableSlot(slot); |
| table_endscan(scandesc); |
| |
| MemoryContextDelete(cstate->rowcontext); |
| |
| return processed; |
| } |
| |
| void |
| CopyOneCustomRowTo(CopyToState cstate, bytea *value) |
| { |
| appendBinaryStringInfo(cstate->fe_msgbuf, |
| VARDATA_ANY((void *) value), |
| VARSIZE_ANY_EXHDR((void *) value)); |
| } |
| |
| /* |
| * Release resources allocated in a cstate for COPY TO/FROM. |
| */ |
| static void |
| EndCopy(CopyToState cstate) |
| { |
| if (cstate->is_program) |
| { |
| if (cstate->copy_file) |
| { |
| fclose(cstate->copy_file); |
| cstate->copy_file = NULL; |
| } |
| |
| close_program_pipes(cstate->program_pipes, true); |
| } |
| else |
| { |
| if (cstate->filename != NULL && FreeFile(cstate->copy_file)) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not close file \"%s\": %m", |
| cstate->filename))); |
| } |
| |
| pgstat_progress_end_command(); |
| |
| MemoryContextDelete(cstate->copycontext); |
| pfree(cstate); |
| } |
| |
| void |
| CopySendChar(CopyToState cstate, char c) |
| { |
| appendStringInfoCharMacro(cstate->fe_msgbuf, c); |
| } |
| |
| CopyToState |
| BeginCopyToDirectoryTable(ParseState *pstate, |
| const char *filename, |
| const char *dirfilename, |
| Relation rel, |
| bool is_program, |
| List *options) |
| { |
| CopyToState cstate; |
| bool pipe; |
| MemoryContext oldcontext; |
| const int progress_cols[] = { |
| PROGRESS_COPY_COMMAND, |
| PROGRESS_COPY_TYPE |
| }; |
| int64 progress_vals[] = { |
| PROGRESS_COPY_COMMAND_TO, |
| 0 |
| }; |
| |
| if (!glob_copystmt->dirfilename) |
| ereport(ERROR, |
| (errcode(ERRCODE_UNDEFINED_OBJECT), |
| errmsg("COPY to directory table must specify the relative_path name."))); |
| |
| /* Allocate workspace and zero all fields */ |
| cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData)); |
| |
| glob_cstate = cstate; |
| |
| /* |
| * We allocate everying used by a cstate in a new memory context. This |
| * avoids memory leaks during repeated use of COPY in a query. |
| */ |
| cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext, |
| "COPY", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| oldcontext = MemoryContextSwitchTo(cstate->copycontext); |
| |
| /* Process the target relation */ |
| Assert(rel); |
| cstate->rel = rel; |
| |
| /* Check whether copy directory table options allowed */ |
| ProcessCopyDirectoryTableOptions(pstate, &cstate->opts, true, options, rel->rd_id); |
| |
| if (Gp_role == GP_ROLE_DISPATCH && !cstate->opts.binary) |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("Only support copy binary directory table to."))); |
| |
| cstate->file_encoding = GetDatabaseEncoding(); |
| cstate->need_transcoding = false; |
| |
| /* See Multibyte encoding comment above */ |
| cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding); |
| |
| cstate->copy_dest = COPY_FILE; /* default */ |
| |
| pipe = (filename == NULL || Gp_role == GP_ROLE_EXECUTE); |
| |
| /* Determine the mode */ |
| if (Gp_role == GP_ROLE_DISPATCH && cstate->opts.on_segment && |
| cstate->rel && cstate->rel->rd_cdbpolicy) |
| { |
| cstate->dispatch_mode = COPY_DISPATCH; |
| } |
| else |
| cstate->dispatch_mode = COPY_DIRECT; |
| |
| if (cstate->opts.on_segment && Gp_role == GP_ROLE_DISPATCH) |
| { |
| /* in On SEGMENT mode, we don't open anything on the dispatcher. */ |
| |
| if (filename == NULL) |
| ereport(ERROR, |
| (errcode(ERRCODE_SYNTAX_ERROR), |
| errmsg("STDOUT is not supported by 'COPY ON SEGMENT'"))); |
| } |
| else if (pipe) |
| { |
| progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; |
| cstate->dirfilename = pstrdup(dirfilename); |
| |
| /* |
| * the grammar does not allow this on QD. |
| * on QE, this could happen |
| */ |
| Assert(!is_program || Gp_role == GP_ROLE_EXECUTE); |
| if (whereToSendOutput != DestRemote) |
| cstate->copy_file = stdout; |
| } |
| else |
| { |
| cstate->filename = pstrdup(filename); |
| cstate->dirfilename = pstrdup(dirfilename); |
| cstate->is_program = is_program; |
| |
| if (is_program) |
| { |
| progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM; |
| cstate->program_pipes = open_program_pipes(cstate->filename, true); |
| cstate->copy_file = fdopen(cstate->program_pipes->pipes[EXEC_DATA_P], PG_BINARY_W); |
| if (cstate->copy_file == NULL) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not execute command \"%s\": %m", |
| cstate->filename))); |
| } |
| else |
| { |
| mode_t oumask; /* Pre-existing umask value */ |
| struct stat st; |
| |
| progress_vals[1] = PROGRESS_COPY_TYPE_FILE; |
| |
| /* |
| * Prevent write to relative path ... too easy to shoot oneself in |
| * the foot by overwriting a database file ... |
| */ |
| if (!is_absolute_path(cstate->filename)) |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_NAME), |
| errmsg("relative path not allowed for COPY to file"))); |
| |
| oumask = umask(S_IWGRP | S_IWOTH); |
| PG_TRY(); |
| { |
| cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W); |
| } |
| PG_FINALLY(); |
| { |
| umask(oumask); |
| } |
| PG_END_TRY(); |
| if (cstate->copy_file == NULL) |
| { |
| /* copy errno because ereport subfunctions might change it */ |
| int save_errno = errno; |
| |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not open file \"%s\" for writing: %m", |
| cstate->filename), |
| (save_errno == ENOENT || save_errno == EACCES) ? |
| errhint("COPY TO instructs the PostgreSQL server process to write a file. " |
| "You may want a client-side facility such as psql's \\copy.") : 0)); |
| } |
| |
| // Increase buffer size to improve performance (cmcdevitt) |
| /* GPDB_14_MERGE_FIXME: Ret value process. */ |
| setvbuf(cstate->copy_file, NULL, _IOFBF, 393216); // 384 Kbytes |
| |
| if (fstat(fileno(cstate->copy_file), &st)) |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("could not stat file \"%s\": %m", |
| cstate->filename))); |
| |
| if (S_ISDIR(st.st_mode)) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("\"%s\" is a directory", cstate->filename))); |
| } |
| } |
| /* initialize progress */ |
| pgstat_progress_start_command(PROGRESS_COMMAND_COPY, |
| cstate->rel ? RelationGetRelid(cstate->rel) :InvalidOid); |
| pgstat_progress_update_multi_param(2, progress_cols, progress_vals); |
| |
| cstate->bytes_processed = 0; |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| return cstate; |
| } |
| |