blob: 6474bcceb2d74867e85d4b2cc132fdced5776da9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*--------------------------------------------------------------------------
*
* cdbcopy.c
* Provides routines that execute a COPY command on an MPP cluster. These
* routines are called from the backend COPY command whenever MPP is in the
* default dispatch mode.
*
*--------------------------------------------------------------------------
*/
#include "postgres.h"
#include "gp-libpq-fe.h"
#include "gp-libpq-int.h"
#include "miscadmin.h"
#include "access/filesplit.h"
#include "tcop/tcopprot.h"
#include "cdb/cdbconn.h"
#include "cdb/cdbcopy.h"
#include "cdb/cdbdisp.h"
#include "cdb/cdbvars.h"
#include "cdb/cdblink.h"
#include "cdb/cdbdispatchresult.h"
#include "cdb/cdbfts.h"
#include "cdb/cdbgang.h"
#include "cdb/dispatcher.h"
#include "utils/memutils.h"
#include "cdb/cdbsrlz.h"
#include "commands/copy.h"
#include "optimizer/prep.h"
#include "tcop/pquery.h" /* PortalGetResource, should move to upper call! */
#include "utils/faultinjector.h"
/*
 * Local prototype for pq_putmessage.
 * NOTE(review): no caller is visible in this part of the file; confirm
 * whether the declaration is still needed or belongs in a shared header.
 */
extern int pq_putmessage(char msgtype, const char *s, size_t len);
/*
 * makeCdbCopy
 *
 * Allocate and initialize the CdbCopy state object used by the backend
 * COPY command.
 *
 * is_copy_in	true for COPY IN, false for COPY OUT.
 * resource		query resource; the length of resource->segments becomes
 *				partition_num, and the pointer itself is stored in the
 *				result.
 *
 * Returns the freshly palloc'd CdbCopy.  For COPY OUT, outseglist is
 * filled with every segment index 0 .. partition_num-1; for COPY IN it is
 * left empty.
 */
CdbCopy *
makeCdbCopy(bool is_copy_in, QueryResource *resource)
{
    CdbCopy    *cdbcopy = palloc(sizeof(CdbCopy));
    int         i;

    /* direction, resource and error flags */
    cdbcopy->resource = resource;
    cdbcopy->partition_num = list_length(resource->segments);
    cdbcopy->copy_in = is_copy_in;
    cdbcopy->io_errors = false;
    cdbcopy->remote_data_err = false;

    /* nothing known yet about partitions, AO segnos or tuple counts */
    cdbcopy->partitions = NULL;
    cdbcopy->ao_segnos = NIL;
    cdbcopy->aotupcounts = NULL;
    cdbcopy->outseglist = NIL;

    /* message and data buffers start out empty */
    initStringInfo(&(cdbcopy->err_msg));
    initStringInfo(&(cdbcopy->err_context));
    initStringInfo(&(cdbcopy->copy_out_buf));

    /* executor (gang) bookkeeping: nothing dispatched so far */
    cdbcopy->executors.segment_conns = NIL;
    cdbcopy->executors.errbuf.data = NULL;
    cdbcopy->executors.result = NULL;
    cdbcopy->executors.numresults = 0;

    /*
     * Per-segment state: two SegDbState slots are allocated for each
     * segment, but only slot 0 (the primary, which can't be OUT) is
     * initialized here.
     */
    cdbcopy->segdb_state =
        (SegDbState **) palloc((cdbcopy->partition_num) * sizeof(SegDbState *));
    for (i = 0; i < cdbcopy->partition_num; i++)
    {
        cdbcopy->segdb_state[i] = (SegDbState *) palloc(2 * sizeof(SegDbState));
        cdbcopy->segdb_state[i][0] = SEGDB_IDLE;
    }

    /* COPY OUT: every segment is initially expected to send data */
    if (!cdbcopy->copy_in)
    {
        for (i = 0; i < cdbcopy->partition_num; i++)
            cdbcopy->outseglist = lappend_int(cdbcopy->outseglist, i);
    }

    return cdbcopy;
}
/*
 * cdbCopyStart
 *
 * Parse copyCmd into a Query, attach the dispatch information held in c,
 * serialize it and dispatch it through c->executors.  Afterwards every
 * segment's primary state is set to SEGDB_COPY.
 *
 * c			COPY state created by makeCdbCopy.
 * copyCmd		the COPY command text to parse and dispatch.
 * relid		relation being copied.
 * relerror		error relation; only consulted for COPY IN and only
 *				when it is a valid Oid.
 * err_aosegnos	segno list passed along for the error relation.
 *
 * may pg_throw via elog/ereport.
 */
