blob: efbf6a0ba398f81a7ec622978b3e40c78696d89a [file]
/*-------------------------------------------------------------------------
*
* copy.c
* Implements the COPY utility command
*
* Portions Copyright (c) 2005-2008, Greenplum inc
* Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates.
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/commands/copy.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq-int.h"
#include <ctype.h>
#include <unistd.h>
#include <sys/stat.h>
#include "access/sysattr.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/pg_authid.h"
#include "commands/copy.h"
#include "commands/copyfrom_internal.h"
#include "commands/copyto_internal.h"
#include "commands/defrem.h"
#include "executor/executor.h"
#include "libpq/libpq.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_coerce.h"
#include "parser/parse_collate.h"
#include "parser/parse_expr.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
#include "storage/execute_pipe.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/snapmgr.h"
#include "access/external.h"
#include "access/url.h"
#include "catalog/catalog.h"
#include "catalog/gp_matview_aux.h"
#include "catalog/namespace.h"
#include "catalog/pg_extprotocol.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbaocsam.h"
#include "cdb/cdbconn.h"
#include "cdb/cdbcopy.h"
#include "cdb/cdbdisp_query.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbvars.h"
#include "commands/copyto_internal.h"
#include "commands/queue.h"
#include "nodes/makefuncs.h"
#include "postmaster/autostats.h"
#include "utils/metrics_utils.h"
#include "utils/resscheduler.h"
#include "utils/string_utils.h"
/* non-export function prototypes */
static uint64 CopyDispatchOnSegment(const CopyStmt *stmt);
static uint64 CopyToQueryOnSegment(CopyToState cstate);
/* Splits a single delimiter-joined option string (e.g. 'c1,c2') into a List */
static List *parse_joined_option_list(char *str, char *delimiter);
/*
 * COPY TO state made visible to DoCopy()'s PG_CATCH handler so it can call
 * mppExecutorCleanup() on glob_cstate->queryDesc.  DoCopy() resets it to NULL
 * on entry; NOTE(review): it is presumably assigned by the COPY TO setup code
 * outside this view — confirm.  Declared volatile because it is read after a
 * longjmp out of PG_TRY.
 */
volatile CopyToState glob_cstate = NULL;
/* GPDB_91_MERGE_FIXME: passing through a global variable like this is ugly */
/* Set by DoCopy() to the statement currently being executed. */
CopyStmt *glob_copystmt = NULL;
/*
* DoCopy executes the SQL COPY statement
*
* Either unload or reload contents of table <relation>, depending on <from>.
* (<from> = true means we are inserting into the table.) In the "TO" case
* we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
* or DELETE query.
*
* If <pipe> is false, transfer is between the table and the file named
* <filename>. Otherwise, transfer is between the table and our regular
* input/output stream. The latter could be either stdin/stdout or a
* socket, depending on whether we're running under Postmaster control.
*
* Do not allow a Postgres user without the 'pg_read_server_files' or
* 'pg_write_server_files' role to read from or write to a file.
*
* Do not allow the copy if user doesn't have proper permission to access
* the table or the specifically requested columns.
*/
/*
 * DoCopy executes the SQL COPY statement
 *
 * Either unload or reload contents of table <relation>, depending on <from>.
 * (<from> = true means we are inserting into the table.)  In the "TO" case
 * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
 * or DELETE query.
 *
 * If <pipe> is false, transfer is between the table and the file named
 * <filename>.  Otherwise, transfer is between the table and our regular
 * input/output stream.  The latter could be either stdin/stdout or a
 * socket, depending on whether we're running under Postmaster control.
 * In GPDB, a QE (GP_ROLE_EXECUTE) always treats the transfer as a pipe.
 *
 * Do not allow a Postgres user without the 'pg_read_server_files' or
 * 'pg_write_server_files' role to read from or write to a file.  Directory
 * tables are exempt from those two file-access role checks.
 *
 * Do not allow the copy if user doesn't have proper permission to access
 * the table or the specifically requested columns.
 *
 * On return, *processed holds the number of rows copied.  For COPY FROM into
 * a replicated table on the QD, the count is divided by the table's
 * numsegments.  On the QD (or single node), a COPY FROM that loaded at least
 * one row updates the aux status of related materialized views, and on the
 * dispatcher it may trigger automatic ANALYZE.
 */
