blob: c70c43f75b91fea2bb51b9d78c83fe6d461cf3f6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* cdbdispatchresult.c
* Functions to dispatch commands to QExecutors
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <pthread.h>
#include "gp-libpq-fe.h" /* prerequisite for libpq-int.h */
#include "gp-libpq-int.h" /* PQExpBufferData */
#include "lib/stringinfo.h" /* StringInfoData */
#include "utils/guc.h" /* log_min_messages */
#include "utils/faultinjector.h"
#include "cdb/cdbconn.h" /* SegmentDatabaseDescriptor */
#include "cdb/cdbpartition.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbsreh.h"
#include "cdb/cdbdispatchresult.h" /* me */
#include "commands/tablecmds.h"
/* This mutex serializes writes by dispatcher threads to the
* iFirstError and errcode fields of CdbDispatchResults objects.
*/
static pthread_mutex_t setErrcodeMutex = PTHREAD_MUTEX_INITIALIZER;
/*--------------------------------------------------------------------*/
/* Make sure buffer has no trailing newline, whitespace, etc. */
static void
noTrailingNewline(StringInfo buf)
{
while (buf->len > 0 &&
buf->data[buf->len-1] <= ' ' &&
buf->data[buf->len-1] > '\0')
buf->data[--buf->len] = '\0';
} /* noTrailingNewline */
static void
noTrailingNewlinePQ(PQExpBuffer buf)
{
while (buf->len > 0 &&
buf->data[buf->len-1] <= ' ' &&
buf->data[buf->len-1] > '\0')
buf->data[--buf->len] = '\0';
} /* noTrailingNewlinePQ */
/* If buffer is nonempty, make sure it has exactly one trailing newline. */
static void
oneTrailingNewline(StringInfo buf)
{
noTrailingNewline(buf);
if (buf->len > 0)
appendStringInfoChar(buf, '\n');
} /* oneTrailingNewline */
static void
oneTrailingNewlinePQ(PQExpBuffer buf)
{
noTrailingNewlinePQ(buf);
if (buf->len > 0)
appendPQExpBufferChar(buf, '\n');
} /* oneTrailingNewlinePQ */
/*--------------------------------------------------------------------*/
/*
* Create a CdbDispatchResult object, appending it to the
* resultArray of a given CdbDispatchResults object.
*/
CdbDispatchResult *
cdbdisp_makeResult(struct CdbDispatchResults *meleeResults,
struct SegmentDatabaseDescriptor *segdbDesc,
int sliceIndex)
{
CdbDispatchResult *dispatchResult;
int meleeIndex;
Assert(meleeResults &&
meleeResults->resultArray &&
meleeResults->resultCount < meleeResults->resultCapacity);
/* Allocate a slot for the new CdbDispatchResult object. */
meleeIndex = meleeResults->resultCount++;
dispatchResult = &meleeResults->resultArray[meleeIndex];
#ifdef FAULT_INJECTOR
FaultInjector_InjectFaultIfSet(
CreateCdbDispathResultObject,
DDLNotSpecified,
"", // databaseName
""); // tableName
#endif
/* Initialize CdbDispatchResult. */
dispatchResult->meleeResults = meleeResults;
dispatchResult->meleeIndex = meleeIndex;
dispatchResult->segdbDesc = segdbDesc;
dispatchResult->resultbuf = createPQExpBuffer();
dispatchResult->error_message = NULL;
dispatchResult->numrowsrejected = 0;
if (PQExpBufferBroken(dispatchResult->resultbuf))
{
destroyPQExpBuffer(dispatchResult->resultbuf);
dispatchResult->resultbuf = NULL;
/* caller is responsible for cleanup -- can't elog(ERROR, ...) from here. */
return NULL;
}
/* Reset summary indicators. */
cdbdisp_resetResult(dispatchResult);
/* Update slice map entry. */
if (sliceIndex >= 0 &&
sliceIndex < meleeResults->sliceCapacity)
{
CdbDispatchResults_SliceInfo *si = &meleeResults->sliceMap[sliceIndex];
if (si->resultBegin == si->resultEnd)
{
si->resultBegin = meleeIndex;
si->resultEnd = meleeIndex + 1;
}
else
{
Assert(si->resultBegin <= meleeIndex);
Assert(si->resultEnd <= meleeIndex);
si->resultEnd = meleeIndex + 1;
}
}
/* Insert the slice index in error messages related to this QE. */
if (segdbDesc)
cdbconn_setSliceIndex(segdbDesc, sliceIndex);
return dispatchResult;
} /* cdbdisp_initResult */
/* Destroy a CdbDispatchResult object. */
void
cdbdisp_termResult(CdbDispatchResult *dispatchResult)
{
PQExpBuffer trash;
dispatchResult->segdbDesc = NULL;
/* Free the PGresult objects. */
cdbdisp_resetResult(dispatchResult);
/* Free the error message buffer and result buffer. */
trash = dispatchResult->resultbuf;
dispatchResult->resultbuf = NULL;
destroyPQExpBuffer(trash);
trash = dispatchResult->error_message;
dispatchResult->error_message = NULL;
destroyPQExpBuffer(trash);
} /* cdbdisp_termResult */
/* Reset a CdbDispatchResult object for possible reuse. */
void
cdbdisp_resetResult(CdbDispatchResult *dispatchResult)
{
PQExpBuffer buf = dispatchResult->resultbuf;
if (buf != NULL)
{
PGresult **begp = (PGresult **)buf->data;
PGresult **endp = (PGresult **)(buf->data + buf->len);
PGresult **p;
/* Free the PGresult objects. */
for (p = begp; p < endp; ++p)
{
Assert(*p != NULL);
PQclear(*p);
}
}
/* Reset summary indicators. */
dispatchResult->errcode = 0;
dispatchResult->errindex = -1;
dispatchResult->okindex = -1;
/* Reset progress indicators. */
dispatchResult->hasDispatched = false;
dispatchResult->stillRunning = false;
dispatchResult->requestedCancel = false;
dispatchResult->wasCanceled = false;
/* Empty (but don't free) the error message buffer and result buffer. */
resetPQExpBuffer(dispatchResult->resultbuf);
resetPQExpBuffer(dispatchResult->error_message);
} /* cdbdisp_resetResult */
/* Take note of an error.
* 'errcode' is the ERRCODE_xxx value for setting the client's SQLSTATE.
* NB: This can be called from a dispatcher thread, so it must not use
* palloc/pfree or elog/ereport because they are not thread safe.
*/
void
cdbdisp_seterrcode(int errcode, /* ERRCODE_xxx or 0 */
int resultIndex, /* -1 if no PGresult */
CdbDispatchResult *dispatchResult)
{
CdbDispatchResults *meleeResults = dispatchResult->meleeResults;
/* Was the command canceled? */
if (errcode == ERRCODE_GP_OPERATION_CANCELED ||
errcode == ERRCODE_QUERY_CANCELED)
dispatchResult->wasCanceled = true;
/* If this is the first error from this QE, save the error code
* and the index of the PGresult buffer entry. We assume the
* caller has not yet added the item to the PGresult buffer.
*/
if (!dispatchResult->errcode)
{
dispatchResult->errcode =
(errcode == 0) ? ERRCODE_INTERNAL_ERROR : errcode;
if (resultIndex >= 0)
dispatchResult->errindex = resultIndex;
}
if (!meleeResults)
return;
/*
* Remember which QE reported an error first among the gangs,
* but keep quiet about cancellation done at our request.
*
* Interconnection errors are given lower precedence because often
* they are secondary to an earlier and more interesting error.
*/
if (errcode == ERRCODE_GP_OPERATION_CANCELED &&
dispatchResult->requestedCancel)
{} /* nop */
else if (meleeResults->errcode == 0 ||
(meleeResults->errcode == ERRCODE_GP_INTERCONNECTION_ERROR &&
errcode != ERRCODE_GP_INTERCONNECTION_ERROR))
{
pthread_mutex_lock(&setErrcodeMutex);
if ((errcode != 0) && (meleeResults->errcode == 0 ||
(meleeResults->errcode == ERRCODE_GP_INTERCONNECTION_ERROR &&
errcode != ERRCODE_GP_INTERCONNECTION_ERROR)))
{
meleeResults->errcode = errcode;
meleeResults->iFirstError = dispatchResult->meleeIndex;
}
pthread_mutex_unlock(&setErrcodeMutex);
}
} /* cdbdisp_seterrcode */
/* Transfer connection error messages to dispatchResult from segdbDesc. */
bool /* returns true if segdbDesc had err info */
cdbdisp_mergeConnectionErrors(CdbDispatchResult *dispatchResult,
struct SegmentDatabaseDescriptor *segdbDesc)
{
if (!segdbDesc)
return false;
if (segdbDesc->errcode == 0 &&
segdbDesc->error_message.len == 0)
return false;
/* Error code should always be accompanied by text and vice-versa. */
Assert(segdbDesc->errcode != 0 && segdbDesc->error_message.len > 0);
/* Append error message text and save error code. */
cdbdisp_appendMessage(dispatchResult, 0, segdbDesc->errcode, "%s",
segdbDesc->error_message.data);
/* Reset connection object's error info. */
segdbDesc->errcode = 0;
segdbDesc->error_message.len = 0;
segdbDesc->error_message.data[0] = '\0';
return true;
} /* cdbdisp_mergeConnectionErrors */
/* Format a message, printf-style, and append to the error_message buffer.
* Also write it to stderr if logging is enabled for messages of the
* given severity level 'elevel' (for example, DEBUG1; or 0 to suppress).
* 'errcode' is the ERRCODE_xxx value for setting the client's SQLSTATE.
* NB: This can be called from a dispatcher thread, so it must not use
* palloc/pfree or elog/ereport because they are not thread safe.
*/
void
cdbdisp_appendMessage(CdbDispatchResult *dispatchResult,
int elevel,
int errcode,
const char *fmt,
...)
{
va_list args;
int msgoff;
/* Remember first error. */
cdbdisp_seterrcode(errcode, -1, dispatchResult);
/* Allocate buffer if first message.
* Insert newline between previous message and new one.
*/
if (!dispatchResult->error_message)
{
dispatchResult->error_message = createPQExpBuffer();
if (PQExpBufferBroken(dispatchResult->error_message))
{
destroyPQExpBuffer(dispatchResult->error_message);
dispatchResult->error_message = NULL;
write_log("cdbdisp_appendMessage: allocation failed, can't save error-message.");
return;
}
}
else
oneTrailingNewlinePQ(dispatchResult->error_message);
msgoff = dispatchResult->error_message->len;
/* Format the message and append it to the buffer. */
va_start(args, fmt);
appendPQExpBufferVA(dispatchResult->error_message, fmt, args);
va_end(args);
/* Display the message on stderr for debugging, if requested.
* This helps to clarify the actual timing of threaded events.
*/
if (elevel >= log_min_messages)
{
oneTrailingNewlinePQ(dispatchResult->error_message);
write_log("%s", dispatchResult->error_message->data + msgoff);
}
/* In case the caller wants to hand the buffer to ereport(),
* follow the ereport() convention of not ending with a newline.
*/
noTrailingNewlinePQ(dispatchResult->error_message);
} /* cdbdisp_appendMessage */
/* Store a PGresult object ptr in the result buffer.
* NB: Caller must not PQclear() the PGresult object.
*/
void
cdbdisp_appendResult(CdbDispatchResult *dispatchResult,
struct pg_result *res)
{
Assert(dispatchResult && res);
/* Attach the QE identification string to the PGresult */
if (dispatchResult->segdbDesc &&
dispatchResult->segdbDesc->whoami)
pqSaveMessageField(res, PG_DIAG_GP_PROCESS_TAG,
dispatchResult->segdbDesc->whoami);
appendBinaryPQExpBuffer(dispatchResult->resultbuf, (char *)&res, sizeof(res));
} /* cdbdisp_appendResult */
/* Return the i'th PGresult object ptr (if i >= 0), or
* the n+i'th one (if i < 0), or NULL (if i out of bounds).
* NB: Caller must not PQclear() the PGresult object.
*/
struct pg_result *
cdbdisp_getPGresult(CdbDispatchResult *dispatchResult, int i)
{
if (dispatchResult)
{
PQExpBuffer buf = dispatchResult->resultbuf;
PGresult **begp = (PGresult **)buf->data;
PGresult **endp = (PGresult **)(buf->data + buf->len);
PGresult **p = (i >= 0) ? &begp[i] : &endp[i];
if (p >= begp && p < endp)
{
Assert(*p != NULL);
return *p;
}
}
return NULL;
} /* cdbdisp_getPGresult */
/* Return the number of PGresult objects in the result buffer. */
int
cdbdisp_numPGresult(CdbDispatchResult *dispatchResult)
{
return dispatchResult ? dispatchResult->resultbuf->len / sizeof(PGresult *)
: 0;
} /* cdbdisp_numPGresult */
/* Remove all of the PGresult ptrs from a CdbDispatchResult object
* and place them into an array provided by the caller. The caller
* becomes responsible for PQclear()ing them. Returns the number of
* PGresult ptrs placed in the array.
*/
int
cdbdisp_snatchPGresults(CdbDispatchResult *dispatchResult,
struct pg_result **pgresultptrs,
int maxresults)
{
PQExpBuffer buf = dispatchResult->resultbuf;
PGresult **begp = (PGresult **)buf->data;
PGresult **endp = (PGresult **)(buf->data + buf->len);
PGresult **p;
int nresults = 0;
/* Snatch the PGresult objects. */
for (p = begp; p < endp; ++p)
{
Assert(*p != NULL);
Assert(nresults < maxresults);
pgresultptrs[nresults++] = *p;
*p = NULL;
}
/* Empty our PGresult array. */
resetPQExpBuffer(buf);
dispatchResult->errindex = -1;
dispatchResult->okindex = -1;
return nresults;
} /* cdbdisp_snatchPGresults */
/* Display a CdbDispatchResult in the log for debugging.
* Call only from main thread, during or after cdbdisp_checkDispatchResults.
*/
void
cdbdisp_debugDispatchResult(CdbDispatchResult *dispatchResult,
int elevel_error,
int elevel_success)
{
char esqlstate[8];
int ires;
int nres;
/* Skip if user has messages turned off. */
if (elevel_error < log_min_messages &&
elevel_success < log_min_messages)
return;
if (dispatchResult == NULL)
return;
/* PGresult messages */
nres = cdbdisp_numPGresult(dispatchResult);
for (ires = 0; ires < nres; ++ires)
{ /* for each PGresult */
PGresult *pgresult = cdbdisp_getPGresult(dispatchResult, ires);
ExecStatusType resultStatus = PQresultStatus(pgresult);
char *whoami = PQresultErrorField(pgresult, PG_DIAG_GP_PROCESS_TAG);
if (!whoami)
whoami = "no process id";
/* QE success */
if (resultStatus == PGRES_COMMAND_OK ||
resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COPY_IN ||
resultStatus == PGRES_COPY_OUT ||
resultStatus == PGRES_EMPTY_QUERY)
{
char *cmdStatus = PQcmdStatus(pgresult);
elog(elevel_success, "DispatchResult: ok %s (%s)",
cmdStatus ? cmdStatus : "(no cmdStatus)",
whoami);
}
/* QE error or libpq error */
else
{
char *sqlstate = PQresultErrorField(pgresult, PG_DIAG_SQLSTATE);
char *pri = PQresultErrorField(pgresult, PG_DIAG_MESSAGE_PRIMARY);
char *dtl = PQresultErrorField(pgresult, PG_DIAG_MESSAGE_DETAIL);
int lenpri = (pri == NULL)? 0 :strlen(pri);
if (!sqlstate)
sqlstate = "no SQLSTATE";
while (lenpri > 0 &&
pri[lenpri-1] <= ' ' &&
pri[lenpri-1] > '\0')
lenpri--;
ereport(elevel_error, (
errmsg("DispatchResult: (%s) %s %.*s (%s)",
sqlstate,
PQresStatus(PQresultStatus(pgresult)),
lenpri,
pri ? pri : "",
whoami),
errdetail("(%s:%s) %s",
PQresultErrorField(pgresult, PG_DIAG_SOURCE_FILE),
PQresultErrorField(pgresult, PG_DIAG_SOURCE_LINE),
dtl ? dtl : "") ));
}
} /* for each PGresult */
/* Error found on our side of the libpq interface? */
if (dispatchResult->error_message &&
dispatchResult->error_message->len > 0)
{
cdbdisp_errcode_to_sqlstate(dispatchResult->errcode, esqlstate);
elog(elevel_error, "DispatchResult: (%s) %s",
esqlstate,
dispatchResult->error_message->data);
}
/* Connection error? */
if (dispatchResult->segdbDesc &&
dispatchResult->segdbDesc->error_message.len > 0)
{
cdbdisp_errcode_to_sqlstate(dispatchResult->segdbDesc->errcode, esqlstate);
elog(elevel_error, "DispatchResult: (%s) %s",
esqlstate,
dispatchResult->segdbDesc->error_message.data);
}
/* Should have either an error code or an ok result. */
if (!dispatchResult->errcode &&
dispatchResult->okindex < 0)
{
elog(elevel_error, "DispatchResult: No ending status from %s",
dispatchResult->segdbDesc ? dispatchResult->segdbDesc->whoami
: "?");
}
} /* cdbdisp_debugDispatchResult */
/*
* Format a CdbDispatchResult into a StringInfo buffer provided by caller.
* If verbose = true, reports all info; else reports at most one error.
*/
void
cdbdisp_dumpDispatchResult(CdbDispatchResult *dispatchResult,
bool verbose,
struct StringInfoData *buf)
{
SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
int ires;
int nres;
if (!dispatchResult ||
!buf)
return;
/* Format PGresult messages */
nres = cdbdisp_numPGresult(dispatchResult);
for (ires = 0; ires < nres; ++ires)
{ /* for each PGresult */
PGresult *pgresult = cdbdisp_getPGresult(dispatchResult, ires);
ExecStatusType resultStatus = PQresultStatus(pgresult);
char *whoami = PQresultErrorField(pgresult, PG_DIAG_GP_PROCESS_TAG);
/* QE success */
if (resultStatus == PGRES_COMMAND_OK ||
resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COPY_IN ||
resultStatus == PGRES_COPY_OUT ||
resultStatus == PGRES_EMPTY_QUERY)
{
if (verbose)
{
char *cmdStatus = PQcmdStatus(pgresult);
oneTrailingNewline(buf);
appendStringInfo(buf, "ok: %s", cmdStatus ? cmdStatus : "");
if (whoami)
appendStringInfo(buf," (%s)", whoami);
}
}
/* QE error or libpq error */
else
{
char *pri = PQresultErrorField(pgresult, PG_DIAG_MESSAGE_PRIMARY);
char *dtl = PQresultErrorField(pgresult, PG_DIAG_MESSAGE_DETAIL);
char *ctx = PQresultErrorField(pgresult, PG_DIAG_CONTEXT);
oneTrailingNewline(buf);
if (pri)
{
appendStringInfoString(buf, pri);
}
else
{
elog(LOG,"No primary message?");
appendStringInfoString(buf,PQresultErrorMessage(pgresult));
}
if (whoami)
{
noTrailingNewline(buf);
appendStringInfo(buf," (%s)", whoami);
}
if (dtl)
{
oneTrailingNewline(buf);
appendStringInfo(buf, "%s", dtl);
}
if (ctx)
{
oneTrailingNewline(buf);
appendStringInfo(buf, "%s", ctx);
}
if (!verbose)
goto done;
}
} /* for each PGresult */
/* segdbDesc error message shouldn't be possible here, but check anyway.
* Ordinarily dispatchResult->segdbDesc is NULL here because we are
* called after it has been cleared by CdbCheckDispatchResult().
*/
if (dispatchResult->segdbDesc)
cdbdisp_mergeConnectionErrors(dispatchResult, segdbDesc);
/* Error found on our side of the libpq interface? */
if (dispatchResult->error_message &&
dispatchResult->error_message->len > 0)
{
oneTrailingNewline(buf);
appendStringInfoString(buf, dispatchResult->error_message->data);
if (!verbose)
goto done;
}
done:
noTrailingNewline(buf);
} /* cdbdisp_dumpDispatchResult */
/*--------------------------------------------------------------------*/
/*
* cdbdisp_makeDispatchResults:
* Allocates a CdbDispatchResults object in the current memory context.
* The caller is responsible for calling DestroyCdbDispatchResults on the returned
* pointer when done using it.
*/
CdbDispatchResults *
cdbdisp_makeDispatchResults(int resultCapacity,
int sliceCapacity,
bool cancelOnError)
{
CdbDispatchResults *results = (CdbDispatchResults*) palloc0(sizeof(CdbDispatchResults));
int nbytes = resultCapacity * sizeof(CdbDispatchResult);
results->resultArray = (CdbDispatchResult*) palloc0(nbytes);
results->resultCapacity = resultCapacity;
results->resultCount = 0;
results->iFirstError = -1;
results->errcode = 0;
results->cancelOnError = cancelOnError;
results->sliceMap = NULL;
results->sliceCapacity = sliceCapacity;
if (sliceCapacity > 0)
{
nbytes = sliceCapacity * sizeof(CdbDispatchResults_SliceInfo);
results->sliceMap = (CdbDispatchResults_SliceInfo*) palloc0(nbytes);
}
return results;
} /* cdbdisp_makeDispatchResults */
/*
* cdbdisp_destroyDispatchResults:
* Frees all memory allocated in CdbDispatchResults struct.
*/
void
cdbdisp_destroyDispatchResults(CdbDispatchResults * results)
{
if (!results)
return;
if (results->resultArray)
{
int i;
for (i = 0; i < results->resultCount; i++)
{
cdbdisp_termResult(&results->resultArray[i]);
}
pfree(results->resultArray);
results->resultArray = NULL;
}
if (results->sliceMap)
{
pfree(results->sliceMap);
results->sliceMap = NULL;
}
pfree(results);
} /* cdbdisp_destroyDispatchResults */
/* Format a CdbDispatchResults object.
* Appends error messages to caller's StringInfo buffer.
* Returns ERRCODE_xxx if some error was found, or 0 if no errors.
* Before calling this function, you must call CdbCheckDispatchResult().
*/
int
cdbdisp_dumpDispatchResults(struct CdbDispatchResults *meleeResults,
struct StringInfoData *buffer,
bool verbose)
{
CdbDispatchResult *dispatchResult;
/* Quick exit if no error (not counting ERRCODE_GP_OPERATION_CANCELED). */
if (!meleeResults ||
!meleeResults->errcode)
return 0;
/* Find the CdbDispatchResult of the first QE that got an error. */
Assert(meleeResults->iFirstError >= 0 &&
meleeResults->iFirstError < meleeResults->resultCount);
dispatchResult = &meleeResults->resultArray[meleeResults->iFirstError];
Assert(dispatchResult->meleeResults == meleeResults &&
dispatchResult->errcode != 0);
/* Format one QE's result. */
cdbdisp_dumpDispatchResult(dispatchResult, verbose, buffer);
/* Optionally, format results from the rest of the QEs that got errors. */
if (verbose)
{
int i;
for (i = 0; i < meleeResults->resultCount; ++i)
{
dispatchResult = &meleeResults->resultArray[i];
if (i == meleeResults->iFirstError)
continue;
if (dispatchResult->errcode)
cdbdisp_dumpDispatchResult(dispatchResult, verbose, buffer);
}
}
return meleeResults->errcode;
} /* cdbdisp_dumpDispatchResults */
/* Return sum of the cmdTuples values from CdbDispatchResult
* entries that have a successful PGresult. If sliceIndex >= 0,
* uses only the results belonging to the specified slice.
*/
int64
cdbdisp_sumCmdTuples(CdbDispatchResults *results, int sliceIndex)
{
CdbDispatchResult *dispatchResult;
CdbDispatchResult *resultEnd = cdbdisp_resultEnd(results, sliceIndex);
PGresult *pgresult;
int64 sum = 0;
for (dispatchResult = cdbdisp_resultBegin(results, sliceIndex);
dispatchResult < resultEnd;
++dispatchResult)
{
pgresult = cdbdisp_getPGresult(dispatchResult, dispatchResult->okindex);
if (pgresult &&
!dispatchResult->errcode)
{
char *cmdTuples = PQcmdTuples(pgresult);
if (cmdTuples)
sum += atoll(cmdTuples);
}
}
return sum;
} /* cdbdisp_sumCmdTuples */
/*
* If several tuples were eliminated/rejected from the result because of
* bad data formatting (this is currenly only possible in external tables
* with single row error handling) - sum up the total rows rejected from
* all QE's and notify the client.
*/
void
cdbdisp_sumRejectedRows(CdbDispatchResults *results)
{
CdbDispatchResult *dispatchResult;
CdbDispatchResult *resultEnd = cdbdisp_resultEnd(results, -1);
PGresult *pgresult;
int totalRejected = 0;
for (dispatchResult = cdbdisp_resultBegin(results, -1);
dispatchResult < resultEnd;
++dispatchResult)
{
pgresult = cdbdisp_getPGresult(dispatchResult, dispatchResult->okindex);
if (pgresult &&
!dispatchResult->errcode)
{
/* add num rows rejected from this QE to the total */
totalRejected += dispatchResult->numrowsrejected;
}
}
if(totalRejected > 0)
ReportSrehResults(NULL, totalRejected);
} /* cdbdisp_sumRejectedRows */
HTAB *
process_aotupcounts(PartitionNode *parts, HTAB *ht,
void *aotupcounts, int naotupcounts)
{
PQaoRelTupCount *ao = (PQaoRelTupCount *)aotupcounts;
if (Debug_appendonly_print_insert)
ereport(LOG,(errmsg("found %d AO tuple counts to process",
naotupcounts)));
if (naotupcounts)
{
int j;
for (j = 0; j < naotupcounts; j++)
{
if (OidIsValid(ao->aorelid))
{
bool found;
PQaoRelTupCount *entry;
if (!ht)
{
HASHCTL ctl;
/* reasonable assumption? */
long num_buckets =
list_length(all_partition_relids(parts));
num_buckets /= num_partition_levels(parts);
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(*entry);
ht = hash_create("AO hash map",
num_buckets,
&ctl,
HASH_ELEM);
}
entry = hash_search(ht,
&(ao->aorelid),
HASH_ENTER,
&found);
if (found)
entry->tupcount += ao->tupcount;
else
entry->tupcount = ao->tupcount;
if (Debug_appendonly_print_insert)
ereport(LOG,(errmsg("processed AO tuple counts for partitioned "
"relation %d. found total " INT64_FORMAT
"tuples", ao->aorelid, entry->tupcount)));
}
ao++;
}
}
return ht;
}
void
cdbdisp_handleModifiedCatalogOnSegments(CdbDispatchResults *results,
void (*handler)(QueryContextDispatchingSendBack sendback))
{
int i;
for (i = 0; i < results->resultCount; ++i)
{
CdbDispatchResult *dispatchResult = &results->resultArray[i];
int nres = cdbdisp_numPGresult(dispatchResult);
int ires;
for (ires = 0; ires < nres; ++ires)
{
/* for each PGresult */
PGresult *pgresult = cdbdisp_getPGresult(dispatchResult, ires);
if (handler && pgresult && pgresult->sendback)
{
int j;
for (j = 0 ; j < pgresult->numSendback ; ++j)
{
handler(&pgresult->sendback[j]);
}
}
}
}
}
void
cdbdisp_iterate_results_sendback(PGresult **results, int numresults,
void (*handler)(QueryContextDispatchingSendBack sendback))
{
int i, j;
if (handler == NULL)
return;
for (i = 0; i < numresults; i++)
{
if (results[i] == NULL || results[i]->sendback == 0)
continue;
for (j = 0; j < results[i]->numSendback; j++)
handler(&results[i]->sendback[j]);
}
}
/*
* sum tuple counts that were added into a partitioned AO table
*/
HTAB *
cdbdisp_sumAoPartTupCount(PartitionNode *parts,
CdbDispatchResults *results)
{
int i;
HTAB *ht = NULL;
if (!parts)
return NULL;
for (i = 0; i < results->resultCount; ++i)
{
CdbDispatchResult *dispatchResult = &results->resultArray[i];
int nres = cdbdisp_numPGresult(dispatchResult);
int ires;
for (ires = 0; ires < nres; ++ires)
{ /* for each PGresult */
PGresult *pgresult = cdbdisp_getPGresult(dispatchResult, ires);
ht = process_aotupcounts(parts, ht, (void *)pgresult->aotupcounts,
pgresult->naotupcounts);
}
}
return ht;
}
/*
* Find the max of the lastOid values returned from the QEs
*/
Oid
cdbdisp_maxLastOid(CdbDispatchResults *results, int sliceIndex)
{
CdbDispatchResult *dispatchResult;
CdbDispatchResult *resultEnd = cdbdisp_resultEnd(results, sliceIndex);
PGresult *pgresult;
Oid oid = InvalidOid;
for (dispatchResult = cdbdisp_resultBegin(results, sliceIndex);
dispatchResult < resultEnd;
++dispatchResult)
{
pgresult = cdbdisp_getPGresult(dispatchResult, dispatchResult->okindex);
if (pgresult &&
!dispatchResult->errcode)
{
Oid tmpoid = PQoidValue(pgresult);
if (tmpoid >oid)
oid = tmpoid;
}
}
return oid;
} /* cdbdisp_maxLastOid */
/* Return ptr to first resultArray entry for a given sliceIndex. */
CdbDispatchResult *
cdbdisp_resultBegin(CdbDispatchResults *results, int sliceIndex)
{
CdbDispatchResults_SliceInfo *si;
if (!results)
return NULL;
if (sliceIndex < 0)
return &results->resultArray[0];
si = &results->sliceMap[sliceIndex];
Assert(sliceIndex < results->sliceCapacity &&
si->resultBegin >= 0 &&
si->resultBegin <= si->resultEnd &&
si->resultEnd <= results->resultCount);
return &results->resultArray[si->resultBegin];
} /* cdbdisp_resultBegin */
/* Return ptr to last+1 resultArray entry for a given sliceIndex. */
CdbDispatchResult *
cdbdisp_resultEnd(CdbDispatchResults *results, int sliceIndex)
{
CdbDispatchResults_SliceInfo *si;
if (!results)
return NULL;
if (sliceIndex < 0)
return &results->resultArray[results->resultCount];
si = &results->sliceMap[sliceIndex];
return &results->resultArray[si->resultEnd];
} /* cdbdisp_resultEnd */
/*--------------------------------------------------------------------*/
/* Convert compact error code (ERRCODE_xxx) to 5-char SQLSTATE string,
* and put it into a 6-char buffer provided by caller.
*/
char * /* returns outbuf+5 */
cdbdisp_errcode_to_sqlstate(int errcode, char outbuf[6])
{
int i;
for (i = 0; i < 5; ++i)
{
outbuf[i] = PGUNSIXBIT(errcode);
errcode >>= 6;
}
outbuf[5] = '\0';
return &outbuf[5];
} /* cdbdisp_errcode_to_sqlstate */
/* Convert SQLSTATE string to compact error code (ERRCODE_xxx). */
int
cdbdisp_sqlstate_to_errcode(const char *sqlstate)
{
return MAKE_SQLSTATE(sqlstate[0], sqlstate[1], sqlstate[2],
sqlstate[3], sqlstate[4]);
} /* cdbdisp_sqlstate_to_errcode */