void
cdbCopyStart(CdbCopy *c, char *copyCmd, Oid relid, Oid relerror, List *err_aosegnos)
{
    int             seg;
    MemoryContext   oldcontext;
    List           *parsetree_list;
    Node           *parsetree = NULL;
    List           *querytree_list;
    char           *serializedQuerytree;
    int             serializedQuerytree_len;
    Query          *q = NULL;	/* set from the analyzed query list below */
    CopyStmt       *copystmt;

    /* clean err message */
    c->err_msg.len = 0;
    c->err_msg.data[0] = '\0';
    c->err_msg.cursor = 0;

    /*
     * A context it is safe to build parse trees in.
     * We don't want them in TopMemory, as the trees
     * should only last for this one statement
     */
    oldcontext = MemoryContextSwitchTo(MessageContext);
    QueryContext = CurrentMemoryContext;

    /*
     * Let's parse the copy command into a query tree, serialize it, and
     * send it down to the QE or DA.
     *
     * Note that we can't just use the original CopyStmt node in this routine,
     * but we need to use the re-created command that copy.c prepared for us as
     * it may be different from the original command in several cases (such as
     * COPY into tables where a default value is evaluated on the QD).
     */

    /* parse it to a raw parsetree */
    parsetree_list = pg_parse_query(copyCmd);

    /* Assume it will be one statement node, not multiple ones. */
    parsetree = (Node *) linitial(parsetree_list);
    Assert(nodeTag(parsetree) == T_CopyStmt);

    /*
     * Ok, we have a raw parse tree of the copy stmt.
     *
     * I don't think analyze and rewrite will do much to it,
     * but it will at least package it up as a query node,
     * which we need for serializing. And if the copy
     * statement has a "select" statement embedded, this
     * is essential.
     */
    querytree_list = pg_analyze_and_rewrite(parsetree, copyCmd,
                                            NULL, 0);

    /* Again, assume it is one query node, not multiple */
    q = (Query *) linitial(querytree_list);
    Assert(IsA(q, Query));
    Assert(q->commandType == CMD_UTILITY);
    Assert(q->utilityStmt != NULL);
    Assert(IsA(q->utilityStmt, CopyStmt));

    q->querySource = QSRC_ORIGINAL;
    q->canSetTag = true;

    copystmt = (CopyStmt *) q->utilityStmt;

    /* add in partitions for dispatch */
    copystmt->partitions = c->partitions;

    /* add in AO segno map for dispatch */
    copystmt->ao_segnos = c->ao_segnos;
    copystmt->err_aosegnos = err_aosegnos;

    MemoryContextSwitchTo(oldcontext);

    /*
     * Build the query context that travels with the statement.
     * NOTE(review): if anything between here and DropQueryContextInfo
     * throws, the drop call below is not reached -- confirm cleanup is
     * handled elsewhere.
     */
    q->contextdisp = CreateQueryContextInfo();

    if (c->copy_in)
    {
        List       *result_segfileinfos = NIL;
        List       *err_segfileinfos = NIL;

        prepareDispatchedCatalogRelation(q->contextdisp, relid, TRUE,
                                         c->ao_segnos);
        result_segfileinfos = GetResultRelSegFileInfos(relid, c->ao_segnos, result_segfileinfos);

        /* the error relation is optional */
        if (OidIsValid(relerror))
        {
            prepareDispatchedCatalogSingleRelation(q->contextdisp, relerror, TRUE, err_aosegnos);
            err_segfileinfos = fetchSegFileInfos(relerror, err_aosegnos);
        }

        copystmt->ao_segfileinfos = result_segfileinfos;
        copystmt->err_aosegfileinfos = err_segfileinfos;
    }
    else
    {
        List       *scantable_splits = NIL;

        prepareDispatchedCatalogRelation(q->contextdisp, relid, FALSE, NULL);
        scantable_splits = AssignAOSegFileSplitToSegment(relid, NIL,
                                                         c->partition_num,
                                                         scantable_splits);
        copystmt->scantable_splits = scantable_splits;
    }

    FinalizeQueryContextInfo(q->contextdisp);

    /* serialize the stmt tree, and dispatch it .... */
    serializedQuerytree = serializeNode((Node *) q, &serializedQuerytree_len, NULL /*uncompressed_size*/);
    Assert(serializedQuerytree != NULL);

    dispatch_statement_string(copyCmd, serializedQuerytree, serializedQuerytree_len, c->resource, &c->executors, false);

    DropQueryContextInfo(q->contextdisp);

    /* fill in CdbCopy structure: every segment is now in COPY mode */
    for (seg = 0; seg < c->partition_num; seg++)
    {
        c->segdb_state[seg][0] = SEGDB_COPY;
    }
}
/*
 * cdbCopySendData
 *
 * Transmit nbytes of COPY data from buffer over the connection found at
 * position target_seg in c->executors.segment_conns (usually the hash
 * result of the data value).
 *
 * Failures are reported through the CdbCopy object rather than a return
 * value: c->err_msg is cleared on entry, a description is appended for
 * the 0 and -1 results of PQputCopyData, and c->io_errors is set for any
 * result other than 1.
 */
