/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* cdbparquetcolumn.c
*
* Created on: Sep 29, 2013
* Author: malili
*/
#include "cdb/cdbparquetcolumn.h"
#include "cdb/cdbparquetrleencoder.h"
#include "utils/inet.h"
#include "utils/date.h"
#include "utils/timestamp.h"
#include "utils/geo_decls.h"
#include "utils/memutils.h"
#include "snappy-c.h"
#include "zlib.h"
#define BUFFER_SCALE_FACTOR 1.2
#define BUFFER_SIZE_LIMIT_BEFORE_SCALED ((Size) ((MaxAllocSize) * 1.0 / (BUFFER_SCALE_FACTOR)))
static void consume(ParquetColumnReader *columnReader);
static void readRepetitionAndDefinitionLevels(ParquetColumnReader *columnReader);
static void decodeCurrentPage(ParquetColumnReader *columnReader);
static bool decodePlain(Datum *value, uint8_t **buffer, int hawqTypeID);
/* return size of PATH struct given number of points in it */
static inline int get_path_size(int npts) { return offsetof(PATH, p[0]) + sizeof(Point) * npts; }
/* return size of POLYGON struct given number of points in it */
static inline int get_polygon_size(int npts) { return offsetof(POLYGON, p[0]) + sizeof(Point) * npts; }
void
ParquetExecutorReadColumn(ParquetColumnReader *columnReader, File file)
{
struct ColumnChunkMetadata_4C* columnChunkMetadata = columnReader->columnMetadata;
int64 firstPageOffset = columnChunkMetadata->firstDataPage;
int64 columnChunkSize = columnChunkMetadata->totalSize;
if ( columnChunkSize > MaxAllocSize )
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("parquet storage read error on reading column %s due to too large column chunk size: " INT64_FORMAT,
columnChunkMetadata->colName, columnChunkSize)));
}
int64 actualReadSize = 0;
MemoryContext oldContext = MemoryContextSwitchTo(columnReader->memoryContext);
/* reuse the column reader's data buffer to avoid repeated allocation */
if(columnReader->dataLen == 0)
{
columnReader->dataLen = columnChunkSize < BUFFER_SIZE_LIMIT_BEFORE_SCALED ?
columnChunkSize * BUFFER_SCALE_FACTOR :
MaxAllocSize-1;
columnReader->dataBuffer = (char*) palloc0(columnReader->dataLen);
}
else if(columnReader->dataLen < columnChunkSize)
{
columnReader->dataLen = columnChunkSize < BUFFER_SIZE_LIMIT_BEFORE_SCALED ?
columnChunkSize * BUFFER_SCALE_FACTOR :
MaxAllocSize-1;
columnReader->dataBuffer = (char*) repalloc(columnReader->dataBuffer, columnReader->dataLen);
memset(columnReader->dataBuffer, 0, columnReader->dataLen);
}
char *buffer = columnReader->dataBuffer;
int64 numValuesInColumnChunk = columnChunkMetadata->valueCount;
int64 numValuesProcessed = 0;
/*seek to the beginning of the column chunk*/
int64 seekResult = FileSeek(file, firstPageOffset, SEEK_SET);
if (seekResult != firstPageOffset)
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("file seek error to position " INT64_FORMAT ": %s", firstPageOffset, strerror(errno)),
errdetail("%s", HdfsGetLastError())));
}
/* loop until the whole column chunk has been read into the buffer */
while(actualReadSize < columnChunkSize)
{
/* FileRead may return fewer bytes than requested, so read incrementally */
int columnChunkLen = FileRead(file, buffer + actualReadSize, columnChunkSize - actualReadSize);
if (columnChunkLen < 0) {
ereport(ERROR,
(errcode_for_file_access(),
errmsg("parquet storage read error on reading column %s ", columnChunkMetadata->colName),
errdetail("%s", HdfsGetLastError())));
}
actualReadSize += columnChunkLen;
}
/* allocate the data page array on first use; later reads reuse it */
if(columnReader->dataPageCapacity == 0)
{
columnReader->dataPageCapacity = DAFAULT_DATAPAGE_NUM_PER_COLUMNCHUNK;
columnReader->dataPageNum = 0;
columnReader->dataPages = (ParquetDataPage)palloc0
(columnReader->dataPageCapacity *sizeof(struct ParquetDataPage_S));
}
/* read all the data pages of the column chunk */
while(numValuesProcessed < numValuesInColumnChunk)
{
ParquetPageHeader pageHeader;
ParquetDataPage dataPage;
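/*
 * header_size is in/out for readPageMetadata: on entry, the bytes
 * remaining in the chunk buffer; on return, the size of the page
 * header actually consumed.
 */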
uint32_t header_size = (char *) columnReader->dataBuffer + columnChunkSize - buffer;
if (readPageMetadata((uint8_t*) buffer, &header_size, /*compact*/1, &pageHeader) < 0)
{
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("thrift deserialize failure on reading page header of column %s ",
columnChunkMetadata->colName)));
}
buffer += header_size;
/* only DATA_PAGE is handled here; other page types are skipped */
if(pageHeader->page_type != DATA_PAGE){
if(pageHeader->page_type == DICTIONARY_PAGE) {
ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("HAWQ does not support dictionary page type resolver for Parquet format in column \'%s\' ",
columnChunkMetadata->colName)));
}
buffer += pageHeader->compressed_page_size;
continue;
}
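/* grow the data page array by doubling whenever it is full */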
if(columnReader->dataPageNum == columnReader->dataPageCapacity)
{
columnReader->dataPages = (ParquetDataPage)repalloc(
columnReader->dataPages,
2 * columnReader->dataPageCapacity * sizeof(struct ParquetDataPage_S));
memset(columnReader->dataPages + columnReader->dataPageCapacity, 0,
columnReader->dataPageCapacity * sizeof(struct ParquetDataPage_S));
columnReader->dataPageCapacity *= 2;
}
dataPage = &(columnReader->dataPages[columnReader->dataPageNum]);
dataPage->header = pageHeader;
dataPage->data = (uint8_t *) buffer;
buffer += pageHeader->compressed_page_size;
numValuesProcessed += pageHeader->num_values;
columnReader->dataPageNum++;
}
MemoryContextSwitchTo(oldContext);
columnReader->currentPageValueRemained = 0; /* indicate to read next page */
columnReader->dataPageProcessed = 0;
if (columnChunkMetadata->r > 0)
{
consume(columnReader);
}
}
/*
* Finish the current value and advance to the next repetition/definition
* levels and value. Should be called after the current value is read.
*/
static void
consume(ParquetColumnReader *columnReader)
{
/* make sure we have values to read in current page */
if (columnReader->currentPageValueRemained == 0)
{
if (columnReader->dataPageProcessed >= columnReader->dataPageNum)
{
/* the next repetition level must be 0 once the chunk end is reached */
columnReader->repetitionLevel = 0;
return;
}
/* read next page */
columnReader->currentPage = &columnReader->dataPages[columnReader->dataPageProcessed];
decodeCurrentPage(columnReader);
columnReader->currentPageValueRemained = columnReader->currentPage->header->num_values;
columnReader->dataPageProcessed++;
}
readRepetitionAndDefinitionLevels(columnReader);
}
static void
readRepetitionAndDefinitionLevels(ParquetColumnReader *reader)
{
if (reader->currentPage->repetition_level_reader)
{
reader->repetitionLevel =
RLEDecoder_ReadInt(reader->currentPage->repetition_level_reader);
}
if (reader->currentPage->definition_level_reader)
{
reader->definitionLevel =
RLEDecoder_ReadInt(reader->currentPage->definition_level_reader);
}
reader->currentPageValueRemained--;
}
/*
* Decode the raw data of the chunk's current page into the corresponding
* r/d/bool readers and values_buffer, decompressing the data if needed.
*/
static void
decodeCurrentPage(ParquetColumnReader *columnReader)
{
ColumnChunkMetadata_4C *chunkmd;
ParquetDataPage page;
ParquetPageHeader header;
uint8_t *buf; /* store uncompressed or decompressed page data */
MemoryContext oldContext;
chunkmd = columnReader->columnMetadata;
page = columnReader->currentPage;
header = page->header;
oldContext = MemoryContextSwitchTo(columnReader->memoryContext);
/*----------------------------------------------------------------
* Decompress raw data. After this, buf and page->data point to
* the uncompressed/decompressed data.
*----------------------------------------------------------------*/
if (chunkmd->codec == UNCOMPRESSED)
{
buf = page->data;
}
else
{
/*
* make `buf` point to a decompression buffer,
* which must be large enough for the uncompressed data.
*/
if (chunkmd->r > 0)
{
/* repeatable column creates decompression buffer for each page */
buf = palloc0(header->uncompressed_page_size);
}
else
{
/* non-repeatable column reuses pageBuffer for decompression */
if (columnReader->pageBuffer == NULL)
{
columnReader->pageBufferLen = header->uncompressed_page_size * BUFFER_SCALE_FACTOR;
columnReader->pageBuffer = palloc0(columnReader->pageBufferLen);
}
else if (columnReader->pageBufferLen < header->uncompressed_page_size)
{
columnReader->pageBufferLen = header->uncompressed_page_size * BUFFER_SCALE_FACTOR;
columnReader->pageBuffer = repalloc(columnReader->pageBuffer, columnReader->pageBufferLen);
}
buf = (uint8_t *) columnReader->pageBuffer;
}
/*
* call corresponding decompress routine
*/
switch (chunkmd->codec)
{
case SNAPPY:
{
size_t uncompressedLen;
if (snappy_uncompressed_length((char *) page->data,
header->compressed_page_size,
&uncompressedLen) != SNAPPY_OK)
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("invalid snappy compressed data for column %s, page number %d",
chunkmd->colName, columnReader->dataPageProcessed)));
}
Insist(uncompressedLen == header->uncompressed_page_size);
if (snappy_uncompress((char *) page->data, header->compressed_page_size,
(char *) buf, &uncompressedLen) != SNAPPY_OK)
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("failed to decompress snappy data for column %s, page number %d, "
"uncompressed size %d, compressed size %d",
chunkmd->colName, columnReader->dataPageProcessed,
header->uncompressed_page_size, header->compressed_page_size)));
}
page->data = buf;
break;
}
case GZIP:
{
int ret;
/* 15 (default/max windowBits for deflate) + 16 (decode the gzip header/trailer) */
const int windowbits = 31;
z_stream stream;
stream.zalloc = Z_NULL;
stream.zfree = Z_NULL;
stream.opaque = Z_NULL;
stream.avail_in = header->compressed_page_size;
stream.next_in = (Bytef *) page->data;
ret = inflateInit2(&stream, windowbits);
if (ret != Z_OK)
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("zlib inflateInit2 failed: %s", stream.msg)));
}
size_t uncompressedLen = header->uncompressed_page_size;
stream.avail_out = uncompressedLen;
stream.next_out = (Bytef *) buf;
ret = inflate(&stream, Z_FINISH);
if (ret != Z_STREAM_END)
{
ereport(ERROR,
(errcode(ERRCODE_GP_INTERNAL_ERROR),
errmsg("zlib inflate failed: %s", stream.msg)));
}
/* should fill all uncompressed_page_size bytes */
Assert(stream.avail_out == 0);
inflateEnd(&stream);
page->data = buf;
break;
}
case LZO:
/* TODO */
Insist(false);
break;
default:
Insist(false);
break;
}
}
/*----------------------------------------------------------------
* get r/d/value part
*----------------------------------------------------------------*/
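/*
 * Per the Parquet format, RLE/bit-packed level data in a v1 data page is
 * preceded by a 4-byte little-endian length. The le32toh conversion is
 * elided below, presumably because HAWQ targets little-endian hosts.
 */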
if(chunkmd->r != 0)
{
int num_repetition_bytes = /*le32toh(*/ *((uint32_t *) buf) /*)*/;
buf += 4;
page->repetition_level_reader = (RLEDecoder *) palloc0(sizeof(RLEDecoder));
RLEDecoder_Init(page->repetition_level_reader,
widthFromMaxInt(chunkmd->r),
buf,
num_repetition_bytes);
buf += num_repetition_bytes;
}
if(chunkmd->d != 0)
{
int num_definition_bytes = /*le32toh(*/ *((uint32_t *) buf) /*)*/;
buf += 4;
page->definition_level_reader = (RLEDecoder *) palloc0(sizeof(RLEDecoder));
RLEDecoder_Init(page->definition_level_reader,
widthFromMaxInt(chunkmd->d),
buf,
num_definition_bytes);
buf += num_definition_bytes;
}
if (chunkmd->type == BOOLEAN)
{
page->bool_values_reader = (ByteBasedBitPackingDecoder *)
palloc0(sizeof(ByteBasedBitPackingDecoder));
BitPack_InitDecoder(page->bool_values_reader, buf, /*bitwidth=*/1);
}
else
{
page->values_buffer = buf;
}
MemoryContextSwitchTo(oldContext);
}
/**
* Read the next value from the given columnReader. The value is stored
* in *value; if it is null, *null is set to true.
* @columnReader the column to read from
* @value receives the value of the record
* @null records whether the value is null
*/
void
ParquetColumnReader_readValue(
ParquetColumnReader *columnReader,
Datum *value,
bool *null,
int hawqTypeID)
{
/*
* For a non-repeatable column, pages share `pageBuffer`, so consume
* must not run until the previous value has truly been returned;
* hence it is called here, at the start of the next read.
*/
if (columnReader->columnMetadata->r == 0)
{
consume(columnReader);
}
/* current definition level is used to determine null value */
if (CurrentDefinitionLevel(columnReader) < columnReader->columnMetadata->d)
{
*null = true;
}
else
{
*null = false;
if (hawqTypeID == HAWQ_TYPE_BOOL)
{
*value = BoolGetDatum((bool) BitPack_ReadInt(columnReader->currentPage->bool_values_reader));
}
else
{
decodePlain(value, &(columnReader->currentPage->values_buffer), hawqTypeID);
}
}
/*
* for a repeatable column, pre-read the next value's repetition level
*/
if (columnReader->columnMetadata->r > 0)
{
consume(columnReader);
}
}
static bool
decodePlain(Datum *value, uint8_t **buffer, int hawqTypeID)
{
switch(hawqTypeID)
{
case HAWQ_TYPE_INT2:
case HAWQ_TYPE_INT4:
case HAWQ_TYPE_DATE:
case HAWQ_TYPE_FLOAT4:
{
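/*
 * 4-byte plain values: INT2 is stored as a Parquet INT32, and FLOAT4's
 * bit pattern is copied as-is; the raw bits go straight into the Datum,
 * assuming these types are pass-by-value in HAWQ.
 */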
*value = *((int32_t*)(*buffer));
(*buffer) += 4;
break;
}
case HAWQ_TYPE_MONEY:
{
/*
* money is a pass-by-ref type, so the returned Datum
* must be a pointer; see cash.c
*/
(*value) = PointerGetDatum(*buffer);
(*buffer) += 8;
break;
}
case HAWQ_TYPE_INT8:
case HAWQ_TYPE_TIME:
case HAWQ_TYPE_TIMESTAMPTZ:
case HAWQ_TYPE_TIMESTAMP:
case HAWQ_TYPE_FLOAT8:
{
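/*
 * 8-byte plain values; copying the raw bits into the Datum assumes a
 * 64-bit, pass-by-value Datum for these types.
 */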
*value = *((int64_t*)(*buffer));
(*buffer) += 8;
break;
}
/*----------------------------------------------------------------
* fixed-length types, mapped to BINARY in Parquet
*----------------------------------------------------------------*/
case HAWQ_TYPE_NAME:
case HAWQ_TYPE_TIMETZ:
case HAWQ_TYPE_INTERVAL:
{
int datalen = /*le32toh(*/*((int32_t*)(*buffer))/*)*/;
(*buffer) += 4;
*value = PointerGetDatum(*buffer);
(*buffer) += datalen;
break;
}
case HAWQ_TYPE_MACADDR:
{
(*buffer) += 4; /* skip the 4-byte BINARY length prefix */
macaddr *result = (macaddr *)palloc0(sizeof(macaddr));
result->a = *((char*)(*buffer));
(*buffer) += sizeof(char);
result->b = *((char*)(*buffer));
(*buffer) += sizeof(char);
result->c = *((char*)(*buffer));
(*buffer) += sizeof(char);
result->d = *((char*)(*buffer));
(*buffer) += sizeof(char);
result->e = *((char*)(*buffer));
(*buffer) += sizeof(char);
result->f = *((char*)(*buffer));
(*buffer) += sizeof(char);
(*value) = PointerGetDatum(result);
break;
}
/* varlena based types
 * -------------------
 * The following types are implemented as varlena in HAWQ, and they all
 * correspond to BINARY in Parquet. However, we have two storage strategies,
 * depending on whether the varlena header is stored or not.
 *
 * [strategy 1] exclude varlena header:
 * BINARY = [VARSIZE(d) - 4, VARDATA(d)]
 * This strategy works better for text-related types because
 * any Parquet client can interpret the text binary.
 * When reading:
 * VARSIZE = LEN + 4;
 * VARDATA = buffer + 4;
 *
 * [strategy 2] include varlena header in the actual data:
 * BINARY = [VARSIZE(d), d]
 * This works better for HAWQ-specific types like numeric because
 * we can easily deserialize the data (just take the data part of the
 * byte array).
 * When reading:
 * VARSIZE = LEN;
 * VARDATA = buffer + 4 + 4; the first 4 seeks to the varlena struct, the
 * second 4 seeks to the VARDATA part of the struct
 */
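/*
 * For example (assuming little-endian byte order and a 4-byte VARHDRSZ),
 * the 5-character text 'hello' would be laid out as:
 *   [strategy 1] 05 00 00 00 'h' 'e' 'l' 'l' 'o'               -- LEN = 5, raw bytes
 *   [strategy 2] 09 00 00 00 <4-byte varlena header> 'h'...'o' -- LEN = VARSIZE = 9
 * Strategy 1 rebuilds the varlena in place by overwriting the length
 * prefix via SET_VARSIZE; strategy 2 simply skips the 4-byte prefix.
 */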
/* these types use [strategy 1] */
case HAWQ_TYPE_BYTE:
case HAWQ_TYPE_CHAR:
case HAWQ_TYPE_BPCHAR:
case HAWQ_TYPE_VARCHAR:
case HAWQ_TYPE_TEXT:
case HAWQ_TYPE_XML:
{
int pureDataLen = /*le32toh(*/*((int32_t*)(*buffer))/*)*/;
int dataSize = pureDataLen + VARHDRSZ;
SET_VARSIZE((struct varlena *)(*buffer), dataSize);
(*value) = PointerGetDatum(*buffer);
(*buffer) += dataSize;
break;
}
/* these types use [strategy 2] */
case HAWQ_TYPE_BIT:
case HAWQ_TYPE_VARBIT:
case HAWQ_TYPE_NUMERIC:
case HAWQ_TYPE_INET:
case HAWQ_TYPE_CIDR:
{
int dataSize = /*le32toh(*/ *((int32_t*)(*buffer)) /*)*/;
(*buffer) += sizeof(int32);
(*value) = PointerGetDatum(*buffer);
(*buffer) += dataSize;
break;
}
default:
Insist(false);
break;
}
return true;
}
/**
* finish scanning the current column: free and reset the column reader state
*/
void
ParquetColumnReader_FinishedScanColumn(
ParquetColumnReader *columnReader)
{
MemoryContext oldContext = MemoryContextSwitchTo(columnReader->memoryContext);
for(int i = 0; i < columnReader->dataPageNum; i++)
{
ParquetDataPage page = columnReader->dataPages + i;
pfree(page->header);
/* TODO: maybe reuse these decoders? */
if (page->repetition_level_reader != NULL)
{
pfree(page->repetition_level_reader);
}
if (page->definition_level_reader != NULL)
{
pfree(page->definition_level_reader);
}
if (page->bool_values_reader != NULL)
{
pfree(page->bool_values_reader);
}
/*
* a compressed repeatable column keeps each page's decompressed
* content in page->data, which must be freed.
*/
if (columnReader->columnMetadata->codec != UNCOMPRESSED &&
columnReader->columnMetadata->r > 0)
{
pfree(page->data);
}
}
if(columnReader->dataLen != 0){
memset(columnReader->dataBuffer, 0, columnReader->dataLen);
}
if(columnReader->dataPageNum != 0)
{
memset(columnReader->dataPages, 0, columnReader->dataPageNum
* sizeof(struct ParquetDataPage_S));
columnReader->dataPageNum = 0;
}
if (columnReader->geoval != NULL)
{
pfree(columnReader->geoval);
columnReader->geoval = NULL;
}
MemoryContextSwitchTo(oldContext);
columnReader->dataPageProcessed = 0;
columnReader->currentPageValueRemained = 0;
}
/*----------------------------------------------------------------
* read nested geometry type
*----------------------------------------------------------------*/
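/*
 * Geometric types are stored as Parquet groups with one leaf column per
 * coordinate; readers[] holds one ParquetColumnReader per leaf, in
 * schema order.
 */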
void
ParquetColumnReader_readPoint(
ParquetColumnReader readers[],
Datum *value,
bool *null)
{
Point *point;
Datum child_values[2] = {0};
bool child_nulls[2] = {0};
ParquetColumnReader_readValue(&readers[0], &child_values[0], &child_nulls[0], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[1], &child_values[1], &child_nulls[1], HAWQ_TYPE_FLOAT8);
Insist(child_nulls[0] == child_nulls[1]);
if (child_nulls[0])
{
*null = true;
return;
}
point = readers[0].geoval;
if (point == NULL)
{
point = palloc(sizeof *point);
readers[0].geoval = point;
}
point->x = DatumGetFloat8(child_values[0]);
point->y = DatumGetFloat8(child_values[1]);
*value = PointerGetDatum(point);
*null = false;
}
void
ParquetColumnReader_readLSEG(
ParquetColumnReader readers[],
Datum *value,
bool *null)
{
LSEG *lseg;
Datum child_values[4] = {0};
bool child_nulls[4] = {0};
ParquetColumnReader_readValue(&readers[0], &child_values[0], &child_nulls[0], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[1], &child_values[1], &child_nulls[1], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[2], &child_values[2], &child_nulls[2], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[3], &child_values[3], &child_nulls[3], HAWQ_TYPE_FLOAT8);
Insist(child_nulls[0] == child_nulls[1] &&
child_nulls[2] == child_nulls[3] &&
child_nulls[0] == child_nulls[2]);
if (child_nulls[0])
{
*null = true;
return;
}
lseg = readers[0].geoval;
if (lseg == NULL)
{
lseg = palloc(sizeof *lseg);
readers[0].geoval = lseg;
}
lseg->p[0].x = DatumGetFloat8(child_values[0]);
lseg->p[0].y = DatumGetFloat8(child_values[1]);
lseg->p[1].x = DatumGetFloat8(child_values[2]);
lseg->p[1].y = DatumGetFloat8(child_values[3]);
*value = PointerGetDatum(lseg);
*null = false;
}
void
ParquetColumnReader_readPATH(
ParquetColumnReader readers[],
Datum *value,
bool *null)
{
PATH *path;
Datum child_values[3] = {0}; /* is_open, points.x, points.y */
bool child_nulls[3] = {0};
int npts, maxnpts;
ParquetColumnReader_readValue(&readers[0], &child_values[0], &child_nulls[0], HAWQ_TYPE_BOOL);
if (child_nulls[0])
{
ParquetColumnReader_readValue(&readers[1], &child_values[1], &child_nulls[1], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[2], &child_values[2], &child_nulls[2], HAWQ_TYPE_FLOAT8);
Insist(child_nulls[1] && child_nulls[2]);
*null = true;
return;
}
npts = 0;
path = readers[0].geoval;
if (path == NULL)
{
maxnpts = 10;
path = (PATH *)palloc0(get_path_size(maxnpts));
}
else
{
maxnpts = path->npts;
}
path->closed = !DatumGetBool(child_values[0]);
while (true)
{
if (npts >= maxnpts)
{
maxnpts *= 2;
path = (PATH *)repalloc(path, get_path_size(maxnpts));
}
ParquetColumnReader_readValue(&readers[1], &child_values[1], &child_nulls[1], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[2], &child_values[2], &child_nulls[2], HAWQ_TYPE_FLOAT8);
path->p[npts].x = DatumGetFloat8(child_values[1]);
path->p[npts].y = DatumGetFloat8(child_values[2]);
npts++;
if (NextRepetitionLevel(&readers[1]) == 0)
break; /* no more points in this path */
}
path = (PATH *)repalloc(path, get_path_size(npts));
SET_VARSIZE(path, get_path_size(npts));
path->npts = npts;
*value = PointerGetDatum(path);
*null = false;
readers[0].geoval = path;
}
void
ParquetColumnReader_readBOX(
ParquetColumnReader readers[],
Datum *value,
bool *null)
{
BOX *box;
Datum child_values[4] = {0};
bool child_nulls[4] = {0};
ParquetColumnReader_readValue(&readers[0], &child_values[0], &child_nulls[0], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[1], &child_values[1], &child_nulls[1], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[2], &child_values[2], &child_nulls[2], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[3], &child_values[3], &child_nulls[3], HAWQ_TYPE_FLOAT8);
Insist(child_nulls[0] == child_nulls[1] &&
child_nulls[2] == child_nulls[3] &&
child_nulls[0] == child_nulls[2]);
if (child_nulls[0])
{
*null = true;
return;
}
box = readers[0].geoval;
if (box == NULL)
{
box = palloc(sizeof *box);
readers[0].geoval = box;
}
box->high.x = DatumGetFloat8(child_values[0]);
box->high.y = DatumGetFloat8(child_values[1]);
box->low.x = DatumGetFloat8(child_values[2]);
box->low.y = DatumGetFloat8(child_values[3]);
*value = PointerGetDatum(box);
*null = false;
}
void
ParquetColumnReader_readPOLYGON(
ParquetColumnReader readers[],
Datum *value,
bool *null)
{
POLYGON *polygon;
Datum child_values[6] = {0}; /* boundbox:{x1,y1,x2,y2}, points:{x,y} */
bool child_nulls[6] = {0};
int npts, maxnpts;
/*
* read BOX boundbox
*/
ParquetColumnReader_readValue(&readers[0], &child_values[0], &child_nulls[0], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[1], &child_values[1], &child_nulls[1], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[2], &child_values[2], &child_nulls[2], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[3], &child_values[3], &child_nulls[3], HAWQ_TYPE_FLOAT8);
Insist(child_nulls[0] == child_nulls[1] &&
child_nulls[2] == child_nulls[3] &&
child_nulls[0] == child_nulls[2]);
if (child_nulls[0])
{
ParquetColumnReader_readValue(&readers[4], &child_values[4], &child_nulls[4], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[5], &child_values[5], &child_nulls[5], HAWQ_TYPE_FLOAT8);
Insist(child_nulls[4] && child_nulls[5]);
*null = true;
return;
}
npts = 0;
polygon = readers[0].geoval;
if (polygon == NULL)
{
maxnpts = 10;
polygon = palloc(get_polygon_size(maxnpts));
}
else
{
maxnpts = polygon->npts;
}
polygon->boundbox.high.x = DatumGetFloat8(child_values[0]);
polygon->boundbox.high.y = DatumGetFloat8(child_values[1]);
polygon->boundbox.low.x = DatumGetFloat8(child_values[2]);
polygon->boundbox.low.y = DatumGetFloat8(child_values[3]);
/*
* read repeated points
*/
while (true)
{
if (npts >= maxnpts)
{
maxnpts *= 2;
polygon = repalloc(polygon, get_polygon_size(maxnpts));
}
ParquetColumnReader_readValue(&readers[4], &child_values[4], &child_nulls[4], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[5], &child_values[5], &child_nulls[5], HAWQ_TYPE_FLOAT8);
polygon->p[npts].x = DatumGetFloat8(child_values[4]);
polygon->p[npts].y = DatumGetFloat8(child_values[5]);
npts++;
if (NextRepetitionLevel(&readers[4]) == 0)
break; /* no more points in this polygon */
}
polygon = repalloc(polygon, get_polygon_size(npts));
SET_VARSIZE(polygon, get_polygon_size(npts));
polygon->npts = npts;
*value = PointerGetDatum(polygon);
*null = false;
readers[0].geoval = polygon;
}
void
ParquetColumnReader_readCIRCLE(
ParquetColumnReader readers[],
Datum *value,
bool *null)
{
CIRCLE *circle;
Datum child_values[3] = {0};
bool child_nulls[3] = {0};
ParquetColumnReader_readValue(&readers[0], &child_values[0], &child_nulls[0], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[1], &child_values[1], &child_nulls[1], HAWQ_TYPE_FLOAT8);
ParquetColumnReader_readValue(&readers[2], &child_values[2], &child_nulls[2], HAWQ_TYPE_FLOAT8);
Insist(child_nulls[0] == child_nulls[1] &&
child_nulls[1] == child_nulls[2]);
if (child_nulls[0])
{
*null = true;
return;
}
circle = readers[0].geoval;
if (circle == NULL)
{
circle = palloc(sizeof *circle);
readers[0].geoval = circle;
}
circle->center.x = DatumGetFloat8(child_values[0]);
circle->center.y = DatumGetFloat8(child_values[1]);
circle->radius = DatumGetFloat8(child_values[2]);
*value = PointerGetDatum(circle);
*null = false;
}