| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /* |
| * cdbparquetstoragewrite.c |
| |
| * |
| * Created on: Jul 30, 2013 |
| * Author: malili |
| */ |
| |
| #include "postgres.h" |
| |
| #include "catalog/catquery.h" |
| #include "cdb/cdbparquetstoragewrite.h" |
| #include "cdb/cdbparquetfooterserializer.h" |
| #include "lib/stringinfo.h" |
| #include "utils/cash.h" |
| #include "utils/geo_decls.h" |
| #include "utils/date.h" |
| #include "utils/numeric.h" |
| #include "utils/xml.h" |
| #include "utils/inet.h" |
| |
| #include "snappy-c.h" |
| #include "zlib.h" |
| |
| |
| static void addDataPage( |
| ParquetColumnChunk columnChunk); |
| |
| static int encodeCurrentPage( |
| ParquetColumnChunk chunk); |
| |
| static int finalizeCurrentAndNewPage( |
| ParquetColumnChunk columnChunk); |
| |
| static void flushDataPage( |
| ParquetColumnChunk chunk, |
| int page_number); |
| |
| static void initGroupType( |
| FileField_4C *field, |
| char *name, |
| RepetitionType repetitionType, |
| int hawqType, |
| int r, |
| int d, |
| int depth, |
| int numChildren, |
| char *parentPathInSchema); |
| |
| static void initPrimitiveType( |
| FileField_4C *field, |
| char *name, |
| RepetitionType repetitionType, |
| int hawqType, |
| int typeLen, |
| int r, |
| int d, |
| int depth, |
| char *parentPathInSchema); |
| |
| /* init embedded data types */ |
| static void initPointType( |
| FileField_4C *pointField, |
| Form_pg_attribute att); |
| |
| static void initPathType( |
| FileField_4C *pathField, |
| Form_pg_attribute att); |
| |
| static void initLsegBoxType( |
| FileField_4C *lsegBoxField, |
| Form_pg_attribute att); |
| |
| static void initPolygonType( |
| FileField_4C *polygonField, |
| Form_pg_attribute att); |
| |
| static void initCircleType( |
| FileField_4C *circleField, |
| Form_pg_attribute att); |
| |
| static void addSingleColumn( |
| AppendOnlyEntry *catalog, |
| struct ColumnChunkMetadata_4C** columnsMetadata, |
| ParquetColumnChunk columns, |
| int *estimateChunkSizes, |
| struct FileField_4C *field, |
| int *colIndex, |
| File parquetFile); |
| |
| static int appendNullForFields( |
| struct FileField_4C *field, |
| ParquetColumnChunk columnChunks, |
| int *colIndex); |
| |
| static int appendValueForFields( |
| struct FileField_4C *field, |
| ParquetColumnChunk columnChunks, |
| int *colIndex, |
| Datum value); |
| |
| static int appendParquetColumnNull( |
| ParquetColumnChunk columnChunk); |
| |
| static int appendParquetColumnValue( |
| ParquetColumnChunk columnChunk, |
| Datum value, |
| int r, |
| int d); |
| |
| /*---------------------------------------------------------------- |
| * append column of geometric types |
| * - point, lseg, path, box, polygon, circle |
| *----------------------------------------------------------------*/ |
| static int appendParquetColumn_Point( |
| ParquetColumnChunk columnChunks, |
| int *colIndex, |
| Point *point, |
| int r, |
| int d); |
| |
| static int appendParquetColumn_Lseg( |
| ParquetColumnChunk columnChunks, |
| int *colIndex, |
| LSEG *lseg, |
| int r, |
| int d); |
| |
| static int appendParquetColumn_Path( |
| ParquetColumnChunk columnChunks, |
| int *colIndex, |
| PATH *path, |
| int r, |
| int d); |
| |
| static int appendParquetColumn_Box( |
| ParquetColumnChunk columnChunks, |
| int *colIndex, |
| BOX *box, |
| int r, |
| int d); |
| |
| static int appendParquetColumn_Polygon( |
| ParquetColumnChunk columnChunks, |
| int *colIndex, |
| POLYGON *polygon, |
| int r, |
| int d); |
| |
| static int appendParquetColumn_Circle( |
| ParquetColumnChunk columnChunks, |
| int *colIndex, |
| CIRCLE *circle, |
| int r, |
| int d); |
| |
| /* Used by cdbparquetfooterserializer.c */ |
| char *generateHAWQSchemaStr( |
| ParquetFileField pfields, |
| int fieldCount); |
| |
| static void estimateColumnWidths( |
| int *columnWidths, |
| int ncolumns, |
| TupleDesc tableAttrs); |
| |
| static char *getTypeName(Oid typeOid); |
| |
| static int encodePlain( |
| Datum data, |
| ParquetDataPage current_page, |
| int hawqTypeId, |
| int pageSizeLimit); |
| |
| static int approximatePageSize(ParquetDataPage page); |
| |
| static bool ensureBufferCapacity(ParquetDataPage page, |
| int newValueSize, |
| int pageSizeLimit); |
| |
| #define ENCODE_INVALID_VALUE -1 |
| #define ENCODE_OUTOF_PAGE -2 |
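
/*
 * encodePlain() returns ENCODE_INVALID_VALUE when the encoded value can
 * never fit in a page of `pageSizeLimit` bytes, and ENCODE_OUTOF_PAGE when
 * the value does not fit in the current page but may fit in a fresh one.
 */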
| |
| /** |
| * generate hawq schema in to string. for example: |
| * |
| * message person { |
| * required varchar name; |
| * required int2 age; |
| * optional group home_addr (address) { |
| * required varchar street; |
| * required varchar city; |
| * required varchar state; |
| * required int4 zip; |
| * } |
| * required varchar[] tags; |
| * } |
| |
| What about Array? |
| */ |
| char * |
| generateHAWQSchemaStr(ParquetFileField pfields, |
| int fieldCount) |
| { |
| StringInfoData schemaBuf; |
| initStringInfo(&schemaBuf); |
| appendStringInfo(&schemaBuf, "message hawqschema {"); |
| |
| for (ParquetFileField field = pfields; field < pfields + fieldCount; field++) |
| { |
| /* TODO add ARRAY and UDF type support */ |
| char *typeName = getTypeName(field->hawqTypeId); |
| appendStringInfo(&schemaBuf, "%s %s %s;", |
| (field->repetitionType == REQUIRED) ? "required" : "optional", |
| typeName, |
| field->name); |
| pfree(typeName); |
| } |
| |
| appendStringInfo(&schemaBuf, "}"); |
| return schemaBuf.data; |
| } |
| |
| int |
| initparquetMetadata(ParquetMetadata parquetmd, |
| TupleDesc tableAttrs, |
| File parquetFile) |
| { |
| Form_pg_attribute att; |
| ParquetFileField field; |
| |
| parquetmd->version = CURRENT_PARQUET_VERSION; |
| parquetmd->fieldCount = tableAttrs->natts; |
| parquetmd->pfield = palloc0(tableAttrs->natts * sizeof(struct FileField_4C)); |
| int colCount = 0; |
| int schemaTreeNodeCount = 0; |
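
	/*
	 * colCount counts physical (leaf) columns, while schemaTreeNodeCount
	 * also counts the group nodes of embedded types; e.g. a point maps to
	 * 2 columns (x, y) but 3 schema tree nodes (the point group plus x, y).
	 */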
| for (int i = 0; i < tableAttrs->natts; i++) |
| { |
| att = tableAttrs->attrs[i]; |
| field = &parquetmd->pfield[i]; |
| |
| switch (att->atttypid) |
| { |
| /** basic types */ |
| case HAWQ_TYPE_BOOL: |
| case HAWQ_TYPE_BYTE: |
| case HAWQ_TYPE_INT2: |
| case HAWQ_TYPE_INT4: |
| case HAWQ_TYPE_MONEY: |
| case HAWQ_TYPE_INT8: |
| case HAWQ_TYPE_FLOAT4: |
| case HAWQ_TYPE_FLOAT8: |
| case HAWQ_TYPE_NUMERIC: |
| /* text related types */ |
| case HAWQ_TYPE_NAME: |
| case HAWQ_TYPE_CHAR: |
| case HAWQ_TYPE_BPCHAR: |
| case HAWQ_TYPE_VARCHAR: |
| case HAWQ_TYPE_TEXT: |
| case HAWQ_TYPE_XML: |
| /* time related types */ |
| case HAWQ_TYPE_DATE: |
| case HAWQ_TYPE_TIME: |
| case HAWQ_TYPE_TIMETZ: |
| case HAWQ_TYPE_TIMESTAMP: |
| case HAWQ_TYPE_TIMESTAMPTZ: |
| case HAWQ_TYPE_INTERVAL: |
| /* other types */ |
| case HAWQ_TYPE_MACADDR: |
| case HAWQ_TYPE_INET: |
| case HAWQ_TYPE_CIDR: |
| case HAWQ_TYPE_BIT: |
| case HAWQ_TYPE_VARBIT: |
| initPrimitiveType(field, |
| NameStr(att->attname), /* field name*/ |
| att->attnotnull ? REQUIRED : OPTIONAL, /* repetition */ |
| att->atttypid, /* HAWQ type */ |
| att->attlen, /* type len */ |
| 0, /* r */ |
| att->attnotnull ? 0 : 1, /* d */ |
| 1, /* depth */ |
| NULL); /* parent field path */ |
| colCount += 1; |
| schemaTreeNodeCount += 1; |
| break; |
| /** embedded types */ |
| case HAWQ_TYPE_POINT: |
| initPointType(field, att); |
| colCount += 2; |
| schemaTreeNodeCount += 3; |
| break; |
| case HAWQ_TYPE_PATH: |
| initPathType(field, att); |
| colCount += 3; |
| schemaTreeNodeCount += 5; |
| break; |
| case HAWQ_TYPE_LSEG: |
| case HAWQ_TYPE_BOX: |
| initLsegBoxType(field, att); |
| colCount += 4; |
| schemaTreeNodeCount += 5; |
| break; |
| case HAWQ_TYPE_POLYGON: |
| initPolygonType(field, att); |
| colCount += 6; |
| schemaTreeNodeCount += 9; |
| break; |
| case HAWQ_TYPE_CIRCLE: |
| initCircleType(field, att); |
| colCount += 3; |
| schemaTreeNodeCount += 4; |
| break; |
| default: |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_FEATURE_NOT_SUPPORTED), |
| errmsg("unsupport type '%s'", NameStr(att->attname)))); |
| break; |
| } |
| } |
| parquetmd->colCount = colCount; |
| parquetmd->schemaTreeNodeCount = schemaTreeNodeCount; |
| |
| parquetmd->hawqschemastr = generateHAWQSchemaStr(parquetmd->pfield, |
| parquetmd->fieldCount); |
| return 0; |
| } |
| |
| /* |
| * init primitive type |
| */ |
| void initPrimitiveType(struct FileField_4C *field, |
| char *name, |
| enum RepetitionType repetitionType, |
| int hawqType, |
| int typeLen, |
| int r, |
| int d, |
| int depth, |
| char *parentPathInSchema) |
| { |
| int nameLen = strlen(name); |
| int pathInSchemaLen; |
| /*initialize name*/ |
| field->name = (char*) palloc0(nameLen + 1); |
| strcpy(field->name, name); |
| |
| /*initialize pathInSchema, should be parentPathInSchema:Name*/ |
| if (parentPathInSchema == NULL) { |
| pathInSchemaLen = nameLen; |
| field->pathInSchema = (char*) palloc0(pathInSchemaLen + 1); |
| strcpy(field->pathInSchema, name); |
| } else { |
| pathInSchemaLen = strlen(parentPathInSchema) + nameLen + 1; |
| field->pathInSchema = (char*) palloc0(pathInSchemaLen + 1); |
| strcpy(field->pathInSchema, parentPathInSchema); |
| strcat(field->pathInSchema, ":"); |
| strcat(field->pathInSchema, name); |
| } |
| |
| /*initialize other fields*/ |
| field->repetitionType = repetitionType; |
| field->type = mappingHAWQType(hawqType); |
| field->hawqTypeId = hawqType; |
| field->typeLength = typeLen; |
| field->r = r; |
| field->d = d; |
| field->depth = depth; |
| } |
| |
| /** |
| * initialize group type |
| */ |
| void initGroupType(struct FileField_4C *field, |
| char *name, |
| enum RepetitionType repetitionType, |
| int hawqType, |
| int r, |
| int d, |
| int depth, |
| int numChildren, |
| char *parentPathInSchema) |
| { |
| int nameLen = strlen(name); |
| int pathInSchemaLen; |
| /*initialize name*/ |
| field->name = (char *) palloc0 (nameLen + 1); |
| strcpy(field->name, name); |
| |
| /*initialize pathInSchema, should be parentPathInSchema:Name*/ |
| if (parentPathInSchema == NULL) { |
| pathInSchemaLen = nameLen; |
| field->pathInSchema = (char*) palloc0(pathInSchemaLen + 1); |
| strcpy(field->pathInSchema, name); |
| } else { |
| pathInSchemaLen = strlen(parentPathInSchema) + nameLen + 1; |
| field->pathInSchema = (char*) palloc0(pathInSchemaLen + 1); |
| strcpy(field->pathInSchema, parentPathInSchema); |
| strcat(field->pathInSchema, ":"); |
| strcat(field->pathInSchema, name); |
| } |
| |
| /*initialize other fields*/ |
| field->repetitionType = repetitionType; |
| field->hawqTypeId = hawqType; |
| field->r = r; |
| field->d = d; |
| field->depth = depth; |
| field->num_children = numChildren; |
| field->children = |
| (struct FileField_4C*) palloc0 (sizeof(struct FileField_4C) * field->num_children); |
| } |
| |
| /** |
| * initPointType |
| * point {required double x; required double y} |
| */ |
| void initPointType(struct FileField_4C *pointField, Form_pg_attribute att) { |
| /*point itself*/ |
	enum RepetitionType repetitionType = att->attnotnull ? REQUIRED : OPTIONAL;
| int r = 0; |
| int d = att->attnotnull ? 0 : 1; |
| int depth = 1; |
| int numChildren = 2; |
| char *parentPathInSchema = NULL; |
| |
	initGroupType(pointField, NameStr(att->attname), repetitionType, att->atttypid,
| r, d, depth, |
| numChildren, parentPathInSchema); |
| |
| /*point:x*/ |
| struct FileField_4C *child_0 = &(pointField->children[0]); |
| initPrimitiveType(child_0, "x", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| pointField->r, pointField->d, pointField->depth + 1, |
| pointField->pathInSchema); |
| |
| /*point:y*/ |
| struct FileField_4C *child_1 = &(pointField->children[1]); |
| initPrimitiveType(child_1, "y", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| pointField->r, pointField->d, pointField->depth + 1, |
| pointField->pathInSchema); |
| } |
| |
| /** |
| * initPathType |
| * path: group { required boolean is_open; |
| * repeated group points {required double x; required double y;}} |
| */ |
| void initPathType(struct FileField_4C* pathField, Form_pg_attribute att) { |
| /*path itself*/ |
| enum RepetitionType repetitionType = att->attnotnull ? REQUIRED : OPTIONAL; |
| int r = 0; |
| int d = att->attnotnull ? 0 : 1; |
| int depth = 1; |
| int numChildren = 2; |
| char *parentPathInSchema = NULL; |
| initGroupType(pathField, NameStr(att->attname), repetitionType, att->atttypid, |
| r, d, depth, |
| numChildren, parentPathInSchema); |
| |
| /* path:is_open */ |
| struct FileField_4C *child_0 = &(pathField->children[0]); |
| initPrimitiveType(child_0, "is_open", REQUIRED, HAWQ_TYPE_BOOL, /*FIXME is bool typeLen 1?*/1, |
| pathField->r, pathField->d, pathField->depth + 1, |
| pathField->pathInSchema); |
| |
| /* path:points */ |
| struct FileField_4C *child_1 = &(pathField->children[1]); |
| initGroupType(child_1, "points", REPEATED, HAWQ_TYPE_POINT, |
| pathField->r + 1, pathField->d + 1, pathField->depth + 1, |
| 2/*numChildren*/, pathField->pathInSchema); |
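
	/*
	 * Level assignment follows the Dremel model: the REPEATED points group
	 * raises both the repetition (r) and definition (d) levels of its
	 * subtree by one, an OPTIONAL field raises only d, and REQUIRED
	 * children simply inherit their parent's r and d.
	 */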
| |
| /* path:points:x */ |
| struct FileField_4C * child_1_0 = &(child_1->children[0]); |
| initPrimitiveType(child_1_0, "x", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| child_1->r, child_1->d, child_1->depth + 1, |
| child_1->pathInSchema); |
| |
| /* path:points:y */ |
| struct FileField_4C * child_1_1 = &(child_1->children[1]); |
| initPrimitiveType(child_1_1, "y", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| child_1->r, child_1->d, child_1->depth + 1, |
| child_1->pathInSchema); |
| } |
| |
| /** |
| * initLsegBoxType |
| * lseg(601) group {required double x1; required double y1; required double x2; required double y2;} |
| * |
| * box(603) group {required double x1; required double y1; required double x2; required double y2;} |
| * |
| */ |
| void initLsegBoxType(struct FileField_4C *lsegBoxField, Form_pg_attribute att) { |
| /*lseg/box itself*/ |
| enum RepetitionType repetitionType = att->attnotnull ? REQUIRED : OPTIONAL; |
| int r = 0; |
| int d = att->attnotnull ? 0 : 1; |
| int depth = 1; |
| int numChildren = 4; |
| char *parentPathInSchema = NULL; |
| initGroupType(lsegBoxField, NameStr(att->attname), repetitionType, att->atttypid, |
| r, d, depth, |
| numChildren, parentPathInSchema); |
| |
| struct FileField_4C *child_0 = &(lsegBoxField->children[0]); |
| initPrimitiveType(child_0, "x1", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| lsegBoxField->r, lsegBoxField->d, lsegBoxField->depth + 1, |
| lsegBoxField->pathInSchema); |
| |
| struct FileField_4C *child_1 = &(lsegBoxField->children[1]); |
| initPrimitiveType(child_1, "y1", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| lsegBoxField->r, lsegBoxField->d, lsegBoxField->depth + 1, |
| lsegBoxField->pathInSchema); |
| |
| struct FileField_4C *child_2 = &(lsegBoxField->children[2]); |
| initPrimitiveType(child_2, "x2", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| lsegBoxField->r, lsegBoxField->d, lsegBoxField->depth + 1, |
| lsegBoxField->pathInSchema); |
| |
| struct FileField_4C *child_3 = &(lsegBoxField->children[3]); |
| initPrimitiveType(child_3, "y2", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| lsegBoxField->r, lsegBoxField->d, lsegBoxField->depth + 1, |
| lsegBoxField->pathInSchema); |
| } |
| |
| /** |
| * init polygon type. |
| * group { |
| * required group boundbox { |
| * required double x1; |
| * required double y1; |
| * required double x2; |
| * required double y2; |
| * }, |
| * repeated group points { |
| * required double x; |
| * required double y; |
| * } |
| * } |
| */ |
| void initPolygonType(FileField_4C *polygonField,Form_pg_attribute att) |
| { |
| FileField_4C *box, *points, *f; |
| RepetitionType repetitionType = att->attnotnull ? REQUIRED : OPTIONAL; |
| int r = 0; |
| int d = att->attnotnull ? 0 : 1; |
| int depth = 1; |
| int numChildren = 2; |
| char *parentPathInSchema = NULL; |
| |
| /* polygon is a group */ |
| initGroupType(polygonField, NameStr(att->attname), repetitionType, att->atttypid, |
| r, d, depth, |
| numChildren, parentPathInSchema); |
| box = polygonField->children; |
| points = polygonField->children + 1; |
| |
| /* polygon:boundbox */ |
| initGroupType(box, "boundbox", REQUIRED, HAWQ_TYPE_BOX, |
| polygonField->r, polygonField->d, polygonField->depth + 1, |
| 4 /* numchild */, polygonField->pathInSchema); |
| /* polygon:points */ |
| initGroupType(points, "points", REPEATED, HAWQ_TYPE_POINT, |
| polygonField->r + 1, polygonField->d + 1, polygonField->depth + 1, |
| 2 /* numchild */, polygonField->pathInSchema); |
| |
| /* polygon:boundbox:{x1,y1,x2,y2} */ |
| f = box->children; |
| initPrimitiveType(f, "x1", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| box->r, box->d, box->depth + 1, |
| box->pathInSchema); |
| f = box->children + 1; |
| initPrimitiveType(f, "y1", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| box->r, box->d, box->depth + 1, |
| box->pathInSchema); |
| f = box->children + 2; |
| initPrimitiveType(f, "x2", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| box->r, box->d, box->depth + 1, |
| box->pathInSchema); |
| f = box->children + 3; |
| initPrimitiveType(f, "y2", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| box->r, box->d, box->depth + 1, |
| box->pathInSchema); |
| |
| /* polygon:points:{x,y} */ |
| f = points->children; |
| initPrimitiveType(f, "x", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| points->r, points->d, points->depth + 1, |
| points->pathInSchema); |
| f = points->children + 1; |
| initPrimitiveType(f, "y", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| points->r, points->d, points->depth + 1, |
| points->pathInSchema); |
| } |
| |
| /** |
| * initCircleType |
| * |
| * circle(718) group {required double x; required double y; required double r;} |
| * |
| */ |
| void initCircleType(struct FileField_4C *circleField, Form_pg_attribute att) { |
| /*circle itself*/ |
| enum RepetitionType repetitionType = att->attnotnull ? REQUIRED : OPTIONAL; |
| int r = 0; |
| int d = att->attnotnull ? 0 : 1; |
| int depth = 1; |
| int numChildren = 3; |
| char *parentPathInSchema = NULL; |
| initGroupType(circleField, NameStr(att->attname), repetitionType, att->atttypid, |
| r, d, depth, numChildren, parentPathInSchema); |
| |
| struct FileField_4C *child_0 = &(circleField->children[0]); |
| initPrimitiveType(child_0, "x", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| circleField->r, circleField->d, circleField->depth + 1, |
| circleField->pathInSchema); |
| |
| struct FileField_4C *child_1 = &(circleField->children[1]); |
| initPrimitiveType(child_1, "y", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| circleField->r, circleField->d, circleField->depth + 1, |
| circleField->pathInSchema); |
| |
| struct FileField_4C *child_2 = &(circleField->children[2]); |
| initPrimitiveType(child_2, "r", REQUIRED, HAWQ_TYPE_FLOAT8, 8, |
| circleField->r, circleField->d, circleField->depth + 1, |
| circleField->pathInSchema); |
| } |
| |
| int |
| mappingHAWQType(int hawqTypeID) |
| { |
| switch (hawqTypeID) |
| { |
| case HAWQ_TYPE_BOOL: |
| return BOOLEAN; |
| |
| case HAWQ_TYPE_INT2: |
| case HAWQ_TYPE_INT4: |
| case HAWQ_TYPE_DATE: |
| return INT32; |
| |
| case HAWQ_TYPE_INT8: |
| case HAWQ_TYPE_TIME: |
| case HAWQ_TYPE_TIMESTAMPTZ: |
| case HAWQ_TYPE_TIMESTAMP: |
| case HAWQ_TYPE_MONEY: |
| return INT64; |
| |
| case HAWQ_TYPE_FLOAT4: |
| return FLOAT; |
| |
| case HAWQ_TYPE_FLOAT8: |
| return DOUBLE; |
| |
| case HAWQ_TYPE_BIT: |
| case HAWQ_TYPE_VARBIT: |
| case HAWQ_TYPE_BYTE: |
| case HAWQ_TYPE_NUMERIC: |
| case HAWQ_TYPE_NAME: |
| case HAWQ_TYPE_CHAR: |
| case HAWQ_TYPE_BPCHAR: |
| case HAWQ_TYPE_VARCHAR: |
| case HAWQ_TYPE_TEXT: |
| case HAWQ_TYPE_XML: |
| case HAWQ_TYPE_TIMETZ: |
| case HAWQ_TYPE_INTERVAL: |
| case HAWQ_TYPE_MACADDR: |
| case HAWQ_TYPE_INET: |
| case HAWQ_TYPE_CIDR: |
| return BINARY; |
| |
| default: |
| Insist(false); |
| return -1; |
| } |
| } |
| |
void estimateColumnWidth(int *columnWidths,
						 int *colidx,
						 Form_pg_attribute att,
						 bool expandEmbeddingType)
{
| switch (att->atttypid) |
| { |
| /* fixed size type */ |
| case HAWQ_TYPE_BOOL: |
| case HAWQ_TYPE_INT2: |
| case HAWQ_TYPE_INT4: |
| case HAWQ_TYPE_INT8: |
| case HAWQ_TYPE_FLOAT4: |
| case HAWQ_TYPE_FLOAT8: |
| case HAWQ_TYPE_DATE: |
| case HAWQ_TYPE_TIME: |
| case HAWQ_TYPE_TIMETZ: |
| case HAWQ_TYPE_TIMESTAMP: |
| case HAWQ_TYPE_TIMESTAMPTZ: |
| case HAWQ_TYPE_INTERVAL: |
| case HAWQ_TYPE_NAME: |
| case HAWQ_TYPE_MONEY: |
| case HAWQ_TYPE_MACADDR: |
| Assert(att->attlen > 0); |
| columnWidths[(*colidx)++] = att->attlen; |
| break; |
| |
| /* variable length type */ |
| case HAWQ_TYPE_CHAR: |
| /* for char(n), atttypmod is n + 4 */ |
| Assert(att->atttypmod > 4); |
| columnWidths[(*colidx)++] = att->atttypmod; |
| break; |
| case HAWQ_TYPE_BPCHAR: |
| case HAWQ_TYPE_VARCHAR: |
| case HAWQ_TYPE_TEXT: |
| case HAWQ_TYPE_XML: |
| if (att->atttypmod > 4) |
| { /* for varchar(n), atttypmod is n + 4 */ |
| columnWidths[(*colidx)++] = att->atttypmod; |
| } |
| else |
| { /* for varchar, text, xml */ |
| columnWidths[(*colidx)++] = 30; |
| } |
| break; |
| |
| case HAWQ_TYPE_BIT: |
| case HAWQ_TYPE_VARBIT: |
| if (att->atttypmod > 0) |
| { /* for bit(n) and bit varying (n), atttypmod is n, |
| * but we also have 4 bytes binary header */ |
| columnWidths[(*colidx)++] = 4 + att->atttypmod; |
| } |
| else |
| { |
| columnWidths[(*colidx)++] = 20; |
| } |
| break; |
| |
| case HAWQ_TYPE_BYTE: |
| case HAWQ_TYPE_NUMERIC: |
| case HAWQ_TYPE_INET: |
| case HAWQ_TYPE_CIDR: |
| columnWidths[(*colidx)++] = 24; |
| break; |
| |
| /* maps to multiple columns */ |
| case HAWQ_TYPE_POINT: |
| if (expandEmbeddingType) { |
| columnWidths[(*colidx)++] = 8; /* x */ |
| columnWidths[(*colidx)++] = 8; /* y */ |
| } else { |
| columnWidths[(*colidx)++] = 16; |
| } |
| break; |
| case HAWQ_TYPE_PATH: |
| if (expandEmbeddingType) { |
| columnWidths[(*colidx)++] = 1; /* is_open */ |
| columnWidths[(*colidx)++] = 24;/* repeated points.x */ |
| columnWidths[(*colidx)++] = 24;/* repeated points.y */ |
| } else { |
| columnWidths[(*colidx)++] = 49; |
| } |
| break; |
| case HAWQ_TYPE_LSEG: |
| case HAWQ_TYPE_BOX: |
| if (expandEmbeddingType) { |
| columnWidths[(*colidx)++] = 8; /* x1 */ |
| columnWidths[(*colidx)++] = 8; /* y1 */ |
| columnWidths[(*colidx)++] = 8; /* x2 */ |
| columnWidths[(*colidx)++] = 8; /* y2 */ |
| } else { |
| columnWidths[(*colidx)++] = 32; |
| } |
| |
| break; |
| case HAWQ_TYPE_POLYGON: |
| if (expandEmbeddingType) { |
| columnWidths[(*colidx)++] = 8; /* boundbox.x1 */ |
| columnWidths[(*colidx)++] = 8; /* boundbox.y1 */ |
| columnWidths[(*colidx)++] = 8; /* boundbox.x2 */ |
| columnWidths[(*colidx)++] = 8; /* boundbox.y2 */ |
| columnWidths[(*colidx)++] = 24;/* repeated points.x */ |
| columnWidths[(*colidx)++] = 24;/* repeated points.y */ |
| } else { |
| columnWidths[(*colidx)++] = 80; |
| } |
| break; |
| case HAWQ_TYPE_CIRCLE: |
| if (expandEmbeddingType) { |
| columnWidths[(*colidx)++] = 8; /* x */ |
| columnWidths[(*colidx)++] = 8; /* y */ |
| columnWidths[(*colidx)++] = 8; /* r */ |
| } else { |
| columnWidths[(*colidx)++] = 24; |
| } |
| break; |
| default: |
| Insist(false); |
| break; |
| } |
| } |
| |
| static void |
| estimateColumnWidths(int *columnWidths, |
| int ncolumns, |
| TupleDesc tableAttrs) |
| { |
| int colidx = 0; |
| for (int i = 0; i < tableAttrs->natts; i++) |
| { |
| estimateColumnWidth(columnWidths, &colidx, tableAttrs->attrs[i], |
| /*expandEmbeddingType*/ true); |
| } |
| Assert(colidx == ncolumns); |
| |
| const int min_column_width = 1; |
| const int max_column_width = 100; |
| |
	/* clamp column widths to a reasonable range so that no column
	 * gets a disproportionately low or high weight */
| for (colidx = 0; colidx < ncolumns; colidx++) |
| { |
| Insist(columnWidths[colidx] >= min_column_width); |
| if (columnWidths[colidx] > max_column_width) |
| { |
| columnWidths[colidx] = max_column_width; |
| } |
| } |
| } |
| |
| ParquetRowGroup |
| addRowGroup(ParquetMetadata parquetmd, |
| TupleDesc tableAttrs, |
| AppendOnlyEntry *aoentry, |
| File file) |
| { |
| ParquetRowGroup rowgroup = palloc0(sizeof(struct ParquetRowGroup_S)); |
| |
| /* ParquetRowGroup */ |
| rowgroup->catalog = aoentry; |
| rowgroup->rowGroupMetadata = palloc0(sizeof(struct BlockMetadata_4C)); |
| rowgroup->columnChunkNumber = parquetmd->colCount; |
| rowgroup->columnChunks = palloc0(parquetmd->colCount * sizeof(struct ParquetColumnChunk_S)); |
| rowgroup->parquetFile = file; |
| |
| /* RowGroupMetadata */ |
| RowGroupMetadata rowgroupmd = rowgroup->rowGroupMetadata; |
| rowgroupmd->ColChunkCount = parquetmd->colCount; |
| rowgroupmd->columns = palloc0(parquetmd->colCount * sizeof(struct ColumnChunkMetadata_4C)); |
| rowgroupmd->rowCount = 0; |
| rowgroupmd->totalByteSize = 0; |
| |
| if (parquetmd->estimateChunkSizes == NULL) |
| { |
| parquetmd->estimateChunkSizes = (int *)palloc0(parquetmd->colCount * sizeof(int)); |
| |
| int *columnWidths = (int *)palloc0(parquetmd->colCount * sizeof(int)); |
| estimateColumnWidths(columnWidths, parquetmd->colCount, tableAttrs); |
| |
| double rowWidths = 0.0; |
| for (int i = 0; i < parquetmd->colCount; i++) |
| { |
| rowWidths += columnWidths[i]; |
| } |
| |
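		/*
		 * Split the row group budget (the catalog blocksize) across the
		 * column chunks in proportion to their estimated widths, with 5%
		 * slack for encoding overhead.
		 */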
| for (int i = 0; i < parquetmd->colCount; i++) |
| { |
| parquetmd->estimateChunkSizes[i] = |
| (int) ((columnWidths[i] * rowgroup->catalog->blocksize * 1.05)/rowWidths); |
| } |
| |
| pfree(columnWidths); |
| } |
| |
| /* |
	 * init ParquetColumnChunk (including its metadata) for each column
| */ |
| int cIndex = 0; |
| for (int i = 0; i < parquetmd->fieldCount; ++i) |
| { |
| addSingleColumn(rowgroup->catalog, |
| &rowgroupmd->columns, /* for ColumnChunkMetadata */ |
| rowgroup->columnChunks, /* for ParquetColumnChunk */ |
| parquetmd->estimateChunkSizes, |
| /*parquet column chunk max size*/ |
| &parquetmd->pfield[i], |
| &cIndex, |
| rowgroup->parquetFile); |
| } |
| Assert(cIndex == parquetmd->colCount); |
| |
	/* increment the row group count in the parquet metadata */
| parquetmd->blockCount++; |
| return rowgroup; |
| } |
| |
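/*
 * A minimal sketch of the write path implemented in this file (assuming the
 * caller has already opened the segment file and built the table metadata):
 *
 *     initparquetMetadata(parquetmd, tableAttrs, file);
 *     ParquetRowGroup rg = addRowGroup(parquetmd, tableAttrs, aoentry, file);
 *     for each tuple, for each attribute:
 *         null ? appendNullForFields(...) : appendValueForFields(...);
 *     flushRowGroup(rg, parquetmd, mirroredOpen, footerProtocol,
 *                   &fileLen, &fileLen_uncompressed);
 */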
| void |
| flushRowGroup(ParquetRowGroup rowgroup, |
| ParquetMetadata parquetmd, |
| MirroredAppendOnlyOpen *mirroredOpen, |
| CompactProtocol *footerProtocol, |
| int64 *fileLen, |
| int64 *fileLen_uncompressed) |
| { |
| int bytes_added = 0; |
| Assert(rowgroup != NULL); |
| |
| /* |
| * Write out column chunks one by one. For each chunk, we do the following: |
| * 1. encode the last page. |
| * 2. write out pages one by one. |
| * 3. write out chunk's metadata after the last page. |
| */ |
| for (int i = 0; i < rowgroup->columnChunkNumber; i++) |
| { |
| ParquetColumnChunk chunk = &rowgroup->columnChunks[i]; |
| ColumnChunkMetadata chunkmd = chunk->columnChunkMetadata; |
| |
| bytes_added += encodeCurrentPage(chunk); |
| |
| /*---------------------------------------------------------------- |
| * recompute estimate chunk size based on uncompressed size (excludes header) |
| *----------------------------------------------------------------*/ |
| parquetmd->estimateChunkSizes[i] = 0; |
| for (int pageno = 0; pageno < chunk->pageNumber; ++pageno) |
| { |
| parquetmd->estimateChunkSizes[i] += chunk->pages[pageno].header->uncompressed_page_size; |
| } |
| parquetmd->estimateChunkSizes[i] = (int) (parquetmd->estimateChunkSizes[i] * 1.05); |
| |
| /*---------------------------------------------------------------- |
| * write out pages one by one |
| *----------------------------------------------------------------*/ |
| chunkmd->firstDataPage = FileNonVirtualTell(rowgroup->parquetFile); |
| if (chunkmd->firstDataPage < 0) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("file tell position error for segment file: %s", strerror(errno)), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| for (int pageno = 0; pageno < chunk->pageNumber; ++pageno) |
| { |
| flushDataPage(chunk, pageno); |
| } |
| |
| /*---------------------------------------------------------------- |
| * write out chunk's metadata after the last page |
| *----------------------------------------------------------------*/ |
| chunkmd->file_offset = FileNonVirtualTell(rowgroup->parquetFile); |
| if (chunkmd->file_offset < 0) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("file tell position error for segment file: %s", strerror(errno)), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| |
| uint8_t *Thrift_ColumnMetaData_Buf; |
| uint32_t Thrift_ColumnMetaData_Len; |
| if (writeColumnChunkMetadata(&Thrift_ColumnMetaData_Buf, |
| &Thrift_ColumnMetaData_Len, |
| chunkmd) != 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("failed to serialize column metadata using thrift"))); |
| } |
| |
| bytes_added += Thrift_ColumnMetaData_Len; |
| |
| if (FileWrite(rowgroup->parquetFile, |
| (char *) Thrift_ColumnMetaData_Buf, |
| Thrift_ColumnMetaData_Len) != Thrift_ColumnMetaData_Len) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("file write error when writing out column metadata: %s", strerror(errno)), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| |
		/* add the chunk's compressed/uncompressed size (plus its metadata
		 * length) to the parquet fileLen and fileLen_uncompressed */
| (*fileLen) += (chunkmd->totalSize + Thrift_ColumnMetaData_Len); |
| (*fileLen_uncompressed) += (chunkmd->totalUncompressedSize + Thrift_ColumnMetaData_Len); |
| } |
| |
| int fileSync = 0; |
| MirroredAppendOnly_Flush(mirroredOpen, &fileSync); |
| if(fileSync < 0){ |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("file sync error: %s", strerror(fileSync)), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| |
| rowgroup->rowGroupMetadata->totalByteSize += bytes_added; |
| parquetmd->num_rows += rowgroup->rowGroupMetadata->rowCount; |
| |
| writeRowGroupInfo(rowgroup->rowGroupMetadata, footerProtocol); |
| freeRowGroup(rowgroup); |
| } |
| |
| void |
| freeRowGroup(ParquetRowGroup rowgroup) |
| { |
| for (int i = 0; i < rowgroup->columnChunkNumber; i++) |
| { |
| pfree(rowgroup->columnChunks[i].pages); |
| |
		/* chunk metadata should be kept until parquet_insert_finish */
| rowgroup->columnChunks[i].columnChunkMetadata = NULL; |
| } |
| pfree(rowgroup->columnChunks); |
| |
| freeRowGroupInfo(rowgroup->rowGroupMetadata); |
| pfree(rowgroup); |
| } |
| |
| /** |
| * add a column information to row group |
| * catalog: pg_appendonly entry |
| * columnsMetadata: columnMetadata needed to be added |
| * columns: column chunks of row group needed to be initialized |
| * estimateChunkSizes: array of estimated sizes for each columnchunk |
| * field: the column description in parquet file metadata schema part |
| * colIndex: the index of the column in columnsMetadata and columns |
| * parquetFile: file to insert into |
| */ |
| void |
| addSingleColumn(AppendOnlyEntry *catalog, |
| struct ColumnChunkMetadata_4C** columnsMetadata, |
| ParquetColumnChunk columns, |
| int *estimateChunkSizes, |
| struct FileField_4C *field, |
| int *colIndex, |
| File parquetFile) |
| { |
| if (field->num_children > 0) |
| { /* for embedded types, should expand it, recursive call the function itself*/ |
| for (int i = 0; i < field->num_children; i++) { |
| addSingleColumn(catalog, columnsMetadata, columns, estimateChunkSizes, |
| &(field->children[i]), colIndex, parquetFile); |
| } |
| } |
| else |
| { /* for single column, directly add the column*/ |
| struct ColumnChunkMetadata_4C* chunkmd = |
| &((*columnsMetadata)[*colIndex]); |
| |
| /*---------------------------------------------------------------- |
| * initialize ColumnChunkMetadata |
| *----------------------------------------------------------------*/ |
| chunkmd->EncodingCount = 3; |
| chunkmd->pEncodings = palloc0(chunkmd->EncodingCount * sizeof(enum Encoding)); |
| chunkmd->pEncodings[0] = RLE; /*set definition level encoding as RLE*/ |
| chunkmd->pEncodings[1] = RLE; /*set repetition level encoding as RLE*/ |
| chunkmd->pEncodings[2] = PLAIN; /*set data encoding as PLAIN*/ |
| chunkmd->file_offset = 0; |
| chunkmd->firstDataPage = 0; |
| chunkmd->totalSize = 0; |
| chunkmd->totalUncompressedSize = 0; |
| chunkmd->valueCount = 0; |
| |
| if (catalog->compresstype == NULL) |
| { |
| chunkmd->codec = UNCOMPRESSED; |
| } |
| else |
| { |
| if (0 == strcmp(catalog->compresstype, "snappy")) |
| { |
| chunkmd->codec = SNAPPY; |
| } |
| else if (0 == strcmp(catalog->compresstype, "gzip")) |
| { |
| chunkmd->codec = GZIP; |
| } |
| #ifdef NOT_USED |
| else if (0 == strcmp(catalog->compresstype, "lzo")) |
| { |
| chunkmd->codec = LZO; |
| } |
| #endif |
| else |
| { |
| Assert(0 == strcmp(catalog->compresstype, "none")); |
| chunkmd->codec = UNCOMPRESSED; |
| } |
| } |
| |
| chunkmd->type = field->type; |
| chunkmd->hawqTypeId = field->hawqTypeId; |
| chunkmd->colName = (char *)palloc0(strlen(field->name) + 1); |
| strcpy(chunkmd->colName, field->name); |
| chunkmd->pathInSchema = (char *)palloc0(strlen(field->pathInSchema) + 1); |
| strcpy(chunkmd->pathInSchema, field->pathInSchema); |
| chunkmd->r = field->r; |
| chunkmd->d = field->d; |
| chunkmd->depth = field->depth; |
| |
| /*---------------------------------------------------------------- |
| * initialize ParquetColumnChunk |
| *----------------------------------------------------------------*/ |
| ParquetColumnChunk chunk = columns + (*colIndex); |
| chunk->columnChunkMetadata = chunkmd; |
| chunk->maxPageCount = DEFAULT_DATAPAGE_COUNT; |
| chunk->pageNumber = 0; |
| chunk->pages = palloc0(chunk->maxPageCount * sizeof(struct ParquetDataPage_S)); |
| chunk->currentPage = NULL; |
| chunk->estimateChunkSizeRemained = estimateChunkSizes[*colIndex]; |
| chunk->pageSizeLimit = catalog->pagesize; |
| chunk->compresstype = catalog->compresstype; |
| chunk->compresslevel = catalog->compresslevel; |
| chunk->parquetFile = parquetFile; |
| |
| *colIndex = *colIndex + 1; |
| } |
| } |
| |
| /* |
| * Write out a specified data page (page header + page data) |
| */ |
| static void |
| flushDataPage(ParquetColumnChunk chunk, int page_number) |
| { |
| ParquetDataPage page = &chunk->pages[page_number]; |
| Assert(page != NULL); |
| Assert(page->finalized); |
| Assert(page->header_buffer != NULL); |
| |
| /*---------------------------------------------------------------- |
| * write out thrift page header |
| *----------------------------------------------------------------*/ |
| if (FileWrite(page->parquetFile, |
| (char *) page->header_buffer, |
| page->header_len) != page->header_len) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("file write error when writing out page header: %s", strerror(errno)), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| pfree(page->header_buffer); |
| |
| /*---------------------------------------------------------------- |
| * write out page data |
| *----------------------------------------------------------------*/ |
| if (FileWrite(page->parquetFile, |
| (char *) page->data, |
| page->header->compressed_page_size) != page->header->compressed_page_size) |
| { |
| ereport(ERROR, |
| (errcode_for_file_access(), |
| errmsg("file write error when writing out page data: %s", strerror(errno)), |
| errdetail("%s", HdfsGetLastError()))); |
| } |
| |
| pfree(page->header); |
| if (page->data != NULL) |
| { |
| pfree(page->data); |
| } |
| } |
| |
| /* |
| * Append value to the corresponding data page. |
| * |
| * Upon successful completion, the number of bytes which were added is returned. |
| * |
| * Otherwise ENCODE_INVALID_VALUE is returned if encoded length of `data` |
| * exceeds `pageSizeLimit`. |
| * |
| * ENCODE_OUTOF_PAGE is returned if `data` is of valid size but appending |
 * the data would make the page exceed `pageSizeLimit`.
| * |
| */ |
| static int |
| encodePlain(Datum data, |
| ParquetDataPage current_page, |
| int hawqTypeId, |
| int pageSizeLimit) |
| { |
| int len = 0; /* actual number of bytes added to buffer */ |
| uint8_t* dst_ptr = NULL; |
| |
| switch (hawqTypeId) |
| { |
| |
| case HAWQ_TYPE_BOOL: |
| { |
| if (approximatePageSize(current_page) >= pageSizeLimit) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| return BitPack_WriteInt(current_page->bool_values, DatumGetBool(data) ? 1 : 0); |
| } |
| |
| /*---------------------------------------------------------------- |
		 * Types mapped to 4-byte INT32/FLOAT in Parquet
| *----------------------------------------------------------------*/ |
| case HAWQ_TYPE_INT2: |
| { |
| len = 4; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| int32 val = (int32) DatumGetInt16(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &val, len); |
| return len; |
| } |
| case HAWQ_TYPE_INT4: |
| { |
| len = 4; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| int32 val = DatumGetInt32(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &val, len); |
| return len; |
| } |
| case HAWQ_TYPE_DATE: |
| { |
| len = 4; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| DateADT val = DatumGetDateADT(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &val, len); |
| return len; |
| } |
| case HAWQ_TYPE_FLOAT4: |
| { |
| len = 4; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| float4 val = DatumGetFloat4(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &val, len); |
| return len; |
| } |
| |
| |
| /*---------------------------------------------------------------- |
		 * Types mapped to 8-byte INT64/DOUBLE in Parquet
| *----------------------------------------------------------------*/ |
| case HAWQ_TYPE_MONEY: |
| { |
| /* |
| * Although money is represented as int64 internally, |
| * it's passed by reference. |
| */ |
| len = 8; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| Cash *cash_p = (Cash *) DatumGetPointer(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, cash_p, len); |
| return len; |
| } |
| |
| case HAWQ_TYPE_INT8: |
| { |
| len = 8; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| int64 val = DatumGetInt64(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &val, len); |
| return len; |
| } |
| case HAWQ_TYPE_TIME: |
| { |
| len = 8; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| TimeADT val = DatumGetTimeADT(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &val, len); |
| return len; |
| } |
| case HAWQ_TYPE_TIMESTAMP: |
| { |
| len = 8; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| Timestamp val = DatumGetTimestamp(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &val, len); |
| return len; |
| } |
| case HAWQ_TYPE_TIMESTAMPTZ: |
| { |
| len = 8; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| TimestampTz val = DatumGetTimestampTz(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &val, len); |
| return len; |
| } |
| case HAWQ_TYPE_FLOAT8: |
| { |
| len = 8; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| float8 val = DatumGetFloat8(data); |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &val, len); |
| return len; |
| } |
| |
| /*---------------------------------------------------------------- |
		 * fixed-length types, mapped to BINARY in Parquet
| *----------------------------------------------------------------*/ |
| case HAWQ_TYPE_NAME: |
| { |
| int data_size = NAMEDATALEN; |
| NameData *name = DatumGetName(data); |
| len = 4 + data_size; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
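			/*
			 * BINARY length prefix: 4 bytes, written in native byte order
			 * (assumed little-endian); the commented-out htole32() below
			 * marks where a byte-order conversion would be needed on
			 * big-endian platforms. The same applies to the other BINARY
			 * cases in this function.
			 */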
| memcpy(dst_ptr, &(/*htole32(*/data_size/*)*/), 4); |
| dst_ptr += 4; |
| memcpy(dst_ptr, NameStr(*name), data_size); |
| return len; |
| } |
| case HAWQ_TYPE_TIMETZ: |
| { |
| /* |
| * timetz (12 bytes) is stored in parquet's BINARY type, |
| * that is <4-bytes-header> + <12-bytes-content> |
| */ |
| int data_size = sizeof(TimeTzADT); |
| TimeTzADT *timetz = DatumGetTimeTzADTP(data); |
| len = 4 + data_size; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &(/*htole32(*/data_size/*)*/), 4); |
| dst_ptr += 4; |
| memcpy(dst_ptr, timetz, sizeof(TimeTzADT)); |
| return len; |
| } |
| case HAWQ_TYPE_INTERVAL: |
| { |
| int data_size = sizeof(Interval); |
| Interval *interval = DatumGetIntervalP(data); |
| len = 4 + data_size; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &(/*htole32(*/data_size/*)*/), 4); |
| dst_ptr += 4; |
| memcpy(dst_ptr, interval, sizeof(Interval)); |
| return len; |
| } |
| case HAWQ_TYPE_MACADDR: |
| { |
| int data_size = 6; |
| macaddr *mac = DatumGetMacaddrP(data); |
| len = 4 + data_size; |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &(/*htole32(*/data_size/*)*/), 4); |
| dst_ptr += 4; |
| /* TODO can we just memcpy the structure? */ |
| memcpy(dst_ptr, &mac->a, sizeof(char)); |
| dst_ptr += sizeof(char); |
| memcpy(dst_ptr, &mac->b, sizeof(char)); |
| dst_ptr += sizeof(char); |
| memcpy(dst_ptr, &mac->c, sizeof(char)); |
| dst_ptr += sizeof(char); |
| memcpy(dst_ptr, &mac->d, sizeof(char)); |
| dst_ptr += sizeof(char); |
| memcpy(dst_ptr, &mac->e, sizeof(char)); |
| dst_ptr += sizeof(char); |
| memcpy(dst_ptr, &mac->f, sizeof(char)); |
| return len; |
| } |
| |
| /* |
| * variable length type, mapped to BINARY in Parquet |
| * ------------------ |
| * The following types are implemented as varlena in HAWQ, they all corresponds |
| * to BINARY in Parquet. However, we have two strategies when storing them, depends |
| * on whether we stores varlena header or not. |
| * |
| * [strategy 1] exclude varlena header: |
| * BINARY = [VARSIZE(d) - 4, VARDATA(d)] |
| * This strategy works better for text-related type because |
| * any parquet client can interprete text binary. |
| * |
| * [strategy 2] include varlena header in actual data: |
| * BINARY = [VARSIZE(d), d] |
| * This works better for HAWQ specific type like numeric because |
| * we can easily deserialize data. (just get the data part of the byte array) |
| */ |
| |
| /* these types use [strategy 1] */ |
| case HAWQ_TYPE_BYTE: |
| case HAWQ_TYPE_CHAR: |
| case HAWQ_TYPE_BPCHAR: |
| case HAWQ_TYPE_VARCHAR: |
| case HAWQ_TYPE_TEXT: |
| case HAWQ_TYPE_XML: |
| { |
| struct varlena *varlen = (struct varlena *) DatumGetPointer(data); |
| Assert(!VARATT_IS_COMPRESSED(varlen) && !VARATT_IS_EXTERNAL(varlen)); |
| |
| int puredataSize = VARSIZE_ANY_EXHDR(varlen); |
| len = puredataSize + 4; |
| |
| if (len > pageSizeLimit) |
| { |
| return ENCODE_INVALID_VALUE; |
| } |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &(/*htole32(*/puredataSize/*)*/), 4); |
| dst_ptr += 4; |
| memcpy(dst_ptr, VARDATA_ANY(varlen), puredataSize); |
| return len; |
| } |
| /* these types use [strategy 2] */ |
| case HAWQ_TYPE_BIT: |
| case HAWQ_TYPE_VARBIT: |
| case HAWQ_TYPE_NUMERIC: |
| case HAWQ_TYPE_INET: |
| case HAWQ_TYPE_CIDR: |
| { |
| struct varlena *varlen = (struct varlena *) DatumGetPointer(data); |
| Assert(!VARATT_IS_COMPRESSED(varlen) && !VARATT_IS_EXTERNAL(varlen)); |
| |
| int dataSize = VARSIZE_ANY(varlen); |
| len = dataSize + sizeof(int32); |
| |
| if (len > pageSizeLimit) |
| { |
| return ENCODE_INVALID_VALUE; |
| } |
| if (!ensureBufferCapacity(current_page, len, pageSizeLimit)) |
| { |
| return ENCODE_OUTOF_PAGE; |
| } |
| |
| dst_ptr = current_page->values_buffer + |
| current_page->header->uncompressed_page_size; |
| |
| memcpy(dst_ptr, &(/*htole32(*/dataSize/*)*/), sizeof(int32)); |
| dst_ptr += sizeof(int32); |
| memcpy(dst_ptr, varlen, dataSize); |
| return len; |
| } |
| default: |
| Insist(false); |
| break; |
| } |
| } |
| |
| |
| /* |
| * Ensure page's values_buffer is large enough to contain the new value. |
| * |
| * Return false if adding the value will make the page exceeds `pageSizeLimit`. |
| * Return true otherwise. |
| */ |
| static bool ensureBufferCapacity(ParquetDataPage page, |
| int newValueSize, int pageSizeLimit) |
| { |
| if ((approximatePageSize(page) + newValueSize) > pageSizeLimit) |
| return false; |
| |
| /* the lower bound size for values_buffer to contain the new value */ |
| int buffer_lowerbound = page->header->uncompressed_page_size + newValueSize; |
| |
| Assert(buffer_lowerbound <= pageSizeLimit); |
| |
| /* make sure buffer_lowerbound <= values_buffer_capacity <= pageSizeLimit */ |
| if (buffer_lowerbound > page->values_buffer_capacity) |
| { |
| page->values_buffer_capacity = buffer_lowerbound > page->values_buffer_capacity * 2 |
| ? buffer_lowerbound |
| : page->values_buffer_capacity * 2; |
| |
| if (page->values_buffer_capacity > pageSizeLimit) |
| page->values_buffer_capacity = pageSizeLimit; |
| |
| page->values_buffer = repalloc(page->values_buffer, page->values_buffer_capacity); |
| } |
| return true; |
| } |
| |
| /** |
| * Put the final page data (may be compressed) in page->data, and the |
| * final page header data in page->header_buffer. |
| * |
| * Return added size of uncompressed data in the current row group, which |
| * is 'unflushed rle/bitpack data' + 'page header' |
| */ |
| int |
| encodeCurrentPage(ParquetColumnChunk chunk) |
| { |
| int bytes_added; |
| ParquetDataPage current_page; |
| ParquetPageHeader header; |
| ColumnChunkMetadata chunkmd; |
| |
| bytes_added = 0; |
| current_page = chunk->currentPage; |
| header = current_page->header; |
| chunkmd = chunk->columnChunkMetadata; |
| |
| Assert(current_page != NULL); |
| |
| if (current_page->finalized) |
| return 0; |
| |
| /*---------------------------------------------------------------- |
| * Flush RLE/BitPack encoded data. Size of r and d data are |
| * accumulated into page's uncompressed_page_size in this phase. |
| * |
| * r/d = <4-bytes little-endian encoded-data-len> + <encoded-data> |
| *----------------------------------------------------------------*/ |
| if(current_page->repetition_level != NULL) |
| { |
| RLEEncoder_Flush(current_page->repetition_level); |
| bytes_added += 4 + RLEEncoder_Size(current_page->repetition_level); |
| } |
| |
| if(current_page->definition_level != NULL) |
| { |
| RLEEncoder_Flush(current_page->definition_level); |
| bytes_added += 4 + RLEEncoder_Size(current_page->definition_level); |
| } |
| |
| if (chunkmd->hawqTypeId == HAWQ_TYPE_BOOL) |
| { |
| bytes_added += BitPack_Flush(current_page->bool_values); |
| } |
| |
| header->uncompressed_page_size += bytes_added; |
| |
	/* we must make sure there is no empty page, since some compression
	 * algorithms will fail if the input buffer is NULL */
| Assert(header->uncompressed_page_size > 0); |
| |
| /*---------------------------------------------------------------- |
| * Combine r/d/value bytes into a buffer for compressing. |
| *----------------------------------------------------------------*/ |
| StringInfoData buf; |
	/* we don't want StringInfo to enlarge its buffer during appendXXX;
	 * since StringInfo keeps a trailing '\0', we add 1 here */
| initStringInfoOfSize(&buf, header->uncompressed_page_size + 1); |
| |
| if(current_page->repetition_level != NULL) |
| { |
| int encoded_data_len = RLEEncoder_Size(current_page->repetition_level); |
| appendBinaryStringInfo(&buf, &(/*htole32(*/encoded_data_len/*)*/), 4); |
| appendBinaryStringInfo(&buf, |
| RLEEncoder_Data(current_page->repetition_level), |
| encoded_data_len); |
| |
| pfree(current_page->repetition_level->writer.buffer); |
| pfree(current_page->repetition_level->packBuffer); |
| pfree(current_page->repetition_level); |
| } |
| |
| if(current_page->definition_level != NULL) |
| { |
| int encoded_data_len = RLEEncoder_Size(current_page->definition_level); |
| appendBinaryStringInfo(&buf, &(/*htole32(*/encoded_data_len/*)*/), 4); |
| appendBinaryStringInfo(&buf, |
| RLEEncoder_Data(current_page->definition_level), |
| encoded_data_len); |
| |
| pfree(current_page->definition_level->writer.buffer); |
| pfree(current_page->definition_level->packBuffer); |
| pfree(current_page->definition_level); |
| } |
| |
| if (chunkmd->hawqTypeId == HAWQ_TYPE_BOOL) |
| { |
| appendBinaryStringInfo(&buf, |
| BitPack_Data(current_page->bool_values), |
| BitPack_Size(current_page->bool_values)); |
| |
| pfree(current_page->bool_values->buffer); |
| pfree(current_page->bool_values); |
| } |
| else |
| { |
| appendBinaryStringInfo(&buf, |
| current_page->values_buffer, |
| header->uncompressed_page_size - buf.len); |
| |
| pfree(current_page->values_buffer); |
| } |
| |
| /*---------------------------------------------------------------- |
| * Compress page data if needed, saved it to current_page->data. |
| *----------------------------------------------------------------*/ |
| switch (chunkmd->codec) |
| { |
| case UNCOMPRESSED: |
| { |
| current_page->data = (uint8_t*) buf.data; |
| header->compressed_page_size = header->uncompressed_page_size; |
| break; |
| } |
| |
| case SNAPPY: |
| { |
| size_t compressedLen = snappy_max_compressed_length(header->uncompressed_page_size); |
| current_page->data = (uint8_t *) palloc(compressedLen); |
| |
| if (snappy_compress(buf.data, header->uncompressed_page_size, |
| (char *)current_page->data, &compressedLen) == SNAPPY_OK) |
| { |
| pfree(buf.data); |
| header->compressed_page_size = compressedLen; |
| } |
| else |
| { |
				ereport(ERROR,
						(errcode(ERRCODE_GP_INTERNAL_ERROR),
						 errmsg("snappy compression failed for column: %s",
								chunkmd->colName)));
| } |
| |
| break; |
| } |
| case GZIP: |
| { |
| int ret; |
			/* 15 (default windowBits for deflate) + 16 (output GZIP header/trailer) */
| const int windowbits = 31; |
| |
| z_stream stream; |
| stream.zalloc = Z_NULL; |
| stream.zfree = Z_NULL; |
| stream.opaque = Z_NULL; |
| stream.avail_in = header->uncompressed_page_size; |
| stream.next_in = (Bytef *) buf.data; |
| |
| ret = deflateInit2(&stream, chunk->compresslevel, Z_DEFLATED, |
| windowbits, MAX_MEM_LEVEL, Z_DEFAULT_STRATEGY); |
| if (ret != Z_OK) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("zlib deflateInit2 failed: %s", stream.msg))); |
| } |
| |
| size_t compressedLen = header->uncompressed_page_size; |
| current_page->data = (uint8_t *) palloc(compressedLen); |
| |
| Bytef *out = (Bytef *) current_page->data; |
| int outlen = compressedLen; |
| |
| /* process until all inputs have been compressed */ |
| do |
| { |
| stream.next_out = out; |
| stream.avail_out = outlen; |
| |
| ret = deflate(&stream, Z_FINISH); |
| if (ret == Z_STREAM_END) |
| break; |
| if (ret == Z_OK) |
| { |
					/* out buffer is not big enough; extend it 4096 bytes at a time */
| outlen = 4096; |
| current_page->data = repalloc(current_page->data, compressedLen + outlen); |
| out = current_page->data + compressedLen; |
| compressedLen += outlen; |
| } |
| else |
| { |
| deflateEnd(&stream); |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("zlib deflate failed: %s", stream.msg))); |
| } |
| |
| } while (1); |
| |
| compressedLen = stream.total_out; |
| deflateEnd(&stream); |
| |
| pfree(buf.data); |
| header->compressed_page_size = compressedLen; |
| break; |
| } |
| #ifdef NOT_USED |
| case LZO: |
| /* TODO*/ |
| Insist(false); |
| break; |
| #endif |
| default: |
| Insist(false); /* shouldn't get here */ |
| break; |
| } |
| |
| /*---------------------------------------------------------------- |
| * All fields of page header are filled, convert to binary page |
| * header in thrift. |
| *----------------------------------------------------------------*/ |
| uint8_t* header_buffer = NULL; |
| if (writePageMetadata(&header_buffer, (uint32_t *) ¤t_page->header_len, |
| current_page->header) < 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_GP_INTERNAL_ERROR), |
| errmsg("failed to serialize page metadata using thrift for column: %s", chunkmd->colName))); |
| } |
| |
| current_page->header_buffer = (uint8_t *) palloc0(current_page->header_len); |
| memcpy(current_page->header_buffer, header_buffer, current_page->header_len); |
| |
| chunkmd->totalUncompressedSize += current_page->header_len + header->uncompressed_page_size; |
| chunkmd->totalSize += current_page->header_len + header->compressed_page_size; |
| |
| current_page->finalized = true; |
| |
| return bytes_added; |
| } |
| |
| static void |
| addDataPage(ParquetColumnChunk chunk) |
| { |
| if (chunk->pageNumber >= chunk->maxPageCount) |
| { |
| chunk->pages = repalloc(chunk->pages, |
| 2 * chunk->maxPageCount * sizeof(struct ParquetDataPage_S)); |
		/* make sure all newly allocated page memory is zero-filled */
| memset(chunk->pages + chunk->maxPageCount, 0, chunk->maxPageCount * sizeof(struct ParquetDataPage_S)); |
| chunk->maxPageCount *= 2; |
| } |
| |
| chunk->currentPage = &chunk->pages[chunk->pageNumber]; |
| chunk->currentPage->data = NULL; |
| chunk->currentPage->finalized = false; |
| chunk->currentPage->header = (ParquetPageHeader) palloc0(sizeof(PageMetadata_4C)); |
| chunk->currentPage->header->page_type = DATA_PAGE; |
| chunk->currentPage->header->definition_level_encoding = RLE; |
| chunk->currentPage->header->repetition_level_encoding = RLE; |
| chunk->currentPage->parquetFile = chunk->parquetFile; |
| |
| if (chunk->columnChunkMetadata->d != 0) |
| { |
| chunk->currentPage->definition_level = palloc0(sizeof(RLEEncoder)); |
| |
| RLEEncoder_Init(chunk->currentPage->definition_level, |
| widthFromMaxInt(chunk->columnChunkMetadata->d)); |
| } |
| if (chunk->columnChunkMetadata->r != 0) |
| { |
| chunk->currentPage->repetition_level = palloc0(sizeof(RLEEncoder)); |
| |
| RLEEncoder_Init(chunk->currentPage->repetition_level, |
| widthFromMaxInt(chunk->columnChunkMetadata->r)); |
| } |
| /* use BIT_PACK encoding for bool column */ |
| if (chunk->columnChunkMetadata->type == BOOLEAN) |
| { |
| chunk->currentPage->bool_values = palloc0(sizeof(ByteBasedBitPackingEncoder)); |
| BitPack_InitEncoder(chunk->currentPage->bool_values, /*bitWidth=*/1); |
| } |
| else |
| { |
| int max_buffer_size = chunk->pageSizeLimit; |
| int min_buffer_size = 512; |
| if (chunk->estimateChunkSizeRemained > max_buffer_size) |
| { |
| chunk->currentPage->values_buffer_capacity = max_buffer_size; |
| chunk->estimateChunkSizeRemained -= max_buffer_size; |
| } |
| else if (chunk->estimateChunkSizeRemained < min_buffer_size) |
| { |
| chunk->currentPage->values_buffer_capacity = min_buffer_size; |
| } |
| else |
| { |
| chunk->currentPage->values_buffer_capacity = chunk->estimateChunkSizeRemained; |
| } |
| chunk->currentPage->values_buffer = palloc0(chunk->currentPage->values_buffer_capacity); |
| } |
| |
| chunk->pageNumber++; |
| } |
| |
| int |
| appendParquetColumnNull(ParquetColumnChunk columnChunk) |
| { |
| int bytes_added = 0; |
| |
	/* if no current page exists, initialize a new one */
	if ((columnChunk->pageNumber == 0) || (columnChunk->currentPage == NULL))
	{
		addDataPage(columnChunk);
	}
| |
	/* if the page size exceeds the limit, finalize the current data page and add a new one */
| if (approximatePageSize(columnChunk->currentPage) >= columnChunk->pageSizeLimit) |
| { |
| bytes_added += finalizeCurrentAndNewPage(columnChunk); |
| } |
| |
| Assert(columnChunk->currentPage->definition_level != NULL); |
| RLEEncoder_WriteInt(columnChunk->currentPage->definition_level, 0); |
| |
| if (columnChunk->currentPage->repetition_level != NULL) |
| { |
| RLEEncoder_WriteInt(columnChunk->currentPage->repetition_level, 0); |
| } |
| |
| columnChunk->currentPage->header->num_values++; |
| columnChunk->columnChunkMetadata->valueCount++; |
| return bytes_added; |
| } |
| |
| /** |
| * Finalize current data page, and then add a new page |
| * @columnChunk: The column chunk which needs to add data |
| * @bytes_added: The number of bytes added |
| * |
| * return uncompressed bytes added to current row group |
| */ |
| int |
| finalizeCurrentAndNewPage(ParquetColumnChunk columnChunk) |
| { |
| int bytes_added = encodeCurrentPage(columnChunk); |
| |
	/* add a new page */
| addDataPage(columnChunk); |
| |
| return bytes_added; |
| } |
| |
| /** |
| * add a value to a column. includes: adding r to repetition level; adding d to definition level; |
| * adding the value itself to page data section |
| * @chunk: the column writer which needs to write data |
| * @value: the value needed to be inserted |
| * @r: the repetition level for the value |
| * @d: the definition level for the value |
| * |
| * return uncompressed bytes added to current row group |
| */ |
| int |
| appendParquetColumnValue(ParquetColumnChunk chunk, |
| Datum value, |
| int r, |
| int d) |
| { |
| int bytes_added = 0; |
| int encoded_len = 0; |
| |
	/* if no current page exists, initialize a new one */
| if ((chunk->pageNumber == 0) || (chunk->currentPage == NULL)) |
| { |
| addDataPage(chunk); |
| } |
| |
| encoded_len = encodePlain(value, |
| chunk->currentPage, |
| chunk->columnChunkMetadata->hawqTypeId, |
| chunk->pageSizeLimit); |
| |
| if (encoded_len == ENCODE_INVALID_VALUE) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| errmsg("value for column \"%s\" exceeds pagesize %d!", |
| chunk->columnChunkMetadata->colName, chunk->pageSizeLimit))); |
| } |
| |
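	/*
	 * The current page cannot hold the value: finalize it and retry on a
	 * fresh page. The ENCODE_INVALID_VALUE check above ensures the value
	 * itself fits within the page size limit, so the retry is expected
	 * to succeed.
	 */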
| if (encoded_len == ENCODE_OUTOF_PAGE) |
| { |
| bytes_added += finalizeCurrentAndNewPage(chunk); |
| encoded_len = encodePlain(value, |
| chunk->currentPage, |
| chunk->columnChunkMetadata->hawqTypeId, |
| chunk->pageSizeLimit); |
| |
| } |
| |
| bytes_added += encoded_len; |
| |
| if (chunk->currentPage->repetition_level != NULL) |
| { |
| RLEEncoder_WriteInt(chunk->currentPage->repetition_level, r); |
| } |
| if (chunk->currentPage->definition_level != NULL) |
| { |
| RLEEncoder_WriteInt(chunk->currentPage->definition_level, d); |
| } |
| |
| chunk->currentPage->header->num_values++; |
| chunk->currentPage->header->uncompressed_page_size += encoded_len; |
| |
| chunk->columnChunkMetadata->valueCount++; |
| |
| return bytes_added; |
| } |
| |
| /* |
| * Append null for field. |
| * |
| * We don't consider UDT currently, therefore we don't have intermediate |
| * null value. If one table's attribute is null, all its corresponding |
| * columns are null, having (r,d) == (0,0). |
| * |
| * Return uncompressed bytes added to current row group. |
| */ |
| int |
| appendNullForFields(struct FileField_4C *field, |
| ParquetColumnChunk columnChunks, |
| int *colIndex) |
| { |
| int bytes_added = 0; |
| |
| if (field->num_children == 0) |
| { |
| bytes_added += appendParquetColumnNull(&columnChunks[*colIndex]); |
| *colIndex = *colIndex + 1; |
| } |
| else |
| { |
| for (int i = 0; i < field->num_children; i++) |
| { |
| bytes_added += appendNullForFields(&field->children[i], columnChunks, colIndex); |
| } |
| } |
| |
| return bytes_added; |
| } |
| |
| /** |
| * Append the value of a hawq field to parquet columns chunks. Should consider embedded types |
| * repetition level and definition level calculation. |
| * @field: the hawq field for the value |
| * @columnChunk: the parquet column chunks needed to be inserted into |
| * @colIndex: current parquet column chunk index |
| * @value: the value needed to be inserted |
| */ |
| int |
| appendValueForFields(struct FileField_4C *field, |
| ParquetColumnChunk columnChunks, |
| int *colIndex, |
| Datum value) |
| { |
| int bytes_added = 0; |
| |
	/* primitive type: r = 0, d = field->d */
| if (field->num_children == 0) |
| { |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| value, |
| 0, field->d); |
| *colIndex = *colIndex + 1; |
| } |
| else |
| { |
| switch (field->hawqTypeId) |
| { |
			/* HAWQ built-in embedded types */
| case HAWQ_TYPE_POINT: |
| bytes_added += appendParquetColumn_Point(columnChunks, |
| colIndex, |
| DatumGetPointP(value), |
| 0, field->d); |
| break; |
| case HAWQ_TYPE_LSEG: |
| bytes_added += appendParquetColumn_Lseg(columnChunks, |
| colIndex, |
| DatumGetLsegP(value), |
| 0, field->d); |
| break; |
| case HAWQ_TYPE_PATH: |
| { |
| PATH *path = DatumGetPathP(value); |
| bytes_added += appendParquetColumn_Path(columnChunks, colIndex, |
| path, 0, field->d); |
| if(VARATT_IS_EXTENDED((struct varlena *) DatumGetPointer(value))) |
| pfree(path); |
| break; |
| } |
| case HAWQ_TYPE_BOX: |
| bytes_added += appendParquetColumn_Box(columnChunks, |
| colIndex, |
| DatumGetBoxP(value), |
| 0, field->d); |
| break; |
| case HAWQ_TYPE_POLYGON: |
| { |
| POLYGON *polygon = DatumGetPolygonP(value); |
| bytes_added += appendParquetColumn_Polygon(columnChunks, |
| colIndex, |
| polygon, |
| 0, field->d); |
| if(VARATT_IS_EXTENDED((struct varlena *) DatumGetPointer(value))) |
| pfree(polygon); |
| break; |
| } |
| case HAWQ_TYPE_CIRCLE: |
| bytes_added += appendParquetColumn_Circle(columnChunks, |
| colIndex, |
| DatumGetCircleP(value), |
| 0, field->d); |
| break; |
| |
| default: |
| /* TODO array type */ |
| /* TODO UDT */ |
| Insist(false); |
| break; |
| |
| } |
| } |
| |
| return bytes_added; |
| } |
| |
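/*
 * A point maps to two parquet columns, x then y.
 */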
| int |
| appendParquetColumn_Point(ParquetColumnChunk columnChunks, int *colIndex, |
| Point *point, int r, int d) |
| { |
| int bytes_added = 0; |
| |
	/* x and y are required, therefore both r and d remain unchanged. */
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(point->x), r, d); |
| *colIndex = *colIndex + 1; |
| |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(point->y), r, d); |
| *colIndex = *colIndex + 1; |
| |
| return bytes_added; |
| } |
| |
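/*
 * An lseg maps to four parquet columns: the (x, y) pairs of its two
 * end points.
 */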
| int |
| appendParquetColumn_Lseg(ParquetColumnChunk columnChunks, int *colIndex, |
| LSEG *lseg, int r, int d) |
| { |
| int bytes_added = 0; |
| |
| bytes_added += appendParquetColumn_Point(columnChunks, colIndex, lseg->p, r, d); |
| bytes_added += appendParquetColumn_Point(columnChunks, colIndex, lseg->p + 1, r, d); |
| |
| return bytes_added; |
| } |
| |
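/*
 * A path maps to three parquet columns: is_open, plus the repeated
 * points.x and points.y. The first point of a path is written with
 * repetition level 0 (starting a new record) and later points with
 * r + 1; every point sits one definition level deeper (d + 1) because
 * the repeated points group adds a level.
 */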
| int |
| appendParquetColumn_Path(ParquetColumnChunk columnChunks, int *colIndex, |
| PATH *path, int r, int d) |
| { |
| int i; |
| int bytes_added = 0; |
| bool is_open = !path->closed; |
| |
| /* append is_open column */ |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| BoolGetDatum(is_open), r, d); |
| *colIndex += 1; |
| |
| /* append points.x column */ |
| for (i = 0; i < path->npts; ++i) |
| { |
| if (i == 0) |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(path->p[i].x), |
| 0, d + 1); |
| else |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(path->p[i].x), |
| r + 1, d + 1); |
| } |
| *colIndex += 1; |
| |
| /* append points.y column */ |
| for (i = 0; i < path->npts; ++i) |
| { |
| if (i == 0) |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(path->p[i].y), |
| 0, d + 1); |
| else |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(path->p[i].y), |
| r + 1, d + 1); |
| } |
| *colIndex += 1; |
| |
| return bytes_added; |
| } |
| |
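/*
 * A box maps to four parquet columns: the (x, y) pairs of its high
 * and low corners.
 */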
| int |
| appendParquetColumn_Box(ParquetColumnChunk columnChunks, int *colIndex, |
| BOX *box, int r, int d) |
| { |
| int bytes_added = 0; |
| |
| bytes_added += appendParquetColumn_Point(columnChunks, colIndex, &box->high, r, d); |
| bytes_added += appendParquetColumn_Point(columnChunks, colIndex, &box->low, r, d); |
| |
| return bytes_added; |
| } |
| |
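/*
 * A polygon maps to the four bounding-box columns followed by the
 * repeated points:x and points:y columns, which use the same r/d
 * scheme as paths.
 */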
| int |
| appendParquetColumn_Polygon(ParquetColumnChunk columnChunks, int *colIndex, |
| POLYGON *polygon, int r, int d) |
| { |
| int i; |
| int bytes_added = 0; |
| |
| /* append boundbox:{x1,y1,x2,y2} columns */ |
| bytes_added += appendParquetColumn_Box(columnChunks, colIndex, &polygon->boundbox, r, d); |
| |
| /* append points:x column */ |
| for (i = 0; i < polygon->npts; ++i) |
| { |
| if (i == 0) |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(polygon->p[i].x), |
| 0, d + 1); |
| else |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(polygon->p[i].x), |
| r + 1, d + 1); |
| } |
| *colIndex += 1; |
| |
| /* append points:y column */ |
| for (i = 0; i < polygon->npts; ++i) |
| { |
| if (i == 0) |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(polygon->p[i].y), |
| 0, d + 1); |
| else |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), |
| Float8GetDatum(polygon->p[i].y), |
| r + 1, d + 1); |
| } |
| *colIndex += 1; |
| |
| return bytes_added; |
| } |
| |
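/*
 * A circle maps to three parquet columns: center x, center y, and
 * radius.
 */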
| int |
| appendParquetColumn_Circle(ParquetColumnChunk columnChunks, int *colIndex, |
| CIRCLE *circle, |
| int r, int d) |
| { |
| int bytes_added = 0; |
| |
| bytes_added += appendParquetColumn_Point(columnChunks, colIndex, &circle->center, r, d); |
| bytes_added += appendParquetColumnValue(columnChunks + (*colIndex), Float8GetDatum(circle->radius), r, d); |
| *colIndex += 1; |
| |
| return bytes_added; |
| } |
| |
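/*
 * Append one row's values to the row group, dispatching each table
 * column to its underlying parquet column chunk(s), and update the
 * row group's total byte size and row count.
 */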
| void |
| appendRowValue(ParquetRowGroup rowgroup, |
| ParquetMetadata parquetmd, |
| Datum* values, bool* nulls) |
| { |
| int bytes_added = 0; |
| |
| /* |
| * Append row value column by column. |
| * |
| * One table's column may corresponds to multiple parquet columns |
| * due to nested data type like point, array, UDF, etc. |
| * |
| */ |
| int colIndex = 0; |
| for (int i = 0; i < parquetmd->fieldCount; i++) |
| { |
		/* for a null value, only the (r,d) levels are written to the underlying columns */
| if (nulls[i]) |
| { |
| bytes_added += appendNullForFields(&(parquetmd->pfield[i]), |
| rowgroup->columnChunks, |
| &colIndex); |
| } |
| /* otherwise the actual value is written, possibly along with r/d */ |
| else |
| { |
| bytes_added += appendValueForFields(&parquetmd->pfield[i], |
| rowgroup->columnChunks, |
| &colIndex, |
| values[i]); |
| } |
| } |
| Assert(colIndex == parquetmd->colCount); |
| |
| rowgroup->rowGroupMetadata->totalByteSize += bytes_added; |
| rowgroup->rowGroupMetadata->rowCount++; |
| } |
| |
| /* |
| * getTypeName |
| * get name of a type |
| * |
| * Note: any associated array type is *not* renamed; caller must make |
| * another call to handle that case. Currently this is only used for |
| * renaming types associated with tables, for which there are no arrays. |
| */ |
char *
getTypeName(Oid typeOid)
{
	Relation pg_type_desc;
	HeapTuple tuple;
	Form_pg_type form;
	cqContext *pcqCtx;
	cqContext cqc;
	char *typeName;

	pg_type_desc = heap_open(TypeRelationId, RowExclusiveLock);
| |
| pcqCtx = caql_addrel(cqclr(&cqc), pg_type_desc); |
| |
| tuple = caql_getfirst( |
| pcqCtx, |
| cql("SELECT typname FROM pg_type " |
| " WHERE oid = :1 ", |
| ObjectIdGetDatum(typeOid))); |
| |
| if (!HeapTupleIsValid(tuple)) |
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("type with OID %u does not exist", typeOid)));
| |
| form = (Form_pg_type) GETSTRUCT(tuple); |
| typeName = (char*) palloc0(strlen(form->typname.data) + 1); |
| memcpy(typeName, form->typname.data, strlen(form->typname.data)); |
| |
| heap_freetuple(tuple); |
| heap_close(pg_type_desc, RowExclusiveLock); |
| return typeName; |
| } |
| |
| /* |
| * Before finalize a page, we cannot know the exact number of |
| * uncompressed size of a page, due to rle/bitpack encoder buffers |
| * some input value. |
| * |
| * This producure returns an approximate uncompressed page size which |
| * mey be a little smaller than the actual size. |
| */ |
| int |
| approximatePageSize(ParquetDataPage page) |
| { |
| int size = page->header->uncompressed_page_size; |
| |
| if (page->repetition_level != NULL) |
| size += RLEEncoder_Size(page->repetition_level); |
| |
| if (page->definition_level != NULL) |
| size += RLEEncoder_Size(page->definition_level); |
| |
| return size; |
| } |