void
cdbCopySendData(CdbCopy *c, int target_seg, const char *buffer,
                int nbytes)
{
    SegmentDatabaseDescriptor *segdb;
    int         rc;

    /* start from an empty error message */
    c->err_msg.cursor = 0;
    c->err_msg.len = 0;
    c->err_msg.data[0] = '\0';

    /*
     * NOTE!! note that another DELIM was added, for the buf_converted
     * in the code above. I didn't do it because it's broken right now
     */
    segdb = list_nth(c->executors.segment_conns, target_seg);

    /* transmit the COPY data */
    rc = PQputCopyData(segdb->conn, buffer, nbytes);
    if (rc == 1)
        return;					/* sent successfully */

    switch (rc)
    {
        case 0:
            appendStringInfo(&(c->err_msg),
                             "Failed to send data to segment %d, attempt blocked\n",
                             target_seg);
            break;

        case -1:
            appendStringInfo(&(c->err_msg),
                             "Failed to send data to segment %d: %s\n",
                             target_seg, PQerrorMessage(segdb->conn));
            break;

        default:
            /* no message for any other result */
            break;
    }

    c->io_errors = true;
}
/*
 * cdbCopySendDataSingle
 *
 * Same contract as sending COPY data to one segment: the nbytes in buffer
 * go to the connection at position target_seg in
 * c->executors.segment_conns, with a DEBUG4 log line emitted before the
 * send.
 *
 * c->err_msg is cleared on entry.  When PQputCopyData returns anything
 * other than 1, c->io_errors is set; results 0 and -1 additionally append
 * a description to c->err_msg.
 */
