blob: 9a371b887c88906923a66315577b2ea627003a14 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "cdb/cdbparquetfooterbuffer.h"
/* Name prefix handed to OpenTemporaryFile() when a write-mode buffer is created. */
#define PARQUET_FOOTER_TEMPFILE_PREFIX "footer_buffered_write"
/* Second argument handed to OpenTemporaryFile() together with the prefix. */
#define PARQUET_FOOTER_TEMPFILE_SEQ 1
/* Assert that the buffer pointer is non-NULL and its Mode is the read mode. */
#define ASSERT_IS_A_READING_ONLY_BUFFER(buffer) \
Assert( (buffer) != NULL && \
(buffer)->Mode == PARQUET_FOOTER_BUFFERMODE_READ)
/* Assert that the buffer pointer is non-NULL and its Mode is the write mode. */
#define ASSERT_IS_A_WRITING_ONLY_BUFFER(buffer) \
Assert( (buffer) != NULL && \
(buffer)->Mode == PARQUET_FOOTER_BUFFERMODE_WRITE)
/* Forward declarations of the internal helpers implemented later in this file. */
void fillBufferFromFile(File file, char *filename, uint8_t *buffer, int size);
void flushBufferToFile(File file, char *filename, uint8_t *buffer, int size);
int fillBufferFromTempFile(File file, uint8_t *buffer, int size);
/*
 * Mode names printed by the DEBUG5 log messages; the table is indexed directly
 * with the buffer's Mode value.
 * NOTE(review): presumably PARQUET_FOOTER_BUFFERMODE_READ / _WRITE are 1 and 2
 * so that they line up with "READ" / "WRITE" -- confirm against the header.
 */
char ModeStringConstant[3][6] = {"ZERO","READ","WRITE"};
/*******************************************************************************
* External function implementation.
******************************************************************************/
/*
 * Build a new parquet footer buffer for either reading or writing.
 *
 * @datafile      File handle of the footer file.
 * @filename      Name of the footer file; a private copy is stored.
 * @footerlength  Total number of footer bytes to read (read mode only).
 * @capacity      Requested size of the in-memory buffer in bytes.
 * @mode          PARQUET_FOOTER_BUFFERMODE_READ for reading,
 *                PARQUET_FOOTER_BUFFERMODE_WRITE for writing.
 *
 * In read mode the allocated memory is the smaller of capacity and
 * footerlength and no temporary file is attached (TmpFile is -1). In write
 * mode the full capacity is allocated and a temporary file is opened to
 * receive flushed data.
 *
 * Returns the newly allocated footer buffer instance.
 */
ParquetFooterBuffer *createParquetFooterBuffer( File datafile,
                                                char *filename,
                                                int footerlength,
                                                int capacity,
                                                int mode)
{
    ParquetFooterBuffer *fb;
    size_t               namebytes;

    /* palloc0 already zeroes the instance; explicit resets below are kept
     * for clarity. */
    fb = (ParquetFooterBuffer *) palloc0(sizeof(ParquetFooterBuffer));

    /* Common properties shared by both modes. */
    fb->Mode = mode;
    fb->FileHandler = datafile;
    fb->ValidLen = 0;
    fb->Cursor = -1;
    fb->FooterProcessed = 0;

    /* Keep a private, NUL-terminated copy of the file name. */
    namebytes = strlen(filename) + 1;
    fb->FileName = (char *) palloc(namebytes);
    strcpy(fb->FileName, filename);

    if ( mode == PARQUET_FOOTER_BUFFERMODE_READ ) {
        fb->FooterLength = footerlength;
        fb->TmpFile = -1;

        /* A footer smaller than the requested capacity needs less memory. */
        fb->Capacity = capacity;
        if ( footerlength < capacity )
            fb->Capacity = footerlength;

        fb->Head = (uint8_t *) palloc(sizeof(uint8_t) * fb->Capacity);
    }
    else if ( mode == PARQUET_FOOTER_BUFFERMODE_WRITE ) {
        fb->FooterLength = -1;
        fb->Capacity = capacity;
        fb->Head = (uint8_t *) palloc(sizeof(uint8_t) * fb->Capacity);

        /* If the temporary file cannot be created, OpenTemporaryFile
         * raises the error itself. */
        fb->TmpFile = OpenTemporaryFile( PARQUET_FOOTER_TEMPFILE_PREFIX,
                                         PARQUET_FOOTER_TEMPFILE_SEQ,
                                         true,
                                         true,
                                         true,
                                         true );
    }

    elog(DEBUG5, "Create parquet footer buffer. "
                 "INSTADDR=%p, "
                 "MODE=%s, "
                 "FILENAME=%s, "
                 "FOOTERLENGTH=%d, "
                 "CAPACITY=%d, "
                 "TMPFILEHANDLER=%d",
                 fb,
                 ModeStringConstant[fb->Mode],
                 fb->FileName,
                 fb->FooterLength,
                 fb->Capacity,
                 fb->TmpFile );

    return fb;
}
/*
 * Release a parquet footer buffer and everything it owns.
 *
 * The data memory and the file name copy are freed, the temporary file (if
 * one is attached, i.e. TmpFile >= 0) is closed, and finally the instance
 * itself is freed.
 *
 * @buffer The buffer to free.
 */
