blob: 7e8ec58297f40c6c3bd9d12d4d477b31f86cb2af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* execworkfile.c
* Management of temporary work files used by the executor nodes.
*
* WorkFiles provide a general interface to different implementations of
* temporary files used by the executor nodes.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/buffile.h"
#include "storage/bfz.h"
#include "executor/execWorkfile.h"
#include "miscadmin.h"
#include "cdb/cdbvars.h"
#include "utils/workfile_mgr.h"
#include "utils/memutils.h"
/*
 * Number of temporary files opened during the current session;
 * this is used in generation of unique tempfile names.
 *
 * ExecWorkFile_AddUniqueSuffix() post-increments this once per generated
 * name and combines it with MyProcPid, so names are unique within this
 * process and across concurrently running processes.
 */
static uint64 temp_file_counter = 0;
/* Forward declarations of file-local helpers defined at the end of this file. */
static void ExecWorkFile_SetFlags(ExecWorkFile *workfile, bool delOnClose, bool created);
static void ExecWorkFile_AdjustBFZSize(ExecWorkFile *workfile, int64 file_size);
/*
 * ExecWorkFile_Create
 *	  create a new work file with the specified name, the file type,
 *	  and the compression type.
 *
 * Before anything is created, the new file is counted against the limit on
 * the number of workfiles (WorkfileQueryspace_AddWorkfile); when that check
 * fails, workfile_mgr_report_error() is invoked.
 *
 * This function does not return NULL. An invalid fileType, or an underlying
 * create call that hands back NULL, is reported with ereport(ERROR).
 *
 * The returned descriptor is palloc'ed in the current memory context. It
 * keeps its own copy of fileName, starts with a tracked size of 0, and is
 * flagged as created by this backend (see ExecWorkFile_SetFlags).
 */
ExecWorkFile *
ExecWorkFile_Create(const char *fileName,
					ExecWorkFileType fileType,
					bool delOnClose,
					int compressType)
{
	ExecWorkFile *workfile;
	void	   *file;

	/* Before creating a new file, check the limit on the number of workfiles created */
	if (!WorkfileQueryspace_AddWorkfile())
	{
		/* Adding another workfile would exceed the limit, notify caller */
		workfile_mgr_report_error();
	}

	switch (fileType)
	{
		case BUFFILE:
			file = (void *) BufFileCreateFile(fileName, delOnClose, false /* interXact */ );
			/* Same NULL check as ExecWorkFile_Open, before handing the file to BufFileSetWorkfile */
			if (file == NULL)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not create temporary file \"%s\": %m",
								fileName)));
			BufFileSetWorkfile(file);
			break;
		case BFZ:
			file = (void *) bfz_create(fileName, delOnClose, compressType);
			/* Fail here rather than storing a NULL handle in the descriptor */
			if (file == NULL)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not create temporary file \"%s\": %m",
								fileName)));
			break;
		default:
			ereport(ERROR,
					(errcode(ERRCODE_INTERNAL_ERROR),
					 errmsg("invalid work file type: %d", fileType)));
			return NULL;		/* keep compiler quiet */
	}

	workfile = palloc0(sizeof(ExecWorkFile));
	workfile->fileType = fileType;
	workfile->compressType = compressType;
	workfile->file = file;
	workfile->fileName = pstrdup(fileName);
	workfile->size = 0;

	ExecWorkFile_SetFlags(workfile, delOnClose, true /* created */);

	return workfile;
}
/*
* ExecWorkfile_AddUniqueSuffix
* Adds a suffix to a filename to make it unique, within and across queries
*
* The returned StringInfo and data are palloc-ed in the current memory context.
*/
StringInfo
ExecWorkFile_AddUniqueSuffix(const char *filename)
{
StringInfo uniquename = makeStringInfo();
appendStringInfo(uniquename, "%s_%d_" UINT64_FORMAT,
filename,
MyProcPid,
temp_file_counter++);
Assert(uniquename->len <= MAXPGPATH - 1 && "could not generate temporary file name");
return uniquename;
}
/*
 * ExecWorkFile_CreateUnique
 *	  Same as ExecWorkFile_Create, but first makes the name unique with
 *	  ExecWorkFile_AddUniqueSuffix.
 *
 * The temporary uniquified name is freed before returning.
 * ExecWorkFile_Create keeps its own pstrdup'ed copy of the name.
 */
