blob: 691534aa39a1127e9aefec66fc94baaeb75a1bb5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* cdbparquetfooterprocessor.c
*
* Created on: Sep 22, 2013
* Author: malili
*/
#include "cdb/cdbparquetfooterprocessor.h"
#include "cdb/cdbparquetstoragewrite.h"
#include "cdb/cdbparquetfooterserializer.h"
/*
 * Write the leading 4-byte magic 'PAR1' to a parquet file.
 *
 * @dataFile              file to write to
 * @filePathName          file name, used only in the error message
 * @fileLen               in/out: increased by 4 after the write
 * @fileLen_uncompressed  in/out: increased by 4 after the write
 *
 * DetectHostEndian() is invoked before anything is written. A write that
 * does not transfer exactly 4 bytes is reported through ereport(ERROR).
 */
void writeParquetHeader(File dataFile, char *filePathName, int64 *fileLen, int64 *fileLen_uncompressed) {
	char	magic[4] = { 'P', 'A', 'R', '1' };
	int		written;

	DetectHostEndian();

	written = FileWrite(dataFile, magic, 4);
	if (written != 4)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("file write error in file '%s': %s", filePathName, strerror(errno)),
				 errdetail("%s", HdfsGetLastError())));
	}

	/* account for the magic in both length counters */
	*fileLen = *fileLen + 4;
	*fileLen_uncompressed = *fileLen_uncompressed + 4;
}
/*
 * Finish a parquet file: emit the footer, then the 4-byte footer length and
 * the trailing 4-byte magic 'PAR1'.
 *
 * The footer is produced by endSerializeFooter(); its return value is used
 * as the footer length. That length is added to *fileLen and
 * *fileLen_uncompressed right away, and a further 8 (length field + magic)
 * is added once both trailer pieces have been written.
 *
 * The footer length is stored least significant byte first. A write that
 * does not transfer exactly 4 bytes is reported through ereport(ERROR).
 */
void writeParquetFooter(File dataFile,
		char *filePathName,
		ParquetMetadata parquetMetadata,
		int64 *fileLen,
		int64 *fileLen_uncompressed,
		CompactProtocol **footer_read_protocol,
		CompactProtocol **footer_write_protocol,
		int previous_rowgroup_count) {
	char		magic[4] = { 'P', 'A', 'R', '1' };
	char		lenBytes[4];
	uint32_t	footerLen;
	int			i;

	footerLen = (uint32_t) endSerializeFooter(footer_read_protocol, footer_write_protocol,
			filePathName, dataFile, parquetMetadata, previous_rowgroup_count);
	*fileLen += footerLen;
	*fileLen_uncompressed += footerLen;

	/* encode the footer length, lowest byte first */
	for (i = 0; i < 4; i++)
		lenBytes[i] = (footerLen >> (8 * i)) & 0xFF;

	if (FileWrite(dataFile, lenBytes, 4) != 4)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("file write error in file '%s': %s", filePathName, strerror(errno)),
				 errdetail("%s", HdfsGetLastError())));
	}

	/* closing magic 'PAR1' */
	if (FileWrite(dataFile, magic, 4) != 4)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("file write error in file '%s': %s", filePathName, strerror(errno)),
				 errdetail("%s", HdfsGetLastError())));
	}

	/* 4 bytes of footer length plus 4 bytes of magic */
	*fileLen += 8;
	*fileLen_uncompressed += 8;
}
/*
 * Read the footer of a parquet file.
 *
 * @fileHandler      the file to be read
 * @parquetMetadata  out: set to a freshly palloc0'd ParquetMetadata_4C (even
 *                   when false is returned), then handed to
 *                   initDeserializeFooter()
 * @footerProtocol   passed through to initDeserializeFooter()
 * @eof              end-of-file offset to read the footer from; the 4-byte
 *                   footer length and the 4-byte magic 'PAR1' are expected
 *                   in the 8 bytes just before it
 * @filePathName     file name, used in error messages
 *
 * @return false when eof is 0 (no data in the file); true once the file has
 *         been positioned at the footer and initDeserializeFooter() has been
 *         called. Every failure is reported through ereport(ERROR).
 */
