HAWQ-1628. Add hdfs protocol for pluggable storage framework
diff --git a/contrib/Makefile b/contrib/Makefile
index 695e92a..e5daff9 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -9,6 +9,7 @@
 		extprotocol \
 		gp_cancel_query \
 		formatter_fixedwidth \
+		exthdfs\
 		hawq-hadoop
 
 ifeq ($(with_pgcrypto), yes)
diff --git a/contrib/exthdfs/Makefile b/contrib/exthdfs/Makefile
new file mode 100644
index 0000000..e247664
--- /dev/null
+++ b/contrib/exthdfs/Makefile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+MODULE_big = exthdfs
+OBJS       = exthdfs.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS = $(libpq_pgport)
+
+SHLIB_LINK += -lhdfs3
+
+ifdef USE_PGXS
+PGXS := $(shell pg_config --pgxs)
+include $(PGXS)
+else
+subdir = contrib/exthdfs
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/exthdfs/common.h b/contrib/exthdfs/common.h
new file mode 100644
index 0000000..4111909
--- /dev/null
+++ b/contrib/exthdfs/common.h
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _EXTHDFS_COMMON_H_
+#define _EXTHDFS_COMMON_H_
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "access/extprotocol.h"
+#include "access/fileam.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_exttable.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "miscadmin.h"
+
+#include <fcntl.h>
+
+#endif  // _EXTHDFS_COMMON_H_
+
diff --git a/contrib/exthdfs/exthdfs.c b/contrib/exthdfs/exthdfs.c
new file mode 100644
index 0000000..1378734
--- /dev/null
+++ b/contrib/exthdfs/exthdfs.c
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "postgres.h"
+
+#include "common.h"
+#include "access/extprotocol.h"
+#include "cdb/cdbdatalocality.h"
+#include "storage/fd.h"
+#include "storage/filesystem.h"
+#include "utils/uri.h"
+
+
+
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(hdfsprotocol_blocklocation);
+PG_FUNCTION_INFO_V1(hdfsprotocol_validate);
+
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS);
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS);
+
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
+{
+
+	// Build the result instance returned to the caller via resultinfo.
+	int nsize = 0;
+	int numOfBlock = 0;
+	ExtProtocolBlockLocationData *bldata =
+		palloc0(sizeof(ExtProtocolBlockLocationData));
+	if (bldata == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+                    "cannot allocate due to no memory");
+	}
+	bldata->type = T_ExtProtocolBlockLocationData;
+	fcinfo->resultinfo = bldata;
+
+	ExtProtocolValidatorData *pvalidator_data = (ExtProtocolValidatorData *)
+												(fcinfo->context);
+
+
+	 // Parse URI of the first location, we expect all locations uses the same
+	 // name node server. This is checked in validation function.
+
+	char *first_uri_str = (char *)strVal(lfirst(list_head(pvalidator_data->url_list)));
+	Uri *uri = ParseExternalTableUri(first_uri_str);
+
+	elog(DEBUG3, "hdfsprotocol_blocklocation : "
+				 "extracted HDFS name node address %s:%d",
+				 uri->hostname, uri->port);
+
+	// Create file system instance
+	hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+	if (fs == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+					"failed to create HDFS instance to connect to %s:%d",
+					uri->hostname, uri->port);
+	}
+
+	// Clean up uri instance as we don't need it any longer
+	FreeExternalTableUri(uri);
+
+	// Check all locations to get files to fetch location.
+	ListCell *lc = NULL;
+	foreach(lc, pvalidator_data->url_list)
+	{
+		// Parse current location URI.
+		char *url = (char *)strVal(lfirst(lc));
+		Uri *uri = ParseExternalTableUri(url);
+		if (uri == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_blocklocation : "
+						"invalid URI encountered %s", url);
+		}
+
+		 //
+		 // NOTICE: We temporarily support only directories as locations. We plan
+		 //        to extend the logic to specifying single file as one location
+		 //         very soon.
+
+
+		// get files contained in the path.
+		hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path,&nsize);
+		if (fiarray == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_blocklocation : "
+						"failed to get files of path %s",
+						uri->path);
+		}
+
+		int i = 0 ;
+		// Call block location api to get data location for each file
+		for (i = 0 ; i < nsize ; i++)
+		{
+			hdfsFileInfo *fi = &fiarray[i];
+
+			// break condition of this for loop
+			if (fi == NULL) {break;}
+
+			// Copy out the file name; mName is already a full path.
+			const char *fname = fi->mName;
+			char *fullpath = palloc0(strlen(fname) +  // name
+									 1);              // \0 terminator
+			sprintf(fullpath, "%s", fname);
+
+			elog(DEBUG3, "hdfsprotocol_blocklocation : "
+						 "built full path file %s", fullpath);
+
+			// Get file full length.
+			int64_t len = fi->mSize;
+
+			elog(DEBUG3, "hdfsprotocol_blocklocation : "
+					     "got file %s length " INT64_FORMAT,
+					     fullpath, len);
+
+			if (len == 0) {
+				pfree(fullpath);
+				continue;
+			}
+
+			// Get block location data for this file
+			BlockLocation *bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len,&numOfBlock);
+			if (bla == NULL)
+			{
+				elog(ERROR, "hdfsprotocol_blocklocation : "
+							"failed to get block location of path %s. "
+							"It is reported generally due to HDFS service errors or "
+							"another session's ongoing writing.",
+							fullpath);
+			}
+
+			// Add file full path and its block number as result.
+			blocklocation_file *blf = palloc0(sizeof(blocklocation_file));
+			blf->file_uri = pstrdup(fullpath);
+			blf->block_num = numOfBlock;
+			blf->locations = palloc0(sizeof(BlockLocation) * blf->block_num);
+
+			elog(DEBUG3, "hdfsprotocol_blocklocation : file %s has %d blocks",
+					  	 fullpath, blf->block_num);
+
+			// We don't need it any longer
+			pfree(fullpath);
+			int bidx = 0;
+			// Add block information as a list.
+			for (bidx = 0 ; bidx < blf->block_num ; bidx++)
+			{
+				BlockLocation *blo = &bla[bidx];
+				BlockLocation *bl = &(blf->locations[bidx]);
+				bl->numOfNodes = blo->numOfNodes;
+				bl->hosts = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
+				bl->names = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
+				bl->topologyPaths = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
+				bl->offset = blo->offset;
+				bl->length = blo->length;
+				bl->corrupt = blo->corrupt;
+
+				int nidx = 0 ;
+				// Copy per-node arrays.  Note: index the arrays inside this
+				// block (blo->hosts[nidx]), NOT the block array by node index
+				// (*blo[nidx].hosts), which would read past the current block.
+				for (nidx = 0 ; nidx < bl->numOfNodes ; nidx++)
+				{
+					bl->hosts[nidx] = pstrdup(blo->hosts[nidx]);
+					bl->names[nidx] = pstrdup(blo->names[nidx]);
+					bl->topologyPaths[nidx] = pstrdup(blo->topologyPaths[nidx]);
+				}
+			}
+
+			bldata->files = lappend(bldata->files, (void *)(blf));
+
+			// Clean up block location instances created by the lib.
+			hdfsFreeFileBlockLocations(bla,numOfBlock);
+		}
+
+		// Clean up URI instance in loop as we don't need it any longer
+		FreeExternalTableUri(uri);
+
+		// Clean up file info array created by the lib for this location.
+		hdfsFreeFileInfo(fiarray,nsize);
+	}
+
+	// destroy fs instance
+	hdfsDisconnect(fs);
+
+	PG_RETURN_VOID();
+
+}
+
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS)
+{
+	elog(DEBUG3, "hdfsprotocol_validate() begin");
+
+	/* Check which action should perform. */
+	ExtProtocolValidatorData *pvalidator_data =
+       (ExtProtocolValidatorData *)(fcinfo->context);
+
+	if (pvalidator_data->forceCreateDir)
+		Assert(pvalidator_data->url_list && pvalidator_data->url_list->length == 1);
+
+	if (pvalidator_data->direction == EXT_VALIDATE_WRITE)
+	{
+		/* accept only one directory location */
+		if (list_length(pvalidator_data->url_list) != 1)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("hdfsprotocol_validate : "
+							"only one location url is supported for writable external hdfs")));
+		}
+	}
+
+	/* Go through first round to get formatter type */
+	bool isCsv = false;
+	bool isText = false;
+	bool isOrc = false;
+	ListCell *optcell = NULL;
+	foreach(optcell, pvalidator_data->format_opts)
+	{
+		DefElem *de = (DefElem *)lfirst(optcell);
+		if (strcasecmp(de->defname, "formatter") == 0)
+		{
+			char *val = strVal(de->arg);
+			if (strcasecmp(val, "csv") == 0)
+			{
+				isCsv = true;
+			}
+			else if (strcasecmp(val, "text") == 0)
+			{
+				isText = true;
+			}
+			else if (strcasecmp(val, "orc") == 0)
+			{
+				isOrc = true;
+			}
+		}
+	}
+	/*
+	 * A historical always-error block was removed here.  The check below
+	 * is the real guard: it rejects any table whose 'formatter' option
+	 * did not resolve to one of the supported formats ('csv', 'text'
+	 * or 'orc').
+	 */
+
+	if (!isCsv && !isText && !isOrc)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("hdfsprotocol_validate : "
+						"only 'csv', 'text' and 'orc' formatter is supported for external hdfs")));
+	}
+	Assert(isCsv || isText || isOrc);
+
+	/* Validate formatter options */
+	foreach(optcell, pvalidator_data->format_opts)
+	{
+		DefElem *de = (DefElem *)lfirst(optcell);
+		if (strcasecmp(de->defname, "delimiter") == 0)
+		{
+			char *val = strVal(de->arg);
+			/* Validation 1. User can not specify 'OFF' in delimiter */
+			if (strcasecmp(val, "off") == 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'off' value of 'delimiter' option is not supported")));
+			}
+			/* Validation 2. Can specify multibytes characters */
+			if (strlen(val) < 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'delimiter' option accepts multibytes characters")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "escape") == 0)
+		{
+			char *val = strVal(de->arg);
+			/* Validation 3. User can not specify 'OFF' in delimiter */
+			if (strcasecmp(val, "off") == 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'off' value of 'escape' option is not supported")));
+			}
+			/* Validation 4. Can only specify one character */
+			if (strlen(val) != 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'escape' option accepts single character")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "newline") == 0)
+		{
+			char *val = strVal(de->arg);
+			/* Validation 5. only accept 'lf', 'cr', 'crlf' */
+			if (strcasecmp(val, "lf") != 0 &&
+				strcasecmp(val, "cr") != 0 &&
+				strcasecmp(val, "crlf") != 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"the value of 'newline' option can only be "
+								"'lf', 'cr' or 'crlf'")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "quote") == 0)
+		{
+			/* This is allowed only for csv mode formatter */
+			if (!isCsv)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'quote' option is only available in 'csv' formatter")));
+			}
+
+			char *val = strVal(de->arg);
+			/* Validation 5. Can only specify one character */
+			if (strlen(val) != 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'quote' option accepts single character")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "force_notnull") == 0)
+		{
+			/* This is allowed only for csv mode formatter */
+			if (!isCsv)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'force_notnull' option is only available in 'csv' formatter")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "force_quote") == 0)
+		{
+			/* This is allowed only for csv mode formatter */
+			if (!isCsv)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'force_quote' option is only available in 'csv' formatter")));
+			}
+		}
+	}
+
+	/* All urls should
+	 * 1) have the same protocol name 'hdfs',
+	 * 2) the same hdfs namenode server address
+	 */
+	/* Check all locations to get files to fetch location. */
+	char *nnaddr = NULL;
+	int nnport = -1;
+	ListCell *lc = NULL;
+	foreach(lc, pvalidator_data->url_list)
+	{
+		/* Parse current location URI. */
+		char *url = (char *)strVal(lfirst(lc));
+		Uri *uri = ParseExternalTableUri(url);
+		if (uri == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_validate : "
+						"invalid URI encountered %s", url);
+		}
+
+		if (uri->protocol != URI_HDFS)
+		{
+			elog(ERROR, "hdfsprotocol_validate : "
+						"invalid URI protocol encountered in %s, "
+						"hdfs:// protocol is required",
+						url);
+		}
+
+		if (nnaddr == NULL)
+		{
+			nnaddr = pstrdup(uri->hostname);
+			nnport = uri->port;
+		}
+		else
+		{
+			if (strcmp(nnaddr, uri->hostname) != 0)
+			{
+				elog(ERROR, "hdfsprotocol_validate : "
+							"different name server addresses are detected, "
+							"both %s and %s are found",
+							nnaddr, uri->hostname);
+			}
+			if (nnport != uri->port)
+			{
+				elog(ERROR, "hdfsprotocol_validate : "
+							"different name server ports are detected, "
+							"both %d and %d are found",
+							nnport, uri->port);
+			}
+		}
+
+		/* SHOULD ADD LOGIC HERE TO CREATE UNEXISTING PATH */
+		if (pvalidator_data->forceCreateDir) {
+
+		  elog(LOG, "hdfs_validator() forced creating dir");
+
+		  /* Create file system instance */
+		  	hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+			if (fs == NULL)
+			{
+				elog(ERROR, "hdfsprotocol_validate : "
+							"failed to create HDFS instance to connect to %s:%d",
+							uri->hostname, uri->port);
+			}
+
+			if (hdfsExists(fs, uri->path) == -1)
+				elog(ERROR, "hdfsprotocol_validate : "
+						"location \"%s\" does not exist",
+						uri->path);
+
+		 /* destroy fs instance */
+			hdfsDisconnect(fs);
+		}
+
+		/* Clean up temporarily created instances */
+		FreeExternalTableUri(uri);
+	}
+
+	/*
+	 * Free the saved namenode address only after the loop: it is compared
+	 * against every subsequent URL, so freeing it inside the loop (as the
+	 * code originally did) caused a use-after-free on the second URL.
+	 */
+	if (nnaddr != NULL)
+		pfree(nnaddr);
+
+	elog(LOG, "passed validating hdfs protocol options");
+
+	/**************************************************************************
+	 * This is a bad implementation that we check formatter options here. Should
+	 * be moved to call formatter specific validation UDFs.
+	 **************************************************************************/
+
+	PG_RETURN_VOID();
+}
+
diff --git a/contrib/extprotocol/gpextprotocol.c b/contrib/extprotocol/gpextprotocol.c
index 419b923..07f1731 100644
--- a/contrib/extprotocol/gpextprotocol.c
+++ b/contrib/extprotocol/gpextprotocol.c
@@ -324,5 +324,5 @@
 	if (uri->protocol)
 		pfree(uri->protocol);
 	
-	pfree(uri);
+	FreeExternalTableUri(uri);
 }
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 692a5db..0d2527d 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -86,14 +86,17 @@
 						   char *uri, int rejectlimit,
 						   bool islimitinrows, Oid fmterrtbl, ResultRelSegFileInfo *segfileinfo, int encoding);
 
-static void FunctionCallPrepareFormatter(FunctionCallInfoData*	fcinfo,
-										 int					nArgs,
-										 CopyState 				pstate,
-										 FormatterData*			formatter,
-										 Relation 				rel,
-										 TupleDesc 				tupDesc,
-										 FmgrInfo			   *convFuncs,
-										 Oid                   *typioparams);
+static void
+FunctionCallPrepareFormatter(FunctionCallInfoData*	fcinfo,
+							 int					nArgs,
+							 CopyState 				pstate,
+							 FormatterData		   *formatter,
+							 Relation 				rel,
+							 TupleDesc 				tupDesc,
+							 FmgrInfo			   *convFuncs,
+							 Oid                   *typioparams,
+							 char				   *url,
+							 ScanState			   *ss);
 
 static void open_external_readable_source(FileScanDesc scan);
 static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
@@ -305,12 +308,54 @@
 	scan->errcontext.previous = error_context_stack;
 
 	//pgstat_initstats(relation);
-
+	external_populate_formatter_actionmask(scan->fs_pstate, scan->fs_formatter);
 	return scan;
 }
 
 
 /* ----------------
+ * external_populate_formatter_actionmask
+ * ----------------
+ */
+void external_populate_formatter_actionmask(CopyState pstate,
+											FormatterData *formatter)
+{
+	/* We just call the formatter in function to populate the mask */
+	formatter->fmt_mask = FMT_UNSET;
+
+	if (pstate->custom_formatter_func == NULL)
+	{
+		formatter->fmt_mask |= FMT_NEEDEXTBUFF;
+		elog(LOG, "external scan needs an external protocol to cooperate");
+		return;
+	}
+
+	/* Invoke the formatter with zero args purely for its fmt_mask side effect */
+	FunctionCallInfoData fcinfo;
+	/* per call formatter prep */
+	FunctionCallPrepareFormatter(&fcinfo,
+								 0,
+								 pstate,
+								 formatter,
+								 NULL,
+								 NULL,
+								 NULL,
+								 NULL,
+								 NULL,
+								 NULL);
+	(void) FunctionCallInvoke(&fcinfo);
+
+	if (formatter->fmt_mask & FMT_NEEDEXTBUFF)
+	{
+		elog(LOG, "external scan needs an external protocol to cooperate");
+	}
+	else
+	{
+		elog(LOG, "external scan needs only formatter to manipulate data");
+	}
+}
+
+/* ----------------
 *		external_rescan  - (re)start a scan of an external file
 * ----------------
 */
@@ -525,13 +570,15 @@
 	/*
 	 * open the external source (local file or http).
 	 *
-	 * NOTE: external_beginscan() seems like the natural place for this call. However,
-	 * in queries with more than one gang each gang will initialized all the nodes
-	 * of the plan (but actually executed only the nodes in it's local slice)
-	 * This means that external_beginscan() (and external_endscan() too) will get called
-	 * more than needed and we'll end up opening too many http connections when
-	 * they are not expected (see MPP-1261). Therefore we instead do it here on the
-	 * first time around only.
+	 * NOTE: external_beginscan() seems like the natural place for this call.
+	 * However, in queries with more than one gang each gang will initialized
+	 * all the nodes of the plan (but actually executed only the nodes in it's
+	 * local slice)
+	 *
+	 * This means that external_beginscan() (and external_endscan() too) will
+	 * get called more than needed and we'll end up opening too many http
+	 * connections when they are not expected (see MPP-1261). Therefore we
+	 * instead do it here on the first time around only.
 	 */
 
 	/*
@@ -539,7 +586,7 @@
 	 * load external protocol.
 	 */
 
-	if (scan->fs_file == NULL)
+	if (scan->fs_file == NULL && (scan->fs_formatter->fmt_mask & FMT_NEEDEXTBUFF))
 		open_external_readable_source(scan);
 
 	/* Note: no locking manipulations needed */
@@ -567,6 +614,7 @@
 	return true;
 }
 
+
 /*
  * external_insert_init
  *
@@ -575,7 +623,7 @@
  */
 ExternalInsertDesc
 external_insert_init(Relation rel, int errAosegno,
-                     ExternalTableType formatterType, char *formatterName)
+                     int formatterType, char *formatterName, PlannedStmt* plannedstmt)
 {
 	ExternalInsertDesc	extInsertDesc;
 	ExtTableEntry*		extentry;
@@ -634,8 +682,12 @@
 		/* get a url to use. we use seg number modulo total num of urls */
 		v = list_nth(extentry->locations, my_url);
 		uri_str = pstrdup(v->val.str);
+		Uri* uri = ParseExternalTableUri(uri_str);
+
 		extInsertDesc->ext_uri = uri_str;
 
+		FreeExternalTableUri(uri);
+
 		/*elog(NOTICE, "seg %d got url number %d: %s", segindex, my_url, uri_str);*/
 	}
 
@@ -668,6 +720,10 @@
 	{
 		extInsertDesc->ext_formatter_data = (FormatterData *) palloc0 (sizeof(FormatterData));
 		extInsertDesc->ext_formatter_data->fmt_perrow_ctx = extInsertDesc->ext_pstate->rowcontext;
+
+		/* First call formatter in function to get action mask */
+		external_populate_formatter_actionmask(extInsertDesc->ext_pstate,
+											   extInsertDesc->ext_formatter_data);
 	}
 
 	return extInsertDesc;
@@ -687,7 +743,6 @@
 Oid
 external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
 {
-
 	HeapTuple		instup = ExecFetchSlotHeapTuple(tupTableSlot);
 	TupleDesc 		tupDesc = extInsertDesc->ext_tupDesc;
 	Datum*			values = extInsertDesc->ext_values;
@@ -695,14 +750,18 @@
 	CopyStateData*  pstate = extInsertDesc->ext_pstate;
 	bool			customFormat = extInsertDesc->ext_pstate->custom;
 
-
 	if(extInsertDesc->ext_noop)
 		return InvalidOid;
 
 
 	/* Open our output file or output stream if not yet open */
-	if(!extInsertDesc->ext_file && !extInsertDesc->ext_noop)
+	if(!extInsertDesc->ext_file &&
+	   !extInsertDesc->ext_noop &&
+	   (extInsertDesc->ext_formatter_data == NULL ||
+	   (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF)))
+	{
 		open_external_writable_source(extInsertDesc);
+	}
 
 	/*
 	 * deconstruct the tuple and format it into text
@@ -730,15 +789,34 @@
 		/* must have been created during insert_init */
 		Assert(formatter);
 
-		/* per call formatter prep */
-		FunctionCallPrepareFormatter(&fcinfo,
-									 1,
-									 pstate,
-									 formatter,
-									 extInsertDesc->ext_rel,
-									 extInsertDesc->ext_tupDesc,
-									 pstate->out_functions,
-									 NULL);
+		if ((formatter->fmt_mask & FMT_NEEDEXTBUFF) == 0)
+		{
+			/* per call formatter prep */
+			FunctionCallPrepareFormatter(&fcinfo,
+										 0,
+										 pstate,
+										 formatter,
+										 extInsertDesc->ext_rel,
+										 extInsertDesc->ext_tupDesc,
+										 pstate->out_functions,
+										 NULL,
+										 extInsertDesc->ext_uri,
+										 NULL);
+		}
+		else
+		{
+			/* per call formatter prep */
+			FunctionCallPrepareFormatter(&fcinfo,
+										 1,
+										 pstate,
+										 formatter,
+										 extInsertDesc->ext_rel,
+										 extInsertDesc->ext_tupDesc,
+										 pstate->out_functions,
+										 NULL,
+										 NULL,
+										 NULL);
+		}
 
 		/* Mark the correct record type in the passed tuple */
 		HeapTupleHeaderSetTypeId(instup->t_data, tupDesc->tdtypeid);
@@ -748,18 +826,22 @@
 		fcinfo.argnull[0] = false;
 
 		d = FunctionCallInvoke(&fcinfo);
+
 		MemoryContextReset(formatter->fmt_perrow_ctx);
 
-		/* We do not expect a null result */
-		if (fcinfo.isnull)
-			elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
+		if (formatter->fmt_mask & FMT_NEEDEXTBUFF)
+		{
+			/* We do not expect a null result */
+			if (fcinfo.isnull)
+				elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
 
-		b = DatumGetByteaP(d);
+			b = DatumGetByteaP(d);
 
-		CopyOneCustomRowTo(pstate, b);
+			CopyOneCustomRowTo(pstate, b);
+		}
 	}
 
-	if (extInsertDesc->ext_formatter_data == NULL)
+	if (extInsertDesc->ext_formatter_data == NULL || (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF))
 	{
 		/* Write the data into the external source */
 		external_senddata((URL_FILE*)extInsertDesc->ext_file, pstate);
@@ -769,7 +851,6 @@
 		pstate->fe_msgbuf->data[0] = '\0';
 	}
 	pstate->processed++;
-
 	return HeapTupleGetOid(instup);
 }
 
@@ -782,6 +863,28 @@
 void
 external_insert_finish(ExternalInsertDesc extInsertDesc)
 {
+	/* Tell formatter to close */
+	if (extInsertDesc->ext_formatter_data != NULL &&
+		(extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF) == 0)
+	{
+		Datum d;
+		FunctionCallInfoData fcinfo;
+
+		extInsertDesc->ext_formatter_data->fmt_mask |= FMT_WRITE_END;
+
+		/* per call formatter prep */
+		FunctionCallPrepareFormatter(&fcinfo,
+									 0,
+									 extInsertDesc->ext_pstate,
+									 extInsertDesc->ext_formatter_data,
+									 NULL,
+									 NULL,
+									 NULL,
+									 NULL,
+									 NULL,
+									 NULL);
+		d = FunctionCallInvoke(&fcinfo);
+	}
 
 	/*
 	 * Close the external source
@@ -805,6 +908,7 @@
 	pfree(extInsertDesc);
 }
 
+
 /* ==========================================================================
  * The follwing macros aid in major refactoring of data processing code (in
  * externalgettup() ). We use macros because in some cases the code must be in
@@ -1042,102 +1146,102 @@
 static HeapTuple
 externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
 {
-		HeapTuple	tuple = NULL;
-		CopyState	pstate = scan->fs_pstate;
-		bool        needData = false;
+	HeapTuple	tuple = NULL;
+	CopyState	pstate = scan->fs_pstate;
+	bool        needData = false;
 
-		/* If we either got things to read or stuff to process */
-		while (!pstate->fe_eof || !pstate->raw_buf_done)
+	/* If we either got things to read or stuff to process */
+	while (!pstate->fe_eof || !pstate->raw_buf_done)
+	{
+		/* need to fill our buffer with data? */
+		if (pstate->raw_buf_done)
 		{
-			/* need to fill our buffer with data? */
-			if (pstate->raw_buf_done)
+		    pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
+			pstate->begloc = pstate->raw_buf;
+			pstate->raw_buf_done = (pstate->bytesread==0);
+			pstate->raw_buf_index = 0;
+
+			/* on first time around just throw the header line away */
+			if (pstate->header_line && pstate->bytesread > 0)
 			{
-			    pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
-				pstate->begloc = pstate->raw_buf;
-				pstate->raw_buf_done = (pstate->bytesread==0);
-				pstate->raw_buf_index = 0;
-
-				/* on first time around just throw the header line away */
-				if (pstate->header_line && pstate->bytesread > 0)
+				PG_TRY();
 				{
-					PG_TRY();
-					{
-						readHeaderLine(pstate);
-					}
-					PG_CATCH();
-					{
-						/*
-						 * got here? encoding conversion error occurred on the
-						 * header line (first row).
-						 */
-						if (pstate->errMode == ALL_OR_NOTHING)
-						{
-							PG_RE_THROW();
-						}
-						else
-						{
-							/* SREH - release error state */
-							if (!elog_dismiss(DEBUG5))
-								PG_RE_THROW(); /* hope to never get here! */
-
-							/*
-							 * note: we don't bother doing anything special here.
-							 * we are never interested in logging a header line
-							 * error. just continue the workflow.
-							 */
-						}
-					}
-					PG_END_TRY();
-
-					EXT_RESET_LINEBUF;
-					pstate->header_line = false;
+					readHeaderLine(pstate);
 				}
-			}
-
-			/* while there is still data in our buffer */
-			while (!pstate->raw_buf_done || needData)
-			{
-				DataLineStatus ret_mode = parse_next_line(scan);
-
-				if(ret_mode == LINE_OK)
+				PG_CATCH();
 				{
-					/* convert to heap tuple */
-					/* XXX This is bad code.  Planner should be able to
-					 * decide whether we need heaptuple or memtuple upstream,
-					 * so make the right decision here.
+					/*
+					 * got here? encoding conversion error occurred on the
+					 * header line (first row).
 					 */
-					tuple = heap_form_tuple(scan->fs_tupDesc, scan->values, scan->nulls);
-					pstate->processed++;
-					MemoryContextReset(pstate->rowcontext);
-					return tuple;
+					if (pstate->errMode == ALL_OR_NOTHING)
+					{
+						PG_RE_THROW();
+					}
+					else
+					{
+						/* SREH - release error state */
+						if (!elog_dismiss(DEBUG5))
+							PG_RE_THROW(); /* hope to never get here! */
+
+						/*
+						 * note: we don't bother doing anything special here.
+						 * we are never interested in logging a header line
+						 * error. just continue the workflow.
+						 */
+					}
 				}
-				else if(ret_mode == LINE_ERROR && !pstate->raw_buf_done)
-				{
-					/* error was handled in parse_next_line. move to the next */
-					continue;
-				}
-				else if(ret_mode == END_MARKER)
-				{
-					scan->fs_inited = false;
-					return NULL;
-				}
-				else
-				{
-					/* try to get more data if possible */
-					Assert((ret_mode == NEED_MORE_DATA) ||
-						   (ret_mode == LINE_ERROR && pstate->raw_buf_done));
-					needData = true;
-					break;
-				}
+				PG_END_TRY();
+
+				EXT_RESET_LINEBUF;
+				pstate->header_line = false;
 			}
 		}
 
-		/*
-		 * if we got here we finished reading all the data.
-		 */
-		scan->fs_inited = false;
+		/* while there is still data in our buffer */
+		while (!pstate->raw_buf_done || needData)
+		{
+			DataLineStatus ret_mode = parse_next_line(scan);
 
-		return NULL;
+			if(ret_mode == LINE_OK)
+			{
+				/* convert to heap tuple */
+				/* XXX This is bad code.  Planner should be able to
+				 * decide whether we need heaptuple or memtuple upstream,
+				 * so make the right decision here.
+				 */
+				tuple = heap_form_tuple(scan->fs_tupDesc, scan->values, scan->nulls);
+				pstate->processed++;
+				MemoryContextReset(pstate->rowcontext);
+				return tuple;
+			}
+			else if(ret_mode == LINE_ERROR && !pstate->raw_buf_done)
+			{
+				/* error was handled in parse_next_line. move to the next */
+				continue;
+			}
+			else if(ret_mode == END_MARKER)
+			{
+				scan->fs_inited = false;
+				return NULL;
+			}
+			else
+			{
+				/* try to get more data if possible */
+				Assert((ret_mode == NEED_MORE_DATA) ||
+					   (ret_mode == LINE_ERROR && pstate->raw_buf_done));
+				needData = true;
+				break;
+			}
+		}
+	}
+
+	/*
+	 * if we got here we finished reading all the data.
+	 */
+	scan->fs_inited = false;
+
+	return NULL;
 
 
 }