ExecWorkFile *
ExecWorkFile_CreateUnique(const char *filename,
						  ExecWorkFileType fileType,
						  bool delOnClose,
						  int compressType)
{
	ExecWorkFile *created;
	StringInfo	unique_name = ExecWorkFile_AddUniqueSuffix(filename);

	created = ExecWorkFile_Create(unique_name->data, fileType,
								  delOnClose, compressType);

	/* Release both the buffer and the StringInfo header */
	pfree(unique_name->data);
	pfree(unique_name);

	return created;
}
/*
 * ExecWorkFile_Open
 *	  Open an existing work file with the given name, file type and
 *	  compression type.
 *
 * A file that cannot be opened, or an invalid fileType, is reported with
 * ereport(ERROR); NULL is never handed back to the caller.
 *
 * The tracked size is initialized from the size the underlying
 * implementation reports for the opened file. The file is flagged as not
 * created by us. Because ExecWorkFile_SetFlags asserts that only a file's
 * creator may delete it on close, delOnClose is expected to be false here.
 */
ExecWorkFile *
ExecWorkFile_Open(const char *fileName,
				  ExecWorkFileType fileType,
				  bool delOnClose,
				  int compressType)
{
	ExecWorkFile *opened;
	void	   *handle = NULL;
	int64		initial_size = 0;

	if (fileType == BUFFILE)
	{
		handle = (void *) BufFileOpenFile(fileName,
										  false,	/* Create */
										  delOnClose,
										  true /* interXact */ );
		if (handle == NULL)
		{
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not open temporary file \"%s\": %m",
							fileName)));
		}
		BufFileSetWorkfile(handle);
		initial_size = BufFileGetSize(handle);
	}
	else if (fileType == BFZ)
	{
		handle = (void *) bfz_open(fileName, delOnClose, compressType);
		if (handle == NULL)
		{
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not open temporary file \"%s\": %m",
							fileName)));
		}
		initial_size = bfz_totalbytes((bfz_t *) handle);
	}
	else
	{
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("invalid work file type: %d", fileType)));
		return NULL;			/* keep compiler quiet */
	}

	opened = palloc0(sizeof(ExecWorkFile));
	opened->fileType = fileType;
	opened->compressType = compressType;
	opened->file = handle;
	opened->fileName = pstrdup(fileName);
	opened->size = initial_size;

	ExecWorkFile_SetFlags(opened, delOnClose, false /* created */);

	return opened;
}
/*
 * ExecWorkFile_Write
 *	  append the given data at the end of the last write position.
 *
 * Returns false, without writing anything, if data is NULL or size is 0.
 * Otherwise returns true; failures are not signalled through the return
 * value:
 * - If EXEC_WORKFILE_LIMIT_SIZE is set and the disk-space reservation fails,
 *	 workfile_mgr_report_error() is called.
 * - A short BufFile write also calls workfile_mgr_report_error().
 * - An error raised by the underlying write is re-thrown. Before that, the
 *	 disk-space accounting for the partial write is settled and
 *	 workfile_mgr_evict(MIN_EVICT_SIZE) is attempted.
 */
