/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <json-c/json.h>
#include "c.h"
#include "port.h"
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "access/extprotocol.h"
#include "access/filesplit.h"
#include "access/fileam.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/plugstorage.h"
#include "access/tupdesc.h"
#include "access/transam.h"
#include "catalog/namespace.h"
#include "catalog/pg_exttable.h"
#include "catalog/pg_attribute.h"
#include "cdb/cdbdatalocality.h"
#include "cdb/cdbhash.h"
#include "cdb/cdbvars.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/dbcommands.h"
#include "mb/pg_wchar.h"
#include "nodes/makefuncs.h"
#include "nodes/pg_list.h"
#include "nodes/plannodes.h"
#include "optimizer/newPlanner.h"
#include "parser/parse_type.h"
#include "postmaster/identity.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/elog.h"
#include "utils/fmgroids.h"
#include "utils/formatting.h"
#include "utils/hawq_type_mapping.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/numeric.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/uri.h"
#include "storage/cwrapper/magma-format-c.h"
#include "magma/cwrapper/magma-client-c.h"
#include "univplan/cwrapper/univplan-c.h"
/*
* Do the module magic dance
*/
PG_MODULE_MAGIC;
/*
* Validators for magma protocol in pluggable storage
*/
PG_FUNCTION_INFO_V1(magma_protocol_blocklocation);
PG_FUNCTION_INFO_V1(magma_protocol_tablesize);
PG_FUNCTION_INFO_V1(magma_protocol_databasesize);
PG_FUNCTION_INFO_V1(magma_protocol_validate);
/*
* Validators for magma format in pluggable storage
*/
PG_FUNCTION_INFO_V1(magma_validate_interfaces);
PG_FUNCTION_INFO_V1(magma_validate_options);
PG_FUNCTION_INFO_V1(magma_validate_encodings);
PG_FUNCTION_INFO_V1(magma_validate_datatypes);
/*
* Accessors for magma format in pluggable storage
*/
PG_FUNCTION_INFO_V1(magma_createtable);
PG_FUNCTION_INFO_V1(magma_droptable);
PG_FUNCTION_INFO_V1(magma_beginscan);
PG_FUNCTION_INFO_V1(magma_getnext_init);
PG_FUNCTION_INFO_V1(magma_getnext);
PG_FUNCTION_INFO_V1(magma_rescan);
PG_FUNCTION_INFO_V1(magma_endscan);
PG_FUNCTION_INFO_V1(magma_stopscan);
PG_FUNCTION_INFO_V1(magma_begindelete);
PG_FUNCTION_INFO_V1(magma_delete);
PG_FUNCTION_INFO_V1(magma_enddelete);
PG_FUNCTION_INFO_V1(magma_beginupdate);
PG_FUNCTION_INFO_V1(magma_update);
PG_FUNCTION_INFO_V1(magma_endupdate);
PG_FUNCTION_INFO_V1(magma_insert_init);
PG_FUNCTION_INFO_V1(magma_insert);
PG_FUNCTION_INFO_V1(magma_insert_finish);
/*
* Transaction for magma format
*/
PG_FUNCTION_INFO_V1(magma_transaction);
/*
* Definitions of validators for magma protocol in pluggable storage
*/
Datum magma_protocol_blocklocation(PG_FUNCTION_ARGS);
Datum magma_protocol_validate(PG_FUNCTION_ARGS);
Datum magma_getstatus(PG_FUNCTION_ARGS);
/*
* Definitions of validators for magma format in pluggable storage
*/
Datum magma_validate_interfaces(PG_FUNCTION_ARGS);
Datum magma_validate_options(PG_FUNCTION_ARGS);
Datum magma_validate_encodings(PG_FUNCTION_ARGS);
Datum magma_validate_datatypes(PG_FUNCTION_ARGS);
/*
* Definitions of accessors for magma format in pluggable storage
*/
Datum magma_createtable(PG_FUNCTION_ARGS);
Datum magma_droptable(PG_FUNCTION_ARGS);
Datum magma_beginscan(PG_FUNCTION_ARGS);
Datum magma_getnext_init(PG_FUNCTION_ARGS);
Datum magma_getnext(PG_FUNCTION_ARGS);
Datum magma_rescan(PG_FUNCTION_ARGS);
Datum magma_endscan(PG_FUNCTION_ARGS);
Datum magma_stopscan(PG_FUNCTION_ARGS);
Datum magma_begindelete(PG_FUNCTION_ARGS);
Datum magma_delete(PG_FUNCTION_ARGS);
Datum magma_enddelete(PG_FUNCTION_ARGS);
Datum magma_beginupdate(PG_FUNCTION_ARGS);
Datum magma_update(PG_FUNCTION_ARGS);
Datum magma_endupdate(PG_FUNCTION_ARGS);
Datum magma_insert_init(PG_FUNCTION_ARGS);
Datum magma_insert(PG_FUNCTION_ARGS);
Datum magma_insert_finish(PG_FUNCTION_ARGS);
/*
* Definitions of accessors for magma format index in pluggable storage
*/
Datum magma_createindex(PG_FUNCTION_ARGS);
Datum magma_dropindex(PG_FUNCTION_ARGS);
Datum magma_reindex_index(PG_FUNCTION_ARGS);
/*
* Definition of transaction for magma format
*/
Datum magma_transaction(PG_FUNCTION_ARGS);
typedef struct {
int64_t second;
int64_t nanosecond;
} TimestampType;
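/*
 * MagmaTidC is the tuple id magma returns for each row: a 64-bit rowid plus
 * a 16-bit rangeid. See ConvertTidToCtidAndRangeid() for how the pair is
 * packed into the slot's synthetic ctid and tts_rangeid.
 */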
typedef struct MagmaTidC {
uint64_t rowid;
uint16_t rangeid;
} MagmaTidC;
typedef struct MagmaFormatUserData {
MagmaFormatC *fmt;
char *dbname;
char *schemaname;
char *tablename;
bool isMagmatp;
int *colIndexes;
bool *colIsNulls;
char **colNames;
int *colDatatypes;
int64_t *colDatatypeMods;
int32_t numberOfColumns;
char **colRawValues;
Datum *colValues;
uint64_t *colValLength;
bool *colToReads;
char *colRawTid;
MagmaTidC colTid;
// for insert/update/delete
TimestampType *colTimestamp;
bool isFirstRescan;
} MagmaFormatUserData;
static MagmaClientC *magma_client_instance;
/*
* Utility functions for magma in pluggable storage
*/
static void init_common_plan_context(CommonPlanContext *ctx);
static void free_common_plan_context(CommonPlanContext *ctx);
static FmgrInfo *get_magma_function(char *formatter_name, char *function_name);
static void get_magma_category_info(char *fmtoptstr, bool *isexternal);
static void get_magma_scan_functions(char *formatter_name,
FileScanDesc file_scan_desc);
static void get_magma_insert_functions(char *formatter_name,
ExternalInsertDesc ext_insert_desc);
static void get_magma_delete_functions(char *formatter_name,
ExternalInsertDesc ext_delete_desc);
static void get_magma_update_functions(char *formatter_name,
ExternalInsertDesc ext_update_desc);
static MagmaFormatC *create_magma_formatter_instance(List *fmt_opts_defelem,
char *serializeSchema,
int serializeSchemaLen,
int fmt_encoding,
char *formatterName,
int rangeNum);
static MagmaClientC *create_magma_client_instance();
static void init_magma_format_user_data_for_read(
TupleDesc tup_desc, MagmaFormatUserData *user_data);
static void init_magma_format_user_data_for_write(
TupleDesc tup_desc, MagmaFormatUserData *user_data, Relation relation);
static void build_options_in_json(char *serializeSchema, int serializeSchemaLen,
List *fmt_opts_defelem, int encoding, int rangeNum,
char *formatterName, char **json_str);
static void build_magma_tuple_descrition_for_read(
Plan *plan, Relation relation, MagmaFormatUserData *user_data, bool skipTid);
static void magma_scan_error_callback(void *arg);
static List *magma_parse_format_string(char *fmtname, char **fmtstr);
static char *magma_strtokx2(const char *s, const char *whitespace,
const char *delim, const char *quote, char escape,
bool e_strings, bool del_quotes, int encoding);
static void magma_strip_quotes(char *source, char quote, char escape,
int encoding);
static void magma_check_result(MagmaClientC **client);
static bool checkUnsupportedDataTypeMagma(int32_t hawqTypeID);
int32_t map_hawq_type_to_magma_type(int32_t hawqTypeID, bool isMagmatp);
char *search_hostname_by_ipaddr(const char *ipaddr);
static void getHostNameByIp(const char *ipaddr, char *hostname);
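/*
 * Tear down the magma scan resources referenced by fs_ps_user_data: end the
 * scan and free the formatter; when clearSlot is set (the getnext path has
 * exhausted the data), also free the per-column buffers and the user data
 * itself, and clear the tuple table slot.
 */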
static void magma_clear(PlugStorage ps, bool clearSlot) {
FileScanDesc fsd = ps->ps_file_scan_desc;
MagmaFormatUserData *user_data = (MagmaFormatUserData *)(fsd->fs_ps_user_data);
TupleTableSlot *slot = ps->ps_tuple_table_slot;
if (user_data->fmt) {
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode == ERRCODE_SUCCESSFUL_COMPLETION) {
MagmaFormatEndScanMagmaFormatC(user_data->fmt);
e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
ereport(ERROR, (errcode(e->errCode), errmsg("MAGMA:%s", e->errMessage)));
}
MagmaFormatFreeMagmaFormatC(&(user_data->fmt));
// when invoked from getnext (clearSlot == true), free the data resources too
if (clearSlot) {
pfree(user_data->colRawValues);
pfree(user_data->colValues);
pfree(user_data->colToReads);
pfree(user_data->colValLength);
for (int i = 0; i < user_data->numberOfColumns; ++i)
pfree(user_data->colNames[i]);
pfree(user_data->colNames);
pfree(user_data->colDatatypes);
pfree(user_data->colDatatypeMods);
pfree(user_data->colIsNulls);
pfree(user_data);
fsd->fs_ps_user_data = NULL;
ps->ps_has_tuple = false;
slot->PRIVATE_tts_values = NULL;
ExecClearTuple(slot);
}
} else {
ereport(ERROR, (errcode(e->errCode), errmsg("MAGMA:%s", e->errMessage)));
}
}
}
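/*
 * Pack a magma tid into the slot's synthetic ctid and rangeid: the low 48
 * bits of the 64-bit rowid go into the ctid (bi_hi: bits 47..32, bi_lo:
 * bits 31..16, ip_posid: bits 15..0), while tts_rangeid carries the high 16
 * bits of the rowid in its upper half and the 16-bit rangeid in its lower
 * half. An illustrative example: rowid = 0x0123456789ABCDEF with
 * rangeid = 0x0042 yields bi_hi = 0x4567, bi_lo = 0x89AB, ip_posid = 0xCDEF,
 * and tts_rangeid = 0x01230042.
 */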
static inline void ConvertTidToCtidAndRangeid(const MagmaTidC tid,
ItemPointerData *ctid,
uint32_t *tts_rangeid) {
// MagmaTidC tidVal = *(MagmaTidC *)DatumGetPointer(tid);
/* put low 48 bits rowid in ctid and high 16 bits rowid in tts_rangeid. */
ctid->ip_blkid.bi_hi = (uint16) (tid.rowid >> 32);
ctid->ip_blkid.bi_lo = (uint16) (tid.rowid >> 16);
ctid->ip_posid = tid.rowid;
*tts_rangeid = ((uint32)(tid.rowid >> 32) & 0xFFFF0000) | (uint32)tid.rangeid;
return;
}
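/*
 * Reorder the block location files so that the list index of each file
 * equals its rangeId. This assumes rangeIds are dense in [0, length) and
 * that each file's first location carries its rangeId; only the ordering
 * of the copied list changes, the files themselves are reused.
 */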
static inline List *SortMagmaFilesByRangeId(List *files, int32_t length) {
List *sortedFiles = list_copy(files);
ListCell *cell;
blocklocation_file *blf;
uint16_t rangeId;
for (int i = 0; i < length; i++) {
cell = list_nth_cell(files, i);
blf = (blocklocation_file *)(cell->data.ptr_value);
Assert(blf->block_num > 0 && blf->locations);
rangeId = blf->locations[0].rangeId;
list_nth_replace(sortedFiles, rangeId, blf);
}
list_free(files);
return sortedFiles;
}
/*
* Get magma node status
*/
Datum magma_getstatus(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
elog(DEBUG1, "magma_getstatus begin");
ExtProtocolMagmaInfo magmadata =
palloc0(sizeof(ExtProtocolMagmaStatusData));
if (magmadata == NULL) {
elog(ERROR, "magma_getstatus: failed to allocate new space");
}
magmadata->type = T_ExtProtocolMagmaStatusData;
fcinfo->resultinfo = magmadata;
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "magma_getstatus failed to connect to magma service");
}
magmadata->magmaNodes = MagmaClientC_GetMagmaStatus(client, &(magmadata->size));
magma_check_result(&client);
elog(DEBUG1, "magma_getstatus end");
PG_RETURN_VOID();
}
/*
* Implementation of blocklocation for magma protocol in pluggable storage
*/
Datum magma_protocol_blocklocation(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
elog(DEBUG3, "magma_protocol_blocklocation begin");
/*
* Step 1. prepare instances
*/
/* Build the result instance and basic properties */
ExtProtocolBlockLocation bldata =
palloc0(sizeof(ExtProtocolBlockLocationData));
if (bldata == NULL) {
elog(ERROR,
"magma_protocol_blocklocation: failed to allocate new space");
}
bldata->type = T_ExtProtocolBlockLocationData;
fcinfo->resultinfo = bldata;
/* Build validator data */
ExtProtocolValidator pvalidator_data =
(ExtProtocolValidator)(fcinfo->context);
List *fmt_opts = pvalidator_data->format_opts;
char *dbname = pvalidator_data->dbname;
char *schemaname = pvalidator_data->schemaname;
char *tablename = pvalidator_data->tablename;
bool useClientCacheDirectly = pvalidator_data->useClientCacheDirectly;
MagmaSnapshot *snapshot = &(pvalidator_data->snapshot);
char *format_str = pstrdup((char *)strVal(linitial(fmt_opts)));
/*
* Step 2. get table schema and range distribution
*/
char *fmt_name = NULL;
List *l = magma_parse_format_string(format_str, &fmt_name);
pfree(format_str);
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "failed to connect to magma service");
}
int16_t tableType = 0;
if (pg_strncasecmp(
fmt_name, MAGMA_STORAGE_TYPE_TP, MAGMA_STORAGE_TYPE_TP_LEN) == 0) {
tableType = MAGMACLIENTC_TABLETYPE_TP;
} else if (pg_strncasecmp(fmt_name, MAGMA_STORAGE_TYPE_AP,
MAGMA_STORAGE_TYPE_AP_LEN) == 0) {
tableType = MAGMACLIENTC_TABLETYPE_AP;
} else {
elog(ERROR,
"magma_get_blocklocation: failed to recognize table format type: [%s]",
fmt_name);
}
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
MagmaClientC_SetupSnapshot(client, snapshot);
MagmaTablePtr table = MagmaClientC_FetchTable(client, useClientCacheDirectly);
magma_check_result(&client);
elog(DEBUG3, "magma_protocol_blocklocation pass fetch table");
/*
* Step 3. map ranges to block locations
*/
bldata->serializeSchemaLen = MagmaClientC_MTGetSerializeSchemaLen(table);
bldata->serializeSchema = palloc0(bldata->serializeSchemaLen);
memcpy(bldata->serializeSchema, MagmaClientC_MTGetSerializeSchema(table),
bldata->serializeSchemaLen);
bldata->files = NIL;
// build block location files which reference cached range location
MagmaRangeDistPtr rangeDist = MagmaClientC_FetchRangeDist(client);
magma_check_result(&client);
uint32_t rgNum = MagmaClientC_RDGetNumOfRgs(rangeDist);
int32_t totalGroupNum = 0;
elog(DEBUG3, "rg num %d", rgNum);
for ( int rgIndex = 0 ; rgIndex < rgNum ; ++rgIndex ) {
uint32_t rangeNum = MagmaClientC_RDGetNumOfRangesByRg(rangeDist, rgIndex);
elog(DEBUG3, "rangeNum num %d", rangeNum);
for ( int rangeIndex = 0 ; rangeIndex < rangeNum ; ++rangeIndex ) {
// create block location file instance
blocklocation_file *blf = palloc0(sizeof(blocklocation_file));
blf->block_num = 1;
blf->file_uri = NULL; // unused field; set NULL to keep it tidy
blf->locations = palloc0(sizeof(BlockLocation));
BlockLocation *bl = &(blf->locations[0]);
MagmaRangePtr rangePtr = MagmaClientC_RDGetRangeByRg(rangeDist,
rgIndex,
rangeIndex);
bl->replicaGroupId = MagmaClientC_RangeGetLeaderRgId(rangePtr);
bl->rangeId = MagmaClientC_RangeGetRangeId(rangePtr);
bl->length = 1; // always one range as one block
bl->offset = 0; // no offset
bl->corrupt = 0; // no corrupt setting
bl->numOfNodes = 1; // we save leader node only
bl->hosts = palloc0(sizeof(char *) * bl->numOfNodes);
bl->names = palloc0(sizeof(char *) * bl->numOfNodes);
bl->topologyPaths = palloc0(sizeof(char *) * bl->numOfNodes);
bl->hosts[0] = search_hostname_by_ipaddr(
MagmaClientC_RangeGetLeaderRgAddress(rangePtr));
bl->names[0] = pstrdup(MagmaClientC_RangeGetLeaderRgFullAddress(rangePtr));
bl->topologyPaths[0] = bl->names[0];
// connect block location file instance to the list
bldata->files = lappend(bldata->files, (void *)blf);
totalGroupNum++;
}
}
bldata->files = SortMagmaFilesByRangeId(bldata->files, totalGroupNum);
/*
* 4. return range locations
*/
elog(DEBUG3, "magma_protocol_blocklocation pass");
PG_RETURN_VOID();
}
/*
* Implementation of table size calculation for magma protocol in pluggable storage
*/
Datum magma_protocol_tablesize(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
elog(DEBUG3, "magma_protocol_tablesize begin");
/*
* Step 1. prepare instances
*/
/* Build the result instance and basic properties */
ExtProtocolTableSize tsdata =
palloc0(sizeof(ExtProtocolTableSizeData));
if (tsdata == NULL) {
elog(ERROR,
"magma_protocol_blocklocation: failed to allocate new space");
}
tsdata->type = T_ExtProtocolTableSizeData;
fcinfo->resultinfo = tsdata;
/* Build validator data */
ExtProtocolValidator pvalidator_data =
(ExtProtocolValidator)(fcinfo->context);
List *fmt_opts = pvalidator_data->format_opts;
char *dbname = pvalidator_data->dbname;
char *schemaname = pvalidator_data->schemaname;
char *tablename = pvalidator_data->tablename;
MagmaSnapshot *snapshot = &(pvalidator_data->snapshot);
char *format_str = pstrdup((char *)strVal(linitial(fmt_opts)));
/*
* Step 2. get table size
*/
char *fmt_name = NULL;
List *l = magma_parse_format_string(format_str, &fmt_name);
pfree(format_str);
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "failed to connect to magma service");
}
int16_t tableType = 0;
if (pg_strncasecmp(
fmt_name, MAGMA_STORAGE_TYPE_TP, MAGMA_STORAGE_TYPE_TP_LEN) == 0) {
tableType = MAGMACLIENTC_TABLETYPE_TP;
} else if (pg_strncasecmp(fmt_name, MAGMA_STORAGE_TYPE_AP,
MAGMA_STORAGE_TYPE_AP_LEN) == 0) {
tableType = MAGMACLIENTC_TABLETYPE_AP;
} else {
elog(ERROR,
"magma_get_tablesize: failed to recognize table format type: [%s]",
fmt_name);
}
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
MagmaClientC_SetupSnapshot(client, snapshot);
// set size of table in tp type to zero.
if (tableType == MAGMACLIENTC_TABLETYPE_AP) {
tsdata->tablesize = MagmaClientC_GetTableSize(client);
} else {
tsdata->tablesize = 0;
}
elog(LOG,"table size in magma.c is %llu", tsdata->tablesize);
magma_check_result(&client);
elog(LOG, "magma_protocol_tablesize psss get tablesize.");
elog(DEBUG3, "magma_protocol_tablesize pass");
PG_RETURN_VOID();
}
/*
* Implementation of database size calculation for magma protocol in pluggable storage
*/
Datum magma_protocol_databasesize(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
elog(DEBUG3, "magma_protocol_databasesize begin");
/*
* Step 1. prepare instances
*/
/* Build the result instance and basic properties */
ExtProtocolDatabaseSize dbsdata =
palloc0(sizeof(ExtProtocolDatabaseSizeData));
if (dbsdata == NULL) {
elog(ERROR,
"magma_protocol_databasesize: failed to allocate new space");
}
dbsdata->type = T_ExtProtocolDatabaseSizeData;
fcinfo->resultinfo = dbsdata;
/* Build validator data */
ExtProtocolValidator pvalidator_data =
(ExtProtocolValidator)(fcinfo->context);
char *dbname = pvalidator_data->dbname;
MagmaSnapshot *snapshot = &(pvalidator_data->snapshot);
/*
* Step 2. get database size
*/
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "failed to connect to magma service");
}
MagmaClientC_SetupDatabaseInfo(client, dbname);
MagmaClientC_SetupSnapshot(client, snapshot);
dbsdata->dbsize = MagmaClientC_GetDatabaseSize(client);
elog(LOG,"dbsize in magma.c is %llu", dbsdata->dbsize);
magma_check_result(&client);
elog(LOG, "magma_protocol_databasesize psss get databasesize.");
elog(DEBUG3, "magma_protocol_tablesize pass");
PG_RETURN_VOID();
}
/*
* Implementation of validators for magma protocol in pluggable storage
*/
Datum magma_protocol_validate(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
elog(DEBUG3, "magma_protocol_validate begin");
/* Check action to be performed */
ExtProtocolValidatorData *pvalidator_data =
(ExtProtocolValidatorData *)(fcinfo->context);
/* Validate formatter options, url, and create directory in magma */
List *locs = pvalidator_data->url_list;
ListCell *cell;
foreach (cell, locs) {
char *url = (char *)strVal(lfirst(cell));
Uri *uri = ParseExternalTableUri(url);
if (uri == NULL) {
elog(ERROR,
"magma_protocol_validate :"
"invalid URI encountered %s",
url);
}
if (uri->protocol != URI_MAGMA) {
elog(ERROR,
"magma_protocol_validate :"
"invalid URI protocol encountered in %s, "
"magma:// protocol is required",
url);
}
FreeExternalTableUri(uri);
}
elog(DEBUG3, "magma_protocol_validate pass");
PG_RETURN_VOID();
}
/*
* Implementation of validators for magma format in pluggable storage
*/
/*
* void
* magma_validate_interfaces(char *formatName)
*/
Datum magma_validate_interfaces(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorageValidator psv_interface = (PlugStorageValidator)(fcinfo->context);
if (pg_strncasecmp(psv_interface->format_name, "magma",
sizeof("magma") - 1) != 0) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("magma_validate_interfaces : incorrect format name \'%s\'",
psv_interface->format_name)));
}
PG_RETURN_VOID();
}
/*
* void
* magma_validate_options(List *formatOptions,
* char *formatStr,
* bool isWritable)
*/
Datum magma_validate_options(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorageValidator psv = (PlugStorageValidator)(fcinfo->context);
List *format_opts = psv->format_opts;
char *format_str = psv->format_str;
// bool is_writable = psv->is_writable;
char *formatter = NULL;
char *category = NULL;
// char *bucketnum = NULL;
ListCell *opt;
const int maxlen = 8 * 1024 - 1;
int len = 0;
foreach (opt, format_opts) {
DefElem *defel = (DefElem *)lfirst(opt);
char *key = defel->defname;
bool need_free_value = false;
char *val = (char *)defGetString(defel, &need_free_value);
/* check formatter */
if (strncasecmp(key, "formatter", strlen("formatter")) == 0) {
char *formatter_values[] = {"magmaap", "magmatp"};
checkPlugStorageFormatOption(&formatter, key, val, true, 2,
formatter_values);
}
/* check category */
if (strncasecmp(key, "category", strlen("category")) == 0) {
char *category_values[] = {"internal", "external"};
checkPlugStorageFormatOption(&category, key, val, true, 2,
category_values);
}
if (strncasecmp(key, "bucketnum", strlen("bucketnum")) == 0) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("bucketnum of magmatp/magmaap table are not supported by "
"user defined yet"),
errOmitLocation(true)));
}
if (strncasecmp(key, "formatter", strlen("formatter")) &&
strncasecmp(key, "category", strlen("category")) &&
strncasecmp(key, "bucketnum", strlen("bucketnum"))) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("format options for magma table must be formatter"),
errOmitLocation(true)));
}
/* check the accumulated length before appending to format_str */
if (len + strlen(key) + strlen(val) + 4 > maxlen) {
ereport(
ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("format options must be less than %d bytes in size", maxlen),
errOmitLocation(true)));
}
sprintf((char *)format_str + len, "%s '%s' ", key, val);
len += strlen(key) + strlen(val) + 4;
if (need_free_value) {
pfree(val);
val = NULL;
}
AssertImply(need_free_value, NULL == val);
}
if (!formatter) {
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("no formatter function specified"), errOmitLocation(true)));
}
PG_RETURN_VOID();
}
/*
* void
* magma_validate_encodings(char *encodingName)
*/
Datum magma_validate_encodings(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorageValidator psv = (PlugStorageValidator)(fcinfo->context);
char *encoding_name = psv->encoding_name;
if (strncasecmp(encoding_name, "SQL_ASCII", strlen("SQL_ASCII"))) {
ereport(
ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("\"%s\" is not a valid encoding for external table with magma. "
"Encoding for external table with magma must be SQL_ASCII.",
encoding_name),
errOmitLocation(true)));
}
PG_RETURN_VOID();
}
/*
* void
* magma_validate_datatypes(TupleDesc tupDesc)
*/
Datum magma_validate_datatypes(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorageValidator psv = (PlugStorageValidator)(fcinfo->context);
TupleDesc tup_desc = psv->tuple_desc;
for (int i = 0; i < tup_desc->natts; ++i) {
int32_t datatype =
(int32_t)(((Form_pg_attribute)(tup_desc->attrs[i]))->atttypid);
if (checkUnsupportedDataTypeMagma(datatype)) {
ereport(
ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("unsupported data types %s for columns of table with magma "
"format is specified.",
TypeNameToString(makeTypeNameFromOid(datatype, -1))),
errOmitLocation(true)));
}
// for numeric, the precision must be set when creating the table
if (HAWQ_TYPE_NUMERIC == datatype)
{
// get type modifier
int4 tmp_typmod =
((Form_pg_attribute) (tup_desc->attrs[i]))->atttypmod - VARHDRSZ;
// get precision and scale values
int precision = (tmp_typmod >> 16) & 0xffff;
int scale = tmp_typmod & 0xffff;
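// Example of the standard PostgreSQL numeric typmod encoding: for
// NUMERIC(10,2), atttypmod is ((10 << 16) | 2) + VARHDRSZ, so tmp_typmod
// is (10 << 16) | 2, which yields precision 10 and scale 2 here.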
if (precision < 1 || 38 < precision){
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("For Magma Format, DECIMAL precision must be between 1 and 38")));
}
if (scale == 0){
ereport(NOTICE,
(errmsg("Using a scale of zero for DECIMAL in Magma Format")));
}
}
}
PG_RETURN_VOID();
}
Datum magma_createindex(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
char *dbname = ps->ps_db_name;
char *schemaname = ps->ps_schema_name;
char *tablename = ps->ps_table_name;
MagmaIndex *magmaidx = &(ps->magma_idx);
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
elog(DEBUG1, "create index use index name:%s, index type:%s,"
" columns counts:%d, key counts:%d, unique:%d, primary:%d",
magmaidx->indexName, magmaidx->indexType, magmaidx->colCount,
magmaidx->keynums, magmaidx->unique, magmaidx->primary);
/* create index in magma */
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "failed to create to magma service when create index.");
}
int16_t tableType = 0;
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
MagmaClientC_SetupSnapshot(client, snapshot);
MagmaClientC_CreateIndex(client, magmaidx);
magma_check_result(&client);
PG_RETURN_VOID();
}
Datum magma_dropindex(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
char *dbname = ps->ps_db_name;
char *schemaname = ps->ps_schema_name;
char *tablename = ps->ps_table_name;
char *indexname = ps->magma_idx.indexName;
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
elog(DEBUG1, "drop index use index name:%s,", indexname);
/* drop index in magma */
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "failed to create to magma service when drop index.");
}
int16_t tableType = 0;
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
MagmaClientC_SetupSnapshot(client, snapshot);
MagmaClientC_DropIndex(client, indexname);
magma_check_result(&client);
PG_RETURN_VOID();
}
Datum magma_reindex_index(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
char *dbname = ps->ps_db_name;
char *schemaname = ps->ps_schema_name;
char *tablename = ps->ps_table_name;
char *indexname = ps->magma_idx.indexName;
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
elog(DEBUG1, "reindex index use index name:%s,", indexname);
/* reindex index in magma */
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "failed to create to magma service when reindex index.");
}
int16_t tableType = 0;
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
MagmaClientC_SetupSnapshot(client, snapshot);
MagmaClientC_Reindex(client, indexname);
magma_check_result(&client);
PG_RETURN_VOID();
}
/*
* Implementations of accessors for magma format in pluggable storage
*/
/*
* void
* magma_createtable(char *dbname,
* char *schemaname,
* char *tablename,
* List *tableelements,
* IndexStmt *primarykey)
*/
Datum magma_createtable(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
char *dbname = ps->ps_db_name;
char *schemaname = ps->ps_schema_name;
char *tablename = ps->ps_table_name;
char *fmtName = ps->ps_formatter_name;
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
List *tableelements = ps->ps_table_elements;
IndexStmt *primarykey = ps->ps_primary_key;
List *distributedkey = ps->ps_distributed_key;
// bool isexternal = ps->ps_is_external;
// List *locations = ps->ps_ext_locations;
/* get primary key */
List *pk_names = NIL;
// process one or more primary key columns.
if (primarykey != NULL) {
ListCell *lc;
foreach (lc, primarykey->indexParams) {
IndexElem *idx = (IndexElem *)lfirst(lc);
Assert(IsA(idx, IndexElem));
pk_names = lappend(pk_names, makeString(idx->name));
}
}
/* count number of keys and values of table */
MagmaColumn *cols = NULL;
int ncols = 0;
int nkeys = primarykey == NULL ? 0 : list_length(primarykey->indexParams);
Assert(nkeys == list_length(pk_names));
/* prepare keys and values for table creation */
cols =
(MagmaColumn *)palloc0(sizeof(MagmaColumn) * list_length(tableelements));
ListCell *element;
int16_t tableType = 0;
if (pg_strncasecmp(fmtName, "magmatp", strlen("magmatp")) == 0) {
tableType = 0;
} else if (pg_strncasecmp(fmtName, "magmaap", strlen("magmaap")) == 0) {
tableType = 1;
} else {
elog(ERROR, "magma_createtable: failed to get table format type: [%s]",
fmtName);
}
foreach (element, tableelements) {
ColumnDef *col = (ColumnDef *)(lfirst(element));
MagmaColumn *dcol = NULL;
int pkpos = list_find(pk_names, makeString(col->colname));
int dkpos = list_find(distributedkey, makeString(col->colname));
dcol = &(cols[ncols]);
// TODO(xsheng): get default value from col->raw_default
dcol->defaultValue = "";
dcol->dropped = false;
dcol->primaryKeyIndex = pkpos;
dcol->distKeyIndex = dkpos;
// TODO(xsheng): sort key index is not implemented yet; add it later
dcol->sortKeyIndex = -1;
dcol->id = ncols;
dcol->name = pstrdup(col->colname);
dcol->datatype = LookupTypeName(NULL, col->typname);
dcol->rawTypeMod = col->typname->typmod;
Oid tmpOidVal = dcol->datatype;
dcol->datatype = map_hawq_type_to_magma_type(dcol->datatype, !((bool)tableType));
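// Fill scale1/scale2 by magma type family (as read from the cases below):
// fixed-width types carry no scale; varlen types keep the raw typmod minus
// VARHDRSZ in scale1; STRUCTEX/IOBASETYPE additionally stash the original
// HAWQ type oid in scale2 so it can be recovered later.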
switch (dcol->datatype) {
case BOOLEANID:
case TINYINTID:
case SMALLINTID:
case INTID:
case BIGINTID:
case FLOATID:
case DOUBLEID:
case TIMESTAMPID:
case DATEID:
case TIMEID: {
dcol->scale1 = 0;
dcol->scale2 = 0;
dcol->isnull = false;
} break;
case JSONBID:
case JSONID:
case BINARYID:
case CHARID:
case VARCHARID:
case STRINGID:
case DECIMALID:
case DECIMALNEWID: {
dcol->scale1 = col->typname->typmod - VARHDRSZ;
dcol->scale2 = 0;
dcol->isnull = false;
} break;
case STRUCTEXID:
case IOBASETYPEID: {
dcol->scale1 = col->typname->typmod - VARHDRSZ;
dcol->scale2 = tmpOidVal; // original oid
dcol->isnull = false;
} break;
case INVALIDTYPEID: {
elog(ERROR, "data type %s is invalid", TypeNameToString(makeTypeNameFromOid(dcol->datatype, -1)));
} break;
default: {
elog(ERROR, "data type %s is not supported yet",
TypeNameToString(makeTypeNameFromOid(dcol->datatype, -1)));
} break;
}
ncols++;
}
Assert(ncols == list_length(tableelements));
/* create table in magma */
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "failed to create to magma service");
}
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
MagmaClientC_SetupSnapshot(client, snapshot);
MagmaClientC_CreateTable(client, ncols, cols);
magma_check_result(&client);
pfree(cols);
list_free(pk_names);
PG_RETURN_VOID();
}
/*
* void
* magma_droptable(char *dbname,
* char *schemaname,
* char *tablename)
*/
Datum magma_droptable(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
// ExtTableEntry *ete = ps->ps_exttable;
char *dbname = ps->ps_db_name;
char *schemaname = ps->ps_schema_name;
char *tablename = ps->ps_table_name;
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
/* drop table in magma */
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "failed to connect to magma service");
}
int16_t tableType = 0;
// for drop table, tableType won't be used in the process, set it as default
MagmaClientC_SetupTableInfo(client, dbname, schemaname, tablename, tableType);
MagmaClientC_SetupSnapshot(client, snapshot);
MagmaClientC_DropTable(client);
magma_check_result(&client);
PG_RETURN_VOID();
}
Datum magma_beginscan(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
ExternalScan *ext_scan = ps->ps_ext_scan;
ScanState *scan_state = ps->ps_scan_state;
Relation relation = ps->ps_relation;
int formatterType = ps->ps_formatter_type;
char *formatterName = ps->ps_formatter_name;
char *serializeSchema = ps->ps_magma_serializeSchema;
int serializeSchemaLen = ps->ps_magma_serializeSchemaLen;
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
Index scan_rel_id = ext_scan->scan.scanrelid;
uint32 scan_counter = ext_scan->scancounter;
List *uri_list = ext_scan->uriList;
List *fmt_opts = ext_scan->fmtOpts;
int fmt_encoding = ext_scan->encoding;
Plan *scan_plan = &(ext_scan->scan.plan);
/* Increment relation reference count while scanning relation */
/*
* This is just to make really sure the relcache entry won't go away while
* the scan has a pointer to it. Caller should be holding the rel open
* anyway, so this is redundant in all normal scenarios...
*/
RelationIncrementReferenceCount(relation);
/* Allocate and initialize the select descriptor */
FileScanDesc file_scan_desc = palloc0(sizeof(FileScanDescData));
file_scan_desc->fs_inited = false;
file_scan_desc->fs_ctup.t_data = NULL;
ItemPointerSetInvalid(&file_scan_desc->fs_ctup.t_self);
file_scan_desc->fs_cbuf = InvalidBuffer;
file_scan_desc->fs_rd = relation;
file_scan_desc->fs_scanrelid = scan_rel_id;
file_scan_desc->fs_scancounter = scan_counter;
file_scan_desc->fs_scanquals = scan_plan->qual;
file_scan_desc->fs_noop = false;
file_scan_desc->fs_file = NULL;
file_scan_desc->fs_formatter = NULL;
file_scan_desc->fs_formatter_type = formatterType;
file_scan_desc->fs_formatter_name = formatterName;
file_scan_desc->fs_serializeSchema =
pnstrdup(serializeSchema, serializeSchemaLen);
file_scan_desc->fs_serializeSchemaLen = serializeSchemaLen;
file_scan_desc->fs_ps_magma_splits = ps->ps_magma_splits;
file_scan_desc->fs_ps_magma_skip_tid = ps->ps_magma_skip_tid;
/* Setup scan functions */
get_magma_scan_functions(formatterName, file_scan_desc);
/* Get URI for the scan */
/*
* get the external URI assigned to us.
*
* The URI assigned for this segment is normally in the uriList list
* at the index of this segment id. However, if we are executing on
* MASTER ONLY the (one and only) entry which is destined for the master
* will be at the first entry of the uriList list.
*/
char *uri_str = NULL;
int segindex = GetQEIndex();
Value *v = (Value *)list_nth(uri_list, 0);
if (v->type == T_Null)
uri_str = NULL;
else
uri_str = (char *)strVal(v);
/*
* If a uri is assigned to us - get a reference to it. Some executors
* don't have a uri to scan (if # of uri's < # of primary segdbs),
* in which case uri will be NULL. If that's the case for this
* segdb, set it to no-op.
*/
if (uri_str) {
/* set external source (uri) */
file_scan_desc->fs_uri = uri_str;
elog(DEBUG3, "fs_uri (%d) is set as %s", segindex, uri_str);
/* NOTE: we delay actually opening the data source until external_getnext()
*/
} else {
/* segdb has no work to do. set to no-op */
file_scan_desc->fs_noop = true;
file_scan_desc->fs_uri = NULL;
}
/* Allocate values and nulls structure */
TupleDesc tup_desc = RelationGetDescr(relation);
file_scan_desc->fs_tupDesc = tup_desc;
file_scan_desc->attr = tup_desc->attrs;
file_scan_desc->num_phys_attrs = tup_desc->natts;
file_scan_desc->values =
(Datum *)palloc0(file_scan_desc->num_phys_attrs * sizeof(Datum));
file_scan_desc->nulls =
(bool *)palloc0(file_scan_desc->num_phys_attrs * sizeof(bool));
/* Setup user data */
/* sliceId is not used here; the executor ensures this */
/* currentSliceId == ps->ps_scan_state->ps.state->currentSliceIdInPlan */
if (AmISegment()) {
/* Initialize user data */
MagmaFormatUserData *user_data = palloc0(sizeof(MagmaFormatUserData));
user_data->isFirstRescan = true;
if (formatterName != NULL &&
(strncasecmp(formatterName, "magmatp", sizeof("magmatp") - 1) == 0)) {
user_data->isMagmatp = true;
} else {
user_data->isMagmatp = false;
}
// special handling for magmatp decimal
if (user_data->isMagmatp) {
file_scan_desc->in_functions = (FmgrInfo *)palloc0(
file_scan_desc->num_phys_attrs * sizeof(FmgrInfo));
file_scan_desc->typioparams =
(Oid *)palloc0(file_scan_desc->num_phys_attrs * sizeof(Oid));
bool hasNumeric = false;
for (int i = 0; i < file_scan_desc->num_phys_attrs; ++i) {
if (file_scan_desc->attr[i]->atttypid != HAWQ_TYPE_NUMERIC) continue;
hasNumeric = true;
getTypeInputInfo(file_scan_desc->attr[i]->atttypid,
&file_scan_desc->in_func_oid,
&file_scan_desc->typioparams[i]);
fmgr_info(file_scan_desc->in_func_oid,
&file_scan_desc->in_functions[i]);
}
/*
* magmatp tables support the numeric type with the old decimal
* implementation, so numeric-related input functions are called to read
* numeric columns from magmatp tables. To prevent running out of memory,
* InputFunctionCall() should be wrapped in a per-row memory context.
* magmaap tables support the numeric type with the new decimal
* implementation, so the extra MemoryContext is unnecessary there.
*/
if (hasNumeric) {
file_scan_desc->fs_pstate = (CopyStateData *)palloc0(sizeof(CopyStateData));
CopyState pstate = file_scan_desc->fs_pstate;
pstate->fe_eof = false;
pstate->eol_type = EOL_UNKNOWN;
pstate->eol_str = NULL;
pstate->cur_relname = RelationGetRelationName(relation);
pstate->cur_lineno = 0;
pstate->err_loc_type = ROWNUM_ORIGINAL;
pstate->cur_attname = NULL;
pstate->raw_buf_done = true; /* true so we will read data in first run */
pstate->line_done = true;
pstate->bytesread = 0;
pstate->custom = false;
pstate->header_line = false;
pstate->fill_missing = false;
pstate->line_buf_converted = false;
pstate->raw_buf_index = 0;
pstate->processed = 0;
pstate->filename = uri_str;
pstate->copy_dest = COPY_EXTERNAL_SOURCE;
pstate->missing_bytes = 0;
pstate->csv_mode = false;
pstate->custom = true;
pstate->custom_formatter_func = NULL;
pstate->custom_formatter_name = NULL;
pstate->custom_formatter_params = NIL;
pstate->rel = relation;
pstate->client_encoding = PG_UTF8;
pstate->enc_conversion_proc = NULL;
pstate->need_transcoding = false;
pstate->encoding_embeds_ascii =
PG_ENCODING_IS_CLIENT_ONLY(pstate->client_encoding);
pstate->attr_offsets = NULL;
pstate->attnumlist = NULL;
pstate->force_quote = NIL;
pstate->force_quote_flags = NULL;
pstate->force_notnull = NIL;
pstate->force_notnull_flags = NULL;
initStringInfo(&pstate->attribute_buf);
initStringInfo(&pstate->line_buf);
MemSet(pstate->raw_buf, ' ', RAW_BUF_SIZE * sizeof(char));
pstate->raw_buf[RAW_BUF_SIZE] = '\0';
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
* datatype input or output routines, and should be faster than retail
* pfree's anyway.
*/
pstate->rowcontext = AllocSetContextCreate(
CurrentMemoryContext, "ExtTableMemCxt", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
}
} else {
file_scan_desc->in_functions = NULL;
file_scan_desc->typioparams = NULL;
}
/* the number of ranges is dynamic for magma table */
int32_t nRanges = 0;
ListCell *lc_split = NULL;
foreach (lc_split, ps->ps_magma_splits) {
List *split = (List *)lfirst(lc_split);
nRanges += list_length(split);
}
init_magma_format_user_data_for_read(tup_desc, user_data);
/* Create formatter instance */
// ExtTableEntry *ete = GetExtTableEntry(RelationGetRelid(relation));
user_data->fmt = create_magma_formatter_instance(
NIL, serializeSchema, serializeSchemaLen, PG_UTF8, formatterName,
nRanges);
/* Prepare database, schema, and table information */
char *dbname = database;
char *schemaname = getNamespaceNameByOid(RelationGetNamespace(relation));
Assert(schemaname != NULL);
char *tablename = RelationGetRelationName(relation);
MagmaFormatC_SetupTarget(user_data->fmt, dbname, schemaname, tablename);
MagmaFormatC_SetupTupDesc(user_data->fmt, user_data->numberOfColumns,
user_data->colNames, user_data->colDatatypes,
user_data->colDatatypeMods,
user_data->colIsNulls);
/* Build tuple description */
Plan *plan = &(ext_scan->scan.plan);
file_scan_desc->fs_ps_plan = plan;
build_magma_tuple_descrition_for_read(plan, relation, user_data, ps->ps_magma_skip_tid);
/* prepare plan */
CommonPlanContext ctx;
init_common_plan_context(&ctx);
scan_plan->plan_parent_node_id = -1;
convert_extscan_to_common_plan(scan_plan, scan_state->splits, relation,
&ctx);
// elog(DEBUG1, "common plan: %s",
// univPlanGetJsonFormatedPlan(ctx.univplan));
int32_t size = 0;
char *planstr = univPlanSerialize(ctx.univplan, &size, false);
/* Save user data */
file_scan_desc->fs_ps_user_data = (void *)user_data;
/* Begin scan with the formatter */
bool enableShm = (strcasecmp(magma_enable_shm, "ON") == 0);
MagmaFormatBeginScanMagmaFormatC(
user_data->fmt, user_data->colToReads, snapshot, planstr, size,
enableShm, ps->ps_magma_skip_tid, magma_shm_limit_per_block * 1024);
MagmaFormatCatchedError *e =
MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_scan: failed to scan: %s(%d)", e->errMessage,
e->errCode);
}
free_common_plan_context(&ctx);
}
/* Save file_scan_desc */
ps->ps_file_scan_desc = file_scan_desc;
PG_RETURN_POINTER(file_scan_desc);
}
void init_common_plan_context(CommonPlanContext *ctx) {
ctx->univplan = univPlanNewInstance();
ctx->convertible = true;
ctx->base.node = NULL;
ctx->querySelect = false;
ctx->isMagma = true;
ctx->stmt = NULL;
ctx->setDummyTListRef = false;
ctx->scanReadStatsOnly = false;
ctx->parent = NULL;
ctx->exprBufStack = NIL;
ctx->isConvertingIndexQual = false;
ctx->idxColumns = NIL;
}
void free_common_plan_context(CommonPlanContext *ctx) {
univPlanFreeInstance(&ctx->univplan);
}
/*
* ExternalSelectDesc
* magma_getnext_init(PlanState *planState,
* ExternalScanState *extScanState)
*/
Datum magma_getnext_init(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
ExternalSelectDesc ext_select_desc = NULL;
/*
ExternalSelectDesc ext_select_desc = (ExternalSelectDesc)palloc0(
sizeof(ExternalSelectDescData));
Plan *rootPlan = NULL;
if (plan_state != NULL)
{
ext_select_desc->projInfo = plan_state->ps_ProjInfo;
// If we have an agg type then our parent is an Agg node
rootPlan = plan_state->state->es_plannedstmt->planTree;
if (IsA(rootPlan, Agg) && ext_scan_state->parent_agg_type)
{
ext_select_desc->agg_type = ext_scan_state->parent_agg_type;
}
}
*/
ps->ps_ext_select_desc = ext_select_desc;
PG_RETURN_POINTER(ext_select_desc);
}
Datum magma_getnext(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
FileScanDesc fsd = ps->ps_file_scan_desc;
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)(fsd->fs_ps_user_data);
TupleTableSlot *slot = ps->ps_tuple_table_slot;
bool *nulls = slot_get_isnull(slot);
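// Mark every column null up front; the format reader clears the flag only
// for the columns it actually returns, so unread columns stay null.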
memset(nulls, true, user_data->numberOfColumns);
bool res = MagmaFormatNextMagmaFormatC(
user_data->fmt, user_data->colRawValues, user_data->colValLength, nulls,
&(user_data->colRawTid));
if (res) {
MemoryContext old_context = NULL;
if (user_data->isMagmatp && fsd->fs_pstate != NULL &&
fsd->fs_pstate->rowcontext != NULL) {
/* Free memory for previous tuple if necessary */
MemoryContextReset(fsd->fs_pstate->rowcontext);
old_context = MemoryContextSwitchTo(fsd->fs_pstate->rowcontext);
}
for (int32_t i = 0; i < user_data->numberOfColumns; ++i) {
// Column not to read or column is null
if (nulls[i]) continue;
switch (fsd->attr[i]->atttypid) {
case HAWQ_TYPE_BOOL: {
user_data->colValues[i] =
BoolGetDatum(*(bool *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_INT2: {
user_data->colValues[i] =
Int16GetDatum(*(int16_t *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_INT4: {
user_data->colValues[i] =
Int32GetDatum(*(int32_t *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_INT8:
case HAWQ_TYPE_TIME:
case HAWQ_TYPE_TIMESTAMP:
case HAWQ_TYPE_TIMESTAMPTZ: {
user_data->colValues[i] =
Int64GetDatum(*(int64_t *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_FLOAT4: {
user_data->colValues[i] =
Float4GetDatum(*(float *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_FLOAT8: {
user_data->colValues[i] =
Float8GetDatum(*(double *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_JSONB:
case HAWQ_TYPE_JSON:
case HAWQ_TYPE_VARCHAR:
case HAWQ_TYPE_TEXT:
case HAWQ_TYPE_BPCHAR:
case HAWQ_TYPE_BYTE: {
SET_VARSIZE((struct varlena *)(user_data->colRawValues[i]),
user_data->colValLength[i]);
user_data->colValues[i] = PointerGetDatum(user_data->colRawValues[i]);
break;
}
case HAWQ_TYPE_NUMERIC: {
SET_VARSIZE((struct varlena *)(user_data->colRawValues[i]),
user_data->colValLength[i]);
user_data->colValues[i] =
PointerGetDatum(user_data->colRawValues[i]);
break;
}
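// magma evidently stores DATE as days since the Unix epoch (1970-01-01);
// shifting by UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE (i.e. -10957 days)
// converts it to PostgreSQL's days-since-2000 DateADT.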
case HAWQ_TYPE_DATE: {
user_data->colValues[i] =
Int32GetDatum(*(int32_t *)(user_data->colRawValues[i]) -
POSTGRES_EPOCH_JDATE + UNIX_EPOCH_JDATE);
break;
}
default: {
ereport(ERROR, (errmsg_internal("MAGMA:%d", fsd->attr[i]->atttypid)));
break;
}
}
}
if (user_data->isMagmatp && fsd->fs_pstate != NULL &&
fsd->fs_pstate->rowcontext != NULL) {
MemoryContextSwitchTo(old_context);
}
if (user_data->colRawTid != NULL) {
user_data->colTid = *(MagmaTidC *)(user_data->colRawTid);
ConvertTidToCtidAndRangeid(user_data->colTid,
&(slot->PRIVATE_tts_synthetic_ctid),
&(slot->tts_rangeid));
}
ps->ps_has_tuple = true;
slot->PRIVATE_tts_values = user_data->colValues;
TupSetVirtualTupleNValid(slot, user_data->numberOfColumns);
PG_RETURN_BOOL(true);
}
magma_clear(ps, true);
PG_RETURN_BOOL(false);
}
/*
* void
* magma_rescan(FileScanDesc scan)
*/
Datum magma_rescan(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
ScanState *scan_state = ps->ps_scan_state;
FileScanDesc fsd = ps->ps_file_scan_desc;
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
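/*
 * Runtime keys are scan-key values that only become known at (re)scan time,
 * e.g. outer-tuple values in a nested loop join; each one is serialized
 * through its output function and passed down to magma with the rescan.
 */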
MagmaRuntimeKeys runtimeKeys;
Assert(ps->num_run_time_keys >= 0);
if (ps->num_run_time_keys == 0) {
runtimeKeys.num = 0;
runtimeKeys.keys = NULL;
} else {
Assert(ps->runtime_key_info != NULL);
runtimeKeys.num = ps->num_run_time_keys;
runtimeKeys.keys = palloc0(ps->num_run_time_keys * sizeof(MagmaRuntimeKey));
for (int i = 0; i < ps->num_run_time_keys; ++i) {
ScanKey scan_key = ps->runtime_key_info[i].scan_key;
runtimeKeys.keys[i].flag = scan_key->sk_flags;
runtimeKeys.keys[i].attnoold = scan_key->sk_attnoold;
runtimeKeys.keys[i].value =
OutputFunctionCall(&scan_key->sk_out_func, scan_key->sk_argument);
}
}
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)(fsd->fs_ps_user_data);
if (user_data != NULL) {
// There are 2 cases that user_data is not null:
// 1. If this is the first rescan, at this point, we have done
// magma_beginscan() and haven't done magma_getnext() yet.
// We don't need to create user_data from scratch, just use it.
if (user_data->isFirstRescan) {
user_data->isFirstRescan = false;
MagmaFormatReScanMagmaFormatC(user_data->fmt, &runtimeKeys);
if (runtimeKeys.keys) {
pfree(runtimeKeys.keys);
}
PG_RETURN_VOID();
}
// 2. Otherwise is not the first rescan, we should do magma_clear() here.
// This case happens with the Nested Loop Exists Join. In that case, as long
// as we can get a piece of data in magma_getnext(), we will start a new
// rescan. Therefore, we didn't do magma_clear() in magma_getnext(), which
// resulted in the dirty user_data not being cleared.
// We don't reuse the user_data since that would make the code complex, just
// clear it and create a new one below.
magma_clear(ps, true);
}
/* Initialize user data */
user_data = palloc0(sizeof(MagmaFormatUserData));
if (fsd->fs_formatter_name != NULL &&
(strncasecmp(fsd->fs_formatter_name, "magmatp", sizeof("magmatp") - 1) == 0)) {
user_data->isMagmatp = true;
} else {
user_data->isMagmatp = false;
}
/* the number of ranges is dynamic for magma table */
int32_t nRanges = 0;
ListCell *lc_split = NULL;
foreach (lc_split, fsd->fs_ps_magma_splits) {
List *split = (List *)lfirst(lc_split);
nRanges += list_length(split);
}
init_magma_format_user_data_for_read(fsd->fs_tupDesc, user_data);
/* Create formatter instance */
user_data->fmt = create_magma_formatter_instance(
NIL, fsd->fs_serializeSchema, fsd->fs_serializeSchemaLen, PG_UTF8, fsd->fs_formatter_name, nRanges);
/* Prepare database, schema, and table information */
char *dbname = database;
char *schemaname = getNamespaceNameByOid(RelationGetNamespace(fsd->fs_rd));
Assert(schemaname != NULL);
char *tablename = RelationGetRelationName(fsd->fs_rd);
MagmaFormatC_SetupTarget(user_data->fmt, dbname, schemaname, tablename);
MagmaFormatC_SetupTupDesc(user_data->fmt, user_data->numberOfColumns,
user_data->colNames, user_data->colDatatypes,
user_data->colDatatypeMods,
user_data->colIsNulls);
/* Build tuple description */
Plan *plan = fsd->fs_ps_plan;
build_magma_tuple_descrition_for_read(plan, fsd->fs_rd, user_data, fsd->fs_ps_magma_skip_tid);
/* Build plan */
CommonPlanContext ctx;
init_common_plan_context(&ctx);
plan->plan_parent_node_id = -1;
convert_extscan_to_common_plan(plan, scan_state->splits,
fsd->fs_rd, &ctx);
int32_t size = 0;
char *planstr = univPlanSerialize(ctx.univplan, &size, false);
/* Save user data */
fsd->fs_ps_user_data = (void *)user_data;
/* Begin scan with the formatter */
bool enableShm = (strcasecmp(magma_enable_shm, "ON") == 0);
MagmaFormatBeginScanMagmaFormatC(user_data->fmt, user_data->colToReads,
snapshot, planstr, size,
enableShm, fsd->fs_ps_magma_skip_tid,
magma_shm_limit_per_block * 1024);
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_scan: failed to beginscan: %s(%d)", e->errMessage,
e->errCode);
}
MagmaFormatReScanMagmaFormatC(user_data->fmt, &runtimeKeys);
if (runtimeKeys.keys) {
pfree(runtimeKeys.keys);
}
free_common_plan_context(&ctx);
PG_RETURN_VOID();
}
/*
* void
* magma_endscan(FileScanDesc scan)
*/
Datum magma_endscan(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
FileScanDesc fsd = ps->ps_file_scan_desc;
MagmaFormatUserData *user_data = (MagmaFormatUserData *)(fsd->fs_ps_user_data);
// free memory in endscan; for some subquery scenarios "getnext" might not be called
if (user_data != NULL) {
magma_clear(ps, false);
}
if (fsd->values) {
// decrement relation reference count and free scan descriptor storage
RelationDecrementReferenceCount(fsd->fs_rd);
pfree(fsd->values);
fsd->values = NULL;
}
if (fsd->nulls) {
pfree(fsd->nulls);
fsd->nulls = NULL;
}
// free formatter information
if (fsd->fs_formatter_name) {
pfree(fsd->fs_formatter_name);
fsd->fs_formatter_name = NULL;
}
if (fsd->in_functions) {
pfree(fsd->in_functions);
fsd->in_functions = NULL;
}
if (fsd->typioparams) {
pfree(fsd->typioparams);
fsd->typioparams = NULL;
}
if (fsd->fs_pstate != NULL && fsd->fs_pstate->rowcontext != NULL) {
/*
* delete the row context
*/
MemoryContextDelete(fsd->fs_pstate->rowcontext);
fsd->fs_pstate->rowcontext = NULL;
}
/*
* free parse state memory
*/
if (fsd->fs_pstate != NULL) {
if (fsd->fs_pstate->attribute_buf.data)
pfree(fsd->fs_pstate->attribute_buf.data);
if (fsd->fs_pstate->line_buf.data) pfree(fsd->fs_pstate->line_buf.data);
pfree(fsd->fs_pstate);
fsd->fs_pstate = NULL;
}
PG_RETURN_VOID();
}
/*
* void
* magma_stopscan(FileScanDesc scan)
*/
Datum magma_stopscan(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
FileScanDesc fsd = ps->ps_file_scan_desc;
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)(fsd->fs_ps_user_data);
TupleTableSlot *tts = ps->ps_tuple_table_slot;
if (!user_data) PG_RETURN_VOID();
MagmaFormatStopScanMagmaFormatC(user_data->fmt);
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode == ERRCODE_SUCCESSFUL_COMPLETION) {
MagmaFormatEndScanMagmaFormatC(user_data->fmt);
e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "Magma: failed to finish scan: %s (%d)", e->errMessage,
e->errCode);
}
MagmaFormatFreeMagmaFormatC(&(user_data->fmt));
pfree(user_data->colRawValues);
pfree(user_data->colValues);
pfree(user_data->colToReads);
pfree(user_data->colValLength);
for (int i = 0; i < user_data->numberOfColumns; ++i)
pfree(user_data->colNames[i]);
pfree(user_data->colNames);
pfree(user_data->colDatatypes);
pfree(user_data->colDatatypeMods);
pfree(user_data->colIsNulls);
pfree(user_data);
fsd->fs_ps_user_data = NULL;
/* form empty tuple */
ps->ps_has_tuple = false;
tts->PRIVATE_tts_values = NULL;
tts->PRIVATE_tts_isnull = NULL;
ExecClearTuple(tts);
} else {
elog(ERROR, "magma_stopscan: failed to stop scan: %s(%d)", e->errMessage,
e->errCode);
}
PG_RETURN_VOID();
}
/* ExternalInsertDesc
* magma_begindelete(Relation relation)
*/
Datum magma_begindelete(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
Relation relation = ps->ps_relation;
char *serializeSchema = ps->ps_magma_serializeSchema;
int serializeSchemaLen = ps->ps_magma_serializeSchemaLen;
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
/* 1. Allocate and initialize the delete descriptor */
ExternalInsertDesc edd = palloc0(sizeof(ExternalInsertDescData));
ExtTableEntry *ete = GetExtTableEntry(RelationGetRelid(relation));
int formatterType = ExternalTableType_Invalid;
char *formatterName = NULL;
getExternalTableTypeStr(ete->fmtcode, ete->fmtopts, &formatterType,
&formatterName);
/* 1.1 Setup delete functions */
get_magma_delete_functions(formatterName, edd);
List *fmt_opts = NIL;
fmt_opts = lappend(fmt_opts, makeString(pstrdup(ete->fmtopts)));
/* 1.2 Allocate and initialize structure which track data parsing state */
edd->ext_pstate = (CopyStateData *)palloc0(sizeof(CopyStateData));
edd->ext_tupDesc = RelationGetDescr(relation);
/* 1.3 Initialize parse state */
/* 1.3.1 Initialize basic information for pstate */
CopyState pstate = edd->ext_pstate;
/* 1.3.2 Setup encoding information */
/*
* Set up encoding conversion info. Even if the client and server
* encodings are the same, we must apply pg_client_to_server() to validate
* data in multibyte encodings.
*
* Each external table specifies the encoding of its external data. We will
* therefore set a client encoding and client-to-server conversion procedure
* in here (server-to-client in WET) and these will be used in the data
* conversion routines (in copy.c CopyReadLineXXX(), etc).
*/
int fmt_encoding = ete->encoding;
Insist(PG_VALID_ENCODING(fmt_encoding));
pstate->client_encoding = fmt_encoding;
Oid conversion_proc =
FindDefaultConversionProc(GetDatabaseEncoding(), fmt_encoding);
if (OidIsValid(conversion_proc)) {
/* conversion proc found */
pstate->enc_conversion_proc = palloc0(sizeof(FmgrInfo));
fmgr_info(conversion_proc, pstate->enc_conversion_proc);
} else {
/* no conversion function (both encodings are probably the same) */
pstate->enc_conversion_proc = NULL;
}
pstate->need_transcoding = pstate->client_encoding != GetDatabaseEncoding();
pstate->encoding_embeds_ascii =
PG_ENCODING_IS_CLIENT_ONLY(pstate->client_encoding);
/* 1.3.3 Setup tuple description */
TupleDesc tup_desc = edd->ext_tupDesc;
pstate->attr_offsets = (int *)palloc0(tup_desc->natts * sizeof(int));
/* 1.3.4 Generate or convert list of attributes to process */
pstate->attnumlist = CopyGetAttnums(tup_desc, relation, NIL);
/* 1.3.5 Convert FORCE NOT NULL name list to per-column flags, check validity
*/
pstate->force_notnull_flags = (bool *)palloc0(tup_desc->natts * sizeof(bool));
if (pstate->force_notnull) {
List *attnums;
ListCell *cur;
attnums = CopyGetAttnums(tup_desc, relation, pstate->force_notnull);
foreach (cur, attnums) {
int attnum = lfirst_int(cur);
pstate->force_notnull_flags[attnum - 1] = true;
}
}
/* 1.3.6 Take care of state that is WET specific */
Form_pg_attribute *attr = tup_desc->attrs;
ListCell *cur;
pstate->null_print_client = pstate->null_print; /* default */
pstate->fe_msgbuf = makeStringInfo(); /* use fe_msgbuf as a per-row buffer */
pstate->out_functions =
(FmgrInfo *)palloc0(tup_desc->natts * sizeof(FmgrInfo));
foreach (cur,
pstate->attnumlist) /* Get info about the columns we need to process */
{
int attnum = lfirst_int(cur);
Oid out_func_oid;
bool isvarlena;
getTypeOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid, &isvarlena);
fmgr_info(out_func_oid, &pstate->out_functions[attnum - 1]);
}
/*
* We need to convert null_print to client encoding, because it
* will be sent directly with CopySendString.
*/
if (pstate->need_transcoding) {
pstate->null_print_client = pg_server_to_custom(
pstate->null_print, pstate->null_print_len, pstate->client_encoding,
pstate->enc_conversion_proc);
}
/* 1.3.7 Create temporary memory context for per row process */
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
* datatype input or output routines, and should be faster than retail
* pfree's anyway.
*/
pstate->rowcontext = AllocSetContextCreate(
CurrentMemoryContext, "ExtTableMemCxt", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
/* 1.3.8 Parse format options */
char *format_str = pstrdup((char *)strVal(linitial(fmt_opts)));
char *fmt_name = NULL;
List *l = magma_parse_format_string(format_str, &fmt_name);
pstate->custom_formatter_name = fmt_name;
pstate->custom_formatter_params = l;
pfree(format_str);
/* 1.4 Initialize formatter data */
edd->ext_formatter_data = (FormatterData *)palloc0(sizeof(FormatterData));
edd->ext_formatter_data->fmt_perrow_ctx = edd->ext_pstate->rowcontext;
/* 2. Setup user data */
/* 2.1 Get database, schema, table name for the delete */
Assert(database != NULL);
Oid namespaceOid = RelationGetNamespace(relation);
char *schema = getNamespaceNameByOid(namespaceOid);
char *table = RelationGetRelationName(relation);
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)palloc0(sizeof(MagmaFormatUserData));
if (formatterName != NULL &&
(strncasecmp(formatterName, "magmatp", sizeof("magmatp") - 1) == 0)) {
user_data->isMagmatp = true;
} else {
user_data->isMagmatp = false;
}
init_magma_format_user_data_for_write(tup_desc, user_data, relation);
/* the number of ranges is dynamic for a magma table */
int32_t nRanges = 0;
ListCell *lc_split = NULL;
foreach (lc_split, ps->ps_magma_splits) {
List *split = (List *)lfirst(lc_split);
nRanges += list_length(split);
}
/* 2.2 Create formatter instance */
List *fmt_opts_defelem = pstate->custom_formatter_params;
user_data->fmt = create_magma_formatter_instance(
fmt_opts_defelem, serializeSchema, serializeSchemaLen, fmt_encoding,
formatterName, nRanges);
/* prepare hash info */
int32_t nDistKeyIndex = 0;
int16_t *distKeyIndex = NULL;
fetchDistributionPolicy(relation->rd_id, &nDistKeyIndex, &distKeyIndex);
uint32 range_to_rg_map[nRanges];
List *rg = magma_build_range_to_rg_map(ps->ps_magma_splits, range_to_rg_map);
int nRg = list_length(rg);
uint16 *rgId = palloc0(sizeof(uint16) * nRg);
char **rgUrl = palloc0(sizeof(char *) * nRg);
magma_build_rg_to_url_map(ps->ps_magma_splits, rg, rgId, rgUrl);
/* 2.3 Prepare database, schema, and table information */
MagmaFormatC_SetupTarget(user_data->fmt, database, schema, table);
MagmaFormatC_SetupTupDesc(user_data->fmt, user_data->numberOfColumns,
user_data->colNames, user_data->colDatatypes,
user_data->colDatatypeMods, user_data->colIsNulls);
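/*
* Wire up the hasher: the distribution-key columns feed a hash that is
* mapped onto nRanges ranges; range_to_rg_map/rgId/rgUrl then resolve each
* range to its replica group and service URL. The jump hash map presumably
* implements jump consistent hashing over JUMP_HASH_MAP_LENGTH slots.
*/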
int *jumpHashMap = get_jump_hash_map(nRanges);
MagmaFormatC_SetupHasher(user_data->fmt, nDistKeyIndex, distKeyIndex, nRanges,
range_to_rg_map, nRg, rgId, rgUrl, jumpHashMap,
JUMP_HASH_MAP_LENGTH);
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_delete: failed to begindelete: %s(%d)", e->errMessage,
e->errCode);
}
/* 2.4 Save user data */
edd->ext_ps_user_data = (void *)user_data;
/* 3. Begin delete with the formatter */
MagmaFormatBeginDeleteMagmaFormatC(user_data->fmt, snapshot);
MagmaFormatCatchedError *e1 = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e1->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_delete: failed to begindelete: %s(%d)", e1->errMessage,
e1->errCode);
}
/* 4. Save the result */
ps->ps_ext_delete_desc = edd;
PG_RETURN_POINTER(edd);
}
/* void
* magma_delete(ExternalInsertDesc extDeleteDesc,
* TupleTableSlot *tupTableSlot)
*/
Datum magma_delete(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
ExternalInsertDesc edd = ps->ps_ext_delete_desc;
TupleTableSlot *tts = ps->ps_tuple_table_slot;
/* The slot may hold a memtuple; convert it to a virtual tuple first */
slot_getallattrs(tts);
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)(edd->ext_ps_user_data);
user_data->colTid.rangeid = DatumGetUInt16(edd->ext_rangeId);
user_data->colTid.rowid = DatumGetUInt64(edd->ext_rowId);
user_data->colValues = slot_get_values(tts);
user_data->colIsNulls = slot_get_isnull(tts);
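/*
* Per-type placeholders for NULL columns: the formatter is handed a raw
* pointer for every column even when the value is NULL, so NULL slots are
* pointed at these dummies instead of real data.
*/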
static bool DUMMY_BOOL = true;
static int8_t DUMMY_INT8 = 0;
static int16_t DUMMY_INT16 = 0;
static int32_t DUMMY_INT32 = 0;
static int64_t DUMMY_INT64 = 0;
static float DUMMY_FLOAT = 0.0;
static double DUMMY_DOUBLE = 0.0;
static char DUMMY_TEXT[1] = "";
static TimestampType DUMMY_TIMESTAMP = {0, 0};
MemoryContext per_row_context = edd->ext_pstate->rowcontext;
MemoryContext old_context = MemoryContextSwitchTo(per_row_context);
/* Get column values */
user_data->colRawTid = (char *)(&(user_data->colTid));
for (int i = 0; i < user_data->numberOfColumns; ++i) {
int dataType = (int)(tts->tts_tupleDescriptor->attrs[i]->atttypid);
user_data->colRawValues[i] = NULL;
if (user_data->colIsNulls[i]) {
if (dataType == HAWQ_TYPE_CHAR) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT8);
} else if (dataType == HAWQ_TYPE_INT2) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT16);
} else if (dataType == HAWQ_TYPE_INT4) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT32);
} else if (dataType == HAWQ_TYPE_INT8) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT64);
} else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE ||
dataType == HAWQ_TYPE_BPCHAR ||
dataType == HAWQ_TYPE_VARCHAR ||
dataType == HAWQ_TYPE_NUMERIC) {
user_data->colRawValues[i] = (char *)(DUMMY_TEXT);
} else if (dataType == HAWQ_TYPE_FLOAT4) {
user_data->colRawValues[i] = (char *)(&DUMMY_FLOAT);
} else if (dataType == HAWQ_TYPE_FLOAT8) {
user_data->colRawValues[i] = (char *)(&DUMMY_DOUBLE);
} else if (dataType == HAWQ_TYPE_BOOL) {
user_data->colRawValues[i] = (char *)(&DUMMY_BOOL);
} else if (dataType == HAWQ_TYPE_DATE) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT32);
} else if (dataType == HAWQ_TYPE_TIME) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT64);
} else if (dataType == HAWQ_TYPE_TIMESTAMP) {
user_data->colRawValues[i] = (char *)(&DUMMY_TIMESTAMP);
} else if (STRUCTEXID == user_data->colDatatypes[i] ||
IOBASETYPEID == user_data->colDatatypes[i]) {
user_data->colRawValues[i] = (char *)(DUMMY_TEXT);
} else if (dataType == HAWQ_TYPE_INVALID) {
elog(ERROR, "HAWQ data type %s is invalid", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
} else {
elog(ERROR, "HAWQ data type %s is not supported yet", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
}
continue;
}
if (dataType == HAWQ_TYPE_INT4 || dataType == HAWQ_TYPE_INT8 ||
dataType == HAWQ_TYPE_FLOAT4 || dataType == HAWQ_TYPE_FLOAT8 ||
dataType == HAWQ_TYPE_INT2 || dataType == HAWQ_TYPE_CHAR ||
dataType == HAWQ_TYPE_BOOL || dataType == HAWQ_TYPE_TIME) {
user_data->colRawValues[i] = (char *)(&(user_data->colValues[i]));
} else if (dataType == HAWQ_TYPE_DATE) {
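/*
* PostgreSQL stores DATE as days since 2000-01-01, while the value passed
* to magma counts days since the Unix epoch (1970-01-01); the shift by
* POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE (10957 days) converts between
* the two.
*/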
int *date = (int *)(&(user_data->colValues[i]));
*date += POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE;
user_data->colRawValues[i] = (char *)(&(user_data->colValues[i]));
} else if (dataType == HAWQ_TYPE_TIMESTAMP) {
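/*
* Convert a PostgreSQL timestamp (microseconds since 2000-01-01) into the
* {second, nanosecond} pair magma consumes, rebased to the Unix epoch.
* The fix-ups below compensate for C division truncating toward zero so
* that pre-1970 (negative) values carry correctly.
*/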
int64_t *timestamp = (int64_t *) (&(user_data->colValues[i]));
user_data->colTimestamp[i].second = *timestamp / 1000000
+ (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 60 * 60 * 24;
user_data->colTimestamp[i].nanosecond = *timestamp % 1000000 * 1000;
int64_t days = user_data->colTimestamp[i].second / 60 / 60 / 24;
if (user_data->colTimestamp[i].nanosecond < 0 &&
(days > POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE || days < 0))
user_data->colTimestamp[i].nanosecond += 1000000000;
if (user_data->colTimestamp[i].second < 0 &&
user_data->colTimestamp[i].nanosecond != 0)
user_data->colTimestamp[i].second -= 1;
user_data->colRawValues[i] = (char *) (&(user_data->colTimestamp[i]));
} else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE ||
dataType == HAWQ_TYPE_BPCHAR || dataType == HAWQ_TYPE_VARCHAR ||
dataType == HAWQ_TYPE_NUMERIC) {
user_data->colRawValues[i] = OutputFunctionCall(
&(edd->ext_pstate->out_functions[i]), user_data->colValues[i]);
} else if (STRUCTEXID == user_data->colDatatypes[i]) {
int32_t len = VARSIZE(user_data->colValues[i]);
if (len <= 0) {
elog(ERROR, "HAWQ base type(udt) %s should not be less than 0",
TypeNameToString(makeTypeNameFromOid(dataType, -1)));
}
char *pVal = DatumGetPointer(user_data->colValues[i]);
user_data->colRawValues[i] = palloc0(VARHDRSZ + len);
// set value: the first 4 bytes hold the length, then the raw value follows
// SET_VARSIZE( (struct varlena * )user_data->colRawValues[i], len);
*((int32 *)(user_data->colRawValues[i])) = len;
memcpy(user_data->colRawValues[i] + VARHDRSZ, pVal, len);
} else if (IOBASETYPEID == user_data->colDatatypes[i]) {
// get the length of basetype
bool passbyval = tts->tts_tupleDescriptor->attrs[i]->attbyval;
int32_t orilen = (int32_t)(tts->tts_tupleDescriptor->attrs[i]->attlen);
int32_t len =
get_typlen_fast(dataType, passbyval, orilen, user_data->colValues[i]);
if (1 > len) { // invalid length
elog(ERROR,
"HAWQ composite type(udt) %s got an invalid length:%d",
TypeNameToString(makeTypeNameFromOid(dataType, -1)), len);
}
if (passbyval) {
// value is stored in the Datum directly
char *val = (char *)(user_data->colValues[i]);
user_data->colRawValues[i] = palloc0(len);
memcpy(user_data->colRawValues[i], val, len);
} else {
// value stored by pointer in Datum
char *val = DatumGetPointer(user_data->colValues[i]);
user_data->colRawValues[i] = palloc0(VARHDRSZ + len);
// set value: the first 4 bytes hold the length, then the raw value follows
// SET_VARSIZE( (struct varlena * )user_data->colRawValues[i], len);
*((int32 *)(user_data->colRawValues[i])) = len;
memcpy(user_data->colRawValues[i] + VARHDRSZ, val, len);
}
} else if (dataType == HAWQ_TYPE_INVALID) {
elog(ERROR, "HAWQ data type %s is invalid", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
} else {
elog(ERROR, "HAWQ data type %s is not supported yet", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
}
}
/* Pass to formatter to output */
MagmaFormatDeleteMagmaFormatC(user_data->fmt, user_data->colRawTid,
user_data->colRawValues, user_data->colIsNulls);
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_delete: failed to delete: %s(%d)", e->errMessage,
e->errCode);
}
MemoryContextReset(per_row_context);
MemoryContextSwitchTo(old_context);
ps->ps_tuple_oid = InvalidOid;
PG_RETURN_VOID();
}
/* void
* magma_enddelete(ExternalInsertDesc extDeleteDesc)
*/
Datum magma_enddelete(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
ExternalInsertDesc edd = ps->ps_ext_delete_desc;
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)(edd->ext_ps_user_data);
MagmaFormatEndDeleteMagmaFormatC(user_data->fmt);
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_delete: failed to end delete: %s(%d)", e->errMessage,
e->errCode);
}
MagmaFormatFreeMagmaFormatC(&(user_data->fmt));
for (int i = 0; i < user_data->numberOfColumns; ++i) {
pfree(user_data->colNames[i]);
}
pfree(user_data->colNames);
/*
* DO NOT pfree colValues and colIsNulls here since ExecutorEnd will call
* cleanup_slot to pfree slot->PRIVATE_tts_values and
* slot->PRIVATE_tts_isnull. Otherwise it would be freed twice.
*
* pfree(user_data->colValues);
* pfree(user_data->colIsNulls);
*/
pfree(user_data->colDatatypes);
pfree(user_data->colRawValues);
pfree(user_data);
if (edd->ext_formatter_data) pfree(edd->ext_formatter_data);
if (edd->ext_pstate != NULL && edd->ext_pstate->rowcontext != NULL) {
/*
* delete the row context
*/
MemoryContextDelete(edd->ext_pstate->rowcontext);
edd->ext_pstate->rowcontext = NULL;
}
pfree(edd);
PG_RETURN_VOID();
}
/* ExternalInsertDesc
* magma_beginupdate(Relation relation)
*/
Datum magma_beginupdate(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
Relation relation = ps->ps_relation;
char *serializeSchema = ps->ps_magma_serializeSchema;
int serializeSchemaLen = ps->ps_magma_serializeSchemaLen;
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
/* 1. Allocate and initialize the update descriptor */
ExternalInsertDesc eud = palloc0(sizeof(ExternalInsertDescData));
ExtTableEntry *ete = GetExtTableEntry(RelationGetRelid(relation));
int formatterType = ExternalTableType_Invalid;
char *formatterName = NULL;
getExternalTableTypeStr(ete->fmtcode, ete->fmtopts, &formatterType,
&formatterName);
/* 1.1 Setup update functions */
get_magma_update_functions(formatterName, eud);
List *fmt_opts = NIL;
fmt_opts = lappend(fmt_opts, makeString(pstrdup(ete->fmtopts)));
/* 1.2 Allocate and initialize structure which track data parsing state */
eud->ext_pstate = (CopyStateData *)palloc0(sizeof(CopyStateData));
eud->ext_tupDesc = RelationGetDescr(relation);
/* 1.3 Initialize parse state */
/* 1.3.1 Initialize basic information for pstate */
CopyState pstate = eud->ext_pstate;
/* 1.3.2 Setup encoding information */
/*
* Set up encoding conversion info. Even if the client and server
* encodings are the same, we must apply pg_client_to_server() to validate
* data in multibyte encodings.
*
* Each external table specifies the encoding of its external data. We will
* therefore set a client encoding and client-to-server conversion procedure
* in here (server-to-client in WET) and these will be used in the data
* conversion routines (in copy.c CopyReadLineXXX(), etc).
*/
int fmt_encoding = ete->encoding;
Insist(PG_VALID_ENCODING(fmt_encoding));
pstate->client_encoding = fmt_encoding;
Oid conversion_proc =
FindDefaultConversionProc(GetDatabaseEncoding(), fmt_encoding);
if (OidIsValid(conversion_proc)) {
/* conversion proc found */
pstate->enc_conversion_proc = palloc0(sizeof(FmgrInfo));
fmgr_info(conversion_proc, pstate->enc_conversion_proc);
} else {
/* no conversion function (both encodings are probably the same) */
pstate->enc_conversion_proc = NULL;
}
pstate->need_transcoding = pstate->client_encoding != GetDatabaseEncoding();
pstate->encoding_embeds_ascii =
PG_ENCODING_IS_CLIENT_ONLY(pstate->client_encoding);
/* 1.3.3 Setup tuple description */
TupleDesc tup_desc = eud->ext_tupDesc;
pstate->attr_offsets = (int *)palloc0(tup_desc->natts * sizeof(int));
/* 1.3.4 Generate or convert list of attributes to process */
pstate->attnumlist = CopyGetAttnums(tup_desc, relation, NIL);
/* 1.3.5 Convert FORCE NOT NULL name list to per-column flags and check validity */
pstate->force_notnull_flags = (bool *)palloc0(tup_desc->natts * sizeof(bool));
if (pstate->force_notnull) {
List *attnums;
ListCell *cur;
attnums = CopyGetAttnums(tup_desc, relation, pstate->force_notnull);
foreach (cur, attnums) {
int attnum = lfirst_int(cur);
pstate->force_notnull_flags[attnum - 1] = true;
}
}
/* 1.3.6 Take care of state that is WET specific */
Form_pg_attribute *attr = tup_desc->attrs;
ListCell *cur;
pstate->null_print_client = pstate->null_print; /* default */
pstate->fe_msgbuf = makeStringInfo(); /* use fe_msgbuf as a per-row buffer */
pstate->out_functions =
(FmgrInfo *)palloc0(tup_desc->natts * sizeof(FmgrInfo));
foreach (cur,
pstate->attnumlist) /* Get info about the columns we need to process */
{
int attnum = lfirst_int(cur);
Oid out_func_oid;
bool isvarlena;
getTypeOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid, &isvarlena);
fmgr_info(out_func_oid, &pstate->out_functions[attnum - 1]);
}
/*
* We need to convert null_print to client encoding, because it
* will be sent directly with CopySendString.
*/
if (pstate->need_transcoding) {
pstate->null_print_client = pg_server_to_custom(
pstate->null_print, pstate->null_print_len, pstate->client_encoding,
pstate->enc_conversion_proc);
}
/* 1.3.7 Create temporary memory context for per row process */
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
* datatype input or output routines, and should be faster than retail
* pfree's anyway.
*/
pstate->rowcontext = AllocSetContextCreate(
CurrentMemoryContext, "ExtTableMemCxt", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
/* 1.3.8 Parse format options */
char *format_str = pstrdup((char *)strVal(linitial(fmt_opts)));
char *fmt_name = NULL;
List *l = magma_parse_format_string(format_str, &fmt_name);
pstate->custom_formatter_name = fmt_name;
pstate->custom_formatter_params = l;
pfree(format_str);
/* 1.4 Initialize formatter data */
eud->ext_formatter_data = (FormatterData *)palloc0(sizeof(FormatterData));
eud->ext_formatter_data->fmt_perrow_ctx = eud->ext_pstate->rowcontext;
/* 2. Setup user data */
/* 2.1 Get database, schema, table name for the update */
Assert(database != NULL);
Oid namespaceOid = RelationGetNamespace(relation);
char *schema = getNamespaceNameByOid(namespaceOid);
char *table = RelationGetRelationName(relation);
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)palloc0(sizeof(MagmaFormatUserData));
if (formatterName != NULL &&
(strncasecmp(formatterName, "magmatp", sizeof("magmatp") - 1) == 0)) {
user_data->isMagmatp = true;
} else {
user_data->isMagmatp = false;
}
init_magma_format_user_data_for_write(tup_desc, user_data, relation);
/* the number of ranges is dynamic for a magma table */
int32_t nRanges = 0;
ListCell *lc_split = NULL;
foreach (lc_split, ps->ps_magma_splits) {
List *split = (List *)lfirst(lc_split);
nRanges += list_length(split);
}
/* 2.2 Create formatter instance */
bool isexternal = false;
get_magma_category_info(ete->fmtopts, &isexternal);
List *fmt_opts_defelem = pstate->custom_formatter_params;
user_data->fmt = create_magma_formatter_instance(
fmt_opts_defelem, serializeSchema, serializeSchemaLen, fmt_encoding,
formatterName, nRanges);
/* prepare hash info */
int32_t nDistKeyIndex = 0;
int16_t *distKeyIndex = NULL;
fetchDistributionPolicy(relation->rd_id, &nDistKeyIndex, &distKeyIndex);
uint32 range_to_rg_map[nRanges];
List *rg = magma_build_range_to_rg_map(ps->ps_magma_splits, range_to_rg_map);
int nRg = list_length(rg);
uint16 *rgId = palloc0(sizeof(uint16) * nRg);
char **rgUrl = palloc0(sizeof(char *) * nRg);
magma_build_rg_to_url_map(ps->ps_magma_splits, rg, rgId, rgUrl);
/* 2.3 Prepare database, schema, and table information */
MagmaFormatC_SetupTarget(user_data->fmt, database, schema, table);
MagmaFormatC_SetupTupDesc(user_data->fmt, user_data->numberOfColumns,
user_data->colNames, user_data->colDatatypes,
user_data->colDatatypeMods, user_data->colIsNulls);
int *jumpHashMap = get_jump_hash_map(nRanges);
MagmaFormatC_SetupHasher(user_data->fmt, nDistKeyIndex, distKeyIndex, nRanges,
range_to_rg_map, nRg, rgId, rgUrl, jumpHashMap,
JUMP_HASH_MAP_LENGTH);
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_update: failed to begin update: %s(%d)", e->errMessage,
e->errCode);
}
/* 2.4 Save user data */
eud->ext_ps_user_data = (void *)user_data;
/* 3. Begin update with the formatter */
MagmaFormatBeginUpdateMagmaFormatC(user_data->fmt, snapshot);
MagmaFormatCatchedError *e1 = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e1->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_update: failed to begin update: %s(%d)", e1->errMessage,
e1->errCode);
}
/* 4. Save the result */
ps->ps_ext_update_desc = eud;
PG_RETURN_POINTER(eud);
}
/* uint32
* magma_update(ExternalInsertDesc extUpdDesc,
* TupleTableSlot *tupTableSlot)
*/
Datum magma_update(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
ExternalInsertDesc eud = ps->ps_ext_update_desc;
TupleTableSlot *tts = ps->ps_tuple_table_slot;
/* The slot may hold a memtuple; convert it to a virtual tuple first */
slot_getallattrs(tts);
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)(eud->ext_ps_user_data);
user_data->colTid.rangeid = DatumGetUInt16(eud->ext_rangeId);
user_data->colTid.rowid = DatumGetUInt64(eud->ext_rowId);
user_data->colValues = slot_get_values(tts);
user_data->colIsNulls = slot_get_isnull(tts);
static bool DUMMY_BOOL = true;
static int8_t DUMMY_INT8 = 0;
static int16_t DUMMY_INT16 = 0;
static int32_t DUMMY_INT32 = 0;
static int64_t DUMMY_INT64 = 0;
static float DUMMY_FLOAT = 0.0;
static double DUMMY_DOUBLE = 0.0;
static char DUMMY_TEXT[1] = "";
static TimestampType DUMMY_TIMESTAMP = {0, 0};
MemoryContext per_row_context = eud->ext_pstate->rowcontext;
MemoryContext old_context = MemoryContextSwitchTo(per_row_context);
/* Get column values */
user_data->colRawTid = (char *)(&(user_data->colTid));
for (int i = 0; i < user_data->numberOfColumns; ++i) {
int dataType = (int)(tts->tts_tupleDescriptor->attrs[i]->atttypid);
user_data->colRawValues[i] = NULL;
if (user_data->colIsNulls[i]) {
if (dataType == HAWQ_TYPE_CHAR) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT8);
} else if (dataType == HAWQ_TYPE_INT4) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT32);
} else if (dataType == HAWQ_TYPE_INT8) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT64);
} else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE ||
dataType == HAWQ_TYPE_BPCHAR ||
dataType == HAWQ_TYPE_VARCHAR ||
dataType == HAWQ_TYPE_NUMERIC) {
user_data->colRawValues[i] = (char *)(DUMMY_TEXT);
} else if (dataType == HAWQ_TYPE_FLOAT4) {
user_data->colRawValues[i] = (char *)(&DUMMY_FLOAT);
} else if (dataType == HAWQ_TYPE_FLOAT8) {
user_data->colRawValues[i] = (char *)(&DUMMY_DOUBLE);
} else if (dataType == HAWQ_TYPE_INT2) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT16);
} else if (dataType == HAWQ_TYPE_BOOL) {
user_data->colRawValues[i] = (char *)(&DUMMY_BOOL);
} else if (dataType == HAWQ_TYPE_DATE) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT32);
} else if (dataType == HAWQ_TYPE_TIME) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT64);
} else if (dataType == HAWQ_TYPE_TIMESTAMP) {
user_data->colRawValues[i] = (char *)(&DUMMY_TIMESTAMP);
} else if (STRUCTEXID == user_data->colDatatypes[i] ||
IOBASETYPEID == user_data->colDatatypes[i]) {
user_data->colRawValues[i] = (char *)(DUMMY_TEXT);
} else if (dataType == HAWQ_TYPE_INVALID) {
elog(ERROR, "HAWQ data type %s is invalid", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
} else {
elog(ERROR, "HAWQ data type %s is not supported yet", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
}
continue;
}
if (dataType == HAWQ_TYPE_INT4 || dataType == HAWQ_TYPE_INT8 ||
dataType == HAWQ_TYPE_FLOAT4 || dataType == HAWQ_TYPE_FLOAT8 ||
dataType == HAWQ_TYPE_INT2 || dataType == HAWQ_TYPE_CHAR ||
dataType == HAWQ_TYPE_BOOL || dataType == HAWQ_TYPE_TIME) {
user_data->colRawValues[i] = (char *)(&(user_data->colValues[i]));
} else if (dataType == HAWQ_TYPE_DATE) {
int *date = (int *)(&(user_data->colValues[i]));
*date += POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE;
user_data->colRawValues[i] = (char *)(&(user_data->colValues[i]));
} else if (dataType == HAWQ_TYPE_TIMESTAMP) {
int64_t *timestamp = (int64_t *) (&(user_data->colValues[i]));
user_data->colTimestamp[i].second = *timestamp / 1000000
+ (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 60 * 60 * 24;
user_data->colTimestamp[i].nanosecond = *timestamp % 1000000 * 1000;
int64_t days = user_data->colTimestamp[i].second / 60 / 60 / 24;
if (user_data->colTimestamp[i].nanosecond < 0 &&
(days > POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE || days < 0))
user_data->colTimestamp[i].nanosecond += 1000000000;
if (user_data->colTimestamp[i].second < 0 &&
user_data->colTimestamp[i].nanosecond != 0)
user_data->colTimestamp[i].second -= 1;
user_data->colRawValues[i] = (char *) (&(user_data->colTimestamp[i]));
} else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE ||
dataType == HAWQ_TYPE_BPCHAR || dataType == HAWQ_TYPE_VARCHAR) {
user_data->colRawValues[i] = OutputFunctionCall(
&(eud->ext_pstate->out_functions[i]), user_data->colValues[i]);
} else if (dataType == HAWQ_TYPE_NUMERIC) {
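/*
* NUMERIC goes through its text output function; a NaN numeric is
* additionally flagged as NULL, presumably because magma has no NaN
* representation for numerics.
*/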
user_data->colRawValues[i] = OutputFunctionCall(
&(eud->ext_pstate->out_functions[i]), user_data->colValues[i]);
Numeric num = DatumGetNumeric(user_data->colValues[i]);
if (NUMERIC_IS_NAN(num)) {
user_data->colIsNulls[i] = true;
}
} else if (STRUCTEXID == user_data->colDatatypes[i]) {
int32_t len = VARSIZE(user_data->colValues[i]);
if (len <= 0) {
elog(ERROR, "HAWQ base type(udt) %s should not be less than 0",
TypeNameToString(makeTypeNameFromOid(dataType, -1)));
}
char *pVal = DatumGetPointer(user_data->colValues[i]);
user_data->colRawValues[i] = palloc0(VARHDRSZ + len);
// set value: the first 4 bytes hold the length, then the raw value follows
// SET_VARSIZE( (struct varlena * )user_data->colRawValues[i], len);
*((int32 *)(user_data->colRawValues[i])) = len;
memcpy(user_data->colRawValues[i] + VARHDRSZ, pVal, len);
} else if (IOBASETYPEID == user_data->colDatatypes[i]) {
// get the length of basetype
bool passbyval = tts->tts_tupleDescriptor->attrs[i]->attbyval;
int32_t orilen = (int32_t)(tts->tts_tupleDescriptor->attrs[i]->attlen);
int32_t len =
get_typlen_fast(dataType, passbyval, orilen, user_data->colValues[i]);
if (1 > len) { // invalid length
elog(ERROR,
"HAWQ composite type(udt) %s got an invalid length:%d",
TypeNameToString(makeTypeNameFromOid(dataType, -1)), len);
}
if (passbyval) {
// value is stored in the Datum directly
char *val = (char *)(user_data->colValues[i]);
user_data->colRawValues[i] = palloc0(len);
memcpy(user_data->colRawValues[i], val, len);
} else {
// value stored by pointer in Datum
char *val = DatumGetPointer(user_data->colValues[i]);
user_data->colRawValues[i] = palloc0(VARHDRSZ + len);
// set value: the first 4 bytes hold the length, then the raw value follows
// SET_VARSIZE( (struct varlena * )user_data->colRawValues[i], len);
*((int32 *)(user_data->colRawValues[i])) = len;
memcpy(user_data->colRawValues[i] + VARHDRSZ, val, len);
}
} else if (dataType == HAWQ_TYPE_INVALID) {
elog(ERROR, "HAWQ data type %s is invalid", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
} else {
elog(ERROR, "HAWQ data type %s is not supported yet", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
}
}
/* Pass to formatter to output */
int updateCount = MagmaFormatUpdateMagmaFormatC(user_data->fmt, user_data->colRawTid,
user_data->colRawValues, user_data->colIsNulls);
ps->ps_update_count = updateCount;
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_update: failed to update: %s(%d)", e->errMessage,
e->errCode);
}
MemoryContextReset(per_row_context);
MemoryContextSwitchTo(old_context);
ps->ps_tuple_oid = InvalidOid;
PG_RETURN_UINT32(updateCount);
}
/* uint32
* magma_endupdate(ExternalInsertDesc extUpdDesc)
*/
Datum magma_endupdate(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
ExternalInsertDesc eud = ps->ps_ext_update_desc;
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)(eud->ext_ps_user_data);
int updateCount = MagmaFormatEndUpdateMagmaFormatC(user_data->fmt);
ps->ps_update_count = updateCount;
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_update: failed to end update: %s(%d)", e->errMessage,
e->errCode);
}
MagmaFormatFreeMagmaFormatC(&(user_data->fmt));
for (int i = 0; i < user_data->numberOfColumns; ++i) {
pfree(user_data->colNames[i]);
}
pfree(user_data->colNames);
pfree(user_data->colDatatypes);
pfree(user_data->colRawValues);
if (eud->ext_formatter_data) {
pfree(eud->ext_formatter_data);
}
if (eud->ext_pstate != NULL && eud->ext_pstate->rowcontext != NULL) {
/*
* delete the row context
*/
MemoryContextDelete(eud->ext_pstate->rowcontext);
eud->ext_pstate->rowcontext = NULL;
}
pfree(eud);
PG_RETURN_UINT32(updateCount);
}
/*
* ExternalInsertDesc
* magma_insert_init(Relation relation,
* int formatterType,
* char *formatterName)
*/
Datum magma_insert_init(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
Relation relation = ps->ps_relation;
int formatterType = ps->ps_formatter_type;
char *formatterName = ps->ps_formatter_name;
char *serializeSchema = ps->ps_magma_serializeSchema;
int serializeSchemaLen = ps->ps_magma_serializeSchemaLen;
MagmaSnapshot *snapshot = &(ps->ps_snapshot);
/* 1. Allocate and initialize the insert descriptor */
ExternalInsertDesc eid = palloc0(sizeof(ExternalInsertDescData));
eid->ext_formatter_type = formatterType;
eid->ext_formatter_name = formatterName;
/* 1.1 Setup insert functions */
get_magma_insert_functions(formatterName, eid);
/* 1.2 Initialize basic information */
eid->ext_rel = relation;
eid->ext_noop = (Gp_role == GP_ROLE_DISPATCH);
eid->ext_formatter_data = NULL;
/* 1.3 Get URI string */
ExtTableEntry *ete = GetExtTableEntry(RelationGetRelid(relation));
Value *v = linitial(ete->locations);
char *uri_str = pstrdup(v->val.str);
eid->ext_uri = uri_str;
/* 1.4 Allocate and initialize structure which track data parsing state */
eid->ext_pstate = (CopyStateData *)palloc0(sizeof(CopyStateData));
eid->ext_tupDesc = RelationGetDescr(relation);
eid->ext_values = (Datum *)palloc0(eid->ext_tupDesc->natts * sizeof(Datum));
eid->ext_nulls = (bool *)palloc0(eid->ext_tupDesc->natts * sizeof(bool));
/* 1.5 Get format options */
List *fmt_opts = NIL;
fmt_opts = lappend(fmt_opts, makeString(pstrdup(ete->fmtopts)));
/* 1.6 Initialize parse state */
/* 1.6.1 Initialize basic information for pstate */
CopyState pstate = eid->ext_pstate;
pstate->fe_eof = false;
pstate->eol_type = EOL_UNKNOWN;
pstate->eol_str = NULL;
pstate->cur_relname = RelationGetRelationName(relation);
pstate->cur_lineno = 0;
pstate->err_loc_type = ROWNUM_ORIGINAL;
pstate->cur_attname = NULL;
pstate->raw_buf_done = true; /* true so we will read data in first run */
pstate->line_done = true;
pstate->bytesread = 0;
pstate->custom = false;
pstate->header_line = false;
pstate->fill_missing = false;
pstate->line_buf_converted = false;
pstate->raw_buf_index = 0;
pstate->processed = 0;
pstate->filename = uri_str;
pstate->copy_dest = COPY_EXTERNAL_SOURCE;
pstate->missing_bytes = 0;
pstate->rel = relation;
/* 1.6.2 Setup encoding information */
/*
* Set up encoding conversion info. Even if the client and server
* encodings are the same, we must apply pg_client_to_server() to validate
* data in multibyte encodings.
*
* Each external table specifies the encoding of its external data. We will
* therefore set a client encoding and client-to-server conversion procedure
* in here (server-to-client in WET) and these will be used in the data
* conversion routines (in copy.c CopyReadLineXXX(), etc).
*/
int fmt_encoding = ete->encoding;
Insist(PG_VALID_ENCODING(fmt_encoding));
pstate->client_encoding = fmt_encoding;
Oid conversion_proc =
FindDefaultConversionProc(GetDatabaseEncoding(), fmt_encoding);
if (OidIsValid(conversion_proc)) {
/* conversion proc found */
pstate->enc_conversion_proc = palloc0(sizeof(FmgrInfo));
fmgr_info(conversion_proc, pstate->enc_conversion_proc);
} else {
/* no conversion function (both encodings are probably the same) */
pstate->enc_conversion_proc = NULL;
}
pstate->need_transcoding = pstate->client_encoding != GetDatabaseEncoding();
pstate->encoding_embeds_ascii =
PG_ENCODING_IS_CLIENT_ONLY(pstate->client_encoding);
/* 1.6.3 Parse format options */
char *format_str = pstrdup((char *)strVal(linitial(fmt_opts)));
char *fmt_name = NULL;
List *l = magma_parse_format_string(format_str, &fmt_name);
pstate->custom_formatter_name = fmt_name;
pstate->custom_formatter_params = l;
pfree(format_str);
/* 1.6.4 Setup tuple description */
TupleDesc tup_desc = eid->ext_tupDesc;
pstate->attr_offsets = (int *)palloc0(tup_desc->natts * sizeof(int));
/* 1.6.5 Generate or convert list of attributes to process */
pstate->attnumlist = CopyGetAttnums(tup_desc, relation, NIL);
/* 1.6.6 Convert FORCE NOT NULL name list to per-column flags and check validity */
pstate->force_notnull_flags = (bool *)palloc0(tup_desc->natts * sizeof(bool));
if (pstate->force_notnull) {
List *attnums;
ListCell *cur;
attnums = CopyGetAttnums(tup_desc, relation, pstate->force_notnull);
foreach (cur, attnums) {
int attnum = lfirst_int(cur);
pstate->force_notnull_flags[attnum - 1] = true;
}
}
/* 1.6.7 Take care of state that is WET specific */
Form_pg_attribute *attr = tup_desc->attrs;
ListCell *cur;
pstate->null_print_client = pstate->null_print; /* default */
pstate->fe_msgbuf = makeStringInfo(); /* use fe_msgbuf as a per-row buffer */
pstate->out_functions =
(FmgrInfo *)palloc0(tup_desc->natts * sizeof(FmgrInfo));
foreach (cur,
pstate->attnumlist) /* Get info about the columns we need to process */
{
int attnum = lfirst_int(cur);
Oid out_func_oid;
bool isvarlena;
getTypeOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid, &isvarlena);
fmgr_info(out_func_oid, &pstate->out_functions[attnum - 1]);
}
/*
* We need to convert null_print to client encoding, because it
* will be sent directly with CopySendString.
*/
if (pstate->need_transcoding) {
pstate->null_print_client = pg_server_to_custom(
pstate->null_print, pstate->null_print_len, pstate->client_encoding,
pstate->enc_conversion_proc);
}
/* 1.6.8 Create temporary memory context for per row process */
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
* datatype input or output routines, and should be faster than retail
* pfree's anyway.
*/
pstate->rowcontext = AllocSetContextCreate(
CurrentMemoryContext, "ExtTableMemCxt", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
/* 1.7 Initialize formatter data */
eid->ext_formatter_data = (FormatterData *)palloc0(sizeof(FormatterData));
eid->ext_formatter_data->fmt_perrow_ctx = eid->ext_pstate->rowcontext;
/* 2. Setup user data */
/* 2.1 Create formatter instance */
Assert(database != NULL);
Oid namespaceOid = RelationGetNamespace(relation);
char *schema = getNamespaceNameByOid(namespaceOid);
char *table = RelationGetRelationName(relation);
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)palloc0(sizeof(MagmaFormatUserData));
if (formatterName != NULL &&
(strncasecmp(formatterName, "magmatp", sizeof("magmatp") - 1) == 0)) {
user_data->isMagmatp = true;
} else {
user_data->isMagmatp = false;
}
/* the number of ranges is dynamic for a magma table */
int32_t nRanges = 0;
ListCell *lc_split = NULL;
foreach (lc_split, ps->ps_magma_splits) {
List *split = (List *)lfirst(lc_split);
nRanges += list_length(split);
}
init_magma_format_user_data_for_write(tup_desc, user_data, relation);
/* 2.2 Create formatter instance */
bool isexternal = false;
get_magma_category_info(ete->fmtopts, &isexternal);
List *fmt_opts_defelem = pstate->custom_formatter_params;
user_data->fmt = create_magma_formatter_instance(
fmt_opts_defelem, serializeSchema, serializeSchemaLen, fmt_encoding,
formatterName, nRanges);
/* prepare hash info */
int32_t nDistKeyIndex = 0;
int16_t *distKeyIndex = NULL;
fetchDistributionPolicy(relation->rd_id, &nDistKeyIndex, &distKeyIndex);
uint32 range_to_rg_map[nRanges];
List *rg = magma_build_range_to_rg_map(ps->ps_magma_splits, range_to_rg_map);
int nRg = list_length(rg);
uint16 *rgId = palloc0(sizeof(uint16) * nRg);
char **rgUrl = palloc0(sizeof(char *) * nRg);
magma_build_rg_to_url_map(ps->ps_magma_splits, rg, rgId, rgUrl);
/* 2.3 Prepare database, schema, and table information */
MagmaFormatC_SetupTarget(user_data->fmt, database, schema, table);
MagmaFormatC_SetupTupDesc(user_data->fmt, user_data->numberOfColumns,
user_data->colNames, user_data->colDatatypes,
user_data->colDatatypeMods, user_data->colIsNulls);
int *jumpHashMap = get_jump_hash_map(nRanges);
MagmaFormatC_SetupHasher(user_data->fmt, nDistKeyIndex, distKeyIndex, nRanges,
range_to_rg_map, nRg, rgId, rgUrl, jumpHashMap,
JUMP_HASH_MAP_LENGTH);
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_insert: failed to begin insert: %s (%d)", e->errMessage,
e->errCode);
}
eid->ext_ps_user_data = (void *)user_data;
/* 3. Begin insert with the formatter */
MagmaFormatBeginInsertMagmaFormatC(user_data->fmt, snapshot);
MagmaFormatCatchedError *e1 = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e1->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_insert: failed to begin insert: %s (%d)", e1->errMessage,
e1->errCode);
}
/* 4. Save the result */
ps->ps_ext_insert_desc = eid;
PG_RETURN_POINTER(eid);
}
/*
* Oid
* magma_insert(ExternalInsertDesc extInsertDesc,
* TupleTableSlot *tupTableSlot)
*/
Datum magma_insert(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
ExternalInsertDesc eid = ps->ps_ext_insert_desc;
TupleTableSlot *tts = ps->ps_tuple_table_slot;
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)(eid->ext_ps_user_data);
user_data->colValues = slot_get_values(tts);
user_data->colIsNulls = slot_get_isnull(tts);
static bool DUMMY_BOOL = true;
static int8_t DUMMY_INT8 = 0;
static int16_t DUMMY_INT16 = 0;
static int32_t DUMMY_INT32 = 0;
static int64_t DUMMY_INT64 = 0;
static float DUMMY_FLOAT = 0.0;
static double DUMMY_DOUBLE = 0.0;
static char DUMMY_TEXT[1] = "";
static int32_t DUMMY_DATE = 0;
static int64_t DUMMY_TIME = 0;
static TimestampType DUMMY_TIMESTAMP = {0, 0};
TupleDesc tupdesc = tts->tts_tupleDescriptor;
user_data->numberOfColumns = tupdesc->natts;
MemoryContext per_row_context = eid->ext_pstate->rowcontext;
MemoryContext old_context = MemoryContextSwitchTo(per_row_context);
/* Get column values */
for (int i = 0; i < user_data->numberOfColumns; ++i) {
int dataType = (int)(tupdesc->attrs[i]->atttypid);
user_data->colRawValues[i] = NULL;
if (user_data->colIsNulls[i]) {
if (dataType == HAWQ_TYPE_CHAR) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT8);
} else if (dataType == HAWQ_TYPE_INT2) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT16);
} else if (dataType == HAWQ_TYPE_INT4) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT32);
} else if (dataType == HAWQ_TYPE_INT8) {
user_data->colRawValues[i] = (char *)(&DUMMY_INT64);
} else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE ||
dataType == HAWQ_TYPE_BPCHAR ||
dataType == HAWQ_TYPE_VARCHAR ||
dataType == HAWQ_TYPE_NUMERIC ||
dataType == HAWQ_TYPE_JSON ||
dataType == HAWQ_TYPE_JSONB) {
user_data->colRawValues[i] = (char *)(DUMMY_TEXT);
} else if (dataType == HAWQ_TYPE_FLOAT4) {
user_data->colRawValues[i] = (char *)(&DUMMY_FLOAT);
} else if (dataType == HAWQ_TYPE_FLOAT8) {
user_data->colRawValues[i] = (char *)(&DUMMY_DOUBLE);
} else if (dataType == HAWQ_TYPE_BOOL) {
user_data->colRawValues[i] = (char *)(&DUMMY_BOOL);
} else if (dataType == HAWQ_TYPE_DATE) {
user_data->colRawValues[i] = (char *)(&DUMMY_DATE);
} else if (dataType == HAWQ_TYPE_TIME) {
user_data->colRawValues[i] = (char *)(&DUMMY_TIME);
} else if (dataType == HAWQ_TYPE_TIMESTAMP) {
user_data->colRawValues[i] = (char *)(&DUMMY_TIMESTAMP);
}
// keep this rowtype/basetype branch in place; do not move it elsewhere
else if (STRUCTEXID == user_data->colDatatypes[i] ||
IOBASETYPEID == user_data->colDatatypes[i]) {
user_data->colRawValues[i] = (char *)(DUMMY_TEXT);
} else if (dataType == HAWQ_TYPE_INVALID) {
elog(ERROR, "HAWQ data type %s is invalid", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
} else {
elog(ERROR, "HAWQ data type %s is not supported yet", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
}
continue;
}
if (dataType == HAWQ_TYPE_CHAR || dataType == HAWQ_TYPE_INT2 ||
dataType == HAWQ_TYPE_INT4 || dataType == HAWQ_TYPE_INT8 ||
dataType == HAWQ_TYPE_FLOAT4 || dataType == HAWQ_TYPE_FLOAT8 ||
dataType == HAWQ_TYPE_BOOL || dataType == HAWQ_TYPE_TIME) {
user_data->colRawValues[i] = (char *)(&(user_data->colValues[i]));
} else if (dataType == HAWQ_TYPE_DATE) {
int *date = (int *)(&(user_data->colValues[i]));
*date += POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE;
user_data->colRawValues[i] = (char *)(&(user_data->colValues[i]));
} else if (dataType == HAWQ_TYPE_TIMESTAMP) {
int64_t *timestamp = (int64_t *) (&(user_data->colValues[i]));
user_data->colTimestamp[i].second = *timestamp / 1000000
+ (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 60 * 60 * 24;
user_data->colTimestamp[i].nanosecond = *timestamp % 1000000 * 1000;
int64_t days = user_data->colTimestamp[i].second / 60 / 60 / 24;
if (user_data->colTimestamp[i].nanosecond < 0 &&
(days > POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE || days < 0))
user_data->colTimestamp[i].nanosecond += 1000000000;
if (user_data->colTimestamp[i].second < 0 &&
user_data->colTimestamp[i].nanosecond != 0)
user_data->colTimestamp[i].second -= 1;
user_data->colRawValues[i] = (char *) (&(user_data->colTimestamp[i]));
} else if (dataType == HAWQ_TYPE_NUMERIC) {
Numeric num = DatumGetNumeric(user_data->colValues[i]);
user_data->colRawValues[i] = (char *)num;
if (NUMERIC_IS_NAN(num)) // XXX(chiyang): problematic legacy NaN
{
user_data->colIsNulls[i] = true;
}
} else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE ||
dataType == HAWQ_TYPE_BPCHAR || dataType == HAWQ_TYPE_VARCHAR
|| dataType == HAWQ_TYPE_JSON || dataType == HAWQ_TYPE_JSONB) {
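/*
* Unlike the delete/update paths, which serialize these types through the
* type output function, the insert path hands magma the raw varlena
* payload and its length directly.
*/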
struct varlena *varlen =
(struct varlena *)DatumGetPointer(user_data->colValues[i]);
user_data->colValLength[i] = VARSIZE_ANY_EXHDR(varlen);
user_data->colRawValues[i] = VARDATA_ANY(varlen);
} else if (STRUCTEXID == user_data->colDatatypes[i]) {
int32_t len = VARSIZE(user_data->colValues[i]);
if (len <= 0) {
elog(ERROR, "HAWQ base type(udt) %s should not be less than 0",
TypeNameToString(makeTypeNameFromOid(dataType, -1)));
}
char *pVal = DatumGetPointer(user_data->colValues[i]);
user_data->colRawValues[i] = palloc0(VARHDRSZ + len);
// set value: the first 4 bytes hold the length, then the raw value follows
// SET_VARSIZE( (struct varlena * )user_data->colRawValues[i], len);
*((int32 *)(user_data->colRawValues[i])) = len;
memcpy(user_data->colRawValues[i] + VARHDRSZ, pVal, len);
} else if (IOBASETYPEID == user_data->colDatatypes[i]) {
// get the length of basetype
bool passbyval = tupdesc->attrs[i]->attbyval;
int32_t orilen = (int32_t)(tupdesc->attrs[i]->attlen);
int32_t len =
get_typlen_fast(dataType, passbyval, orilen, user_data->colValues[i]);
if (1 > len) { // invalid length
elog(ERROR,
"HAWQ composite type(udt) %s got an invalid length:%d",
TypeNameToString(makeTypeNameFromOid(dataType, -1)), len);
}
if (passbyval) {
// value is stored in the Datum directly
char *val = (char *)&(user_data->colValues[i]);
user_data->colRawValues[i] = palloc0(VARHDRSZ + len);
*((int32 *)(user_data->colRawValues[i])) = len;
memcpy(user_data->colRawValues[i] + VARHDRSZ, val, len);
} else {
// value stored by pointer in Datum
char *val = DatumGetPointer(user_data->colValues[i]);
user_data->colRawValues[i] = palloc0(VARHDRSZ + len);
// set value: the first 4 bytes hold the length, then the raw value follows
// SET_VARSIZE( (struct varlena * )user_data->colRawValues[i], len);
*((int32 *)(user_data->colRawValues[i])) = len;
memcpy(user_data->colRawValues[i] + VARHDRSZ, val, len);
}
} else if (dataType == HAWQ_TYPE_INVALID) {
elog(ERROR, "HAWQ data type %s is invalid", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
} else {
elog(ERROR, "HAWQ data type %s is not supported yet", TypeNameToString(makeTypeNameFromOid(dataType, -1)));
}
}
/* Pass to formatter to output */
MagmaFormatInsertMagmaFormatC(user_data->fmt, user_data->colRawValues,
user_data->colValLength, user_data->colIsNulls);
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_insert: failed to insert: %s(%d)", e->errMessage,
e->errCode);
}
ps->ps_tuple_oid = InvalidOid;
MemoryContextReset(per_row_context);
MemoryContextSwitchTo(old_context);
PG_RETURN_OID(InvalidOid);
}
/*
* void
* magma_insert_finish(ExternalInsertDesc extInsertDesc)
*/
Datum magma_insert_finish(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
PlugStorage ps = (PlugStorage)(fcinfo->context);
ExternalInsertDesc eid = ps->ps_ext_insert_desc;
MagmaFormatUserData *user_data =
(MagmaFormatUserData *)(eid->ext_ps_user_data);
MagmaFormatEndInsertMagmaFormatC(user_data->fmt);
MagmaFormatCatchedError *e = MagmaFormatGetErrorMagmaFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
elog(ERROR, "magma_insert: failed to end insert: %s(%d)", e->errMessage,
e->errCode);
}
MagmaFormatFreeMagmaFormatC(&(user_data->fmt));
for (int i = 0; i < user_data->numberOfColumns; ++i) {
pfree(user_data->colNames[i]);
}
pfree(user_data->colNames);
pfree(user_data->colDatatypes);
pfree(user_data->colRawValues);
pfree(user_data->colDatatypeMods);
pfree(user_data->colTimestamp);
pfree(user_data);
if (eid->ext_formatter_data) pfree(eid->ext_formatter_data);
if (eid->ext_formatter_name) pfree(eid->ext_formatter_name);
if (eid->ext_pstate != NULL && eid->ext_pstate->rowcontext != NULL) {
/*
* delete the row context
*/
MemoryContextDelete(eid->ext_pstate->rowcontext);
eid->ext_pstate->rowcontext = NULL;
}
pfree(eid);
PG_RETURN_VOID();
}
/*
* void
* magma_transaction(PlugStorageTransaction transaction)
*/
Datum magma_transaction(PG_FUNCTION_ARGS) {
checkOushuDbExtensiveFunctionSupport(__func__);
elog(DEBUG3, "magma_transaction begin");
PlugStorage ps = (PlugStorage)(fcinfo->context);
PlugStorageTransaction pst = ps->ps_transaction;
PlugStorageTransactionCommand txn_command = pst->pst_transaction_command;
MagmaClientC *client = create_magma_client_instance();
if (client == NULL) {
elog(ERROR, "failed to connect to magma service");
}
MagmaClientC_SetupSnapshot(client, pst->pst_transaction_snapshot);
switch (txn_command) {
case PS_TXN_CMD_START_TRANSACTION: {
pst->pst_transaction_state = MagmaClientC_StartTransaction(client);
pst->pst_transaction_snapshot = NULL;
pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
pst->pst_transaction_id = InvalidTransactionId;
elog(DEBUG1, "magma_transaction: start transaction");
magma_check_result(&client);
break;
}
case PS_TXN_CMD_COMMIT_TRANSACTION:
if (pst->pst_transaction_snapshot == NULL) {
elog(DEBUG1, "magma_transaction: commit snapshot: NULL");
} else {
elog(DEBUG1,
"magma_transaction: commit snapshot: (%llu, %u, %llu, %u)",
pst->pst_transaction_snapshot->currentTransaction.txnId,
pst->pst_transaction_snapshot->currentTransaction.txnStatus,
pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
pst->pst_transaction_snapshot->txnActions.txnActionSize);
}
MagmaClientC_CommitTransaction(client);
magma_check_result(&client);
break;
case PS_TXN_CMD_ABORT_TRANSACTION:
if (pst->pst_transaction_snapshot == NULL) {
elog(DEBUG1, "magma_transaction: abort snapshot: NULL");
} else {
elog(DEBUG1,
"magma_transaction: abort snapshot: (%llu, %u, %llu, %u)",
pst->pst_transaction_snapshot->currentTransaction.txnId,
pst->pst_transaction_snapshot->currentTransaction.txnStatus,
pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
pst->pst_transaction_snapshot->txnActions.txnActionSize);
}
if (pst->pst_transaction_status != PS_TXN_STS_DEFAULT &&
pst->pst_transaction_id != InvalidTransactionId &&
pst->pst_transaction_snapshot != NULL) {
MagmaClientC_AbortTransaction(client, PlugStorageGetIsCleanupAbort());
pst->pst_transaction_snapshot = NULL;
pst->pst_transaction_id = InvalidTransactionId;
pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
magma_check_result(&client);
}
break;
case PS_TXN_CMD_GET_SNAPSHOT: {
MagmaClientC_CleanupTableInfo(client);
int i = 0;
ListCell *lc;
foreach (lc, ps->magma_talbe_full_names) {
MagmaTableFullName* mtfn = lfirst(lc);
MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
mtfn->tableName, 0);
++i;
}
pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
if (pst->pst_transaction_snapshot == NULL) {
pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
pst->pst_transaction_id = InvalidTransactionId;
pst->pst_transaction_snapshot = NULL;
elog(DEBUG1, "magma_transaction: get snapshot: NULL");
} else {
elog(DEBUG1, "magma_transaction: get snapshot: (%llu, %u, %llu, %u)",
pst->pst_transaction_snapshot->currentTransaction.txnId,
pst->pst_transaction_snapshot->currentTransaction.txnStatus,
pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
pst->pst_transaction_snapshot->txnActions.txnActionSize);
}
magma_check_result(&client);
break;
}
case PS_TXN_CMD_GET_TRANSACTIONID: {
MagmaClientC_CleanupTableInfo(client);
int i = 0;
ListCell *lc;
foreach (lc, ps->magma_talbe_full_names) {
MagmaTableFullName* mtfn = lfirst(lc);
MagmaClientC_AddTableInfo(client, mtfn->databaseName, mtfn->schemaName,
mtfn->tableName, 0);
++i;
}
pst->pst_transaction_state = MagmaClientC_GetTransctionId(client);
pst->pst_transaction_snapshot = MagmaClientC_GetSnapshot(client);
if (pst->pst_transaction_snapshot == NULL) {
pst->pst_transaction_status = PS_TXN_STS_DEFAULT;
pst->pst_transaction_id = InvalidTransactionId;
pst->pst_transaction_snapshot = NULL;
elog(DEBUG1, "magma_transaction: get transaction state: NULL");
} else {
elog(DEBUG1, "magma_transaction: get transaction state: (%llu, %u, %llu, %u)",
pst->pst_transaction_snapshot->currentTransaction.txnId,
pst->pst_transaction_snapshot->currentTransaction.txnStatus,
pst->pst_transaction_snapshot->txnActions.txnActionStartOffset,
pst->pst_transaction_snapshot->txnActions.txnActionSize);
}
magma_check_result(&client);
break;
}
default:
elog(ERROR, "Transaction command for magma is invalid %d", txn_command);
break;
}
PG_RETURN_VOID();
}
static void get_magma_category_info(char *fmtoptstr, bool *isexternal) {
/* parse the format options and extract the category flag */
char *fmt_name = NULL;
List *l = magma_parse_format_string(fmtoptstr, &fmt_name);
ListCell *opt;
foreach (opt, l) {
DefElem *defel = (DefElem *)lfirst(opt);
char *key = defel->defname;
bool need_free_value = false;
char *val = (char *)defGetString(defel, &need_free_value);
/* check category */
if (strncasecmp(key, "category", strlen("category")) == 0) {
if (strncasecmp(val, "internal", strlen("internal")) == 0) {
*isexternal = false;
}
if (strncasecmp(val, "external", strlen("external")) == 0) {
*isexternal = true;
}
}
}
}
static FmgrInfo *get_magma_function(char *formatter_name, char *function_name) {
Assert(formatter_name);
Assert(function_name);
Oid procOid = InvalidOid;
FmgrInfo *procInfo = NULL;
procOid = LookupPlugStorageValidatorFunc(formatter_name, function_name);
if (OidIsValid(procOid)) {
procInfo = (FmgrInfo *)palloc0(sizeof(FmgrInfo));
fmgr_info(procOid, procInfo);
} else {
elog(ERROR, "%s_%s function was not found for pluggable storage",
formatter_name, function_name);
}
return procInfo;
}
static void get_magma_scan_functions(char *formatter_name,
FileScanDesc file_scan_desc) {
file_scan_desc->fs_ps_scan_funcs.beginscan =
get_magma_function(formatter_name, "beginscan");
file_scan_desc->fs_ps_scan_funcs.getnext_init =
get_magma_function(formatter_name, "getnext_init");
file_scan_desc->fs_ps_scan_funcs.getnext =
get_magma_function(formatter_name, "getnext");
file_scan_desc->fs_ps_scan_funcs.rescan =
get_magma_function(formatter_name, "rescan");
file_scan_desc->fs_ps_scan_funcs.endscan =
get_magma_function(formatter_name, "endscan");
file_scan_desc->fs_ps_scan_funcs.stopscan =
get_magma_function(formatter_name, "stopscan");
}
static void get_magma_insert_functions(char *formatter_name,
ExternalInsertDesc ext_insert_desc) {
ext_insert_desc->ext_ps_insert_funcs.insert_init =
get_magma_function(formatter_name, "insert_init");
ext_insert_desc->ext_ps_insert_funcs.insert =
get_magma_function(formatter_name, "insert");
ext_insert_desc->ext_ps_insert_funcs.insert_finish =
get_magma_function(formatter_name, "insert_finish");
}
static void get_magma_delete_functions(char *formatter_name,
ExternalInsertDesc ext_delete_desc) {
ext_delete_desc->ext_ps_delete_funcs.begindeletes =
get_magma_function(formatter_name, "begindelete");
ext_delete_desc->ext_ps_delete_funcs.deletes =
get_magma_function(formatter_name, "delete");
ext_delete_desc->ext_ps_delete_funcs.enddeletes =
get_magma_function(formatter_name, "enddelete");
}
static void get_magma_update_functions(char *formatter_name,
ExternalInsertDesc ext_update_desc) {
ext_update_desc->ext_ps_update_funcs.beginupdates =
get_magma_function(formatter_name, "beginupdate");
ext_update_desc->ext_ps_update_funcs.updates =
get_magma_function(formatter_name, "update");
ext_update_desc->ext_ps_update_funcs.endupdates =
get_magma_function(formatter_name, "endupdate");
}
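/*
* Serialize the formatter options into the JSON string consumed by the
* magma format library. An illustrative (not authoritative) result:
*
*   {"encoding": "utf8", "magma_range_num": 4,
*    "serialized_schema": "<bytes>", "magma_format_type": "1"}
*/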
static void build_options_in_json(char *serializeSchema, int serializeSchemaLen,
List *fmt_opts_defelem, int encoding, int rangeNum,
char *formatterName, char **json_str) {
struct json_object *opt_json_object = json_object_new_object();
/* add format options for the formatter */
char *key_str = NULL;
char *val_str = NULL;
int nargs = list_length(fmt_opts_defelem);
for (int i = 0; i < nargs; ++i) {
key_str = ((DefElem *)(list_nth(fmt_opts_defelem, i)))->defname;
val_str =
((Value *)((DefElem *)(list_nth(fmt_opts_defelem, i)))->arg)->val.str;
json_object_object_add(opt_json_object, key_str,
json_object_new_string(val_str));
}
/* add encoding option for orc */
if (json_object_object_get(opt_json_object, "encoding") == NULL) {
const char *encodingStr = pg_encoding_to_char(encoding);
char *encodingStrLower = str_tolower(encodingStr, strlen(encodingStr));
json_object_object_add(opt_json_object, "encoding",
json_object_new_string(encodingStrLower));
if (encodingStrLower) pfree(encodingStrLower);
}
/* add magma_range_num option for magma */
if (json_object_object_get(opt_json_object, "magma_range_num") == NULL) {
json_object_object_add(opt_json_object, "magma_range_num",
json_object_new_int64(rangeNum));
}
/* add magma_serialized_schema option for magma */
if (json_object_object_get(opt_json_object, "serialized_schema") == NULL) {
json_object_object_add(
opt_json_object, "serialized_schema",
json_object_new_string_len(serializeSchema, serializeSchemaLen));
}
/* add magma_format_type option for magma */
if (json_object_object_get(opt_json_object, "magma_format_type") == NULL) {
char *magma_type = NULL;
if (formatterName != NULL &&
(strncasecmp(formatterName, "magmatp", strlen("magmatp")) == 0)) {
magma_type = "0";
} else if (formatterName != NULL &&
strncasecmp(formatterName, "magmaap", strlen("magmaap")) == 0) {
magma_type = "1";
}
/* guard against a NULL formatterName leaving magma_type unset */
if (magma_type != NULL) {
json_object_object_add(opt_json_object, "magma_format_type",
json_object_new_string(magma_type));
}
}
*json_str = NULL;
if (opt_json_object != NULL) {
const char *str = json_object_to_json_string(opt_json_object);
*json_str = (char *)palloc0(strlen(str) + 1);
strcpy(*json_str, str);
json_object_put(opt_json_object);
elog(DEBUG3, "formatter options are %s", *json_str);
}
}
static MagmaFormatC *create_magma_formatter_instance(List *fmt_opts_defelem,
char *serializeSchema,
int serializeSchemaLen,
int fmt_encoding,
char *formatterName,
int rangeNum) {
char *fmt_opts_str = NULL;
build_options_in_json(serializeSchema, serializeSchemaLen, fmt_opts_defelem,
fmt_encoding, rangeNum, formatterName, &fmt_opts_str);
MagmaFormatC *magma_format_c = MagmaFormatNewMagmaFormatC(fmt_opts_str);
if (fmt_opts_str != NULL) {
pfree(fmt_opts_str);
}
return magma_format_c;
}
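/*
* The magma client handle is cached in the file-scope
* magma_client_instance and reused across calls; an existing handle is
* reset for reuse rather than recreated.
*/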
static MagmaClientC *create_magma_client_instance(void) {
if (magma_client_instance != NULL) {
MagmaClientC_ResetMagmaClient4Reuse(&magma_client_instance);
return magma_client_instance;
}
magma_client_instance = MagmaClientC_NewMagmaClient(magma_nodes_url);
MagmaResult *result = MagmaClientC_GetResult(magma_client_instance);
if (result->level == MAGMA_ERROR) {
MagmaClientC_FreeMagmaClient(&magma_client_instance);
elog(ERROR, "%s", result->message);
}
return magma_client_instance;
}
static void init_magma_format_user_data_for_read(
TupleDesc tup_desc, MagmaFormatUserData *user_data) {
user_data->numberOfColumns = tup_desc->natts;
user_data->colNames = palloc0(sizeof(char *) * user_data->numberOfColumns);
user_data->colDatatypes = palloc0(sizeof(int) * user_data->numberOfColumns);
user_data->colDatatypeMods = palloc0(
sizeof(int64_t) * user_data->numberOfColumns);
user_data->colValues = palloc0(sizeof(Datum) * user_data->numberOfColumns);
user_data->colRawValues = palloc0(
sizeof(char *) * user_data->numberOfColumns);
user_data->colValLength = palloc0(
sizeof(uint64_t) * user_data->numberOfColumns);
user_data->colIsNulls = palloc0(sizeof(bool) * user_data->numberOfColumns);
for (int i = 0; i < user_data->numberOfColumns; i++) {
Form_pg_attribute attr = tup_desc->attrs[i];
user_data->colNames[i] = pstrdup(attr->attname.data);
user_data->colDatatypes[i] = map_hawq_type_to_magma_type(attr->atttypid, user_data->isMagmatp);
user_data->colDatatypeMods[i] = attr->atttypmod;
user_data->colRawTid = NULL;
user_data->colValues[i] = (Datum)0;
user_data->colRawValues[i] = NULL;
user_data->colValLength[i] = 0;
user_data->colIsNulls[i] = false;
}
}
static void init_magma_format_user_data_for_write(
TupleDesc tup_desc, MagmaFormatUserData *user_data, Relation relation) {
user_data->numberOfColumns = tup_desc->natts;
user_data->colNames = palloc0(sizeof(char *) * user_data->numberOfColumns);
user_data->colDatatypes = palloc0(sizeof(int) * user_data->numberOfColumns);
user_data->colRawValues =
palloc0(sizeof(char *) * user_data->numberOfColumns);
user_data->colValLength =
palloc0(sizeof(uint64_t) * user_data->numberOfColumns);
user_data->colDatatypeMods =
palloc0(sizeof(int64_t) * user_data->numberOfColumns);
user_data->colIsNulls = palloc0(sizeof(bool) * user_data->numberOfColumns);
user_data->colTimestamp =
palloc0(sizeof(TimestampType) * user_data->numberOfColumns);
for (int i = 0; i < user_data->numberOfColumns; ++i) {
Form_pg_attribute attr = tup_desc->attrs[i];
user_data->colNames[i] = pstrdup(attr->attname.data);
    user_data->colDatatypes[i] =
        map_hawq_type_to_magma_type(attr->atttypid, user_data->isMagmatp);
user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod;
user_data->colIsNulls[i] = !(relation->rd_att->attrs[i]->attnotnull);
}
}
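/*
 * Build the column description used by a magma read: record each column's
 * name, common-plan datatype, and typmod, and mark the columns actually
 * referenced by the plan's target list and quals (all columns are read
 * when no plan is supplied).
 */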
static void build_magma_tuple_descrition_for_read(
Plan *plan, Relation relation, MagmaFormatUserData *user_data, bool skipTid) {
user_data->colToReads = palloc0(sizeof(bool) * user_data->numberOfColumns);
  for (int i = 0; i < user_data->numberOfColumns; ++i) {
    /* read all columns when there is no plan to narrow the set */
    user_data->colToReads[i] = (plan == NULL);
    /* NAMEDATALEN (64) is the maximum attribute name length */
    user_data->colNames[i] = palloc(sizeof(char) * NAMEDATALEN);
    strcpy(user_data->colNames[i], relation->rd_att->attrs[i]->attname.data);
    int data_type = (int)(relation->rd_att->attrs[i]->atttypid);
    user_data->colDatatypes[i] = map_hawq_type_to_common_plan(data_type);
    user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod;
  }
  if (plan) {
    /* calculate columns to read for seqscan */
    GetNeededColumnsForScan((Node *)plan->targetlist, user_data->colToReads,
                            user_data->numberOfColumns);
    GetNeededColumnsForScan((Node *)plan->qual, user_data->colToReads,
                            user_data->numberOfColumns);
    // if (skipTid) {
    //   int32_t i = 0;
    //   for (; i < user_data->numberOfColumns; ++i) {
    //     if (user_data->colToReads[i]) break;
    //   }
    //   if (i == user_data->numberOfColumns) user_data->colToReads[0] = true;
    // }
  }
}
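/*
 * Error-context callback for magma scans; reports which external table was
 * being processed when the error was raised.
 */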
static void magma_scan_error_callback(void *arg) {
CopyState cstate = (CopyState)arg;
errcontext("External table %s", cstate->cur_relname);
}
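/*
 * Parse a format string into a list of DefElem options, extracting the
 * "formatter" value into *fmtname. For illustration only (the option name
 * below is a made-up example), a string such as
 *   formatter 'magmaap' bucketnum '6'
 * would set *fmtname to "magmaap" and return a one-element list holding
 * bucketnum = "6".
 */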
static List *magma_parse_format_string(char *fmtstr, char **fmtname) {
char *token;
const char *whitespace = " \t\n\r";
char nonstd_backslash = 0;
int encoding = GetDatabaseEncoding();
token =
magma_strtokx2(fmtstr, whitespace, NULL, NULL, 0, false, true, encoding);
  /* Parse user-provided custom options; take them as-is, no validation needed. */
List *l = NIL;
bool formatter_found = false;
if (token) {
char *key = token;
char *val = NULL;
StringInfoData key_modified;
initStringInfo(&key_modified);
while (key) {
/* MPP-14467 - replace meta chars back to original */
resetStringInfo(&key_modified);
appendStringInfoString(&key_modified, key);
replaceStringInfoString(&key_modified, "<gpx20>", " ");
val = magma_strtokx2(NULL, whitespace, NULL, "'", nonstd_backslash, true,
true, encoding);
if (val) {
if (pg_strcasecmp(key, "formatter") == 0) {
*fmtname = pstrdup(val);
formatter_found = true;
} else
l = lappend(l, makeDefElem(pstrdup(key_modified.data),
(Node *)makeString(pstrdup(val))));
} else
goto error;
key = magma_strtokx2(NULL, whitespace, NULL, NULL, 0, false, false,
encoding);
}
}
  /*
   * If no formatter option was specified, the caller falls back to the
   * format name, so the absence of "formatter" is not reported as an
   * error here.
   */
return l;
error:
if (token)
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("external table internal parse error at \"%s\"", token)));
else
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("external table internal parse error at end of "
"line")));
return NIL;
}
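/*
 * Tokenizer modeled on strtok: called with a non-NULL string it makes a
 * private copy and returns the first token; called with NULL it continues
 * from the previous position. It honors single-character delimiters,
 * quoted tokens (optionally E'...' escape strings), and an escape
 * character, and can strip quotes from the returned token. NULL is
 * returned once the input is exhausted. Illustrative example:
 *   magma_strtokx2("a 'b c'", " ", NULL, "'", 0, false, true, enc)
 * returns "a", and a following call with s == NULL returns "b c".
 */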
static char *magma_strtokx2(const char *s, const char *whitespace,
const char *delim, const char *quote, char escape,
bool e_strings, bool del_quotes, int encoding) {
static char *storage = NULL; /* store the local copy of the users string
* here */
static char *string = NULL; /* pointer into storage where to continue on
* next call */
/* variously abused variables: */
unsigned int offset;
char *start;
char *p;
  if (s) {
    // pfree(storage); /* left disabled: storage from a previous call is not freed here */
/*
* We may need extra space to insert delimiter nulls for adjacent
* tokens. 2X the space is a gross overestimate, but it's unlikely
* that this code will be used on huge strings anyway.
*/
storage = palloc0(2 * strlen(s) + 1);
strcpy(storage, s);
string = storage;
}
if (!storage) return NULL;
/* skip leading whitespace */
offset = strspn(string, whitespace);
start = &string[offset];
/* end of string reached? */
if (*start == '\0') {
/* technically we don't need to free here, but we're nice */
pfree(storage);
storage = NULL;
string = NULL;
return NULL;
}
/* test if delimiter character */
if (delim && strchr(delim, *start)) {
/*
* If not at end of string, we need to insert a null to terminate the
* returned token. We can just overwrite the next character if it
* happens to be in the whitespace set ... otherwise move over the
* rest of the string to make room. (This is why we allocated extra
* space above).
*/
p = start + 1;
if (*p != '\0') {
if (!strchr(whitespace, *p)) memmove(p + 1, p, strlen(p) + 1);
*p = '\0';
string = p + 1;
} else {
/* at end of string, so no extra work */
string = p;
}
return start;
}
/* check for E string */
p = start;
if (e_strings && (*p == 'E' || *p == 'e') && p[1] == '\'') {
quote = "'";
escape = '\\'; /* if std strings before, not any more */
p++;
}
/* test if quoting character */
if (quote && strchr(quote, *p)) {
/* okay, we have a quoted token, now scan for the closer */
char thisquote = *p++;
    /* MPP-6698 START
     * Unfortunately, it is possible for an external table format string
     * to be represented in the catalog in a way which is problematic to
     * parse: when using a single quote as a QUOTE or ESCAPE character the
     * format string will show [quote ''']. Since we do not want to change
     * how this is stored at this point (as it would affect previous
     * versions of the software already in production), the following code
     * block detects this scenario, where 3 quote characters follow each
     * other with no fourth one. In that case we skip the second quote
     * (the first is skipped just above) and the last trailing quote will
     * be skipped below. The result will be the actual token (''') and,
     * after stripping it due to del_quotes, we end up with (').
     * Very ugly, but it does the job...
     */
char qt = quote[0];
if (strlen(p) >= 3 && p[0] == qt && p[1] == qt && p[2] != qt) p++;
/* MPP-6698 END */
for (; *p; p += pg_encoding_mblen(encoding, p)) {
if (*p == escape && p[1] != '\0')
p++; /* process escaped anything */
else if (*p == thisquote && p[1] == thisquote)
p++; /* process doubled quote */
else if (*p == thisquote) {
p++; /* skip trailing quote */
break;
}
}
/*
* If not at end of string, we need to insert a null to terminate the
* returned token. See notes above.
*/
if (*p != '\0') {
if (!strchr(whitespace, *p)) memmove(p + 1, p, strlen(p) + 1);
*p = '\0';
string = p + 1;
} else {
/* at end of string, so no extra work */
string = p;
}
/* Clean up the token if caller wants that */
if (del_quotes) magma_strip_quotes(start, thisquote, escape, encoding);
return start;
}
/*
* Otherwise no quoting character. Scan till next whitespace, delimiter
* or quote. NB: at this point, *start is known not to be '\0',
* whitespace, delim, or quote, so we will consume at least one character.
*/
offset = strcspn(start, whitespace);
if (delim) {
unsigned int offset2 = strcspn(start, delim);
if (offset > offset2) offset = offset2;
}
if (quote) {
unsigned int offset2 = strcspn(start, quote);
if (offset > offset2) offset = offset2;
}
p = start + offset;
/*
* If not at end of string, we need to insert a null to terminate the
* returned token. See notes above.
*/
if (*p != '\0') {
if (!strchr(whitespace, *p)) memmove(p + 1, p, strlen(p) + 1);
*p = '\0';
string = p + 1;
} else {
/* at end of string, so no extra work */
string = p;
}
return start;
}
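/*
 * Strip quoting in place from a token returned by magma_strtokx2: the
 * leading and trailing quote are removed, and doubled quotes or escaped
 * characters are collapsed, e.g. 'it''s' becomes it's.
 */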
static void magma_strip_quotes(char *source, char quote, char escape,
int encoding) {
char *src;
char *dst;
Assert(source);
Assert(quote);
src = dst = source;
if (*src && *src == quote) src++; /* skip leading quote */
while (*src) {
char c = *src;
int i;
if (c == quote && src[1] == '\0')
break; /* skip trailing quote */
else if (c == quote && src[1] == quote)
src++; /* process doubled quote */
else if (c == escape && src[1] != '\0')
src++; /* process escaped character */
i = pg_encoding_mblen(encoding, src);
while (i--) *dst++ = *src++;
}
*dst = '\0';
}
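/*
 * Forward the latest result from the magma client to the server log,
 * mapping the numeric result levels 0..5 onto DEBUG3, LOG, INFO, NOTICE,
 * WARNING, and ERROR respectively.
 */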
static void magma_check_result(MagmaClientC **client) {
Assert(client != NULL && *client != NULL);
MagmaResult *result = MagmaClientC_GetResult(*client);
Assert(result != NULL);
switch (result->level) {
case 0: // DEBUG
elog(DEBUG3, "%s", result->message);
break;
case 1: // LOG
elog(LOG, "%s", result->message);
break;
case 2: // INFO
elog(INFO, "%s", result->message);
break;
case 3: // NOTICE
elog(NOTICE, "%s", result->message);
break;
case 4: // WARNING
elog(WARNING, "%s", result->message);
break;
case 5: // ERROR
elog(ERROR, "%s", result->message);
break;
    default:
      elog(ERROR, "unexpected result level %d", result->level);
      break;
}
}
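/*
 * Return true when the given HAWQ type has no magma equivalent and is
 * therefore unsupported; the scalar and array types listed below are the
 * ones magma can store.
 */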
bool checkUnsupportedDataTypeMagma(int32_t hawqTypeID) {
switch (hawqTypeID) {
case HAWQ_TYPE_BOOL:
case HAWQ_TYPE_INT2:
case HAWQ_TYPE_INT4:
case HAWQ_TYPE_INT8:
case HAWQ_TYPE_FLOAT4:
case HAWQ_TYPE_FLOAT8:
case HAWQ_TYPE_CHAR:
case HAWQ_TYPE_TEXT:
case HAWQ_TYPE_BYTE:
case HAWQ_TYPE_BPCHAR:
case HAWQ_TYPE_VARCHAR:
case HAWQ_TYPE_DATE:
case HAWQ_TYPE_TIME:
case HAWQ_TYPE_TIMESTAMP:
case HAWQ_TYPE_INT2_ARRAY:
case HAWQ_TYPE_INT4_ARRAY:
case HAWQ_TYPE_INT8_ARRAY:
case HAWQ_TYPE_FLOAT4_ARRAY:
case HAWQ_TYPE_FLOAT8_ARRAY:
case HAWQ_TYPE_NUMERIC:
case HAWQ_TYPE_JSON:
case HAWQ_TYPE_JSONB:
return false;
default:
return true;
}
}
/*
static int rangeCmp(const void *p1, const void *p2) {
MagmaRange *r1 = (MagmaRange *)p1;
MagmaRange *r2 = (MagmaRange *)p2;
  // replicaGroup id first
if (r1->replicaGroups[0].id < r2->replicaGroups[0].id) {
return -1;
}
if (r1->replicaGroups[0].id > r2->replicaGroups[0].id) {
return 1;
}
  // then by inner group id (the first three bits of the range id)
if (r1->groupId < r2->groupId) {
return -1;
}
if (r1->groupId > r2->groupId) {
return 1;
}
return 0;
}
*/
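/*
 * Map a HAWQ type oid to the corresponding magma type id. Types without a
 * direct mapping fall back to STRUCTEXID for row types, IOBASETYPEID for
 * base types, and INVALIDTYPEID otherwise.
 */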
int32_t map_hawq_type_to_magma_type(int32_t hawqTypeID, bool isMagmatp) {
switch (hawqTypeID) {
case HAWQ_TYPE_BOOL:
return BOOLEANID;
case HAWQ_TYPE_CHAR:
return TINYINTID;
case HAWQ_TYPE_INT2:
return SMALLINTID;
case HAWQ_TYPE_INT4:
return INTID;
case HAWQ_TYPE_INT8:
case HAWQ_TYPE_TID:
return BIGINTID;
case HAWQ_TYPE_FLOAT4:
return FLOATID;
case HAWQ_TYPE_FLOAT8:
return DOUBLEID;
case HAWQ_TYPE_NUMERIC:
return DECIMALNEWID;
case HAWQ_TYPE_DATE:
return DATEID;
case HAWQ_TYPE_BPCHAR:
return CHARID;
case HAWQ_TYPE_VARCHAR:
return VARCHARID;
case HAWQ_TYPE_NAME:
case HAWQ_TYPE_TEXT:
return STRINGID;
case HAWQ_TYPE_JSON:
return JSONID;
case HAWQ_TYPE_JSONB:
return JSONBID;
case HAWQ_TYPE_TIME:
return TIMEID;
case HAWQ_TYPE_TIMESTAMPTZ:
case HAWQ_TYPE_TIMESTAMP:
case HAWQ_TYPE_TIMETZ:
return TIMESTAMPID;
case HAWQ_TYPE_INTERVAL:
return INTERVALID;
case HAWQ_TYPE_MONEY:
case HAWQ_TYPE_BIT:
case HAWQ_TYPE_VARBIT:
case HAWQ_TYPE_BYTE:
case HAWQ_TYPE_XML:
case HAWQ_TYPE_MACADDR:
case HAWQ_TYPE_INET:
case HAWQ_TYPE_CIDR:
return BINARYID;
case HAWQ_TYPE_INT2_ARRAY:
return SMALLINTARRAYID;
case HAWQ_TYPE_INT4_ARRAY:
return INTARRAYID;
case HAWQ_TYPE_INT8_ARRAY:
return BIGINTARRAYID;
case HAWQ_TYPE_FLOAT4_ARRAY:
return FLOATARRAYID;
case HAWQ_TYPE_FLOAT8_ARRAY:
return DOUBLEARRAYID;
case HAWQ_TYPE_TEXT_ARRAY:
return STRINGARRAYID;
case HAWQ_TYPE_BPCHAR_ARRAY:
return BPCHARARRAYID;
case HAWQ_TYPE_POINT:
case HAWQ_TYPE_LSEG:
case HAWQ_TYPE_PATH:
case HAWQ_TYPE_BOX:
case HAWQ_TYPE_POLYGON:
case HAWQ_TYPE_CIRCLE:
default:
return type_is_rowtype(hawqTypeID)
? (STRUCTEXID)
: (type_is_basetype(hawqTypeID) ? IOBASETYPEID
: INVALIDTYPEID);
}
}