blob: 5ccd1d59d468eca5914ff1d20bc53b760ab0e822 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* mppexecplan.c
*
*
* Copyright (c) 2001-2003, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
* is hereby granted, provided that the above copyright notice and this
* paragraph and the following two paragraphs appear in all copies.
*
* IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
* LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
* DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*
*/
#include "postgres.h"
#include <ctype.h>
#include <assert.h>
#include "libpq-fe.h"
#include "libpq-int.h"
#include "fmgr.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/tupdesc.h"
#include "commands/async.h"
#include "catalog/pg_type.h"
#include "lib/stringinfo.h"
#include "storage/ipc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "executor/spi.h"
#include "executor/spi_priv.h"
#include "nodes/print.h"
#include "cdb/cdbsrlz.h"
#include "cdbexecplan.h"
/* general utility */
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
/*
* cdb_serialize_plan takes 1 argument:
* 1) an SQL statement
* And returns the serialized plan tree.
*/
PG_FUNCTION_INFO_V1(cdb_serialize_plan);
Datum
cdb_serialize_plan(PG_FUNCTION_ARGS)
{
char* pszSQL = NULL;
char* pszOutput = NULL;
bool bPretty = true;
/*
* Grab the current memory context. So we can create the node string in it,
* rather than in the spi context that will go away when we call
* SPI_finish
*/
MemoryContext myContext = CurrentMemoryContext;
MemoryContext oldContext;
pszSQL = GET_STR(PG_GETARG_TEXT_P(0));
bPretty = PG_GETARG_BOOL(1);
if ( SPI_OK_CONNECT != SPI_connect() )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_connect failed in cdb_serialize_plan" )));
}
_SPI_plan* thePlan = (_SPI_plan *)SPI_prepare(pszSQL, 0, NULL);
if ( thePlan == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_prepare failed for sql %s", pszSQL )));
}
if ( list_length(thePlan->ptlist) != 1 )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cdb_serialize_plan can only handle 1 sql statement, but there were %d", list_length(thePlan->ptlist) )));
}
ListCell *plan_list_item = list_head(thePlan->ptlist);
oldContext = MemoryContextSwitchTo(myContext);
pszOutput = serializeNode(lfirst(plan_list_item), NULL, NULL /*uncompressed_size*/);
MemoryContextSwitchTo(oldContext);
if ( pszOutput == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("TestPlanSerialization failed for sql %s", pszSQL )));
}
SPI_freeplan(thePlan);
SPI_finish();
char *f;
if ( bPretty )
f = pretty_format_node_dump(pszOutput);
else
f = pstrdup(pszOutput);
int lenOutput = strlen(f);
int len = lenOutput + VARHDRSZ;
text* result = (text *) palloc(len);
if ( result == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
/* Set size of result string... */
SET_VARSIZE(result, len);
/* Fill data field of result string... */
char* ptr = VARDATA(result);
memcpy(ptr, f, lenOutput);
pfree(pszOutput);
pfree(f);
pfree(pszSQL);
PG_RETURN_TEXT_P(result);
}
/*
* cdb_serialize_query takes 1 argument:
* 1) an SQL statement
* And returns the serialized query tree.
*/
PG_FUNCTION_INFO_V1(cdb_serialize_query);
Datum
cdb_serialize_query(PG_FUNCTION_ARGS)
{
char* pszOutput = NULL;
char* pszSQL = NULL;
bool bPretty = true;
/*
* Grab the current memory context. So we can create the node string in it,
* rather than in the spi context that will go away when we call
* SPI_finish
*/
MemoryContext myContext = CurrentMemoryContext;
MemoryContext oldContext;
pszSQL = GET_STR(PG_GETARG_TEXT_P(0));
bPretty = PG_GETARG_BOOL(1);
if ( SPI_OK_CONNECT != SPI_connect() )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_connect failed in cdb_serialize_query" )));
}
_SPI_plan* thePlan = (_SPI_plan *)SPI_prepare(pszSQL, 0, NULL);
if ( thePlan == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_prepare failed for sql %s", pszSQL )));
}
List *query_list_list = thePlan->qtlist;
if ( list_length(query_list_list) != 1 )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cdb_serialize_query can only handle 1 sql statement, but there were %d", list_length(query_list_list) )));
}
ListCell *query_list_list_item;
foreach(query_list_list_item, query_list_list)
{
List *query_list = lfirst(query_list_list_item);
ListCell *query_list_item;
if ( list_length(query_list) != 1 )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cdb_serialize_query can only handle 1 sql statement, but there were %d", list_length(query_list) )));
}
foreach(query_list_item, query_list)
{
oldContext = MemoryContextSwitchTo(myContext);
pszOutput = serializeNode(lfirst(query_list_item), NULL, NULL /*uncompressed_size*/);
MemoryContextSwitchTo(oldContext);
if ( pszOutput == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("TestQuerySerialization failed for sql %s", pszSQL )));
}
break;
}
break;
}
SPI_freeplan(thePlan);
SPI_finish();
char *f;
if ( bPretty )
f = pretty_format_node_dump(pszOutput);
else
f = pstrdup(pszOutput);
int lenOutput = strlen(f);
int len = lenOutput + VARHDRSZ;
text* result = (text *) palloc(len);
if ( result == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
/* Set size of result string... */
SET_VARSIZE(result, len);
/* Fill data field of result string... */
char* ptr = VARDATA(result);
memcpy(ptr, f, lenOutput);
pfree(pszOutput);
pfree(f);
pfree(pszSQL);
PG_RETURN_TEXT_P(result);
}
typedef struct
{
int index;
int rows;
} QueryInfo;
#if 0 /* Retired for 3.4 -- must be updated w.r.t. PlannedStmt in order to revive. */
/*
* cdb_exec_indirect takes 1 argument:
* 1) an SQL statement
* And returns a rowset of the query results
* resulting from
* a) preparing the statment, and grabbing the plan and query trees
* from this.
* b) serializing both the plan and query trees.
* c) deserializing the plan and query tree strings into plan and query nodes.
* d) executing the query using these plan and query nodes.
* This can be compared against the rowset resulting from executing the query directly.
* If they match, the serialization may be correct. If they don't, it is definitely wrong.
*/
PG_FUNCTION_INFO_V1(cdb_exec_indirect);
Datum
cdb_exec_indirect(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
MemoryContext oldcontext;
char* pszSQL = NULL;
char* pszPlan = NULL;
char* pszQuery = NULL;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
pszSQL = GET_STR(PG_GETARG_TEXT_P(0));
if ( SPI_OK_CONNECT != SPI_connect() )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_connect failed in cdb_exec_indirect" )));
}
/*
* Prepare the SQL using SPI
*/
_SPI_plan* thePlan = (_SPI_plan *)SPI_prepare(pszSQL, 0, NULL);
if ( thePlan == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_prepare failed for sql %s", pszSQL )));
}
/*
* Grab the plan tree from the _SPI_plan struct.
* Fail if there is more than 1.
*/
if ( list_length(thePlan->ptlist) != 1 )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cdb_exec_indirect can only handle 1 sql statement, but there were %d", list_length(thePlan->ptlist) )));
}
ListCell *plan_list_item = list_head(thePlan->ptlist);
/*
* Serialize the plan tree to a string
*/
pszPlan = serializeNode(lfirst(plan_list_item), NULL);
if ( pszPlan == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("serializeNode failed for sql %s", pszSQL )));
}
/*
* Grab the query tree from the _SPI_plan struct.
* Fail if there is more than 1.
*/
List *query_list_list = thePlan->qtlist;
if ( list_length(query_list_list) != 1 )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cdb_exec_indirect can only handle 1 sql statement, but there were %d", list_length(query_list_list) )));
}
ListCell *query_list_list_item;
foreach(query_list_list_item, query_list_list)
{
List *query_list = lfirst(query_list_list_item);
ListCell *query_list_item;
if ( list_length(query_list) != 1 )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cdb_exec_indirect can only handle 1 sql statement, but there were %d", list_length(query_list) )));
}
foreach(query_list_item, query_list)
{
/*
* Serialize the query tree to a string
*/
pszQuery = serializeNode(lfirst(query_list_item), NULL);
if ( pszQuery == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("serializeNode failed for sql %s", pszSQL )));
}
break;
}
break;
}
SPI_freeplan(thePlan);
/*
* Now reverse the process: deserialize the plan and query tree strings
* back into nodes.
*/
Plan *pNewPlan = (Plan *)deserializeNode( pszPlan );
Query *pNewQuery = (Query *)deserializeNode( pszQuery );
if ( pNewPlan == NULL || pNewQuery == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("Plan or Query XML deserialization failed in cdb_exec_indirect" )));
}
/*
* Execute through our cdb version of SPI_execute
*/
if ( SPI_OK_SELECT != SPI_execute_cdb( pszSQL, pNewPlan, pNewQuery, false, 0 ) )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_execute_cdb failed in cdb_exec_plan" )));
}
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
funcctx->tuple_desc = BlessTupleDesc(SPI_tuptable->tupdesc);
/* Allocate cross-call state, so that we can keep track of where we're
* at in the processing.
*/
QueryInfo *query_block = (QueryInfo *) palloc0( sizeof(QueryInfo) );
funcctx->user_fctx = (int *)query_block;
query_block->index = 0;
query_block->rows = SPI_processed;
MemoryContextSwitchTo(oldcontext);
}
/*---------------------*
* Per-call operations
*/
funcctx = SRF_PERCALL_SETUP();
QueryInfo *query_block = (QueryInfo *)funcctx->user_fctx;
if ( query_block->index < query_block->rows )
{
/*
* Get heaptuple from SPI, then deform it, and reform it using
* our tuple desc.
* If we don't do this, but rather try to pass the tuple from SPI
* directly back, we get an error because
* the tuple desc that is associated with the SPI call
* has not been blessed.
*/
HeapTuple tuple = SPI_tuptable->vals[query_block->index++];
TupleDesc tupleDesc = funcctx->tuple_desc;
Datum *values = (Datum *) palloc(tupleDesc->natts * sizeof(Datum));
char *nulls = (char *) palloc(tupleDesc->natts * sizeof(char));
heap_deformtuple(tuple, tupleDesc, values, nulls);
HeapTuple res = heap_formtuple(tupleDesc, values, nulls );
pfree(values);
pfree(nulls);
/* make the tuple into a datum */
Datum result = HeapTupleGetDatum(res);
SRF_RETURN_NEXT(funcctx, result);
}
/*
* do when there is no more left
*/
pfree(query_block);
SPI_finish();
funcctx->user_fctx = NULL;
SRF_RETURN_DONE(funcctx);
}
#endif
PG_FUNCTION_INFO_V1(cdb_test_mppexec);
Datum
cdb_test_mppexec(PG_FUNCTION_ARGS)
{
char* pszSQL = NULL;
char* pszPlan = NULL;
char* pszQuery = NULL;
pszSQL = GET_STR(PG_GETARG_TEXT_P(0));
if ( SPI_OK_CONNECT != SPI_connect() )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_connect failed in cdb_test_mppexec" )));
}
/*
* Prepare the SQL using SPI
*/
_SPI_plan* thePlan = (_SPI_plan *)SPI_prepare(pszSQL, 0, NULL);
if ( thePlan == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_prepare failed for sql %s", pszSQL )));
}
/*
* Grab the plan tree from the _SPI_plan struct.
* Fail if there is more than 1.
*/
if ( list_length(thePlan->ptlist) != 1 )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cdb_test_mppexec can only handle 1 sql statement, but there were %d", list_length(thePlan->ptlist) )));
}
ListCell *plan_list_item = list_head(thePlan->ptlist);
/*
* Serialize the plan tree to a string
*/
pszPlan = serializeNode(lfirst(plan_list_item), NULL, NULL /*uncompressed_size*/);
if ( pszPlan == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("serializeNode failed for sql %s", pszSQL )));
}
/*
* Grab the query tree from the _SPI_plan struct.
* Fail if there is more than 1.
*/
List *query_list_list = thePlan->qtlist;
if ( list_length(query_list_list) != 1 )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cdb_test_mppexec can only handle 1 sql statement, but there were %d", list_length(query_list_list) )));
}
ListCell *query_list_list_item;
foreach(query_list_list_item, query_list_list)
{
List *query_list = lfirst(query_list_list_item);
ListCell *query_list_item;
if ( list_length(query_list) != 1 )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cdb_test_mppexec can only handle 1 sql statement, but there were %d", list_length(query_list) )));
}
foreach(query_list_item, query_list)
{
/*
* Serialize the query tree to a string
*/
pszQuery = serializeNode(lfirst(query_list_item), NULL, NULL /*uncompressed_size*/);
if ( pszQuery == NULL )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("serializeNode failed for sql %s", pszSQL )));
}
break;
}
break;
}
int x;
StringInfoData buffer;
initStringInfo( &buffer );
appendStringInfo( &buffer, "mppexec '%s' '%s'", pszQuery, pszPlan);
if ( SPI_OK_SELECT != SPI_execute( buffer.data, false, 0 ) )
{
ereport(ERROR,
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("SPI_execute failed in cdb_test_mppexec" )));
}
bool bRtn = true;
x = SPI_processed;
elog( INFO, "Rows returned in SPI_execute_plan were %d", x );
SPI_finish();
pfree(pszSQL);
PG_RETURN_BOOL( bRtn );
}