bool
ExecWorkFile_Write(ExecWorkFile *workfile,
				   void *data,
				   uint64 size)
{
	Assert(workfile != NULL);

	if (data == NULL || size == 0)
	{
		return false;
	}

	/* Test the per-query and per-segment limit */
	if ((workfile->flags & EXEC_WORKFILE_LIMIT_SIZE) &&
		!WorkfileDiskspace_Reserve(size))
	{
		/* Failed to reserve additional disk space, notify caller */
		workfile_mgr_report_error();
	}

	switch (workfile->fileType)
	{
		case BUFFILE:
			{
				/* Braces give this case its own scope for the declarations below */
				BufFile    *buffile = (BufFile *) workfile->file;
				int64		current_size = BufFileGetSize(buffile);
				int64		new_size = 0;
				uint64		bytes = 0;

				PG_TRY();
				{
					bytes = BufFileWrite(buffile, data, size);
				}
				PG_CATCH();
				{
					/*
					 * Settle the reservation against whatever growth the file
					 * shows after the failed write, then try to free space.
					 */
					new_size = BufFileGetSize(buffile);
					workfile->size = new_size;
					WorkfileDiskspace_Commit((new_size - current_size), size, true /* update_query_size */ );

					int64		size_evicted = workfile_mgr_evict(MIN_EVICT_SIZE);

					elog(gp_workfile_caching_loglevel, "Hit out of disk space, evicted " INT64_FORMAT " bytes", size_evicted);

					PG_RE_THROW();
				}
				PG_END_TRY();

				/*
				 * Commit the observed growth of the file against the size
				 * bytes requested above.
				 *
				 * NOTE(review): unlike the BFZ path below, this commit is not
				 * conditional on EXEC_WORKFILE_LIMIT_SIZE even though the
				 * reservation is; confirm WorkfileDiskspace_Commit tolerates
				 * committing bytes that were never reserved.
				 */
				new_size = BufFileGetSize(buffile);
				workfile->size = new_size;

				WorkfileDiskspace_Commit((new_size - current_size), size, true /* update_query_size */ );
				workfile_update_in_progress_size(workfile, new_size - current_size);

				if (bytes != size)
				{
					workfile_mgr_report_error();
				}
				break;
			}
		case BFZ:
			PG_TRY();
			{
				bfz_append((bfz_t *) workfile->file, data, size);
			}
			PG_CATCH();
			{
				Assert(WorkfileDiskspace_IsFull());
				/* Nothing is counted as written: release the whole reservation */
				WorkfileDiskspace_Commit(0, size, true /* update_query_size */ );

				int64		size_evicted = workfile_mgr_evict(MIN_EVICT_SIZE);

				elog(gp_workfile_caching_loglevel, "Hit out of disk space, evicted " INT64_FORMAT " bytes", size_evicted);

				PG_RE_THROW();
			}
			PG_END_TRY();

			/* bfz_append always adds to the file size */
			workfile->size += size;
			if ((workfile->flags & EXEC_WORKFILE_LIMIT_SIZE))
			{
				WorkfileDiskspace_Commit(size, size, true /* update_query_size */ );
			}
			workfile_update_in_progress_size(workfile, size);

			break;
		default:
			insist_log(false, "invalid work file type: %d", workfile->fileType);
	}

	return true;
}
/*
 * ExecWorkFile_Read
 *	  Read up to 'size' bytes from the work file into 'data'.
 *
 * The caller's buffer must have room for at least 'size' bytes.
 * Returns the number of bytes actually read, as reported by the underlying
 * implementation (BufFileRead or bfz_scan_next); 0 means nothing was read.
 */
uint64
ExecWorkFile_Read(ExecWorkFile *workfile,
				  void *data,
				  uint64 size)
{
	uint64		nread = 0;

	Assert(workfile != NULL);

	if (workfile->fileType == BUFFILE)
	{
		nread = BufFileRead((BufFile *) workfile->file, data, size);
	}
	else if (workfile->fileType == BFZ)
	{
		nread = bfz_scan_next((bfz_t *) workfile->file, data, size);
	}
	else
	{
		insist_log(false, "invalid work file type: %d", workfile->fileType);
	}

	return nread;
}
/*
 * ExecWorkFile_ReadFromBuffer
 *
 * Zero-copy variant of ExecWorkFile_Read. If the next 'size' bytes are
 * already in the underlying buffer, a pointer into that buffer is returned.
 * Otherwise NULL is returned, and the caller must fall back to
 * ExecWorkFile_Read with its own destination buffer.
 *
 * Only BFZ files support this; any other type goes through insist_log.
 */
