blob: 54e5d27e02f2297e5e740e6f767f1a00baf1c121 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* workfile_file.c
* Implementation of workfile manager file operations
*
*-------------------------------------------------------------------------
*/
#include <postgres.h>
#include <unistd.h>
#include <sys/stat.h>
#include "utils/workfile_mgr.h"
#include "cdb/cdbvars.h"
static void retrieve_file_no(workfile_set *work_set, uint32 file_no, char *workfile_name, uint32 workfile_name_len);
static void update_workset_size(workfile_set *work_set, bool delOnClose, bool created, int64 size);
static void adjust_size_temp_file_new(workfile_set *work_set, int64 size);
static void adjust_size_persistent_file_new(workfile_set *work_set, int64 size);
static void adjust_size_temp_file_existing(workfile_set *work_set, int64 size);
static void adjust_size_persistent_file_existing(workfile_set *work_set, int64 size);
/*
* Creates a new workfile in a given set
*
* The next file name in the sequence is generated
*/
ExecWorkFile *
workfile_mgr_create_file(workfile_set *work_set)
{
Assert(NULL != work_set);
work_set->no_files++;
return workfile_mgr_create_fileno(work_set, work_set->no_files);
}
/*
* Creates a new numbered workfile in a given set
*
* The given file_no is used to generate the file name
*/
ExecWorkFile *
workfile_mgr_create_fileno(workfile_set *work_set, uint32 file_no)
{
Assert(NULL != work_set);
char file_name[MAXPGPATH];
retrieve_file_no(work_set, file_no, file_name, sizeof(file_name));
bool del_on_close = !work_set->can_be_reused;
ExecWorkFile *ewfile = ExecWorkFile_Create(file_name,
work_set->metadata.type,
del_on_close,
work_set->metadata.bfz_compress_type);
ExecWorkfile_SetWorkset(ewfile, work_set);
return ewfile;
}
/*
* Opens a numbered workfile of a given set
*
* The given file_no is used to generate the file name
*/
ExecWorkFile *
workfile_mgr_open_fileno(workfile_set *work_set, uint32 file_no)
{
Assert(NULL != work_set);
char file_name[MAXPGPATH];
retrieve_file_no(work_set, file_no, file_name, sizeof(file_name));
bool del_on_close = !work_set->can_be_reused;
ExecWorkFile *ewfile = ExecWorkFile_Open(file_name,
work_set->metadata.type,
del_on_close,
work_set->metadata.bfz_compress_type);
ExecWorkfile_SetWorkset(ewfile, work_set);
return ewfile;
}
/*
* Opens a given workfile of a given set
*
* The exact file_name given is used to open the file
*/
ExecWorkFile *
workfile_mgr_open_filename(workfile_set *work_set, const char *file_name)
{
Assert(false);
return NULL;
}
/*
* Closes a given workfile and updates the diskspace accordingly
*
* work_set can be NULL for workfile that were created outside of the workfile manager,
* e.g. for ShareInputScan workfiles or window functions
*
* Returns the actual size of the file on disk in bytes upon closing
*/
int64
workfile_mgr_close_file(workfile_set *work_set, ExecWorkFile *file, bool canReportError)
{
Assert(NULL != file);
bool delOnClose = file->flags & EXEC_WORKFILE_DEL_ON_CLOSE;
bool created = file->flags & EXEC_WORKFILE_CREATED;
elog(gp_workfile_caching_loglevel, "closing file %s, delOnClose=%d", ExecWorkFile_GetFileName(file), delOnClose);
int64 size = 0;
PG_TRY();
{
size = ExecWorkFile_Close(file, canReportError);
}
PG_CATCH();
{
elog(gp_workfile_caching_loglevel, "Caught exception, file=%s, returned size=" INT64_FORMAT " actual size before adjustment=" INT64_FORMAT,
file->fileName, size, ExecWorkFile_GetSize(file));
update_workset_size(work_set, delOnClose, created, ExecWorkFile_GetSize(file));
PG_RE_THROW();
}
PG_END_TRY();
update_workset_size(work_set, delOnClose, created, size);
return size;
}
/*
* Update the size of a workset after closing a member file
* work_set is the parent workset of the file
* delOnClose is set if the file is to be deleted upon closing (temporary)
* created is set if the file was created by this backend
* size is the final size of the file at the time of the close
*
*/
static void
update_workset_size(workfile_set *work_set, bool delOnClose, bool created, int64 size)
{
if (delOnClose)
{
if (created)
{
adjust_size_temp_file_new(work_set, size);
}
else
{
adjust_size_temp_file_existing(work_set, size);
}
}
else
{
if (created)
{
adjust_size_persistent_file_new(work_set, size);
}
else
{
adjust_size_persistent_file_existing(work_set, size);
}
}
}
/*
* Updating accounting of size when closing a temporary file we created
*/
static void
adjust_size_temp_file_new(workfile_set *work_set, int64 size)
{
#if USE_ASSERT_CHECKING
bool isCached = (NULL != work_set) && Cache_IsCached(CACHE_ENTRY_HEADER(work_set));
#endif
Assert(!isCached);
AssertImply((NULL != work_set), work_set->size == 0);
AssertImply((NULL != work_set), work_set->in_progress_size >= size);
if (NULL != work_set)
{
work_set->in_progress_size -= size;
}
WorkfileDiskspace_Commit(0, size, true /* update_query_size */);
elog(gp_workfile_caching_loglevel, "closed and deleted temp file, subtracted size " INT64_FORMAT " from disk space", size);
}
/*
* Updating accounting of size when closing a persistent file we created
*/
static void
adjust_size_persistent_file_new(workfile_set *work_set, int64 size)
{
#if USE_ASSERT_CHECKING
bool isCached = (NULL != work_set) && Cache_IsCached(CACHE_ENTRY_HEADER(work_set));
#endif
Assert(NULL != work_set && !isCached);
work_set->size += size;
elog(gp_workfile_caching_loglevel, "closed new persistent file, added size " INT64_FORMAT " to set space", size);
}
/*
* Updating accounting of size when closing an existing temporary file
* we opened for reading
*/
static void
adjust_size_temp_file_existing(workfile_set *work_set, int64 size)
{
elog(gp_workfile_caching_loglevel, "closing existing temp file, not deleting, nothing to do");
Assert(false && "We should not open existing temp files");
return;
}
/*
* Updating accounting of size when closing an existing persistent file
* we opened for reading
*/
static void
adjust_size_persistent_file_existing(workfile_set *work_set, int64 size)
{
#if USE_ASSERT_CHECKING
bool isCached = (NULL != work_set) && Cache_IsCached(CACHE_ENTRY_HEADER(work_set));
#endif
AssertEquivalent((NULL != work_set), isCached);
elog(gp_workfile_caching_loglevel, "closing existing persistent file, nothing to do");
return;
}
/*
* Retrieves the name of the file_no file in the workset. If the file was not
* already in the work_set, it is added.
* This can create gaps in the work_set if file_no > work_set->no_files.
*
* workfile_name is allocated by the caller. It will contain the name for
* the new file.
*/
static void
retrieve_file_no(workfile_set *work_set, uint32 file_no, char *workfile_name, uint32 workfile_name_len)
{
Assert(work_set);
Assert(workfile_name);
if (file_no >= work_set->no_files)
{
/* Retrieving file beyond the end of the set. Adjusting the size of the set */
work_set->no_files = file_no + 1;
}
snprintf(workfile_name, workfile_name_len,
"%s/spillfile_f%u", work_set->path, file_no);
}