@@ -1145,149 +1249,241 @@
 static HeapTuple
 externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
 {
-		HeapTuple   	tuple;
-		CopyState		pstate = scan->fs_pstate;
-		FormatterData*	formatter = scan->fs_formatter;
-		bool			no_more_data = false;
-		MemoryContext 	oldctxt = CurrentMemoryContext;
+	HeapTuple   	tuple;
+	CopyState		pstate = scan->fs_pstate;
+	FormatterData*	formatter = scan->fs_formatter;
+	bool			no_more_data = false;
+	MemoryContext 	oldctxt = CurrentMemoryContext;
 
-		Assert(formatter);
+	Assert(formatter);
 
-		/* while didn't finish processing the entire file */
-		while (!no_more_data)
+	/* while didn't finish processing the entire file */
+	while (!no_more_data)
+	{
+		/* need to fill our buffer with data? */
+		if (pstate->raw_buf_done)
 		{
-			/* need to fill our buffer with data? */
-			if (pstate->raw_buf_done)
-			{
-				int	 bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
-				if ( bytesread > 0 )
-					appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
-				pstate->raw_buf_done = false;
+			int	 bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
+			if ( bytesread > 0 )
+				appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
+			pstate->raw_buf_done = false;
 
-				/* HEADER not yet supported ... */
-				if(pstate->header_line)
-					elog(ERROR, "header line in custom format is not yet supported");
-			}
-
-			if (formatter->fmt_databuf.len > 0 || !pstate->fe_eof)
-			{
-				/* while there is still data in our buffer */
-				while (!pstate->raw_buf_done)
-				{
-					bool	error_caught = false;
-
-					/*
-					 * Invoke the custom formatter function.
-					 */
-					PG_TRY();
-					{
-						Datum					d;
-						FunctionCallInfoData	fcinfo;
-
-						/* per call formatter prep */
-						FunctionCallPrepareFormatter(&fcinfo,
-													 0,
-													 pstate,
-													 formatter,
-													 scan->fs_rd,
-													 scan->fs_tupDesc,
-													 scan->in_functions,
-													 scan->typioparams);
-						d = FunctionCallInvoke(&fcinfo);
-
-					}
-					PG_CATCH();
-					{
-						error_caught = true;
-
-						MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
-
-						/*
-						 * Save any bad row information that was set
-						 * by the user in the formatter UDF (if any).
-						 * Then handle the error in FILEAM_HANDLE_ERROR.
-						 */
-						pstate->cur_lineno = formatter->fmt_badrow_num;
-						pstate->cur_byteno = formatter->fmt_bytesread;
-						resetStringInfo(&pstate->line_buf);
-
-						if (formatter->fmt_badrow_len > 0)
-						{
-							if (formatter->fmt_badrow_data)
-								appendBinaryStringInfo(&pstate->line_buf,
-													   formatter->fmt_badrow_data,
-													   formatter->fmt_badrow_len);
-
-							formatter->fmt_databuf.cursor += formatter->fmt_badrow_len;
-							if (formatter->fmt_databuf.cursor > formatter->fmt_databuf.len ||
-								formatter->fmt_databuf.cursor < 0 )
-							{
-								formatter->fmt_databuf.cursor = formatter->fmt_databuf.len;
-							}
-						}
-
-						FILEAM_HANDLE_ERROR;
-
-						MemoryContextSwitchTo(oldctxt);
-					}
-					PG_END_TRY();
-
-					/*
-					 * Examine the function results. If an error was caught
-					 * we already handled it, so after checking the reject
-					 * limit, loop again and call the UDF for the next tuple.
-					 */
-					if (!error_caught)
-					{
-						switch (formatter->fmt_notification)
-						{
-							case FMT_NONE:
-
-								/* got a tuple back */
-
-								tuple = formatter->fmt_tuple;
-								pstate->processed++;
-								MemoryContextReset(formatter->fmt_perrow_ctx);
-
-								return tuple;
-
-							case FMT_NEED_MORE_DATA:
-
-								/*
-								 * Callee consumed all data in the buffer.
-								 * Prepare to read more data into it.
-								 */
-								pstate->raw_buf_done = true;
-								justifyDatabuf(&formatter->fmt_databuf);
-
-								continue;
-
-							default:
-								elog(ERROR, "unsupported formatter notification (%d)",
-											formatter->fmt_notification);
-								break;
-						}
-					}
-					else
-					{
-						FILEAM_IF_REJECT_LIMIT_REACHED_ABORT
-					}
-
-				}
-			}
-			else
-			{
-				no_more_data = true;
-			}
+			/* HEADER not yet supported ... */
+			if(pstate->header_line)
+				elog(ERROR, "header line in custom format is not yet supported");
 		}
 
-		/*
-		 * if we got here we finished reading all the data.
-		 */
-		Assert(no_more_data);
-		scan->fs_inited = false;
+		if (formatter->fmt_databuf.len > 0 || !pstate->fe_eof)
+		{
+			/* while there is still data in our buffer */
+			while (!pstate->raw_buf_done)
+			{
+				bool	error_caught = false;
 
-		return NULL;
+				/*
+				 * Invoke the custom formatter function.
+				 */
+				PG_TRY();
+				{
+					Datum					d;
+					FunctionCallInfoData	fcinfo;
+
+					/* per call formatter prep */
+					FunctionCallPrepareFormatter(&fcinfo,
+												 0,
+												 pstate,
+												 formatter,
+												 scan->fs_rd,
+												 scan->fs_tupDesc,
+												 scan->in_functions,
+												 scan->typioparams,
+												 ((URL_FILE *)(scan->fs_file))->url,
+												 ss);
+					d = FunctionCallInvoke(&fcinfo);
+
+				}
+				PG_CATCH();
+				{
+					error_caught = true;
+
+					MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
+
+					/*
+					 * Save any bad row information that was set
+					 * by the user in the formatter UDF (if any).
+					 * Then handle the error in FILEAM_HANDLE_ERROR.
+					 */
+					pstate->cur_lineno = formatter->fmt_badrow_num;
+					pstate->cur_byteno = formatter->fmt_bytesread;
+					resetStringInfo(&pstate->line_buf);
+
+					if (formatter->fmt_badrow_len > 0)
+					{
+						if (formatter->fmt_badrow_data)
+							appendBinaryStringInfo(&pstate->line_buf,
+												   formatter->fmt_badrow_data,
+												   formatter->fmt_badrow_len);
+
+						formatter->fmt_databuf.cursor += formatter->fmt_badrow_len;
+						if (formatter->fmt_databuf.cursor > formatter->fmt_databuf.len ||
+							formatter->fmt_databuf.cursor < 0 )
+						{
+							formatter->fmt_databuf.cursor = formatter->fmt_databuf.len;
+						}
+					}
+
+					FILEAM_HANDLE_ERROR;
+
+					MemoryContextSwitchTo(oldctxt);
+				}
+				PG_END_TRY();
+
+				/*
+				 * Examine the function results. If an error was caught
+				 * we already handled it, so after checking the reject
+				 * limit, loop again and call the UDF for the next tuple.
+				 */
+				if (!error_caught)
+				{
+					switch (formatter->fmt_notification)
+					{
+						case FMT_NONE:
+
+							/* got a tuple back */
+
+							tuple = formatter->fmt_tuple;
+							pstate->processed++;
+							MemoryContextReset(formatter->fmt_perrow_ctx);
+
+							return tuple;
+
+						case FMT_NEED_MORE_DATA:
+
+							/*
+							 * Callee consumed all data in the buffer.
+							 * Prepare to read more data into it.
+							 */
+							pstate->raw_buf_done = true;
+							justifyDatabuf(&formatter->fmt_databuf);
+
+							continue;
+
+						default:
+							elog(ERROR, "unsupported formatter notification (%d)",
+										formatter->fmt_notification);
+							break;
+					}
+				}
+				else
+				{
+					FILEAM_IF_REJECT_LIMIT_REACHED_ABORT
+				}
+			}
+		}
+		else
+		{
+			no_more_data = true;
+		}
+	}
+
+	/*
+	 * if we got here we finished reading all the data.
+	 */
+	Assert(no_more_data);
+	scan->fs_inited = false;
+
+	return NULL;
+}
+
+/*
+ * externalgettup_custom_noextprot
+ *
+ * Fetch the next tuple through a custom formatter that does not use the
+ * external-protocol data buffer (FMT_NEEDEXTBUFF unset in fmt_mask): the
+ * formatter UDF is invoked directly and is handed the scan URI and scan
+ * state, presumably so it can perform its own data access -- confirm
+ * against the formatter implementations.  Returns the next tuple, or
+ * NULL once the formatter reports FMT_DONE.
+ */
+static HeapTuple
+externalgettup_custom_noextprot(FileScanDesc scan,
+								ExternalSelectDesc desc,
+								ScanState *ss)
+{
+	HeapTuple   	tuple;
+	CopyState		pstate = scan->fs_pstate;
+	FormatterData*	formatter = scan->fs_formatter;
+	bool			no_more_data = false;
+	MemoryContext 	oldctxt = CurrentMemoryContext;
+
+	Assert(formatter);
+
+	/* while didn't finish processing the entire file */
+	while (!no_more_data)
+	{
+		bool error_caught = false;
+
+		/*
+		 * Invoke the custom formatter function.
+		 */
+		PG_TRY();
+		{
+			Datum					d;
+			FunctionCallInfoData	fcinfo;
+
+			/* per call formatter prep */
+			FunctionCallPrepareFormatter(&fcinfo,
+										 0,
+										 pstate,
+										 formatter,
+										 scan->fs_rd,
+										 scan->fs_tupDesc,
+										 scan->in_functions,
+										 scan->typioparams,
+										 scan->fs_uri,
+										 ss);
+			/*
+			 * Result datum is ignored: the formatter communicates via
+			 * formatter->fmt_notification and formatter->fmt_tuple.
+			 */
+			d = FunctionCallInvoke(&fcinfo);
+
+		}
+		PG_CATCH();
+		{
+			error_caught = true;
+			MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
+			FILEAM_HANDLE_ERROR;
+			MemoryContextSwitchTo(oldctxt);
+		}
+		PG_END_TRY();
+
+		if (!error_caught)
+		{
+			switch (formatter->fmt_notification)
+			{
+				case FMT_NONE:
+				{
+					/* got a tuple back */
+					tuple = formatter->fmt_tuple;
+					pstate->processed++;
+					MemoryContextReset(formatter->fmt_perrow_ctx);
+					return tuple;
+				}
+				case FMT_DONE:
+				{
+					no_more_data = true;
+					break;
+				}
+				default:
+				{
+					elog(ERROR, "unsupported formatter notification (%d)",
+								formatter->fmt_notification);
+					break;
+				}
+			}
+		}
+		else
+		{
+			/*
+			 * NOTE(review): unlike externalgettup_custom, a caught error
+			 * here is always re-raised -- there is no single-row-error /
+			 * reject-limit handling.  Confirm this asymmetry is intended.
+			 */
+			ereport(ERROR,
+					(errcode(ERRCODE_EXTERNAL_ROUTINE_INVOCATION_EXCEPTION),
+						(errmsg("formatter reported error")),
+						errOmitLocation(true)));
+		}
+	}
+
+	/*
+	 * if we got here we finished reading all the data.
+	 */
+	Assert(no_more_data);
+	scan->fs_inited = false;
+	return NULL;
+}
 
 /* ----------------
@@ -1322,11 +1518,19 @@
 		/* (set current state...) */
 	}
 
+	/***********************************************************
+	 * Dispatch on formatter kind.  This version always has a
+	 * custom formatter and fs defined: built-in text/csv, a
+	 * custom formatter that needs the external-protocol data
+	 * buffer (FMT_NEEDEXTBUFF), or one that handles data
+	 * access on its own.
+	 ***********************************************************/
 	if (!custom)
-		return externalgettup_defined(scan, desc, ss); /* text/csv */
+		return externalgettup_defined(scan, desc, ss); /* text/csv */
+	else if (scan->fs_formatter->fmt_mask & FMT_NEEDEXTBUFF)
+	{
+		return externalgettup_custom(scan, desc, ss);
+	}
 	else
-		return externalgettup_custom(scan, desc, ss);  /* custom   */
-
+	{
+		return externalgettup_custom_noextprot(scan, desc, ss);
+	}
 }
 /*
  * setCustomFormatter
@@ -1346,7 +1550,23 @@
 		Oid		argList[1];
 		Oid		returnOid;
 
-		funcname = lappend(funcname, makeString(formatter_name));
+		/*
+		 * Derive the catalog function name from the formatter name by
+		 * appending a direction suffix: "<name>_in" for readable,
+		 * "<name>_out" for writable.  +5 = strlen("_out") + NUL.
+		 */
+		char*   new_formatter_name = (char *)palloc0(strlen(formatter_name) + 5);
+		if (!iswritable)
+		{
+			sprintf(new_formatter_name, "%s_in", formatter_name);
+		}
+		else
+		{
+			sprintf(new_formatter_name, "%s_out", formatter_name);
+		}
+
+		/* update to all lowercase string */
+		for ( int i = 0 ; new_formatter_name[i] != '\0' ; ++i )
+		{
+			/* cast to unsigned char: tolower() on a negative char is UB */
+			new_formatter_name[i] = tolower((unsigned char) new_formatter_name[i]);
+		}
+
+		funcname = lappend(funcname, makeString(new_formatter_name));
 
 		if(iswritable)
 		{
@@ -1363,7 +1583,7 @@
 		if (!OidIsValid(procOid))
 			ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
 							errmsg("formatter function %s of type %s was not found.",
-									formatter_name,
+									new_formatter_name,
 									(iswritable ? "writable" : "readable")),
 							errhint("Create it with CREATE FUNCTION."),
 							errOmitLocation(true)));
@@ -1372,7 +1592,7 @@
 		if (get_func_rettype(procOid) != returnOid)
 			ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
 							errmsg("formatter function %s of type %s has an incorrect return type",
-									formatter_name,
+									new_formatter_name,
 									(iswritable ? "writable" : "readable")),
 							errOmitLocation(true)));
 
@@ -1381,9 +1601,12 @@
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
 					 errmsg("formatter function %s is not declared STABLE.",
-							 formatter_name),
+							 new_formatter_name),
 					 errOmitLocation(true)));
 
+		/*
+		 * NB: do NOT pfree new_formatter_name here.  makeString() above
+		 * stored the pointer itself (not a copy) into the funcname list,
+		 * so freeing it would leave a dangling reference in that list.
+		 * The memory is reclaimed with the current memory context.
+		 */
+
 		return procOid;
 }
 
@@ -1664,7 +1887,9 @@
 							 Relation 				rel,
 							 TupleDesc 				tupDesc,
 							 FmgrInfo			   *convFuncs,
-							 Oid                   *typioparams)
+							 Oid                   *typioparams,
+							 char				   *url,
+							 ScanState			   *ss)
 {
 	formatter->type = T_FormatterData;
 	formatter->fmt_relation = rel;
@@ -1680,6 +1905,8 @@
 	formatter->fmt_needs_transcoding = pstate->need_transcoding;
 	formatter->fmt_conversion_proc = pstate->enc_conversion_proc;
 	formatter->fmt_external_encoding = pstate->client_encoding;
+	formatter->fmt_url = url;
+	formatter->fmt_splits = ss == NULL ? NULL : ss->splits;
 
 	InitFunctionCallInfoData(/* FunctionCallInfoData */ *fcinfo,
 							 /* FmgrInfo */ pstate->custom_formatter_func,
@@ -1688,7 +1915,6 @@
 							 /* ResultSetInfo */ NULL);
 }
 
-
 /*
  * open the external source for scanning (RET only)
  *
@@ -2360,6 +2586,48 @@
 	return start;
 }
 
+/*
+ * hasErrTblInFmtOpts
+ *
+ * Scan the first format-options string in fmtOpts for the "err_table"
+ * keyword.  Returns true iff the option is present.
+ */
+bool hasErrTblInFmtOpts(List *fmtOpts) {
+	char	*format_str = pstrdup((char *) strVal(linitial(fmtOpts)));
+	const	char *whitespace = " \t\n\r";
+	int	encoding = GetDatabaseEncoding();
+	bool	found = false;
+	char *key = strtokx2(format_str, whitespace, NULL, NULL,
+												0, false, true, encoding);
+	while (key) {
+		if (pg_strcasecmp(key, "err_table") == 0) {
+			found = true;
+			break;
+		}
+		key = strtokx2(NULL, whitespace, NULL, NULL,
+									   0, false, false, encoding);
+	}
+	/* done tokenizing: release the scratch copy (was leaked before) */
+	pfree(format_str);
+	return found;
+}
+
+/*
+ * getExtTblCategoryInFmtOptsStr
+ *
+ * Walk the key/value pairs of a format-options string and return a
+ * pstrdup'ed copy of the value of the "category" option, or NULL when
+ * no such option exists.  fmtStr is tokenized in place.
+ */
+char *getExtTblCategoryInFmtOptsStr(char *fmtStr)
+{
+	const char	*delim = " \t\n\r";
+	const char	*quote = "'";
+	int			enc = GetDatabaseEncoding();
+	char		*optname;
+	char		*optval;
+
+	/* first key/value pair of the options string */
+	optname = strtokx2(fmtStr, delim, NULL, NULL,
+	                   0, false, true, enc);
+	optval = strtokx2(NULL, delim, NULL, quote,
+	                  0, false, true, enc);
+
+	for (;;)
+	{
+		if (optname == NULL || optval == NULL)
+			return NULL;	/* ran out of pairs: no category option */
+
+		if (pg_strncasecmp(optname, "category", strlen("category")) == 0)
+			return pstrdup(optval);
+
+		/* advance to the next key/value pair */
+		optname = strtokx2(NULL, delim, NULL, NULL,
+		                   0, false, false, enc);
+		optval = strtokx2(NULL, delim, NULL, quote,
+		                  0, false, true, enc);
+	}
+}
+
 char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr)
 {
 	const char	*whitespace = " \t\n\r";
diff --git a/src/backend/access/external/plugstorage.c b/src/backend/access/external/plugstorage.c
index ad7d260..fe7c82a 100644
--- a/src/backend/access/external/plugstorage.c
+++ b/src/backend/access/external/plugstorage.c
@@ -479,24 +479,33 @@
 ExternalInsertDesc InvokePlugStorageFormatInsertInit(FmgrInfo *func,
 		                                             Relation relation,
                                                      int formatterType,
-                                                     char *formatterName)
+                                                     char *formatterName,
+													PlannedStmt *plannedstmt,
+													int segno )
 {
 	PlugStorageData psdata;
 	FunctionCallInfoData fcinfo;
 
-	psdata.type               = T_PlugStorageData;
-	psdata.ps_relation        = relation;
-	psdata.ps_formatter_type  = formatterType;
-	psdata.ps_formatter_name  = formatterName;
+	psdata.type = T_PlugStorageData;
+	psdata.ps_relation = relation;
+	psdata.ps_formatter_type = formatterType;
+	psdata.ps_formatter_name = formatterName;
+	psdata.ps_segno = segno;
 
-	InitFunctionCallInfoData(fcinfo,
-	                         func,
-	                         0,
-	                         (Node *)(&psdata),
-	                         NULL);
 
+	/*
+	 * Scratch ScanState for the UDF call; freed after the call returns.
+	 * NOTE(review): assumes the UDF does not retain ps_scan_state past
+	 * the call -- verify against the format implementations.
+	 */
+	psdata.ps_scan_state = palloc0(sizeof(ScanState));
+
+	InitFunctionCallInfoData(fcinfo,  /* FunctionCallInfoData */
+	                         func,    /* FmgrInfo */
+	                         0,       /* nArgs */
+	                         (Node *)(&psdata),  /* Call Context */
+	                         NULL);              /* ResultSetInfo */
+
+	/* invoke the format-specific insert-init function */
 	FunctionCallInvoke(&fcinfo);
 
+
+	/* we do not expect a null result */
 	if (fcinfo.isnull)
 	{
 		elog(ERROR, "function %u returned NULL",
@@ -505,6 +514,7 @@
 
 	ExternalInsertDesc extInsertDesc = psdata.ps_ext_insert_desc;
 
+	pfree(psdata.ps_scan_state);
 	return extInsertDesc;
 }
 
diff --git a/src/backend/catalog/cdb_external_extensions.sql b/src/backend/catalog/cdb_external_extensions.sql
index e3a8080..d11797f 100644
--- a/src/backend/catalog/cdb_external_extensions.sql
+++ b/src/backend/catalog/cdb_external_extensions.sql
@@ -47,3 +47,15 @@
 CREATE OR REPLACE FUNCTION fixedwidth_out(record) RETURNS bytea 
 AS '$libdir/fixedwidth.so', 'fixedwidth_out'
 LANGUAGE C STABLE;
+
+------------------------------------------------------------------
+-- external HDFS
+------------------------------------------------------------------
+-- validator entry point for the hdfs external protocol
+CREATE OR REPLACE FUNCTION hdfs_validate() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_validate'
+LANGUAGE C STABLE;
+
+-- block-location entry point: fetches HDFS block locations for
+-- data-locality scheduling (see InvokeHDFSProtocolBlockLocation)
+CREATE OR REPLACE FUNCTION hdfs_blocklocation() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_blocklocation'
+LANGUAGE C STABLE;
+
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 18a8acf..e159c83 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -47,7 +47,11 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "port.h"
 
+#include "access/fileam.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/sysattr.h"
@@ -78,38 +82,42 @@
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_type.h"
-#include "cdb/cdbappendonlyam.h"
-#include "cdb/cdbpartition.h"
+
 #include "cdb/cdbanalyze.h"
+#include "cdb/cdbappendonlyam.h"
+#include "cdb/cdbmirroredfilesysobj.h"
 #include "cdb/cdbparquetfooterprocessor.h"
+#include "cdb/cdbparquetstoragewrite.h"
+#include "cdb/cdbpartition.h"
+#include "cdb/cdbpersistentfilesysobj.h"
+#include "cdb/cdbsharedstorageop.h"
+#include "cdb/cdbvars.h"
 #include "commands/dbcommands.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
-#include "miscadmin.h"
+
 #include "nodes/makefuncs.h"
+#include "nodes/pg_list.h"
+#include "nodes/value.h"
 #include "optimizer/clauses.h"
 #include "optimizer/var.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_expr.h"
 #include "parser/parse_relation.h"
+#include "pg_config_manual.h"
+#include "storage/fd.h"
 #include "storage/smgr.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/guc.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"             /* CDB: GetMemoryChunkContext */
+#include "utils/palloc.h"
 #include "utils/relcache.h"
 #include "utils/syscache.h"
-#include "utils/guc.h"
-#include "cdb/cdbvars.h"
-
-#include "cdb/cdbsharedstorageop.h"
-#include "cdb/cdbmirroredfilesysobj.h"
-#include "cdb/cdbpersistentfilesysobj.h"
-#include "cdb/cdbparquetstoragewrite.h"
-#include "catalog/gp_persistent.h"
-
-#include "utils/guc.h"
+#include "utils/uri.h"
 
 typedef struct pg_result PGresult;
 extern void PQclear(PGresult *res);
@@ -2544,7 +2552,52 @@
 	 * External table? If so, delete the pg_exttable tuple.
 	 */
 	if (is_external_rel)
+	{
+		/* Step 1. remove uri on file system */
+		rel = relation_open(relid, AccessExclusiveLock);
+		ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id);
+		char *path = (char *) strVal(linitial(exttbl->locations));
+		char *searchKey = (char *) palloc0 (MAXPGPATH);
+		char *fileSpacePath = NULL;
+		GetFilespacePathForTablespace(get_database_dts(MyDatabaseId),
+			                          &fileSpacePath);
+		sprintf(searchKey, "%s/ExtErrTbl/",fileSpacePath);
+		char *match = strstr(path,searchKey);
+		if (match)
+		{
+			RemovePath(path, 1);
+		}
+
+		/* Get category for the external table */
+		List *entry_locations = exttbl->locations;
+		Assert(entry_locations);
+		ListCell *entry_location = list_head(entry_locations);
+		char *url = ((Value*)lfirst(entry_location))->val.str;
+		char *category = getExtTblCategoryInFmtOptsStr(exttbl->fmtopts);
+
+		/* Remove data for internal table */
+		if (category != NULL &&
+		    pg_strncasecmp(category, "internal", strlen("internal")) == 0)
+		{
+
+
+			if (IS_HDFS_URI(url))   /* ORC, TEXT, CSV */
+			{
+				// orc, text, csv only support one location.
+				Assert(list_length(entry_locations) == 1);
+				RemovePath(url, 1);
+			}
+		}
+
+		if (category)
+		{
+			pfree(category);
+		}
+		relation_close(rel, AccessExclusiveLock);
+
+		/* Step 2. remove pg_exttable entry */
 		RemoveExtTableEntry(relid);
+	}
 
 	if (is_foreign_rel)
 		RemoveForeignTableEntry(relid);
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index b451f68..f7d4472 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -29,14 +29,17 @@
 
 #include "access/genam.h"
 #include "access/aomd.h"
+#include "access/extprotocol.h"
 #include "access/heapam.h"
 #include "access/filesplit.h"
 #include "access/parquetsegfiles.h"
+#include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/catquery.h"
 #include "catalog/pg_exttable.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_extprotocol.h"
 #include "cdb/cdbdatalocality.h"
 #include "cdb/cdbutil.h"
 #include "cdb/cdbvars.h"
@@ -52,6 +55,7 @@
 #include "optimizer/planmain.h"
 #include "parser/parsetree.h"
 #include "storage/fd.h"
+#include "parser/parse_func.h"
 #include "postmaster/identity.h"
 #include "cdb/cdbmetadatacache.h"
 #include "resourcemanager/utils/network_utils.h"
@@ -87,6 +91,11 @@
 	List *full_range_tables; /* every table include result relation  */
 } range_table_collector_context;
 
+/* Walker context for collect_scan_rangetable(). */
+typedef struct collect_scan_rangetable_context {
+	plan_tree_base_prefix base;
+	List *range_tables; /* range table entries referenced by scan nodes only */
+	List *full_range_tables;  /* full range table of the planned statement */
+} collect_scan_rangetable_context;
 /*
  * structure containing information about how much a
  * host holds.
@@ -123,10 +132,14 @@
 	int64 logiceof;
 	int host;
 	bool is_local_read;
+	char *ext_file_uri;
 } File_Split;
 
 typedef enum DATALOCALITY_RELATION_TYPE {
-	DATALOCALITY_APPENDONLY, DATALOCALITY_PARQUET, DATALOCALITY_UNKNOWN
+	DATALOCALITY_APPENDONLY,
+	DATALOCALITY_PARQUET,
+	DATALOCALITY_HDFS,		/* external table whose URIs use the hdfs protocol */
+	DATALOCALITY_UNKNOWN
 } DATALOCALITY_RELATION_TYPE;
 
 /*
@@ -290,6 +303,7 @@
  */
 typedef struct split_to_segment_mapping_context {
 	range_table_collector_context rtc_context;
+	collect_scan_rangetable_context srtc_context;
 	data_dist_stat_context dds_context;
 	collect_hdfs_split_location_context chsl_context;
 	hostname_volume_stat_context host_context;
@@ -315,9 +329,9 @@
 
 	int64 total_metadata_logic_len;
 
-    int metadata_cache_time_us;
-    int alloc_resource_time_us;
-    int cal_datalocality_time_us;
+	int metadata_cache_time_us;
+	int alloc_resource_time_us;
+	int cal_datalocality_time_us;
 } split_to_segment_mapping_context;
 
 typedef struct vseg_list{
@@ -331,14 +345,19 @@
 static void init_split_assignment_result(Split_Assignment_Result *result,
 		int host_num);
 
-static void init_datalocality_context(split_to_segment_mapping_context *context);
+static void init_datalocality_context(PlannedStmt *plannedstmt,
+		split_to_segment_mapping_context *context);
 
 static bool range_table_collector_walker(Node *node,
 		range_table_collector_context *context);
 
-static void collect_range_tables(Query *query, List* full_range_table,
+static void collect_range_tables(Query *query,
 		range_table_collector_context *context);
 
+static bool collect_scan_rangetable(Node *node,
+		collect_scan_rangetable_context *cxt);
+
+
 static void convert_range_tables_to_oids_and_check_table_functions(List **range_tables, bool* isUDFExists,
 		MemoryContext my_memorycontext);
 
@@ -349,6 +368,8 @@
 static int64 get_block_locations_and_claculte_table_size(
 		split_to_segment_mapping_context *collector_context);
 
+static bool dataStoredInHdfs(Relation rel);
+
 static List *get_virtual_segments(QueryResource *resource);
 
 static List *run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, QueryResource ** resourcePtr,
@@ -368,6 +389,12 @@
 		Relation_Data *rel_data, int* hitblocks,
 		int* allblocks, GpPolicy *targetPolicy);
 
+static void ExternalGetHdfsFileDataLocation(Relation relation,
+		split_to_segment_mapping_context *context, int64 splitsize,
+		Relation_Data *rel_data, int* allblocks);
+
+Oid LookupCustomProtocolBlockLocationFunc(char *protoname);
+
 static BlockLocation *fetch_hdfs_data_block_location(char *filepath, int64 len,
 		int *block_num, RelFileNode rnode, uint32_t segno, double* hit_ratio);
 
@@ -464,6 +491,9 @@
 static int64 set_maximum_segment_volume_parameter(Relation_Data *rel_data,
 		int host_num, double* maxSizePerSegment);
 
+static void InvokeHDFSProtocolBlockLocation(Oid    procOid,
+                                            List  *locs,
+                                            List **blockLocations);
 /*
  * Setup /cleanup the memory context for this run
  * of data locality algorithm.
@@ -500,13 +530,17 @@
 	return;
 }
 
-static void init_datalocality_context(split_to_segment_mapping_context *context) {
+static void init_datalocality_context(PlannedStmt *plannedstmt,
+		split_to_segment_mapping_context *context) {
 	context->old_memorycontext = CurrentMemoryContext;
 	context->datalocality_memorycontext = DataLocalityMemoryContext;
 
 	context->chsl_context.relations = NIL;
 	context->rtc_context.range_tables = NIL;
-	context->rtc_context.full_range_tables = NIL;
+	context->rtc_context.full_range_tables = plannedstmt->rtable;
+	context->srtc_context.range_tables = NIL;
+	context->srtc_context.full_range_tables = plannedstmt->rtable;
+	context->srtc_context.base.node = (Node *)plannedstmt;
 
 	context->externTableForceSegNum = 0;
 	context->externTableLocationSegNum = 0;
@@ -592,7 +626,7 @@
  * collect_range_tables: collect all range table relations, and store
  * them into the range_table_collector_context.
  */
-static void collect_range_tables(Query *query, List* full_range_table,
+static void collect_range_tables(Query *query,
 		range_table_collector_context *context) {
 
 	query_tree_walker(query, range_table_collector_walker, (void *) context,
@@ -613,9 +647,27 @@
 		}
 		context->range_tables = new_range_tables;
 	}
-	context->full_range_tables = full_range_table;
 	return;
 }
+
+/*
+ * collect_scan_rangetable
+ *
+ * Plan-tree walker: for every scan node over HDFS-resident data
+ * (external, append-only, parquet) record its range table entry in
+ * cxt->range_tables, then recurse into child plan nodes.
+ */
+bool collect_scan_rangetable(Node *node,
+		collect_scan_rangetable_context *cxt) {
+	if (NULL == node) return false;
+
+	switch (nodeTag(node)) {
+	case T_ExternalScan:
+	case T_AppendOnlyScan:
+	case T_ParquetScan: {
+		RangeTblEntry  *rte = rt_fetch(((Scan *)node)->scanrelid,
+											   cxt->full_range_tables);
+		cxt->range_tables = lappend(cxt->range_tables, rte);
+	}
+	/* fall through -- scan nodes still recurse via plan_tree_walker */
+	default:
+		break;
+	}
+
+	return plan_tree_walker(node, collect_scan_rangetable, cxt);
+}
 /*
  *
  */
@@ -750,6 +802,19 @@
 					if (uri && uri->protocol == URI_CUSTOM && is_pxf_protocol(uri)) {
 						isPxf = true;
 					}
+					else if (uri && (is_hdfs_protocol(uri))) {
+						relation_close(rel, AccessShareLock);
+						if (targetPolicy->nattrs > 0)
+						{
+							/*select the maximum hash bucket number as hashSegNum of query*/
+							if (context->hashSegNum < targetPolicy->bucketnum)
+							{
+								context->hashSegNum = targetPolicy->bucketnum;
+								context->keep_hash = true;
+							}
+						}
+						continue;
+					}
 				}
 			}
 			if (extEnrty->command || isPxf) {
@@ -844,37 +909,14 @@
 		/*
 		 * We only consider the data stored in HDFS.
 		 */
-		if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
-			Relation_Data *rel_data = NULL;
-			/*
-			 * Get pg_appendonly information for this table.
-			 */
-			AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
-
-			rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
+		bool isDataStoredInHdfs = dataStoredInHdfs(rel);
+		if (isDataStoredInHdfs ) {
+			GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
+			Relation_Data *rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
 			rel_data->relid = rel_oid;
 			rel_data->files = NIL;
 			rel_data->partition_parent_relid = 0;
 			rel_data->block_count = 0;
-
-			GpPolicy *targetPolicy = NULL;
-			targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
-			/*
-			 * Based on the pg_appendonly information, calculate the data
-			 * location information associated with this relation.
-			 */
-			if (RelationIsAoRows(rel)) {
-				rel_data->type = DATALOCALITY_APPENDONLY;
-				AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
-						aoEntry->splitsize, rel_data, &hitblocks,
-						&allblocks, targetPolicy);
-			} else {
-				rel_data->type = DATALOCALITY_PARQUET;
-				ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
-						context->split_size, rel_data, &hitblocks,
-						&allblocks, targetPolicy);
-			}
-
 			bool isResultRelation = true;
 			ListCell *nonResultlc;
 			foreach(nonResultlc, context->rtc_context.range_tables)
@@ -884,6 +926,54 @@
 					isResultRelation = false;
 				}
 			}
+
+			if (!isResultRelation) {
+				// skip the relation not in scan nodes
+				// for partition table scan optimization;
+				ListCell *rtc;
+				bool found = false;
+				foreach(rtc, context->srtc_context.range_tables) {
+					RangeTblEntry *rte = lfirst(rtc);
+					if (rel_oid == rte->relid) {
+						found = true;
+						break;
+					}
+				}
+				if (!found) {
+					relation_close(rel, AccessShareLock);
+					continue;
+				}
+			}
+
+			if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
+				/*
+				 * Get pg_appendonly information for this table.
+				 */
+				AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
+				/*
+				 * Based on the pg_appendonly information, calculate the data
+				 * location information associated with this relation.
+				 */
+				if (RelationIsAoRows(rel)) {
+					rel_data->type = DATALOCALITY_APPENDONLY;
+					AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
+							aoEntry->splitsize, rel_data, &hitblocks,
+							&allblocks, targetPolicy);
+				} else {
+					rel_data->type = DATALOCALITY_PARQUET;
+					ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
+							context->split_size, rel_data, &hitblocks,
+							&allblocks, targetPolicy);
+				}
+			} else if (RelationIsExternal(rel)) {
+				if (isDataStoredInHdfs) {
+					// we can't use metadata cache, so hitblocks will always be 0
+					rel_data->type = DATALOCALITY_HDFS;
+					ExternalGetHdfsFileDataLocation(rel, context, context->split_size,
+					                                rel_data, &allblocks);
+				}
+			}
+
 			if (!isResultRelation) {
 				total_size += rel_data->total_size;
 				totalFileCount += list_length(rel_data->files);
@@ -915,8 +1005,7 @@
 	}
 	context->total_file_count = totalFileCount;
 	context->total_size = total_size;
-    
-    context->metadata_cache_time_us = eclaspeTime;
+	context->metadata_cache_time_us = eclaspeTime;
 
 	if(debug_datalocality_time){
 		elog(LOG, "metadata overall execution time: %d us. \n", eclaspeTime);
@@ -924,6 +1013,25 @@
 	return total_size;
 }
 