void
DoCopy(ParseState *pstate, const CopyStmt *stmt,
	   int stmt_location, int stmt_len,
	   uint64 *processed)
{
	bool		is_from = stmt->is_from;
	bool		pipe = (stmt->filename == NULL || Gp_role == GP_ROLE_EXECUTE);
	Relation	rel = NULL;
	LOCKMODE	lockmode;
	Oid			relid;
	RawStmt    *query = NULL;
	Node	   *whereClause = NULL;
	List	   *attnamelist = stmt->attlist;
	List	   *options;

	/* Reset the globals used to hand state to error cleanup. */
	glob_cstate = NULL;
	glob_copystmt = (CopyStmt *) stmt;

	options = stmt->options;

	if (stmt->sreh && !is_from)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY single row error handling only available using COPY FROM")));

	lockmode = is_from ? RowExclusiveLock : AccessShareLock;

	if (stmt->relation)
	{
		/* Open and lock the relation, using the appropriate lock type. */
		rel = table_openrv(stmt->relation, lockmode);
	}

	/*
	 * Disallow COPY to/from file or program except to users with the
	 * appropriate role.
	 */
	if (!pipe)
	{
		if (stmt->is_program)
		{
			if (!is_member_of_role(GetUserId(), ROLE_PG_EXECUTE_SERVER_PROGRAM))
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser or a member of the pg_execute_server_program role to COPY to or from an external program"),
						 errhint("Anyone can COPY to stdout or from stdin. "
								 "psql's \\copy command also works for anyone.")));
		}
		else
		{
			/*
			 * Directory tables are exempt from the server-file role checks.
			 * 'rel' is NULL for "COPY (query) TO 'file'", so it must be
			 * tested before being dereferenced; such a COPY is never a
			 * directory-table COPY.
			 */
			bool		is_directory_table = (rel != NULL &&
											  rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE);

			if (is_from && !is_member_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES) && !is_directory_table)
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser or a member of the pg_read_server_files role to COPY from a file"),
						 errhint("Anyone can COPY to stdout or from stdin. "
								 "psql's \\copy command also works for anyone.")));

			if (!is_from && !is_member_of_role(GetUserId(), ROLE_PG_WRITE_SERVER_FILES) && !is_directory_table)
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("must be superuser or a member of the pg_write_server_files role to COPY to a file"),
						 errhint("Anyone can COPY to stdout or from stdin. "
								 "psql's \\copy command also works for anyone.")));
		}
	}

	if (stmt->relation)
	{
		ParseNamespaceItem *nsitem;
		RangeTblEntry *rte;
		TupleDesc	tupDesc;
		List	   *attnums;
		ListCell   *cur;

		Assert(!stmt->query);

		if (is_from && !allowSystemTableMods && IsUnderPostmaster && IsSystemRelation(rel))
		{
			ereport(ERROR,
					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
					 errmsg("permission denied: \"%s\" is a system catalog",
							RelationGetRelationName(rel)),
					 errhint("Make sure the configuration parameter allow_system_table_mods is set.")));
		}

		relid = RelationGetRelid(rel);

		nsitem = addRangeTableEntryForRelation(pstate, rel, lockmode,
											   NULL, false, false);
		rte = nsitem->p_rte;
		rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);

		if (stmt->whereClause)
		{
			/* add nsitem to query namespace */
			addNSItemToQuery(pstate, nsitem, false, true, true);

			/* Transform the raw expression tree */
			whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);

			/* Make sure it yields a boolean result. */
			whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");

			/* we have to fix its collations too */
			assign_expr_collations(pstate, whereClause);

			whereClause = eval_const_expressions(NULL, whereClause);

			whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false);
			whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
		}

		tupDesc = RelationGetDescr(rel);
		attnums = CopyGetAttnums(tupDesc, rel, attnamelist);
		foreach (cur, attnums)
		{
			int			attno = lfirst_int(cur) -
			FirstLowInvalidHeapAttributeNumber;

			if (is_from)
				rte->insertedCols = bms_add_member(rte->insertedCols, attno);
			else
				rte->selectedCols = bms_add_member(rte->selectedCols, attno);
		}
		ExecCheckRTPerms(pstate->p_rtable, true);

		/*
		 * Permission check for row security policies.
		 *
		 * check_enable_rls will ereport(ERROR) if the user has requested
		 * something invalid and will otherwise indicate if we should enable
		 * RLS (returns RLS_ENABLED) or not for this COPY statement.
		 *
		 * If the relation has a row security policy and we are to apply it
		 * then perform a "query" copy and allow the normal query processing
		 * to handle the policies.
		 *
		 * If RLS is not enabled for this, then just fall through to the
		 * normal non-filtering relation handling.
		 *
		 * GPDB: Also do this for partitioned tables. In PostgreSQL, you get
		 * an error:
		 *
		 * ERROR: cannot copy from partitioned table "foo"
		 * HINT: Try the COPY (SELECT ...) TO variant.
		 *
		 * In GPDB 6 and before, support for COPYing partitioned tables was
		 * implemented deep in the COPY processing code. In GPDB 7,
		 * partitioning was replaced with the upstream implementation, but for
		 * backwards-compatibility, we do the translation to "COPY (SELECT
		 * ...)" variant automatically, just like PostgreSQL does for RLS.
		 */
		if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED ||
			(!is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE))
		{
			SelectStmt *select;
			ColumnRef  *cr;
			ResTarget  *target;
			RangeVar   *from;
			List	   *targetList = NIL;

			if (is_from)
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("COPY FROM not supported with row-level security"),
						 errhint("Use INSERT statements instead.")));

			/*
			 * Build target list
			 *
			 * If no columns are specified in the attribute list of the COPY
			 * command, then the target list is 'all' columns. Therefore, '*'
			 * should be used as the target list for the resulting SELECT
			 * statement.
			 *
			 * In the case that columns are specified in the attribute list,
			 * create a ColumnRef and ResTarget for each column and add them
			 * to the target list for the resulting SELECT statement.
			 */
			if (!stmt->attlist)
			{
				cr = makeNode(ColumnRef);
				cr->fields = list_make1(makeNode(A_Star));
				cr->location = -1;

				target = makeNode(ResTarget);
				target->name = NULL;
				target->indirection = NIL;
				target->val = (Node *) cr;
				target->location = -1;

				targetList = list_make1(target);
			}
			else
			{
				ListCell   *lc;

				foreach(lc, stmt->attlist)
				{
					/*
					 * Build the ColumnRef for each column. The ColumnRef
					 * 'fields' property is a String 'Value' node (see
					 * nodes/value.h) that corresponds to the column name
					 * respectively.
					 */
					cr = makeNode(ColumnRef);
					cr->fields = list_make1(lfirst(lc));
					cr->location = -1;

					/* Build the ResTarget and add the ColumnRef to it. */
					target = makeNode(ResTarget);
					target->name = NULL;
					target->indirection = NIL;
					target->val = (Node *) cr;
					target->location = -1;

					/* Add each column to the SELECT statement's target list */
					targetList = lappend(targetList, target);
				}
			}

			/*
			 * Build RangeVar for from clause, fully qualified based on the
			 * relation which we have opened and locked.
			 */
			from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
								pstrdup(RelationGetRelationName(rel)),
								-1);

			/* Build query */
			select = makeNode(SelectStmt);
			select->targetList = targetList;
			select->fromClause = list_make1(from);

			query = makeNode(RawStmt);
			query->stmt = (Node *) select;
			query->stmt_location = stmt_location;
			query->stmt_len = stmt_len;

			/*
			 * Close the relation for now, but keep the lock on it to prevent
			 * changes between now and when we start the query-based COPY.
			 *
			 * We'll reopen it later as part of the query-based COPY.
			 */
			table_close(rel, NoLock);
			rel = NULL;
		}
	}
	else
	{
		Assert(stmt->query);

		query = makeNode(RawStmt);
		query->stmt = stmt->query;
		query->stmt_location = stmt_location;
		query->stmt_len = stmt_len;

		relid = InvalidOid;
		rel = NULL;
	}

	if (is_from)
	{
		CopyFromState cstate;

		Assert(rel);

		if (stmt->sreh && Gp_role != GP_ROLE_EXECUTE && !rel->rd_cdbpolicy)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY single row error handling only available for distributed user tables")));

		/*
		 * GPDB_91_MERGE_FIXME: is it possible to get to this point in the code
		 * with a temporary relation that belongs to another session? If so, the
		 * following code doesn't function as expected.
		 */
		/* check read-only transaction and parallel mode */
		if (XactReadOnly && !rel->rd_islocaltemp)
			PreventCommandIfReadOnly("COPY FROM");

		cstate = BeginCopyFrom(pstate, rel, whereClause,
							   stmt->filename, stmt->is_program,
							   NULL, NULL, stmt->attlist, options);

		/*
		 * Error handling setup
		 */
		if (cstate->opts.sreh)
		{
			/* Single row error handling requested */
			SingleRowErrorDesc *sreh = cstate->opts.sreh;
			char		log_to_file = LOG_ERRORS_DISABLE;

			if (IS_LOG_TO_FILE(sreh->log_error_type))
			{
				cstate->errMode = SREH_LOG;
				/* LOG ERRORS PERSISTENTLY for COPY is not allowed for now. */
				log_to_file = LOG_ERRORS_ENABLE;
			}
			else
			{
				cstate->errMode = SREH_IGNORE;
			}
			cstate->cdbsreh = makeCdbSreh(sreh->rejectlimit,
										  sreh->is_limit_in_rows,
										  cstate->filename,
										  stmt->relation->relname,
										  log_to_file);
			if (rel)
				cstate->cdbsreh->relid = RelationGetRelid(rel);
		}
		else
		{
			/* No single row error handling requested. Use "all or nothing" */
			cstate->cdbsreh = NULL; /* default - no SREH */
			cstate->errMode = ALL_OR_NOTHING; /* default */
		}

		PG_TRY();
		{
			if (Gp_role == GP_ROLE_DISPATCH && cstate->opts.on_segment)
				*processed = CopyDispatchOnSegment(stmt);
			else
				*processed = CopyFrom(cstate);	/* copy from file to database */

			/* Handle copy to replicated table returns processed number */
			if (Gp_role == GP_ROLE_DISPATCH &&
				GpPolicyIsReplicated(cstate->rel->rd_cdbpolicy))
				*processed = *processed / cstate->rel->rd_cdbpolicy->numsegments;

			/*
			 * Update view info if we actually copied data from another place.
			 */
			if (IS_QD_OR_SINGLENODE() && *processed > 0)
			{
				SetRelativeMatviewAuxStatus(relid,
											MV_DATA_STATUS_EXPIRED_INSERT_ONLY,
											MV_DATA_STATUS_TRANSFER_DIRECTION_ALL);
			}
		}
		PG_CATCH();
		{
			if (cstate->cdbCopy)
			{
				MemoryContext oldcontext = MemoryContextSwitchTo(cstate->copycontext);

				cdbCopyAbort(cstate->cdbCopy);
				cstate->cdbCopy = NULL;
				MemoryContextSwitchTo(oldcontext);
			}
			PG_RE_THROW();
		}
		PG_END_TRY();

		EndCopyFrom(cstate);
	}
	else
	{
		CopyToState cstate;

		/*
		 * GPDB_91_MERGE_FIXME: ExecutorStart() is called in BeginCopyTo,
		 * but the TRY-CATCH block only starts here. If an error is
		 * thrown in-between, we would fail to call mppExecutorCleanup. We
		 * really should be using a ResourceOwner or something else for
		 * cleanup, instead of TRY-CATCH blocks...
		 *
		 * Update: I tried to fix this using the glob_cstate hack. It's ugly,
		 * but fixes at least some cases that came up in regression tests.
		 */
		PG_TRY();
		{
			if (rel && rel->rd_rel->relkind == RELKIND_DIRECTORY_TABLE)
			{
				cstate = BeginCopyToDirectoryTable(pstate, stmt->filename, stmt->dirfilename,
												   rel, stmt->is_program, options);
			}
			else
			{
				cstate = BeginCopyTo(pstate, rel, query, relid,
									 stmt->filename, stmt->is_program,
									 stmt->attlist, options);
			}

			/*
			 * "copy t to file on segment"					CopyDispatchOnSegment
			 * "copy (select * from t) to file on segment"	CopyToQueryOnSegment
			 * "copy t/(select * from t) to file"			DoCopyTo
			 */
			if (Gp_role == GP_ROLE_DISPATCH && cstate->opts.on_segment)
			{
				if (cstate->rel)
					*processed = CopyDispatchOnSegment(stmt);
				else
					*processed = CopyToQueryOnSegment(cstate);
			}
			else
				*processed = DoCopyTo(cstate);	/* copy from database to file */
		}
		PG_CATCH();
		{
			if (glob_cstate && glob_cstate->queryDesc)
			{
				/* should shutdown the mpp stuff such as interconnect and dispatch thread */
				mppExecutorCleanup(glob_cstate->queryDesc);
			}
			PG_RE_THROW();
		}
		PG_END_TRY();

		EndCopyTo(cstate, processed);
	}

	/*
	 * Close the relation. If reading, we can release the AccessShareLock we
	 * got; if writing, we should hold the lock until end of transaction to
	 * ensure that updates will be committed before lock is released.
	 */
	if (rel != NULL)
		table_close(rel, (is_from ? NoLock : AccessShareLock));

	/* Issue automatic ANALYZE if conditions are satisfied (MPP-4082). */
	if (Gp_role == GP_ROLE_DISPATCH && is_from)
	{
		bool		inFunction = already_under_executor_run() || utility_nested();

		auto_stats(AUTOSTATS_CMDTYPE_COPY, relid, *processed, inFunction);
	}
}
/*
* Process the statement option list for COPY.
*
* Scan the options list (a list of DefElem) and transpose the information
* into *opts_out, applying appropriate error checking.
*
* If 'opts_out' is not NULL, it is assumed to be filled with zeroes initially.
*
* This is exported so that external users of the COPY API can sanity-check
* a list of options. In that usage, 'opts_out' can be passed as NULL and
* the collected data is just leaked until CurrentMemoryContext is reset.
*
* Note that additional checking, such as whether column names listed in FORCE
* QUOTE actually exist, has to be applied later. This just checks for
* self-consistency of the options list.
*/
/*
 * Process the statement option list for COPY.
 *
 * Scan the options list (a list of DefElem) and transpose the information
 * into *opts_out, applying appropriate error checking.
 *
 * If 'opts_out' is not NULL, it is assumed to be filled with zeroes initially.
 *
 * This is exported so that external users of the COPY API can sanity-check
 * a list of options.  In that usage, 'opts_out' can be passed as NULL and
 * the collected data is just leaked until CurrentMemoryContext is reset.
 *
 * 'rel_oid' identifies the target relation.  When rel_is_external_table()
 * is true for it, unrecognized option names are silently ignored instead of
 * being rejected (NOTE(review): presumably they are consumed elsewhere by
 * the external-table machinery — confirm).
 *
 * Besides the upstream options, GPDB accepts fill_missing_fields, newline,
 * sreh, on_segment, skip_foreign_partitions and tag.  When ENCODING is not
 * given, file_encoding defaults to the client encoding; eol_type is set
 * from NEWLINE (COPY FROM only) or left as EOL_UNKNOWN.
 *
 * Note that additional checking, such as whether column names listed in FORCE
 * QUOTE actually exist, has to be applied later.  This just checks for
 * self-consistency of the options list.
 */
