blob: 244faca5c921ae5575642ffd80959dd7374066c0 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* chunkfuncs.c
* Test functions for tuple chunk API. These are to be called from SQL
* for testing purposes.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "libpq-fe.h"
#include "access/tupdesc.h"
#include "cdb/cdbvars.h"
#include "cdb/tupchunk.h"
#include "cdb/tupchunklist.h"
#include "cdb/tupser.h"
#include "executor/spi.h"
#include "libpq/pqformat.h"
#include "utils/builtins.h"
#include "cdbtest.h"
#define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp)))
extern int Gp_max_tuple_chunk_size;
typedef struct ChunkedRowState
{
int maxChunkDataSize;
uint16 srcSegIdx;
uint16 tgtSegIdx;
uint16 mnid;
/* The SerTupInfo with cached info in it for serialization. */
SerTupInfo serInfo;
/* Index of row we are on. */
int rowIdx;
/* Count of which chunk in the current row we are on. */
int chunkNum;
/* List of tuple-chunks for the current row. */
TupleChunkList rowChunks;
TupleChunkListItem nextChunk;
} ChunkedRowState;
/* Static state for GetRowsAsChunks function. */
static ChunkedRowState *s_rac_state;
typedef struct DechunkedRowState
{
/* The SerTupInfo with cached info in it for serialization. */
SerTupInfo serInfo;
/* Index of row we are on. */
int rowIdx;
} DechunkedRowState;
/* Static state for GetDechunkedRows function. */
static DechunkedRowState *s_dcr_state;
typedef struct DeserializedRowState
{
/* The SerTupInfo with cached info in it for serialization. */
SerTupInfo serInfo;
/* Index of row we are on. */
int rowIdx;
} DeserializedRowState;
/* Static state for GetDeserializedRows function. */
static DeserializedRowState *s_dr_state;
/* This function executes a SQL "SELECT" statement and returns the results as
* a collection of tuple chunks. The returned rows have the following format:
*
* +------------------+--------------------+---------------------+-------------------+
* | rownum (INTEGER) | chunknum (INTEGER) | chunksize (INTEGER) | chunkdata (BYTEA) |
* +------------------+--------------------+---------------------+-------------------+
*
* This is defined as the "__getchunkedrows" data-type.
*
* Both rownum and chunknum start at 1.
*/
PG_FUNCTION_INFO_V1(GetChunkedRows__text1);
Datum GetChunkedRows__text1(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
MemoryContext oldcontext;
char *pszSQL;
int rc;
TupleDesc tupdesc;
HeapTuple htup;
Datum resultValues[4]; /* Parts of the result set. */
char *resultNulls = " "; /* No NULLs in result set. */
Datum resultTuple; /* The full result. */
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/*---------------------*
* One-time operations
*/
/* Get the SQL to execute, and run it. */
pszSQL = GET_STR(PG_GETARG_TEXT_P(0));
rc = SPI_connect();
if (rc != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect() reported error.");
/* Execute read-only SQL. The 0 means "no limit on results." */
rc = SPI_execute(pszSQL, true, 0);
if (rc != SPI_OK_SELECT)
elog(ERROR, "This function requires a SELECT statement.");
/* Allocate cross-call state, so that we can keep track of where we're
* at in the processing.
*/
s_rac_state = (ChunkedRowState *) palloc0(sizeof(ChunkedRowState));
s_rac_state->rowIdx = 0;
s_rac_state->rowChunks = NULL;
s_rac_state->chunkNum = 0;
s_rac_state->maxChunkDataSize = Gp_max_tuple_chunk_size;
InitSerTupInfo(SPI_tuptable->tupdesc, &s_rac_state->serInfo);
/* Generate return-type information. This is derived from the user
* datatype "__getchunkedrows".
*/
tupdesc = RelationNameGetTupleDesc("__getchunkedrows");
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
}
/*---------------------*
* Per-call operations
*/
funcctx = SRF_PERCALL_SETUP();
if (s_rac_state->rowChunks == NULL || s_rac_state->nextChunk == NULL)
{
/* The chunk list is empty, or we have traversed all chunks in the
* current row. Go to the next row, and generate a new list of chunks
* for that row.
*/
if (s_rac_state->rowIdx < SPI_processed)
{
s_rac_state->rowChunks =
SerializeTupleIntoChunks(SPI_tuptable->vals[s_rac_state->rowIdx],
&s_rac_state->serInfo,
Gp_max_tuple_chunk_size);
s_rac_state->nextChunk = s_rac_state->rowChunks->p_first;
s_rac_state->chunkNum = 0;
}
else
{
/* No more rows. */
s_rac_state->rowChunks = NULL;
s_rac_state->nextChunk = NULL;
s_rac_state->chunkNum = 0;
}
}
if (s_rac_state->rowChunks == NULL ||
s_rac_state->rowChunks->p_first == NULL)
{
/* There are no more tuples and no more chunks. We are done. */
/* Do final cleanup. */
SPI_finish();
s_rac_state = NULL; /* Freed when mem-ctx is cleaned up. */
SRF_RETURN_DONE(funcctx);
}
else
{
/* There are more chunks. Return the next chunk as a row. */
StringInfoData serChunkData;
bytea *chunkData;
resultValues[0] = Int32GetDatum(s_rac_state->rowIdx + 1);
resultValues[1] = Int32GetDatum(s_rac_state->chunkNum + 1);
resultValues[2] = Int32GetDatum(s_rac_state->nextChunk->chunk_length);
pq_begintypsend(&serChunkData); /* Does initStringInfo() call. */
pq_sendbytes(&serChunkData, (const char *)s_rac_state->nextChunk->chunk_data,
s_rac_state->nextChunk->chunk_length);
chunkData = pq_endtypsend(&serChunkData);
resultValues[3] = PointerGetDatum(chunkData);
/* Form the tuple. */
htup = heap_formtuple(funcctx->tuple_desc, resultValues, resultNulls);
resultTuple = HeapTupleGetDatum(htup);
/* Go to the next chunk, for the next call to this function. */
s_rac_state->chunkNum++;
s_rac_state->nextChunk = s_rac_state->nextChunk->p_next;
if (s_rac_state->nextChunk == NULL)
s_rac_state->rowIdx++;
/* Return our result! */
SRF_RETURN_NEXT(funcctx, resultTuple);
}
}
/* This function executes a SQL "SELECT" statement that returns rows of
* serialized (but not chunked) tuple data, and returns the results as
* deserialized rows. The input query should produce results of type bytea.
*
* This function returns the RECORD data-type; thus, the actual result format
* must be specified in the SELECT statement, like this:
*
* SELECT * FROM getdeserializedrows('SELECT serialized FROM ser_rows')
* AS r (msgid INTEGER, msgread BOOL, msgtext VARCHAR(2000));
*
* select * from getdeserializedrows(
* 'select serialized from test_ser_rows',
* 'foo'::regtype)
* as foo ( ident integer,
* boolval boolean,
* score integer,
* notes varchar);
*
* This way, the function knows what the TupleDesc should be for the tuple it
* creates.
*/
PG_FUNCTION_INFO_V1(GetDeserializedRows__text1_regtype1);
Datum GetDeserializedRows__text1_regtype1(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
MemoryContext oldcontext;
Oid typeOid;
char *pszSQL;
int rc;
HeapTuple htup;
Datum resultTuple; /* The full result. */
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/*---------------------*
* One-time operations
*/
/* Get the SQL to execute, and run it. */
pszSQL = GET_STR(PG_GETARG_TEXT_P(0));
rc = SPI_connect();
if (rc != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect() reported error.");
/* Execute read-only SQL. The 0 means "no limit on results." */
rc = SPI_execute(pszSQL, true, 0);
if (rc != SPI_OK_SELECT)
elog(ERROR, "This function requires a SELECT statement.");
/* The SQL must return rows with a single column of type BYTEA. */
if (SPI_tuptable->tupdesc->natts != 1 ||
SPI_tuptable->tupdesc->attrs[0]->atttypid != BYTEAOID)
{
elog(ERROR, "The SELECT statement result must have exactly one"
" column of type BYTEA.");
}
/* Return-type information must be specified by the caller. */
typeOid = PG_GETARG_OID(1);
funcctx->tuple_desc = TypeGetTupleDesc(typeOid, NULL);
/* Allocate cross-call state, so that we can keep track of where we're
* at in the processing.
*/
s_dr_state = (DeserializedRowState *) palloc0(sizeof(DeserializedRowState));
InitSerTupInfo(funcctx->tuple_desc, &s_dr_state->serInfo);
s_dr_state->rowIdx = 0;
}
/*---------------------*
* Per-call operations
*/
funcctx = SRF_PERCALL_SETUP();
if (s_dr_state->rowIdx < SPI_processed)
{
/* There are more rows. Deserialize the next row and return its
* contents.
*/
Datum datumSerTup;
bytea *baSerTup;
StringInfoData strInfoTup;
bool isNull;
datumSerTup = fastgetattr(SPI_tuptable->vals[s_dr_state->rowIdx], 1,
SPI_tuptable->tupdesc, &isNull);
baSerTup = DatumGetByteaP(datumSerTup);
initStringInfo(&strInfoTup);
appendBinaryStringInfo(&strInfoTup, VARDATA(baSerTup),
VARSIZE(baSerTup) - VARHDRSZ);
/* Deserialize the data stream into a tuple. */
htup = DeserializeTuple(&s_dr_state->serInfo, &strInfoTup);
resultTuple = HeapTupleGetDatum(htup);
/* Move on to the next row in the list, for next iteration. */
s_dr_state->rowIdx++;
/* Return our result! */
SRF_RETURN_NEXT(funcctx, resultTuple);
}
else
{
/* There are no more tuples. We are done. */
/* Do final cleanup. */
SPI_finish();
s_dr_state = NULL; /* Freed when mem-ctx is cleaned up. */
SRF_RETURN_DONE(funcctx);
}
}
/* This function executes a SQL "SELECT" statement that returns rows of
* chunked tuple data, and returns the results as dechunked and deserialized
* rows. The input query should produce results of type bytea.
*
* This function returns the RECORD data-type; thus, the actual result format
* must be specified in the SELECT statement, like this:
*
* SELECT * FROM getdechunkedrows(
* 'SELECT serialized FROM test_ser_rows', 'foo'::regtype)
* AS foo ( ident INTEGER, boolval BOOLEAN,
* score INTEGER, notes VARCHAR );
*
* This way, the function knows what the TupleDesc should be for the tuple it
* creates.
*/
PG_FUNCTION_INFO_V1(GetDechunkedRows__text1_regtype1);
Datum GetDechunkedRows__text1_regtype1(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
MemoryContext oldcontext;
Oid typeOid;
char *pszSQL;
int rc;
HeapTuple htup;
Datum resultTuple; /* The full result. */
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
/* switch to memory context appropriate for multiple function calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
/*---------------------*
* One-time operations
*/
/* Get the SQL to execute, and run it. */
pszSQL = GET_STR(PG_GETARG_TEXT_P(0));
rc = SPI_connect();
if (rc != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect() reported error.");
/* Execute read-only SQL. The 0 means "no limit on results." */
rc = SPI_execute(pszSQL, true, 0);
if (rc != SPI_OK_SELECT)
elog(ERROR, "This function requires a SELECT statement.");
/* The SQL must return rows with a single column of type BYTEA. */
if (SPI_tuptable->tupdesc->natts != 1 ||
SPI_tuptable->tupdesc->attrs[0]->atttypid != BYTEAOID)
{
elog(ERROR, "The SELECT statement result must have exactly one"
" column of type BYTEA.");
}
/* Return-type information must be specified by the caller. */
typeOid = PG_GETARG_OID(1);
funcctx->tuple_desc = TypeGetTupleDesc(typeOid, NULL);
/* Allocate cross-call state, so that we can keep track of where we're
* at in the processing.
*/
s_dcr_state = (DechunkedRowState *) palloc0(sizeof(DechunkedRowState));
s_dcr_state->rowIdx = 0;
InitSerTupInfo(funcctx->tuple_desc, &s_dcr_state->serInfo);
}
/*---------------------*
* Per-call operations
*/
funcctx = SRF_PERCALL_SETUP();
if (s_dcr_state->rowIdx < SPI_processed)
{
/* There are more rows. Gather all the chunks for the next row and
* dechunk them, then the row's contents.
*/
Datum datumChunk;
bytea *baChunk;
bool isNull;
TupleChunkListData tcListData;
TupleChunkListItem tcItem;
TupleChunkType lastChunkType;
/* Initialize the StringInfo to take in the input chunks. */
tcListData.p_first = NULL;
tcListData.p_last = NULL;
tcListData.num_chunks = 0;
do
{
datumChunk = fastgetattr(SPI_tuptable->vals[s_dcr_state->rowIdx],
1, SPI_tuptable->tupdesc, &isNull);
baChunk = DatumGetByteaP(datumChunk);
/* Create a new tuple chunk list item. */
tcItem = palloc0(sizeof(TupleChunkListItemData) +
VARSIZE(baChunk) - VARHDRSZ);
tcItem->chunk_length = VARSIZE(baChunk) - VARHDRSZ;
memcpy(tcItem->chunk_data, VARDATA(baChunk),
tcItem->chunk_length);
appendChunkToTCList(&tcListData, tcItem);
GetChunkType(tcItem, &lastChunkType);
s_dcr_state->rowIdx++;
}
while (lastChunkType != TC_WHOLE && lastChunkType != TC_PARTIAL_END &&
s_dcr_state->rowIdx < SPI_processed);
/* Deserialize the chunk sequence into a tuple. */
htup = CvtChunksToHeapTup(&tcListData, &s_dcr_state->serInfo);
resultTuple = HeapTupleGetDatum(htup);
/* Return our result! */
SRF_RETURN_NEXT(funcctx, resultTuple);
}
else
{
/* There are no more tuples. We are done. */
/* Do final cleanup. */
SPI_finish();
s_dcr_state = NULL; /* Freed when mem-ctx is cleaned up. */
SRF_RETURN_DONE(funcctx);
}
}