| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * workfile_mgr_test.c |
| * Unit tests for workfile manager and cache. |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include <postgres.h> |
| #include <unistd.h> |
| #include "cdb/cdbgang.h" |
| #include "cdb/cdbvars.h" |
| #include "executor/execWorkfile.h" |
| #include "miscadmin.h" |
| #include "postmaster/primary_mirror_mode.h" |
| #include "storage/bfz.h" |
| #include "storage/buffile.h" |
| #include "utils/atomic.h" |
| #include "utils/builtins.h" |
| #include "utils/logtape.h" |
| #include "utils/memutils.h" |
| #include "utils/sharedcache.h" |
| #include "utils/syncrefhashtable.h" |
| |
| #define TEST_NAME_LENGTH 50 |
| #define TEST_HT_NUM_ELEMENTS 8192 |
| |
| /* Number of Workfiles created during the "stress" workfile test */ |
| #define TEST_MAX_NUM_WORKFILES 100000 |
| |
| typedef struct TestSyncHTElt |
| { |
| char key[TEST_NAME_LENGTH]; |
| int data; |
| int numChildren; |
| int32 pinCount; |
| } TestSyncHTElt; |
| |
| typedef struct TestCacheElt |
| { |
| char key[TEST_NAME_LENGTH]; |
| int data; |
| } TestCacheElt; |
| |
| typedef struct TestPopParam |
| { |
| char key[TEST_NAME_LENGTH]; |
| int data; |
| } TestPopParam; |
| |
| int tests_passed; |
| int tests_failed; |
| int tests_total; |
| |
| /* Test definitions */ |
| static bool syncrefhashtable_test_basics(void); |
| static bool syncrefhashtable_test_concurrency_non_overlap(void); |
| static bool syncrefhashtable_test_error_cases(void); |
| static bool syncrefhashtable_test_full_table(void); |
| |
| static bool cache_test_acquire(void); |
| static bool cache_test_insert(void); |
| static bool cache_test_remove(void); |
| static bool cache_test_concurrency(void); |
| static bool cache_test_evict(void); |
| static bool cache_test_evict_stress(void); |
| static bool cache_test_clear(void); |
| |
| static bool bfz_test_reopen(void); |
| static bool execworkfile_buffile_test(void); |
| static bool execworkfile_bfz_zlib_test(void); |
| static bool execworkfile_bfz_uncompressed_test(void); |
| static bool fd_tests(void); |
| static bool buffile_size_test(void); |
| static bool buffile_large_file_test(void); |
| static bool logicaltape_test(void); |
| static bool fd_large_file_test(void); |
| static bool execworkfile_create_one_MB_file(void); |
| static bool workfile_queryspace(void); |
| static bool workfile_fill_sharedcache(void); |
| static bool workfile_create_and_set_cleanup(void); |
| static bool workfile_create_and_individual_cleanup(void); |
| |
| static bool atomic_test(void); |
| |
| /* Local helpers */ |
| static SyncHT *syncrefhastable_test_create(void); |
| static Cache *cache_test_create(); |
| |
| /* Unit tests helper */ |
| static void unit_test_result(bool result); |
| static void unit_test_reset(void); |
| static bool unit_test_summary(void); |
| |
| |
| #define GET_STR(textp) DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) |
| |
| typedef bool (*gp_workfile_mgr_test)(void); |
| |
| typedef struct test_def |
| { |
| char *test_name; |
| gp_workfile_mgr_test test_func; |
| } test_def; |
| |
| static test_def test_defns[] = { |
| {"syncrefhashtable_test_basics", syncrefhashtable_test_basics}, |
| {"syncrefhashtable_test_concurrency_non_overlap", syncrefhashtable_test_concurrency_non_overlap}, |
| {"syncrefhashtable_test_error_cases", syncrefhashtable_test_error_cases}, |
| {"syncrefhashtable_test_full_table", syncrefhashtable_test_full_table}, |
| {"cache_test_aquire", cache_test_acquire}, |
| {"cache_test_insert", cache_test_insert}, |
| {"cache_test_remove", cache_test_remove}, |
| {"cache_test_concurrency", cache_test_concurrency}, |
| {"cache_test_evict", cache_test_evict}, |
| {"cache_test_evict_stress", cache_test_evict_stress}, |
| {"cache_test_clear", cache_test_clear}, |
| {"bfz_test_reopen", bfz_test_reopen}, |
| {"execworkfile_buffile_test", execworkfile_buffile_test}, |
| {"execworkfile_bfz_zlib_test", execworkfile_bfz_zlib_test}, |
| {"execworkfile_bfz_uncompressed_test", execworkfile_bfz_uncompressed_test}, |
| {"atomic_test", atomic_test}, |
| {"fd_tests", fd_tests}, |
| {"buffile_size_test", buffile_size_test}, |
| {"buffile_large_file_test", buffile_large_file_test}, |
| {"logicaltape_test", logicaltape_test}, |
| {"fd_large_file_test",fd_large_file_test}, |
| {"execworkfile_create_one_MB_file",execworkfile_create_one_MB_file}, |
| {"workfile_queryspace", workfile_queryspace}, |
| {"workfile_fill_sharedcache", workfile_fill_sharedcache}, |
| {"workfile_create_and_set_cleanup", workfile_create_and_set_cleanup}, |
| {"workfile_create_and_individual_cleanup", workfile_create_and_individual_cleanup}, |
| {NULL, NULL}, /* This has to be the last element of the array */ |
| }; |
| |
| |
| Datum |
| gp_workfile_mgr_test_harness(PG_FUNCTION_ARGS) |
| { |
| bool result = true; |
| |
| Assert(PG_NARGS() == 1); |
| |
| char *test_name = GET_STR(PG_GETARG_TEXT_P(0)); |
| bool run_all_tests = strcasecmp(test_name, "all") == 0; |
| bool ran_any_tests = false; |
| |
| int crt_test = 0; |
| while (NULL != test_defns[crt_test].test_name) |
| { |
| if (run_all_tests || (strcasecmp(test_name, test_defns[crt_test].test_name) == 0)) |
| { |
| result = result && test_defns[crt_test].test_func(); |
| ran_any_tests = true; |
| } |
| crt_test++; |
| } |
| |
| if (!ran_any_tests) |
| { |
| elog(LOG, "No tests match given name: %s", test_name); |
| } |
| |
| PG_RETURN_BOOL(ran_any_tests && result); |
| } |
| |
| /* |
| * Callback function to test if two cache resources are equivalent. |
| */ |
| static bool |
| cacheEltEquivalent(const void *resource1, const void *resource2) |
| { |
| TestCacheElt *elt1 = (TestCacheElt *) resource1; |
| TestCacheElt *elt2 = (TestCacheElt *) resource2; |
| |
| return elt1->data == elt2->data && |
| strncmp(elt1->key, elt2->key, TEST_NAME_LENGTH) == 0; |
| } |
| |
| /* |
| * Callback function to do the client-side cleanup for an entry that is being |
| * removed from the cache. |
| * |
| * Resource is the payload of the entry. |
| */ |
| static void |
| cacheEltCleanup(const void *resource) |
| { |
| TestCacheElt *elt = (TestCacheElt *) resource; |
| |
| CACHE_ASSERT_VALID(CACHE_ENTRY_HEADER(elt)); |
| |
| elog(gp_workfile_caching_loglevel, "deleting TestCacheElt. [key=""%s"", data=%d]", |
| elt->key, elt->data); |
| |
| elt->data = -1; |
| } |
| |
| static void |
| cacheEltPopulate(const void *resource, const void *param) |
| { |
| Assert(NULL != resource); |
| |
| if (NULL == param) |
| { |
| elog(gp_workfile_caching_loglevel, "Returning uninitialized entry"); |
| return; |
| } |
| |
| TestCacheElt *elt = (TestCacheElt *) resource; |
| TestPopParam *eltInfo = (TestPopParam *) param; |
| |
| strncpy(elt->key, eltInfo->key, TEST_NAME_LENGTH); |
| elt->data = eltInfo->data; |
| } |
| |
| /* |
| * Function to be called to cleanup in event of client error |
| * |
| * Releases and surrenders all client owned entries back to the cache |
| */ |
| static void |
| Cache_TeardownCallback(XactEvent event, void *arg) |
| { |
| Assert(NULL != arg); |
| |
| Cache *cache = (Cache *) arg; |
| |
| elog(LOG, "Calling cleanup now for cache %s with %d used entries", |
| cache->cacheName, |
| cache->cacheHdr->nEntries - cache->cacheHdr->cacheStats.noFreeEntries); |
| |
| Cache_SurrenderClientEntries(cache); |
| } |
| |
| static Cache * |
| cache_test_create() |
| { |
| CacheCtl cacheCtl; |
| MemSet(&cacheCtl, 0, sizeof(CacheCtl)); |
| |
| cacheCtl.entrySize = sizeof(TestCacheElt); |
| cacheCtl.keySize = TEST_NAME_LENGTH; |
| cacheCtl.keyOffset = GPDB_OFFSET(TestCacheElt, key); |
| |
| cacheCtl.hash = string_hash; |
| cacheCtl.keyCopy = (HashCopyFunc) strncpy; |
| cacheCtl.match = (HashCompareFunc) strncmp; |
| |
| cacheCtl.equivalentEntries = cacheEltEquivalent; |
| cacheCtl.cleanupEntry = cacheEltCleanup; |
| cacheCtl.populateEntry = cacheEltPopulate; |
| |
| cacheCtl.maxSize = TEST_HT_NUM_ELEMENTS; |
| cacheCtl.cacheName = "Test Cache"; |
| |
| cacheCtl.baseLWLockId = FirstWorkfileMgrLock; |
| cacheCtl.numPartitions = NUM_WORKFILEMGR_PARTITIONS; |
| |
| |
| Cache *cache = NULL; |
| cache = Cache_Create(&cacheCtl); |
| |
| Assert(cache); |
| RegisterXactCallbackOnce(Cache_TeardownCallback, cache); |
| |
| return cache; |
| } |
| |
| static bool |
| cache_test_acquire(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: cache_test_acquire"); |
| |
| elog(LOG, "Running sub-test: CacheCreate"); |
| Cache *cache = cache_test_create(); |
| unit_test_result(cache != NULL); |
| |
| elog(LOG, "Running sub-test: CacheAcquireEntry"); |
| |
| TestPopParam param; |
| strncpy(param.key, "Test Key 1", TEST_NAME_LENGTH); |
| param.data = 4567; |
| |
| CacheEntry *entry = Cache_AcquireEntry(cache, ¶m); |
| unit_test_result(entry != NULL); |
| |
| |
| elog(LOG, "Running sub-test: CacheSurrenderEntry"); |
| Cache_Release(cache, entry); |
| unit_test_result(true); |
| |
| return unit_test_summary(); |
| } |
| |
| static bool |
| cache_test_insert() |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: cache_test_insert"); |
| |
| elog(LOG, "Running sub-test: CacheCreate"); |
| Cache *cache = cache_test_create(); |
| unit_test_result(cache != NULL); |
| |
| elog(LOG, "Running sub-test: CacheAcquireEntry"); |
| |
| TestPopParam param; |
| strncpy(param.key, "Test Key 2", TEST_NAME_LENGTH); |
| param.data = 1111; |
| |
| CacheEntry *entry = Cache_AcquireEntry(cache, ¶m); |
| unit_test_result(entry != NULL); |
| |
| elog(LOG, "Running sub-test: CacheInsert"); |
| |
| Cache_Insert(cache, entry); |
| Cache_Release(cache, entry); |
| unit_test_result(true); |
| |
| /* Look-up test */ |
| elog(LOG, "Running sub-test: CacheLookup"); |
| |
| strncpy(param.key, "Test Key 2", TEST_NAME_LENGTH); |
| param.data = 1111; |
| CacheEntry *localEntry = Cache_AcquireEntry(cache, ¶m); |
| |
| CacheEntry *lookedUpEntry = Cache_Lookup(cache, localEntry); |
| unit_test_result(lookedUpEntry != NULL); |
| |
| Cache_Release(cache, lookedUpEntry); |
| |
| elog(LOG, "Running sub-test: CacheLookup equal key, no match on value"); |
| TestCacheElt *elt = CACHE_ENTRY_PAYLOAD(localEntry); |
| strncpy(elt->key, "Test Key 2", TEST_NAME_LENGTH); |
| elt->data = 1234; |
| lookedUpEntry = Cache_Lookup(cache, localEntry); |
| unit_test_result(lookedUpEntry == NULL); |
| |
| elog(LOG, "Running sub-test: CacheLookup different key, no match"); |
| elt = CACHE_ENTRY_PAYLOAD(localEntry); |
| strncpy(elt->key, "Test Key bogus", TEST_NAME_LENGTH); |
| elt->data = 1111; |
| lookedUpEntry = Cache_Lookup(cache, localEntry); |
| unit_test_result(lookedUpEntry == NULL); |
| |
| Cache_Release(cache, localEntry); |
| return unit_test_summary(); |
| |
| } |
| |
| static bool |
| cache_test_remove(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: cache_test_remove"); |
| |
| elog(LOG, "Running sub-test: CacheCreate"); |
| Cache *cache = cache_test_create(); |
| unit_test_result(cache != NULL); |
| |
| char *testKey = "Test Remove Key 2"; |
| |
| /* Insert one entry */ |
| TestPopParam param; |
| strncpy(param.key, testKey, TEST_NAME_LENGTH); |
| param.data = 1111; |
| CacheEntry *entry1 = Cache_AcquireEntry(cache, ¶m); |
| Cache_Insert(cache, entry1); |
| Cache_Release(cache, entry1); |
| |
| /* Insert another entry */ |
| strncpy(param.key, testKey, TEST_NAME_LENGTH); |
| param.data = 2222; |
| CacheEntry *entry2 = Cache_AcquireEntry(cache, ¶m); |
| Cache_Insert(cache, entry2); |
| Cache_Release(cache, entry2); |
| |
| /* Look-up and remove an entry */ |
| strncpy(param.key, testKey, TEST_NAME_LENGTH); |
| param.data = 2222; |
| CacheEntry *localEntry = Cache_AcquireEntry(cache, ¶m); |
| |
| elog(LOG, "Running sub-test: Look-up inserted element"); |
| CacheEntry *lookedUpEntry = Cache_Lookup(cache, localEntry); |
| unit_test_result(lookedUpEntry != NULL); |
| |
| elog(LOG, "Running sub-test: Remove looked-up element"); |
| Cache_Remove(cache, lookedUpEntry); |
| Cache_Release(cache, lookedUpEntry); |
| unit_test_result(true); |
| |
| elog(LOG, "Running sub-test: Look up removed element"); |
| TestCacheElt *elt3 = CACHE_ENTRY_PAYLOAD(localEntry); |
| strncpy(elt3->key, testKey, TEST_NAME_LENGTH); |
| elt3->data = 2222; |
| lookedUpEntry = Cache_Lookup(cache, localEntry); |
| unit_test_result(lookedUpEntry == NULL); |
| |
| elog(LOG, "Running sub-test: Look up existing element"); |
| elt3 = CACHE_ENTRY_PAYLOAD(localEntry); |
| strncpy(elt3->key, testKey, TEST_NAME_LENGTH); |
| elt3->data = 1111; |
| lookedUpEntry = Cache_Lookup(cache, localEntry); |
| unit_test_result(lookedUpEntry != NULL); |
| |
| Cache_Release(cache, localEntry); |
| Cache_Release(cache, lookedUpEntry); |
| |
| return unit_test_summary(); |
| } |
| |
| static bool |
| cache_test_concurrency(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: cache_test_concurrency"); |
| |
| elog(LOG, "Running sub-test: CacheCreate"); |
| Cache *cache = cache_test_create(); |
| unit_test_result(cache != NULL); |
| |
| /* Number of elements in the array to hold test entries */ |
| const int noTestEntries = 1000; |
| CacheEntry *entries[noTestEntries]; |
| char key[TEST_NAME_LENGTH]; |
| |
| elog(LOG, "Running sub-test: Cache insert/lookup/delete many elements"); |
| int noTestIterations = 5000; |
| int i = 0; |
| int iterNo = 0; |
| bool testFailed = false; |
| |
| TestPopParam param; |
| |
| for (iterNo = 0; iterNo < noTestIterations ; iterNo ++) |
| { |
| if (testFailed) |
| { |
| break; |
| } |
| |
| /* Insert noTestEntries elements */ |
| for (i=0; i < noTestEntries; i++) |
| { |
| /* If we include Pid in the key, we get short chains */ |
| /* snprintf(key, TEST_NAME_LENGTH, "PID=%d cache key no. %d", MyProcPid, i); */ |
| snprintf(key, TEST_NAME_LENGTH, "cache key no. %d", i); |
| |
| strncpy(param.key, key, TEST_NAME_LENGTH); |
| param.data = MyProcPid; |
| |
| entries[i] = Cache_AcquireEntry(cache, ¶m); |
| |
| if (entries[i] == NULL) |
| { |
| elog(LOG, "Could not acquire entry"); |
| testFailed = true; |
| break; |
| } |
| |
| Cache_Insert(cache, entries[i]); |
| Cache_Release(cache, entries[i]); |
| } |
| |
| if (testFailed) |
| { |
| break; |
| } |
| |
| /* Look up noTestEntries */ |
| for (i=0; i < noTestEntries; i++) |
| { |
| //snprintf(key, TEST_NAME_LENGTH, "PID=%d cache key no. %d", MyProcPid, i); |
| snprintf(key, TEST_NAME_LENGTH, "cache key no. %d", i); |
| strncpy(param.key, key, TEST_NAME_LENGTH); |
| param.data = MyProcPid; |
| |
| CacheEntry *localEntry = Cache_AcquireEntry(cache, ¶m); |
| if (localEntry == NULL) |
| { |
| elog(LOG, "Could not acquire entry"); |
| testFailed = true; |
| break; |
| } |
| |
| entries[i] = Cache_Lookup(cache, localEntry); |
| if (entries[i] == NULL) |
| { |
| elog(LOG, "Could not find inserted entry"); |
| testFailed = true; |
| break; |
| } |
| |
| Cache_Release(cache, localEntry); |
| } |
| |
| if (testFailed) |
| { |
| break; |
| } |
| |
| /* Delete and release noTestEntries */ |
| for (i=0; i < noTestEntries; i++) |
| { |
| Cache_Remove(cache, entries[i]); |
| Cache_Release(cache, entries[i]); |
| } |
| |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* Look up no TestEntries again, should not be found */ |
| for (i=0; i < noTestEntries; i++) |
| { |
| //snprintf(key, TEST_NAME_LENGTH, "PID=%d cache key no. %d", MyProcPid, i); |
| snprintf(key, TEST_NAME_LENGTH, "cache key no. %d", i); |
| strncpy(param.key, key, TEST_NAME_LENGTH); |
| param.data = MyProcPid; |
| |
| CacheEntry *localEntry = Cache_AcquireEntry(cache, ¶m); |
| if (localEntry == NULL) |
| { |
| elog(LOG, "Could not acquire entry"); |
| testFailed = true; |
| break; |
| } |
| |
| entries[i] = Cache_Lookup(cache, localEntry); |
| if (entries[i] != NULL) |
| { |
| elog(LOG, "Unexpected entry found in cache"); |
| testFailed = true; |
| break; |
| } |
| |
| Cache_Release(cache, localEntry); |
| } |
| |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| unit_test_result(!testFailed); |
| return unit_test_summary(); |
| } |
| |
| static bool |
| cache_test_evict(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: cache_test_evict"); |
| |
| elog(LOG, "Running sub-test: CacheCreate"); |
| Cache *cache = cache_test_create(); |
| unit_test_result(cache != NULL); |
| |
| /* Number of elements in the array to hold test entries */ |
| const int noTestEntries = 20; |
| const int entryWeight = 3; |
| CacheEntry *entries[noTestEntries]; |
| TestPopParam param; |
| char key[TEST_NAME_LENGTH]; |
| |
| |
| /* Inserting noTestEntries entries */ |
| elog(LOG, "Running sub-test: Inserting elements"); |
| int i; |
| for (i=0; i < noTestEntries; i++) |
| { |
| snprintf(key, TEST_NAME_LENGTH, "PID=%d cache key no. %d", MyProcPid, i); |
| strncpy(param.key, key, TEST_NAME_LENGTH); |
| param.data = MyProcPid; |
| |
| entries[i] = Cache_AcquireEntry(cache, ¶m); |
| Assert(NULL != entries[i]); |
| |
| entries[i]->size = entryWeight; |
| entries[i]->utility = random() % 100; |
| |
| Cache_Insert(cache, entries[i]); |
| Cache_Release(cache, entries[i]); |
| } |
| unit_test_result(true); |
| |
| /* Evicting elements */ |
| elog(LOG, "Running sub-test: Succesful eviction"); |
| |
| int64 evictRequest = (noTestEntries * entryWeight) / 2 + 1; /* Half of the entries + 1 */ |
| int64 evictActual = Cache_Evict(cache, evictRequest); |
| |
| /* Expected result: Evicted half plus one entries */ |
| unit_test_result(evictActual == evictRequest - 1 + entryWeight); |
| |
| elog(LOG, "Running sub-test: Unsuccesful eviction"); |
| evictRequest = (noTestEntries * entryWeight) / 2; /* Half of the entries */ |
| evictActual = Cache_Evict(cache, evictRequest); |
| |
| /* Expected result: Evicted one less entry than requested */ |
| unit_test_result(evictActual == evictRequest - entryWeight); |
| return unit_test_summary(); |
| } |
| |
| static bool |
| cache_test_evict_stress(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: cache_test_evict_stress"); |
| |
| elog(LOG, "Running sub-test: CacheCreate"); |
| Cache *cache = cache_test_create(); |
| unit_test_result(cache != NULL); |
| |
| /* Number of elements in the array to hold test entries */ |
| const int noTestEntries = 1000; |
| const int entryWeight = 3; |
| CacheEntry *entries[noTestEntries]; |
| char key[TEST_NAME_LENGTH]; |
| bool testFailed = false; |
| const uint32 noTestIterations = 1000; |
| uint32 iterNo; |
| TestPopParam param; |
| |
| elog(LOG, "Running sub-test: Cache insert/lookup/evict many elements"); |
| for (iterNo = 0; iterNo < noTestIterations ; iterNo ++) |
| { |
| if (testFailed) |
| { |
| break; |
| } |
| |
| /* Inserting noTestEntries entries */ |
| int i; |
| for (i=0; i < noTestEntries; i++) |
| { |
| /* If we include Pid in the key, we get short chains */ |
| /* snprintf(key, TEST_NAME_LENGTH, "PID=%d cache key no. %d", MyProcPid, i); */ |
| snprintf(key, TEST_NAME_LENGTH, "cache key no. %d", i); |
| strncpy(param.key, key, TEST_NAME_LENGTH); |
| param.data = MyProcPid; |
| |
| entries[i] = Cache_AcquireEntry(cache, ¶m); |
| Assert(NULL != entries[i]); |
| |
| |
| entries[i]->size = entryWeight; |
| entries[i]->utility = random() % 100; |
| |
| Cache_Insert(cache, entries[i]); |
| Cache_Release(cache, entries[i]); |
| } |
| |
| if (testFailed) |
| { |
| break; |
| } |
| |
| /* Look up noTestEntries */ |
| for (i=0; i < noTestEntries; i++) |
| { |
| /* snprintf(key, TEST_NAME_LENGTH, "PID=%d cache key no. %d", MyProcPid, i); */ |
| snprintf(key, TEST_NAME_LENGTH, "cache key no. %d", i); |
| strncpy(param.key, key, TEST_NAME_LENGTH); |
| param.data = MyProcPid; |
| |
| CacheEntry *localEntry = Cache_AcquireEntry(cache, ¶m); |
| if (localEntry == NULL) |
| { |
| elog(LOG, "Could not acquire entry"); |
| testFailed = true; |
| break; |
| } |
| |
| entries[i] = Cache_Lookup(cache, localEntry); |
| /* |
| * Since we're possibly running evictions from other clients in the meantime, |
| * some of these elements will not be found. But that's ok, we can still |
| * look them up to exercise that mechanism |
| */ |
| |
| if (NULL != entries[i]) |
| { |
| Cache_Release(cache, entries[i]); |
| } |
| |
| Cache_Release(cache, localEntry); |
| } |
| |
| if (testFailed) |
| { |
| break; |
| } |
| |
| /* Evict noTestEntries x weight from cache */ |
| int64 evictRequest = noTestEntries * entryWeight; |
| int64 evictSize = Cache_Evict(cache, evictRequest); |
| |
| /* XXX Under high concurrency, this test can actually legally fail. |
| * If someone else just evicted everything we added, and we have to |
| * wait for someone else to insert something we can evict, but it's |
| * happening too slowly |
| */ |
| |
| if (evictSize != evictRequest) |
| { |
| elog(LOG, "Could not satisfy evict. Requested= " INT64_FORMAT " evicted=" INT64_FORMAT, evictRequest, evictSize); |
| testFailed = true; |
| break; |
| } |
| |
| CHECK_FOR_INTERRUPTS(); |
| } |
| |
| unit_test_result(!testFailed); |
| return unit_test_summary(); |
| } |
| |
| static bool |
| cache_test_clear(void) |
| { |
| int32 noDeleted = 0; |
| |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: cache_test_evict_clear"); |
| |
| elog(LOG, "Running sub-test: CacheCreate"); |
| Cache *cache = cache_test_create(); |
| unit_test_result(cache != NULL); |
| |
| elog(LOG, "Running sub-test: Cache_Clear on empty"); |
| noDeleted = Cache_Clear(cache); |
| unit_test_result(noDeleted == 0); |
| |
| /* Number of elements in the array to hold test entries */ |
| const int noTestEntries = 20; |
| const int entryWeight = 3; |
| CacheEntry *entries[noTestEntries]; |
| char key[TEST_NAME_LENGTH]; |
| TestPopParam param; |
| |
| |
| /* Inserting noTestEntries entries */ |
| elog(LOG, "Running sub-test: Cache_Clear with %d inserted elements", noTestEntries); |
| int i; |
| for (i=0; i < noTestEntries; i++) |
| { |
| strncpy(param.key, key, TEST_NAME_LENGTH); |
| param.data = MyProcPid; |
| |
| snprintf(key, TEST_NAME_LENGTH, "PID=%d cache key no. %d", MyProcPid, i); |
| entries[i] = Cache_AcquireEntry(cache, ¶m); |
| Assert(NULL != entries[i]); |
| |
| entries[i]->size = entryWeight; |
| entries[i]->utility = random() % 100; |
| |
| Cache_Insert(cache, entries[i]); |
| Cache_Release(cache, entries[i]); |
| } |
| |
| /* Clear should clear all of them */ |
| noDeleted = Cache_Clear(cache); |
| unit_test_result(noDeleted == noTestEntries); |
| |
| |
| elog(LOG, "Running sub-test: Looking up %d elements after they got cleared", noTestEntries); |
| |
| bool testFailed = false; |
| CacheEntry *localEntry = Cache_AcquireEntry(cache, NULL); |
| TestCacheElt *localElt = CACHE_ENTRY_PAYLOAD(localEntry); |
| localElt->data = MyProcPid; |
| for (i=0; i < noTestEntries; i++) |
| { |
| snprintf(localElt->key, TEST_NAME_LENGTH, "PID=%d cache key no. %d", MyProcPid, i); |
| CacheEntry *foundEntry = Cache_Lookup(cache, localEntry); |
| if (foundEntry != NULL) |
| { |
| /* Found an entry that was supposed to be cleared out, error out! */ |
| testFailed = true; |
| Cache_Release(cache, foundEntry); |
| break; |
| } |
| } |
| |
| Cache_Release(cache, localEntry); |
| unit_test_result(!testFailed); |
| |
| /* Acquiring but not inserting noTestEntries entries */ |
| elog(LOG, "Running sub-test: Cache_Clear with %d acquired elements", noTestEntries); |
| for (i=0; i < noTestEntries; i++) |
| { |
| /* Put some payload in the entry */ |
| snprintf(key, TEST_NAME_LENGTH, "PID=%d cache key no. %d", MyProcPid, i); |
| strncpy(param.key, key, TEST_NAME_LENGTH); |
| param.data = MyProcPid; |
| |
| entries[i] = Cache_AcquireEntry(cache, ¶m); |
| Assert(NULL != entries[i]); |
| } |
| |
| /* Clear should clear none of them */ |
| noDeleted = Cache_Clear(cache); |
| unit_test_result(noDeleted == 0); |
| |
| elog(LOG, "Running sub-test: Cache_Clear after releasing all acquired elements"); |
| for (i=0; i < noTestEntries; i++) |
| { |
| Cache_Release(cache, entries[i]); |
| } |
| |
| /* Clear should clear none of them */ |
| noDeleted = Cache_Clear(cache); |
| unit_test_result(noDeleted == 0); |
| |
| |
| return unit_test_summary(); |
| } |
| |
| |
| /* |
| * Callback function to test if an entry in the hashtable is "empty" |
| */ |
| static bool |
| isTestEltEmpty(const void *entry) |
| { |
| TestSyncHTElt *testElt = (TestSyncHTElt *) entry; |
| return testElt->numChildren == 0; |
| } |
| |
| /* |
| * Callback function to initialize an entry before returning to the caller |
| */ |
| static void |
| initTestElt(void *entry) |
| { |
| TestSyncHTElt *testElt = (TestSyncHTElt *) entry; |
| testElt->numChildren = 0; |
| testElt->pinCount = 0; |
| } |
| |
| static bool |
| syncrefhashtable_test_basics(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: syncrefhashtable_test_basics"); |
| |
| elog(LOG, "Running sub-test: SyncHTCreate"); |
| SyncHT *syncHT = syncrefhastable_test_create(); |
| unit_test_result(syncHT != NULL); |
| Assert(syncHT); |
| |
| char *keyText = "Key one"; |
| |
| elog(LOG, "Running sub-test: SyncHTInsert first element"); |
| bool existing = false; |
| |
| /* Number of elements in the array to hold test entries */ |
| const int noTestEntries = 20; |
| |
| TestSyncHTElt *elements[noTestEntries]; |
| long numEntries = 0; |
| |
| elements[0] = (TestSyncHTElt *) SyncHTInsert(syncHT, (void *) keyText, &existing); |
| unit_test_result(elements[0] != NULL && !existing); |
| |
| elog(LOG, "Running sub-test: SyncHTInsert duplicate element"); |
| existing = false; |
| elements[1] = (TestSyncHTElt *) SyncHTInsert(syncHT, (void *) keyText, &existing); |
| unit_test_result(elements[1] != NULL && existing); |
| |
| elog(LOG, "Running sub-test: SyncHTInsert unique element"); |
| keyText = "Key two"; |
| existing = false; |
| elements[2] = (TestSyncHTElt *) SyncHTInsert(syncHT, (void *) keyText, &existing); |
| unit_test_result(elements[2] != NULL && !existing); |
| |
| elog(LOG, "Running sub-test: SyncHTRelease"); |
| bool deleted[noTestEntries]; |
| deleted[0] = SyncHTRelease(syncHT, elements[0]); |
| deleted[1] = SyncHTRelease(syncHT, elements[1]); |
| deleted[2] = SyncHTRelease(syncHT, elements[2]); |
| numEntries = SyncHTNumEntries(syncHT); |
| |
| unit_test_result(numEntries == 0 && !deleted[0] && deleted[1] && deleted[2]); |
| |
| |
| elog(LOG, "Running sub-test: SyncHTDestroy"); |
| SyncHTDestroy(syncHT); |
| unit_test_result(true); |
| |
| return unit_test_summary(); |
| } |
| |
| static bool |
| syncrefhashtable_test_concurrency_non_overlap(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: syncrefhashtable_test_concurrency_non_overlap"); |
| |
| elog(LOG, "Running sub-test: SyncHTCreate"); |
| SyncHT *syncHT = syncrefhastable_test_create(); |
| unit_test_result(syncHT != NULL); |
| Assert(syncHT); |
| |
| /* Number of elements in the array to hold test entries */ |
| const int noTestEntries = 1000; |
| TestSyncHTElt *elements[noTestEntries]; |
| bool existing = false, deleted = false; |
| |
| char key[TEST_NAME_LENGTH]; |
| |
| elog(LOG, "Running sub-test: SyncHTInsert many elements"); |
| int noTestIterations = 5000; |
| int i = 0; |
| int iterNo = 0; |
| bool testFailed = false; |
| |
| for (iterNo = 0; iterNo < noTestIterations ; iterNo ++) |
| { |
| if (testFailed) |
| { |
| break; |
| } |
| |
| /* Insert noTestEntries elements */ |
| for (i=0; i < noTestEntries; i++) |
| { |
| snprintf(key, TEST_NAME_LENGTH, "PID=%d key no. %d", MyProcPid, i); |
| |
| elements[i] = SyncHTInsert(syncHT, key, &existing); |
| if (existing) |
| { |
| testFailed = true; |
| break; |
| } |
| elements[i]->numChildren = 123; |
| } |
| |
| /* Release noTestEntries elements, don't get deleted */ |
| for (i=0; i < noTestEntries; i++) |
| { |
| deleted = SyncHTRelease(syncHT, elements[i]); |
| if (deleted) |
| { |
| testFailed = true; |
| break; |
| } |
| } |
| |
| // numEntries = SyncHTNumEntries(syncHT); /* XXX This does not hold for concurrency */ |
| // if (numEntries != noTestEntries) |
| // { |
| // testFailed = true; |
| // break; |
| // } |
| |
| |
| /* Look-up noTestEntries elements. Empty them and release them, they should get deleted */ |
| for (i=0; i < noTestEntries; i++) |
| { |
| snprintf(key, TEST_NAME_LENGTH, "PID=%d key no. %d", MyProcPid, i); |
| TestSyncHTElt *foundElt = (TestSyncHTElt *) SyncHTLookup(syncHT, key); |
| if (!foundElt) |
| { |
| testFailed = true; |
| break; |
| } |
| /* Logically empty the element */ |
| foundElt->numChildren = 0; |
| |
| /* Release element. It should get deleted. */ |
| deleted = SyncHTRelease(syncHT, foundElt); |
| if (!deleted) |
| { |
| testFailed = true; |
| break; |
| } |
| } |
| |
| // numEntries = SyncHTNumEntries(syncHT); /* XXX This does not hold for concurrency */ |
| // if (numEntries != 0) |
| // { |
| // testFailed = true; |
| // break; |
| // } |
| } /* for iterNo */ |
| |
| unit_test_result(!testFailed); |
| |
| elog(LOG, "Running sub-test: SyncHTDestroy"); |
| SyncHTDestroy(syncHT); |
| unit_test_result(true); |
| |
| return unit_test_summary(); |
| } |
| |
| static bool |
| syncrefhashtable_test_error_cases(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: syncrefhashtable_test_error_cases"); |
| |
| elog(LOG, "Running sub-test: SyncHTCreate"); |
| SyncHT *syncHT = syncrefhastable_test_create(); |
| unit_test_result(syncHT != NULL); |
| Assert(syncHT); |
| |
| elog(LOG, "Running sub-test: SyncHTInsert double remove"); |
| |
| TestSyncHTElt *element; |
| char *keyText = "Key one"; |
| bool existing; |
| bool deleted; |
| |
| element = SyncHTInsert(syncHT, keyText, &existing); |
| deleted = SyncHTRelease(syncHT, element); |
| Assert(deleted); |
| |
| PG_TRY(); |
| { |
| deleted = SyncHTRelease(syncHT, element); |
| |
| /* Should not get here, above line throws */ |
| unit_test_result(false); |
| } |
| PG_CATCH(); |
| { |
| unit_test_result(true); |
| } |
| PG_END_TRY(); |
| |
| return unit_test_summary(); |
| } |
| |
| static bool |
| syncrefhashtable_test_full_table(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: syncrefhashtable_test_error_cases"); |
| |
| elog(LOG, "Running sub-test: SyncHTCreate"); |
| SyncHT *syncHT = syncrefhastable_test_create(); |
| unit_test_result(syncHT != NULL); |
| Assert(syncHT); |
| |
| elog(LOG, "Running sub-test: SyncHTInsert into full hashtable"); |
| |
| /* Number of elements in the array to hold test entries */ |
| TestSyncHTElt *elements[10 * TEST_HT_NUM_ELEMENTS + 1]; |
| char key[TEST_NAME_LENGTH]; |
| bool existing = false, deleted = false; |
| bool testFailed = false; |
| int i; |
| |
| /* Fill up the hashtable */ |
| for (i=0; i < 2 * TEST_HT_NUM_ELEMENTS; i++) |
| { |
| snprintf(key, TEST_NAME_LENGTH, "PID=%d key no. %d", MyProcPid, i); |
| |
| elements[i] = SyncHTInsert(syncHT, key, &existing); |
| if (existing || !elements[i]) |
| { |
| elog(LOG, "Filling up the hashtable failed after %d inserts", i + 1); |
| testFailed = true; |
| break; |
| } |
| } |
| |
| int numEntries = SyncHTNumEntries(syncHT); /* XXX This does not hold for concurrency */ |
| elog(LOG, "Hashtable says it hash %d entries", numEntries); |
| |
| unit_test_result(!testFailed && numEntries == TEST_HT_NUM_ELEMENTS); |
| |
| /* Insert elements in full hashtable */ |
| snprintf(key, TEST_NAME_LENGTH, "PID=%d key no. %d", MyProcPid, TEST_HT_NUM_ELEMENTS); |
| elements[TEST_HT_NUM_ELEMENTS] = SyncHTInsert(syncHT, key, &existing); |
| |
| /* Insertion should has failed */ |
| unit_test_result(elements[TEST_HT_NUM_ELEMENTS] == NULL && !existing); |
| |
| |
| /* Look-up noTestEntries elements. Release them, they should get deleted */ |
| for (i=0; i < TEST_HT_NUM_ELEMENTS; i++) |
| { |
| snprintf(key, TEST_NAME_LENGTH, "PID=%d key no. %d", MyProcPid, i); |
| TestSyncHTElt *foundElt = (TestSyncHTElt *) SyncHTLookup(syncHT, key); |
| if (!foundElt) |
| { |
| testFailed = true; |
| break; |
| } |
| |
| /* Release element. It should get deleted. */ |
| deleted = SyncHTRelease(syncHT, foundElt); |
| if (!deleted) |
| { |
| testFailed = true; |
| break; |
| } |
| } |
| |
| unit_test_result(!testFailed); |
| |
| return unit_test_summary(); |
| } |
| |
| /* |
| * Unit test for bfz reopen operation |
| */ |
| static bool |
| bfz_test_reopen(void) |
| { |
| unit_test_reset(); |
| |
| elog(LOG, "Running test: bfz_test_reopen"); |
| |
| StringInfo filename = makeStringInfo(); |
| |
| appendStringInfo(filename, |
| "%s/%s", |
| PG_TEMP_FILES_DIR, |
| "Test_bfz.dat"); |
| |
| StringInfo text = makeStringInfo(); |
| appendStringInfo(text,"Small amount of data to test."); |
| |
| elog(LOG, "Running sub-test: Creating file %s", filename->data); |
| |
| /*Write data to file.*/ |
| bfz_t * fileWrite = bfz_create(filename->data, false, TRUE); |
| fileWrite->del_on_close=false; |
| bfz_append(fileWrite, text->data, text->len); |
| /*Flush data*/ |
| bfz_append_end(fileWrite); |
| |
| /*Read data back from file.*/ |
| char result[TEST_NAME_LENGTH]; |
| int result_size = 0; |
| |
| elog(LOG, "Running sub-test: Reading file %s", filename->data); |
| bfz_t * fileRead = bfz_open(filename->data, true, TRUE); |
| /*Seek 0*/ |
| bfz_scan_begin(fileRead); |
| result_size = bfz_scan_next(fileRead,result,text->len); |
| |
| unit_test_result( (result_size == text->len) && |
| (strncmp(text->data, result, text->len) == 0)); |
| |
| pfree(filename->data); |
| pfree(text->data); |
| pfree(filename); |
| pfree(text); |
| |
| return unit_test_summary(); |
| } |
| |
| /* |
| * Creates a StringInfo object holding n_chars characters. |
| */ |
| static StringInfo |
| create_text_stringinfo(int64 n_chars) |
| { |
| StringInfo strInfo = makeStringInfo(); |
| char *text = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; |
| |
| int64 to_write = n_chars; |
| while (to_write >= (int64) strlen(text)) |
| { |
| appendStringInfo(strInfo, "%s", text); |
| to_write -= (int64) strlen(text); |
| } |
| Assert(to_write >= 0 && to_write < (int64) strlen(text)); |
| |
| /* Pad end */ |
| while (to_write > 0) |
| { |
| appendStringInfoChar(strInfo, 'P'); |
| to_write--; |
| } |
| |
| return strInfo; |
| } |
| |
| /* |
| * Removes a physical file. Updates the WorkfileDiskspace accounting |
| * to reflect the deletion if requested. |
| * Used as part of the unit test cleanup to remove artifacts. |
| * |
| * path includes the pgsql_tmp path but not the filespace. |
| * |
| * Returns true for successful deletion, false otherwise. |
| */ |
| static bool |
| remove_tmp_file(char *path, int64 size, bool update_diskspace) |
| { |
| Assert(NULL != path); |
| |
| /* Create path to file by adding crt temp path */ |
| StringInfo full_filepath = makeStringInfo(); |
| appendStringInfo(full_filepath, "%s/%s", |
| getCurrentTempFilePath, path); |
| |
| /* Remove file from disk */ |
| int res = unlink(full_filepath->data); |
| |
| if (update_diskspace) |
| { |
| /* Reverting the used diskspace to reflect the deletion */ |
| WorkfileDiskspace_Commit(0, size, true /* update_query_space */ ); |
| } |
| |
| pfree(full_filepath->data); |
| pfree(full_filepath); |
| |
| return (res == 0); |
| } |
| |
| /* |
| * Unit tests for new ExecWorkfile and WorkfileSegspace functionality |
| * with underlying Buffile files |
| * |
| * This test is only run when the per-segment limit GUC is non-zero. |
| * If GUC is 0, then we don't keep track of the per-segment used size. |
| * |
| */ |
| static bool |
| execworkfile_buffile_test(void) |
| { |
| int64 result = 0; |
| bool success = false; |
| int64 expected_size = 0; |
| int64 final_size = 0; |
| int64 current_size = 0; |
| int64 initial_diskspace = WorkfileSegspace_GetSize(); |
| |
| unit_test_reset(); |
| elog(LOG, "Running test: execworkfile_buffile_test"); |
| if (0 == gp_workfile_limit_per_segment) |
| { |
| elog(LOG, "Skipping test because the gp_workfile_limit_per_segment is 0"); |
| unit_test_result(true); |
| return unit_test_summary(); |
| } |
| |
| /* Create file name */ |
| char *file_name = "test_execworkfile_buffile.dat"; |
| StringInfo test_filepath = makeStringInfo(); |
| appendStringInfo(test_filepath, |
| "%s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name); |
| |
| if (test_filepath->len > MAXPGPATH) |
| { |
| ereport(ERROR, (errmsg("cannot generate path: %s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name))); |
| } |
| |
| elog(LOG, "Running sub-test: Creating EWF/Buffile"); |
| |
| ExecWorkFile *ewf = ExecWorkFile_Create(test_filepath->data, |
| BUFFILE, |
| false, /* delOnClose */ |
| 0 /* compressionType */); |
| |
| unit_test_result(ewf != NULL); |
| |
| int nchars = 100000; |
| StringInfo text = create_text_stringinfo(nchars); |
| |
| elog(LOG, "Running sub-test: Writing small amount data to EWF/Buffile and checking size"); |
| |
| success = ExecWorkFile_Write(ewf, text->data, 20); |
| expected_size += 20; |
| |
| unit_test_result(success && expected_size == WorkfileSegspace_GetSize() - initial_diskspace); |
| |
| |
| elog(LOG, "Running sub-test: Writing larger amount data (%d bytes) to EWF/Buffile and checking size", nchars); |
| success = ExecWorkFile_Write(ewf, text->data, nchars); |
| expected_size += nchars; |
| |
| unit_test_result(success && expected_size == WorkfileSegspace_GetSize() - initial_diskspace); |
| |
| elog(LOG, "Running sub-test: Writing to the middle of a EWF/Buffile and checking size"); |
| result = ExecWorkFile_Seek(ewf, ExecWorkFile_GetSize(ewf) / 2, SEEK_SET); |
| Assert(result == 0); |
| /* This write should not add to the size */ |
| success = ExecWorkFile_Write(ewf, text->data, ExecWorkFile_GetSize(ewf) / 10); |
| |
| unit_test_result(success && expected_size == WorkfileSegspace_GetSize() - initial_diskspace); |
| |
| elog(LOG, "Running sub-test: Seeking past end and writing data to EWF/Buffile and checking size"); |
| int past_end_offset = 100; |
| int past_end_write = 200; |
| result = ExecWorkFile_Seek(ewf, ExecWorkFile_GetSize(ewf) + past_end_offset, SEEK_SET); |
| Assert(result == 0); |
| success = ExecWorkFile_Write(ewf, text->data, past_end_write); |
| expected_size += past_end_offset + past_end_write; |
| |
| unit_test_result(success && expected_size == WorkfileSegspace_GetSize() - initial_diskspace); |
| |
| elog(LOG, "Running sub-test: Closing EWF/Buffile"); |
| final_size = ExecWorkFile_Close(ewf, true); |
| unit_test_result(final_size == expected_size); |
| |
| elog(LOG, "Running sub-test: Opening existing EWF/Buffile and checking size"); |
| |
| ewf = ExecWorkFile_Open(test_filepath->data, |
| BUFFILE, |
| false, /* delOnClose */ |
| 0 /* compressionType */); |
| |
| current_size = ExecWorkFile_GetSize(ewf); |
| unit_test_result(current_size == final_size); |
| |
| elog(LOG, "Running sub-test: Reading from reopened EWF/Buffile file"); |
| |
| int buf_size = 100; |
| char *buf = (char *) palloc(buf_size); |
| |
| result = ExecWorkFile_Read(ewf, buf, buf_size); |
| unit_test_result(result == buf_size); |
| pfree(buf); |
| |
| elog(LOG, "Running sub-test: Closing EWF/Buffile"); |
| final_size = ExecWorkFile_Close(ewf, true); |
| |
| unit_test_result(final_size == current_size); |
| |
| elog(LOG, "Running sub-test: Removing physical file from disk"); |
| success = remove_tmp_file(test_filepath->data, final_size, true /* update_diskspace */); |
| |
| unit_test_result(success); |
| |
| pfree(test_filepath->data); |
| pfree(test_filepath); |
| pfree(text->data); |
| pfree(text); |
| |
| return unit_test_summary(); |
| } |
| |
| /* |
| * Unit tests for new ExecWorkfile and WorkfileSegspace functionality |
| * with underlying compressed bfz files |
| * |
| * This test is only run when the per-segment limit GUC is non-zero. |
| * If GUC is 0, then we don't keep track of the per-segment used size. |
| * |
| */ |
| static bool |
| execworkfile_bfz_zlib_test(void) |
| { |
| bool success = false; |
| int64 result = 0; |
| int64 expected_size = 0; |
| int64 current_size = 0; |
| int64 final_size = 0; |
| int64 initial_diskspace = WorkfileSegspace_GetSize(); |
| |
| unit_test_reset(); |
| elog(LOG, "Running test: execworkfile_bfz_zlib_test"); |
| |
| if (0 == gp_workfile_limit_per_segment) |
| { |
| elog(LOG, "Skipping test because the gp_workfile_limit_per_segment is 0"); |
| unit_test_result(true); |
| return unit_test_summary(); |
| } |
| |
| /* Create file name */ |
| char *file_name = "test_execworkfile_bfz_zlib.dat"; |
| StringInfo test_filepath = makeStringInfo(); |
| appendStringInfo(test_filepath, |
| "%s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name); |
| |
| if (test_filepath->len > MAXPGPATH) |
| { |
| ereport(ERROR, (errmsg("cannot generate path: %s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name))); |
| } |
| |
| elog(LOG, "Running sub-test: Creating EWF/Buffile"); |
| |
| ExecWorkFile *ewf = ExecWorkFile_Create(test_filepath->data, |
| BFZ, |
| false, /* delOnClose */ |
| 1 /* compressionType */); |
| |
| unit_test_result(ewf != NULL); |
| |
| int nchars = 100000; |
| StringInfo text = create_text_stringinfo(nchars); |
| |
| elog(LOG, "Running sub-test: Writing small amount data to EWF/BFZ and checking size"); |
| |
| success = ExecWorkFile_Write(ewf, text->data, 20); |
| expected_size += 20; |
| |
| unit_test_result(success && expected_size == WorkfileSegspace_GetSize() - initial_diskspace); |
| |
| |
| elog(LOG, "Running sub-test: Writing larger amount data (%d bytes) to EWF/BFZ and checking size", nchars); |
| success = ExecWorkFile_Write(ewf, text->data, nchars); |
| expected_size += nchars; |
| |
| unit_test_result(success && expected_size == WorkfileSegspace_GetSize() - initial_diskspace); |
| |
| elog(LOG, "Running sub-test: Suspending EWF/BFZ and checking for size"); |
| final_size = ExecWorkFile_Suspend(ewf); |
| unit_test_result(final_size < expected_size); |
| |
| elog(LOG, "Running sub-test: Restarting EWF/BFZ"); |
| ExecWorkFile_Restart(ewf); |
| unit_test_result(true); |
| |
| elog(LOG, "Running sub-test: Closing EWF/BFZ"); |
| final_size = ExecWorkFile_Close(ewf, true); |
| |
| unit_test_result(final_size < expected_size); |
| |
| elog(LOG, "Running sub-test: Opening existing EWF/BFZ and checking size"); |
| |
| ewf = ExecWorkFile_Open(test_filepath->data, |
| BFZ, |
| false, /* delOnClose */ |
| 1 /* compressionType */); |
| current_size = ExecWorkFile_GetSize(ewf); |
| unit_test_result(current_size == final_size); |
| |
| elog(LOG, "Running sub-test: Reading from reopened EWF/BFZ file"); |
| |
| int buf_size = 100; |
| char *buf = (char *) palloc(buf_size); |
| |
| result = ExecWorkFile_Read(ewf, buf, buf_size); |
| unit_test_result(result == buf_size); |
| pfree(buf); |
| |
| elog(LOG, "Running sub-test: Closing EWF/BFZ"); |
| final_size = ExecWorkFile_Close(ewf, true); |
| |
| unit_test_result(final_size == current_size); |
| |
| elog(LOG, "Running sub-test: Removing physical file from disk"); |
| success = remove_tmp_file(test_filepath->data, final_size, true /* update_diskspace */); |
| |
| unit_test_result(success); |
| |
| pfree(test_filepath->data); |
| pfree(test_filepath); |
| pfree(text->data); |
| pfree(text); |
| |
| return unit_test_summary(); |
| |
| } |
| |
| /* |
| * Unit tests for new ExecWorkfile and WorkfileSegspace functionality |
| * with underlying uncompressed bfz files |
| * |
| * This test is only run when the per-segment limit GUC is non-zero. |
| * If GUC is 0, then we don't keep track of the per-segment used size. |
| * |
| */ |
| static bool |
| execworkfile_bfz_uncompressed_test(void) |
| { |
| bool success = false; |
| int64 result = 0; |
| int64 expected_size = 0; |
| int64 current_size = 0; |
| int64 final_size = 0; |
| int64 initial_diskspace = WorkfileSegspace_GetSize(); |
| |
| unit_test_reset(); |
| elog(LOG, "Running test: execworkfile_bfz_uncompressed_test"); |
| |
| if (0 == gp_workfile_limit_per_segment) |
| { |
| elog(LOG, "Skipping test because the gp_workfile_limit_per_segment is 0"); |
| unit_test_result(true); |
| return unit_test_summary(); |
| } |
| |
| /* Create file name */ |
| char *file_name = "test_execworkfile_bfz_uncomp.dat"; |
| StringInfo test_filepath = makeStringInfo(); |
| appendStringInfo(test_filepath, |
| "%s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name); |
| |
| if (test_filepath->len > MAXPGPATH) |
| { |
| ereport(ERROR, (errmsg("cannot generate path: %s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name))); |
| } |
| |
| elog(LOG, "Running sub-test: Creating EWF/Buffile"); |
| |
| ExecWorkFile *ewf = ExecWorkFile_Create(test_filepath->data, |
| BFZ, |
| false, /* delOnClose */ |
| 0 /* compressionType */); |
| |
| unit_test_result(ewf != NULL); |
| |
| int nchars = 100000; |
| StringInfo text = create_text_stringinfo(nchars); |
| |
| elog(LOG, "Running sub-test: Writing small amount data to EWF/BFZ and checking size"); |
| |
| success = ExecWorkFile_Write(ewf, text->data, 20); |
| expected_size += 20; |
| |
| unit_test_result(success && expected_size == WorkfileSegspace_GetSize() - initial_diskspace); |
| |
| |
| elog(LOG, "Running sub-test: Writing larger amount data (%d bytes) to EWF/BFZ and checking size", nchars); |
| success = ExecWorkFile_Write(ewf, text->data, nchars); |
| expected_size += nchars; |
| |
| unit_test_result(success && expected_size == WorkfileSegspace_GetSize() - initial_diskspace); |
| |
| elog(LOG, "Running sub-test: Suspending EWF/BFZ and checking for size"); |
| final_size = ExecWorkFile_Suspend(ewf); |
| unit_test_result(final_size >= expected_size); |
| |
| elog(LOG, "Running sub-test: Restarting EWF/BFZ"); |
| ExecWorkFile_Restart(ewf); |
| unit_test_result(true); |
| |
| elog(LOG, "Running sub-test: Closing EWF/BFZ"); |
| final_size = ExecWorkFile_Close(ewf, true); |
| |
| /* For uncompressed files, final file may contain checksums, which makes it |
| * larger than expected */ |
| unit_test_result(final_size >= expected_size); |
| |
| elog(LOG, "Running sub-test: Opening existing EWF/BFZ and checking size"); |
| |
| ewf = ExecWorkFile_Open(test_filepath->data, |
| BFZ, |
| false, /* delOnClose */ |
| 0 /* compressionType */); |
| current_size = ExecWorkFile_GetSize(ewf); |
| unit_test_result(current_size == final_size); |
| |
| elog(LOG, "Running sub-test: Reading from reopened EWF/BFZ file"); |
| |
| int buf_size = 100; |
| char *buf = (char *) palloc(buf_size); |
| |
| result = ExecWorkFile_Read(ewf, buf, buf_size); |
| unit_test_result(result == buf_size); |
| pfree(buf); |
| |
| elog(LOG, "Running sub-test: Closing EWF/BFZ"); |
| final_size = ExecWorkFile_Close(ewf, true); |
| |
| unit_test_result(final_size == current_size); |
| |
| elog(LOG, "Running sub-test: Removing physical file from disk"); |
| success = remove_tmp_file(test_filepath->data, final_size, true /* update_diskspace */); |
| |
| unit_test_result(success); |
| |
| pfree(test_filepath->data); |
| pfree(test_filepath); |
| pfree(text->data); |
| pfree(text); |
| |
| return unit_test_summary(); |
| |
| } |
| |
| /* |
| * Unit test for testing the fd.c FileDiskSize and other new capabilities |
| */ |
| static bool |
| fd_tests(void) |
| { |
| unit_test_reset(); |
| elog(LOG, "Running test: fd_tests"); |
| |
| elog(LOG, "Running sub-test: Creating fd file"); |
| |
| /* Create file name */ |
| char *file_name = "test_fd.dat"; |
| StringInfo test_filepath = makeStringInfo(); |
| appendStringInfo(test_filepath, |
| "%s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name); |
| |
| if (test_filepath->len > MAXPGPATH) |
| { |
| ereport(ERROR, (errmsg("cannot generate path: %s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name))); |
| } |
| |
| File testFd = OpenNamedFile(test_filepath->data, |
| true /* create */, |
| false /* delOnClose */, |
| true /* closeAtEOXact */); |
| |
| unit_test_result(testFd > 0); |
| |
| elog(LOG, "Running sub-test: Reading size of open empty file"); |
| int64 fd_size = FileDiskSize(testFd); |
| unit_test_result(fd_size == 0L); |
| |
| elog(LOG, "Running sub-test: Closing file"); |
| FileClose(testFd); |
| unit_test_result(true); |
| |
| elog(LOG, "Running sub-test: Opening existing empty file and reading size"); |
| testFd = OpenNamedFile(test_filepath->data, |
| false /* create */, |
| false /* delOnClose */, |
| true /* closeAtEOXact */); |
| |
| fd_size = FileDiskSize(testFd); |
| unit_test_result(fd_size == 0L); |
| |
| elog(LOG, "Running sub-test: Writing to existing open file, sync and read size"); |
| int nchars = 10000; |
| StringInfo text = create_text_stringinfo(nchars); |
| int len_to_write = 5000; |
| Assert(len_to_write <= text->len); |
| |
| FileWrite(testFd, text->data, len_to_write); |
| FileSync(testFd); |
| |
| fd_size = FileDiskSize(testFd); |
| unit_test_result(fd_size == len_to_write); |
| |
| elog(LOG, "Running sub-test: Closing file"); |
| FileClose(testFd); |
| unit_test_result(true); |
| |
| pfree(text->data); |
| pfree(text); |
| |
| return unit_test_summary(); |
| } |
| |
| /* |
| * Unit tests for the buffile size functionality |
| */ |
| static bool |
| buffile_size_test(void) |
| { |
| unit_test_reset(); |
| elog(LOG, "Running test: buffile_size_test"); |
| |
| elog(LOG, "Running sub-test: Creating buffile"); |
| |
| /* Create file name */ |
| char *file_name = "test_buffile.dat"; |
| StringInfo test_filepath = makeStringInfo(); |
| appendStringInfo(test_filepath, |
| "%s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name); |
| |
| if (test_filepath->len > MAXPGPATH) |
| { |
| ereport(ERROR, (errmsg("cannot generate path: %s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name))); |
| } |
| |
| BufFile *testBf = BufFileCreateFile(test_filepath->data, |
| false /* delOnClose */, false /* interXact */); |
| |
| unit_test_result(NULL != testBf); |
| |
| elog(LOG, "Running sub-test: Size of newly created buffile"); |
| int64 test_size = BufFileGetSize(testBf); |
| unit_test_result(test_size == 0); |
| |
| elog(LOG, "Running sub-test: Writing to new buffile and reading size < bufsize"); |
| int nchars = 10000; |
| int expected_size = nchars; |
| StringInfo text = create_text_stringinfo(nchars); |
| BufFileWrite(testBf, text->data, nchars); |
| pfree(text->data); |
| pfree(text); |
| test_size = BufFileGetSize(testBf); |
| |
| unit_test_result(test_size == expected_size); |
| |
| elog(LOG, "Running sub-test: Writing to new buffile and reading size > bufsize"); |
| nchars = 1000000; |
| expected_size += nchars; |
| text = create_text_stringinfo(nchars); |
| BufFileWrite(testBf, text->data, nchars); |
| test_size = BufFileGetSize(testBf); |
| |
| unit_test_result(test_size == expected_size); |
| |
| elog(LOG, "Running sub-test: seeking back and writing then testing size"); |
| BufFileSeek(testBf, expected_size/2, SEEK_SET); |
| /* This write should not add to the size */ |
| BufFileWrite(testBf, text->data, expected_size / 10); |
| test_size = BufFileGetSize(testBf); |
| |
| unit_test_result(test_size == expected_size); |
| |
| elog(LOG, "Running sub-test: Closing buffile"); |
| BufFileClose(testBf); |
| unit_test_result(true); |
| |
| elog(LOG, "Running sub-test: Opening existing and testing size"); |
| testBf = BufFileOpenFile(test_filepath->data, |
| false /* create */, |
| false /*delOnClose */, |
| false /*interXact */ |
| ); |
| test_size = BufFileGetSize(testBf); |
| |
| unit_test_result(test_size == expected_size); |
| |
| elog(LOG, "Running sub-test: Seek past end, appending and testing size"); |
| int past_end_offset = 100; |
| int past_end_write = 200; |
| BufFileSeek(testBf, expected_size + past_end_offset, SEEK_SET); |
| BufFileWrite(testBf, text->data, past_end_write); |
| expected_size += past_end_offset + past_end_write; |
| test_size = BufFileGetSize(testBf); |
| |
| unit_test_result(test_size == expected_size); |
| |
| elog(LOG, "Running sub-test: Closing buffile"); |
| BufFileClose(testBf); |
| unit_test_result(true); |
| |
| |
| pfree(text->data); |
| pfree(text); |
| |
| return unit_test_summary(); |
| |
| } |
| |
| /* |
| * Unit test for the atomic functions |
| * |
| * These are functional tests, they only test for correctness with no concurrency |
| * |
| */ |
| static bool |
| atomic_test(void) |
| { |
| unit_test_reset(); |
| elog(LOG, "Running test: atomic_test"); |
| |
| { |
| elog(LOG, "Running sub-test: compare_and_swap_64"); |
| uint64 dest = 5; |
| uint64 old = 5; |
| uint64 new = 6; |
| |
| elog(LOG, "Before: dest=%d, old=%d, new=%d", (uint32) dest, (uint32) old, (uint32) new); |
| int32 result = compare_and_swap_64(&dest, old, new); |
| elog(LOG, "After: dest=%d, old=%d, new=%d, result=%d", (uint32) dest, (uint32) old, (uint32) new, (uint32) result); |
| unit_test_result(dest == new); |
| } |
| |
| { |
| elog(LOG, "Running sub-test: gp_atomic_add_64 small addition"); |
| |
| int64 base = 25; |
| int64 inc = 3; |
| int64 result = 0; |
| int64 expected_result = base + inc; |
| elog(DEBUG1, "Before: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result); |
| result = gp_atomic_add_int64(&base, inc); |
| elog(DEBUG1, "After: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result); |
| unit_test_result(result == expected_result && base == expected_result); |
| |
| elog(LOG, "Running sub-test: gp_atomic_add_64 small subtraction"); |
| |
| inc = -4; |
| result = 0; |
| expected_result = base + inc; |
| elog(DEBUG1, "Before: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result); |
| result = gp_atomic_add_int64(&base, inc); |
| elog(DEBUG1, "After: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result); |
| unit_test_result(result == expected_result && base == expected_result); |
| |
| elog(LOG, "Running sub-test: gp_atomic_add_64 huge addition"); |
| base = 37421634719307; |
| inc = 738246483234; |
| result = 0; |
| expected_result = base + inc; |
| elog(DEBUG1, "Before: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result); |
| result = gp_atomic_add_int64(&base, inc); |
| elog(DEBUG1, "After: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result); |
| unit_test_result(result == expected_result && base == expected_result); |
| |
| |
| elog(LOG, "Running sub-test: gp_atomic_add_64 huge subtraction"); |
| inc = -32738246483234; |
| result = 0; |
| expected_result = base + inc; |
| elog(DEBUG1, "Before: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result); |
| result = gp_atomic_add_int64(&base, inc); |
| elog(DEBUG1, "After: base=%lld, inc=%lld, result=%lld", (long long int) base, (long long int) inc, (long long int) result); |
| unit_test_result(result == expected_result && base == expected_result); |
| } |
| |
| |
| return unit_test_summary(); |
| } |
| |
| /* |
| * Unit test for BufFile support of large files (greater than 4 GB). |
| * |
| */ |
| static bool |
| buffile_large_file_test(void) |
| { |
| unit_test_reset(); |
| elog(LOG, "Running test: buffile_large_file_test"); |
| |
| StringInfo filename = makeStringInfo(); |
| |
| appendStringInfo(filename, |
| "%s/%s", |
| PG_TEMP_FILES_DIR, |
| "Test_large_buff.dat"); |
| |
| BufFile *bfile = BufFileCreateFile(filename->data, true /* delOnClose */, true /* interXact */); |
| |
| int nchars = 100000; |
| /* 4.5 GBs */ |
| int total_entries = 48319; |
| /* Entry that requires an int64 seek */ |
| int test_entry = 45000; |
| |
| StringInfo test_string = create_text_stringinfo(nchars); |
| |
| elog(LOG, "Running sub-test: Creating file %s", filename->data); |
| |
| for (int i = 0; i < total_entries; i++) |
| { |
| if (test_entry == i) |
| { |
| BufFileWrite(bfile, test_string->data , nchars*sizeof(char)); |
| } |
| else |
| { |
| StringInfo text = create_text_stringinfo(nchars); |
| |
| BufFileWrite(bfile, text->data , nchars*sizeof(char)); |
| |
| pfree(text->data); |
| pfree(text); |
| } |
| } |
| elog(LOG, "Running sub-test: Reading record %s", filename->data); |
| |
| char *buffer= buffer = palloc(nchars * sizeof(char)); |
| |
| BufFileSeek(bfile, (int64) ((int64)test_entry * (int64) nchars), SEEK_SET); |
| |
| int nread = BufFileRead(bfile, buffer, nchars*sizeof(char)); |
| |
| BufFileClose(bfile); |
| |
| /* Verify correct size of the inserted record and content */ |
| unit_test_result (nread == nchars && |
| (strncmp(test_string->data, buffer, test_string->len) == 0)); |
| |
| pfree(filename->data); |
| pfree(test_string->data); |
| pfree(filename); |
| pfree(test_string); |
| |
| return unit_test_summary(); |
| |
| } |
| |
| /* |
| * Unit test for logical tape's support for large spill files. |
| * */ |
| static bool |
| logicaltape_test(void) |
| { |
| unit_test_reset(); |
| elog(LOG, "Running test: logicaltape_test"); |
| |
| int max_tapes = 10; |
| int nchars = 100000; |
| /* 4.5 GBs */ |
| int max_entries = 48319; |
| |
| /* Target record values */ |
| int test_tape = 5; |
| int test_entry = 45000; |
| LogicalTapePos entryPos; |
| |
| LogicalTapeSet *tape_set = LogicalTapeSetCreate(max_tapes, true /*delOnclose */); |
| |
| LogicalTape *work_tape = NULL; |
| |
| StringInfo test_string = create_text_stringinfo(nchars); |
| |
| elog(LOG, "Running sub-test: Creating LogicalTape"); |
| |
| /* Fill LogicalTapeSet */ |
| for (int i = 0; i < max_tapes; i++) |
| { |
| work_tape = LogicalTapeSetGetTape(tape_set, i); |
| |
| /* Create large SpillFile for LogicalTape */ |
| if (test_tape == i) |
| { |
| elog(LOG, "Running sub-test: Fill LogicalTape"); |
| for (int j = 0; j < max_entries; j++) |
| { |
| if ( j == test_entry) |
| { |
| /* Keep record position of target record in LogicalTape */ |
| LogicalTapeUnfrozenTell(tape_set, work_tape, &entryPos); |
| |
| LogicalTapeWrite(tape_set, work_tape, test_string->data, (size_t)test_string->len); |
| } |
| else |
| { |
| /* Add additional records */ |
| StringInfo text = create_text_stringinfo(nchars); |
| |
| LogicalTapeWrite(tape_set, work_tape, text->data, (size_t)text->len); |
| |
| pfree(text->data); |
| pfree(text); |
| } |
| } |
| } |
| else |
| { |
| /* Add additional records */ |
| StringInfo text = create_text_stringinfo(nchars); |
| |
| LogicalTapeWrite(tape_set, work_tape, text->data, (size_t)text->len); |
| |
| pfree(text->data); |
| pfree(text); |
| } |
| |
| } |
| |
| /* Set target LogicalTape */ |
| work_tape = LogicalTapeSetGetTape(tape_set, test_tape); |
| char *buffer= buffer = palloc(nchars * sizeof(char)); |
| |
| elog(LOG, "Running sub-test: Freeze LogicalTape"); |
| LogicalTapeFreeze(tape_set, work_tape); |
| |
| elog(LOG, "Running sub-test: Seek in LogicalTape"); |
| LogicalTapeSeek(tape_set, work_tape, &entryPos); |
| |
| elog(LOG, "Running sub-test: Reading from LogicalTape"); |
| LogicalTapeRead(tape_set, work_tape, buffer, (size_t)(nchars*sizeof(char))); |
| |
| LogicalTapeSetClose(tape_set, NULL /* work_set */); |
| |
| unit_test_result (strncmp(test_string->data, buffer, test_string->len) == 0); |
| |
| return unit_test_summary(); |
| } |
| |
| /* |
| * Unit test for testing large fd file. |
| * This unit test verifies that the file's size on disk is as expected. |
| */ |
| static bool |
| fd_large_file_test(void) |
| { |
| unit_test_reset(); |
| elog(LOG, "Running test: fd_large_file_test"); |
| |
| elog(LOG, "Running sub-test: Creating fd file"); |
| |
| /* Create file name */ |
| char *file_name = "test_large_fd.dat"; |
| StringInfo test_filepath = makeStringInfo(); |
| appendStringInfo(test_filepath, |
| "%s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name); |
| |
| if (test_filepath->len > MAXPGPATH) |
| { |
| ereport(ERROR, (errmsg("cannot generate path: %s/%s", |
| PG_TEMP_FILES_DIR, |
| file_name))); |
| } |
| |
| File testFd = OpenNamedFile(test_filepath->data, |
| true /* create */, |
| true /* delOnClose */, |
| true /* closeAtEOXact */); |
| |
| unit_test_result(testFd > 0); |
| |
| elog(LOG, "Running sub-test: Writing to existing open file, sync and read size"); |
| int nchars = 100000; |
| /* 4.5 GBs */ |
| int total_entries = 48319; |
| |
| StringInfo text = create_text_stringinfo(nchars); |
| |
| for (int i = 0; i < total_entries; i++) |
| { |
| FileWrite(testFd, text->data, strlen(text->data)); |
| FileSync(testFd); |
| } |
| |
| pfree(text->data); |
| pfree(text); |
| |
| int64 fd_size = FileDiskSize(testFd); |
| unit_test_result(fd_size == (int64) nchars * sizeof(char) * (int64) total_entries); |
| |
| elog(LOG, "Running sub-test: Closing file"); |
| FileClose(testFd); |
| unit_test_result(true); |
| |
| pfree(test_filepath->data); |
| pfree(test_filepath); |
| |
| return unit_test_summary(); |
| } |
| |
| /* |
| * Unit test for writing a one MB execworkfile. |
| * |
| */ |
| static bool |
| execworkfile_create_one_MB_file(void) |
| { |
| unit_test_reset(); |
| elog(LOG, "Running test: execworkfile_one_MB_file_test"); |
| |
| StringInfo filename = makeStringInfo(); |
| |
| appendStringInfo(filename, |
| "%s/%s", |
| PG_TEMP_FILES_DIR, |
| "Test_buffile_one_MB_file_test.dat"); |
| |
| ExecWorkFile *ewf = ExecWorkFile_Create(filename->data, |
| BUFFILE, |
| true, /* delOnClose */ |
| 0 /* compressionType */); |
| |
| /* Number of characters in a MB */ |
| int nchars = (int)((1<<20)/sizeof(char)); |
| |
| elog(LOG, "Running sub-test: Creating file %s", filename->data); |
| |
| StringInfo text = create_text_stringinfo(nchars); |
| |
| ExecWorkFile_Write(ewf, text->data, nchars*sizeof(char)); |
| |
| pfree(text->data); |
| pfree(text); |
| |
| elog(LOG, "Running sub-test: Closing file %s", filename->data); |
| |
| int64 final_size = workfile_mgr_close_file(NULL /* work_set */, ewf, true); |
| |
| /* Verify correct size of the created file */ |
| unit_test_result (final_size == (int64)nchars*sizeof(char) ); |
| |
| pfree(filename->data); |
| pfree(filename); |
| |
| return unit_test_summary(); |
| |
| } |
| |
| /* |
| * Unit test for tracking per-query disk space |
| */ |
| static bool |
| workfile_queryspace(void) |
| { |
| |
| int64 current_size = 0; |
| bool success = false; |
| |
| unit_test_reset(); |
| elog(LOG, "Running test: workfile_queryspace"); |
| if (0 == gp_workfile_limit_per_query) |
| { |
| elog(LOG, "Skipping test because the gp_workfile_limit_per_query is 0"); |
| unit_test_result(true); |
| return unit_test_summary(); |
| } |
| |
| elog(LOG, "Running sub-test: Reading initial workfile_queryspace"); |
| |
| current_size = WorkfileQueryspace_GetSize(gp_session_id, gp_command_count); |
| |
| unit_test_result(current_size == 0); |
| |
| elog(LOG, "Running sub-test: Reading bogus workfile_queryspace"); |
| |
| current_size = WorkfileQueryspace_GetSize(gp_session_id, gp_command_count+1); |
| |
| unit_test_result(current_size == -1); |
| |
| elog(LOG, "Running sub-test: Reserving workfile queryspace"); |
| |
| int64 bytes_reserved = 12345; |
| success = WorkfileQueryspace_Reserve(bytes_reserved); |
| current_size = WorkfileQueryspace_GetSize(gp_session_id, gp_command_count); |
| |
| unit_test_result(success && current_size == bytes_reserved); |
| |
| elog(LOG, "Running sub-test: Committing workfile queryspace"); |
| int64 bytes_committed = 1234; |
| WorkfileQueryspace_Commit(bytes_committed, bytes_reserved); |
| current_size = WorkfileQueryspace_GetSize(gp_session_id, gp_command_count); |
| |
| unit_test_result(current_size == bytes_committed); |
| |
| |
| return unit_test_summary(); |
| } |
| |
| /* |
| * Unit test that inserts many entries in the workfile mgr shared cache |
| */ |
| static bool |
| workfile_fill_sharedcache(void) |
| { |
| bool success = true; |
| |
| unit_test_reset(); |
| elog(LOG, "Running test: workfile_fill_sharedcache"); |
| |
| int n_entries = gp_workfile_max_entries + 1; |
| |
| elog(LOG, "Running sub-test: Creating %d empty workfile sets", n_entries); |
| |
| int crt_entry = 0; |
| for (crt_entry = 0; crt_entry < n_entries; crt_entry++) |
| { |
| workfile_set *work_set = workfile_mgr_create_set(BUFFILE, |
| false /* can_be_reused */, NULL /* PlanState */, NULL_SNAPSHOT); |
| if (NULL == work_set) |
| { |
| success = false; |
| break; |
| } |
| if (crt_entry >= gp_workfile_max_entries - 2) |
| { |
| /* Pause between adding extra ones so we can test from other sessions */ |
| elog(LOG, "Added %d entries out of %d, pausing for 30 seconds before proceeding", crt_entry + 1, n_entries); |
| sleep(30); |
| } |
| |
| } |
| |
| unit_test_result(success); |
| |
| return unit_test_summary(); |
| |
| } |
| |
| /* |
| * Unit test that creates very many workfiles, and then cleans them up |
| */ |
| static bool |
| workfile_create_and_set_cleanup(void) |
| { |
| bool success = true; |
| |
| unit_test_reset(); |
| elog(LOG, "Running test: workfile_create_and_set_cleanup"); |
| |
| elog(LOG, "Running sub-test: Create Workset"); |
| |
| workfile_set *work_set = workfile_mgr_create_set(BUFFILE, |
| false /* can_be_reused */, NULL /* PlanState */, NULL_SNAPSHOT); |
| |
| unit_test_result(NULL != work_set); |
| |
| elog(LOG, "Running sub-test: Create %d workfiles", TEST_MAX_NUM_WORKFILES); |
| |
| ExecWorkFile **ewfiles = (ExecWorkFile **) palloc(TEST_MAX_NUM_WORKFILES * sizeof(ExecWorkFile *)); |
| |
| for (int i=0; i < TEST_MAX_NUM_WORKFILES; i++) |
| { |
| ewfiles[i] = workfile_mgr_create_file(work_set); |
| |
| if (ewfiles[i] == NULL) |
| { |
| success = false; |
| break; |
| } |
| |
| if (i % 1000 == 999) |
| { |
| elog(LOG, "Created %d workfiles so far", i); |
| } |
| } |
| unit_test_result(success); |
| |
| elog(LOG, "Running sub-test: Closing Workset"); |
| workfile_mgr_close_set(work_set); |
| |
| unit_test_result(true); |
| |
| return unit_test_summary(); |
| } |
| |
| |
| /* |
| * Unit test that creates very many workfiles, and then closes them one by one |
| */ |
| static bool |
| workfile_create_and_individual_cleanup(void) |
| { |
| bool success = true; |
| |
| unit_test_reset(); |
| elog(LOG, "Running test: workfile_create_and_individual_cleanup"); |
| |
| elog(LOG, "Running sub-test: Create Workset"); |
| |
| workfile_set *work_set = workfile_mgr_create_set(BUFFILE, |
| false /* can_be_reused */, NULL /* PlanState */, NULL_SNAPSHOT); |
| |
| unit_test_result(NULL != work_set); |
| |
| elog(LOG, "Running sub-test: Create %d workfiles", TEST_MAX_NUM_WORKFILES); |
| |
| ExecWorkFile **ewfiles = (ExecWorkFile **) palloc(TEST_MAX_NUM_WORKFILES * sizeof(ExecWorkFile *)); |
| |
| for (int i=0; i < TEST_MAX_NUM_WORKFILES; i++) |
| { |
| ewfiles[i] = workfile_mgr_create_file(work_set); |
| |
| if (ewfiles[i] == NULL) |
| { |
| success = false; |
| break; |
| } |
| |
| if (i % 1000 == 999) |
| { |
| elog(LOG, "Created %d workfiles so far", i); |
| } |
| } |
| unit_test_result(success); |
| |
| elog(LOG, "Running sub-test: Closing %d workfiles", TEST_MAX_NUM_WORKFILES); |
| |
| for (int i=0; i < TEST_MAX_NUM_WORKFILES; i++) |
| { |
| workfile_mgr_close_file(work_set, ewfiles[i], true); |
| |
| if (i % 1000 == 999) |
| { |
| elog(LOG, "Closed %d workfiles so far", i); |
| } |
| } |
| unit_test_result(success); |
| |
| elog(LOG, "Running sub-test: Closing Workset"); |
| workfile_mgr_close_set(work_set); |
| |
| unit_test_result(true); |
| |
| return unit_test_summary(); |
| } |
| |
| static bool |
| unit_test_summary(void) |
| { |
| elog(LOG, "Unit tests summary: PASSED: %d/%d, FAILED: %d/%d", |
| tests_passed, tests_total, |
| tests_failed, tests_total); |
| return tests_failed == 0; |
| } |
| |
| static void |
| unit_test_reset() |
| { |
| tests_passed = tests_failed = tests_total = 0; |
| } |
| |
| static void |
| unit_test_result(bool result) |
| { |
| tests_total++; |
| |
| if (result) |
| { |
| tests_passed++; |
| elog(LOG, "====== PASS ======"); |
| } |
| else |
| { |
| tests_failed++; |
| elog(LOG, "!!!!!! FAIL !!!!!!"); |
| } |
| } |
| |
| static SyncHT * |
| syncrefhastable_test_create(void) |
| { |
| SyncHTCtl syncHTCtl; |
| MemSet(&syncHTCtl, 0, sizeof(SyncHTCtl)); |
| |
| syncHTCtl.entrySize = sizeof(TestSyncHTElt); |
| syncHTCtl.keySize = TEST_NAME_LENGTH; |
| |
| syncHTCtl.hash = string_hash; |
| syncHTCtl.keyCopy = (HashCopyFunc) strncpy; |
| syncHTCtl.match = (HashCompareFunc) strncmp; |
| |
| syncHTCtl.numElements = TEST_HT_NUM_ELEMENTS; |
| |
| syncHTCtl.baseLWLockId = FirstWorkfileMgrLock; |
| syncHTCtl.numPartitions = NUM_WORKFILEMGR_PARTITIONS; |
| |
| syncHTCtl.tabName = "Test SyncRef Hashtable"; |
| syncHTCtl.isEmptyEntry = isTestEltEmpty; |
| syncHTCtl.initEntry = initTestElt; |
| |
| syncHTCtl.keyOffset = GPDB_OFFSET(TestSyncHTElt, key); |
| syncHTCtl.pinCountOffset = GPDB_OFFSET(TestSyncHTElt, pinCount); |
| |
| /* Create hashtable in the top memory context so we can clean up if transaction aborts */ |
| MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext); |
| |
| SyncHT *syncHT = SyncHTCreate(&syncHTCtl); |
| Assert(syncHT); |
| |
| MemoryContextSwitchTo(oldcxt); |
| |
| return syncHT; |
| } |