void *
ExecWorkFile_ReadFromBuffer(ExecWorkFile *workfile,
							uint64 size)
{
	Assert(workfile != NULL);

	if (workfile->fileType == BFZ)
	{
		return bfz_scan_peek((bfz_t *) workfile->file, size);
	}

	insist_log(false, "invalid work file type: %d", workfile->fileType);
	return NULL;
}
/*
 * ExecWorkFile_Rewind
 *	  Move the read position back to the start of the file.
 *
 * BufFile: seeks to offset 0 and returns whether BufFileSeek succeeded.
 * BFZ: finishes appending, reconciles the tracked size with the size
 * reported by bfz_append_end(), restarts the scan, and returns true.
 */
bool
ExecWorkFile_Rewind(ExecWorkFile *workfile)
{
	Assert(workfile != NULL);

	if (workfile->fileType == BUFFILE)
	{
		/* BufFileSeek reports success with 0 */
		return BufFileSeek((BufFile *) workfile->file, 0L /* offset */ , SEEK_SET) == 0;
	}

	if (workfile->fileType == BFZ)
	{
		bfz_t	   *bfz = (bfz_t *) workfile->file;
		int64		appended_size = bfz_append_end(bfz);

		ExecWorkFile_AdjustBFZSize(workfile, appended_size);
		bfz_scan_begin(bfz);
	}
	else
	{
		insist_log(false, "invalid work file type: %d", workfile->fileType);
	}

	return true;
}
/*
 * ExecWorkFile_Tell64
 *	  Return the current file position.
 *
 * For BufFile this is what BufFileTell reports. For BFZ it is
 * bfz_totalbytes(). NOTE(review): that is presumably the number of bytes
 * appended so far rather than a scan position; confirm against bfz.
 */
uint64
ExecWorkFile_Tell64(ExecWorkFile *workfile)
{
	uint64		position = 0;

	Assert(workfile != NULL);

	if (workfile->fileType == BUFFILE)
	{
		BufFileTell((BufFile *) workfile->file, (int64 *) &position);
	}
	else if (workfile->fileType == BFZ)
	{
		position = bfz_totalbytes((bfz_t *) workfile->file);
	}
	else
	{
		insist_log(false, "invalid work file type: %d", workfile->fileType);
	}

	return position;
}
/*
 * ExecWorkFile_Close
 *	  Close the work file and free the ExecWorkFile descriptor.
 *
 * A BFZ file still in append mode is flushed first, and its tracked size is
 * reconciled with the size bfz_append_end() reports. canReportError is
 * passed through to bfz_close.
 *
 * Returns the tracked size of the file (ExecWorkFile_GetSize) as it stood
 * after closing. The descriptor must not be used after this call.
 */
int64
ExecWorkFile_Close(ExecWorkFile *workfile, bool canReportError)
{
	int64		final_size;

	Assert(workfile != NULL);

	if (workfile->fileType == BUFFILE)
	{
		BufFileClose((BufFile *) workfile->file);
	}
	else if (workfile->fileType == BFZ)
	{
		bfz_t	   *bfz = (bfz_t *) workfile->file;

		Assert(bfz != NULL);

		/* Still writing: flush to disk and adjust WorkfileDiskspace to the real size */
		if (bfz->mode == BFZ_MODE_APPEND)
		{
			int64		flushed_size = bfz_append_end(bfz);

			ExecWorkFile_AdjustBFZSize(workfile, flushed_size);
		}

		bfz_close(bfz, true, canReportError);
	}
	else
	{
		insist_log(false, "invalid work file type: %d", workfile->fileType);
	}

	final_size = ExecWorkFile_GetSize(workfile);

	pfree(workfile->fileName);
	pfree(workfile);

	return final_size;
}
/*
 * ExecWorkFile_GetFileName
 *	  Return the name this workfile was created or opened with.
 *
 * The meaning depends on the underlying implementation: a BufFile may span
 * several physical files, in which case this is only their common prefix.
 * The string belongs to the descriptor and is freed by ExecWorkFile_Close.
 */