void freeParquetFooterBuffer( ParquetFooterBuffer *buffer )
{
    elog(DEBUG5, "Free parquet footer buffer."
                 "INSTADDR=%p, "
                 "MODE=%s",
                 buffer,
                 ModeStringConstant[buffer->Mode] );

    /* Release the memory owned by the instance. */
    pfree(buffer->Head);
    pfree(buffer->FileName);

    /* Closing the temporary file also deletes it automatically. */
    if ( !(buffer->TmpFile < 0) ) {
        FileClose(buffer->TmpFile);
    }

    /* Finally drop the instance itself. */
    pfree(buffer);
}
/*
 * Make the next `length` footer bytes available in contiguous memory.
 *
 * On success (*ptr) points into the buffer's memory at the first of `length`
 * readable bytes and the read cursor is advanced past them. Reading is
 * forward only: a later call may shift or overwrite the buffer content, so
 * the returned pointer should be consumed before calling again.
 *
 * @buffer The read-mode buffer to take the data from.
 * @length The number of bytes the caller wants to read; must be positive.
 * @ptr    Receives the address of the prepared data.
 *
 * Returns PARQUET_FOOTER_BUFFER_MOVE_OK on success, or
 * PARQUET_FOOTER_BUFFER_MOVE_ERROR when `length` exceeds the buffer capacity
 * or fewer than `length` bytes are left after reloading.
 */
