HAWQ-1708. Add ORC writer in hawq
diff --git a/contrib/orc/orc.c b/contrib/orc/orc.c
index ac2f878..8dbc1c1 100644
--- a/contrib/orc/orc.c
+++ b/contrib/orc/orc.c
@@ -46,6 +46,73 @@
 PG_FUNCTION_INFO_V1(orc_validate_encodings);
 PG_FUNCTION_INFO_V1(orc_validate_datatypes);
 
+/* Accessors for pluggable storage format ORC */
+PG_FUNCTION_INFO_V1(orc_insert_init);
+PG_FUNCTION_INFO_V1(orc_insert);
+PG_FUNCTION_INFO_V1(orc_insert_finish);
+
+///* Definitions of validators for pluggable storage format ORC */
+//Datum orc_validate_interfaces(PG_FUNCTION_ARGS);
+//Datum orc_validate_options(PG_FUNCTION_ARGS);
+//Datum orc_validate_encodings(PG_FUNCTION_ARGS);
+//Datum orc_validate_datatypes(PG_FUNCTION_ARGS);
+//
+///* Definitions of accessors for pluggable storage format ORC */
+//Datum orc_insert_init(PG_FUNCTION_ARGS);
+//Datum orc_insert(PG_FUNCTION_ARGS);
+//Datum orc_insert_finish(PG_FUNCTION_ARGS);
+
+typedef struct
+{
+  int64_t second;
+  int64_t nanosecond;
+} TimestampType;
+
+typedef struct ORCFormatUserData
+{
+  ORCFormatC *fmt;
+  char **colNames;
+  int *colDatatypes;
+  int64_t *colDatatypeMods;
+  int numberOfColumns;
+  char **colRawValues;
+  Datum *colValues;
+  uint64_t *colValLength;
+  bits8 **colValNullBitmap;
+  int **colValDims;
+  char **colAddresses;
+  bool *colIsNulls;
+  bool *colToReads;
+  bool *colSpeedUpPossible;
+  bool *colSpeedUp;
+
+  bool *nulls;
+  Datum *datums;
+  int reserved;
+
+  TimestampType *colTimestamp;
+
+  int nSplits;
+  ORCFormatFileSplit *splits;
+} ORCFormatUserData;
+
+static FmgrInfo *get_orc_function(char *formatter_name, char *function_name);
+static void get_insert_functions(ExternalInsertDesc ext_insert_desc);
+static void init_format_user_data_for_write(TupleDesc tup_desc,
+    ORCFormatUserData *user_data);
+static void build_options_in_json(List *fmt_opts_defelem, int encoding,
+    char **json_str, TupleDesc tupDesc);
+static ORCFormatC *create_formatter_instance(List *fmt_opts_defelem,
+    int encoding, int segno, TupleDesc tupDesc);
+static void build_tuple_descrition_for_write(Relation relation,
+    ORCFormatUserData *user_data);
+static void orc_parse_format_string(CopyState pstate, char *fmtstr);
+static char *orc_strtokx2(const char *s, const char *whitespace,
+    const char *delim, const char *quote, char escape, bool e_strings,
+    bool del_quotes, int encoding);
+static void orc_strip_quotes(char *source, char quote, char escape,
+    int encoding);
+
 /* Implementation of validators for pluggable storage format ORC */
 
 /*
@@ -255,3 +322,1022 @@
 
   PG_RETURN_VOID() ;
 }
+
+/*
+ * ExternalInsertDesc
+ * orc_insert_init(Relation relation,
+ *                 int formatterType,
+ *                 char *formatterName)
+ */
+Datum orc_insert_init(PG_FUNCTION_ARGS)
+{
+  PlugStorage ps = (PlugStorage) (fcinfo->context);
+  Relation relation = ps->ps_relation;
+  int formatterType = ps->ps_formatter_type;
+  char *formatterName = ps->ps_formatter_name;
+
+  /* 1. Allocate and initialize the insert descriptor */
+  ExternalInsertDesc eid = palloc(sizeof(ExternalInsertDescData));
+  eid->ext_formatter_type = formatterType;
+  eid->ext_formatter_name = formatterName;
+
+  /* 1.1 Setup insert functions */
+  get_insert_functions(eid);
+
+  /* 1.2 Initialize basic information */
+  eid->ext_rel = relation;
+  eid->ext_noop = (Gp_role == GP_ROLE_DISPATCH);
+  eid->ext_formatter_data = NULL;
+
+  /* 1.3 Get URI string */
+  ExtTableEntry *ete = GetExtTableEntry(RelationGetRelid(relation));
+  Value *v;
+  char *uri_str;
+  int segindex = GetQEIndex();
+  int num_segs = GetQEGangNum();
+  int num_urls = list_length(ete->locations);
+  int my_url = segindex % num_urls;
+
+  if (num_urls > num_segs)
+    ereport(ERROR,
+        (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("External table has more URLs then available primary " "segments that can write into them")));
+
+  v = list_nth(ete->locations, my_url);
+  uri_str = pstrdup(v->val.str);
+
+  /*when it is hive protocol, transfer the url*/
+  Uri* tmp = ParseExternalTableUri(uri_str);
+  if (tmp->protocol == URI_HIVE)
+  {
+    if (tmp->hostname)
+    {
+      pfree(tmp->hostname);
+    }
+    tmp->hostname = pstrdup(ps->ps_scan_state->hivehost);
+    tmp->port = ps->ps_scan_state->hiveport;
+    if (tmp->path)
+    {
+      pfree(tmp->path);
+    }
+    tmp->path = pstrdup(ps->ps_scan_state->hivepath);
+    int uriLen = sizeof("hdfs") - 1 + /* *** This is a bad imp. */
+    sizeof("://") - 1 + strlen(tmp->hostname) + sizeof(':')
+        + sizeof("65535") - 1 + strlen(tmp->path);
+
+    char* url = palloc(uriLen * sizeof(char));
+    sprintf(url, "%s://%s:%d%s", "hdfs", tmp->hostname, tmp->port,
+        tmp->path);
+    eid->ext_uri = url;
+  }
+  else
+  {
+    eid->ext_uri = uri_str;
+  }
+  FreeExternalTableUri(tmp);
+
+  /* 1.4 Allocate and initialize structure which track data parsing state */
+  eid->ext_pstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
+  eid->ext_tupDesc = RelationGetDescr(relation);
+  eid->ext_values = (Datum *) palloc(eid->ext_tupDesc->natts * sizeof(Datum));
+  eid->ext_nulls = (bool *) palloc(eid->ext_tupDesc->natts * sizeof(bool));
+
+  /* 1.5 Get format options */
+  List *fmt_opts = NIL;
+  fmt_opts = lappend(fmt_opts, makeString(pstrdup(ete->fmtopts)));
+
+  /* 1.6 Initialize parse state */
+  /* 1.6.1 Initialize basic information for pstate */
+  CopyState pstate = eid->ext_pstate;
+  pstate->fe_eof = false;
+  pstate->eol_type = EOL_UNKNOWN;
+  pstate->eol_str = NULL;
+  pstate->cur_relname = RelationGetRelationName(relation);
+  pstate->cur_lineno = 0;
+  pstate->err_loc_type = ROWNUM_ORIGINAL;
+  pstate->cur_attname = NULL;
+  pstate->raw_buf_done = true; /* true so we will read data in first run */
+  pstate->line_done = true;
+  pstate->bytesread = 0;
+  pstate->custom = false;
+  pstate->header_line = false;
+  pstate->fill_missing = false;
+  pstate->line_buf_converted = false;
+  pstate->raw_buf_index = 0;
+  pstate->processed = 0;
+  pstate->filename = uri_str;
+  pstate->copy_dest = COPY_EXTERNAL_SOURCE;
+  pstate->missing_bytes = 0;
+  pstate->rel = relation;
+
+  /* 1.6.2 Setup encoding information */
+  /*
+   * Set up encoding conversion info.  Even if the client and server
+   * encodings are the same, we must apply pg_client_to_server() to validate
+   * data in multibyte encodings.
+   *
+   * Each external table specifies the encoding of its external data. We will
+   * therefore set a client encoding and client-to-server conversion procedure
+   * in here (server-to-client in WET) and these will be used in the data
+   * conversion routines (in copy.c CopyReadLineXXX(), etc).
+   */
+  int fmt_encoding = ete->encoding;
+  Insist(PG_VALID_ENCODING(fmt_encoding));
+  pstate->client_encoding = fmt_encoding;
+  Oid conversion_proc = FindDefaultConversionProc(GetDatabaseEncoding(),
+      fmt_encoding);
+
+  if (OidIsValid(conversion_proc))
+  {
+    /* conversion proc found */
+    pstate->enc_conversion_proc = palloc(sizeof(FmgrInfo));
+    fmgr_info(conversion_proc, pstate->enc_conversion_proc);
+  }
+  else
+  {
+    /* no conversion function (both encodings are probably the same) */
+    pstate->enc_conversion_proc = NULL;
+  }
+
+  pstate->need_transcoding = pstate->client_encoding != GetDatabaseEncoding();
+  pstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(
+      pstate->client_encoding);
+
+  /* 1.6.3 Parse format options */
+  char *format_str = pstrdup((char *) strVal(linitial(fmt_opts)));
+  orc_parse_format_string(pstate, format_str);
+
+  /* 1.6.4 Setup tuple description */
+  TupleDesc tup_desc = eid->ext_tupDesc;
+  pstate->attr_offsets = (int *) palloc(tup_desc->natts * sizeof(int));
+
+  /* 1.6.5 Generate or convert list of attributes to process */
+  pstate->attnumlist = CopyGetAttnums(tup_desc, relation, NIL);
+
+  /* 1.6.6 Convert FORCE NOT NULL name list to per-column flags, check validity */
+  pstate->force_notnull_flags = (bool *) palloc0(
+      tup_desc->natts * sizeof(bool));
+  if (pstate->force_notnull)
+  {
+    List *attnums;
+    ListCell *cur;
+
+    attnums = CopyGetAttnums(tup_desc, relation, pstate->force_notnull);
+
+    foreach(cur, attnums)
+    {
+      int attnum = lfirst_int(cur);
+      pstate->force_notnull_flags[attnum - 1] = true;
+    }
+  }
+
+  /* 1.6.7 Take care of state that is WET specific */
+  Form_pg_attribute *attr = tup_desc->attrs;
+  ListCell *cur;
+
+  pstate->null_print_client = pstate->null_print; /* default */
+  pstate->fe_msgbuf = makeStringInfo(); /* use fe_msgbuf as a per-row buffer */
+  pstate->out_functions = (FmgrInfo *) palloc(
+      tup_desc->natts * sizeof(FmgrInfo));
+
+  foreach(cur, pstate->attnumlist)
+  /* Get info about the columns need to process */
+  {
+    int attnum = lfirst_int(cur);
+    Oid out_func_oid;
+    bool isvarlena;
+
+    getTypeOutputInfo(attr[attnum - 1]->atttypid, &out_func_oid,
+        &isvarlena);
+    fmgr_info(out_func_oid, &pstate->out_functions[attnum - 1]);
+  }
+
+  /*
+   * We need to convert null_print to client encoding, because it
+   * will be sent directly with CopySendString.
+   */
+  if (pstate->need_transcoding)
+  {
+    pstate->null_print_client = pg_server_to_custom(pstate->null_print,
+        pstate->null_print_len, pstate->client_encoding,
+        pstate->enc_conversion_proc);
+  }
+
+  /* 1.6.8 Create temporary memory context for per row process */
+  /*
+   * Create a temporary memory context that we can reset once per row to
+   * recover palloc'd memory.  This avoids any problems with leaks inside
+   * datatype input or output routines, and should be faster than retail
+   * pfree's anyway.
+   */
+  pstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+      "ExtTableMemCxt",
+      ALLOCSET_DEFAULT_MINSIZE,
+      ALLOCSET_DEFAULT_INITSIZE,
+      ALLOCSET_DEFAULT_MAXSIZE);
+
+  /* 1.7 Initialize formatter data */
+  eid->ext_formatter_data = (FormatterData *) palloc0(sizeof(FormatterData));
+  eid->ext_formatter_data->fmt_perrow_ctx = eid->ext_pstate->rowcontext;
+
+  /* 2. Setup user data */
+
+  /* 2.1 Initialize user data */
+  ORCFormatUserData *user_data = (ORCFormatUserData *) palloc0(
+      sizeof(ORCFormatUserData));
+  init_format_user_data_for_write(tup_desc, user_data);
+
+  /* 2.2 Create formatter instance */
+  List *fmt_opts_defelem = pstate->custom_formatter_params;
+  user_data->fmt = create_formatter_instance(fmt_opts_defelem, fmt_encoding,
+      ps->ps_segno, tup_desc);
+
+  /* 2.4 Build tuple description */
+  build_tuple_descrition_for_write(relation, user_data);
+
+  /* 2.5 Save user data */
+  eid->ext_ps_user_data = (void *) user_data;
+
+  if (enable_secure_filesystem && Gp_role == GP_ROLE_EXECUTE)
+  {
+    char *token = find_filesystem_credential_with_uri(uri_str);
+    SetToken(uri_str, token);
+  }
+  /* 3. Begin insert with the formatter */
+  ORCFormatBeginInsertORCFormatC(user_data->fmt, eid->ext_uri,
+      user_data->colNames, user_data->colDatatypes,
+      user_data->colDatatypeMods, user_data->numberOfColumns);
+
+  ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+  if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
+  {
+    elog(ERROR, "ORC: failed to begin insert: %s (%d)",
+    e->errMessage, e->errCode);
+  }
+
+  /* 4. Save the result */
+  ps->ps_ext_insert_desc = eid;
+
+  PG_RETURN_POINTER(eid);
+}
+
+/*
+ * Oid
+ * orc_insert(ExternalInsertDesc extInsertDesc,
+ *            TupleTableSlot *tupTableSlot)
+ */
+Datum orc_insert(PG_FUNCTION_ARGS)
+{
+  PlugStorage ps = (PlugStorage) (fcinfo->context);
+  ExternalInsertDesc eid = ps->ps_ext_insert_desc;
+  TupleTableSlot *tts = ps->ps_tuple_table_slot;
+
+  ORCFormatUserData *user_data = (ORCFormatUserData *) (eid->ext_ps_user_data);
+
+  user_data->colValues = slot_get_values(tts);
+  user_data->colIsNulls = slot_get_isnull(tts);
+
+  static bool DUMMY_BOOL = true;
+  static int8_t DUMMY_INT8 = 0;
+  static int16_t DUMMY_INT16 = 0;
+  static int32_t DUMMY_INT32 = 0;
+  static int64_t DUMMY_INT64 = 0;
+  static float DUMMY_FLOAT = 0.0;
+  static double DUMMY_DOUBLE = 0.0;
+  static char DUMMY_TEXT[1] = "";
+  static int32_t DUMMY_DATE = 0;
+  static int64_t DUMMY_TIME = 0;
+  static TimestampType DUMMY_TIMESTAMP;
+  DUMMY_TIMESTAMP.second = 0;
+  DUMMY_TIMESTAMP.nanosecond = 0;
+  static int16_t DUMMY_INT16_ARRAY[1] =
+  { 0.0 };
+  static int32_t DUMMY_INT32_ARRAY[1] =
+  { 0.0 };
+  static int64_t DUMMY_INT64_ARRAY[1] =
+  { 0.0 };
+  static float DUMMY_FLOAT_ARRAY[1] =
+  { 0.0 };
+  static double DUMMY_DOUBLE_ARRAY[1] =
+  { 0.0 };
+
+  TupleDesc tupdesc = tts->tts_tupleDescriptor;
+  user_data->numberOfColumns = tupdesc->natts;
+
+  MemoryContext per_row_context = eid->ext_pstate->rowcontext;
+  MemoryContextReset(per_row_context);
+  MemoryContext old_context = MemoryContextSwitchTo(per_row_context);
+
+  /* Get column values */
+  for (int i = 0; i < user_data->numberOfColumns; ++i)
+  {
+    int dataType = (int) (tupdesc->attrs[i]->atttypid);
+
+    user_data->colRawValues[i] = NULL;
+    user_data->colValNullBitmap[i] = NULL;
+
+    if (user_data->colIsNulls[i])
+    {
+      if (dataType == HAWQ_TYPE_CHAR)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_INT8);
+      }
+      else if (dataType == HAWQ_TYPE_INT2)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_INT16);
+      }
+      else if (dataType == HAWQ_TYPE_INT4)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_INT32);
+      }
+      else if (dataType == HAWQ_TYPE_INT8)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_INT64);
+      }
+      else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE
+          || dataType == HAWQ_TYPE_BPCHAR
+          || dataType == HAWQ_TYPE_VARCHAR
+          || dataType == HAWQ_TYPE_NUMERIC)
+      {
+        user_data->colRawValues[i] = (char *) (DUMMY_TEXT);
+      }
+      else if (dataType == HAWQ_TYPE_FLOAT4)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_FLOAT);
+      }
+      else if (dataType == HAWQ_TYPE_FLOAT8)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_DOUBLE);
+      }
+      else if (dataType == HAWQ_TYPE_BOOL)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_BOOL);
+      }
+      else if (dataType == HAWQ_TYPE_DATE)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_DATE);
+      }
+      else if (dataType == HAWQ_TYPE_TIME)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_TIME);
+      }
+      else if (dataType == HAWQ_TYPE_TIMESTAMP)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_TIMESTAMP);
+      }
+      else if (dataType == HAWQ_TYPE_INT2_ARRAY)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_INT16_ARRAY);
+      }
+      else if (dataType == HAWQ_TYPE_INT4_ARRAY)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_INT32_ARRAY);
+      }
+      else if (dataType == HAWQ_TYPE_INT8_ARRAY)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_INT64_ARRAY);
+      }
+      else if (dataType == HAWQ_TYPE_FLOAT4_ARRAY)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_FLOAT_ARRAY);
+      }
+      else if (dataType == HAWQ_TYPE_FLOAT8_ARRAY)
+      {
+        user_data->colRawValues[i] = (char *) (&DUMMY_DOUBLE_ARRAY);
+      }
+      else if (dataType == HAWQ_TYPE_INVALID)
+      {
+        elog(ERROR, "HAWQ data type with id %d is invalid", dataType);
+      }
+      else
+      {
+        elog(
+            ERROR, "HAWQ data type with id %d is not supported yet", dataType);
+      }
+
+      continue;
+    }
+
+    if (dataType == HAWQ_TYPE_CHAR || dataType == HAWQ_TYPE_INT2
+        || dataType == HAWQ_TYPE_INT4 || dataType == HAWQ_TYPE_INT8
+        || dataType == HAWQ_TYPE_FLOAT4 || dataType == HAWQ_TYPE_FLOAT8
+        || dataType == HAWQ_TYPE_BOOL || dataType == HAWQ_TYPE_TIME)
+    {
+      user_data->colRawValues[i] = (char *) (&(user_data->colValues[i]));
+    }
+    else if (dataType == HAWQ_TYPE_TIMESTAMP)
+    {
+      int64_t *timestamp = (int64_t *) (&(user_data->colValues[i]));
+      user_data->colTimestamp[i].second = *timestamp / 1000000
+          + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 60 * 60 * 24;
+      user_data->colTimestamp[i].nanosecond = *timestamp % 1000000 * 1000;
+      if (user_data->colTimestamp[i].nanosecond < 0)
+        user_data->colTimestamp[i].nanosecond += 1000000000;
+      user_data->colRawValues[i] =
+          (char *) (&(user_data->colTimestamp[i]));
+    }
+    else if (dataType == HAWQ_TYPE_DATE)
+    {
+      int *date = (int *) (&(user_data->colValues[i]));
+      *date += POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE;
+      user_data->colRawValues[i] = (char *) (&(user_data->colValues[i]));
+    }
+    else if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE
+        || dataType == HAWQ_TYPE_BPCHAR || dataType == HAWQ_TYPE_VARCHAR
+        || dataType == HAWQ_TYPE_NUMERIC)
+    {
+      user_data->colRawValues[i] = OutputFunctionCall(
+          &(eid->ext_pstate->out_functions[i]),
+          user_data->colValues[i]);
+
+      if (dataType != HAWQ_TYPE_BYTE && eid->ext_pstate->need_transcoding)
+      {
+        char *cvt = NULL;
+
+        cvt = pg_server_to_custom(user_data->colRawValues[i],
+            strlen(user_data->colRawValues[i]),
+            eid->ext_pstate->client_encoding,
+            eid->ext_pstate->enc_conversion_proc);
+
+        if (cvt != user_data->colRawValues[i])
+        {
+          pfree(user_data->colRawValues[i]);
+        }
+
+        user_data->colRawValues[i] = cvt;
+      }
+    }
+    else if (dataType == HAWQ_TYPE_INT2_ARRAY
+        || dataType == HAWQ_TYPE_INT4_ARRAY
+        || dataType == HAWQ_TYPE_INT8_ARRAY
+        || dataType == HAWQ_TYPE_FLOAT4_ARRAY
+        || dataType == HAWQ_TYPE_FLOAT8_ARRAY)
+    {
+      ArrayType *arr = DatumGetArrayTypeP(user_data->colValues[i]);
+      user_data->colValLength[i] = ARR_SIZE(arr) - ARR_DATA_OFFSET(arr);
+      user_data->colRawValues[i] = ARR_DATA_PTR(arr);
+      user_data->colValNullBitmap[i] = ARR_NULLBITMAP(arr);
+      // Now we only support 1 dimension array
+      if (ARR_NDIM(arr) > 1)
+      {
+        elog(ERROR, "Now we only support 1 dimension array in orc format,"
+            " your array dimension is %d", ARR_NDIM(arr));
+      }
+      else if (ARR_NDIM(arr) == 1)
+      {
+        user_data->colValDims[i] = ARR_DIMS(arr);
+      }
+      else
+      {
+        user_data->colValDims[i] = NULL;
+      }
+    }
+    else if (dataType == HAWQ_TYPE_INVALID)
+    {
+      elog(ERROR, "HAWQ data type with id %d is invalid", dataType);
+    }
+    else
+    {
+      elog(
+      ERROR, "HAWQ data type with id %d is not supported yet", dataType);
+    }
+  }
+
+  /* Pass to formatter to output */
+  ORCFormatInsertORCFormatC(user_data->fmt, user_data->colDatatypes,
+      user_data->colRawValues, user_data->colValLength,
+      user_data->colValNullBitmap, user_data->colValDims,
+      user_data->colIsNulls);
+
+  ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+  if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
+  {
+    elog(ERROR, "orc_insert: failed to insert: %s(%d)",
+    e->errMessage, e->errCode);
+  }
+
+  for (int i = 0; i < user_data->numberOfColumns; ++i)
+  {
+    int dataType = (int) (tupdesc->attrs[i]->atttypid);
+
+    if (user_data->colIsNulls[i])
+    {
+      continue;
+    }
+
+    if (dataType == HAWQ_TYPE_TEXT || dataType == HAWQ_TYPE_BYTE
+        || dataType == HAWQ_TYPE_BPCHAR || dataType == HAWQ_TYPE_VARCHAR
+        || dataType == HAWQ_TYPE_NUMERIC)
+    {
+      if (user_data->colRawValues[i])
+      {
+        pfree(user_data->colRawValues[i]);
+      }
+    }
+  }
+
+  ps->ps_tuple_oid = InvalidOid;
+
+  MemoryContextSwitchTo(old_context);
+
+  PG_RETURN_OID(InvalidOid);
+}
+
+/*
+ * void
+ * orc_insert_finish(ExternalInsertDesc extInsertDesc)
+ */
+Datum orc_insert_finish(PG_FUNCTION_ARGS)
+{
+  PlugStorage ps = (PlugStorage) (fcinfo->context);
+  ExternalInsertDesc eid = ps->ps_ext_insert_desc;
+
+  ORCFormatUserData *user_data = (ORCFormatUserData *) (eid->ext_ps_user_data);
+
+  ORCFormatEndInsertORCFormatC(user_data->fmt);
+  ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+  if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
+  {
+    elog(ERROR, "ORC: failed to finish insert: %s(%d)",
+    e->errMessage, e->errCode);
+  }
+
+  ORCFormatFreeORCFormatC(&(user_data->fmt));
+
+  pfree(user_data->colDatatypes);
+  pfree(user_data->colRawValues);
+  pfree(user_data->colValLength);
+  pfree(user_data->colAddresses);
+  for (int i = 0; i < user_data->numberOfColumns; ++i)
+  {
+    pfree(user_data->colNames[i]);
+  }
+  pfree(user_data->colNames);
+  pfree(user_data);
+
+  if (eid->ext_formatter_data)
+    pfree(eid->ext_formatter_data);
+
+  if (eid->ext_formatter_name)
+    pfree(eid->ext_formatter_name);
+
+  pfree(eid);
+
+  PG_RETURN_VOID() ;
+}
+
+static FmgrInfo *get_orc_function(char *formatter_name, char *function_name)
+{
+  Assert(formatter_name);Assert(function_name);
+
+  Oid procOid = InvalidOid;
+  FmgrInfo *procInfo = NULL;
+
+  procOid = LookupPlugStorageValidatorFunc(formatter_name, function_name);
+
+  if (OidIsValid(procOid))
+  {
+    procInfo = (FmgrInfo *) palloc(sizeof(FmgrInfo));
+    fmgr_info(procOid, procInfo);
+  }
+  else
+  {
+    elog(ERROR, "%s_%s function was not found for pluggable storage",
+    formatter_name, function_name);
+  }
+
+  return procInfo;
+}
+
+static void get_insert_functions(ExternalInsertDesc ext_insert_desc)
+{
+  ext_insert_desc->ext_ps_insert_funcs.insert_init = get_orc_function("orc",
+      "insert_init");
+
+  ext_insert_desc->ext_ps_insert_funcs.insert = get_orc_function("orc",
+      "insert");
+
+  ext_insert_desc->ext_ps_insert_funcs.insert_finish = get_orc_function("orc",
+      "insert_finish");
+}
+
+static void init_format_user_data_for_write(TupleDesc tup_desc,
+    ORCFormatUserData *user_data)
+{
+  user_data->numberOfColumns = tup_desc->natts;
+  user_data->colNames = palloc0(sizeof(char *) * user_data->numberOfColumns);
+  user_data->colDatatypes = palloc0(sizeof(int) * user_data->numberOfColumns);
+  user_data->colDatatypeMods = palloc0(
+      sizeof(int64_t) * user_data->numberOfColumns);
+  user_data->colRawValues = palloc0(
+      sizeof(char *) * user_data->numberOfColumns);
+  user_data->colValLength = palloc0(
+      sizeof(uint64_t) * user_data->numberOfColumns);
+  user_data->colValNullBitmap = palloc0(
+      sizeof(bits8 *) * user_data->numberOfColumns);
+  user_data->colValDims = palloc0(sizeof(int *) * user_data->numberOfColumns);
+  user_data->colAddresses = palloc0(
+      sizeof(char *) * user_data->numberOfColumns);
+  user_data->colTimestamp = palloc0(
+      sizeof(TimestampType) * user_data->numberOfColumns);
+}
+
+static void build_options_in_json(List *fmt_opts_defelem, int encoding,
+    char **json_str, TupleDesc tupDesc)
+{
+  struct json_object *opt_json_object = json_object_new_object();
+
+  /* add format options for the formatter */
+  char *key_str = NULL;
+  char *val_str = NULL;
+  const char *whitespace = " \t\n\r";
+
+  int nargs = list_length(fmt_opts_defelem);
+  for (int i = 0; i < nargs; ++i)
+  {
+    key_str = ((DefElem *) (list_nth(fmt_opts_defelem, i)))->defname;
+    val_str =
+        ((Value *) ((DefElem *) (list_nth(fmt_opts_defelem, i)))->arg)->val.str;
+
+    if ((strncasecmp(key_str, "bloomfilter", strlen("bloomfilter")) == 0))
+    {
+      int attnum = tupDesc->natts;
+      json_object *j_array = json_object_new_array();
+      char *token = orc_strtokx2(val_str, whitespace, ",", NULL, 0, false, false, encoding);
+      while (token)
+      {
+        for (int j = 0; j < attnum; ++j)
+        {
+          if ((strncasecmp(token, ((Form_pg_attribute) (tupDesc->attrs[j]))->attname.data, strlen(token)) == 0))
+          {
+            json_object *j_obj = json_object_new_int(j);
+            json_object_array_add(j_array, j_obj);
+          }
+        }
+        token = orc_strtokx2(NULL, whitespace, ",", NULL, 0, false, false, encoding);;
+      }
+      json_object_object_add(opt_json_object, key_str, j_array);
+    }
+    else {
+      json_object_object_add(opt_json_object, key_str,
+        json_object_new_string(val_str));
+    }
+  }
+
+  /* add encoding option for orc */
+  if (json_object_object_get(opt_json_object, "encoding") == NULL)
+  {
+    const char *encodingStr = pg_encoding_to_char(encoding);
+    char *encodingStrLower = str_tolower(encodingStr, strlen(encodingStr));
+
+    json_object_object_add(opt_json_object, "encoding",
+        json_object_new_string(encodingStrLower));
+
+    if (encodingStrLower)
+      pfree(encodingStrLower);
+  }
+
+  *json_str = NULL;
+  if (opt_json_object != NULL)
+  {
+    const char *str = json_object_to_json_string(opt_json_object);
+    *json_str = (char *) palloc0(strlen(str) + 1);
+    strcpy(*json_str, str);
+    json_object_put(opt_json_object);
+
+    elog(DEBUG3, "formatter options are %s", *json_str);
+  }
+}
+
+static ORCFormatC *create_formatter_instance(List *fmt_opts_defelem,
+    int fmt_encoding, int segno, TupleDesc tupDesc)
+{
+  char *fmt_opts_str = NULL;
+
+  build_options_in_json(fmt_opts_defelem, fmt_encoding, &fmt_opts_str, tupDesc);
+
+  ORCFormatC *orc_format_c = ORCFormatNewORCFormatC(fmt_opts_str, segno);
+
+  if (fmt_opts_str != NULL)
+  {
+    pfree(fmt_opts_str);
+  }
+
+  return orc_format_c;
+}
+
+static void build_tuple_descrition_for_write(Relation relation,
+    ORCFormatUserData *user_data)
+{
+  for (int i = 0; i < user_data->numberOfColumns; ++i)
+  {
+    /* 64 is the name type length */
+    user_data->colNames[i] = palloc(sizeof(char) * 64);
+
+    strcpy(user_data->colNames[i],
+        relation->rd_att->attrs[i]->attname.data);
+
+    user_data->colDatatypes[i] = map_hawq_type_to_common_plan(
+        (int) (relation->rd_att->attrs[i]->atttypid));
+
+    user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod;
+  }
+}
+
+static void orc_parse_format_string(CopyState pstate, char *fmtstr)
+{
+  char *token;
+  const char *whitespace = " \t\n\r";
+  char nonstd_backslash = 0;
+  int encoding = GetDatabaseEncoding();
+
+  token = orc_strtokx2(fmtstr, whitespace, NULL, NULL, 0, false, true,
+      encoding);
+  /* parse user custom options. take it as is. no validation needed */
+
+  List *l = NIL;
+  bool formatter_found = false;
+
+  if (token)
+  {
+    char *key = token;
+    char *val = NULL;
+    StringInfoData key_modified;
+
+    initStringInfo(&key_modified);
+
+    while (key)
+    {
+
+      /* MPP-14467 - replace meta chars back to original */
+      resetStringInfo(&key_modified);
+      appendStringInfoString(&key_modified, key);
+      replaceStringInfoString(&key_modified, "<gpx20>", " ");
+
+      val = orc_strtokx2(NULL, whitespace, NULL, "'", nonstd_backslash,
+      true, true, encoding);
+      if (val)
+      {
+
+        if (pg_strcasecmp(key, "formatter") == 0)
+        {
+          pstate->custom_formatter_name = pstrdup(val);
+          formatter_found = true;
+        }
+        else
+
+          l = lappend(l,
+              makeDefElem(pstrdup(key_modified.data),
+                  (Node *) makeString(pstrdup(val))));
+      }
+      else
+        goto error;
+
+      key = orc_strtokx2(NULL, whitespace, NULL, NULL, 0, false, false,
+          encoding);
+    }
+
+  }
+
+  if (!formatter_found)
+  {
+    /*
+     * If there is no formatter option specified, use format name. So
+     * we don't report error here.
+     */
+  }
+
+  pstate->custom_formatter_params = l;
+
+  return;
+
+  error: if (token)
+    ereport(ERROR,
+        (errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("external table internal parse error at \"%s\"", token)));
+  else
+    ereport(ERROR,
+        (errcode(ERRCODE_GP_INTERNAL_ERROR), errmsg("external table internal parse error at end of " "line")));
+
+}
+
+static char *orc_strtokx2(const char *s, const char *whitespace,
+    const char *delim, const char *quote, char escape, bool e_strings,
+    bool del_quotes, int encoding)
+{
+  static char *storage = NULL;/* store the local copy of the users string
+   * here */
+  static char *string = NULL; /* pointer into storage where to continue on
+   * next call */
+
+  /* variously abused variables: */
+  unsigned int offset;
+  char *start;
+  char *p;
+
+  if (s)
+  {
+    //pfree(storage);
+
+    /*
+     * We may need extra space to insert delimiter nulls for adjacent
+     * tokens.  2X the space is a gross overestimate, but it's unlikely
+     * that this code will be used on huge strings anyway.
+     */
+    storage = palloc(2 * strlen(s) + 1);
+    strcpy(storage, s);
+    string = storage;
+  }
+
+  if (!storage)
+    return NULL;
+
+  /* skip leading whitespace */
+  offset = strspn(string, whitespace);
+  start = &string[offset];
+
+  /* end of string reached? */
+  if (*start == '\0')
+  {
+    /* technically we don't need to free here, but we're nice */
+    pfree(storage);
+    storage = NULL;
+    string = NULL;
+    return NULL;
+  }
+
+  /* test if delimiter character */
+  if (delim && strchr(delim, *start))
+  {
+    /*
+     * If not at end of string, we need to insert a null to terminate the
+     * returned token.  We can just overwrite the next character if it
+     * happens to be in the whitespace set ... otherwise move over the
+     * rest of the string to make room.  (This is why we allocated extra
+     * space above).
+     */
+    p = start + 1;
+    if (*p != '\0')
+    {
+      if (!strchr(whitespace, *p))
+        memmove(p + 1, p, strlen(p) + 1);
+      *p = '\0';
+      string = p + 1;
+    }
+    else
+    {
+      /* at end of string, so no extra work */
+      string = p;
+    }
+
+    return start;
+  }
+
+  /* check for E string */
+  p = start;
+  if (e_strings && (*p == 'E' || *p == 'e') && p[1] == '\'')
+  {
+    quote = "'";
+    escape = '\\'; /* if std strings before, not any more */
+    p++;
+  }
+
+  /* test if quoting character */
+  if (quote && strchr(quote, *p))
+  {
+    /* okay, we have a quoted token, now scan for the closer */
+    char thisquote = *p++;
+
+    /* MPP-6698 START
+     * unfortunately, it is possible for an external table format
+     * string to be represented in the catalog in a way which is
+     * problematic to parse: when using a single quote as a QUOTE
+     * or ESCAPE character the format string will show [quote '''].
+     * since we do not want to change how this is stored at this point
+     * (as it will affect previous versions of the software already
+     * in production) the following code block will detect this scenario
+     * where 3 quote characters follow each other, with no forth one.
+     * in that case, we will skip the second one (the first is skipped
+     * just above) and the last trailing quote will be skipped below.
+     * the result will be the actual token (''') and after stripping
+     * it due to del_quotes we'll end up with (').
+     * very ugly, but will do the job...
+     */
+    char qt = quote[0];
+
+    if (strlen(p) >= 3 && p[0] == qt && p[1] == qt && p[2] != qt)
+      p++;
+    /* MPP-6698 END */
+
+    for (; *p; p += pg_encoding_mblen(encoding, p))
+    {
+      if (*p == escape && p[1] != '\0')
+        p++; /* process escaped anything */
+      else if (*p == thisquote && p[1] == thisquote)
+        p++; /* process doubled quote */
+      else if (*p == thisquote)
+      {
+        p++; /* skip trailing quote */
+        break;
+      }
+    }
+
+    /*
+     * If not at end of string, we need to insert a null to terminate the
+     * returned token.  See notes above.
+     */
+    if (*p != '\0')
+    {
+      if (!strchr(whitespace, *p))
+        memmove(p + 1, p, strlen(p) + 1);
+      *p = '\0';
+      string = p + 1;
+    }
+    else
+    {
+      /* at end of string, so no extra work */
+      string = p;
+    }
+
+    /* Clean up the token if caller wants that */
+    if (del_quotes)
+      orc_strip_quotes(start, thisquote, escape, encoding);
+
+    return start;
+  }
+
+  /*
+   * Otherwise no quoting character.  Scan till next whitespace, delimiter
+   * or quote.  NB: at this point, *start is known not to be '\0',
+   * whitespace, delim, or quote, so we will consume at least one character.
+   */
+  offset = strcspn(start, whitespace);
+
+  if (delim)
+  {
+    unsigned int offset2 = strcspn(start, delim);
+
+    if (offset > offset2)
+      offset = offset2;
+  }
+
+  if (quote)
+  {
+    unsigned int offset2 = strcspn(start, quote);
+
+    if (offset > offset2)
+      offset = offset2;
+  }
+
+  p = start + offset;
+
+  /*
+   * If not at end of string, we need to insert a null to terminate the
+   * returned token.  See notes above.
+   */
+  if (*p != '\0')
+  {
+    if (!strchr(whitespace, *p))
+      memmove(p + 1, p, strlen(p) + 1);
+    *p = '\0';
+    string = p + 1;
+  }
+  else
+  {
+    /* at end of string, so no extra work */
+    string = p;
+  }
+
+  return start;
+}
+
+static void orc_strip_quotes(char *source, char quote, char escape,
+    int encoding)
+{
+  char *src;
+  char *dst;
+
+  Assert(source);Assert(quote);
+
+  src = dst = source;
+
+  if (*src && *src == quote)
+    src++; /* skip leading quote */
+
+  while (*src)
+  {
+    char c = *src;
+    int i;
+
+    if (c == quote && src[1] == '\0')
+      break; /* skip trailing quote */
+    else if (c == quote && src[1] == quote)
+      src++; /* process doubled quote */
+    else if (c == escape && src[1] != '\0')
+      src++; /* process escaped character */
+
+    i = pg_encoding_mblen(encoding, src);
+    while (i--)
+      *dst++ = *src++;
+  }
+
+  *dst = '\0';
+}
diff --git a/contrib/orc/orc_init.sql b/contrib/orc/orc_init.sql
index fa720b7..cae2318 100644
--- a/contrib/orc/orc_init.sql
+++ b/contrib/orc/orc_init.sql
@@ -21,7 +21,7 @@
 CREATE OR REPLACE FUNCTION pg_catalog.orc_validate_datatypes() RETURNS void
 AS '$libdir/orc.so', 'orc_validate_datatypes'
 LANGUAGE C STABLE;
