blob: d4e407fdbb1806586ec10e915e583a552b2e9ca6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <json-c/json.h>
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "c.h"
#include "access/filesplit.h"
#include "utils/builtins.h"
#include "utils/uri.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbfilesystemcredential.h"
#include "storage/cwrapper/hdfs-file-system-c.h"
#include "storage/cwrapper/text-format-c.h"
/*
 * Module registration.
 *
 * PG_MODULE_MAGIC marks this shared object as a loadable module, and each
 * PG_FUNCTION_INFO_V1 line registers one of the four SQL-callable entry
 * points defined further down in this file.
 */
PG_MODULE_MAGIC
;
PG_FUNCTION_INFO_V1(extfmtcsv_out);
PG_FUNCTION_INFO_V1(extfmtcsv_in);
PG_FUNCTION_INFO_V1(extfmttext_out);
PG_FUNCTION_INFO_V1(extfmttext_in);
/*
 * Entry points: the *_in functions read external data into tuples, the
 * *_out functions write tuples out. Each one selects the CSV or TEXT
 * formatter kind and then delegates to the matching extfmtcommon_* routine.
 */
Datum extfmtcsv_out(PG_FUNCTION_ARGS);
Datum extfmtcsv_in(PG_FUNCTION_ARGS);
Datum extfmttext_out(PG_FUNCTION_ARGS);
Datum extfmttext_in(PG_FUNCTION_ARGS);
/*
 * FmtUserData
 *
 * Per-scan / per-insert state kept in the formatter's user context
 * (FORMATTER_SET_USER_CTX) between calls. All per-column arrays have
 * numberOfColumns entries. Some members are only allocated on the read
 * path (see the notes on the individual fields).
 */
typedef struct FmtUserData
{
/* Formatter handle created by TextFormatNewTextFormatC and released by
 * TextFormatFreeTextFormatC. */
TextFormatC *fmt;
/* Column names; each entry is a separately palloc'ed 64-byte buffer holding
 * a copy of the relation's attribute name. */
char **colNames;
/* Number of columns, taken from tupdesc->natts. */
int numberOfColumns;
/* Text form of each column: filled by TextFormatNextTextFormatC when
 * reading, and by the type output functions when writing. */
char **colRawValues;
/* Datum form of each column: input to heap_form_tuple when reading, output
 * of heap_deform_tuple when writing. */
Datum *colValues;
/* Length of each raw value as reported by TextFormatNextTextFormatC.
 * Only allocated on the read path. */
uint64_t *colValLength;
/* Per-column NULL flags. */
bool *colIsNulls;
/* Per-column "read this column" flags; beginFormatterForRead allocates the
 * array and sets every entry to true. Read path only. */
bool *colToReads;
/* Number of entries in splits. Read path only. */
int nSplits;
/* File splits handed to TextFormatBeginTextFormatC; each fileName is a
 * palloc'ed "hdfs://host:port/path" string. Read path only. */
TextFormatFileSplit *splits;
} FmtUserData;
/*
 * Formatter kind currently selected. '\0' means "not selected"; the
 * extfmtcsv_* and extfmttext_* entry points set it to TextFormatTypeCSV or
 * TextFormatTypeTXT when they find it unset, and the extfmtcommon_* routines
 * reset it to '\0' on the first (mask == FMT_UNSET) call, at the end of a
 * read, and at the end of a write.
 */
char externalFmtType = '\0';
/*
 * Formatter names used as the "%s:" prefix of error messages. The entry
 * points fill them with "csv_in"/"csv_out" or "text_in"/"text_out".
 */
char externalFmtNameIn[64];
char externalFmtNameOut[64];
/* NOTE(review): no definition of setExtFormatterTupleDesc is visible in this
 * part of the file - confirm it is still needed. */
void setExtFormatterTupleDesc(TextFormatC *fmt, TupleDesc tupdesc);
/* Builds the formatter options as a palloc'ed JSON string in *jsonStr. */
void buildFormatterOptionsInJson(PG_FUNCTION_ARGS, char **jsonStr);
/* Prepare splits and column names, then start a scan / an insert. */
void beginFormatterForRead(PG_FUNCTION_ARGS);
void beginFormatterForWrite(PG_FUNCTION_ARGS);
/* Shared implementations behind the CSV and TEXT entry points. */
Datum extfmtcommon_in(PG_FUNCTION_ARGS);
Datum extfmtcommon_out(PG_FUNCTION_ARGS);
/*
 * extfmtcommon_in
 *
 * Shared read path for the CSV and TEXT formatters. Each call either
 *   - acknowledges the very first call (mask FMT_UNSET -> FMT_SET),
 *   - reports FMT_DONE when there are no splits or the data is exhausted, or
 *   - converts the next row delivered by the text formatter into a heap
 *     tuple and returns it.
 *
 * The FmtUserData state is created lazily on the first data call and torn
 * down when the formatter reports the end of the data without an error.
 * Any formatter error is raised with elog(ERROR).
 */
