blob: 36018cf633237c6a0ebd35f1f5e7dca116588e96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <json-c/json.h>
#include "c.h"
#include "port.h"
#include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "nodes/pg_list.h"
#include "utils/hawq_type_mapping.h"
#include "utils/memutils.h"
#include "utils/relcache.h"
#include "utils/uri.h"
#include "utils/formatting.h"
#include "utils/lsyscache.h"
#include "utils/datetime.h"
#include "mb/pg_wchar.h"
#include "commands/defrem.h"
#include "commands/copy.h"
#include "access/tupdesc.h"
#include "access/filesplit.h"
#include "access/fileam.h"
#include "access/plugstorage.h"
#include "cdb/cdbvars.h"
#include "catalog/pg_exttable.h"
#include "catalog/namespace.h"
#include "postmaster/identity.h"
#include "nodes/makefuncs.h"
#include "nodes/plannodes.h"
#include "utils/uri.h"
#include "cdb/cdbfilesystemcredential.h"
#include "storage/cwrapper/orc-format-c.h"
#include "storage/cwrapper/hdfs-file-system-c.h"
#include "cdb/cdbvars.h"
#define ORC_TIMESTAMP_EPOCH_JDATE 2457024 /* == date2j(2015, 1, 1) */
#define MAX_ORC_ARRAY_DIMS 10000
#define ORC_NUMERIC_MAX_PRECISION 38
/* Do the module magic dance */
PG_MODULE_MAGIC
;
/* Validators for pluggable storage format ORC */
PG_FUNCTION_INFO_V1(orc_validate_interfaces);
PG_FUNCTION_INFO_V1(orc_validate_options);
PG_FUNCTION_INFO_V1(orc_validate_encodings);
PG_FUNCTION_INFO_V1(orc_validate_datatypes);
/* Accessors for pluggable storage format ORC */
PG_FUNCTION_INFO_V1(orc_beginscan);
PG_FUNCTION_INFO_V1(orc_getnext_init);
PG_FUNCTION_INFO_V1(orc_getnext);
PG_FUNCTION_INFO_V1(orc_rescan);
PG_FUNCTION_INFO_V1(orc_endscan);
PG_FUNCTION_INFO_V1(orc_stopscan);
PG_FUNCTION_INFO_V1(orc_insert_init);
PG_FUNCTION_INFO_V1(orc_insert);
PG_FUNCTION_INFO_V1(orc_insert_finish);
/* Definitions of validators for pluggable storage format ORC */
Datum orc_validate_interfaces(PG_FUNCTION_ARGS);
Datum orc_validate_options(PG_FUNCTION_ARGS);
Datum orc_validate_encodings(PG_FUNCTION_ARGS);
Datum orc_validate_datatypes(PG_FUNCTION_ARGS);
/* Definitions of accessors for pluggable storage format ORC */
Datum orc_beginscan(PG_FUNCTION_ARGS);
Datum orc_getnext_init(PG_FUNCTION_ARGS);
Datum orc_getnext(PG_FUNCTION_ARGS);
Datum orc_rescan(PG_FUNCTION_ARGS);
Datum orc_endscan(PG_FUNCTION_ARGS);
Datum orc_stopscan(PG_FUNCTION_ARGS);
Datum orc_insert_init(PG_FUNCTION_ARGS);
Datum orc_insert(PG_FUNCTION_ARGS);
Datum orc_insert_finish(PG_FUNCTION_ARGS);
typedef struct
{
int64_t second;
int64_t nanosecond;
} TimestampType;
typedef struct ORCFormatUserData
{
ORCFormatC *fmt;
char **colNames;
int *colDatatypes;
int64_t *colDatatypeMods;
int32_t numberOfColumns;
char **colRawValues;
Datum *colValues;
uint64_t *colValLength;
bits8 **colValNullBitmap;
int **colValDims;
char **colAddresses;
bool *colToReads;
int nSplits;
ORCFormatFileSplit *splits;
// for write only
TimestampType *colTimestamp;
} ORCFormatUserData;
static FmgrInfo *get_orc_function(char *formatter_name, char *function_name);
static void get_scan_functions(FileScanDesc file_scan_desc);
static void get_insert_functions(ExternalInsertDesc ext_insert_desc);
static void init_format_user_data_for_read(TupleDesc tup_desc,
ORCFormatUserData *user_data);
static void init_format_user_data_for_write(TupleDesc tup_desc,
ORCFormatUserData *user_data);
static void build_options_in_json(List *fmt_opts_defelem, int encoding,
char **json_str, TupleDesc tupDesc);
static ORCFormatC *create_formatter_instance(List *fmt_opts_defelem,
int encoding, int segno, TupleDesc tupDesc);
static void build_file_splits(Uri *uri, ScanState *scan_state,
ORCFormatUserData *user_data);
static void build_tuple_descrition_for_read(Plan *plan, Relation relation,
ORCFormatUserData *user_data);
static void build_tuple_descrition_for_write(Relation relation,
ORCFormatUserData *user_data);
static void orc_scan_error_callback(void *arg);
static void orc_parse_format_string(CopyState pstate, char *fmtstr);
static char *orc_strtokx2(const char *s, const char *whitespace,
const char *delim, const char *quote, char escape, bool e_strings,
bool del_quotes, int encoding);
static void orc_strip_quotes(char *source, char quote, char escape,
int encoding);
/* Implementation of validators for pluggable storage format ORC */
/*
* void
* orc_validate_interfaces(char *formatName)
*/
Datum orc_validate_interfaces(PG_FUNCTION_ARGS)
{
PlugStorageValidator psv_interface =
(PlugStorageValidator) (fcinfo->context);
if (pg_strncasecmp(psv_interface->format_name, "orc", strlen("orc")) != 0)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("ORC: incorrect format name \'%s\'", psv_interface->format_name)));
}
PG_RETURN_VOID() ;
}
/*
* void
* orc_validate_options(List *formatOptions,
* char *formatStr,
* bool isWritable)
*/
Datum orc_validate_options(PG_FUNCTION_ARGS)
{
PlugStorageValidator psv = (PlugStorageValidator) (fcinfo->context);
List *format_opts = psv->format_opts;
char *format_str = psv->format_str;
bool is_writable = psv->is_writable;
TupleDesc tup_desc = psv->tuple_desc;
char *formatter = NULL;
char *compresstype = NULL;
char *bloomfilter = NULL;
char *dicthreshold = NULL;
char *bucketnum = NULL;
char *category = NULL;
ListCell *opt;
const int maxlen = 8 * 1024 - 1;
int len = 0;
foreach(opt, format_opts)
{
DefElem *defel = (DefElem *) lfirst(opt);
char *key = defel->defname;
bool need_free_value = false;
char *val = (char *) defGetString(defel, &need_free_value);
/* check formatter */
if (strncasecmp(key, "formatter", strlen("formatter")) == 0)
{
char *formatter_values[] =
{ "orc" };
checkPlugStorageFormatOption(&formatter, key, val,
true, 1, formatter_values);
}
/* check option for orc format */
if (strncasecmp(key, "compresstype", strlen("compresstype")) == 0)
{
char *compresstype_values[] =
{ "none", "snappy", "lz4" };
checkPlugStorageFormatOption(&compresstype, key, val, is_writable,
3, compresstype_values);
}
if (strncasecmp(key, "bloomfilter", strlen("bloomfilter")) == 0)
{
int attnum = tup_desc->natts;
char **attribute_names = palloc0(attnum * sizeof(char*));
for (int i = 0; i < attnum; ++i) {
int name_len = strlen(((Form_pg_attribute) (tup_desc->attrs[i]))->attname.data);
char *attribute = palloc0(name_len + 1);
strncpy(attribute, ((Form_pg_attribute) (tup_desc->attrs[i]))->attname.data, name_len);
attribute_names[i] = attribute;
}
char *dup_val = pstrdup(val);
char *token = strtok(dup_val, ",");
while (token) {
checkPlugStorageFormatOption(&bloomfilter, key, token, true, attnum, attribute_names);
bloomfilter = NULL;
token = strtok(NULL, ",");
}
}
if (strncasecmp(key, "dicthreshold", strlen("dicthreshold")) == 0)
{
checkPlugStorageFormatOption(&dicthreshold, key, val,
true, 0, NULL);
char *end;
double threshold = strtod(val, &end);
if (end == val || *end != '\0' || threshold < 0 || threshold > 1)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("dicthreshold \"%s\" must be within [0-1]", val), errOmitLocation(true)));
}
}
if (strncasecmp(key, "bucketnum", strlen("bucketnum")) == 0)
{
checkPlugStorageFormatOption(&bucketnum, key, val,
true, 0, NULL);
char *end;
long bucketnumber = strtol(val, &end, 10);
if (end == val || *end != '\0' || bucketnumber <= 0)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("bucketnum \"%s\" must be > 0", val), errOmitLocation(true)));
}
}
/* check category orc format */
if (strncasecmp(key, "category", strlen("category")) == 0)
{
char *category_values[] =
{ "internal", "external" };
checkPlugStorageFormatOption(&category, key, val,
true, 2, category_values);
}
if (strncasecmp(key, "formatter", strlen("formatter"))
&& strncasecmp(key, "compresstype", strlen("compresstype"))
&& strncasecmp(key, "bloomfilter", strlen("bloomfilter"))
&& strncasecmp(key, "dicthreshold", strlen("dicthreshold"))
&& strncasecmp(key, "bucketnum", strlen("bucketnum"))
&& strncasecmp(key, "category", strlen("category")))
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("Option \"%s\" for ORC table is invalid", key), errOmitLocation(true)));
}
sprintf((char * ) format_str + len, "%s '%s' ", key, val);
len += strlen(key) + strlen(val) + 4;
if (need_free_value)
{
pfree(val);
val = NULL;
}
AssertImply(need_free_value, NULL == val);
if (len > maxlen)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("format options must be less than %d bytes in size", maxlen), errOmitLocation(true)));
}
}
if (!formatter)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("no formatter function specified"), errOmitLocation(true)));
}
PG_RETURN_VOID() ;
}
/*
* void
* orc_validate_encodings(char *encodingName)
*/
Datum orc_validate_encodings(PG_FUNCTION_ARGS)
{
PlugStorageValidator psv = (PlugStorageValidator) (fcinfo->context);
char *encoding_name = psv->encoding_name;
if (strncasecmp(encoding_name, "utf8", strlen("utf8")))
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("\"%s\" is not a valid encoding for ORC external table. ", encoding_name), errOmitLocation(true)));
}
PG_RETURN_VOID() ;
}
/*
* void
* orc_validate_datatypes(TupleDesc tupDesc)
*/
Datum orc_validate_datatypes(PG_FUNCTION_ARGS)
{
PlugStorageValidator psv = (PlugStorageValidator) (fcinfo->context);
TupleDesc tup_desc = psv->tuple_desc;
for (int i = 0; i < tup_desc->natts; ++i)
{
int32_t datatype =
(int32_t) (((Form_pg_attribute) (tup_desc->attrs[i]))->atttypid);
int4 typmod = ((Form_pg_attribute) (tup_desc->attrs[i]))->atttypmod;
if (checkORCUnsupportedDataType(datatype))
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR), errmsg("unsupported data types %d for columns of external ORC table is specified.", datatype), errOmitLocation(true)));
}
if (HAWQ_TYPE_NUMERIC == datatype)
{
int4 tmp_typmod = typmod - VARHDRSZ;
int precision = (tmp_typmod >> 16) & 0xffff;
int scale = tmp_typmod & 0xffff;
if (precision < 1 || 38 < precision)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("ORC DECIMAL precision must be between 1 and 38")));
if (scale == 0)
ereport(NOTICE, (errmsg("Using a scale of zero for ORC DECIMAL")));
}
}
PG_RETURN_VOID() ;
}
/*
* FileScanDesc
* orc_beginscan(ExternalScan *extScan,
* ScanState *scanState,
* Relation relation,
* int formatterType,
* char *formatterName)
*/
Datum orc_beginscan(PG_FUNCTION_ARGS)
{
PlugStorage ps = (PlugStorage) (fcinfo->context);
ExternalScan *ext_scan = ps->ps_ext_scan;
ScanState *scan_state = ps->ps_scan_state;
Relation relation = ps->ps_relation;
int formatterType = ps->ps_formatter_type;
char *formatterName = ps->ps_formatter_name;
Index scan_rel_id = ext_scan->scan.scanrelid;
uint32 scan_counter = ext_scan->scancounter;
List *uri_list = ext_scan->uriList;
List *fmt_opts = ext_scan->fmtOpts;
int fmt_encoding = ext_scan->encoding;
List *scan_quals = ext_scan->scan.plan.qual;
/* 1. Increment relation reference count while scanning relation */
/*
* This is just to make really sure the relcache entry won't go away while
* the scan has a pointer to it. Caller should be holding the rel open
* anyway, so this is redundant in all normal scenarios...
*/
RelationIncrementReferenceCount(relation);
/* 2. Allocate and initialize the select descriptor */
FileScanDesc file_scan_desc = palloc(sizeof(FileScanDescData));
file_scan_desc->fs_inited = false;
file_scan_desc->fs_ctup.t_data = NULL;
ItemPointerSetInvalid(&file_scan_desc->fs_ctup.t_self);
file_scan_desc->fs_cbuf = InvalidBuffer;
file_scan_desc->fs_rd = relation;
file_scan_desc->fs_scanrelid = scan_rel_id;
file_scan_desc->fs_scancounter = scan_counter;
file_scan_desc->fs_scanquals = scan_quals;
file_scan_desc->fs_noop = false;
file_scan_desc->fs_file = NULL;
file_scan_desc->fs_formatter = NULL;
file_scan_desc->fs_formatter_type = formatterType;
file_scan_desc->fs_formatter_name = formatterName;
/* 2.1 Setup scan functions */
get_scan_functions(file_scan_desc);
/* 2.2 Get URI for the scan */
/*
* get the external URI assigned to us.
*
* The URI assigned for this segment is normally in the uriList list
* at the index of this segment id. However, if we are executing on
* MASTER ONLY the (one and only) entry which is destined for the master
* will be at the first entry of the uriList list.
*/
char *uri_str = NULL;
int segindex = GetQEIndex();
Value *v = NULL;
v = (Value *) list_nth(uri_list, 0);
if (v->type == T_Null)
uri_str = NULL;
else
uri_str = (char *) strVal(v);
/*
* If a uri is assigned to us - get a reference to it. Some executors
* don't have a uri to scan (if # of uri's < # of primary segdbs).
* in which case uri will be NULL. If that's the case for this
* segdb set to no-op.
*/
if (uri_str)
{
/* set external source (uri) */
file_scan_desc->fs_uri = uri_str;
ereport(DEBUG3, (errmsg_internal("fs_uri (%d) is set as %s", segindex, uri_str)));
/* NOTE: we delay actually opening the data source until external_getnext() */
}
else
{
/* segdb has no work to do. set to no-op */
file_scan_desc->fs_noop = true;
file_scan_desc->fs_uri = NULL;
}
/* 2.3 Allocate values and nulls structure */
TupleDesc tup_desc = RelationGetDescr(relation);
file_scan_desc->fs_tupDesc = tup_desc;
file_scan_desc->attr = tup_desc->attrs;
file_scan_desc->num_phys_attrs = tup_desc->natts;
file_scan_desc->values = (Datum *) palloc(
file_scan_desc->num_phys_attrs * sizeof(Datum));
file_scan_desc->nulls = (bool *) palloc(
file_scan_desc->num_phys_attrs * sizeof(bool));
/* 2.5 Allocate and initialize the structure which track data parsing state */
file_scan_desc->fs_pstate = (CopyStateData *) palloc0(
sizeof(CopyStateData));
/* 2.5.1 Initialize basic information */
CopyState pstate = file_scan_desc->fs_pstate;
pstate->fe_eof = false;
pstate->eol_type = EOL_UNKNOWN;
pstate->eol_str = NULL;
pstate->cur_relname = RelationGetRelationName(relation);
pstate->cur_lineno = 0;
pstate->err_loc_type = ROWNUM_ORIGINAL;
pstate->cur_attname = NULL;
pstate->raw_buf_done = true; /* true so we will read data in first run */
pstate->line_done = true;
pstate->bytesread = 0;
pstate->custom = false;
pstate->header_line = false;
pstate->fill_missing = false;
pstate->line_buf_converted = false;
pstate->raw_buf_index = 0;
pstate->processed = 0;
pstate->filename = uri_str;
pstate->copy_dest = COPY_EXTERNAL_SOURCE;
pstate->missing_bytes = 0;
pstate->csv_mode = false;
pstate->custom = true;
pstate->custom_formatter_func = NULL;
pstate->custom_formatter_name = NULL;
pstate->rel = relation;
/* 2.5.2 Setup encoding information */
/*
* Set up encoding conversion info. Even if the client and server
* encodings are the same, we must apply pg_client_to_server() to validate
* data in multibyte encodings.
*
* Each external table specifies the encoding of its external data. We will
* therefore set a client encoding and client-to-server conversion procedure
* in here (server-to-client in WET) and these will be used in the data
* conversion routines (in copy.c CopyReadLineXXX(), etc).
*/
Insist(PG_VALID_ENCODING(fmt_encoding));
pstate->client_encoding = fmt_encoding;
Oid conversion_proc = FindDefaultConversionProc(fmt_encoding,
GetDatabaseEncoding());
if (OidIsValid(conversion_proc))
{
/* conversion proc found */
pstate->enc_conversion_proc = palloc(sizeof(FmgrInfo));
fmgr_info(conversion_proc, pstate->enc_conversion_proc);
}
else
{
/* no conversion function (both encodings are probably the same) */
pstate->enc_conversion_proc = NULL;
}
pstate->need_transcoding = pstate->client_encoding != GetDatabaseEncoding();
pstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(
pstate->client_encoding);
/* 2.5.3 Parse the data format options */
char *format_str = pstrdup((char *) strVal(linitial(fmt_opts)));
orc_parse_format_string(pstate, format_str);
/* 2.5.4 Generate or convert list of attributes to process */
pstate->attr_offsets = (int *) palloc(tup_desc->natts * sizeof(int));
pstate->attnumlist = CopyGetAttnums(tup_desc, relation, NIL);
/* 2.5.5 Convert FORCE NOT NULL name list to per-column flags, check validity */
pstate->force_notnull_flags = (bool *) palloc0(
tup_desc->natts * sizeof(bool));
if (pstate->force_notnull)
{
List *attnums;
ListCell *cur;
attnums = CopyGetAttnums(tup_desc, relation, pstate->force_notnull);
foreach(cur, attnums)
{
int attnum = lfirst_int(cur);
pstate->force_notnull_flags[attnum - 1] = true;
}
}
/* 2.5.6 Take care of state that is RET specific */
initStringInfo(&pstate->attribute_buf);
initStringInfo(&pstate->line_buf);
/* Set up data buffer to hold a chunk of data */
MemSet(pstate->raw_buf, ' ', RAW_BUF_SIZE * sizeof(char));
pstate->raw_buf[RAW_BUF_SIZE] = '\0';
/* 2.5.7 Create temporary memory context for per row process */
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
* datatype input or output routines, and should be faster than retail
* pfree's anyway.
*/
pstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
"ExtTableMemCxt",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* 2.6 Setup formatter information */
file_scan_desc->fs_formatter = (FormatterData *) palloc0(
sizeof(FormatterData));
initStringInfo(&file_scan_desc->fs_formatter->fmt_databuf);
file_scan_desc->fs_formatter->fmt_perrow_ctx =
file_scan_desc->fs_pstate->rowcontext;
file_scan_desc->fs_formatter->fmt_user_ctx = NULL;
/* 2.7 Set up callback to identify error line number */
file_scan_desc->errcontext.callback = orc_scan_error_callback;
file_scan_desc->errcontext.arg = (void *) file_scan_desc->fs_pstate;
file_scan_desc->errcontext.previous = error_context_stack;
/* 3. Setup user data */
/* 3.1 Initialize user data */
ORCFormatUserData *user_data = palloc0(sizeof(ORCFormatUserData));
init_format_user_data_for_read(tup_desc, user_data);
/* 3.2 Create formatter instance */
List *fmt_opts_defelem = pstate->custom_formatter_params;
user_data->fmt = create_formatter_instance(fmt_opts_defelem, fmt_encoding,
ps->ps_segno, tup_desc);
/* 3.3 Build file splits */
Uri *uri = ParseExternalTableUri(uri_str);
if (enable_secure_filesystem)
{
char *token = find_filesystem_credential_with_uri(uri_str);
SetToken(uri_str, token);
}
file_scan_desc->fs_ps_scan_state = scan_state; /* for orc rescan */
build_file_splits(uri, scan_state, user_data);
FreeExternalTableUri(uri);
/* 3.4 Build tuple description */
Plan *plan = &(ext_scan->scan.plan);
file_scan_desc->fs_ps_plan = plan;
build_tuple_descrition_for_read(plan, relation, user_data);
/* 3.5 Save user data */
file_scan_desc->fs_ps_user_data = (void *) user_data;
/* 4. Begin scan with the formatter */
ORCFormatBeginORCFormatC(user_data->fmt, user_data->splits,
user_data->nSplits, user_data->colToReads, user_data->colNames,
user_data->colDatatypes, user_data->colDatatypeMods,
user_data->numberOfColumns);
ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
{
ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
}
/* 5. Save file_scan_desc */
ps->ps_file_scan_desc = file_scan_desc;
PG_RETURN_POINTER(file_scan_desc);
}
/*
* ExternalSelectDesc
* orc_getnext_init(PlanState *planState,
* ExternalScanState *extScanState)
*/
Datum orc_getnext_init(PG_FUNCTION_ARGS)
{
PlugStorage ps = (PlugStorage) (fcinfo->context);
PlanState *plan_state = ps->ps_plan_state;
ExternalScanState *ext_scan_state = ps->ps_ext_scan_state;
ExternalSelectDesc ext_select_desc = NULL;
/*
ExternalSelectDesc ext_select_desc = (ExternalSelectDesc)palloc0(
sizeof(ExternalSelectDescData));
Plan *rootPlan = NULL;
if (plan_state != NULL)
{
ext_select_desc->projInfo = plan_state->ps_ProjInfo;
// If we have an agg type then our parent is an Agg node
rootPlan = plan_state->state->es_plannedstmt->planTree;
if (IsA(rootPlan, Agg) && ext_scan_state->parent_agg_type)
{
ext_select_desc->agg_type = ext_scan_state->parent_agg_type;
}
}
*/
ps->ps_ext_select_desc = ext_select_desc;
PG_RETURN_POINTER(ext_select_desc);
}
Datum orc_getnext(PG_FUNCTION_ARGS) {
PlugStorage ps = (PlugStorage)(fcinfo->context);
FileScanDesc fsd = ps->ps_file_scan_desc;
ORCFormatUserData *user_data = (ORCFormatUserData *)(fsd->fs_ps_user_data);
TupleTableSlot *slot = ps->ps_tuple_table_slot;
bool *nulls = slot_get_isnull(slot);
memset(nulls, true, user_data->numberOfColumns);
bool res = ORCFormatNextORCFormatC(user_data->fmt, user_data->colRawValues,
user_data->colValLength, nulls);
if (res) {
for (int32_t i = 0; i < user_data->numberOfColumns; ++i) {
// Column not to read or column is null
if (nulls[i]) continue;
switch (fsd->attr[i]->atttypid) {
case HAWQ_TYPE_BOOL: {
user_data->colValues[i] =
BoolGetDatum(*(bool *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_INT2: {
user_data->colValues[i] =
Int16GetDatum(*(int16_t *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_INT4: {
user_data->colValues[i] =
Int32GetDatum(*(int32_t *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_INT8:
case HAWQ_TYPE_TIME:
case HAWQ_TYPE_TIMESTAMP:
case HAWQ_TYPE_TIMESTAMPTZ: {
user_data->colValues[i] =
Int64GetDatum(*(int64_t *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_FLOAT4: {
user_data->colValues[i] =
Float4GetDatum(*(float *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_FLOAT8: {
user_data->colValues[i] =
Float8GetDatum(*(double *)(user_data->colRawValues[i]));
break;
}
case HAWQ_TYPE_VARCHAR:
case HAWQ_TYPE_TEXT:
case HAWQ_TYPE_BPCHAR:
case HAWQ_TYPE_BYTE:
case HAWQ_TYPE_NUMERIC: {
SET_VARSIZE((struct varlena *)(user_data->colRawValues[i]),
user_data->colValLength[i]);
user_data->colValues[i] = PointerGetDatum(user_data->colRawValues[i]);
break;
}
case HAWQ_TYPE_DATE: {
user_data->colValues[i] =
Int32GetDatum(*(int32_t *)(user_data->colRawValues[i]) -
POSTGRES_EPOCH_JDATE + UNIX_EPOCH_JDATE);
break;
}
default: {
ereport(ERROR, (errmsg_internal("ORC:%d", fsd->attr[i]->atttypid)));
break;
}
}
}
ps->ps_has_tuple = true;
slot->PRIVATE_tts_values = user_data->colValues;
TupSetVirtualTupleNValid(slot, user_data->numberOfColumns);
PG_RETURN_BOOL(true);
}
ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode == ERRCODE_SUCCESSFUL_COMPLETION) {
ORCFormatEndORCFormatC(user_data->fmt);
e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
}
ORCFormatFreeORCFormatC(&(user_data->fmt));
pfree(user_data->colRawValues);
pfree(user_data->colValues);
pfree(user_data->colToReads);
pfree(user_data->colValLength);
if (user_data->splits != NULL) {
for (int i = 0; i < user_data->nSplits; ++i) {
pfree(user_data->splits[i].fileName);
}
pfree(user_data->splits);
}
for (int i = 0; i < user_data->numberOfColumns; ++i) {
pfree(user_data->colNames[i]);
}
pfree(user_data->colNames);
pfree(user_data->colDatatypes);
pfree(user_data);
fsd->fs_ps_user_data = NULL;
ps->ps_has_tuple = false;
slot->PRIVATE_tts_values = NULL;
ExecClearTuple(slot);
} else {
ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
}
PG_RETURN_BOOL(false);
}
/*
* void
* orc_rescan(FileScanDesc scan)
*/
Datum orc_rescan(PG_FUNCTION_ARGS)
{
PlugStorage ps = (PlugStorage) (fcinfo->context);
FileScanDesc fsd = ps->ps_file_scan_desc;
Relation relation = fsd->fs_rd;
TupleDesc tup_desc = RelationGetDescr(relation);
ORCFormatUserData *user_data = (ORCFormatUserData *) (fsd->fs_ps_user_data);
if (user_data == NULL)
{
/* 1 Initialize user data */
user_data = palloc0(sizeof(ORCFormatUserData));
init_format_user_data_for_read(fsd->fs_tupDesc, user_data);
/* 2 Create formatter instance */
List *fmt_opts_defelem = fsd->fs_pstate->custom_formatter_params;
int fmt_encoding = fsd->fs_pstate->client_encoding;
user_data->fmt = create_formatter_instance(fmt_opts_defelem,
fmt_encoding, ps->ps_segno, tup_desc);
/* 3 Build file splits */
Uri *uri = ParseExternalTableUri(fsd->fs_uri);
build_file_splits(uri, fsd->fs_ps_scan_state, user_data);
/* 4 Build tuple description */
Plan *plan = fsd->fs_ps_plan;
build_tuple_descrition_for_read(plan, fsd->fs_rd, user_data);
/* 5 Save user data */
fsd->fs_ps_user_data = (void *) user_data;
if (enable_secure_filesystem)
{
char *token = find_filesystem_credential_with_uri(fsd->fs_uri);
SetToken(fsd->fs_uri, token);
}
FreeExternalTableUri(uri);
/* 6 Begin scan with the formatter */
ORCFormatBeginORCFormatC(user_data->fmt, user_data->splits,
user_data->nSplits, user_data->colToReads, user_data->colNames,
user_data->colDatatypes, user_data->colDatatypeMods,
user_data->numberOfColumns);
ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
{
ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
}
} else {
ORCFormatRescanORCFormatC(user_data->fmt);
ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
{
ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
}
}
/* reset some parse state variables */
fsd->fs_pstate->fe_eof = false;
fsd->fs_pstate->cur_lineno = 0;
fsd->fs_pstate->cur_attname = NULL;
fsd->fs_pstate->raw_buf_done = true; /* true so we will read data in first run */
fsd->fs_pstate->line_done = true;
fsd->fs_pstate->bytesread = 0;
PG_RETURN_VOID() ;
}
/*
* void
* orc_endscan(FileScanDesc scan)
*/
Datum orc_endscan(PG_FUNCTION_ARGS)
{
PlugStorage ps = (PlugStorage) (fcinfo->context);
FileScanDesc fsd = ps->ps_file_scan_desc;
/* Clean up scan descriptor */
char *relname = pstrdup(RelationGetRelationName(fsd->fs_rd));
if (fsd->fs_pstate != NULL)
{
/*
* decrement relation reference count and free scan descriptor storage
*/
RelationDecrementReferenceCount(fsd->fs_rd);
}
if (fsd->values)
{
pfree(fsd->values);
fsd->values = NULL;
}
if (fsd->nulls)
{
pfree(fsd->nulls);
fsd->nulls = NULL;
}
if (fsd->fs_pstate != NULL && fsd->fs_pstate->rowcontext != NULL)
{
/*
* delete the row context
*/
MemoryContextDelete(fsd->fs_pstate->rowcontext);
fsd->fs_pstate->rowcontext = NULL;
}
if (fsd->fs_formatter)
{
/* TODO: check if this space is automatically freed.
* if not, then see what about freeing the user context */
if (fsd->fs_formatter->fmt_databuf.data)
pfree(fsd->fs_formatter->fmt_databuf.data);
pfree(fsd->fs_formatter);
fsd->fs_formatter = NULL;
}
/*
* free formatter information
*/
if (fsd->fs_formatter_name)
{
pfree(fsd->fs_formatter_name);
fsd->fs_formatter_name = NULL;
}
/*
* free parse state memory
*/
if (fsd->fs_pstate != NULL)
{
if (fsd->fs_pstate->attribute_buf.data)
pfree(fsd->fs_pstate->attribute_buf.data);
if (fsd->fs_pstate->line_buf.data)
pfree(fsd->fs_pstate->line_buf.data);
if (fsd->fs_pstate->attr_offsets)
pfree(fsd->fs_pstate->attr_offsets);
if (fsd->fs_pstate->force_quote_flags)
pfree(fsd->fs_pstate->force_quote_flags);
if (fsd->fs_pstate->force_notnull_flags)
pfree(fsd->fs_pstate->force_notnull_flags);
pfree(fsd->fs_pstate);
fsd->fs_pstate = NULL;
}
/* clean up error context */
error_context_stack = fsd->errcontext.previous;
pfree(relname);
PG_RETURN_VOID() ;
}
/*
* void
* orc_stopscan(FileScanDesc scan)
*/
Datum orc_stopscan(PG_FUNCTION_ARGS)
{
PlugStorage ps = (PlugStorage)(fcinfo->context);
FileScanDesc fsd = ps->ps_file_scan_desc;
ORCFormatUserData *user_data = (ORCFormatUserData *)(fsd->fs_ps_user_data);
TupleTableSlot *tts = ps->ps_tuple_table_slot;
if (!user_data) PG_RETURN_VOID();
/* If there is no error caught, it should be an end of reading split */
ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode == ERRCODE_SUCCESSFUL_COMPLETION) {
ORCFormatEndORCFormatC(user_data->fmt);
e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
ereport(LOG, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
}
ORCFormatFreeORCFormatC(&(user_data->fmt));
pfree(user_data->colRawValues);
pfree(user_data->colValues);
if (user_data->colToReads) {
pfree(user_data->colToReads);
user_data->colToReads = NULL;
}
pfree(user_data->colValLength);
if (user_data->splits != NULL) {
for (int i = 0; i < user_data->nSplits; ++i) {
pfree(user_data->splits[i].fileName);
}
pfree(user_data->splits);
}
for (int i = 0; i < user_data->numberOfColumns; ++i) {
pfree(user_data->colNames[i]);
}
pfree(user_data->colNames);
pfree(user_data->colDatatypes);
pfree(user_data);
fsd->fs_ps_user_data = NULL;
/* form empty tuple */
ps->ps_has_tuple = false;
tts->PRIVATE_tts_values = NULL;
tts->PRIVATE_tts_isnull = NULL;
ExecClearTuple(tts);
} else {
ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
}
PG_RETURN_VOID();
}
/*
* ExternalInsertDesc
* orc_insert_init(Relation relation,
* int formatterType,
* char *formatterName)
*/
Datum orc_insert_init(PG_FUNCTION_ARGS)
{
PlugStorage ps = (PlugStorage) (fcinfo->context);
Relation relation = ps->ps_relation;
int formatterType = ps->ps_formatter_type;
char *formatterName = ps->ps_formatter_name;
/* 1. Allocate and initialize the insert descriptor */
ExternalInsertDesc eid = palloc(sizeof(ExternalInsertDescData));
eid->ext_formatter_type = formatterType;
eid->ext_formatter_name = formatterName;
/* 1.1 Setup insert functions */
get_insert_functions(eid);
/* 1.2 Initialize basic information */
eid->ext_rel = relation;
eid->ext_noop = (Gp_role == GP_ROLE_DISPATCH);
eid->ext_formatter_data = NULL;
/* 1.3 Get URI string */
ExtTableEntry *ete = GetExtTableEntry(RelationGetRelid(relation));
Value *v;
char *uri_str;
int segindex = GetQEIndex();
int num_segs = GetQEGangNum();
int num_urls = list_length(ete->locations);
int my_url = segindex % num_urls;
if (num_urls > num_segs)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("External table has more URLs then available primary " "segments that can write into them")));
v = list_nth(ete->locations, my_url);
uri_str = pstrdup(v->val.str);
/*when it is hive protocol, transfer the url*/
Uri* tmp = ParseExternalTableUri(uri_str);
if (tmp->protocol == URI_HIVE)
{
if (tmp->hostname)
{
pfree(tmp->hostname);
}
tmp->hostname = pstrdup(ps->ps_scan_state->hivehost);
tmp->port = ps->ps_scan_state->hiveport;
if (tmp->path)
{
pfree(tmp->path);
}
tmp->path = pstrdup(ps->ps_scan_state->hivepath);
int uriLen = sizeof("hdfs") - 1 + /* *** This is a bad imp. */
sizeof("://") - 1 + strlen(tmp->hostname) + sizeof(':')
+ sizeof("65535") - 1 + strlen(tmp->path);
char* url = palloc(uriLen * sizeof(char));
sprintf(url, "%s://%s:%d%s", "hdfs", tmp->hostname, tmp->port,
tmp->path);
eid->ext_uri = url;
}
else
{
eid->ext_uri = uri_str;
}
FreeExternalTableUri(tmp);
/* 1.4 Allocate and initialize structure which track data parsing state */
eid->ext_pstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
eid->ext_tupDesc = RelationGetDescr(relation);
eid->ext_values = (Datum *) palloc(eid->ext_tupDesc->natts * sizeof(Datum));
eid->ext_nulls = (bool *) palloc(eid->ext_tupDesc->natts * sizeof(bool));
/* 1.5 Get format options */
List *fmt_opts = NIL;
fmt_opts = lappend(fmt_opts, makeString(pstrdup(ete->fmtopts)));
/* 1.6 Initialize parse state */
/* 1.6.1 Initialize basic information for pstate */
CopyState pstate = eid->ext_pstate;
pstate->fe_eof = false;
pstate->eol_type = EOL_UNKNOWN;
pstate->eol_str = NULL;
pstate->cur_relname = RelationGetRelationName(relation);
pstate->cur_lineno = 0;
pstate->err_loc_type = ROWNUM_ORIGINAL;
pstate->cur_attname = NULL;
pstate->raw_buf_done = true; /* true so we will read data in first run */
pstate->line_done = true;
pstate->bytesread = 0;
pstate->custom = false;
pstate->header_line = false;
pstate->fill_missing = false;
pstate->line_buf_converted = false;
pstate->raw_buf_index = 0;
pstate->processed = 0;
pstate->filename = uri_str;
pstate->copy_dest = COPY_EXTERNAL_SOURCE;
pstate->missing_bytes = 0;
pstate->rel = relation;
/* 1.6.2 Setup encoding information */
/*
* Set up encoding conversion info. Even if the client and server
* encodings are the same, we must apply pg_client_to_server() to validate
* data in multibyte encodings.
*
* Each external table specifies the encoding of its external data. We will
* therefore set a client encoding and client-to-server conversion procedure
* in here (server-to-client in WET) and these will be used in the data
* conversion routines (in copy.c CopyReadLineXXX(), etc).
*/
int fmt_encoding = ete->encoding;
Insist(PG_VALID_ENCODING(fmt_encoding));
pstate->client_encoding = fmt_encoding;
Oid conversion_proc = FindDefaultConversionProc(GetDatabaseEncoding(),
fmt_encoding);
if (OidIsValid(conversion_proc))
{
/* conversion proc found */
pstate->enc_conversion_proc = palloc(sizeof(FmgrInfo));
fmgr_info(conversion_proc, pstate->enc_conversion_proc);
}
else
{
/* no conversion function (both encodings are probably the same) */
pstate->enc_conversion_proc = NULL;
}
pstate->need_transcoding = pstate->client_encoding != GetDatabaseEncoding();
pstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(
pstate->client_encoding);
/* 1.6.3 Parse format options */
char *format_str = pstrdup((char *) strVal(linitial(fmt_opts)));
orc_parse_format_string(pstate, format_str);
/* 1.6.4 Setup tuple description */
TupleDesc tup_desc = eid->ext_tupDesc;
pstate->attr_offsets = (int *) palloc(tup_desc->natts * sizeof(int));
/* 1.6.5 Generate or convert list of attributes to process */
pstate->attnumlist = CopyGetAttnums(tup_desc, relation, NIL);
/* 1.6.6 Convert FORCE NOT NULL name list to per-column flags, check validity */
pstate->force_notnull_flags = (bool *) palloc0(
tup_desc->natts * sizeof(bool));
if (pstate->force_notnull)
{
List *attnums;
ListCell *cur;
attnums = CopyGetAttnums(tup_desc, relation, pstate->force_notnull);
foreach(cur, attnums)
{
int attnum = lfirst_int(cur);
pstate->force_notnull_flags[attnum - 1] = true;
}
}
/* 1.6.7 Take care of state that is WET specific */
Form_pg_attribute *attr = tup_desc->attrs;
ListCell *cur;
pstate->null_print_client = pstate->null_print; /* default */
pstate->fe_msgbuf = makeStringInfo(); /* use fe_msgbuf as a per-row buffer */
pstate->out_functions = (FmgrInfo *) palloc(
tup_desc->natts * sizeof(FmgrInfo));
foreach(cur, pstate->attnumlist)
/* Get info about the columns need to process */
{
int attnum = lfirst_int(cur);
Oid out_func_oid;
bool isvarlena;
getTypeOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid,
&isvarlena);
fmgr_info(out_func_oid, &pstate->out_functions[attnum - 1]);
}
/*
* We need to convert null_print to client encoding, because it
* will be sent directly with CopySendString.
*/
if (pstate->need_transcoding)
{
pstate->null_print_client = pg_server_to_custom(pstate->null_print,
pstate->null_print_len, pstate->client_encoding,
pstate->enc_conversion_proc);
}
/* 1.6.8 Create temporary memory context for per row process */
/*
* Create a temporary memory context that we can reset once per row to
* recover palloc'd memory. This avoids any problems with leaks inside
* datatype input or output routines, and should be faster than retail
* pfree's anyway.
*/
pstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
"ExtTableMemCxt",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* 1.7 Initialize formatter data */
eid->ext_formatter_data = (FormatterData *) palloc0(sizeof(FormatterData));
eid->ext_formatter_data->fmt_perrow_ctx = eid->ext_pstate->rowcontext;
/* 2. Setup user data */
/* 2.1 Initialize user data */
ORCFormatUserData *user_data = (ORCFormatUserData *) palloc0(
sizeof(ORCFormatUserData));
init_format_user_data_for_write(tup_desc, user_data);
/* 2.2 Create formatter instance */
List *fmt_opts_defelem = pstate->custom_formatter_params;
user_data->fmt = create_formatter_instance(fmt_opts_defelem, fmt_encoding,
ps->ps_segno, tup_desc);
/* 2.4 Build tuple description */
build_tuple_descrition_for_write(relation, user_data);
/* 2.5 Save user data */
eid->ext_ps_user_data = (void *) user_data;
if (enable_secure_filesystem && Gp_role == GP_ROLE_EXECUTE)
{
char *token = find_filesystem_credential_with_uri(uri_str);
SetToken(uri_str, token);
}
/* 3. Begin insert with the formatter */
ORCFormatBeginInsertORCFormatC(user_data->fmt, eid->ext_uri,
user_data->colNames, user_data->colDatatypes,
user_data->colDatatypeMods, user_data->numberOfColumns);
ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
{
ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
}
/* 4. Save the result */
ps->ps_ext_insert_desc = eid;
PG_RETURN_POINTER(eid);
}
/*
* Oid
* orc_insert(ExternalInsertDesc extInsertDesc,
* TupleTableSlot *tupTableSlot)
*/
Datum orc_insert(PG_FUNCTION_ARGS)
{
PlugStorage ps = (PlugStorage) (fcinfo->context);
ExternalInsertDesc eid = ps->ps_ext_insert_desc;
TupleTableSlot *tts = ps->ps_tuple_table_slot;
ORCFormatUserData *user_data = (ORCFormatUserData *) (eid->ext_ps_user_data);
user_data->colValues = slot_get_values(tts);
bool *nulls = slot_get_isnull(tts);
static bool DUMMY_BOOL = true;
static int8_t DUMMY_INT8 = 0;
static int16_t DUMMY_INT16 = 0;
static int32_t DUMMY_INT32 = 0;
static int64_t DUMMY_INT64 = 0;
static float DUMMY_FLOAT = 0.0;
static double DUMMY_DOUBLE = 0.0;
static char DUMMY_TEXT[1] = "";
static int32_t DUMMY_DATE = 0;
static int64_t DUMMY_TIME = 0;
static TimestampType DUMMY_TIMESTAMP;
DUMMY_TIMESTAMP.second = 0;
DUMMY_TIMESTAMP.nanosecond = 0;
static int16_t DUMMY_INT16_ARRAY[1] =
{ 0.0 };
static int32_t DUMMY_INT32_ARRAY[1] =
{ 0.0 };
static int64_t DUMMY_INT64_ARRAY[1] =
{ 0.0 };
static float DUMMY_FLOAT_ARRAY[1] =
{ 0.0 };
static double DUMMY_DOUBLE_ARRAY[1] =
{ 0.0 };
TupleDesc tupdesc = tts->tts_tupleDescriptor;
user_data->numberOfColumns = tupdesc->natts;
MemoryContext per_row_context = eid->ext_pstate->rowcontext;
MemoryContextReset(per_row_context);
MemoryContext old_context = MemoryContextSwitchTo(per_row_context);
/* Get column values */
for (int i = 0; i < user_data->numberOfColumns; ++i)
{
int dataType = (int) (tupdesc->attrs[i]->atttypid);
user_data->colRawValues[i] = NULL;
user_data->colValNullBitmap[i] = NULL;
if (nulls[i])
{
if (dataType == HAWQ_TYPE_CHAR)
{
user_data->colRawValues[i] = (char *) (&DUMMY_INT8);
}
else if (dataType == HAWQ_TYPE_INT2)
{
user_data->colRawValues[i] = (char *) (&DUMMY_INT16);
}
else if (dataType == HAWQ_TYPE_INT4)
{
user_data->colRawValues[i] = (char *) (&DUMMY_INT32);
}
else if (dataType == HAWQ_TYPE_INT8)
{
user_data->colRawValues[i] = (char *) (&DUMMY_INT64);
}
else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE
|| dataType == HAWQ_TYPE_BPCHAR
|| dataType == HAWQ_TYPE_VARCHAR
|| dataType == HAWQ_TYPE_NUMERIC)
{
user_data->colRawValues[i] = (char *) (DUMMY_TEXT);
}
else if (dataType == HAWQ_TYPE_FLOAT4)
{
user_data->colRawValues[i] = (char *) (&DUMMY_FLOAT);
}
else if (dataType == HAWQ_TYPE_FLOAT8)
{
user_data->colRawValues[i] = (char *) (&DUMMY_DOUBLE);
}
else if (dataType == HAWQ_TYPE_BOOL)
{
user_data->colRawValues[i] = (char *) (&DUMMY_BOOL);
}
else if (dataType == HAWQ_TYPE_DATE)
{
user_data->colRawValues[i] = (char *) (&DUMMY_DATE);
}
else if (dataType == HAWQ_TYPE_TIME)
{
user_data->colRawValues[i] = (char *) (&DUMMY_TIME);
}
else if (dataType == HAWQ_TYPE_TIMESTAMP
|| dataType == HAWQ_TYPE_TIMESTAMPTZ)
{
user_data->colRawValues[i] = (char *) (&DUMMY_TIMESTAMP);
}
else if (dataType == HAWQ_TYPE_INT2_ARRAY)
{
user_data->colRawValues[i] = (char *) (&DUMMY_INT16_ARRAY);
}
else if (dataType == HAWQ_TYPE_INT4_ARRAY)
{
user_data->colRawValues[i] = (char *) (&DUMMY_INT32_ARRAY);
}
else if (dataType == HAWQ_TYPE_INT8_ARRAY)
{
user_data->colRawValues[i] = (char *) (&DUMMY_INT64_ARRAY);
}
else if (dataType == HAWQ_TYPE_FLOAT4_ARRAY)
{
user_data->colRawValues[i] = (char *) (&DUMMY_FLOAT_ARRAY);
}
else if (dataType == HAWQ_TYPE_FLOAT8_ARRAY)
{
user_data->colRawValues[i] = (char *) (&DUMMY_DOUBLE_ARRAY);
}
else if (dataType == HAWQ_TYPE_INVALID)
{
ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is invalid", dataType)));
}
else
{
ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is not supported yet", dataType)));
}
continue;
}
if (dataType == HAWQ_TYPE_CHAR || dataType == HAWQ_TYPE_INT2
|| dataType == HAWQ_TYPE_INT4 || dataType == HAWQ_TYPE_INT8
|| dataType == HAWQ_TYPE_FLOAT4 || dataType == HAWQ_TYPE_FLOAT8
|| dataType == HAWQ_TYPE_BOOL || dataType == HAWQ_TYPE_TIME)
{
user_data->colRawValues[i] = (char *) (&(user_data->colValues[i]));
}
else if (dataType == HAWQ_TYPE_TIMESTAMP || dataType == HAWQ_TYPE_TIMESTAMPTZ)
{
int64_t *timestamp = (int64_t *) (&(user_data->colValues[i]));
user_data->colTimestamp[i].second = *timestamp / 1000000
+ (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 60 * 60 * 24;
user_data->colTimestamp[i].nanosecond = *timestamp % 1000000 * 1000;
int64_t days = user_data->colTimestamp[i].second / 60 / 60 / 24;
if (user_data->colTimestamp[i].nanosecond < 0 &&
(days > POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE || days < 0))
user_data->colTimestamp[i].nanosecond += 1000000000;
if(user_data->colTimestamp[i].second<0&&user_data->colTimestamp[i].nanosecond)
user_data->colTimestamp[i].second-=1;
user_data->colRawValues[i] =
(char *) (&(user_data->colTimestamp[i]));
}
else if (dataType == HAWQ_TYPE_DATE)
{
int *date = (int *) (&(user_data->colValues[i]));
*date += POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE;
user_data->colRawValues[i] = (char *) (&(user_data->colValues[i]));
}
else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE
|| dataType == HAWQ_TYPE_BPCHAR || dataType == HAWQ_TYPE_VARCHAR
|| dataType == HAWQ_TYPE_NUMERIC)
{
user_data->colRawValues[i] = OutputFunctionCall(
&(eid->ext_pstate->out_functions[i]),
user_data->colValues[i]);
if (dataType != HAWQ_TYPE_BYTE && eid->ext_pstate->need_transcoding)
{
char *cvt = NULL;
cvt = pg_server_to_custom(user_data->colRawValues[i],
strlen(user_data->colRawValues[i]),
eid->ext_pstate->client_encoding,
eid->ext_pstate->enc_conversion_proc);
if (cvt != user_data->colRawValues[i])
{
pfree(user_data->colRawValues[i]);
}
user_data->colRawValues[i] = cvt;
}
}
else if (dataType == HAWQ_TYPE_INT2_ARRAY
|| dataType == HAWQ_TYPE_INT4_ARRAY
|| dataType == HAWQ_TYPE_INT8_ARRAY
|| dataType == HAWQ_TYPE_FLOAT4_ARRAY
|| dataType == HAWQ_TYPE_FLOAT8_ARRAY)
{
ArrayType *arr = DatumGetArrayTypeP(user_data->colValues[i]);
user_data->colValLength[i] = ARR_SIZE(arr) - ARR_DATA_OFFSET(arr);
user_data->colRawValues[i] = ARR_DATA_PTR(arr);
user_data->colValNullBitmap[i] = ARR_NULLBITMAP(arr);
// Now we only support 1 dimension array
if (ARR_NDIM(arr) > 1)
{
ereport(ERROR, (errmsg_internal("Now we only support 1 dimension array in orc format,"
" your array dimension is %d", ARR_NDIM(arr))));
}
else if (ARR_NDIM(arr) == 1)
{
user_data->colValDims[i] = ARR_DIMS(arr);
}
else
{
user_data->colValDims[i] = NULL;
}
}
else if (dataType == HAWQ_TYPE_INVALID)
{
ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is invalid", dataType)));
}
else
{
ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is not supported yet", dataType)));
}
}
/* Pass to formatter to output */
ORCFormatInsertORCFormatC(user_data->fmt, user_data->colDatatypes,
user_data->colRawValues, user_data->colValLength,
user_data->colValNullBitmap, user_data->colValDims,
nulls);
ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
{
ereport(ERROR,(errcode(e->errCode),errmsg("ORC::%s", e->errMessage)));
}
for (int i = 0; i < user_data->numberOfColumns; ++i)
{
int dataType = (int) (tupdesc->attrs[i]->atttypid);
if (nulls[i])
{
continue;
}
if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE
|| dataType == HAWQ_TYPE_BPCHAR || dataType == HAWQ_TYPE_VARCHAR
|| dataType == HAWQ_TYPE_NUMERIC)
{
if (user_data->colRawValues[i])
{
pfree(user_data->colRawValues[i]);
}
}
}
ps->ps_tuple_oid = InvalidOid;
MemoryContextSwitchTo(old_context);
PG_RETURN_OID(InvalidOid);
}
/*
* void
* orc_insert_finish(ExternalInsertDesc extInsertDesc)
*/
Datum orc_insert_finish(PG_FUNCTION_ARGS)
{
PlugStorage ps = (PlugStorage) (fcinfo->context);
ExternalInsertDesc eid = ps->ps_ext_insert_desc;
ORCFormatUserData *user_data = (ORCFormatUserData *) (eid->ext_ps_user_data);
ORCFormatEndInsertORCFormatC(user_data->fmt);
ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
{
ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
}
ORCFormatFreeORCFormatC(&(user_data->fmt));
pfree(user_data->colDatatypes);
pfree(user_data->colRawValues);
pfree(user_data->colValLength);
for (int i = 0; i < user_data->numberOfColumns; ++i)
{
pfree(user_data->colNames[i]);
}
pfree(user_data->colNames);
pfree(user_data);
if (eid->ext_formatter_data)
pfree(eid->ext_formatter_data);
if (eid->ext_formatter_name)
pfree(eid->ext_formatter_name);
pfree(eid);
PG_RETURN_VOID() ;
}
static FmgrInfo *get_orc_function(char *formatter_name, char *function_name)
{
Assert(formatter_name);Assert(function_name);
Oid procOid = InvalidOid;
FmgrInfo *procInfo = NULL;
procOid = LookupPlugStorageValidatorFunc(formatter_name, function_name);
if (OidIsValid(procOid))
{
procInfo = (FmgrInfo *) palloc(sizeof(FmgrInfo));
fmgr_info(procOid, procInfo);
}
else
{
ereport(ERROR, (errmsg_internal("%s_%s function was not found for pluggable storage",
formatter_name, function_name)));
}
return procInfo;
}
static void get_scan_functions(FileScanDesc file_scan_desc)
{
file_scan_desc->fs_ps_scan_funcs.beginscan = get_orc_function("orc",
"beginscan");
file_scan_desc->fs_ps_scan_funcs.getnext_init = get_orc_function("orc",
"getnext_init");
file_scan_desc->fs_ps_scan_funcs.getnext = get_orc_function("orc",
"getnext");
file_scan_desc->fs_ps_scan_funcs.rescan = get_orc_function("orc", "rescan");
file_scan_desc->fs_ps_scan_funcs.endscan = get_orc_function("orc",
"endscan");
file_scan_desc->fs_ps_scan_funcs.stopscan = get_orc_function("orc",
"stopscan");
}
static void get_insert_functions(ExternalInsertDesc ext_insert_desc)
{
ext_insert_desc->ext_ps_insert_funcs.insert_init = get_orc_function("orc",
"insert_init");
ext_insert_desc->ext_ps_insert_funcs.insert = get_orc_function("orc",
"insert");
ext_insert_desc->ext_ps_insert_funcs.insert_finish = get_orc_function("orc",
"insert_finish");
}
static void init_format_user_data_for_read(TupleDesc tup_desc,
ORCFormatUserData *user_data)
{
user_data->numberOfColumns = tup_desc->natts;
user_data->colNames = palloc0(sizeof(char *) * user_data->numberOfColumns);
user_data->colDatatypes = palloc0(sizeof(int) * user_data->numberOfColumns);
user_data->colDatatypeMods = palloc0(
sizeof(int64_t) * user_data->numberOfColumns);
user_data->colValues = palloc0(sizeof(Datum) * user_data->numberOfColumns);
user_data->colRawValues = palloc0(
sizeof(char *) * user_data->numberOfColumns);
user_data->colValLength = palloc0(
sizeof(uint64_t) * user_data->numberOfColumns);
user_data->colValNullBitmap = palloc0(
sizeof(bits8 *) * user_data->numberOfColumns);
for (int i = 0; i < user_data->numberOfColumns; i++)
{
user_data->colNames[i] = NULL;
user_data->colValues[i] = NULL;
user_data->colRawValues[i] = NULL;
user_data->colValLength[i] = 0;
user_data->colValNullBitmap[i] = (bits8 *) palloc0(
sizeof(bits8) * MAX_ORC_ARRAY_DIMS);
}
}
static void init_format_user_data_for_write(TupleDesc tup_desc,
ORCFormatUserData *user_data)
{
user_data->numberOfColumns = tup_desc->natts;
user_data->colNames = palloc0(sizeof(char *) * user_data->numberOfColumns);
user_data->colDatatypes = palloc0(sizeof(int) * user_data->numberOfColumns);
user_data->colDatatypeMods = palloc0(
sizeof(int64_t) * user_data->numberOfColumns);
user_data->colRawValues = palloc0(
sizeof(char *) * user_data->numberOfColumns);
user_data->colValLength = palloc0(
sizeof(uint64_t) * user_data->numberOfColumns);
user_data->colValNullBitmap = palloc0(
sizeof(bits8 *) * user_data->numberOfColumns);
user_data->colValDims = palloc0(sizeof(int *) * user_data->numberOfColumns);
user_data->colTimestamp = palloc0(
sizeof(TimestampType) * user_data->numberOfColumns);
}
static void build_options_in_json(List *fmt_opts_defelem, int encoding,
char **json_str, TupleDesc tupDesc)
{
struct json_object *opt_json_object = json_object_new_object();
/* add format options for the formatter */
char *key_str = NULL;
char *val_str = NULL;
const char *whitespace = " \t\n\r";
int nargs = list_length(fmt_opts_defelem);
for (int i = 0; i < nargs; ++i)
{
key_str = ((DefElem *) (list_nth(fmt_opts_defelem, i)))->defname;
val_str =
((Value *) ((DefElem *) (list_nth(fmt_opts_defelem, i)))->arg)->val.str;
if ((strncasecmp(key_str, "bloomfilter", strlen("bloomfilter")) == 0))
{
int attnum = tupDesc->natts;
json_object *j_array = json_object_new_array();
char *token = orc_strtokx2(val_str, whitespace, ",", NULL, 0, false, false, encoding);
while (token)
{
for (int j = 0; j < attnum; ++j)
{
if ((strncasecmp(token, ((Form_pg_attribute) (tupDesc->attrs[j]))->attname.data, strlen(token)) == 0))
{
json_object *j_obj = json_object_new_int(j);
json_object_array_add(j_array, j_obj);
}
}
token = orc_strtokx2(NULL, whitespace, ",", NULL, 0, false, false, encoding);;
}
json_object_object_add(opt_json_object, key_str, j_array);
}
else {
json_object_object_add(opt_json_object, key_str,
json_object_new_string(val_str));
}
}
/* add encoding option for orc */
if (json_object_object_get(opt_json_object, "encoding") == NULL)
{
const char *encodingStr = pg_encoding_to_char(encoding);
char *encodingStrLower = str_tolower(encodingStr, strlen(encodingStr));
json_object_object_add(opt_json_object, "encoding",
json_object_new_string(encodingStrLower));
if (encodingStrLower)
pfree(encodingStrLower);
}
*json_str = NULL;
if (opt_json_object != NULL)
{
const char *str = json_object_to_json_string(opt_json_object);
*json_str = (char *) palloc0(strlen(str) + 1);
strcpy(*json_str, str);
json_object_put(opt_json_object);
ereport(DEBUG3, (errmsg_internal("formatter options are %s", *json_str)));
}
}
static ORCFormatC *create_formatter_instance(List *fmt_opts_defelem,
int fmt_encoding, int segno, TupleDesc tupDesc)
{
char *fmt_opts_str = NULL;
build_options_in_json(fmt_opts_defelem, fmt_encoding, &fmt_opts_str, tupDesc);
ORCFormatC *orc_format_c = ORCFormatNewORCFormatC(fmt_opts_str, segno);
if (fmt_opts_str != NULL)
{
pfree(fmt_opts_str);
}
return orc_format_c;
}
static void build_file_splits(Uri *uri, ScanState *scan_state,
ORCFormatUserData *user_data)
{
if (scan_state)
{
user_data->nSplits = list_length(scan_state->splits);
user_data->splits = palloc0(
sizeof(ORCFormatFileSplit) * user_data->nSplits);
ListCell *cell = NULL;
int i = 0;
foreach(cell, scan_state->splits)
{
FileSplit origFS = (FileSplit) lfirst(cell);
user_data->splits[i].len = origFS->lengths;
user_data->splits[i].start = origFS->offsets;
if (uri->protocol == URI_HIVE)
{
uri->hostname =
(scan_state->hivehost) ?
pstrdup(scan_state->hivehost) : NULL;
uri->port = scan_state->hiveport;
}
/* build file path containing host address */
int fileNameLen = sizeof("hdfs") - 1 + /* *** This is a bad imp. */
sizeof("://") - 1 + strlen(uri->hostname) + sizeof(':')
+ sizeof("65535") - 1 + strlen(origFS->ext_file_uri_string)
+ sizeof('\0');
user_data->splits[i].fileName = palloc(fileNameLen * sizeof(char));
sprintf(user_data->splits[i].fileName, "%s://%s:%d%s", "hdfs",
uri->hostname, uri->port, origFS->ext_file_uri_string);
ereport(LOG, (errmsg_internal("fileinformation:%s",user_data->splits[i].fileName)));
i++;
}
}
else
{
user_data->nSplits = 0;
user_data->splits = NULL;
}
}
static void build_tuple_descrition_for_read(Plan *plan, Relation relation,
ORCFormatUserData *user_data)
{
user_data->colToReads = palloc0(sizeof(bool) * user_data->numberOfColumns);
for (int i = 0; i < user_data->numberOfColumns; ++i)
{
user_data->colToReads[i] = plan ? false : true;
/* 64 is the name type length */
user_data->colNames[i] = palloc(sizeof(char) * 64);
strcpy(user_data->colNames[i],
relation->rd_att->attrs[i]->attname.data);
int data_type = (int) (relation->rd_att->attrs[i]->atttypid);
user_data->colDatatypes[i] = map_hawq_type_to_common_plan(data_type);
user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod;
}
if (plan)
{
/* calculate columns to read for seqscan */
GetNeededColumnsForScan((Node *) plan->targetlist,
user_data->colToReads, user_data->numberOfColumns);
GetNeededColumnsForScan((Node *) plan->qual, user_data->colToReads,
user_data->numberOfColumns);
}
}
static void build_tuple_descrition_for_write(Relation relation,
ORCFormatUserData *user_data)
{
for (int i = 0; i < user_data->numberOfColumns; ++i)
{
/* 64 is the name type length */
user_data->colNames[i] = palloc(sizeof(char) * 64);
strcpy(user_data->colNames[i],
relation->rd_att->attrs[i]->attname.data);
user_data->colDatatypes[i] = map_hawq_type_to_common_plan(
(int) (relation->rd_att->attrs[i]->atttypid));
if (user_data->colDatatypes[i] == CHARID &&
relation->rd_att->attrs[i]->atttypmod == -1) {
user_data->colDatatypeMods[i] =
strlen(relation->rd_att->attrs[i]->attname.data) + VARHDRSZ;
} else {
user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod;
}
}
}
static void orc_scan_error_callback(void *arg)
{
CopyState cstate = (CopyState) arg;
errcontext("External table %s", cstate->cur_relname);
}
static void orc_parse_format_string(CopyState pstate, char *fmtstr)
{
char *token;
const char *whitespace = " \t\n\r";
char nonstd_backslash = 0;
int encoding = GetDatabaseEncoding();
token = orc_strtokx2(fmtstr, whitespace, NULL, NULL, 0, false, true,
encoding);
/* parse user custom options. take it as is. no validation needed */
List *l = NIL;
bool formatter_found = false;
if (token)
{
char *key = token;
char *val = NULL;
StringInfoData key_modified;
initStringInfo(&key_modified);
while (key)
{
/* MPP-14467 - replace meta chars back to original */
resetStringInfo(&key_modified);
appendStringInfoString(&key_modified, key);
replaceStringInfoString(&key_modified, "<gpx20>", " ");
val = orc_strtokx2(NULL, whitespace, NULL, "'", nonstd_backslash,
true, true, encoding);
if (val)
{
if (pg_strcasecmp(key, "formatter") == 0)
{
pstate->custom_formatter_name = pstrdup(val);
formatter_found = true;
}
else
l = lappend(l,
makeDefElem(pstrdup(key_modified.data),
(Node *) makeString(pstrdup(val))));
}
else
goto error;
key = orc_strtokx2(NULL, whitespace, NULL, NULL, 0, false, false,
encoding);
}
}
if (!formatter_found)
{
/*
* If there is no formatter option specified, use format name. So
* we don't report error here.
*/
}
pstate->custom_formatter_params = l;
return;
error: if (token)
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("external table internal parse error at \"%s\"", token)));
else
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("external table internal parse error at end of " "line")));
}
static char *orc_strtokx2(const char *s, const char *whitespace,
const char *delim, const char *quote, char escape, bool e_strings,
bool del_quotes, int encoding)
{
static char *storage = NULL;/* store the local copy of the users string
* here */
static char *string = NULL; /* pointer into storage where to continue on
* next call */
/* variously abused variables: */
unsigned int offset;
char *start;
char *p;
if (s)
{
//pfree(storage);
/*
* We may need extra space to insert delimiter nulls for adjacent
* tokens. 2X the space is a gross overestimate, but it's unlikely
* that this code will be used on huge strings anyway.
*/
storage = palloc(2 * strlen(s) + 1);
strcpy(storage, s);
string = storage;
}
if (!storage)
return NULL;
/* skip leading whitespace */
offset = strspn(string, whitespace);
start = &string[offset];
/* end of string reached? */
if (*start == '\0')
{
/* technically we don't need to free here, but we're nice */
pfree(storage);
storage = NULL;
string = NULL;
return NULL;
}
/* test if delimiter character */
if (delim && strchr(delim, *start))
{
/*
* If not at end of string, we need to insert a null to terminate the
* returned token. We can just overwrite the next character if it
* happens to be in the whitespace set ... otherwise move over the
* rest of the string to make room. (This is why we allocated extra
* space above).
*/
p = start + 1;
if (*p != '\0')
{
if (!strchr(whitespace, *p))
memmove(p + 1, p, strlen(p) + 1);
*p = '\0';
string = p + 1;
}
else
{
/* at end of string, so no extra work */
string = p;
}
return start;
}
/* check for E string */
p = start;
if (e_strings && (*p == 'E' || *p == 'e') && p[1] == '\'')
{
quote = "'";
escape = '\\'; /* if std strings before, not any more */
p++;
}
/* test if quoting character */
if (quote && strchr(quote, *p))
{
/* okay, we have a quoted token, now scan for the closer */
char thisquote = *p++;
/* MPP-6698 START
* unfortunately, it is possible for an external table format
* string to be represented in the catalog in a way which is
* problematic to parse: when using a single quote as a QUOTE
* or ESCAPE character the format string will show [quote '''].
* since we do not want to change how this is stored at this point
* (as it will affect previous versions of the software already
* in production) the following code block will detect this scenario
* where 3 quote characters follow each other, with no forth one.
* in that case, we will skip the second one (the first is skipped
* just above) and the last trailing quote will be skipped below.
* the result will be the actual token (''') and after stripping
* it due to del_quotes we'll end up with (').
* very ugly, but will do the job...
*/
char qt = quote[0];
if (strlen(p) >= 3 && p[0] == qt && p[1] == qt && p[2] != qt)
p++;
/* MPP-6698 END */
for (; *p; p += pg_encoding_mblen(encoding, p))
{
if (*p == escape && p[1] != '\0')
p++; /* process escaped anything */
else if (*p == thisquote && p[1] == thisquote)
p++; /* process doubled quote */
else if (*p == thisquote)
{
p++; /* skip trailing quote */
break;
}
}
/*
* If not at end of string, we need to insert a null to terminate the
* returned token. See notes above.
*/
if (*p != '\0')
{
if (!strchr(whitespace, *p))
memmove(p + 1, p, strlen(p) + 1);
*p = '\0';
string = p + 1;
}
else
{
/* at end of string, so no extra work */
string = p;
}
/* Clean up the token if caller wants that */
if (del_quotes)
orc_strip_quotes(start, thisquote, escape, encoding);
return start;
}
/*
* Otherwise no quoting character. Scan till next whitespace, delimiter
* or quote. NB: at this point, *start is known not to be '\0',
* whitespace, delim, or quote, so we will consume at least one character.
*/
offset = strcspn(start, whitespace);
if (delim)
{
unsigned int offset2 = strcspn(start, delim);
if (offset > offset2)
offset = offset2;
}
if (quote)
{
unsigned int offset2 = strcspn(start, quote);
if (offset > offset2)
offset = offset2;
}
p = start + offset;
/*
* If not at end of string, we need to insert a null to terminate the
* returned token. See notes above.
*/
if (*p != '\0')
{
if (!strchr(whitespace, *p))
memmove(p + 1, p, strlen(p) + 1);
*p = '\0';
string = p + 1;
}
else
{
/* at end of string, so no extra work */
string = p;
}
return start;
}
static void orc_strip_quotes(char *source, char quote, char escape,
int encoding)
{
char *src;
char *dst;
Assert(source);Assert(quote);
src = dst = source;
if (*src && *src == quote)
src++; /* skip leading quote */
while (*src)
{
char c = *src;
int i;
if (c == quote && src[1] == '\0')
break; /* skip trailing quote */
else if (c == quote && src[1] == quote)
src++; /* process doubled quote */
else if (c == escape && src[1] != '\0')
src++; /* process escaped character */
i = pg_encoding_mblen(encoding, src);
while (i--)
*dst++ = *src++;
}
*dst = '\0';
}