void
cdbCopySendDataSingle(CdbCopy *c, int target_seg, const char *buffer,
                      int nbytes)
{
    SegmentDatabaseDescriptor *segdb;
    int         rc;
    bool        sent;

    /* start from an empty error message */
    c->err_msg.cursor = 0;
    c->err_msg.len = 0;
    c->err_msg.data[0] = '\0';

    segdb = list_nth(c->executors.segment_conns, target_seg);

    /* transmit the COPY data */
    elog(DEBUG4,"PQputCopyData to segment %d\n", target_seg);
    rc = PQputCopyData(segdb->conn, buffer, nbytes);
    sent = (rc == 1);

    if (!sent)
    {
        if (rc == 0)
        {
            appendStringInfo(&(c->err_msg),
                             "Failed to send data to segment %d, attempt blocked\n",
                             target_seg);
        }
        else if (rc == -1)
        {
            appendStringInfo(&(c->err_msg),
                             "Failed to send data to segment %d: %s\n",
                             target_seg, PQerrorMessage(segdb->conn));
        }

        c->io_errors = true;
    }
}
/*
 * cdbCopyGetData
 *
 * Gets a chunk of rows of data from a copy command.  Rows are read from
 * the segments listed in c->outseglist and appended to c->copy_out_buf
 * until the buffer reaches COPYOUT_CHUNK_SIZE or no segments remain.
 *
 * c				COPY state; err_msg and copy_out_buf are cleared on entry.
 * copy_cancel		when true, a cancel request is sent once to each
 *					segment still on outseglist before reading.
 * rows_processed	incremented once for every row appended.
 *
 * Returns true when outseglist is (or becomes) empty, i.e. we are done.
 * Caller should still empty the leftovers in copy_out_buf in that case.
 * Returns false when the chunk size was reached with segments remaining.
 *
 * Errors are reported through c->err_msg, c->io_errors and
 * c->remote_data_err.
 */