void
ProcessCopyOptions(ParseState *pstate,
				   CopyFormatOptions *opts_out,
				   bool is_from,
				   List *options,
				   Oid rel_oid)
{
	bool		format_specified = false;
	bool		freeze_specified = false;
	bool		header_specified = false;
	ListCell   *option;

	/* Support external use for option sanity checking */
	if (opts_out == NULL)
		opts_out = (CopyFormatOptions *) palloc0(sizeof(CopyFormatOptions));

	opts_out->skip_foreign_partitions = false;
	opts_out->delim_off = false;
	opts_out->file_encoding = -1;

	/* Extract options from the statement node tree */
	foreach(option, options)
	{
		DefElem    *defel = lfirst_node(DefElem, option);

		if (strcmp(defel->defname, "format") == 0)
		{
			char	   *fmt = defGetString(defel);

			if (format_specified)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			format_specified = true;
			if (strcmp(fmt, "text") == 0)
				 /* default format */ ;
			else if (strcmp(fmt, "csv") == 0)
				opts_out->csv_mode = true;
			else if (strcmp(fmt, "binary") == 0)
				opts_out->binary = true;
			else
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("COPY format \"%s\" not recognized", fmt),
						 parser_errposition(pstate, defel->location)));
		}
		else if (strcmp(defel->defname, "freeze") == 0)
		{
			if (freeze_specified)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			freeze_specified = true;
			opts_out->freeze = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "delimiter") == 0)
		{
			if (opts_out->delim)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->delim = defGetString(defel);

			/* GPDB: DELIMITER 'off' disables field splitting. */
			if (opts_out->delim && pg_strcasecmp(opts_out->delim, "off") == 0)
				opts_out->delim_off = true;
		}
		else if (strcmp(defel->defname, "null") == 0)
		{
			if (opts_out->null_print)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->null_print = defGetString(defel);

			/*
			 * MPP-2010: unfortunately serialization function doesn't
			 * distinguish between 0x0 and empty string. Therefore we
			 * must assume that if NULL AS was indicated and has no value
			 * the actual value is an empty string.
			 */
			if (!opts_out->null_print)
				opts_out->null_print = "";
		}
		else if (strcmp(defel->defname, "header") == 0)
		{
			if (header_specified)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			header_specified = true;
			opts_out->header_line = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "quote") == 0)
		{
			if (opts_out->quote)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->quote = defGetString(defel);
		}
		else if (strcmp(defel->defname, "escape") == 0)
		{
			if (opts_out->escape)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->escape = defGetString(defel);
		}
		else if (strcmp(defel->defname, "force_quote") == 0)
		{
			if (opts_out->force_quote || opts_out->force_quote_all)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			if (defel->arg && IsA(defel->arg, A_Star))
				opts_out->force_quote_all = true;
			else if (defel->arg && IsA(defel->arg, List))
				opts_out->force_quote = castNode(List, defel->arg);
			else if (defel->arg && IsA(defel->arg, String))
			{
				if (strcmp(strVal(defel->arg), "*") == 0)
					opts_out->force_quote_all = true;
				else
				{
					/* OPTIONS (force_quote 'c1,c2') */
					opts_out->force_quote = parse_joined_option_list(strVal(defel->arg), ",");
				}
			}
			else
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("argument to option \"%s\" must be a list of column names",
								defel->defname),
						 parser_errposition(pstate, defel->location)));
		}
		else if (strcmp(defel->defname, "force_not_null") == 0)
		{
			if (opts_out->force_notnull)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			if (defel->arg && IsA(defel->arg, List))
				opts_out->force_notnull = castNode(List, defel->arg);
			else if (defel->arg && IsA(defel->arg, String))
			{
				/* OPTIONS (force_not_null 'c1,c2') */
				opts_out->force_notnull = parse_joined_option_list(strVal(defel->arg), ",");
			}
			else
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("argument to option \"%s\" must be a list of column names",
								defel->defname),
						 parser_errposition(pstate, defel->location)));
		}
		else if (strcmp(defel->defname, "force_null") == 0)
		{
			if (opts_out->force_null)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			if (defel->arg && IsA(defel->arg, List))
				opts_out->force_null = castNode(List, defel->arg);
			else
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("argument to option \"%s\" must be a list of column names",
								defel->defname),
						 parser_errposition(pstate, defel->location)));
		}
		else if (strcmp(defel->defname, "convert_selectively") == 0)
		{
			/*
			 * Undocumented, not-accessible-from-SQL option: convert only the
			 * named columns to binary form, storing the rest as NULLs. It's
			 * allowed for the column list to be NIL.
			 */
			if (opts_out->convert_selectively)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->convert_selectively = true;
			if (defel->arg == NULL || IsA(defel->arg, List))
				opts_out->convert_select = castNode(List, defel->arg);
			else
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("argument to option \"%s\" must be a list of column names",
								defel->defname),
						 parser_errposition(pstate, defel->location)));
		}
		else if (strcmp(defel->defname, "encoding") == 0)
		{
			if (opts_out->file_encoding >= 0)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->file_encoding = pg_char_to_encoding(defGetString(defel));
			if (opts_out->file_encoding < 0)
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("argument to option \"%s\" must be a valid encoding name",
								defel->defname),
						 parser_errposition(pstate, defel->location)));
		}
		else if (strcmp(defel->defname, "fill_missing_fields") == 0)
		{
			if (opts_out->fill_missing)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->fill_missing = defGetBoolean(defel);
		}
		else if (strcmp(defel->defname, "newline") == 0)
		{
			if (opts_out->eol_str)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));

			/*
			 * Use defGetString() rather than strVal(defel->arg): the generic
			 * option syntax allows "newline" with no argument, leaving
			 * defel->arg NULL.  defGetString() raises a proper error in that
			 * case instead of dereferencing a NULL pointer, and returns the
			 * same string as strVal() for an ordinary String argument.
			 */
			opts_out->eol_str = defGetString(defel);
		}
		else if (strcmp(defel->defname, "sreh") == 0)
		{
			if (defel->arg == NULL || !IsA(defel->arg, SingleRowErrorDesc))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("argument to option \"%s\" must be a list of column names",
								defel->defname),
						 parser_errposition(pstate, defel->location)));
			if (opts_out->sreh)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->sreh = (SingleRowErrorDesc *) defel->arg;
		}
		else if (strcmp(defel->defname, "on_segment") == 0)
		{
			if (opts_out->on_segment)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->on_segment = true;
		}
		else if (strcmp(defel->defname, "skip_foreign_partitions") == 0)
		{
			if (opts_out->skip_foreign_partitions)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->skip_foreign_partitions = true;
		}
		else if (strcmp(defel->defname, "tag") == 0)
		{
			if (opts_out->tags)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, defel->location)));
			opts_out->tags = defGetString(defel);
		}
		else if (!rel_is_external_table(rel_oid))
			ereport(ERROR,
					(errcode(ERRCODE_SYNTAX_ERROR),
					 errmsg("option \"%s\" not recognized",
							defel->defname),
					 parser_errposition(pstate, defel->location)));
	}

	/*
	 * Check for incompatible options (must do these two before inserting
	 * defaults)
	 */
	if (opts_out->binary && opts_out->delim)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("COPY cannot specify DELIMITER in BINARY mode")));

	if (opts_out->binary && opts_out->null_print)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("COPY cannot specify NULL in BINARY mode")));

	opts_out->eol_type = EOL_UNKNOWN;

	/* Set defaults for omitted options */
	if (!opts_out->delim)
		opts_out->delim = opts_out->csv_mode ? "," : "\t";

	if (!opts_out->null_print)
		opts_out->null_print = opts_out->csv_mode ? "" : "\\N";
	opts_out->null_print_len = strlen(opts_out->null_print);

	if (opts_out->csv_mode)
	{
		if (!opts_out->quote)
			opts_out->quote = "\"";
		if (!opts_out->escape)
			opts_out->escape = opts_out->quote;
	}

	if (!opts_out->csv_mode && !opts_out->escape)
		opts_out->escape = "\\";	/* default escape for text mode */

	/* Only single-byte delimiter strings are supported. */
	/* GPDB: This is checked later */