+/*
+ * dataStoredInHdfs
+ *
+ * Return true when the relation's data lives on HDFS: append-only row,
+ * parquet, or an external table whose first location URI uses the hdfs
+ * protocol.  Any other relation kind returns false.
+ */
+bool dataStoredInHdfs(Relation rel) {
+	if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
+		return true;
+	} else if (RelationIsExternal(rel)) {
+		ExtTableEntry* extEnrty = GetExtTableEntry(rel->rd_id);
+		bool isHdfsProtocol = false;
+		/* EXECUTE-style external tables (command set) have no location URIs */
+		if (!extEnrty->command && extEnrty->locations) {
+			char* first_uri_str = (char *) strVal(lfirst(list_head(extEnrty->locations)));
+			if (first_uri_str) {
+				Uri* uri = ParseExternalTableUri(first_uri_str);
+				if (uri) {
+					if (is_hdfs_protocol(uri)) {
+						isHdfsProtocol = true;
+					}
+					/* was leaked before: free the parsed URI */
+					FreeExternalTableUri(uri);
+				}
+			}
+		}
+		return isHdfsProtocol;
+	}
+	return false;
+}
 /*
  * search_host_in_stat_context: search a host name in the statistic
  * context; if not found, create a new one.
@@ -1579,7 +1687,237 @@
 	return;
 }
 
+static void InvokeHDFSProtocolBlockLocation(Oid    procOid,
+                                            List  *locs,
+                                            List **blockLocations)
+{
+	ExtProtocolValidatorData   *validator_data;
+	FmgrInfo				   *validator_udf;
+	FunctionCallInfoData		fcinfo;
 
+	validator_data = (ExtProtocolValidatorData *)
+					 palloc0 (sizeof(ExtProtocolValidatorData));
+	validator_udf = palloc(sizeof(FmgrInfo));
+	fmgr_info(procOid, validator_udf);
+
+	validator_data->type 		= T_ExtProtocolValidatorData;
+	validator_data->url_list 	= locs;
+	validator_data->format_opts = NULL;
+	validator_data->errmsg		= NULL;
+	validator_data->direction 	= EXT_VALIDATE_READ;
+	validator_data->action		= EXT_VALID_ACT_GETBLKLOC;
+
+	InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
+							 /* FmgrInfo */ validator_udf,
+							 /* nArgs */ 0,
+							 /* Call Context */ (Node *) validator_data,
+							 /* ResultSetInfo */ NULL);
+
+	/* invoke validator. if this function returns - validation passed */
+	FunctionCallInvoke(&fcinfo);
+
+	ExtProtocolBlockLocationData *bls =
+		(ExtProtocolBlockLocationData *)(fcinfo.resultinfo);
+	/* debug output block location. */
+	if (bls != NULL)
+	{
+		ListCell *c;
+		int i = 0 ,j = 0;
+		foreach(c, bls->files)
+		{
+			blocklocation_file *blf = (blocklocation_file *)(lfirst(c));
+			elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks",
+					     blf->file_uri, blf->block_num);
+			for ( i = 0 ; i < blf->block_num ; ++i )
+			{
+				BlockLocation *pbl = &(blf->locations[i]);
+				elog(DEBUG3, "DEBUG LOCATION for block %d : %d, "
+						     INT64_FORMAT ", " INT64_FORMAT ", %d",
+						     i,
+						     pbl->corrupt, pbl->length, pbl->offset,
+							 pbl->numOfNodes);
+				for ( j = 0 ; j < pbl->numOfNodes ; ++j )
+				{
+					elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s",
+							     i,
+							     pbl->hosts[j], pbl->names[j],
+								 pbl->topologyPaths[j]);
+				}
+			}
+		}
+	}
+
+	elog(DEBUG3, "after invoking get block location API");
+
+	/* get location data from fcinfo.resultinfo. */
+	if (bls != NULL)
+	{
+		Assert(bls->type == T_ExtProtocolBlockLocationData);
+		while(list_length(bls->files) > 0)
+		{
+			void *v = lfirst(list_head(bls->files));
+			bls->files = list_delete_first(bls->files);
+			*blockLocations = lappend(*blockLocations, v);
+		}
+	}
+	pfree(validator_data);
+	pfree(validator_udf);
+}
+
+Oid
+LookupCustomProtocolBlockLocationFunc(char *protoname)
+{
+	List*	funcname 	= NIL;
+	Oid		procOid		= InvalidOid;
+	Oid		argList[1];
+	Oid		returnOid;
+
+	char*   new_func_name = (char *)palloc0(strlen(protoname) + 16);
+	sprintf(new_func_name, "%s_blocklocation", protoname);
+	funcname = lappend(funcname, makeString(new_func_name));
+	returnOid = VOIDOID;
+	procOid = LookupFuncName(funcname, 0, argList, true);
+
+	if (!OidIsValid(procOid))
+		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
+						errmsg("protocol function %s was not found.",
+								new_func_name),
+						errhint("Create it with CREATE FUNCTION."),
+						errOmitLocation(true)));
+
+	/* check return type matches */
+	if (get_func_rettype(procOid) != returnOid)
+		ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						errmsg("protocol function %s has an incorrect return type",
+								new_func_name),
+						errOmitLocation(true)));
+
+	/* check allowed volatility */
+	if (func_volatile(procOid) != PROVOLATILE_STABLE)
+		ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+				 	 	errmsg("protocol function %s is not declared STABLE.",
+						new_func_name),
+						errOmitLocation(true)));
+	pfree(new_func_name);
+
+	return procOid;
+}
+
+static void ExternalGetHdfsFileDataLocation(
+				Relation relation,
+				split_to_segment_mapping_context *context,
+				int64 splitsize,
+				Relation_Data *rel_data,
+				int* allblocks) {
+	ExtTableEntry *ext_entry = GetExtTableEntry(rel_data->relid);
+	Assert(ext_entry->locations != NIL);
+	int64 total_size = 0;
+	int segno = 1;
+
+	/*
+	 * Step 1. get external HDFS location from URI.
+	 */
+	char* first_uri_str = (char *) strVal(lfirst(list_head(ext_entry->locations)));
+	/* We must have at least one location. */
+	Assert(first_uri_str != NULL);
+	Uri* uri = ParseExternalTableUri(first_uri_str);
+	bool isHdfs = false;
+	if (uri != NULL && is_hdfs_protocol(uri)) {
+		isHdfs = true;
+	}
+	Assert(isHdfs);  /* Currently, we accept HDFS only. */
+
+    /*
+     * Step 2. Get function to call for getting location information. This work
+     * is done by validator function registered for this external protocol.
+     */
+	Oid procOid = InvalidOid;
+	if (isHdfs) {
+		procOid = LookupCustomProtocolBlockLocationFunc("hdfs");
+	}
+	else
+	{
+		Assert(false);
+	}
+
+	/*
+	 * Step 3. Call validator to get location data.
+	 */
+
+	/* Prepare function call parameter by passing into location string. This is
+	 * only called at dispatcher side. */
+	List *bls = NULL; /* Block locations */
+	if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
+	{
+		InvokeHDFSProtocolBlockLocation(procOid, ext_entry->locations, &bls);
+	}
+
+	/*
+	 * Step 4. Build data location info for optimization after this call.
+	 */
+
+	/* Go through each files */
+	ListCell *cbl = NULL;
+	foreach(cbl, bls)
+	{
+		blocklocation_file *f = (blocklocation_file *)lfirst(cbl);
+		BlockLocation *locations = f->locations;
+		int block_num = f->block_num;
+		int64 logic_len = 0;
+		*allblocks += block_num;
+		if ((locations != NULL) && (block_num > 0)) {
+			// calculate length for one specific file
+			for (int j = 0; j < block_num; ++j) {
+				logic_len += locations[j].length;
+		//		locations[j].lowerBoundInc = NULL;
+		//		locations[j].upperBoundExc = NULL;
+			}
+			total_size += logic_len;
+
+			Block_Host_Index * host_index = update_data_dist_stat(context,
+					locations, block_num);
+
+			Relation_File *file = (Relation_File *) palloc(sizeof(Relation_File));
+			if (atoi(strrchr(f->file_uri, '/') + 1) > 0)
+			  file->segno = atoi(strrchr(f->file_uri, '/') + 1);
+			else
+			  file->segno = segno++;
+			file->block_num = block_num;
+			file->locations = locations;
+			file->hostIDs = host_index;
+			file->logic_len = logic_len;
+
+			// do the split logic
+			int realSplitNum = 0;
+			int split_num = file->block_num;
+			int64 offset = 0;
+			File_Split *splits = (File_Split *) palloc(sizeof(File_Split) * split_num);
+			while (realSplitNum < split_num) {
+					splits[realSplitNum].host = -1;
+					splits[realSplitNum].is_local_read = true;
+					splits[realSplitNum].offset = offset;
+					splits[realSplitNum].length = file->locations[realSplitNum].length;
+					splits[realSplitNum].logiceof = logic_len;
+					splits[realSplitNum].ext_file_uri = pstrdup(f->file_uri);
+
+					if (logic_len - offset <= splits[realSplitNum].length) {
+							splits[realSplitNum].length = logic_len - offset;
+							++realSplitNum;
+							break;
+					}
+					offset += splits[realSplitNum].length;
+					++realSplitNum;
+			}
+			file->split_num = realSplitNum;
+			file->splits = splits;
+			context->total_split_count += realSplitNum;
+
+			rel_data->files = lappend(rel_data->files, file);
+		}
+	}
+	context->total_metadata_logic_len += total_size;
+	rel_data->total_size = total_size;
+}
 /*
  * step 1 search segments with local read and segment is not full after being assigned the block
  * step 2 search segments with local read and segment is not full before being assigned the block
@@ -4021,14 +4359,17 @@
  * fixedVsegNum is used by PBE, since all the execute should use the same number of vsegs.
  */
 SplitAllocResult *
-calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
-		List *fullRangeTable, GpPolicy *intoPolicy, int sliceNum, int fixedVsegNum) {
+calculate_planner_segment_num(PlannedStmt *plannedstmt, Query *query,
+		QueryResourceLife resourceLife, int fixedVsegNum) {
 	SplitAllocResult *result = NULL;
 	QueryResource *resource = NULL;
-
 	List *virtual_segments = NIL;
 	List *alloc_result = NIL;
+	Node *planTree = plannedstmt->planTree;
+	GpPolicy *intoPolicy = plannedstmt->intoPolicy;
+	int sliceNum = plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1;
 	split_to_segment_mapping_context context;
+	context.chsl_context.relations = NIL;
 
 	int planner_segments = 0; /*virtual segments number for explain statement */
 
@@ -4061,9 +4402,11 @@
 	{
 		init_datalocality_memory_context();
 
-		init_datalocality_context(&context);
+		init_datalocality_context(plannedstmt, &context);
 
-		collect_range_tables(query, fullRangeTable, &(context.rtc_context));
+		collect_range_tables(query, &(context.rtc_context));
+
+		collect_scan_rangetable(planTree, &(context.srtc_context));
 
 		bool isTableFunctionExists = false;
 
@@ -4104,6 +4447,9 @@
 
 		/* get block location and calculate relation size*/
 		get_block_locations_and_claculte_table_size(&context);
+		if (context.chsl_context.relations) {
+			(void) lfirst(context.chsl_context.relations->tail); /* debug probe; result unused */
+		}
 
 		/*use inherit resource*/
 		if (resourceLife == QRL_INHERIT) {
diff --git a/src/backend/cdb/cdbpartition.c b/src/backend/cdb/cdbpartition.c
index a887ee4..963983f 100644
--- a/src/backend/cdb/cdbpartition.c
+++ b/src/backend/cdb/cdbpartition.c
@@ -4787,9 +4787,9 @@
 static void
 fixup_table_storage_options(CreateStmt *ct)
 {
-	if (!ct->options)
+	if (!ct->base.options)
 	{
-    	ct->options = list_make2(makeDefElem("appendonly",
+    	ct->base.options = list_make2(makeDefElem("appendonly",
 											 (Node *)makeString("true")),
 								 makeDefElem("orientation",
 											 (Node *)makeString("column")));
@@ -5243,11 +5243,11 @@
 	ct = (CreateStmt *)linitial((List *)pUtl);
 	Assert(IsA(ct, CreateStmt));
 	
-	ct->is_add_part = true; /* subroutines need to know this */
+	ct->base.is_add_part = true; /* subroutines need to know this */
 	ct->ownerid = ownerid;
 	
-	if (!ct->distributedBy)
-		ct->distributedBy = make_dist_clause(rel);
+	if (!ct->base.distributedBy)
+		ct->base.distributedBy = make_dist_clause(rel);
 	
 	if (bSetTemplate)
 	/* if creating a template, silence partition name messages */
@@ -6377,7 +6377,7 @@
 	 * name of the parent relation
 	 */
 	
-	ct->relation = makeRangeVar(
+	ct->base.relation = makeRangeVar(
 								NULL /*catalogname*/, 
 								get_namespace_name(RelationGetNamespace(par_rel)),
 								RelationGetRelationName(par_rel), -1);
@@ -6429,7 +6429,7 @@
 		Oid 			 skipTableRelid	 	 = InvalidOid; 
 		List			*attr_encodings		 = NIL;
 		
-		ct->partitionBy = (Node *)pBy;
+		ct->base.partitionBy = (Node *)pBy;
 
 		/* this parse_analyze expands the phony create of a partitioned table
 		 * that we just build into the constituent commands we need to create
@@ -6517,7 +6517,7 @@
 						gs->objtype = ACL_OBJECT_RELATION;
 						gs->cooked_privs = cp;
 						
-						gs->objects = list_make1(copyObject(t->relation));
+						gs->objects = list_make1(copyObject(t->base.relation));
 						
 						pt = parse_analyze((Node *)gs, NULL, NULL, 0);
 						l1 = list_concat(l1, pt);
@@ -6545,7 +6545,7 @@
 			{
 				CreateStmt *t = (CreateStmt *)((Query *)s)->utilityStmt;
 				
-				skipTableRelid = RangeVarGetRelid(t->relation, true, false /*allowHcatalog*/);
+				skipTableRelid = RangeVarGetRelid(t->base.relation, true, false /*allowHcatalog*/);
 			}
 		}
 
@@ -8376,7 +8376,7 @@
 	/* Caller should check this! */
 	Assert(stmt->partitionBy && !stmt->is_part_child);
 	
-	foreach( lc_elt, stmt->tableElts )
+	foreach( lc_elt, stmt->base.tableElts )
 	{
 		Node * elt = lfirst(lc_elt);
 		
@@ -8512,7 +8512,7 @@
 				FkConstraint *fcon = list_nth(unnamed_cons, i);
 			
 				fcon->constr_name = 
-					ChooseConstraintNameForPartitionCreate(stmt->relation->relname,
+					ChooseConstraintNameForPartitionCreate(stmt->base.relation->relname,
 														   colname,
 														   label,
 														   used_names);
@@ -8528,7 +8528,7 @@
 				if ( 0 != strcmp(label, "pkey") )
 					colname = list_nth(unnamed_cons_col, i);
 				
-				con->name = ChooseConstraintNameForPartitionCreate(stmt->relation->relname,
+				con->name = ChooseConstraintNameForPartitionCreate(stmt->base.relation->relname,
 																   colname,
 																   label,
 																   used_names);
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index ddd9677..ab70e5a 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -54,6 +54,7 @@
 #include "access/aosegfiles.h"
 #include "access/parquetsegfiles.h"
 #include "access/hash.h"
+#include "access/xact.h"
 #include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
@@ -64,6 +65,7 @@
 #include "cdb/cdbheap.h"
 #include "cdb/cdbhash.h"
 #include "commands/vacuum.h"
+#include "commands/dbcommands.h"
 #include "executor/executor.h"
 #include "lib/stringinfo.h"
 #include "libpq/pqformat.h"             /* pq_beginmessage() etc. */
@@ -80,12 +82,15 @@
 #include "utils/memutils.h"
 #include "utils/syscache.h"
 #include "utils/tuplesort.h"
+#include "utils/palloc.h"
 #include "utils/pg_locale.h"
 #include "utils/builtins.h"
 #include "utils/inval.h"
+#include "utils/uri.h"
 #include "cdb/cdbvars.h"
 #include "cdb/cdbanalyze.h"
 #include "cdb/cdbrelsize.h"
+#include "cdb/cdbdatalocality.h"
 #include "utils/fmgroids.h"
 #include "storage/backendid.h"
 #include "executor/spi.h"
@@ -93,6 +98,8 @@
 #include "catalog/pg_namespace.h"
 #include "utils/debugbreak.h"
 #include "nodes/makefuncs.h"
+#include "nodes/nodes.h"
+#include "nodes/parsenodes.h"
 
 #include "commands/analyzeutils.h"
 
@@ -153,9 +160,15 @@
 static void gp_statistics_estimate_reltuples_relpages_heap(Relation rel, float4 *reltuples, float4 *relpages);
 static void gp_statistics_estimate_reltuples_relpages_ao_rows(Relation rel, float4 *reltuples, float4 *relpages);
 static void gp_statistics_estimate_reltuples_relpages_parquet(Relation rel, float4 *reltuples, float4 *relpages);
+static void gp_statistics_estimate_reltuples_relpages_external(Relation rel, float4 *relTuples, float4 *relPages);
 static void analyzeEstimateReltuplesRelpages(Oid relationOid, float4 *relTuples, float4 *relPages, bool rootonly);
 static void analyzeEstimateIndexpages(Oid relationOid, Oid indexOid, float4 *indexPages);
 
+static void getExternalRelTuples(Oid relationOid, float4 *relTuples);
+static void getExternalRelPages(Oid relationOid, float4 *relPages , Relation rel);
+static float4 getExtrelPagesHDFS(Uri *uri);
+static bool isExternalHDFSProtocol(Oid relOid);
+
 /* Attribute-type related functions */
 static bool isOrderedAndHashable(Oid relationOid, const char *attributeName);
 static bool isBoolType(Oid relationOid, const char *attributeName);
@@ -442,6 +455,17 @@
 							"Please run ANALYZE on the root partition table.",
 							get_rel_name(relationOid))));
 		}
+		else if (!isExternalHDFSProtocol(relationOid))
+		{
+					/*
+					 * Support analyze for external table.
+					 * For now, HDFS protocol external table is supported.
+					 */
+			ereport(WARNING,
+					 (errmsg("skipping \"%s\" --- cannot analyze external table with non-HDFS protocol. "
+					         "Please run ANALYZE on external table with HDFS protocol.",
+					         get_rel_name(relationOid))));
+		}
 		else
 		{
 			lRelOids = list_make1_oid(relationOid);
@@ -989,8 +1013,10 @@
 	while (HeapTupleIsValid(tuple = caql_getnext(pcqCtx)))
 	{
 		Oid candidateOid = HeapTupleGetOid(tuple);
-		if (analyzePermitted(candidateOid)
-						&& candidateOid != StatisticRelationId)
+		bool isExternalHDFS = isExternalHDFSProtocol(candidateOid);
+		if (analyzePermitted(candidateOid) &&
+				    candidateOid != StatisticRelationId &&
+				    isExternalHDFS)
 		{
 			*fullRelOids = lappend_oid(*fullRelOids, candidateOid);
 		}
@@ -998,8 +1024,9 @@
 		{
 			continue;
 		}
-		if (analyzePermitted(candidateOid)
-				&& candidateOid != StatisticRelationId)
+		if (analyzePermitted(candidateOid) &&
+				    candidateOid != StatisticRelationId &&
+				    isExternalHDFS)
 		{
 			lRelOids = lappend_oid(lRelOids, candidateOid);
 		}
@@ -1705,6 +1732,11 @@
 				gp_statistics_estimate_reltuples_relpages_parquet(rel, &partRelTuples,
 						&partRelPages);
 			}
+			else if( RelationIsExternal(rel))
+			{
+				gp_statistics_estimate_reltuples_relpages_external(rel, &partRelTuples,
+						&partRelPages);
+			}
 
 			relation_close(rel, AccessShareLock);
 			*relPages += partRelPages;
@@ -3266,3 +3298,380 @@
 	pfree(fstotal);
 	return;
 }
+
+/**
+ * This method estimates the number of tuples and pages in an extern relation. We can not get accurate tuple counts
+ * and pages counts in the catalog. Therefore, we have to get reltuples and relpages manually.
+ *
+ * Input:
+ * 	rel - Relation. Must be an external table.
+ *
+ * Output:
+ * 	reltuples - exact number of tuples in relation.
+ * 	relpages  - exact number of pages.
+ */
+static void gp_statistics_estimate_reltuples_relpages_external(Relation rel, float4 *relTuples, float4 *relPages){
+	Oid extRelationOid = RelationGetRelid(rel);
+	getExternalRelTuples(extRelationOid, relTuples);
+	getExternalRelPages(extRelationOid, relPages, rel);
+}
+
+/**
+ * This method called by analyzeExternalEstimateReltuplesRelpages,
+ * to get External Relation reltuple counts, we run count(*) sql manually
+ *
+ * Input:
+ * 	extRelationOid - External Table Relation Oid
+ * Output:
+ * 	relTuples - exact number of tuples in relation.
+ */
+static void getExternalRelTuples(Oid extRelationOid, float4 *relTuples){
+	char *schemaName = NULL;
+	char *tableName = NULL;
+	schemaName = get_namespace_name(get_rel_namespace(extRelationOid)); /* must be pfreed */
+	tableName = get_rel_name(extRelationOid); /* must be pfreed */
+
+	StringInfoData str;
+	initStringInfo(&str);
+	appendStringInfo(&str, "select count(*)::float4 from %s.%s as Ta",
+			quote_identifier(schemaName),
+			quote_identifier(tableName));
+
+	spiExecuteWithCallback(str.data, false /*readonly*/, 0 /*tcount */,
+								spiCallback_getSingleResultRowColumnAsFloat4, relTuples);
+	pfree(tableName);
+	pfree(schemaName);
+	pfree(str.data);
+}
+
+/**
+ * This method called by analyzeExternalEstimateReltuplesRelpages,to get External Relation relpages counts.
+ * We call GetExtTableEntry method to get get List of external Table Locations.And then we go through every
+ * location url to sum the count of relpages.External Relation now support some different protocals, therefore
+ * we need to process them in different way.
+ *
+ * Input:
+ * 	extRelationOid - External Table Relation Oid
+ * Output:
+ * 	relTuples - exact number of pages in relation.
+ */
+static void getExternalRelPages(Oid extRelationOid, float4 *relPages , Relation rel){
+
+	ExtTableEntry* entry = GetExtTableEntry(extRelationOid);
+	List* extLocations = entry->locations;
+	int	num_urls = list_length(extLocations);
+	ListCell *cell = list_head(extLocations);
+	ListCell *cellTmp = NULL;
+	while(cell != NULL)
+	{
+		char *url = pstrdup(((Value*)lfirst(cell))->val.str);
+		Assert(url != NULL);
+		Uri *uri = ParseExternalTableUri(url);
+		Assert(uri != NULL);
+		switch (uri->protocol){
+			case URI_HDFS:
+				*relPages += getExtrelPagesHDFS(uri);
+				break;
+
+			/*
+			 * to be done
+			 */
+			case URI_GPFDIST:
+                *relPages = 1.0;
+                elog(NOTICE,"In external table ANALYZE command are not supported in GPFDIST location so far.");
+                break;
+            case URI_FILE:
+                *relPages = 1.0;
+                elog(NOTICE,"In external table ANALYZE command are not supported in FILE location so far.");
+                break;
+			case URI_FTP:
+				*relPages = 1.0;
+				elog(NOTICE,"In external table ANALYZE command are not supported in FTP location so far.");
+                break;
+			case URI_HTTP:
+				*relPages = 1.0;
+				elog(NOTICE,"In external table ANALYZE command are not supported in HTTP location so far.");
+                break;
+			case URI_CUSTOM:
+				*relPages = 1.0;
+				elog(NOTICE,"In external table ANALYZE command are not supported in CUSTOM location so far.");
+                break;
+			case URI_GPFDISTS:
+				*relPages = 1.0;
+				elog(NOTICE,"In external table ANALYZE command are not supported in GPFDISTS location so far.");
+                break;
+			default:
+				*relPages = 1.0;
+                elog(NOTICE,"should not go here");
+				break;
+		}
+
+		cell = cell->next;
+
+		/* free resourse */
+		pfree(url);
+		if(uri->customprotocol != NULL){ pfree(uri->customprotocol);}
+		pfree(uri->hostname);
+		if(uri->path!=NULL){pfree(uri->path);}
+		FreeExternalTableUri(uri);
+	}
+	/* pfree entry->location*/
+	list_free_deep(extLocations);
+	/* pfree entry */
+	if(entry->fmtopts != NULL){ pfree(entry->fmtopts);}
+	if(entry->command != NULL){ pfree(entry->command);}
+	pfree(entry);
+}
+
+/**
+ * This method get the number of pages external table which location uri protocol is HDFS. We hold that
+ * the concept of the page number in  external table is same as the concept of block number in hdfs.
+ * Therefore we get the number of pages for external table by get the number of blocks in hdfs
+ *
+ * Input:
+ * 	uri - hdfs uri which refers to external table storage location, uri can refer to a file or a folder
+ *
+ */
+static float4 getExtrelPagesHDFS(Uri *uri){
+	int numOfBlock = 0;
+	int nsize = 0;
+	float4 relpages = 0.0;
+	hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+
+	/* do not touch fs before verifying the connection succeeded */
+	hdfsFileInfo *fiarray = (fs == NULL) ? NULL : hdfsListDirectory(fs, uri->path,&nsize);
+	if (fs == NULL || fiarray == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+					"failed to get files of path %s",
+					uri->path);
+	}
+
+	/* Call block location api to get data location for each file */
+	for (int i = 0 ; i < nsize ; i++)
+	{
+//		FscHdfsFileInfoC *fi = FscHdfsGetFileInfoFromArray(fiarray, i);
+		hdfsFileInfo *fi = &fiarray[i];
+		/* break condition of this for loop */
+		if (fi == NULL) {break;}
+
+		/* Build file name full path. */
+		const char *fname = fi->mName;
+		char *fullpath = palloc0(strlen(uri->path) + /* path       */
+								 1 + strlen(fname) + /* '/' + name */
+								 1);                 /* \0         */
+		sprintf(fullpath, "%s/%s", uri->path, fname);
+
+		/* Get file full length. */
+	//	int64_t len = FscHdfsGetFileInfoLength(fi);
+		int64_t len = fi->mSize;
+		if (len == 0) {
+			pfree(fullpath);
+			continue;
+		}
+
+		/* Get block location data for this file */
+		BlockLocation *bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len,&numOfBlock);
+		if (bla == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_blocklocation : "
+						"failed to get block location of path %s. "
+						"It is reported generally due to HDFS service errors or "
+						"another session's ongoing writing.",
+						fullpath);
+		}
+
+		relpages += numOfBlock;
+
+
+		/* We don't need it any longer */
+		pfree(fullpath);
+
+		/* Clean up block location instances created by the lib. */
+		hdfsFreeFileBlockLocations(&bla,numOfBlock);
+	}
+
+	/* Clean up file info array created by the lib for this location. */
+//	FscHdfsFreeFileInfoArrayC(&fiarray);
+	hdfsFreeFileInfo(fiarray,nsize);
+	hdfsDisconnect(fs);
+	return relpages;
+}
+
+
+/*
+ * Get total bytes of external table with HDFS protocol
+ */
+uint64 GetExternalTotalBytesHDFS(Uri *uri)
+{
+	uint64 totalBytes = 0;
+	int nsize = 0;
+
+	hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+
+	hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path,&nsize);
+	if (fiarray == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+		            "failed to get files of path %s.",
+		            uri->path);
+	}
+
+	/* Call block location api to get data location for each file */
+	for (int i = 0 ; i < nsize ; i++)
+	{
+		hdfsFileInfo *fi = &fiarray[i];
+
+		/* Break condition of this for loop */
+		if (fi == NULL)
+		{
+			break;
+		}
+
+		/* Get file full length. */
+		totalBytes += fi->mSize;
+
+	}
+
+	/* Clean up file info array created by the lib for this location. */
+	hdfsFreeFileInfo(fiarray,nsize);
+	hdfsDisconnect(fs);
+
+	return totalBytes;
+}
+
+/*
+ * Get total bytes of external table
+ */
+uint64 GetExternalTotalBytes(Relation rel)
+{
+	Oid extRelOid = RelationGetRelid(rel);
+	ExtTableEntry *entry = GetExtTableEntry(extRelOid);
+	List *extLocations = entry->locations;
+	int num_urls = list_length(extLocations);
+	ListCell *cell = list_head(extLocations);
+	ListCell *cellTmp = NULL;
+	uint64 totalBytes = 0;
+
+	while(cell != NULL)
+	{
+		char *url = pstrdup(((Value*)lfirst(cell))->val.str);
+		Assert(url != NULL);
+
+		Uri *uri = ParseExternalTableUri(url);
+		Assert(uri != NULL);
+
+		switch (uri->protocol)
+		{
+		case URI_HDFS:
+			totalBytes += GetExternalTotalBytesHDFS(uri);
+			break;
+		/*
+		 * Support analyze for external table.
+		 * For now, HDFS protocol external table is supported.
+		 */
+		case URI_GPFDIST:
+			totalBytes += 0;
+			elog(ERROR,"In external table ANALYZE command are not supported in GPFDIST location so far.");
+			break;
+
+		case URI_FILE:
+			totalBytes += 0;
+			elog(ERROR,"In external table ANALYZE command are not supported in FILE location so far.");
+			break;
+
+		case URI_FTP:
+			totalBytes += 0;
+			elog(ERROR,"In external table ANALYZE command are not supported in FTP location so far.");
+			break;
+
+		case URI_HTTP:
+			totalBytes += 0;
+			elog(ERROR,"In external table ANALYZE command are not supported in HTTP location so far.");
+			break;
+
+		case URI_CUSTOM:
+			totalBytes += 0;
+			elog(ERROR,"In external table ANALYZE command are not supported in CUSTOM location so far.");
+			break;
+
+		case URI_GPFDISTS:
+			totalBytes += 0;
+			elog(ERROR,"In external table ANALYZE command are not supported in GPFDISTS location so far.");
+			break;
+
+		default:
+			totalBytes += 0;
+			elog(ERROR,"should not go here");
+			break;
+		}
+
+		cell = cell->next;
+
+		/* free resourse */
+		pfree(url);
+		if (uri->customprotocol != NULL)
+		{
+			pfree(uri->customprotocol);
+		}
+		pfree(uri->hostname);
+
+		if (uri->path != NULL)
+		{
+			pfree(uri->path);
+		}
+		FreeExternalTableUri(uri);
+
+	}
+	/* pfree entry->location*/
+	list_free_deep(extLocations);
+	/* pfree entry */
+	if (entry->fmtopts != NULL)
+	{
+		pfree(entry->fmtopts);
+	}
+	if (entry->command != NULL)
+	{
+		pfree(entry->command);
+	}
+	pfree(entry);
+
+	return totalBytes;
+}
+
+/*
+ * Check if a relation is external table with HDFS protocol
+ */
+static bool isExternalHDFSProtocol(Oid relOid)
+{
+	bool ret = true;
+
+	Relation rel = try_relation_open(relOid, AccessShareLock, false);
+	if (rel != NULL)
+	{
+		if ((rel->rd_rel->relkind == RELKIND_RELATION) &&
+		    RelationIsExternal(rel))
+		{
+			ExtTableEntry* entry = GetExtTableEntry(relOid);
+			List* extLocations = entry->locations;
+			ListCell *cell = list_head(extLocations);
+			while(cell != NULL)
+			{
+				char *url = ((Value*)lfirst(cell))->val.str;
+				Assert(url != NULL);
+			//	if (!IS_HDFS_URI(url))
+				if (!IS_HDFS_URI(url))
+				{
+					ret = false;
+					break;
+				}
+
+				cell = cell->next;
+			}
+		}
+
+		relation_close(rel, AccessShareLock);
+	}
+
+	return ret;
+}
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6e08bf0..9c21148 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -4308,8 +4308,9 @@
 					else if (formatterType != ExternalTableType_PLUG)
 					{
 						resultRelInfo->ri_extInsertDesc =
-								external_insert_init(resultRelInfo->ri_RelationDesc,
-							                         0, formatterType, formatterName);
+								external_insert_init(resultRelInfo->ri_RelationDesc,
+								                     0, formatterType,
+								                     formatterName, NULL);
 					}
 					else
 					{
@@ -4323,11 +4324,20 @@
 							FmgrInfo *insertInitFunc = (FmgrInfo *)palloc(sizeof(FmgrInfo));
 							fmgr_info(procOid, insertInitFunc);
 
+							ResultRelSegFileInfo *segfileinfo = NULL;
+							ResultRelInfoSetSegFileInfo(resultRelInfo,
+												cstate->ao_segfileinfos);
+							segfileinfo = (ResultRelSegFileInfo *) list_nth(
+												resultRelInfo->ri_aosegfileinfos,
+												GetQEIndex());
+
 							resultRelInfo->ri_extInsertDesc =
-							        InvokePlugStorageFormatInsertInit(insertInitFunc,
-							                                          resultRelInfo->ri_RelationDesc,
-							                                          formatterType,
-							                                          formatterName);
+									InvokePlugStorageFormatInsertInit(insertInitFunc,
+						                                      resultRelInfo->ri_RelationDesc,
+														                 formatterType,
+														                 formatterName,
+																		  NULL,
+														                  segfileinfo->segno);
 
 							pfree(insertInitFunc);
 						}
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index bd8b7d4..dd85f32 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -903,6 +903,7 @@
 				   errOmitLocation(true)));
 }
 
+
 static void
 ComputeIndexAttrs(IndexInfo *indexInfo,
 				  Oid *classOidP,
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index f1259d6..c82d570 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -65,6 +65,8 @@
 
 #include "postmaster/seqserver.h"
 
+#include "catalog/pg_exttable.h"
+
 
 /*
  * We don't want to log each fetching of a value from a sequence,
@@ -415,7 +417,7 @@
 	stmt->oidInfo.aosegIndexOid = 0;
 	stmt->oidInfo.aoblkdirOid = 0;
 	stmt->oidInfo.aoblkdirIndexOid = 0;
-	stmt->tableElts = NIL;
+	stmt->base.tableElts = NIL;
 	for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
 	{
 		ColumnDef  *coldef = makeNode(ColumnDef);
@@ -478,20 +480,20 @@
 				value[i - 1] = BoolGetDatum(false);
 				break;
 		}
-		stmt->tableElts = lappend(stmt->tableElts, coldef);
+		stmt->base.tableElts = lappend(stmt->base.tableElts, coldef);
 	}
 
-	stmt->relation = seq->sequence;
-	stmt->inhRelations = NIL;
-	stmt->constraints = NIL;
-	stmt->options = list_make1(defWithOids(false));
-	stmt->oncommit = ONCOMMIT_NOOP;
-	stmt->tablespacename = NULL;
-	stmt->relKind = RELKIND_SEQUENCE;
+	stmt->base.relation = seq->sequence;
+	stmt->base.inhRelations = NIL;
+	stmt->base.constraints = NIL;
+	stmt->base.options = list_make1(defWithOids(false));
+	stmt->base.oncommit = ONCOMMIT_NOOP;
+	stmt->base.tablespacename = NULL;
+	stmt->base.relKind = RELKIND_SEQUENCE;
 	stmt->oidInfo.comptypeOid = seq->comptypeOid;
 	stmt->ownerid = GetUserId();
 
-	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP);
+	seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP, NonCustomFormatType);
 
     /*
      * Open and lock the new sequence.  (This lock is redundant; an
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c920260..156e04c 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -32,6 +32,7 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include "fmgr.h"
 
 #include <math.h>
 #include <fcntl.h>
@@ -49,6 +50,7 @@
 #include "access/reloptions.h"
 #include "access/xact.h"
 #include "access/transam.h"
+#include "access/plugstorage.h"
 #include "catalog/catalog.h"
 #include "catalog/catquery.h"
 #include "catalog/dependency.h"
@@ -68,6 +70,7 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_operator.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
 #include "catalog/pg_tablespace.h"
@@ -94,6 +97,8 @@
 #include "nodes/makefuncs.h"
 #include "nodes/print.h"
 #include "nodes/relation.h"
+#include "nodes/parsenodes.h"
+#include "nodes/value.h"
 #include "optimizer/clauses.h"
 #include "optimizer/plancat.h"
 #include "optimizer/planner.h"
@@ -126,6 +131,7 @@
 #include "utils/syscache.h"
 #include "utils/tqual.h"
 #include "utils/uri.h"
+#include "utils/formatting.h"
 #include "mb/pg_wchar.h"
 
 #include "cdb/cdbdisp.h"
@@ -143,6 +149,7 @@
 #include "cdb/cdbquerycontextdispatching.h"
 #include "cdb/dispatcher.h"
 #include "cdb/cdbdispatchresult.h"
+#include "optimizer/planmain.h"
 
 /*
  * ON COMMIT action list
@@ -387,12 +394,35 @@
 					   const char *newname,
 					   bool fk_scan,
 					   bool update_relname);
-static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool iswritable, bool* isCustom);
+static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb,
+		bool iswritable, bool forceCreateDir, bool* isCustom);
 static Datum transformExecOnClause(List	*on_clause, int *preferred_segment_num, bool iswritable);
 static char transformFormatType(char *formatname);
-static Datum transformFormatOpts(char formattype, List *formatOpts, int numcols, bool iswritable);
+static void checkCustomFormatOptString(char **opt,
+                                       const char *key,
+                                       const char *val,
+                                       const bool needopt,
+                                       const int nvalidvals,
+                                       const char **validvals);
+static void checkCustomFormatEncoding(const char *formatterName,
+                                      const char *encodingName);
+static char *checkCustomFormatOptions(const char *formatterName,
+                                      List *formatOpts,
+                                      const bool isWritable,
+                                      const char *delimiter);
+static void checkCustomFormatDateTypes(const char *formatterName,
+                                       TupleDesc tupleDesc);
+static Datum transformFormatOpts(char formattype,
+                                 char *formatname,
+                                 char *formattername,
+                                 List *formatOpts,
+                                 int numcols,
+                                 bool iswritable,
+                                 GpPolicy *policy);
 
-static Oid DefineRelation_int(CreateStmt *stmt, char relkind, char relstorage);
+Oid DefineRelation(CreateStmt *stmt, char relkind, char relstorage, const char *formattername);
+static Oid DefineRelation_int(CreateStmt *stmt, char relkind,
+                              char relstorage, const char *formattername);
 
 static void ATExecPartAddInternal(Relation rel, Node *def);
 
@@ -407,12 +437,14 @@
 static bool prebuild_temp_table(Relation rel, RangeVar *tmpname, List *distro,
 								List *opts, List **hidden_types, bool isTmpTableAo);
 static void ATPartitionCheck(AlterTableType subtype, Relation rel, bool rejectroot, bool recursing);
-static void InvokeProtocolValidation(Oid procOid, char *procName, bool iswritable, List *locs, List* fmtopts);
-
+static void InvokeProtocolValidation(Oid procOid, char *procName, bool iswritable, bool forceCreateDir,
+		List *locs, List* fmtopts);
 static char *alterTableCmdString(AlterTableType subtype);
 
 static bool isPgDefaultTablespace(const char *tablespacename);
 
+static Oid LookupCustomProtocolValidatorFunc(char *protoname);
+
 bool
 isPgDefaultTablespace(const char *tablespacename){
 	if(tablespacename == NULL)
@@ -432,20 +464,20 @@
  * ----------------------------------------------------------------
  */
 Oid
-DefineRelation(CreateStmt *stmt, char relkind, char relstorage)
+DefineRelation(CreateStmt *stmt, char relkind, char relstorage, const char *formattername)
 {
     Oid reloid = 0;
-    Assert(stmt->relation->schemaname == NULL || strlen(stmt->relation->schemaname)>0);
+    Assert(stmt->base.relation->schemaname == NULL || strlen(stmt->base.relation->schemaname)>0);
 
     /* forbid create non-system table on tablespace pg_default */
-    if((!IsBootstrapProcessingMode()) && (isPgDefaultTablespace(stmt->tablespacename)))
+    if((!IsBootstrapProcessingMode()) && (isPgDefaultTablespace(stmt->base.tablespacename)))
     {
     	ereport(ERROR,
     					(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
     					errmsg("Creating table on tablespace 'pg_default' is not allowed")));
     }
 
-    reloid = DefineRelation_int(stmt, relkind, relstorage);
+    reloid = DefineRelation_int(stmt, relkind, relstorage, formattername);
 
     return reloid;
 }
@@ -454,11 +486,12 @@
 Oid
 DefineRelation_int(CreateStmt *stmt,
                    char relkind,
-                   char relstorage)
+                   char relstorage,
+                   const char *formattername)
 {
 	char		relname[NAMEDATALEN];
 	Oid			namespaceId;
-	List	   *schema = stmt->tableElts;
+	List	   *schema = stmt->base.tableElts;
 	Oid			relationId = InvalidOid;
 	Oid			tablespaceId;
 	Relation	rel;
@@ -482,12 +515,12 @@
 	 * Truncate relname to appropriate length (probably a waste of time, as
 	 * parser should have done this already).
 	 */
-	StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
+	StrNCpy(relname, stmt->base.relation->relname, NAMEDATALEN);
 
 	/*
 	 * Check consistency of arguments
 	 */
-	if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
+	if (stmt->base.oncommit != ONCOMMIT_NOOP && !stmt->base.relation->istemp)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 				 errmsg("ON COMMIT can only be used on temporary tables"),
@@ -498,7 +531,7 @@
 	 * Check we have permission to create there. Skip check if bootstrapping,
 	 * since permissions machinery may not be working yet.
 	 */
-	namespaceId = RangeVarGetCreationNamespace(stmt->relation);
+	namespaceId = RangeVarGetCreationNamespace(stmt->base.relation);
 
 	if (!IsBootstrapProcessingMode())
 	{
@@ -532,21 +565,21 @@
 		 * We shouldn't go through the regular default case for these because we
 		 * don't want to pick up the value from the default_tablespace guc.
 		 */
-		Assert(!stmt->tablespacename);
+		Assert(!stmt->base.tablespacename);
 		tablespaceId = InvalidOid;
 	}
-	else if (stmt->tablespacename)
+	else if (stmt->base.tablespacename)
 	{
 		/*
 		 * Tablespace specified on the command line, or was passed down by
 		 * dispatch.
 		 */
-		tablespaceId = get_tablespace_oid(stmt->tablespacename);
+		tablespaceId = get_tablespace_oid(stmt->base.tablespacename);
 		if (!OidIsValid(tablespaceId))
 			ereport(ERROR,
 					(errcode(ERRCODE_UNDEFINED_OBJECT),
 					 errmsg("tablespace \"%s\" does not exist",
-							stmt->tablespacename),
+							stmt->base.tablespacename),
 					 errOmitLocation(true)));
 	}
 	else