-
+/*
 CREATE OR REPLACE FUNCTION pg_catalog.orc_beginscan() RETURNS bytea
 AS '$libdir/orc.so', 'orc_beginscan'
 LANGUAGE C STABLE;
@@ -45,6 +45,7 @@
 CREATE OR REPLACE FUNCTION pg_catalog.orc_stopscan() RETURNS void
 AS '$libdir/orc.so', 'orc_stopscan'
 LANGUAGE C STABLE;
+*/
 
 CREATE OR REPLACE FUNCTION pg_catalog.orc_insert_init() RETURNS bytea
 AS '$libdir/orc.so', 'orc_insert_init'
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index fce5093..4fe0446 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1539,6 +1539,9 @@
 
 	/* The type of the table that is being scanned */
 	TableType tableType;
+	char *hivehost;
+	int hiveport;
+	char *hivepath;
 
 	/* Runtime filter */
 	struct RuntimeFilterState *runtimeFilter;
diff --git a/src/include/utils/uri.h b/src/include/utils/uri.h
index 9c5aedd..28e099c 100644
--- a/src/include/utils/uri.h
+++ b/src/include/utils/uri.h
@@ -33,7 +33,8 @@
 	URI_GPFDIST,
 	URI_CUSTOM,
 	URI_GPFDISTS,
-	URI_HDFS
+	URI_HDFS,
+	URI_HIVE
 }	UriProtocol;
 
 #define PROTOCOL_FILE		"file://"
@@ -43,6 +44,7 @@
 #define PROTOCOL_GPFDISTS	"gpfdists://"
 #define PROTOCOL_PXF		"pxf://"
 #define PROTOCOL_HDFS		"hdfs://"
+#define PROTOCOL_HIVE      	"hive://"
 
 /* 
  * sometimes we don't want to parse the whole URI but just take a peek at