int prepareFooterBufferForwardReading( ParquetFooterBuffer *buffer,
                                       int length,
                                       uint8_t **ptr )
{
    int remaining = -1;   /* footer bytes not yet loaded from the file */
    int room = -1;        /* free bytes at the tail of the buffer */
    int toload = -1;      /* bytes to load in the current step */
    int leftover = -1;    /* unread bytes kept when shifting */

    Assert( buffer != NULL );
    ASSERT_IS_A_READING_ONLY_BUFFER(buffer);
    Assert( ptr != NULL );
    Assert( length > 0 );

    /* A request larger than the whole buffer can never be satisfied. */
    if ( buffer->Capacity < length ) {
        return PARQUET_FOOTER_BUFFER_MOVE_ERROR;
    }

    /* Nothing loaded yet, or everything loaded is consumed: refill the
     * buffer from its very beginning. */
    if ( buffer->Cursor == -1 || buffer->ValidLen <= buffer->Cursor ) {
        remaining = buffer->FooterLength - buffer->FooterProcessed;
        toload = buffer->Capacity;
        if ( remaining < buffer->Capacity )
            toload = remaining;

        fillBufferFromFile( buffer->FileHandler,
                            buffer->FileName,
                            buffer->Head,
                            toload );

        buffer->Cursor = 0;
        buffer->ValidLen = toload;
        buffer->FooterProcessed += toload;

        elog(DEBUG5, "Reset cursor back. "
                     "INSTADDR=%p, "
                     "ValidContentLength=%d, "
                     "ProcessedFooter=%d",
                     buffer,
                     buffer->ValidLen,
                     buffer->FooterProcessed );
    }

    /* Fast path: the requested bytes are already buffered, so hand out a
     * pointer at the cursor without touching the file. */
    if ( buffer->Cursor >= 0 &&
         buffer->Cursor < buffer->Capacity &&
         buffer->Cursor + length <= buffer->ValidLen ) {
        *ptr = buffer->Head + buffer->Cursor;
        buffer->Cursor += length;

        elog(DEBUG5, "No read action. "
                     "INSTADDR=%p, "
                     "ValidContentLength=%d, "
                     "ProcessedFooter=%d",
                     buffer,
                     buffer->ValidLen,
                     buffer->FooterProcessed );
        return PARQUET_FOOTER_BUFFER_MOVE_OK;
    }

    /* Slow path: slide the unread tail to the head of the buffer so the
     * freed space can be refilled behind it. */
    leftover = buffer->ValidLen - buffer->Cursor;
    memmove( buffer->Head, buffer->Head + buffer->Cursor, leftover );

    elog(DEBUG5, "Shift %d bytes memory from offset %d to 0. "
                 "INSTADDR=%p.",
                 leftover,
                 buffer->Cursor,
                 buffer);

    buffer->ValidLen = leftover;
    buffer->Cursor = 0;

    /* Top the buffer up with as much of the remaining footer as fits. */
    remaining = buffer->FooterLength - buffer->FooterProcessed;
    room = buffer->Capacity - buffer->ValidLen;
    toload = room;
    if ( remaining < room )
        toload = remaining;

    fillBufferFromFile( buffer->FileHandler,
                        buffer->FileName,
                        buffer->Head + buffer->ValidLen,
                        toload );

    buffer->ValidLen += toload;
    buffer->FooterProcessed += toload;

    /* Even after reloading there is not enough data: the request fails. */
    if ( buffer->ValidLen < length ) {
        return PARQUET_FOOTER_BUFFER_MOVE_ERROR;
    }

    *ptr = buffer->Head;
    buffer->Cursor += length;
    return PARQUET_FOOTER_BUFFER_MOVE_OK;
}
/*
 * Write everything currently held in memory to the temporary file.
 *
 * In write mode Cursor is the index of the last buffered byte (-1 when the
 * buffer is empty), so Cursor + 1 bytes are pending. After a successful
 * flush the pending bytes are added to FooterProcessed and the buffer is
 * marked empty again. A short or failed write raises an I/O error.
 *
 * @buffer The write-mode buffer to flush.
 */
void flushFooterBufferToTempFile( ParquetFooterBuffer *buffer )
{
    int pending;
    int written;

    ASSERT_IS_A_WRITING_ONLY_BUFFER(buffer);
    Assert( buffer->TmpFile >= 0 );

    /* An empty buffer has nothing to flush. */
    if ( buffer->Cursor == -1 ) {
        return;
    }

    pending = buffer->Cursor + 1;
    written = FileWrite( buffer->TmpFile, (char *) buffer->Head, pending );

    if ( written != pending ) {
        ereport(ERROR,
                (errcode(ERRCODE_IO_ERROR),
                 errmsg("Parquet Storage Write error on temporary file:"
                        "writing footer tempaorary file failure")));
    }

    /* Account for the flushed bytes and mark the buffer as empty. */
    buffer->FooterProcessed += pending;
    buffer->Cursor = -1;

    elog(DEBUG5, "Flush %d bytes from buffer to temporary file. "
                 "Total flushed %d bytes. "
                 "INSTADDR=%p. ",
                 written,
                 buffer->FooterProcessed,
                 buffer );
}
/*
 * Append data to a write-mode footer buffer.
 *
 * The data is copied into the in-memory buffer chunk by chunk; every time
 * the buffer becomes full it is flushed to the temporary file, so inputs
 * larger than the buffer capacity are handled.
 *
 * @buffer  The write-mode buffer to write data to.
 * @content The data to be copied into the buffer.
 * @length  The number of bytes to copy from content.
 */
