blob: e26a73628faf0495c7ea56a72bccadbeaa7b117e [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* gpdbwritableformatter.c
*
* This is the GPDB side for serializing and deserializing a tuple to a
* common format which can be parsed/generated by Hadoop's GPDBWritable.
*
* The serialized form produced by gpdbwritableformatter_export is identical to
* GPDBWritable.write(DataOutput out).
*
* The deserialization gpwritableformatter_import can deserialize the
* bytes produced by gpdbwritableformatter_export and GPDBWritable.write.
*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "common.h"
#include "access/formatter.h"
#include "catalog/namespace.h"
#include "catalog/pg_proc.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/typcache.h"
#include "utils/syscache.h"
#include "utils/lsyscache.h"
#include <unistd.h>
/* Do the module magic dance */
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(gpdbwritableformatter_export);
PG_FUNCTION_INFO_V1(gpdbwritableformatter_import);
static const int ERR_COL_OFFSET = 9;
static const int FIRST_LINE_NUM = 1;
typedef struct {
/* The Datum/null of the tuple */
Datum *values;
bool *nulls;
int lineno;
/* The export formatted value/len */
char* *outval;
int *outlen;
int *outpadlen; /* padding length for alignment */
/* Buffer to hold the export formatted tuple */
StringInfo export_format_tuple;
/* (Binary) In/Out functions
* For import, it's input function; for export, it's output function.
* If the type is using binary format, the function will the be binary
* conversion function.
*/
FmgrInfo *io_functions;
Oid *typioparams;
} format_t;
/*
* Serialize the object using the following format:
* Total Length | Version | error | #columns | Col type | Col type |... | Null Bit array | Col val...
* 4 byte | 2 byte 1 byte | 2 byte 2 byte 2 byte ceil(#col/8) byte fixed length or var len
*
* For fixed length type, we know the length.
* In the col val, we align pad according to the alignemnt requirement of the type.
* For var length type, the alignment is always 4 byte.
* For var legnth type, col val is <4 byte length><payload val>
*/
#define GPDBWRITABLE_VERSION 2
/* for backward compatibility */
#define GPDBWRITABLE_PREV_VERSION 1
/* Bit flag */
#define GPDBWRITABLE_BITFLAG_ISNULL 1 /* Column is null */
/*
* Write a int4 to the buffer
*/
static void appendIntToBuffer(StringInfo buf, int val)
{
uint32 n32 = htonl((uint32) val);
appendBinaryStringInfo(buf, (char *) &n32, 4);
}
/*
* Read a int from the buffer, given the offset;
* it will return the int value and increase the offset
*/
static int readIntFromBuffer(char* buffer, int *offset)
{
uint32 n32;
memcpy(&n32, &buffer[*offset], sizeof(int));
*offset += 4;
return ntohl(n32);
}
/*
* Write a int2 to the buffer
*/
static void appendInt2ToBuffer(StringInfo buf, uint16 val)
{
uint16 n16 = htons((uint16) val);
appendBinaryStringInfo(buf, (char *) &n16, 2);
}
/*
* Read a int2 from the buffer, given the offset;
* it will return the int value and increase the offset
*/
static uint16 readInt2FromBuffer(char* buffer, int *offset)
{
uint16 n16;
memcpy(&n16, &buffer[*offset], sizeof(uint16));
*offset += 2;
return ntohs(n16);
}
/*
* Write a int1 to the buffer
*/
static void appendInt1ToBuffer(StringInfo buf, uint8 val)
{
unsigned char n8;
n8 = (unsigned char)val;
appendBinaryStringInfo(buf, (char *) &n8, 1);
}
/*
* Read a int1 from the buffer, given the offset;
* it will return the int value and increase the offset
*/
static uint8 readInt1FromBuffer(char* buffer, int *offset)
{
uint8 n8;
memcpy(&n8, &buffer[*offset], sizeof(uint8));
*offset += 1;
return n8;
}
/*
* Tells whether the given type will be formatted as binary or text
*/
static inline bool isBinaryFormatType(Oid typeid)
{
/* For version 1, we support binary format for these type */
return(typeid == BOOLOID ||
typeid == BYTEAOID ||
typeid == FLOAT4OID ||
typeid == FLOAT8OID ||
typeid == INT2OID ||
typeid == INT4OID ||
typeid == INT8OID);
}
/*
* Tells whether the given type is variable length
*/
static inline bool isVariableLength(Oid typeid)
{
return (typeid == BYTEAOID || !isBinaryFormatType(typeid));
}
/*
* Convert typeOID to java enum DBType.ordinal()
*/
static inline int8 getJavaEnumOrdinal(Oid typeid)
{
/* For version 1, we support binary format for these type */
switch (typeid)
{
case INT8OID: return 0;
case BOOLOID: return 1;
case FLOAT8OID: return 2;
case INT4OID: return 3;
case FLOAT4OID: return 4;
case INT2OID: return 5;
case BYTEAOID: return 6;
}
return 7;
}
/*
* Convert java enum DBType.ordinal() to typeOID
*/
static inline Oid getTypeOidFromJavaEnumOrdinal(int8 enumType)
{
switch(enumType)
{
case 0: return INT8OID;
case 1: return BOOLOID;
case 2: return FLOAT8OID;
case 3: return INT4OID;
case 4: return FLOAT4OID;
case 5: return INT2OID;
case 6: return BYTEAOID;
case 7: return TEXTOID;
default: ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("Ill-formatted record: unknown Java Enum Ordinal (%d)", enumType),
errOmitLocation(true)));
}
return 0;
}
/*
* Helper to determine the size of the null byte array
*/
static int getNullByteArraySize(int colCnt)
{
return (colCnt/8) + (colCnt%8 != 0 ? 1:0);
}
/*
* Helper routine to convert boolean array to byte array
*/
static bits8* boolArrayToByteArray(bool* data, int len, int* outlen)
{
int i, j, k;
bits8* result;
*outlen = getNullByteArraySize(len);
result = palloc0(*outlen * sizeof(bits8));
for (i = 0, j = 0, k = 7; i < len; i++) {
result[j] |= (data[i] ? 1 : 0) << k--;
if (k < 0) { j++; k = 7; }
}
return result;
}
/*
* Helper routine to convert byte array to boolean array
* It'll write the output to booldata.
*/
static void byteArrayToBoolArray(bits8* data, int len, bool** booldata, int boollen)
{
int i, j, k;
for(i=0, j=0, k=7; i<boollen; i++)
{
(*booldata)[i] = ((data[j] >> k--) & 0x01) == 1;
if (k < 0) { j++; k = 7; }
}
}
/*
* Verify external table definition matches to input data columns
*/
static void
verifyExternalTableDefinition(int16 ncolumns_remote, AttrNumber ncolumns, TupleDesc tupdesc, char *data_buf, int *bufidx)
{
StringInfoData errMsg;
memset(&errMsg, 0, sizeof(errMsg));
if (ncolumns_remote != ncolumns)
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("input data column count (%d) did not match the external table definition",
ncolumns_remote),
errOmitLocation(true)));
/* Extract Column Type and check against External Table definition */
for(int i = 0; i < ncolumns; i++)
{
Oid input_type = 0;
Oid defined_type = tupdesc->attrs[i]->atttypid;
int8 enumType = readInt1FromBuffer(data_buf, bufidx);
/* Convert enumType to type oid */
input_type = getTypeOidFromJavaEnumOrdinal(enumType);
if ((isBinaryFormatType(defined_type) || isBinaryFormatType(input_type)) &&
input_type != defined_type)
{
if(errMsg.len == 0)
initStringInfo(&errMsg);
char *intype = format_type_be(input_type);
char *deftype = format_type_be(defined_type);
char *attname = NameStr(tupdesc->attrs[i]->attname);
appendStringInfo(&errMsg, "column \"%s\" (type \"%s\", input data type \"%s\"), ",
attname, deftype, intype);
pfree(intype);
pfree(deftype);
}
}
if(errMsg.len > 0)
{
truncateStringInfo(&errMsg, errMsg.len - strlen(", ")); //omit trailing ', '
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("External table definition did not match input data: %s", errMsg.data),
errOmitLocation(true)));
}
}
Datum
gpdbwritableformatter_export(PG_FUNCTION_ARGS)
{
HeapTupleHeader rec = PG_GETARG_HEAPTUPLEHEADER(0);
TupleDesc tupdesc;
HeapTupleData tuple;
format_t *myData;
int datlen;
AttrNumber ncolumns = 0;
AttrNumber i;
MemoryContext per_row_ctx, oldcontext;
bits8 *nullBit;
int nullBitLen;
int endpadding;
/* Must be called via the external table format manager */
if (!CALLED_AS_FORMATTER(fcinfo))
ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cannot execute gpdbwritableformatter_export outside format manager"),
errOmitLocation(true)));
tupdesc = FORMATTER_GET_TUPDESC(fcinfo);
/* Get our internal description of the formatter */
ncolumns = tupdesc->natts;
myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo);
/*
* Initialize the context structure
*/
if (myData == NULL)
{
if (FORMATTER_GET_EXTENCODING(fcinfo) != PG_UTF8)
ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("gpdbwritable formatter can only export UTF8 formatted data. Define the external table with ENCODING UTF8"),
errOmitLocation(true)));
myData = palloc(sizeof(format_t));
myData->values = palloc(sizeof(Datum) * ncolumns);
myData->outval = palloc(sizeof(char*) * ncolumns);
myData->nulls = palloc(sizeof(bool) * ncolumns);
myData->outlen = palloc(sizeof(int) * ncolumns);
myData->outpadlen = palloc(sizeof(int) * ncolumns);
myData->io_functions = palloc(sizeof(FmgrInfo) * ncolumns);
myData->export_format_tuple = makeStringInfo();
/* setup the text/binary input function */
for(i = 0; i < ncolumns; i++)
{
Oid type = tupdesc->attrs[i]->atttypid;
bool isvarlena;
Oid functionId;
/* External table do not support dropped columns; error out now */
if (tupdesc->attrs[i]->attisdropped)
ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cannot handle external table with dropped columns"),
errOmitLocation(true)));
/* Get the text/binary "send" function */
if (isBinaryFormatType(type))
getTypeBinaryOutputInfo(type, &(functionId), &isvarlena);
else
getTypeOutputInfo(type, &(functionId), &isvarlena);
fmgr_info(functionId, &(myData->io_functions[i]));
}
FORMATTER_SET_USER_CTX(fcinfo, myData);
}
per_row_ctx = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
oldcontext = MemoryContextSwitchTo(per_row_ctx);
/* break the input tuple into fields */
tuple.t_len = HeapTupleHeaderGetDatumLength(rec);
ItemPointerSetInvalid(&(tuple.t_self));
tuple.t_data = rec;
heap_deform_tuple(&tuple, tupdesc, myData->values, myData->nulls);
/*
* Starting from here. The conversion to bytes is exactly the
* same as GPDBWritable.toBytes()
*/
/*
* Now, compute the total payload and header length
* header = total length (4 byte), Version (2 byte), Error (1 byte), #col (2 byte)
* col type array = #col * 1 byte
* null bit array = ceil(#col/8)
*/
datlen = sizeof(int32)+sizeof(int16)+sizeof(int8)+sizeof(int16);
datlen += ncolumns;
datlen += getNullByteArraySize(ncolumns);
/*
* We need to know the total length of the tuple. So, we've to
* transformed each column so that we know the transformed size
* and the alignment padding.
*
* Since we're computing the conversion function, we use
* per-row memory context inside the loop.
*/
for (i = 0; i < ncolumns; i++)
{
Oid type = tupdesc->attrs[i]->atttypid;
Datum val = myData->values[i];
bool nul = myData->nulls[i];
FmgrInfo *iofunc = &(myData->io_functions[i]);
int alignpadlen = 0;
if (nul)
myData->outlen[i] = 0;
else
{
if (isBinaryFormatType(type))
{
bytea* tmpval = SendFunctionCall(iofunc, val);;
/* NOTE: exclude the header length */
myData->outval[i] = VARDATA(tmpval);
myData->outlen[i] = VARSIZE_ANY_EXHDR(tmpval);
}
else
{
/* NOTE: include the "\0" in the length for text format */
myData->outval[i] = OutputFunctionCall(iofunc, val);
myData->outlen[i] = strlen(myData->outval[i])+1;
}
/* For variable length type, we added a 4 byte length header.
* So, it'll be aligned int4.
* For fixed length type, we'll use the type alignment.
*/
if (isVariableLength(type))
{
alignpadlen = INTALIGN(datlen) - datlen;
datlen += sizeof(int32);
}
else
alignpadlen = att_align(datlen, tupdesc->attrs[i]->attalign) - datlen;
myData->outpadlen[i] = alignpadlen;
datlen += alignpadlen;
}
datlen += myData->outlen[i];
}
/*
* Add the final alignment padding for the next record
*/
endpadding = DOUBLEALIGN(datlen) - datlen;
datlen += endpadding;
/* Now, we're done with per-row computation. Switch back to the
* old memory context
*/
MemoryContextSwitchTo(oldcontext);
/* Resize buffer, if needed
* The new size includes the 4 byte VARHDSZ, the entire payload
* and 1 more byte for '\0' that StringInfo always ends with.
*/
if (myData->export_format_tuple->maxlen < VARHDRSZ+datlen+1)
{
pfree(myData->export_format_tuple->data);
initStringInfoOfSize(myData->export_format_tuple, VARHDRSZ+datlen+1);
}
/* Reset the export format buffer */
resetStringInfo(myData->export_format_tuple);
/* Reserve VARHDRSZ bytes for the bytea length word */
appendStringInfoFill(myData->export_format_tuple, VARHDRSZ, '\0');
/* Construct the packet header */
appendIntToBuffer(myData->export_format_tuple, datlen);
appendInt2ToBuffer(myData->export_format_tuple, GPDBWRITABLE_VERSION);
appendInt1ToBuffer(myData->export_format_tuple, 0); /*error*/
appendInt2ToBuffer(myData->export_format_tuple, ncolumns);
/* Write col type */
for(i = 0; i < ncolumns; i++)
appendInt1ToBuffer(myData->export_format_tuple,
getJavaEnumOrdinal(tupdesc->attrs[i]->atttypid));
/* Write Nullness */
nullBit = boolArrayToByteArray(myData->nulls, ncolumns, &nullBitLen);
appendBinaryStringInfo(myData->export_format_tuple, nullBit, nullBitLen);
/* Column Value */
for(i = 0; i < ncolumns; i++)
{
if (!myData->nulls[i])
{
/* Pad the alignment byte first */
appendStringInfoFill(myData->export_format_tuple, myData->outpadlen[i], '\0');
/* For variable length type, we added a 4 byte length header */
if (isVariableLength(tupdesc->attrs[i]->atttypid))
appendIntToBuffer(myData->export_format_tuple, myData->outlen[i]);
/* Now, write the actual column value */
appendBinaryStringInfo(myData->export_format_tuple,
myData->outval[i], myData->outlen[i]);
}
}
/* End padding */
appendStringInfoFill(myData->export_format_tuple, endpadding, '\0');
Insist(myData->export_format_tuple->len == datlen + VARHDRSZ);
SET_VARSIZE(myData->export_format_tuple->data, datlen + VARHDRSZ);
PG_RETURN_BYTEA_P(myData->export_format_tuple->data);
}
Datum
gpdbwritableformatter_import(PG_FUNCTION_ARGS)
{
HeapTuple tuple;
TupleDesc tupdesc;
MemoryContext per_row_ctx, oldcontext;
format_t *myData;
AttrNumber ncolumns = 0;
AttrNumber i;
char *data_buf;
int data_cur;
int data_len;
int tuplelen = 0;
int bufidx = 0;
int16 version = 0;
int8 error_flag = 0;
int16 ncolumns_remote = 0;
int remaining = 0;
/* Must be called via the external table format manager */
if (!CALLED_AS_FORMATTER(fcinfo))
ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cannot execute gpdbwritableformatter_import outside format manager"),
errOmitLocation(true)));
tupdesc = FORMATTER_GET_TUPDESC(fcinfo);
/* Get our internal description of the formatter */
ncolumns = tupdesc->natts;
myData = (format_t *) FORMATTER_GET_USER_CTX(fcinfo);
/*
* Initialize the context structure
*/
if (myData == NULL)
{
if (FORMATTER_GET_EXTENCODING(fcinfo) != PG_UTF8)
ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("gpdbwritable formatter can only import UTF8 formatted data. Define the external table with ENCODING UTF8"),
errOmitLocation(true)));
myData = palloc(sizeof(format_t));
myData->values = palloc(sizeof(Datum) * ncolumns);
myData->nulls = palloc(sizeof(bool) * ncolumns);
myData->lineno = FIRST_LINE_NUM;
myData->outlen = palloc(sizeof(int) * ncolumns);
myData->typioparams = (Oid *) palloc(ncolumns * sizeof(Oid));
myData->io_functions = palloc(sizeof(FmgrInfo) * ncolumns);
for (i = 0; i < ncolumns; i++)
{
Oid type = tupdesc->attrs[i]->atttypid;
Oid functionId;
/* External table do not support dropped columns; error out now */
if (tupdesc->attrs[i]->attisdropped)
ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("cannot handle external table with dropped columns"),
errOmitLocation(true)));
/* Get the text/binary "receive" function */
if (isBinaryFormatType(type))
getTypeBinaryInputInfo(type, &(functionId), &myData->typioparams[i]);
else
getTypeInputInfo(type, &(functionId), &myData->typioparams[i]);
fmgr_info(functionId, &(myData->io_functions[i]));
}
FORMATTER_SET_USER_CTX(fcinfo, myData);
}
/* get our input data buf and number of valid bytes in it */
data_buf = FORMATTER_GET_DATABUF(fcinfo);
data_len = FORMATTER_GET_DATALEN(fcinfo);
data_cur = FORMATTER_GET_DATACURSOR(fcinfo);
/* =======================================================================
* MAIN FORMATTING CODE
*
*
* ======================================================================= */
/* Get the first 4 byte; That's the length of the entire packet */
remaining = data_len - data_cur;
bufidx = data_cur;
/* NOTE: Unexpected EOF Error Handling
* The first time we noticed an unexpected EOF, we'll set the datacursor
* forward and then raise the error. But then, the framework will still
* call the formatter the function again. Now, the formatter function will
* be provided with a zero length data buffer. In this case, we should not
* raise an error again, but simply retruns "NEED MORE DATA". This is how
* the formatter framework works.
*/
if (remaining == 0 && FORMATTER_GET_SAW_EOF(fcinfo))
FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
if (remaining < 4)
{
if (FORMATTER_GET_SAW_EOF(fcinfo))
{
FORMATTER_SET_BAD_ROW_DATA(fcinfo, data_buf+data_cur, remaining);
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("unexpected end of file"),
errOmitLocation(true)));
}
FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
}
tuplelen = readIntFromBuffer(data_buf, &bufidx);
/* Now, make sure we've received the entire tuple */
if (remaining < tuplelen)
{
if (FORMATTER_GET_SAW_EOF(fcinfo))
{
FORMATTER_SET_BAD_ROW_DATA(fcinfo, data_buf+data_cur, remaining);
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("unexpected end of file"),
errOmitLocation(true)));
}
FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_NEED_MORE_DATA);
}
/* We got here. So, we've the ENTIRE tuple in the buffer */
FORMATTER_SET_BAD_ROW_DATA(fcinfo, data_buf+data_cur, tuplelen);
/* start clean */
MemSet(myData->values, 0, ncolumns * sizeof(Datum));
MemSet(myData->nulls, true, ncolumns * sizeof(bool));
per_row_ctx = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
oldcontext = MemoryContextSwitchTo(per_row_ctx);
/* extract the version, error and column count */
version = readInt2FromBuffer(data_buf, &bufidx);
if ((version != GPDBWRITABLE_VERSION) && (version != GPDBWRITABLE_PREV_VERSION))
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot import data version %d", version),
errOmitLocation(true)));
if(version == GPDBWRITABLE_VERSION)
error_flag = readInt1FromBuffer(data_buf, &bufidx);
if(error_flag)
ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION),
errmsg("%s", data_buf + bufidx + ERR_COL_OFFSET)));
ncolumns_remote = readInt2FromBuffer(data_buf, &bufidx);
/* Verify once on the first row*/
if(FIRST_LINE_NUM == myData->lineno++)
{
verifyExternalTableDefinition(ncolumns_remote, ncolumns, tupdesc, data_buf, &bufidx);
}
/* Skipping the columns' enum types*/
else
{
bufidx += ncolumns;
}
/* Extract null bit array */
{
int nullByteLen = getNullByteArraySize(ncolumns);
bits8* nullByteArray = (bits8*)(data_buf+bufidx);
bufidx += nullByteLen;
byteArrayToBoolArray(nullByteArray, nullByteLen, &myData->nulls, ncolumns);
}
/* extract column value */
for(i = 0; i < ncolumns; i++)
{
if (!myData->nulls[i])
{
FmgrInfo *iofunc = &(myData->io_functions[i]);
/* Skip the alignment padding for variable length type: always align int4 because we're reading a length header.
* we'll get the payload length from the first 4 byte.
*/
if (isVariableLength(tupdesc->attrs[i]->atttypid))
{
bufidx = INTALIGN(bufidx);
myData->outlen[i] = readIntFromBuffer(data_buf, &bufidx);
}
/* Skip the alignment padding for fixed length type: use the type alignment.
* we can use the type length attribute.
*/
else
{
bufidx = att_align(bufidx, tupdesc->attrs[i]->attalign);
myData->outlen[i] = tupdesc->attrs[i]->attlen;
}
if (isBinaryFormatType(tupdesc->attrs[i]->atttypid))
{
StringInfoData tmpbuf;
tmpbuf.data = data_buf+bufidx;
tmpbuf.maxlen = myData->outlen[i];
tmpbuf.len = myData->outlen[i];
tmpbuf.cursor = 0;
myData->values[i] = ReceiveFunctionCall(
iofunc,
&tmpbuf,
myData->typioparams[i],
tupdesc->attrs[i]->atttypmod);
}
else
{
myData->values[i] = InputFunctionCall(
iofunc,
data_buf+bufidx,
myData->typioparams[i],
tupdesc->attrs[i]->atttypmod);
}
bufidx += myData->outlen[i];
}
}
bufidx = DOUBLEALIGN(bufidx);
if(data_cur + tuplelen != bufidx)
ereport(ERROR, (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
errmsg("Tuplelen != bufidx: %d:%d:%d", tuplelen, bufidx, data_cur),
errOmitLocation(true)));
data_cur += tuplelen;
MemoryContextSwitchTo(oldcontext);
FORMATTER_SET_DATACURSOR(fcinfo, data_cur);
tuple = heap_form_tuple(tupdesc, myData->values, myData->nulls);
FORMATTER_SET_TUPLE(fcinfo, tuple);
FORMATTER_RETURN_TUPLE(tuple);
}