#if 0
	if (strlen(opts_out->delim) != 1)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY delimiter must be a single one-byte character")));
#endif

	/* Disallow end-of-line characters */
	if (strchr(opts_out->delim, '\r') != NULL ||
		strchr(opts_out->delim, '\n') != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("COPY delimiter cannot be newline or carriage return")));

	if (strchr(opts_out->null_print, '\r') != NULL ||
		strchr(opts_out->null_print, '\n') != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("COPY null representation cannot use newline or carriage return")));

	/*
	 * Disallow unsafe delimiter characters in non-CSV mode.  We can't allow
	 * backslash because it would be ambiguous.  We can't allow the other
	 * cases because data characters matching the delimiter must be
	 * backslashed, and certain backslash combinations are interpreted
	 * non-literally by COPY IN.  Disallowing all lower case ASCII letters is
	 * more than strictly necessary, but seems best for consistency and
	 * future-proofing.  Likewise we disallow all digits though only octal
	 * digits are actually dangerous.
	 */
	if (!opts_out->csv_mode && !opts_out->delim_off &&
		strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
			   opts_out->delim[0]) != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("COPY delimiter cannot be \"%s\"", opts_out->delim)));

	/* Check header */
	/*
	 * In PostgreSQL, HEADER is not allowed in text mode either, but in GPDB,
	 * only forbid it with BINARY.
	 */
	if (opts_out->binary && opts_out->header_line)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("COPY cannot specify HEADER in BINARY mode")));

	/* Check quote */
	if (!opts_out->csv_mode && opts_out->quote != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY quote available only in CSV mode")));

	if (opts_out->csv_mode && strlen(opts_out->quote) != 1)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY quote must be a single one-byte character")));

	if (opts_out->csv_mode && opts_out->delim[0] == opts_out->quote[0] && !opts_out->delim_off)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("COPY delimiter and quote must be different")));

	/* Check escape */
	if (opts_out->csv_mode && opts_out->escape != NULL && strlen(opts_out->escape) != 1)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY escape in CSV format must be a single character")));

	if (!opts_out->csv_mode && opts_out->escape != NULL &&
		(strchr(opts_out->escape, '\r') != NULL ||
		 strchr(opts_out->escape, '\n') != NULL))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("COPY escape representation in text format cannot use newline or carriage return")));

	/* In text mode, ESCAPE 'off' (any case) disables escape processing. */
	if (!opts_out->csv_mode && opts_out->escape != NULL && strlen(opts_out->escape) != 1)
	{
		if (pg_strcasecmp(opts_out->escape, "off") != 0)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY escape must be a single character, or [OFF/off] to disable escapes")));
	}

	/* Check force_quote */
	if (!opts_out->csv_mode && (opts_out->force_quote || opts_out->force_quote_all))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY force quote available only in CSV mode")));
	if ((opts_out->force_quote || opts_out->force_quote_all) && is_from)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY force quote only available using COPY TO")));

	/* Check force_notnull */
	if (!opts_out->csv_mode && opts_out->force_notnull != NIL)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY force not null available only in CSV mode")));
	if (opts_out->force_notnull != NIL && !is_from)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY force not null only available using COPY FROM")));

	/* Check force_null */
	if (!opts_out->csv_mode && opts_out->force_null != NIL)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY force null available only in CSV mode")));

	if (opts_out->force_null != NIL && !is_from)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY force null only available using COPY FROM")));

	/* Don't allow the delimiter to appear in the null string. */
	if (strchr(opts_out->null_print, opts_out->delim[0]) != NULL && !opts_out->delim_off)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY delimiter must not appear in the NULL specification")));

	/* Don't allow the CSV quote char to appear in the null string. */
	if (opts_out->csv_mode &&
		strchr(opts_out->null_print, opts_out->quote[0]) != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("CSV quote character must not appear in the NULL specification")));

	if (opts_out->tags != NULL && !is_from)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("COPY with tag only available using COPY FROM")));

	/*
	 * DELIMITER
	 *
	 * Only single-byte delimiter strings are supported.  In addition, if the
	 * server encoding is a multibyte character encoding, we only allow the
	 * delimiter to be an ASCII character (like PostgreSQL; for more info on
	 * this see discussion and comments in MPP-3756).
	 */
	if (pg_database_encoding_max_length() == 1)
	{
		/* single byte encoding such as ascii, latinx and other */
		if (strlen(opts_out->delim) != 1 && !opts_out->delim_off)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY delimiter must be a single one-byte character, or \'off\'")));
	}
	else
	{
		/* multi byte encoding such as utf8 */
		if ((strlen(opts_out->delim) != 1 || IS_HIGHBIT_SET(opts_out->delim[0])) && !opts_out->delim_off)
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("COPY delimiter must be a single one-byte character, or \'off\'")));
	}

	if (!opts_out->csv_mode && strchr(opts_out->delim, '\\') != NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("COPY delimiter cannot be backslash")));

	if (opts_out->fill_missing && !is_from)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("fill missing fields only available for data loading, not unloading")));

	/* Use client encoding when ENCODING option is not specified. */
	if (opts_out->file_encoding < 0)
		opts_out->file_encoding = pg_get_client_encoding();

	/*
	 * NEWLINE
	 */
	if (opts_out->eol_str)
	{
		if (!is_from)
		{
			ereport(ERROR,
					(errcode(ERRCODE_GP_FEATURE_NOT_YET),
					 errmsg("newline currently available for data loading only, not unloading")));
		}
		else
		{
			if (pg_strcasecmp(opts_out->eol_str, "lf") == 0)
				opts_out->eol_type = EOL_NL;
			else if (pg_strcasecmp(opts_out->eol_str, "cr") == 0)
				opts_out->eol_type = EOL_CR;
			else if (pg_strcasecmp(opts_out->eol_str, "crlf") == 0)
				opts_out->eol_type = EOL_CRNL;
			else
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						 errmsg("invalid value for NEWLINE \"%s\"",
								opts_out->eol_str),
						 errhint("Valid options are: 'LF', 'CRLF' and 'CR'.")));
		}
	}
}
/*
 * Validate the option list of a COPY on a directory table.
 *
 * Only two options are understood: FORMAT, whose sole legal value is
 * 'binary', and TAG.  Any other option name, a repeated option, or a
 * non-binary FORMAT is reported as an error.  Afterwards the text-format
 * fields (delimiter, NULL string, quote, escape) receive the same defaults
 * that ProcessCopyOptions() would give them, and eol_type is reset to
 * EOL_UNKNOWN.  file_encoding is left at -1.
 *
 * 'is_from' and 'rel_oid' mirror ProcessCopyOptions()'s signature and are
 * not consulted here.  A NULL 'opts_out' makes the function allocate a
 * zeroed struct in CurrentMemoryContext, so it can be used purely for
 * sanity-checking an option list.
 */