void writeToFooterBuffer( ParquetFooterBuffer *buffer,
                          uint8_t *content,
                          int length)
{
    const int requested = length;   /* kept only for the log message */
    uint8_t  *src = content;
    int       remaining = length;

    ASSERT_IS_A_WRITING_ONLY_BUFFER(buffer);
    Assert( buffer->TmpFile >= 0 );

    while ( remaining > 0 ) {
        /* Cursor is the index of the last buffered byte, so the free space
         * starts right after it. */
        int room = buffer->Capacity - buffer->Cursor - 1;
        int chunk = ( remaining < room ) ? remaining : room;

        memcpy( (void *)(buffer->Head + buffer->Cursor + 1),
                (void *)src,
                chunk );
        buffer->Cursor += chunk;

        /* A full buffer is spilled to the temporary file right away. */
        if ( buffer->Capacity - 1 <= buffer->Cursor ) {
            flushFooterBufferToTempFile(buffer);
        }

        /* Advance past the bytes just copied. */
        src += chunk;
        remaining -= chunk;
    }

    elog(DEBUG5, "Write %d bytes into buffer. "
                 "Total flushed %d bytes. "
                 "INSTADDR=%p. ",
                 requested,
                 buffer->FooterProcessed,
                 buffer );
}
/*
 * Append the temporary file content directly to the real target file.
 *
 * Any data still held in memory is flushed to the temporary file first; the
 * temporary file is then rewound and copied to targetFile in chunks of at
 * most the buffer capacity. The in-memory buffer is reused as scratch space
 * for the copy, so after calling this function the buffer must not be used
 * to buffer data any more: this function does not return the buffer to a
 * reusable state. Call freeParquetFooterBuffer() to release it.
 *
 * @targetFile The file handle to append the footer data to.
 * @buffer     The write-mode buffer whose temporary data is appended.
 *
 * Returns the total number of footer bytes written through the buffer
 * (FooterProcessed). Raises an I/O error if the temporary file cannot be
 * rewound, if it ends before all recorded bytes were copied, or if writing
 * to the target file fails.
 */
int appendFooterBufferTempData (File targetFile, ParquetFooterBuffer *buffer )
{
    int appended = 0;

    ASSERT_IS_A_WRITING_ONLY_BUFFER(buffer);
    Assert( buffer->TmpFile >= 0 );

    /* Flush the buffer to ensure everything is written to the temporary
     * file. */
    flushFooterBufferToTempFile(buffer);

    /* Seek to the beginning of the temporary file. */
    if ( FileSeek( buffer->TmpFile, 0, SEEK_SET ) != 0 ) {
        ereport(ERROR,
                (errcode(ERRCODE_IO_ERROR), errmsg("Parquet Storage write "
                 "error on segment file '%s': " "writing footer failure",
                 buffer->FileName)));
    }

    while ( appended < buffer->FooterProcessed ) {
        /* Never ask for more than is still outstanding, so no more than
         * the recorded footer size can reach the target file. */
        int remaining = buffer->FooterProcessed - appended;
        int chunk = remaining < buffer->Capacity ? remaining
                                                 : buffer->Capacity;

        /* Read data into the buffer. */
        int readsize = fillBufferFromTempFile( buffer->TmpFile,
                                               buffer->Head,
                                               chunk );

        /* A zero-byte read means the temporary file ended early (or the
         * buffer has no capacity). Without this check the loop would never
         * make progress and spin forever. */
        if ( readsize <= 0 ) {
            ereport(ERROR,
                    (errcode(ERRCODE_IO_ERROR),
                     errmsg("Parquet Storage Read error on temporary file:"
                            "unexpected end of footer temporary file")));
        }

        /* Write data into the target file. */
        flushBufferToFile( targetFile,
                           buffer->FileName,
                           buffer->Head,
                           readsize );
        appended += readsize;
    }

    elog(DEBUG5, "Append temporary data to segment file '%s'."
                 "Total appended %d bytes."
                 "INSTADDR=%p. ",
                 buffer->FileName,
                 appended,
                 buffer );

    return buffer->FooterProcessed;
}
/*******************************************************************************
* Internal function implementation.
******************************************************************************/
/*
 * Read exactly `size` bytes from a file into the buffer.
 *
 * FileRead() may return fewer bytes than requested, so the read is repeated
 * until the whole amount has arrived. Callers expect the file to hold at
 * least `size` more bytes; hitting end-of-file earlier is reported as an
 * error instead of retrying forever.
 *
 * @file     The file handle to read.
 * @filename The file name string used when generating error reports.
 * @buffer   The buffer to fill; must have room for at least `size` bytes.
 * @size     The number of bytes to read. Nothing is read when size <= 0.
 *
 * Raises an I/O error when a read fails or the file ends prematurely.
 */