char *
ExecWorkFile_GetFileName(ExecWorkFile *workfile)
{
	char	   *name;

	Assert(workfile != NULL);

	name = workfile->fileName;
	return name;
}
/*
 * ExecWorkFile_Seek
 *	  Result is 0 if OK, EOF if not. Logical position is not moved if an
 *	  impossible seek is attempted.
 *
 * Only SEEK_SET and SEEK_CUR are supported; any other whence is logged and
 * EOF is returned.
 *
 * Seeking past the tracked end of file is allowed only for files we created
 * (EXEC_WORKFILE_CREATED). It requires reserving the extra bytes with
 * WorkfileDiskspace_Reserve(), and EOF is returned if the reservation fails.
 * The reservation is committed only if the underlying seek succeeds;
 * otherwise it is released again.
 */
int
ExecWorkFile_Seek(ExecWorkFile *workfile, uint64 offset, int whence)
{
	Assert(workfile != NULL);
	Assert((workfile->flags & EXEC_WORKFILE_RANDOM_ACCESS) != 0);

	int			result = 0;
	uint64		target = 0;
	int64		additional_size = 0;

	/* Compute the absolute target position to determine if it lies beyond EOF */
	switch (whence)
	{
		case SEEK_SET:
			target = offset;
			break;
		case SEEK_CUR:
			target = ExecWorkFile_Tell64(workfile) + offset;
			break;
		default:
			elog(LOG, "invalid whence: %d", whence);
			Assert(false);
			return EOF;
	}

	if (target > workfile->size)
	{
		additional_size = target - workfile->size;
	}

	/* Reserve disk space if needed */
	if (additional_size > 0)
	{
		/*
		 * We only allow seeking beyond EOF for files opened for writing
		 * (i.e. files we created)
		 */
		if ((workfile->flags & EXEC_WORKFILE_CREATED) == 0)
		{
			return EOF;
		}

		if (!WorkfileDiskspace_Reserve(additional_size))
		{
			/* Failed to reserve additional disk space, notify caller */
			return EOF;
		}
	}

	/* Do the actual seek */
	switch (workfile->fileType)
	{
		case BUFFILE:
			result = BufFileSeek((BufFile *) workfile->file, offset, whence);
			if (additional_size > 0)
			{
				workfile->size = BufFileGetSize((BufFile *) workfile->file);
			}
			break;
		default:
			insist_log(false, "invalid work file type: %d", workfile->fileType);
	}

	if (additional_size > 0)
	{
		if (result == 0)
		{
			WorkfileDiskspace_Commit(additional_size, additional_size, true /* update_query_size */ );
			workfile_update_in_progress_size(workfile, additional_size);
		}
		else
		{
			/*
			 * The seek failed and the position did not move, so none of the
			 * reserved space is used: commit 0 bytes to release it.
			 */
			WorkfileDiskspace_Commit(0, additional_size, true /* update_query_size */ );
		}
	}

	return result;
}
/*
 * ExecWorkFile_Flush
 *	  Flush buffered data of a BufFile-backed work file via BufFileFlush.
 *	  Other file types are rejected through insist_log.
 */