void
ProcessCopyDirectoryTableOptions(ParseState *pstate,
								 CopyFormatOptions *opts_out,
								 bool is_from,
								 List *options,
								 Oid rel_oid)
{
	bool		seen_format = false;
	ListCell   *lc;

	/* Sanity-check mode: give ourselves somewhere to write. */
	if (opts_out == NULL)
		opts_out = (CopyFormatOptions *) palloc0(sizeof(CopyFormatOptions));

	opts_out->file_encoding = -1;

	foreach(lc, options)
	{
		DefElem    *opt = lfirst_node(DefElem, lc);

		if (strcmp(opt->defname, "format") == 0)
		{
			/* Fetch the value first so a missing argument is reported first. */
			char	   *fmt_name = defGetString(opt);

			if (seen_format)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, opt->location)));
			seen_format = true;

			/* Directory tables only support the binary format. */
			if (strcmp(fmt_name, "binary") != 0)
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
						 errmsg("format option is not allowed in copy binary from directory table."),
						 parser_errposition(pstate, opt->location)));
			opts_out->binary = true;
			continue;
		}

		if (strcmp(opt->defname, "tag") == 0)
		{
			if (opts_out->tags != NULL)
				ereport(ERROR,
						(errcode(ERRCODE_SYNTAX_ERROR),
						 errmsg("conflicting or redundant options"),
						 parser_errposition(pstate, opt->location)));
			opts_out->tags = defGetString(opt);
			continue;
		}

		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("option \"%s\" not recognized",
						opt->defname),
				 parser_errposition(pstate, opt->location)));
	}

	opts_out->eol_type = EOL_UNKNOWN;

	/* Fill in defaults for anything the caller did not set. */
	if (opts_out->delim == NULL)
		opts_out->delim = opts_out->csv_mode ? "," : "\t";

	if (opts_out->null_print == NULL)
		opts_out->null_print = opts_out->csv_mode ? "" : "\\N";
	opts_out->null_print_len = strlen(opts_out->null_print);

	if (opts_out->csv_mode)
	{
		if (opts_out->quote == NULL)
			opts_out->quote = "\"";
		if (opts_out->escape == NULL)
			opts_out->escape = opts_out->quote;
	}
	else if (opts_out->escape == NULL)
	{
		/* default escape for text mode */
		opts_out->escape = "\\";
	}
}
/*
 * CopyDispatchOnSegment
 *		Send a COPY ON SEGMENT statement to the QEs and total their results.
 *
 * A copy of stmt is dispatched with CdbDispatchUtilityStatement() using the
 * two-phase, snapshot and cancel-on-error dispatch flags.  The per-result
 * numCompleted and numRejected counters are summed; if any rows were
 * rejected they are reported through ReportSrehResults().  Returns the total
 * number of rows processed.
 */