@@ -573,7 +606,7 @@
 	/*
 	 * Parse and validate reloptions, if any.
 	 */
-	reloptions = transformRelOptions((Datum) 0, stmt->options, true, false);
+	reloptions = transformRelOptions((Datum) 0, stmt->base.options, true, false);
 
 	/*
 	 * Accept and only accept tidycat option during upgrade.
@@ -610,17 +643,17 @@
 	 * inherited attributes. Update the offsets of the distribution attributes
 	 * in GpPolicy if necessary.
 	 */
-	isPartitioned = stmt->partitionBy ? true : false;
-	schema = MergeAttributes(schema, stmt->inhRelations,
-							 stmt->relation->istemp, isPartitioned,
+	isPartitioned = stmt->base.partitionBy ? true : false;
+	schema = MergeAttributes(schema, stmt->base.inhRelations,
+							 stmt->base.relation->istemp, isPartitioned,
 							 &inheritOids, &old_constraints, &parentOidCount, stmt->policy);
 
 	/*
 	 * If a partition table, and user not specify pagesize and rowgroupsize, specify the default
 	 * pagesize to 1MB, rowgroupsize to 8MB.
 	 */
-	if((stmt->partitionBy) || (stmt->is_part_child)){
-		reloptions = AddDefaultPageRowGroupSize(reloptions, stmt->options);
+	if((stmt->base.partitionBy) || (stmt->base.is_part_child)){
+		reloptions = AddDefaultPageRowGroupSize(reloptions, stmt->base.options);
 	}
 
 	/*
@@ -632,10 +665,21 @@
 	 */
 	descriptor = BuildDescForRelation(schema);
 
-	localHasOids = interpretOidsOption(stmt->options);
+	localHasOids = interpretOidsOption(stmt->base.options);
 	descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
 
 	/*
+	 * Check supported data types for pluggable format, i.e., orc
+	 * Need to remove this check if all data types are supported for orc format.
+	 */
+	if ((relkind == RELKIND_RELATION) && (relstorage == RELSTORAGE_EXTERNAL) &&
+	    formattername && pg_strcasecmp(formattername, "text") &&
+		pg_strcasecmp(formattername, "csv"))
+	{
+		checkCustomFormatDateTypes(formattername, descriptor);
+	}
+
+	/*
 	 * old_constraints: pre-cooked constraints from CREATE TABLE ... INHERIT ...
 	 * stmt->constraints: might have some pre-cooked constraints passed by analyze.c,
 	 * 		due to LIKE tab INCLUDING CONSTRAINTS
@@ -651,8 +695,8 @@
 		{
 			Constraint *cdef = (Constraint *) lfirst(listptr);
 			if (cdef->contype == CONSTR_CHECK &&
-					add_nonduplicate_cooked_constraint(cdef, stmt->constraints))
-				stmt->constraints = lappend(stmt->constraints, cdef);
+					add_nonduplicate_cooked_constraint(cdef, stmt->base.constraints))
+				stmt->base.constraints = lappend(stmt->base.constraints, cdef);
 		}
 	}
 
@@ -674,8 +718,8 @@
     }
 
 	/* MPP-8405: disallow OIDS on partitioned tables */
-	if ((stmt->partitionBy ||
-		 stmt->is_part_child) &&
+	if ((stmt->base.partitionBy ||
+		 stmt->base.is_part_child) &&
 		descriptor->tdhasoid && 
 		IsNormalProcessingMode() &&
         (Gp_role == GP_ROLE_DISPATCH))
@@ -690,7 +734,7 @@
     if (stmt->oidInfo.relOid)
         elog(DEBUG4, "DefineRelation relOid=%d schemaname=%s ",
              stmt->oidInfo.relOid,
-             stmt->relation->schemaname ? stmt->relation->schemaname : "");
+             stmt->base.relation->schemaname ? stmt->base.relation->schemaname : "");
 
 	relationId = heap_create_with_catalog(relname,
 										  namespaceId,
@@ -705,7 +749,7 @@
 										  localHasOids,
 										  /* bufferPoolBulkLoad */ false,
 										  parentOidCount,
-										  stmt->oncommit,
+										  stmt->base.oncommit,
                                           stmt->policy,  /*CDB*/
                                           reloptions,
 										  allowSystemTableModsDDL,
@@ -729,7 +773,7 @@
 	 * backends can't see the new rel anyway until we commit), but it keeps
 	 * the lock manager from complaining about deadlock risks.
 	 */
-	if (stmt->is_part_child)
+	if (stmt->base.is_part_child)
 		rel = relation_open(relationId, NoLock);
 	else
 		rel = relation_open(relationId, AccessExclusiveLock);
@@ -780,8 +824,8 @@
 	/*
 	 * Parse and add the defaults/constraints, if any.
 	 */
-	if (rawDefaults || stmt->constraints)
-		AddRelationConstraints(rel, rawDefaults, stmt->constraints);
+	if (rawDefaults || stmt->base.constraints)
+		AddRelationConstraints(rel, rawDefaults, stmt->base.constraints);
 
 	if (stmt->attr_encodings)
 		AddRelationAttributeEncodings(rel, stmt->attr_encodings);
@@ -939,26 +983,174 @@
 	char*					  commandString = NULL;
 	char					  rejectlimittype = '\0';
 	char					  formattype;
+	char*					  formattername = NULL;
 	int						  rejectlimit = -1;
 	int						  encoding = -1;
 	int preferred_segment_num = -1;
 	bool					  issreh = false; /* is single row error handling requested? */
+	bool					  isexternal = createExtStmt->isexternal;
 	bool					  iswritable = createExtStmt->iswritable;
 	bool					  isweb = createExtStmt->isweb;
+	bool 					  forceCreateDir = createExtStmt->forceCreateDir;
+
+	bool 					  isExternalHdfs = false;
+	char*					  location = NULL;
+	int						  location_len = 0;
 
 	/*
 	 * now set the parameters for keys/inheritance etc. Most of these are
 	 * uninteresting for external relations...
 	 */
-	createStmt->relation = createExtStmt->relation;
-	createStmt->tableElts = createExtStmt->tableElts;
-	createStmt->inhRelations = NIL;
-	createStmt->constraints = NIL;
-	createStmt->options = NIL;
-	createStmt->oncommit = ONCOMMIT_NOOP;
-	createStmt->tablespacename = NULL;
+	createStmt->base = createExtStmt->base;
+	// external table options is not compatible with internal table
+	// set NIL here
+	createStmt->base.options = NIL;
 	createStmt->policy = createExtStmt->policy; /* policy was set in transform */
-	
+
+	 /*
+	 * Recognize formatter option if there are some tokens found in parser.
+	 * This design is to give CREATE EXTERNAL TABLE DDL the flexibility to
+	 * support user defined external formatter options.
+	 */
+	recognizeExternalRelationFormatterOptions(createExtStmt);
+
+	/*
+	 * Get tablespace, database, schema for the relation
+	 */
+	RangeVar *rel = createExtStmt->base.relation;
+	// get tablespace name for the relation
+	Oid tablespace_id = (gp_upgrade_mode) ? DEFAULTTABLESPACE_OID : GetDefaultTablespace();
+	if (!OidIsValid(tablespace_id))
+	{
+		tablespace_id = get_database_dts(MyDatabaseId);
+	}
+	char *tablespace_name = get_tablespace_name(tablespace_id);
+
+	// get database name for the relation
+	char *database_name = rel->catalogname ? rel->catalogname : get_database_name(MyDatabaseId);
+
+	// get schema name for the relation
+	char *schema_name = get_namespace_name(RangeVarGetCreationNamespace(rel));
+
+	// get table name for the relation
+	char *table_name = rel->relname;
+
+	/*
+	 * Do some special logic when we use custom
+	 */
+	if (exttypeDesc->exttabletype == EXTTBL_TYPE_LOCATION)
+	{
+		if (exttypeDesc->location_list == NIL)
+		{
+			if (dfs_url == NULL)
+			{
+				ereport(ERROR,
+				        (errcode(ERRCODE_SYNTAX_ERROR),
+				         errmsg("Cannot create table on HDFS when the service is not available"),
+				         errhint("Check HDFS service and hawq_dfs_url configuration"),
+				         errOmitLocation(true)));
+			}
+
+			location_len = strlen(PROTOCOL_HDFS) +         /* hdfs:// */
+			               strlen(dfs_url) +               /* hawq_dfs_url */
+			               // 1 + strlen(filespace_name) + /* '/' + filespace name */
+			               1 + strlen(tablespace_name) +   /* '/' + tablespace name */
+			               1 + strlen(database_name) +     /* '/' + database name */
+			               1 + strlen(schema_name) +       /* '/' + schema name */
+			               1 + strlen(table_name) + 1;     /* '/' + table name + '\0' */
+
+			char *path;
+
+			if (createExtStmt->parentPath == NULL) {
+				path = (char *)palloc(sizeof(char) * location_len);
+				sprintf(path, "%s%s/%s/%s/%s/%s",
+						PROTOCOL_HDFS,
+						dfs_url, /* filespace_name, */ tablespace_name,
+						database_name, schema_name, table_name);
+			}
+			else {
+				path = (char *)palloc(sizeof(char) *
+						(location_len + strlen(createExtStmt->parentPath)+1));
+				sprintf(path, "%s%s/%s/%s/%s/%s/%s",
+						PROTOCOL_HDFS,
+						dfs_url, /* filespace_name, */ tablespace_name,
+						database_name, schema_name,
+						createExtStmt->parentPath, table_name);
+			}
+
+			exttypeDesc->location_list = list_make1((Node *) makeString(path));
+		}
+
+		/* Check the location to extract protocol */
+		ListCell   *first_uri = list_head(exttypeDesc->location_list);
+		Value		*v = lfirst(first_uri);
+		char		*uri_str = pstrdup(v->val.str);
+		Uri			*uri = ParseExternalTableUri(uri_str);
+		bool 		isHdfs = is_hdfs_protocol(uri);
+
+		pfree(uri_str);
+		FreeExternalTableUri(uri);
+
+		if (isHdfs)
+		{
+			/* We have an HDFS external protocol */
+			isExternalHdfs = true;
+		}
+
+	}
+
+	if (isExternalHdfs)
+	{
+		if (list_length(exttypeDesc->location_list)!= 1)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+							errmsg("Only support 1 external HDFS location. "
+									"Now input %d location(s)",
+									list_length(exttypeDesc->location_list))));
+	}
+
+
+	if (isExternalHdfs)
+	{
+		/*
+		 * We force any specified formatter to be custom.
+		 */
+		if (strcmp(createExtStmt->format, "custom") != 0)
+		{
+			/* There should be no "formatter" option which we will add now */
+			ListCell *cell = NULL;
+			foreach(cell, createExtStmt->base.options)
+			{
+				DefElem *de = (DefElem *)lfirst(cell);
+				if (strcmp(de->defname, "formatter") == 0)
+				{
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("Invalid FORMATTER option. "
+									"Should not specify formatter option")));
+				}
+			}
+
+			formattername = str_tolower(createExtStmt->format,
+			                            strlen(createExtStmt->format));
+
+			createExtStmt->base.options =
+			        lappend(createExtStmt->base.options,
+			                makeDefElem("formatter",
+			                            (Node *)makeString(pstrdup(formattername))));
+			pfree(createExtStmt->format);
+			createExtStmt->format = pstrdup("custom");
+		}
+
+		/*
+		 * Add table category (EXTERNAL or INTERNAL) in format options
+		 */
+		const char *kind = createExtStmt->isexternal ? "external" : "internal";
+		createExtStmt->base.options =
+				lappend(createExtStmt->base.options,
+				        makeDefElem("category", (Node *)makeString(pstrdup(kind))));
+	}
+
 	bool isCustom = false;
 	switch(exttypeDesc->exttabletype)
 	{
@@ -966,11 +1158,11 @@
 
 			/* Parse and validate URI strings (LOCATION clause) */
 			locationUris = transformLocationUris(exttypeDesc->location_list,
-									 createExtStmt->formatOpts,
-									 isweb, iswritable,&isCustom);
+									 createExtStmt->base.options,
+									 isweb, iswritable, forceCreateDir, &isCustom);
 			if(!isCustom){
 				int locLength = list_length(exttypeDesc->location_list);
-				if (createStmt->policy && locLength > 0 && locLength > createStmt->policy->bucketnum)
+				if (createStmt->policy && locLength >= 0 && locLength > createStmt->policy->bucketnum)
 				{
 					createStmt->policy->bucketnum = locLength;
 				}
@@ -1007,11 +1199,11 @@
 	 * - Always allow if superuser.
 	 * - Never allow EXECUTE or 'file' exttables if not superuser.
 	 * - Allow http, gpfdist or gpfdists tables if pg_auth has the right permissions
-	 *   for this role and for this type of table, or if gp_external_grant_privileges 
+	 *   for this role and for this type of table, or if gp_external_grant_privileges
 	 *   is on (gp_external_grant_privileges should be deprecated at some point).
 	 */
 	if(!superuser() && Gp_role == GP_ROLE_DISPATCH)
-	{		
+	{
 		if(exttypeDesc->exttabletype == EXTTBL_TYPE_EXECUTE)
 		{
 			ereport(ERROR,
@@ -1021,12 +1213,19 @@
 		}
 		else
 		{
-			ListCell   *first_uri = list_head(exttypeDesc->location_list);
-			Value		*v = lfirst(first_uri);
-			char		*uri_str = pstrdup(v->val.str);
-			Uri			*uri = ParseExternalTableUri(uri_str);
+			ListCell *first_uri = NULL;
+			Value *v = NULL;
+			char *uri_str = NULL;
+			Uri *uri = NULL;
 
-			Assert(exttypeDesc->exttabletype == EXTTBL_TYPE_LOCATION);
+			if(exttypeDesc->exttabletype == EXTTBL_TYPE_LOCATION)
+			{
+				first_uri = list_head(exttypeDesc->location_list);
+				v = lfirst(first_uri);
+				uri_str = pstrdup(v->val.str);
+				uri = ParseExternalTableUri(uri_str);
+			}
+
 
 			if(uri->protocol == URI_FILE)
 			{
@@ -1042,7 +1241,7 @@
 				 * permissions in pg_auth for creating this table.
 				 */
 
-				bool		 isnull;				
+				bool		 isnull;
 				Oid			 userid = GetUserId();
 				cqContext	*pcqCtx;
 				HeapTuple	 tuple;
@@ -1055,16 +1254,16 @@
 							 ObjectIdGetDatum(userid)));
 
 				tuple = caql_getnext(pcqCtx);
-								
+
 				if (!HeapTupleIsValid(tuple))
 					ereport(ERROR,
 							(errcode(ERRCODE_UNDEFINED_OBJECT),
-							 errmsg("role \"%s\" does not exist (in DefineExternalRelation)", 
+							 errmsg("role \"%s\" does not exist (in DefineExternalRelation)",
 									 GetUserNameFromId(userid))));
 
 				if ( (uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) && iswritable)
 				{
-					Datum 	d_wextgpfd = caql_getattr(pcqCtx, Anum_pg_authid_rolcreatewextgpfd, 
+					Datum 	d_wextgpfd = caql_getattr(pcqCtx, Anum_pg_authid_rolcreatewextgpfd,
 													  &isnull);
 					bool	createwextgpfd = (isnull ? false : DatumGetBool(d_wextgpfd));
 
@@ -1076,7 +1275,7 @@
 				}
 				else if ( (uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) && !iswritable)
 				{
-					Datum 	d_rextgpfd = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterextgpfd, 
+					Datum 	d_rextgpfd = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterextgpfd,
 													  &isnull);
 					bool	createrextgpfd = (isnull ? false : DatumGetBool(d_rextgpfd));
 
@@ -1089,7 +1288,7 @@
 				}
 				else if (uri->protocol == URI_HTTP && !iswritable)
 				{
-					Datum 	d_exthttp = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterexthttp, 
+					Datum 	d_exthttp = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterexthttp,
 													 &isnull);
 					bool	createrexthttp = (isnull ? false : DatumGetBool(d_exthttp));
 
@@ -1105,27 +1304,56 @@
 					char*		protname = uri->customprotocol;
 					Oid			ptcId = LookupExtProtocolOid(protname, false);
 					AclResult	aclresult;
-					
+
 					/* Check we have the right permissions on this protocol */
 					if (!pg_extprotocol_ownercheck(ptcId, ownerId))
-					{	
+					{
 						AclMode mode = (iswritable ? ACL_INSERT : ACL_SELECT);
-						
+
 						aclresult = pg_extprotocol_aclcheck(ptcId, ownerId, mode);
-						
+
 						if (aclresult != ACLCHECK_OK)
 							aclcheck_error(aclresult, ACL_KIND_EXTPROTOCOL, protname);
 					}
 				}
+				/* magma follows the same ACL check as HDFS */
+				else if (uri->protocol == URI_HDFS )
+				{
+					Datum d_wexthdfs = caql_getattr(pcqCtx, Anum_pg_authid_rolcreatewexthdfs, &isnull);
+					bool createwexthdfs = (isnull ? false : DatumGetBool(d_wexthdfs));
+					if (iswritable) {
+						if (!createwexthdfs)
+						{
+							ereport(ERROR,
+									(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+									errmsg("permission denied: no privilege to create a writable hdfs external table"),
+											errOmitLocation(true)));
+						}
+					}
+					else
+					{
+						/* A role that can create writable external hdfs table can always
+						 * create readable external hdfs table.*/
+						Datum d_rexthdfs = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterexthdfs, &isnull);
+						bool createrexthdfs = (isnull ? false : DatumGetBool(d_rexthdfs));
+						if (!createrexthdfs && !createwexthdfs)
+						{
+							ereport(ERROR,
+								(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+								errmsg("permission denied: no privilege to create a readable hdfs external table"),
+										errOmitLocation(true)));
+						}
+					}
+				}
 				else
 				{
 					ereport(ERROR,
 							(errcode(ERRCODE_INTERNAL_ERROR),
 							errmsg("internal error in DefineExternalRelation. "
-								   "protocol is %d, writable is %d", 
+								   "protocol is %d, writable is %d",
 								   uri->protocol, iswritable)));
 				}
-				
+
 				caql_endscan(pcqCtx);
 			}
 			FreeExternalTableUri(uri);
@@ -1134,24 +1362,12 @@
 	}
 
 	/*
-	 * Parse and validate FORMAT clause.
-	 */
-	formattype = transformFormatType(createExtStmt->format);
-	
-	formatOptStr = transformFormatOpts(formattype,
-									   createExtStmt->formatOpts,
-									   list_length(createExtStmt->tableElts),
-									   iswritable);
-
-	/*
 	 * Parse single row error handling info if available
 	 */
 	singlerowerrorDesc = (SingleRowErrorDesc *)createExtStmt->sreh;
 
 	if(singlerowerrorDesc)
 	{
-		Assert(!iswritable);
-		
 		issreh = true;
 
 		/* get reject limit, and reject limit type */
@@ -1175,6 +1391,42 @@
 	}
 
 	/*
+	 * Parse and validate FORMAT clause.
+	 *
+	 * We force formatter as 'custom' if it is external hdfs protocol
+	 */
+	formattype = (isExternalHdfs) ?
+	             'b' : transformFormatType(createExtStmt->format);
+
+	if ((formattype == 'b') )
+	{
+		Oid procOid = InvalidOid;
+
+		procOid = LookupPlugStorageValidatorFunc(formattername, "validate_interfaces");
+
+		if (OidIsValid(procOid))
+		{
+			InvokePlugStorageValidationFormatInterfaces(procOid, formattername);
+		}
+		else
+		{
+			ereport(ERROR,
+			        (errcode(ERRCODE_SYNTAX_ERROR),
+			        errmsg("unsupported format '%s' in pluggable storage", formattername),
+			        errhint("make sure format is installed"),
+			        errOmitLocation(true)));
+		}
+	}
+
+	formatOptStr = transformFormatOpts(formattype,
+	                                   createExtStmt->format,
+	                                   formattername,
+	                                   createExtStmt->base.options,
+	                                   list_length(createExtStmt->base.tableElts),
+	                                   iswritable,
+	                                   createExtStmt->policy);
+
+	/*
 	 * Parse external table data encoding
 	 */
 	foreach(option, createExtStmt->encoding)
@@ -1199,6 +1451,13 @@
 		{
 			encoding = intVal(dencoding->arg);
 			encoding_name = pg_encoding_to_char(encoding);
+
+			/* custom format */
+			if (formattername)
+			{
+				checkCustomFormatEncoding(formattername, encoding_name);
+			}
+
 			if (strcmp(encoding_name, "") == 0 ||
 				pg_valid_client_encoding(encoding_name) < 0)
 				ereport(ERROR,
@@ -1210,6 +1469,14 @@
 		else if (IsA(dencoding->arg, String))
 		{
 			encoding_name = strVal(dencoding->arg);
+
+			if (formattername)
+			{
+				checkCustomFormatEncoding(formattername, encoding_name);
+			}
+
+			/* custom format */
+
 			if (pg_valid_client_encoding(encoding_name) < 0)
 				ereport(ERROR,
 						(errcode(ERRCODE_UNDEFINED_OBJECT),
@@ -1227,7 +1494,6 @@
 	if (encoding < 0)
 		encoding = pg_get_client_encoding();
 
-
     /*
 	 * First, create the pg_class and other regular relation catalog entries.
 	 * Under the covers this will dispatch a CREATE TABLE statement to all the
@@ -1235,7 +1501,8 @@
 	 */
 	Assert(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY);
 
-	reloid = DefineRelation(createStmt, RELKIND_RELATION, RELSTORAGE_EXTERNAL);
+	reloid = DefineRelation(createStmt, RELKIND_RELATION,
+	                        RELSTORAGE_EXTERNAL, formattername);
 
 	/*
 	 * Now we take care of pg_exttable and dependency with error table (if any).
@@ -1250,7 +1517,7 @@
 	if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY)
 		Assert(reloid != InvalidOid);
 	else
-		reloid = RangeVarGetRelid(createExtStmt->relation, true, false /*allowHcatalog*/);
+		reloid = RangeVarGetRelid(createExtStmt->base.relation, true, false /*allowHcatalog*/);
 
 	/*
 	 * create a pg_exttable entry for this external table.
@@ -1278,7 +1545,6 @@
 		referenced;
 		Oid		fmtErrTblOid = RangeVarGetRelid(((SingleRowErrorDesc *)createExtStmt->sreh)->errtable, true, false /*allowHcatalog*/);
 
-		Assert(!createExtStmt->iswritable);
 		Assert(fmtErrTblOid != InvalidOid);
 
 		myself.classId = RelationRelationId;
@@ -1289,6 +1555,16 @@
 		referenced.objectSubId = 0;
 		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
 	}
+
+	/*
+	 * Add primary key constraint in pg_constraint for custom external table.
+	 * We don't add primary key index in pg_index for now.
+	 */
+
+	if (formattername)
+	{
+		pfree(formattername);
+	}
 }
 
 extern void
@@ -1303,13 +1579,13 @@
 	 * now set the parameters for keys/inheritance etc. Most of these are
 	 * uninteresting for external relations...
 	 */
-	createStmt->relation = createForeignStmt->relation;
-	createStmt->tableElts = createForeignStmt->tableElts;
-	createStmt->inhRelations = NIL;
-	createStmt->constraints = NIL;
-	createStmt->options = NIL;
-	createStmt->oncommit = ONCOMMIT_NOOP;
-	createStmt->tablespacename = NULL;
+	createStmt->base.relation = createForeignStmt->relation;
+	createStmt->base.tableElts = createForeignStmt->tableElts;
+	createStmt->base.inhRelations = NIL;
+	createStmt->base.constraints = NIL;
+	createStmt->base.options = NIL;
+	createStmt->base.oncommit = ONCOMMIT_NOOP;
+	createStmt->base.tablespacename = NULL;
 	createStmt->policy = NULL; /* for now, we use "master only" type of distribution */
 	
 	/* (permissions are checked in foreigncmd.c:InsertForeignTableEntry() ) */
@@ -1320,7 +1596,7 @@
 	 * QEs.
 	 */
 	if(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY)
-		reloid = DefineRelation(createStmt, RELKIND_RELATION, RELSTORAGE_FOREIGN);
+		reloid = DefineRelation(createStmt, RELKIND_RELATION, RELSTORAGE_FOREIGN, NonCustomFormatType);
 
 	/*
 	 * Now we take care of pg_foreign_table
@@ -1389,12 +1665,12 @@
 DefinePartitionedRelation(CreateStmt *stmt, Oid relOid)
 {
 
-	if (stmt->postCreate)
+	if (stmt->base.postCreate)
 	{
 		List *pQry;
 		Node *pUtl;
 		DestReceiver *dest = None_Receiver;
-		List *pL1 = (List *)stmt->postCreate;
+		List *pL1 = (List *)stmt->base.postCreate;
 
 		pQry = parse_analyze(linitial(pL1), NULL, NULL, 0);
 
@@ -12724,18 +13000,18 @@
 		char *dstr = "__gp_atsdb_droppedcol";
 		DestReceiver *dest = None_Receiver;
 
-		cs->relKind = RELKIND_RELATION;
-		cs->distributedBy = distro;
-		cs->relation = tmpname;
+		cs->base.relKind = RELKIND_RELATION;
+		cs->base.distributedBy = distro;
+		cs->base.relation = tmpname;
 		cs->ownerid = rel->rd_rel->relowner;
-		cs->tablespacename = get_tablespace_name(rel->rd_rel->reltablespace);
+		cs->base.tablespacename = get_tablespace_name(rel->rd_rel->reltablespace);
 		cs->buildAoBlkdir = false;
 
 		if (isTmpTableAo &&
 			rel->rd_rel->relhasindex)
 			cs->buildAoBlkdir = true;
 
-		cs->options = opts;
+		cs->base.options = opts;
 
 		for (attno = 0; attno < tupdesc->natts; attno++)
 		{
@@ -12795,7 +13071,7 @@
 
 			tname->location = -1;
 			cd->typname = tname;
-			cs->tableElts = lappend(cs->tableElts, cd);
+			cs->base.tableElts = lappend(cs->base.tableElts, cd);
 		}
 		parsetrees = parse_analyze((Node *)cs, NULL, NULL, 0);
 		Assert(list_length(parsetrees) == 1);
@@ -16057,14 +16333,14 @@
         inh->options = list_make3_int(CREATE_TABLE_LIKE_INCLUDING_DEFAULTS,
 									  CREATE_TABLE_LIKE_INCLUDING_CONSTRAINTS,
 									  CREATE_TABLE_LIKE_INCLUDING_INDEXES);
-		ct->tableElts = list_make1(inh);
-		ct->distributedBy = list_copy(distro); /* must preserve the list for later */
+		ct->base.tableElts = list_make1(inh);
+		ct->base.distributedBy = list_copy(distro); /* must preserve the list for later */
 
 		/* should be unique enough */
 		snprintf(tmpname, NAMEDATALEN, "pg_temp_%u", relid);
 		tmprv = makeRangeVar(NULL /*catalogname*/, nspname, tmpname, -1);
-		ct->relation = tmprv;
-		ct->relKind = RELKIND_RELATION;
+		ct->base.relation = tmprv;
+		ct->base.relKind = RELKIND_RELATION;
 		ct->ownerid = rel->rd_rel->relowner;
 		ct->is_split_part = true;
 		parsetrees = parse_analyze((Node *)ct, NULL, NULL, 0);
@@ -16273,8 +16549,8 @@
 					parname = pstrdup(prule->topRule->parname);
 			}
 
-			mycs->relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1);
-			mycs->relKind = RELKIND_RELATION;
+			mycs->base.relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1);
+			mycs->base.relKind = RELKIND_RELATION;
 
 			/*
 			 * If the new partition is column oriented, initialize the column
@@ -16286,9 +16562,9 @@
 			 * analysis.
 			 */
 			if (colencs)
-				mycs->tableElts = list_copy(colencs);
+				mycs->base.tableElts = list_copy(colencs);
 
-			mycs->options = orient;
+			mycs->base.options = orient;
 
 			mypid->idtype = AT_AP_IDNone;
 			mypid->location = -1;
@@ -17452,25 +17728,29 @@
  * The result is a text array but we declare it as Datum to avoid
  * including array.h in analyze.h.
  */
-static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool iswritable, bool* isCustom)
+static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool iswritable,
+		bool forceCreateDir, bool* isCustom)
 {
 	ListCell   *cell;
 	ArrayBuildState *astate;
 	Datum		result;
 	UriProtocol first_protocol = URI_FILE; /* initialize to keep gcc quiet */
 	bool		first_uri = true;
+	char* first_customprotocal = NULL;
 	
 #define FDIST_DEF_PORT 8080
 
-	/* Parser should not let this happen */
-	Assert(locs != NIL);
+	Uri			*uri;
+	text		*t;
+	char		*uri_str_orig;
+	char		*uri_str_final;
+	Size		len;
+	Value		*v;
 
 	/* We build new array using accumArrayResult */
 	astate = NULL;
 
-	/*
-	 * first, check for duplicate URI entries
-	 */
+	/* first, check for duplicate URI entries */
 	foreach(cell, locs)
 	{
 		Value		*v1 = lfirst(cell);
@@ -17491,17 +17771,10 @@
 		}
 	}
 
-	/*
-	 * iterate through the user supplied URI list from LOCATION clause.
-	 */
+	/* iterate through the user supplied URI list from LOCATION clause. */
 	foreach(cell, locs)
 	{
-		Uri			*uri;
-		text		*t;
-		char		*uri_str_orig;
-		char		*uri_str_final;
-		Size		len;
-		Value		*v = lfirst(cell);
+		v = lfirst(cell);
 		
 		/* get the current URI string from the command */
 		uri_str_orig = pstrdup(v->val.str);
@@ -17528,6 +17801,7 @@
 	        errOmitLocation(true)));
 		}
 
+
 		/* no port was specified for gpfdist, gpfdists or hdfs. add the default */
 		if ((uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) && uri->port == -1)
 		{
@@ -17561,7 +17835,6 @@
 		if (first_uri && uri->protocol == URI_CUSTOM)
 		{
 			Oid		procOid = InvalidOid;
-			
 			procOid = LookupExtProtocolFunction(uri->customprotocol, 
 												EXTPTC_FUNC_VALIDATOR, 
 												false);
@@ -17569,10 +17842,25 @@
 			if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
 				InvokeProtocolValidation(procOid, 
 										 uri->customprotocol, 
-										 iswritable, 
+										 iswritable, forceCreateDir,
 										 locs, fmtopts);
+			first_customprotocal = uri->customprotocol;
 		}
 		
+		if (first_uri && uri->protocol == URI_HDFS)
+		{
+			Oid		procOid = InvalidOid;
+
+			procOid = LookupCustomProtocolValidatorFunc("hdfs");
+			if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
+			{
+				InvokeProtocolValidation(procOid,
+										 uri->customprotocol,
+										 iswritable, forceCreateDir,
+										 locs, fmtopts);
+			}
+		}
+
 		if(first_uri)
 		{
 		    first_protocol = uri->protocol;
@@ -17580,10 +17868,9 @@
 		    if(uri->protocol == URI_CUSTOM){
 		    		*isCustom = true;
 		    }
-		} 
-		    	
+		}
 
-		if(uri->protocol != first_protocol)
+		if(uri->protocol != first_protocol || (first_protocol == URI_CUSTOM && strcmp(first_customprotocal, uri->customprotocol) != 0))
 		{
 			ereport(ERROR,
 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -17622,7 +17909,9 @@
 							"\'%s\'", uri_str_final),
 					errhint("Specify the explicit path and file name to write into.")));
 		
-		if ((uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) && iswritable && uri->path[strlen(uri->path) - 1] == '/')
+		if ((uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) &&
+			iswritable &&
+			uri->path[strlen(uri->path) - 1] == '/')
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					errmsg("Unsupported use of a directory name in a writable gpfdist(s) external table : "
@@ -17652,7 +17941,47 @@
 		result = (Datum) 0;
 
 	return result;
+}
 