void fillBufferFromFile(File file, char *filename, uint8_t *buffer, int size)
{
    int actualreadsize = 0;

    while ( actualreadsize < size ) {
        int readsize = FileRead( file,
                                 (char *)(buffer) + actualreadsize,
                                 size - actualreadsize );

        if ( readsize < 0 ) {
            ereport(ERROR,
                    (errcode(ERRCODE_IO_ERROR),
                     errmsg("Parquet Storage Read error on segment file '%s': "
                            "reading footer failure",
                            filename)));
        }

        /* A return of 0 means end-of-file: no further progress is possible,
         * so fail instead of looping forever. */
        if ( readsize == 0 ) {
            ereport(ERROR,
                    (errcode(ERRCODE_IO_ERROR),
                     errmsg("Parquet Storage Read error on segment file '%s': "
                            "unexpected end of file while reading footer",
                            filename)));
        }

        actualreadsize += readsize;
    }

    elog(DEBUG5, "Read parquet footer data from segment file '%s'. "
                 "Loaded %d bytes into buffer.",
                 filename,
                 size);
}
/*
 * Write a block of buffered data to the target file in a single call.
 *
 * @file     The file handle to write.
 * @filename The file name string used when generating the error report.
 * @buffer   The memory holding the data to write.
 * @size     The number of bytes to write.
 *
 * Raises an I/O error unless FileWrite() reports exactly `size` bytes.
 */
void flushBufferToFile(File file, char *filename, uint8_t *buffer, int size)
{
    /* Success means the whole block was accepted in one write. */
    if ( FileWrite(file, (char *)buffer, size) == size ) {
        return;
    }

    ereport(ERROR,
            (errcode(ERRCODE_IO_ERROR),
             errmsg("Parquet Storage Write error on segment file '%s': "
                    "writing footer file failure",
                    filename)));
}
/*
 * Fill the buffer from the temporary file with as many bytes as possible.
 * The buffered content is later flushed to the real target file.
 *
 * @file   The temporary file handle to read.
 * @buffer The memory receiving the data.
 * @size   The maximum number of bytes to read (the size of the buffer).
 *
 * Returns the number of bytes actually read. A value smaller than `size`
 * without an error being raised means the end of the temporary file was
 * reached. A failing read raises an I/O error.
 */
int fillBufferFromTempFile(File file, uint8_t *buffer, int size)
{
    int filled = 0;          /* bytes stored in the buffer so far */
    int remaining = size;    /* bytes that still fit into the buffer */

    while ( remaining > 0 ) {
        int got = FileRead( file, (char *)(buffer) + filled, remaining );

        if ( got < 0 ) {
            ereport(ERROR,
                    (errcode(ERRCODE_IO_ERROR),
                     errmsg("Parquet Storage Read error on temporary file:"
                            "reading footer temporary file failure")));
        }

        /* Zero bytes read: the end of the temporary file is reached. */
        if ( got == 0 ) {
            break;
        }

        filled += got;
        remaining -= got;
    }

    return filled;
}