HAWQ-1628. Add hdfs protocol for pluggable storage framework
diff --git a/contrib/Makefile b/contrib/Makefile
index 695e92a..e5daff9 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -9,6 +9,7 @@
extprotocol \
gp_cancel_query \
formatter_fixedwidth \
+ exthdfs\
hawq-hadoop
ifeq ($(with_pgcrypto), yes)
diff --git a/contrib/exthdfs/Makefile b/contrib/exthdfs/Makefile
new file mode 100644
index 0000000..e247664
--- /dev/null
+++ b/contrib/exthdfs/Makefile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+MODULE_big = exthdfs
+OBJS = exthdfs.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS = $(libpq_pgport)
+
+SHLIB_LINK += -lhdfs3 # linker library: SHLIB_LINK, not CFLAGS, is the pgxs hook for MODULE_big
+
+ifdef USE_PGXS
+PGXS := $(shell pg_config --pgxs)
+include $(PGXS)
+else
+subdir = contrib/exthdfs
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/exthdfs/common.h b/contrib/exthdfs/common.h
new file mode 100644
index 0000000..4111909
--- /dev/null
+++ b/contrib/exthdfs/common.h
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _EXTHDFS_COMMON_H_
+#define _EXTHDFS_COMMON_H_
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "access/extprotocol.h"
+#include "access/fileam.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_exttable.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "miscadmin.h"
+
+#include <fcntl.h>
+
+#endif // _EXTHDFS_COMMON_H_
+
diff --git a/contrib/exthdfs/exthdfs.c b/contrib/exthdfs/exthdfs.c
new file mode 100644
index 0000000..1378734
--- /dev/null
+++ b/contrib/exthdfs/exthdfs.c
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "postgres.h"
+
+#include "common.h"
+#include "access/extprotocol.h"
+#include "cdb/cdbdatalocality.h"
+#include "storage/fd.h"
+#include "storage/filesystem.h"
+#include "utils/uri.h"
+
+
+
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(hdfsprotocol_blocklocation);
+PG_FUNCTION_INFO_V1(hdfsprotocol_validate);
+
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS);
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS);
+
+/*
+ * hdfsprotocol_blocklocation
+ *
+ * Collect HDFS block-location metadata for every file under each location
+ * URL of the external table.  The result is attached to fcinfo->resultinfo
+ * as an ExtProtocolBlockLocationData whose "files" list carries one
+ * blocklocation_file per non-empty file.
+ *
+ * All location URLs are expected to point at the same name node; the
+ * validator enforces that, so only the first URL is parsed for the server
+ * address.  Empty files are skipped (they have no blocks).
+ */
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
+{
+	int nsize = 0;			/* entry count from hdfsListDirectory() */
+	int numOfBlock = 0;		/* block count of the current file */
+
+	/* Build the result instance; palloc0 ereports on OOM, no NULL check needed. */
+	ExtProtocolBlockLocationData *bldata =
+		palloc0(sizeof(ExtProtocolBlockLocationData));
+	bldata->type = T_ExtProtocolBlockLocationData;
+	fcinfo->resultinfo = bldata;
+
+	ExtProtocolValidatorData *pvalidator_data = (ExtProtocolValidatorData *)
+												(fcinfo->context);
+
+	/*
+	 * Parse URI of the first location; we expect all locations to use the
+	 * same name node server.  This is checked in the validation function.
+	 */
+	char *first_uri_str = (char *)strVal(lfirst(list_head(pvalidator_data->url_list)));
+	Uri *uri = ParseExternalTableUri(first_uri_str);
+
+	elog(DEBUG3, "hdfsprotocol_blocklocation : "
+				 "extracted HDFS name node address %s:%d",
+				 uri->hostname, uri->port);
+
+	/* Create file system instance */
+	hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+	if (fs == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+					"failed to create HDFS instance to connect to %s:%d",
+					uri->hostname, uri->port);
+	}
+
+	/* Clean up uri instance as we don't need it any longer */
+	FreeExternalTableUri(uri);
+
+	/* Check all locations to get files to fetch location. */
+	ListCell *lc = NULL;
+	foreach(lc, pvalidator_data->url_list)
+	{
+		/* Parse current location URI. */
+		char *url = (char *)strVal(lfirst(lc));
+		Uri *uri = ParseExternalTableUri(url);
+		if (uri == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_blocklocation : "
+						"invalid URI encountered %s", url);
+		}
+
+		/*
+		 * NOTICE: We temporarily support only directories as locations. We
+		 * plan to extend the logic to specifying single file as one
+		 * location very soon.
+		 */
+
+		/* Get files contained in the path. */
+		hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path, &nsize);
+		if (fiarray == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_blocklocation : "
+						"failed to get files of path %s",
+						uri->path);
+		}
+
+		/* Call block location api to get data location for each file */
+		int i = 0;
+		for (i = 0 ; i < nsize ; i++)
+		{
+			hdfsFileInfo *fi = &fiarray[i];
+
+			/* mName already holds the full path of the file. */
+			char *fullpath = pstrdup(fi->mName);
+
+			elog(DEBUG3, "hdfsprotocol_blocklocation : "
+						 "built full path file %s", fullpath);
+
+			/* Get file full length. */
+			int64_t len = fi->mSize;
+
+			elog(DEBUG3, "hdfsprotocol_blocklocation : "
+						 "got file %s length " INT64_FORMAT,
+						 fullpath, len);
+
+			/* Empty files have no blocks; skip them. */
+			if (len == 0)
+			{
+				pfree(fullpath);
+				continue;
+			}
+
+			/* Get block location data for this file */
+			BlockLocation *bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len, &numOfBlock);
+			if (bla == NULL)
+			{
+				elog(ERROR, "hdfsprotocol_blocklocation : "
+							"failed to get block location of path %s. "
+							"It is reported generally due to HDFS service errors or "
+							"another session's ongoing writing.",
+							fullpath);
+			}
+
+			/* Add file full path and its block number as result. */
+			blocklocation_file *blf = palloc0(sizeof(blocklocation_file));
+			blf->file_uri = pstrdup(fullpath);
+			blf->block_num = numOfBlock;
+			blf->locations = palloc0(sizeof(BlockLocation) * blf->block_num);
+
+			elog(DEBUG3, "hdfsprotocol_blocklocation : file %s has %d blocks",
+						 fullpath, blf->block_num);
+
+			/* We don't need it any longer */
+			pfree(fullpath);
+
+			/* Deep-copy each block's metadata into palloc'ed memory. */
+			int bidx = 0;
+			for (bidx = 0 ; bidx < blf->block_num ; bidx++)
+			{
+				BlockLocation *blo = &bla[bidx];
+				BlockLocation *bl = &(blf->locations[bidx]);
+				bl->numOfNodes = blo->numOfNodes;
+				bl->hosts = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
+				bl->names = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
+				bl->topologyPaths = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
+				bl->offset = blo->offset;
+				bl->length = blo->length;
+				bl->corrupt = blo->corrupt;
+
+				int nidx = 0;
+				for (nidx = 0 ; nidx < bl->numOfNodes ; nidx++)
+				{
+					/*
+					 * BUGFIX: the per-node arrays belong to the current
+					 * block "blo".  The old code wrote
+					 * pstrdup(*blo[nidx].hosts), which indexes the BLOCK
+					 * array by the node index — it copies the first host
+					 * of block bidx+nidx and reads past the end of "bla"
+					 * whenever nidx > 0.
+					 */
+					bl->hosts[nidx] = pstrdup(blo->hosts[nidx]);
+					bl->names[nidx] = pstrdup(blo->names[nidx]);
+					bl->topologyPaths[nidx] = pstrdup(blo->topologyPaths[nidx]);
+				}
+			}
+
+			bldata->files = lappend(bldata->files, (void *)(blf));
+
+			/* Clean up block location instances created by the lib. */
+			hdfsFreeFileBlockLocations(bla, numOfBlock);
+		}
+
+		/* Clean up URI instance in loop as we don't need it any longer */
+		FreeExternalTableUri(uri);
+
+		/* Clean up file info array created by the lib for this location. */
+		hdfsFreeFileInfo(fiarray, nsize);
+	}
+
+	/* destroy fs instance */
+	hdfsDisconnect(fs);
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * hdfsprotocol_validate
+ *
+ * Validate the location URLs and format options of an hdfs:// external
+ * table.  Checks performed:
+ *   1) writable tables accept exactly one location URL;
+ *   2) the formatter must be one of 'csv', 'text', 'orc';
+ *   3) per-option sanity checks (delimiter, escape, newline, quote,
+ *      force_notnull, force_quote — the last three are csv-only);
+ *   4) every URL uses the hdfs protocol and the same name node host:port;
+ *   5) when forceCreateDir is set, the location must exist on HDFS.
+ *
+ * Raises ERROR on the first violation; returns void on success.
+ */
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS)
+{
+	elog(DEBUG3, "hdfsprotocol_validate() begin");
+
+	/* Check which action should perform. */
+	ExtProtocolValidatorData *pvalidator_data =
+		(ExtProtocolValidatorData *)(fcinfo->context);
+
+	if (pvalidator_data->forceCreateDir)
+		Assert(pvalidator_data->url_list && pvalidator_data->url_list->length == 1);
+
+	if (pvalidator_data->direction == EXT_VALIDATE_WRITE)
+	{
+		/* accept only one directory location */
+		if (list_length(pvalidator_data->url_list) != 1)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("hdfsprotocol_validate : "
+							"only one location url is supported for writable external hdfs")));
+		}
+	}
+
+	/* Go through first round to get formatter type */
+	bool isCsv = false;
+	bool isText = false;
+	bool isOrc = false;
+	ListCell *optcell = NULL;
+	foreach(optcell, pvalidator_data->format_opts)
+	{
+		DefElem *de = (DefElem *)lfirst(optcell);
+		if (strcasecmp(de->defname, "formatter") == 0)
+		{
+			char *val = strVal(de->arg);
+			if (strcasecmp(val, "csv") == 0)
+				isCsv = true;
+			else if (strcasecmp(val, "text") == 0)
+				isText = true;
+			else if (strcasecmp(val, "orc") == 0)
+				isOrc = true;
+		}
+	}
+
+	if (!isCsv && !isText && !isOrc)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("hdfsprotocol_validate : "
+						"only 'csv', 'text' and 'orc' formatter is supported for external hdfs")));
+	}
+
+	/* Validate formatter options */
+	foreach(optcell, pvalidator_data->format_opts)
+	{
+		DefElem *de = (DefElem *)lfirst(optcell);
+		if (strcasecmp(de->defname, "delimiter") == 0)
+		{
+			char *val = strVal(de->arg);
+			/* Validation 1. User can not specify 'OFF' in delimiter */
+			if (strcasecmp(val, "off") == 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'off' value of 'delimiter' option is not supported")));
+			}
+			/* Validation 2. Delimiter must not be empty (multibyte values are accepted) */
+			if (strlen(val) < 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'delimiter' option must not be empty")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "escape") == 0)
+		{
+			char *val = strVal(de->arg);
+			/* Validation 3. User can not specify 'OFF' in escape */
+			if (strcasecmp(val, "off") == 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'off' value of 'escape' option is not supported")));
+			}
+			/* Validation 4. Can only specify one character */
+			if (strlen(val) != 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'escape' option accepts single character")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "newline") == 0)
+		{
+			char *val = strVal(de->arg);
+			/* Validation 5. only accept 'lf', 'cr', 'crlf' */
+			if (strcasecmp(val, "lf") != 0 &&
+				strcasecmp(val, "cr") != 0 &&
+				strcasecmp(val, "crlf") != 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"the value of 'newline' option can only be "
+								"'lf', 'cr' or 'crlf'")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "quote") == 0)
+		{
+			/* This is allowed only for csv mode formatter */
+			if (!isCsv)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'quote' option is only available in 'csv' formatter")));
+			}
+
+			char *val = strVal(de->arg);
+			/* Validation 6. Can only specify one character */
+			if (strlen(val) != 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'quote' option accepts single character")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "force_notnull") == 0)
+		{
+			/* This is allowed only for csv mode formatter */
+			if (!isCsv)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'force_notnull' option is only available in 'csv' formatter")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "force_quote") == 0)
+		{
+			/* This is allowed only for csv mode formatter */
+			if (!isCsv)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'force_quote' option is only available in 'csv' formatter")));
+			}
+		}
+	}
+
+	/* All urls should
+	 * 1) have the same protocol name 'hdfs',
+	 * 2) the same hdfs namenode server address
+	 */
+	/* Check all locations to get files to fetch location. */
+	char *nnaddr = NULL;	/* name node host taken from the first URL */
+	int nnport = -1;		/* name node port taken from the first URL */
+	ListCell *lc = NULL;
+	foreach(lc, pvalidator_data->url_list)
+	{
+		/* Parse current location URI. */
+		char *url = (char *)strVal(lfirst(lc));
+		Uri *uri = ParseExternalTableUri(url);
+		if (uri == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_validate : "
+						"invalid URI encountered %s", url);
+		}
+
+		if (uri->protocol != URI_HDFS)
+		{
+			elog(ERROR, "hdfsprotocol_validate : "
+						"invalid URI protocol encountered in %s, "
+						"hdfs:// protocol is required",
+						url);
+		}
+
+		if (nnaddr == NULL)
+		{
+			/* First URL: remember the name node address for comparison. */
+			nnaddr = pstrdup(uri->hostname);
+			nnport = uri->port;
+		}
+		else
+		{
+			if (strcmp(nnaddr, uri->hostname) != 0)
+			{
+				elog(ERROR, "hdfsprotocol_validate : "
+							"different name server addresses are detected, "
+							"both %s and %s are found",
+							nnaddr, uri->hostname);
+			}
+			if (nnport != uri->port)
+			{
+				elog(ERROR, "hdfsprotocol_validate : "
+							"different name server ports are detected, "
+							"both %d and %d are found",
+							nnport, uri->port);
+			}
+		}
+
+		/* TODO: create the path when it does not exist instead of failing. */
+		if (pvalidator_data->forceCreateDir)
+		{
+			elog(LOG, "hdfs_validator() forced creating dir");
+
+			/* Create file system instance */
+			hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+			if (fs == NULL)
+			{
+				elog(ERROR, "hdfsprotocol_validate : "
+							"failed to create HDFS instance to connect to %s:%d",
+							uri->hostname, uri->port);
+			}
+
+			/* hdfsExists() returns 0 when the path exists */
+			if (hdfsExists(fs, uri->path) == -1)
+				elog(ERROR, "hdfsprotocol_validate : "
+							"Location \"%s\" does not exist",
+							uri->path);
+
+			/* destroy fs instance */
+			hdfsDisconnect(fs);
+		}
+
+		/* Clean up temporarily created instances */
+		FreeExternalTableUri(uri);
+	}
+
+	/*
+	 * BUGFIX: nnaddr used to be pfree'd inside the loop above, so the
+	 * second and later iterations compared URLs against freed memory.
+	 * Free it once, after all URLs have been checked.
+	 */
+	if (nnaddr != NULL)
+		pfree(nnaddr);
+
+	elog(LOG, "passed validating hdfs protocol options");
+
+	/**************************************************************************
+	 * This is a bad implementation that we check formatter options here. Should
+	 * be moved to call formatter specific validation UDFs.
+	 **************************************************************************/
+
+	PG_RETURN_VOID();
+}
+
diff --git a/contrib/extprotocol/gpextprotocol.c b/contrib/extprotocol/gpextprotocol.c
index 419b923..07f1731 100644
--- a/contrib/extprotocol/gpextprotocol.c
+++ b/contrib/extprotocol/gpextprotocol.c
@@ -324,5 +324,5 @@
if (uri->protocol)
pfree(uri->protocol);
- pfree(uri);
+ FreeExternalTableUri(uri);
}
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 692a5db..0d2527d 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -86,14 +86,17 @@
char *uri, int rejectlimit,
bool islimitinrows, Oid fmterrtbl, ResultRelSegFileInfo *segfileinfo, int encoding);
-static void FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
- int nArgs,
- CopyState pstate,
- FormatterData* formatter,
- Relation rel,
- TupleDesc tupDesc,
- FmgrInfo *convFuncs,
- Oid *typioparams);
+static void
+FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
+ int nArgs,
+ CopyState pstate,
+ FormatterData *formatter,
+ Relation rel,
+ TupleDesc tupDesc,
+ FmgrInfo *convFuncs,
+ Oid *typioparams,
+ char *url,
+ ScanState *ss);
static void open_external_readable_source(FileScanDesc scan);
static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
@@ -305,12 +308,54 @@
scan->errcontext.previous = error_context_stack;
//pgstat_initstats(relation);
-
+ external_populate_formatter_actionmask(scan->fs_pstate, scan->fs_formatter);
return scan;
}
/* ----------------
+ * external_populate_formatter_actionmask
+ * ----------------
+ */
+/*
+ * Probe the formatter configuration to decide which code path the external
+ * scan/insert will take, recording the answer in formatter->fmt_mask.
+ *
+ * When no custom formatter function is configured, the scan must rely on an
+ * external protocol to move bytes, so FMT_NEEDEXTBUFF is set directly.
+ * Otherwise the formatter function is invoked once with zero arguments; the
+ * callee is expected to update fmt_mask itself (its Datum result is
+ * intentionally ignored — NOTE(review): confirm callees never signal errors
+ * through the return value).
+ */
+void external_populate_formatter_actionmask(CopyState pstate,
+											FormatterData *formatter)
+{
+	/* We just call the formatter in function to populate the mask */
+	formatter->fmt_mask = FMT_UNSET;
+
+	if (pstate->custom_formatter_func == NULL)
+	{
+		/* No formatter function: data must come from the external protocol. */
+		formatter->fmt_mask |= FMT_NEEDEXTBUFF;
+		elog(LOG, "external scan needs an external protocol to cooperate");
+		return;
+	}
+
+	Datum d;
+	FunctionCallInfoData fcinfo;
+	/* per call formatter prep; 0 args and NULL rel/tupdesc/funcs/url/ss
+	 * because this probe call only asks the formatter for its mask. */
+	FunctionCallPrepareFormatter(&fcinfo,
+								 0,
+								 pstate,
+								 formatter,
+								 NULL,
+								 NULL,
+								 NULL,
+								 NULL,
+								 NULL,
+								 NULL);
+	d = FunctionCallInvoke(&fcinfo);
+
+	/* Log which cooperation mode the formatter selected. */
+	if (formatter->fmt_mask & FMT_NEEDEXTBUFF)
+	{
+		elog(LOG, "external scan needs an external protocol to cooperate");
+	}
+	else
+	{
+		elog(LOG, "external scan needs only formatter to manipulate data");
+	}
+}
+
+/* ----------------
* external_rescan - (re)start a scan of an external file
* ----------------
*/
@@ -525,13 +570,15 @@
/*
* open the external source (local file or http).
*
- * NOTE: external_beginscan() seems like the natural place for this call. However,
- * in queries with more than one gang each gang will initialized all the nodes
- * of the plan (but actually executed only the nodes in it's local slice)
- * This means that external_beginscan() (and external_endscan() too) will get called
- * more than needed and we'll end up opening too many http connections when
- * they are not expected (see MPP-1261). Therefore we instead do it here on the
- * first time around only.
+ * NOTE: external_beginscan() seems like the natural place for this call.
+ * However, in queries with more than one gang each gang will initialized
+ * all the nodes of the plan (but actually executed only the nodes in it's
+ * local slice)
+ *
+ * This means that external_beginscan() (and external_endscan() too) will
+ * get called more than needed and we'll end up opening too many http
+ * connections when they are not expected (see MPP-1261). Therefore we
+ * instead do it here on the first time around only.
*/
/*
@@ -539,7 +586,7 @@
* load external protocol.
*/
- if (scan->fs_file == NULL)
+ if (scan->fs_file == NULL && (scan->fs_formatter->fmt_mask & FMT_NEEDEXTBUFF))
open_external_readable_source(scan);
/* Note: no locking manipulations needed */
@@ -567,6 +614,7 @@
return true;
}
+
/*
* external_insert_init
*
@@ -575,7 +623,7 @@
*/
ExternalInsertDesc
external_insert_init(Relation rel, int errAosegno,
- ExternalTableType formatterType, char *formatterName)
+ int formatterType, char *formatterName, PlannedStmt* plannedstmt)
{
ExternalInsertDesc extInsertDesc;
ExtTableEntry* extentry;
@@ -634,8 +682,12 @@
/* get a url to use. we use seg number modulo total num of urls */
v = list_nth(extentry->locations, my_url);
uri_str = pstrdup(v->val.str);
+ Uri* uri = ParseExternalTableUri(uri_str);
+
extInsertDesc->ext_uri = uri_str;
+ FreeExternalTableUri(uri);
+
/*elog(NOTICE, "seg %d got url number %d: %s", segindex, my_url, uri_str);*/
}
@@ -668,6 +720,10 @@
{
extInsertDesc->ext_formatter_data = (FormatterData *) palloc0 (sizeof(FormatterData));
extInsertDesc->ext_formatter_data->fmt_perrow_ctx = extInsertDesc->ext_pstate->rowcontext;
+
+ /* First call formatter in function to get action mask */
+ external_populate_formatter_actionmask(extInsertDesc->ext_pstate,
+ extInsertDesc->ext_formatter_data);
}
return extInsertDesc;
@@ -687,7 +743,6 @@
Oid
external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
{
-
HeapTuple instup = ExecFetchSlotHeapTuple(tupTableSlot);
TupleDesc tupDesc = extInsertDesc->ext_tupDesc;
Datum* values = extInsertDesc->ext_values;
@@ -695,14 +750,18 @@
CopyStateData* pstate = extInsertDesc->ext_pstate;
bool customFormat = extInsertDesc->ext_pstate->custom;
-
if(extInsertDesc->ext_noop)
return InvalidOid;
/* Open our output file or output stream if not yet open */
- if(!extInsertDesc->ext_file && !extInsertDesc->ext_noop)
+ if(!extInsertDesc->ext_file &&
+ !extInsertDesc->ext_noop &&
+ (extInsertDesc->ext_formatter_data == NULL ||
+ (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF)))
+ {
open_external_writable_source(extInsertDesc);
+ }
/*
* deconstruct the tuple and format it into text
@@ -730,15 +789,34 @@
/* must have been created during insert_init */
Assert(formatter);
- /* per call formatter prep */
- FunctionCallPrepareFormatter(&fcinfo,
- 1,
- pstate,
- formatter,
- extInsertDesc->ext_rel,
- extInsertDesc->ext_tupDesc,
- pstate->out_functions,
- NULL);
+ if ((formatter->fmt_mask & FMT_NEEDEXTBUFF) == 0)
+ {
+ /* per call formatter prep */
+ FunctionCallPrepareFormatter(&fcinfo,
+ 0,
+ pstate,
+ formatter,
+ extInsertDesc->ext_rel,
+ extInsertDesc->ext_tupDesc,
+ pstate->out_functions,
+ NULL,
+ extInsertDesc->ext_uri,
+ NULL);
+ }
+ else
+ {
+ /* per call formatter prep */
+ FunctionCallPrepareFormatter(&fcinfo,
+ 1,
+ pstate,
+ formatter,
+ extInsertDesc->ext_rel,
+ extInsertDesc->ext_tupDesc,
+ pstate->out_functions,
+ NULL,
+ NULL,
+ NULL);
+ }
/* Mark the correct record type in the passed tuple */
HeapTupleHeaderSetTypeId(instup->t_data, tupDesc->tdtypeid);
@@ -748,18 +826,22 @@
fcinfo.argnull[0] = false;
d = FunctionCallInvoke(&fcinfo);
+
MemoryContextReset(formatter->fmt_perrow_ctx);
- /* We do not expect a null result */
- if (fcinfo.isnull)
- elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
+ if (formatter->fmt_mask & FMT_NEEDEXTBUFF)
+ {
+ /* We do not expect a null result */
+ if (fcinfo.isnull)
+ elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
- b = DatumGetByteaP(d);
+ b = DatumGetByteaP(d);
- CopyOneCustomRowTo(pstate, b);
+ CopyOneCustomRowTo(pstate, b);
+ }
}
- if (extInsertDesc->ext_formatter_data == NULL)
+ if (extInsertDesc->ext_formatter_data == NULL || (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF))
{
/* Write the data into the external source */
external_senddata((URL_FILE*)extInsertDesc->ext_file, pstate);
@@ -769,7 +851,6 @@
pstate->fe_msgbuf->data[0] = '\0';
}
pstate->processed++;
-
return HeapTupleGetOid(instup);
}
@@ -782,6 +863,28 @@
void
external_insert_finish(ExternalInsertDesc extInsertDesc)
{
+ /* Tell formatter to close */
+ if (extInsertDesc->ext_formatter_data != NULL &&
+ (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF) == 0)
+ {
+ Datum d;
+ FunctionCallInfoData fcinfo;
+
+ extInsertDesc->ext_formatter_data->fmt_mask |= FMT_WRITE_END;
+
+ /* per call formatter prep */
+ FunctionCallPrepareFormatter(&fcinfo,
+ 0,
+ extInsertDesc->ext_pstate,
+ extInsertDesc->ext_formatter_data,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL);
+ d = FunctionCallInvoke(&fcinfo);
+ }
/*
* Close the external source
@@ -805,6 +908,7 @@
pfree(extInsertDesc);
}
+
/* ==========================================================================
* The follwing macros aid in major refactoring of data processing code (in
* externalgettup() ). We use macros because in some cases the code must be in
@@ -1042,102 +1146,102 @@
static HeapTuple
externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
{
- HeapTuple tuple = NULL;
- CopyState pstate = scan->fs_pstate;
- bool needData = false;
+ HeapTuple tuple = NULL;
+ CopyState pstate = scan->fs_pstate;
+ bool needData = false;
- /* If we either got things to read or stuff to process */
- while (!pstate->fe_eof || !pstate->raw_buf_done)
+ /* If we either got things to read or stuff to process */
+ while (!pstate->fe_eof || !pstate->raw_buf_done)
+ {
+ /* need to fill our buffer with data? */
+ if (pstate->raw_buf_done)
{
- /* need to fill our buffer with data? */
- if (pstate->raw_buf_done)
+ pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
+ pstate->begloc = pstate->raw_buf;
+ pstate->raw_buf_done = (pstate->bytesread==0);
+ pstate->raw_buf_index = 0;
+
+ /* on first time around just throw the header line away */
+ if (pstate->header_line && pstate->bytesread > 0)
{
- pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
- pstate->begloc = pstate->raw_buf;
- pstate->raw_buf_done = (pstate->bytesread==0);
- pstate->raw_buf_index = 0;
-
- /* on first time around just throw the header line away */
- if (pstate->header_line && pstate->bytesread > 0)
+ PG_TRY();
{
- PG_TRY();
- {
- readHeaderLine(pstate);
- }
- PG_CATCH();
- {
- /*
- * got here? encoding conversion error occurred on the
- * header line (first row).
- */
- if (pstate->errMode == ALL_OR_NOTHING)
- {
- PG_RE_THROW();
- }
- else
- {
- /* SREH - release error state */
- if (!elog_dismiss(DEBUG5))
- PG_RE_THROW(); /* hope to never get here! */
-
- /*
- * note: we don't bother doing anything special here.
- * we are never interested in logging a header line
- * error. just continue the workflow.
- */
- }
- }
- PG_END_TRY();
-
- EXT_RESET_LINEBUF;
- pstate->header_line = false;
+ readHeaderLine(pstate);
}
- }
-
- /* while there is still data in our buffer */
- while (!pstate->raw_buf_done || needData)
- {
- DataLineStatus ret_mode = parse_next_line(scan);
-
- if(ret_mode == LINE_OK)
+ PG_CATCH();
{
- /* convert to heap tuple */
- /* XXX This is bad code. Planner should be able to
- * decide whether we need heaptuple or memtuple upstream,
- * so make the right decision here.
+ /*
+ * got here? encoding conversion error occurred on the
+ * header line (first row).
*/
- tuple = heap_form_tuple(scan->fs_tupDesc, scan->values, scan->nulls);
- pstate->processed++;
- MemoryContextReset(pstate->rowcontext);
- return tuple;
+ if (pstate->errMode == ALL_OR_NOTHING)
+ {
+ PG_RE_THROW();
+ }
+ else
+ {
+ /* SREH - release error state */
+ if (!elog_dismiss(DEBUG5))
+ PG_RE_THROW(); /* hope to never get here! */
+
+ /*
+ * note: we don't bother doing anything special here.
+ * we are never interested in logging a header line
+ * error. just continue the workflow.
+ */
+ }
}
- else if(ret_mode == LINE_ERROR && !pstate->raw_buf_done)
- {
- /* error was handled in parse_next_line. move to the next */
- continue;
- }
- else if(ret_mode == END_MARKER)
- {
- scan->fs_inited = false;
- return NULL;
- }
- else
- {
- /* try to get more data if possible */
- Assert((ret_mode == NEED_MORE_DATA) ||
- (ret_mode == LINE_ERROR && pstate->raw_buf_done));
- needData = true;
- break;
- }
+ PG_END_TRY();
+
+ EXT_RESET_LINEBUF;
+ pstate->header_line = false;
}
}
- /*
- * if we got here we finished reading all the data.
- */
- scan->fs_inited = false;
+ /* while there is still data in our buffer */
+ while (!pstate->raw_buf_done || needData)
+ {
+ DataLineStatus ret_mode = parse_next_line(scan);
- return NULL;
+ if(ret_mode == LINE_OK)
+ {
+ /* convert to heap tuple */
+ /* XXX This is bad code. Planner should be able to
+ * decide whether we need heaptuple or memtuple upstream,
+ * so make the right decision here.
+ */
+ tuple = heap_form_tuple(scan->fs_tupDesc, scan->values, scan->nulls);
+ pstate->processed++;
+ MemoryContextReset(pstate->rowcontext);
+ return tuple;
+ }
+ else if(ret_mode == LINE_ERROR && !pstate->raw_buf_done)
+ {
+ /* error was handled in parse_next_line. move to the next */
+ continue;
+ }
+ else if(ret_mode == END_MARKER)
+ {
+ scan->fs_inited = false;
+ return NULL;
+ }
+ else
+ {
+ /* try to get more data if possible */
+ Assert((ret_mode == NEED_MORE_DATA) ||
+ (ret_mode == LINE_ERROR && pstate->raw_buf_done));
+ needData = true;
+ break;
+ }
+ }
+ }
+
+ /*
+ * if we got here we finished reading all the data.
+ */
+ scan->fs_inited = false;
+
+ return NULL;
}
@@ -1145,149 +1249,241 @@
static HeapTuple
externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
{
- HeapTuple tuple;
- CopyState pstate = scan->fs_pstate;
- FormatterData* formatter = scan->fs_formatter;
- bool no_more_data = false;
- MemoryContext oldctxt = CurrentMemoryContext;
+ HeapTuple tuple;
+ CopyState pstate = scan->fs_pstate;
+ FormatterData* formatter = scan->fs_formatter;
+ bool no_more_data = false;
+ MemoryContext oldctxt = CurrentMemoryContext;
- Assert(formatter);
+ Assert(formatter);
- /* while didn't finish processing the entire file */
- while (!no_more_data)
+ /* while didn't finish processing the entire file */
+ while (!no_more_data)
+ {
+ /* need to fill our buffer with data? */
+ if (pstate->raw_buf_done)
{
- /* need to fill our buffer with data? */
- if (pstate->raw_buf_done)
- {
- int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
- if ( bytesread > 0 )
- appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
- pstate->raw_buf_done = false;
+ int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
+ if ( bytesread > 0 )
+ appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
+ pstate->raw_buf_done = false;
- /* HEADER not yet supported ... */
- if(pstate->header_line)
- elog(ERROR, "header line in custom format is not yet supported");
- }
-
- if (formatter->fmt_databuf.len > 0 || !pstate->fe_eof)
- {
- /* while there is still data in our buffer */
- while (!pstate->raw_buf_done)
- {
- bool error_caught = false;
-
- /*
- * Invoke the custom formatter function.
- */
- PG_TRY();
- {
- Datum d;
- FunctionCallInfoData fcinfo;
-
- /* per call formatter prep */
- FunctionCallPrepareFormatter(&fcinfo,
- 0,
- pstate,
- formatter,
- scan->fs_rd,
- scan->fs_tupDesc,
- scan->in_functions,
- scan->typioparams);
- d = FunctionCallInvoke(&fcinfo);
-
- }
- PG_CATCH();
- {
- error_caught = true;
-
- MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
-
- /*
- * Save any bad row information that was set
- * by the user in the formatter UDF (if any).
- * Then handle the error in FILEAM_HANDLE_ERROR.
- */
- pstate->cur_lineno = formatter->fmt_badrow_num;
- pstate->cur_byteno = formatter->fmt_bytesread;
- resetStringInfo(&pstate->line_buf);
-
- if (formatter->fmt_badrow_len > 0)
- {
- if (formatter->fmt_badrow_data)
- appendBinaryStringInfo(&pstate->line_buf,
- formatter->fmt_badrow_data,
- formatter->fmt_badrow_len);
-
- formatter->fmt_databuf.cursor += formatter->fmt_badrow_len;
- if (formatter->fmt_databuf.cursor > formatter->fmt_databuf.len ||
- formatter->fmt_databuf.cursor < 0 )
- {
- formatter->fmt_databuf.cursor = formatter->fmt_databuf.len;
- }
- }
-
- FILEAM_HANDLE_ERROR;
-
- MemoryContextSwitchTo(oldctxt);
- }
- PG_END_TRY();
-
- /*
- * Examine the function results. If an error was caught
- * we already handled it, so after checking the reject
- * limit, loop again and call the UDF for the next tuple.
- */
- if (!error_caught)
- {
- switch (formatter->fmt_notification)
- {
- case FMT_NONE:
-
- /* got a tuple back */
-
- tuple = formatter->fmt_tuple;
- pstate->processed++;
- MemoryContextReset(formatter->fmt_perrow_ctx);
-
- return tuple;
-
- case FMT_NEED_MORE_DATA:
-
- /*
- * Callee consumed all data in the buffer.
- * Prepare to read more data into it.
- */
- pstate->raw_buf_done = true;
- justifyDatabuf(&formatter->fmt_databuf);
-
- continue;
-
- default:
- elog(ERROR, "unsupported formatter notification (%d)",
- formatter->fmt_notification);
- break;
- }
- }
- else
- {
- FILEAM_IF_REJECT_LIMIT_REACHED_ABORT
- }
-
- }
- }
- else
- {
- no_more_data = true;
- }
+ /* HEADER not yet supported ... */
+ if(pstate->header_line)
+ elog(ERROR, "header line in custom format is not yet supported");
}
- /*
- * if we got here we finished reading all the data.
- */
- Assert(no_more_data);
- scan->fs_inited = false;
+ if (formatter->fmt_databuf.len > 0 || !pstate->fe_eof)
+ {
+ /* while there is still data in our buffer */
+ while (!pstate->raw_buf_done)
+ {
+ bool error_caught = false;
- return NULL;
+ /*
+ * Invoke the custom formatter function.
+ */
+ PG_TRY();
+ {
+ Datum d;
+ FunctionCallInfoData fcinfo;
+
+ /* per call formatter prep */
+ FunctionCallPrepareFormatter(&fcinfo,
+ 0,
+ pstate,
+ formatter,
+ scan->fs_rd,
+ scan->fs_tupDesc,
+ scan->in_functions,
+ scan->typioparams,
+ ((URL_FILE *)(scan->fs_file))->url,
+ ss);
+ d = FunctionCallInvoke(&fcinfo);
+
+ }
+ PG_CATCH();
+ {
+ error_caught = true;
+
+ MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
+
+ /*
+ * Save any bad row information that was set
+ * by the user in the formatter UDF (if any).
+ * Then handle the error in FILEAM_HANDLE_ERROR.
+ */
+ pstate->cur_lineno = formatter->fmt_badrow_num;
+ pstate->cur_byteno = formatter->fmt_bytesread;
+ resetStringInfo(&pstate->line_buf);
+
+ if (formatter->fmt_badrow_len > 0)
+ {
+ if (formatter->fmt_badrow_data)
+ appendBinaryStringInfo(&pstate->line_buf,
+ formatter->fmt_badrow_data,
+ formatter->fmt_badrow_len);
+
+ formatter->fmt_databuf.cursor += formatter->fmt_badrow_len;
+ if (formatter->fmt_databuf.cursor > formatter->fmt_databuf.len ||
+ formatter->fmt_databuf.cursor < 0 )
+ {
+ formatter->fmt_databuf.cursor = formatter->fmt_databuf.len;
+ }
+ }
+
+ FILEAM_HANDLE_ERROR;
+
+ MemoryContextSwitchTo(oldctxt);
+ }
+ PG_END_TRY();
+
+ /*
+ * Examine the function results. If an error was caught
+ * we already handled it, so after checking the reject
+ * limit, loop again and call the UDF for the next tuple.
+ */
+ if (!error_caught)
+ {
+ switch (formatter->fmt_notification)
+ {
+ case FMT_NONE:
+
+ /* got a tuple back */
+
+ tuple = formatter->fmt_tuple;
+ pstate->processed++;
+ MemoryContextReset(formatter->fmt_perrow_ctx);
+
+ return tuple;
+
+ case FMT_NEED_MORE_DATA:
+
+ /*
+ * Callee consumed all data in the buffer.
+ * Prepare to read more data into it.
+ */
+ pstate->raw_buf_done = true;
+ justifyDatabuf(&formatter->fmt_databuf);
+
+ continue;
+
+ default:
+ elog(ERROR, "unsupported formatter notification (%d)",
+ formatter->fmt_notification);
+ break;
+ }
+ }
+ else
+ {
+ FILEAM_IF_REJECT_LIMIT_REACHED_ABORT
+ }
+ }
+ }
+ else
+ {
+ no_more_data = true;
+ }
+ }
+
+ /*
+ * if we got here we finished reading all the data.
+ */
+ Assert(no_more_data);
+ scan->fs_inited = false;
+
+ return NULL;
+}
+
+static HeapTuple
+externalgettup_custom_noextprot(FileScanDesc scan,
+ ExternalSelectDesc desc,
+ ScanState *ss)
+{
+ HeapTuple tuple;
+ CopyState pstate = scan->fs_pstate;
+ FormatterData* formatter = scan->fs_formatter;
+ bool no_more_data = false;
+ MemoryContext oldctxt = CurrentMemoryContext;
+
+ Assert(formatter);
+
+ /* while didn't finish processing the entire file */
+ while (!no_more_data)
+ {
+ bool error_caught = false;
+
+ /*
+ * Invoke the custom formatter function.
+ */
+ PG_TRY();
+ {
+ Datum d;
+ FunctionCallInfoData fcinfo;
+
+ /* per call formatter prep */
+ FunctionCallPrepareFormatter(&fcinfo,
+ 0,
+ pstate,
+ formatter,
+ scan->fs_rd,
+ scan->fs_tupDesc,
+ scan->in_functions,
+ scan->typioparams,
+ scan->fs_uri,
+ ss);
+ d = FunctionCallInvoke(&fcinfo);
+
+ }
+ PG_CATCH();
+ {
+ error_caught = true;
+ MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
+ FILEAM_HANDLE_ERROR;
+ MemoryContextSwitchTo(oldctxt);
+ }
+ PG_END_TRY();
+
+ if (!error_caught)
+ {
+ switch (formatter->fmt_notification)
+ {
+ case FMT_NONE:
+ {
+ /* got a tuple back */
+ tuple = formatter->fmt_tuple;
+ pstate->processed++;
+ MemoryContextReset(formatter->fmt_perrow_ctx);
+ return tuple;
+ }
+ case FMT_DONE:
+ {
+ no_more_data = true;
+ break;
+ }
+ default:
+ {
+ elog(ERROR, "unsupported formatter notification (%d)",
+ formatter->fmt_notification);
+ break;
+ }
+ }
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_EXTERNAL_ROUTINE_INVOCATION_EXCEPTION),
+ (errmsg("formatter reported error")),
+ errOmitLocation(true)));
+ }
+ }
+
+ /*
+ * if we got here we finished reading all the data.
+ */
+ Assert(no_more_data);
+ scan->fs_inited = false;
+ return NULL;
}
/* ----------------
@@ -1322,11 +1518,19 @@
/* (set current state...) */
}
+ /***********************************************************
+ * This version has always custom formatter and fs defined.
+ ***********************************************************/
if (!custom)
- return externalgettup_defined(scan, desc, ss); /* text/csv */
+ return externalgettup_defined(scan, desc, ss); /* text/csv */
+ else if (scan->fs_formatter->fmt_mask & FMT_NEEDEXTBUFF)
+ {
+ return externalgettup_custom(scan, desc, ss);
+ }
else
- return externalgettup_custom(scan, desc, ss); /* custom */
-
+ {
+ return externalgettup_custom_noextprot(scan, desc, ss);
+ }
}
/*
* setCustomFormatter
@@ -1346,7 +1550,23 @@
Oid argList[1];
Oid returnOid;
- funcname = lappend(funcname, makeString(formatter_name));
+ char* new_formatter_name = (char *)palloc0(strlen(formatter_name) + 5);
+ if (!iswritable)
+ {
+ sprintf(new_formatter_name, "%s_in", formatter_name);
+ }
+ else
+ {
+ sprintf(new_formatter_name, "%s_out", formatter_name);
+ }
+
+ /* update to all lowercase string */
+ for ( int i = 0 ; new_formatter_name[i] != '\0' ; ++i )
+ {
+ new_formatter_name[i] = tolower((unsigned char) new_formatter_name[i]);
+ }
+
+ funcname = lappend(funcname, makeString(new_formatter_name));
if(iswritable)
{
@@ -1363,7 +1583,7 @@
if (!OidIsValid(procOid))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("formatter function %s of type %s was not found.",
- formatter_name,
+ new_formatter_name,
(iswritable ? "writable" : "readable")),
errhint("Create it with CREATE FUNCTION."),
errOmitLocation(true)));
@@ -1372,7 +1592,7 @@
if (get_func_rettype(procOid) != returnOid)
ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("formatter function %s of type %s has an incorrect return type",
- formatter_name,
+ new_formatter_name,
(iswritable ? "writable" : "readable")),
errOmitLocation(true)));
@@ -1381,9 +1601,12 @@
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("formatter function %s is not declared STABLE.",
- formatter_name),
+ new_formatter_name),
errOmitLocation(true)));
+ if(NULL != new_formatter_name)
+ pfree(new_formatter_name);
+
return procOid;
}
@@ -1664,7 +1887,9 @@
Relation rel,
TupleDesc tupDesc,
FmgrInfo *convFuncs,
- Oid *typioparams)
+ Oid *typioparams,
+ char *url,
+ ScanState *ss)
{
formatter->type = T_FormatterData;
formatter->fmt_relation = rel;
@@ -1680,6 +1905,8 @@
formatter->fmt_needs_transcoding = pstate->need_transcoding;
formatter->fmt_conversion_proc = pstate->enc_conversion_proc;
formatter->fmt_external_encoding = pstate->client_encoding;
+ formatter->fmt_url = url;
+ formatter->fmt_splits = ss == NULL ? NULL : ss->splits;
InitFunctionCallInfoData(/* FunctionCallInfoData */ *fcinfo,
/* FmgrInfo */ pstate->custom_formatter_func,
@@ -1688,7 +1915,6 @@
/* ResultSetInfo */ NULL);
}
-
/*
* open the external source for scanning (RET only)
*
@@ -2360,6 +2586,48 @@
return start;
}
+bool hasErrTblInFmtOpts(List *fmtOpts) {
+ char *format_str = pstrdup((char *) strVal(linitial(fmtOpts)));
+ const char *whitespace = " \t\n\r";
+ int encoding = GetDatabaseEncoding();
+ char *key = strtokx2(format_str, whitespace, NULL, NULL,
+ 0, false, true, encoding);
+ while (key) {
+ if (pg_strcasecmp(key, "err_table") == 0)
+ return true;
+ key = strtokx2(NULL, whitespace, NULL, NULL,
+ 0, false, false, encoding);
+ }
+ return false;
+}
+
+char *getExtTblCategoryInFmtOptsStr(char *fmtStr)
+{
+ const char *whitespace = " \t\n\r";
+ const char *quote = "'";
+ int encoding = GetDatabaseEncoding();
+
+ char *key = strtokx2(fmtStr, whitespace, NULL, NULL,
+ 0, false, true, encoding);
+ char *val = strtokx2(NULL, whitespace, NULL, quote,
+ 0, false, true, encoding);
+
+ while (key && val)
+ {
+ if (pg_strcasecmp(key, "category") == 0)
+ {
+ return pstrdup(val);
+ }
+
+ key = strtokx2(NULL, whitespace, NULL, NULL,
+ 0, false, false, encoding);
+ val = strtokx2(NULL, whitespace, NULL, quote,
+ 0, false, true, encoding);
+ }
+
+ return NULL;
+}
+
char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr)
{
const char *whitespace = " \t\n\r";
diff --git a/src/backend/access/external/plugstorage.c b/src/backend/access/external/plugstorage.c
index ad7d260..fe7c82a 100644
--- a/src/backend/access/external/plugstorage.c
+++ b/src/backend/access/external/plugstorage.c
@@ -479,24 +479,33 @@
ExternalInsertDesc InvokePlugStorageFormatInsertInit(FmgrInfo *func,
Relation relation,
int formatterType,
- char *formatterName)
+ char *formatterName,
+ PlannedStmt *plannedstmt,
+ int segno )
{
PlugStorageData psdata;
FunctionCallInfoData fcinfo;
- psdata.type = T_PlugStorageData;
- psdata.ps_relation = relation;
- psdata.ps_formatter_type = formatterType;
- psdata.ps_formatter_name = formatterName;
+ psdata.type = T_PlugStorageData;
+ psdata.ps_relation = relation;
+ psdata.ps_formatter_type = formatterType;
+ psdata.ps_formatter_name = formatterName;
+ psdata.ps_segno = segno;
- InitFunctionCallInfoData(fcinfo,
- func,
- 0,
- (Node *)(&psdata),
- NULL);
+ psdata.ps_scan_state = palloc0(sizeof(ScanState));
+
+ InitFunctionCallInfoData(fcinfo, /* FunctionCallInfoData */
+ func, /* FmgrInfo */
+ 0, /* nArgs */
+ (Node *)(&psdata), /* Call Context */
+ NULL); /* ResultSetInfo */
+
+ /* Invoke function */
FunctionCallInvoke(&fcinfo);
+
+ /* We do not expect a null result */
if (fcinfo.isnull)
{
elog(ERROR, "function %u returned NULL",
@@ -505,6 +514,7 @@
ExternalInsertDesc extInsertDesc = psdata.ps_ext_insert_desc;
+ pfree(psdata.ps_scan_state);
return extInsertDesc;
}
diff --git a/src/backend/catalog/cdb_external_extensions.sql b/src/backend/catalog/cdb_external_extensions.sql
index e3a8080..d11797f 100644
--- a/src/backend/catalog/cdb_external_extensions.sql
+++ b/src/backend/catalog/cdb_external_extensions.sql
@@ -47,3 +47,15 @@
CREATE OR REPLACE FUNCTION fixedwidth_out(record) RETURNS bytea
AS '$libdir/fixedwidth.so', 'fixedwidth_out'
LANGUAGE C STABLE;
+
+------------------------------------------------------------------
+-- external HDFS
+------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION hdfs_validate() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_validate'
+LANGUAGE C STABLE;
+
+CREATE OR REPLACE FUNCTION hdfs_blocklocation() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_blocklocation'
+LANGUAGE C STABLE;
+
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 18a8acf..e159c83 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -47,7 +47,11 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "port.h"
+#include "access/fileam.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/sysattr.h"
@@ -78,38 +82,42 @@
#include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
-#include "cdb/cdbappendonlyam.h"
-#include "cdb/cdbpartition.h"
+
#include "cdb/cdbanalyze.h"
+#include "cdb/cdbappendonlyam.h"
+#include "cdb/cdbmirroredfilesysobj.h"
#include "cdb/cdbparquetfooterprocessor.h"
+#include "cdb/cdbparquetstoragewrite.h"
+#include "cdb/cdbpartition.h"
+#include "cdb/cdbpersistentfilesysobj.h"
+#include "cdb/cdbsharedstorageop.h"
+#include "cdb/cdbvars.h"
#include "commands/dbcommands.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
-#include "miscadmin.h"
+
#include "nodes/makefuncs.h"
+#include "nodes/pg_list.h"
+#include "nodes/value.h"
#include "optimizer/clauses.h"
#include "optimizer/var.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parse_relation.h"
+#include "pg_config_manual.h"
+#include "storage/fd.h"
#include "storage/smgr.h"
+#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h" /* CDB: GetMemoryChunkContext */
+#include "utils/palloc.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
-#include "utils/guc.h"
-#include "cdb/cdbvars.h"
-
-#include "cdb/cdbsharedstorageop.h"
-#include "cdb/cdbmirroredfilesysobj.h"
-#include "cdb/cdbpersistentfilesysobj.h"
-#include "cdb/cdbparquetstoragewrite.h"
-#include "catalog/gp_persistent.h"
-
-#include "utils/guc.h"
+#include "utils/uri.h"
typedef struct pg_result PGresult;
extern void PQclear(PGresult *res);
@@ -2544,7 +2552,52 @@
* External table? If so, delete the pg_exttable tuple.
*/
if (is_external_rel)
+ {
+ /* Step 1. remove uri on file system */
+ rel = relation_open(relid, AccessExclusiveLock);
+ ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id);
+ char *path = (char *) strVal(linitial(exttbl->locations));
+ char *searchKey = (char *) palloc0 (MAXPGPATH);
+ char *fileSpacePath = NULL;
+ GetFilespacePathForTablespace(get_database_dts(MyDatabaseId),
+ &fileSpacePath);
+ snprintf(searchKey, MAXPGPATH, "%s/ExtErrTbl/", fileSpacePath);
+ char *match = strstr(path,searchKey);
+ if (match)
+ {
+ RemovePath(path, 1);
+ }
+
+ /* Get category for the external table */
+ List *entry_locations = exttbl->locations;
+ Assert(entry_locations);
+ ListCell *entry_location = list_head(entry_locations);
+ char *url = ((Value*)lfirst(entry_location))->val.str;
+ char *category = getExtTblCategoryInFmtOptsStr(exttbl->fmtopts);
+
+ /* Remove data for internal table */
+ if (category != NULL &&
+ pg_strncasecmp(category, "internal", strlen("internal")) == 0)
+ {
+
+
+ if (IS_HDFS_URI(url)) /* ORC, TEXT, CSV */
+ {
+ // orc, text, csv only support one location.
+ Assert(list_length(entry_locations) == 1);
+ RemovePath(url, 1);
+ }
+ }
+
+ if (category)
+ {
+ pfree(category);
+ }
+ relation_close(rel, AccessExclusiveLock);
+
+ /* Step 2. remove pg_exttable entry */
RemoveExtTableEntry(relid);
+ }
if (is_foreign_rel)
RemoveForeignTableEntry(relid);
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index b451f68..f7d4472 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -29,14 +29,17 @@
#include "access/genam.h"
#include "access/aomd.h"
+#include "access/extprotocol.h"
#include "access/heapam.h"
#include "access/filesplit.h"
#include "access/parquetsegfiles.h"
+#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/catquery.h"
#include "catalog/pg_exttable.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_proc.h"
+#include "catalog/pg_extprotocol.h"
#include "cdb/cdbdatalocality.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
@@ -52,6 +55,7 @@
#include "optimizer/planmain.h"
#include "parser/parsetree.h"
#include "storage/fd.h"
+#include "parser/parse_func.h"
#include "postmaster/identity.h"
#include "cdb/cdbmetadatacache.h"
#include "resourcemanager/utils/network_utils.h"
@@ -87,6 +91,11 @@
List *full_range_tables; /* every table include result relation */
} range_table_collector_context;
+typedef struct collect_scan_rangetable_context {
+ plan_tree_base_prefix base;
+ List *range_tables; // range table for scan only
+ List *full_range_tables; // full range table
+} collect_scan_rangetable_context;
/*
* structure containing information about how much a
* host holds.
@@ -123,10 +132,14 @@
int64 logiceof;
int host;
bool is_local_read;
+ char *ext_file_uri;
} File_Split;
typedef enum DATALOCALITY_RELATION_TYPE {
- DATALOCALITY_APPENDONLY, DATALOCALITY_PARQUET, DATALOCALITY_UNKNOWN
+ DATALOCALITY_APPENDONLY,
+ DATALOCALITY_PARQUET,
+ DATALOCALITY_HDFS,
+ DATALOCALITY_UNKNOWN
} DATALOCALITY_RELATION_TYPE;
/*
@@ -290,6 +303,7 @@
*/
typedef struct split_to_segment_mapping_context {
range_table_collector_context rtc_context;
+ collect_scan_rangetable_context srtc_context;
data_dist_stat_context dds_context;
collect_hdfs_split_location_context chsl_context;
hostname_volume_stat_context host_context;
@@ -315,9 +329,9 @@
int64 total_metadata_logic_len;
- int metadata_cache_time_us;
- int alloc_resource_time_us;
- int cal_datalocality_time_us;
+ int metadata_cache_time_us;
+ int alloc_resource_time_us;
+ int cal_datalocality_time_us;
} split_to_segment_mapping_context;
typedef struct vseg_list{
@@ -331,14 +345,19 @@
static void init_split_assignment_result(Split_Assignment_Result *result,
int host_num);
-static void init_datalocality_context(split_to_segment_mapping_context *context);
+static void init_datalocality_context(PlannedStmt *plannedstmt,
+ split_to_segment_mapping_context *context);
static bool range_table_collector_walker(Node *node,
range_table_collector_context *context);
-static void collect_range_tables(Query *query, List* full_range_table,
+static void collect_range_tables(Query *query,
range_table_collector_context *context);
+static bool collect_scan_rangetable(Node *node,
+ collect_scan_rangetable_context *cxt);
+
+
static void convert_range_tables_to_oids_and_check_table_functions(List **range_tables, bool* isUDFExists,
MemoryContext my_memorycontext);
@@ -349,6 +368,8 @@
static int64 get_block_locations_and_claculte_table_size(
split_to_segment_mapping_context *collector_context);
+static bool dataStoredInHdfs(Relation rel);
+
static List *get_virtual_segments(QueryResource *resource);
static List *run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, QueryResource ** resourcePtr,
@@ -368,6 +389,12 @@
Relation_Data *rel_data, int* hitblocks,
int* allblocks, GpPolicy *targetPolicy);
+static void ExternalGetHdfsFileDataLocation(Relation relation,
+ split_to_segment_mapping_context *context, int64 splitsize,
+ Relation_Data *rel_data, int* allblocks);
+
+Oid LookupCustomProtocolBlockLocationFunc(char *protoname);
+
static BlockLocation *fetch_hdfs_data_block_location(char *filepath, int64 len,
int *block_num, RelFileNode rnode, uint32_t segno, double* hit_ratio);
@@ -464,6 +491,9 @@
static int64 set_maximum_segment_volume_parameter(Relation_Data *rel_data,
int host_num, double* maxSizePerSegment);
+static void InvokeHDFSProtocolBlockLocation(Oid procOid,
+ List *locs,
+ List **blockLocations);
/*
* Setup /cleanup the memory context for this run
* of data locality algorithm.
@@ -500,13 +530,17 @@
return;
}
-static void init_datalocality_context(split_to_segment_mapping_context *context) {
+static void init_datalocality_context(PlannedStmt *plannedstmt,
+ split_to_segment_mapping_context *context) {
context->old_memorycontext = CurrentMemoryContext;
context->datalocality_memorycontext = DataLocalityMemoryContext;
context->chsl_context.relations = NIL;
context->rtc_context.range_tables = NIL;
- context->rtc_context.full_range_tables = NIL;
+ context->rtc_context.full_range_tables = plannedstmt->rtable;
+ context->srtc_context.range_tables = NIL;
+ context->srtc_context.full_range_tables = plannedstmt->rtable;
+ context->srtc_context.base.node = (Node *)plannedstmt;
context->externTableForceSegNum = 0;
context->externTableLocationSegNum = 0;
@@ -592,7 +626,7 @@
* collect_range_tables: collect all range table relations, and store
* them into the range_table_collector_context.
*/
-static void collect_range_tables(Query *query, List* full_range_table,
+static void collect_range_tables(Query *query,
range_table_collector_context *context) {
query_tree_walker(query, range_table_collector_walker, (void *) context,
@@ -613,9 +647,27 @@
}
context->range_tables = new_range_tables;
}
- context->full_range_tables = full_range_table;
return;
}
+
+bool collect_scan_rangetable(Node *node,
+ collect_scan_rangetable_context *cxt) {
+ if (NULL == node) return false;
+
+ switch (nodeTag(node)) {
+ case T_ExternalScan:
+ case T_AppendOnlyScan:
+ case T_ParquetScan: {
+ RangeTblEntry *rte = rt_fetch(((Scan *)node)->scanrelid,
+ cxt->full_range_tables);
+ cxt->range_tables = lappend(cxt->range_tables, rte);
+ }
+ default:
+ break;
+ }
+
+ return plan_tree_walker(node, collect_scan_rangetable, cxt);
+}
/*
*
*/
@@ -750,6 +802,19 @@
if (uri && uri->protocol == URI_CUSTOM && is_pxf_protocol(uri)) {
isPxf = true;
}
+ else if (uri && (is_hdfs_protocol(uri))) {
+ relation_close(rel, AccessShareLock);
+ if (targetPolicy->nattrs > 0)
+ {
+ /*select the maximum hash bucket number as hashSegNum of query*/
+ if (context->hashSegNum < targetPolicy->bucketnum)
+ {
+ context->hashSegNum = targetPolicy->bucketnum;
+ context->keep_hash = true;
+ }
+ }
+ continue;
+ }
}
}
if (extEnrty->command || isPxf) {
@@ -844,37 +909,14 @@
/*
* We only consider the data stored in HDFS.
*/
- if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
- Relation_Data *rel_data = NULL;
- /*
- * Get pg_appendonly information for this table.
- */
- AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
-
- rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
+ bool isDataStoredInHdfs = dataStoredInHdfs(rel);
+ if (isDataStoredInHdfs ) {
+ GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
+ Relation_Data *rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
rel_data->relid = rel_oid;
rel_data->files = NIL;
rel_data->partition_parent_relid = 0;
rel_data->block_count = 0;
-
- GpPolicy *targetPolicy = NULL;
- targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
- /*
- * Based on the pg_appendonly information, calculate the data
- * location information associated with this relation.
- */
- if (RelationIsAoRows(rel)) {
- rel_data->type = DATALOCALITY_APPENDONLY;
- AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
- aoEntry->splitsize, rel_data, &hitblocks,
- &allblocks, targetPolicy);
- } else {
- rel_data->type = DATALOCALITY_PARQUET;
- ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
- context->split_size, rel_data, &hitblocks,
- &allblocks, targetPolicy);
- }
-
bool isResultRelation = true;
ListCell *nonResultlc;
foreach(nonResultlc, context->rtc_context.range_tables)
@@ -884,6 +926,54 @@
isResultRelation = false;
}
}
+
+ if (!isResultRelation) {
+ // skip the relation not in scan nodes
+ // for partition table scan optimization;
+ ListCell *rtc;
+ bool found = false;
+ foreach(rtc, context->srtc_context.range_tables) {
+ RangeTblEntry *rte = lfirst(rtc);
+ if (rel_oid == rte->relid) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ relation_close(rel, AccessShareLock);
+ continue;
+ }
+ }
+
+ if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
+ /*
+ * Get pg_appendonly information for this table.
+ */
+ AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
+ /*
+ * Based on the pg_appendonly information, calculate the data
+ * location information associated with this relation.
+ */
+ if (RelationIsAoRows(rel)) {
+ rel_data->type = DATALOCALITY_APPENDONLY;
+ AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
+ aoEntry->splitsize, rel_data, &hitblocks,
+ &allblocks, targetPolicy);
+ } else {
+ rel_data->type = DATALOCALITY_PARQUET;
+ ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
+ context->split_size, rel_data, &hitblocks,
+ &allblocks, targetPolicy);
+ }
+ } else if (RelationIsExternal(rel)) {
+ if (isDataStoredInHdfs) {
+ // we can't use metadata cache, so hitblocks will always be 0
+ rel_data->type = DATALOCALITY_HDFS;
+ ExternalGetHdfsFileDataLocation(rel, context, context->split_size,
+ rel_data, &allblocks);
+ }
+ }
+
if (!isResultRelation) {
total_size += rel_data->total_size;
totalFileCount += list_length(rel_data->files);
@@ -915,8 +1005,7 @@
}
context->total_file_count = totalFileCount;
context->total_size = total_size;
-
- context->metadata_cache_time_us = eclaspeTime;
+ context->metadata_cache_time_us = eclaspeTime;
if(debug_datalocality_time){
elog(LOG, "metadata overall execution time: %d us. \n", eclaspeTime);
@@ -924,6 +1013,25 @@
return total_size;
}
+bool dataStoredInHdfs(Relation rel) {
+ if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
+ return true;
+ } else if (RelationIsExternal(rel)) {
+ ExtTableEntry* extEnrty = GetExtTableEntry(rel->rd_id);
+ bool isHdfsProtocol = false;
+ if (!extEnrty->command && extEnrty->locations) {
+ char* first_uri_str = (char *) strVal(lfirst(list_head(extEnrty->locations)));
+ if (first_uri_str) {
+ Uri* uri = ParseExternalTableUri(first_uri_str);
+ if (uri && is_hdfs_protocol(uri)) {
+ isHdfsProtocol = true;
+ }
+ }
+ }
+ return isHdfsProtocol;
+ }
+ return false;
+}
/*
* search_host_in_stat_context: search a host name in the statistic
* context; if not found, create a new one.
@@ -1579,7 +1687,237 @@
return;
}
+static void InvokeHDFSProtocolBlockLocation(Oid procOid,
+ List *locs,
+ List **blockLocations)
+{
+ ExtProtocolValidatorData *validator_data;
+ FmgrInfo *validator_udf;
+ FunctionCallInfoData fcinfo;
+ validator_data = (ExtProtocolValidatorData *)
+ palloc0 (sizeof(ExtProtocolValidatorData));
+ validator_udf = palloc(sizeof(FmgrInfo));
+ fmgr_info(procOid, validator_udf);
+
+ validator_data->type = T_ExtProtocolValidatorData;
+ validator_data->url_list = locs;
+ validator_data->format_opts = NULL;
+ validator_data->errmsg = NULL;
+ validator_data->direction = EXT_VALIDATE_READ;
+ validator_data->action = EXT_VALID_ACT_GETBLKLOC;
+
+ InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
+ /* FmgrInfo */ validator_udf,
+ /* nArgs */ 0,
+ /* Call Context */ (Node *) validator_data,
+ /* ResultSetInfo */ NULL);
+
+ /* invoke validator. if this function returns - validation passed */
+ FunctionCallInvoke(&fcinfo);
+
+ ExtProtocolBlockLocationData *bls =
+ (ExtProtocolBlockLocationData *)(fcinfo.resultinfo);
+ /* debug output block location. */
+ if (bls != NULL)
+ {
+ ListCell *c;
+ int i = 0 ,j = 0;
+ foreach(c, bls->files)
+ {
+ blocklocation_file *blf = (blocklocation_file *)(lfirst(c));
+ elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks",
+ blf->file_uri, blf->block_num);
+ for ( i = 0 ; i < blf->block_num ; ++i )
+ {
+ BlockLocation *pbl = &(blf->locations[i]);
+ elog(DEBUG3, "DEBUG LOCATION for block %d : %d, "
+ INT64_FORMAT ", " INT64_FORMAT ", %d",
+ i,
+ pbl->corrupt, pbl->length, pbl->offset,
+ pbl->numOfNodes);
+ for ( j = 0 ; j < pbl->numOfNodes ; ++j )
+ {
+ elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s",
+ i,
+ pbl->hosts[j], pbl->names[j],
+ pbl->topologyPaths[j]);
+ }
+ }
+ }
+ }
+
+ elog(DEBUG3, "after invoking get block location API");
+
+ /* get location data from fcinfo.resultinfo. */
+ if (bls != NULL)
+ {
+ Assert(bls->type == T_ExtProtocolBlockLocationData);
+ while(list_length(bls->files) > 0)
+ {
+ void *v = lfirst(list_head(bls->files));
+ bls->files = list_delete_first(bls->files);
+ *blockLocations = lappend(*blockLocations, v);
+ }
+ }
+ pfree(validator_data);
+ pfree(validator_udf);
+}
+
+Oid
+LookupCustomProtocolBlockLocationFunc(char *protoname)
+{
+ List* funcname = NIL;
+ Oid procOid = InvalidOid;
+ Oid argList[1];
+ Oid returnOid;
+
+ char* new_func_name = (char *)palloc0(strlen(protoname) + 16);
+ sprintf(new_func_name, "%s_blocklocation", protoname);
+ funcname = lappend(funcname, makeString(new_func_name));
+ returnOid = VOIDOID;
+ procOid = LookupFuncName(funcname, 0, argList, true);
+
+ if (!OidIsValid(procOid))
+ ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("protocol function %s was not found.",
+ new_func_name),
+ errhint("Create it with CREATE FUNCTION."),
+ errOmitLocation(true)));
+
+ /* check return type matches */
+ if (get_func_rettype(procOid) != returnOid)
+ ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("protocol function %s has an incorrect return type",
+ new_func_name),
+ errOmitLocation(true)));
+
+ /* check allowed volatility */
+ if (func_volatile(procOid) != PROVOLATILE_STABLE)
+ ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("protocol function %s is not declared STABLE.",
+ new_func_name),
+ errOmitLocation(true)));
+ pfree(new_func_name);
+
+ return procOid;
+}
+
+static void ExternalGetHdfsFileDataLocation(
+ Relation relation,
+ split_to_segment_mapping_context *context,
+ int64 splitsize,
+ Relation_Data *rel_data,
+ int* allblocks) {
+ ExtTableEntry *ext_entry = GetExtTableEntry(rel_data->relid);
+ Assert(ext_entry->locations != NIL);
+ int64 total_size = 0;
+ int segno = 1;
+
+ /*
+ * Step 1. get external HDFS location from URI.
+ */
+ char* first_uri_str = (char *) strVal(lfirst(list_head(ext_entry->locations)));
+ /* We must have at least one location. */
+ Assert(first_uri_str != NULL);
+ Uri* uri = ParseExternalTableUri(first_uri_str);
+ bool isHdfs = false;
+ if (uri != NULL && is_hdfs_protocol(uri)) {
+ isHdfs = true;
+ }
+ Assert(isHdfs); /* Currently, we accept HDFS only. */
+
+ /*
+ * Step 2. Get function to call for getting location information. This work
+ * is done by validator function registered for this external protocol.
+ */
+ Oid procOid = InvalidOid;
+ if (isHdfs) {
+ procOid = LookupCustomProtocolBlockLocationFunc("hdfs");
+ }
+ else
+ {
+ Assert(false);
+ }
+
+ /*
+ * Step 3. Call validator to get location data.
+ */
+
+ /* Prepare function call parameter by passing into location string. This is
+ * only called at dispatcher side. */
+ List *bls = NULL; /* Block locations */
+ if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
+ {
+ InvokeHDFSProtocolBlockLocation(procOid, ext_entry->locations, &bls);
+ }
+
+ /*
+ * Step 4. Build data location info for optimization after this call.
+ */
+
+ /* Go through each files */
+ ListCell *cbl = NULL;
+ foreach(cbl, bls)
+ {
+ blocklocation_file *f = (blocklocation_file *)lfirst(cbl);
+ BlockLocation *locations = f->locations;
+ int block_num = f->block_num;
+ int64 logic_len = 0;
+ *allblocks += block_num;
+ if ((locations != NULL) && (block_num > 0)) {
+ // calculate length for one specific file
+ for (int j = 0; j < block_num; ++j) {
+ logic_len += locations[j].length;
+ // locations[j].lowerBoundInc = NULL;
+ // locations[j].upperBoundExc = NULL;
+ }
+ total_size += logic_len;
+
+ Block_Host_Index * host_index = update_data_dist_stat(context,
+ locations, block_num);
+
+ Relation_File *file = (Relation_File *) palloc(sizeof(Relation_File));
+ char *uri_slash = strrchr(f->file_uri, '/');
+ if (uri_slash != NULL && atoi(uri_slash + 1) > 0)
+ file->segno = atoi(uri_slash + 1);
+ else file->segno = segno++;
+ file->block_num = block_num;
+ file->locations = locations;
+ file->hostIDs = host_index;
+ file->logic_len = logic_len;
+
+ // do the split logic
+ int realSplitNum = 0;
+ int split_num = file->block_num;
+ int64 offset = 0;
+ File_Split *splits = (File_Split *) palloc(sizeof(File_Split) * split_num);
+ while (realSplitNum < split_num) {
+ splits[realSplitNum].host = -1;
+ splits[realSplitNum].is_local_read = true;
+ splits[realSplitNum].offset = offset;
+ splits[realSplitNum].length = file->locations[realSplitNum].length;
+ splits[realSplitNum].logiceof = logic_len;
+ splits[realSplitNum].ext_file_uri = pstrdup(f->file_uri);
+
+ if (logic_len - offset <= splits[realSplitNum].length) {
+ splits[realSplitNum].length = logic_len - offset;
+ ++realSplitNum;
+ break;
+ }
+ offset += splits[realSplitNum].length;
+ ++realSplitNum;
+ }
+ file->split_num = realSplitNum;
+ file->splits = splits;
+ context->total_split_count += realSplitNum;
+
+ rel_data->files = lappend(rel_data->files, file);
+ }
+ }
+ context->total_metadata_logic_len += total_size;
+ rel_data->total_size = total_size;
+}
/*
* step 1 search segments with local read and segment is not full after being assigned the block
* step 2 search segments with local read and segment is not full before being assigned the block
@@ -4021,14 +4359,17 @@
* fixedVsegNum is used by PBE, since all the execute should use the same number of vsegs.
*/
SplitAllocResult *
-calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
- List *fullRangeTable, GpPolicy *intoPolicy, int sliceNum, int fixedVsegNum) {
+calculate_planner_segment_num(PlannedStmt *plannedstmt, Query *query,
+ QueryResourceLife resourceLife, int fixedVsegNum) {
SplitAllocResult *result = NULL;
QueryResource *resource = NULL;
-
List *virtual_segments = NIL;
List *alloc_result = NIL;
+ Node *planTree = plannedstmt->planTree;
+ GpPolicy *intoPolicy = plannedstmt->intoPolicy;
+ int sliceNum = plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1;
split_to_segment_mapping_context context;
+ context.chsl_context.relations = NIL;
int planner_segments = 0; /*virtual segments number for explain statement */
@@ -4061,9 +4402,11 @@
{
init_datalocality_memory_context();
- init_datalocality_context(&context);
+ init_datalocality_context(plannedstmt, &context);
- collect_range_tables(query, fullRangeTable, &(context.rtc_context));
+ collect_range_tables(query, &(context.rtc_context));
+
+ collect_scan_rangetable(planTree, &(context.srtc_context));
bool isTableFunctionExists = false;
@@ -4104,6 +4447,9 @@
/* get block location and calculate relation size*/
get_block_locations_and_claculte_table_size(&context);
+ if(context.chsl_context.relations){
+ Relation_Data* tmp = (Relation_Data*) lfirst(context.chsl_context.relations->tail);
+ }
/*use inherit resource*/
if (resourceLife == QRL_INHERIT) {
diff --git a/src/backend/cdb/cdbpartition.c b/src/backend/cdb/cdbpartition.c
index a887ee4..963983f 100644
--- a/src/backend/cdb/cdbpartition.c
+++ b/src/backend/cdb/cdbpartition.c
@@ -4787,9 +4787,9 @@
static void
fixup_table_storage_options(CreateStmt *ct)
{
- if (!ct->options)
+ if (!ct->base.options)
{
- ct->options = list_make2(makeDefElem("appendonly",
+ ct->base.options = list_make2(makeDefElem("appendonly",
(Node *)makeString("true")),
makeDefElem("orientation",
(Node *)makeString("column")));
@@ -5243,11 +5243,11 @@
ct = (CreateStmt *)linitial((List *)pUtl);
Assert(IsA(ct, CreateStmt));
- ct->is_add_part = true; /* subroutines need to know this */
+ ct->base.is_add_part = true; /* subroutines need to know this */
ct->ownerid = ownerid;
- if (!ct->distributedBy)
- ct->distributedBy = make_dist_clause(rel);
+ if (!ct->base.distributedBy)
+ ct->base.distributedBy = make_dist_clause(rel);
if (bSetTemplate)
/* if creating a template, silence partition name messages */
@@ -6377,7 +6377,7 @@
* name of the parent relation
*/
- ct->relation = makeRangeVar(
+ ct->base.relation = makeRangeVar(
NULL /*catalogname*/,
get_namespace_name(RelationGetNamespace(par_rel)),
RelationGetRelationName(par_rel), -1);
@@ -6429,7 +6429,7 @@
Oid skipTableRelid = InvalidOid;
List *attr_encodings = NIL;
- ct->partitionBy = (Node *)pBy;
+ ct->base.partitionBy = (Node *)pBy;
/* this parse_analyze expands the phony create of a partitioned table
* that we just build into the constituent commands we need to create
@@ -6517,7 +6517,7 @@
gs->objtype = ACL_OBJECT_RELATION;
gs->cooked_privs = cp;
- gs->objects = list_make1(copyObject(t->relation));
+ gs->objects = list_make1(copyObject(t->base.relation));
pt = parse_analyze((Node *)gs, NULL, NULL, 0);
l1 = list_concat(l1, pt);
@@ -6545,7 +6545,7 @@
{
CreateStmt *t = (CreateStmt *)((Query *)s)->utilityStmt;
- skipTableRelid = RangeVarGetRelid(t->relation, true, false /*allowHcatalog*/);
+ skipTableRelid = RangeVarGetRelid(t->base.relation, true, false /*allowHcatalog*/);
}
}
@@ -8376,7 +8376,7 @@
/* Caller should check this! */
Assert(stmt->partitionBy && !stmt->is_part_child);
- foreach( lc_elt, stmt->tableElts )
+ foreach( lc_elt, stmt->base.tableElts )
{
Node * elt = lfirst(lc_elt);
@@ -8512,7 +8512,7 @@
FkConstraint *fcon = list_nth(unnamed_cons, i);
fcon->constr_name =
- ChooseConstraintNameForPartitionCreate(stmt->relation->relname,
+ ChooseConstraintNameForPartitionCreate(stmt->base.relation->relname,
colname,
label,
used_names);
@@ -8528,7 +8528,7 @@
if ( 0 != strcmp(label, "pkey") )
colname = list_nth(unnamed_cons_col, i);
- con->name = ChooseConstraintNameForPartitionCreate(stmt->relation->relname,
+ con->name = ChooseConstraintNameForPartitionCreate(stmt->base.relation->relname,
colname,
label,
used_names);
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index ddd9677..ab70e5a 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -54,6 +54,7 @@
#include "access/aosegfiles.h"
#include "access/parquetsegfiles.h"
#include "access/hash.h"
+#include "access/xact.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
@@ -64,6 +65,7 @@
#include "cdb/cdbheap.h"
#include "cdb/cdbhash.h"
#include "commands/vacuum.h"
+#include "commands/dbcommands.h"
#include "executor/executor.h"
#include "lib/stringinfo.h"
#include "libpq/pqformat.h" /* pq_beginmessage() etc. */
@@ -80,12 +82,15 @@
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"
+#include "utils/palloc.h"
#include "utils/pg_locale.h"
#include "utils/builtins.h"
#include "utils/inval.h"
+#include "utils/uri.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbanalyze.h"
#include "cdb/cdbrelsize.h"
+#include "cdb/cdbdatalocality.h"
#include "utils/fmgroids.h"
#include "storage/backendid.h"
#include "executor/spi.h"
@@ -93,6 +98,8 @@
#include "catalog/pg_namespace.h"
#include "utils/debugbreak.h"
#include "nodes/makefuncs.h"
+#include "nodes/nodes.h"
+#include "nodes/parsenodes.h"
#include "commands/analyzeutils.h"
@@ -153,9 +160,15 @@
static void gp_statistics_estimate_reltuples_relpages_heap(Relation rel, float4 *reltuples, float4 *relpages);
static void gp_statistics_estimate_reltuples_relpages_ao_rows(Relation rel, float4 *reltuples, float4 *relpages);
static void gp_statistics_estimate_reltuples_relpages_parquet(Relation rel, float4 *reltuples, float4 *relpages);
+static void gp_statistics_estimate_reltuples_relpages_external(Relation rel, float4 *relTuples, float4 *relPages);
static void analyzeEstimateReltuplesRelpages(Oid relationOid, float4 *relTuples, float4 *relPages, bool rootonly);
static void analyzeEstimateIndexpages(Oid relationOid, Oid indexOid, float4 *indexPages);
+static void getExternalRelTuples(Oid relationOid, float4 *relTuples);
+static void getExternalRelPages(Oid relationOid, float4 *relPages , Relation rel);
+static float4 getExtrelPagesHDFS(Uri *uri);
+static bool isExternalHDFSProtocol(Oid relOid);
+
/* Attribute-type related functions */
static bool isOrderedAndHashable(Oid relationOid, const char *attributeName);
static bool isBoolType(Oid relationOid, const char *attributeName);
@@ -442,6 +455,17 @@
"Please run ANALYZE on the root partition table.",
get_rel_name(relationOid))));
}
+ else if (!isExternalHDFSProtocol(relationOid))
+ {
+ /*
+ * Support analyze for external table.
+ * For now, HDFS protocol external table is supported.
+ */
+ ereport(WARNING,
+ (errmsg("skipping \"%s\" --- cannot analyze external table with non-HDFS or non-MAGMA protocol. "
+ "Please run ANALYZE on external table with HDFS or MAGMA protocol.",
+ get_rel_name(relationOid))));
+ }
else
{
lRelOids = list_make1_oid(relationOid);
@@ -989,8 +1013,10 @@
while (HeapTupleIsValid(tuple = caql_getnext(pcqCtx)))
{
Oid candidateOid = HeapTupleGetOid(tuple);
- if (analyzePermitted(candidateOid)
- && candidateOid != StatisticRelationId)
+ bool isExternalHDFS = isExternalHDFSProtocol(candidateOid);
+ if (analyzePermitted(candidateOid) &&
+ candidateOid != StatisticRelationId &&
+ isExternalHDFS)
{
*fullRelOids = lappend_oid(*fullRelOids, candidateOid);
}
@@ -998,8 +1024,9 @@
{
continue;
}
- if (analyzePermitted(candidateOid)
- && candidateOid != StatisticRelationId)
+ if (analyzePermitted(candidateOid) &&
+ candidateOid != StatisticRelationId &&
+ isExternalHDFS)
{
lRelOids = lappend_oid(lRelOids, candidateOid);
}
@@ -1705,6 +1732,11 @@
gp_statistics_estimate_reltuples_relpages_parquet(rel, &partRelTuples,
&partRelPages);
}
+ else if( RelationIsExternal(rel))
+ {
+ gp_statistics_estimate_reltuples_relpages_external(rel, &partRelTuples,
+ &partRelPages);
+ }
relation_close(rel, AccessShareLock);
*relPages += partRelPages;
@@ -3266,3 +3298,380 @@
pfree(fstotal);
return;
}
+
+/**
+ * This method estimates the number of tuples and pages in an external relation.
+ * We cannot get accurate tuple and page counts from the catalog for external
+ * tables, so both are computed on demand: tuples via a COUNT(*) query and
+ * pages from the underlying storage (e.g. HDFS block counts).
+ *
+ * Input:
+ * rel - Relation. Must be an external table.
+ *
+ * Output:
+ * relTuples - exact number of tuples in relation.
+ * relPages - exact number of pages.
+ */
+static void gp_statistics_estimate_reltuples_relpages_external(Relation rel, float4 *relTuples, float4 *relPages){
+ Oid extRelationOid = RelationGetRelid(rel);
+ /* tuple count first, then page count; both write through the out params */
+ getExternalRelTuples(extRelationOid, relTuples);
+ getExternalRelPages(extRelationOid, relPages, rel);
+}
+
+/**
+ * Called by gp_statistics_estimate_reltuples_relpages_external to get an
+ * external relation's tuple count: there is no catalog bookkeeping to rely
+ * on, so we run a COUNT(*) query through SPI.
+ *
+ * Input:
+ * extRelationOid - External Table Relation Oid
+ * Output:
+ * relTuples - exact number of tuples in relation.
+ */
+static void getExternalRelTuples(Oid extRelationOid, float4 *relTuples){
+ /*
+ * Non-const: get_namespace_name()/get_rel_name() return palloc'd char *
+ * and both are pfreed below (pfree on a const pointer draws a
+ * const-discard warning).
+ */
+ char *schemaName = get_namespace_name(get_rel_namespace(extRelationOid)); /* must be pfreed */
+ char *tableName = get_rel_name(extRelationOid); /* must be pfreed */
+
+ StringInfoData str;
+ initStringInfo(&str);
+ /* quote_identifier guards against odd schema/table names */
+ appendStringInfo(&str, "select count(*)::float4 from %s.%s as Ta",
+ quote_identifier(schemaName),
+ quote_identifier(tableName));
+
+ spiExecuteWithCallback(str.data, false /*readonly*/, 0 /*tcount */,
+ spiCallback_getSingleResultRowColumnAsFloat4, relTuples);
+ pfree(tableName);
+ pfree(schemaName);
+ pfree(str.data);
+}
+
+/**
+ * Called by gp_statistics_estimate_reltuples_relpages_external to get an
+ * external relation's page count. We call GetExtTableEntry to get the list of
+ * external table locations, then walk every location URL and sum the page
+ * count contributed by each. External relations support several different
+ * protocols, so each protocol is handled separately; today only HDFS
+ * locations report a real page count (their HDFS block count), all other
+ * protocols fall back to a single page.
+ *
+ * Input:
+ * extRelationOid - External Table Relation Oid
+ * Output:
+ * relPages - number of pages in relation (accumulated into by caller's init).
+ */
+static void getExternalRelPages(Oid extRelationOid, float4 *relPages , Relation rel){
+
+ ExtTableEntry* entry = GetExtTableEntry(extRelationOid);
+ List* extLocations = entry->locations;
+ ListCell *cell = list_head(extLocations);
+ while(cell != NULL)
+ {
+ char *url = pstrdup(((Value*)lfirst(cell))->val.str);
+ Assert(url != NULL);
+ Uri *uri = ParseExternalTableUri(url);
+ Assert(uri != NULL);
+ switch (uri->protocol){
+ case URI_HDFS:
+ /* one HDFS block counts as one page; accumulate across URLs */
+ *relPages += getExtrelPagesHDFS(uri);
+ break;
+
+ /*
+ * Protocols below do not support ANALYZE yet: report one page so the
+ * planner sees a sane non-zero value.
+ * NOTE(review): plain assignment overwrites pages accumulated from any
+ * earlier HDFS URL in the same location list -- confirm mixed-protocol
+ * location lists cannot occur.
+ */
+ case URI_GPFDIST:
+ *relPages = 1.0;
+ elog(NOTICE,"In external table ANALYZE command are not supported in GPFDIST location so far.");
+ break;
+ case URI_FILE:
+ *relPages = 1.0;
+ elog(NOTICE,"In external table ANALYZE command are not supported in FILE location so far.");
+ break;
+ case URI_FTP:
+ *relPages = 1.0;
+ elog(NOTICE,"In external table ANALYZE command are not supported in FTP location so far.");
+ break;
+ case URI_HTTP:
+ *relPages = 1.0;
+ elog(NOTICE,"In external table ANALYZE command are not supported in HTTP location so far.");
+ break;
+ case URI_CUSTOM:
+ *relPages = 1.0;
+ elog(NOTICE,"In external table ANALYZE command are not supported in CUSTOM location so far.");
+ break;
+ case URI_GPFDISTS:
+ *relPages = 1.0;
+ elog(NOTICE,"In external table ANALYZE command are not supported in GPFDISTS location so far.");
+ break;
+ default:
+ *relPages = 1.0;
+ elog(NOTICE,"should not go here");
+ break;
+ }
+
+ cell = cell->next;
+
+ /* free resourse */
+ pfree(url);
+ if(uri->customprotocol != NULL){ pfree(uri->customprotocol);}
+ pfree(uri->hostname);
+ if(uri->path!=NULL){pfree(uri->path);}
+ FreeExternalTableUri(uri);
+ }
+ /* pfree entry->location*/
+ list_free_deep(extLocations);
+ /* pfree entry */
+ if(entry->fmtopts != NULL){ pfree(entry->fmtopts);}
+ if(entry->command != NULL){ pfree(entry->command);}
+ pfree(entry);
+}
+
+/**
+ * Get the number of pages of an external table whose location uri protocol is
+ * HDFS. We hold that the concept of a page in an external table is the same
+ * as the concept of a block in hdfs, therefore we get the number of pages for
+ * the external table by counting blocks in hdfs.
+ *
+ * Input:
+ * uri - hdfs uri which refers to external table storage location, uri can refer to a file or a folder
+ *
+ * Return:
+ * number of HDFS blocks (== pages) under uri->path.
+ */
+static float4 getExtrelPagesHDFS(Uri *uri){
+ int numOfBlock = 0;
+ int nsize = 0;
+ float4 relpages = 0.0;
+
+ hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+ /* Validate the connection BEFORE handing it to hdfsListDirectory. */
+ if (fs == NULL)
+ {
+ elog(ERROR, "getExtrelPagesHDFS : "
+ "failed to connect to hdfs on %s:%d",
+ uri->hostname, uri->port);
+ }
+
+ hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path,&nsize);
+ if (fiarray == NULL)
+ {
+ hdfsDisconnect(fs);
+ elog(ERROR, "getExtrelPagesHDFS : "
+ "failed to get files of path %s",
+ uri->path);
+ }
+
+ /* Call block location api to get data location for each file */
+ for (int i = 0 ; i < nsize ; i++)
+ {
+ hdfsFileInfo *fi = &fiarray[i];
+
+ /* Build file name full path: "<uri->path>/<name>\0". */
+ const char *fname = fi->mName;
+ char *fullpath = palloc0(strlen(uri->path) + /* path */
+ 1 + /* '/' */
+ strlen(fname) + /* name */
+ 1); /* '\0' */
+ sprintf(fullpath, "%s/%s", uri->path, fname);
+
+ /* Get file full length; empty files have no blocks, skip them. */
+ int64_t len = fi->mSize;
+ if (len == 0) {
+ pfree(fullpath);
+ continue;
+ }
+
+ /* Get block location data for this file */
+ BlockLocation *bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len,&numOfBlock);
+ if (bla == NULL)
+ {
+ elog(ERROR, "getExtrelPagesHDFS : "
+ "failed to get block location of path %s. "
+ "It is reported generally due to HDFS service errors or "
+ "another session's ongoing writing.",
+ fullpath);
+ }
+
+ relpages += numOfBlock;
+
+ /* We don't need it any longer */
+ pfree(fullpath);
+
+ /* Clean up block location instances created by the lib. */
+ hdfsFreeFileBlockLocations(&bla,numOfBlock);
+ }
+
+ /* Clean up file info array created by the lib for this location. */
+ hdfsFreeFileInfo(fiarray,nsize);
+ hdfsDisconnect(fs);
+ return relpages;
+}
+
+
+/*
+ * Get total bytes of external table with HDFS protocol: sum the sizes of all
+ * files directly under uri->path.
+ */
+uint64 GetExternalTotalBytesHDFS(Uri *uri)
+{
+ uint64 totalBytes = 0;
+ int nsize = 0;
+
+ hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+ /* Validate the connection BEFORE handing it to hdfsListDirectory. */
+ if (fs == NULL)
+ {
+ elog(ERROR, "GetExternalTotalBytesHDFS : "
+ "failed to connect to hdfs on %s:%d",
+ uri->hostname, uri->port);
+ }
+
+ hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path,&nsize);
+ if (fiarray == NULL)
+ {
+ /* release the connection before erroring out of this function */
+ hdfsDisconnect(fs);
+ elog(ERROR, "GetExternalTotalBytesHDFS : "
+ "failed to get files of path %s.",
+ uri->path);
+ }
+
+ /* Accumulate the full length of every file under the location. */
+ for (int i = 0 ; i < nsize ; i++)
+ {
+ totalBytes += fiarray[i].mSize;
+ }
+
+ /* Clean up file info array created by the lib for this location. */
+ hdfsFreeFileInfo(fiarray,nsize);
+ hdfsDisconnect(fs);
+
+ return totalBytes;
+}
+
+/*
+ * Get total bytes of external table.
+ *
+ * Walks every location URL of the external table; HDFS locations contribute
+ * their file sizes, every other protocol raises an error (ANALYZE is only
+ * supported for HDFS locations so far). Frees the ExtTableEntry and all
+ * parsed URIs before returning.
+ */
+uint64 GetExternalTotalBytes(Relation rel)
+{
+ Oid extRelOid = RelationGetRelid(rel);
+ ExtTableEntry *entry = GetExtTableEntry(extRelOid);
+ List *extLocations = entry->locations;
+ ListCell *cell = list_head(extLocations);
+ uint64 totalBytes = 0;
+
+ while(cell != NULL)
+ {
+ char *url = pstrdup(((Value*)lfirst(cell))->val.str);
+ Assert(url != NULL);
+
+ Uri *uri = ParseExternalTableUri(url);
+ Assert(uri != NULL);
+
+ switch (uri->protocol)
+ {
+ case URI_HDFS:
+ totalBytes += GetExternalTotalBytesHDFS(uri);
+ break;
+ /*
+ * Support analyze for external table.
+ * For now, HDFS protocol external table is supported; every other
+ * protocol errors out (elog(ERROR) does not return).
+ */
+ case URI_GPFDIST:
+ elog(ERROR,"In external table ANALYZE command are not supported in GPFDIST location so far.");
+ break;
+
+ case URI_FILE:
+ elog(ERROR,"In external table ANALYZE command are not supported in FILE location so far.");
+ break;
+
+ case URI_FTP:
+ elog(ERROR,"In external table ANALYZE command are not supported in FTP location so far.");
+ break;
+
+ case URI_HTTP:
+ elog(ERROR,"In external table ANALYZE command are not supported in HTTP location so far.");
+ break;
+
+ case URI_CUSTOM:
+ elog(ERROR,"In external table ANALYZE command are not supported in CUSTOM location so far.");
+ break;
+
+ case URI_GPFDISTS:
+ elog(ERROR,"In external table ANALYZE command are not supported in GPFDISTS location so far.");
+ break;
+
+ default:
+ elog(ERROR,"should not go here");
+ break;
+ }
+
+ cell = cell->next;
+
+ /* free resourse */
+ pfree(url);
+ if (uri->customprotocol != NULL)
+ {
+ pfree(uri->customprotocol);
+ }
+ pfree(uri->hostname);
+
+ if (uri->path != NULL)
+ {
+ pfree(uri->path);
+ }
+ FreeExternalTableUri(uri);
+
+ }
+ /* pfree entry->location*/
+ list_free_deep(extLocations);
+ /* pfree entry */
+ if (entry->fmtopts != NULL)
+ {
+ pfree(entry->fmtopts);
+ }
+ if (entry->command != NULL)
+ {
+ pfree(entry->command);
+ }
+ pfree(entry);
+
+ return totalBytes;
+}
+
+/*
+ * Check if a relation is external table with HDFS protocol.
+ *
+ * Returns false only for an external table that has at least one non-HDFS
+ * location URL. Any other relation -- non-external, or an oid that can no
+ * longer be opened -- yields true, so callers treat it as analyzable.
+ */
+static bool isExternalHDFSProtocol(Oid relOid)
+{
+ bool ret = true;
+
+ Relation rel = try_relation_open(relOid, AccessShareLock, false);
+ if (rel != NULL)
+ {
+ if ((rel->rd_rel->relkind == RELKIND_RELATION) &&
+ RelationIsExternal(rel))
+ {
+ ExtTableEntry* entry = GetExtTableEntry(relOid);
+ List* extLocations = entry->locations;
+ ListCell *cell = list_head(extLocations);
+ while(cell != NULL)
+ {
+ char *url = ((Value*)lfirst(cell))->val.str;
+ Assert(url != NULL);
+ /* one non-HDFS URL is enough to disqualify the table */
+ if (!IS_HDFS_URI(url))
+ {
+ ret = false;
+ break;
+ }
+
+ cell = cell->next;
+ }
+ }
+
+ relation_close(rel, AccessShareLock);
+ }
+
+ return ret;
+}
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6e08bf0..9c21148 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -4308,8 +4308,9 @@
else if (formatterType != ExternalTableType_PLUG)
{
resultRelInfo->ri_extInsertDesc =
- external_insert_init(resultRelInfo->ri_RelationDesc,
- 0, formatterType, formatterName);
+ resultRelInfo->ri_extInsertDesc =
+ external_insert_init(resultRelInfo->ri_RelationDesc,
+ 0, formatterType, formatterName, NULL);
}
else
{
@@ -4323,11 +4324,20 @@
FmgrInfo *insertInitFunc = (FmgrInfo *)palloc(sizeof(FmgrInfo));
fmgr_info(procOid, insertInitFunc);
+ ResultRelSegFileInfo *segfileinfo = NULL;
+ ResultRelInfoSetSegFileInfo(resultRelInfo,
+ cstate->ao_segfileinfos);
+ segfileinfo = (ResultRelSegFileInfo *) list_nth(
+ resultRelInfo->ri_aosegfileinfos,
+ GetQEIndex());
+
resultRelInfo->ri_extInsertDesc =
- InvokePlugStorageFormatInsertInit(insertInitFunc,
- resultRelInfo->ri_RelationDesc,
- formatterType,
- formatterName);
+ InvokePlugStorageFormatInsertInit(insertInitFunc,
+ resultRelInfo->ri_RelationDesc,
+ formatterType,
+ formatterName,
+ NULL,
+ segfileinfo->segno);
pfree(insertInitFunc);
}
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index bd8b7d4..dd85f32 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -903,6 +903,7 @@
errOmitLocation(true)));
}
+
static void
ComputeIndexAttrs(IndexInfo *indexInfo,
Oid *classOidP,
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index f1259d6..c82d570 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -65,6 +65,8 @@
#include "postmaster/seqserver.h"
+#include "catalog/pg_exttable.h"
+
/*
* We don't want to log each fetching of a value from a sequence,
@@ -415,7 +417,7 @@
stmt->oidInfo.aosegIndexOid = 0;
stmt->oidInfo.aoblkdirOid = 0;
stmt->oidInfo.aoblkdirIndexOid = 0;
- stmt->tableElts = NIL;
+ stmt->base.tableElts = NIL;
for (i = SEQ_COL_FIRSTCOL; i <= SEQ_COL_LASTCOL; i++)
{
ColumnDef *coldef = makeNode(ColumnDef);
@@ -478,20 +480,20 @@
value[i - 1] = BoolGetDatum(false);
break;
}
- stmt->tableElts = lappend(stmt->tableElts, coldef);
+ stmt->base.tableElts = lappend(stmt->base.tableElts, coldef);
}
- stmt->relation = seq->sequence;
- stmt->inhRelations = NIL;
- stmt->constraints = NIL;
- stmt->options = list_make1(defWithOids(false));
- stmt->oncommit = ONCOMMIT_NOOP;
- stmt->tablespacename = NULL;
- stmt->relKind = RELKIND_SEQUENCE;
+ stmt->base.relation = seq->sequence;
+ stmt->base.inhRelations = NIL;
+ stmt->base.constraints = NIL;
+ stmt->base.options = list_make1(defWithOids(false));
+ stmt->base.oncommit = ONCOMMIT_NOOP;
+ stmt->base.tablespacename = NULL;
+ stmt->base.relKind = RELKIND_SEQUENCE;
stmt->oidInfo.comptypeOid = seq->comptypeOid;
stmt->ownerid = GetUserId();
- seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP);
+ seqoid = DefineRelation(stmt, RELKIND_SEQUENCE, RELSTORAGE_HEAP, NonCustomFormatType);
/*
* Open and lock the new sequence. (This lock is redundant; an
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c920260..156e04c 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -32,6 +32,7 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include "fmgr.h"
#include <math.h>
#include <fcntl.h>
@@ -49,6 +50,7 @@
#include "access/reloptions.h"
#include "access/xact.h"
#include "access/transam.h"
+#include "access/plugstorage.h"
#include "catalog/catalog.h"
#include "catalog/catquery.h"
#include "catalog/dependency.h"
@@ -68,6 +70,7 @@
#include "catalog/pg_inherits.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_operator.h"
+#include "catalog/pg_proc.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "catalog/pg_tablespace.h"
@@ -94,6 +97,8 @@
#include "nodes/makefuncs.h"
#include "nodes/print.h"
#include "nodes/relation.h"
+#include "nodes/parsenodes.h"
+#include "nodes/value.h"
#include "optimizer/clauses.h"
#include "optimizer/plancat.h"
#include "optimizer/planner.h"
@@ -126,6 +131,7 @@
#include "utils/syscache.h"
#include "utils/tqual.h"
#include "utils/uri.h"
+#include "utils/formatting.h"
#include "mb/pg_wchar.h"
#include "cdb/cdbdisp.h"
@@ -143,6 +149,7 @@
#include "cdb/cdbquerycontextdispatching.h"
#include "cdb/dispatcher.h"
#include "cdb/cdbdispatchresult.h"
+#include "optimizer/planmain.h"
/*
* ON COMMIT action list
@@ -387,12 +394,35 @@
const char *newname,
bool fk_scan,
bool update_relname);
-static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool iswritable, bool* isCustom);
+static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb,
+ bool iswritable, bool forceCreateDir, bool* isCustom);
static Datum transformExecOnClause(List *on_clause, int *preferred_segment_num, bool iswritable);
static char transformFormatType(char *formatname);
-static Datum transformFormatOpts(char formattype, List *formatOpts, int numcols, bool iswritable);
+static void checkCustomFormatOptString(char **opt,
+ const char *key,
+ const char *val,
+ const bool needopt,
+ const int nvalidvals,
+ const char **validvals);
+static void checkCustomFormatEncoding(const char *formatterName,
+ const char *encodingName);
+static char *checkCustomFormatOptions(const char *formatterName,
+ List *formatOpts,
+ const bool isWritable,
+ const char *delimiter);
+static void checkCustomFormatDateTypes(const char *formatterName,
+ TupleDesc tupleDesc);
+static Datum transformFormatOpts(char formattype,
+ char *formatname,
+ char *formattername,
+ List *formatOpts,
+ int numcols,
+ bool iswritable,
+ GpPolicy *policy);
-static Oid DefineRelation_int(CreateStmt *stmt, char relkind, char relstorage);
+Oid DefineRelation(CreateStmt *stmt, char relkind, char relstorage, const char *formattername);
+static Oid DefineRelation_int(CreateStmt *stmt, char relkind,
+ char relstorage, const char *formattername);
static void ATExecPartAddInternal(Relation rel, Node *def);
@@ -407,12 +437,14 @@
static bool prebuild_temp_table(Relation rel, RangeVar *tmpname, List *distro,
List *opts, List **hidden_types, bool isTmpTableAo);
static void ATPartitionCheck(AlterTableType subtype, Relation rel, bool rejectroot, bool recursing);
-static void InvokeProtocolValidation(Oid procOid, char *procName, bool iswritable, List *locs, List* fmtopts);
-
+static void InvokeProtocolValidation(Oid procOid, char *procName, bool iswritable, bool forceCreateDir,
+ List *locs, List* fmtopts);
static char *alterTableCmdString(AlterTableType subtype);
static bool isPgDefaultTablespace(const char *tablespacename);
+static Oid LookupCustomProtocolValidatorFunc(char *protoname);
+
bool
isPgDefaultTablespace(const char *tablespacename){
if(tablespacename == NULL)
@@ -432,20 +464,20 @@
* ----------------------------------------------------------------
*/
Oid
-DefineRelation(CreateStmt *stmt, char relkind, char relstorage)
+DefineRelation(CreateStmt *stmt, char relkind, char relstorage, const char *formattername)
{
Oid reloid = 0;
- Assert(stmt->relation->schemaname == NULL || strlen(stmt->relation->schemaname)>0);
+ Assert(stmt->base.relation->schemaname == NULL || strlen(stmt->base.relation->schemaname)>0);
/* forbid create non-system table on tablespace pg_default */
- if((!IsBootstrapProcessingMode()) && (isPgDefaultTablespace(stmt->tablespacename)))
+ if((!IsBootstrapProcessingMode()) && (isPgDefaultTablespace(stmt->base.tablespacename)))
{
ereport(ERROR,
(errcode(ERRCODE_CDB_FEATURE_NOT_YET),
errmsg("Creating table on tablespace 'pg_default' is not allowed")));
}
- reloid = DefineRelation_int(stmt, relkind, relstorage);
+ reloid = DefineRelation_int(stmt, relkind, relstorage, formattername);
return reloid;
}
@@ -454,11 +486,12 @@
Oid
DefineRelation_int(CreateStmt *stmt,
char relkind,
- char relstorage)
+ char relstorage,
+ const char *formattername)
{
char relname[NAMEDATALEN];
Oid namespaceId;
- List *schema = stmt->tableElts;
+ List *schema = stmt->base.tableElts;
Oid relationId = InvalidOid;
Oid tablespaceId;
Relation rel;
@@ -482,12 +515,12 @@
* Truncate relname to appropriate length (probably a waste of time, as
* parser should have done this already).
*/
- StrNCpy(relname, stmt->relation->relname, NAMEDATALEN);
+ StrNCpy(relname, stmt->base.relation->relname, NAMEDATALEN);
/*
* Check consistency of arguments
*/
- if (stmt->oncommit != ONCOMMIT_NOOP && !stmt->relation->istemp)
+ if (stmt->base.oncommit != ONCOMMIT_NOOP && !stmt->base.relation->istemp)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("ON COMMIT can only be used on temporary tables"),
@@ -498,7 +531,7 @@
* Check we have permission to create there. Skip check if bootstrapping,
* since permissions machinery may not be working yet.
*/
- namespaceId = RangeVarGetCreationNamespace(stmt->relation);
+ namespaceId = RangeVarGetCreationNamespace(stmt->base.relation);
if (!IsBootstrapProcessingMode())
{
@@ -532,21 +565,21 @@
* We shouldn't go through the regular default case for these because we
* don't want to pick up the value from the default_tablespace guc.
*/
- Assert(!stmt->tablespacename);
+ Assert(!stmt->base.tablespacename);
tablespaceId = InvalidOid;
}
- else if (stmt->tablespacename)
+ else if (stmt->base.tablespacename)
{
/*
* Tablespace specified on the command line, or was passed down by
* dispatch.
*/
- tablespaceId = get_tablespace_oid(stmt->tablespacename);
+ tablespaceId = get_tablespace_oid(stmt->base.tablespacename);
if (!OidIsValid(tablespaceId))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("tablespace \"%s\" does not exist",
- stmt->tablespacename),
+ stmt->base.tablespacename),
errOmitLocation(true)));
}
else
@@ -573,7 +606,7 @@
/*
* Parse and validate reloptions, if any.
*/
- reloptions = transformRelOptions((Datum) 0, stmt->options, true, false);
+ reloptions = transformRelOptions((Datum) 0, stmt->base.options, true, false);
/*
* Accept and only accept tidycat option during upgrade.
@@ -610,17 +643,17 @@
* inherited attributes. Update the offsets of the distribution attributes
* in GpPolicy if necessary.
*/
- isPartitioned = stmt->partitionBy ? true : false;
- schema = MergeAttributes(schema, stmt->inhRelations,
- stmt->relation->istemp, isPartitioned,
+ isPartitioned = stmt->base.partitionBy ? true : false;
+ schema = MergeAttributes(schema, stmt->base.inhRelations,
+ stmt->base.relation->istemp, isPartitioned,
&inheritOids, &old_constraints, &parentOidCount, stmt->policy);
/*
* If a partition table, and user not specify pagesize and rowgroupsize, specify the default
* pagesize to 1MB, rowgroupsize to 8MB.
*/
- if((stmt->partitionBy) || (stmt->is_part_child)){
- reloptions = AddDefaultPageRowGroupSize(reloptions, stmt->options);
+ if((stmt->base.partitionBy) || (stmt->base.is_part_child)){
+ reloptions = AddDefaultPageRowGroupSize(reloptions, stmt->base.options);
}
/*
@@ -632,10 +665,21 @@
*/
descriptor = BuildDescForRelation(schema);
- localHasOids = interpretOidsOption(stmt->options);
+ localHasOids = interpretOidsOption(stmt->base.options);
descriptor->tdhasoid = (localHasOids || parentOidCount > 0);
/*
+ * Check supported data types for pluggable format, i.e., orc
+ * Need to remove this check if all data types are supported for orc format.
+ */
+ if ((relkind == RELKIND_RELATION) && (relstorage == RELSTORAGE_EXTERNAL) &&
+ formattername && pg_strncasecmp(formattername, "text", strlen("text")) &&
+ pg_strncasecmp(formattername, "csv", strlen("csv")))
+ {
+ checkCustomFormatDateTypes(formattername, descriptor);
+ }
+
+ /*
* old_constraints: pre-cooked constraints from CREATE TABLE ... INHERIT ...
* stmt->constraints: might have some pre-cooked constraints passed by analyze.c,
* due to LIKE tab INCLUDING CONSTRAINTS
@@ -651,8 +695,8 @@
{
Constraint *cdef = (Constraint *) lfirst(listptr);
if (cdef->contype == CONSTR_CHECK &&
- add_nonduplicate_cooked_constraint(cdef, stmt->constraints))
- stmt->constraints = lappend(stmt->constraints, cdef);
+ add_nonduplicate_cooked_constraint(cdef, stmt->base.constraints))
+ stmt->base.constraints = lappend(stmt->base.constraints, cdef);
}
}
@@ -674,8 +718,8 @@
}
/* MPP-8405: disallow OIDS on partitioned tables */
- if ((stmt->partitionBy ||
- stmt->is_part_child) &&
+ if ((stmt->base.partitionBy ||
+ stmt->base.is_part_child) &&
descriptor->tdhasoid &&
IsNormalProcessingMode() &&
(Gp_role == GP_ROLE_DISPATCH))
@@ -690,7 +734,7 @@
if (stmt->oidInfo.relOid)
elog(DEBUG4, "DefineRelation relOid=%d schemaname=%s ",
stmt->oidInfo.relOid,
- stmt->relation->schemaname ? stmt->relation->schemaname : "");
+ stmt->base.relation->schemaname ? stmt->base.relation->schemaname : "");
relationId = heap_create_with_catalog(relname,
namespaceId,
@@ -705,7 +749,7 @@
localHasOids,
/* bufferPoolBulkLoad */ false,
parentOidCount,
- stmt->oncommit,
+ stmt->base.oncommit,
stmt->policy, /*CDB*/
reloptions,
allowSystemTableModsDDL,
@@ -729,7 +773,7 @@
* backends can't see the new rel anyway until we commit), but it keeps
* the lock manager from complaining about deadlock risks.
*/
- if (stmt->is_part_child)
+ if (stmt->base.is_part_child)
rel = relation_open(relationId, NoLock);
else
rel = relation_open(relationId, AccessExclusiveLock);
@@ -780,8 +824,8 @@
/*
* Parse and add the defaults/constraints, if any.
*/
- if (rawDefaults || stmt->constraints)
- AddRelationConstraints(rel, rawDefaults, stmt->constraints);
+ if (rawDefaults || stmt->base.constraints)
+ AddRelationConstraints(rel, rawDefaults, stmt->base.constraints);
if (stmt->attr_encodings)
AddRelationAttributeEncodings(rel, stmt->attr_encodings);
@@ -939,26 +983,174 @@
char* commandString = NULL;
char rejectlimittype = '\0';
char formattype;
+ char* formattername = NULL;
int rejectlimit = -1;
int encoding = -1;
int preferred_segment_num = -1;
bool issreh = false; /* is single row error handling requested? */
+ bool isexternal = createExtStmt->isexternal;
bool iswritable = createExtStmt->iswritable;
bool isweb = createExtStmt->isweb;
+ bool forceCreateDir = createExtStmt->forceCreateDir;
+
+ bool isExternalHdfs = false;
+ char* location = NULL;
+ int location_len = NULL;
/*
* now set the parameters for keys/inheritance etc. Most of these are
* uninteresting for external relations...
*/
- createStmt->relation = createExtStmt->relation;
- createStmt->tableElts = createExtStmt->tableElts;
- createStmt->inhRelations = NIL;
- createStmt->constraints = NIL;
- createStmt->options = NIL;
- createStmt->oncommit = ONCOMMIT_NOOP;
- createStmt->tablespacename = NULL;
+ createStmt->base = createExtStmt->base;
+ // external table options are not compatible with internal table
+ // set NIL here
+ createStmt->base.options = NIL;
createStmt->policy = createExtStmt->policy; /* policy was set in transform */
-
+
+ /*
+ * Recognize formatter option if there are some tokens found in parser.
+ * This design is to give CREATE EXTERNAL TABLE DDL the flexibility to
+ * support user defined external formatter options.
+ */
+ recognizeExternalRelationFormatterOptions(createExtStmt);
+
+ /*
+ * Get tablespace, database, schema for the relation
+ */
+ RangeVar *rel = createExtStmt->base.relation;
+ // get tablespace name for the relation
+ Oid tablespace_id = (gp_upgrade_mode) ? DEFAULTTABLESPACE_OID : GetDefaultTablespace();
+ if (!OidIsValid(tablespace_id))
+ {
+ tablespace_id = get_database_dts(MyDatabaseId);
+ }
+ char *tablespace_name = get_tablespace_name(tablespace_id);
+
+ // get database name for the relation
+ char *database_name = rel->catalogname ? rel->catalogname : get_database_name(MyDatabaseId);
+
+ // get schema name for the relation
+ char *schema_name = get_namespace_name(RangeVarGetCreationNamespace(rel));
+
+ // get table name for the relation
+ char *table_name = rel->relname;
+
+ /*
+ * Do some special logic for LOCATION-type external tables
+ */
+ if (exttypeDesc->exttabletype == EXTTBL_TYPE_LOCATION)
+ {
+ if (exttypeDesc->location_list == NIL)
+ {
+ if (dfs_url == NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("Cannot create table on HDFS when the service is not available"),
+ errhint("Check HDFS service and hawq_dfs_url configuration"),
+ errOmitLocation(true)));
+ }
+
+ location_len = strlen(PROTOCOL_HDFS) + /* hdfs:// */
+ strlen(dfs_url) + /* hawq_dfs_url */
+ // 1 + strlen(filespace_name) + /* '/' + filespace name */
+ 1 + strlen(tablespace_name) + /* '/' + tablespace name */
+ 1 + strlen(database_name) + /* '/' + database name */
+ 1 + strlen(schema_name) + /* '/' + schema name */
+ 1 + strlen(table_name) + 1; /* '/' + table name + '\0' */
+
+ char *path;
+
+ if (createExtStmt->parentPath == NULL) {
+ path = (char *)palloc(sizeof(char) * location_len);
+ sprintf(path, "%s%s/%s/%s/%s/%s",
+ PROTOCOL_HDFS,
+ dfs_url, /* filespace_name, */ tablespace_name,
+ database_name, schema_name, table_name);
+ }
+ else {
+ path = (char *)palloc(sizeof(char) *
+ (location_len + strlen(createExtStmt->parentPath)+1));
+ sprintf(path, "%s%s/%s/%s/%s/%s/%s",
+ PROTOCOL_HDFS,
+ dfs_url, /* filespace_name, */ tablespace_name,
+ database_name, schema_name,
+ createExtStmt->parentPath, table_name);
+ }
+
+ exttypeDesc->location_list = list_make1((Node *) makeString(path));
+ }
+
+ /* Check the location to extract protocol */
+ ListCell *first_uri = list_head(exttypeDesc->location_list);
+ Value *v = lfirst(first_uri);
+ char *uri_str = pstrdup(v->val.str);
+ Uri *uri = ParseExternalTableUri(uri_str);
+ bool isHdfs = is_hdfs_protocol(uri);
+
+ pfree(uri_str);
+ FreeExternalTableUri(uri);
+
+ if (isHdfs)
+ {
+ /* We have an HDFS external protocol */
+ isExternalHdfs = true;
+ }
+
+ }
+
+ if (isExternalHdfs)
+ {
+ if (list_length(exttypeDesc->location_list)!= 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("Only support 1 external HDFS location. "
+ "Now input %d location(s)",
+ list_length(exttypeDesc->location_list))));
+ }
+
+
+ if (isExternalHdfs)
+ {
+ /*
+ * We force any specified formatter to be custom.
+ */
+ if (strcmp(createExtStmt->format, "custom") != 0)
+ {
+ /* There should be no pre-existing "formatter" option; we add it below */
+ ListCell *cell = NULL;
+ foreach(cell, createExtStmt->base.options)
+ {
+ DefElem *de = (DefElem *)lfirst(cell);
+ if (strcmp(de->defname, "formatter") == 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("Invalid FORMATTER option. "
+ "Should not specify formatter option")));
+ }
+ }
+
+ formattername = str_tolower(createExtStmt->format,
+ strlen(createExtStmt->format));
+
+ createExtStmt->base.options =
+ lappend(createExtStmt->base.options,
+ makeDefElem("formatter",
+ (Node *)makeString(pstrdup(formattername))));
+ pfree(createExtStmt->format);
+ createExtStmt->format = pstrdup("custom");
+ }
+
+ /*
+ * Add table category (EXTERNAL or INTERNAL) in format options
+ */
+ const char *kind = createExtStmt->isexternal ? "external" : "internal";
+ createExtStmt->base.options =
+ lappend(createExtStmt->base.options,
+ makeDefElem("category", (Node *)makeString(kind)));
+ }
+
bool isCustom = false;
switch(exttypeDesc->exttabletype)
{
@@ -966,11 +1158,11 @@
/* Parse and validate URI strings (LOCATION clause) */
locationUris = transformLocationUris(exttypeDesc->location_list,
- createExtStmt->formatOpts,
- isweb, iswritable,&isCustom);
+ createExtStmt->base.options,
+ isweb, iswritable, forceCreateDir, &isCustom);
if(!isCustom){
int locLength = list_length(exttypeDesc->location_list);
- if (createStmt->policy && locLength > 0 && locLength > createStmt->policy->bucketnum)
+ if (createStmt->policy && locLength >= 0 && locLength > createStmt->policy->bucketnum)
{
createStmt->policy->bucketnum = locLength;
}
@@ -1007,11 +1199,11 @@
* - Always allow if superuser.
* - Never allow EXECUTE or 'file' exttables if not superuser.
* - Allow http, gpfdist or gpfdists tables if pg_auth has the right permissions
- * for this role and for this type of table, or if gp_external_grant_privileges
+ * for this role and for this type of table, or if gp_external_grant_privileges
* is on (gp_external_grant_privileges should be deprecated at some point).
*/
if(!superuser() && Gp_role == GP_ROLE_DISPATCH)
- {
+ {
if(exttypeDesc->exttabletype == EXTTBL_TYPE_EXECUTE)
{
ereport(ERROR,
@@ -1021,12 +1213,19 @@
}
else
{
- ListCell *first_uri = list_head(exttypeDesc->location_list);
- Value *v = lfirst(first_uri);
- char *uri_str = pstrdup(v->val.str);
- Uri *uri = ParseExternalTableUri(uri_str);
+ ListCell *first_uri;
+ Value *v;
+ char *uri_str;
+ Uri *uri;
- Assert(exttypeDesc->exttabletype == EXTTBL_TYPE_LOCATION);
+ if(exttypeDesc->exttabletype == EXTTBL_TYPE_LOCATION)
+ {
+ first_uri = list_head(exttypeDesc->location_list);
+ v = lfirst(first_uri);
+ uri_str = pstrdup(v->val.str);
+ uri = ParseExternalTableUri(uri_str);
+ }
+
if(uri->protocol == URI_FILE)
{
@@ -1042,7 +1241,7 @@
* permissions in pg_auth for creating this table.
*/
- bool isnull;
+ bool isnull;
Oid userid = GetUserId();
cqContext *pcqCtx;
HeapTuple tuple;
@@ -1055,16 +1254,16 @@
ObjectIdGetDatum(userid)));
tuple = caql_getnext(pcqCtx);
-
+
if (!HeapTupleIsValid(tuple))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("role \"%s\" does not exist (in DefineExternalRelation)",
+ errmsg("role \"%s\" does not exist (in DefineExternalRelation)",
GetUserNameFromId(userid))));
if ( (uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) && iswritable)
{
- Datum d_wextgpfd = caql_getattr(pcqCtx, Anum_pg_authid_rolcreatewextgpfd,
+ Datum d_wextgpfd = caql_getattr(pcqCtx, Anum_pg_authid_rolcreatewextgpfd,
&isnull);
bool createwextgpfd = (isnull ? false : DatumGetBool(d_wextgpfd));
@@ -1076,7 +1275,7 @@
}
else if ( (uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) && !iswritable)
{
- Datum d_rextgpfd = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterextgpfd,
+ Datum d_rextgpfd = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterextgpfd,
&isnull);
bool createrextgpfd = (isnull ? false : DatumGetBool(d_rextgpfd));
@@ -1089,7 +1288,7 @@
}
else if (uri->protocol == URI_HTTP && !iswritable)
{
- Datum d_exthttp = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterexthttp,
+ Datum d_exthttp = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterexthttp,
&isnull);
bool createrexthttp = (isnull ? false : DatumGetBool(d_exthttp));
@@ -1105,27 +1304,56 @@
char* protname = uri->customprotocol;
Oid ptcId = LookupExtProtocolOid(protname, false);
AclResult aclresult;
-
+
/* Check we have the right permissions on this protocol */
if (!pg_extprotocol_ownercheck(ptcId, ownerId))
- {
+ {
AclMode mode = (iswritable ? ACL_INSERT : ACL_SELECT);
-
+
aclresult = pg_extprotocol_aclcheck(ptcId, ownerId, mode);
-
+
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_EXTPROTOCOL, protname);
}
}
+ /* magma follows the same ACL check as HDFS */
+ else if (uri->protocol == URI_HDFS )
+ {
+ Datum d_wexthdfs = caql_getattr(pcqCtx, Anum_pg_authid_rolcreatewexthdfs, &isnull);
+ bool createwexthdfs = (isnull ? false : DatumGetBool(d_wexthdfs));
+ if (iswritable) {
+ if (!createwexthdfs)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: no privilege to create a writable hdfs external table"),
+ errOmitLocation(true)));
+ }
+ }
+ else
+ {
+ /* A role that can create writable external hdfs table can always
+ * create readable external hdfs table.*/
+ Datum d_rexthdfs = caql_getattr(pcqCtx, Anum_pg_authid_rolcreaterexthdfs, &isnull);
+ bool createrexthdfs = (isnull ? false : DatumGetBool(d_rexthdfs));
+ if (!createrexthdfs && !createwexthdfs)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied: no privilege to create a readable hdfs external table"),
+ errOmitLocation(true)));
+ }
+ }
+ }
else
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("internal error in DefineExternalRelation. "
- "protocol is %d, writable is %d",
+ "protocol is %d, writable is %d",
uri->protocol, iswritable)));
}
-
+
caql_endscan(pcqCtx);
}
FreeExternalTableUri(uri);
@@ -1134,24 +1362,12 @@
}
/*
- * Parse and validate FORMAT clause.
- */
- formattype = transformFormatType(createExtStmt->format);
-
- formatOptStr = transformFormatOpts(formattype,
- createExtStmt->formatOpts,
- list_length(createExtStmt->tableElts),
- iswritable);
-
- /*
* Parse single row error handling info if available
*/
singlerowerrorDesc = (SingleRowErrorDesc *)createExtStmt->sreh;
if(singlerowerrorDesc)
{
- Assert(!iswritable);
-
issreh = true;
/* get reject limit, and reject limit type */
@@ -1175,6 +1391,42 @@
}
/*
+ * Parse and validate FORMAT clause.
+ *
+ * We force the formatter to 'custom' for the external hdfs protocol
+ */
+ formattype = (isExternalHdfs) ?
+ 'b' : transformFormatType(createExtStmt->format);
+
+ if ((formattype == 'b') )
+ {
+ Oid procOid = InvalidOid;
+
+ procOid = LookupPlugStorageValidatorFunc(formattername, "validate_interfaces");
+
+ if (OidIsValid(procOid))
+ {
+ InvokePlugStorageValidationFormatInterfaces(procOid, formattername);
+ }
+ else
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("unsupported format '%s' in pluggable storage", formattername),
+ errhint("make sure format is installed"),
+ errOmitLocation(true)));
+ }
+ }
+
+ formatOptStr = transformFormatOpts(formattype,
+ createExtStmt->format,
+ formattername,
+ createExtStmt->base.options,
+ list_length(createExtStmt->base.tableElts),
+ iswritable,
+ createExtStmt->policy);
+
+ /*
* Parse external table data encoding
*/
foreach(option, createExtStmt->encoding)
@@ -1199,6 +1451,13 @@
{
encoding = intVal(dencoding->arg);
encoding_name = pg_encoding_to_char(encoding);
+
+ /* custom format */
+ if (formattername)
+ {
+ checkCustomFormatEncoding(formattername, encoding_name);
+ }
+
if (strcmp(encoding_name, "") == 0 ||
pg_valid_client_encoding(encoding_name) < 0)
ereport(ERROR,
@@ -1210,6 +1469,14 @@
else if (IsA(dencoding->arg, String))
{
encoding_name = strVal(dencoding->arg);
+
+ if (formattername)
+ {
+ checkCustomFormatEncoding(formattername, encoding_name);
+ }
+
+ /* custom format */
+
if (pg_valid_client_encoding(encoding_name) < 0)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
@@ -1227,7 +1494,6 @@
if (encoding < 0)
encoding = pg_get_client_encoding();
-
/*
* First, create the pg_class and other regular relation catalog entries.
* Under the covers this will dispatch a CREATE TABLE statement to all the
@@ -1235,7 +1501,8 @@
*/
Assert(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY);
- reloid = DefineRelation(createStmt, RELKIND_RELATION, RELSTORAGE_EXTERNAL);
+ reloid = DefineRelation(createStmt, RELKIND_RELATION,
+ RELSTORAGE_EXTERNAL, formattername);
/*
* Now we take care of pg_exttable and dependency with error table (if any).
@@ -1250,7 +1517,7 @@
if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY)
Assert(reloid != InvalidOid);
else
- reloid = RangeVarGetRelid(createExtStmt->relation, true, false /*allowHcatalog*/);
+ reloid = RangeVarGetRelid(createExtStmt->base.relation, true, false /*allowHcatalog*/);
/*
* create a pg_exttable entry for this external table.
@@ -1278,7 +1545,6 @@
referenced;
Oid fmtErrTblOid = RangeVarGetRelid(((SingleRowErrorDesc *)createExtStmt->sreh)->errtable, true, false /*allowHcatalog*/);
- Assert(!createExtStmt->iswritable);
Assert(fmtErrTblOid != InvalidOid);
myself.classId = RelationRelationId;
@@ -1289,6 +1555,16 @@
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
+
+ /*
+ * Add primary key constraint in pg_constraint for custom external table.
+ * We don't add primary key index in pg_index for now.
+ */
+
+ if (formattername)
+ {
+ pfree(formattername);
+ }
}
extern void
@@ -1303,13 +1579,13 @@
* now set the parameters for keys/inheritance etc. Most of these are
* uninteresting for external relations...
*/
- createStmt->relation = createForeignStmt->relation;
- createStmt->tableElts = createForeignStmt->tableElts;
- createStmt->inhRelations = NIL;
- createStmt->constraints = NIL;
- createStmt->options = NIL;
- createStmt->oncommit = ONCOMMIT_NOOP;
- createStmt->tablespacename = NULL;
+ createStmt->base.relation = createForeignStmt->relation;
+ createStmt->base.tableElts = createForeignStmt->tableElts;
+ createStmt->base.inhRelations = NIL;
+ createStmt->base.constraints = NIL;
+ createStmt->base.options = NIL;
+ createStmt->base.oncommit = ONCOMMIT_NOOP;
+ createStmt->base.tablespacename = NULL;
createStmt->policy = NULL; /* for now, we use "master only" type of distribution */
/* (permissions are checked in foreigncmd.c:InsertForeignTableEntry() ) */
@@ -1320,7 +1596,7 @@
* QEs.
*/
if(Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY)
- reloid = DefineRelation(createStmt, RELKIND_RELATION, RELSTORAGE_FOREIGN);
+ reloid = DefineRelation(createStmt, RELKIND_RELATION, RELSTORAGE_FOREIGN, NonCustomFormatType);
/*
* Now we take care of pg_foreign_table
@@ -1389,12 +1665,12 @@
DefinePartitionedRelation(CreateStmt *stmt, Oid relOid)
{
- if (stmt->postCreate)
+ if (stmt->base.postCreate)
{
List *pQry;
Node *pUtl;
DestReceiver *dest = None_Receiver;
- List *pL1 = (List *)stmt->postCreate;
+ List *pL1 = (List *)stmt->base.postCreate;
pQry = parse_analyze(linitial(pL1), NULL, NULL, 0);
@@ -12724,18 +13000,18 @@
char *dstr = "__gp_atsdb_droppedcol";
DestReceiver *dest = None_Receiver;
- cs->relKind = RELKIND_RELATION;
- cs->distributedBy = distro;
- cs->relation = tmpname;
+ cs->base.relKind = RELKIND_RELATION;
+ cs->base.distributedBy = distro;
+ cs->base.relation = tmpname;
cs->ownerid = rel->rd_rel->relowner;
- cs->tablespacename = get_tablespace_name(rel->rd_rel->reltablespace);
+ cs->base.tablespacename = get_tablespace_name(rel->rd_rel->reltablespace);
cs->buildAoBlkdir = false;
if (isTmpTableAo &&
rel->rd_rel->relhasindex)
cs->buildAoBlkdir = true;
- cs->options = opts;
+ cs->base.options = opts;
for (attno = 0; attno < tupdesc->natts; attno++)
{
@@ -12795,7 +13071,7 @@
tname->location = -1;
cd->typname = tname;
- cs->tableElts = lappend(cs->tableElts, cd);
+ cs->base.tableElts = lappend(cs->base.tableElts, cd);
}
parsetrees = parse_analyze((Node *)cs, NULL, NULL, 0);
Assert(list_length(parsetrees) == 1);
@@ -16057,14 +16333,14 @@
inh->options = list_make3_int(CREATE_TABLE_LIKE_INCLUDING_DEFAULTS,
CREATE_TABLE_LIKE_INCLUDING_CONSTRAINTS,
CREATE_TABLE_LIKE_INCLUDING_INDEXES);
- ct->tableElts = list_make1(inh);
- ct->distributedBy = list_copy(distro); /* must preserve the list for later */
+ ct->base.tableElts = list_make1(inh);
+ ct->base.distributedBy = list_copy(distro); /* must preserve the list for later */
/* should be unique enough */
snprintf(tmpname, NAMEDATALEN, "pg_temp_%u", relid);
tmprv = makeRangeVar(NULL /*catalogname*/, nspname, tmpname, -1);
- ct->relation = tmprv;
- ct->relKind = RELKIND_RELATION;
+ ct->base.relation = tmprv;
+ ct->base.relKind = RELKIND_RELATION;
ct->ownerid = rel->rd_rel->relowner;
ct->is_split_part = true;
parsetrees = parse_analyze((Node *)ct, NULL, NULL, 0);
@@ -16273,8 +16549,8 @@
parname = pstrdup(prule->topRule->parname);
}
- mycs->relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1);
- mycs->relKind = RELKIND_RELATION;
+ mycs->base.relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1);
+ mycs->base.relKind = RELKIND_RELATION;
/*
* If the new partition is column oriented, initialize the column
@@ -16286,9 +16562,9 @@
* analysis.
*/
if (colencs)
- mycs->tableElts = list_copy(colencs);
+ mycs->base.tableElts = list_copy(colencs);
- mycs->options = orient;
+ mycs->base.options = orient;
mypid->idtype = AT_AP_IDNone;
mypid->location = -1;
@@ -17452,25 +17728,29 @@
* The result is a text array but we declare it as Datum to avoid
* including array.h in analyze.h.
*/
-static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool iswritable, bool* isCustom)
+static Datum transformLocationUris(List *locs, List* fmtopts, bool isweb, bool iswritable,
+ bool forceCreateDir, bool* isCustom)
{
ListCell *cell;
ArrayBuildState *astate;
Datum result;
UriProtocol first_protocol = URI_FILE; /* initialize to keep gcc quiet */
bool first_uri = true;
+ char* first_customprotocal;
#define FDIST_DEF_PORT 8080
- /* Parser should not let this happen */
- Assert(locs != NIL);
+ Uri *uri;
+ text *t;
+ char *uri_str_orig;
+ char *uri_str_final;
+ Size len;
+ Value *v;
/* We build new array using accumArrayResult */
astate = NULL;
- /*
- * first, check for duplicate URI entries
- */
+ /* first, check for duplicate URI entries */
foreach(cell, locs)
{
Value *v1 = lfirst(cell);
@@ -17491,17 +17771,10 @@
}
}
- /*
- * iterate through the user supplied URI list from LOCATION clause.
- */
+ /* iterate through the user supplied URI list from LOCATION clause. */
foreach(cell, locs)
{
- Uri *uri;
- text *t;
- char *uri_str_orig;
- char *uri_str_final;
- Size len;
- Value *v = lfirst(cell);
+ v = lfirst(cell);
/* get the current URI string from the command */
uri_str_orig = pstrdup(v->val.str);
@@ -17528,6 +17801,7 @@
errOmitLocation(true)));
}
+
/* no port was specified for gpfdist, gpfdists or hdfs. add the default */
if ((uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) && uri->port == -1)
{
@@ -17561,7 +17835,6 @@
if (first_uri && uri->protocol == URI_CUSTOM)
{
Oid procOid = InvalidOid;
-
procOid = LookupExtProtocolFunction(uri->customprotocol,
EXTPTC_FUNC_VALIDATOR,
false);
@@ -17569,10 +17842,25 @@
if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
InvokeProtocolValidation(procOid,
uri->customprotocol,
- iswritable,
+ iswritable, forceCreateDir,
locs, fmtopts);
+ first_customprotocal = uri->customprotocol;
}
+ if (first_uri && uri->protocol == URI_HDFS)
+ {
+ Oid procOid = InvalidOid;
+
+ procOid = LookupCustomProtocolValidatorFunc("hdfs");
+ if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
+ {
+ InvokeProtocolValidation(procOid,
+ uri->customprotocol,
+ iswritable, forceCreateDir,
+ locs, fmtopts);
+ }
+ }
+
if(first_uri)
{
first_protocol = uri->protocol;
@@ -17580,10 +17868,9 @@
if(uri->protocol == URI_CUSTOM){
*isCustom = true;
}
- }
-
+ }
- if(uri->protocol != first_protocol)
+ if(uri->protocol != first_protocol || (first_protocol == URI_CUSTOM && strcmp(first_customprotocal, uri->customprotocol) != 0))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -17622,7 +17909,9 @@
"\'%s\'", uri_str_final),
errhint("Specify the explicit path and file name to write into.")));
- if ((uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) && iswritable && uri->path[strlen(uri->path) - 1] == '/')
+ if ((uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS) &&
+ iswritable &&
+ uri->path[strlen(uri->path) - 1] == '/')
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("Unsupported use of a directory name in a writable gpfdist(s) external table : "
@@ -17652,7 +17941,47 @@
result = (Datum) 0;
return result;
+}
+static Oid
+LookupCustomProtocolValidatorFunc(char *protoname)
+{
+ List* funcname = NIL;
+ Oid procOid = InvalidOid;
+ Oid argList[1];
+ Oid returnOid;
+
+ elog(LOG, "find validator func for %s", protoname);
+
+ char* new_func_name = (char *)palloc0(strlen(protoname) + 16);
+ sprintf(new_func_name, "%s_validate", protoname);
+ funcname = lappend(funcname, makeString(new_func_name));
+ returnOid = VOIDOID;
+ procOid = LookupFuncName(funcname, 0, argList, true);
+
+ if (!OidIsValid(procOid))
+ ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
+ errmsg("protocol function %s was not found.",
+ new_func_name),
+ errhint("Create it with CREATE FUNCTION."),
+ errOmitLocation(true)));
+
+ /* check return type matches */
+ if (get_func_rettype(procOid) != returnOid)
+ ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("protocol function %s has an incorrect return type",
+ new_func_name),
+ errOmitLocation(true)));
+
+ /* check allowed volatility */
+ if (func_volatile(procOid) != PROVOLATILE_STABLE)
+ ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+ errmsg("protocol function %s is not declared STABLE.",
+ new_func_name),
+ errOmitLocation(true)));
+ pfree(new_func_name);
+
+ return procOid;
}
static Datum transformExecOnClause(List *on_clause, int *preferred_segment_num, bool iswritable)
@@ -17820,12 +18149,207 @@
/*
+ * Check if values for options of custom external table are valid
+ */
+static void checkCustomFormatOptString(char **opt,
+ const char *key,
+ const char *val,
+ const bool needopt,
+ const int nvalidvals,
+ const char **validvals)
+{
+ Assert(opt);
+
+ /* check if need to check option */
+ if (!needopt)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("redundant option %s", key),
+ errOmitLocation(true)));
+ }
+
+ /* check if option is redundant */
+ if (*opt)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options \"%s\"", key),
+ errOmitLocation(true)));
+ }
+
+ *opt = val;
+
+ /* check if value for option is valid */
+ bool valid = false;
+ for (int i = 0; i < nvalidvals; i++)
+ {
+ if (strncasecmp(*opt, validvals[i], strlen(validvals[i])) == 0)
+ {
+ valid = true;
+ }
+ }
+
+ if (!valid)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid value for option %s: \"%s\"", key, val),
+ errOmitLocation(true)));
+ }
+}
+/*
+ * Check if given encoding for custom external table is supported.
+ *
+ * For custom external table, the valid format type kinds are:
+ * 1) 't' for TEXT
+ * 2) 'c' for CSV
+ * 3) 'b' for pluggable format, e.g., ORC, which currently supports UTF8 encoding.
+ */
+static void checkCustomFormatEncoding(const char *formatterName,
+ const char *encodingName)
+{
+ Oid procOid = InvalidOid;
+
+ procOid = LookupPlugStorageValidatorFunc(formatterName, "validate_encodings");
+
+ if (OidIsValid(procOid))
+ {
+ InvokePlugStorageValidationFormatEncodings(procOid, encodingName);
+ }
+}
+
+/*
+ * Check if options for custom external table are valid
+ */
+static char *checkCustomFormatOptions(const char *formattername,
+ List *formatOpts,
+ const bool isWritable,
+ const char *delimiter)
+{
+
+ /* General options for custom external table */
+ const int maxlen = 8 * 1024 - 1;
+ char *format_str = NULL;
+ format_str = (char *) palloc0(maxlen + 1);
+
+ /* built-in TEXT, CSV format */
+ if (pg_strncasecmp(formattername, "text", strlen("text")) == 0 ||
+ pg_strncasecmp(formattername, "csv", strlen("csv")) == 0)
+ {
+ char *formatter = NULL;
+ ListCell *option;
+ int len = 0;
+
+ StringInfoData key_modified;
+ initStringInfo(&key_modified);
+
+ foreach(option, formatOpts)
+ {
+ DefElem *defel = (DefElem *) lfirst(option);
+ char *key = defel->defname;
+ bool need_free_value = false;
+ char *val = defGetString(defel, &need_free_value);
+
+ /* check formatter */
+ if (strncasecmp(key, "formatter", strlen("formatter")) == 0)
+ {
+ char *formatterValues[] = {"text", "csv"};
+ checkCustomFormatOptString(&formatter, key, val, true, 2, formatterValues);
+ }
+
+ /* check option for text/csv format */
+
+ /* MPP-14467 - replace any space chars with meta char */
+ resetStringInfo(&key_modified);
+ appendStringInfoString(&key_modified, key);
+ replaceStringInfoString(&key_modified, " ", "<gpx20>");
+
+ sprintf((char *) format_str + len, "%s '%s' ", key_modified.data, val);
+ len += strlen(key_modified.data) + strlen(val) + 4;
+
+ if (need_free_value)
+ {
+ pfree(val);
+ val = NULL;
+ }
+
+ AssertImply(need_free_value, NULL == val);
+
+ if (len > maxlen)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("format options must be less than %d bytes in size", maxlen),
+ errOmitLocation(true)));
+ }
+ }
+
+ if(!formatter)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("no formatter function specified"),
+ errOmitLocation(true)));
+ }
+ else if (strncasecmp(formatter, "text", strlen("text")) == 0)
+ {
+ if (delimiter &&
+ strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789", delimiter[0]) != NULL)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("delimiter cannot be \"%s\"", delimiter)));
+ }
+ }
+ }
+ /* pluggable format */
+ else
+ {
+ Oid procOid = InvalidOid;
+
+ procOid = LookupPlugStorageValidatorFunc(formattername, "validate_options");
+
+ if (OidIsValid(procOid))
+ {
+ InvokePlugStorageValidationFormatOptions(procOid, formatOpts,
+ format_str, isWritable);
+ }
+ }
+
+ return format_str;
+}
+/*
+ * Check if the data types for custom external table are valid
+ *
+ * Currently supported data types for ORC external tables are:
+ * SMALLINT, INT, BIGINT, REAL, FLOAT, DOUBLE PRECISION, BOOL, VARCHAR, or TEXT
+ *
+ * Need to revise this if more data types are supported for ORC external table.
+ */
+static void checkCustomFormatDateTypes(const char *formatterName,
+ TupleDesc tupleDesc)
+{
+ if (formatterName == NULL)
+ return;
+
+ Oid procOid = InvalidOid;
+
+ procOid = LookupPlugStorageValidatorFunc(formatterName, "validate_datatypes");
+
+ if (OidIsValid(procOid))
+ {
+ InvokePlugStorageValidationFormatDataTypes(procOid, tupleDesc);
+ }
+}
+
+/*
* Transform the FORMAT options into a text field. Parse the
* options and validate them for their respective format type.
*
* The result is a text field that includes the format string.
*/
-static Datum transformFormatOpts(char formattype, List *formatOpts, int numcols, bool iswritable)
+static Datum transformFormatOpts(char formattype, char *formatname, char *formattername, List *formatOpts, int numcols, bool iswritable, GpPolicy *policy)
{
ListCell *option;
Datum result;
@@ -17836,6 +18360,8 @@
char *escape = NULL;
char *eol_str = NULL;
char *formatter = NULL;
+ char *compresstype = NULL;
+ char *rlecoder = NULL;
bool header_line = false;
bool fill_missing = false;
List *force_notnull = NIL;
@@ -17852,7 +18378,7 @@
{
foreach(option, formatOpts)
{
- DefElem *defel = (DefElem *) lfirst(option);
+ DefElem *defel = (DefElem *)lfirst(option);
if (strcmp(defel->defname, "delimiter") == 0)
{
@@ -17949,9 +18475,9 @@
defel->defname);
}
- /*
- * Set defaults
- */
+
+ /* Set defaults */
+
if (!delim)
delim = fmttype_is_csv(formattype) ? "," : "\t";
@@ -17967,7 +18493,7 @@
}
if (!fmttype_is_csv(formattype) && !escape)
- escape = "\\"; /* default escape for text mode */
+ escape = "\\"; /* default escape for text mode */
/*
* re-construct the FORCE NOT NULL list string.
@@ -17991,9 +18517,9 @@
}
}
- /*
- * re-construct the FORCE QUOTE list string.
- */
+
+ /* re-construct the FORCE QUOTE list string. */
+
if(force_quote)
{
ListCell *l;
@@ -18041,11 +18567,11 @@
1 + /* space */
4 + 1 + 1 + strlen(null_print) + 1 + /* "null 'str'" */
1 + /* space */
- 6 + 1 + 1 + strlen(escape) + 1; /* "escape 'c' or 'off' */
+ 6 + 1 + 1 + strlen(escape) + 1; /* "escape 'c' or 'off' */
if (fmttype_is_csv(formattype))
- len += 1 + /* space */
- 5 + 1 + 3; /* "quote 'x'" */
+ len += 1 + /* space */
+ 5 + 1 + 3; /* "quote 'x'" */
len += header_line ? strlen(" header") : 0;
len += fill_missing ? strlen(" fill missing fields") : 0;
@@ -18076,68 +18602,31 @@
/* should never happen */
Assert(false);
}
-
}
else
{
- /* custom format */
-
- int len = 0;
- const int maxlen = 8 * 1024 - 1;
- StringInfoData key_modified;
-
- initStringInfo(&key_modified);
-
- format_str = (char *) palloc0(maxlen + 1);
-
- foreach(option, formatOpts)
+ /* custom/parquet/orc format */
+ format_str = checkCustomFormatOptions(formattername, formatOpts, iswritable, delim);
+
+ /* set default hash key */
+ ListCell *opt;
+ foreach(opt, formatOpts)
{
- DefElem *defel = (DefElem *) lfirst(option);
- char *key = defel->defname;
+ DefElem *defel = (DefElem *) lfirst(opt);
+ char *key = defel->defname;
bool need_free_value = false;
- char *val = defGetString(defel, &need_free_value);
-
- if (strcmp(key, "formatter") == 0)
+ char *val = (char *) defGetString(defel, &need_free_value);
+ if (strncasecmp(key, "bucketnum", strlen("bucketnum")) == 0)
{
- if (formatter)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options"),
- errOmitLocation(true)));
-
- formatter = strVal(defel->arg);
+ policy->bucketnum = atoi(val);
}
-
- /* MPP-14467 - replace any space chars with meta char */
- resetStringInfo(&key_modified);
- appendStringInfoString(&key_modified, key);
- replaceStringInfoString(&key_modified, " ", "<gpx20>");
-
- sprintf((char *) format_str + len, "%s '%s' ", key_modified.data, val);
- len += strlen(key_modified.data) + strlen(val) + 4;
-
if (need_free_value)
{
pfree(val);
val = NULL;
}
-
- AssertImply(need_free_value, NULL == val);
-
- if (len > maxlen)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("format options must be less than %d bytes in size", maxlen),
- errOmitLocation(true)));
}
-
- if(!formatter)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("no formatter function specified"),
- errOmitLocation(true)));
}
-
/* convert c string to text datum */
result = DirectFunctionCall1(textin, CStringGetDatum(format_str));
@@ -18340,7 +18829,8 @@
}
static void
-InvokeProtocolValidation(Oid procOid, char *procName, bool iswritable, List *locs, List* fmtopts)
+InvokeProtocolValidation(Oid procOid, char *procName, bool iswritable, bool forceCreateDir,
+ List *locs, List* fmtopts)
{
ExtProtocolValidatorData *validator_data;
@@ -18354,14 +18844,16 @@
validator_data->type = T_ExtProtocolValidatorData;
validator_data->url_list = locs;
validator_data->format_opts = fmtopts;
+ validator_data->forceCreateDir = forceCreateDir;
validator_data->errmsg = NULL;
validator_data->direction = (iswritable ? EXT_VALIDATE_WRITE :
EXT_VALIDATE_READ);
+ validator_data->action = EXT_VALID_ACT_ARGUMENTS;
InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
/* FmgrInfo */ validator_udf,
- /* nArgs */ 0,
- /* Call Context */ (Node *) validator_data,
+ /* nArgs */ 0,
+ /* Call Context */ (Node *) validator_data,
/* ResultSetInfo */ NULL);
/* invoke validator. if this function returns - validation passed */
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index 6c745cc..d8ece33 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -42,6 +42,7 @@
#include "catalog/pg_compression.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
+#include "catalog/pg_exttable.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "catalog/pg_type_encoding.h"
@@ -1294,19 +1295,19 @@
* now set the parameters for keys/inheritance etc. All of these are
* uninteresting for composite types...
*/
- createStmt->relation = (RangeVar *) typevar;
- createStmt->tableElts = coldeflist;
- createStmt->inhRelations = NIL;
- createStmt->constraints = NIL;
- createStmt->options = list_make1(defWithOids(false));
- createStmt->oncommit = ONCOMMIT_NOOP;
- createStmt->tablespacename = NULL;
+ createStmt->base.relation = (RangeVar *) typevar;
+ createStmt->base.tableElts = coldeflist;
+ createStmt->base.inhRelations = NIL;
+ createStmt->base.constraints = NIL;
+ createStmt->base.options = list_make1(defWithOids(false));
+ createStmt->base.oncommit = ONCOMMIT_NOOP;
+ createStmt->base.tablespacename = NULL;
/*
* finally create the relation...
*/
- return DefineRelation(createStmt, RELKIND_COMPOSITE_TYPE, RELSTORAGE_VIRTUAL);
-
+ return DefineRelation(createStmt, RELKIND_COMPOSITE_TYPE,
+ RELSTORAGE_VIRTUAL, NonCustomFormatType);
/*
* DefineRelation already dispatches this.
*
diff --git a/src/backend/commands/user.c b/src/backend/commands/user.c
index a132adb..f8fc301 100644
--- a/src/backend/commands/user.c
+++ b/src/backend/commands/user.c
@@ -2271,7 +2271,8 @@
if(strcasecmp(val, "gpfdist") != 0 &&
strcasecmp(val, "gpfdists") != 0 &&
strcasecmp(val, "http") != 0 &&
- strcasecmp(val, "gphdfs") != 0)
+ strcasecmp(val, "gphdfs") != 0 &&
+ strcasecmp(val, "hdfs") != 0)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("invalid %s value \"%s\"", key, val)));
@@ -2303,7 +2304,7 @@
const int numvals = 6;
const char *keys[] = { "type", "protocol"}; /* order matters for validation. don't change! */
const char *vals[] = { /* types */ "readable", "writable",
- /* protocols */ "gpfdist", "gpfdists" , "http", "gphdfs"};
+ /* protocols */ "gpfdist", "gpfdists" , "http", "gphdfs", "hdfs"};
if(list_length(l) > 2)
ereport(ERROR,
@@ -2450,6 +2451,19 @@
createwexthdfs_specified = true;
}
}
+ else if(strcasecmp(extauth->protocol, "hdfs") == 0)
+ {
+ if(strcasecmp(extauth->type, "readable") == 0)
+ {
+ *createrexthdfs = true;
+ createrexthdfs_specified = true;
+ }
+ else
+ {
+ *createwexthdfs = true;
+ createwexthdfs_specified = true;
+ }
+ }
else /* http */
{
if(strcasecmp(extauth->type, "readable") == 0)
@@ -2518,6 +2532,23 @@
*createwexthdfs = false;
}
}
+ else if(strcasecmp(extauth->protocol, "hdfs") == 0)
+ {
+ if(strcasecmp(extauth->type, "readable") == 0)
+ {
+ if(createrexthdfs_specified)
+ conflict = true;
+
+ *createrexthdfs = false;
+ }
+ else
+ {
+ if(createwexthdfs_specified)
+ conflict = true;
+
+ *createwexthdfs = false;
+ }
+ }
else /* http */
{
if(strcasecmp(extauth->type, "readable") == 0)
diff --git a/src/backend/commands/view.c b/src/backend/commands/view.c
index 67af585..967e938 100644
--- a/src/backend/commands/view.c
+++ b/src/backend/commands/view.c
@@ -59,6 +59,7 @@
#include "cdb/cdbvars.h"
#include "cdb/cdbcat.h"
+#include "catalog/pg_exttable.h"
static void checkViewTupleDesc(TupleDesc newdesc, TupleDesc olddesc);
static bool isViewOnTempTable_walker(Node *node, void *context);
@@ -244,21 +245,21 @@
* now set the parameters for keys/inheritance etc. All of these are
* uninteresting for views...
*/
- createStmt->relation = (RangeVar *) relation;
- createStmt->tableElts = attrList;
- createStmt->inhRelations = NIL;
- createStmt->constraints = NIL;
- createStmt->options = list_make1(defWithOids(false));
- createStmt->oncommit = ONCOMMIT_NOOP;
- createStmt->tablespacename = NULL;
- createStmt->relKind = RELKIND_VIEW;
+ createStmt->base.relation = (RangeVar *) relation;
+ createStmt->base.tableElts = attrList;
+ createStmt->base.inhRelations = NIL;
+ createStmt->base.constraints = NIL;
+ createStmt->base.options = list_make1(defWithOids(false));
+ createStmt->base.oncommit = ONCOMMIT_NOOP;
+ createStmt->base.tablespacename = NULL;
+ createStmt->base.relKind = RELKIND_VIEW;
/*
* finally create the relation (this will error out if there's an
* existing view, so we don't need more code to complain if "replace"
* is false).
*/
- newviewOid = DefineRelation(createStmt, RELKIND_VIEW, RELSTORAGE_VIRTUAL);
+ newviewOid = DefineRelation(createStmt, RELKIND_VIEW, RELSTORAGE_VIRTUAL, NonCustomFormatType);
if(comptypeOid)
*comptypeOid = createStmt->oidInfo.comptypeOid;
return newviewOid;
diff --git a/src/backend/executor/execDML.c b/src/backend/executor/execDML.c
index 23ba53d..db54aee 100644
--- a/src/backend/executor/execDML.c
+++ b/src/backend/executor/execDML.c
@@ -421,7 +421,7 @@
else if (formatterType != ExternalTableType_PLUG)
{
resultRelInfo->ri_extInsertDesc = external_insert_init(
- resultRelationDesc, 0, formatterType, formatterName);
+ resultRelationDesc, 0, formatterType, formatterName, estate->es_plannedstmt);
}
else
{
@@ -435,11 +435,21 @@
FmgrInfo insertInitFunc;
fmgr_info(procOid, &insertInitFunc);
+
+ ResultRelSegFileInfo *segfileinfo = NULL;
+ ResultRelInfoSetSegFileInfo(resultRelInfo,
+ estate->es_result_segfileinfos);
+ segfileinfo = (ResultRelSegFileInfo *) list_nth(
+ resultRelInfo->ri_aosegfileinfos, GetQEIndex());
+
resultRelInfo->ri_extInsertDesc =
- InvokePlugStorageFormatInsertInit(&insertInitFunc,
- resultRelationDesc,
- formatterType,
- formatterName);
+
+ InvokePlugStorageFormatInsertInit(&insertInitFunc,
+ resultRelationDesc,
+ formatterType,
+ formatterName,
+ estate->es_plannedstmt,
+ segfileinfo->segno);
}
else
{
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 28357a9..0cd0b6d 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2927,20 +2927,22 @@
{
CreateStmt *newnode = makeNode(CreateStmt);
- COPY_NODE_FIELD(relation);
- COPY_NODE_FIELD(tableElts);
- COPY_NODE_FIELD(inhRelations);
- COPY_NODE_FIELD(constraints);
- COPY_NODE_FIELD(options);
- COPY_SCALAR_FIELD(oncommit);
- COPY_STRING_FIELD(tablespacename);
- COPY_NODE_FIELD(distributedBy);
+ COPY_SCALAR_FIELD(base.relKind);
+ COPY_NODE_FIELD(base.relation);
+ COPY_NODE_FIELD(base.tableElts);
+ COPY_NODE_FIELD(base.inhRelations);
+ COPY_NODE_FIELD(base.constraints);
+ COPY_NODE_FIELD(base.options);
+ COPY_SCALAR_FIELD(base.oncommit);
+ COPY_STRING_FIELD(base.tablespacename);
+ COPY_NODE_FIELD(base.distributedBy);
+ COPY_SCALAR_FIELD(base.is_part_child);
+ COPY_SCALAR_FIELD(base.is_add_part);
COPY_SCALAR_FIELD(oidInfo.relOid);
COPY_SCALAR_FIELD(oidInfo.comptypeOid);
COPY_SCALAR_FIELD(oidInfo.toastOid);
COPY_SCALAR_FIELD(oidInfo.toastIndexOid);
COPY_SCALAR_FIELD(oidInfo.toastComptypeOid);
- COPY_SCALAR_FIELD(relKind);
COPY_SCALAR_FIELD(relStorage);
if (from->policy)
{
@@ -2950,8 +2952,6 @@
newnode->policy = NULL;
/* postCreate omitted (why?) */
COPY_NODE_FIELD(deferredStmts);
- COPY_SCALAR_FIELD(is_part_child);
- COPY_SCALAR_FIELD(is_add_part);
COPY_SCALAR_FIELD(ownerid);
COPY_SCALAR_FIELD(buildAoBlkdir);
COPY_NODE_FIELD(attr_encodings);
@@ -3133,16 +3133,26 @@
{
CreateExternalStmt *newnode = makeNode(CreateExternalStmt);
- COPY_NODE_FIELD(relation);
- COPY_NODE_FIELD(tableElts);
+ COPY_SCALAR_FIELD(base.relKind);
+ COPY_NODE_FIELD(base.relation);
+ COPY_NODE_FIELD(base.tableElts);
+ COPY_NODE_FIELD(base.inhRelations);
+ COPY_NODE_FIELD(base.constraints);
+ COPY_NODE_FIELD(base.options);
+ COPY_SCALAR_FIELD(base.oncommit);
+ COPY_STRING_FIELD(base.tablespacename);
+ COPY_NODE_FIELD(base.distributedBy);
+ COPY_SCALAR_FIELD(base.is_part_child);
+ COPY_SCALAR_FIELD(base.is_add_part);
COPY_NODE_FIELD(exttypedesc);
COPY_STRING_FIELD(format);
- COPY_NODE_FIELD(formatOpts);
COPY_SCALAR_FIELD(isweb);
COPY_SCALAR_FIELD(iswritable);
+ COPY_SCALAR_FIELD(isexternal);
+ COPY_SCALAR_FIELD(forceCreateDir);
+ COPY_STRING_FIELD(parentPath);
COPY_NODE_FIELD(sreh);
COPY_NODE_FIELD(encoding);
- COPY_NODE_FIELD(distributedBy);
if (from->policy)
{
COPY_POINTER_FIELD(policy,sizeof(GpPolicy) + from->policy->nattrs*sizeof(from->policy->attrs[0]));
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 5619450..5f20eeb 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1079,25 +1079,25 @@
static bool
_equalCreateStmt(CreateStmt *a, CreateStmt *b)
{
- COMPARE_NODE_FIELD(relation);
- COMPARE_NODE_FIELD(tableElts);
- COMPARE_NODE_FIELD(inhRelations);
- COMPARE_NODE_FIELD(constraints);
- COMPARE_NODE_FIELD(options);
- COMPARE_SCALAR_FIELD(oncommit);
- COMPARE_STRING_FIELD(tablespacename);
- COMPARE_NODE_FIELD(distributedBy);
+ COMPARE_SCALAR_FIELD(base.relKind);
+ COMPARE_NODE_FIELD(base.relation);
+ COMPARE_NODE_FIELD(base.tableElts);
+ COMPARE_NODE_FIELD(base.inhRelations);
+ COMPARE_NODE_FIELD(base.constraints);
+ COMPARE_NODE_FIELD(base.options);
+ COMPARE_SCALAR_FIELD(base.oncommit);
+ COMPARE_STRING_FIELD(base.tablespacename);
+ COMPARE_NODE_FIELD(base.distributedBy);
+ COMPARE_SCALAR_FIELD(base.is_part_child);
+ COMPARE_SCALAR_FIELD(base.is_add_part);
COMPARE_SCALAR_FIELD(oidInfo.relOid);
COMPARE_SCALAR_FIELD(oidInfo.comptypeOid);
COMPARE_SCALAR_FIELD(oidInfo.toastOid);
COMPARE_SCALAR_FIELD(oidInfo.toastIndexOid);
- COMPARE_SCALAR_FIELD(relKind);
COMPARE_SCALAR_FIELD(relStorage);
/* policy omitted */
/* postCreate omitted */
/* deferredStmts omitted */
- COMPARE_SCALAR_FIELD(is_part_child);
- COMPARE_SCALAR_FIELD(is_add_part);
COMPARE_SCALAR_FIELD(is_split_part);
COMPARE_SCALAR_FIELD(ownerid);
COMPARE_SCALAR_FIELD(buildAoBlkdir);
@@ -1141,16 +1141,25 @@
static bool
_equalCreateExternalStmt(CreateExternalStmt *a, CreateExternalStmt *b)
{
- COMPARE_NODE_FIELD(relation);
- COMPARE_NODE_FIELD(tableElts);
+ COMPARE_SCALAR_FIELD(base.relKind);
+ COMPARE_NODE_FIELD(base.relation);
+ COMPARE_NODE_FIELD(base.tableElts);
+ COMPARE_NODE_FIELD(base.inhRelations);
+ COMPARE_NODE_FIELD(base.constraints);
+ COMPARE_NODE_FIELD(base.options);
+ COMPARE_SCALAR_FIELD(base.oncommit);
+ COMPARE_STRING_FIELD(base.tablespacename);
+ COMPARE_NODE_FIELD(base.distributedBy);
+ COMPARE_SCALAR_FIELD(base.is_part_child);
+ COMPARE_SCALAR_FIELD(base.is_add_part);
COMPARE_NODE_FIELD(exttypedesc);
COMPARE_STRING_FIELD(format);
- COMPARE_NODE_FIELD(formatOpts);
COMPARE_SCALAR_FIELD(isweb);
COMPARE_SCALAR_FIELD(iswritable);
+ COMPARE_SCALAR_FIELD(forceCreateDir);
+ COMPARE_STRING_FIELD(parentPath);
COMPARE_NODE_FIELD(sreh);
COMPARE_NODE_FIELD(encoding);
- COMPARE_NODE_FIELD(distributedBy);
return true;
}
diff --git a/src/backend/nodes/outfast.c b/src/backend/nodes/outfast.c
index 1ebd11f..507ef53 100644
--- a/src/backend/nodes/outfast.c
+++ b/src/backend/nodes/outfast.c
@@ -2006,14 +2006,17 @@
{
WRITE_NODE_TYPE("CREATESTMT");
- WRITE_NODE_FIELD(relation);
- WRITE_NODE_FIELD(tableElts);
- WRITE_NODE_FIELD(inhRelations);
- WRITE_NODE_FIELD(constraints);
- WRITE_NODE_FIELD(options);
- WRITE_ENUM_FIELD(oncommit, OnCommitAction);
- WRITE_STRING_FIELD(tablespacename);
- WRITE_NODE_FIELD(distributedBy);
+ WRITE_CHAR_FIELD(base.relKind);
+ WRITE_NODE_FIELD(base.relation);
+ WRITE_NODE_FIELD(base.tableElts);
+ WRITE_NODE_FIELD(base.inhRelations);
+ WRITE_NODE_FIELD(base.constraints);
+ WRITE_NODE_FIELD(base.options);
+ WRITE_ENUM_FIELD(base.oncommit, OnCommitAction);
+ WRITE_STRING_FIELD(base.tablespacename);
+ WRITE_NODE_FIELD(base.distributedBy);
+ WRITE_BOOL_FIELD(base.is_part_child);
+ WRITE_BOOL_FIELD(base.is_add_part);
WRITE_OID_FIELD(oidInfo.relOid);
WRITE_OID_FIELD(oidInfo.comptypeOid);
WRITE_OID_FIELD(oidInfo.toastOid);
@@ -2025,13 +2028,10 @@
WRITE_OID_FIELD(oidInfo.aoblkdirOid);
WRITE_OID_FIELD(oidInfo.aoblkdirIndexOid);
WRITE_OID_FIELD(oidInfo.aoblkdirComptypeOid);
- WRITE_CHAR_FIELD(relKind);
WRITE_CHAR_FIELD(relStorage);
/* policy omitted */
/* postCreate - for analysis, QD only */
/* deferredStmts - for analysis, QD only */
- WRITE_BOOL_FIELD(is_part_child);
- WRITE_BOOL_FIELD(is_add_part);
WRITE_BOOL_FIELD(is_split_part);
WRITE_OID_FIELD(ownerid);
WRITE_BOOL_FIELD(buildAoBlkdir);
@@ -2232,16 +2232,26 @@
{
WRITE_NODE_TYPE("CREATEEXTERNALSTMT");
- WRITE_NODE_FIELD(relation);
- WRITE_NODE_FIELD(tableElts);
+ WRITE_CHAR_FIELD(base.relKind);
+ WRITE_NODE_FIELD(base.relation);
+ WRITE_NODE_FIELD(base.tableElts);
+ WRITE_NODE_FIELD(base.inhRelations);
+ WRITE_NODE_FIELD(base.constraints);
+ WRITE_NODE_FIELD(base.options);
+ WRITE_ENUM_FIELD(base.oncommit, OnCommitAction);
+ WRITE_STRING_FIELD(base.tablespacename);
+ WRITE_NODE_FIELD(base.distributedBy);
+ WRITE_BOOL_FIELD(base.is_part_child);
+ WRITE_BOOL_FIELD(base.is_add_part);
WRITE_NODE_FIELD(exttypedesc);
WRITE_STRING_FIELD(format);
- WRITE_NODE_FIELD(formatOpts);
WRITE_BOOL_FIELD(isweb);
WRITE_BOOL_FIELD(iswritable);
+ WRITE_BOOL_FIELD(isexternal);
+ WRITE_BOOL_FIELD(forceCreateDir);
+ WRITE_STRING_FIELD(parentPath);
WRITE_NODE_FIELD(sreh);
WRITE_NODE_FIELD(encoding);
- WRITE_NODE_FIELD(distributedBy);
}
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index cf6bf04..5894184 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2111,15 +2111,18 @@
{
WRITE_NODE_TYPE("CREATESTMT");
- WRITE_NODE_FIELD(relation);
- WRITE_NODE_FIELD(tableElts);
- WRITE_NODE_FIELD(inhRelations);
- WRITE_NODE_FIELD(constraints);
- WRITE_NODE_FIELD(options);
- WRITE_ENUM_FIELD(oncommit, OnCommitAction);
- WRITE_STRING_FIELD(tablespacename);
- WRITE_NODE_FIELD(distributedBy);
- WRITE_NODE_FIELD(partitionBy);
+ WRITE_CHAR_FIELD(base.relKind);
+ WRITE_NODE_FIELD(base.relation);
+ WRITE_NODE_FIELD(base.tableElts);
+ WRITE_NODE_FIELD(base.inhRelations);
+ WRITE_NODE_FIELD(base.constraints);
+ WRITE_NODE_FIELD(base.options);
+ WRITE_ENUM_FIELD(base.oncommit, OnCommitAction);
+ WRITE_STRING_FIELD(base.tablespacename);
+ WRITE_NODE_FIELD(base.distributedBy);
+ WRITE_BOOL_FIELD(base.is_part_child);
+ WRITE_BOOL_FIELD(base.is_add_part);
+ WRITE_NODE_FIELD(base.partitionBy);
WRITE_OID_FIELD(oidInfo.relOid);
WRITE_OID_FIELD(oidInfo.comptypeOid);
WRITE_OID_FIELD(oidInfo.toastOid);
@@ -2131,13 +2134,10 @@
WRITE_OID_FIELD(oidInfo.aoblkdirOid);
WRITE_OID_FIELD(oidInfo.aoblkdirIndexOid);
WRITE_OID_FIELD(oidInfo.aoblkdirComptypeOid);
- WRITE_CHAR_FIELD(relKind);
WRITE_CHAR_FIELD(relStorage);
/* policy omitted */
/* postCreate omitted */
WRITE_NODE_FIELD(deferredStmts);
- WRITE_BOOL_FIELD(is_part_child);
- WRITE_BOOL_FIELD(is_add_part);
WRITE_BOOL_FIELD(is_split_part);
WRITE_OID_FIELD(ownerid);
WRITE_BOOL_FIELD(buildAoBlkdir);
@@ -2170,16 +2170,27 @@
{
WRITE_NODE_TYPE("CREATEEXTERNALSTMT");
- WRITE_NODE_FIELD(relation);
- WRITE_NODE_FIELD(tableElts);
+ WRITE_CHAR_FIELD(base.relKind);
+ WRITE_NODE_FIELD(base.relation);
+ WRITE_NODE_FIELD(base.tableElts);
+ WRITE_NODE_FIELD(base.inhRelations);
+ WRITE_NODE_FIELD(base.constraints);
+ WRITE_NODE_FIELD(base.options);
+ WRITE_ENUM_FIELD(base.oncommit, OnCommitAction);
+ WRITE_STRING_FIELD(base.tablespacename);
+ WRITE_NODE_FIELD(base.distributedBy);
+ WRITE_BOOL_FIELD(base.is_part_child);
+ WRITE_BOOL_FIELD(base.is_add_part);
+ WRITE_NODE_FIELD(base.partitionBy);
WRITE_NODE_FIELD(exttypedesc);
WRITE_STRING_FIELD(format);
- WRITE_NODE_FIELD(formatOpts);
WRITE_BOOL_FIELD(isweb);
WRITE_BOOL_FIELD(iswritable);
+ WRITE_BOOL_FIELD(isexternal);
+ WRITE_BOOL_FIELD(forceCreateDir);
+ WRITE_STRING_FIELD(parentPath);
WRITE_NODE_FIELD(sreh);
WRITE_NODE_FIELD(encoding);
- WRITE_NODE_FIELD(distributedBy);
}
static void
diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c
index f9aee80..2cc7035 100644
--- a/src/backend/nodes/readfast.c
+++ b/src/backend/nodes/readfast.c
@@ -2033,14 +2033,17 @@
{
READ_LOCALS(CreateStmt);
- READ_NODE_FIELD(relation);
- READ_NODE_FIELD(tableElts);
- READ_NODE_FIELD(inhRelations);
- READ_NODE_FIELD(constraints);
- READ_NODE_FIELD(options);
- READ_ENUM_FIELD(oncommit,OnCommitAction);
- READ_STRING_FIELD(tablespacename);
- READ_NODE_FIELD(distributedBy);
+ READ_CHAR_FIELD(base.relKind);
+ READ_NODE_FIELD(base.relation);
+ READ_NODE_FIELD(base.tableElts);
+ READ_NODE_FIELD(base.inhRelations);
+ READ_NODE_FIELD(base.constraints);
+ READ_NODE_FIELD(base.options);
+ READ_ENUM_FIELD(base.oncommit,OnCommitAction);
+ READ_STRING_FIELD(base.tablespacename);
+ READ_NODE_FIELD(base.distributedBy);
+ READ_BOOL_FIELD(base.is_part_child);
+ READ_BOOL_FIELD(base.is_add_part);
READ_OID_FIELD(oidInfo.relOid);
READ_OID_FIELD(oidInfo.comptypeOid);
READ_OID_FIELD(oidInfo.toastOid);
@@ -2052,13 +2055,10 @@
READ_OID_FIELD(oidInfo.aoblkdirOid);
READ_OID_FIELD(oidInfo.aoblkdirIndexOid);
READ_OID_FIELD(oidInfo.aoblkdirComptypeOid);
- READ_CHAR_FIELD(relKind);
READ_CHAR_FIELD(relStorage);
/* policy omitted */
/* postCreate - for analysis, QD only */
/* deferredStmts - for analysis, QD only */
- READ_BOOL_FIELD(is_part_child);
- READ_BOOL_FIELD(is_add_part);
READ_BOOL_FIELD(is_split_part);
READ_OID_FIELD(ownerid);
READ_BOOL_FIELD(buildAoBlkdir);
@@ -2070,16 +2070,16 @@
* Some extra checks to make sure we didn't get lost
* during serialization/deserialization
*/
- Assert(local_node->relKind == RELKIND_INDEX ||
- local_node->relKind == RELKIND_RELATION ||
- local_node->relKind == RELKIND_SEQUENCE ||
- local_node->relKind == RELKIND_UNCATALOGED ||
- local_node->relKind == RELKIND_TOASTVALUE ||
- local_node->relKind == RELKIND_VIEW ||
- local_node->relKind == RELKIND_COMPOSITE_TYPE ||
- local_node->relKind == RELKIND_AOSEGMENTS ||
- local_node->relKind == RELKIND_AOBLOCKDIR);
- Assert(local_node->oncommit <= ONCOMMIT_DROP);
+ Assert(local_node->base.relKind == RELKIND_INDEX ||
+ local_node->base.relKind == RELKIND_RELATION ||
+ local_node->base.relKind == RELKIND_SEQUENCE ||
+ local_node->base.relKind == RELKIND_UNCATALOGED ||
+ local_node->base.relKind == RELKIND_TOASTVALUE ||
+ local_node->base.relKind == RELKIND_VIEW ||
+ local_node->base.relKind == RELKIND_COMPOSITE_TYPE ||
+ local_node->base.relKind == RELKIND_AOSEGMENTS ||
+ local_node->base.relKind == RELKIND_AOBLOCKDIR);
+ Assert(local_node->base.oncommit <= ONCOMMIT_DROP);
READ_DONE();
}
@@ -2316,16 +2316,26 @@
{
READ_LOCALS(CreateExternalStmt);
- READ_NODE_FIELD(relation);
- READ_NODE_FIELD(tableElts);
+ READ_CHAR_FIELD(base.relKind);
+ READ_NODE_FIELD(base.relation);
+ READ_NODE_FIELD(base.tableElts);
+ READ_NODE_FIELD(base.inhRelations);
+ READ_NODE_FIELD(base.constraints);
+ READ_NODE_FIELD(base.options);
+ READ_ENUM_FIELD(base.oncommit,OnCommitAction);
+ READ_STRING_FIELD(base.tablespacename);
+ READ_NODE_FIELD(base.distributedBy);
+ READ_BOOL_FIELD(base.is_part_child);
+ READ_BOOL_FIELD(base.is_add_part);
READ_NODE_FIELD(exttypedesc);
READ_STRING_FIELD(format);
- READ_NODE_FIELD(formatOpts);
READ_BOOL_FIELD(isweb);
READ_BOOL_FIELD(iswritable);
+ READ_BOOL_FIELD(isexternal);
+ READ_BOOL_FIELD(forceCreateDir);
+ READ_STRING_FIELD(parentPath);
READ_NODE_FIELD(sreh);
READ_NODE_FIELD(encoding);
- READ_NODE_FIELD(distributedBy);
READ_DONE();
}
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index cbfbb53..dc7de27 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -2154,16 +2154,18 @@
{
READ_LOCALS(CreateStmt);
- READ_NODE_FIELD(relation);
- READ_NODE_FIELD(tableElts);
- READ_NODE_FIELD(inhRelations);
- READ_NODE_FIELD(constraints);
-
- READ_NODE_FIELD(options);
- READ_ENUM_FIELD(oncommit,OnCommitAction);
- READ_STRING_FIELD(tablespacename);
- READ_NODE_FIELD(distributedBy);
- READ_NODE_FIELD(partitionBy);
+ READ_CHAR_FIELD(base.relKind);
+ READ_NODE_FIELD(base.relation);
+ READ_NODE_FIELD(base.tableElts);
+ READ_NODE_FIELD(base.inhRelations);
+ READ_NODE_FIELD(base.constraints);
+ READ_NODE_FIELD(base.options);
+ READ_ENUM_FIELD(base.oncommit,OnCommitAction);
+ READ_STRING_FIELD(base.tablespacename);
+ READ_NODE_FIELD(base.distributedBy);
+ READ_BOOL_FIELD(base.is_part_child);
+ READ_BOOL_FIELD(base.is_add_part);
+ READ_NODE_FIELD(base.partitionBy);
READ_OID_FIELD(oidInfo.relOid);
READ_OID_FIELD(oidInfo.comptypeOid);
READ_OID_FIELD(oidInfo.toastOid);
@@ -2175,13 +2177,10 @@
READ_OID_FIELD(oidInfo.aoblkdirOid);
READ_OID_FIELD(oidInfo.aoblkdirIndexOid);
READ_OID_FIELD(oidInfo.aoblkdirComptypeOid);
- READ_CHAR_FIELD(relKind);
READ_CHAR_FIELD(relStorage);
/* policy omitted */
/* postCreate omitted */
READ_NODE_FIELD(deferredStmts);
- READ_BOOL_FIELD(is_part_child);
- READ_BOOL_FIELD(is_add_part);
READ_BOOL_FIELD(is_split_part);
READ_OID_FIELD(ownerid);
READ_BOOL_FIELD(buildAoBlkdir);
@@ -2300,16 +2299,27 @@
{
READ_LOCALS(CreateExternalStmt);
- READ_NODE_FIELD(relation);
- READ_NODE_FIELD(tableElts);
+ READ_CHAR_FIELD(base.relKind);
+ READ_NODE_FIELD(base.relation);
+ READ_NODE_FIELD(base.tableElts);
+ READ_NODE_FIELD(base.inhRelations);
+ READ_NODE_FIELD(base.constraints);
+ READ_NODE_FIELD(base.options);
+ READ_ENUM_FIELD(base.oncommit,OnCommitAction);
+ READ_STRING_FIELD(base.tablespacename);
+ READ_NODE_FIELD(base.distributedBy);
+ READ_BOOL_FIELD(base.is_part_child);
+ READ_BOOL_FIELD(base.is_add_part);
+ READ_NODE_FIELD(base.partitionBy);
READ_NODE_FIELD(exttypedesc);
READ_STRING_FIELD(format);
- READ_NODE_FIELD(formatOpts);
READ_BOOL_FIELD(isweb);
READ_BOOL_FIELD(iswritable);
+ READ_BOOL_FIELD(isexternal);
+ READ_BOOL_FIELD(forceCreateDir);
+ READ_STRING_FIELD(parentPath);
READ_NODE_FIELD(sreh);
READ_NODE_FIELD(encoding);
- READ_NODE_FIELD(distributedBy);
local_node->policy = NULL;
READ_DONE();
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 7a7f261..cc8fc0e 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1127,6 +1127,11 @@
return false;
}
+bool is_hdfs_protocol(Uri *uri)
+{
+ return uri->protocol == URI_HDFS;
+}
+
/*
* create plan for pxf
*/
@@ -1441,6 +1446,10 @@
segdb_file_map = create_pxf_plan(segdb_file_map, rel, total_primaries, ctx, scan_relid);
}
+ else if (using_location && is_hdfs_protocol(uri))
+ {
+ // nothing to do
+ }
/* (2) */
else if(using_location && (uri->protocol == URI_GPFDIST ||
uri->protocol == URI_GPFDISTS ||
@@ -1839,17 +1848,30 @@
/* data encoding */
encoding = rel->ext_encoding;
- scan_plan = make_externalscan(tlist,
- scan_clauses,
- scan_relid,
- filenames,
- fmtopts,
- rel->fmttype,
- ismasteronly,
- rejectlimit,
- islimitinrows,
- fmtErrTblOid,
- encoding);
+ if (using_location && (is_hdfs_protocol(uri)))
+ scan_plan = make_externalscan(tlist,
+ scan_clauses,
+ scan_relid,
+ rel->locationlist,
+ fmtopts,
+ rel->fmttype,
+ ismasteronly,
+ rejectlimit,
+ islimitinrows,
+ fmtErrTblOid,
+ encoding);
+ else
+ scan_plan = make_externalscan(tlist,
+ scan_clauses,
+ scan_relid,
+ filenames,
+ fmtopts,
+ rel->fmttype,
+ ismasteronly,
+ rejectlimit,
+ islimitinrows,
+ fmtErrTblOid,
+ encoding);
copy_path_costsize(ctx->root, &scan_plan->scan.plan, best_path);
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index d754df3..e34da95 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -318,11 +318,7 @@
/*
* Now, we want to allocate resource.
*/
- allocResult = calculate_planner_segment_num(my_parse, plannedstmt->resource->life,
- plannedstmt->rtable, plannedstmt->intoPolicy,
- plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1,
- -1);
-
+ allocResult = calculate_planner_segment_num(plannedstmt, my_parse, plannedstmt->resource->life, -1);
Assert(allocResult);
ppResult->saResult = *allocResult;
@@ -628,9 +624,7 @@
/*
* Now, we want to allocate resource.
*/
- allocResult = calculate_planner_segment_num(my_parse, resourceLife,
- plannedstmt->rtable, plannedstmt->intoPolicy,
- plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1, -1);
+ allocResult = calculate_planner_segment_num(plannedstmt, my_parse, resourceLife, -1);
Assert(allocResult);
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 5024389..3d953c2 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -41,10 +41,15 @@
*-------------------------------------------------------------------------
*/
+
#include "postgres.h"
+#include "port.h"
+
+#include <uuid/uuid.h>
#include "access/heapam.h"
#include "access/reloptions.h"
+#include "access/plugstorage.h"
#include "catalog/catquery.h"
#include "catalog/gp_policy.h"
#include "catalog/heap.h"
@@ -52,6 +57,8 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_compression.h"
+#include "catalog/pg_constraint.h"
+#include "catalog/pg_exttable.h"
#include "catalog/pg_partition.h"
#include "catalog/pg_partition_rule.h"
#include "catalog/pg_operator.h"
@@ -59,6 +66,7 @@
#include "catalog/pg_type_encoding.h"
#include "cdb/cdbpartition.h"
#include "cdb/cdbparquetstoragewrite.h"
+#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/prepare.h"
#include "commands/tablecmds.h"
@@ -66,10 +74,14 @@
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
+#include "nodes/nodes.h"
+#include "nodes/pg_list.h"
+#include "nodes/value.h"
#include "optimizer/clauses.h"
#include "optimizer/plancat.h"
#include "optimizer/tlist.h"
#include "optimizer/var.h"
+#include "optimizer/planmain.h"
#include "parser/analyze.h"
#include "parser/gramparse.h"
#include "parser/parse_agg.h"
@@ -88,7 +100,9 @@
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/lsyscache.h"
+#include "utils/palloc.h"
#include "utils/syscache.h"
+#include "utils/uri.h"
#include "cdb/cdbappendonlyam.h"
#include "cdb/cdbvars.h"
@@ -228,6 +242,9 @@
static void transformTableConstraint(ParseState *pstate,
CreateStmtContext *cxt,
Constraint *constraint);
+static void transformExtTableConstraint(ParseState *pstate,
+ CreateStmtContext *cxt,
+ Constraint *constraint);
static void transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
List *distributedBy, GpPolicy **policyp, List *options,
List *likeDistributedBy,
@@ -235,7 +252,7 @@
bool iswritable,
bool onmaster);
static void transformDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
- List *distributedBy, GpPolicy ** policy, List *options,
+ List *distributedBy, GpPolicy ** policy, List *options,
List *likeDistributedBy,
bool bQuiet);
static void transformPartitionBy(ParseState *pstate,
@@ -523,7 +540,7 @@
*/
if (pstate->parentParseState == NULL && query->utilityStmt &&
IsA(query->utilityStmt, CreateStmt) &&
- ((CreateStmt *)query->utilityStmt)->partitionBy)
+ ((CreateStmt *)query->utilityStmt)->base.partitionBy)
{
/*
* We just break the statements into two lists: alter statements and
@@ -1572,7 +1589,7 @@
return;
/* Generate a hash table for all the columns */
- foreach(lc, stmt->tableElts)
+ foreach(lc, stmt->base.tableElts)
{
Node *n = lfirst(lc);
@@ -1600,7 +1617,7 @@
cacheFlags = HASH_ELEM;
ht = hash_create("column info cache",
- list_length(stmt->tableElts),
+ list_length(stmt->base.tableElts),
&cacheInfo, cacheFlags);
}
@@ -1620,7 +1637,7 @@
errmsg("column \"%s\" duplicated",
colname),
errOmitLocation(true)));
-
+
}
ce->count = 0;
}
@@ -1715,7 +1732,7 @@
Datum options;
bool isnull;
- options = caql_getattr(pcqCtx,
+ options = caql_getattr(pcqCtx,
Anum_pg_type_encoding_typoptions,
&isnull);
@@ -1730,7 +1747,7 @@
/*
* Make a default column storage directive from a WITH clause
- * Ignore options in the WITH clause that don't appear in
+ * Ignore options in the WITH clause that don't appear in
* storage_directives for column-level compression.
*/
List *
@@ -1848,7 +1865,7 @@
* try and set the same options!
*/
- if (encodings_overlap(stmt->options, c->encoding, false))
+ if (encodings_overlap(stmt->base.options, c->encoding, false))
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("DEFAULT COLUMN ENCODING clause cannot "
@@ -1863,7 +1880,7 @@
*/
if (!deflt)
{
- tmpenc = form_default_storage_directive(stmt->options);
+ tmpenc = form_default_storage_directive(stmt->base.options);
}
else
{
@@ -1875,7 +1892,7 @@
deflt = makeNode(ColumnReferenceStorageDirective);
deflt->deflt = true;
deflt->encoding = transformStorageEncodingClause(tmpenc);
- }
+ }
/*
* Loop over all columns. If a column has a column reference storage clause
@@ -1981,11 +1998,12 @@
cxt.stmtType = "CREATE TABLE";
- cxt.relation = stmt->relation;
- cxt.inhRelations = stmt->inhRelations;
+ cxt.isExternalTable = false;
+ cxt.relation = stmt->base.relation;
+ cxt.inhRelations = stmt->base.inhRelations;
cxt.isalter = false;
- cxt.isaddpart = stmt->is_add_part;
- cxt.columns = NIL;
+ cxt.isaddpart = stmt->base.is_add_part;
+ cxt.columns = NIL;
cxt.ckconstraints = NIL;
cxt.fkconstraints = NIL;
cxt.ixconstraints = NIL;
@@ -1994,12 +2012,12 @@
cxt.alist = NIL;
cxt.dlist = NIL; /* for deferred analysis requiring the created table */
cxt.pkey = NULL;
- cxt.hasoids = interpretOidsOption(stmt->options);
+ cxt.hasoids = interpretOidsOption(stmt->base.options);
stmt->policy = NULL;
/* Disallow inheritance in combination with partitioning. */
- if (stmt->inhRelations && (stmt->partitionBy || stmt->is_part_child ))
+ if (stmt->base.inhRelations && (stmt->base.partitionBy || stmt->base.is_part_child ))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
@@ -2007,7 +2025,7 @@
}
/* Only on top-most partitioned tables. */
- if ( stmt->partitionBy && !stmt->is_part_child )
+ if ( stmt->base.partitionBy && !stmt->base.is_part_child )
{
fixCreateStmtForPartitionedTable(stmt);
}
@@ -2016,7 +2034,7 @@
* Run through each primary element in the table creation clause. Separate
* column defs from constraints, and do preliminary analysis.
*/
- foreach(elements, stmt->tableElts)
+ foreach(elements, stmt->base.tableElts)
{
Node *element = lfirst(elements);
@@ -2045,8 +2063,8 @@
(InhRelation *) element, false);
if (Gp_role == GP_ROLE_DISPATCH && isBeginning &&
- stmt->distributedBy == NIL &&
- stmt->inhRelations == NIL &&
+ stmt->base.distributedBy == NIL &&
+ stmt->base.inhRelations == NIL &&
stmt->policy == NULL)
{
likeDistributedBy = getLikeDistributionPolicy((InhRelation *) element);
@@ -2079,7 +2097,7 @@
/*
* Postprocess constraints that give rise to index definitions.
*/
- transformIndexConstraints(pstate, &cxt, stmt->is_add_part || stmt->is_split_part);
+ transformIndexConstraints(pstate, &cxt, stmt->base.is_add_part || stmt->is_split_part);
/*
* Carry any deferred analysis statements forward. Added for MPP-13750
@@ -2095,7 +2113,7 @@
* Postprocess foreign-key constraints.
* But don't cascade FK constraints to parts, yet.
*/
- if ( ! stmt->is_part_child )
+ if ( ! stmt->base.is_part_child )
transformFKConstraints(pstate, &cxt, true, false);
/*
@@ -2113,9 +2131,9 @@
* to the partition which the user wants to be non-AO. Just ignore it
* instead.
*/
- if (stmt->is_part_child)
+ if (stmt->base.is_part_child)
{
- if (co_explicitly_disabled(stmt->options) || !stenc)
+ if (co_explicitly_disabled(stmt->base.options) || !stenc)
stmt->attr_encodings = NIL;
else
{
@@ -2132,29 +2150,29 @@
/*
* Postprocess Greenplum Database distribution columns
*/
- if (stmt->is_part_child ||
- (stmt->partitionBy &&
+ if (stmt->base.is_part_child ||
+ (stmt->base.partitionBy &&
(
/* be very quiet if set subpartn template */
- (((PartitionBy *)(stmt->partitionBy))->partQuiet ==
+ (((PartitionBy *)(stmt->base.partitionBy))->partQuiet ==
PART_VERBO_NOPARTNAME) ||
(
/* quiet for partitions of depth > 0 */
- (((PartitionBy *)(stmt->partitionBy))->partDepth != 0) &&
- (((PartitionBy *)(stmt->partitionBy))->partQuiet !=
+ (((PartitionBy *)(stmt->base.partitionBy))->partDepth != 0) &&
+ (((PartitionBy *)(stmt->base.partitionBy))->partQuiet !=
PART_VERBO_NORMAL)
)
)
))
bQuiet = true; /* silence distro messages for partitions */
- transformDistributedBy(pstate, &cxt, stmt->distributedBy, &stmt->policy, stmt->options,
+ transformDistributedBy(pstate, &cxt, stmt->base.distributedBy, &stmt->policy, stmt->base.options,
likeDistributedBy, bQuiet);
/*
* Process table partitioning clause
*/
- transformPartitionBy(pstate, &cxt, stmt, stmt->partitionBy, stmt->policy);
+ transformPartitionBy(pstate, &cxt, stmt, stmt->base.partitionBy, stmt->policy);
/*
* Output results.
@@ -2162,147 +2180,657 @@
q = makeNode(Query);
q->commandType = CMD_UTILITY;
q->utilityStmt = (Node *) stmt;
- stmt->tableElts = cxt.columns;
- stmt->constraints = cxt.ckconstraints;
+ stmt->base.tableElts = cxt.columns;
+ stmt->base.constraints = cxt.ckconstraints;
*extras_before = list_concat(*extras_before, cxt.blist);
*extras_after = list_concat(cxt.alist, *extras_after);
return q;
}
-static Query *
-transformCreateExternalStmt(ParseState *pstate, CreateExternalStmt *stmt,
- List **extras_before, List **extras_after)
-{
- CreateStmtContext cxt;
- Query *q;
- ListCell *elements;
- ExtTableTypeDesc *exttypeDesc = NULL;
- List *likeDistributedBy = NIL;
- bool bQuiet = false; /* shut up transformDistributedBy messages */
- bool onmaster = false;
- bool iswritable = stmt->iswritable;
+/*
+ * Type of the value that may follow a pre-defined formatter option keyword
+ * in a CREATE EXTERNAL TABLE ... FORMAT clause.
+ */
+enum PreDefinedFormatterOptionVALTYPE {
+ PREDEF_FMTOPT_VAL_NO, /* keyword takes no value (e.g. HEADER) */
+ PREDEF_FMTOPT_VAL_STRING, /* a quoted string value */
+ PREDEF_FMTOPT_VAL_SIGNEDINTEGER, /* a signed integer value */
+ PREDEF_FMTOPT_VAL_COLNAMELIST /* a list of column names */
+};
- cxt.stmtType = "CREATE EXTERNAL TABLE";
- cxt.relation = stmt->relation;
- cxt.inhRelations = NIL;
- cxt.hasoids = false;
- cxt.isalter = false;
- cxt.columns = NIL;
- cxt.ckconstraints = NIL;
- cxt.fkconstraints = NIL;
- cxt.ixconstraints = NIL;
- cxt.pkey = NULL;
+/*
+ * Identifier for each formatter option this parser pre-defines.  Tokens that
+ * match none of the pre-defined options are reported as
+ * PREDEF_FMT_OPT_ID_UNPREDEFINED (passed through to a customized formatter);
+ * a known keyword with a malformed/missing value is
+ * PREDEF_FMT_OPT_ID_ILLEGAL.
+ */
+enum PreDefinedFormatterOptionID {
+ PREDEF_FMT_OPT_ID_DELIMITER,
+ PREDEF_FMT_OPT_ID_NULL,
+ PREDEF_FMT_OPT_ID_HEADER,
+ PREDEF_FMT_OPT_ID_QUOTE,
+ PREDEF_FMT_OPT_ID_ESCAPE,
+ PREDEF_FMT_OPT_ID_FORCENOTNULL,
+ PREDEF_FMT_OPT_ID_FORCEQUOTE,
+ PREDEF_FMT_OPT_ID_FILLMISSINGFIELDS,
+ PREDEF_FMT_OPT_ID_NEWLINE,
+ PREDEF_FMT_OPT_ID_UNPREDEFINED, /* not pre-defined; assume user-defined */
+ PREDEF_FMT_OPT_ID_ILLEGAL /* recognized keyword, malformed value */
+};
- cxt.blist = NIL;
- cxt.alist = NIL;
+/*
+ * Descriptor of one pre-defined formatter option: its keyword sequence
+ * (up to three words, e.g. {"force","not","null"}), whether a value must
+ * follow, the expected value type, and the option ID returned on a match.
+ */
+typedef struct PreDefinedFormatterOption {
+ char keyword[3][32]; /* keyword words; unused slots are "" */
+ int nKeyword; /* number of keyword words actually used (1..3) */
+ bool hasValue; /* true if a value must follow the keywords */
+ enum PreDefinedFormatterOptionVALTYPE valueType; /* expected value type */
+ enum PreDefinedFormatterOptionID optID; /* ID returned on a match */
+} PreDefinedFormatterOption;
- /*
- * Run through each primary element in the table creation clause. Separate
- * column defs from constraints, and do preliminary analysis.
- */
- foreach(elements, stmt->tableElts)
- {
- Node *element = lfirst(elements);
+#define PREDEF_FMTOPT_SIZE 9
- switch (nodeTag(element))
- {
- case T_ColumnDef:
- transformColumnDefinition(pstate, &cxt,
- (ColumnDef *) element);
- break;
+/*
+ * MatchExternalRelationFormatterOption
+ *
+ * Try to match the raw option tokens starting at 'head' against the table
+ * of pre-defined formatter options.  The grammar delivers each token as a
+ * DefElem whose defname tags its kind: "#ident" (bare word), "#string",
+ * "#int" or "#collist" (column name list).  A pre-defined option is 1-3
+ * consecutive #ident keywords optionally followed by one value token of the
+ * declared type.
+ *
+ * Returns the matched option's ID, PREDEF_FMT_OPT_ID_UNPREDEFINED when the
+ * tokens match no pre-defined option (left for a customized formatter), or
+ * PREDEF_FMT_OPT_ID_ILLEGAL when a known keyword is followed by a missing
+ * or wrongly-typed value.  Note: only the first PREDEF_FMTOPT_SIZE entries
+ * of 'options' are consulted.
+ */
+enum PreDefinedFormatterOptionID MatchExternalRelationFormatterOption(
+ PreDefinedFormatterOption *options, ListCell *head) {
+ /* Peek at up to four consecutive tokens starting from 'head'. */
+ ListCell *p1 = head;
+ ListCell *p2 = head->next;
+ ListCell *p3 = p2 == NULL ? NULL : p2->next;
+ ListCell *p4 = p3 == NULL ? NULL : p3->next;
- case T_Constraint:
- case T_FkConstraint:
- /* should never happen. If it does fix gram.y */
- elog(ERROR, "node type %d not supported for external tables",
- (int) nodeTag(element));
- break;
+ DefElem *de1 = (DefElem *)lfirst(p1);
+ DefElem *de2 = p2 == NULL ? NULL : (DefElem *)lfirst(p2);
+ DefElem *de3 = p3 == NULL ? NULL : (DefElem *)lfirst(p3);
+ DefElem *de4 = p4 == NULL ? NULL : (DefElem *)lfirst(p4);
- case T_InhRelation:
- {
- /* LIKE */
- bool isBeginning = (cxt.columns == NIL);
+ if (strcmp("#ident", de1->defname) != 0) {
+ return PREDEF_FMT_OPT_ID_ILLEGAL; /* must start with a #ident elem */
+ }
- transformInhRelation(pstate, &cxt,
- (InhRelation *) element, true);
+ /*
+ * Try each pre-defined option; branches below are the same check for
+ * 1-, 2- and 3-keyword options, differing only in which token carries
+ * the value.  Keyword comparison is case-insensitive; the value token's
+ * tag must agree with the option's declared valueType (#ident is also
+ * accepted where a column name list is expected: a single bare name).
+ */
+ for (int i = 0; i < PREDEF_FMTOPT_SIZE; ++i) {
+ PreDefinedFormatterOption *pdOpt = &(options[i]);
+ if (pdOpt->nKeyword == 1 &&
+ strcasecmp(pdOpt->keyword[0], ((Value *)(de1->arg))->val.str) == 0) {
+ if (!options[i].hasValue) {
+ return options[i].optID; /* Got no value option */
+ } else if (p2 == NULL) {
+ return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value field */
+ } else if (((strcmp("#string", de2->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_STRING)) ||
+ ((strcmp("#int", de2->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_SIGNEDINTEGER)) ||
+ ((strcmp("#collist", de2->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST)) ||
+ ((strcmp("#ident", de2->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST))) {
+ return options[i].optID; /* Got option having one value */
+ } else {
+ return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value type */
+ }
+ } else if (pdOpt->nKeyword == 2 && de2 != NULL &&
+ strcasecmp(pdOpt->keyword[0], ((Value *)(de1->arg))->val.str) ==
+ 0 &&
+ strcasecmp(pdOpt->keyword[1], ((Value *)(de2->arg))->val.str) ==
+ 0) {
+ if (!options[i].hasValue) {
+ return options[i].optID; /* got no value option */
+ } else if (de3 == NULL) {
+ return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value field */
+ } else if (((strcmp("#string", de3->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_STRING)) ||
+ ((strcmp("#int", de3->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_SIGNEDINTEGER)) ||
+ ((strcmp("#collist", de3->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST)) ||
+ ((strcmp("#ident", de3->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST))) {
+ return options[i].optID; /* Got option having one value */
+ } else {
+ return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value type */
+ }
+ } else if (pdOpt->nKeyword == 3 && de2 != NULL && de3 != NULL &&
+ strcasecmp(pdOpt->keyword[0], ((Value *)(de1->arg))->val.str) ==
+ 0 &&
+ strcasecmp(pdOpt->keyword[1], ((Value *)(de2->arg))->val.str) ==
+ 0 &&
+ strcasecmp(pdOpt->keyword[2], ((Value *)(de3->arg))->val.str) ==
+ 0) {
+ if (!options[i].hasValue) {
+ return options[i].optID; /* got no value option */
+ } else if (de4 == NULL) {
+ return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value field */
+ } else if (((strcmp("#string", de4->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_STRING)) ||
+ ((strcmp("#int", de4->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_SIGNEDINTEGER)) ||
+ ((strcmp("#collist", de4->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST)) ||
+ ((strcmp("#ident", de4->defname) == 0) &&
+ (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST))) {
+ return options[i].optID; /* Got option having one value */
+ } else {
+ return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value type */
+ }
+ }
+ }
- if (Gp_role == GP_ROLE_DISPATCH && isBeginning &&
- stmt->distributedBy == NIL &&
- stmt->policy == NULL &&
- iswritable /* dont bother if readable table */)
- {
- likeDistributedBy = getLikeDistributionPolicy((InhRelation *) element);
- }
- }
- break;
+ /*
+ * We expect user defined special options which should be consumed
+ * further by customized formatter.
+ */
+ return PREDEF_FMT_OPT_ID_UNPREDEFINED;
+}
- default:
- elog(ERROR, "unrecognized node type: %d",
- (int) nodeTag(element));
- break;
- }
- }
+/*
+ * recognizeExternalRelationFormatterOptions
+ *
+ * Normalize the raw FORMAT-clause token list of a CREATE EXTERNAL TABLE
+ * statement (DefElems tagged "#ident"/"#string"/"#int"/"#collist") into a
+ * canonical DefElem list: pre-defined options (delimiter, null, header,
+ * quote, escape, force not null, force quote, fill missing fields, newline)
+ * become conventionally-named DefElems; unrecognized runs of #ident tokens
+ * become user-defined options (keywords joined with '_', value string/int
+ * or TRUE when absent).  Also prepends reject_limit/err_table options
+ * derived from the statement's single-row-error-handling clause.  On
+ * return, createExtStmt->base.options holds the new list; the old list is
+ * not freed (palloc'd in the parse memory context).  Errors out on a
+ * malformed option list.
+ */
+void recognizeExternalRelationFormatterOptions(
+ CreateExternalStmt *createExtStmt) {
+ /* Table of the pre-defined options and their expected value types. */
+ PreDefinedFormatterOption options[PREDEF_FMTOPT_SIZE] = {
+ {{"delimiter", "", ""},
+ 1,
+ true,
+ PREDEF_FMTOPT_VAL_STRING,
+ PREDEF_FMT_OPT_ID_DELIMITER},
+ {{"null", "", ""},
+ 1,
+ true,
+ PREDEF_FMTOPT_VAL_STRING,
+ PREDEF_FMT_OPT_ID_NULL},
+ {{"header", "", ""},
+ 1,
+ false,
+ PREDEF_FMTOPT_VAL_NO,
+ PREDEF_FMT_OPT_ID_HEADER},
+ {{"quote", "", ""},
+ 1,
+ true,
+ PREDEF_FMTOPT_VAL_STRING,
+ PREDEF_FMT_OPT_ID_QUOTE},
+ {{"escape", "", ""},
+ 1,
+ true,
+ PREDEF_FMTOPT_VAL_STRING,
+ PREDEF_FMT_OPT_ID_ESCAPE},
+ {{"force", "not", "null"},
+ 3,
+ true,
+ PREDEF_FMTOPT_VAL_COLNAMELIST,
+ PREDEF_FMT_OPT_ID_FORCENOTNULL},
+ {{"force", "quote", ""},
+ 2,
+ true,
+ PREDEF_FMTOPT_VAL_COLNAMELIST,
+ PREDEF_FMT_OPT_ID_FORCEQUOTE},
+ {{"fill", "missing", "fields"},
+ 3,
+ false,
+ PREDEF_FMTOPT_VAL_NO,
+ PREDEF_FMT_OPT_ID_FILLMISSINGFIELDS},
+ {{"newline", "", ""},
+ 1,
+ true,
+ PREDEF_FMTOPT_VAL_STRING,
+ PREDEF_FMT_OPT_ID_NEWLINE}};
- /*
- * Check if this is an EXECUTE ON MASTER table. We'll need this information
- * in transformExternalDistributedBy. While at it, we also check if an error
- * table is attempted to be used on ON MASTER table and error if so.
- */
- if(!iswritable)
- {
- exttypeDesc = (ExtTableTypeDesc *)stmt->exttypedesc;
+ List *newOpts = NULL;
+ ListCell *optCell = list_head(createExtStmt->base.options);
- if(exttypeDesc->exttabletype == EXTTBL_TYPE_EXECUTE)
- {
- ListCell *exec_location_opt;
+ /* Add restriction of error lines */
+ if (createExtStmt->sreh != NULL) {
+ /* Handle error table specification and reject number per segment */
+ SingleRowErrorDesc *errDesc = (SingleRowErrorDesc *)createExtStmt->sreh;
+ if (errDesc->rejectlimit > 0 && errDesc->is_hdfs_protocol_text) {
+ newOpts = lappend(
+ newOpts,
+ makeDefElem("reject_limit", makeInteger(errDesc->rejectlimit)));
+ if (errDesc->hdfsLoc)
+ newOpts =
+ lappend(newOpts,
+ makeDefElem("err_table",
+ (Node *)makeString(pstrdup(errDesc->hdfsLoc))));
+ }
+ }
- foreach(exec_location_opt, exttypeDesc->on_clause)
- {
- DefElem *defel = (DefElem *) lfirst(exec_location_opt);
+ /*
+ * Walk the raw token list; each matched option consumes its keyword and
+ * value cells, so optCell is advanced by the number of tokens consumed
+ * (hence the chained ->next steps in each case below).
+ */
+ while (optCell != NULL) {
+ /* Try a match now. */
+ enum PreDefinedFormatterOptionID id =
+ MatchExternalRelationFormatterOption(options, optCell);
+ switch (id) {
+ case PREDEF_FMT_OPT_ID_DELIMITER: {
+ DefElem *de = (DefElem *)lfirst(optCell->next);
+ Value *v = (Value *)(de->arg);
+ DefElem *newde =
+ makeDefElem("delimiter", (Node *)makeString(v->val.str));
+ newOpts = lappend(newOpts, newde);
+ optCell = optCell->next->next;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_NULL: {
+ DefElem *de = (DefElem *)lfirst(optCell->next);
+ Value *v = (Value *)(de->arg);
+ DefElem *newde = makeDefElem("null", (Node *)makeString(v->val.str));
+ newOpts = lappend(newOpts, newde);
+ optCell = optCell->next->next;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_HEADER: {
+ DefElem *newde = makeDefElem("header", (Node *)makeInteger(TRUE));
+ newOpts = lappend(newOpts, newde);
+ optCell = optCell->next;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_QUOTE: {
+ DefElem *de = (DefElem *)lfirst(optCell->next);
+ Value *v = (Value *)(de->arg);
+ DefElem *newde = makeDefElem("quote", (Node *)makeString(v->val.str));
+ newOpts = lappend(newOpts, newde);
+ optCell = optCell->next->next;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_ESCAPE: {
+ DefElem *de = (DefElem *)lfirst(optCell->next);
+ Value *v = (Value *)(de->arg);
+ DefElem *newde = makeDefElem("escape", (Node *)makeString(v->val.str));
+ newOpts = lappend(newOpts, newde);
+ optCell = optCell->next->next;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_FORCENOTNULL: {
+ DefElem *newde = NULL;
+ /* value is the 4th token: after "force" "not" "null" */
+ DefElem *de = (DefElem *)lfirst(optCell->next->next->next);
+ if (strcmp("#ident", de->defname) == 0) {
+ /*
+ * The case there is only one column name which is recognized
+ * as a ident string.
+ */
+ Value *v = (Value *)(de->arg);
+ List *collist = list_make1(makeString(v->val.str));
+ newde = makeDefElem("force_notnull", (Node *)collist);
- if (strcmp(defel->defname, "master") == 0)
- {
- SingleRowErrorDesc *srehDesc = (SingleRowErrorDesc *)stmt->sreh;
+ } else {
+ /*
+ * There are multiple column names in a list already
+ * recognized by parser.
+ */
+ List *collist = NULL;
+ ListCell *colCell = NULL;
+ foreach (colCell, (List *)(de->arg)) {
+ collist = lappend(collist,
+ makeString(((Value *)lfirst(colCell))->val.str));
+ elog(LOG, "recognized column list colname:%s",
+ ((Value *)lfirst(colCell))->val.str);
+ }
+ newde = makeDefElem("force_notnull", (Node *)collist);
- onmaster = true;
+ /* TODO: check where the old instance is freed */
+ }
+ newOpts = lappend(newOpts, newde);
+ optCell = optCell->next->next->next->next;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_FORCEQUOTE: {
+ DefElem *newde = NULL;
+ /* value is the 3rd token: after "force" "quote" */
+ DefElem *de = (DefElem *)lfirst(optCell->next->next);
+ if (strcmp("#ident", de->defname) == 0) {
+ /*
+ * The case there is only one column name which is recognized
+ * as a ident string.
+ */
+ Value *v = (Value *)(de->arg);
+ List *collist = list_make1(makeString(v->val.str));
+ newde = makeDefElem("force_quote", (Node *)collist);
- if(srehDesc && srehDesc->errtable)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("External web table with ON MASTER clause "
- "cannot use error tables.")));
- }
- }
- }
- }
+ } else {
+ /*
+ * There are multiple column names in a list already
+ * recognized by parser.
+ */
+ List *collist = NULL;
+ ListCell *colCell = NULL;
+ foreach (colCell, (List *)(de->arg)) {
+ collist = lappend(collist,
+ makeString(((Value *)lfirst(colCell))->val.str));
+ elog(LOG, "recognized column list colname:%s",
+ ((Value *)lfirst(colCell))->val.str);
+ }
+ newde = makeDefElem("force_quote", (Node *)collist);
- /*
- * Check if we need to create an error table. If so, add it to the
- * before list.
- */
- if(stmt->sreh && ((SingleRowErrorDesc *)stmt->sreh)->errtable)
- transformSingleRowErrorHandling(pstate, &cxt,
- (SingleRowErrorDesc *) stmt->sreh);
+ /* TODO: check where the old instance is freed */
+ }
+ newOpts = lappend(newOpts, newde);
+ optCell = optCell->next->next->next;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_FILLMISSINGFIELDS: {
+ DefElem *newde =
+ makeDefElem("fill_missing_fields", (Node *)makeInteger(TRUE));
+ newOpts = lappend(newOpts, newde);
+ optCell = optCell->next->next->next;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_NEWLINE: {
+ DefElem *de = (DefElem *)lfirst(optCell->next);
+ Value *v = (Value *)(de->arg);
+ DefElem *newde = makeDefElem("newline", (Node *)makeString(v->val.str));
+ newOpts = lappend(newOpts, newde);
+ optCell = optCell->next->next;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_UNPREDEFINED: {
+ /*
+ * In case it is a user defined option. we combind all continuous
+ * ident until we see a string constant or a integer constant.
+ * So this means user defined formatter's user defined option
+ * values can only be string or integer values.
+ */
+ int c = 0;
+ int identlength = 0;
+ ListCell *walkerCell = optCell;
+ /* count the run of #ident tokens and the bytes needed for the key */
+ while (walkerCell != NULL &&
+ strcmp("#ident", ((DefElem *)lfirst(walkerCell))->defname) ==
+ 0) {
+ c++;
+ Value *v = (Value *)(((DefElem *)lfirst(walkerCell))->arg);
+ /* +1 per word covers the '_' separators plus the final NUL */
+ identlength += strlen(v->val.str) + 1;
+ walkerCell = walkerCell->next;
+ }
- transformETDistributedBy(pstate, &cxt, stmt->distributedBy, &stmt->policy, NULL,/*no WITH options for ET*/
- likeDistributedBy, bQuiet, iswritable, onmaster);
+ /* Decide the value part */
+ Node *value = NULL;
+ if (walkerCell == NULL) {
+ /* The case the option without value. we set TRUE for it. */
+ value = makeInteger(TRUE);
+ } else {
+ DefElem *de = (DefElem *)lfirst(walkerCell);
+ if (strcmp("#collist", de->defname) == 0) {
+ /*
+ * We don't accept column name list value types for
+ * customized formatter's user defined options.
+ */
+ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg(
+ "cannot support column name list as an unknown "
+ "option's value"),
+ errOmitLocation(true)));
+ } else if (strcmp("#int", de->defname) == 0) {
+ value = makeInteger(((Value *)(de->arg))->val.ival);
+ } else {
+ value = makeString(((Value *)(de->arg))->val.str);
+ }
+ }
- Assert(cxt.ckconstraints == NIL);
- Assert(cxt.fkconstraints == NIL);
- Assert(cxt.ixconstraints == NIL);
+ /* Build key part. */
+ char *newKey = (char *)palloc0(sizeof(char) * identlength);
+ ListCell *walkerCell2 = optCell;
+ int counter = 0;
+ for (; counter < c; counter++, walkerCell2 = walkerCell2->next) {
+ Value *v = (Value *)(((DefElem *)lfirst(walkerCell2))->arg);
+ if (counter > 0) {
+ strcat(newKey, "_");
+ }
+ strcat(newKey, v->val.str);
+ }
- /*
- * Output results.
- */
- q = makeNode(Query);
- q->commandType = CMD_UTILITY;
- q->utilityStmt = (Node *) stmt;
- stmt->tableElts = cxt.columns;
- *extras_before = list_concat(*extras_before, cxt.blist);
- *extras_after = list_concat(cxt.alist, *extras_after);
+ DefElem *newde = makeDefElem(newKey, (Node *)value);
+ newOpts = lappend(newOpts, newde);
- return q;
+ /* resume scanning after the value token (if any) */
+ if (walkerCell)
+ optCell = walkerCell->next;
+ else
+ optCell = NULL;
+ break;
+ }
+ case PREDEF_FMT_OPT_ID_ILLEGAL: {
+ /* ereport(ERROR) does not return, so no break is needed here */
+ ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot recognize full formatter option list"),
+ errOmitLocation(true)));
+ }
+ }
+ }
+
+ /* Use new list to replace the old one */
+ createExtStmt->base.options = newOpts;
+}
+
+/*
+ * transformCreateExternalStmt
+ *
+ * Parse-analyze a CREATE EXTERNAL TABLE statement.  Classifies the table
+ * (EXECUTE / LOCATION / pluggable-storage "internal" table on hdfs with
+ * orc/text/csv format), transforms column definitions and constraints,
+ * resolves the distribution policy, and for pluggable storage also handles
+ * single-row-error-handling restrictions and table partitioning.  Returns a
+ * utility Query wrapping the (mutated) statement; statements to run before/
+ * after are appended to *extras_before / *extras_after.
+ */
+static Query *transformCreateExternalStmt(ParseState *pstate,
+ CreateExternalStmt *stmt,
+ List **extras_before,
+ List **extras_after) {
+ CreateStmtContext cxt;
+ Query *q;
+ ListCell *elements;
+ ExtTableTypeDesc *desc = NULL;
+ List *likeDistributedBy = NIL;
+ bool bQuiet = false; /* shut up transformDistributedBy messages */
+ bool onmaster = false;
+ bool iswritable = stmt->iswritable;
+ bool isPluggableStorage = false;
+ /* writable tables get their directory created by default */
+ if (!stmt->forceCreateDir) stmt->forceCreateDir = stmt->iswritable;
+
+ cxt.stmtType = "CREATE EXTERNAL TABLE";
+ cxt.isExternalTable = true;
+ cxt.relation = stmt->base.relation;
+ cxt.inhRelations = stmt->base.inhRelations;
+ cxt.isaddpart = stmt->base.is_add_part;
+ cxt.iswritable = stmt->iswritable;
+ cxt.exttypedesc = stmt->exttypedesc;
+ cxt.format = stmt->format;
+ cxt.parentPath = stmt->parentPath;
+ cxt.hasoids = false;
+ cxt.isalter = false;
+ cxt.columns = NIL;
+ cxt.ckconstraints = NIL;
+ cxt.fkconstraints = NIL;
+ cxt.ixconstraints = NIL;
+ cxt.inh_indexes = NIL;
+ cxt.pkey = NULL;
+
+ cxt.blist = NIL;
+ cxt.alist = NIL;
+
+ /*
+ * Build type description of internal table in pluggable storage
+ * framework based on format
+ */
+ desc = (ExtTableTypeDesc *)(stmt->exttypedesc);
+ if (desc->exttabletype == EXTTBL_TYPE_UNKNOWN) {
+ if (stmt->format == NULL) {
+ ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("Internal table must have format specification"),
+ errhint("Use CREATE TABLE FORMAT instead"),
+ errOmitLocation(true)));
+ }
+
+ /* orc, text, csv on hdfs */
+ /*
+ * NOTE(review): pg_strncasecmp with strlen("orc") etc. is a prefix
+ * match, so e.g. "orcfoo" would also be accepted — confirm intended.
+ */
+ else if (pg_strncasecmp(stmt->format, "orc", strlen("orc")) == 0 ||
+ pg_strncasecmp(stmt->format, "text", strlen("text")) == 0 ||
+ pg_strncasecmp(stmt->format, "csv", strlen("csv")) == 0) {
+ desc->exttabletype = EXTTBL_TYPE_LOCATION;
+ desc->location_list = NIL;
+ // desc->location_list = list_make1((Node *) makeString(PROTOCOL_HDFS));
+ desc->command_string = NULL;
+ desc->on_clause = NIL;
+ } else {
+ ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("Format \"%s\" for internal table is invalid",
+ stmt->format)));
+ }
+ isPluggableStorage = true;
+ }
+
+ if (desc->exttabletype == EXTTBL_TYPE_LOCATION) {
+
+ ListCell *loc_cell = list_head(desc->location_list);
+ if (loc_cell == NIL) {
+ /*
+ * NOTE(review): stmt->format is dereferenced here without a NULL
+ * check; this assumes a LOCATION table with an empty location list
+ * always carries a format string — TODO confirm against gram.y.
+ */
+ if (pg_strncasecmp(stmt->format, "orc", strlen("orc")) &&
+ pg_strncasecmp(stmt->format, "text", strlen("text")) &&
+ pg_strncasecmp(stmt->format, "csv", strlen("csv"))) {
+ ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg(
+ "Internal table on hdfs must be \'orc\', "
+ "\'text\', or \'csv\' format")));
+ }
+ isPluggableStorage = true;
+ } else {
+ /* Only the first LOCATION URI decides the protocol. */
+ Value *loc_val = lfirst(loc_cell);
+ char *loc_str = pstrdup(loc_val->val.str);
+ bool is_hdfs_protocol = IS_HDFS_URI(loc_str);
+ isPluggableStorage = is_hdfs_protocol;
+
+
+ if (is_hdfs_protocol &&
+ (pg_strncasecmp(stmt->format, "orc", strlen("orc")) &&
+ pg_strncasecmp(stmt->format, "text", strlen("text")) &&
+ pg_strncasecmp(stmt->format, "csv", strlen("csv")))) {
+ ereport(
+ ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg(
+ "LOCATION using hdfs url \'%s\' does not "
+ "support \'%s\' format",
+ loc_str, stmt->format),
+ errhint("Use \"FORMAT \'orc\', \'text\', or \'csv\'\" instead"),
+ errOmitLocation(true)));
+ }
+ }
+ }
+
+ // handle error table for text/csv pluggable storage
+ if (stmt->sreh && isPluggableStorage &&
+ (strcasecmp(stmt->format, "text") == 0 ||
+ strcasecmp(stmt->format, "csv") == 0)) {
+ SingleRowErrorDesc *errDesc = (SingleRowErrorDesc *)stmt->sreh;
+
+ /* pluggable storage only supports SEGMENT REJECT LIMIT ... ROWS */
+ if (!errDesc->is_limit_in_rows) {
+ ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg(
+ "Single row error handling with percentage limit is "
+ "not accepted for pluggable storage")));
+ }
+
+ errDesc->is_hdfs_protocol_text = true;
+ if (errDesc->errtable) {
+ /* reserve a path buffer; actual location derived from the
+ * database's filespace (uuid naming currently disabled below) */
+ errDesc->hdfsLoc = (char *)palloc0(MAXPGPATH);
+ char *fileSpacePath = NULL;
+ GetFilespacePathForTablespace(get_database_dts(MyDatabaseId),
+ &fileSpacePath);
+ /* uuid_t uuid;
+ char buf[1024];
+ uuid_generate(uuid);
+ uuid_unparse(uuid, buf);
+ sprintf(errDesc->hdfsLoc, "%s/ExtErrTbl/%s", fileSpacePath, buf);*/
+ }
+ }
+
+ // Only on top-most partitioned tables
+ if (stmt->base.partitionBy && !stmt->base.is_part_child) {
+ if (isPluggableStorage)
+ fixCreateStmtForPartitionedTable(&stmt->base);
+ else
+ elog(ERROR,
+ "Partition external table only supported for pluggable storage");
+ }
+
+ /*
+ * Run through each primary element in the table creation clause. Separate
+ * column defs from constraints, and do preliminary analysis.
+ */
+ foreach (elements, stmt->base.tableElts) {
+ Node *element = lfirst(elements);
+
+ switch (nodeTag(element)) {
+ case T_ColumnDef:
+ transformColumnDefinition(pstate, &cxt, (ColumnDef *)element);
+ break;
+
+ case T_Constraint:
+ transformExtTableConstraint(pstate, &cxt, (Constraint *)element);
+ break;
+
+ case T_FkConstraint:
+ /* should never happen. If it does fix gram.y */
+ elog(ERROR, "node type %d not supported for external tables",
+ (int)nodeTag(element));
+ break;
+
+ case T_InhRelation: {
+ /* LIKE */
+ bool isBeginning = (cxt.columns == NIL);
+
+ /* pluggable storage keeps LIKE'd defaults/constraints */
+ transformInhRelation(pstate, &cxt, (InhRelation *)element,
+ !isPluggableStorage);
+
+ if (Gp_role == GP_ROLE_DISPATCH && isBeginning &&
+ stmt->base.distributedBy == NIL && stmt->policy == NULL &&
+ iswritable /* dont bother if readable table */) {
+ likeDistributedBy = getLikeDistributionPolicy((InhRelation *)element);
+ }
+ } break;
+
+ default:
+ elog(ERROR, "unrecognized node type: %d", (int)nodeTag(element));
+ break;
+ }
+ }
+
+ /*
+ * transformIndexConstraints wants cxt.alist to contain only index
+ * statements, so transfer anything we already have into extras_after
+ * immediately.
+ */
+ *extras_after = list_concat(cxt.alist, *extras_after);
+ cxt.alist = NIL;
+
+ /*
+ * Postprocess constraints that give rise to index definitions.
+ */
+ transformIndexConstraints(pstate, &cxt, false);
+
+ /*
+ * Check if this is an EXECUTE ON MASTER table. We'll need this information
+ * in transformExternalDistributedBy. While at it, we also check if an error
+ * table is attempted to be used on ON MASTER table and error if so.
+ */
+ if (!iswritable) {
+ desc = (ExtTableTypeDesc *)stmt->exttypedesc;
+
+ if (desc->exttabletype == EXTTBL_TYPE_EXECUTE) {
+ ListCell *exec_location_opt;
+
+ foreach (exec_location_opt, desc->on_clause) {
+ DefElem *defel = (DefElem *)lfirst(exec_location_opt);
+
+ if (strcmp(defel->defname, "master") == 0) {
+ SingleRowErrorDesc *srehDesc = (SingleRowErrorDesc *)stmt->sreh;
+
+ onmaster = true;
+
+ if (srehDesc && srehDesc->errtable)
+ ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg(
+ "External web table with ON MASTER clause "
+ "cannot use error tables.")));
+ }
+ }
+ }
+ }
+
+ /*
+ * Check if we need to create an error table. If so, add it to the
+ * before list.
+ */
+ if (stmt->sreh && ((SingleRowErrorDesc *)stmt->sreh)->errtable)
+ transformSingleRowErrorHandling(pstate, &cxt,
+ (SingleRowErrorDesc *)stmt->sreh);
+
+ transformETDistributedBy(pstate, &cxt, stmt->base.distributedBy,
+ &stmt->policy, NULL, /*no WITH options for ET*/
+ likeDistributedBy, bQuiet, iswritable, onmaster);
+
+ // Process table partitioning clause
+ if (isPluggableStorage)
+ transformPartitionBy(pstate, &cxt, &stmt->base, stmt->base.partitionBy,
+ stmt->policy);
+
+ /*
+ * Output results.
+ */
+ q = makeNode(Query);
+ q->commandType = CMD_UTILITY;
+ q->utilityStmt = (Node *)stmt;
+ stmt->base.tableElts = cxt.columns;
+ stmt->base.constraints = cxt.ckconstraints;
+ stmt->pkey = cxt.pkey;
+ *extras_before = list_concat(*extras_before, cxt.blist);
+ *extras_after = list_concat(cxt.alist, *extras_after);
+
+ return q;
}
static Query *
@@ -2314,8 +2842,9 @@
ListCell *elements;
cxt.stmtType = "CREATE FOREIGN TABLE";
- cxt.relation = stmt->relation;
- cxt.inhRelations = NIL;
+ cxt.isExternalTable = false;
+ cxt.relation = stmt->relation;
+ cxt.inhRelations = NIL;
cxt.hasoids = false;
cxt.isalter = false;
cxt.columns = NIL;
@@ -2572,9 +3101,9 @@
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple default values specified for column \"%s\" of table \"%s\"",
column->colname, cxt->relation->relname)));
- /*
- * Note: DEFAULT NULL maps to constraint->raw_expr == NULL
- *
+ /*
+ * Note: DEFAULT NULL maps to constraint->raw_expr == NULL
+ *
* We lose the knowledge that the user specified DEFAULT NULL at
* this point, so we record it in default_is_null
*/
@@ -2643,7 +3172,23 @@
}
}
+/*
+ * transformExtTableConstraint
+ *
+ * Route a constraint attached to an external table into the proper bucket of
+ * the CreateStmtContext: PRIMARY KEY constraints go to ixconstraints (to
+ * become index definitions), CHECK constraints to ckconstraints.  Any other
+ * constraint type is unsupported for external tables and raises an error.
+ */
+static void transformExtTableConstraint(ParseState *pstate,
+ CreateStmtContext *cxt,
+ Constraint *constraint) {
+ switch (constraint->contype) {
+ case CONSTR_PRIMARY:
+ cxt->ixconstraints = lappend(cxt->ixconstraints, constraint);
+ break;
+ case CONSTR_CHECK:
+ cxt->ckconstraints = lappend(cxt->ckconstraints, constraint);
+ break;
+
+ default:
+ elog(ERROR, "unrecognized constraint type: %d", constraint->contype);
+ break;
+ }
+}
/*
* transformETDistributedBy - transform DISTRIBUTED BY clause for
@@ -2658,75 +3203,63 @@
* this is an EXECUTE table with ON MASTER specified, in which case
* we create no policy so that the master will be accessed.
*/
-static void
-transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
- List *distributedBy, GpPolicy **policyp, List *options,
- List *likeDistributedBy,
- bool bQuiet,
- bool iswritable,
- bool onmaster)
-{
- int maxattrs = 200;
- GpPolicy* p = NULL;
+/* Reformatted only in this change; logic is unchanged from the old version. */
+static void transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt,
+ List *distributedBy, GpPolicy **policyp,
+ List *options, List *likeDistributedBy,
+ bool bQuiet, bool iswritable,
+ bool onmaster) {
+ int maxattrs = 200; /* capacity reserved in the GpPolicy attrs array */
+ GpPolicy *p = NULL;
- /*
- * utility mode creates can't have a policy. Only the QD can have policies
- */
- if (Gp_role != GP_ROLE_DISPATCH)
- {
- *policyp = NULL;
- return;
- }
+ /*
+ * utility mode creates can't have a policy. Only the QD can have policies
+ */
+ if (Gp_role != GP_ROLE_DISPATCH) {
+ *policyp = NULL;
+ return;
+ }
- if(!iswritable && list_length(distributedBy) > 0)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
- errmsg("Readable external tables can\'t specify a DISTRIBUTED BY clause.")));
+ if (!iswritable && list_length(distributedBy) > 0)
+ ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+ errmsg(
+ "Readable external tables can\'t specify a DISTRIBUTED "
+ "BY clause.")));
- if(iswritable)
- {
- /* WET */
+ if (iswritable) {
+ /* WET */
- if(distributedBy == NIL && likeDistributedBy == NIL)
- {
- /* defaults to DISTRIBUTED RANDOMLY */
- p = (GpPolicy *) palloc(sizeof(GpPolicy) + maxattrs *
- sizeof(p->attrs[0]));
- p->ptype = POLICYTYPE_PARTITIONED;
- p->nattrs = 0;
- p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetExternalTablePartitionNum());
- p->attrs[0] = 1;
+ if (distributedBy == NIL && likeDistributedBy == NIL) {
+ /* defaults to DISTRIBUTED RANDOMLY */
+ p = (GpPolicy *)palloc(sizeof(GpPolicy) + maxattrs * sizeof(p->attrs[0]));
+ p->ptype = POLICYTYPE_PARTITIONED;
+ p->nattrs = 0;
+ p->bucketnum = GetRelOpt_bucket_num_fromOptions(
+ options, GetExternalTablePartitionNum());
+ p->attrs[0] = 1;
- *policyp = p;
- }
- else
- {
- /* regular DISTRIBUTED BY transformation */
- transformDistributedBy(pstate, cxt, distributedBy, policyp, options,
- likeDistributedBy, bQuiet);
- }
- }
- else
- {
- /* RET */
+ *policyp = p;
+ } else {
+ /* regular DISTRIBUTED BY transformation */
+ transformDistributedBy(pstate, cxt, distributedBy, policyp, options,
+ likeDistributedBy, bQuiet);
+ }
+ } else {
+ /* RET */
- if(onmaster)
- {
- p = NULL;
- }
- else
- {
- /* defaults to DISTRIBUTED RANDOMLY */
- p = (GpPolicy *) palloc(sizeof(GpPolicy) + maxattrs *
- sizeof(p->attrs[0]));
- p->ptype = POLICYTYPE_PARTITIONED;
- p->nattrs = 0;
- p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetExternalTablePartitionNum());
- p->attrs[0] = 1;
- }
+ if (onmaster) {
+ /* ON MASTER: no policy, so the master itself is accessed */
+ p = NULL;
+ } else {
+ /* defaults to DISTRIBUTED RANDOMLY */
+ p = (GpPolicy *)palloc(sizeof(GpPolicy) + maxattrs * sizeof(p->attrs[0]));
+ p->ptype = POLICYTYPE_PARTITIONED;
+ p->nattrs = 0;
+ p->bucketnum = GetRelOpt_bucket_num_fromOptions(
+ options, GetExternalTablePartitionNum());
+ p->attrs[0] = 1;
+ }
- *policyp = p;
- }
+ *policyp = p;
+ }
}
/****************stmt->policy*********************/
@@ -3424,7 +3957,7 @@
char newVals[10000];
{
- List *coldefs = stmt->tableElts;
+ List *coldefs = stmt->base.tableElts;
ListCell *lc = NULL;
StringInfoData sid;
int colcnt = 0;
@@ -3583,7 +4116,7 @@
if (1)
{
- List *coldefs = stmt->tableElts;
+ List *coldefs = stmt->base.tableElts;
ListCell *lc = NULL;
List *vl1 = NULL;
@@ -6007,7 +6540,7 @@
foreach(lc2, already)
{
List *item = lfirst(lc2);
-
+
Assert( IsA(item, List) && list_length(item) == nvals );
/*
@@ -6132,11 +6665,11 @@
ListCell *lc;
AlterPartitionCmd *pc;
- /*
+ /*
* First of all, we shouldn't proceed if this partition isn't AOCO
*/
- /*
+ /*
* Yes, I am as surprised as you are that this is how we represent the WITH
* clause here.
*/
@@ -6153,7 +6686,7 @@
return; /* nothing more to do */
}
- /*
+ /*
* If the specific partition has no specific column encoding, just
* set it to the partition level default and we're done.
*/
@@ -6162,7 +6695,7 @@
elem->colencs = penc;
return;
}
-
+
/*
* Fixup the actual column encoding clauses for this specific partition
* element.
@@ -7299,11 +7832,11 @@
if (!stenc)
return;
- /*
+ /*
* First, split the table elements into column reference storage directives
* and everything else.
*/
- foreach(lc, cs->tableElts)
+ foreach(lc, cs->base.tableElts)
{
Node *n = lfirst(lc);
@@ -7335,7 +7868,7 @@
foreach(lc2, finalencs)
{
ColumnReferenceStorageDirective *f = lfirst(lc2);
-
+
if (f->deflt)
continue;
@@ -7379,7 +7912,7 @@
}
}
- cs->tableElts = list_concat(others, finalencs);
+ cs->base.tableElts = list_concat(others, finalencs);
}
static void
@@ -7403,9 +7936,9 @@
child_tab_name->relname = relname;
child_tab_name->location = -1;
- child_tab_stmt->relation = child_tab_name;
- child_tab_stmt->is_part_child = true;
- child_tab_stmt->is_add_part = stmt->is_add_part;
+ child_tab_stmt->base.relation = child_tab_name;
+ child_tab_stmt->base.is_part_child = true;
+ child_tab_stmt->base.is_add_part = stmt->base.is_add_part;
if (!bQuiet)
ereport(NOTICE,
@@ -7415,7 +7948,7 @@
cxt->relation->relname)));
/* set the "Post Create" rule if it exists */
- child_tab_stmt->postCreate = pPostCreate;
+ child_tab_stmt->base.postCreate = pPostCreate;
/*
* Deep copy the parent's table elements.
@@ -7430,7 +7963,7 @@
* user-specified constraint names, so we don't do one here
* any more.
*/
- child_tab_stmt->tableElts = copyObject(stmt->tableElts);
+ child_tab_stmt->base.tableElts = copyObject(stmt->base.tableElts);
merge_part_column_encodings(child_tab_stmt, stenc);
@@ -7438,8 +7971,8 @@
if (pConstraint && ((enable_partition_rules &&
curPby->partType == PARTTYP_HASH) ||
curPby->partType != PARTTYP_HASH))
- child_tab_stmt->tableElts =
- lappend(child_tab_stmt->tableElts,
+ child_tab_stmt->base.tableElts =
+ lappend(child_tab_stmt->base.tableElts,
pConstraint);
/*
@@ -7449,10 +7982,10 @@
* the create child table
*/
/*child_tab_stmt->inhRelations = list_make1(parent_tab_name); */
- child_tab_stmt->inhRelations = list_copy(stmt->inhRelations);
+ child_tab_stmt->base.inhRelations = list_copy(stmt->base.inhRelations);
- child_tab_stmt->constraints = copyObject(stmt->constraints);
- child_tab_stmt->options = stmt->options;
+ child_tab_stmt->base.constraints = copyObject(stmt->base.constraints);
+ child_tab_stmt->base.options = stmt->base.options;
/* allow WITH clause for appendonly tables */
if ( pStoreAttr )
@@ -7461,24 +7994,24 @@
/* Options */
if ( psa_apc->arg1 )
- child_tab_stmt->options = (List *)psa_apc->arg1;
+ child_tab_stmt->base.options = (List *)psa_apc->arg1;
/* Tablespace from parent (input CreateStmt)... */
if ( psa_apc->arg2 && *strVal(psa_apc->arg2) )
- child_tab_stmt->tablespacename = strVal(psa_apc->arg2);
+ child_tab_stmt->base.tablespacename = strVal(psa_apc->arg2);
}
/* ...or tablespace from root. */
- if ( !child_tab_stmt->tablespacename && stmt->tablespacename )
- child_tab_stmt->tablespacename = stmt->tablespacename;
+ if ( !child_tab_stmt->base.tablespacename && stmt->base.tablespacename )
+ child_tab_stmt->base.tablespacename = stmt->base.tablespacename;
- child_tab_stmt->oncommit = stmt->oncommit;
- child_tab_stmt->distributedBy = stmt->distributedBy;
+ child_tab_stmt->base.oncommit = stmt->base.oncommit;
+ child_tab_stmt->base.distributedBy = stmt->base.distributedBy;
/* use the newSub as the partitionBy if the current
* partition elem had an inline subpartition declaration
*/
- child_tab_stmt->partitionBy = (Node *)newSub;
+ child_tab_stmt->base.partitionBy = (Node *)newSub;
- child_tab_stmt->relKind = RELKIND_RELATION;
+ child_tab_stmt->base.relKind = RELKIND_RELATION;
/*
* Adjust tablespace name for the CREATE TABLE via ADD PARTITION. (MPP-8047)
@@ -7489,18 +8022,18 @@
* Ultimately, we take the tablespace as specified in the command, or, if none
* was specified, the one from the root paritioned table.
*/
- if ( ! child_tab_stmt->tablespacename )
+ if ( ! child_tab_stmt->base.tablespacename )
{
Oid poid = RangeVarGetRelid(cxt->relation, true, false /*allowHcatalog*/); /* parent branch */
if ( ! poid )
{
- poid = RangeVarGetRelid(stmt->relation, true, false /*alloweHcatalog*/); /* whole partitioned table */
+ poid = RangeVarGetRelid(stmt->base.relation, true, false /*allowHcatalog*/); /* whole partitioned table */
}
if ( poid )
{
Relation prel = RelationIdGetRelation(poid);
- child_tab_stmt->tablespacename = get_tablespace_name(prel->rd_rel->reltablespace);
+ child_tab_stmt->base.tablespacename = get_tablespace_name(prel->rd_rel->reltablespace);
RelationClose(prel);
}
}
@@ -7732,7 +8265,7 @@
at_depth = at_buf;
}
else
- pBy->parentRel = copyObject(stmt->relation);
+ pBy->parentRel = copyObject(stmt->base.relation);
/* set the depth for the immediate subpartition */
if (pBy->subPart)
@@ -8408,8 +8941,8 @@
if ((pBy->partDepth > 0) && (pBy->bKeepMe != true))
{
/* we don't need this any more */
- stmt->partitionBy = NULL;
- stmt->is_part_child = true;
+ stmt->base.partitionBy = NULL;
+ stmt->base.is_part_child = true;
}
} /* end transformPartitionBy */
@@ -8589,7 +9122,7 @@
partrel = heap_open(PartitionRuleRelationId, AccessShareLock);
tuple = caql_getfirst(
- caql_addrel(cqclr(&cqc), partrel),
+ caql_addrel(cqclr(&cqc), partrel),
cql("SELECT * FROM pg_partition_rule "
" WHERE parchildrelid = :1 ",
ObjectIdGetDatum(relid)));
@@ -8607,7 +9140,7 @@
partrel = heap_open(PartitionRelationId, AccessShareLock);
tuple = caql_getfirst(
- caql_addrel(cqclr(&cqc), partrel),
+ caql_addrel(cqclr(&cqc), partrel),
cql("SELECT parlevel FROM pg_partition "
" WHERE oid = :1 ",
ObjectIdGetDatum(paroid)));
@@ -11015,7 +11548,7 @@
if (atc1->subtype != AT_PartAlter)
{
rv = makeRangeVar(
- NULL /*catalogname*/,
+ NULL /*catalogname*/,
get_namespace_name(
RelationGetNamespace(rel)),
pstrdup(RelationGetRelationName(rel)), -1);
@@ -11107,7 +11640,7 @@
* the new partition is LIKE the parent and it
* inherits from it
*/
- ct->tableElts = lappend(ct->tableElts, inh);
+ ct->base.tableElts = lappend(ct->base.tableElts, inh);
cl = list_make1(ct);
@@ -11154,11 +11687,12 @@
bool skipValidation = true;
AlterTableCmd *newcmd;
- cxt.stmtType = "ALTER TABLE";
+ cxt.stmtType = "ALTER TABLE";
+ cxt.isExternalTable = false;
cxt.relation = stmt->relation;
cxt.inhRelations = NIL;
cxt.isalter = true;
- cxt.hasoids = false; /* need not be right */
+ cxt.hasoids = false; /* need not be right */
cxt.columns = NIL;
cxt.ckconstraints = NIL;
cxt.fkconstraints = NIL;
@@ -11433,7 +11967,7 @@
* - has no LIMIT/OFFSET
* - references only one range table (i.e. no joins, self-joins)
* - this range table must itself be updatable
- *
+ *
*/
static bool
isSimplyUpdatableQuery(Query *query)
@@ -12037,7 +12571,7 @@
{
CreateStmt *elp = (CreateStmt *) element;
- setSchemaName(cxt.schemaname, &elp->relation->schemaname);
+ setSchemaName(cxt.schemaname, &elp->base.relation->schemaname);
/*
* XXX todo: deal with constraints
@@ -12050,7 +12584,7 @@
{
CreateExternalStmt *elp = (CreateExternalStmt *) element;
- setSchemaName(cxt.schemaname, &elp->relation->schemaname);
+ setSchemaName(cxt.schemaname, &elp->base.relation->schemaname);
cxt.tables = lappend(cxt.tables, element);
}
@@ -12395,17 +12929,17 @@
attrList = lappend(attrList, coldef);
}
- createStmt->relation = sreh->errtable;
- createStmt->tableElts = attrList;
- createStmt->inhRelations = NIL;
- createStmt->constraints = NIL;
- createStmt->options = list_make2(makeDefElem("errortable", (Node *) makeString("true")),
+ createStmt->base.relation = sreh->errtable;
+ createStmt->base.tableElts = attrList;
+ createStmt->base.inhRelations = NIL;
+ createStmt->base.constraints = NIL;
+ createStmt->base.options = list_make2(makeDefElem("errortable", (Node *) makeString("true")),
makeDefElem("appendonly", (Node *) makeString("true")));
- createStmt->oncommit = ONCOMMIT_NOOP;
- createStmt->tablespacename = NULL;
- createStmt->relKind = RELKIND_RELATION;
+ createStmt->base.oncommit = ONCOMMIT_NOOP;
+ createStmt->base.tablespacename = NULL;
+ createStmt->base.relKind = RELKIND_RELATION;
createStmt->relStorage = RELSTORAGE_AOROWS;
- createStmt->distributedBy = list_make1(NULL); /* DISTRIBUTED RANDOMLY */
+ createStmt->base.distributedBy = list_make1(NULL); /* DISTRIBUTED RANDOMLY */
cxt->blist = lappend(cxt->blist, createStmt);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index dc0e13b..b443bca 100755
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -242,10 +242,10 @@
%type <dbehavior> opt_drop_behavior
%type <list> createdb_opt_list alterdb_opt_list copy_opt_list
- ext_on_clause_list format_opt format_opt_list format_def_list transaction_mode_list
+ ext_on_clause_list format_opt format_opt_list transaction_mode_list
ext_opt_encoding_list
%type <defelt> createdb_opt_item alterdb_opt_item copy_opt_item
- ext_on_clause_item format_opt_item format_def_item transaction_mode_item
+ ext_on_clause_item format_opt_item transaction_mode_item
ext_opt_encoding_item
%type <ival> opt_lock lock_type cast_context
@@ -301,7 +301,7 @@
aggr_args aggr_args_list old_aggr_definition old_aggr_list
oper_argtypes RuleActionList RuleActionMulti
cdb_string_list
- opt_column_list columnList opt_name_list exttab_auth_list keyvalue_list
+ opt_column_list columnList columnListPlus opt_name_list exttab_auth_list keyvalue_list
opt_inherited_column_list
sort_clause opt_sort_clause sortby_list index_params
name_list from_clause from_list opt_array_bounds
@@ -431,7 +431,7 @@
%type <node> var_value zone_value
%type <keyword> unreserved_keyword func_name_keyword
-%type <keyword> col_name_keyword reserved_keyword
+%type <keyword> col_name_keyword reserved_keyword format_opt_keyword
%type <keyword> keywords_ok_in_alias_no_as
%type <node> TableConstraint TableLikeClause
@@ -3310,24 +3310,25 @@
OptTabPartitionBy
{
CreateStmt *n = makeNode(CreateStmt);
+
$4->istemp = $2;
- n->relation = $4;
- n->tableElts = $6;
- n->inhRelations = $8;
- n->constraints = NIL;
- n->options = $9;
- n->oncommit = $10;
- n->tablespacename = $11;
- n->distributedBy = $12;
- n->partitionBy = $13;
+ n->base.relation = $4;
+ n->base.tableElts = $6;
+ n->base.inhRelations = $8;
+ n->base.constraints = NIL;
+ n->base.options = $9;
+ n->base.oncommit = $10;
+ n->base.tablespacename = $11;
+ n->base.distributedBy = $12;
+ n->base.partitionBy = $13;
n->oidInfo.relOid = 0;
n->oidInfo.comptypeOid = 0;
n->oidInfo.toastOid = 0;
n->oidInfo.toastIndexOid = 0;
n->oidInfo.toastComptypeOid = 0;
- n->relKind = RELKIND_RELATION;
+ n->base.relKind = RELKIND_RELATION;
n->policy = 0;
- n->postCreate = NULL;
+ n->base.postCreate = NULL;
$$ = (Node *)n;
}
@@ -3339,23 +3340,23 @@
*/
CreateStmt *n = makeNode(CreateStmt);
$4->istemp = $2;
- n->relation = $4;
- n->tableElts = $8;
- n->inhRelations = list_make1($6);
- n->constraints = NIL;
- n->options = $10;
- n->oncommit = $11;
- n->tablespacename = $12;
- n->distributedBy = $13;
- n->partitionBy = $14;
+ n->base.relation = $4;
+ n->base.tableElts = $8;
+ n->base.inhRelations = list_make1($6);
+ n->base.constraints = NIL;
+ n->base.options = $10;
+ n->base.oncommit = $11;
+ n->base.tablespacename = $12;
+ n->base.distributedBy = $13;
+ n->base.partitionBy = $14;
n->oidInfo.relOid = 0;
n->oidInfo.comptypeOid = 0;
n->oidInfo.toastOid = 0;
n->oidInfo.toastIndexOid = 0;
n->oidInfo.toastComptypeOid = 0;
- n->relKind = RELKIND_RELATION;
+ n->base.relKind = RELKIND_RELATION;
n->policy = 0;
- n->postCreate = NULL;
+ n->base.postCreate = NULL;
$$ = (Node *)n;
}
@@ -3754,6 +3755,17 @@
columnElem { $$ = list_make1($1); }
| columnList ',' columnElem { $$ = lappend($1, $3); }
;
+columnListPlus:
+ columnElem ',' columnElem
+ {
+ $$ = list_make1($1);
+ $$ = lappend($$, $3);
+ }
+ | columnListPlus ',' columnElem
+ {
+ $$ = lappend($1, $3);
+ }
+ ;
columnElem: ColId
{
@@ -4584,16 +4596,17 @@
{
CreateExternalStmt *n = makeNode(CreateExternalStmt);
n->iswritable = $2;
+ n->isexternal = TRUE;
n->isweb = $4;
$7->istemp = $5;
- n->relation = $7;
- n->tableElts = $9;
+ n->base.relation = $7;
+ n->base.tableElts = $9;
n->exttypedesc = $11;
n->format = $13;
- n->formatOpts = $14;
+ n->base.options = $14;
n->encoding = $15;
n->sreh = $16;
- n->distributedBy = $17;
+ n->base.distributedBy = $17;
n->policy = 0;
/* various syntax checks for EXECUTE external table */
@@ -4694,13 +4707,12 @@
format_opt:
'(' format_opt_list ')' { $$ = $2; }
- | '(' format_def_list ')' { $$ = $2; }
| '(' ')' { $$ = NIL; }
| /*EMPTY*/ { $$ = NIL; }
;
format_opt_list:
- format_opt_item
+ format_opt_item
{
$$ = list_make1($1);
}
@@ -4710,67 +4722,43 @@
}
;
-format_def_list:
- format_def_item
- {
- $$ = list_make1($1);
- }
- | format_def_list ',' format_def_item
- {
- $$ = lappend($1, $3);
- }
-
-format_def_item:
- ColLabel '=' def_arg
- {
- $$ = makeDefElem($1, $3);
- }
- | ColLabel '=' '(' columnList ')'
- {
- $$ = makeDefElem($1, (Node *) $4);
- }
+format_opt_keyword:
+ AS
+ | DELIMITER
+ | NULL_P
+ | CSV
+ | HEADER_P
+ | QUOTE
+ | ESCAPE
+ | FORCE
+ | NOT
+ | FILL
+ | MISSING
+ | FIELDS
+ | NEWLINE
+ ;
format_opt_item:
- DELIMITER opt_as Sconst
+ IDENT
{
- $$ = makeDefElem("delimiter", (Node *)makeString($3));
+ $$ = makeDefElem("#ident", (Node *)makeString($1));
}
- | NULL_P opt_as Sconst
+ | Sconst
{
- $$ = makeDefElem("null", (Node *)makeString($3));
+ $$ = makeDefElem("#string", (Node *)makeString($1));
}
- | CSV
+ | SignedIconst
{
- $$ = makeDefElem("csv", (Node *)makeInteger(TRUE));
+ $$ = makeDefElem("#int", (Node *)makeInteger($1));
}
- | HEADER_P
+ | format_opt_keyword
{
- $$ = makeDefElem("header", (Node *)makeInteger(TRUE));
+ $$ = makeDefElem("#ident", (Node *)makeString($1));
}
- | QUOTE opt_as Sconst
+ | columnListPlus
{
- $$ = makeDefElem("quote", (Node *)makeString($3));
- }
- | ESCAPE opt_as Sconst
- {
- $$ = makeDefElem("escape", (Node *)makeString($3));
- }
- | FORCE NOT NULL_P columnList
- {
- $$ = makeDefElem("force_notnull", (Node *)$4);
- }
- | FORCE QUOTE columnList
- {
- $$ = makeDefElem("force_quote", (Node *)$3);
- }
- | FILL MISSING FIELDS
- {
- $$ = makeDefElem("fill_missing_fields", (Node *)makeInteger(TRUE));
- }
- | NEWLINE opt_as Sconst
- {
- $$ = makeDefElem("newline", (Node *)makeString($3));
+ $$ = makeDefElem("#collist", (Node *)$1);
}
;
@@ -13258,28 +13246,28 @@
CreateStmt *ct = makeNode(CreateStmt);
PartitionBy *pBy = NULL;
- ct->relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1);
+ ct->base.relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1);
/* in analyze.c, fill in tableelts with a list of inhrelation of
the partition parent table, and fill in inhrelations with copy
of rangevar for parent table */
- ct->tableElts = NIL; /* fill in later */
- ct->inhRelations = NIL; /* fill in later */
+ ct->base.tableElts = NIL; /* fill in later */
+ ct->base.inhRelations = NIL; /* fill in later */
- ct->constraints = NIL;
+ ct->base.constraints = NIL;
if (pc_StAttr)
- ct->options = (List *)pc_StAttr->arg1;
+ ct->base.options = (List *)pc_StAttr->arg1;
else
- ct->options = NIL;
+ ct->base.options = NIL;
- ct->oncommit = ONCOMMIT_NOOP;
+ ct->base.oncommit = ONCOMMIT_NOOP;
if (pc_StAttr && pc_StAttr->arg2)
- ct->tablespacename = strVal(pc_StAttr->arg2);
+ ct->base.tablespacename = strVal(pc_StAttr->arg2);
else
- ct->tablespacename = NULL;
+ ct->base.tablespacename = NULL;
if (subSpec) /* treat subspec as partition by... */
{
@@ -13290,19 +13278,19 @@
pBy->partQuiet = PART_VERBO_NODISTRO;
pBy->location = -1;
pBy->partDefault = NULL;
- pBy->parentRel = copyObject(ct->relation);
+ pBy->parentRel = copyObject(ct->base.relation);
}
- ct->distributedBy = NULL;
- ct->partitionBy = (Node *)pBy;
+ ct->base.distributedBy = NULL;
+ ct->base.partitionBy = (Node *)pBy;
ct->oidInfo.relOid = 0;
ct->oidInfo.comptypeOid = 0;
ct->oidInfo.toastOid = 0;
ct->oidInfo.toastIndexOid = 0;
ct->oidInfo.toastComptypeOid = 0;
- ct->relKind = RELKIND_RELATION;
+ ct->base.relKind = RELKIND_RELATION;
ct->policy = 0;
- ct->postCreate = NULL;
+ ct->base.postCreate = NULL;
return (Node *)ct;
}
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index af2d12c..bd6e2b6 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -15,12 +15,15 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include "port.h"
#include "access/twophase.h"
#include "access/xact.h"
+#include "access/fileam.h"
#include "catalog/catalog.h"
#include "catalog/catquery.h"
#include "catalog/namespace.h"
+#include "catalog/pg_exttable.h"
#include "catalog/toasting.h"
#include "catalog/aoseg.h"
#include "commands/alter.h"
@@ -50,6 +53,7 @@
#include "commands/vacuum.h"
#include "commands/view.h"
#include "miscadmin.h"
+#include "nodes/value.h"
#include "postmaster/checkpoint.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteRemove.h"
@@ -61,6 +65,7 @@
#include "utils/guc.h"
#include "utils/syscache.h"
#include "utils/lsyscache.h"
+#include "utils/uri.h"
#include "lib/stringinfo.h"
#include "cdb/cdbcat.h"
@@ -71,6 +76,7 @@
#include "cdb/dispatcher.h"
#include "resourcemanager/resqueuecommand.h"
+#include "catalog/pg_exttable.h"
/*
* Error-checking support for DROP commands
*/
@@ -270,10 +276,32 @@
classform = (Form_pg_class) GETSTRUCT(tuple);
- if ((removeType == OBJECT_EXTTABLE && classform->relstorage != RELSTORAGE_EXTERNAL) ||
- (removeType == OBJECT_FOREIGNTABLE && classform->relstorage != RELSTORAGE_FOREIGN) ||
- (removeType == OBJECT_TABLE && (classform->relstorage == RELSTORAGE_EXTERNAL ||
- classform->relstorage == RELSTORAGE_FOREIGN)))
+ bool is_internal = false;
+ if (classform->relstorage == RELSTORAGE_EXTERNAL)
+ {
+ ExtTableEntry *entry = GetExtTableEntry(relOid);
+ List *entry_locations = entry->locations;
+ Assert(entry_locations);
+ ListCell *entry_location = list_head(entry_locations);
+ char *url = ((Value*)lfirst(entry_location))->val.str;
+ char *category = getExtTblCategoryInFmtOptsStr(entry->fmtopts);
+
+ if ((IS_HDFS_URI(url)) &&
+ (category != NULL && pg_strncasecmp(category, "internal", strlen("internal")) == 0))
+ {
+ is_internal = true;
+ }
+
+ if (category)
+ {
+ pfree(category);
+ }
+ }
+
+ if ((removeType == OBJECT_EXTTABLE && (classform->relstorage != RELSTORAGE_EXTERNAL || is_internal)) ||
+ (removeType == OBJECT_FOREIGNTABLE && classform->relstorage != RELSTORAGE_FOREIGN) ||
+ (removeType == OBJECT_TABLE && (classform->relstorage == RELSTORAGE_EXTERNAL && (!is_internal) ||
+ classform->relstorage == RELSTORAGE_FOREIGN)))
{
/* we have a mismatch. format an error string and shoot */
@@ -287,7 +315,7 @@
else
want_type = pstrdup("a base");
- if (classform->relstorage == RELSTORAGE_EXTERNAL)
+ if (classform->relstorage == RELSTORAGE_EXTERNAL && !is_internal)
hint = pstrdup("Use DROP EXTERNAL TABLE to remove an external table");
else if (classform->relstorage == RELSTORAGE_FOREIGN)
hint = pstrdup("Use DROP FOREIGN TABLE to remove a foreign table");
@@ -447,7 +475,7 @@
createStmt = (CreateStmt *) parsetree;
- if (createStmt->relation->istemp)
+ if (createStmt->base.relation->istemp)
return; // Permit creation of TEMPORARY tables in read-only mode.
ereport(ERROR,
@@ -912,8 +940,8 @@
Assert (gp_upgrade_mode || Gp_role != GP_ROLE_EXECUTE);
- relOid = DefineRelation((CreateStmt *) parsetree,
- relKind, relStorage);
+ relOid = DefineRelation((CreateStmt *) parsetree, relKind,
+ relStorage, NonCustomFormatType);
/*
* Let AlterTableCreateToastTable decide if this one needs a
@@ -936,13 +964,13 @@
((CreateStmt *) parsetree)->oidInfo.toastOid,
((CreateStmt *) parsetree)->oidInfo.toastIndexOid,
&(((CreateStmt *) parsetree)->oidInfo.toastComptypeOid),
- ((CreateStmt *)parsetree)->is_part_child);
+ ((CreateStmt *)parsetree)->base.is_part_child);
AlterTableCreateAoSegTableWithOid(relOid,
((CreateStmt *) parsetree)->oidInfo.aosegOid,
((CreateStmt *) parsetree)->oidInfo.aosegIndexOid,
&(((CreateStmt *) parsetree)->oidInfo.aosegComptypeOid),
- ((CreateStmt *) parsetree)->is_part_child);
+ ((CreateStmt *) parsetree)->base.is_part_child);
}
CommandCounterIncrement();
/*
diff --git a/src/backend/utils/misc/uriparser.c b/src/backend/utils/misc/uriparser.c
index 5489c17..e94408d 100644
--- a/src/backend/utils/misc/uriparser.c
+++ b/src/backend/utils/misc/uriparser.c
@@ -78,6 +78,11 @@
uri->protocol = URI_GPFDISTS;
protocol_len = strlen(PROTOCOL_GPFDISTS);
}
+ else if (IS_HDFS_URI(uri_str))
+ {
+ uri->protocol = URI_HDFS;
+ protocol_len = strlen(PROTOCOL_HDFS);
+ }
else /* not recognized. treat it as a custom protocol */
{
@@ -200,7 +205,10 @@
}
else
{
- uri->port = -1; /* no port was indicated. will use default if needed */
+ if (IS_HDFS_URI(uri_str)) /* means nameservice format */
+ uri->port = 0;
+ else
+ uri->port = -1; /* no port was indicated. will use default if needed */
}
}
}
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index 777698a..5d4871d 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -104,10 +104,9 @@
ScanState *ss,
TupleTableSlot *slot);
-extern ExternalInsertDesc external_insert_init(Relation rel,
- int errAosegno,
- ExternalTableType formatterType,
- char *formatterName);
+extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno,
+ int formatterType, char *formatterName, PlannedStmt* plannedstmt);
+
extern Oid external_insert(ExternalInsertDesc extInsertDesc,
TupleTableSlot *tupTableSlot);
extern void external_insert_finish(ExternalInsertDesc extInsertDesc);
@@ -115,6 +114,12 @@
extern void AtAbort_ExtTables(void);
char* linenumber_atoi(char buffer[20],int64 linenumber);
+extern bool hasErrTblInFmtOpts(List *fmtOpts);
+extern char getExtTblFormatterTypeInFmtOpts(List *fmtOpts);
+extern void external_populate_formatter_actionmask(struct CopyStateData *pstate,
+ FormatterData *formatter);
+
+extern char *getExtTblCategoryInFmtOptsStr(char *fmtStr);
extern char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr);
extern char *getExtTblFormatterTypeInFmtOptsList(List *fmtOpts);
diff --git a/src/include/access/formatter.h b/src/include/access/formatter.h
index f87afc2..7b46d3d 100644
--- a/src/include/access/formatter.h
+++ b/src/include/access/formatter.h
@@ -36,10 +36,19 @@
typedef enum FmtNotification
{
FMT_NONE,
+ FMT_DONE,
FMT_NEED_MORE_DATA
} FmtNotification;
+typedef enum FmtActionMask
+{
+ FMT_UNSET = 0,
+ FMT_SET = 1,
+ FMT_NEEDEXTBUFF = 2,
+ FMT_WRITE_END = 4
+} FmtActionMask;
+
/*
* FormatterData is the node type that is passed as fmgr "context" info
* when a function is called by the External Table Formatter manager.
@@ -49,6 +58,7 @@
{
NodeTag type; /* see T_FormatterData */
+ FmtActionMask fmt_mask;
/* metadata */
Relation fmt_relation;
TupleDesc fmt_tupDesc;
diff --git a/src/include/access/plugstorage.h b/src/include/access/plugstorage.h
index 48c5fde..f904f81 100644
--- a/src/include/access/plugstorage.h
+++ b/src/include/access/plugstorage.h
@@ -45,6 +45,7 @@
#include "executor/tuptable.h"
/* From src/include/access/fileam.h */
+extern char *getExtTblCategoryInFmtOptsStr(char *fmtStr);
extern char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr);
extern char *getExtTblFormatterTypeInFmtOptsList(List *fmtOpts);
@@ -98,6 +99,7 @@
bool ps_has_tuple;
Oid ps_tuple_oid;
TupleTableSlot *ps_tuple_table_slot;
+ int ps_segno;
} PlugStorageData;
@@ -179,7 +181,9 @@
ExternalInsertDesc InvokePlugStorageFormatInsertInit(FmgrInfo *func,
Relation relation,
int formatterType,
- char *formatterName);
+ char *formatterName,
+ PlannedStmt* plannedstmt,
+ int segno);
Oid InvokePlugStorageFormatInsert(FmgrInfo *func,
ExternalInsertDesc extInsertDesc,
diff --git a/src/include/catalog/pg_exttable.h b/src/include/catalog/pg_exttable.h
index 3256bb9..11f053c 100644
--- a/src/include/catalog/pg_exttable.h
+++ b/src/include/catalog/pg_exttable.h
@@ -164,9 +164,10 @@
extern void
RemoveExtTableEntry(Oid relid);
-#define CustomFormatType 'b'
-#define TextFormatType 't'
-#define CsvFormatType 'c'
+#define CustomFormatType 'b'
+#define TextFormatType 't'
+#define CsvFormatType 'c'
+#define NonCustomFormatType 'n'
/* PXF formats*/
#define GpdbWritableFormatName "GPDBWritable"
diff --git a/src/include/cdb/cdbdatalocality.h b/src/include/cdb/cdbdatalocality.h
index c3753ce..0afc45c 100644
--- a/src/include/cdb/cdbdatalocality.h
+++ b/src/include/cdb/cdbdatalocality.h
@@ -32,6 +32,7 @@
#include "catalog/gp_policy.h"
#include "nodes/parsenodes.h"
#include "executor/execdesc.h"
+#include "catalog/pg_exttable.h"
/*
* structure containing information about data residence
@@ -71,12 +72,18 @@
char *hostname;
} VirtualSegmentNode;
+typedef struct blocklocation_file{
+ BlockLocation *locations;
+ int block_num;
+ char *file_uri;
+}blocklocation_file;
+
/*
* calculate_planner_segment_num: based on the parse tree,
* we calculate the appropriate planner segment_num.
*/
-SplitAllocResult * calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
- List *rtable, GpPolicy *intoPolicy, int sliceNum, int fixedVsegNum);
+SplitAllocResult * calculate_planner_segment_num(PlannedStmt *plannedstmt, Query *query,
+ QueryResourceLife resourceLife, int fixedVsegNum);
/*
* udf_collector_walker: the routine to file udfs.
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 8404a7f..e23df2f 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -52,7 +52,7 @@
extern const char *synthetic_sql;
-extern Oid DefineRelation(CreateStmt *stmt, char relkind, char relstorage);
+extern Oid DefineRelation(CreateStmt *stmt, char relkind, char relstorage, const char *formattername);
extern void DefineExternalRelation(CreateExternalStmt *stmt);
@@ -89,6 +89,8 @@
extern void ExecuteTruncate(TruncateStmt *stmt);
+
+
extern void renameatt(Oid myrelid,
const char *oldattname,
const char *newattname,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index a9ca4a0..173a35c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1354,14 +1354,15 @@
* Node that represents the single row error handling (SREH) clause.
* used in COPY and External Tables.
*/
-typedef struct SingleRowErrorDesc
-{
- NodeTag type;
- RangeVar *errtable; /* error table for data format errors */
- int rejectlimit; /* per segment error reject limit */
- bool is_keep; /* true if KEEP indicated (COPY only) */
- bool is_limit_in_rows; /* true for ROWS false for PERCENT */
- bool reusing_existing_errtable; /* var used later in trasform... */
+typedef struct SingleRowErrorDesc {
+ NodeTag type;
+ RangeVar *errtable; /* error table for data format errors */
+ int rejectlimit; /* per segment error reject limit */
+ bool is_keep; /* true if KEEP indicated (COPY only) */
+ bool is_limit_in_rows; /* true for ROWS false for PERCENT */
+ bool reusing_existing_errtable; /* var used later in transform... */
+ bool is_hdfs_protocol_text; /* hdfs protocol text format table */
+ char *hdfsLoc; /* error table location for hdfs protocol text only */
} SingleRowErrorDesc;
/* ----------------------
@@ -1403,37 +1404,40 @@
* implementation).
* ----------------------
*/
+typedef struct CreateStmtBase {
+ char relKind; // CDB: force relkind to this
+ RangeVar *relation; // relation to create
+ List *tableElts; // column definitions (list of ColumnDef)
+ List *inhRelations; // relations to inherit from (list of inhRelation)
+ List *constraints; // constraints (list of Constraint nodes)
+ List *options; // options from WITH clause
+ OnCommitAction oncommit; // what do we do at COMMIT?
+ char *tablespacename; // table space to use, or NULL
+ List *distributedBy; // what columns we distribute the data by
+ Node *postCreate; // CDB: parse and process after the CREATE
+ // CDB: child table in a partition? Marked during analysis for
+ // interior or leaf parts of the new table. Not marked for a
+ // a partition root or ordinary table.
+ bool is_part_child;
+ bool is_add_part; // CDB: is create adding a part to a partition?
+ Node *partitionBy; // what columns we partition the data by
+} CreateStmtBase;
-typedef struct CreateStmt
-{
- NodeTag type;
- RangeVar *relation; /* relation to create */
- List *tableElts; /* column definitions (list of ColumnDef) */
- List *inhRelations; /* relations to inherit from (list of
- * inhRelation) */
- List *constraints; /* constraints (list of Constraint nodes) */
- List *options; /* options from WITH clause */
- OnCommitAction oncommit; /* what do we do at COMMIT? */
- char *tablespacename; /* table space to use, or NULL */
- List *distributedBy; /* what columns we distribute the data by */
- Node *partitionBy; /* what columns we partition the data by */
- TableOidInfo oidInfo;
- char relKind; /* CDB: force relkind to this */
- char relStorage;
- struct GpPolicy *policy;
- Node *postCreate; /* CDB: parse and process after the CREATE */
- List *deferredStmts; /* CDB: Statements, e.g., partial indexes, that can't be
- * analyzed until after CREATE (until the target table
- * is created and visible). */
- bool is_part_child; /* CDB: child table in a partition? Marked during analysis for
- * interior or leaf parts of the new table. Not marked for a
- * a partition root or ordinary table.
- */
- bool is_add_part; /* CDB: is create adding a part to a partition? */
- bool is_split_part; /* CDB: is create spliting a part? */
- Oid ownerid; /* OID of the role to own this. if InvalidOid, GetUserId() */
- bool buildAoBlkdir; /* whether to build the block directory for an AO table */
- List *attr_encodings; /* attribute storage directives */
+
+typedef struct CreateStmt {
+ NodeTag type;
+ CreateStmtBase base;
+ TableOidInfo oidInfo;
+ char relStorage;
+ struct GpPolicy *policy;
+ List *deferredStmts; /* CDB: Statements, e.g., partial indexes, that can't be
+ * analyzed until after CREATE
+ * (until the target table
+ * is created and visible). */
+ bool is_split_part; /* CDB: is create spliting a part? */
+ Oid ownerid; /* OID of the role to own this. if InvalidOid, GetUserId() */
+ bool buildAoBlkdir; /* whether to build the block directory for an AO table */
+ List *attr_encodings; /* attribute storage directives */
} CreateStmt;
typedef enum SharedStorageOp
@@ -1459,8 +1463,9 @@
*/
typedef enum ExtTableType
{
- EXTTBL_TYPE_LOCATION, /* table defined with LOCATION clause */
- EXTTBL_TYPE_EXECUTE /* table defined with EXECUTE clause */
+ EXTTBL_TYPE_LOCATION, /* table defined with LOCATION clause */
+ EXTTBL_TYPE_EXECUTE, /* table defined with EXECUTE clause */
+ EXTTBL_TYPE_UNKNOWN /* table defined with unknown store */
} ExtTableType;
typedef struct ExtTableTypeDesc
@@ -1472,22 +1477,21 @@
char *command_string;
} ExtTableTypeDesc;
-typedef struct CreateExternalStmt
-{
- NodeTag type;
- RangeVar *relation; /* external relation to create */
- List *tableElts; /* column definitions (list of ColumnDef) */
- Node *exttypedesc; /* LOCATION or EXECUTE information */
- char *format; /* data format name */
- List *formatOpts; /* List of DefElem nodes for data format */
- bool isweb;
- bool iswritable;
- Node *sreh; /* Single row error handling info */
- List *encoding; /* List (size 1 max) of DefElem nodes for
- data encoding */
- List *distributedBy; /* what columns we distribute the data by */
- struct GpPolicy *policy; /* used for writable tables */
-
+typedef struct CreateExternalStmt {
+ NodeTag type;
+ CreateStmtBase base;
+ Node *exttypedesc; /* LOCATION or EXECUTE information */
+ char *format; /* data format name */
+ bool isweb;
+ bool iswritable;
+ bool isexternal;
+ Node *sreh; /* Single row error handling info */
+ List *encoding; /* List (size 1 max) of DefElem nodes for
+ data encoding */
+ struct GpPolicy *policy; /* used for writable tables */
+ bool forceCreateDir; /* true to create external dir */
+ char *parentPath; // keep the parent relative path for partition
+ struct IndexStmt *pkey; /* PRIMARY KEY index, if any */
} CreateExternalStmt;
/* ----------------------
diff --git a/src/include/parser/analyze.h b/src/include/parser/analyze.h
index ccb54f7..1c7df3f 100644
--- a/src/include/parser/analyze.h
+++ b/src/include/parser/analyze.h
@@ -34,27 +34,33 @@
Datum partition_arg_get_val(Node *node, bool *isnull);
/* State shared by transformCreateStmt and its subroutines */
-typedef struct
-{
- const char *stmtType; /* "CREATE TABLE" or "ALTER TABLE" */
- RangeVar *relation; /* relation to create */
- List *inhRelations; /* relations to inherit from */
- bool hasoids; /* does relation have an OID column? */
- bool isalter; /* true if altering existing table */
- bool isaddpart; /* true if create in service of adding a part */
- List *columns; /* ColumnDef items */
- List *ckconstraints; /* CHECK constraints */
- List *fkconstraints; /* FOREIGN KEY constraints */
- List *ixconstraints; /* index-creating constraints */
- List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */
- List *blist; /* "before list" of things to do before
- * creating the table */
- List *alist; /* "after list" of things to do after creating
- * the table */
- List *dlist; /* "deferred list" of utility statements to
- * transfer to the list CreateStmt->deferredStmts
- * for later parse_analyze and dispatch */
- IndexStmt *pkey; /* PRIMARY KEY index, if any */
+typedef struct {
+ bool isExternalTable;
+ const char *stmtType; /* "CREATE TABLE" or "ALTER TABLE" */
+ RangeVar *relation; /* relation to create */
+ List *inhRelations; /* relations to inherit from */
+ bool hasoids; /* does relation have an OID column? */
+ bool isalter; /* true if altering existing table */
+ bool isaddpart; /* true if create in service of adding a part */
+ List *columns; /* ColumnDef items */
+ List *ckconstraints; /* CHECK constraints */
+ List *fkconstraints; /* FOREIGN KEY constraints */
+ List *ixconstraints; /* index-creating constraints */
+ List *inh_indexes; /* cloned indexes from INCLUDING INDEXES */
+ List *blist; /* "before list" of things to do before
+ * creating the table */
+ List *alist; /* "after list" of things to do after creating
+ * the table */
+ List *dlist; /* "deferred list" of utility statements to
+ * transfer to the list CreateStmt->deferredStmts
+ * for later parse_analyze and dispatch */
+ IndexStmt *pkey; /* PRIMARY KEY index, if any */
+
+ // for external table only
+ Node *exttypedesc;
+ char *format;
+ bool iswritable;
+ char *parentPath;
} CreateStmtContext;
Query *transformCreateStmt(ParseState *pstate, CreateStmt *stmt,
@@ -83,4 +89,6 @@
extern struct GpPolicy *createRandomDistribution(int maxattrs);
+extern void recognizeExternalRelationFormatterOptions(
+ CreateExternalStmt *createExtStmt);
#endif /* ANALYZE_H */
diff --git a/src/include/utils/uri.h b/src/include/utils/uri.h
index a37d9e7..9c5aedd 100644
--- a/src/include/utils/uri.h
+++ b/src/include/utils/uri.h
@@ -32,7 +32,8 @@
URI_HTTP,
URI_GPFDIST,
URI_CUSTOM,
- URI_GPFDISTS
+ URI_GPFDISTS,
+ URI_HDFS
} UriProtocol;
#define PROTOCOL_FILE "file://"
@@ -41,6 +42,7 @@
#define PROTOCOL_GPFDIST "gpfdist://"
#define PROTOCOL_GPFDISTS "gpfdists://"
#define PROTOCOL_PXF "pxf://"
+#define PROTOCOL_HDFS "hdfs://"
/*
* sometimes we don't want to parse the whole URI but just take a peek at
@@ -52,6 +54,7 @@
#define IS_GPFDISTS_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_GPFDISTS, strlen(PROTOCOL_GPFDISTS)) == 0)
#define IS_FTP_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_FTP, strlen(PROTOCOL_FTP)) == 0)
#define IS_PXF_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_PXF, strlen(PROTOCOL_PXF)) == 0)
+#define IS_HDFS_URI(uri_str) (pg_strncasecmp(uri_str, PROTOCOL_HDFS, strlen(PROTOCOL_HDFS)) == 0)
typedef struct Uri
{