HAWQ-1708. Add ORC writer in hawq
diff --git a/contrib/orc/orc.c b/contrib/orc/orc.c
index ac2f878..8dbc1c1 100644
--- a/contrib/orc/orc.c
+++ b/contrib/orc/orc.c
@@ -46,6 +46,73 @@
PG_FUNCTION_INFO_V1(orc_validate_encodings);
PG_FUNCTION_INFO_V1(orc_validate_datatypes);
+/* Accessors for pluggable storage format ORC */
+PG_FUNCTION_INFO_V1(orc_insert_init);
+PG_FUNCTION_INFO_V1(orc_insert);
+PG_FUNCTION_INFO_V1(orc_insert_finish);
+
+///* Definitions of validators for pluggable storage format ORC */
+//Datum orc_validate_interfaces(PG_FUNCTION_ARGS);
+//Datum orc_validate_options(PG_FUNCTION_ARGS);
+//Datum orc_validate_encodings(PG_FUNCTION_ARGS);
+//Datum orc_validate_datatypes(PG_FUNCTION_ARGS);
+//
+///* Definitions of accessors for pluggable storage format ORC */
+//Datum orc_insert_init(PG_FUNCTION_ARGS);
+//Datum orc_insert(PG_FUNCTION_ARGS);
+//Datum orc_insert_finish(PG_FUNCTION_ARGS);
+
+typedef struct
+{
+ int64_t second;
+ int64_t nanosecond;
+} TimestampType;
+
+typedef struct ORCFormatUserData
+{
+ ORCFormatC *fmt;
+ char **colNames;
+ int *colDatatypes;
+ int64_t *colDatatypeMods;
+ int numberOfColumns;
+ char **colRawValues;
+ Datum *colValues;
+ uint64_t *colValLength;
+ bits8 **colValNullBitmap;
+ int **colValDims;
+ char **colAddresses;
+ bool *colIsNulls;
+ bool *colToReads;
+ bool *colSpeedUpPossible;
+ bool *colSpeedUp;
+
+ bool *nulls;
+ Datum *datums;
+ int reserved;
+
+ TimestampType *colTimestamp;
+
+ int nSplits;
+ ORCFormatFileSplit *splits;
+} ORCFormatUserData;
+
+static FmgrInfo *get_orc_function(char *formatter_name, char *function_name);
+static void get_insert_functions(ExternalInsertDesc ext_insert_desc);
+static void init_format_user_data_for_write(TupleDesc tup_desc,
+ ORCFormatUserData *user_data);
+static void build_options_in_json(List *fmt_opts_defelem, int encoding,
+ char **json_str, TupleDesc tupDesc);
+static ORCFormatC *create_formatter_instance(List *fmt_opts_defelem,
+ int encoding, int segno, TupleDesc tupDesc);
+static void build_tuple_descrition_for_write(Relation relation,
+ ORCFormatUserData *user_data);
+static void orc_parse_format_string(CopyState pstate, char *fmtstr);
+static char *orc_strtokx2(const char *s, const char *whitespace,
+ const char *delim, const char *quote, char escape, bool e_strings,
+ bool del_quotes, int encoding);
+static void orc_strip_quotes(char *source, char quote, char escape,
+ int encoding);
+
/* Implementation of validators for pluggable storage format ORC */
/*
@@ -255,3 +322,1022 @@
PG_RETURN_VOID() ;
}
+
+/*
+ * ExternalInsertDesc
+ * orc_insert_init(Relation relation,
+ * int formatterType,
+ * char *formatterName)
+ */
+Datum orc_insert_init(PG_FUNCTION_ARGS)
+{
+ PlugStorage ps = (PlugStorage) (fcinfo->context);
+ Relation relation = ps->ps_relation;
+ int formatterType = ps->ps_formatter_type;
+ char *formatterName = ps->ps_formatter_name;
+
+ /* 1. Allocate and initialize the insert descriptor */
+ ExternalInsertDesc eid = palloc(sizeof(ExternalInsertDescData));
+ eid->ext_formatter_type = formatterType;
+ eid->ext_formatter_name = formatterName;
+
+ /* 1.1 Setup insert functions */
+ get_insert_functions(eid);
+
+ /* 1.2 Initialize basic information */
+ eid->ext_rel = relation;
+ eid->ext_noop = (Gp_role == GP_ROLE_DISPATCH);
+ eid->ext_formatter_data = NULL;
+
+ /* 1.3 Get URI string */
+ ExtTableEntry *ete = GetExtTableEntry(RelationGetRelid(relation));
+ Value *v;
+ char *uri_str;
+ int segindex = GetQEIndex();
+ int num_segs = GetQEGangNum();
+ int num_urls = list_length(ete->locations);
+ int my_url = segindex % num_urls;
+
+ if (num_urls > num_segs)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("External table has more URLs then available primary " "segments that can write into them")));
+
+ v = list_nth(ete->locations, my_url);
+ uri_str = pstrdup(v->val.str);
+
+ /*when it is hive protocol, transfer the url*/
+ Uri* tmp = ParseExternalTableUri(uri_str);
+ if (tmp->protocol == URI_HIVE)
+ {
+ if (tmp->hostname)
+ {
+ pfree(tmp->hostname);
+ }
+ tmp->hostname = pstrdup(ps->ps_scan_state->hivehost);
+ tmp->port = ps->ps_scan_state->hiveport;
+ if (tmp->path)
+ {
+ pfree(tmp->path);
+ }
+ tmp->path = pstrdup(ps->ps_scan_state->hivepath);
+ int uriLen = sizeof("hdfs") - 1 + /* *** This is a bad imp. */
+ sizeof("://") - 1 + strlen(tmp->hostname) + sizeof(':')
+ + sizeof("65535") - 1 + strlen(tmp->path);
+
+ char* url = palloc(uriLen * sizeof(char));
+ sprintf(url, "%s://%s:%d%s", "hdfs", tmp->hostname, tmp->port,
+ tmp->path);
+ eid->ext_uri = url;
+ }
+ else
+ {
+ eid->ext_uri = uri_str;
+ }
+ FreeExternalTableUri(tmp);
+
+ /* 1.4 Allocate and initialize structure which track data parsing state */
+ eid->ext_pstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
+ eid->ext_tupDesc = RelationGetDescr(relation);
+ eid->ext_values = (Datum *) palloc(eid->ext_tupDesc->natts * sizeof(Datum));
+ eid->ext_nulls = (bool *) palloc(eid->ext_tupDesc->natts * sizeof(bool));
+
+ /* 1.5 Get format options */
+ List *fmt_opts = NIL;
+ fmt_opts = lappend(fmt_opts, makeString(pstrdup(ete->fmtopts)));
+
+ /* 1.6 Initialize parse state */
+ /* 1.6.1 Initialize basic information for pstate */
+ CopyState pstate = eid->ext_pstate;
+ pstate->fe_eof = false;
+ pstate->eol_type = EOL_UNKNOWN;
+ pstate->eol_str = NULL;
+ pstate->cur_relname = RelationGetRelationName(relation);
+ pstate->cur_lineno = 0;
+ pstate->err_loc_type = ROWNUM_ORIGINAL;
+ pstate->cur_attname = NULL;
+ pstate->raw_buf_done = true; /* true so we will read data in first run */
+ pstate->line_done = true;
+ pstate->bytesread = 0;
+ pstate->custom = false;
+ pstate->header_line = false;
+ pstate->fill_missing = false;
+ pstate->line_buf_converted = false;
+ pstate->raw_buf_index = 0;
+ pstate->processed = 0;
+ pstate->filename = uri_str;
+ pstate->copy_dest = COPY_EXTERNAL_SOURCE;
+ pstate->missing_bytes = 0;
+ pstate->rel = relation;
+
+ /* 1.6.2 Setup encoding information */
+ /*
+ * Set up encoding conversion info. Even if the client and server
+ * encodings are the same, we must apply pg_client_to_server() to validate
+ * data in multibyte encodings.
+ *
+ * Each external table specifies the encoding of its external data. We will
+ * therefore set a client encoding and client-to-server conversion procedure
+ * in here (server-to-client in WET) and these will be used in the data
+ * conversion routines (in copy.c CopyReadLineXXX(), etc).
+ */
+ int fmt_encoding = ete->encoding;
+ Insist(PG_VALID_ENCODING(fmt_encoding));
+ pstate->client_encoding = fmt_encoding;
+ Oid conversion_proc = FindDefaultConversionProc(GetDatabaseEncoding(),
+ fmt_encoding);
+
+ if (OidIsValid(conversion_proc))
+ {
+ /* conversion proc found */
+ pstate->enc_conversion_proc = palloc(sizeof(FmgrInfo));
+ fmgr_info(conversion_proc, pstate->enc_conversion_proc);
+ }
+ else
+ {
+ /* no conversion function (both encodings are probably the same) */
+ pstate->enc_conversion_proc = NULL;
+ }
+
+ pstate->need_transcoding = pstate->client_encoding != GetDatabaseEncoding();
+ pstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(
+ pstate->client_encoding);
+
+ /* 1.6.3 Parse format options */
+ char *format_str = pstrdup((char *) strVal(linitial(fmt_opts)));
+ orc_parse_format_string(pstate, format_str);
+
+ /* 1.6.4 Setup tuple description */
+ TupleDesc tup_desc = eid->ext_tupDesc;
+ pstate->attr_offsets = (int *) palloc(tup_desc->natts * sizeof(int));
+
+ /* 1.6.5 Generate or convert list of attributes to process */
+ pstate->attnumlist = CopyGetAttnums(tup_desc, relation, NIL);
+
+ /* 1.6.6 Convert FORCE NOT NULL name list to per-column flags, check validity */
+ pstate->force_notnull_flags = (bool *) palloc0(
+ tup_desc->natts * sizeof(bool));
+ if (pstate->force_notnull)
+ {
+ List *attnums;
+ ListCell *cur;
+
+ attnums = CopyGetAttnums(tup_desc, relation, pstate->force_notnull);
+
+ foreach(cur, attnums)
+ {
+ int attnum = lfirst_int(cur);
+ pstate->force_notnull_flags[attnum - 1] = true;
+ }
+ }
+
+ /* 1.6.7 Take care of state that is WET specific */
+ Form_pg_attribute *attr = tup_desc->attrs;
+ ListCell *cur;
+
+ pstate->null_print_client = pstate->null_print; /* default */
+ pstate->fe_msgbuf = makeStringInfo(); /* use fe_msgbuf as a per-row buffer */
+ pstate->out_functions = (FmgrInfo *) palloc(
+ tup_desc->natts * sizeof(FmgrInfo));
+
+ foreach(cur, pstate->attnumlist)
+ /* Get info about the columns need to process */
+ {
+ int attnum = lfirst_int(cur);
+ Oid out_func_oid;
+ bool isvarlena;
+
+ getTypeOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid,
+ &isvarlena);
+ fmgr_info(out_func_oid, &pstate->out_functions[attnum - 1]);
+ }
+
+ /*
+ * We need to convert null_print to client encoding, because it
+ * will be sent directly with CopySendString.
+ */
+ if (pstate->need_transcoding)
+ {
+ pstate->null_print_client = pg_server_to_custom(pstate->null_print,
+ pstate->null_print_len, pstate->client_encoding,
+ pstate->enc_conversion_proc);
+ }
+
+ /* 1.6.8 Create temporary memory context for per row process */
+ /*
+ * Create a temporary memory context that we can reset once per row to
+ * recover palloc'd memory. This avoids any problems with leaks inside
+ * datatype input or output routines, and should be faster than retail
+ * pfree's anyway.
+ */
+ pstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+ "ExtTableMemCxt",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /* 1.7 Initialize formatter data */
+ eid->ext_formatter_data = (FormatterData *) palloc0(sizeof(FormatterData));
+ eid->ext_formatter_data->fmt_perrow_ctx = eid->ext_pstate->rowcontext;
+
+ /* 2. Setup user data */
+
+ /* 2.1 Initialize user data */
+ ORCFormatUserData *user_data = (ORCFormatUserData *) palloc0(
+ sizeof(ORCFormatUserData));
+ init_format_user_data_for_write(tup_desc, user_data);
+
+ /* 2.2 Create formatter instance */
+ List *fmt_opts_defelem = pstate->custom_formatter_params;
+ user_data->fmt = create_formatter_instance(fmt_opts_defelem, fmt_encoding,
+ ps->ps_segno, tup_desc);
+
+ /* 2.4 Build tuple description */
+ build_tuple_descrition_for_write(relation, user_data);
+
+ /* 2.5 Save user data */
+ eid->ext_ps_user_data = (void *) user_data;
+
+ if (enable_secure_filesystem && Gp_role == GP_ROLE_EXECUTE)
+ {
+ char *token = find_filesystem_credential_with_uri(uri_str);
+ SetToken(uri_str, token);
+ }
+ /* 3. Begin insert with the formatter */
+ ORCFormatBeginInsertORCFormatC(user_data->fmt, eid->ext_uri,
+ user_data->colNames, user_data->colDatatypes,
+ user_data->colDatatypeMods, user_data->numberOfColumns);
+
+ ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+ if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
+ {
+ elog(ERROR, "ORC: failed to begin insert: %s (%d)",
+ e->errMessage, e->errCode);
+ }
+
+ /* 4. Save the result */
+ ps->ps_ext_insert_desc = eid;
+
+ PG_RETURN_POINTER(eid);
+}
+
+/*
+ * Oid
+ * orc_insert(ExternalInsertDesc extInsertDesc,
+ * TupleTableSlot *tupTableSlot)
+ */
+Datum orc_insert(PG_FUNCTION_ARGS)
+{
+ PlugStorage ps = (PlugStorage) (fcinfo->context);
+ ExternalInsertDesc eid = ps->ps_ext_insert_desc;
+ TupleTableSlot *tts = ps->ps_tuple_table_slot;
+
+ ORCFormatUserData *user_data = (ORCFormatUserData *) (eid->ext_ps_user_data);
+
+ user_data->colValues = slot_get_values(tts);
+ user_data->colIsNulls = slot_get_isnull(tts);
+
+ static bool DUMMY_BOOL = true;
+ static int8_t DUMMY_INT8 = 0;
+ static int16_t DUMMY_INT16 = 0;
+ static int32_t DUMMY_INT32 = 0;
+ static int64_t DUMMY_INT64 = 0;
+ static float DUMMY_FLOAT = 0.0;
+ static double DUMMY_DOUBLE = 0.0;
+ static char DUMMY_TEXT[1] = "";
+ static int32_t DUMMY_DATE = 0;
+ static int64_t DUMMY_TIME = 0;
+ static TimestampType DUMMY_TIMESTAMP;
+ DUMMY_TIMESTAMP.second = 0;
+ DUMMY_TIMESTAMP.nanosecond = 0;
+ static int16_t DUMMY_INT16_ARRAY[1] =
+ { 0.0 };
+ static int32_t DUMMY_INT32_ARRAY[1] =
+ { 0.0 };
+ static int64_t DUMMY_INT64_ARRAY[1] =
+ { 0.0 };
+ static float DUMMY_FLOAT_ARRAY[1] =
+ { 0.0 };
+ static double DUMMY_DOUBLE_ARRAY[1] =
+ { 0.0 };
+
+ TupleDesc tupdesc = tts->tts_tupleDescriptor;
+ user_data->numberOfColumns = tupdesc->natts;
+
+ MemoryContext per_row_context = eid->ext_pstate->rowcontext;
+ MemoryContextReset(per_row_context);
+ MemoryContext old_context = MemoryContextSwitchTo(per_row_context);
+
+ /* Get column values */
+ for (int i = 0; i < user_data->numberOfColumns; ++i)
+ {
+ int dataType = (int) (tupdesc->attrs[i]->atttypid);
+
+ user_data->colRawValues[i] = NULL;
+ user_data->colValNullBitmap[i] = NULL;
+
+ if (user_data->colIsNulls[i])
+ {
+ if (dataType == HAWQ_TYPE_CHAR)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_INT8);
+ }
+ else if (dataType == HAWQ_TYPE_INT2)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_INT16);
+ }
+ else if (dataType == HAWQ_TYPE_INT4)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_INT32);
+ }
+ else if (dataType == HAWQ_TYPE_INT8)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_INT64);
+ }
+ else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE
+ || dataType == HAWQ_TYPE_BPCHAR
+ || dataType == HAWQ_TYPE_VARCHAR
+ || dataType == HAWQ_TYPE_NUMERIC)
+ {
+ user_data->colRawValues[i] = (char *) (DUMMY_TEXT);
+ }
+ else if (dataType == HAWQ_TYPE_FLOAT4)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_FLOAT);
+ }
+ else if (dataType == HAWQ_TYPE_FLOAT8)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_DOUBLE);
+ }
+ else if (dataType == HAWQ_TYPE_BOOL)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_BOOL);
+ }
+ else if (dataType == HAWQ_TYPE_DATE)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_DATE);
+ }
+ else if (dataType == HAWQ_TYPE_TIME)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_TIME);
+ }
+ else if (dataType == HAWQ_TYPE_TIMESTAMP)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_TIMESTAMP);
+ }
+ else if (dataType == HAWQ_TYPE_INT2_ARRAY)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_INT16_ARRAY);
+ }
+ else if (dataType == HAWQ_TYPE_INT4_ARRAY)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_INT32_ARRAY);
+ }
+ else if (dataType == HAWQ_TYPE_INT8_ARRAY)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_INT64_ARRAY);
+ }
+ else if (dataType == HAWQ_TYPE_FLOAT4_ARRAY)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_FLOAT_ARRAY);
+ }
+ else if (dataType == HAWQ_TYPE_FLOAT8_ARRAY)
+ {
+ user_data->colRawValues[i] = (char *) (&DUMMY_DOUBLE_ARRAY);
+ }
+ else if (dataType == HAWQ_TYPE_INVALID)
+ {
+ elog(ERROR, "HAWQ data type with id %d is invalid", dataType);
+ }
+ else
+ {
+ elog(
+ ERROR, "HAWQ data type with id %d is not supported yet", dataType);
+ }
+
+ continue;
+ }
+
+ if (dataType == HAWQ_TYPE_CHAR || dataType == HAWQ_TYPE_INT2
+ || dataType == HAWQ_TYPE_INT4 || dataType == HAWQ_TYPE_INT8
+ || dataType == HAWQ_TYPE_FLOAT4 || dataType == HAWQ_TYPE_FLOAT8
+ || dataType == HAWQ_TYPE_BOOL || dataType == HAWQ_TYPE_TIME)
+ {
+ user_data->colRawValues[i] = (char *) (&(user_data->colValues[i]));
+ }
+ else if (dataType == HAWQ_TYPE_TIMESTAMP)
+ {
+ int64_t *timestamp = (int64_t *) (&(user_data->colValues[i]));
+ user_data->colTimestamp[i].second = *timestamp / 1000000
+ + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 60 * 60 * 24;
+ user_data->colTimestamp[i].nanosecond = *timestamp % 1000000 * 1000;
+ if (user_data->colTimestamp[i].nanosecond < 0)
+ user_data->colTimestamp[i].nanosecond += 1000000000;
+ user_data->colRawValues[i] =
+ (char *) (&(user_data->colTimestamp[i]));
+ }
+ else if (dataType == HAWQ_TYPE_DATE)
+ {
+ int *date = (int *) (&(user_data->colValues[i]));
+ *date += POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE;
+ user_data->colRawValues[i] = (char *) (&(user_data->colValues[i]));
+ }
+ else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE
+ || dataType == HAWQ_TYPE_BPCHAR || dataType == HAWQ_TYPE_VARCHAR
+ || dataType == HAWQ_TYPE_NUMERIC)
+ {
+ user_data->colRawValues[i] = OutputFunctionCall(
+ &(eid->ext_pstate->out_functions[i]),
+ user_data->colValues[i]);
+
+ if (dataType != HAWQ_TYPE_BYTE && eid->ext_pstate->need_transcoding)
+ {
+ char *cvt = NULL;
+
+ cvt = pg_server_to_custom(user_data->colRawValues[i],
+ strlen(user_data->colRawValues[i]),
+ eid->ext_pstate->client_encoding,
+ eid->ext_pstate->enc_conversion_proc);
+
+ if (cvt != user_data->colRawValues[i])
+ {
+ pfree(user_data->colRawValues[i]);
+ }
+
+ user_data->colRawValues[i] = cvt;
+ }
+ }
+ else if (dataType == HAWQ_TYPE_INT2_ARRAY
+ || dataType == HAWQ_TYPE_INT4_ARRAY
+ || dataType == HAWQ_TYPE_INT8_ARRAY
+ || dataType == HAWQ_TYPE_FLOAT4_ARRAY
+ || dataType == HAWQ_TYPE_FLOAT8_ARRAY)
+ {
+ ArrayType *arr = DatumGetArrayTypeP(user_data->colValues[i]);
+ user_data->colValLength[i] = ARR_SIZE(arr) - ARR_DATA_OFFSET(arr);
+ user_data->colRawValues[i] = ARR_DATA_PTR(arr);
+ user_data->colValNullBitmap[i] = ARR_NULLBITMAP(arr);
+ // Now we only support 1 dimension array
+ if (ARR_NDIM(arr) > 1)
+ {
+ elog(ERROR, "Now we only support 1 dimension array in orc format,"
+ " your array dimension is %d", ARR_NDIM(arr));
+ }
+ else if (ARR_NDIM(arr) == 1)
+ {
+ user_data->colValDims[i] = ARR_DIMS(arr);
+ }
+ else
+ {
+ user_data->colValDims[i] = NULL;
+ }
+ }
+ else if (dataType == HAWQ_TYPE_INVALID)
+ {
+ elog(ERROR, "HAWQ data type with id %d is invalid", dataType);
+ }
+ else
+ {
+ elog(
+ ERROR, "HAWQ data type with id %d is not supported yet", dataType);
+ }
+ }
+
+ /* Pass to formatter to output */
+ ORCFormatInsertORCFormatC(user_data->fmt, user_data->colDatatypes,
+ user_data->colRawValues, user_data->colValLength,
+ user_data->colValNullBitmap, user_data->colValDims,
+ user_data->colIsNulls);
+
+ ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+ if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
+ {
+ elog(ERROR, "orc_insert: failed to insert: %s(%d)",
+ e->errMessage, e->errCode);
+ }
+
+ for (int i = 0; i < user_data->numberOfColumns; ++i)
+ {
+ int dataType = (int) (tupdesc->attrs[i]->atttypid);
+
+ if (user_data->colIsNulls[i])
+ {
+ continue;
+ }
+
+ if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE
+ || dataType == HAWQ_TYPE_BPCHAR || dataType == HAWQ_TYPE_VARCHAR
+ || dataType == HAWQ_TYPE_NUMERIC)
+ {
+ if (user_data->colRawValues[i])
+ {
+ pfree(user_data->colRawValues[i]);
+ }
+ }
+ }
+
+ ps->ps_tuple_oid = InvalidOid;
+
+ MemoryContextSwitchTo(old_context);
+
+ PG_RETURN_OID(InvalidOid);
+}
+
+/*
+ * void
+ * orc_insert_finish(ExternalInsertDesc extInsertDesc)
+ */
+Datum orc_insert_finish(PG_FUNCTION_ARGS)
+{
+ PlugStorage ps = (PlugStorage) (fcinfo->context);
+ ExternalInsertDesc eid = ps->ps_ext_insert_desc;
+
+ ORCFormatUserData *user_data = (ORCFormatUserData *) (eid->ext_ps_user_data);
+
+ ORCFormatEndInsertORCFormatC(user_data->fmt);
+ ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+ if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
+ {
+ elog(ERROR, "ORC: failed to finish insert: %s(%d)",
+ e->errMessage, e->errCode);
+ }
+
+ ORCFormatFreeORCFormatC(&(user_data->fmt));
+
+ pfree(user_data->colDatatypes);
+ pfree(user_data->colRawValues);
+ pfree(user_data->colValLength);
+ pfree(user_data->colAddresses);
+ for (int i = 0; i < user_data->numberOfColumns; ++i)
+ {
+ pfree(user_data->colNames[i]);
+ }
+ pfree(user_data->colNames);
+ pfree(user_data);
+
+ if (eid->ext_formatter_data)
+ pfree(eid->ext_formatter_data);
+
+ if (eid->ext_formatter_name)
+ pfree(eid->ext_formatter_name);
+
+ pfree(eid);
+
+ PG_RETURN_VOID() ;
+}
+
+static FmgrInfo *get_orc_function(char *formatter_name, char *function_name)
+{
+ Assert(formatter_name);Assert(function_name);
+
+ Oid procOid = InvalidOid;
+ FmgrInfo *procInfo = NULL;
+
+ procOid = LookupPlugStorageValidatorFunc(formatter_name, function_name);
+
+ if (OidIsValid(procOid))
+ {
+ procInfo = (FmgrInfo *) palloc(sizeof(FmgrInfo));
+ fmgr_info(procOid, procInfo);
+ }
+ else
+ {
+ elog(ERROR, "%s_%s function was not found for pluggable storage",
+ formatter_name, function_name);
+ }
+
+ return procInfo;
+}
+
+static void get_insert_functions(ExternalInsertDesc ext_insert_desc)
+{
+ ext_insert_desc->ext_ps_insert_funcs.insert_init = get_orc_function("orc",
+ "insert_init");
+
+ ext_insert_desc->ext_ps_insert_funcs.insert = get_orc_function("orc",
+ "insert");
+
+ ext_insert_desc->ext_ps_insert_funcs.insert_finish = get_orc_function("orc",
+ "insert_finish");
+}
+
+static void init_format_user_data_for_write(TupleDesc tup_desc,
+ ORCFormatUserData *user_data)
+{
+ user_data->numberOfColumns = tup_desc->natts;
+ user_data->colNames = palloc0(sizeof(char *) * user_data->numberOfColumns);
+ user_data->colDatatypes = palloc0(sizeof(int) * user_data->numberOfColumns);
+ user_data->colDatatypeMods = palloc0(
+ sizeof(int64_t) * user_data->numberOfColumns);
+ user_data->colRawValues = palloc0(
+ sizeof(char *) * user_data->numberOfColumns);
+ user_data->colValLength = palloc0(
+ sizeof(uint64_t) * user_data->numberOfColumns);
+ user_data->colValNullBitmap = palloc0(
+ sizeof(bits8 *) * user_data->numberOfColumns);
+ user_data->colValDims = palloc0(sizeof(int *) * user_data->numberOfColumns);
+ user_data->colAddresses = palloc0(
+ sizeof(char *) * user_data->numberOfColumns);
+ user_data->colTimestamp = palloc0(
+ sizeof(TimestampType) * user_data->numberOfColumns);
+}
+
+static void build_options_in_json(List *fmt_opts_defelem, int encoding,
+ char **json_str, TupleDesc tupDesc)
+{
+ struct json_object *opt_json_object = json_object_new_object();
+
+ /* add format options for the formatter */
+ char *key_str = NULL;
+ char *val_str = NULL;
+ const char *whitespace = " \t\n\r";
+
+ int nargs = list_length(fmt_opts_defelem);
+ for (int i = 0; i < nargs; ++i)
+ {
+ key_str = ((DefElem *) (list_nth(fmt_opts_defelem, i)))->defname;
+ val_str =
+ ((Value *) ((DefElem *) (list_nth(fmt_opts_defelem, i)))->arg)->val.str;
+
+ if ((strncasecmp(key_str, "bloomfilter", strlen("bloomfilter")) == 0))
+ {
+ int attnum = tupDesc->natts;
+ json_object *j_array = json_object_new_array();
+ char *token = orc_strtokx2(val_str, whitespace, ",", NULL, 0, false, false, encoding);
+ while (token)
+ {
+ for (int j = 0; j < attnum; ++j)
+ {
+ if ((strncasecmp(token, ((Form_pg_attribute) (tupDesc->attrs[j]))->attname.data, strlen(token)) == 0))
+ {
+ json_object *j_obj = json_object_new_int(j);
+ json_object_array_add(j_array, j_obj);
+ }
+ }
+ token = orc_strtokx2(NULL, whitespace, ",", NULL, 0, false, false, encoding);;
+ }
+ json_object_object_add(opt_json_object, key_str, j_array);
+ }
+ else {
+ json_object_object_add(opt_json_object, key_str,
+ json_object_new_string(val_str));
+ }
+ }
+
+ /* add encoding option for orc */
+ if (json_object_object_get(opt_json_object, "encoding") == NULL)
+ {
+ const char *encodingStr = pg_encoding_to_char(encoding);
+ char *encodingStrLower = str_tolower(encodingStr, strlen(encodingStr));
+
+ json_object_object_add(opt_json_object, "encoding",
+ json_object_new_string(encodingStrLower));
+
+ if (encodingStrLower)
+ pfree(encodingStrLower);
+ }
+
+ *json_str = NULL;
+ if (opt_json_object != NULL)
+ {
+ const char *str = json_object_to_json_string(opt_json_object);
+ *json_str = (char *) palloc0(strlen(str) + 1);
+ strcpy(*json_str, str);
+ json_object_put(opt_json_object);
+
+ elog(DEBUG3, "formatter options are %s", *json_str);
+ }
+}
+
+static ORCFormatC *create_formatter_instance(List *fmt_opts_defelem,
+ int fmt_encoding, int segno, TupleDesc tupDesc)
+{
+ char *fmt_opts_str = NULL;
+
+ build_options_in_json(fmt_opts_defelem, fmt_encoding, &fmt_opts_str, tupDesc);
+
+ ORCFormatC *orc_format_c = ORCFormatNewORCFormatC(fmt_opts_str, segno);
+
+ if (fmt_opts_str != NULL)
+ {
+ pfree(fmt_opts_str);
+ }
+
+ return orc_format_c;
+}
+
+static void build_tuple_descrition_for_write(Relation relation,
+ ORCFormatUserData *user_data)
+{
+ for (int i = 0; i < user_data->numberOfColumns; ++i)
+ {
+ /* 64 is the name type length */
+ user_data->colNames[i] = palloc(sizeof(char) * 64);
+
+ strcpy(user_data->colNames[i],
+ relation->rd_att->attrs[i]->attname.data);
+
+ user_data->colDatatypes[i] = map_hawq_type_to_common_plan(
+ (int) (relation->rd_att->attrs[i]->atttypid));
+
+ user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod;
+ }
+}
+
+static void orc_parse_format_string(CopyState pstate, char *fmtstr)
+{
+ char *token;
+ const char *whitespace = " \t\n\r";
+ char nonstd_backslash = 0;
+ int encoding = GetDatabaseEncoding();
+
+ token = orc_strtokx2(fmtstr, whitespace, NULL, NULL, 0, false, true,
+ encoding);
+ /* parse user custom options. take it as is. no validation needed */
+
+ List *l = NIL;
+ bool formatter_found = false;
+
+ if (token)
+ {
+ char *key = token;
+ char *val = NULL;
+ StringInfoData key_modified;
+
+ initStringInfo(&key_modified);
+
+ while (key)
+ {
+
+ /* MPP-14467 - replace meta chars back to original */
+ resetStringInfo(&key_modified);
+ appendStringInfoString(&key_modified, key);
+ replaceStringInfoString(&key_modified, "<gpx20>", " ");
+
+ val = orc_strtokx2(NULL, whitespace, NULL, "'", nonstd_backslash,
+ true, true, encoding);
+ if (val)
+ {
+
+ if (pg_strcasecmp(key, "formatter") == 0)
+ {
+ pstate->custom_formatter_name = pstrdup(val);
+ formatter_found = true;
+ }
+ else
+
+ l = lappend(l,
+ makeDefElem(pstrdup(key_modified.data),
+ (Node *) makeString(pstrdup(val))));
+ }
+ else
+ goto error;
+
+ key = orc_strtokx2(NULL, whitespace, NULL, NULL, 0, false, false,
+ encoding);
+ }
+
+ }
+
+ if (!formatter_found)
+ {
+ /*
+ * If there is no formatter option specified, use format name. So
+ * we don't report error here.
+ */
+ }
+
+ pstate->custom_formatter_params = l;
+
+ return;
+
+ error: if (token)
+ ereport(ERROR,
+ (errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("external table internal parse error at \"%s\"", token)));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("external table internal parse error at end of " "line")));
+
+}
+
+static char *orc_strtokx2(const char *s, const char *whitespace,
+ const char *delim, const char *quote, char escape, bool e_strings,
+ bool del_quotes, int encoding)
+{
+ static char *storage = NULL;/* store the local copy of the users string
+ * here */
+ static char *string = NULL; /* pointer into storage where to continue on
+ * next call */
+
+ /* variously abused variables: */
+ unsigned int offset;
+ char *start;
+ char *p;
+
+ if (s)
+ {
+ //pfree(storage);
+
+ /*
+ * We may need extra space to insert delimiter nulls for adjacent
+ * tokens. 2X the space is a gross overestimate, but it's unlikely
+ * that this code will be used on huge strings anyway.
+ */
+ storage = palloc(2 * strlen(s) + 1);
+ strcpy(storage, s);
+ string = storage;
+ }
+
+ if (!storage)
+ return NULL;
+
+ /* skip leading whitespace */
+ offset = strspn(string, whitespace);
+ start = &string[offset];
+
+ /* end of string reached? */
+ if (*start == '\0')
+ {
+ /* technically we don't need to free here, but we're nice */
+ pfree(storage);
+ storage = NULL;
+ string = NULL;
+ return NULL;
+ }
+
+ /* test if delimiter character */
+ if (delim && strchr(delim, *start))
+ {
+ /*
+ * If not at end of string, we need to insert a null to terminate the
+ * returned token. We can just overwrite the next character if it
+ * happens to be in the whitespace set ... otherwise move over the
+ * rest of the string to make room. (This is why we allocated extra
+ * space above).
+ */
+ p = start + 1;
+ if (*p != '\0')
+ {
+ if (!strchr(whitespace, *p))
+ memmove(p + 1, p, strlen(p) + 1);
+ *p = '\0';
+ string = p + 1;
+ }
+ else
+ {
+ /* at end of string, so no extra work */
+ string = p;
+ }
+
+ return start;
+ }
+
+ /* check for E string */
+ p = start;
+ if (e_strings && (*p == 'E' || *p == 'e') && p[1] == '\'')
+ {
+ quote = "'";
+ escape = '\\'; /* if std strings before, not any more */
+ p++;
+ }
+
+ /* test if quoting character */
+ if (quote && strchr(quote, *p))
+ {
+ /* okay, we have a quoted token, now scan for the closer */
+ char thisquote = *p++;
+
+ /* MPP-6698 START
+ * unfortunately, it is possible for an external table format
+ * string to be represented in the catalog in a way which is
+ * problematic to parse: when using a single quote as a QUOTE
+ * or ESCAPE character the format string will show [quote '''].
+ * since we do not want to change how this is stored at this point
+ * (as it will affect previous versions of the software already
+ * in production) the following code block will detect this scenario
+ * where 3 quote characters follow each other, with no forth one.
+ * in that case, we will skip the second one (the first is skipped
+ * just above) and the last trailing quote will be skipped below.
+ * the result will be the actual token (''') and after stripping
+ * it due to del_quotes we'll end up with (').
+ * very ugly, but will do the job...
+ */
+ char qt = quote[0];
+
+ if (strlen(p) >= 3 && p[0] == qt && p[1] == qt && p[2] != qt)
+ p++;
+ /* MPP-6698 END */
+
+ for (; *p; p += pg_encoding_mblen(encoding, p))
+ {
+ if (*p == escape && p[1] != '\0')
+ p++; /* process escaped anything */
+ else if (*p == thisquote && p[1] == thisquote)
+ p++; /* process doubled quote */
+ else if (*p == thisquote)
+ {
+ p++; /* skip trailing quote */
+ break;
+ }
+ }
+
+ /*
+ * If not at end of string, we need to insert a null to terminate the
+ * returned token. See notes above.
+ */
+ if (*p != '\0')
+ {
+ if (!strchr(whitespace, *p))
+ memmove(p + 1, p, strlen(p) + 1);
+ *p = '\0';
+ string = p + 1;
+ }
+ else
+ {
+ /* at end of string, so no extra work */
+ string = p;
+ }
+
+ /* Clean up the token if caller wants that */
+ if (del_quotes)
+ orc_strip_quotes(start, thisquote, escape, encoding);
+
+ return start;
+ }
+
+ /*
+ * Otherwise no quoting character. Scan till next whitespace, delimiter
+ * or quote. NB: at this point, *start is known not to be '\0',
+ * whitespace, delim, or quote, so we will consume at least one character.
+ */
+ offset = strcspn(start, whitespace);
+
+ if (delim)
+ {
+ unsigned int offset2 = strcspn(start, delim);
+
+ if (offset > offset2)
+ offset = offset2;
+ }
+
+ if (quote)
+ {
+ unsigned int offset2 = strcspn(start, quote);
+
+ if (offset > offset2)
+ offset = offset2;
+ }
+
+ p = start + offset;
+
+ /*
+ * If not at end of string, we need to insert a null to terminate the
+ * returned token. See notes above.
+ */
+ if (*p != '\0')
+ {
+ if (!strchr(whitespace, *p))
+ memmove(p + 1, p, strlen(p) + 1);
+ *p = '\0';
+ string = p + 1;
+ }
+ else
+ {
+ /* at end of string, so no extra work */
+ string = p;
+ }
+
+ return start;
+}
+
+static void orc_strip_quotes(char *source, char quote, char escape,
+ int encoding)
+{
+ char *src;
+ char *dst;
+
+ Assert(source);Assert(quote);
+
+ src = dst = source;
+
+ if (*src && *src == quote)
+ src++; /* skip leading quote */
+
+ while (*src)
+ {
+ char c = *src;
+ int i;
+
+ if (c == quote && src[1] == '\0')
+ break; /* skip trailing quote */
+ else if (c == quote && src[1] == quote)
+ src++; /* process doubled quote */
+ else if (c == escape && src[1] != '\0')
+ src++; /* process escaped character */
+
+ i = pg_encoding_mblen(encoding, src);
+ while (i--)
+ *dst++ = *src++;
+ }
+
+ *dst = '\0';
+}
diff --git a/contrib/orc/orc_init.sql b/contrib/orc/orc_init.sql
index fa720b7..cae2318 100644
--- a/contrib/orc/orc_init.sql
+++ b/contrib/orc/orc_init.sql
@@ -21,7 +21,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.orc_validate_datatypes() RETURNS void
AS '$libdir/orc.so', 'orc_validate_datatypes'
LANGUAGE C STABLE;
-
+/*
CREATE OR REPLACE FUNCTION pg_catalog.orc_beginscan() RETURNS bytea
AS '$libdir/orc.so', 'orc_beginscan'
LANGUAGE C STABLE;
@@ -45,6 +45,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.orc_stopscan() RETURNS void
AS '$libdir/orc.so', 'orc_stopscan'
LANGUAGE C STABLE;
+*/
CREATE OR REPLACE FUNCTION pg_catalog.orc_insert_init() RETURNS bytea
AS '$libdir/orc.so', 'orc_insert_init'
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index fce5093..4fe0446 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1539,6 +1539,9 @@
/* The type of the table that is being scanned */
TableType tableType;
+ char *hivehost;
+ int hiveport;
+ char *hivepath;
/* Runtime filter */
struct RuntimeFilterState *runtimeFilter;
diff --git a/src/include/utils/uri.h b/src/include/utils/uri.h
index 9c5aedd..28e099c 100644
--- a/src/include/utils/uri.h
+++ b/src/include/utils/uri.h
@@ -33,7 +33,8 @@
URI_GPFDIST,
URI_CUSTOM,
URI_GPFDISTS,
- URI_HDFS
+ URI_HDFS,
+ URI_HIVE
} UriProtocol;
#define PROTOCOL_FILE "file://"
@@ -43,6 +44,7 @@
#define PROTOCOL_GPFDISTS "gpfdists://"
#define PROTOCOL_PXF "pxf://"
#define PROTOCOL_HDFS "hdfs://"
+#define PROTOCOL_HIVE "hive://"
/*
* sometimes we don't want to parse the whole URI but just take a peek at