| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "postgres.h" |
| #include "utils/json.h" |
| |
| #include "utils/load/ag_load_edges.h" |
| #include "utils/load/ag_load_labels.h" |
| #include "utils/load/age_load.h" |
| |
| static agtype_value *csv_value_to_agtype_value(char *csv_val); |
| |
| agtype *create_empty_agtype(void) |
| { |
| agtype* out; |
| agtype_in_state result; |
| |
| memset(&result, 0, sizeof(agtype_in_state)); |
| |
| result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, |
| NULL); |
| result.res = push_agtype_value(&result.parse_state, WAGT_END_OBJECT, NULL); |
| |
| out = agtype_value_to_agtype(result.res); |
| pfree_agtype_in_state(&result); |
| |
| return out; |
| } |
| |
| /* |
| * Converts the given csv value to an agtype_value. |
| * |
| * If csv_val is not a valid json, it is wrapped by double-quotes to make it a |
| * string value. Because agtype is jsonb-like, the token should be a valid |
| * json in order to be parsed into an agtype_value of appropriate type. |
| * Finally, agtype_value_from_cstring() is called for parsing. |
| */ |
| static agtype_value *csv_value_to_agtype_value(char *csv_val) |
| { |
| char *new_csv_val; |
| agtype_value *res; |
| |
| if (!json_validate(cstring_to_text(csv_val), false, false)) |
| { |
| // wrap the string with double-quote |
| int oldlen; |
| int newlen; |
| |
| oldlen = strlen(csv_val); |
| newlen = oldlen + 2; // +2 for double-quotes |
| new_csv_val = (char *)palloc(sizeof(char) * (newlen + 1)); |
| |
| new_csv_val[0] = '"'; |
| strncpy(&new_csv_val[1], csv_val, oldlen); |
| new_csv_val[oldlen + 1] = '"'; |
| new_csv_val[oldlen + 2] = '\0'; |
| } |
| else |
| { |
| new_csv_val = csv_val; |
| } |
| |
| res = agtype_value_from_cstring(new_csv_val, strlen(new_csv_val)); |
| |
| // extract from top-level row scalar array |
| if (res->type == AGTV_ARRAY && res->val.array.raw_scalar) |
| { |
| res = &res->val.array.elems[0]; |
| } |
| |
| return res; |
| } |
| |
| agtype *create_agtype_from_list(char **header, char **fields, size_t fields_len, |
| int64 vertex_id, bool load_as_agtype) |
| { |
| agtype* out; |
| agtype_value* key_agtype; |
| agtype_value* value_agtype; |
| agtype_in_state result; |
| int i; |
| |
| memset(&result, 0, sizeof(agtype_in_state)); |
| |
| result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, |
| NULL); |
| |
| key_agtype = string_to_agtype_value("__id__"); |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_KEY, |
| key_agtype); |
| |
| value_agtype = integer_to_agtype_value(vertex_id); |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_VALUE, |
| value_agtype); |
| |
| pfree_agtype_value(key_agtype); |
| pfree_agtype_value(value_agtype); |
| |
| for (i = 0; i<fields_len; i++) |
| { |
| key_agtype = string_to_agtype_value(header[i]); |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_KEY, |
| key_agtype); |
| |
| if (load_as_agtype) |
| { |
| value_agtype = csv_value_to_agtype_value(fields[i]); |
| } |
| else |
| { |
| value_agtype = string_to_agtype_value(fields[i]); |
| } |
| |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_VALUE, |
| value_agtype); |
| |
| pfree_agtype_value(key_agtype); |
| pfree_agtype_value(value_agtype); |
| } |
| |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_END_OBJECT, NULL); |
| |
| out = agtype_value_to_agtype(result.res); |
| pfree_agtype_in_state(&result); |
| |
| return out; |
| } |
| |
| agtype* create_agtype_from_list_i(char **header, char **fields, |
| size_t fields_len, size_t start_index, |
| bool load_as_agtype) |
| { |
| agtype* out; |
| agtype_value* key_agtype; |
| agtype_value* value_agtype; |
| agtype_in_state result; |
| size_t i; |
| |
| if (start_index + 1 == fields_len) |
| { |
| return create_empty_agtype(); |
| } |
| |
| memset(&result, 0, sizeof(agtype_in_state)); |
| |
| result.res = push_agtype_value(&result.parse_state, WAGT_BEGIN_OBJECT, |
| NULL); |
| |
| for (i = start_index; i < fields_len; i++) |
| { |
| key_agtype = string_to_agtype_value(header[i]); |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_KEY, |
| key_agtype); |
| |
| if (load_as_agtype) |
| { |
| value_agtype = csv_value_to_agtype_value(fields[i]); |
| } |
| else |
| { |
| value_agtype = string_to_agtype_value(fields[i]); |
| } |
| |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_VALUE, |
| value_agtype); |
| |
| pfree_agtype_value(key_agtype); |
| pfree_agtype_value(value_agtype); |
| } |
| |
| result.res = push_agtype_value(&result.parse_state, |
| WAGT_END_OBJECT, NULL); |
| |
| out = agtype_value_to_agtype(result.res); |
| pfree_agtype_in_state(&result); |
| |
| return out; |
| } |
| |
| void insert_edge_simple(Oid graph_oid, char *label_name, graphid edge_id, |
| graphid start_id, graphid end_id, |
| agtype *edge_properties) |
| { |
| |
| Datum values[6]; |
| bool nulls[4] = {false, false, false, false}; |
| Relation label_relation; |
| HeapTuple tuple; |
| |
| // Check if label provided exists as vertex label, then throw error |
| if (get_label_kind(label_name, graph_oid) == LABEL_KIND_VERTEX) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("label %s already exists as vertex label", label_name))); |
| } |
| |
| values[0] = GRAPHID_GET_DATUM(edge_id); |
| values[1] = GRAPHID_GET_DATUM(start_id); |
| values[2] = GRAPHID_GET_DATUM(end_id); |
| values[3] = AGTYPE_P_GET_DATUM((edge_properties)); |
| |
| label_relation = table_open(get_label_relation(label_name, graph_oid), |
| RowExclusiveLock); |
| |
| tuple = heap_form_tuple(RelationGetDescr(label_relation), |
| values, nulls); |
| heap_insert(label_relation, tuple, |
| GetCurrentCommandId(true), 0, NULL); |
| table_close(label_relation, RowExclusiveLock); |
| CommandCounterIncrement(); |
| } |
| |
| void insert_vertex_simple(Oid graph_oid, char *label_name, graphid vertex_id, |
| agtype *vertex_properties) |
| { |
| |
| Datum values[2]; |
| bool nulls[2] = {false, false}; |
| Relation label_relation; |
| HeapTuple tuple; |
| |
| // Check if label provided exists as edge label, then throw error |
| if (get_label_kind(label_name, graph_oid) == LABEL_KIND_EDGE) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("label %s already exists as edge label", label_name))); |
| } |
| |
| values[0] = GRAPHID_GET_DATUM(vertex_id); |
| values[1] = AGTYPE_P_GET_DATUM((vertex_properties)); |
| |
| label_relation = table_open(get_label_relation(label_name, graph_oid), |
| RowExclusiveLock); |
| tuple = heap_form_tuple(RelationGetDescr(label_relation), |
| values, nulls); |
| heap_insert(label_relation, tuple, |
| GetCurrentCommandId(true), 0, NULL); |
| table_close(label_relation, RowExclusiveLock); |
| CommandCounterIncrement(); |
| } |
| |
| PG_FUNCTION_INFO_V1(load_labels_from_file); |
| Datum load_labels_from_file(PG_FUNCTION_ARGS) |
| { |
| Name graph_name; |
| Name label_name; |
| text* file_path; |
| char* graph_name_str; |
| char* label_name_str; |
| char* file_path_str; |
| Oid graph_oid; |
| int32 label_id; |
| bool id_field_exists; |
| bool load_as_agtype; |
| |
| if (PG_ARGISNULL(0)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("graph name must not be NULL"))); |
| } |
| |
| if (PG_ARGISNULL(1)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("label name must not be NULL"))); |
| } |
| |
| if (PG_ARGISNULL(2)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("file path must not be NULL"))); |
| } |
| |
| graph_name = PG_GETARG_NAME(0); |
| label_name = PG_GETARG_NAME(1); |
| file_path = PG_GETARG_TEXT_P(2); |
| id_field_exists = PG_GETARG_BOOL(3); |
| load_as_agtype = PG_GETARG_BOOL(4); |
| |
| |
| graph_name_str = NameStr(*graph_name); |
| label_name_str = NameStr(*label_name); |
| file_path_str = text_to_cstring(file_path); |
| |
| graph_oid = get_graph_oid(graph_name_str); |
| label_id = get_label_id(label_name_str, graph_oid); |
| |
| create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid, |
| label_name_str, label_id, id_field_exists, |
| load_as_agtype); |
| PG_RETURN_VOID(); |
| |
| } |
| |
| PG_FUNCTION_INFO_V1(load_edges_from_file); |
| Datum load_edges_from_file(PG_FUNCTION_ARGS) |
| { |
| |
| Name graph_name; |
| Name label_name; |
| text* file_path; |
| char* graph_name_str; |
| char* label_name_str; |
| char* file_path_str; |
| Oid graph_oid; |
| int32 label_id; |
| bool load_as_agtype; |
| |
| if (PG_ARGISNULL(0)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("graph name must not be NULL"))); |
| } |
| |
| if (PG_ARGISNULL(1)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("label name must not be NULL"))); |
| } |
| |
| if (PG_ARGISNULL(2)) |
| { |
| ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("file path must not be NULL"))); |
| } |
| |
| graph_name = PG_GETARG_NAME(0); |
| label_name = PG_GETARG_NAME(1); |
| file_path = PG_GETARG_TEXT_P(2); |
| load_as_agtype = PG_GETARG_BOOL(3); |
| |
| graph_name_str = NameStr(*graph_name); |
| label_name_str = NameStr(*label_name); |
| file_path_str = text_to_cstring(file_path); |
| |
| graph_oid = get_graph_oid(graph_name_str); |
| label_id = get_label_id(label_name_str, graph_oid); |
| |
| create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid, |
| label_name_str, label_id, load_as_agtype); |
| PG_RETURN_VOID(); |
| |
| } |