bool
cdbCopyGetData(CdbCopy *c, bool copy_cancel, uint64 *rows_processed)
{
    SegmentDatabaseDescriptor *q;
    PGresult   *res;
    int         nbytes;

    /* clean err message */
    c->err_msg.len = 0;
    c->err_msg.data[0] = '\0';
    c->err_msg.cursor = 0;

    /* clean out buf data */
    c->copy_out_buf.len = 0;
    c->copy_out_buf.data[0] = '\0';
    c->copy_out_buf.cursor = 0;

    /*
     * MPP-7712: we used to issue the cancel-requests for each *row* we got back
     * from each segment -- this is potentially millions of cancel-requests.
     * Cancel requests consist of an out-of-band connection to the segment-postmaster,
     * this is *not* a lightweight operation!
     */
    if (copy_cancel)
    {
        ListCell   *cur;

        /* iterate through all the segments that still have data to give */
        foreach(cur, c->outseglist)
        {
            int         source_seg = lfirst_int(cur);

            q = list_nth(c->executors.segment_conns, source_seg);

            /* send a query cancel request to that segdb */
            PQrequestCancel(q->conn);
        }
    }

    /*
     * With no segments left to read from, the collection loop below would
     * never make progress (the buffer stays empty), so report "done"
     * right away instead of spinning.
     */
    if (list_length(c->outseglist) == 0)
        return true;

    /*
     * Collect data rows from the segments that still have rows to
     * give until chunk minimum size is reached
     */
    while (c->copy_out_buf.len < COPYOUT_CHUNK_SIZE)
    {
        ListCell   *cur;

        /* iterate through all the segments that still have data to give */
        foreach(cur, c->outseglist)
        {
            int         source_seg = lfirst_int(cur);
            char       *buffer;

            q = list_nth(c->executors.segment_conns, source_seg);

            /* get 1 row of COPY data */
            nbytes = PQgetCopyData(q->conn, &buffer, false);

            /*
             * SUCCESS -- got a row of data
             */
            if (nbytes > 0 && buffer)
            {
                /* append the data row to the data chunk */
                appendBinaryStringInfo(&(c->copy_out_buf), buffer, nbytes);

                /* increment the rows processed counter for the end tag */
                (*rows_processed)++;

                PQfreemem(buffer);
            }
            /*
             * DONE -- Got all the data rows from this segment, or a cancel request.
             */
            else if (nbytes == -1)
            {
                /*
                 * Fetch any error status existing on completion of the COPY command.
                 */
                while (NULL != (res = PQgetResult(q->conn)))
                {
                    /* if the COPY command had an error */
                    if (PQresultStatus(res) == PGRES_FATAL_ERROR)
                    {
                        appendStringInfo(&(c->err_msg), "Error from segment %d: %s\n",
                                         source_seg, PQresultErrorMessage(res));
                        c->remote_data_err = true;
                    }

                    /* free the PGresult object */
                    PQclear(res);
                }

                /* remove the segment that completed sending data from the list */
                c->outseglist = list_delete_int(c->outseglist, source_seg);

                /* no more segments with data on the list. we are done */
                if (list_length(c->outseglist) == 0)
                    return true;	/* done */

                /* start over from first seg as we just changed the seg list */
                break;
            }
            /*
             * ERROR!
             */
            else
            {
                c->io_errors = true;

                /* should never happen since we are blocking. */
                if (nbytes == 0)
                    appendStringInfo(&(c->err_msg),
                                     "Failed to get data from segment %d, attempt blocked\n",
                                     source_seg);

                if (nbytes == -2)
                {
                    appendStringInfo(&(c->err_msg),
                                     "Failed to get data from segment %d: %s\n",
                                     source_seg, PQerrorMessage(q->conn));

                    /* stop reading from the segment that failed */
                    c->outseglist = list_delete_int(c->outseglist, source_seg);

                    /* no more segments with data on the list. we are done */
                    if (list_length(c->outseglist) == 0)
                        return true;	/* done */

                    /* start over from first seg as we just changed the seg list */
                    break;
                }
            }
        }

        if (c->copy_out_buf.len > COPYOUT_CHUNK_SIZE)
            break;
    }

    return false;
}
/*
 * processCopyEndResults
 *
 * Process the results from segments after sending the end of copy command.
 *
 * c					COPY state; err_msg, err_context, io_errors,
 *						remote_data_err, aotupcounts and segdb_state are
 *						updated here.
 * results				per-connection return values of PQputCopyEnd.
 * size					number of entries in results; also the number of
 *						connections visited in c->executors.segment_conns.
 * failedSegDBs			out: descriptors whose connection was found bad;
 *						must have room for up to `size` entries.
 * err_header			in/out: whether the "Failed to complete COPY"
 *						header has already been written to err_msg.
 * first_error			in/out: true until the first QE error has been
 *						copied into err_msg.
 * failed_count			in/out: number of entries stored in failedSegDBs.
 * total_rows_rejected	in/out: running total of rows rejected by QEs.
 */