void
ExecWorkFile_Flush(ExecWorkFile *workfile)
{
	Assert(workfile != NULL);

	if (workfile->fileType != BUFFILE)
	{
		insist_log(false, "invalid work file type: %d", workfile->fileType);
		return;
	}

	BufFileFlush((BufFile *) workfile->file);
}
/*
 * ExecWorkFile_Suspend
 *	  Suspend a file without closing it.
 *
 * Only BFZ files are suspendable. For them this ends the append phase with
 * bfz_append_end(), which frees the buffer bfz allocates for each open
 * file while keeping the fd so the file can be re-opened later with
 * ExecWorkFile_Restart. The tracked size is then reconciled with the size
 * on disk.
 *
 * Returns the actual size of the file on disk, or -1 if the file type is
 * not suspendable.
 */
int64
ExecWorkFile_Suspend(ExecWorkFile *workfile)
{
	int64		disk_size = -1;

	Assert(workfile != NULL);
	Assert((workfile->flags & EXEC_WORKFILE_SUSPENDABLE) != 0);

	if (workfile->fileType != BFZ)
	{
		insist_log(false, "invalid work file type: %d", workfile->fileType);
		return disk_size;
	}

	disk_size = bfz_append_end((bfz_t *) workfile->file);
	ExecWorkFile_AdjustBFZSize(workfile, disk_size);

	return disk_size;
}
/*
 * ExecWorkFile_Restart
 *	  Re-open a suspended file for reading by restarting its scan
 *	  (bfz_scan_begin), which sets up what is needed to read from the start.
 *
 * Only BFZ files are suspendable; other types go through insist_log.
 */
void
ExecWorkFile_Restart(ExecWorkFile *workfile)
{
	Assert(workfile != NULL);
	Assert((workfile->flags & EXEC_WORKFILE_SUSPENDABLE) != 0);

	if (workfile->fileType != BFZ)
	{
		insist_log(false, "invalid work file type: %d", workfile->fileType);
		return;
	}

	bfz_scan_begin((bfz_t *) workfile->file);
}
/*
 * ExecWorkFile_GetSize
 *	  Return the file size as tracked by this API.
 *
 * The underlying file is not queried. The value is whatever the
 * create/open/write/seek/size-adjustment paths last stored in
 * workfile->size.
 */
int64
ExecWorkFile_GetSize(ExecWorkFile *workfile)
{
	const int64 tracked_size = workfile->size;

	return tracked_size;
}
/*
 * ExecWorkfile_SetWorkset
 *	  Attach the workfile to its parent workfile_set.
 *
 * Both pointers must be non-NULL. In assert-enabled builds, a workfile may
 * only be attached once.
 */
void
ExecWorkfile_SetWorkset(ExecWorkFile *workfile, workfile_set *work_set)
{
	Assert(workfile != NULL);
	Assert(work_set != NULL);
	Assert(workfile->work_set == NULL);

	workfile->work_set = work_set;
}
/*
 * ExecWorkFile_SetFlags
 *	  Initialize the capability flags of a new ExecWorkFile descriptor.
 *
 * Flags are set as follows:
 * - BufFile files get EXEC_WORKFILE_RANDOM_ACCESS; BFZ files get
 *	 EXEC_WORKFILE_SUSPENDABLE.
 * - delOnClose adds EXEC_WORKFILE_DEL_ON_CLOSE. Only a creator may request
 *	 this, which is asserted.
 * - created adds EXEC_WORKFILE_CREATED.
 * - EXEC_WORKFILE_LIMIT_SIZE is added when either the per-query or the
 *	 per-segment workfile limit GUC is positive.
 *
 * The create/open event is logged at gp_workfile_caching_loglevel.
 */
