blob: 7305ae12a59bdb044fbbf777d71f1e243f699f3f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*--------------------------------------------------------------------------
*
* cdbsreh.c
* Provides routines for single row error handling for COPY and external
* tables.
*
*--------------------------------------------------------------------------
*/
#include "postgres.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "utils/acl.h"
#include "miscadmin.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "commands/tablecmds.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/uri.h"
#include "nodes/makefuncs.h"
#include "access/transam.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbdisp.h"
static void OpenErrorTable(CdbSreh *cdbsreh, RangeVar *errortable);
static void InsertIntoErrorTable(CdbSreh *cdbsreh);
static void CloseErrorTable(CdbSreh *cdbsreh);
static void DropErrorTable(CdbSreh *cdbsreh);
static int GetNextSegid(CdbSreh *cdbsreh);
static void VerifyErrorTableAttr(Form_pg_attribute *attr,
int attrnum,
const char *expected_attname,
Oid expected_atttype,
char *relname);
static void PreprocessByteaData(char *src);
/*
* makeCdbSreh
*
* Allocate and initialize a Single Row Error Handling state object.
* Pass in the only known parameters (both we get from the SQL stmt),
* the other variables are set later on, when they are known.
*/
CdbSreh *
makeCdbSreh(bool is_keep, bool reusing_existing_errtable,
int rejectlimit, bool is_limit_in_rows,
RangeVar *errortable, ResultRelSegFileInfo *segfileinfo, char *filename, char *relname)
{
CdbSreh *h;
h = palloc(sizeof(CdbSreh));
h->errmsg = NULL;
h->rawdata = NULL;
h->linenumber = 0;
h->processed = 0;
h->relname = relname;
h->rejectlimit = rejectlimit;
h->is_limit_in_rows = is_limit_in_rows;
h->rejectcount = 0;
h->is_server_enc = false;
h->is_keep = is_keep;
h->should_drop = false; /* we'll decide later */
h->reusing_errtbl = reusing_existing_errtable;
h->cdbcopy = NULL;
h->errtbl = NULL;
h->err_aoInsertDesc = NULL;
if (segfileinfo != NULL)
{
h->err_aosegno = segfileinfo->segno;
}
else
{
h->err_aosegno = 0;
}
h->err_aosegfileinfo = segfileinfo;
h->lastsegid = 0;
h->consec_csv_err = 0;
snprintf(h->filename, sizeof(h->filename), "%s", filename ? CleanseUriString(filename) : "<stdin>");
/* error table was specified open it (and create it first if necessary) */
if(errortable)
OpenErrorTable(h, errortable);
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
* datatype input routines, and should be faster than retail pfree's
* anyway.
*/
h->badrowcontext = AllocSetContextCreate(CurrentMemoryContext,
"SrehMemCtxt",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
return h;
}
void
destroyCdbSreh(CdbSreh *cdbsreh)
{
if (cdbsreh->err_aoInsertDesc)
{
StringInfo buf = NULL;
buf = PreSendbackChangedCatalog(1);
QueryContextDispatchingSendBack sendback = NULL;
sendback = CreateQueryContextDispatchingSendBack(1);
cdbsreh->err_aoInsertDesc->sendback = sendback;
sendback->relid = RelationGetRelid(cdbsreh->errtbl);
appendonly_insert_finish(cdbsreh->err_aoInsertDesc);
if (sendback && Gp_role == GP_ROLE_EXECUTE)
AddSendbackChangedCatalogContent(buf, sendback);
DropQueryContextDispatchingSendBack(sendback);
if (sendback && Gp_role == GP_ROLE_EXECUTE)
FinishSendbackChangedCatalog(buf);
}
/* delete the bad row context */
MemoryContextDelete(cdbsreh->badrowcontext);
/* close error table */
if (cdbsreh->errtbl)
CloseErrorTable(cdbsreh);
/* drop error table if need to */
if (cdbsreh->should_drop && Gp_role == GP_ROLE_DISPATCH)
DropErrorTable(cdbsreh);
pfree(cdbsreh);
}
/*
* HandleSingleRowError
*
* The single row error handler. Gets called when a data error happened
* in SREH mode. Reponsible for making a decision of what to do at that time.
*
* Some of the main actions are:
* - Keep track of reject limit. if reached make sure notify caller.
* - If error table used, call the error logger to log this error.
* - If QD COPY send the bad row to the QE COPY to deal with.
*
*/
void HandleSingleRowError(CdbSreh *cdbsreh)
{
/* increment total number of errors for this segment */
cdbsreh->rejectcount++;
/*
* if reached the segment reject limit don't do anything.
* (this will get checked and handled later on by the caller).
*/
if(IsRejectLimitReached(cdbsreh))
return;
/*
* If not using an error table, we don't need to do anything.
*
* However, if we use an error table:
* QE - log the error in the error table.
* QD - send the bad data row to a random QE (via roundrobin).
*/
if(cdbsreh->errtbl)
{
if (Gp_role == GP_ROLE_DISPATCH)
{
cdbCopySendData(cdbsreh->cdbcopy,
GetNextSegid(cdbsreh),
cdbsreh->rawdata,
strlen(cdbsreh->rawdata));
}
else
{
/* Insert into error table */
Insist(Gp_role == GP_ROLE_EXECUTE || Gp_role == GP_ROLE_UTILITY);
InsertIntoErrorTable(cdbsreh);
}
}
return; /* OK */
}
/*
* OpenErrorTable
*
* Open the error table for this operation, and perform all necessary checks.
*/
void OpenErrorTable(CdbSreh *cdbsreh, RangeVar *errortable)
{
AclResult aclresult;
AclMode required_access = ACL_INSERT;
Oid relOid;
/* Open and lock the error relation, using the appropriate lock type. */
cdbsreh->errtbl = heap_openrv(errortable, RowExclusiveLock);
relOid = RelationGetRelid(cdbsreh->errtbl);
/* Check relation permissions. */
aclresult = pg_class_aclcheck(relOid,
GetUserId(),
required_access);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS,
RelationGetRelationName(cdbsreh->errtbl));
/* check read-only transaction */
if (XactReadOnly &&
!isTempNamespace(RelationGetNamespace(cdbsreh->errtbl)))
ereport(ERROR,
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
errmsg("transaction is read-only")));
/* make sure this is a regular relation */
if (cdbsreh->errtbl->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" exists in the database but is a non table relation",
RelationGetRelationName(cdbsreh->errtbl))));
if (!RelationIsAoRows(cdbsreh->errtbl))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" exists in the database but is a non appendonly relation",
RelationGetRelationName(cdbsreh->errtbl))));
}
/*
* CloseErrorTable
*
* Done using the error table. Close it.
*/
void CloseErrorTable(CdbSreh *cdbsreh)
{
/* close the error relation */
if (cdbsreh->errtbl)
heap_close(cdbsreh->errtbl, NoLock);
}
/*
* DropErrorTable
*
* Drop the error table from the database. This function will be called from
* destroyCdbSreh when an autogenerated error table was not used in the COPY
* operation granted KEEP wasn't specified.
*
*/
static
void DropErrorTable(CdbSreh *cdbsreh)
{
RangeVar *errtbl_rv;
Insist(Gp_role == GP_ROLE_DISPATCH);
ereport(NOTICE,
(errcode(ERRCODE_SUCCESSFUL_COMPLETION),
errmsg("Dropping the auto-generated unused error table"),
errhint("Use KEEP in LOG INTO clause to force keeping the error table alive")));
errtbl_rv = makeRangeVar(NULL /*catalogname*/, get_namespace_name(RelationGetNamespace(cdbsreh->errtbl)),
RelationGetRelationName(cdbsreh->errtbl), -1);
/* DROP the relation on the QD */
RemoveRelation(errtbl_rv,DROP_RESTRICT, NULL, RELKIND_RELATION);
}
/*
* InsertIntoErrorTable
*
* Insert the information in cdbsreh into the error table we are using.
*/
void InsertIntoErrorTable(CdbSreh *cdbsreh)
{
MemTuple tuple;
Oid tupleOid;
AOTupleId aoTupleId;
bool nulls[NUM_ERRORTABLE_ATTR];
Datum values[NUM_ERRORTABLE_ATTR];
MemoryContext oldcontext;
oldcontext = MemoryContextSwitchTo(cdbsreh->badrowcontext);
/* Initialize all values for row to NULL */
MemSet(values, 0, NUM_ERRORTABLE_ATTR * sizeof(Datum));
MemSet(nulls, true, NUM_ERRORTABLE_ATTR * sizeof(bool));
/* command start time */
values[errtable_cmdtime - 1] = TimestampTzGetDatum(GetCurrentStatementStartTimestamp());
nulls[errtable_cmdtime - 1] = false;
/* line number */
if (cdbsreh->linenumber > 0)
{
values[errtable_linenum - 1] = Int64GetDatum(cdbsreh->linenumber);
nulls[errtable_linenum - 1] = false;
}
if(cdbsreh->is_server_enc)
{
/* raw data */
values[errtable_rawdata - 1] = DirectFunctionCall1(textin, CStringGetDatum(cdbsreh->rawdata));
nulls[errtable_rawdata - 1] = false;
}
else
{
/* raw bytes */
PreprocessByteaData(cdbsreh->rawdata);
values[errtable_rawbytes - 1] = DirectFunctionCall1(byteain, CStringGetDatum(cdbsreh->rawdata));
nulls[errtable_rawbytes - 1] = false;
}
/* file name */
values[errtable_filename - 1] = DirectFunctionCall1(textin, CStringGetDatum(cdbsreh->filename));
nulls[errtable_filename - 1] = false;
/* relation name */
values[errtable_relname - 1] = DirectFunctionCall1(textin, CStringGetDatum(cdbsreh->relname));
nulls[errtable_relname - 1] = false;
/* error message */
values[errtable_errmsg - 1] = DirectFunctionCall1(textin, CStringGetDatum(cdbsreh->errmsg));
nulls[errtable_errmsg - 1] = false;
MemoryContextSwitchTo(oldcontext);
/*
* cdbsreh->err_aoInsertDesc should be available untile the end of statement.
*/
oldcontext = MemoryContextSwitchTo(PortalContext);
if (cdbsreh->err_aoInsertDesc == NULL)
cdbsreh->err_aoInsertDesc = appendonly_insert_init(cdbsreh->errtbl,
cdbsreh->err_aosegfileinfo);
MemoryContextSwitchTo(oldcontext);
/* form a mem tuple */
tuple = memtuple_form_to(cdbsreh->err_aoInsertDesc->mt_bind, values, nulls, NULL, NULL, true);
/* inserting into an append only relation */
appendonly_insert(cdbsreh->err_aoInsertDesc, tuple, &tupleOid, &aoTupleId);
}
/*
* ReportSrehResults
*
* When necessary emit a NOTICE that describes the end result of the
* SREH operations. Information includes the total number of rejected
* rows, and whether rows were ignored or logged into an error table.
*/
void ReportSrehResults(CdbSreh *cdbsreh, int total_rejected)
{
if(total_rejected > 0)
{
if(cdbsreh && cdbsreh->errtbl)
ereport(NOTICE,
(errmsg("Found %d data formatting errors (%d or more "
"input rows). Errors logged into error table \"%s\"",
total_rejected, total_rejected,
RelationGetRelationName(cdbsreh->errtbl))));
else
ereport(NOTICE,
(errmsg("Found %d data formatting errors (%d or more "
"input rows). Rejected related input data.",
total_rejected, total_rejected)));
}
}
/*
* SendNumRowsRejected
*
* Using this function the QE sends back to the client QD the number
* of rows that were rejected in this last data load in SREH mode.
*/
void SendNumRowsRejected(int numrejected)
{
StringInfoData buf;
if (Gp_role != GP_ROLE_EXECUTE)
elog(FATAL, "SendNumRowsRejected: called outside of execute context.");
pq_beginmessage(&buf, 'j'); /* 'j' is the msg code for rejected records */
pq_sendint(&buf, numrejected, 4);
pq_endmessage(&buf);
}
/*
* ValidateErrorTableMetaData
*
* This function gets called if a user wants an already existing table to be
* used as an error table for some COPY or external table operation with SREH.
* In here we verify that the metadata of the user selected table matches the
* predefined requirement for an error table.
*/
void ValidateErrorTableMetaData(Relation rel)
{
TupleDesc tupDesc = RelationGetDescr(rel);
Form_pg_attribute *attr = tupDesc->attrs;
TupleConstr *constr = tupDesc->constr;
char *relname = RelationGetRelationName(rel);
int attr_count = tupDesc->natts;
/*
* Verify number of attributes match
*/
if(attr_count != NUM_ERRORTABLE_ATTR)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Relation \"%s\" already exists and is not of a valid "
"error table format (expected %d attributes, found %d)",
relname, NUM_ERRORTABLE_ATTR, attr_count)));
/*
* Verify this table has no constraints
*
* (TODO: this currently checks for all the contraints in TupleConstr
* which are, defaults, check, and not null. We need to check
* for unique constraints as well).
*/
if(constr)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Relation \"%s\" already exists and is not of a valid "
"error table format. It appears to have constraints "
"defined.", relname)));
if (!RelationIsAoRows(rel))
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Relation \"%s\" already exists and is not of a valid "
"error table format. It appears to not a appendonly table", relname)));
if (rel->rd_cdbpolicy == NULL
|| rel->rd_cdbpolicy->ptype != POLICYTYPE_PARTITIONED
|| rel->rd_cdbpolicy->nattrs != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Relation \"%s\" already exists and is not of a valid "
"error table format. It appears to not distributed randomly", relname)));
/*
* run through each attribute at a time and verify it's what we expect
*/
VerifyErrorTableAttr(attr, errtable_cmdtime, "cmdtime", TIMESTAMPTZOID, relname);
VerifyErrorTableAttr(attr, errtable_relname, "relname", TEXTOID, relname);
VerifyErrorTableAttr(attr, errtable_filename, "filename", TEXTOID, relname);
VerifyErrorTableAttr(attr, errtable_linenum, "linenum", INT4OID, relname);
VerifyErrorTableAttr(attr, errtable_bytenum, "bytenum", INT4OID, relname);
VerifyErrorTableAttr(attr, errtable_errmsg, "errmsg", TEXTOID, relname);
VerifyErrorTableAttr(attr, errtable_rawdata, "rawdata", TEXTOID, relname);
VerifyErrorTableAttr(attr, errtable_rawbytes, "rawbytes", BYTEAOID, relname);
}
/*
* VerifyErrorTableAttr
*
* Called by ValidateErrorTableMetaData() on each table attribute to verify
* that it has the right predefined name, type and other attr characteristics.
*/
void VerifyErrorTableAttr(Form_pg_attribute *attr,
int attrnum,
const char *expected_attname,
Oid expected_atttype,
char *relname)
{
bool cur_attisdropped = attr[attrnum - 1]->attisdropped;
Name cur_attname = &(attr[attrnum - 1]->attname);
Oid cur_atttype = attr[attrnum - 1]->atttypid;
int4 cur_attndims = attr[attrnum - 1]->attndims;
if(cur_attisdropped)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Relation \"%s\" includes dropped attributes and is "
"therefore not of a valid error table format",
relname)));
if (namestrcmp(cur_attname, expected_attname) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Relation \"%s\" is an invalid error table. Expected "
"attribute \"%s\" found \"%s\"",
relname, expected_attname, NameStr(*cur_attname))));
if(cur_atttype != expected_atttype)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Relation \"%s\" is an invalid error table. Wrong data "
"type for attribute \"%s\"",
relname, NameStr(*cur_attname))));
if(cur_attndims > 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Relation \"%s\" is an invalid error table. Attribute "
"\"%s\" has more than zero dimensions (array).",
relname, NameStr(*cur_attname))));
}
/*
* SetErrorTableVerdict
*
* Do we kill (DROP) the error table or let it live? This function is called
* at the end of execution of a COPY operation involving an error table. The
* rule is - always keep it alive if KEEP was specified in the COPY command
* otherwise DROP it only if no rows were rejected *and* if it was auto
* generated at the start of this COPY command (did not exist before).
*/
void
SetErrorTableVerdict(CdbSreh *cdbsreh, int total_rejected)
{
/* never let a QE decide whether to drop the table or not */
Insist(Gp_role == GP_ROLE_DISPATCH);
/* this is for COPY use only, don't call this function for external tables */
Insist(cdbsreh->cdbcopy);
/* we can't get here if we don't have an open error table */
Insist(cdbsreh->errtbl);
if(!cdbsreh->is_keep && !cdbsreh->reusing_errtbl && total_rejected == 0)
{
cdbsreh->should_drop = true; /* mark this table to be dropped */
}
}
/*
* IsRejectLimitReached
*
* Returns true if seg reject limit reached, false otherwise.
*/
bool IsRejectLimitReached(CdbSreh *cdbsreh)
{
bool limit_reached = false;
/* special case: check for un-parsable csv format errors */
if(CSV_IS_UNPARSABLE(cdbsreh))
return true;
/* now check if actual reject limit is reached */
if(cdbsreh->is_limit_in_rows)
{
/* limit is in ROWS */
limit_reached = (cdbsreh->rejectcount >= cdbsreh->rejectlimit ? true : false);
}
else
{
/* limit is in PERCENT */
/* calculate the percent only if threshold is satisfied */
if(cdbsreh->processed > gp_reject_percent_threshold)
{
if( (cdbsreh->rejectcount * 100) / cdbsreh->processed >= cdbsreh->rejectlimit)
limit_reached = true;
}
}
return limit_reached;
}
/*
* emitSameTxnWarning()
*
* Warn the user that it's better to have the error table created in a
* transaction that is older than the one that is using it. This mostly
* can happen in COPY but also in some not so common external table use
* cases (BEGIN, CREATE EXTERNAL TABLE with errtable, SELECT..., ...).
*/
void emitSameTxnWarning(void)
{
ereport(WARNING,
(errcode(ERRCODE_T_R_GP_ERROR_TABLE_MAY_DROP),
errmsg("The error table was created in the same "
"transaction as this operation. It will get "
"dropped if transaction rolls back even if bad "
"rows are present"),
errhint("To avoid this create the error table ahead "
"of time using: CREATE TABLE <name> (cmdtime "
"timestamp with time zone, relname text, "
"filename text, linenum integer, bytenum "
"integer, errmsg text, rawdata text, rawbytes "
"bytea)")));
}
/*
* GetNextSegid
*
* Return the next sequential segment id of available segids (roundrobin).
*/
static
int GetNextSegid(CdbSreh *cdbsreh)
{
int total_segs = cdbsreh->cdbcopy->partition_num;
if(cdbsreh->lastsegid == total_segs)
cdbsreh->lastsegid = 0; /* start over from first segid */
return (cdbsreh->lastsegid++ % total_segs);
}
/*
* This function is called when we are preparing to insert a bad row that
* includes an encoding error into the bytea field of the error table
* (rawbytes). In rare occasions this bad row may also have an invalid bytea
* sequence - a backslash not followed by a valid octal sequence - in which
* case inserting into the error table will fail. In here we make a pass to
* detect if there's a risk of failing. If there isn't we just return. If there
* is we remove the backslash and replace it with a x20 char. Yes, we are
* actually modifying the user data, but this is a much better opion than
* failing the entire load. It's also a bad row - a row that will require user
* intervention anyway in order to reload.
*
* reference: MPP-2107
*
* NOTE: code is copied from esc_dec_len() in encode.c and slightly modified.
*/
static
void PreprocessByteaData(char *src)
{
const char *end = src + strlen(src);
while (src < end)
{
if (src[0] != '\\')
src++;
else if (src + 3 < end &&
(src[1] >= '0' && src[1] <= '3') &&
(src[2] >= '0' && src[2] <= '7') &&
(src[3] >= '0' && src[3] <= '7'))
{
/*
* backslash + valid octal
*/
src += 4;
}
else if (src + 1 < end &&
(src[1] == '\\'))
{
/*
* two backslashes = backslash
*/
src += 2;
}
else
{
/*
* one backslash, not followed by ### valid octal. remove the
* backslash and put a x20 in its place.
*/
src[0] = ' ';
src++;
}
}
}
/*
* IsRejectLimitValid
*
* verify that the the reject limit specified by the user is within the
* allowed values for ROWS or PERCENT.
*/
void VerifyRejectLimit(char rejectlimittype, int rejectlimit)
{
if(rejectlimittype == 'r')
{
/* ROWS */
if(rejectlimit < 2)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Segment reject limit in ROWS "
"must be 2 or larger (got %d)", rejectlimit)));
}
else
{
/* PERCENT */
Assert(rejectlimittype == 'p');
if (rejectlimit < 1 || rejectlimit > 100)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("Segment reject limit in PERCENT "
"must be between 1 and 100 (got %d)", rejectlimit)));
}
}