static uint64
CopyDispatchOnSegment(const CopyStmt *stmt)
{
	CopyStmt   *dispatchStmt = copyObject((CopyStmt *) stmt);
	CdbPgResults pgresults = {0};
	uint64		totalProcessed = 0;
	uint64		totalRejected = 0;
	int			resno;

	CdbDispatchUtilityStatement((Node *) dispatchStmt,
								DF_NEED_TWO_PHASE | DF_WITH_SNAPSHOT | DF_CANCEL_ON_ERROR,
								NIL,
								&pgresults);

	/*
	 * GPDB_91_MERGE_FIXME: SREH handling seems to be handled in a different
	 * place for every type of copy. This should be consolidated with the
	 * others.
	 */
	for (resno = 0; resno < pgresults.numResults; resno++)
	{
		struct pg_result *segResult = pgresults.pg_results[resno];

		totalProcessed += segResult->numCompleted;
		totalRejected += segResult->numRejected;
	}

	if (totalRejected != 0)
		ReportSrehResults(NULL, totalRejected);

	cdbdisp_clearCdbPgResults(&pgresults);

	return totalProcessed;
}
/*
 * MangleCopyFileName
 *		Expand the placeholder tokens in a COPY ON SEGMENT file name.
 *
 * *filenameptr must be non-NULL and must contain the "<SEGID>" token;
 * otherwise an ERROR is raised.  "<SEG_DATA_DIR>" is replaced with DataDir
 * and "<SEGID>" with this segment's GpIdentity.segindex (both via
 * replaceStringInfoString()).  On return *filenameptr points to the expanded
 * copy built in a new StringInfo; the original string is not freed.
 *
 * If cdbsreh is not NULL, its filename field is overwritten with the
 * expanded name (truncated to the size of that field by snprintf).
 */