static void
processCopyEndResults(CdbCopy *c,
                      int *results,
                      int size,
                      SegmentDatabaseDescriptor **failedSegDBs,
                      bool *err_header,
                      bool *first_error,
                      int *failed_count,
                      int *total_rows_rejected)
{
    SegmentDatabaseDescriptor *seg_desc;
    int         seg;
    PGresult   *res;
    int         segment_rows_rejected = 0;	/* num of rows rejected by this QE */

    for (seg = 0; seg < size; seg++)
    {
        int         result = results[seg];

        seg_desc = list_nth(c->executors.segment_conns, seg);

        /* get command end status */
        if (result == 0)
        {
            /* attempt blocked */

            /*
             * CDB TODO: Can this occur?  The libpq documentation says,
             * "this case is only possible if the connection is in
             * nonblocking mode... wait for write-ready and try again",
             * i.e., the proper response would be to retry, not error out.
             */
            if (!(*err_header))
                appendStringInfo(&(c->err_msg),
                                 "Failed to complete COPY on the following:\n");
            *err_header = true;

            appendStringInfo(&(c->err_msg), "primary segment %d, attempt blocked\n",
                             seg);
            c->io_errors = true;
        }

        /*
         * Fetch any error status existing on completion of the COPY command.
         * It is critical that for any connection that had an asynchronous
         * command sent thru it, we call PQgetResult until it returns NULL.
         * Otherwise, the next time a command is sent to that connection,
         * it will return an error that there's a command pending.
         */
        while ((res = PQgetResult(seg_desc->conn)) != NULL)
        {
            int         curSendback;

            /*
             * Stop draining once the connection has gone bad, but release
             * the result we were just handed so it is not leaked.
             */
            if (PQstatus(seg_desc->conn) == CONNECTION_BAD)
            {
                PQclear(res);
                break;
            }

            /* if the COPY command had a data error */
            if (PQresultStatus(res) == PGRES_FATAL_ERROR)
            {
                /*
                 * Always append error from the primary. Append error from
                 * mirror only if its primary didn't have an error.
                 *
                 * For now, we only report the first error we get from the
                 * QE's.
                 *
                 * We get the error message in pieces so that we could append
                 * whoami to the primary error message only.
                 */
                if (*first_error)
                {
                    char       *pri = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
                    char       *dtl = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
                    char       *ctx = PQresultErrorField(res, PG_DIAG_CONTEXT);

                    if (pri)
                        appendStringInfo(&(c->err_msg), "%s", pri);
                    else
                        appendStringInfo(&(c->err_msg), "Unknown primary error.");

                    if (seg_desc->whoami)
                        appendStringInfo(&(c->err_msg)," (%s)", seg_desc->whoami);

                    if (dtl)
                        appendStringInfo(&(c->err_msg), "\n%s", dtl);

                    /*
                     * note that due to cdb_tidy_message() in elog.c "If more than
                     * one line, move lines after the first to errdetail" so we save
                     * the context in another stringInfo and fetch it in the error
                     * callback in copy.c, so it wouldn't appear as DETAIL...
                     */
                    if (ctx)
                        appendStringInfo(&(c->err_context), "%s", ctx);

                    /* Indicate that the err_msg already was filled with one error */
                    *first_error = false;
                }

                c->remote_data_err = true;
            }

            /*
             * If we are still in copy mode, tell QE to stop it.  COPY_IN
             * protocol has a way to say 'end of copy' but COPY_OUT doesn't.
             * We have no option but sending cancel message and consume
             * the output until the state transition to non-COPY.
             */
            if (PQresultStatus(res) == PGRES_COPY_IN)
            {
                elog(LOG, "Segment still in copy in, retrying the putCopyEnd");
                /* the retry's return value is not examined further */
                result = PQputCopyEnd(seg_desc->conn, NULL);
            }
            else if (PQresultStatus(res) == PGRES_COPY_OUT)
            {
                char       *buffer = NULL;

                elog(LOG, "Segment still in copy out, canceling QE");

                /*
                 * I'm a bit worried about sending a cancel, as if this
                 * is a success case the QE gets inconsistent state than
                 * QD.  But this code path is mostly for error handling
                 * and in a success case we wouldn't see COPY_OUT here.
                 * It's not clear what to do if this cancel failed,
                 * since this is not a path we can error out.  FATAL maybe
                 * the way, but I leave it for now.
                 */
                PQrequestCancel(seg_desc->conn);

                /*
                 * Need to consume data from QE until he recognizes cancel.
                 * NOTE(review): libpq documents PQfreemem() as the way to
                 * release PQgetCopyData buffers -- consider switching.
                 */
                PQgetCopyData(seg_desc->conn, &buffer, false);
                if (buffer)
                    free(buffer);
            }

            /* in SREH mode, check if this seg rejected (how many) rows */
            if (res->numRejected > 0)
                segment_rows_rejected = res->numRejected;

            /* Get AO tuple counts */
            c->aotupcounts = process_aotupcounts(c->partitions, c->aotupcounts, res->aotupcounts, res->naotupcounts);

            /* apply every catalog sendback carried by this result */
            for (curSendback = 0; curSendback < res->numSendback; ++curSendback)
            {
                UpdateCatalogModifiedOnSegments(&res->sendback[curSendback]);
            }

            /* free the PGresult object */
            PQclear(res);
        }

        /* Finished with this segment db. */
        c->segdb_state[seg][0] = SEGDB_DONE;

        /*
         * add number of rows rejected from this segment to the
         * total of rejected rows. Only count from primary segs.
         */
        if (segment_rows_rejected > 0)
            *total_rows_rejected += segment_rows_rejected;

        segment_rows_rejected = 0;

        /* Lost the connection? */
        if (PQstatus(seg_desc->conn) == CONNECTION_BAD)
        {
            if (!*(err_header))
                appendStringInfo(&(c->err_msg),
                                 "ERROR - Failed to complete COPY on the following:\n");
            *err_header = true;

            /* command error */
            c->io_errors = true;
            appendStringInfo(&(c->err_msg), "Primary segment %d, with error: %s\n",
                             seg, PQerrorMessage(seg_desc->conn));

            /* Free the PGconn object. */
            PQfinish(seg_desc->conn);
            seg_desc->conn = NULL;

            /* Let FTS deal with it! */
            failedSegDBs[*failed_count] = seg_desc;
            (*failed_count)++;
        }
    }
}
/*
 * cdbCopyEnd
 *
 * Ends the copy command on all segment databases: sends PQputCopyEnd on
 * every connection in c->executors.segment_conns, processes the results,
 * frees the dispatch results and, if any connection was lost, hands the
 * failed segments to FTS.
 *
 * c->err_msg is cleared on entry and will include the error msg(s), if
 * any, on return.
 *
 * Returns the number of rows rejected by QE's.  Normally will be 0,
 * however in single row error handling mode could be larger than 0.
 */