+static Oid
+LookupCustomProtocolValidatorFunc(char *protoname)
+{
+	List*	funcname 	= NIL;
+	Oid		procOid		= InvalidOid;
+	Oid		argList[1];
+	Oid		returnOid;
+
+	elog(LOG, "find validator func for %s", protoname);
+
+	char*   new_func_name = (char *)palloc0(strlen(protoname) + 16);
+	sprintf(new_func_name, "%s_validate", protoname);
+	funcname = lappend(funcname, makeString(new_func_name));
+	returnOid = VOIDOID;
+	procOid = LookupFuncName(funcname, 0, argList, true);
+
+	if (!OidIsValid(procOid))
+		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
+						errmsg("protocol function %s was not found.",
+								new_func_name),
+						errhint("Create it with CREATE FUNCTION."),
+						errOmitLocation(true)));
+
+	/* check return type matches */
+	if (get_func_rettype(procOid) != returnOid)
+		ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						errmsg("protocol function %s has an incorrect return type",
+								new_func_name),
+						errOmitLocation(true)));
+
+	/* check allowed volatility */
+	if (func_volatile(procOid) != PROVOLATILE_STABLE)
+		ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+				 	 	errmsg("protocol function %s is not declared STABLE.",
+						new_func_name),
+						errOmitLocation(true)));
+	pfree(new_func_name);
+
+	return procOid;
 }
 
 static Datum transformExecOnClause(List	*on_clause, int *preferred_segment_num, bool iswritable)
@@ -17820,12 +18149,207 @@
 
 
 /*
+ * Check if values for options of custom external table are valid
+ */
+static void checkCustomFormatOptString(char **opt,
+                                       const char *key,
+                                       const char *val,
+                                       const bool needopt,
+                                       const int nvalidvals,
+                                       const char **validvals)
+{
+	Assert(opt);
+
+	/* check if need to check option */
+	if (!needopt)
+	{
+		ereport(ERROR,
+		        (errcode(ERRCODE_SYNTAX_ERROR),
+		        errmsg("redundant option %s", key),
+		        errOmitLocation(true)));
+	}
+
+	/* check if option is redundant */
+	if (*opt)
+	{
+		ereport(ERROR,
+		        (errcode(ERRCODE_SYNTAX_ERROR),
+		        errmsg("conflicting or redundant options \"%s\"", key),
+		        errOmitLocation(true)));
+	}
+
+	*opt = val;
+
+	/* check if value for option is valid */
+	bool valid = false;
+	for (int i = 0; i < nvalidvals; i++)
+	{
+		if (strncasecmp(*opt, validvals[i], strlen(validvals[i])) == 0)
+		{
+			valid = true;
+		}
+	}
+
+	if (!valid)
+	{
+		ereport(ERROR,
+		        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+		        errmsg("invalid value for option %s: \"%s\"", key, val),
+		        errOmitLocation(true)));
+	}
+}
+/*
+ * Check if given encoding for custom external table is supported.
+ *
+ * For custom external table, the valid format type kinds are:
+ * 1) 't' for TEXT
+ * 2) 'c' for CSV
+ * 3) 'b' for pluggable format, i.e., ORC which currently supports UTF8 encoding.
+ */
+static void checkCustomFormatEncoding(const char *formatterName,
+                                      const char *encodingName)
+{
+	Oid	procOid = InvalidOid;
+
+	procOid = LookupPlugStorageValidatorFunc(formatterName, "validate_encodings");
+
+	if (OidIsValid(procOid))
+	{
+		InvokePlugStorageValidationFormatEncodings(procOid, encodingName);
+	}
+}
+
+/*
+ * Check if options for custom external table are valid
+ */
+static char *checkCustomFormatOptions(const char *formattername,
+                                     List *formatOpts,
+                                     const bool isWritable,
+                                     const char *delimiter)
+{
+
+	/* General options for custom external table */
+	const int	maxlen = 8 * 1024 - 1;
+	char		*format_str = NULL;
+	format_str = (char *) palloc0(maxlen + 1);
+
+	/* built-in TEXT, CSV format */
+	if (pg_strncasecmp(formattername, "text", strlen("text")) == 0 ||
+	    pg_strncasecmp(formattername, "csv", strlen("csv")) == 0)
+	{
+		char			*formatter = NULL;
+		ListCell		*option;
+		int				len = 0;
+
+		StringInfoData	key_modified;
+		initStringInfo(&key_modified);
+
+		foreach(option, formatOpts)
+		{
+			DefElem    *defel = (DefElem *) lfirst(option);
+			char	   *key = defel->defname;
+			bool need_free_value = false;
+			char	   *val = defGetString(defel, &need_free_value);
+
+			/* check formatter */
+			if (strncasecmp(key, "formatter", strlen("formatter")) == 0)
+			{
+				char *formatterValues[] = {"text", "csv"};
+				checkCustomFormatOptString(&formatter, key, val, true, 2, formatterValues);
+			}
+
+			/* check option for text/csv format */
+
+			/* MPP-14467 - replace any space chars with meta char */
+			resetStringInfo(&key_modified);
+			appendStringInfoString(&key_modified, key);
+			replaceStringInfoString(&key_modified, " ", "<gpx20>");
+
+			sprintf((char *) format_str + len, "%s '%s' ", key_modified.data, val);
+			len += strlen(key_modified.data) + strlen(val) + 4;
+
+			if (need_free_value)
+			{
+				pfree(val);
+				val = NULL;
+			}
+
+			AssertImply(need_free_value, NULL == val);
+
+			if (len > maxlen)
+			{
+				ereport(ERROR,
+				        (errcode(ERRCODE_SYNTAX_ERROR),
+				        errmsg("format options must be less than %d bytes in size", maxlen),
+				        errOmitLocation(true)));
+			}
+		}
+
+		if(!formatter)
+		{
+			ereport(ERROR,
+			        (errcode(ERRCODE_SYNTAX_ERROR),
+			        errmsg("no formatter function specified"),
+			        errOmitLocation(true)));
+		}
+		else if (strncasecmp(formatter, "text", strlen("text")) == 0)
+		{
+			if (delimiter &&
+			    strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789", delimiter[0]) != NULL)
+			{
+				ereport(ERROR,
+				        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				        errmsg("delimiter cannot be \"%s\"", delimiter)));
+			}
+		}
+	}
+	/* pluggable format */
+	else
+	{
+		Oid	procOid = InvalidOid;
+
+		procOid = LookupPlugStorageValidatorFunc(formattername, "validate_options");
+
+		if (OidIsValid(procOid))
+		{
+			InvokePlugStorageValidationFormatOptions(procOid, formatOpts,
+			                                         format_str, isWritable);
+		}
+	}
+
+	return format_str;
+}
+/*
+ * Check if the data types for custom external table are valid
+ *
+ * Currently supported data types for ORC external tables are:
+ * SMALLINT, INT, BIGINT, REAL, FLOAT, DOUBLE PRECISION, BOOL, VARCHAR, or TEXT
+ *
+ * Need to revise this if more data types are supported for ORC external table.
+ */
+static void checkCustomFormatDateTypes(const char *formatterName,
+                                       TupleDesc tupleDesc)
+{
+	if (formatterName == NULL)
+		return;
+
+	Oid	procOid = InvalidOid;
+
+	procOid = LookupPlugStorageValidatorFunc(formatterName, "validate_datatypes");
+
+	if (OidIsValid(procOid))
+	{
+		InvokePlugStorageValidationFormatDataTypes(procOid, tupleDesc);
+	}
+}
+
+/*
  * Transform the FORMAT options into a text field. Parse the
  * options and validate them for their respective format type.
  *
  * The result is a text field that includes the format string.
  */
-static Datum transformFormatOpts(char formattype, List *formatOpts, int numcols, bool iswritable)
+static Datum transformFormatOpts(char formattype, char *formatname, char *formattername, List *formatOpts, int numcols, bool iswritable, GpPolicy *policy)
 {
 	ListCell   *option;
 	Datum		result;
@@ -17836,6 +18360,8 @@
 	char *escape = NULL;
 	char *eol_str = NULL;
 	char *formatter = NULL;
+	char *compresstype = NULL;
+	char *rlecoder = NULL;
 	bool header_line = false;
 	bool fill_missing = false;
 	List *force_notnull = NIL;
@@ -17852,7 +18378,7 @@
 	{
 		foreach(option, formatOpts)
 		{
-			DefElem    *defel = (DefElem *) lfirst(option);
+			DefElem    *defel = (DefElem *)lfirst(option);
 
 			if (strcmp(defel->defname, "delimiter") == 0)
 			{
@@ -17949,9 +18475,9 @@
 					 defel->defname);
 		}
 
-		/*
-		 * Set defaults
-		 */
+
+		 /* Set defaults */
+
 		if (!delim)
 			delim = fmttype_is_csv(formattype) ? "," : "\t";
 
@@ -17967,7 +18493,7 @@
 		}
 
 		if (!fmttype_is_csv(formattype) && !escape)
-			escape = "\\";			/* default escape for text mode */
+			escape = "\\";		/* default escape for text mode */
 
 		/*
 		 * re-construct the FORCE NOT NULL list string.
@@ -17991,9 +18517,9 @@
 			}
 		}
 
-		/*
-		 * re-construct the FORCE QUOTE list string.
-		 */
+
+		 /* re-construct the FORCE QUOTE list string. */
+
 		if(force_quote)
 		{
 			ListCell   *l;
@@ -18041,11 +18567,11 @@
 			  1 +									/* space					*/
 			  4 + 1 + 1 + strlen(null_print) + 1 +	/* "null 'str'"				*/
 			  1 +									/* space					*/
-			  6 + 1 + 1 + strlen(escape) + 1;		/* "escape 'c' or 'off' 	*/
+			  6 + 1 + 1 + strlen(escape) + 1;		/* "escape 'c' or 'off'		*/
 	
 		if (fmttype_is_csv(formattype))
-			len +=	1 +								/* space					*/
-					5 + 1 + 3;						/* "quote 'x'"				*/
+			len +=	1 +								/* space */
+					5 + 1 + 3;						/* "quote 'x'" */
 	
 		len += header_line ? strlen(" header") : 0;
 		len += fill_missing ? strlen(" fill missing fields") : 0;
@@ -18076,68 +18602,31 @@
 			/* should never happen */
 			Assert(false);
 		}
-
 	}
 	else
 	{
-		/* custom format */
-		
-		int 		len = 0;
-		const int	maxlen = 8 * 1024 - 1;
-		StringInfoData key_modified;
-		
-		initStringInfo(&key_modified);
-		
-		format_str = (char *) palloc0(maxlen + 1);
-		
-		foreach(option, formatOpts)
+		/* custom/parquet/orc format */
+		format_str = checkCustomFormatOptions(formattername, formatOpts, iswritable, delim);
+
+		/* set default hash key */
+		ListCell *opt;
+		foreach(opt, formatOpts)
 		{
-			DefElem    *defel = (DefElem *) lfirst(option);
-			char	   *key = defel->defname;
+			DefElem *defel = (DefElem *) lfirst(opt);
+			char *key = defel->defname;
 			bool need_free_value = false;
-			char	   *val = defGetString(defel, &need_free_value);
-			
-			if (strcmp(key, "formatter") == 0)
+			char *val = (char *) defGetString(defel, &need_free_value);
+			if (strncasecmp(key, "bucketnum", strlen("bucketnum")) == 0)
 			{
-				if (formatter)
-					ereport(ERROR,
-							(errcode(ERRCODE_SYNTAX_ERROR),
-							 errmsg("conflicting or redundant options"),
-									   errOmitLocation(true)));
-					
-				formatter = strVal(defel->arg);
+				policy->bucketnum = atoi(val);
 			}
-			
-			/* MPP-14467 - replace any space chars with meta char */
-			resetStringInfo(&key_modified);
-			appendStringInfoString(&key_modified, key);
-			replaceStringInfoString(&key_modified, " ", "<gpx20>");
-			
-			sprintf((char *) format_str + len, "%s '%s' ", key_modified.data, val);
-			len += strlen(key_modified.data) + strlen(val) + 4;
-			
 			if (need_free_value)
 			{
 				pfree(val);
 				val = NULL;
 			}
-
-			AssertImply(need_free_value, NULL == val);
-
-			if (len > maxlen)
-				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("format options must be less than %d bytes in size", maxlen),
-								   errOmitLocation(true)));			
 		}
-		
-		if(!formatter)
-			ereport(ERROR,
-					(errcode(ERRCODE_SYNTAX_ERROR),
-					 errmsg("no formatter function specified"),
-							   errOmitLocation(true)));
 	}
-	
 
 	/* convert c string to text datum */
 	result = DirectFunctionCall1(textin, CStringGetDatum(format_str));
@@ -18340,7 +18829,8 @@
 }
 
 static void 
-InvokeProtocolValidation(Oid procOid, char *procName, bool iswritable, List *locs, List* fmtopts)
+InvokeProtocolValidation(Oid procOid, char *procName, bool iswritable, bool forceCreateDir,
+		List *locs, List* fmtopts)
 {
 	
 	ExtProtocolValidatorData   *validator_data;
@@ -18354,14 +18844,16 @@
 	validator_data->type 		= T_ExtProtocolValidatorData;
 	validator_data->url_list 	= locs;
 	validator_data->format_opts = fmtopts;
+	validator_data->forceCreateDir = forceCreateDir;
 	validator_data->errmsg		= NULL;
 	validator_data->direction 	= (iswritable ? EXT_VALIDATE_WRITE :
 											    EXT_VALIDATE_READ);
+	validator_data->action		= EXT_VALID_ACT_ARGUMENTS;
 	
 	InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, 
 							 /* FmgrInfo */ validator_udf, 
-							 /* nArgs */ 0, 
-							 /* Call Context */ (Node *) validator_data, 
+							 /* nArgs */ 0,
+							 /* Call Context */ (Node *) validator_data,
 							 /* ResultSetInfo */ NULL);
 	
 	/* invoke validator. if this function returns - validation passed */
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index 6c745cc..d8ece33 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -42,6 +42,7 @@
 #include "catalog/pg_compression.h"
 #include "catalog/pg_constraint.h"
 #include "catalog/pg_depend.h"
+#include "catalog/pg_exttable.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
 #include "catalog/pg_type_encoding.h"
@@ -1294,19 +1295,19 @@
 	 * now set the parameters for keys/inheritance etc. All of these are
 	 * uninteresting for composite types...
 	 */
-	createStmt->relation = (RangeVar *) typevar;
-	createStmt->tableElts = coldeflist;
-	createStmt->inhRelations = NIL;
-	createStmt->constraints = NIL;
-	createStmt->options = list_make1(defWithOids(false));
-	createStmt->oncommit = ONCOMMIT_NOOP;
-	createStmt->tablespacename = NULL;
+	createStmt->base.relation = (RangeVar *) typevar;
+	createStmt->base.tableElts = coldeflist;
+	createStmt->base.inhRelations = NIL;
+	createStmt->base.constraints = NIL;
+	createStmt->base.options = list_make1(defWithOids(false));
+	createStmt->base.oncommit = ONCOMMIT_NOOP;
+	createStmt->base.tablespacename = NULL;
 
 	/*
 	 * finally create the relation...
 	 */
-	return  DefineRelation(createStmt, RELKIND_COMPOSITE_TYPE, RELSTORAGE_VIRTUAL);
-
+	return  DefineRelation(createStmt, RELKIND_COMPOSITE_TYPE,
+		                       RELSTORAGE_VIRTUAL, NonCustomFormatType);
 	/*
 	 * DefineRelation already dispatches this.
 	 *
diff --git a/src/backend/commands/user.c b/src/backend/commands/user.c
index a132adb..f8fc301 100644
--- a/src/backend/commands/user.c
+++ b/src/backend/commands/user.c
@@ -2271,7 +2271,8 @@
 		if(strcasecmp(val, "gpfdist") != 0 && 
 		   strcasecmp(val, "gpfdists") != 0 &&
 		   strcasecmp(val, "http") != 0 &&
-		   strcasecmp(val, "gphdfs") != 0)
+		   strcasecmp(val, "gphdfs") != 0 &&
+		   strcasecmp(val, "hdfs") != 0)
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
 					 errmsg("invalid %s value \"%s\"", key, val)));
@@ -2303,7 +2304,7 @@
 	const int	numvals = 6;
 	const char *keys[] = { "type", "protocol"};	 /* order matters for validation. don't change! */
 	const char *vals[] = { /* types     */ "readable", "writable", 
-						   /* protocols */ "gpfdist", "gpfdists" , "http", "gphdfs"};
+						   /* protocols */ "gpfdist", "gpfdists" , "http", "gphdfs", "hdfs"};
 
 	if(list_length(l) > 2)
 		ereport(ERROR,
@@ -2450,6 +2451,19 @@
 					createwexthdfs_specified = true;
 				}
 			}
+			else if(strcasecmp(extauth->protocol, "hdfs") == 0)
+			{
+				if(strcasecmp(extauth->type, "readable") == 0)
+				{
+					*createrexthdfs = true;
+					createrexthdfs_specified = true;
+				}
+				else
+				{
+					*createwexthdfs = true;
+					createwexthdfs_specified = true;
+				}
+			}
 			else /* http */
 			{
 				if(strcasecmp(extauth->type, "readable") == 0)
@@ -2518,6 +2532,23 @@
 					*createwexthdfs = false;
 				}
 			}