void
MangleCopyFileName(char **filenameptr, CdbSreh *cdbsreh)
{
	char	   *filename;
	StringInfoData filepath;

	/*
	 * Large enough for any int in decimal, including the sign of INT_MIN
	 * ("-2147483648" is 11 characters plus the terminating NUL).
	 */
	char		segid_buf[12];

	Assert(filenameptr && *filenameptr);
	filename = *filenameptr;

	/* Validate before building anything. */
	if (strstr(filename, "<SEGID>") == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("<SEGID> is required for file name")));

	initStringInfo(&filepath);
	appendStringInfoString(&filepath, filename);
	replaceStringInfoString(&filepath, "<SEG_DATA_DIR>", DataDir);

	snprintf(segid_buf, sizeof(segid_buf), "%d", GpIdentity.segindex);
	replaceStringInfoString(&filepath, "<SEGID>", segid_buf);

	*filenameptr = filepath.data;

	/* Keep the error-log file name in step with the data file name. */
	if (cdbsreh != NULL)
		snprintf(cdbsreh->filename,
				 sizeof(cdbsreh->filename), "%s",
				 filepath.data);
}
/*
 * CopyToQueryOnSegment
 *		Execute the query plan stored in cstate->queryDesc for COPY TO.
 *
 * Rows are emitted by the plan's DestReceiver rather than returned here, so
 * the function always reports 0.  Callers must not be running with
 * Gp_role == GP_ROLE_EXECUTE (checked only by Assert).
 */
static uint64
CopyToQueryOnSegment(CopyToState cstate)
{
	Assert(Gp_role != GP_ROLE_EXECUTE);

	/* Forward scan, no tuple-count limit, plan executed exactly once. */
	ExecutorRun(cstate->queryDesc, ForwardScanDirection, (uint64) 0, true);

	return (uint64) 0;
}
/*
 * CopyGetAttnums - build an integer list of attnums to be copied
 *
 * attnamelist is either the column list the user wrote, or NIL, which means
 * "every column that is neither dropped nor generated".
 *
 * Generated columns are never part of the default list and naming one
 * explicitly is an error.  They make no sense for COPY FROM; allowing them
 * for COPY TO would be possible, but rejecting them guarantees that anything
 * copied out can be copied back in.
 *
 * Explicit lists are also checked for unknown and duplicated column names.
 *
 * rel may be NULL; it is used only to make error messages more specific.
 */
List *
CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
{
	List	   *result = NIL;
	ListCell   *lc;

	if (attnamelist == NIL)
	{
		int			attidx;

		for (attidx = 0; attidx < tupDesc->natts; attidx++)
		{
			Form_pg_attribute attr = TupleDescAttr(tupDesc, attidx);

			if (!attr->attisdropped && !attr->attgenerated)
				result = lappend_int(result, attidx + 1);
		}
		return result;
	}

	foreach(lc, attnamelist)
	{
		char	   *colname = strVal(lfirst(lc));
		Form_pg_attribute match = NULL;
		int			attidx;

		/* Find the first live column with this name. */
		for (attidx = 0; match == NULL && attidx < tupDesc->natts; attidx++)
		{
			Form_pg_attribute attr = TupleDescAttr(tupDesc, attidx);

			if (!attr->attisdropped &&
				namestrcmp(&(attr->attname), colname) == 0)
				match = attr;
		}

		if (match == NULL)
		{
			if (rel == NULL)
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_COLUMN),
						 errmsg("column \"%s\" does not exist",
								colname)));
			else
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_COLUMN),
						 errmsg("column \"%s\" of relation \"%s\" does not exist",
								colname, RelationGetRelationName(rel))));
		}

		if (match->attgenerated)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
					 errmsg("column \"%s\" is a generated column",
							colname),
					 errdetail("Generated columns cannot be used in COPY.")));

		if (list_member_int(result, match->attnum))
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_COLUMN),
					 errmsg("column \"%s\" specified more than once",
							colname)));

		result = lappend_int(result, match->attnum);
	}

	return result;
}
/*
 * truncateEol
 *		Strip one trailing end-of-line sequence from buf, in place.
 *
 * For EOL_CRNL, a trailing "\r\n" pair is removed only when both bytes are
 * present.  For every other eol_type, a single trailing '\r' or '\n' is
 * removed.  Removed bytes are overwritten with '\0' and buf->len is
 * decreased; a buffer without a matching suffix is left unchanged.
 */
void
truncateEol(StringInfo buf, EolType eol_type)
{
	int			len = buf->len;
	char	   *data = buf->data;

	if (eol_type == EOL_CRNL)
	{
		if (len >= 2 && data[len - 2] == '\r' && data[len - 1] == '\n')
		{
			data[len - 2] = '\0';
			data[len - 1] = '\0';
			buf->len = len - 2;
		}
		return;
	}

	if (len >= 1 && (data[len - 1] == '\r' || data[len - 1] == '\n'))
	{
		data[len - 1] = '\0';
		buf->len = len - 1;
	}
}
/*
 * truncateEolStr
 *		truncateEol() for a plain NUL-terminated string, applied in place.
 *
 * str is wrapped in a temporary StringInfoData without copying, so any
 * end-of-line bytes truncateEol() removes are overwritten directly in str.
 * str must be non-NULL and NUL-terminated.
 */