Datum extfmtcommon_in(PG_FUNCTION_ARGS)
{
    /* Must be called via the external table format manager */
    if (!CALLED_AS_FORMATTER(fcinfo))
        ereport(ERROR,
                (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), errmsg("%s: not called by format manager", externalFmtNameIn)));

    /* First call ever: only flip the mask and clear the formatter kind */
    if (FORMATTER_GET_MASK(fcinfo) == FMT_UNSET)
    {
        FORMATTER_GET_MASK(fcinfo) = FMT_SET;
        externalFmtType = '\0';
        PG_RETURN_VOID();
    }

    /* Nothing to scan */
    if (((FormatterData *) (fcinfo->context))->fmt_splits == NULL)
    {
        FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_DONE);
        PG_RETURN_VOID();
    }

    TupleDesc tupdesc = FORMATTER_GET_TUPDESC(fcinfo);

    /* Lazily build the per-scan state and keep it in the formatter context */
    FmtUserData *userData = FORMATTER_GET_USER_CTX(fcinfo);
    if (userData == NULL)
    {
        userData = palloc0(sizeof(FmtUserData));
        FORMATTER_SET_USER_CTX(fcinfo, userData);

        userData->numberOfColumns = tupdesc->natts;
        userData->colNames = palloc0(sizeof(char *) * userData->numberOfColumns);
        userData->colValues = palloc0(sizeof(Datum) * userData->numberOfColumns);
        userData->colIsNulls = palloc0(sizeof(bool) * userData->numberOfColumns);
        userData->colRawValues = palloc0(sizeof(char *) * userData->numberOfColumns);
        userData->colValLength = palloc0(sizeof(uint64_t) * userData->numberOfColumns);

        /* Options -> formatter instance -> start of scan */
        char *optionsJson = NULL;
        buildFormatterOptionsInJson(fcinfo, &optionsJson);
        userData->fmt = TextFormatNewTextFormatC(externalFmtType, optionsJson);
        beginFormatterForRead(fcinfo);
        if (optionsJson != NULL)
        {
            pfree(optionsJson);
        }
    }

    bool lastBatchRow = false;
    bool gotRow = TextFormatNextTextFormatC(userData->fmt, userData->colRawValues,
            userData->colValLength, userData->colIsNulls, &lastBatchRow);

    if (!gotRow)
    {
        externalFmtType = '\0';
        TextFormatCompleteNextTextFormatC(userData->fmt);

        /* Without a caught error this is simply the end of the splits */
        TextFormatCatchedError *err = TextFormatGetErrorTextFormatC(userData->fmt);
        if (err->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
        {
            elog(ERROR, "%s: failed to get next tuple. %s (%d)",
                    externalFmtNameIn,
                    err->errMessage, err->errCode);
        }

        TextFormatEndTextFormatC(userData->fmt);
        err = TextFormatGetErrorTextFormatC(userData->fmt);
        if (err->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
        {
            elog(ERROR, "%s: failed to get next tuple. %s (%d)",
                    externalFmtNameIn,
                    err->errMessage, err->errCode);
        }

        /* Release the formatter and everything hanging off the user data */
        TextFormatFreeTextFormatC(&(userData->fmt));
        pfree(userData->colIsNulls);
        pfree(userData->colRawValues);
        pfree(userData->colValues);
        pfree(userData->colToReads);
        pfree(userData->colValLength);
        if (userData->splits != NULL)
        {
            for (int s = 0; s < userData->nSplits; ++s)
            {
                pfree(userData->splits[s].fileName);
            }
            pfree(userData->splits);
        }
        for (int c = 0; c < userData->numberOfColumns; ++c)
        {
            pfree(userData->colNames[c]);
        }
        pfree(userData->colNames);
        pfree(userData);

        FORMATTER_RETURN_NOTIFICATION(fcinfo, FMT_DONE);
        PG_RETURN_VOID();
    }

    /* One row is ready: run the type input functions in per-row memory */
    MemoryContext rowCtx = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
    MemoryContext callerCtx = MemoryContextSwitchTo(rowCtx);

    for (int col = 0; col < userData->numberOfColumns; ++col)
    {
        if (userData->colIsNulls[col])
        {
            continue;
        }

        uint64_t valLen = userData->colValLength[col];
        char *val = NULL;
        char savedByte = '\0';

        if (lastBatchRow)
        {
            /* Last row of a batch: work on a NUL-terminated private copy */
            val = (char *) palloc(valLen + 1);
            memcpy(val, userData->colRawValues[col], valLen);
            val[valLen] = '\0';
        }
        else
        {
            /*
             * Otherwise terminate the value inside the formatter's buffer
             * and put the overwritten byte back once conversion is done.
             */
            val = (char *) (userData->colRawValues[col]);
            savedByte = *(val + valLen);
            *(val + valLen) = '\0';
        }

        /* Transcode from the external encoding first when required */
        char *inputText = val;
        if (((FormatterData *) (fcinfo->context))->fmt_needs_transcoding)
        {
            char *cvt = NULL;
            FORMATTER_ENCODE_STRING(fcinfo, val, valLen,
                    cvt, true); /* is import */
            Assert(cvt != NULL);
            inputText = cvt;
        }

        userData->colValues[col] = InputFunctionCall(
                &(FORMATTER_GET_CONVERSION_FUNCS(fcinfo)[col]), inputText,
                FORMATTER_GET_TYPIOPARAMS(fcinfo)[col],
                tupdesc->attrs[col]->atttypmod);

        if (!lastBatchRow)
        {
            *(val + valLen) = savedByte;
        }
    }

    MemoryContextSwitchTo(callerCtx);
    TextFormatCompleteNextTextFormatC(userData->fmt);

    HeapTuple tuple = heap_form_tuple(tupdesc, userData->colValues,
            userData->colIsNulls);
    FORMATTER_SET_TUPLE(fcinfo, tuple);
    FORMATTER_RETURN_TUPLE(tuple);

    PG_RETURN_VOID();
}
/*
 * extfmtcsv_in
 *
 * CSV read entry point. If no formatter kind is currently selected, select
 * CSV and record the names used in error messages; then let
 * extfmtcommon_in do the actual work for this call.
 */
Datum extfmtcsv_in(PG_FUNCTION_ARGS)
{
    if (externalFmtType != '\0')
    {
        /* A formatter kind is already selected; leave it untouched */
        return extfmtcommon_in(fcinfo);
    }

    strcpy(externalFmtNameOut, "csv_out");
    strcpy(externalFmtNameIn, "csv_in");
    externalFmtType = TextFormatTypeCSV;

    return extfmtcommon_in(fcinfo);
}
/*
 * extfmttext_in
 *
 * TEXT read entry point. If no formatter kind is currently selected, select
 * TEXT and record the names used in error messages; then let
 * extfmtcommon_in do the actual work for this call.
 */
Datum extfmttext_in(PG_FUNCTION_ARGS)
{
    if (externalFmtType != '\0')
    {
        /* A formatter kind is already selected; leave it untouched */
        return extfmtcommon_in(fcinfo);
    }

    strcpy(externalFmtNameOut, "text_out");
    strcpy(externalFmtNameIn, "text_in");
    externalFmtType = TextFormatTypeTXT;

    return extfmtcommon_in(fcinfo);
}
/*
 * extfmtcommon_out
 *
 * Shared write path for the CSV and TEXT formatters. Each call either
 *   - acknowledges the very first call (mask FMT_UNSET -> FMT_SET),
 *   - finishes the insert and releases the state when the mask carries
 *     FMT_WRITE_END, or
 *   - converts the tuple passed as argument 0 to text, column by column,
 *     and hands it to the text formatter.
 *
 * The FmtUserData state is created lazily on the first data call. Any
 * formatter error is raised with elog(ERROR).
 */
Datum extfmtcommon_out(PG_FUNCTION_ARGS)
{
    /* Text handed to the formatter for NULL columns */
    static char emptyText[1] = "";
    HeapTupleData tuple;

    /* Must be called via the external table format manager */
    if (!CALLED_AS_FORMATTER(fcinfo))
        ereport(ERROR,
                (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION), errmsg("%s: not called by format manager", externalFmtNameOut)));

    /* First call ever: only flip the mask and clear the formatter kind */
    if (FORMATTER_GET_MASK(fcinfo) == FMT_UNSET)
    {
        FORMATTER_GET_MASK(fcinfo) = FMT_SET;
        externalFmtType = '\0';
        PG_RETURN_VOID();
    }

    TupleDesc tupdesc = FORMATTER_GET_TUPDESC(fcinfo);

    /* Lazily build the per-insert state and keep it in the formatter context */
    FmtUserData *userData = FORMATTER_GET_USER_CTX(fcinfo);
    if (userData == NULL)
    {
        userData = palloc0(sizeof(FmtUserData));
        FORMATTER_SET_USER_CTX(fcinfo, userData);

        userData->numberOfColumns = tupdesc->natts;
        userData->colValues = palloc0(sizeof(Datum) * userData->numberOfColumns);
        userData->colIsNulls = palloc0(sizeof(bool) * userData->numberOfColumns);
        userData->colRawValues = palloc0(sizeof(char *) * userData->numberOfColumns);
        userData->colNames = palloc0(sizeof(char *) * userData->numberOfColumns);

        /* Options -> formatter instance -> start of insert */
        char *optionsJson = NULL;
        buildFormatterOptionsInJson(fcinfo, &optionsJson);
        userData->fmt = TextFormatNewTextFormatC(externalFmtType, optionsJson);
        if (optionsJson != NULL)
        {
            pfree(optionsJson);
        }
        beginFormatterForWrite(fcinfo);
    }

    /* End of write: flush, check for errors, release everything */
    if (FORMATTER_GET_MASK(fcinfo) & FMT_WRITE_END)
    {
        externalFmtType = '\0';
        TextFormatEndInsertTextFormatC(userData->fmt);

        TextFormatCatchedError *endErr = TextFormatGetErrorTextFormatC(userData->fmt);
        if (endErr->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
        {
            elog(ERROR, "%s: failed to insert: %s(%d)",
                    externalFmtNameOut,
                    endErr->errMessage, endErr->errCode);
        }

        TextFormatFreeTextFormatC(&(userData->fmt));
        pfree(userData->colIsNulls);
        pfree(userData->colRawValues);
        pfree(userData->colValues);
        for (int c = 0; c < userData->numberOfColumns; ++c)
        {
            pfree(userData->colNames[c]);
        }
        pfree(userData->colNames);
        pfree(userData);
        PG_RETURN_VOID();
    }

    /* Break the incoming tuple into per-column datums and null flags */
    HeapTupleHeader tupleHeader = PG_GETARG_HEAPTUPLEHEADER(0);
    tuple.t_len = HeapTupleHeaderGetDatumLength(tupleHeader);
    ItemPointerSetInvalid(&(tuple.t_self));
    tuple.t_data = tupleHeader;
    heap_deform_tuple(&tuple, tupdesc, userData->colValues,
            userData->colIsNulls);

    /* Text conversion and the insert itself run in per-row memory */
    MemoryContext rowCtx = FORMATTER_GET_PER_ROW_MEM_CTX(fcinfo);
    MemoryContext callerCtx = MemoryContextSwitchTo(rowCtx);

    for (int col = 0; col < userData->numberOfColumns; ++col)
    {
        userData->colRawValues[col] = emptyText;
        if (!userData->colIsNulls[col])
        {
            /* workaround for preserving float precision */
            int savedExtraFloatDigits = extra_float_digits;
            extra_float_digits = 3;
            userData->colRawValues[col] = OutputFunctionCall(
                    &(FORMATTER_GET_CONVERSION_FUNCS(fcinfo)[col]),
                    userData->colValues[col]);
            extra_float_digits = savedExtraFloatDigits;

            /* Transcode to the external encoding when required */
            if (((FormatterData *) (fcinfo->context))->fmt_needs_transcoding)
            {
                char *cvt = NULL;
                FORMATTER_ENCODE_STRING(fcinfo,
                        (char * )(userData->colRawValues[col]),
                        strlen(userData->colRawValues[col]), cvt, false); /* is export */
                userData->colRawValues[col] = cvt;
            }
        }
    }

    /* Hand the row to the formatter and surface any error it caught */
    TextFormatInsertTextFormatC(userData->fmt, userData->colRawValues,
            userData->colIsNulls);
    TextFormatCatchedError *insertErr = TextFormatGetErrorTextFormatC(userData->fmt);
    if (insertErr->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
    {
        elog(ERROR, "%s: failed to insert: %s(%d)",
                externalFmtNameOut,
                insertErr->errMessage, insertErr->errCode);
    }

    MemoryContextSwitchTo(callerCtx);
    PG_RETURN_VOID();
}
/*
 * extfmtcsv_out
 *
 * CSV write entry point. If no formatter kind is currently selected, select
 * CSV and record the names used in error messages; then let
 * extfmtcommon_out do the actual work for this call.
 */
Datum extfmtcsv_out(PG_FUNCTION_ARGS)
{
    if (externalFmtType != '\0')
    {
        /* A formatter kind is already selected; leave it untouched */
        return extfmtcommon_out(fcinfo);
    }

    strcpy(externalFmtNameOut, "csv_out");
    strcpy(externalFmtNameIn, "csv_in");
    externalFmtType = TextFormatTypeCSV;

    return extfmtcommon_out(fcinfo);
}
/*
 * extfmttext_out
 *
 * TEXT write entry point. If no formatter kind is currently selected, select
 * TEXT and record the names used in error messages; then let
 * extfmtcommon_out do the actual work for this call.
 */
Datum extfmttext_out(PG_FUNCTION_ARGS)
{
    if (externalFmtType != '\0')
    {
        /* A formatter kind is already selected; leave it untouched */
        return extfmtcommon_out(fcinfo);
    }

    strcpy(externalFmtNameOut, "text_out");
    strcpy(externalFmtNameIn, "text_in");
    externalFmtType = TextFormatTypeTXT;

    return extfmtcommon_out(fcinfo);
}
/*
 * buildFormatterOptionsInJson
 *
 * Collects the formatter arguments attached to this call into a JSON object,
 * lets buildDefaultFormatterOptionsInJson add the defaults for the current
 * formatter kind and external encoding, and returns the serialized object.
 *
 * fcinfo  - formatter call info; arguments are read with
 *           FORMATTER_GET_NTH_ARG_KEY / FORMATTER_GET_NTH_ARG_VAL.
 * jsonStr - out: palloc'ed, NUL-terminated JSON text (caller pfree's it),
 *           or NULL when no JSON object is available.
 *
 * Every value is first converted from the database encoding to the external
 * table encoding. "reject_limit" is stored as an integer; "force_notnull"
 * and "force_quote" have their '.' separators turned into ','; every other
 * option is stored as a string.
 */
void buildFormatterOptionsInJson(PG_FUNCTION_ARGS, char **jsonStr)
{
    struct json_object *optJsonObject = json_object_new_object();
    int nArgs = FORMATTER_GET_NUM_ARGS(fcinfo);

    /* add those predefined */
    for (int argNo = 1; argNo <= nArgs; ++argNo)
    {
        char *keyStr = FORMATTER_GET_NTH_ARG_KEY(fcinfo, argNo);
        char *valStr = FORMATTER_GET_NTH_ARG_VAL(fcinfo, argNo);

        /* convert the delimiter and null to external table encoding */
        valStr = pg_do_encoding_conversion(valStr, strlen(valStr),
                GetDatabaseEncoding(),
                ((FormatterData*) fcinfo->context)->fmt_external_encoding);

        if (strcmp(keyStr, "reject_limit") == 0)
        {
            json_object_object_add(optJsonObject, "reject_limit",
                    json_object_new_int(atoi(valStr)));
        }
        else if (strcmp(keyStr, "force_notnull") == 0
                || strcmp(keyStr, "force_quote") == 0)
        {
            /*
             * The external formatter accepts comma separated column names
             * instead of dot separated ones.
             *
             * NOTE(review): this rewrites valStr in place; if the encoding
             * conversion returned the original argument string, that string
             * is modified too - confirm this is acceptable.
             */
            size_t valLen = strlen(valStr);
            for (size_t pos = 0; pos < valLen; ++pos)
            {
                if (valStr[pos] == '.')
                {
                    valStr[pos] = ',';
                }
            }

            /*
             * Add the option exactly once, after the whole value has been
             * rewritten. Adding it inside the loop re-added the key for
             * every character and skipped it entirely for an empty value.
             */
            json_object_object_add(optJsonObject, keyStr,
                    json_object_new_string(valStr));
        }
        else
        {
            json_object_object_add(optJsonObject, keyStr,
                    json_object_new_string(valStr));
        }
    }

    /* add default settings for this formatter */
    int encoding = ((FormatterData *)fcinfo->context)->fmt_external_encoding;
    buildDefaultFormatterOptionsInJson(encoding, externalFmtType, optJsonObject);

    *jsonStr = NULL;
    if (optJsonObject != NULL)
    {
        /* Copy the text out before dropping our reference to the object */
        const char *str = json_object_to_json_string(optJsonObject);
        *jsonStr = (char *) palloc0(strlen(str) + 1);
        strcpy(*jsonStr, str);
        json_object_put(optJsonObject);
        /*
         * jsonStr is already in table encoding while elog needs db
         * encoding, so the options are deliberately not logged here.
         */
    }
}
void beginFormatterForRead(PG_FUNCTION_ARGS)
{
FmtUserData *userData = FORMATTER_GET_USER_CTX(fcinfo);
FormatterData *fmtData = (FormatterData *) (fcinfo->context);
/* parse URL to get server location etc. */
Uri *uri = ParseExternalTableUri(fmtData->fmt_url);
userData->nSplits = list_length(fmtData->fmt_splits);
userData->splits = palloc0(sizeof(TextFormatFileSplit) * userData->nSplits);
ListCell *cell = NULL;
int i = 0;
foreach(cell, fmtData->fmt_splits)
{
FileSplit origFS = (FileSplit) lfirst(cell);
userData->splits[i].len = origFS->lengths;
userData->splits[i].start = origFS->offsets;
/* build file path containing host address */
int fileNameLen = 7 + // "hdfs://"
(uri->hostname == NULL ? 0 : strlen(uri->hostname)) +
1 + // ':'
5 + // "65535"
(origFS->ext_file_uri_string == NULL ? 0 : strlen(origFS->ext_file_uri_string)) +
1; // '\0'
userData->splits[i].fileName = palloc(fileNameLen * sizeof(char));
sprintf(userData->splits[i].fileName, "hdfs://%s:%d%s",
uri->hostname == NULL ? "" : uri->hostname, uri->port,
origFS->ext_file_uri_string == NULL ? "" : origFS->ext_file_uri_string);
i++;
}
if (enable_secure_filesystem && Gp_role == GP_ROLE_EXECUTE)
{
char *token = find_filesystem_credential_with_uri(fmtData->fmt_url);
SetToken(fmtData->fmt_url, token);
}
FreeExternalTableUri(uri);
userData->colToReads = palloc0(sizeof(bool) * userData->numberOfColumns);
for (int i = 0; i < userData->numberOfColumns; ++i)
{
userData->colToReads[i] = true;
/* 64 is the name type length */
userData->colNames[i] = palloc(sizeof(char) * 64);
strcpy(userData->colNames[i],
fmtData->fmt_relation->rd_att->attrs[i]->attname.data);
}
TextFormatBeginTextFormatC(userData->fmt, userData->splits,
userData->nSplits, userData->colToReads, userData->colNames,
userData->numberOfColumns);
TextFormatCatchedError *e = TextFormatGetErrorTextFormatC(userData->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
{
elog(ERROR, "%s: failed to begin scan: %s(%d)",
externalFmtNameIn,
e->errMessage, e->errCode);
}
}
void beginFormatterForWrite(PG_FUNCTION_ARGS)
{
FmtUserData *userData = FORMATTER_GET_USER_CTX(fcinfo);
FormatterData *fmtData = (FormatterData *) (fcinfo->context);
/* prepare column names */
for (int i = 0; i < userData->numberOfColumns; ++i)
{
/* 64 is the name type length */
userData->colNames[i] = palloc(sizeof(char) * 64);
strcpy(userData->colNames[i],
fmtData->fmt_relation->rd_att->attrs[i]->attname.data);
}
if (enable_secure_filesystem && Gp_role == GP_ROLE_EXECUTE)
{
char *token = find_filesystem_credential_with_uri(fmtData->fmt_url);
SetToken(fmtData->fmt_url, token);
}
TextFormatBeginInsertTextFormatC(userData->fmt, fmtData->fmt_url,
userData->colNames, userData->numberOfColumns);
TextFormatCatchedError *e = TextFormatGetErrorTextFormatC(userData->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
{
elog(ERROR, "%s: failed to begin insert: %s(%d)",
externalFmtNameOut,
e->errMessage, e->errCode);
}
}