+			else if(strcasecmp(extauth->protocol, "hdfs") == 0)
+			{
+				if(strcasecmp(extauth->type, "readable") == 0)
+				{
+					if(createrexthdfs_specified)
+						conflict = true;
+
+					*createrexthdfs = false;
+				}
+				else
+				{
+					if(createwexthdfs_specified)
+						conflict = true;
+
+					*createwexthdfs = false;
+				}
+			}
 			else /* http */
 			{
 				if(strcasecmp(extauth->type, "readable") == 0)
diff --git a/src/backend/commands/view.c b/src/backend/commands/view.c
index 67af585..967e938 100644
--- a/src/backend/commands/view.c
+++ b/src/backend/commands/view.c
@@ -59,6 +59,7 @@
 #include "cdb/cdbvars.h"
 #include "cdb/cdbcat.h"
 
+#include "catalog/pg_exttable.h"
 
 static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
 static bool isViewOnTempTable_walker(Node *node, void *context);
@@ -244,21 +245,21 @@
 		 * now set the parameters for keys/inheritance etc. All of these are
 		 * uninteresting for views...
 		 */
-		createStmt->relation = (RangeVar *) relation;
-		createStmt->tableElts = attrList;
-		createStmt->inhRelations = NIL;
-		createStmt->constraints = NIL;
-		createStmt->options = list_make1(defWithOids(false));
-		createStmt->oncommit = ONCOMMIT_NOOP;
-		createStmt->tablespacename = NULL;
-		createStmt->relKind = RELKIND_VIEW;
+		createStmt->base.relation = (RangeVar *) relation;
+		createStmt->base.tableElts = attrList;
+		createStmt->base.inhRelations = NIL;
+		createStmt->base.constraints = NIL;
+		createStmt->base.options = list_make1(defWithOids(false));
+		createStmt->base.oncommit = ONCOMMIT_NOOP;
+		createStmt->base.tablespacename = NULL;
+		createStmt->base.relKind = RELKIND_VIEW;
 
 		/*
 		 * finally create the relation (this will error out if there's an
 		 * existing view, so we don't need more code to complain if "replace"
 		 * is false).
 		 */
-		newviewOid =  DefineRelation(createStmt, RELKIND_VIEW, RELSTORAGE_VIRTUAL);
+		newviewOid =  DefineRelation(createStmt, RELKIND_VIEW, RELSTORAGE_VIRTUAL, NonCustomFormatType);
 		if(comptypeOid)
 			*comptypeOid = createStmt->oidInfo.comptypeOid;
 		return newviewOid;
diff --git a/src/backend/executor/execDML.c b/src/backend/executor/execDML.c
index 23ba53d..db54aee 100644
--- a/src/backend/executor/execDML.c
+++ b/src/backend/executor/execDML.c
@@ -421,7 +421,7 @@
 			else if (formatterType != ExternalTableType_PLUG)
 			{
 				resultRelInfo->ri_extInsertDesc = external_insert_init(
-						resultRelationDesc, 0, formatterType, formatterName);
+						resultRelationDesc, 0, formatterType, formatterName, estate->es_plannedstmt);
 			}
 			else
 			{
@@ -435,11 +435,21 @@
 					FmgrInfo insertInitFunc;
 					fmgr_info(procOid, &insertInitFunc);
 
+
+					ResultRelSegFileInfo *segfileinfo = NULL;
+					ResultRelInfoSetSegFileInfo(resultRelInfo,
+							estate->es_result_segfileinfos);
+					segfileinfo = (ResultRelSegFileInfo *) list_nth(
+							resultRelInfo->ri_aosegfileinfos, GetQEIndex());
+
 					resultRelInfo->ri_extInsertDesc =
-					        InvokePlugStorageFormatInsertInit(&insertInitFunc,
-					                                          resultRelationDesc,
-					                                          formatterType,
-					                                          formatterName);
+
+					InvokePlugStorageFormatInsertInit(&insertInitFunc,
+													 resultRelationDesc,
+													 formatterType,
+													 formatterName,
+													 estate->es_plannedstmt,
+													 segfileinfo->segno);
 				}
 				else
 				{
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 28357a9..0cd0b6d 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2927,20 +2927,22 @@
 {
 	CreateStmt *newnode = makeNode(CreateStmt);
 
-	COPY_NODE_FIELD(relation);
-	COPY_NODE_FIELD(tableElts);
-	COPY_NODE_FIELD(inhRelations);
-	COPY_NODE_FIELD(constraints);
-	COPY_NODE_FIELD(options);
-	COPY_SCALAR_FIELD(oncommit);
-	COPY_STRING_FIELD(tablespacename);
-	COPY_NODE_FIELD(distributedBy);
+	COPY_SCALAR_FIELD(base.relKind);
+	COPY_NODE_FIELD(base.relation);
+	COPY_NODE_FIELD(base.tableElts);
+	COPY_NODE_FIELD(base.inhRelations);
+	COPY_NODE_FIELD(base.constraints);
+	COPY_NODE_FIELD(base.options);
+	COPY_SCALAR_FIELD(base.oncommit);
+	COPY_STRING_FIELD(base.tablespacename);
+	COPY_NODE_FIELD(base.distributedBy);
+	COPY_SCALAR_FIELD(base.is_part_child);
+	COPY_SCALAR_FIELD(base.is_add_part);
 	COPY_SCALAR_FIELD(oidInfo.relOid);
 	COPY_SCALAR_FIELD(oidInfo.comptypeOid);
 	COPY_SCALAR_FIELD(oidInfo.toastOid);
 	COPY_SCALAR_FIELD(oidInfo.toastIndexOid);
 	COPY_SCALAR_FIELD(oidInfo.toastComptypeOid);
-	COPY_SCALAR_FIELD(relKind);
 	COPY_SCALAR_FIELD(relStorage);
 	if (from->policy)
 	{
@@ -2950,8 +2952,6 @@
 		newnode->policy = NULL;
 	/* postCreate omitted (why?) */
 	COPY_NODE_FIELD(deferredStmts);
-	COPY_SCALAR_FIELD(is_part_child);
-	COPY_SCALAR_FIELD(is_add_part);
 	COPY_SCALAR_FIELD(ownerid);
 	COPY_SCALAR_FIELD(buildAoBlkdir);
 	COPY_NODE_FIELD(attr_encodings);
@@ -3133,16 +3133,26 @@
 {
 	CreateExternalStmt *newnode = makeNode(CreateExternalStmt);
 
-	COPY_NODE_FIELD(relation);
-	COPY_NODE_FIELD(tableElts);
+	COPY_SCALAR_FIELD(base.relKind);
+	COPY_NODE_FIELD(base.relation);
+	COPY_NODE_FIELD(base.tableElts);
+	COPY_NODE_FIELD(base.inhRelations);
+	COPY_NODE_FIELD(base.constraints);
+	COPY_NODE_FIELD(base.options);
+	COPY_SCALAR_FIELD(base.oncommit);
+	COPY_STRING_FIELD(base.tablespacename);
+	COPY_NODE_FIELD(base.distributedBy);
+	COPY_SCALAR_FIELD(base.is_part_child);
+	COPY_SCALAR_FIELD(base.is_add_part);
 	COPY_NODE_FIELD(exttypedesc);
 	COPY_STRING_FIELD(format);
-	COPY_NODE_FIELD(formatOpts);
 	COPY_SCALAR_FIELD(isweb);
 	COPY_SCALAR_FIELD(iswritable);
+	COPY_SCALAR_FIELD(isexternal);
+	COPY_SCALAR_FIELD(forceCreateDir);
+	COPY_STRING_FIELD(parentPath);
 	COPY_NODE_FIELD(sreh);
 	COPY_NODE_FIELD(encoding);
-	COPY_NODE_FIELD(distributedBy);	
 	if (from->policy)
 	{
 		COPY_POINTER_FIELD(policy,sizeof(GpPolicy) + from->policy->nattrs*sizeof(from->policy->attrs[0]));
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 5619450..5f20eeb 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1079,25 +1079,25 @@
 static bool
 _equalCreateStmt(CreateStmt *a, CreateStmt *b)
 {
-	COMPARE_NODE_FIELD(relation);
-	COMPARE_NODE_FIELD(tableElts);
-	COMPARE_NODE_FIELD(inhRelations);
-	COMPARE_NODE_FIELD(constraints);
-	COMPARE_NODE_FIELD(options);
-	COMPARE_SCALAR_FIELD(oncommit);
-	COMPARE_STRING_FIELD(tablespacename);
-	COMPARE_NODE_FIELD(distributedBy);
+	COMPARE_SCALAR_FIELD(base.relKind);
+	COMPARE_NODE_FIELD(base.relation);
+	COMPARE_NODE_FIELD(base.tableElts);
+	COMPARE_NODE_FIELD(base.inhRelations);
+	COMPARE_NODE_FIELD(base.constraints);
+	COMPARE_NODE_FIELD(base.options);
+	COMPARE_SCALAR_FIELD(base.oncommit);
+	COMPARE_STRING_FIELD(base.tablespacename);
+	COMPARE_NODE_FIELD(base.distributedBy);
+	COMPARE_SCALAR_FIELD(base.is_part_child);
+	COMPARE_SCALAR_FIELD(base.is_add_part);
 	COMPARE_SCALAR_FIELD(oidInfo.relOid);
 	COMPARE_SCALAR_FIELD(oidInfo.comptypeOid);
 	COMPARE_SCALAR_FIELD(oidInfo.toastOid);
 	COMPARE_SCALAR_FIELD(oidInfo.toastIndexOid);
-	COMPARE_SCALAR_FIELD(relKind);
 	COMPARE_SCALAR_FIELD(relStorage);
 	/* policy omitted */
 	/* postCreate omitted */
 	/* deferredStmts omitted */
-	COMPARE_SCALAR_FIELD(is_part_child);
-	COMPARE_SCALAR_FIELD(is_add_part);
 	COMPARE_SCALAR_FIELD(is_split_part);
 	COMPARE_SCALAR_FIELD(ownerid);
 	COMPARE_SCALAR_FIELD(buildAoBlkdir);
@@ -1141,16 +1141,25 @@
 static bool
 _equalCreateExternalStmt(CreateExternalStmt *a, CreateExternalStmt *b)
 {
-	COMPARE_NODE_FIELD(relation);
-	COMPARE_NODE_FIELD(tableElts);
+	COMPARE_SCALAR_FIELD(base.relKind);
+	COMPARE_NODE_FIELD(base.relation);
+	COMPARE_NODE_FIELD(base.tableElts);
+	COMPARE_NODE_FIELD(base.inhRelations);
+	COMPARE_NODE_FIELD(base.constraints);
+	COMPARE_NODE_FIELD(base.options);
+	COMPARE_SCALAR_FIELD(base.oncommit);
+	COMPARE_STRING_FIELD(base.tablespacename);
+	COMPARE_NODE_FIELD(base.distributedBy);
+	COMPARE_SCALAR_FIELD(base.is_part_child);
+	COMPARE_SCALAR_FIELD(base.is_add_part);
 	COMPARE_NODE_FIELD(exttypedesc);
 	COMPARE_STRING_FIELD(format);
-	COMPARE_NODE_FIELD(formatOpts);
 	COMPARE_SCALAR_FIELD(isweb);
 	COMPARE_SCALAR_FIELD(iswritable);
+	COMPARE_SCALAR_FIELD(forceCreateDir);
+	COMPARE_STRING_FIELD(parentPath);
 	COMPARE_NODE_FIELD(sreh);
 	COMPARE_NODE_FIELD(encoding);
-	COMPARE_NODE_FIELD(distributedBy);
 
 	return true;
 }
diff --git a/src/backend/nodes/outfast.c b/src/backend/nodes/outfast.c
index 1ebd11f..507ef53 100644
--- a/src/backend/nodes/outfast.c
+++ b/src/backend/nodes/outfast.c
@@ -2006,14 +2006,17 @@
 {
 	WRITE_NODE_TYPE("CREATESTMT");
 
-	WRITE_NODE_FIELD(relation);
-	WRITE_NODE_FIELD(tableElts);
-	WRITE_NODE_FIELD(inhRelations);
-	WRITE_NODE_FIELD(constraints);
-	WRITE_NODE_FIELD(options);
-	WRITE_ENUM_FIELD(oncommit, OnCommitAction);
-	WRITE_STRING_FIELD(tablespacename);
-	WRITE_NODE_FIELD(distributedBy);
+	WRITE_CHAR_FIELD(base.relKind);
+	WRITE_NODE_FIELD(base.relation);
+	WRITE_NODE_FIELD(base.tableElts);
+	WRITE_NODE_FIELD(base.inhRelations);
+	WRITE_NODE_FIELD(base.constraints);
+	WRITE_NODE_FIELD(base.options);
+	WRITE_ENUM_FIELD(base.oncommit, OnCommitAction);
+	WRITE_STRING_FIELD(base.tablespacename);
+	WRITE_NODE_FIELD(base.distributedBy);
+	WRITE_BOOL_FIELD(base.is_part_child);
+	WRITE_BOOL_FIELD(base.is_add_part);
 	WRITE_OID_FIELD(oidInfo.relOid);
 	WRITE_OID_FIELD(oidInfo.comptypeOid);
 	WRITE_OID_FIELD(oidInfo.toastOid);
@@ -2025,13 +2028,10 @@
 	WRITE_OID_FIELD(oidInfo.aoblkdirOid);
 	WRITE_OID_FIELD(oidInfo.aoblkdirIndexOid);
 	WRITE_OID_FIELD(oidInfo.aoblkdirComptypeOid);
-	WRITE_CHAR_FIELD(relKind);
 	WRITE_CHAR_FIELD(relStorage);
 	/* policy omitted */
 	/* postCreate - for analysis, QD only */
 	/* deferredStmts - for analysis, QD only */
-	WRITE_BOOL_FIELD(is_part_child);
-	WRITE_BOOL_FIELD(is_add_part);
 	WRITE_BOOL_FIELD(is_split_part);
 	WRITE_OID_FIELD(ownerid);
 	WRITE_BOOL_FIELD(buildAoBlkdir);
@@ -2232,16 +2232,26 @@
 {
 	WRITE_NODE_TYPE("CREATEEXTERNALSTMT");
 
-	WRITE_NODE_FIELD(relation);
-	WRITE_NODE_FIELD(tableElts);
+	WRITE_CHAR_FIELD(base.relKind);
+	WRITE_NODE_FIELD(base.relation);
+	WRITE_NODE_FIELD(base.tableElts);
+	WRITE_NODE_FIELD(base.inhRelations);
+	WRITE_NODE_FIELD(base.constraints);
+	WRITE_NODE_FIELD(base.options);
+	WRITE_ENUM_FIELD(base.oncommit, OnCommitAction);
+	WRITE_STRING_FIELD(base.tablespacename);
+	WRITE_NODE_FIELD(base.distributedBy);
+	WRITE_BOOL_FIELD(base.is_part_child);
+	WRITE_BOOL_FIELD(base.is_add_part);
 	WRITE_NODE_FIELD(exttypedesc);
 	WRITE_STRING_FIELD(format);
-	WRITE_NODE_FIELD(formatOpts);
 	WRITE_BOOL_FIELD(isweb);
 	WRITE_BOOL_FIELD(iswritable);
+	WRITE_BOOL_FIELD(isexternal);
+	WRITE_BOOL_FIELD(forceCreateDir);
+	WRITE_STRING_FIELD(parentPath);
 	WRITE_NODE_FIELD(sreh);
 	WRITE_NODE_FIELD(encoding);
-	WRITE_NODE_FIELD(distributedBy);
 
 }
 
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index cf6bf04..5894184 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2111,15 +2111,18 @@
 {
 	WRITE_NODE_TYPE("CREATESTMT");
 
-	WRITE_NODE_FIELD(relation);
-	WRITE_NODE_FIELD(tableElts);
-	WRITE_NODE_FIELD(inhRelations);
-	WRITE_NODE_FIELD(constraints);
-	WRITE_NODE_FIELD(options);
-	WRITE_ENUM_FIELD(oncommit, OnCommitAction);
-	WRITE_STRING_FIELD(tablespacename);
-	WRITE_NODE_FIELD(distributedBy);
-	WRITE_NODE_FIELD(partitionBy);
+	WRITE_CHAR_FIELD(base.relKind);
+	WRITE_NODE_FIELD(base.relation);
+	WRITE_NODE_FIELD(base.tableElts);
+	WRITE_NODE_FIELD(base.inhRelations);
+	WRITE_NODE_FIELD(base.constraints);
+	WRITE_NODE_FIELD(base.options);
+	WRITE_ENUM_FIELD(base.oncommit, OnCommitAction);
+	WRITE_STRING_FIELD(base.tablespacename);
+	WRITE_NODE_FIELD(base.distributedBy);
+	WRITE_BOOL_FIELD(base.is_part_child);
+	WRITE_BOOL_FIELD(base.is_add_part);
+	WRITE_NODE_FIELD(base.partitionBy);
 	WRITE_OID_FIELD(oidInfo.relOid);
 	WRITE_OID_FIELD(oidInfo.comptypeOid);
 	WRITE_OID_FIELD(oidInfo.toastOid);
@@ -2131,13 +2134,10 @@
 	WRITE_OID_FIELD(oidInfo.aoblkdirOid);
 	WRITE_OID_FIELD(oidInfo.aoblkdirIndexOid);
 	WRITE_OID_FIELD(oidInfo.aoblkdirComptypeOid);
-	WRITE_CHAR_FIELD(relKind);
 	WRITE_CHAR_FIELD(relStorage);
 	/* policy omitted */
 	/* postCreate omitted */
 	WRITE_NODE_FIELD(deferredStmts);
-	WRITE_BOOL_FIELD(is_part_child);
-	WRITE_BOOL_FIELD(is_add_part);
 	WRITE_BOOL_FIELD(is_split_part);
 	WRITE_OID_FIELD(ownerid);
 	WRITE_BOOL_FIELD(buildAoBlkdir);
@@ -2170,16 +2170,27 @@
 {
 	WRITE_NODE_TYPE("CREATEEXTERNALSTMT");
 
-	WRITE_NODE_FIELD(relation);
-	WRITE_NODE_FIELD(tableElts);
+	WRITE_CHAR_FIELD(base.relKind);
+	WRITE_NODE_FIELD(base.relation);
+	WRITE_NODE_FIELD(base.tableElts);
+	WRITE_NODE_FIELD(base.inhRelations);
+	WRITE_NODE_FIELD(base.constraints);
+	WRITE_NODE_FIELD(base.options);
+	WRITE_ENUM_FIELD(base.oncommit, OnCommitAction);
+	WRITE_STRING_FIELD(base.tablespacename);
+	WRITE_NODE_FIELD(base.distributedBy);
+	WRITE_BOOL_FIELD(base.is_part_child);
+	WRITE_BOOL_FIELD(base.is_add_part);
+	WRITE_NODE_FIELD(base.partitionBy);
 	WRITE_NODE_FIELD(exttypedesc);
 	WRITE_STRING_FIELD(format);
-	WRITE_NODE_FIELD(formatOpts);
 	WRITE_BOOL_FIELD(isweb);
 	WRITE_BOOL_FIELD(iswritable);
+	WRITE_BOOL_FIELD(isexternal);
+	WRITE_BOOL_FIELD(forceCreateDir);
+	WRITE_STRING_FIELD(parentPath);
 	WRITE_NODE_FIELD(sreh);
 	WRITE_NODE_FIELD(encoding);
-	WRITE_NODE_FIELD(distributedBy);
 }
 
 static void
diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c
index f9aee80..2cc7035 100644
--- a/src/backend/nodes/readfast.c
+++ b/src/backend/nodes/readfast.c
@@ -2033,14 +2033,17 @@
 {
 	READ_LOCALS(CreateStmt);
 
-	READ_NODE_FIELD(relation);
-	READ_NODE_FIELD(tableElts);
-	READ_NODE_FIELD(inhRelations);
-	READ_NODE_FIELD(constraints);
-	READ_NODE_FIELD(options);
-	READ_ENUM_FIELD(oncommit,OnCommitAction);
-	READ_STRING_FIELD(tablespacename);
-	READ_NODE_FIELD(distributedBy);
+	READ_CHAR_FIELD(base.relKind);
+	READ_NODE_FIELD(base.relation);
+	READ_NODE_FIELD(base.tableElts);
+	READ_NODE_FIELD(base.inhRelations);
+	READ_NODE_FIELD(base.constraints);
+	READ_NODE_FIELD(base.options);
+	READ_ENUM_FIELD(base.oncommit,OnCommitAction);
+	READ_STRING_FIELD(base.tablespacename);
+	READ_NODE_FIELD(base.distributedBy);
+	READ_BOOL_FIELD(base.is_part_child);
+	READ_BOOL_FIELD(base.is_add_part);
 	READ_OID_FIELD(oidInfo.relOid);
 	READ_OID_FIELD(oidInfo.comptypeOid);
 	READ_OID_FIELD(oidInfo.toastOid);
@@ -2052,13 +2055,10 @@
 	READ_OID_FIELD(oidInfo.aoblkdirOid);
 	READ_OID_FIELD(oidInfo.aoblkdirIndexOid);
 	READ_OID_FIELD(oidInfo.aoblkdirComptypeOid);
-	READ_CHAR_FIELD(relKind);
 	READ_CHAR_FIELD(relStorage);
 	/* policy omitted */
 	/* postCreate - for analysis, QD only */
 	/* deferredStmts - for analysis, QD only */
-	READ_BOOL_FIELD(is_part_child);
-	READ_BOOL_FIELD(is_add_part);
 	READ_BOOL_FIELD(is_split_part);
 	READ_OID_FIELD(ownerid);
 	READ_BOOL_FIELD(buildAoBlkdir);
@@ -2070,16 +2070,16 @@
 	 * Some extra checks to make sure we didn't get lost
 	 * during serialization/deserialization
 	 */
-	Assert(local_node->relKind == RELKIND_INDEX ||
-		   local_node->relKind == RELKIND_RELATION ||
-		   local_node->relKind == RELKIND_SEQUENCE ||
-		   local_node->relKind == RELKIND_UNCATALOGED ||
-		   local_node->relKind == RELKIND_TOASTVALUE ||
-		   local_node->relKind == RELKIND_VIEW ||
-		   local_node->relKind == RELKIND_COMPOSITE_TYPE ||
-		   local_node->relKind == RELKIND_AOSEGMENTS ||
-		   local_node->relKind == RELKIND_AOBLOCKDIR);
-	Assert(local_node->oncommit <= ONCOMMIT_DROP);
+	Assert(local_node->base.relKind == RELKIND_INDEX ||
+		   local_node->base.relKind == RELKIND_RELATION ||
+		   local_node->base.relKind == RELKIND_SEQUENCE ||
+		   local_node->base.relKind == RELKIND_UNCATALOGED ||
+		   local_node->base.relKind == RELKIND_TOASTVALUE ||
+		   local_node->base.relKind == RELKIND_VIEW ||
+		   local_node->base.relKind == RELKIND_COMPOSITE_TYPE ||
+		   local_node->base.relKind == RELKIND_AOSEGMENTS ||
+		   local_node->base.relKind == RELKIND_AOBLOCKDIR);
+	Assert(local_node->base.oncommit <= ONCOMMIT_DROP);
 
 	READ_DONE();
 }
@@ -2316,16 +2316,26 @@
 {
 	READ_LOCALS(CreateExternalStmt);
 
-	READ_NODE_FIELD(relation);
-	READ_NODE_FIELD(tableElts);
+	READ_CHAR_FIELD(base.relKind);
+	READ_NODE_FIELD(base.relation);
+	READ_NODE_FIELD(base.tableElts);
+	READ_NODE_FIELD(base.inhRelations);
+	READ_NODE_FIELD(base.constraints);
+	READ_NODE_FIELD(base.options);
+	READ_ENUM_FIELD(base.oncommit,OnCommitAction);
+	READ_STRING_FIELD(base.tablespacename);
+	READ_NODE_FIELD(base.distributedBy);
+	READ_BOOL_FIELD(base.is_part_child);
+	READ_BOOL_FIELD(base.is_add_part);
 	READ_NODE_FIELD(exttypedesc);
 	READ_STRING_FIELD(format);
-	READ_NODE_FIELD(formatOpts);
 	READ_BOOL_FIELD(isweb);
 	READ_BOOL_FIELD(iswritable);
+	READ_BOOL_FIELD(isexternal);
+	READ_BOOL_FIELD(forceCreateDir);
+	READ_STRING_FIELD(parentPath);
 	READ_NODE_FIELD(sreh);
 	READ_NODE_FIELD(encoding);
-	READ_NODE_FIELD(distributedBy);
 
 	READ_DONE();
 }
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index cbfbb53..dc7de27 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2154,16 +2154,18 @@
 {
 	READ_LOCALS(CreateStmt);
 
-	READ_NODE_FIELD(relation);
-	READ_NODE_FIELD(tableElts);
-	READ_NODE_FIELD(inhRelations);
-	READ_NODE_FIELD(constraints);
-
-	READ_NODE_FIELD(options);
-	READ_ENUM_FIELD(oncommit,OnCommitAction);
-	READ_STRING_FIELD(tablespacename);
-	READ_NODE_FIELD(distributedBy);
-	READ_NODE_FIELD(partitionBy);
+	READ_CHAR_FIELD(base.relKind);
+	READ_NODE_FIELD(base.relation);
+	READ_NODE_FIELD(base.tableElts);
+	READ_NODE_FIELD(base.inhRelations);
+	READ_NODE_FIELD(base.constraints);
+	READ_NODE_FIELD(base.options);
+	READ_ENUM_FIELD(base.oncommit,OnCommitAction);
+	READ_STRING_FIELD(base.tablespacename);
+	READ_NODE_FIELD(base.distributedBy);
+	READ_BOOL_FIELD(base.is_part_child);
+	READ_BOOL_FIELD(base.is_add_part);
+	READ_NODE_FIELD(base.partitionBy);
 	READ_OID_FIELD(oidInfo.relOid);
 	READ_OID_FIELD(oidInfo.comptypeOid);
 	READ_OID_FIELD(oidInfo.toastOid);
@@ -2175,13 +2177,10 @@
 	READ_OID_FIELD(oidInfo.aoblkdirOid);
 	READ_OID_FIELD(oidInfo.aoblkdirIndexOid);
 	READ_OID_FIELD(oidInfo.aoblkdirComptypeOid);
-	READ_CHAR_FIELD(relKind);
 	READ_CHAR_FIELD(relStorage);
 	/* policy omitted */
 	/* postCreate omitted */
 	READ_NODE_FIELD(deferredStmts);
-	READ_BOOL_FIELD(is_part_child);
-	READ_BOOL_FIELD(is_add_part);
 	READ_BOOL_FIELD(is_split_part);
 	READ_OID_FIELD(ownerid);
 	READ_BOOL_FIELD(buildAoBlkdir);
@@ -2300,16 +2299,27 @@
 {
 	READ_LOCALS(CreateExternalStmt);
 
-	READ_NODE_FIELD(relation);
-	READ_NODE_FIELD(tableElts);
+	READ_CHAR_FIELD(base.relKind);
+	READ_NODE_FIELD(base.relation);
+	READ_NODE_FIELD(base.tableElts);
+	READ_NODE_FIELD(base.inhRelations);
+	READ_NODE_FIELD(base.constraints);
+	READ_NODE_FIELD(base.options);
+	READ_ENUM_FIELD(base.oncommit,OnCommitAction);
+	READ_STRING_FIELD(base.tablespacename);
+	READ_NODE_FIELD(base.distributedBy);
+	READ_BOOL_FIELD(base.is_part_child);
+	READ_BOOL_FIELD(base.is_add_part);
+	READ_NODE_FIELD(base.partitionBy);
 	READ_NODE_FIELD(exttypedesc);
 	READ_STRING_FIELD(format);
-	READ_NODE_FIELD(formatOpts);
 	READ_BOOL_FIELD(isweb);
 	READ_BOOL_FIELD(iswritable);
+	READ_BOOL_FIELD(isexternal);
+	READ_BOOL_FIELD(forceCreateDir);
+	READ_STRING_FIELD(parentPath);
 	READ_NODE_FIELD(sreh);
 	READ_NODE_FIELD(encoding);
-	READ_NODE_FIELD(distributedBy);
 	local_node->policy = NULL;
 	
 	READ_DONE();
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 7a7f261..cc8fc0e 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1127,6 +1127,11 @@
 	return false;
 }
 
+bool is_hdfs_protocol(Uri *uri)
+{
+	return uri->protocol == URI_HDFS;
+}
+
 /*
  * create plan for pxf
  */
@@ -1441,6 +1446,10 @@
 		segdb_file_map = create_pxf_plan(segdb_file_map, rel, total_primaries, ctx, scan_relid);
 
 	}
+	else if (using_location && is_hdfs_protocol(uri))
+	{
+			// nothing to do
+	}
 	/* (2) */
 	else if(using_location && (uri->protocol == URI_GPFDIST || 
 							   uri->protocol == URI_GPFDISTS || 
@@ -1839,17 +1848,30 @@
 	/* data encoding */
 	encoding = rel->ext_encoding;
 
-	scan_plan = make_externalscan(tlist,
-								  scan_clauses,
-								  scan_relid,
-								  filenames,
-								  fmtopts,
-								  rel->fmttype,
-								  ismasteronly,
-								  rejectlimit,
-								  islimitinrows,
-								  fmtErrTblOid,
-								  encoding);
+	if (using_location && (is_hdfs_protocol(uri)))
+		scan_plan = make_externalscan(tlist,
+										  scan_clauses,
+										  scan_relid,
+										  rel->locationlist,
+										  fmtopts,
+										  rel->fmttype,
+										  ismasteronly,
+										  rejectlimit,
+										  islimitinrows,
+										  fmtErrTblOid,
+										  encoding);
+	else
+		scan_plan = make_externalscan(tlist,
+											scan_clauses,
+											scan_relid,
+											filenames,
+											fmtopts,
+											rel->fmttype,
+											ismasteronly,
+											rejectlimit,
+											islimitinrows,
+											fmtErrTblOid,
+											encoding);
 
 	copy_path_costsize(ctx->root, &scan_plan->scan.plan, best_path);
 
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index d754df3..e34da95 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -318,11 +318,7 @@
     /*
      * Now, we want to allocate resource.
      */
-    allocResult = calculate_planner_segment_num(my_parse, plannedstmt->resource->life,
-                                        plannedstmt->rtable, plannedstmt->intoPolicy,
-                                        plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1,
-                                        -1);
-
+    allocResult = calculate_planner_segment_num(plannedstmt, my_parse, plannedstmt->resource->life, -1);
     Assert(allocResult);
 
     ppResult->saResult = *allocResult;
@@ -628,9 +624,7 @@
             /*
              * Now, we want to allocate resource.
              */
-            allocResult = calculate_planner_segment_num(my_parse, resourceLife,
-                    plannedstmt->rtable, plannedstmt->intoPolicy,
-                    plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1, -1);
+            allocResult = calculate_planner_segment_num(plannedstmt, my_parse, resourceLife, -1);
 
             Assert(allocResult);
 
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 5024389..3d953c2 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -41,10 +41,15 @@
  *-------------------------------------------------------------------------
  */
 
+
 #include "postgres.h"
+#include "port.h"
+
+#include <uuid/uuid.h>
 
 #include "access/heapam.h"
 #include "access/reloptions.h"
+#include "access/plugstorage.h"
 #include "catalog/catquery.h"
 #include "catalog/gp_policy.h"
 #include "catalog/heap.h"
@@ -52,6 +57,8 @@
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_compression.h"
+#include "catalog/pg_constraint.h"
+#include "catalog/pg_exttable.h"
 #include "catalog/pg_partition.h"
 #include "catalog/pg_partition_rule.h"
 #include "catalog/pg_operator.h"
@@ -59,6 +66,7 @@
 #include "catalog/pg_type_encoding.h"
 #include "cdb/cdbpartition.h"
 #include "cdb/cdbparquetstoragewrite.h"
+#include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/prepare.h"
 #include "commands/tablecmds.h"
@@ -66,10 +74,14 @@
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
+#include "nodes/nodes.h"
+#include "nodes/pg_list.h"
+#include "nodes/value.h"
 #include "optimizer/clauses.h"
 #include "optimizer/plancat.h"
 #include "optimizer/tlist.h"
 #include "optimizer/var.h"
+#include "optimizer/planmain.h"
 #include "parser/analyze.h"
 #include "parser/gramparse.h"
 #include "parser/parse_agg.h"
@@ -88,7 +100,9 @@
 #include "utils/builtins.h"
 #include "utils/datum.h"
 #include "utils/lsyscache.h"
+#include "utils/palloc.h"
 #include "utils/syscache.h"
+#include "utils/uri.h"
 
 #include "cdb/cdbappendonlyam.h"
 #include "cdb/cdbvars.h"
@@ -228,6 +242,9 @@
 static void transformTableConstraint(ParseState *pstate,
 						 CreateStmtContext *cxt,
 						 Constraint *constraint);
+static void transformExtTableConstraint(ParseState *pstate,
+                                        CreateStmtContext *cxt,
+                                        Constraint *constraint);
 static void transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
 									 List *distributedBy, GpPolicy **policyp, List *options,
 									 List *likeDistributedBy,
@@ -235,7 +252,7 @@
 									 bool iswritable,
 									 bool onmaster);
 static void transformDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
-								   List *distributedBy, GpPolicy ** policy, List *options, 
+								   List *distributedBy, GpPolicy ** policy, List *options,
 								   List *likeDistributedBy,
 								   bool bQuiet);
 static void transformPartitionBy(ParseState *pstate,
@@ -523,7 +540,7 @@
 	 */
 	if (pstate->parentParseState == NULL && query->utilityStmt &&
 		IsA(query->utilityStmt, CreateStmt) &&
-		((CreateStmt *)query->utilityStmt)->partitionBy)
+		((CreateStmt *)query->utilityStmt)->base.partitionBy)
 	{
 		/*
 		 * We just break the statements into two lists: alter statements and
@@ -1572,7 +1589,7 @@
 		return;
 
 	/* Generate a hash table for all the columns */
-	foreach(lc, stmt->tableElts)
+	foreach(lc, stmt->base.tableElts)
 	{
 		Node *n = lfirst(lc);
 
@@ -1600,7 +1617,7 @@
 				cacheFlags = HASH_ELEM;
 
 				ht = hash_create("column info cache",
-								 list_length(stmt->tableElts),
+								 list_length(stmt->base.tableElts),
 								 &cacheInfo, cacheFlags);
 			}
 
@@ -1620,7 +1637,7 @@
 						 errmsg("column \"%s\" duplicated",
 								colname),
 						 errOmitLocation(true)));
-				
+
 			}
 			ce->count = 0;
 		}
@@ -1715,7 +1732,7 @@
 		Datum options;
 		bool isnull;
 
-		options = caql_getattr(pcqCtx, 
+		options = caql_getattr(pcqCtx,
 							   Anum_pg_type_encoding_typoptions,
 							   &isnull);
 
@@ -1730,7 +1747,7 @@
 
 /*
  * Make a default column storage directive from a WITH clause
- * Ignore options in the WITH clause that don't appear in 
+ * Ignore options in the WITH clause that don't appear in
  * storage_directives for column-level compression.
  */
 List *
@@ -1848,7 +1865,7 @@
 				 * try and set the same options!
 				 */
 
-				if (encodings_overlap(stmt->options, c->encoding, false))
+				if (encodings_overlap(stmt->base.options, c->encoding, false))
 					ereport(ERROR,
 						    (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 						 	 errmsg("DEFAULT COLUMN ENCODING clause cannot "
@@ -1863,7 +1880,7 @@
 	 */
 	if (!deflt)
 	{
-		tmpenc = form_default_storage_directive(stmt->options);
+		tmpenc = form_default_storage_directive(stmt->base.options);
 	}
 	else
 	{
@@ -1875,7 +1892,7 @@
 		deflt = makeNode(ColumnReferenceStorageDirective);
 		deflt->deflt = true;
 		deflt->encoding = transformStorageEncodingClause(tmpenc);
-	}	
+	}
 
 	/*
 	 * Loop over all columns. If a column has a column reference storage clause
@@ -1981,11 +1998,12 @@
 
 
 	cxt.stmtType = "CREATE TABLE";
-	cxt.relation = stmt->relation;
-	cxt.inhRelations = stmt->inhRelations;
+	cxt.isExternalTable = false;
+	cxt.relation = stmt->base.relation;
+	cxt.inhRelations = stmt->base.inhRelations;
 	cxt.isalter = false;
-	cxt.isaddpart = stmt->is_add_part;
-	cxt.columns = NIL;
+	cxt.isaddpart = stmt->base.is_add_part;
+    cxt.columns = NIL;
 	cxt.ckconstraints = NIL;
 	cxt.fkconstraints = NIL;
 	cxt.ixconstraints = NIL;
@@ -1994,12 +2012,12 @@
 	cxt.alist = NIL;
 	cxt.dlist = NIL; /* for deferred analysis requiring the created table */
 	cxt.pkey = NULL;
-	cxt.hasoids = interpretOidsOption(stmt->options);
+    cxt.hasoids = interpretOidsOption(stmt->base.options);
 
 	stmt->policy = NULL;
 
 	/* Disallow inheritance in combination with partitioning. */
-	if (stmt->inhRelations && (stmt->partitionBy || stmt->is_part_child ))
+	if (stmt->base.inhRelations && (stmt->base.partitionBy || stmt->base.is_part_child ))
 	{
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
@@ -2007,7 +2025,7 @@
 	}
 
 	/* Only on top-most partitioned tables. */
-	if ( stmt->partitionBy && !stmt->is_part_child )
+	if ( stmt->base.partitionBy && !stmt->base.is_part_child )
 	{
 		fixCreateStmtForPartitionedTable(stmt);
 	}
@@ -2016,7 +2034,7 @@
 	 * Run through each primary element in the table creation clause. Separate
 	 * column defs from constraints, and do preliminary analysis.
 	 */
-	foreach(elements, stmt->tableElts)
+	foreach(elements, stmt->base.tableElts)
 	{
 		Node	   *element = lfirst(elements);
 
@@ -2045,8 +2063,8 @@
 										 (InhRelation *) element, false);
 
 					if (Gp_role == GP_ROLE_DISPATCH && isBeginning &&
-						stmt->distributedBy == NIL &&
-						stmt->inhRelations == NIL &&
+						stmt->base.distributedBy == NIL &&
+						stmt->base.inhRelations == NIL &&
 						stmt->policy == NULL)
 					{
 						likeDistributedBy = getLikeDistributionPolicy((InhRelation *) element);
@@ -2079,7 +2097,7 @@
 	/*
 	 * Postprocess constraints that give rise to index definitions.
 	 */
-	transformIndexConstraints(pstate, &cxt, stmt->is_add_part || stmt->is_split_part);
+	transformIndexConstraints(pstate, &cxt, stmt->base.is_add_part || stmt->is_split_part);
 
 	/*
 	 * Carry any deferred analysis statements forward.  Added for MPP-13750
@@ -2095,7 +2113,7 @@
 	 * Postprocess foreign-key constraints.
 	 * But don't cascade FK constraints to parts, yet.
 	 */
-	if ( ! stmt->is_part_child )
+	if ( ! stmt->base.is_part_child )
 		transformFKConstraints(pstate, &cxt, true, false);
 
 	/*
@@ -2113,9 +2131,9 @@
 	 * to the partition which the user wants to be non-AO. Just ignore it
 	 * instead.
 	 */
-	if (stmt->is_part_child)
+	if (stmt->base.is_part_child)
 	{
-		if (co_explicitly_disabled(stmt->options) || !stenc)
+		if (co_explicitly_disabled(stmt->base.options) || !stenc)
 			stmt->attr_encodings = NIL;
 		else
 		{
@@ -2132,29 +2150,29 @@
 	/*
 	 * Postprocess Greenplum Database distribution columns
 	 */
-	if (stmt->is_part_child ||
-		(stmt->partitionBy &&
+	if (stmt->base.is_part_child ||
+		(stmt->base.partitionBy &&
 		 (
 			 /* be very quiet if set subpartn template */
-			 (((PartitionBy *)(stmt->partitionBy))->partQuiet ==
+			 (((PartitionBy *)(stmt->base.partitionBy))->partQuiet ==
 			  PART_VERBO_NOPARTNAME) ||
 			 (
 				 /* quiet for partitions of depth > 0 */
-				 (((PartitionBy *)(stmt->partitionBy))->partDepth != 0) &&
-				 (((PartitionBy *)(stmt->partitionBy))->partQuiet !=
+				 (((PartitionBy *)(stmt->base.partitionBy))->partDepth != 0) &&
+				 (((PartitionBy *)(stmt->base.partitionBy))->partQuiet !=
 				  PART_VERBO_NORMAL)
 					 )
 				 )
 				))
 			bQuiet = true; /* silence distro messages for partitions */
 
-	transformDistributedBy(pstate, &cxt, stmt->distributedBy, &stmt->policy, stmt->options,
+	transformDistributedBy(pstate, &cxt, stmt->base.distributedBy, &stmt->policy, stmt->base.options,
 						   likeDistributedBy, bQuiet);
 
 	/*
 	 * Process table partitioning clause
 	 */
-	transformPartitionBy(pstate, &cxt, stmt, stmt->partitionBy, stmt->policy);
+	transformPartitionBy(pstate, &cxt, stmt, stmt->base.partitionBy, stmt->policy);
 
 	/*
 	 * Output results.
@@ -2162,147 +2180,657 @@
 	q = makeNode(Query);
 	q->commandType = CMD_UTILITY;
 	q->utilityStmt = (Node *) stmt;
-	stmt->tableElts = cxt.columns;
-	stmt->constraints = cxt.ckconstraints;
+	stmt->base.tableElts = cxt.columns;
+	stmt->base.constraints = cxt.ckconstraints;
 	*extras_before = list_concat(*extras_before, cxt.blist);
 	*extras_after = list_concat(cxt.alist, *extras_after);
 
 	return q;
 }
 
-static Query *
-transformCreateExternalStmt(ParseState *pstate, CreateExternalStmt *stmt,
-							List **extras_before, List **extras_after)
-{
-	CreateStmtContext cxt;
-	Query	   *q;
-	ListCell   *elements;
-	ExtTableTypeDesc	*exttypeDesc = NULL;
-	List  	   *likeDistributedBy = NIL;
-	bool	    bQuiet = false;	/* shut up transformDistributedBy messages */
-	bool		onmaster = false;
-	bool		iswritable = stmt->iswritable;
+enum PreDefinedFormatterOptionVALTYPE {
+  PREDEF_FMTOPT_VAL_NO,
+  PREDEF_FMTOPT_VAL_STRING,
+  PREDEF_FMTOPT_VAL_SIGNEDINTEGER,
+  PREDEF_FMTOPT_VAL_COLNAMELIST
+};
 
-	cxt.stmtType = "CREATE EXTERNAL TABLE";
-	cxt.relation = stmt->relation;
-	cxt.inhRelations = NIL;
-	cxt.hasoids = false;
-	cxt.isalter = false;
-	cxt.columns = NIL;
-	cxt.ckconstraints = NIL;
-	cxt.fkconstraints = NIL;
-	cxt.ixconstraints = NIL;
-	cxt.pkey = NULL;
+enum PreDefinedFormatterOptionID {
+  PREDEF_FMT_OPT_ID_DELIMITER,
+  PREDEF_FMT_OPT_ID_NULL,
+  PREDEF_FMT_OPT_ID_HEADER,
+  PREDEF_FMT_OPT_ID_QUOTE,
+  PREDEF_FMT_OPT_ID_ESCAPE,
+  PREDEF_FMT_OPT_ID_FORCENOTNULL,
+  PREDEF_FMT_OPT_ID_FORCEQUOTE,
+  PREDEF_FMT_OPT_ID_FILLMISSINGFIELDS,
+  PREDEF_FMT_OPT_ID_NEWLINE,
+  PREDEF_FMT_OPT_ID_UNPREDEFINED,
+  PREDEF_FMT_OPT_ID_ILLEGAL
+};
 
-	cxt.blist = NIL;
-	cxt.alist = NIL;
+typedef struct PreDefinedFormatterOption {
+  char keyword[3][32];
+  int nKeyword;
+  bool hasValue;
+  enum PreDefinedFormatterOptionVALTYPE valueType;
+  enum PreDefinedFormatterOptionID optID;
+} PreDefinedFormatterOption;
 
-	/*
-	 * Run through each primary element in the table creation clause. Separate
-	 * column defs from constraints, and do preliminary analysis.
-	 */
-	foreach(elements, stmt->tableElts)
-	{
-		Node	   *element = lfirst(elements);
+#define PREDEF_FMTOPT_SIZE 9
 
-		switch (nodeTag(element))
-		{
-			case T_ColumnDef:
-				transformColumnDefinition(pstate, &cxt,
-										  (ColumnDef *) element);
-				break;
+enum PreDefinedFormatterOptionID MatchExternalRelationFormatterOption(
+    PreDefinedFormatterOption *options, ListCell *head) {
+  ListCell *p1 = head;
+  ListCell *p2 = head->next;
+  ListCell *p3 = p2 == NULL ? NULL : p2->next;
+  ListCell *p4 = p3 == NULL ? NULL : p3->next;
 
-			case T_Constraint:
-			case T_FkConstraint:
-				/* should never happen. If it does fix gram.y */
-				elog(ERROR, "node type %d not supported for external tables",
-					 (int) nodeTag(element));
-				break;
+  DefElem *de1 = (DefElem *)lfirst(p1);
+  DefElem *de2 = p2 == NULL ? NULL : (DefElem *)lfirst(p2);
+  DefElem *de3 = p3 == NULL ? NULL : (DefElem *)lfirst(p3);
+  DefElem *de4 = p4 == NULL ? NULL : (DefElem *)lfirst(p4);
 
-			case T_InhRelation:
-				{
-					/* LIKE */
-					bool	isBeginning = (cxt.columns == NIL);
+  if (strcmp("#ident", de1->defname) != 0) {
+    return PREDEF_FMT_OPT_ID_ILLEGAL; /* must start with a #ident elem */
+  }
 
-					transformInhRelation(pstate, &cxt,
-										 (InhRelation *) element, true);
+  for (int i = 0; i < PREDEF_FMTOPT_SIZE; ++i) {
+    PreDefinedFormatterOption *pdOpt = &(options[i]);
+    if (pdOpt->nKeyword == 1 &&
+        strcasecmp(pdOpt->keyword[0], ((Value *)(de1->arg))->val.str) == 0) {
+      if (!options[i].hasValue) {
+        return options[i].optID; /* Got no value option */
+      } else if (p2 == NULL) {
+        return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value field */
+      } else if (((strcmp("#string", de2->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_STRING)) ||
+                 ((strcmp("#int", de2->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_SIGNEDINTEGER)) ||
+                 ((strcmp("#collist", de2->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST)) ||
+                 ((strcmp("#ident", de2->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST))) {
+        return options[i].optID; /* Got option having one value */
+      } else {
+        return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value type */
+      }
+    } else if (pdOpt->nKeyword == 2 && de2 != NULL &&
+               strcasecmp(pdOpt->keyword[0], ((Value *)(de1->arg))->val.str) ==
+                   0 &&
+               strcasecmp(pdOpt->keyword[1], ((Value *)(de2->arg))->val.str) ==
+                   0) {
+      if (!options[i].hasValue) {
+        return options[i].optID; /* got no value option */
+      } else if (de3 == NULL) {
+        return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value field */
+      } else if (((strcmp("#string", de3->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_STRING)) ||
+                 ((strcmp("#int", de3->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_SIGNEDINTEGER)) ||
+                 ((strcmp("#collist", de3->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST)) ||
+                 ((strcmp("#ident", de3->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST))) {
+        return options[i].optID; /* Got option having one value */
+      } else {
+        return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value type */
+      }
+    } else if (pdOpt->nKeyword == 3 && de2 != NULL && de3 != NULL &&
+               strcasecmp(pdOpt->keyword[0], ((Value *)(de1->arg))->val.str) ==
+                   0 &&
+               strcasecmp(pdOpt->keyword[1], ((Value *)(de2->arg))->val.str) ==
+                   0 &&
+               strcasecmp(pdOpt->keyword[2], ((Value *)(de3->arg))->val.str) ==
+                   0) {
+      if (!options[i].hasValue) {
+        return options[i].optID; /* got no value option */
+      } else if (de4 == NULL) {
+        return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value field */
+      } else if (((strcmp("#string", de4->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_STRING)) ||
+                 ((strcmp("#int", de4->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_SIGNEDINTEGER)) ||
+                 ((strcmp("#collist", de4->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST)) ||
+                 ((strcmp("#ident", de4->defname) == 0) &&
+                  (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST))) {
+        return options[i].optID; /* Got option having one value */
+      } else {
+        return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value type */
+      }
+    }
+  }
 
-					if (Gp_role == GP_ROLE_DISPATCH && isBeginning &&
-						stmt->distributedBy == NIL &&
-						stmt->policy == NULL &&
-						iswritable /* dont bother if readable table */)
-					{
-						likeDistributedBy = getLikeDistributionPolicy((InhRelation *) element);
-					}
-				}
-				break;
+  /*
+   * We expect user-defined special options, which should be consumed
+   * further by a customized formatter.
+   */
+  return PREDEF_FMT_OPT_ID_UNPREDEFINED;
+}
 
-			default:
-				elog(ERROR, "unrecognized node type: %d",
-					 (int) nodeTag(element));
-				break;
-		}
-	}
+void recognizeExternalRelationFormatterOptions(
+    CreateExternalStmt *createExtStmt) {
+  PreDefinedFormatterOption options[PREDEF_FMTOPT_SIZE] = {
+      {{"delimiter", "", ""},
+       1,
+       true,
+       PREDEF_FMTOPT_VAL_STRING,
+       PREDEF_FMT_OPT_ID_DELIMITER},
+      {{"null", "", ""},
+       1,
+       true,
+       PREDEF_FMTOPT_VAL_STRING,
+       PREDEF_FMT_OPT_ID_NULL},
+      {{"header", "", ""},
+       1,
+       false,
+       PREDEF_FMTOPT_VAL_NO,
+       PREDEF_FMT_OPT_ID_HEADER},
+      {{"quote", "", ""},
+       1,
+       true,
+       PREDEF_FMTOPT_VAL_STRING,
+       PREDEF_FMT_OPT_ID_QUOTE},
+      {{"escape", "", ""},
+       1,
+       true,
+       PREDEF_FMTOPT_VAL_STRING,
+       PREDEF_FMT_OPT_ID_ESCAPE},
+      {{"force", "not", "null"},
+       3,
+       true,
+       PREDEF_FMTOPT_VAL_COLNAMELIST,
+       PREDEF_FMT_OPT_ID_FORCENOTNULL},
+      {{"force", "quote", ""},
+       2,
+       true,
+       PREDEF_FMTOPT_VAL_COLNAMELIST,
+       PREDEF_FMT_OPT_ID_FORCEQUOTE},
+      {{"fill", "missing", "fields"},
+       3,
+       false,
+       PREDEF_FMTOPT_VAL_NO,
+       PREDEF_FMT_OPT_ID_FILLMISSINGFIELDS},
+      {{"newline", "", ""},
+       1,
+       true,
+       PREDEF_FMTOPT_VAL_STRING,
+       PREDEF_FMT_OPT_ID_NEWLINE}};
 
-	/*
-	 * Check if this is an EXECUTE ON MASTER table. We'll need this information
-	 * in transformExternalDistributedBy. While at it, we also check if an error
-	 * table is attempted to be used on ON MASTER table and error if so.
-	 */
-	if(!iswritable)
-	{
-		exttypeDesc = (ExtTableTypeDesc *)stmt->exttypedesc;
+  List *newOpts = NULL;
+  ListCell *optCell = list_head(createExtStmt->base.options);
 
-		if(exttypeDesc->exttabletype == EXTTBL_TYPE_EXECUTE)
-		{
-			ListCell   *exec_location_opt;
+  /* Add restriction of error lines */
+  if (createExtStmt->sreh != NULL) {
+    /* Handle error table specification and reject number per segment */
+    SingleRowErrorDesc *errDesc = (SingleRowErrorDesc *)createExtStmt->sreh;
+    if (errDesc->rejectlimit > 0 && errDesc->is_hdfs_protocol_text) {
+      newOpts = lappend(
+          newOpts,
+          makeDefElem("reject_limit", makeInteger(errDesc->rejectlimit)));
+      if (errDesc->hdfsLoc)
+        newOpts =
+            lappend(newOpts,
+                    makeDefElem("err_table",
+                                (Node *)makeString(pstrdup(errDesc->hdfsLoc))));
+    }
+  }
 
-			foreach(exec_location_opt, exttypeDesc->on_clause)
-			{
-				DefElem    *defel = (DefElem *) lfirst(exec_location_opt);
+  while (optCell != NULL) {
+    /* Try a match now. */
+    enum PreDefinedFormatterOptionID id =
+        MatchExternalRelationFormatterOption(options, optCell);
+    switch (id) {
+      case PREDEF_FMT_OPT_ID_DELIMITER: {
+        DefElem *de = (DefElem *)lfirst(optCell->next);
+        Value *v = (Value *)(de->arg);
+        DefElem *newde =
+            makeDefElem("delimiter", (Node *)makeString(v->val.str));
+        newOpts = lappend(newOpts, newde);
+        optCell = optCell->next->next;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_NULL: {
+        DefElem *de = (DefElem *)lfirst(optCell->next);
+        Value *v = (Value *)(de->arg);
+        DefElem *newde = makeDefElem("null", (Node *)makeString(v->val.str));
+        newOpts = lappend(newOpts, newde);
+        optCell = optCell->next->next;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_HEADER: {
+        DefElem *newde = makeDefElem("header", (Node *)makeInteger(TRUE));
+        newOpts = lappend(newOpts, newde);
+        optCell = optCell->next;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_QUOTE: {
+        DefElem *de = (DefElem *)lfirst(optCell->next);
+        Value *v = (Value *)(de->arg);
+        DefElem *newde = makeDefElem("quote", (Node *)makeString(v->val.str));
+        newOpts = lappend(newOpts, newde);
+        optCell = optCell->next->next;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_ESCAPE: {
+        DefElem *de = (DefElem *)lfirst(optCell->next);
+        Value *v = (Value *)(de->arg);
+        DefElem *newde = makeDefElem("escape", (Node *)makeString(v->val.str));
+        newOpts = lappend(newOpts, newde);
+        optCell = optCell->next->next;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_FORCENOTNULL: {
+        DefElem *newde = NULL;
+        DefElem *de = (DefElem *)lfirst(optCell->next->next->next);
+        if (strcmp("#ident", de->defname) == 0) {
+          /*
+           * The case there is only one column name which is recognized
+           * as a ident string.
+           */
+          Value *v = (Value *)(de->arg);
+          List *collist = list_make1(makeString(v->val.str));
+          newde = makeDefElem("force_notnull", (Node *)collist);
 
-				if (strcmp(defel->defname, "master") == 0)
-				{
-					SingleRowErrorDesc *srehDesc = (SingleRowErrorDesc *)stmt->sreh;
+        } else {
+          /*
+           * There are multiple column names in a list already
+           * recognized by parser.
+           */
+          List *collist = NULL;
+          ListCell *colCell = NULL;
+          foreach (colCell, (List *)(de->arg)) {
+            collist = lappend(collist,
+                              makeString(((Value *)lfirst(colCell))->val.str));
+            elog(LOG, "recognized column list colname:%s",
+                 ((Value *)lfirst(colCell))->val.str);
+          }
+          newde = makeDefElem("force_notnull", (Node *)collist);
 
-					onmaster = true;
+          /* TODO: check where the old instance is freed */
+        }
+        newOpts = lappend(newOpts, newde);
+        optCell = optCell->next->next->next->next;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_FORCEQUOTE: {
+        DefElem *newde = NULL;
+        DefElem *de = (DefElem *)lfirst(optCell->next->next);
+        if (strcmp("#ident", de->defname) == 0) {
+          /*
+           * The case where there is only one column name, which is
+           * recognized as an ident string.
+           */
+          Value *v = (Value *)(de->arg);
+          List *collist = list_make1(makeString(v->val.str));
+          newde = makeDefElem("force_quote", (Node *)collist);
 
-					if(srehDesc && srehDesc->errtable)
-						ereport(ERROR,
-								(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-								 errmsg("External web table with ON MASTER clause "
-										"cannot use error tables.")));
-				}
-			}
-		}
-	}
+        } else {
+          /*
+           * There are multiple column names in a list already
+           * recognized by parser.
+           */
+          List *collist = NULL;
+          ListCell *colCell = NULL;
+          foreach (colCell, (List *)(de->arg)) {
+            collist = lappend(collist,
+                              makeString(((Value *)lfirst(colCell))->val.str));
+            elog(LOG, "recognized column list colname:%s",
+                 ((Value *)lfirst(colCell))->val.str);
+          }
+          newde = makeDefElem("force_quote", (Node *)collist);
 
-	/*
-	 * Check if we need to create an error table. If so, add it to the
-	 * before list.
-	 */
-	if(stmt->sreh && ((SingleRowErrorDesc *)stmt->sreh)->errtable)
-		transformSingleRowErrorHandling(pstate, &cxt,
-										(SingleRowErrorDesc *) stmt->sreh);
+          /* TODO: check where the old instance is freed */
+        }
+        newOpts = lappend(newOpts, newde);
+        optCell = optCell->next->next->next;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_FILLMISSINGFIELDS: {
+        DefElem *newde =
+            makeDefElem("fill_missing_fields", (Node *)makeInteger(TRUE));
+        newOpts = lappend(newOpts, newde);
+        optCell = optCell->next->next->next;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_NEWLINE: {
+        DefElem *de = (DefElem *)lfirst(optCell->next);
+        Value *v = (Value *)(de->arg);
+        DefElem *newde = makeDefElem("newline", (Node *)makeString(v->val.str));
+        newOpts = lappend(newOpts, newde);
+        optCell = optCell->next->next;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_UNPREDEFINED: {
+        /*
+         * In case it is a user defined option, we combine all continuous
+         * idents until we see a string constant or an integer constant.
+         * So this means user defined formatter's user defined option
+         * values can only be string or integer values.
+         */
+        int c = 0;
+        int identlength = 0;
+        ListCell *walkerCell = optCell;
+        while (walkerCell != NULL &&
+               strcmp("#ident", ((DefElem *)lfirst(walkerCell))->defname) ==
+                   0) {
+          c++;
+          Value *v = (Value *)(((DefElem *)lfirst(walkerCell))->arg);
+          identlength += strlen(v->val.str) + 1;
+          walkerCell = walkerCell->next;
+        }
 
-	transformETDistributedBy(pstate, &cxt, stmt->distributedBy, &stmt->policy, NULL,/*no WITH options for ET*/
-							 likeDistributedBy, bQuiet, iswritable, onmaster);
+        /* Decide the value part */
+        Node *value = NULL;
+        if (walkerCell == NULL) {
+          /* The case where the option has no value; we set TRUE for it. */
+          value = makeInteger(TRUE);
+        } else {
+          DefElem *de = (DefElem *)lfirst(walkerCell);
+          if (strcmp("#collist", de->defname) == 0) {
+            /*
+             * We don't accept column name list value types for
+             * customized formatter's user defined options.
+             */
+            ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                            errmsg(
+                                "cannot support column name list as an unknown "
+                                "option's value"),
+                            errOmitLocation(true)));
+          } else if (strcmp("#int", de->defname) == 0) {
+            value = makeInteger(((Value *)(de->arg))->val.ival);
+          } else {
+            value = makeString(((Value *)(de->arg))->val.str);
+          }
+        }
 
-	Assert(cxt.ckconstraints == NIL);
-	Assert(cxt.fkconstraints == NIL);
-	Assert(cxt.ixconstraints == NIL);
+        /* Build key part. */
+        char *newKey = (char *)palloc0(sizeof(char) * identlength);
+        ListCell *walkerCell2 = optCell;
+        int counter = 0;
+        for (; counter < c; counter++, walkerCell2 = walkerCell2->next) {
+          Value *v = (Value *)(((DefElem *)lfirst(walkerCell2))->arg);
+          if (counter > 0) {
+            strcat(newKey, "_");
+          }
+          strcat(newKey, v->val.str);
+        }
 
-	/*
-	 * Output results.
-	 */
-	q = makeNode(Query);
-	q->commandType = CMD_UTILITY;
-	q->utilityStmt = (Node *) stmt;
-	stmt->tableElts = cxt.columns;
-	*extras_before = list_concat(*extras_before, cxt.blist);
-	*extras_after = list_concat(cxt.alist, *extras_after);
+        DefElem *newde = makeDefElem(newKey, (Node *)value);
+        newOpts = lappend(newOpts, newde);
 
-	return q;
+        if (walkerCell)
+          optCell = walkerCell->next;
+        else
+          optCell = NULL;
+        break;
+      }
+      case PREDEF_FMT_OPT_ID_ILLEGAL: {
+        ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("cannot recognize full formatter option list"),
+                        errOmitLocation(true)));
+      }
+    }
+  }
+
+  /* Use new list to replace the old one */
+  createExtStmt->base.options = newOpts;
+}
+
+static Query *transformCreateExternalStmt(ParseState *pstate,
+                                          CreateExternalStmt *stmt,
+                                          List **extras_before,
+                                          List **extras_after) {
+  CreateStmtContext cxt;
+  Query *q;
+  ListCell *elements;
+  ExtTableTypeDesc *desc = NULL;
+  List *likeDistributedBy = NIL;
+  bool bQuiet = false; /* shut up transformDistributedBy messages */
+  bool onmaster = false;
+  bool iswritable = stmt->iswritable;
+  bool isPluggableStorage = false;
+  if (!stmt->forceCreateDir) stmt->forceCreateDir = stmt->iswritable;
+
+  cxt.stmtType = "CREATE EXTERNAL TABLE";
+  cxt.isExternalTable = true;
+  cxt.relation = stmt->base.relation;
+  cxt.inhRelations = stmt->base.inhRelations;
+  cxt.isaddpart = stmt->base.is_add_part;
+  cxt.iswritable = stmt->iswritable;
+  cxt.exttypedesc = stmt->exttypedesc;
+  cxt.format = stmt->format;
+  cxt.parentPath = stmt->parentPath;
+  cxt.hasoids = false;
+  cxt.isalter = false;
+  cxt.columns = NIL;
+  cxt.ckconstraints = NIL;
+  cxt.fkconstraints = NIL;
+  cxt.ixconstraints = NIL;
+  cxt.inh_indexes = NIL;
+  cxt.pkey = NULL;
+
+  cxt.blist = NIL;
+  cxt.alist = NIL;
+
+  /*
+   * Build type description of internal table in pluggable storage
+   * framework based on format
+   */
+  desc = (ExtTableTypeDesc *)(stmt->exttypedesc);
+  if (desc->exttabletype == EXTTBL_TYPE_UNKNOWN) {
+    if (stmt->format == NULL) {
+      ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+                      errmsg("Internal table must have format specification"),
+                      errhint("Use CREATE TABLE FORMAT instead"),
+                      errOmitLocation(true)));
+    }
+
+    /* orc, text, csv on hdfs */
+    else if (pg_strncasecmp(stmt->format, "orc", strlen("orc")) == 0 ||
+             pg_strncasecmp(stmt->format, "text", strlen("text")) == 0 ||
+             pg_strncasecmp(stmt->format, "csv", strlen("csv")) == 0) {
+      desc->exttabletype = EXTTBL_TYPE_LOCATION;
+      desc->location_list = NIL;
+      // desc->location_list = list_make1((Node *) makeString(PROTOCOL_HDFS));
+      desc->command_string = NULL;
+      desc->on_clause = NIL;
+    } else {
+      ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+                      errmsg("Format \"%s\" for internal table is invalid",
+                             stmt->format)));
+    }
+    isPluggableStorage = true;
+  }
+
+  if (desc->exttabletype == EXTTBL_TYPE_LOCATION) {
+
+    ListCell *loc_cell = list_head(desc->location_list);
+    if (loc_cell == NIL) {
+      if (pg_strncasecmp(stmt->format, "orc", strlen("orc")) &&
+          pg_strncasecmp(stmt->format, "text", strlen("text")) &&
+          pg_strncasecmp(stmt->format, "csv", strlen("csv"))) {
+        ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg(
+                            "Internal table on hdfs must be \'orc\', "
+                            "\'text\', or \'csv\' format")));
+      }
+      isPluggableStorage = true;
+    } else {
+      Value *loc_val = lfirst(loc_cell);
+      char *loc_str = pstrdup(loc_val->val.str);
+      bool is_hdfs_protocol = IS_HDFS_URI(loc_str);
+      isPluggableStorage = is_hdfs_protocol;
+
+
+      if (is_hdfs_protocol &&
+          (pg_strncasecmp(stmt->format, "orc", strlen("orc")) &&
+           pg_strncasecmp(stmt->format, "text", strlen("text")) &&
+           pg_strncasecmp(stmt->format, "csv", strlen("csv")))) {
+        ereport(
+            ERROR,
+            (errcode(ERRCODE_SYNTAX_ERROR),
+             errmsg(
+                 "LOCATION using hdfs url \'%s\' does not "
+                 "support \'%s\' format",
+                 loc_str, stmt->format),
+             errhint("Use \"FORMAT \'orc\', \'text\', or \'csv\'\" instead"),
+             errOmitLocation(true)));
+      }
+    }
+  }
+
+  // handle error table for text/csv pluggable storage
+  if (stmt->sreh && isPluggableStorage &&
+      (strcasecmp(stmt->format, "text") == 0 ||
+       strcasecmp(stmt->format, "csv") == 0)) {
+    SingleRowErrorDesc *errDesc = (SingleRowErrorDesc *)stmt->sreh;
+
+    if (!errDesc->is_limit_in_rows) {
+      ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+                      errmsg(
+                          "Single row error handling with percentage limit is "
+                          "not accepted for pluggable storage")));
+    }
+
+    errDesc->is_hdfs_protocol_text = true;
+    if (errDesc->errtable) {
+      errDesc->hdfsLoc = (char *)palloc0(MAXPGPATH);
+      char *fileSpacePath = NULL;
+      GetFilespacePathForTablespace(get_database_dts(MyDatabaseId),
+                                    &fileSpacePath);
+   /*   uuid_t uuid;
+      char buf[1024];
+      uuid_generate(uuid);
+      uuid_unparse(uuid, buf);
+      sprintf(errDesc->hdfsLoc, "%s/ExtErrTbl/%s", fileSpacePath, buf);*/
+    }
+  }
+
+  // Only on top-most partitioned tables
+  if (stmt->base.partitionBy && !stmt->base.is_part_child) {
+    if (isPluggableStorage)
+      fixCreateStmtForPartitionedTable(&stmt->base);
+    else
+      elog(ERROR,
+           "Partition external table only supported for pluggable storage");
+  }
+
+  /*
+   * Run through each primary element in the table creation clause. Separate
+   * column defs from constraints, and do preliminary analysis.
+   */
+  foreach (elements, stmt->base.tableElts) {
+    Node *element = lfirst(elements);
+
+    switch (nodeTag(element)) {
+      case T_ColumnDef:
+        transformColumnDefinition(pstate, &cxt, (ColumnDef *)element);
+        break;
+
+      case T_Constraint:
+        transformExtTableConstraint(pstate, &cxt, (Constraint *)element);
+        break;
+
+      case T_FkConstraint:
+        /* should never happen. If it does fix gram.y */
+        elog(ERROR, "node type %d not supported for external tables",
+             (int)nodeTag(element));
+        break;
+
+      case T_InhRelation: {
+        /* LIKE */
+        bool isBeginning = (cxt.columns == NIL);
+
+        transformInhRelation(pstate, &cxt, (InhRelation *)element,
+                             !isPluggableStorage);
+
+        if (Gp_role == GP_ROLE_DISPATCH && isBeginning &&
+            stmt->base.distributedBy == NIL && stmt->policy == NULL &&
+            iswritable /* dont bother if readable table */) {
+          likeDistributedBy = getLikeDistributionPolicy((InhRelation *)element);
+        }
+      } break;
+
+      default:
+        elog(ERROR, "unrecognized node type: %d", (int)nodeTag(element));
+        break;
+    }
+  }
+
+  /*
+   * transformIndexConstraints wants cxt.alist to contain only index
+   * statements, so transfer anything we already have into extras_after
+   * immediately.
+   */
+  *extras_after = list_concat(cxt.alist, *extras_after);
+  cxt.alist = NIL;
+
+  /*
+   * Postprocess constraints that give rise to index definitions.
+   */
+  transformIndexConstraints(pstate, &cxt, false);
+
+  /*
+   * Check if this is an EXECUTE ON MASTER table. We'll need this information
+   * in transformETDistributedBy. While at it, we also check if an error
+   * table is attempted to be used on ON MASTER table and error if so.
+   */
+  if (!iswritable) {
+    desc = (ExtTableTypeDesc *)stmt->exttypedesc;
+
+    if (desc->exttabletype == EXTTBL_TYPE_EXECUTE) {
+      ListCell *exec_location_opt;
+
+      foreach (exec_location_opt, desc->on_clause) {
+        DefElem *defel = (DefElem *)lfirst(exec_location_opt);
+
+        if (strcmp(defel->defname, "master") == 0) {
+          SingleRowErrorDesc *srehDesc = (SingleRowErrorDesc *)stmt->sreh;
+
+          onmaster = true;
+
+          if (srehDesc && srehDesc->errtable)
+            ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                            errmsg(
+                                "External web table with ON MASTER clause "
+                                "cannot use error tables.")));
+        }
+      }
+    }
+  }
+
+  /*
+   * Check if we need to create an error table. If so, add it to the
+   * before list.
+   */
+  if (stmt->sreh && ((SingleRowErrorDesc *)stmt->sreh)->errtable)
+    transformSingleRowErrorHandling(pstate, &cxt,
+                                    (SingleRowErrorDesc *)stmt->sreh);
+
+  transformETDistributedBy(pstate, &cxt, stmt->base.distributedBy,
+                           &stmt->policy, NULL, /*no WITH options for ET*/
+                           likeDistributedBy, bQuiet, iswritable, onmaster);
+
+  // Process table partitioning clause
+  if (isPluggableStorage)
+    transformPartitionBy(pstate, &cxt, &stmt->base, stmt->base.partitionBy,
+                         stmt->policy);
+
+  /*
+   * Output results.
+   */
+  q = makeNode(Query);
+  q->commandType = CMD_UTILITY;
+  q->utilityStmt = (Node *)stmt;
+  stmt->base.tableElts = cxt.columns;
+  stmt->base.constraints = cxt.ckconstraints;
+  stmt->pkey = cxt.pkey;
+  *extras_before = list_concat(*extras_before, cxt.blist);
+  *extras_after = list_concat(cxt.alist, *extras_after);
+
+  return q;
 }
 
 static Query *
@@ -2314,8 +2842,9 @@
 	ListCell   *elements;
 
 	cxt.stmtType = "CREATE FOREIGN TABLE";
-	cxt.relation = stmt->relation;
-	cxt.inhRelations = NIL;
+	cxt.isExternalTable = false;
+    cxt.relation = stmt->relation;
+    cxt.inhRelations = NIL;
 	cxt.hasoids = false;
 	cxt.isalter = false;
 	cxt.columns = NIL;
@@ -2572,9 +3101,9 @@
 							(errcode(ERRCODE_SYNTAX_ERROR),
 							 errmsg("multiple default values specified for column \"%s\" of table \"%s\"",
 								  column->colname, cxt->relation->relname)));
-				/* 
-				 * Note: DEFAULT NULL maps to constraint->raw_expr == NULL 
-				 * 
+				/*
+				 * Note: DEFAULT NULL maps to constraint->raw_expr == NULL
+				 *
 				 * We lose the knowledge that the user specified DEFAULT NULL at
 				 * this point, so we record it in default_is_null
 				 */
@@ -2643,7 +3172,23 @@
 	}
 }
 
+static void transformExtTableConstraint(ParseState *pstate,
+                                        CreateStmtContext *cxt,
+                                        Constraint *constraint) {
+  switch (constraint->contype) {
+    case CONSTR_PRIMARY:
+      cxt->ixconstraints = lappend(cxt->ixconstraints, constraint);
+      break;
 
+    case CONSTR_CHECK:
+      cxt->ckconstraints = lappend(cxt->ckconstraints, constraint);
+      break;
+
+    default:
+      elog(ERROR, "unrecognized constraint type: %d", constraint->contype);
+      break;
+  }
+}
 
 /*
  * transformETDistributedBy - transform DISTRIBUTED BY clause for
@@ -2658,75 +3203,63 @@
  * this is an EXECUTE table with ON MASTER specified, in which case
  * we create no policy so that the master will be accessed.
  */
-static void
-transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
-						 List *distributedBy, GpPolicy **policyp, List *options,
-						 List *likeDistributedBy,
-						 bool bQuiet,
-						 bool iswritable,
-						 bool onmaster)
-{
-	int			maxattrs = 200;
-	GpPolicy*	p = NULL;
+static void transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
+                                     List *distributedBy, GpPolicy **policyp,
+                                     List *options, List *likeDistributedBy,
+                                     bool bQuiet, bool iswritable,
+                                     bool onmaster) {
+  int maxattrs = 200;
+  GpPolicy *p = NULL;
 
-	/*
-	 * utility mode creates can't have a policy.  Only the QD can have policies
-	 */
-	if (Gp_role != GP_ROLE_DISPATCH)
-	{
-		*policyp = NULL;
-		return;
-	}
+  /*
+   * utility mode creates can't have a policy.  Only the QD can have policies
+   */
+  if (Gp_role != GP_ROLE_DISPATCH) {
+    *policyp = NULL;
+    return;
+  }
 
-	if(!iswritable && list_length(distributedBy) > 0)
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-				 errmsg("Readable external tables can\'t specify a DISTRIBUTED BY clause.")));
+  if (!iswritable && list_length(distributedBy) > 0)
+    ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+                    errmsg(
+                        "Readable external tables can\'t specify a DISTRIBUTED "
+                        "BY clause.")));
 
-	if(iswritable)
-	{
-		/* WET */
+  if (iswritable) {
+    /* WET */
 
-		if(distributedBy == NIL && likeDistributedBy == NIL)
-		{
-			/* defaults to DISTRIBUTED RANDOMLY */
-			p = (GpPolicy *) palloc(sizeof(GpPolicy) + maxattrs *
-										 sizeof(p->attrs[0]));
-			p->ptype = POLICYTYPE_PARTITIONED;
-			p->nattrs = 0;
-			p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetExternalTablePartitionNum());
-			p->attrs[0] = 1;
+    if (distributedBy == NIL && likeDistributedBy == NIL) {
+      /* defaults to DISTRIBUTED RANDOMLY */
+      p = (GpPolicy *)palloc(sizeof(GpPolicy) + maxattrs * sizeof(p->attrs[0]));
+      p->ptype = POLICYTYPE_PARTITIONED;
+      p->nattrs = 0;
+      p->bucketnum = GetRelOpt_bucket_num_fromOptions(
+          options, GetExternalTablePartitionNum());
+      p->attrs[0] = 1;
 
-			*policyp = p;
-		}
-		else
-		{
-			/* regular DISTRIBUTED BY transformation */
-			transformDistributedBy(pstate, cxt, distributedBy, policyp, options,
-								   likeDistributedBy, bQuiet);
-		}
-	}
-	else
-	{
-		/* RET */
+      *policyp = p;
+    } else {
+      /* regular DISTRIBUTED BY transformation */
+      transformDistributedBy(pstate, cxt, distributedBy, policyp, options,
+                             likeDistributedBy, bQuiet);
+    }
+  } else {
+    /* RET */
 
-		if(onmaster)
-		{
-			p = NULL;
-		}
-		else
-		{
-			/* defaults to DISTRIBUTED RANDOMLY */
-			p = (GpPolicy *) palloc(sizeof(GpPolicy) + maxattrs *
-										 sizeof(p->attrs[0]));
-			p->ptype = POLICYTYPE_PARTITIONED;
-			p->nattrs = 0;
-			p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetExternalTablePartitionNum());
-			p->attrs[0] = 1;
-		}
+    if (onmaster) {
+      p = NULL;
+    } else {
+      /* defaults to DISTRIBUTED RANDOMLY */
+      p = (GpPolicy *)palloc(sizeof(GpPolicy) + maxattrs * sizeof(p->attrs[0]));
+      p->ptype = POLICYTYPE_PARTITIONED;
+      p->nattrs = 0;
+      p->bucketnum = GetRelOpt_bucket_num_fromOptions(
+          options, GetExternalTablePartitionNum());
+      p->attrs[0] = 1;
+    }
 
-		*policyp = p;
-	}
+    *policyp = p;
+  }
 }
 
 /****************stmt->policy*********************/
@@ -3424,7 +3957,7 @@
 	char				 newVals[10000];
 
 	{
-		List			*coldefs = stmt->tableElts;
+		List			*coldefs = stmt->base.tableElts;
 		ListCell		*lc      = NULL;
 		StringInfoData   sid;
 		int colcnt = 0;
@@ -3583,7 +4116,7 @@
 
 	if (1)
 	{
-		List			*coldefs = stmt->tableElts;
+		List			*coldefs = stmt->base.tableElts;
 		ListCell		*lc      = NULL;
 		List *vl1 = NULL;
 
@@ -6007,7 +6540,7 @@
 				foreach(lc2, already)
 				{
 					List *item = lfirst(lc2);
-					
+
 					Assert( IsA(item, List) && list_length(item) == nvals );
 
 					/*
@@ -6132,11 +6665,11 @@
 	ListCell *lc;
 	AlterPartitionCmd *pc;
 
-	/* 
+	/*
 	 * First of all, we shouldn't proceed if this partition isn't AOCO
 	 */
 
-	/* 
+	/*
 	 * Yes, I am as surprised as you are that this is how we represent the WITH
 	 * clause here.
 	 */
@@ -6153,7 +6686,7 @@
 			return; /* nothing more to do */
 	}
 
-	/* 
+	/*
 	 * If the specific partition has no specific column encoding, just
 	 * set it to the partition level default and we're done.
 	 */
@@ -6162,7 +6695,7 @@
 		elem->colencs = penc;
 		return;
 	}
-	
+
 	/*
 	 * Fixup the actual column encoding clauses for this specific partition
 	 * element.
@@ -7299,11 +7832,11 @@
 	if (!stenc)
 		return;
 
-	/* 
+	/*
 	 * First, split the table elements into column reference storage directives
 	 * and everything else.
 	 */
-	foreach(lc, cs->tableElts)
+	foreach(lc, cs->base.tableElts)
 	{
 		Node *n = lfirst(lc);
 
@@ -7335,7 +7868,7 @@
 		foreach(lc2, finalencs)
 		{
 			ColumnReferenceStorageDirective *f = lfirst(lc2);
- 
+
 			if (f->deflt)
 				continue;
 
@@ -7379,7 +7912,7 @@
 		}
 	}
 
-	cs->tableElts = list_concat(others, finalencs);
+	cs->base.tableElts = list_concat(others, finalencs);
 }
 
 static void
@@ -7403,9 +7936,9 @@
 	child_tab_name->relname = relname;
     child_tab_name->location = -1;
 
-	child_tab_stmt->relation = child_tab_name;
-	child_tab_stmt->is_part_child = true;
-	child_tab_stmt->is_add_part = stmt->is_add_part;
+	child_tab_stmt->base.relation = child_tab_name;
+	child_tab_stmt->base.is_part_child = true;
+	child_tab_stmt->base.is_add_part = stmt->base.is_add_part;
 
 	if (!bQuiet)
 		ereport(NOTICE,
@@ -7415,7 +7948,7 @@
 						cxt->relation->relname)));
 
 	/* set the "Post Create" rule if it exists */
-	child_tab_stmt->postCreate = pPostCreate;
+	child_tab_stmt->base.postCreate = pPostCreate;
 
 	/*
 	 * Deep copy the parent's table elements.
@@ -7430,7 +7963,7 @@
 	 * user-specified constraint names, so we don't do one here
 	 * any more.
 	 */
-	child_tab_stmt->tableElts = copyObject(stmt->tableElts);
+	child_tab_stmt->base.tableElts = copyObject(stmt->base.tableElts);
 
 	merge_part_column_encodings(child_tab_stmt, stenc);
 
@@ -7438,8 +7971,8 @@
 	if (pConstraint && ((enable_partition_rules &&
 						 curPby->partType == PARTTYP_HASH) ||
 						 curPby->partType != PARTTYP_HASH))
-		child_tab_stmt->tableElts =
-				lappend(child_tab_stmt->tableElts,
+		child_tab_stmt->base.tableElts =
+				lappend(child_tab_stmt->base.tableElts,
 						pConstraint);
 
 	/*
@@ -7449,10 +7982,10 @@
 	 * the create child table
 	 */
 	/*child_tab_stmt->inhRelations = list_make1(parent_tab_name); */
-	child_tab_stmt->inhRelations = list_copy(stmt->inhRelations);
+	child_tab_stmt->base.inhRelations = list_copy(stmt->base.inhRelations);
 
-	child_tab_stmt->constraints = copyObject(stmt->constraints);
-	child_tab_stmt->options = stmt->options;
+	child_tab_stmt->base.constraints = copyObject(stmt->base.constraints);
+	child_tab_stmt->base.options = stmt->base.options;
 
 	/* allow WITH clause for appendonly tables */
 	if ( pStoreAttr )
@@ -7461,24 +7994,24 @@
 
 		/* Options */
 		if ( psa_apc->arg1 )
-			child_tab_stmt->options = (List *)psa_apc->arg1;
+			child_tab_stmt->base.options = (List *)psa_apc->arg1;
 		/* Tablespace from parent (input CreateStmt)... */
 		if ( psa_apc->arg2 && *strVal(psa_apc->arg2) )
-			child_tab_stmt->tablespacename = strVal(psa_apc->arg2);
+			child_tab_stmt->base.tablespacename = strVal(psa_apc->arg2);
 	}
 	/* ...or tablespace from root. */
-	if ( !child_tab_stmt->tablespacename && stmt->tablespacename )
-		child_tab_stmt->tablespacename = stmt->tablespacename;
+	if ( !child_tab_stmt->base.tablespacename && stmt->base.tablespacename )
+		child_tab_stmt->base.tablespacename = stmt->base.tablespacename;
 
-	child_tab_stmt->oncommit = stmt->oncommit;
-	child_tab_stmt->distributedBy = stmt->distributedBy;
+	child_tab_stmt->base.oncommit = stmt->base.oncommit;
+	child_tab_stmt->base.distributedBy = stmt->base.distributedBy;
 
 	/* use the newSub as the partitionBy if the current
 	 * partition elem had an inline subpartition declaration
 	 */
-	child_tab_stmt->partitionBy = (Node *)newSub;
+	child_tab_stmt->base.partitionBy = (Node *)newSub;
 
-	child_tab_stmt->relKind = RELKIND_RELATION;
+	child_tab_stmt->base.relKind = RELKIND_RELATION;
 
 	/*
 	 * Adjust tablespace name for the CREATE TABLE via ADD PARTITION. (MPP-8047)
@@ -7489,18 +8022,18 @@
 	 * Ultimately, we take the tablespace as specified in the command, or, if none
 	 * was specified, the one from the root paritioned table.
 	 */
-	if ( ! child_tab_stmt->tablespacename )
+	if ( ! child_tab_stmt->base.tablespacename )
 	{
 		Oid poid = RangeVarGetRelid(cxt->relation, true, false /*allowHcatalog*/); /* parent branch */
 
 		if ( ! poid )
 		{
-			poid = RangeVarGetRelid(stmt->relation, true, false /*alloweHcatalog*/); /* whole partitioned table */
+			poid = RangeVarGetRelid(stmt->base.relation, true, false /*alloweHcatalog*/); /* whole partitioned table */
 		}
 		if ( poid )
 		{
 			Relation prel = RelationIdGetRelation(poid);
-			child_tab_stmt->tablespacename = get_tablespace_name(prel->rd_rel->reltablespace);
+			child_tab_stmt->base.tablespacename = get_tablespace_name(prel->rd_rel->reltablespace);
 			RelationClose(prel);
 		}
 	}
@@ -7732,7 +8265,7 @@
 		at_depth = at_buf;
 	}
 	else
-		pBy->parentRel = copyObject(stmt->relation);
+		pBy->parentRel = copyObject(stmt->base.relation);
 
 	/* set the depth for the immediate subpartition */
 	if (pBy->subPart)
@@ -8408,8 +8941,8 @@
 	if ((pBy->partDepth > 0) && (pBy->bKeepMe != true))
 	{
 		/* we don't need this any more */
-		stmt->partitionBy = NULL;
-		stmt->is_part_child = true;
+		stmt->base.partitionBy = NULL;
+		stmt->base.is_part_child = true;
 	}
 
 } /* end transformPartitionBy */
@@ -8589,7 +9122,7 @@
 				partrel = heap_open(PartitionRuleRelationId, AccessShareLock);
 
 				tuple = caql_getfirst(
-						caql_addrel(cqclr(&cqc), partrel), 
+						caql_addrel(cqclr(&cqc), partrel),
 						cql("SELECT * FROM pg_partition_rule "
 							" WHERE parchildrelid = :1 ",
 							ObjectIdGetDatum(relid)));
@@ -8607,7 +9140,7 @@
 				partrel = heap_open(PartitionRelationId, AccessShareLock);
 
 				tuple = caql_getfirst(
-						caql_addrel(cqclr(&cqc), partrel), 
+						caql_addrel(cqclr(&cqc), partrel),
 						cql("SELECT parlevel FROM pg_partition "
 							" WHERE oid = :1 ",
 							ObjectIdGetDatum(paroid)));
@@ -11015,7 +11548,7 @@
 			if (atc1->subtype != AT_PartAlter)
 			{
 				rv = makeRangeVar(
-						NULL /*catalogname*/, 
+						NULL /*catalogname*/,
 						get_namespace_name(
 								RelationGetNamespace(rel)),
 						pstrdup(RelationGetRelationName(rel)), -1);
@@ -11107,7 +11640,7 @@
 				 * the new partition is LIKE the parent and it
 				 * inherits from it
 				 */
-				ct->tableElts = lappend(ct->tableElts, inh);
+				ct->base.tableElts = lappend(ct->base.tableElts, inh);
 
 				cl = list_make1(ct);
 
@@ -11154,11 +11687,12 @@
 	bool		skipValidation = true;
 	AlterTableCmd *newcmd;
 
-	cxt.stmtType = "ALTER TABLE";
+    cxt.stmtType = "ALTER TABLE";
+	cxt.isExternalTable = false;
 	cxt.relation = stmt->relation;
 	cxt.inhRelations = NIL;
 	cxt.isalter = true;
-	cxt.hasoids = false;		/* need not be right */
+	cxt.hasoids = false; /* need not be right */
 	cxt.columns = NIL;
 	cxt.ckconstraints = NIL;
 	cxt.fkconstraints = NIL;
@@ -11433,7 +11967,7 @@
  * - has no LIMIT/OFFSET
  * - references only one range table (i.e. no joins, self-joins)
  *   - this range table must itself be updatable
- *	
+ *
  */
 static bool
 isSimplyUpdatableQuery(Query *query)
@@ -12037,7 +12571,7 @@
 				{
 					CreateStmt *elp = (CreateStmt *) element;
 
-					setSchemaName(cxt.schemaname, &elp->relation->schemaname);
+					setSchemaName(cxt.schemaname, &elp->base.relation->schemaname);
 
 					/*
 					 * XXX todo: deal with constraints
@@ -12050,7 +12584,7 @@
 				{
 					CreateExternalStmt *elp = (CreateExternalStmt *) element;
 
-					setSchemaName(cxt.schemaname, &elp->relation->schemaname);
+					setSchemaName(cxt.schemaname, &elp->base.relation->schemaname);
 
 					cxt.tables = lappend(cxt.tables, element);
 				}
@@ -12395,17 +12929,17 @@
 			attrList = lappend(attrList, coldef);
 		}
 
-		createStmt->relation = sreh->errtable;
-		createStmt->tableElts = attrList;
-		createStmt->inhRelations = NIL;
-		createStmt->constraints = NIL;
-		createStmt->options = list_make2(makeDefElem("errortable", (Node *) makeString("true")),
+		createStmt->base.relation = sreh->errtable;
+		createStmt->base.tableElts = attrList;
+		createStmt->base.inhRelations = NIL;
+		createStmt->base.constraints = NIL;
+		createStmt->base.options = list_make2(makeDefElem("errortable", (Node *) makeString("true")),
 				makeDefElem("appendonly", (Node *) makeString("true")));
-		createStmt->oncommit = ONCOMMIT_NOOP;
-		createStmt->tablespacename = NULL;
-		createStmt->relKind = RELKIND_RELATION;
+		createStmt->base.oncommit = ONCOMMIT_NOOP;
+		createStmt->base.tablespacename = NULL;
+		createStmt->base.relKind = RELKIND_RELATION;
 		createStmt->relStorage = RELSTORAGE_AOROWS;
-		createStmt->distributedBy = list_make1(NULL); /* DISTRIBUTED RANDOMLY */
+		createStmt->base.distributedBy = list_make1(NULL); /* DISTRIBUTED RANDOMLY */
 
 
 		cxt->blist = lappend(cxt->blist, createStmt);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index dc0e13b..b443bca 100755
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -242,10 +242,10 @@
 %type <dbehavior>	opt_drop_behavior
 
 %type <list>	createdb_opt_list alterdb_opt_list copy_opt_list
-				ext_on_clause_list format_opt format_opt_list format_def_list transaction_mode_list
+				ext_on_clause_list format_opt format_opt_list transaction_mode_list
 				ext_opt_encoding_list
 %type <defelt>	createdb_opt_item alterdb_opt_item copy_opt_item
-				ext_on_clause_item format_opt_item format_def_item transaction_mode_item
+				ext_on_clause_item format_opt_item transaction_mode_item
 				ext_opt_encoding_item
 
 %type <ival>	opt_lock lock_type cast_context
@@ -301,7 +301,7 @@
 				aggr_args aggr_args_list old_aggr_definition old_aggr_list
 				oper_argtypes RuleActionList RuleActionMulti
 				cdb_string_list
-				opt_column_list columnList opt_name_list exttab_auth_list keyvalue_list
+				opt_column_list columnList columnListPlus opt_name_list exttab_auth_list keyvalue_list
 				opt_inherited_column_list
 				sort_clause opt_sort_clause sortby_list index_params
 				name_list from_clause from_list opt_array_bounds
@@ -431,7 +431,7 @@
 %type <node>	var_value zone_value
 
 %type <keyword> unreserved_keyword func_name_keyword
-%type <keyword> col_name_keyword reserved_keyword
+%type <keyword> col_name_keyword reserved_keyword format_opt_keyword
 %type <keyword> keywords_ok_in_alias_no_as
 
 %type <node>	TableConstraint TableLikeClause 
@@ -3310,24 +3310,25 @@
 			OptTabPartitionBy
 				{
 					CreateStmt *n = makeNode(CreateStmt);
+					
 					$4->istemp = $2;
-					n->relation = $4;
-					n->tableElts = $6;
-					n->inhRelations = $8;
-					n->constraints = NIL;
-					n->options = $9;
-					n->oncommit = $10;
-					n->tablespacename = $11;
-					n->distributedBy = $12;
-					n->partitionBy = $13;
+					n->base.relation = $4;
+					n->base.tableElts = $6;
+					n->base.inhRelations = $8;
+					n->base.constraints = NIL;
+					n->base.options = $9;
+					n->base.oncommit = $10;
+					n->base.tablespacename = $11;
+					n->base.distributedBy = $12;
+					n->base.partitionBy = $13;
 					n->oidInfo.relOid = 0;
 					n->oidInfo.comptypeOid = 0;
 					n->oidInfo.toastOid = 0;
 					n->oidInfo.toastIndexOid = 0;
 					n->oidInfo.toastComptypeOid = 0;
-					n->relKind = RELKIND_RELATION;
+					n->base.relKind = RELKIND_RELATION;
 					n->policy = 0;
-					n->postCreate = NULL;
+					n->base.postCreate = NULL;
 					
 					$$ = (Node *)n;
 				}
@@ -3339,23 +3340,23 @@
 					 */
 					CreateStmt *n = makeNode(CreateStmt);
 					$4->istemp = $2;
-					n->relation = $4;
-					n->tableElts = $8;
-					n->inhRelations = list_make1($6);
-					n->constraints = NIL;
-					n->options = $10;
-					n->oncommit = $11;
-					n->tablespacename = $12;
-					n->distributedBy = $13;
-					n->partitionBy = $14;
+					n->base.relation = $4;
+					n->base.tableElts = $8;
+					n->base.inhRelations = list_make1($6);
+					n->base.constraints = NIL;
+					n->base.options = $10;
+					n->base.oncommit = $11;
+					n->base.tablespacename = $12;
+					n->base.distributedBy = $13;
+					n->base.partitionBy = $14;
 					n->oidInfo.relOid = 0;
 					n->oidInfo.comptypeOid = 0;
 					n->oidInfo.toastOid = 0;
 					n->oidInfo.toastIndexOid = 0;
 					n->oidInfo.toastComptypeOid = 0;
-					n->relKind = RELKIND_RELATION;
+					n->base.relKind = RELKIND_RELATION;
 					n->policy = 0;
-                    n->postCreate = NULL;
+                    n->base.postCreate = NULL;
 					
 					$$ = (Node *)n;
 				}
@@ -3754,6 +3755,17 @@
 			columnElem								{ $$ = list_make1($1); }
 			| columnList ',' columnElem				{ $$ = lappend($1, $3); }
 		;
+columnListPlus:
+			columnElem ',' columnElem	
+			{
+				$$ = list_make1($1);
+				$$ = lappend($$, $3);
+			}
+			| columnListPlus ',' columnElem
+			{
+				$$ = lappend($1, $3);
+			}
+		;
 
 columnElem: ColId
 				{
@@ -4584,16 +4596,17 @@
 						{
 							CreateExternalStmt *n = makeNode(CreateExternalStmt);
 							n->iswritable = $2;
+							n->isexternal = TRUE;
 							n->isweb = $4;
 							$7->istemp = $5;
-							n->relation = $7;
-							n->tableElts = $9;
+							n->base.relation = $7;
+							n->base.tableElts = $9;
 							n->exttypedesc = $11;
 							n->format = $13;
-							n->formatOpts = $14;
+							n->base.options = $14;
 							n->encoding = $15;
 							n->sreh = $16;
-							n->distributedBy = $17;
+							n->base.distributedBy = $17;
 							n->policy = 0;
 							
 							/* various syntax checks for EXECUTE external table */
@@ -4694,13 +4707,12 @@
 
 format_opt: 
 			  '(' format_opt_list ')'			{ $$ = $2; }
-			| '(' format_def_list ')'			{ $$ = $2; }
 			| '(' ')'							{ $$ = NIL; }
 			| /*EMPTY*/							{ $$ = NIL; }
 			;
 
 format_opt_list:
-			format_opt_item		
+			format_opt_item	
 			{ 
 				$$ = list_make1($1);
 			}
@@ -4710,67 +4722,43 @@
 			}
 			;
 
-format_def_list:
-			format_def_item		
-			{ 
-				$$ = list_make1($1);
-			} 
-			| format_def_list ',' format_def_item
-			{
-				$$ = lappend($1, $3);
-			}
-
-format_def_item:
-    		ColLabel '=' def_arg
-			{
-				$$ = makeDefElem($1, $3);
-			}
-			| ColLabel '=' '(' columnList ')'
-			{
-				$$ = makeDefElem($1, (Node *) $4);
-			}
+format_opt_keyword:
+			  AS
+			| DELIMITER
+			| NULL_P
+			| CSV
+			| HEADER_P
+			| QUOTE
+			| ESCAPE
+			| FORCE
+			| NOT
+			| FILL
+			| MISSING
+			| FIELDS
+			| NEWLINE
+			;
 
 
 format_opt_item:
-			DELIMITER opt_as Sconst
+			IDENT
 			{
-				$$ = makeDefElem("delimiter", (Node *)makeString($3));
+				$$ = makeDefElem("#ident", (Node *)makeString($1));
 			}
-			| NULL_P opt_as Sconst
+			| Sconst
 			{
-				$$ = makeDefElem("null", (Node *)makeString($3));
+				$$ = makeDefElem("#string", (Node *)makeString($1));
 			}
-			| CSV
+			| SignedIconst
 			{
-				$$ = makeDefElem("csv", (Node *)makeInteger(TRUE));
+				$$ = makeDefElem("#int", (Node *)makeInteger($1));
 			}
-			| HEADER_P
+			| format_opt_keyword
 			{
-				$$ = makeDefElem("header", (Node *)makeInteger(TRUE));
+				$$ = makeDefElem("#ident", (Node *)makeString($1));
 			}
-			| QUOTE opt_as Sconst
+			| columnListPlus
 			{
-				$$ = makeDefElem("quote", (Node *)makeString($3));
-			}
-			| ESCAPE opt_as Sconst
-			{
-				$$ = makeDefElem("escape", (Node *)makeString($3));
-			}
-			| FORCE NOT NULL_P columnList
-			{
-				$$ = makeDefElem("force_notnull", (Node *)$4);
-			}
-			| FORCE QUOTE columnList
-			{
-				$$ = makeDefElem("force_quote", (Node *)$3);
-			}
-			| FILL MISSING FIELDS
-			{
-				$$ = makeDefElem("fill_missing_fields", (Node *)makeInteger(TRUE));
-			}
-			| NEWLINE opt_as Sconst
-			{
-				$$ = makeDefElem("newline", (Node *)makeString($3));
+				$$ = makeDefElem("#collist", (Node *)$1);
 			}
 			;
 
@@ -13258,28 +13246,28 @@
 	CreateStmt        *ct        = makeNode(CreateStmt);
     PartitionBy       *pBy       = NULL;
 
-    ct->relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1);
+    ct->base.relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1);
 
     /* in analyze.c, fill in tableelts with a list of inhrelation of
        the partition parent table, and fill in inhrelations with copy
        of rangevar for parent table */
 
-    ct->tableElts    = NIL; /* fill in later */
-    ct->inhRelations = NIL; /* fill in later */
+    ct->base.tableElts    = NIL; /* fill in later */
+    ct->base.inhRelations = NIL; /* fill in later */
 
-    ct->constraints = NIL;
+    ct->base.constraints = NIL;
 
     if (pc_StAttr)
-        ct->options = (List *)pc_StAttr->arg1;
+        ct->base.options = (List *)pc_StAttr->arg1;
     else
-        ct->options = NIL;
+        ct->base.options = NIL;
 
-    ct->oncommit = ONCOMMIT_NOOP;
+    ct->base.oncommit = ONCOMMIT_NOOP;
         
     if (pc_StAttr && pc_StAttr->arg2)
-        ct->tablespacename = strVal(pc_StAttr->arg2);
+        ct->base.tablespacename = strVal(pc_StAttr->arg2);
     else
-        ct->tablespacename = NULL;
+        ct->base.tablespacename = NULL;
 
     if (subSpec) /* treat subspec as partition by... */
 	{
@@ -13290,19 +13278,19 @@
 		pBy->partQuiet = PART_VERBO_NODISTRO;
         pBy->location  = -1;
         pBy->partDefault = NULL;
-        pBy->parentRel = copyObject(ct->relation);
+        pBy->parentRel = copyObject(ct->base.relation);
     }
 
-    ct->distributedBy = NULL;
-    ct->partitionBy = (Node *)pBy;
+    ct->base.distributedBy = NULL;
+    ct->base.partitionBy = (Node *)pBy;
     ct->oidInfo.relOid = 0;
     ct->oidInfo.comptypeOid = 0;
     ct->oidInfo.toastOid = 0;
     ct->oidInfo.toastIndexOid = 0;
     ct->oidInfo.toastComptypeOid = 0;
-    ct->relKind = RELKIND_RELATION;
+    ct->base.relKind = RELKIND_RELATION;
     ct->policy = 0;
-    ct->postCreate = NULL;
+    ct->base.postCreate = NULL;
 
     return (Node *)ct;
 }
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index af2d12c..bd6e2b6 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,12 +15,15 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include "port.h"
 
 #include "access/twophase.h"
 #include "access/xact.h"
+#include "access/fileam.h"
 #include "catalog/catalog.h"
 #include "catalog/catquery.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_exttable.h"
 #include "catalog/toasting.h"
 #include "catalog/aoseg.h"
 #include "commands/alter.h"
@@ -50,6 +53,7 @@
 #include "commands/vacuum.h"
 #include "commands/view.h"
 #include "miscadmin.h"
+#include "nodes/value.h"
 #include "postmaster/checkpoint.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rewriteRemove.h"
@@ -61,6 +65,7 @@
 #include "utils/guc.h"
 #include "utils/syscache.h"
 #include "utils/lsyscache.h"
+#include "utils/uri.h"
 #include "lib/stringinfo.h"
 
 #include "cdb/cdbcat.h"
@@ -71,6 +76,7 @@
 #include "cdb/dispatcher.h"
 
 #include "resourcemanager/resqueuecommand.h"
+#include "catalog/pg_exttable.h"
 /*
  * Error-checking support for DROP commands
  */
@@ -270,10 +276,32 @@
 
 	classform = (Form_pg_class) GETSTRUCT(tuple);
 
-	if ((removeType == OBJECT_EXTTABLE && classform->relstorage != RELSTORAGE_EXTERNAL) ||
-		(removeType == OBJECT_FOREIGNTABLE && classform->relstorage != RELSTORAGE_FOREIGN) ||	
-		(removeType == OBJECT_TABLE && (classform->relstorage == RELSTORAGE_EXTERNAL || 
-										classform->relstorage == RELSTORAGE_FOREIGN)))
+	bool is_internal = false;
+	if (classform->relstorage == RELSTORAGE_EXTERNAL)
+	{
+		ExtTableEntry *entry = GetExtTableEntry(relOid);
+		List *entry_locations = entry->locations;
+		Assert(entry_locations);
+		ListCell *entry_location = list_head(entry_locations);
+		char *url = ((Value*)lfirst(entry_location))->val.str;
+		char *category = getExtTblCategoryInFmtOptsStr(entry->fmtopts);
+
+		if ((IS_HDFS_URI(url)) &&
+		    (category != NULL && pg_strncasecmp(category, "internal", strlen("internal")) == 0))
+		{
+			is_internal = true;
+		}
+
+		if (category)
+		{
+			pfree(category);
+		}
+	}
+
+	if ((removeType == OBJECT_EXTTABLE && (classform->relstorage != RELSTORAGE_EXTERNAL || is_internal)) ||
+		    (removeType == OBJECT_FOREIGNTABLE && classform->relstorage != RELSTORAGE_FOREIGN) ||
+		    (removeType == OBJECT_TABLE && (classform->relstorage == RELSTORAGE_EXTERNAL && (!is_internal) ||
+											classform->relstorage == RELSTORAGE_FOREIGN)))
 	{
 		/* we have a mismatch. format an error string and shoot */
 		
@@ -287,7 +315,7 @@
 		else
 			want_type = pstrdup("a base");
 
-		if (classform->relstorage == RELSTORAGE_EXTERNAL)
+		if (classform->relstorage == RELSTORAGE_EXTERNAL && !is_internal)
 			hint = pstrdup("Use DROP EXTERNAL TABLE to remove an external table");
 		else if (classform->relstorage == RELSTORAGE_FOREIGN)
 			hint = pstrdup("Use DROP FOREIGN TABLE to remove a foreign table");
@@ -447,7 +475,7 @@
 
 				createStmt = (CreateStmt *) parsetree;
 
-				if (createStmt->relation->istemp)
+				if (createStmt->base.relation->istemp)
 					return;		// Permit creation of TEMPORARY tables in read-only mode.
 
 				ereport(ERROR,
@@ -912,8 +940,8 @@
 				Assert (gp_upgrade_mode || Gp_role != GP_ROLE_EXECUTE);
 
 
-				relOid = DefineRelation((CreateStmt *) parsetree,
-										relKind, relStorage);
+				relOid = DefineRelation((CreateStmt *) parsetree, relKind,
+								                        relStorage, NonCustomFormatType);
 
 				/*
 				 * Let AlterTableCreateToastTable decide if this one needs a
@@ -936,13 +964,13 @@
 						((CreateStmt *) parsetree)->oidInfo.toastOid,
 						((CreateStmt *) parsetree)->oidInfo.toastIndexOid,
 					    &(((CreateStmt *) parsetree)->oidInfo.toastComptypeOid),
-						((CreateStmt *)parsetree)->is_part_child);
+						((CreateStmt *)parsetree)->base.is_part_child);
 
 					AlterTableCreateAoSegTableWithOid(relOid,
 							((CreateStmt *) parsetree)->oidInfo.aosegOid,
 							((CreateStmt *) parsetree)->oidInfo.aosegIndexOid,
 							&(((CreateStmt *) parsetree)->oidInfo.aosegComptypeOid),
-							((CreateStmt *) parsetree)->is_part_child);
+							((CreateStmt *) parsetree)->base.is_part_child);
 				}
 				CommandCounterIncrement();
 				/*
diff --git a/src/backend/utils/misc/uriparser.c b/src/backend/utils/misc/uriparser.c
index 5489c17..e94408d 100644
--- a/src/backend/utils/misc/uriparser.c
+++ b/src/backend/utils/misc/uriparser.c
@@ -78,6 +78,11 @@
 		uri->protocol = URI_GPFDISTS;
 		protocol_len = strlen(PROTOCOL_GPFDISTS);
 	}
+	else if (IS_HDFS_URI(uri_str))
+	{
+			uri->protocol = URI_HDFS;
+			protocol_len = strlen(PROTOCOL_HDFS);
+	}
 
 	else /* not recognized. treat it as a custom protocol */
 	{
@@ -200,7 +205,10 @@
 			}
 			else
 			{
-				uri->port = -1; /* no port was indicated. will use default if needed */
+				if (IS_HDFS_URI(uri_str)) /* means nameservice format */
+					uri->port = 0;
+				else
+					uri->port = -1; /* no port was indicated. will use default if needed */
 			}
 		}
 	}
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index 777698a..5d4871d 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -104,10 +104,9 @@
                              ScanState *ss,
                              TupleTableSlot *slot);
 
-extern ExternalInsertDesc external_insert_init(Relation rel,
-                                               int errAosegno,
-                                               ExternalTableType formatterType,
-                                               char *formatterName);
+extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno,
+        int formatterType, char *formatterName, PlannedStmt* plannedstmt);
+
 extern Oid external_insert(ExternalInsertDesc extInsertDesc,
                            TupleTableSlot *tupTableSlot);
 extern void external_insert_finish(ExternalInsertDesc extInsertDesc);
@@ -115,6 +114,12 @@
 extern void AtAbort_ExtTables(void);
 char*	linenumber_atoi(char buffer[20],int64 linenumber);
 
+extern bool hasErrTblInFmtOpts(List *fmtOpts);
+extern char getExtTblFormatterTypeInFmtOpts(List *fmtOpts);
+extern void external_populate_formatter_actionmask(struct CopyStateData *pstate,
+												   FormatterData *formatter);
+
+extern char *getExtTblCategoryInFmtOptsStr(char *fmtStr);
 extern char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr);
 extern char *getExtTblFormatterTypeInFmtOptsList(List *fmtOpts);
 
diff --git a/src/include/access/formatter.h b/src/include/access/formatter.h
index f87afc2..7b46d3d 100644
--- a/src/include/access/formatter.h
+++ b/src/include/access/formatter.h
@@ -36,10 +36,19 @@
 typedef enum FmtNotification
 {
 	FMT_NONE,
+	FMT_DONE,
 	FMT_NEED_MORE_DATA
 
 } FmtNotification;
 
+typedef enum FmtActionMask
+{
+	FMT_UNSET = 0,
+	FMT_SET = 1,
+	FMT_NEEDEXTBUFF = 2,
+	FMT_WRITE_END = 4
+} FmtActionMask;
+
 /*
  * FormatterData is the node type that is passed as fmgr "context" info
  * when a function is called by the External Table Formatter manager.
@@ -49,6 +58,7 @@
 {
 	NodeTag			type;                 /* see T_FormatterData */
 
+	FmtActionMask	fmt_mask;
 	/* metadata */
 	Relation    	fmt_relation;
 	TupleDesc   	fmt_tupDesc;
diff --git a/src/include/access/plugstorage.h b/src/include/access/plugstorage.h
index 48c5fde..f904f81 100644
--- a/src/include/access/plugstorage.h
+++ b/src/include/access/plugstorage.h
@@ -45,6 +45,7 @@
 #include "executor/tuptable.h"
 
 /* From src/include/access/fileam.h */
+extern char *getExtTblCategoryInFmtOptsStr(char *fmtStr);
 extern char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr);
 extern char *getExtTblFormatterTypeInFmtOptsList(List *fmtOpts);
 
@@ -98,6 +99,7 @@
 	bool                   ps_has_tuple;
 	Oid                    ps_tuple_oid;
 	TupleTableSlot        *ps_tuple_table_slot;
+	int                     ps_segno;
 
 } PlugStorageData;
 
@@ -179,7 +181,9 @@
 ExternalInsertDesc InvokePlugStorageFormatInsertInit(FmgrInfo *func,
                                                      Relation relation,
                                                      int formatterType,
-                                                     char *formatterName);
+                                                     char *formatterName,
+													PlannedStmt* plannedstmt,
+													int segno);
 
 Oid InvokePlugStorageFormatInsert(FmgrInfo *func,
                                   ExternalInsertDesc extInsertDesc,
diff --git a/src/include/catalog/pg_exttable.h b/src/include/catalog/pg_exttable.h
index 3256bb9..11f053c 100644
--- a/src/include/catalog/pg_exttable.h
+++ b/src/include/catalog/pg_exttable.h
@@ -164,9 +164,10 @@
 extern void
 RemoveExtTableEntry(Oid relid);
 
-#define CustomFormatType 'b'
-#define TextFormatType 't'
-#define CsvFormatType 'c'
+#define CustomFormatType    'b'
+#define TextFormatType      't'
+#define CsvFormatType       'c'
+#define NonCustomFormatType 'n'
 
 /* PXF formats*/
 #define GpdbWritableFormatName "GPDBWritable"
diff --git a/src/include/cdb/cdbdatalocality.h b/src/include/cdb/cdbdatalocality.h
index c3753ce..0afc45c 100644
--- a/src/include/cdb/cdbdatalocality.h
+++ b/src/include/cdb/cdbdatalocality.h
@@ -32,6 +32,7 @@
 #include "catalog/gp_policy.h"
 #include "nodes/parsenodes.h"
 #include "executor/execdesc.h"
+#include "catalog/pg_exttable.h"
 
 /*
  * structure containing information about data residence
@@ -71,12 +72,18 @@
 	char *hostname;
 } VirtualSegmentNode;
 
+typedef struct blocklocation_file{
+	BlockLocation *locations;
+	int block_num;
+	char *file_uri;
+}blocklocation_file;
+
 /*
  * calculate_planner_segment_num: based on the parse tree,
  * we calculate the appropriate planner segment_num.
  */
-SplitAllocResult * calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
-                                                List *rtable, GpPolicy *intoPolicy, int sliceNum, int fixedVsegNum);
+SplitAllocResult * calculate_planner_segment_num(PlannedStmt *plannedstmt, Query *query,
+		QueryResourceLife resourceLife, int fixedVsegNum);
 
 /*
  * udf_collector_walker: the routine to file udfs.
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 8404a7f..e23df2f 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -52,7 +52,7 @@
 
 extern const char *synthetic_sql;
 
-extern Oid	DefineRelation(CreateStmt *stmt, char relkind, char relstorage);
+extern Oid DefineRelation(CreateStmt *stmt, char relkind, char relstorage, const char *formattername);
 
 extern void	DefineExternalRelation(CreateExternalStmt *stmt);
 
@@ -89,6 +89,8 @@
 
 extern void ExecuteTruncate(TruncateStmt *stmt);
 
+
+
 extern void renameatt(Oid myrelid,
 		  const char *oldattname,
 		  const char *newattname,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index a9ca4a0..173a35c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1354,14 +1354,15 @@
  * Node that represents the single row error handling (SREH) clause.
  * used in COPY and External Tables.
  */
-typedef struct SingleRowErrorDesc
-{
-	NodeTag		type;
-	RangeVar	*errtable;			/* error table for data format errors */
-	int			rejectlimit;		/* per segment error reject limit */
-	bool		is_keep;			/* true if KEEP indicated (COPY only) */
-	bool		is_limit_in_rows;	/* true for ROWS false for PERCENT */
-	bool		reusing_existing_errtable;  /* var used later in trasform... */
+typedef struct SingleRowErrorDesc {
+  NodeTag type;
+  RangeVar *errtable;             /* error table for data format errors */
+  int rejectlimit;                /* per segment error reject limit */
+  bool is_keep;                   /* true if KEEP indicated (COPY only) */
+  bool is_limit_in_rows;          /* true for ROWS false for PERCENT */
+  bool reusing_existing_errtable; /* var used later in transform... */
+  bool is_hdfs_protocol_text;     /* hdfs protocol text format table */
+  char *hdfsLoc; /* error table location for hdfs protocol text only */
 } SingleRowErrorDesc;
 
 /* ----------------------
@@ -1403,37 +1404,40 @@
  * implementation).
  * ----------------------
  */
+typedef struct CreateStmtBase {
+  char relKind;             // CDB: force relkind to this
+  RangeVar *relation;       // relation to create
+  List *tableElts;          // column definitions (list of ColumnDef)
+  List *inhRelations;       // relations to inherit from (list of inhRelation)
+  List *constraints;        // constraints (list of Constraint nodes)
+  List *options;            // options from WITH clause
+  OnCommitAction oncommit;  // what do we do at COMMIT?
+  char *tablespacename;     // table space to use, or NULL
+  List *distributedBy;      // what columns we distribute the data by
+  Node *postCreate;         // CDB: parse and process after the CREATE
+  // CDB: child table in a partition? Marked during analysis for
+  // interior or leaf parts of the new table.  Not marked for a
+  // a partition root or ordinary table.
+  bool is_part_child;
+  bool is_add_part;   // CDB: is create adding a part to a partition?
+  Node *partitionBy;  // what columns we partition the data by
+} CreateStmtBase;
 
-typedef struct CreateStmt
-{
-	NodeTag		type;
-	RangeVar   *relation;		/* relation to create */
-	List	   *tableElts;		/* column definitions (list of ColumnDef) */
-	List	   *inhRelations;	/* relations to inherit from (list of
-								 * inhRelation) */
-	List	   *constraints;	/* constraints (list of Constraint nodes) */
-	List	   *options;		/* options from WITH clause */
-	OnCommitAction oncommit;	/* what do we do at COMMIT? */
-	char	   *tablespacename; /* table space to use, or NULL */
-	List       *distributedBy;   /* what columns we distribute the data by */
-	Node       *partitionBy;     /* what columns we partition the data by */
-	TableOidInfo oidInfo;
-	char	    relKind;         /* CDB: force relkind to this */
-	char		relStorage;
-	struct GpPolicy  *policy;
-	Node       *postCreate;      /* CDB: parse and process after the CREATE */
-	List	   *deferredStmts;	/* CDB: Statements, e.g., partial indexes, that can't be 
-								 * analyzed until after CREATE (until the target table
-								 * is created and visible). */
-	bool		is_part_child;	/* CDB: child table in a partition? Marked during analysis for 
-								 * interior or leaf parts of the new table.  Not marked for a
-								 * a partition root or ordinary table.
-								 */
-	bool		is_add_part;	/* CDB: is create adding a part to a partition? */
-	bool		is_split_part;	/* CDB: is create spliting a part? */
-	Oid			ownerid;		/* OID of the role to own this. if InvalidOid, GetUserId() */
-	bool		buildAoBlkdir; /* whether to build the block directory for an AO table */
-	List	   *attr_encodings; /* attribute storage directives */
+
+typedef struct CreateStmt {
+  NodeTag type;
+  CreateStmtBase base;
+  TableOidInfo oidInfo;
+  char relStorage;
+  struct GpPolicy *policy;
+  List *deferredStmts; /* CDB: Statements, e.g., partial indexes,
+                        * that can't be analyzed until after CREATE
+                        * (until the target table is created
+                        * and visible). */
+  bool is_split_part;  /* CDB: is create spliting a part? */
+  Oid ownerid; /* OID of the role to own this. if InvalidOid, GetUserId() */
+  bool buildAoBlkdir; /* whether to build the block directory for an AO table */
+  List *attr_encodings; /* attribute storage directives */
 } CreateStmt;
 
 typedef enum SharedStorageOp
@@ -1459,8 +1463,9 @@
  */
 typedef enum ExtTableType
 {
-	EXTTBL_TYPE_LOCATION,		/* table defined with LOCATION clause */
-	EXTTBL_TYPE_EXECUTE			/* table defined with EXECUTE clause */
+	 EXTTBL_TYPE_LOCATION, /* table defined with LOCATION clause */
+	 EXTTBL_TYPE_EXECUTE,  /* table defined with EXECUTE clause */
+	 EXTTBL_TYPE_UNKNOWN   /* table defined with unknown store */
 } ExtTableType;
 
 typedef struct ExtTableTypeDesc
@@ -1472,22 +1477,21 @@
 	char			*command_string;
 } ExtTableTypeDesc;
 
-typedef struct CreateExternalStmt
-{
-	NodeTag		type;
-	RangeVar   *relation;		/* external relation to create */
-	List	   *tableElts;		/* column definitions (list of ColumnDef) */
-	Node	   *exttypedesc;    /* LOCATION or EXECUTE information */
-	char	   *format;			/* data format name */
-	List	   *formatOpts;		/* List of DefElem nodes for data format */
-	bool		isweb;
-	bool		iswritable;
-	Node	   *sreh;			/* Single row error handling info */
-	List	   *encoding;		/* List (size 1 max) of DefElem nodes for
-								   data encoding */
-	List       *distributedBy;   /* what columns we distribute the data by */
-	struct GpPolicy  *policy;	/* used for writable tables */
-	
+typedef struct CreateExternalStmt {
+  NodeTag type;
+  CreateStmtBase base;
+  Node *exttypedesc; /* LOCATION or EXECUTE information */
+  char *format;      /* data format name */
+  bool isweb;
+  bool iswritable;
+  bool isexternal;
+  Node *sreh;              /* Single row error handling info */
+  List *encoding;          /* List (size 1 max) of DefElem nodes for
+                            * data encoding */
+  struct GpPolicy *policy; /* used for writable tables */
+  bool forceCreateDir;     /* true to create external dir */
+  char *parentPath;        /* keep the parent relative path for partition */
+  struct IndexStmt *pkey;  /* PRIMARY KEY index, if any */
 } CreateExternalStmt;
 
 /* ----------------------
diff --git a/src/include/parser/analyze.h b/src/include/parser/analyze.h
index ccb54f7..1c7df3f 100644
--- a/src/include/parser/analyze.h
+++ b/src/include/parser/analyze.h
@@ -34,27 +34,33 @@
 Datum partition_arg_get_val(Node *node, bool *isnull);
 
 /* State shared by transformCreateStmt and its subroutines */
-typedef struct
-{
-	const char *stmtType;		/* "CREATE TABLE" or "ALTER TABLE" */
-	RangeVar   *relation;		/* relation to create */
-	List	   *inhRelations;	/* relations to inherit from */
-	bool		hasoids;		/* does relation have an OID column? */
-	bool		isalter;		/* true if altering existing table */
-	bool		isaddpart;		/* true if create in service of adding a part */
-	List	   *columns;		/* ColumnDef items */
-	List	   *ckconstraints;	/* CHECK constraints */
-	List	   *fkconstraints;	/* FOREIGN KEY constraints */
-	List	   *ixconstraints;	/* index-creating constraints */
-	List	   *inh_indexes;	/* cloned indexes from INCLUDING INDEXES */
-	List	   *blist;			/* "before list" of things to do before
-								 * creating the table */
-	List	   *alist;			/* "after list" of things to do after creating
-								 * the table */
-	List	   *dlist;			/* "deferred list" of utility statements to 
-								 * transfer to the list CreateStmt->deferredStmts
-								 * for later parse_analyze and dispatch */
-	IndexStmt  *pkey;			/* PRIMARY KEY index, if any */
+typedef struct {
+  bool isExternalTable;
+  const char *stmtType; /* "CREATE TABLE" or "ALTER TABLE" */
+  RangeVar *relation;   /* relation to create */
+  List *inhRelations;   /* relations to inherit from */
+  bool hasoids;         /* does relation have an OID column? */
+  bool isalter;         /* true if altering existing table */
+  bool isaddpart;       /* true if create in service of adding a part */
+  List *columns;        /* ColumnDef items */
+  List *ckconstraints;  /* CHECK constraints */
+  List *fkconstraints;  /* FOREIGN KEY constraints */
+  List *ixconstraints;  /* index-creating constraints */
+  List *inh_indexes;    /* cloned indexes from INCLUDING INDEXES */
+  List *blist;          /* "before list" of things to do before
+                         * creating the table */
+  List *alist;          /* "after list" of things to do after creating
+                         * the table */
+  List *dlist;          /* "deferred list" of utility statements to
+                         * transfer to the list CreateStmt->deferredStmts
+                         * for later parse_analyze and dispatch */
+  IndexStmt *pkey;      /* PRIMARY KEY index, if any */
+
+  // for external table only
+  Node *exttypedesc;
+  char *format;
+  bool iswritable;
+  char *parentPath;
 } CreateStmtContext;
 
 Query *transformCreateStmt(ParseState *pstate, CreateStmt *stmt,
@@ -83,4 +89,6 @@
 
 extern struct GpPolicy *createRandomDistribution(int maxattrs);
 
+extern void recognizeExternalRelationFormatterOptions(
+    CreateExternalStmt *createExtStmt);
 #endif   /* ANALYZE_H */
diff --git a/src/include/utils/uri.h b/src/include/utils/uri.h
index a37d9e7..9c5aedd 100644
--- a/src/include/utils/uri.h
+++ b/src/include/utils/uri.h
@@ -32,7 +32,8 @@
 	URI_HTTP,
 	URI_GPFDIST,
 	URI_CUSTOM,
-	URI_GPFDISTS
+	URI_GPFDISTS,
+	URI_HDFS
 }	UriProtocol;
 
 #define PROTOCOL_FILE		"file://"
@@ -41,6 +42,7 @@
 #define PROTOCOL_GPFDIST	"gpfdist://"
 #define PROTOCOL_GPFDISTS	"gpfdists://"
 #define PROTOCOL_PXF		"pxf://"
+#define PROTOCOL_HDFS		"hdfs://"
 
 /* 
  * sometimes we don't want to parse the whole URI but just take a peek at
@@ -52,6 +54,7 @@
 #define IS_GPFDISTS_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_GPFDISTS, strlen(PROTOCOL_GPFDISTS)) == 0) 
 #define IS_FTP_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_FTP, strlen(PROTOCOL_FTP)) == 0)
 #define IS_PXF_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_PXF, strlen(PROTOCOL_PXF)) == 0)
+#define IS_HDFS_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_HDFS, strlen(PROTOCOL_HDFS)) == 0)
 
 typedef struct Uri
 {