void
truncateEolStr(char *str, EolType eol_type)
{
	StringInfoData buf;

	buf.data = str;
	buf.len = strlen(str);
	/* str's storage holds len bytes plus the terminating NUL. */
	buf.maxlen = buf.len + 1;
	/* Not read by truncateEol(), but never leave a field uninitialized. */
	buf.cursor = 0;

	truncateEol(&buf, eol_type);
}
/*
 * open_program_pipes
 *		Start the shell command of a COPY ... PROGRAM and return its pipes.
 *
 * command:  the user-supplied command text.
 * forwrite: passed through to popen_with_stderr() (presumably selects the
 *           pipe direction, i.e. whether COPY writes to the program --
 *           NOTE(review): confirm against popen_with_stderr).
 *
 * Environment variables for the child are prepared with
 * external_set_env_vars(), and the final shell text is built by
 * make_command() and kept in the result's shexec field.  SIGPIPE is set to
 * default handling while the child is spawned, then restored.
 *
 * Returns a palloc'd ProgramPipes holding the child pid and pipe
 * descriptors.  Raises ERROR, including the OS error text, if the command
 * cannot be started.
 */
ProgramPipes*
open_program_pipes(char *command, bool forwrite)
{
	int			save_errno;
	pqsigfunc	save_SIGPIPE;
	extvar_t	extvar;
	ProgramPipes *program_pipes;

	/* set up extvar */
	memset(&extvar, 0, sizeof(extvar));
	external_set_env_vars(&extvar, command, false, NULL, NULL, false, 0);

	program_pipes = palloc(sizeof(ProgramPipes));
	program_pipes->pid = -1;
	program_pipes->pipes[0] = -1;
	program_pipes->pipes[1] = -1;
	program_pipes->shexec = make_command(command, &extvar);

	/*
	 * Preserve the SIGPIPE handler and set to default handling. This
	 * allows "normal" SIGPIPE handling in the command pipeline. Normal
	 * for PG is to *ignore* SIGPIPE.
	 */
	save_SIGPIPE = pqsignal(SIGPIPE, SIG_DFL);
	program_pipes->pid = popen_with_stderr(program_pipes->pipes, program_pipes->shexec, forwrite);
	save_errno = errno;

	/* Restore the SIGPIPE handler */
	pqsignal(SIGPIPE, save_SIGPIPE);

	elog(DEBUG5, "COPY ... PROGRAM command: %s", program_pipes->shexec);

	if (program_pipes->pid == -1)
	{
		/*
		 * pqsignal() and elog() may have clobbered errno; put back the value
		 * from popen_with_stderr() so that %m reports the real failure.
		 */
		errno = save_errno;
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
				 errmsg("can not start command: %s", command),
				 errdetail("%m")));
	}

	return program_pipes;
}
/*
 * close_program_pipes
 *		Close the pipes of a COPY ... PROGRAM child and check how it ended.
 *
 * program_pipes may be NULL (nothing was started), in which case this is a
 * no-op.  The child's stderr output is collected by pclose_with_stderr().
 *
 * When ifThrow is false, no error is ever raised.  Otherwise:
 *	  - a -1 return from pclose_with_stderr() raises a file-access ERROR
 *		with the errno text;
 *	  - a non-zero termination status that is not a signal raises an ERROR
 *		carrying the captured stderr text;
 *	  - termination by a signal is not reported.
 */
void
close_program_pipes(ProgramPipes *program_pipes, bool ifThrow)
{
	int			ret;
	StringInfoData sinfo;

	/* just return if pipes not created, like when relation does not exist */
	if (!program_pipes)
		return;

	/* Receives whatever the child wrote to stderr. */
	initStringInfo(&sinfo);

	ret = pclose_with_stderr(program_pipes->pid, program_pipes->pipes, &sinfo);

	if (ret == 0 || !ifThrow)
	{
		pfree(sinfo.data);
		return;
	}

	if (ret == -1)
	{
		/* pclose()/wait4() ended with an error; errno should be valid */
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("can not close pipe: %m")));
	}
	else if (!WIFSIGNALED(ret))
	{
		/*
		 * pclose() returned the process termination state.
		 */
		ereport(ERROR,
				(errcode(ERRCODE_SQL_ROUTINE_EXCEPTION),
				 errmsg("command error message: %s", sinfo.data)));
	}

	/*
	 * The child was terminated by a signal; deliberately not treated as an
	 * error.  NOTE(review): presumably to tolerate e.g. SIGPIPE after COPY
	 * stops reading early -- confirm.
	 */
	pfree(sinfo.data);
}
/*
 * parse_joined_option_list
 *		Split str into a List of String nodes, one per separated item.
 *
 * Tokens come from strtokx2() using blanks as whitespace, the characters of
 * `delimiter` as separator tokens, and '"' as the quote character (quotes
 * are kept in the returned items).  Parsing stops at the end of input,
 * when a separator appears where an item is expected, or when an item is
 * not followed by a separator.
 *
 * Each item is pstrdup'd into the result list.
 */
static List *
parse_joined_option_list(char *str, char *delimiter)
{
	const char *whitespace = " \t\n\r";
	List	   *cols = NIL;
	int			encoding = GetDatabaseEncoding();
	char	   *token;
	char	   *sep;

	token = strtokx2(str, whitespace, delimiter, "\"",
					 0, false, false, encoding);
	while (token)
	{
		/*
		 * A separator where an item is expected ends the list.  Test against
		 * the caller's delimiter set rather than a hard-coded ','; the
		 * '\0' guard keeps strchr() from matching the terminator.
		 */
		if (token[0] != '\0' && strchr(delimiter, token[0]) != NULL)
			break;

		cols = lappend(cols, makeString(pstrdup(token)));

		/* consume the separator, if any */
		sep = strtokx2(NULL, whitespace, delimiter, "\"",
					   0, false, false, encoding);
		if (!sep || sep[0] == '\0' || strchr(delimiter, sep[0]) == NULL)
			break;

		token = strtokx2(NULL, whitespace, delimiter, "\"",
						 0, false, false, encoding);
	}

	return cols;
}