int
cdbCopyEnd(CdbCopy *c)
{
    SegmentDatabaseDescriptor *seg_desc;
    SegmentDatabaseDescriptor **failedSegDBs;
    int        *results;		/* final result of COPY command execution */
    int         seg;
    int         failed_count = 0;
    int         total_rows_rejected = 0;	/* total num rows rejected by all QEs */
    bool        err_header = false;
    bool        first_error = true;
    int         size;
    int         max_failed;

    /* clean err message */
    c->err_msg.len = 0;
    c->err_msg.data[0] = '\0';
    c->err_msg.cursor = 0;

    size = list_length(c->executors.segment_conns);

    /*
     * Allocate the failed segment database pointer array.  The result
     * processing loop visits `size` connections and may record each one
     * as failed, so make sure the array can hold at least that many
     * entries even if it exceeds the partition-based estimate.
     */
    max_failed = c->partition_num * 2;
    if (max_failed < size)
        max_failed = size;
    failedSegDBs = (SegmentDatabaseDescriptor **) palloc(max_failed * sizeof(SegmentDatabaseDescriptor *));

    /* results from each segment */
    results = (int *) palloc0(sizeof(int) * size);

    for (seg = 0; seg < size; seg++)
    {
        seg_desc = list_nth(c->executors.segment_conns, seg);
        elog(DEBUG1,"PQputCopyEnd seg %d ", seg);

        /* end this COPY command */
        results[seg] = PQputCopyEnd(seg_desc->conn, NULL);
    }

    processCopyEndResults(c, results, size,
                          failedSegDBs, &err_header,
                          &first_error, &failed_count, &total_rows_rejected);

    dispatch_free_result(&c->executors);

    /* If lost contact with segment db, try to reconnect. */
    if (failed_count > 0)
    {
        elog(LOG, "%s", c->err_msg.data);
        elog(LOG, "COPY passes failed segment(s) information to FTS");
        FtsHandleNetFailure(failedSegDBs, failed_count);
    }

    pfree(results);
    pfree(failedSegDBs);

    return total_rows_rejected;
}