| #include <mapred.h> |
| #include <except.h> |
| #include <mapred_errors.h> |
| |
| #include <stdio.h> |
| #include <stdarg.h> |
| #include <unistd.h> /* for file "access" test */ |
| #include <errno.h> |
| |
| #define scalarfree(x) \ |
| do { \ |
| if (x) { \ |
| free(x); \ |
| x = NULL; \ |
| } \ |
| } while (0) |
| |
| |
| /* instantiate the extern from mapred.h */ |
| const char *mapred_kind_name[MAPRED_MAXKIND+1] = |
| { |
| "<NULL>", |
| "DOCUMENT", |
| "INPUT", |
| "OUTPUT", |
| "MAP", |
| "TRANSITION", |
| "CONSOLIDATE", |
| "FINALIZE", |
| "REDUCE", |
| "TASK", |
| "RUN", |
| "INTERNAL" |
| }; |
| |
| /* instantiate default parameter names from mapred.h */ |
| const char *default_parameter_names[MAPRED_MAXKIND+1][2] = |
| { |
| {NULL, NULL}, /* MAPRED_NO_KIND */ |
| {NULL, NULL}, /* MAPRED_DOCUMENT */ |
| {NULL, NULL}, /* MAPRED_INPUT */ |
| {NULL, NULL}, /* MAPRED_OUTPUT */ |
| {"key", "value"}, /* MAPRED_MAPPER */ |
| {"state", "value"}, /* MAPRED_TRANSITION */ |
| {"state1", "state2"}, /* MAPRED_COMBINER */ |
| {"state", NULL}, /* MAPRED_FINALIZER */ |
| {NULL, NULL}, /* MAPRED_REDUCER */ |
| {NULL, NULL}, /* MAPRED_TASK */ |
| {NULL, NULL}, /* MAPRED_EXECUTION */ |
| {NULL, NULL} /* MAPRED_ADT */ |
| }; |
| |
| /* instantiate default parameter names from mapred.h */ |
| const char *default_return_names[MAPRED_MAXKIND+1][2] = |
| { |
| {NULL, NULL}, /* MAPRED_NO_KIND */ |
| {NULL, NULL}, /* MAPRED_DOCUMENT */ |
| {NULL, NULL}, /* MAPRED_INPUT */ |
| {NULL, NULL}, /* MAPRED_OUTPUT */ |
| {"key", "value"}, /* MAPRED_MAPPER */ |
| {"value", NULL}, /* MAPRED_TRANSITION */ |
| {"value", NULL}, /* MAPRED_COMBINER */ |
| {"value", NULL}, /* MAPRED_FINALIZER */ |
| {NULL, NULL}, /* MAPRED_REDUCER */ |
| {NULL, NULL}, /* MAPRED_TASK */ |
| {NULL, NULL}, /* MAPRED_EXECUTION */ |
| {NULL, NULL} /* MAPRED_ADT */ |
| }; |
| |
| |
| /* |
| * libpq Errors that we care about |
| * (would be better to add <errcodes.h> to the include path) |
| */ |
| const char *IN_FAILED_SQL_TRANSACTION = "25P02"; |
| const char *OBJ_DOES_NOT_EXIST = "42P01"; |
| const char *SCHEMA_DOES_NOT_EXIST = "3F000"; |
| |
| const char *DISTRIBUTION_NOTICE = "NOTICE: Table doesn't have 'DISTRIBUTED BY' clause"; |
| |
| /* local prototypes */ |
| void * mapred_malloc(int size); |
| void mapred_free(void *ptr); |
| |
| buffer_t * makebuffer(size_t bufsize, size_t grow); |
| void bufreset(buffer_t *b); |
| void bufcat(buffer_t **bufp, char* fmt); |
| void ignore_notice_handler(void *arg, const PGresult *res); |
| void print_notice_handler(void *arg, const PGresult *res); |
| void mapred_setup_columns(PGconn *conn, mapred_object_t *obj); |
| |
| boolean mapred_create_object(PGconn *conn, mapred_document_t *doc, |
| mapred_object_t *obj); |
| void mapred_remove_object(PGconn *conn, mapred_document_t *doc, |
| mapred_object_t *obj); |
| void mapred_run_queries(PGconn *conn, mapred_document_t *doc); |
| |
| void mapred_resolve_dependencies(PGconn *conn, mapred_document_t *doc); |
| void mapred_resolve_ref(mapred_olist_t *olist, mapred_reference_t *ref); |
| void mapred_resolve_object(PGconn *conn, mapred_document_t *doc, |
| mapred_object_t *obj, int *exec_count); |
| |
| void lookup_function_in_catalog(PGconn *conn, mapred_document_t *doc, |
| mapred_object_t *obj); |
| int mapred_obj_error(mapred_object_t *obj, char *fmt, ...) |
| __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); |
| |
| /* Wrappers around malloc/free to handle error conditions more cleanly */ |
| void *mapred_malloc(int size) |
| { |
| void *m; |
| XASSERT(size > 0); |
| |
| if (global_debug_flag && global_verbose_flag) |
| fprintf(stderr, "Allocating %d bytes: ", size); |
| |
| m = malloc(size); |
| if (!m) |
| XRAISE(MEMORY_ERROR, "Memory allocation failure"); |
| |
| if (global_debug_flag && global_verbose_flag) |
| fprintf(stderr, "%p\n", m); |
| |
| return m; |
| } |
| |
| #define copyscalar(s) \ |
| strcpy(mapred_malloc(strlen(s)+1), s) |
| |
| void mapred_free(void *ptr) |
| { |
| XASSERT(ptr); |
| |
| if (global_debug_flag && global_verbose_flag) |
| fprintf(stderr, "Freeing memory: %p\n", ptr); |
| |
| free(ptr); |
| } |
| |
| int mapred_obj_error(mapred_object_t *obj, char *fmt, ...) |
| { |
| va_list arg; |
| |
| if (global_verbose_flag) |
| fprintf(stderr, " - "); |
| |
| fprintf(stderr, "Error: "); |
| if (obj && obj->name) |
| fprintf(stderr, "%s '%s': ", mapred_kind_name[obj->kind], obj->name); |
| if (obj && !obj->name) |
| fprintf(stderr, "%s: ", mapred_kind_name[obj->kind]); |
| |
| va_start(arg, fmt); |
| vfprintf(stderr, fmt, arg); |
| va_end(arg); |
| if (obj->line > 0) |
| fprintf(stderr, ", at line %d\n", obj->line); |
| else |
| fprintf(stderr, "\n"); |
| |
| return MAPRED_PARSE_ERROR; |
| } |
| |
| static void mapred_obj_debug(mapred_object_t *obj) |
| { |
| mapred_plist_t *plist; |
| |
| if (!obj) |
| { |
| fprintf(stderr, "Object is NULL"); |
| return; |
| } |
| fprintf(stderr, "%s: \n", mapred_kind_name[obj->kind]); |
| fprintf(stderr, " NAME: '%s': \n", obj->name ? obj->name : "-"); |
| switch (obj->kind) |
| { |
| case MAPRED_NO_KIND: |
| case MAPRED_DOCUMENT: |
| case MAPRED_ADT: |
| case MAPRED_INPUT: |
| case MAPRED_OUTPUT: |
| case MAPRED_TASK: |
| case MAPRED_EXECUTION: |
| case MAPRED_REDUCER: |
| { |
| fprintf(stderr, " DEBUG: 'debug output not yet implemented'\n"); |
| break; |
| } |
| |
| case MAPRED_MAPPER: |
| case MAPRED_TRANSITION: |
| case MAPRED_COMBINER: |
| case MAPRED_FINALIZER: |
| { |
| fprintf(stderr, " LANGUAGE: %s\n", obj->u.function.language ? |
| obj->u.function.language : "-"); |
| fprintf(stderr, " PARAMETERS: ["); |
| for (plist = obj->u.function.parameters; plist; plist = plist->next) |
| { |
| fprintf(stderr, "%s %s%s", plist->name, plist->type, |
| plist->next ? ", " : ""); |
| } |
| fprintf(stderr,"]\n"); |
| fprintf(stderr, " RETURNS: ["); |
| for (plist = obj->u.function.returns; plist; plist = plist->next) |
| { |
| fprintf(stderr, "%s %s%s", plist->name, plist->type, |
| plist->next ? ", " : ""); |
| } |
| fprintf(stderr,"]\n"); |
| fprintf(stderr, " LIBRARY: %s\n", obj->u.function.library ? |
| obj->u.function.library : "-"); |
| fprintf(stderr, " FUNCTION: %s\n", obj->u.function.body ? |
| obj->u.function.body : "-"); |
| break; |
| } |
| } |
| } |
| |
| |
| /* -------------------------------------------------------------------------- */ |
| /* Functions that play with buffers */ |
| /* -------------------------------------------------------------------------- */ |
| buffer_t *makebuffer(size_t bufsize, size_t grow) |
| { |
| buffer_t *b; |
| |
| XASSERT(bufsize > 0 && grow > 0); |
| |
| b = mapred_malloc(sizeof(buffer_t) + bufsize); |
| b->buffer = (char*)(b+1); |
| b->bufsize = bufsize; |
| b->grow = grow; |
| b->position = 0; |
| b->buffer[0] = '\0'; |
| return b; |
| } |
| |
| /* to re-use a buffer just "reset" it */ |
| void bufreset(buffer_t *b) |
| { |
| XASSERT(b && b->bufsize > 0 && b->grow > 0); |
| b->position = 0; |
| b->buffer[0] = '\0'; |
| } |
| |
| /* |
| * A simple wrapper around a strncpy that handles resizing an input buffer |
| * when needed. |
| */ |
| void bufcat(buffer_t **bufp, char* str) |
| { |
| buffer_t *b; |
| size_t len; |
| |
| XASSERT(bufp && *bufp); |
| XASSERT(str); |
| |
| b = *bufp; |
| len = strlen(str); |
| |
| /* If the buffer is too small, grow it */ |
| if (b->bufsize <= b->position + len) |
| { |
| buffer_t *newbuf; |
| |
| /* use the minumum of "grow" and the new length for the grow amount */ |
| if (b->grow <= len) |
| b->grow = len+1; |
| |
| newbuf = makebuffer(b->bufsize + b->grow, b->grow); |
| memcpy(newbuf->buffer, b->buffer, b->position+1); |
| newbuf->position = b->position; |
| *bufp = newbuf; |
| mapred_free(b); |
| b = newbuf; |
| } |
| |
| /* We are now guaranteed that we have enough space in the buffer */ |
| XASSERT( b->bufsize - b->position > len ); |
| strcpy(b->buffer+b->position, str); |
| b->position += len; |
| b->buffer[b->position] = '\0'; |
| } |
| |
| |
| /* |
| * Currently we just ignore all warnings, may eventually do something |
| * smarter, but this is preferable to dumping them to libpq's default |
| * of dumping them to stderr. |
| */ |
| void ignore_notice_handler(void *arg, const PGresult *res) |
| { |
| } |
| |
| void print_notice_handler(void *arg, const PGresult *res) |
| { |
| char *error = PQresultErrorMessage(res); |
| if (!strncmp(error, DISTRIBUTION_NOTICE, strlen(DISTRIBUTION_NOTICE)-1)) |
| return; |
| |
| if (global_verbose_flag) |
| fprintf(stderr, " - "); |
| fprintf(stderr, "%s", error); |
| } |
| |
| /* |
| * If a function is already defined in the database we need to be able to |
| * lookup the function information directly from the catalog. This is |
| * fairly similar to func_get_detail in backend/parser/parse_func.c, but |
| * the lookup from yaml is slightly different because we don't know the |
| * context that the function is in, but we _might_ have been told some |
| * of the parameter information already. |
| */ |
| void lookup_function_in_catalog(PGconn *conn, mapred_document_t *doc, |
| mapred_object_t *obj) |
| { |
| PGresult *result = NULL; |
| PGresult *result2 = NULL; |
| mapred_plist_t *plist, *plist2; |
| mapred_plist_t *newitem = NULL; |
| mapred_plist_t *returns = NULL; |
| buffer_t *buffer = NULL; |
| char *tmp1 = NULL; |
| char *tmp2 = NULL; |
| char *tmp3 = NULL; |
| #define STR_LEN 50 |
| char str[STR_LEN]; |
| int i, nargs; |
| |
| XASSERT(doc); |
| XASSERT(obj); |
| XASSERT(obj->kind == MAPRED_MAPPER || |
| obj->kind == MAPRED_TRANSITION || |
| obj->kind == MAPRED_COMBINER || |
| obj->kind == MAPRED_FINALIZER); |
| |
| obj->internal = true; |
| obj->u.function.internal_returns = NULL; |
| |
| XTRY |
| { |
| buffer = makebuffer(1024, 1024); |
| |
| /* Try to lookup the specified function */ |
| bufcat(&buffer, |
| "SELECT proretset, prorettype::regtype, pronargs,\n" |
| " proargnames, proargmodes, \n" |
| " (proargtypes::regtype[])[0:pronargs] as proargtypes,\n" |
| " proallargtypes::regtype[],\n"); |
| |
| /* |
| * If we have return types defined in the yaml then we want to resolve |
| * them to their authorative names for comparison purposes. |
| */ |
| if (obj->u.function.returns) |
| { |
| bufcat(&buffer, " ARRAY["); |
| for (plist = obj->u.function.returns; plist; plist = plist->next) |
| { |
| if (plist->type) |
| { |
| bufcat(&buffer, "'"); |
| bufcat(&buffer, plist->type); |
| bufcat(&buffer, "'::regtype"); |
| } |
| else |
| { |
| /* If we don't know the type, punt */ |
| bufcat(&buffer, "'-'::regtype"); |
| } |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, "] as yaml_rettypes\n"); |
| } |
| else |
| { |
| bufcat(&buffer, " null::regtype[] as yaml_rettypes\n"); |
| } |
| |
| bufcat(&buffer, |
| "FROM pg_proc\n" |
| "WHERE prokind = 'f'\n" |
| " AND proname = lower('"); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, "')\n"); |
| |
| /* Fill in the known parameter types */ |
| nargs = 0; |
| if (obj->u.function.parameters) |
| { |
| bufcat(&buffer, " AND (proargtypes::regtype[])[0:pronargs] = ARRAY["); |
| for (plist = obj->u.function.parameters; plist; plist = plist->next) |
| { |
| nargs++; |
| bufcat(&buffer, "'"); |
| bufcat(&buffer, plist->type); |
| bufcat(&buffer, "'::regtype"); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| snprintf(str, STR_LEN, "]\n AND pronargs=%d\n", nargs); |
| bufcat(&buffer, str); |
| } |
| |
| /* Run the SQL */ |
| if (global_print_flag || global_debug_flag) |
| printf("%s", buffer->buffer); |
| result = PQexec(conn, buffer->buffer); |
| bufreset(buffer); |
| |
| if (PQresultStatus(result) != PGRES_TUPLES_OK) |
| { |
| /* |
| * The SQL statement failed: |
| * Most likely scenario is a bad datatype causing the regtype cast |
| * to fail. |
| */ |
| char *code = PQresultErrorField(result, PG_DIAG_SQLSTATE); |
| char *error = PQresultErrorMessage(result); |
| |
| printf("errcode=\"%s\"\n", code); /* Todo: validate expected error code */ |
| |
| mapred_obj_error(obj, "SQL Error resolving function: \n %s", |
| error); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| else if (PQntuples(result) == 0) |
| { |
| /* No such function */ |
| mapred_obj_error(obj, "No such function"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| else if (PQntuples(result) > 1) |
| { |
| XASSERT(!obj->u.function.parameters); |
| mapred_obj_error(obj, "Ambiguous function, supply a function " |
| "prototype for disambiguation"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| else |
| { |
| char *value; |
| int len; |
| boolean retset; |
| int nargs; |
| char *argtypes = NULL; |
| char *argnames = ""; |
| char *argmodes = ""; |
| char *allargtypes = ""; |
| char *rettype = NULL; |
| char *yaml_rettypes = ""; |
| char *type, *typetokens = NULL; |
| char *name, *nametokens = NULL; |
| char *mode, *modetokens = NULL; |
| boolean name_end, mode_end; |
| |
| value = PQgetvalue(result, 0, 0); /* Column 0: proretset */ |
| retset = (value[0] == 't'); |
| value = PQgetvalue(result, 0, 1); /* Column 1: prorettype */ |
| rettype = value; |
| value = PQgetvalue(result, 0, 2); /* Column 2: pronargs */ |
| nargs = (int) strtol(value, (char **) NULL, 10); |
| |
| /* |
| * Arrays are formatted as: "{value,value,...}" |
| * of which we only want "value,value, ..." |
| * so find the part of the string between the braces |
| */ |
| if (!PQgetisnull(result, 0, 3)) /* Column 3: proargnames */ |
| { |
| value = PQgetvalue(result, 0, 3); |
| argnames = value+1; |
| len = strlen(argnames); |
| if (len > 0) |
| argnames[len-1] = '\0'; |
| } |
| |
| if (!PQgetisnull(result, 0, 4)) /* Column 4: proargmodes */ |
| { |
| value = PQgetvalue(result, 0, 4); |
| argmodes = value+1; |
| len = strlen(argmodes); |
| if (len > 0) |
| argmodes[len-1] = '\0'; |
| } |
| if (!PQgetisnull(result, 0, 5)) /* Column 5: proargtypes */ |
| { |
| value = PQgetvalue(result, 0, 5); |
| argtypes = value+1; |
| len = strlen(argtypes); |
| if (len > 0) |
| argtypes[len-1] = '\0'; |
| } |
| if (!PQgetisnull(result, 0, 6)) /* Column 6: proallargtypes */ |
| { |
| value = PQgetvalue(result, 0, 6); |
| allargtypes = value+1; |
| len = strlen(allargtypes); |
| if (len > 0) |
| allargtypes[len-1] = '\0'; |
| } |
| if (!PQgetisnull(result, 0, 7)) /* Column 7: yaml_rettypes */ |
| { |
| value = PQgetvalue(result, 0, 7); |
| yaml_rettypes = value+1; |
| len = strlen(yaml_rettypes); |
| if (len > 0) |
| yaml_rettypes[len-1] = '\0'; |
| } |
| |
| /* |
| * These constraints should all be enforced in the catalog, so |
| * if something is wrong then it's a coding error above. |
| */ |
| XASSERT(rettype); |
| XASSERT(argtypes); |
| XASSERT(nargs >= 0); |
| |
| /* |
| * If we just derived the parameters from the catalog then we |
| * need complete our internal metadata. |
| */ |
| plist = NULL; |
| if (!obj->u.function.parameters) |
| { |
| /* strtok is destructive and we need to preserve the original |
| * string, so we make some annoying copies prior to strtok. |
| */ |
| tmp1 = copyscalar(argtypes); |
| tmp2 = copyscalar(argnames); |
| tmp3 = copyscalar(argmodes); |
| |
| type = strtok_r(tmp1, ",", &typetokens); |
| name = strtok_r(tmp2, ",", &nametokens); |
| mode = strtok_r(tmp3, ",", &modetokens); |
| |
| /* |
| * Name and mode are used for IN/OUT parameters and may not be |
| * present. In the event that they are we are looking for: |
| * - the "i" (in) arguments |
| * - the "b" (inout) arguments |
| * we skip over: |
| * - the "o" (out) arguments. |
| * - the "t" (table out) arguments. |
| * |
| * Further it is possible for some of the arguments to be named |
| * and others to be unnamed. The unnamed arguments will show |
| * up as "" (two quotes, not an empty string) if there is an |
| * argnames defined. |
| * |
| * If argmodes is not defined then all names in proargnames |
| * refer to input arguments. |
| */ |
| |
| while (mode && strcmp(mode, "i") && strcmp(mode, "b")) |
| { |
| name = strtok_r(NULL, ",", &nametokens); |
| mode = strtok_r(NULL, ",", &modetokens); |
| } |
| name_end = (NULL == name); |
| mode_end = (NULL == mode); |
| |
| i = 0; |
| while (type) |
| { |
| /* Keep track of which parameter we are on */ |
| i++; |
| XASSERT(i <= nargs); |
| |
| /* |
| * If a name was not specified by the user, and was not |
| * specified by the in/out parameters then we assign it a |
| * default name. |
| */ |
| if (!name) |
| { |
| /* single argument functions always default to "value" */ |
| if (i == 1 && nargs == 1) |
| name = (char*) "value"; |
| |
| /* Base name on default parameter names for the first |
| * two arguments */ |
| else if (i <= 2) |
| name = (char*) default_parameter_names[obj->kind][i-1]; |
| |
| /* |
| * If we still didn't decide on a name, make up |
| * something useless. |
| */ |
| if (!name) |
| { |
| snprintf(str, STR_LEN, "parameter%d", i); |
| name = str; |
| } |
| } |
| |
| if (!plist) |
| { |
| plist = mapred_malloc(sizeof(mapred_plist_t)); |
| plist->name = copyscalar(name); |
| plist->type = copyscalar(type); |
| plist->next = (mapred_plist_t *) NULL; |
| obj->u.function.parameters = plist; |
| } |
| else |
| { |
| plist->next = mapred_malloc(sizeof(mapred_plist_t)); |
| plist = plist->next; |
| plist->name = copyscalar(name); |
| plist->type = copyscalar(type); |
| plist->next = (mapred_plist_t *) NULL; |
| } |
| |
| /* Procede to the next parameter */ |
| type = strtok_r(NULL, ",", &typetokens); |
| if (!name_end) |
| { |
| name = strtok_r(NULL, ",", &nametokens); |
| name_end = (NULL == name); |
| } |
| if (!mode_end) |
| { |
| mode = strtok_r(NULL, ",", &modetokens); |
| mode_end = (NULL == mode); |
| } |
| while (mode && strcmp(mode, "i") && strcmp(mode, "b")) |
| { |
| if (!name_end) |
| { |
| name = strtok_r(NULL, ",", &nametokens); |
| name_end = (NULL == name); |
| } |
| if (!mode_end) |
| { |
| mode = strtok_r(NULL, ",", &modetokens); |
| mode_end = (NULL == mode); |
| } |
| } |
| } |
| |
| mapred_free(tmp1); |
| mapred_free(tmp2); |
| mapred_free(tmp3); |
| tmp1 = NULL; |
| tmp2 = NULL; |
| tmp3 = NULL; |
| } |
| |
| /* |
| * Check that the number of parameters received is appropriate. |
| * This would be better moved to a generalized validation routine. |
| */ |
| switch (obj->kind) |
| { |
| |
| case MAPRED_MAPPER: |
| /* |
| * It would probably be possible to start supporting zero |
| * argument mappers, but: |
| * 1) It would require more modifications |
| * 2) Doesn't currently have a known use case |
| * 3) Has easy workarounds |
| */ |
| if (nargs < 1) |
| { |
| mapred_obj_error(obj, "Transition functions require " |
| "two or more parameters"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| break; |
| |
| case MAPRED_TRANSITION: |
| if (nargs < 2) |
| { |
| mapred_obj_error(obj, "Transition functions require " |
| "two or more parameters"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| if (retset) |
| { |
| mapred_obj_error(obj, "Transition functions cannot " |
| "be table functions"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| break; |
| |
| case MAPRED_COMBINER: |
| if (nargs != 2) |
| { |
| mapred_obj_error(obj, "Consolidate functions require " |
| "exactly two parameters"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| if (retset) |
| { |
| mapred_obj_error(obj, "Consolidate functions cannot " |
| "be table functions"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| break; |
| |
| case MAPRED_FINALIZER: |
| if (nargs != 1) |
| { |
| mapred_obj_error(obj, "Finalize functions require " |
| "exactly one parameter"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| break; |
| |
| default: |
| XASSERT(false); |
| } |
| |
| /* Fill in return type information */ |
| if (retset) |
| obj->u.function.mode = MAPRED_MODE_MULTI; |
| else |
| obj->u.function.mode = MAPRED_MODE_SINGLE; |
| |
| /* |
| * Determine the return type information, there are 3 primary |
| * subcases: |
| * |
| * 1) Function is defined with OUT/TABLE parameters. |
| * 2) Function returns a simple type. |
| * 3) Function returns a complex type. |
| * 4) Return type is void [error] |
| */ |
| plist = returns = NULL; |
| if (argmodes && strlen(argmodes) > 0) |
| { |
| |
| /* strtok is destructive and we need to preserve the original |
| * string, so we make some annoying copies prior to strtok. |
| */ |
| tmp1 = copyscalar(allargtypes); |
| tmp2 = copyscalar(argnames); |
| tmp3 = copyscalar(argmodes); |
| |
| type = strtok_r(tmp1, ",", &typetokens); |
| name = strtok_r(tmp2, ",", &nametokens); |
| mode = strtok_r(tmp3, ",", &modetokens); |
| |
| i = 1; |
| while (mode) |
| { |
| while (mode && |
| strcmp(mode, "o") && |
| strcmp(mode, "b") && |
| strcmp(mode, "t")) |
| { |
| /* skip input parameters */ |
| type = strtok_r(NULL, ",", &typetokens); |
| name = strtok_r(NULL, ",", &nametokens); |
| mode = strtok_r(NULL, ",", &modetokens); |
| } |
| if (mode) |
| { |
| XASSERT(type); |
| |
| newitem = mapred_malloc(sizeof(mapred_plist_t)); |
| |
| /* |
| * Note we haven't made local copies of these, we will |
| * do this after resolution when validating against any |
| * RETURNS defined in the yaml, if any. |
| */ |
| |
| if( NULL != name && |
| 0 != strcmp(name, "") && |
| 0 != strcmp(name, "\"\"") ) |
| { |
| /*if name defined in db, just use it*/ |
| newitem->name = copyscalar(name); |
| } |
| else |
| { |
| /*else just obey the default name in db*/ |
| snprintf( str, STR_LEN, "column%d", i); |
| newitem->name = copyscalar(str); |
| } |
| |
| newitem->type = copyscalar(type); |
| |
| newitem->next = NULL; |
| |
| if (plist) |
| plist->next = newitem; |
| else |
| returns = newitem; |
| plist = newitem; |
| ++i; |
| } |
| type = strtok_r(NULL, ",", &typetokens); |
| name = strtok_r(NULL, ",", &nametokens); |
| mode = strtok_r(NULL, ",", &modetokens); |
| } |
| |
| mapred_free(tmp1); |
| mapred_free(tmp2); |
| mapred_free(tmp3); |
| tmp1 = NULL; |
| tmp2 = NULL; |
| tmp3 = NULL; |
| } |
| |
| /* |
| * If the arguments were not defined in the function definition then |
| * we check to see if this was a complex type by looking up the type |
| * information in pg_attribute. |
| */ |
| if (!returns) |
| { |
| bufcat(&buffer, |
| "SELECT attname, atttypid::regtype\n" |
| "FROM pg_attribute a\n" |
| "JOIN pg_class c on (a.attrelid = c.oid)\n" |
| "WHERE not a.attisdropped\n" |
| " AND a.attnum > 0\n" |
| " AND c.reltype = '"); |
| bufcat(&buffer, rettype); |
| bufcat(&buffer, |
| "'::regtype\n" |
| "ORDER BY -attnum"); |
| result2 = PQexec(conn, buffer->buffer); |
| bufreset(buffer); |
| |
| if (PQresultStatus(result2) != PGRES_TUPLES_OK) |
| { |
| char *error = PQresultErrorMessage(result); |
| |
| mapred_obj_error(obj, "Error resolving function: %s", error); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| else if (PQntuples(result2) > 0) |
| { |
| /* We have a complex type, build the return list */ |
| for (i = 0; i < PQntuples(result2); i++) |
| { |
| name = PQgetvalue(result2, i, 0); |
| type = PQgetvalue(result2, i, 1); |
| |
| newitem = mapred_malloc(sizeof(mapred_plist_t)); |
| newitem->name = copyscalar(name); |
| newitem->type = copyscalar(type); |
| newitem->next = returns; |
| returns = newitem; |
| } |
| } |
| } |
| |
| /* |
| * If the return types were not defined in either the argument list |
| * nor the catalog then we assume it is a simple type. |
| */ |
| if (!returns) |
| { |
| /* Check against "void" which is a special return type that |
| * means there is no return value - which we don't support for |
| * mapreduce. |
| */ |
| if (!strcmp(rettype, "void")) |
| { |
| mapred_obj_error(obj, "Function returns void"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| returns = mapred_malloc(sizeof(mapred_plist_t)); |
| returns->type = copyscalar(rettype); |
| returns->name = NULL; |
| returns->next = NULL; |
| } |
| |
| /* |
| * We now should have a returns list, compare it against the RETURNS |
| * list given in the yaml. The yaml overrides return names, but can |
| * not override return types. If the return types are incompatible |
| * raise an error. |
| */ |
| obj->u.function.internal_returns = returns; |
| if (obj->u.function.returns) |
| { |
| /* |
| * The first thing to do is normalize the given return types |
| * with their formal names. This will, for example turn a type |
| * like "float8" => "double precision". The input name might |
| * be correct (float8) but we need it represented as the formal |
| * name so that we can compare against the formal name we got |
| * when we looked up the function in the catalog. |
| */ |
| plist = obj->u.function.returns; |
| type = strtok_r(yaml_rettypes, ",", &typetokens); |
| while (plist) |
| { |
| XASSERT(type); /* should be an equal number */ |
| |
| /* |
| * If we have a type specified replace it with the one we |
| * resolved from the select stmt, otherwise just keep it |
| * as NULL and fill it in during the compare against what |
| * was in the catalog. |
| */ |
| if (plist->type) |
| { |
| mapred_free(plist->type); |
| |
| /* |
| * When in an array the typname may get wrapped in |
| * double quotes, if so we need to strip them back out. |
| */ |
| if (type[0] == '"') |
| { |
| plist->type = copyscalar(type+1); |
| plist->type[strlen(plist->type)-1] = '\0'; |
| } |
| else |
| { |
| plist->type = copyscalar(type); |
| } |
| } |
| |
| plist = plist->next; |
| type = strtok_r(NULL, ",", &typetokens); |
| } |
| |
| |
| /* Compare against actual function return types */ |
| plist = obj->u.function.returns; |
| plist2 = returns; |
| while (plist && plist2) |
| { |
| XASSERT(plist->name); /* always defined in YAML */ |
| XASSERT(plist2->type); /* always defined in SQL */ |
| |
| /* |
| * In the YAML it is possible to have a name without a type, |
| * if that is the case then simply take the SQL type. |
| */ |
| if (!plist->type) |
| plist->type = copyscalar(plist2->type); |
| else if (strcmp(plist->type, plist2->type)) |
| break; |
| plist = plist->next; |
| plist2 = plist2->next; |
| } |
| if (plist || plist2) |
| { |
| mapred_obj_error(obj, "RETURN parameter '%s %s' != '%s %s'", |
| plist ? plist->name : "\"\"", |
| plist ? plist->type : "-", |
| plist2 ? (plist2->name ? plist2->name : plist->name) : "\"\"", |
| plist2 ? plist2->type : "-"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| } |
| else |
| { |
| obj->u.function.returns = returns; |
| i = 0; |
| for (plist = returns; plist; plist = plist->next) |
| { |
| XASSERT(plist->type); |
| i++; |
| plist->type = copyscalar(plist->type); |
| |
| /* |
| * if plist->name is not null and empty string, |
| * then use that name |
| */ |
| if (plist->name && |
| 0 != strcmp(plist->name, "") && |
| 0 != strcmp(plist->name, "\"\"") ) |
| { |
| plist->name = copyscalar(plist->name); |
| } |
| /* |
| * else We need generate a name anyway |
| */ |
| else |
| { |
| /* |
| * Manufacture a name for a column based on default |
| * naming rules. |
| */ |
| name = (char*) NULL; |
| if (i <= 2) |
| name = (char*) default_return_names[obj->kind][i-1]; |
| if (!name) |
| { |
| snprintf(str, STR_LEN, "parameter%d", i); |
| name = str; |
| } |
| plist->name = copyscalar(name); |
| } |
| } |
| } |
| } |
| } |
| XFINALLY |
| { |
| if (result) |
| PQclear(result); |
| if (result2) |
| PQclear(result2); |
| if (buffer) |
| mapred_free(buffer); |
| |
| if (tmp1) |
| mapred_free(tmp1); |
| if (tmp2) |
| mapred_free(tmp2); |
| if (tmp3) |
| mapred_free(tmp3); |
| |
| } |
| XTRY_END; |
| } |
| |
| void mapred_run_document(PGconn *conn, mapred_document_t *doc) |
| { |
| PGresult *result; |
| mapred_olist_t *olist; |
| boolean done; |
| boolean executes; |
| |
| /* Ignore NOTICE messages from database */ |
| PQsetNoticeReceiver(conn, ignore_notice_handler, NULL); |
| |
| /* Establish a name-prefix for temporary objects */ |
| doc->prefix = mapred_malloc(64); |
| snprintf(doc->prefix, 64, "mapreduce_%d_", PQbackendPID(conn)); |
| |
| |
| /* |
| * Resolution of dependecies was defered until now so that |
| * a database connection could be available to look up any |
| * dependencies that are not defined within the YAML document. |
| */ |
| if (global_verbose_flag) |
| fprintf(stderr, " - Resolving Dependencies:\n"); |
| mapred_resolve_dependencies(conn, doc); |
| if (global_verbose_flag) |
| fprintf(stderr, " - DONE\n"); |
| |
| XTRY |
| { |
| /* |
| * By running things within a transaction we can effectively |
| * obscure the mapreduce sql definitions. They could still |
| * be exposed by a savy user via mapreduce views that access |
| * the catalog tables, but it's a cleaner method of handling |
| * things. |
| */ |
| result = PQexec(conn, "BEGIN TRANSACTION"); |
| if (PQresultStatus(result) != PGRES_COMMAND_OK) |
| { |
| PQclear(result); |
| XRAISE(MAPRED_SQL_ERROR, NULL); |
| } |
| PQclear(result); |
| |
| /* With dependencies resolved we can now try creating objects */ |
| if (global_verbose_flag) |
| fprintf(stderr, " - Creating Objects\n"); |
| |
| /* |
| * we don't try to create any executes until all non-executes are |
| * successfully created |
| */ |
| executes = false; |
| do |
| { |
| boolean progress = false; |
| |
| /* |
| * We keep a buffer of errors during parsing, and display them at |
| * the end if they haven't been resolved. |
| */ |
| if (!doc->errors) |
| doc->errors = makebuffer(1024, 1024); |
| else |
| bufreset(doc->errors); |
| |
| /* |
| * Loop through the objects, creating each in turn. |
| * If an object has dependencies that have not been created yet |
| * it will return false and we will make additional passes through |
| * the object list |
| */ |
| done = true; |
| for (olist = doc->objects; olist; olist = olist->next) |
| { |
| mapred_object_t *obj = olist->object; |
| if (!obj->created && |
| (executes || obj->kind != MAPRED_EXECUTION)) |
| { |
| if (global_verbose_flag && obj->kind != MAPRED_ADT) |
| { |
| fprintf(stderr, " - %s:\n", |
| mapred_kind_name[obj->kind]); |
| fprintf(stderr, " NAME: %s\n", obj->name); |
| } |
| |
| if (!mapred_create_object(conn, doc, obj)) |
| done = false; |
| else |
| progress = true; |
| } |
| } |
| |
| /* |
| * If all non-execute objects have been created then switch over |
| * and start creating the execution jobs |
| */ |
| if (done && !executes) |
| { |
| executes = true; |
| done = false; |
| } |
| |
| /* |
| * If we looped through the list, we are not done, and no progress |
| * was made then we have an infinite cycle and should probably stop. |
| */ |
| if (!done && !progress) |
| { |
| if (doc->errors && doc->errors->position > 0) |
| fprintf(stderr, "%s", doc->errors->buffer); |
| XRAISE(MAPRED_PARSE_ERROR, |
| "Unable to make forward progress creating objects\n"); |
| } |
| |
| } while (!done); |
| |
| /* objects created, execute queries */ |
| mapred_run_queries(conn, doc); |
| } |
| XCATCH(MAPRED_SQL_ERROR) |
| { |
| if (global_verbose_flag) |
| fprintf(stderr, " - "); |
| fprintf(stderr, "%s", PQerrorMessage(conn)); |
| XRERAISE(); |
| } |
| XFINALLY |
| { |
| /* Remove all the objects that we created */ |
| if (global_print_flag || global_debug_flag) |
| printf("\n"); |
| for (olist = doc->objects; olist; olist = olist->next) |
| mapred_remove_object(conn, doc, olist->object); |
| if (global_print_flag || global_debug_flag) |
| printf("\n"); |
| |
| /* |
| * We always commit the transaction, even on failure since the failure |
| * may have occured after we generated some output tables and we want |
| * to keep the partial results. |
| */ |
| result = PQexec(conn, "COMMIT TRANSACTION"); |
| if (PQresultStatus(result) != PGRES_COMMAND_OK) |
| { |
| if (global_verbose_flag) |
| fprintf(stderr, " - "); |
| fprintf(stderr, "%s", PQerrorMessage(conn)); |
| } |
| } |
| XTRY_END; |
| } |
| |
| |
| void mapred_resolve_dependencies(PGconn *conn, mapred_document_t *doc) |
| { |
| mapred_olist_t *olist; |
| int exec_count = 0; |
| |
| /* Walk the list of objects */ |
| for (olist = doc->objects; olist; olist = olist->next) |
| mapred_resolve_object(conn, doc, olist->object, &exec_count); |
| |
| } |
| |
| void mapred_resolve_object(PGconn *conn, mapred_document_t *doc, |
| mapred_object_t *obj, int *exec_count) |
| { |
| mapred_olist_t *newlist; |
| mapred_object_t *sub; /* sub-object */ |
| size_t len; |
| |
| switch (obj->kind) |
| { |
| /* Objects with no dependencies */ |
| case MAPRED_OUTPUT: |
| case MAPRED_ADT: |
| break; |
| |
| case MAPRED_INPUT: |
| /* |
| * For FILE/GPFDIST/EXEC inputs we will create a name-prefixed |
| * version of the object to prevent name collisions, and then |
| * create a second temporary view over the external table to |
| * support access to the input by "name". This involves creating |
| * a second copy of the input which we place directly after the |
| * original input in the document object list. |
| */ |
| if (obj->u.input.type == MAPRED_INPUT_GPFDIST || |
| obj->u.input.type == MAPRED_INPUT_FILE || |
| obj->u.input.type == MAPRED_INPUT_EXEC) |
| { |
| mapred_object_t *newinput; |
| mapred_olist_t *parent; |
| |
| newinput = mapred_malloc(sizeof(mapred_object_t)); |
| memset(newinput, 0, sizeof(mapred_object_t)); |
| newinput->kind = MAPRED_INPUT; |
| len = strlen(obj->name) + 1; |
| newinput->name = mapred_malloc(len); |
| sprintf(newinput->name, "%s", obj->name); |
| newinput->u.input.type = MAPRED_INPUT_QUERY; |
| len = strlen(doc->prefix) + strlen(obj->name) + 16; |
| newinput->u.input.desc = mapred_malloc(len); |
| snprintf(newinput->u.input.desc, len, |
| "select * from %s%s", |
| doc->prefix, obj->name); |
| |
| /* |
| * Find parent input in the doclist and add the new object |
| * immediately after it. |
| */ |
| for (parent = doc->objects; |
| parent && parent->object != obj; |
| parent = parent->next); |
| XASSERT(parent); |
| newlist = mapred_malloc(sizeof(mapred_olist_t)); |
| newlist->object = newinput; |
| newlist->next = parent->next; |
| parent->next = newlist; |
| } |
| break; |
| |
| |
| case MAPRED_MAPPER: |
| case MAPRED_TRANSITION: |
| case MAPRED_COMBINER: |
| case MAPRED_FINALIZER: |
| |
| |
| /* |
| * If the function is an internal function then we try to resolve |
| * the function by looking it up in the catalog. |
| */ |
| obj->u.function.internal_returns = NULL; |
| obj->internal = false; |
| |
| if (!obj->u.function.language) |
| { |
| obj->internal = true; |
| lookup_function_in_catalog(conn, doc, obj); |
| } |
| /* ??? */ |
| else if (!obj->u.function.returns) |
| { |
| XASSERT(false); |
| } |
| |
| /* |
| * The function types may manufacture a dependency on an adt, |
| * but have no other dependencies. |
| */ |
| else if (obj->u.function.returns->next) |
| { |
| sub = mapred_malloc(sizeof(mapred_object_t)); |
| memset(sub, 0, sizeof(mapred_object_t)); |
| sub->kind = MAPRED_ADT; |
| len = strlen(doc->prefix) + strlen(obj->name) + 7; |
| sub->name = mapred_malloc(len); |
| snprintf(sub->name, len, "%s%s_rtype", |
| doc->prefix, obj->name); |
| sub->u.adt.returns = obj->u.function.returns; |
| |
| obj->u.function.rtype.name = sub->name; |
| obj->u.function.rtype.object = sub; |
| |
| /* Add the ADT to the list of document objects */ |
| newlist = mapred_malloc(sizeof(mapred_olist_t)); |
| newlist->object = sub; |
| newlist->next = doc->objects; |
| doc->objects = newlist; |
| |
| /* And resolve the sub-object */ |
| mapred_resolve_object(conn, doc, sub, exec_count); |
| } |
| else |
| { |
| obj->u.function.rtype.name = obj->u.function.returns->type; |
| obj->u.function.rtype.object = NULL; |
| } |
| break; |
| |
| |
| case MAPRED_REDUCER: |
| { |
| /* |
| * If we have a function, but no object then we assume that it is |
| * a database function. Create a dummy object to handle this case. |
| */ |
| mapred_resolve_ref(doc->objects, &obj->u.reducer.transition); |
| if (obj->u.reducer.transition.name && |
| !obj->u.reducer.transition.object) |
| { |
| len = strlen(obj->u.reducer.transition.name) + 1; |
| sub = mapred_malloc(sizeof(mapred_object_t)); |
| memset(sub, 0, sizeof(mapred_object_t)); |
| sub->kind = MAPRED_TRANSITION; |
| sub->name = mapred_malloc(len); |
| sub->line = obj->line; |
| strncpy(sub->name, obj->u.reducer.transition.name, len); |
| |
| newlist = mapred_malloc(sizeof(mapred_olist_t)); |
| newlist->object = sub; |
| newlist->next = doc->objects; |
| doc->objects = newlist; |
| obj->u.reducer.transition.object = sub; |
| |
| /* And resolve the sub-object */ |
| mapred_resolve_object(conn, doc, sub, exec_count); |
| } |
| mapred_resolve_ref(doc->objects, &obj->u.reducer.combiner); |
| if (obj->u.reducer.combiner.name && |
| !obj->u.reducer.combiner.object) |
| { |
| len = strlen(obj->u.reducer.combiner.name) + 1; |
| sub = mapred_malloc(sizeof(mapred_object_t)); |
| memset(sub, 0, sizeof(mapred_object_t)); |
| sub->kind = MAPRED_COMBINER; |
| sub->name = mapred_malloc(len); |
| sub->line = obj->line; |
| strncpy(sub->name, obj->u.reducer.combiner.name, len); |
| |
| newlist = mapred_malloc(sizeof(mapred_olist_t)); |
| newlist->object = sub; |
| newlist->next = doc->objects; |
| doc->objects = newlist; |
| obj->u.reducer.combiner.object = sub; |
| |
| /* And resolve the sub-object */ |
| mapred_resolve_object(conn, doc, sub, exec_count); |
| } |
| mapred_resolve_ref(doc->objects, &obj->u.reducer.finalizer); |
| if (obj->u.reducer.finalizer.name && |
| !obj->u.reducer.finalizer.object) |
| { |
| len = strlen(obj->u.reducer.finalizer.name) + 1; |
| sub = mapred_malloc(sizeof(mapred_object_t)); |
| memset(sub, 0, sizeof(mapred_object_t)); |
| sub->kind = MAPRED_FINALIZER; |
| sub->name = mapred_malloc(len); |
| sub->line = obj->line; |
| strncpy(sub->name, obj->u.reducer.finalizer.name, len); |
| |
| newlist = mapred_malloc(sizeof(mapred_olist_t)); |
| newlist->object = sub; |
| newlist->next = doc->objects; |
| doc->objects = newlist; |
| obj->u.reducer.finalizer.object = sub; |
| |
| /* And resolve the sub-object */ |
| mapred_resolve_object(conn, doc, sub, exec_count); |
| } |
| |
| break; |
| } |
| |
| |
| case MAPRED_TASK: |
| case MAPRED_EXECUTION: |
| { |
| /* |
| * Resolving a task may require recursion to resolve other |
| * tasks to work out parameter lists. We keep track of |
| * our resolution state in order to detect potential |
| * infinite recursion issues. |
| */ |
| if (obj->u.task.flags & mapred_task_resolved) |
| return; |
| |
| /* Assign a name to anonymous executions */ |
| if (!obj->name) |
| { |
| size_t len; |
| |
| XASSERT(obj->u.task.execute); |
| |
| /* 10 characters for max int digits, 4 for "run_" */ |
| len = strlen(doc->prefix) + 16; |
| obj->name = mapred_malloc(len); |
| snprintf(obj->name, len, "%srun_%d", |
| doc->prefix, ++(*exec_count)); |
| } |
| |
| /* Check for infinite recursion */ |
| if (obj->u.task.flags & mapred_task_resolving) |
| { |
| mapred_obj_error(obj, "Infinite recursion detected while " |
| "trying to resove TASK"); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| obj->u.task.flags |= mapred_task_resolving; |
| |
| /* Validate object types */ |
| if (obj->u.task.input.name) |
| { |
| mapred_resolve_ref(doc->objects, &obj->u.task.input); |
| sub = obj->u.task.input.object; |
| |
| /* If we can't find the input, throw an error */ |
| if (!sub) |
| { |
| /* Can't find INPUT object */ |
| mapred_obj_error(obj, "SOURCE '%s' not found in document", |
| obj->u.task.input.name); |
| XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure"); |
| } |
| |
| /* |
| * The input must either be an INPUT or a TASK |
| */ |
| switch (sub->kind) |
| { |
| case MAPRED_INPUT: |
| break; |
| |
| case MAPRED_TASK: |
| /* This objects input is the sub objects output */ |
| mapred_resolve_object(conn, doc, sub, exec_count); |
| break; |
| |
| /* Otherwise generate an error */ |
| default: |
| |
| /* SOURCE wasn't an INPUT */ |
| mapred_obj_error(obj, "SOURCE '%s' is neither an INPUT nor a TASK", |
| obj->u.task.input.name); |
| XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure"); |
| } |
| } |
| |
| if (obj->u.task.mapper.name) |
| { |
| mapred_resolve_ref(doc->objects, &obj->u.task.mapper); |
| sub = obj->u.task.mapper.object; |
| |
| if (!sub) |
| { |
| /* Create an internal map function */ |
| len = strlen(obj->u.task.mapper.name) + 1; |
| sub = mapred_malloc(sizeof(mapred_object_t)); |
| memset(sub, 0, sizeof(mapred_object_t)); |
| sub->kind = MAPRED_MAPPER; |
| sub->name = mapred_malloc(len); |
| sub->line = obj->line; |
| strncpy(sub->name, obj->u.task.mapper.name, len); |
| |
| newlist = mapred_malloc(sizeof(mapred_olist_t)); |
| newlist->object = sub; |
| newlist->next = doc->objects; |
| doc->objects = newlist; |
| obj->u.task.mapper.object = sub; |
| |
| /* And resolve the sub-object */ |
| mapred_resolve_object(conn, doc, sub, exec_count); |
| } |
| else |
| { |
| /* Allow any function type */ |
| switch (sub->kind) |
| { |
| case MAPRED_MAPPER: |
| case MAPRED_TRANSITION: |
| case MAPRED_COMBINER: |
| case MAPRED_FINALIZER: |
| break; |
| |
| default: |
| mapred_obj_error(obj, "MAP '%s' is not a MAP object", |
| obj->u.task.mapper.name); |
| XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure"); |
| } |
| } |
| } |
| |
| if (obj->u.task.reducer.name) |
| { |
| mapred_resolve_ref(doc->objects, &obj->u.task.reducer); |
| sub = obj->u.task.reducer.object; |
| |
| if (!sub) |
| { |
| /* FIXME: non-yaml reducers */ |
| } |
| else if (sub->kind == MAPRED_REDUCER) |
| { /* Validate Reducer */ |
| mapred_resolve_object(conn, doc, sub, exec_count); |
| } |
| else |
| { /* It's an object, but not a REDUCER */ |
| mapred_obj_error(obj, "REDUCE '%s' is not a REDUCE object", |
| obj->u.task.reducer.name); |
| XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure"); |
| } |
| } |
| |
| if (obj->u.task.output.name) |
| { |
| mapred_resolve_ref(doc->objects, &obj->u.task.output); |
| |
| sub = obj->u.task.output.object; |
| if (sub && sub->kind != MAPRED_OUTPUT) |
| { |
| mapred_obj_error(obj, "TARGET '%s' is not an OUTPUT object", |
| obj->u.task.output.name); |
| XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure"); |
| } |
| if (!sub && obj->u.task.output.name) |
| { |
| mapred_obj_error(obj, "TARGET '%s' is not defined in " |
| "document", |
| obj->u.task.output.name); |
| XRAISE(MAPRED_PARSE_ERROR, "Object Resolution Failure"); |
| } |
| } |
| |
| /* clear resolving bit and set resolved bit */ |
| obj->u.task.flags &= !mapred_task_resolving; |
| obj->u.task.flags |= mapred_task_resolved; |
| break; |
| } |
| |
| default: |
| XASSERT(false); |
| } |
| |
| if (global_debug_flag) |
| mapred_obj_debug(obj); |
| } |
| |
| /* |
| * mapred_setup_columns - |
| * setup column lists (input, output, grouping, etc) |
| * |
| * This is usually able to be determined directly from the YAML, |
| * but for some things (defined in the database rather than in |
| * the YAML, eg QUERY INPUTS) we can not determine the columns |
| * until the object has been created. Which can trickle down to |
| * any object that depends on it. |
| * |
| * For this reason we don't setup the columns during the parse phase, |
| * but rather just before or just after we actually create the object |
| * once we know that all the dependencies have already been created. |
| */ |
| void mapred_setup_columns(PGconn *conn, mapred_object_t *obj) |
| { |
| mapred_object_t *sub; |
| PGresult *result; |
| |
| /* switch based on object type */ |
| switch (obj->kind) |
| { |
| case MAPRED_ADT: |
| break; |
| |
| case MAPRED_INPUT: |
| |
| /* |
| * Should be called after creation, otherwise catalog queries |
| * could fail. |
| */ |
| XASSERT(obj->created); |
| |
| /* setup the column list for database defined inputs */ |
| if (obj->u.input.type == MAPRED_INPUT_TABLE || |
| obj->u.input.type == MAPRED_INPUT_QUERY) |
| { |
| /* |
| * This gets the ordered list of columns for the first |
| * input of the given name in the user's search path. |
| */ |
| buffer_t *buffer = makebuffer(1024, 1024); |
| bufcat(&buffer, |
| "SELECT attname, " |
| " pg_catalog.format_type(atttypid, atttypmod)\n" |
| "FROM pg_catalog.pg_attribute\n" |
| "WHERE attnum > 0 AND attrelid = lower('"); |
| if (obj->u.input.type == MAPRED_INPUT_TABLE) |
| bufcat(&buffer, obj->u.input.desc); |
| else |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, |
| "')::regclass\n" |
| "ORDER BY -attnum;\n\n"); |
| |
| if (global_debug_flag) |
| printf("%s", buffer->buffer); |
| |
| result = PQexec(conn, buffer->buffer); |
| mapred_free(buffer); |
| |
| if (PQresultStatus(result) == PGRES_TUPLES_OK && |
| PQntuples(result) > 0) |
| { |
| mapred_plist_t *newitem; |
| int i; |
| |
| /* Destroy any previous default values we setup */ |
| mapred_destroy_plist(&obj->u.input.columns); |
| |
| /* |
| * The columns were sorted reverse order above so |
| * the list can be generated back -> front |
| */ |
| for (i = 0; i < PQntuples(result); i++) |
| { |
| char *name = PQgetvalue(result, i, 0); |
| char *type = PQgetvalue(result, i, 1); |
| |
| size_t name_len = strlen(name) + 1; |
| size_t type_len = strlen(type) + 1; |
| /* Add the column to the list */ |
| newitem = mapred_malloc(sizeof(mapred_plist_t)); |
| newitem->name = mapred_malloc(name_len); |
| strncpy(newitem->name, name, name_len); |
| newitem->type = mapred_malloc(type_len); |
| strncpy(newitem->type, type, type_len); |
| newitem->next = obj->u.input.columns; |
| obj->u.input.columns = newitem; |
| } |
| } |
| else |
| { |
| char *error = PQresultErrorField(result, PG_DIAG_SQLSTATE); |
| char *name; |
| |
| if (obj->u.input.type == MAPRED_INPUT_TABLE) |
| name = obj->u.input.desc; |
| else |
| name = obj->name; |
| |
| if (PQresultStatus(result) == PGRES_TUPLES_OK) |
| { |
| mapred_obj_error(obj, "Table '%s' contains no rows", name); |
| } |
| else if (!strcmp(error, OBJ_DOES_NOT_EXIST) || |
| !strcmp(error, SCHEMA_DOES_NOT_EXIST) ) |
| { |
| mapred_obj_error(obj, "Table '%s' not found", name); |
| } |
| else |
| { |
| mapred_obj_error(obj, "Table '%s' unknown error: %s", name, error); |
| } |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| PQclear(result); |
| } |
| break; |
| |
| |
| case MAPRED_OUTPUT: |
| break; |
| |
| case MAPRED_MAPPER: |
| case MAPRED_TRANSITION: |
| case MAPRED_COMBINER: |
| case MAPRED_FINALIZER: |
| XASSERT(obj->u.function.parameters); |
| XASSERT(obj->u.function.returns); |
| break; |
| |
| |
| case MAPRED_REDUCER: |
| { |
| mapred_object_t *transition = obj->u.reducer.transition.object; |
| |
| XASSERT(transition); |
| XASSERT(transition->u.function.parameters); |
| obj->u.reducer.parameters = |
| transition->u.function.parameters->next; |
| |
| /* |
| * Use the return result of: |
| * 1) The finalizer |
| * 2) The combiner, or |
| * 3) The transition |
| * |
| * in that order, if the return is not derivable then |
| * fall into the default value of a single text column |
| * named "value" |
| */ |
| if (obj->u.reducer.finalizer.name) |
| sub = obj->u.reducer.finalizer.object; |
| else if (obj->u.reducer.combiner.name) |
| sub = obj->u.reducer.combiner.object; |
| else |
| sub = obj->u.reducer.transition.object; |
| |
| if (sub) |
| obj->u.reducer.returns = sub->u.function.returns; |
| |
| if (!obj->u.reducer.returns) |
| { |
| /* |
| * If unable to determine the returns based on the reducer |
| * components (generally due to use of SQL functions) then |
| * use the default of a single text column named "value". |
| */ |
| obj->u.reducer.returns = mapred_malloc(sizeof(mapred_plist_t)); |
| obj->u.reducer.returns->name = "value"; |
| obj->u.reducer.returns->type = "text"; |
| obj->u.reducer.returns->next = NULL; |
| } |
| break; |
| } |
| |
| |
| case MAPRED_TASK: |
| case MAPRED_EXECUTION: |
| { |
| mapred_plist_t *scan; |
| mapred_plist_t *last = NULL; |
| |
| /* |
| * The input must either be an INPUT or a TASK |
| */ |
| sub = obj->u.task.input.object; |
| switch (sub->kind) |
| { |
| case MAPRED_INPUT: |
| obj->u.task.parameters = sub->u.input.columns; |
| break; |
| |
| case MAPRED_TASK: |
| /* union the input tasks returns and grouping */ |
| for (scan = sub->u.task.grouping; |
| scan; |
| scan = scan->next) |
| { |
| if (!last) |
| { |
| obj->u.task.parameters = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = obj->u.task.parameters; |
| } |
| else |
| { |
| last->next = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = last->next; |
| } |
| last->name = scan->name; |
| last->type = scan->type; |
| last->next = NULL; |
| } |
| for (scan = sub->u.task.returns; |
| scan; |
| scan = scan->next) |
| { |
| if (!last) |
| { |
| obj->u.task.parameters = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = obj->u.task.parameters; |
| } |
| else |
| { |
| last->next = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = last->next; |
| } |
| last->name = scan->name; |
| last->type = scan->type; |
| last->next = NULL; |
| } |
| break; |
| |
| default: |
| /* Should have already been validated */ |
| XASSERT(false); |
| } |
| |
| if (obj->u.task.mapper.name) |
| { |
| sub = obj->u.task.mapper.object; |
| if (!sub) |
| { |
| /* FIXME: Lookup function in database */ |
| /* for now... do nothing */ |
| } |
| else |
| { |
| /* Allow any function type */ |
| switch (sub->kind) |
| { |
| case MAPRED_MAPPER: |
| case MAPRED_TRANSITION: |
| case MAPRED_COMBINER: |
| case MAPRED_FINALIZER: |
| break; |
| |
| default: |
| /* Should have already been validated */ |
| XASSERT(false); |
| } |
| } |
| } |
| |
| if (obj->u.task.reducer.name) |
| { |
| mapred_clist_t *keys; |
| mapred_plist_t *source; |
| |
| /* |
| * The grouping columns for a task are the columns produced |
| * by the input/mapper that are not consumed by the reducer. |
| * |
| * A special exception is made for a column named "key" which |
| * is always a grouping column. |
| * |
| * FIXME: deal with non-yaml map functions |
| * |
| * FIXME: deal with KEY specifications |
| */ |
| if (obj->u.task.mapper.object) |
| source = obj->u.task.mapper.object->u.function.returns; |
| else |
| source = obj->u.task.parameters; |
| |
| sub = obj->u.task.reducer.object; |
| if (!sub) |
| { |
| /* |
| * The output of a built in function is defined to be |
| * "value", with an input of "value", everything else |
| * is defined to be a grouping column. |
| */ |
| last = NULL; |
| for (scan = source; scan; scan = scan->next) |
| { |
| if (strcasecmp(scan->name, "value")) |
| { |
| if (!last) |
| { |
| obj->u.task.grouping = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = obj->u.task.grouping; |
| } |
| else |
| { |
| last->next = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = last->next; |
| } |
| last->name = scan->name; |
| last->type = scan->type; |
| last->next = NULL; |
| } |
| } |
| } |
| else |
| { |
| /* Validate Reducer */ |
| XASSERT(sub->kind == MAPRED_REDUCER); |
| |
| /* |
| * source is the set of input columns that the reducer has |
| * to work with. |
| * |
| * Loop the reducer "keys" clause to determine what keys are |
| * present. |
| */ |
| last = NULL; |
| for (keys = sub->u.reducer.keys; keys; keys = keys->next) |
| { |
| /* |
| * If there is a '*' in the keys then it catches all |
| * unreferenced columns. |
| */ |
| if (keys->value[0] == '*' && keys->value[1] == '\0') |
| { |
| /* |
| * Add all sources not found in either parameters, |
| * or explicitly mentioned in keys |
| */ |
| for (scan = source; scan; scan = scan->next) |
| { |
| mapred_plist_t *pscan; |
| mapred_clist_t *kscan; |
| |
| for (pscan = sub->u.reducer.parameters; |
| pscan; |
| pscan = pscan->next) |
| { |
| if (!strcasecmp(scan->name, pscan->name)) |
| break; |
| } |
| if (pscan) |
| continue; /* found in parameters */ |
| for (kscan = sub->u.reducer.keys; |
| kscan; |
| kscan = kscan->next) |
| { |
| if (!strcasecmp(scan->name, kscan->value)) |
| break; |
| } |
| if (kscan) |
| continue; /* found in keys */ |
| |
| /* we have an unmatched source, add to grouping */ |
| if (!last) |
| { |
| obj->u.task.grouping = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = obj->u.task.grouping; |
| } |
| else |
| { |
| last->next = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = last->next; |
| } |
| last->name = scan->name; |
| last->type = scan->type; |
| last->next = NULL; |
| } |
| } |
| else |
| { |
| /* Look for the referenced key in the source list */ |
| for (scan = source; scan; scan = scan->next) |
| if (!strcasecmp(keys->value, scan->name)) |
| { |
| /* we have a match, add the key to grouping */ |
| if (!last) |
| { |
| obj->u.task.grouping = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = obj->u.task.grouping; |
| } |
| else |
| { |
| last->next = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = last->next; |
| } |
| last->name = scan->name; |
| last->type = scan->type; |
| last->next = NULL; |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| /* |
| * If there is a reducer then the "returns" columns are the |
| * output of the reducer, and must be unioned with the grouping |
| * columns for final output. |
| * |
| * If there is no reducer then the returns columns are the |
| * returns columns of the mapper or the input |
| */ |
| if (obj->u.task.reducer.name) |
| { |
| /* |
| * If it is a built in function then we'll just fall into the |
| * default of a single text column named "value". |
| */ |
| sub = obj->u.task.reducer.object; |
| if (sub) |
| obj->u.task.returns = sub->u.reducer.returns; |
| |
| } |
| else if (obj->u.task.mapper.name) |
| { |
| sub = obj->u.task.mapper.object; |
| if (sub) |
| obj->u.task.returns = sub->u.function.returns; |
| } |
| else |
| { |
| obj->u.task.returns = obj->u.task.parameters; |
| } |
| |
| if (!obj->u.task.returns) |
| { |
| /* |
| * If unable to determine the returns based on the reducer |
| * components (generally due to use of SQL functions) then |
| * use the default of a single text column named "value". |
| */ |
| obj->u.task.returns = mapred_malloc(sizeof(mapred_plist_t)); |
| obj->u.task.returns->name = "value"; |
| obj->u.task.returns->type = "text"; |
| obj->u.task.returns->next = NULL; |
| } |
| break; |
| } |
| |
| default: |
| XASSERT(false); |
| } |
| } |
| |
| void mapred_resolve_ref(mapred_olist_t *olist, mapred_reference_t *ref) |
| { |
| XASSERT(ref); |
| if (!ref->name) |
| return; |
| |
| /* Scan the list of objects until we find one with a matching name */ |
| for (; olist; olist = olist->next) |
| { |
| if (olist->object->name && !strcasecmp(ref->name, olist->object->name)) |
| { |
| ref->object = olist->object; |
| return; |
| } |
| } |
| } |
| |
| /* Some basic destructors */ |
| void mapred_destroy_object(mapred_object_t **objh) |
| { |
| mapred_object_t *obj; |
| |
| /* |
| * We are passed a handle to the object, get the actual pointer and point |
| * the handle to NULL so that it is not stale once we free the list below. |
| */ |
| if (!objh || !*objh) |
| return; |
| obj = *objh; |
| *objh = (mapred_object_t *) NULL; |
| |
| /* What fields are valid is dependent on what kind of object it is */ |
| scalarfree(obj->name); |
| switch (obj->kind) |
| { |
| case MAPRED_NO_KIND: |
| break; |
| |
| case MAPRED_DOCUMENT: |
| scalarfree(obj->u.document.version); |
| scalarfree(obj->u.document.database); |
| scalarfree(obj->u.document.user); |
| scalarfree(obj->u.document.host); |
| mapred_destroy_olist(&obj->u.document.objects); |
| mapred_destroy_olist(&obj->u.document.execute); |
| break; |
| |
| case MAPRED_INPUT: |
| scalarfree(obj->u.input.desc); |
| scalarfree(obj->u.input.delimiter); |
| scalarfree(obj->u.input.encoding); |
| mapred_destroy_clist(&obj->u.input.files); |
| mapred_destroy_plist(&obj->u.input.columns); |
| break; |
| |
| case MAPRED_OUTPUT: |
| scalarfree(obj->u.output.desc); |
| break; |
| |
| case MAPRED_MAPPER: |
| case MAPRED_TRANSITION: |
| case MAPRED_COMBINER: |
| case MAPRED_FINALIZER: |
| scalarfree(obj->u.function.body); |
| scalarfree(obj->u.function.language); |
| mapred_destroy_plist(&obj->u.function.parameters); |
| |
| if( obj->internal && |
| obj->u.function.internal_returns != obj->u.function.returns ) |
| mapred_destroy_plist(&obj->u.function.internal_returns); |
| |
| mapred_destroy_plist(&obj->u.function.returns); |
| break; |
| |
| case MAPRED_REDUCER: |
| scalarfree(obj->u.reducer.transition.name); |
| scalarfree(obj->u.reducer.combiner.name); |
| scalarfree(obj->u.reducer.finalizer.name); |
| scalarfree(obj->u.reducer.initialize); |
| break; |
| |
| case MAPRED_TASK: |
| case MAPRED_EXECUTION: |
| scalarfree(obj->u.task.input.name); |
| scalarfree(obj->u.task.mapper.name); |
| scalarfree(obj->u.task.reducer.name); |
| scalarfree(obj->u.task.output.name); |
| break; |
| |
| /* |
| * ADT just borrowed the parameter list from the owning function, |
| * so it has nothing else to delete. |
| */ |
| case MAPRED_ADT: |
| break; |
| |
| default: |
| XASSERT(false); |
| } |
| } |
| |
| void mapred_destroy_olist(mapred_olist_t **olisth) |
| { |
| mapred_olist_t *olist; |
| mapred_olist_t *next; |
| |
| /* |
| * We are passed a handle to the olist, get the actual pointer and point |
| * the handle to NULL so that it is not stale once we free the list below. |
| */ |
| if (!olisth || !*olisth) |
| return; |
| olist = *olisth; |
| *olisth = (mapred_olist_t *) NULL; |
| |
| /* Walk the list destroying each item as we come to it. */ |
| while (olist) |
| { |
| mapred_destroy_object(&olist->object); |
| next = olist->next; |
| mapred_free(olist); |
| olist = next; |
| } |
| } |
| |
| |
| void mapred_destroy_clist(mapred_clist_t **clisth) |
| { |
| mapred_clist_t *clist; |
| mapred_clist_t *next; |
| |
| /* |
| * We are passed a handle to the olist, get the actual pointer and point |
| * the handle to NULL so that it is not stale once we free the list below. |
| */ |
| if (!clisth || !*clisth) |
| return; |
| clist = *clisth; |
| *clisth = (mapred_clist_t *) NULL; |
| |
| /* Walk the list destroying each item as we come to it. */ |
| while (clist) |
| { |
| scalarfree(clist->value); |
| next = clist->next; |
| mapred_free(clist); |
| clist = next; |
| } |
| } |
| |
| void mapred_destroy_plist(mapred_plist_t **plisth) |
| { |
| mapred_plist_t *plist; |
| mapred_plist_t *next; |
| |
| /* |
| * We are passed a handle to the olist, get the actual pointer and point |
| * the handle to NULL so that it is not stale once we free the list below. |
| */ |
| if (!plisth || !*plisth) |
| return; |
| plist = *plisth; |
| *plisth = (mapred_plist_t *) NULL; |
| |
| /* Walk the list destroying each item as we come to it. */ |
| while (plist) |
| { |
| scalarfree(plist->name); |
| scalarfree(plist->type); |
| next = plist->next; |
| mapred_free(plist); |
| plist = next; |
| } |
| } |
| |
| |
| |
| |
| /* -------------------------------------------------------------------------- */ |
| /* Functions that get things done */ |
| /* -------------------------------------------------------------------------- */ |
| void mapred_run_queries(PGconn *conn, mapred_document_t *doc) |
| { |
| mapred_olist_t *olist; |
| mapred_plist_t *columns; |
| mapred_object_t *output; |
| PGresult *result = NULL; |
| FILE *outfile = stdout; |
| buffer_t *buffer = NULL; |
| |
| XTRY |
| { |
| /* allocates 512 bytes, extending by 512 bytes if we run out. */ |
| buffer = makebuffer(512, 512); |
| |
| /* Loop through all objects */ |
| for (olist = doc->objects; olist; olist = olist->next) |
| { |
| if (olist->object->kind == MAPRED_EXECUTION) |
| { |
| boolean exists = false; |
| |
| XASSERT(olist->object->name); |
| |
| /* Reset the buffer from any previous executions */ |
| bufreset(buffer); |
| |
| output = olist->object->u.task.output.object; |
| |
| /* |
| * [CREATE TABLE <name> AS ] |
| * SELECT * FROM <name> |
| * ORDER BY <column-list> |
| */ |
| if (output && output->u.output.type == MAPRED_OUTPUT_TABLE) |
| { |
| /* does the table already exist? */ |
| bufcat(&buffer, |
| "SELECT n.nspname \n" |
| "FROM pg_catalog.pg_class c JOIN \n" |
| " pg_catalog.pg_namespace n on \n" |
| " (c.relnamespace = n.oid) \n" |
| "WHERE n.nspname = ANY(current_schemas(true)) \n" |
| " AND c.relname = lower('"); |
| bufcat(&buffer, output->u.output.desc); |
| bufcat(&buffer, "')"); |
| result = PQexec(conn, buffer->buffer); |
| if (PQresultStatus(result) == PGRES_TUPLES_OK && |
| PQntuples(result) > 0) |
| exists = true; |
| bufreset(buffer); |
| |
| if (exists && output->u.output.mode == MAPRED_OUTPUT_MODE_REPLACE) |
| { |
| bufcat(&buffer, "DROP TABLE "); |
| bufcat(&buffer, output->u.output.desc); |
| PQexec(conn, "SAVEPOINT mapreduce_save"); |
| result = PQexec(conn, buffer->buffer); |
| if (PQresultStatus(result) == PGRES_COMMAND_OK) |
| { |
| PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"); |
| } |
| else |
| { |
| /* rollback to savepoint */ |
| PQexec(conn, "ROLLBACK TO SAVEPOINT mapreduce_save"); |
| PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"); |
| |
| if (global_verbose_flag) |
| fprintf(stderr, " - "); |
| fprintf(stderr, "Error: %s\n", |
| PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY)); |
| mapred_obj_error(output, "Table '%s' can't be replaced", |
| output->u.output.desc); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| bufreset(buffer); |
| exists = false; |
| } |
| |
| /* Handle Explain for OUTPUT TABLE */ |
| if (global_explain_flag & global_analyze) |
| bufcat(&buffer, "EXPLAIN ANALYZE "); |
| else if (global_explain_flag) |
| bufcat(&buffer, "EXPLAIN "); |
| |
| if (!exists) |
| { |
| bufcat(&buffer, "CREATE TABLE "); |
| bufcat(&buffer, output->u.output.desc); |
| bufcat(&buffer, " AS "); |
| } |
| else if (output->u.output.mode == MAPRED_OUTPUT_MODE_APPEND) |
| { |
| bufcat(&buffer, "INSERT INTO "); |
| bufcat(&buffer, output->u.output.desc); |
| bufcat(&buffer, " ("); |
| } |
| else |
| { |
| /* exists, mode is neither replace or append => error */ |
| mapred_obj_error(output, "Table '%s' already exists", |
| output->u.output.desc); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| } |
| /* Handle Explain for non-table output */ |
| else if (global_explain_flag & global_analyze) |
| { |
| bufcat(&buffer, "EXPLAIN ANALYZE "); |
| } |
| else if (global_explain_flag) |
| { |
| bufcat(&buffer, "EXPLAIN "); |
| } |
| |
| bufcat(&buffer, "SELECT * FROM "); |
| bufcat(&buffer, olist->object->name); |
| |
| /* |
| * add the DISTRIBUTED BY clause for output tables |
| * OR, the ORDER BY clause for other output formats |
| */ |
| if (output && output->u.output.type == MAPRED_OUTPUT_TABLE) |
| { |
| /* |
| * If there are no key columns then leave off the |
| * distributed by clause and let the server choose. |
| */ |
| if (exists) |
| bufcat(&buffer, ")"); |
| |
| else if (olist->object->u.task.grouping) |
| { |
| bufcat(&buffer, " DISTRIBUTED BY ("); |
| columns = olist->object->u.task.grouping; |
| while (columns) |
| { |
| bufcat(&buffer, columns->name); |
| if (columns->next) |
| bufcat(&buffer, ", "); |
| columns = columns->next; |
| } |
| bufcat(&buffer, ")"); |
| } |
| else |
| { |
| /* |
| * don't have any hints for what the distribution keys |
| * should be, so we do nothing and let the database |
| * decide |
| */ |
| } |
| } |
| else |
| { |
| if (olist->object->u.task.returns || |
| olist->object->u.task.grouping) |
| { |
| bufcat(&buffer, " ORDER BY "); |
| columns = olist->object->u.task.grouping; |
| while (columns) |
| { |
| bufcat(&buffer, columns->name); |
| if (columns->next || olist->object->u.task.returns) |
| bufcat(&buffer, ", "); |
| columns = columns->next; |
| } |
| columns = olist->object->u.task.returns; |
| while (columns) |
| { |
| bufcat(&buffer, columns->name); |
| if (columns->next) |
| bufcat(&buffer, ", "); |
| columns = columns->next; |
| } |
| } |
| } |
| bufcat(&buffer, ";\n"); |
| |
| /* Tell the user what job we are running */ |
| if (global_verbose_flag) |
| fprintf(stderr, " - RUN: "); |
| if (global_print_flag || global_debug_flag) |
| fprintf(stderr, "%s", buffer->buffer); |
| else |
| fprintf(stderr, "%s\n", olist->object->name); |
| |
| /* But we only execute it if we are not in "print-only" mode */ |
| if (!global_print_flag) |
| { |
| /* If we have an output file, open it for write now */ |
| if (output && output->u.output.type == MAPRED_OUTPUT_FILE) |
| { |
| switch (output->u.output.mode) |
| { |
| case MAPRED_OUTPUT_MODE_NONE: |
| /* check if the file exists */ |
| if (access(output->u.output.desc, F_OK) == 0) |
| { |
| mapred_obj_error(output, "file '%s' already exists", |
| output->u.output.desc); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| /* Fallthrough */ |
| |
| case MAPRED_OUTPUT_MODE_REPLACE: |
| outfile = fopen(output->u.output.desc, "wb"); |
| break; |
| |
| case MAPRED_OUTPUT_MODE_APPEND: |
| outfile = fopen(output->u.output.desc, "ab"); |
| break; |
| |
| default: |
| XASSERT(false); |
| } |
| |
| if (!outfile) |
| { |
| mapred_obj_error(output, "could not open file '%s' for write", |
| output->u.output.desc); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Failure"); |
| } |
| } |
| else |
| { |
| outfile = stdout; |
| } |
| |
| /* |
| * Enable notices for user queries since they may contain |
| * debugging info. |
| */ |
| PQsetNoticeReceiver(conn, print_notice_handler, NULL); |
| result = PQexec(conn, buffer->buffer); |
| PQsetNoticeReceiver(conn, ignore_notice_handler, NULL); |
| switch (PQresultStatus(result)) |
| { |
| /* Output is STDOUT or FILE */ |
| case PGRES_TUPLES_OK: |
| { |
| PQprintOpt options; |
| memset(&options, 0, sizeof(options)); |
| |
| /* |
| * Formatting: |
| * STDOUT = fancy formatting |
| * FILE = plain formatting |
| */ |
| if (outfile == stdout) |
| { |
| options.header = true; |
| options.align = true; |
| options.fieldSep = "|"; |
| } |
| else if (output->u.output.delimiter) |
| { |
| options.fieldSep = output->u.output.delimiter; |
| } |
| else |
| { |
| /* "\t" is our default delimiter */ |
| options.fieldSep = "\t"; |
| } |
| |
| PQprint(outfile, result, &options); |
| break; |
| } |
| |
| /* OUTPUT is a table */ |
| case PGRES_COMMAND_OK: |
| fprintf(stderr, "DONE\n"); |
| break; |
| |
| /* An error of some kind */ |
| default: |
| XRAISE(MAPRED_SQL_ERROR, "Execution Failure"); |
| } |
| PQclear(result); |
| result = NULL; |
| |
| if (NULL != outfile && outfile != stdout) |
| { |
| fclose(outfile); |
| outfile = stdout; |
| } |
| } |
| } |
| } |
| } |
| XFINALLY |
| { |
| if (result) |
| PQclear(result); |
| |
| if (NULL != outfile && outfile != stdout) |
| { |
| fclose(outfile); |
| outfile = stdout; |
| } |
| |
| if (buffer) |
| mapred_free(buffer); |
| } |
| XTRY_END; |
| } |
| |
| |
| boolean mapred_create_object(PGconn *conn, mapred_document_t *doc, |
| mapred_object_t *obj) |
| { |
| mapred_clist_t *clist = NULL; |
| mapred_plist_t *plist = NULL; |
| mapred_plist_t *plist2 = NULL; |
| const char *ckind = NULL; |
| buffer_t *buffer = NULL; |
| buffer_t *qbuffer = NULL; |
| PGresult *result = NULL; |
| |
| /* If the object was created in a prior pass, then do nothing */ |
| if (obj->created) |
| return true; |
| |
| /* Otherwise attempt to create the object */ |
| XTRY |
| { |
| /* allocates 1024 bytes, extending by 1024 bytes if we run out */ |
| buffer = makebuffer(1024, 1024); |
| |
| switch (obj->kind) |
| { |
| case MAPRED_INPUT: |
| XASSERT(obj->name); |
| |
| switch (obj->u.input.type) |
| { |
| case MAPRED_INPUT_TABLE: |
| /* Nothing to actually create */ |
| obj->created = true; |
| break; |
| |
| case MAPRED_INPUT_FILE: |
| case MAPRED_INPUT_GPFDIST: |
| XASSERT(obj->u.input.files); |
| |
| /* Allocate and produce buffer */ |
| bufcat(&buffer, "CREATE EXTERNAL TABLE "); |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, "("); |
| for (plist = obj->u.input.columns; |
| plist; |
| plist = plist->next) |
| { |
| bufcat(&buffer, plist->name); |
| bufcat(&buffer, " "); |
| bufcat(&buffer, plist->type); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, ")\n"); |
| bufcat(&buffer, " LOCATION("); |
| for (clist = obj->u.input.files; |
| clist; |
| clist = clist->next) |
| { |
| char *domain_port, *path, *p = NULL; |
| |
| if (obj->u.input.type == MAPRED_INPUT_GPFDIST) |
| bufcat(&buffer, "'gpfdist://"); |
| else |
| bufcat(&buffer, "'file://"); |
| |
| /* |
| * The general syntax of a URL is scheme://domain:port/path?query_string#fragment_id |
| * clist->value should contain just domain:port/path?query_string#fragment_id |
| */ |
| p = strchr(clist->value, '/'); |
| if (p == NULL) |
| { |
| mapred_obj_error(obj, "Failed to find '/' indicating start of path (%s)", |
| clist->value); |
| XRAISE(MAPRED_PARSE_ERROR, |
| "Invalid INPUT source specification"); |
| } |
| if (p == clist->value) |
| { |
| mapred_obj_error(obj, "Missing domain and port before '/' indicating start of path (%s)", |
| clist->value); |
| XRAISE(MAPRED_PARSE_ERROR, |
| "Invalid INPUT source specification"); |
| } |
| domain_port = clist->value; |
| path = p+1; |
| |
| /* |
| * Overwrite the / separating the domain:port from the path |
| * with a nul and move back one byte to check for a trailing ':'. |
| * We put the / back in when copying into the destination buffer. |
| */ |
| *p-- = '\0'; |
| |
| if (strlen(path) < 1) |
| { |
| mapred_obj_error(obj, "Missing path after '/' (%s)", |
| clist->value); |
| XRAISE(MAPRED_PARSE_ERROR, |
| "Invalid INPUT source specification"); |
| } |
| |
| /* |
| * We allow a trailing ':' (e.g. host:/filepath) |
| * but we must not copy it into the external table url. |
| */ |
| if (*p == ':') |
| *p = '\0'; |
| |
| if (strlen(domain_port) < 1) |
| { |
| mapred_obj_error(obj, "Missing host before '/' (%s)", |
| clist->value); |
| XRAISE(MAPRED_PARSE_ERROR, |
| "Invalid INPUT source specification"); |
| } |
| |
| bufcat(&buffer, domain_port); |
| bufcat(&buffer, "/"); |
| bufcat(&buffer, path); |
| if (clist->next) |
| bufcat(&buffer, "',\n "); |
| } |
| bufcat(&buffer, "')\n"); |
| if (obj->u.input.format == MAPRED_FORMAT_CSV) |
| bufcat(&buffer, " FORMAT 'CSV'"); |
| else |
| bufcat(&buffer, " FORMAT 'TEXT'"); |
| if (obj->u.input.delimiter || |
| obj->u.input.escape || |
| obj->u.input.quote || |
| obj->u.input.null) |
| { |
| bufcat(&buffer, " ( "); |
| if (obj->u.input.delimiter) |
| { |
| bufcat(&buffer, "DELIMITER '"); |
| bufcat(&buffer, obj->u.input.delimiter); |
| bufcat(&buffer, "' "); |
| } |
| if (obj->u.input.escape) |
| { |
| bufcat(&buffer, "ESCAPE '"); |
| bufcat(&buffer, obj->u.input.escape); |
| bufcat(&buffer, "' "); |
| } |
| if (obj->u.input.quote) |
| { |
| bufcat(&buffer, "QUOTE '"); |
| bufcat(&buffer, obj->u.input.quote); |
| bufcat(&buffer, "' "); |
| } |
| if (obj->u.input.null) |
| { |
| bufcat(&buffer, "NULL '"); |
| bufcat(&buffer, obj->u.input.null); |
| bufcat(&buffer, "' "); |
| } |
| bufcat(&buffer, ")"); |
| } |
| |
| if (obj->u.input.error_limit > 0) |
| { |
| char intbuf[11]; |
| snprintf(intbuf, 11, "%d", |
| obj->u.input.error_limit); |
| |
| bufcat(&buffer, "\n SEGMENT REJECT LIMIT "); |
| bufcat(&buffer, intbuf); |
| } |
| |
| bufcat(&buffer, ";\n\n"); |
| break; |
| |
| case MAPRED_INPUT_EXEC: |
| XASSERT(obj->u.input.desc); |
| |
| bufcat(&buffer, "CREATE EXTERNAL WEB TABLE "); |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, "("); |
| for (plist = obj->u.input.columns; |
| plist; |
| plist = plist->next) |
| { |
| bufcat(&buffer, plist->name); |
| bufcat(&buffer, " "); |
| bufcat(&buffer, plist->type); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, ")\n"); |
| bufcat(&buffer, "EXECUTE '"); |
| bufcat(&buffer, obj->u.input.desc); |
| bufcat(&buffer, "'\n"); |
| if (obj->u.input.format == MAPRED_FORMAT_CSV) |
| bufcat(&buffer, " FORMAT 'CSV'"); |
| else |
| bufcat(&buffer, " FORMAT 'TEXT'"); |
| if (obj->u.input.delimiter || |
| obj->u.input.quote || |
| obj->u.input.null) |
| { |
| bufcat(&buffer, " ( "); |
| if (obj->u.input.delimiter) |
| { |
| bufcat(&buffer, "DELIMITER '"); |
| bufcat(&buffer, obj->u.input.delimiter); |
| bufcat(&buffer, "' "); |
| } |
| if (obj->u.input.quote) |
| { |
| bufcat(&buffer, "QUOTE '"); |
| bufcat(&buffer, obj->u.input.quote); |
| bufcat(&buffer, "' "); |
| } |
| if (obj->u.input.null) |
| { |
| bufcat(&buffer, "NULL '"); |
| bufcat(&buffer, obj->u.input.null); |
| bufcat(&buffer, "' "); |
| } |
| bufcat(&buffer, ")"); |
| } |
| |
| if (obj->u.input.error_limit > 0) |
| { |
| char intbuf[11]; |
| snprintf(intbuf, 11, "%d", |
| obj->u.input.error_limit); |
| |
| bufcat(&buffer, "\n SEGMENT REJECT LIMIT "); |
| bufcat(&buffer, intbuf); |
| } |
| bufcat(&buffer, ";\n\n"); |
| break; |
| |
| case MAPRED_INPUT_QUERY: |
| XASSERT(obj->u.input.desc); |
| |
| /* |
| * CREATE TEMPORARY VIEW <name> AS |
| * <desc>; |
| */ |
| bufcat(&buffer, "CREATE TEMPORARY VIEW "); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, " AS\n"); |
| bufcat(&buffer, obj->u.input.desc); |
| bufcat(&buffer, ";\n\n"); |
| break; |
| |
| case MAPRED_INPUT_NONE: |
| default: |
| XASSERT(false); |
| } |
| if (global_print_flag || global_debug_flag) |
| printf("-- INPUT %s\n", obj->name); |
| break; |
| |
| case MAPRED_OUTPUT: |
| /* |
| * Outputs have no backend objects created directly. |
| * For output tables we may issue a create table as |
| * select, but that occurs at run-time. |
| */ |
| obj->created = true; |
| mapred_setup_columns(conn, obj); |
| break; |
| |
| /* |
| * The function types have different defaults and generate |
| * slightly different error messages, but basically do the |
| * same thing. |
| */ |
| case MAPRED_MAPPER: |
| case MAPRED_TRANSITION: |
| case MAPRED_COMBINER: |
| case MAPRED_FINALIZER: |
| ckind = mapred_kind_name[obj->kind]; |
| |
| XASSERT(obj->name); |
| |
| /* |
| * 'kind' specific initialization accomplished above, now handle |
| * the generic function creation. |
| */ |
| if (global_print_flag || global_debug_flag) |
| printf("-- %s %s\n", ckind, obj->name); |
| |
| /* |
| * Nothing to do if we already looked up the function in the |
| * catalog. |
| */ |
| if (obj->internal) |
| { |
| obj->created = true; |
| break; |
| } |
| |
| /* Non-internal functions should have these defined */ |
| XASSERT(obj->u.function.body); |
| XASSERT(obj->u.function.language); |
| |
| mapred_setup_columns(conn, obj); |
| XASSERT(obj->u.function.parameters); |
| XASSERT(obj->u.function.rtype.name); |
| XASSERT(NULL == obj->u.function.internal_returns); |
| |
| /* |
| * fill in the buffer: |
| * |
| * CREATE FUNCTION <name>(<parameters>) |
| * RETURNS [SETOF] <rtype> LANGUAGE <lang> AS |
| * $$ |
| * <body> |
| * $$ [STRICT] [IMMUTABLE]; |
| * |
| */ |
| bufcat(&buffer, "CREATE FUNCTION "); |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, "("); |
| |
| /* Handle parameter list */ |
| for (plist = obj->u.function.parameters; |
| plist; |
| plist = plist->next) |
| { |
| bufcat(&buffer, plist->name); |
| bufcat(&buffer, " "); |
| bufcat(&buffer, plist->type); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| |
| /* Handle Return clause */ |
| bufcat(&buffer, ")\nRETURNS "); |
| if (obj->u.function.mode == MAPRED_MODE_MULTI) |
| bufcat(&buffer, "SETOF "); |
| bufcat(&buffer, obj->u.function.rtype.name); |
| |
| /* |
| * Handle LANGUAGE clause, every langauge but 'C' and 'SQL' |
| * has 'pl' prefixing it |
| */ |
| if (!strcasecmp("C", obj->u.function.language) || |
| !strcasecmp("SQL", obj->u.function.language) || |
| !strncasecmp("PL", obj->u.function.language, 2)) |
| { |
| bufcat(&buffer, " LANGUAGE "); |
| bufcat(&buffer, obj->u.function.language); |
| } |
| else |
| { |
| bufcat(&buffer, " LANGUAGE pl"); |
| bufcat(&buffer, obj->u.function.language); |
| } |
| |
| /* python only has an untrusted form */ |
| if (!strcasecmp("python", obj->u.function.language)) |
| bufcat(&buffer, "u"); |
| |
| |
| bufcat(&buffer, " AS "); |
| |
| /* |
| * Handle procedural language specific formatting for the |
| * function definition. |
| * |
| * C language functions are defined using the two parameter |
| * form: AS "library", "function". |
| * |
| * Perl functions append the yaml file line number via a |
| * #line declaration. |
| * |
| * Python functions try to append the yaml file line number |
| * by inserting a bunch of newlines. (only works for runtime |
| * errors, not compiletime errors). |
| */ |
| if (!strcasecmp("C", obj->u.function.language)) |
| { |
| bufcat(&buffer, "$$"); |
| bufcat(&buffer, obj->u.function.library); |
| bufcat(&buffer, "$$, $$"); |
| bufcat(&buffer, obj->u.function.body); |
| bufcat(&buffer, "$$"); |
| } |
| else if (!strncasecmp("plperl", obj->u.function.language, 6) || |
| !strncasecmp("perl", obj->u.function.language, 4)) |
| { |
| char lineno[10]; |
| snprintf(lineno, sizeof(lineno), "%d", obj->u.function.lineno); |
| bufcat(&buffer, "$$\n#line "); |
| bufcat(&buffer, lineno); |
| bufcat(&buffer, "\n"); |
| bufcat(&buffer, obj->u.function.body); |
| if (buffer->buffer[buffer->position-1] != '\n') |
| bufcat(&buffer, "\n"); |
| bufcat(&buffer, "$$"); |
| } |
| else if (!strncasecmp("plpython", obj->u.function.language, 8) || |
| !strncasecmp("python", obj->u.function.language, 6)) |
| { |
| /* |
| * Python very stubborn about not letting you manually |
| * adjust line number. So instead we take the stupid route |
| * and just insert N newlines. |
| */ |
| int i; |
| bufcat(&buffer, "$$\n"); |
| for (i = 1; i < obj->u.function.lineno-2; i++) |
| bufcat(&buffer, "\n"); |
| bufcat(&buffer, obj->u.function.body); |
| if (buffer->buffer[buffer->position-1] != '\n') |
| bufcat(&buffer, "\n"); |
| bufcat(&buffer, "$$"); |
| } |
| else |
| { |
| /* Some generic other language, take our best guess */ |
| bufcat(&buffer, "$$"); |
| bufcat(&buffer, obj->u.function.body); |
| bufcat(&buffer, "$$"); |
| } |
| |
| /* Handle options */ |
| if (obj->u.function.flags & mapred_function_strict) |
| bufcat(&buffer, " STRICT"); |
| if (obj->u.function.flags & mapred_function_immutable) |
| bufcat(&buffer, " IMMUTABLE"); |
| |
| /* All done */ |
| bufcat(&buffer, ";\n\n"); |
| break; |
| |
| case MAPRED_REDUCER: |
| { |
| mapred_object_t *transition = obj->u.reducer.transition.object; |
| mapred_object_t *combiner = obj->u.reducer.combiner.object; |
| mapred_object_t *finalizer = obj->u.reducer.finalizer.object; |
| char *state; |
| |
| XASSERT(obj->name); |
| XASSERT(transition); |
| XASSERT(transition->name); |
| |
| /* |
| * If the reducer depends on an object that hasn't been created |
| * then return false, it will be resolved during a second pass |
| */ |
| if ((transition && !transition->created) || |
| (combiner && !combiner->created) || |
| (finalizer && !finalizer->created)) |
| { |
| if (global_print_flag && global_debug_flag) |
| printf("-- deferring REDUCE %s\n", obj->name); |
| break; |
| } |
| if (global_print_flag || global_debug_flag) |
| printf("-- REDUCE %s\n", obj->name); |
| |
| /* Now, set things up to create the thing */ |
| mapred_setup_columns(conn, obj); |
| plist = transition->u.function.parameters; |
| XASSERT(plist); /* state */ |
| XASSERT(plist->next); /* parameters */ |
| |
| if (obj->u.reducer.ordering) |
| bufcat(&buffer, "CREATE ORDERED AGGREGATE "); |
| else |
| bufcat(&buffer, "CREATE AGGREGATE "); |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, " ("); |
| |
| /* |
| * Get the state type, and write out the aggregate parameters |
| * based on the parameter list of the transition function. |
| */ |
| plist = transition->u.function.parameters; |
| state = plist->type; |
| for (plist = plist->next; plist; plist = plist->next) |
| { |
| bufcat(&buffer, plist->type); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, ") (\n"); |
| bufcat(&buffer, " stype = "); |
| bufcat(&buffer, state); |
| if (obj->u.reducer.initialize) |
| { |
| bufcat(&buffer, ",\n initcond = '"); |
| bufcat(&buffer, obj->u.reducer.initialize); |
| bufcat(&buffer, "'"); |
| } |
| bufcat(&buffer, ",\n sfunc = "); |
| if (!transition->internal) |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, transition->name); |
| if (combiner) |
| { |
| bufcat(&buffer, ",\n combinefunc = "); |
| if (!combiner->internal) |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, combiner->name); |
| } |
| |
| /* |
| * To handle set returning finalizers the finalizer is pushed |
| * into the task definition rather than being placed in the |
| * uda where it belongs. |
| */ |
| /* |
| if (obj->u.reducer.finalizer.name) |
| { |
| bufcat(&buffer, ",\n finalfunc = "); |
| bufcat(&buffer, obj->u.reducer.finalizer.name); |
| } |
| */ |
| |
| bufcat(&buffer, "\n);\n\n"); |
| break; |
| } |
| |
| case MAPRED_TASK: |
| case MAPRED_EXECUTION: |
| { |
| mapred_object_t *input = obj->u.task.input.object; |
| mapred_object_t *mapper = obj->u.task.mapper.object; |
| mapred_object_t *reducer = obj->u.task.reducer.object; |
| mapred_plist_t *columns = NULL; |
| mapred_plist_t *ingrouping = NULL; |
| mapred_plist_t *newitem = NULL; |
| mapred_plist_t *grouping = NULL; |
| mapred_plist_t *last = NULL; |
| mapred_plist_t *scan = NULL; |
| buffer_t *swap; |
| |
| if (!obj->u.task.execute) |
| XASSERT(obj->name); |
| XASSERT(obj->u.task.input.name); |
| |
| if (!qbuffer) |
| qbuffer = makebuffer(1024, 1024); |
| else |
| bufreset(qbuffer); |
| |
| /* |
| * If the task depends on an object that hasn't been created then |
| * return false, it will be resolved during a second pass |
| */ |
| if ((input && !input->created) || |
| (mapper && !mapper->created) || |
| (reducer && !reducer->created)) |
| { |
| if (global_print_flag && global_debug_flag) |
| { |
| if (obj->u.task.execute) |
| printf("-- deferring EXECUTION\n"); |
| else |
| printf("-- deferring TASK %s\n", obj->name); |
| } |
| break; |
| } |
| |
| if (global_print_flag || global_debug_flag) |
| { |
| if (obj->u.task.execute) |
| printf("-- EXECUTION\n"); |
| else |
| printf("-- TASK %s\n", obj->name); |
| } |
| |
| /* |
| * 1) Handle the INPUT, two cases: |
| * 1a) There is no MAP/REDUCE: "SELECT * FROM <input>" |
| * 1b) There is a MAP and/or REDUCE: "<input>" |
| */ |
| mapred_setup_columns(conn, obj); |
| if (!obj->u.task.mapper.name && !obj->u.task.reducer.name) |
| { |
| /* Allocate the buffer for the input. */ |
| if (input->u.input.type == MAPRED_INPUT_TABLE) |
| { |
| bufcat(&qbuffer, "SELECT * FROM "); |
| bufcat(&qbuffer, input->u.input.desc); |
| } |
| else |
| { |
| bufcat(&qbuffer, "SELECT * FROM "); |
| bufcat(&qbuffer, input->name); |
| } |
| } |
| else |
| { |
| /* Input is just the name or description of the input */ |
| if (input->u.input.type == MAPRED_INPUT_TABLE) |
| bufcat(&qbuffer, input->u.input.desc); |
| else |
| bufcat(&qbuffer, input->name); |
| } |
| |
| /* |
| * How we get the columns depends a bit on the input. |
| * Is the input actually an "MAPRED_INPUT" object, or is it |
| * a "MAPRED_TASK" object? |
| */ |
| switch (input->kind) |
| { |
| case MAPRED_INPUT: |
| columns = input->u.input.columns; |
| break; |
| |
| case MAPRED_TASK: |
| columns = input->u.task.returns; |
| ingrouping = input->u.task.grouping; |
| |
| if (!columns) |
| { |
| mapred_obj_error(obj, "Unable to determine return " |
| "columns for TASK '%s'", |
| obj->u.task.input.name); |
| XRAISE(MAPRED_PARSE_INTERNAL, NULL); |
| } |
| break; |
| |
| default: |
| mapred_obj_error(obj, "SOURCE '%s' is not an INPUT or " |
| "TASK object", |
| obj->u.task.input.name); |
| XRAISE(MAPRED_PARSE_ERROR, "Object creation Error"); |
| break; |
| } |
| XASSERT(columns); |
| |
| /* |
| * 2) Handle the MAPPER, two cases |
| * 2a) The Mapper returns an generated ADT that needs extraction |
| * "SELECT key(m), ... |
| * FROM (SELECT <map(...) as m FROM <input>) mapsubq |
| * 2b) The Mapper returns a single column: |
| * "SELECT <map>(...) FROM <input>" |
| */ |
| XASSERT(mapper || !obj->u.task.mapper.name); |
| if (mapper) |
| { |
| plist = mapper->u.function.returns; |
| plist2 = mapper->u.function.internal_returns; |
| XASSERT(plist); |
| |
| if (plist->next) |
| { /* 2a */ |
| bufcat(&buffer, "SELECT "); |
| for (; plist; plist = plist->next ) |
| { |
| if( obj->internal ) |
| { |
| XASSERT(plist2 != NULL); |
| bufcat(&buffer, plist2->name); |
| plist2 = plist2->next; |
| } |
| else |
| { |
| bufcat(&buffer, plist->name); |
| } |
| bufcat(&buffer, "(m)"); |
| bufcat(&buffer, " as "); |
| bufcat(&buffer, plist->name); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, "\nFROM ("); |
| } |
| |
| /* shared code */ |
| bufcat(&buffer, "SELECT "); |
| if (!mapper->internal) |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, mapper->name); |
| bufcat(&buffer, "("); |
| plist = mapper->u.function.parameters; |
| for (; plist; plist = plist->next) |
| { |
| /* Check if this parameter is one of the input columns */ |
| for (scan = columns; scan; scan = scan->next) |
| if (!strcasecmp(plist->name, scan->name)) |
| { |
| bufcat(&buffer, plist->name); |
| break; |
| } |
| |
| /* Task inputs also need to scan the grouping columns */ |
| if (!scan) |
| for (scan = ingrouping; scan; scan = scan->next) |
| if (!strcasecmp(plist->name, scan->name)) |
| { |
| bufcat(&buffer, plist->name); |
| break; |
| } |
| |
| /* Check if this parameter is in global_plist */ |
| if (!scan) |
| for (scan = global_plist; scan; scan = scan->next) |
| if (!strcasecmp(plist->name, scan->name)) |
| { |
| /* |
| * (HACK) |
| * Note that global_plist overloads the |
| * plist structure using the "type" field |
| * to store "value". |
| * (HACK) |
| */ |
| bufcat(&buffer, "'"); |
| bufcat(&buffer, scan->type); |
| bufcat(&buffer, "'::"); |
| bufcat(&buffer, plist->type); |
| break; |
| } |
| |
| |
| /* |
| * If we couldn't find it issue a warning and |
| * set to NULL |
| */ |
| if (!scan) |
| { |
| if (global_verbose_flag) |
| fprintf(stderr, " "); |
| fprintf(stderr, |
| "WARNING: unset parameter - " |
| "%s(%s => NULL)\n", |
| mapper->name, plist->name); |
| bufcat(&buffer, "NULL::"); |
| bufcat(&buffer, plist->type); |
| } |
| |
| /* Add a comma if there is another parameter */ |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, ") as "); |
| |
| /* break into cases again */ |
| plist = mapper->u.function.returns; |
| if (plist->next) |
| { /* 2a */ |
| bufcat(&buffer, "m FROM "); |
| bufcat(&buffer, qbuffer->buffer); |
| bufcat(&buffer, ") mapxq\n"); |
| /* |
| * Need to work this through, it seems that m is true |
| * whenever a column is null, which is not the desired |
| * behavior. |
| * |
| * Look more closely at "grep" code for why we want it, |
| * and "oreilly" code for why we don't. |
| * |
| * For the moment the compromise is that we do it only |
| * for SINGLE mode functions, since MULTI mode can |
| * control it's own filtering without this. |
| */ |
| if (mapper->u.function.mode != MAPRED_MODE_MULTI) |
| bufcat(&buffer, "WHERE m is not null"); |
| } |
| else |
| { |
| bufcat(&buffer, plist->name); |
| bufcat(&buffer, " FROM "); |
| bufcat(&buffer, qbuffer->buffer); |
| } |
| |
| /* |
| * Swap the buffer into the qbuffer for input as the next |
| * stage of the query pipeline. |
| */ |
| swap = qbuffer; |
| qbuffer = buffer; |
| buffer = swap; |
| bufreset(buffer); |
| |
| /* Columns are now the output of the mapper */ |
| columns = mapper->u.function.returns; |
| ingrouping = NULL; |
| } |
| |
| /* |
| * 3) Handle the Reducer, several sub-cases: |
| */ |
| if (obj->u.task.reducer.name) |
| { |
| /* |
| * Step 1: Determine grouping columns |
| * Find which columns are returned from the previous |
| * stage that are NOT parameters to the reducer. |
| */ |
| grouping = last = NULL; |
| if (!reducer) |
| { |
| /* |
| * We have a reducer, but it isn't listed in the YAML. |
| * How to work out parameter handling still needs to |
| * be worked out. For now we just assume that this |
| * sort of function always takes a single "value" column |
| * and returns a "value" column. |
| */ |
| for (plist = columns; plist; plist = plist->next) |
| { |
| if (strcasecmp(plist->name, "value")) |
| { |
| if (grouping) |
| { |
| last->next = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = last->next; |
| } |
| else |
| { |
| grouping = |
| mapred_malloc(sizeof(mapred_plist_t)); |
| last = grouping; |
| } |
| last->name = plist->name; |
| last->type = plist->type; |
| last->next = NULL; |
| } |
| } |
| } |
| else |
| { /* The reducer exists in the YAML */ |
| |
| /* We precalculated the grouping columns */ |
| grouping = obj->u.task.grouping; |
| } |
| |
| /* Fill in the buffer */ |
| bufcat(&buffer, "SELECT "); |
| for (plist = grouping; plist; plist = plist->next) |
| { |
| bufcat(&buffer, plist->name); |
| bufcat(&buffer, ", "); |
| } |
| |
| /* Call the aggregation function */ |
| if (reducer && !reducer->internal) |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, obj->u.task.reducer.name); |
| bufcat(&buffer, "("); |
| |
| if (reducer) |
| { |
| plist = reducer->u.reducer.parameters; |
| for (; plist; plist = plist->next) |
| { |
| /* Check if parameter is one of the input columns */ |
| for (scan = columns; scan; scan = scan->next) |
| if (!strcasecmp(plist->name, scan->name)) |
| { |
| bufcat(&buffer, plist->name); |
| break; |
| } |
| |
| /* Task inputs need to scan the grouping columns */ |
| if (!scan) |
| for (scan = ingrouping; scan; scan = scan->next) |
| if (!strcasecmp(plist->name, scan->name)) |
| { |
| bufcat(&buffer, plist->name); |
| break; |
| } |
| |
| /* Check if this parameter is in global_plist */ |
| if (!scan) |
| { |
| for (scan = global_plist; |
| scan; |
| scan = scan->next) |
| { |
| if (!strcasecmp(plist->name, scan->name)) |
| { |
| /* |
| * (HACK) |
| * Note that global_plist overloads the |
| * plist structure using the "type" |
| * field to store "value". |
| * (HACK) |
| */ |
| bufcat(&buffer, "'"); |
| bufcat(&buffer, scan->type); |
| bufcat(&buffer, "'::"); |
| bufcat(&buffer, plist->type); |
| break; |
| } |
| } |
| } |
| |
| /* |
| * If we couldn't find it issue a warning |
| * and set to NULL |
| */ |
| if (!scan) |
| { |
| if (global_verbose_flag) |
| fprintf(stderr, " "); |
| fprintf(stderr, |
| "WARNING: unset parameter - " |
| "%s(%s => NULL)\n", |
| reducer->name, plist->name); |
| bufcat(&buffer, "NULL::"); |
| bufcat(&buffer, plist->type); |
| } |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| |
| /* Handle ORDERING, if specified */ |
| clist = reducer->u.reducer.ordering; |
| if (clist) |
| bufcat(&buffer, " ORDER BY "); |
| for(; clist; clist = clist->next) |
| { |
| bufcat(&buffer, clist->value); |
| if (clist->next) |
| bufcat(&buffer, ", "); |
| } |
| |
| } |
| else |
| { |
| /* |
| * non-yaml reducer always takes "value" as the |
| * input column |
| */ |
| |
| /* Check if "value" is one of the input columns */ |
| for (scan = columns; scan; scan = scan->next) |
| if (!strcasecmp(scan->name, "value")) |
| { |
| bufcat(&buffer, "value"); |
| break; |
| } |
| |
| /* Task inputs also need to scan the grouping columns */ |
| if (!scan) |
| for (scan = ingrouping; scan; scan = scan->next) |
| if (!strcasecmp(plist->name, scan->name)) |
| { |
| bufcat(&buffer, "value"); |
| break; |
| } |
| |
| /* Check if this parameter is in global_plist */ |
| if (!scan) |
| for (scan = global_plist; scan; scan = scan->next) |
| if (!strcasecmp(scan->name, "value")) |
| { |
| /* |
| * (HACK) |
| * Note that global_plist overloads the |
| * plist structure using the "type" field |
| * to store "value". |
| * (HACK) |
| */ |
| bufcat(&buffer, "'"); |
| bufcat(&buffer, scan->type); |
| bufcat(&buffer, "'::"); |
| bufcat(&buffer, plist->type); |
| break; |
| } |
| |
| if (!scan) |
| { |
| if (global_verbose_flag) |
| fprintf(stderr, " "); |
| fprintf(stderr, |
| "WARNING: unset parameter - " |
| "%s(value => NULL)\n", |
| obj->u.task.reducer.name); |
| bufcat(&buffer, "NULL"); |
| } |
| } |
| |
| bufcat(&buffer, ") as "); |
| |
| if (reducer) |
| { |
| plist = reducer->u.reducer.returns; |
| XASSERT(plist); /* Need to have a return! */ |
| |
| /* |
| * If the reducer has a finalizer we push it outside of |
| * the context of the UDA so that we can properly handle |
| * set returning/column returning functions. |
| */ |
| if (reducer->u.reducer.finalizer.name) |
| bufcat(&buffer, "r"); |
| else |
| bufcat(&buffer, plist->name); |
| } |
| else |
| { |
| /* |
| * non-yaml reducer always return a single column |
| * named "value" |
| */ |
| bufcat(&buffer, "value"); |
| } |
| bufcat(&buffer, "\nFROM "); |
| if (mapper) |
| { |
| bufcat(&buffer, "("); |
| bufcat(&buffer, qbuffer->buffer); |
| bufcat(&buffer, ") mapsubq"); |
| } |
| else |
| { |
| bufcat(&buffer, qbuffer->buffer); |
| } |
| if (grouping) |
| { |
| bufcat(&buffer, "\nGROUP BY "); |
| for (plist = grouping; plist; plist = plist->next) |
| { |
| bufcat(&buffer, plist->name); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| } |
| |
| /* |
| * Swap the buffer into the qbuffer for input as the next |
| * stage of the query pipeline. |
| */ |
| swap = qbuffer; |
| qbuffer = buffer; |
| buffer = swap; |
| bufreset(buffer); |
| |
| /* |
| * Add the return columns to the grouping columns and set |
| * it to the current columns. |
| * |
| * Note that unlike the columns set by the mapper or the |
| * input this is a list that must be de-allocated. |
| */ |
| columns = last; |
| |
| /* |
| * If the reducer had a finalizer we push it into another |
| * nested subquery since user defined aggregates aren't |
| * allowed to return sets. |
| * |
| * NOTE: this code mostly duplicates the MAP code above |
| */ |
| if (reducer && reducer->u.reducer.finalizer.name) |
| { |
| mapred_object_t *finalizer; |
| |
| finalizer = reducer->u.reducer.finalizer.object; |
| XASSERT(finalizer); /* FIXME */ |
| XASSERT(finalizer->u.function.returns); |
| |
| /* |
| * If the finalizer returns multiple columns then we |
| * need an extra layer of wrapping to extract them. |
| */ |
| plist = finalizer->u.function.returns; |
| if (plist->next) |
| { |
| bufcat(&buffer, "SELECT "); |
| |
| /* the grouping columns */ |
| for (plist = grouping; |
| plist; |
| plist = plist->next) |
| { |
| bufcat(&buffer, plist->name); |
| bufcat(&buffer, ", "); |
| } |
| plist2 = finalizer->u.function.internal_returns; |
| for (plist = finalizer->u.function.returns; |
| plist; |
| plist = plist->next) |
| { |
| if( finalizer->internal ) |
| { |
| XASSERT( plist2 != NULL ); |
| bufcat(&buffer, plist2->name); |
| plist2 = plist2->next; |
| } |
| else |
| { |
| bufcat(&buffer, plist->name); |
| } |
| |
| bufcat(&buffer, "(r)"); |
| bufcat(&buffer, " as "); |
| bufcat(&buffer, plist->name); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, "\nFROM ("); |
| } |
| |
| /* |
| * Call the function on the returned state from |
| * the reducer. |
| */ |
| bufcat(&buffer, "SELECT "); |
| |
| /* grouping columns */ |
| for (plist = grouping; |
| plist; |
| plist = plist->next) |
| { |
| bufcat(&buffer, plist->name); |
| bufcat(&buffer, ", "); |
| } |
| |
| if (!finalizer->internal) |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, finalizer->name); |
| bufcat(&buffer, "(r) as "); |
| |
| /* break into cases again */ |
| plist = finalizer->u.function.returns; |
| if (plist->next) |
| bufcat(&buffer, "r"); |
| else |
| bufcat(&buffer, plist->name); |
| bufcat(&buffer, " FROM ("); |
| bufcat(&buffer, qbuffer->buffer); |
| bufcat(&buffer, ") redxq\n"); |
| |
| /* |
| * If we have that extra layer of wrapping |
| * then close it off |
| */ |
| if (finalizer->u.function.returns->next) |
| bufcat(&buffer, ") redsubq\n"); |
| |
| /* |
| * Swap the buffer into the qbuffer for input as the next |
| * stage of the query pipeline. |
| */ |
| swap = qbuffer; |
| qbuffer = buffer; |
| buffer = swap; |
| bufreset(buffer); |
| } |
| } |
| |
| |
| /* |
| * 4) Handle the final transform into the view definition: |
| * "CREATE TEMPORARY VIEW . AS .;" |
| */ |
| bufcat(&buffer, "CREATE TEMPORARY VIEW "); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, " AS\n"); |
| bufcat(&buffer, qbuffer->buffer); |
| bufcat(&buffer, ";\n\n"); |
| |
| /* |
| * If there was a reducer then we have to release the columns |
| * list, otherwise it is a pointer to an existing list and can |
| * be ignored. |
| */ |
| if (obj->u.task.reducer.name) |
| { |
| plist = columns; |
| while (plist && plist != grouping) |
| { |
| newitem = plist; |
| plist = plist->next; |
| mapred_free(newitem); |
| } |
| } |
| break; |
| } |
| |
| case MAPRED_ADT: |
| XASSERT(obj->name); |
| mapred_setup_columns(conn, obj); |
| |
| /* |
| * ADT's have generated names that already include the |
| * document prefix |
| */ |
| bufcat(&buffer, "CREATE TYPE "); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, " as ("); |
| for (plist = obj->u.adt.returns; plist; plist = plist->next) |
| { |
| bufcat(&buffer, plist->name); |
| bufcat(&buffer, " "); |
| bufcat(&buffer, plist->type); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, ");\n\n"); |
| break; |
| |
| default: |
| XASSERT(false); |
| } |
| |
| if (buffer->position > 0) |
| { |
| |
| /* |
| * In print-only mode we do everything but run the queries |
| * ie, we still create and destroy objects. |
| */ |
| if (global_print_flag || global_debug_flag) |
| printf("%s", buffer->buffer); |
| |
| /* |
| * Try to create the object, but failure should not terminate |
| * the transaction, so wrap it in a savepoint. |
| */ |
| PQexec(conn, "SAVEPOINT mapreduce_save"); |
| result = PQexec(conn, buffer->buffer); |
| if (PQresultStatus(result) == PGRES_COMMAND_OK) |
| { |
| obj->created = true; |
| PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"); |
| } |
| else |
| { |
| char *error = PQresultErrorField(result, PG_DIAG_SQLSTATE); |
| |
| /* rollback to savepoint */ |
| PQexec(conn, "ROLLBACK TO SAVEPOINT mapreduce_save"); |
| PQexec(conn, "RELEASE SAVEPOINT mapreduce_save"); |
| |
| /* |
| * If we have an "object does not exist" error from a SQL input |
| * then it may just be a dependency issue, so we don't error |
| * right away. |
| */ |
| if (obj->kind != MAPRED_INPUT || |
| obj->u.input.type != MAPRED_INPUT_QUERY || |
| strcmp(error, OBJ_DOES_NOT_EXIST)) |
| { |
| if (global_verbose_flag) |
| fprintf(stderr, " - "); |
| fprintf(stderr, "%s", PQresultErrorMessage(result)); |
| XRAISE(MAPRED_SQL_ERROR, "Object creation Failure"); |
| } |
| if (global_verbose_flag) |
| fprintf(stderr, " Error: %s\n", |
| PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY)); |
| |
| /* |
| * If it is an error that we think we can recover from then we don't |
| * log the error immediately, but write it to a buffer in the event |
| * that recovery wasn't successful. |
| */ |
| if (doc->errors) |
| { |
| if (global_verbose_flag) |
| bufcat(&doc->errors, " - "); |
| bufcat(&doc->errors, "Error: "); |
| bufcat(&doc->errors, (char*) mapred_kind_name[obj->kind]); |
| if (obj->name) |
| { |
| bufcat(&doc->errors, " '"); |
| bufcat(&doc->errors, obj->name); |
| bufcat(&doc->errors, "'"); |
| } |
| bufcat(&doc->errors, ": "); |
| bufcat(&doc->errors, PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY)); |
| if (obj->line > 0) |
| { |
| char numbuf[64]; |
| sprintf(numbuf, ", at line %d", obj->line); |
| bufcat(&doc->errors, numbuf); |
| } |
| bufcat(&doc->errors, "\n"); |
| } |
| } |
| } |
| |
| /* |
| * INPUTS setup columns AFTER creation. |
| * All other objects handle it above prior to creation. |
| */ |
| if (obj->kind == MAPRED_INPUT && obj->created) |
| mapred_setup_columns(conn, obj); |
| } |
| XFINALLY |
| { |
| if (buffer) |
| mapred_free(buffer); |
| if (qbuffer) |
| mapred_free(qbuffer); |
| } |
| XTRY_END; |
| |
| return obj->created; |
| } |
| |
| |
| void mapred_remove_object(PGconn *conn, mapred_document_t *doc, mapred_object_t *obj) |
| { |
| mapred_plist_t *plist = NULL; |
| const char *ckind = NULL; |
| buffer_t *buffer = NULL; |
| |
| |
| /* If the object wasn't created, then do nothing */ |
| if (!obj->created) |
| return; |
| |
| if (obj->internal) |
| { |
| obj->created = false; |
| return; |
| } |
| |
| XTRY |
| { |
| buffer = makebuffer(100, 100); |
| |
| /* Otherwise attempt to create the object */ |
| switch (obj->kind) |
| { |
| case MAPRED_INPUT: |
| XASSERT(obj->name); |
| |
| switch (obj->u.input.type) |
| { |
| case MAPRED_INPUT_TABLE: |
| obj->created = false; |
| break; /* do nothing */ |
| |
| case MAPRED_INPUT_FILE: |
| case MAPRED_INPUT_GPFDIST: |
| case MAPRED_INPUT_EXEC: |
| bufcat(&buffer, "DROP EXTERNAL TABLE IF EXISTS "); |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, " CASCADE;\n"); |
| break; |
| |
| case MAPRED_INPUT_QUERY: |
| bufcat(&buffer, "DROP VIEW IF EXISTS "); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, " CASCADE;\n"); |
| break; |
| |
| case MAPRED_INPUT_NONE: |
| default: |
| XASSERT(false); |
| } |
| break; |
| |
| case MAPRED_OUTPUT: |
| /* nothing to do for outputs */ |
| obj->created = false; |
| break; |
| |
| /* |
| * The function types have different defaults and generate |
| * slightly different error messages, but basically do the |
| * same thing. |
| */ |
| case MAPRED_MAPPER: |
| case MAPRED_TRANSITION: |
| case MAPRED_COMBINER: |
| case MAPRED_FINALIZER: |
| ckind = mapred_kind_name[obj->kind]; |
| |
| bufcat(&buffer, "DROP FUNCTION IF EXISTS "); |
| if (!obj->internal) |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, "("); |
| for (plist = obj->u.function.parameters; |
| plist; |
| plist = plist->next) |
| { /* Handle parameter list */ |
| bufcat(&buffer, plist->type); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, ") CASCADE;\n"); |
| break; |
| |
| |
| case MAPRED_REDUCER: |
| { |
| mapred_object_t *transition = obj->u.reducer.transition.object; |
| |
| XASSERT(obj->name); |
| XASSERT(transition); /* is this a good assumption? */ |
| |
| if (!transition->u.function.parameters) |
| break; /* FIXME */ |
| XASSERT(transition->u.function.parameters); |
| |
| bufcat(&buffer, "DROP AGGREGATE IF EXISTS "); |
| if (!obj->internal) |
| bufcat(&buffer, doc->prefix); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, "("); |
| |
| /* |
| * The first parameter of the transition function is the 'state' |
| * and is not listed as a parameter of the reducer, but all the |
| * rest of the parameters are |
| */ |
| plist = transition->u.function.parameters; |
| for (plist = plist->next; plist; plist = plist->next) |
| { |
| bufcat(&buffer, plist->type); |
| if (plist->next) |
| bufcat(&buffer, ", "); |
| } |
| bufcat(&buffer, ") CASCADE;\n"); |
| break; |
| } |
| |
| case MAPRED_TASK: |
| case MAPRED_EXECUTION: |
| XASSERT(obj->name); |
| |
| bufcat(&buffer, "DROP VIEW IF EXISTS "); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, " CASCADE;\n"); |
| break; |
| |
| case MAPRED_ADT: |
| bufcat(&buffer, "DROP TYPE IF EXISTS "); |
| bufcat(&buffer, obj->name); |
| bufcat(&buffer, " CASCADE;\n"); |
| break; |
| |
| default: |
| XASSERT(false); |
| } |
| |
| if (buffer->position > 0) |
| { |
| PGresult *result; |
| |
| /* |
| * In print-only mode we do everything but run the queries |
| * ie, we still create and destroy objects. |
| */ |
| if (global_print_flag || global_debug_flag) |
| printf("%s", buffer->buffer); |
| |
| /* Try to delete the object, but don't raise exception on error */ |
| result = PQexec(conn, buffer->buffer); |
| if (PQresultStatus(result) == PGRES_COMMAND_OK) |
| obj->created = false; |
| else |
| { |
| char *error = PQresultErrorField(result, PG_DIAG_SQLSTATE); |
| |
| /* |
| * Errors that we can expect/ignore: |
| * |
| * IN_FAILED_SQL_TRANSACTION - |
| * another error has occured and the transaction was |
| * aborted |
| * |
| */ |
| if (strcmp(error, IN_FAILED_SQL_TRANSACTION)) |
| { |
| if (global_verbose_flag) |
| fprintf(stderr, " - "); |
| if (obj->name) |
| fprintf(stderr, |
| "[WARNING] Error dropping '%s'\n", obj->name); |
| else |
| fprintf(stderr, |
| "[WARNING] Error dropping unnamed object\n"); |
| if (global_verbose_flag) |
| fprintf(stderr, " - "); |
| fprintf(stderr, "%s", PQerrorMessage(conn)); |
| } |
| } |
| PQclear(result); |
| } |
| } |
| XFINALLY |
| { |
| if (buffer) |
| mapred_free(buffer); |
| } |
| XTRY_END; |
| } |