bool readParquetFooter(File fileHandler, ParquetMetadata *parquetMetadata,
		CompactProtocol **footerProtocol, int64 eof, char *filePathName) {
	char		buffer[4];
	int			actualReadSize = 0;
	int64		footLengthIndex;
	int64		footerIndex;
	uint32_t	footerLen;

	DetectHostEndian();

	*parquetMetadata = (struct ParquetMetadata_4C *)
			palloc0(sizeof(struct ParquetMetadata_4C));

	/* if file size is 0, means there's no data in file, return false */
	if (eof == 0)
		return false;

	/*
	 * A non-empty file holds at least the header 'PAR1' (4 bytes), the
	 * footer length (4 bytes) and the trailing 'PAR1' (4 bytes).
	 */
	if (eof < 12)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERNAL_ERROR),
				 errmsg("catalog information for '%s' not correct, eof should be at least 12, but got " INT64_FORMAT,
						filePathName, eof)));
	}

	/** Part 1: read the footer length, stored just before the trailing magic */
	footLengthIndex = eof - 8;
	if (FileSeek(fileHandler, footLengthIndex, SEEK_SET) != footLengthIndex)
	{
		/* report the offset we asked for, not whatever the failed seek returned */
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("file seek error in file '%s' when seeking \'" INT64_FORMAT "\': '%s'"
						, filePathName, footLengthIndex, strerror(errno)),
				 errdetail("%s", HdfsGetLastError())));
	}
	elog(DEBUG5, "Parquet metadata file footer length index: " INT64_FORMAT "\n", footLengthIndex);

	while (actualReadSize < 4)
	{
		int readFooterLen = FileRead(fileHandler, buffer + actualReadSize, 4 - actualReadSize);

		if (readFooterLen < 0)
		{
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("file read error in file '%s': %s", filePathName, strerror(errno)),
					 errdetail("%s", HdfsGetLastError())));
		}

		/*
		 * A zero-byte read makes no progress; without this check the loop
		 * would spin forever on a file shorter than eof claims.
		 */
		if (readFooterLen == 0)
		{
			ereport(ERROR,
					(errcode(ERRCODE_GP_INTERNAL_ERROR),
					 errmsg("unexpected end of file '%s' when reading parquet footer length at offset " INT64_FORMAT,
							filePathName, footLengthIndex + actualReadSize)));
		}

		actualReadSize += readFooterLen;
	}

	/*
	 * Decode the footer length byte by byte (least significant byte first)
	 * instead of casting the char buffer to a uint32 pointer, which would
	 * depend on the buffer's alignment and on type punning.
	 */
	footerLen = ((uint32_t) (unsigned char) buffer[0])
			| ((uint32_t) (unsigned char) buffer[1] << 8)
			| ((uint32_t) (unsigned char) buffer[2] << 16)
			| ((uint32_t) (unsigned char) buffer[3] << 24);
	elog(DEBUG5, "Parquet metadata file footer length: %u\n",footerLen);

	/*
	 * The footer has to fit between the 4-byte header magic and the footer
	 * length field; anything else means the length read from the file is
	 * not usable.
	 */
	if ((int64) footerLen > footLengthIndex - 4)
	{
		ereport(ERROR,
				(errcode(ERRCODE_GP_INTERNAL_ERROR),
				 errmsg("parquet footer length %u is not valid for file '%s' with eof " INT64_FORMAT,
						footerLen, filePathName, eof)));
	}

	/** Part 2: read footer itself */
	footerIndex = footLengthIndex - (int64) footerLen;
	elog(DEBUG5, "Parquet metadata file footer Index: " INT64_FORMAT, footerIndex);

	if (FileSeek(fileHandler, footerIndex, SEEK_SET) != footerIndex)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("file seek error in file '%s' when seeking \'" INT64_FORMAT "\': %s"
						, filePathName, footerIndex, strerror(errno)),
				 errdetail("%s", HdfsGetLastError())));
	}

	initDeserializeFooter(fileHandler, footerLen, filePathName, parquetMetadata, footerProtocol);
	return true;
}
/**
* This procedure does two things:
* 1. check whether the file's metadata matches table's schema, return false if mismatch happens.
* 2. sync meta info: for each parquet field, set its hawq type id
*
* TODO if we only accepts files that DB outputs, do we need this check process?
*/
bool
checkAndSyncMetadata(ParquetMetadata parquetmd,
TupleDesc tupdesc)
{
int numfields, i;
Form_pg_attribute att;
FileField_4C *field;
numfields = parquetmd->fieldCount;
if (numfields != tupdesc->natts)
return false;
for (i = 0; i < numfields; ++i)
{
field = parquetmd->pfield + i;
att = tupdesc->attrs[i];
if (strcmp(field->name, NameStr(att->attname)))
return false;
if (field->repetitionType == REPEATED)
return false; /* top level fields shouldn't be repeated. */
field->hawqTypeId = att->atttypid;
/* for non-group type, check whether its PrimitiveTypeName compatible with hawqTypeId */
if (field->num_children == 0 && field->type != mappingHAWQType(field->hawqTypeId))
return false;
/* for nested type, set hawqTypeId for children fields */
switch(field->hawqTypeId)
{
case HAWQ_TYPE_POINT:
field->children[0].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[1].hawqTypeId = HAWQ_TYPE_FLOAT8;
break;
case HAWQ_TYPE_PATH:
field->children[0].hawqTypeId = HAWQ_TYPE_BOOL; /* is_open */
field->children[1].hawqTypeId = HAWQ_TYPE_POINT;/* points */
field->children[1].children[0].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[1].children[1].hawqTypeId = HAWQ_TYPE_FLOAT8;
break;
case HAWQ_TYPE_LSEG:
case HAWQ_TYPE_BOX:
field->children[0].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[1].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[2].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[3].hawqTypeId = HAWQ_TYPE_FLOAT8;
break;
case HAWQ_TYPE_POLYGON:
field->children[0].hawqTypeId = HAWQ_TYPE_BOX;
field->children[0].children[0].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[0].children[1].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[0].children[2].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[0].children[3].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[1].hawqTypeId = HAWQ_TYPE_POINT;
field->children[1].children[0].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[1].children[1].hawqTypeId = HAWQ_TYPE_FLOAT8;
break;
case HAWQ_TYPE_CIRCLE:
field->children[0].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[1].hawqTypeId = HAWQ_TYPE_FLOAT8;
field->children[2].hawqTypeId = HAWQ_TYPE_FLOAT8;
break;
}
}
return true;
}
/*
 * Refuse to run on a big-endian host.
 *
 * The bytes 0,1,2,3 are laid out in memory and read back as one uint32_t.
 * If that value is 0x00010203 (byte 0 ended up most significant), an ERROR
 * is raised through ereport(); otherwise the function simply returns.
 */
void
DetectHostEndian(void)
{
	static const union
	{
		unsigned char bytes[4];
		uint32_t	value;
	}			hostOrderProbe = { { 0, 1, 2, 3 } };
	const uint32_t bigEndianValue = 0x00010203ul;

	if (hostOrderProbe.value != bigEndianValue)
		return;

	ereport(ERROR,
			(errcode(ERRCODE_GP_INTERNAL_ERROR),
			 errmsg("parquet format is not supported on big endian.")));
}