static void
ExecWorkFile_SetFlags(ExecWorkFile *workfile, bool delOnClose, bool created)
{
	Assert(workfile != NULL);
	/* Only the creator of a file may ask for it to be deleted on close */
	AssertImply(delOnClose, created);

	if (workfile->fileType == BUFFILE)
	{
		workfile->flags |= EXEC_WORKFILE_RANDOM_ACCESS;
	}
	else if (workfile->fileType == BFZ)
	{
		workfile->flags |= EXEC_WORKFILE_SUSPENDABLE;
	}
	else
	{
		insist_log(false, "invalid work file type: %d", workfile->fileType);
	}

	if (delOnClose)
	{
		workfile->flags |= EXEC_WORKFILE_DEL_ON_CLOSE;
	}

	if (!created)
	{
		elog(gp_workfile_caching_loglevel, "Opened existing workfile %s, delOnClose = %d",
			 ExecWorkFile_GetFileName(workfile), delOnClose);
	}
	else
	{
		workfile->flags |= EXEC_WORKFILE_CREATED;
		elog(gp_workfile_caching_loglevel, "Created workfile %s, delOnClose = %d",
			 ExecWorkFile_GetFileName(workfile), delOnClose);
	}

	if (gp_workfile_limit_per_query > 0 || gp_workfile_limit_per_segment > 0)
	{
		workfile->flags |= EXEC_WORKFILE_LIMIT_SIZE;
	}
}
/*
 * Correct the size for a BFZ file when we are done appending to it.
 *
 * While writing to a BFZ, the number of bytes written to disk can differ
 * from the number of bytes passed in to be written:
 * - BFZ is compressed. The size on disk is smaller than the bytes written.
 * - BFZ is uncompressed and we're using checksumming. The size on disk is
 *	 slightly larger than the bytes written.
 * This function reconciles the tracked size (workfile->size) and the
 * WorkfileDiskspace accounting with the actual size on disk, file_size.
 */
static void
ExecWorkFile_AdjustBFZSize(ExecWorkFile *workfile, int64 file_size)
{
	Assert(workfile != NULL);

	/* Only needed by the assertions; gated the same way Assert() itself is */
#ifdef USE_ASSERT_CHECKING
	bfz_t	   *bfz_file = (bfz_t *) workfile->file;
#endif

	if (file_size <= workfile->size)
	{
		/*
		 * Actual size on disk is not larger than expected. It is strictly
		 * smaller in two cases:
		 * - file on disk is compressed
		 * - we hit out of disk space
		 * Equal sizes need no correction, so they are exempt from the
		 * assertion. The calls below are then made with a zero difference
		 * between the committed and expected sizes.
		 */
		Assert(file_size == workfile->size ||
			   bfz_file->compression_index > 0 ||
			   WorkfileDiskspace_IsFull());
		WorkfileDiskspace_Commit(file_size, workfile->size, true /* update_query_size */ );
		workfile_update_in_progress_size(workfile, file_size - workfile->size);
		workfile->size = file_size;
	}
	else
	{
		int64		extra_bytes = file_size - workfile->size;

		/*
		 * Actual file on disk is bigger than expected. This can happen when:
		 * - added checksums to an uncompressed file
		 * - closing an empty or very small compressed file (zlib header
		 *	 overhead larger than the saved space)
		 */
		Assert((bfz_file->has_checksum && bfz_file->compression_index == 0) ||
			   (bfz_file->compression_index > 0 && workfile->size < BFZ_BUFFER_SIZE));

		/*
		 * If we're already under disk full, don't try to reserve, as it will
		 * fail anyway. We're in cleanup code in that case, and the file
		 * will be deleted soon.
		 */
		if (!WorkfileDiskspace_IsFull())
		{
			bool		reserved = WorkfileDiskspace_Reserve(extra_bytes);

			if (!reserved)
			{
				elog(gp_workfile_caching_loglevel, "Could not reserve " INT64_FORMAT " additional bytes while adjusting for BFZ addtional size",
					 extra_bytes);
				workfile_mgr_report_error();
			}
			WorkfileDiskspace_Commit(extra_bytes, extra_bytes, true /* update_query_size */ );
			workfile_update_in_progress_size(workfile, extra_bytes);
			workfile->size = file_size;
		}
	}
}
/* EOF */