/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* cdbdatalocality.c
* Manages data locality.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/aomd.h"
#include "access/extprotocol.h"
#include "access/heapam.h"
#include "access/filesplit.h"
#include "access/parquetsegfiles.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/catquery.h"
#include "catalog/pg_exttable.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_extprotocol.h"
#include "cdb/cdbdatalocality.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbpartition.h"
#include "utils/lsyscache.h"
#include "utils/tqual.h"
#include "utils/memutils.h"
#include "executor/execdesc.h"
#include "executor/spi.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "optimizer/walkers.h"
#include "optimizer/planmain.h"
#include "parser/parsetree.h"
#include "storage/fd.h"
#include "parser/parse_func.h"
#include "postmaster/identity.h"
#include "cdb/cdbmetadatacache.h"
#include "resourcemanager/utils/network_utils.h"
#include "access/skey.h"
#include "utils/fmgroids.h"
#include "utils/uri.h"
#include "resourcemanager/utils/hashtable.h"
extern bool optimizer; /* Enable the optimizer */
/* We need to build a mapping from host name to host index */
typedef struct segmentFilenoPair {
int segmentid;
int fileno;
} segmentFilenoPair;
typedef struct HostnameIndexKey {
char hostname[HOSTNAME_MAX_LENGTH];
} HostnameIndexKey;
typedef struct HostnameIndexEntry {
HostnameIndexKey key;
int index;
} HostnameIndexEntry;
/*
* structure containing all relation range table entries.
*/
typedef struct range_table_collector_context {
List *range_tables; /* tables without the result relation (INSERT INTO etc.) */
List *full_range_tables; /* every table, including the result relation */
} range_table_collector_context;
typedef struct collect_scan_rangetable_context {
plan_tree_base_prefix base;
List *range_tables; // range table for scan only
List *full_range_tables; // full range table
} collect_scan_rangetable_context;
/*
* structure containing information about how much a
* host holds.
*/
typedef struct HostDataVolumeInfo {
HostnameIndexEntry *hashEntry;
int64 datavolume;
int occur_count;
} HostDataVolumeInfo;
/*
* structure for data distribution statistics.
*/
typedef struct data_dist_stat_context {
int size;
int max_size;
HostDataVolumeInfo *volInfos;
} data_dist_stat_context;
/*
* structure for prefer host.
*/
typedef struct Prefer_Host {
char *hostname;
int64 data_size;
} Prefer_Host;
/*
* structure for file portion.
*/
typedef struct File_Split {
int64 offset;
int64 length;
int64 logiceof;
int host;
bool is_local_read;
char *ext_file_uri;
} File_Split;
typedef enum DATALOCALITY_RELATION_TYPE {
DATALOCALITY_APPENDONLY,
DATALOCALITY_PARQUET,
DATALOCALITY_HDFS,
DATALOCALITY_UNKNOWN
} DATALOCALITY_RELATION_TYPE;
/*
* structure for detailed file split.
*/
typedef struct Detailed_File_Split {
Oid rel_oid;
int segno; // file name suffix
int index;
int host;
int64 logiceof;
int64 offset;
int64 length;
char *ext_file_uri_string;
} Detailed_File_Split;
/*
* structure for block hosts.
*/
typedef struct Host_Index {
int index;
char *hostname; // used to sort host index
} Host_Index;
/*
* structure for block hosts.
*/
typedef struct Block_Host_Index {
int replica_num;
int* hostIndex; // hdfs host name list(size is replica_num)
Host_Index *hostIndextoSort; // used to sort host indexes
int insertHost; // the host which inserted this block.
} Block_Host_Index;
/*
* structure for one Relation File.
*/
typedef struct Relation_File {
int segno; // file name suffix
BlockLocation *locations;
Block_Host_Index *hostIDs;
int block_num;
File_Split *splits;
int split_num;
int segmentid;
int64 logic_len; // eof length.
double continue_ratio;
} Relation_File;
/*
* structure for one split block: (file index, split index) pair.
*/
typedef struct Split_Block {
int fileIndex;
int splitIndex;
} Split_Block;
/*
* structure for all relation data.
*/
typedef struct Relation_Data {
Oid relid;
DATALOCALITY_RELATION_TYPE type;
int64 total_size;
List *files;
Oid partition_parent_relid;
int64 block_count;
} Relation_Data;
/*
* structure for allocating all splits
* of one relation.
*/
typedef struct Relation_Assignment_Context {
int virtual_segment_num;
int64 upper_bound;
int64 *vols;
int64 *totalvols;
int64 *totalvols_with_penalty;
int *split_num;
int *continue_split_num;
int roundrobinIndex;
int total_split_num;
HASHTABLE vseg_to_splits_map;
HASHTABLE patition_parent_size_map; /*record avg vseg size of each partition table*/
HASHTABLE partitionvols_map; /*record vseg size of each partition table*/
HASHTABLE partitionvols_with_penalty_map; /*record vseg size (with penalty) of each partition table*/
HASHTABLE table_blocks_num_map; /*record block number of a table*/
double avg_size_of_whole_query; /*average size per vseg over all relations in the query*/
int64 avg_size_of_whole_partition_table; /*average size per vseg over all relations of a partition table*/
int block_lessthan_vseg_round_robin_no;
} Relation_Assignment_Context;
/*
* structure for print data locality result
*/
typedef struct Assignment_Log_Context {
double totalDataSize;
double datalocalityRatio;
int maxSegmentNumofHost;
int minSegmentNumofHost;
int avgSegmentNumofHost;
int numofDifferentHost;
double avgSizeOverall;
int64 maxSizeSegmentOverall;
int64 minSizeSegmentOverall;
double avgSizeOverallPenalty;
int64 maxSizeSegmentOverallPenalty;
int64 minSizeSegmentOverallPenalty;
double avgContinuityOverall;
double maxContinuityOverall;
double minContinuityOverall;
int64 localDataSizePerRelation;
int64 totalDataSizePerRelation;
} Assignment_Log_Context;
/*
* structure for target segment ID mapping.
*/
typedef struct TargetSegmentIDMap {
int target_segment_num;
int *global_IDs;
char** hostname;
} TargetSegmentIDMap;
/*
* structure for storing all HDFS block locations.
*/
typedef struct collect_hdfs_split_location_context {
List *relations;
} collect_hdfs_split_location_context;
/*
* structure for host split assignment result.
*/
typedef struct Host_Assignment_Result {
int count;
int max_size;
Detailed_File_Split *splits;
} Host_Assignment_Result;
/*
* structure for total split assignment result.
*/
typedef struct Split_Assignment_Result {
int host_num;
Host_Assignment_Result *host_assigns;
} Split_Assignment_Result;
/*
* structure for host data statistics.
*/
typedef struct hostname_volume_stat_context {
int size;
HostnameVolumeInfo *hostnameVolInfos;
} hostname_volume_stat_context;
/*
* structure for tracking the whole procedure
* of computing the split to segment mapping
* for each query.
*/
typedef struct split_to_segment_mapping_context {
range_table_collector_context rtc_context;
collect_scan_rangetable_context srtc_context;
data_dist_stat_context dds_context;
collect_hdfs_split_location_context chsl_context;
hostname_volume_stat_context host_context;
HTAB *hostname_map;
bool keep_hash;
int prefer_segment_num;
int64 split_size;
MemoryContext old_memorycontext;
MemoryContext datalocality_memorycontext;
int externTableForceSegNum; //expected virtual segment number when a command or pxf external table exists
int externTableLocationSegNum; //expected virtual segment number when a location-based (e.g. gpfdist) external table exists
int tableFuncSegNum; //expected virtual segment number when table function exists
int hashSegNum; // expected virtual segment number when there is hash table in from clause
int randomSegNum; // expected virtual segment number when there is random table in from clause
int resultRelationHashSegNum; // expected virtual segment number when hash table as a result relation
int minimum_segment_num; //default is 1.
int64 randomRelSize; //all the random relation size
int64 hashRelSize; //all the hash relation size
int64 total_size; /* total data size for all relations */
int64 total_split_count;
int64 total_file_count;
int64 total_metadata_logic_len;
int metadata_cache_time_us;
int alloc_resource_time_us;
int cal_datalocality_time_us;
} split_to_segment_mapping_context;
typedef struct vseg_list{
List* vsegList;
}vseg_list;
static MemoryContext DataLocalityMemoryContext = NULL;
static void init_datalocality_memory_context(void);
static void init_split_assignment_result(Split_Assignment_Result *result,
int host_num);
static void init_datalocality_context(PlannedStmt *plannedstmt,
split_to_segment_mapping_context *context);
static bool range_table_collector_walker(Node *node,
range_table_collector_context *context);
static void collect_range_tables(Query *query,
range_table_collector_context *context);
static bool collect_scan_rangetable(Node *node,
collect_scan_rangetable_context *cxt);
static void convert_range_tables_to_oids_and_check_table_functions(List **range_tables, bool* isUDFExists,
MemoryContext my_memorycontext);
static void check_keep_hash_and_external_table(
split_to_segment_mapping_context *collector_context, Query *query,
GpPolicy *intoPolicy);
static int64 get_block_locations_and_claculte_table_size(
split_to_segment_mapping_context *collector_context);
static bool dataStoredInHdfs(Relation rel);
static List *get_virtual_segments(QueryResource *resource);
static List *run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, QueryResource ** resourcePtr,
split_to_segment_mapping_context *context);
static void double_check_hdfs_metadata_logic_length(BlockLocation * locations,int block_num,int64 logic_len);
static void AOGetSegFileDataLocation(Relation relation,
AppendOnlyEntry *aoEntry, Snapshot metadataSnapshot,
split_to_segment_mapping_context *context, int64 splitsize,
Relation_Data *rel_data, int* hitblocks,
int* allblocks, GpPolicy *targetPolicy);
static void ParquetGetSegFileDataLocation(Relation relation,
AppendOnlyEntry *aoEntry, Snapshot metadataSnapshot,
split_to_segment_mapping_context *context, int64 splitsize,
Relation_Data *rel_data, int* hitblocks,
int* allblocks, GpPolicy *targetPolicy);
static void ExternalGetHdfsFileDataLocation(Relation relation,
split_to_segment_mapping_context *context, int64 splitsize,
Relation_Data *rel_data, int* allblocks);
Oid LookupCustomProtocolBlockLocationFunc(char *protoname);
static BlockLocation *fetch_hdfs_data_block_location(char *filepath, int64 len,
int *block_num, RelFileNode rnode, uint32_t segno, double* hit_ratio);
static void free_hdfs_data_block_location(BlockLocation *locations,
int block_num);
static Block_Host_Index * update_data_dist_stat(
split_to_segment_mapping_context *context, BlockLocation *locations,
int block_num);
static HostDataVolumeInfo *search_host_in_stat_context(
split_to_segment_mapping_context *context, char *hostname);
static bool IsBuildInFunction(Oid funcOid);
static bool allocate_hash_relation(Relation_Data* rel_data,
Assignment_Log_Context *log_context, TargetSegmentIDMap* idMap,
Relation_Assignment_Context* assignment_context,
split_to_segment_mapping_context *context, bool parentIsHashExist, bool parentIsHash);
static void allocate_random_relation(Relation_Data* rel_data,
Assignment_Log_Context *log_context, TargetSegmentIDMap* idMap,
Relation_Assignment_Context* assignment_context,
split_to_segment_mapping_context *context);
static void print_datalocality_overall_log_information(SplitAllocResult *result,
List *virtual_segments, int relationCount,
Assignment_Log_Context *log_context,
Relation_Assignment_Context* assignment_context,
split_to_segment_mapping_context *context);
static void caculate_per_relation_data_locality_result(Relation_Data* rel_data,
Assignment_Log_Context* log_context,
Relation_Assignment_Context* assignment_context);
static void combine_all_splits(Detailed_File_Split **splits,
Relation_Assignment_Context* assignment_context, TargetSegmentIDMap* idMap,
Assignment_Log_Context* log_context,
split_to_segment_mapping_context* context);
static int remedy_non_localRead(int fileIndex, int splitIndex, int parentPos,
Relation_File** file_vector, int fileCount, int64 maxExtendedSizePerSegment,
TargetSegmentIDMap* idMap,
Relation_Assignment_Context* assignment_context);
static int select_random_host_algorithm(Relation_Assignment_Context *context,
int64 splitsize, int64 maxExtendedSizePerSegment, TargetSegmentIDMap *idMap,
Block_Host_Index** hostid,int fileindex, Oid partition_parent_oid, bool* isLocality);
static int compare_detailed_file_split(const void *e1, const void *e2);
static int compare_container_segment(const void *e1, const void *e2);
static int compare_file_segno(const void *e1, const void *e2);
static int compare_relation_size(const void *e1, const void *e2);
static int compare_file_continuity(const void *e1, const void *e2);
static int compare_hostid(const void *e1, const void *e2);
static void assign_split_to_host(Host_Assignment_Result *result,
Detailed_File_Split *split);
static void assign_splits_to_hosts(Split_Assignment_Result *result,
Detailed_File_Split *splits, int split_num);
static List *post_process_assign_result(Split_Assignment_Result *result);
static List *search_map_node(List *result, Oid rel_oid, int host_num,
SegFileSplitMapNode **found_map_node);
static void cleanup_allocation_algorithm(
split_to_segment_mapping_context *context);
//static void print_split_alloc_result(List *alloc_result);
//static int64 costTransform(Cost totalCost);
static void change_hash_virtual_segments_order(QueryResource ** resourcePtr,
Relation_Data *rel_data, Relation_Assignment_Context *assignment_context,
TargetSegmentIDMap* idMap_ptr);
static bool is_relation_hash(GpPolicy *targetPolicy);
static void allocation_preparation(List *hosts, TargetSegmentIDMap* idMap,
Relation_Assignment_Context* assignment_context,
split_to_segment_mapping_context *context);
static Relation_File** change_file_order_based_on_continuity(
Relation_Data *rel_data, TargetSegmentIDMap* idMap, int host_num,
int* fileCount, Relation_Assignment_Context *assignment_context);
static int64 set_maximum_segment_volume_parameter(Relation_Data *rel_data,
int host_num, double* maxSizePerSegment);
static void InvokeHDFSProtocolBlockLocation(Oid procOid,
List *locs,
List **blockLocations);
/*
* Setup /cleanup the memory context for this run
* of data locality algorithm.
*/
static void init_datalocality_memory_context(void) {
if (DataLocalityMemoryContext == NULL) {
DataLocalityMemoryContext = AllocSetContextCreate(TopMemoryContext,
"DataLocalityMemoryContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
} else {
MemoryContextResetAndDeleteChildren(DataLocalityMemoryContext);
}
return;
}
static void init_split_assignment_result(Split_Assignment_Result *result,
int host_num) {
int i;
result->host_num = host_num;
result->host_assigns = (Host_Assignment_Result *) palloc(
sizeof(Host_Assignment_Result) * host_num);
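/*
* Each host starts with room for two splits (max_size = 2); the splits
* array is assumed to be grown on demand later (presumably in
* assign_split_to_host) once more splits than max_size are assigned.
*/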
for (i = 0; i < host_num; i++) {
result->host_assigns[i].count = 0;
result->host_assigns[i].max_size = 2;
result->host_assigns[i].splits = (Detailed_File_Split *) palloc(
sizeof(Detailed_File_Split) * 2);
}
return;
}
static void init_datalocality_context(PlannedStmt *plannedstmt,
split_to_segment_mapping_context *context) {
context->old_memorycontext = CurrentMemoryContext;
context->datalocality_memorycontext = DataLocalityMemoryContext;
context->chsl_context.relations = NIL;
context->rtc_context.range_tables = NIL;
context->rtc_context.full_range_tables = plannedstmt->rtable;
context->srtc_context.range_tables = NIL;
context->srtc_context.full_range_tables = plannedstmt->rtable;
context->srtc_context.base.node = (Node *)plannedstmt;
context->externTableForceSegNum = 0;
context->externTableLocationSegNum = 0;
context->tableFuncSegNum = 0;
context->hashSegNum = 0;
context->resultRelationHashSegNum = 0;
context->randomSegNum = 0;
context->randomRelSize = 0;
context->hashRelSize = 0;
context->minimum_segment_num = 1;
/*
* initialize the data distribution
* static context.
*/
{
context->dds_context.size = 0;
context->dds_context.max_size = 4;
MemoryContextSwitchTo(context->datalocality_memorycontext);
context->dds_context.volInfos = (HostDataVolumeInfo *) palloc(
sizeof(HostDataVolumeInfo) * context->dds_context.max_size);
MemSet(context->dds_context.volInfos, 0,
sizeof(HostDataVolumeInfo) * context->dds_context.max_size);
MemoryContextSwitchTo(context->old_memorycontext);
}
/*
* initialize the hostname map hash.
*/
{
HASHCTL ctl;
ctl.keysize = sizeof(HostnameIndexKey);
ctl.entrysize = sizeof(HostnameIndexEntry);
ctl.hcxt = context->datalocality_memorycontext;
context->hostname_map = hash_create("Hostname Index Map Hash", 16, &ctl,
HASH_ELEM);
}
context->keep_hash = false;
context->prefer_segment_num = -1;
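/* split_read_size_mb is expressed in MB; shifting by 20 converts it to bytes */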
context->split_size = split_read_size_mb;
context->split_size <<= 20;
context->total_size = 0;
context->total_split_count = 0;
context->total_file_count = 0;
context->total_metadata_logic_len = 0;
context->metadata_cache_time_us = 0;
context->alloc_resource_time_us = 0;
context->cal_datalocality_time_us = 0;
return;
}
/*
* range_table_collector_walker: the routine to collect all range table relations.
*/
static bool range_table_collector_walker(Node *node,
range_table_collector_context *context) {
if (node == NULL) {
return false;
}
if (IsA(node, Query)) {
return query_tree_walker((Query *) node, range_table_collector_walker,
(void *) context,
QTW_EXAMINE_RTES);
}
if (IsA(node, RangeTblEntry)) {
if (((RangeTblEntry *) node)->rtekind == RTE_RELATION) {
context->range_tables = lappend(context->range_tables, node);
}
return false;
}
return expression_tree_walker(node, range_table_collector_walker,
(void *) context);
}
/*
* collect_range_tables: collect all range table relations, and store
* them into the range_table_collector_context.
*/
static void collect_range_tables(Query *query,
range_table_collector_context *context) {
query_tree_walker(query, range_table_collector_walker, (void *) context,
QTW_EXAMINE_RTES);
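/*
* If the query has a result relation (e.g. the target of an INSERT),
* remove its first occurrence from the collected range tables so it is
* not treated as a scanned relation.
*/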
if (query->resultRelation > 0) {
RangeTblEntry* resultRte = rt_fetch(query->resultRelation, query->rtable);
ListCell *lc;
List *new_range_tables = NIL;
bool isFound = false;
foreach(lc, context->range_tables)
{
RangeTblEntry *entry = (RangeTblEntry *) lfirst(lc);
if (resultRte->relid == entry->relid && !isFound) {
isFound = true;
} else {
new_range_tables = lappend(new_range_tables, entry);
}
}
context->range_tables = new_range_tables;
}
return;
}
bool collect_scan_rangetable(Node *node,
collect_scan_rangetable_context *cxt) {
if (NULL == node) return false;
switch (nodeTag(node)) {
case T_ExternalScan:
case T_AppendOnlyScan:
case T_ParquetScan: {
RangeTblEntry *rte = rt_fetch(((Scan *)node)->scanrelid,
cxt->full_range_tables);
cxt->range_tables = lappend(cxt->range_tables, rte);
}
default:
break;
}
return plan_tree_walker(node, collect_scan_rangetable, cxt);
}
/*
* IsBuildInFunction: check whether the given function is a built-in,
* i.e. defined in the pg_catalog namespace.
*/
static bool IsBuildInFunction(Oid foid) {
cqContext *pcqCtx;
HeapTuple procedureTuple;
Form_pg_proc procedureStruct;
/*
* get the procedure tuple corresponding to the given function Oid
*/
pcqCtx = caql_beginscan(
NULL,
cql("SELECT * FROM pg_proc "
" WHERE oid = :1 ",
ObjectIdGetDatum(foid)));
procedureTuple = caql_getnext(pcqCtx);
if (!HeapTupleIsValid(procedureTuple))
elog(ERROR, "cache lookup failed for function %u", foid);
procedureStruct = (Form_pg_proc) GETSTRUCT(procedureTuple);
caql_endscan(pcqCtx);
/* we treat namespace OID 11 (pg_catalog) as built-in functions. */
if (procedureStruct->pronamespace == 11) {
return true;
} else {
return false;
}
}
/*
* convert_range_tables_to_oids_and_check_table_functions: convert the
* collected range table entries into relation OIDs, expanding each
* relation to include all of its inheritance (partition) children.
*/
static void convert_range_tables_to_oids_and_check_table_functions(List **range_tables, bool* isTableFunctionExists,
MemoryContext my_memorycontext) {
List *new_range_tables = NIL;
ListCell *old_lc;
MemoryContext old_memorycontext;
old_memorycontext = MemoryContextSwitchTo(my_memorycontext);
foreach(old_lc, *range_tables)
{
RangeTblEntry *entry = (RangeTblEntry *) lfirst(old_lc);
if (entry->rtekind != RTE_RELATION) {
continue;
}
Oid rel_oid = entry->relid;
List *children = NIL;
ListCell *child;
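/*
* find_all_inheritors returns the relation itself plus all of its
* inheritance (partition) children; add each child OID only once.
*/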
children = find_all_inheritors(rel_oid);
foreach(child, children)
{
Oid myrelid = lfirst_oid(child);
ListCell *new_lc;
bool found = false;
foreach(new_lc, new_range_tables)
{
Oid relid = lfirst_oid(new_lc);
if (myrelid == relid) {
found = true;
break;
}
}
if (!found) {
new_range_tables = lappend_oid(new_range_tables, myrelid);
}
}
}
MemoryContextSwitchTo(old_memorycontext);
*range_tables = new_range_tables;
return;
}
/*
* check_keep_hash_and_external_table: determine whether to keep the hash
* distribution policy, and collect the expected virtual segment numbers
* for external tables.
*/
static void check_keep_hash_and_external_table(
split_to_segment_mapping_context *context, Query *query,
GpPolicy *intoPolicy) {
ListCell *lc;
MemoryContextSwitchTo(context->datalocality_memorycontext);
if (query->resultRelation != 0) /* This is an INSERT command */
{
GpPolicy *targetPolicy = NULL;
RangeTblEntry *rte = rt_fetch(query->resultRelation, query->rtable);
Assert(rte->rtekind == RTE_RELATION);
targetPolicy = GpPolicyFetch(CurrentMemoryContext, rte->relid);
if (targetPolicy->nattrs > 0) /* distributed by table */
{
context->keep_hash = true;
context->resultRelationHashSegNum = targetPolicy->bucketnum;
}
pfree(targetPolicy);
}
/*
* This is a CREATE TABLE AS statement.
* SELECT * INTO newtable FROM origintable creates a randomly distributed
* table by default, not a hash-distributed table as it did previously.
*/
if ((intoPolicy != NULL) && (intoPolicy->nattrs > 0))
{
context->keep_hash = true;
context->resultRelationHashSegNum = intoPolicy->bucketnum;
}
foreach(lc, context->rtc_context.range_tables)
{
GpPolicy *targetPolicy = NULL;
Relation rel = NULL;
Oid myrelid = lfirst_oid(lc);
targetPolicy = GpPolicyFetch(CurrentMemoryContext, myrelid);
rel = relation_open(myrelid, AccessShareLock);
if (RelationIsExternal(rel)) {
/* targetPolicy->bucketnum is bucket number of external table,
* whose default value is set to default_segment_num
*/
ExtTableEntry* extEnrty = GetExtTableEntry(rel->rd_id);
bool isPxf = false;
if (!extEnrty->command && extEnrty->locations) {
char* first_uri_str = (char *) strVal(lfirst(list_head(extEnrty->locations)));
if (first_uri_str) {
Uri* uri = ParseExternalTableUri(first_uri_str);
if (uri && uri->protocol == URI_CUSTOM && is_pxf_protocol(uri)) {
isPxf = true;
}
else if (uri && (is_hdfs_protocol(uri))) {
relation_close(rel, AccessShareLock);
if (targetPolicy->nattrs > 0)
{
/*select the maximum hash bucket number as hashSegNum of query*/
if (context->hashSegNum < targetPolicy->bucketnum)
{
context->hashSegNum = targetPolicy->bucketnum;
context->keep_hash = true;
}
}
continue;
}
}
}
if (extEnrty->command || isPxf) {
// command external table or pxf case
if (context->externTableForceSegNum == 0) {
context->externTableForceSegNum = targetPolicy->bucketnum;
} else {
if (context->externTableForceSegNum != targetPolicy->bucketnum) {
/*
* In this case, two external table join but with different bucket number
* we cannot allocate the right segment number.
*/
elog(ERROR, "All external tables in one query must have the same bucket number!");
}
}
}
else {
// gpfdist location case and others
if (context->externTableLocationSegNum < targetPolicy->bucketnum) {
context->externTableLocationSegNum = targetPolicy->bucketnum;
context->minimum_segment_num = targetPolicy->bucketnum;
}
}
}
/*for hash relation */
else if (targetPolicy->nattrs > 0) {
/*select the maximum hash bucket number as hashSegNum of query*/
if (context->hashSegNum < targetPolicy->bucketnum) {
context->hashSegNum = targetPolicy->bucketnum;
context->keep_hash = true;
}
}
relation_close(rel, AccessShareLock);
pfree(targetPolicy);
}
MemoryContextSwitchTo(context->old_memorycontext);
return;
}
/*
* get_virtual_segments: fetch the virtual segments from the
* resource manager.
*/
static List *
get_virtual_segments(QueryResource *resource) {
List *result = NIL;
ListCell *lc;
foreach (lc, resource->segments)
{
Segment *info = (Segment *) lfirst(lc);
VirtualSegmentNode *vs = makeNode(VirtualSegmentNode);
vs->hostname = pstrdup(
info->hdfsHostname == NULL ? info->hostname : info->hdfsHostname);
result = lappend(result, vs);
}
return result;
}
/*
* get_block_locations_and_claculte_table_size: fetch the HDFS block
* information corresponding to the required relations, and calculate each
* relation's size.
*/
int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_context *context) {
uint64_t allRelationFetchBegintime = 0;
uint64_t allRelationFetchLeavetime = 0;
int totalFileCount = 0;
int hitblocks = 0;
int allblocks = 0;
allRelationFetchBegintime = gettime_microsec();
ListCell *lc;
int64 total_size = 0;
Snapshot saveActiveSnapshot = ActiveSnapshot;
MemoryContextSwitchTo(context->datalocality_memorycontext);
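/*
* Use a private copy of the active snapshot (or a fresh transaction
* snapshot) stamped with the current command id, so that the metadata
* scans below see a consistent view including changes made earlier in
* this transaction.
*/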
if (ActiveSnapshot == NULL)
{
ActiveSnapshot = GetTransactionSnapshot();
}
ActiveSnapshot = CopySnapshot(ActiveSnapshot);
ActiveSnapshot->curcid = GetCurrentCommandId();
foreach(lc, context->rtc_context.full_range_tables)
{
Oid rel_oid = lfirst_oid(lc);
Relation rel = relation_open(rel_oid, AccessShareLock);
/*
* We only consider the data stored in HDFS.
*/
bool isDataStoredInHdfs = dataStoredInHdfs(rel);
if (isDataStoredInHdfs ) {
GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
Relation_Data *rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
rel_data->relid = rel_oid;
rel_data->files = NIL;
rel_data->partition_parent_relid = 0;
rel_data->block_count = 0;
bool isResultRelation = true;
ListCell *nonResultlc;
foreach(nonResultlc, context->rtc_context.range_tables)
{
Oid nonResultRel_oid = lfirst_oid(nonResultlc);
if (rel_oid == nonResultRel_oid) {
isResultRelation = false;
}
}
if (!isResultRelation) {
// skip the relation not in scan nodes
// for partition table scan optimization;
ListCell *rtc;
bool found = false;
foreach(rtc, context->srtc_context.range_tables) {
RangeTblEntry *rte = lfirst(rtc);
if (rel_oid == rte->relid) {
found = true;
break;
}
}
if (!found) {
relation_close(rel, AccessShareLock);
continue;
}
}
if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
/*
* Get pg_appendonly information for this table.
*/
AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
/*
* Based on the pg_appendonly information, calculate the data
* location information associated with this relation.
*/
if (RelationIsAoRows(rel)) {
rel_data->type = DATALOCALITY_APPENDONLY;
AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
aoEntry->splitsize, rel_data, &hitblocks,
&allblocks, targetPolicy);
} else {
rel_data->type = DATALOCALITY_PARQUET;
ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
context->split_size, rel_data, &hitblocks,
&allblocks, targetPolicy);
}
} else if (RelationIsExternal(rel)) {
if (isDataStoredInHdfs) {
// we can't use metadata cache, so hitblocks will always be 0
rel_data->type = DATALOCALITY_HDFS;
ExternalGetHdfsFileDataLocation(rel, context, context->split_size,
rel_data, &allblocks);
}
}
if (!isResultRelation) {
total_size += rel_data->total_size;
totalFileCount += list_length(rel_data->files);
//for hash relation
if (targetPolicy->nattrs > 0) {
context->hashRelSize += rel_data->total_size;
} else {
context->randomRelSize += rel_data->total_size;
}
}
context->chsl_context.relations = lappend(context->chsl_context.relations,
rel_data);
pfree(targetPolicy);
}
relation_close(rel, AccessShareLock);
}
MemoryContextSwitchTo(context->old_memorycontext);
ActiveSnapshot = saveActiveSnapshot;
allRelationFetchLeavetime = gettime_microsec();
int eclaspeTime = allRelationFetchLeavetime - allRelationFetchBegintime;
double hitrate = (allblocks == 0) ? 0 : (double) hitblocks / allblocks;
if (debug_print_split_alloc_result) {
elog(LOG, "fetch blocks of %d files overall execution time: %d us with hit rate %f",
totalFileCount, eclaspeTime, hitrate);
}
context->total_file_count = totalFileCount;
context->total_size = total_size;
context->metadata_cache_time_us = eclaspeTime;
if(debug_datalocality_time){
elog(LOG, "metadata overall execution time: %d us. \n", eclaspeTime);
}
return total_size;
}
bool dataStoredInHdfs(Relation rel) {
if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
return true;
} else if (RelationIsExternal(rel)) {
ExtTableEntry* extEnrty = GetExtTableEntry(rel->rd_id);
bool isHdfsProtocol = false;
if (!extEnrty->command && extEnrty->locations) {
char* first_uri_str = (char *) strVal(lfirst(list_head(extEnrty->locations)));
if (first_uri_str) {
Uri* uri = ParseExternalTableUri(first_uri_str);
if (uri && is_hdfs_protocol(uri)) {
isHdfsProtocol = true;
}
}
}
return isHdfsProtocol;
}
return false;
}
/*
* search_host_in_stat_context: search a host name in the statistic
* context; if not found, create a new one.
*/
static HostDataVolumeInfo *
search_host_in_stat_context(split_to_segment_mapping_context *context,
char *hostname) {
HostnameIndexKey key;
HostnameIndexEntry *entry;
bool found;
MemSet(&(key.hostname), 0, HOSTNAME_MAX_LENGTH);
strncpy(key.hostname, hostname, HOSTNAME_MAX_LENGTH - 1);
entry = (HostnameIndexEntry *) hash_search(context->hostname_map,
(void *) &key, HASH_ENTER, &found);
if (!found) {
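/*
* First time this hostname is seen: grow the statistics array by
* doubling if necessary and register a new slot for the host.
*/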
if (context->dds_context.size >= context->dds_context.max_size) {
int offset = context->dds_context.max_size;
context->dds_context.max_size <<= 1;
context->dds_context.volInfos = (HostDataVolumeInfo *) repalloc(
context->dds_context.volInfos,
sizeof(HostDataVolumeInfo) * context->dds_context.max_size);
MemSet(context->dds_context.volInfos + offset, 0,
sizeof(HostDataVolumeInfo)
* (context->dds_context.max_size - offset));
}
entry->index = context->dds_context.size++;
context->dds_context.volInfos[entry->index].hashEntry = entry;
context->dds_context.volInfos[entry->index].datavolume = 0;
context->dds_context.volInfos[entry->index].occur_count = 0;
}
return context->dds_context.volInfos + entry->index;
}
/*
* fetch_hdfs_data_block_location: given a HDFS file path,
* collect all its data block location information.
*/
static BlockLocation *
fetch_hdfs_data_block_location(char *filepath, int64 len, int *block_num,
RelFileNode rnode, uint32_t segno, double* hit_ratio) {
// for the fake-datalocality test, the file length is always zero
if(len == 0 && !debug_fake_datalocality){
*hit_ratio = 0.0;
return NULL;
}
BlockLocation *locations;
HdfsFileInfo *file_info;
//double hit_ratio;
uint64_t beginTime;
beginTime = gettime_microsec();
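/*
* With the metadata cache enabled, block locations are served from the
* cache (hit_ratio reports the fraction of blocks answered from cache);
* otherwise they are fetched directly from HDFS.
*/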
if (metadata_cache_enable) {
file_info = CreateHdfsFileInfo(rnode, segno);
if (metadata_cache_testfile && metadata_cache_testfile[0]) {
locations = GetHdfsFileBlockLocationsForTest(filepath, len, block_num);
if (locations) {
DumpHdfsFileBlockLocations(locations, *block_num);
}
} else {
locations = GetHdfsFileBlockLocations(file_info, len, block_num,
hit_ratio);
}
DestroyHdfsFileInfo(file_info);
} else {
locations = HdfsGetFileBlockLocations(filepath, len, block_num);
}
if (debug_print_split_alloc_result) {
uint64 endTime = gettime_microsec();
int eclaspeTime = endTime - beginTime;
elog(LOG, "fetch blocks of file relationid %d segno %d execution time:"
" %d us with hit rate %f \n", rnode.relNode,segno,eclaspeTime,*hit_ratio);
}
if(locations == NULL && !debug_fake_datalocality){
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg("cannot fetch block locations"),
errdetail("%s", HdfsGetLastError())));
}
return locations;
}
/*
* free_hdfs_data_block_location: free the memory allocated when
* calling fetch_hdfs_data_block_location.
*/
static void free_hdfs_data_block_location(BlockLocation *locations,
int block_num) {
if (metadata_cache_enable) {
FreeHdfsFileBlockLocations(locations, block_num);
} else {
HdfsFreeFileBlockLocations(locations, block_num);
}
return;
}
/*
* update_data_dist_stat: update the data distribution
* statistics.
*/
static Block_Host_Index *
update_data_dist_stat(split_to_segment_mapping_context *context,
BlockLocation *locations, int block_num) {
int i;
Block_Host_Index *hostIDs;
hostIDs = (Block_Host_Index *) palloc(sizeof(Block_Host_Index) * block_num);
for (i = 0; i < block_num; i++) {
int j;
hostIDs[i].replica_num = locations[i].numOfNodes;
hostIDs[i].insertHost = -1;
hostIDs[i].hostIndex = (int *) palloc(sizeof(int) * hostIDs[i].replica_num);
hostIDs[i].hostIndextoSort = (Host_Index *) palloc(
sizeof(Host_Index) * hostIDs[i].replica_num);
for (j = 0; j < locations[i].numOfNodes; j++) {
char *hostname = pstrdup(locations[i].hosts[j]); /* locations[i].hosts[j]; */
HostDataVolumeInfo *info = search_host_in_stat_context(context, hostname);
info->datavolume += locations[i].length;
hostIDs[i].hostIndextoSort[j].index = info->hashEntry->index;
hostIDs[i].hostIndextoSort[j].hostname = hostname;
if (output_hdfs_block_location) {
elog(LOG, "block%d has replica %d on %s",i+1,j+1,hostname);
}
}
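/*
* Sort the replica entries (compare_hostid presumably orders them by
* hostname) so every block lists its replica host indexes in a
* deterministic order.
*/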
qsort(hostIDs[i].hostIndextoSort, locations[i].numOfNodes,
sizeof(Host_Index), compare_hostid);
for (j = 0; j < locations[i].numOfNodes; j++) {
hostIDs[i].hostIndex[j] = hostIDs[i].hostIndextoSort[j].index;
}
pfree(hostIDs[i].hostIndextoSort);
}
return hostIDs;
}
/*
* check that the HDFS file length matches the pg_aoseg logical length
*/
static void double_check_hdfs_metadata_logic_length(BlockLocation * locations,int block_num,int64 logic_len) {
//double check hdfs file length equals to pg_aoseg logic length
int64 hdfs_file_len = 0;
for(int i=0;i<block_num;i++) {
hdfs_file_len += locations[i].length;
}
if(logic_len > hdfs_file_len) {
elog(ERROR, "hdfs file length does not equal to metadata logic length!");
}
}
/*
* AOGetSegFileDataLocation: fetch the data location of the
* segment files of the AO relation.
*/
static void AOGetSegFileDataLocation(Relation relation,
AppendOnlyEntry *aoEntry, Snapshot metadataSnapshot,
split_to_segment_mapping_context *context, int64 splitsize,
Relation_Data *rel_data, int* hitblocks,
int* allblocks, GpPolicy *targetPolicy) {
char *basepath;
char *segfile_path;
int filepath_maxlen;
Relation pg_aoseg_rel;
TupleDesc pg_aoseg_dsc;
HeapTuple tuple;
SysScanDesc aoscan;
int64 total_size = 0;
/*
* calculate the base file path for the segment files.
*/
basepath = relpath(relation->rd_node);
filepath_maxlen = strlen(basepath) + 9;
segfile_path = (char *) palloc0(filepath_maxlen);
// fake data locality
if (debug_fake_datalocality) {
fpaoseg = fopen("/tmp/aoseg.result", "r");
if (fpaoseg == NULL) {
elog(ERROR, "Could not open file!");
return;
}
int fileCount = 0;
int reloid = 0;
while (true) {
int res = fscanf(fpaoseg, "%d", &reloid);
if (res == 0) {
elog(ERROR, "cannot find relation in fake aoseg!");
}
res = fscanf(fpaoseg, "%d", &fileCount);
if (res == 0) {
elog(ERROR, "cannot find file count in fake aoseg!");
}
if (rel_data->relid == reloid) {
break;
}
}
fclose(fpaoseg);
for (int i = 0; i < fileCount; i++) {
BlockLocation *locations = NULL;
int block_num =0;
Relation_File *file;
int segno = i + 1;
int64 logic_len = 0;
bool isRelationHash = true;
if (targetPolicy->nattrs == 0) {
isRelationHash = false;
}
if (!context->keep_hash || !isRelationHash) {
FormatAOSegmentFileName(basepath, segno, -1, 0, &segno, segfile_path);
double hit_ratio=0.0;
locations = fetch_hdfs_data_block_location(segfile_path, logic_len,
&block_num, relation->rd_node, segno, &hit_ratio);
*allblocks += block_num;
*hitblocks += block_num * hit_ratio;
//fake data locality need to recalculate logic length
if ((locations != NULL) && (block_num > 0)) {
logic_len = 0;
for (int i = 0; i < block_num; i++) {
logic_len += locations[i].length;
}
}
if ((locations != NULL) && (block_num > 0)) {
Block_Host_Index * host_index = update_data_dist_stat(context,
locations, block_num);
for (int k = 0; k < block_num; k++) {
fprintf(fp, "block %d of file %d of relation %d is on ", k, segno,
rel_data->relid);
for (int j = 0; j < locations[k].numOfNodes; j++) {
char *hostname = pstrdup(locations[k].hosts[j]); /* locations[i].hosts[j]; */
fprintf(fp, "host No%d name:%s, ", host_index[k].hostIndex[j],
hostname);
}
fprintf(fp, "\n");
}
fflush(fp);
file = (Relation_File *) palloc(sizeof(Relation_File));
file->segno = segno;
file->block_num = block_num;
file->locations = locations;
file->hostIDs = host_index;
file->logic_len = logic_len;
if (aoEntry->majorversion < 2) {
File_Split *split = (File_Split *) palloc(sizeof(File_Split));
split->offset = 0;
split->length = logic_len;
split->ext_file_uri = NULL;
file->split_num = 1;
file->splits = split;
context->total_split_count += file->split_num;
} else {
File_Split *splits;
int split_num;
int realSplitNum;
int64 offset = 0;
// split equals to block
split_num = file->block_num;
splits = (File_Split *) palloc(sizeof(File_Split) * split_num);
for (realSplitNum = 0; realSplitNum < split_num; realSplitNum++) {
splits[realSplitNum].host = -1;
splits[realSplitNum].is_local_read = true;
splits[realSplitNum].offset = offset;
splits[realSplitNum].ext_file_uri = NULL;
splits[realSplitNum].length =
file->locations[realSplitNum].length;
if (logic_len - offset <= splits[realSplitNum].length) {
splits[realSplitNum].length = logic_len - offset;
realSplitNum++;
break;
}
offset += splits[realSplitNum].length;
}
file->split_num = realSplitNum;
file->splits = splits;
context->total_split_count += realSplitNum;
}
rel_data->files = lappend(rel_data->files, file);
}
} else {
FormatAOSegmentFileName(basepath, segno, -1, 0, &segno, segfile_path);
double hit_ratio = 0.0;
locations = fetch_hdfs_data_block_location(segfile_path, logic_len,
&block_num, relation->rd_node, segno, &hit_ratio);
*allblocks += block_num;
*hitblocks += block_num * hit_ratio;
//fake data locality need to recalculate logic length
if ((locations != NULL) && (block_num > 0)) {
logic_len = 0;
for (int i = 0; i < block_num; i++) {
logic_len += locations[i].length;
}
}
File_Split *split = (File_Split *) palloc(sizeof(File_Split));
file = (Relation_File *) palloc0(sizeof(Relation_File));
file->segno = segno;
split->offset = 0;
split->length = logic_len;
split->host = -1;
split->is_local_read = true;
split->ext_file_uri = NULL;
file->split_num = 1;
file->splits = split;
file->logic_len = logic_len;
if ((locations != NULL) && (block_num > 0)) {
Block_Host_Index * host_index = update_data_dist_stat(context,
locations, block_num);
// fake data locality
for (int k = 0; k < block_num; k++) {
fprintf(fp, "block %d of file %d of relation %d is on ", k, segno,
rel_data->relid);
for (int j = 0; j < locations[k].numOfNodes; j++) {
char *hostname = pstrdup(locations[k].hosts[j]); /* locations[i].hosts[j]; */
fprintf(fp, "host No%d name:%s, ", host_index[k].hostIndex[j],
hostname);
}
fprintf(fp, "\n");
}
fflush(fp);
file->block_num = block_num;
file->locations = locations;
file->hostIDs = host_index;
// for hash, we need to add block number to total_split_count
context->total_split_count += block_num;
} else {
file->block_num = 0;
file->locations = NULL;
file->hostIDs = NULL;
}
rel_data->files = lappend(rel_data->files, file);
}
total_size += logic_len;
if (!context->keep_hash) {
MemSet(segfile_path, 0, filepath_maxlen);
}
}
} else {
pg_aoseg_rel = heap_open(aoEntry->segrelid, AccessShareLock);
pg_aoseg_dsc = RelationGetDescr(pg_aoseg_rel);
aoscan = systable_beginscan(pg_aoseg_rel, InvalidOid, FALSE,
metadataSnapshot, 0, NULL);
while (HeapTupleIsValid(tuple = systable_getnext(aoscan))) {
BlockLocation *locations = NULL;
int block_num = 0;
Relation_File *file;
int segno = DatumGetInt32(
fastgetattr(tuple, Anum_pg_aoseg_segno, pg_aoseg_dsc,
NULL));
int64 logic_len = (int64) DatumGetFloat8(
fastgetattr(tuple, Anum_pg_aoseg_eof, pg_aoseg_dsc, NULL));
context->total_metadata_logic_len += logic_len;
bool isRelationHash = true;
if (targetPolicy->nattrs == 0) {
isRelationHash = false;
}
if (!context->keep_hash || !isRelationHash) {
FormatAOSegmentFileName(basepath, segno, -1, 0, &segno, segfile_path);
double hit_ratio = 0.0;
locations = fetch_hdfs_data_block_location(segfile_path, logic_len,
&block_num, relation->rd_node, segno, &hit_ratio);
*allblocks += block_num;
*hitblocks += block_num * hit_ratio;
if ((locations != NULL) && (block_num > 0)) {
Block_Host_Index * host_index = update_data_dist_stat(context,
locations, block_num);
double_check_hdfs_metadata_logic_length(locations, block_num, logic_len);
file = (Relation_File *) palloc(sizeof(Relation_File));
file->segno = segno;
file->block_num = block_num;
file->locations = locations;
file->hostIDs = host_index;
file->logic_len = logic_len;
if (aoEntry->majorversion < 2) {
File_Split *split = (File_Split *) palloc(sizeof(File_Split));
split->offset = 0;
split->length = logic_len;
split->ext_file_uri = NULL;
file->split_num = 1;
file->splits = split;
context->total_split_count += file->split_num;
} else {
File_Split *splits;
int split_num;
// after a truncate, the real split number may be less than the HDFS block number
int realSplitNum;
int64 offset = 0;
// split equals to block
split_num = file->block_num;
splits = (File_Split *) palloc(sizeof(File_Split) * split_num);
for (realSplitNum = 0; realSplitNum < split_num; realSplitNum++) {
splits[realSplitNum].host = -1;
splits[realSplitNum].is_local_read = true;
splits[realSplitNum].offset = offset;
splits[realSplitNum].ext_file_uri = NULL;
splits[realSplitNum].length =
file->locations[realSplitNum].length;
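/*
* Clamp the last split to the logical EOF recorded in pg_aoseg; HDFS
* blocks beyond the EOF (e.g. left over after a truncate) are skipped.
*/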
if (logic_len - offset <= splits[realSplitNum].length) {
splits[realSplitNum].length = logic_len - offset;
realSplitNum++;
break;
}
offset += splits[realSplitNum].length;
}
file->split_num = realSplitNum;
file->splits = splits;
context->total_split_count += realSplitNum;
}
rel_data->files = lappend(rel_data->files, file);
}
} else {
FormatAOSegmentFileName(basepath, segno, -1, 0, &segno, segfile_path);
double hit_ratio = 0.0;
locations = fetch_hdfs_data_block_location(segfile_path, logic_len,
&block_num, relation->rd_node, segno, &hit_ratio);
*allblocks += block_num;
*hitblocks += block_num * hit_ratio;
//fake data locality need to recalculate logic length
if (debug_fake_datalocality) {
if ((locations != NULL) && (block_num > 0)) {
logic_len = 0;
for (int i = 0; i < block_num; i++) {
logic_len += locations[i].length;
}
}
}
//end fake
File_Split *split = (File_Split *) palloc(sizeof(File_Split));
file = (Relation_File *) palloc0(sizeof(Relation_File));
file->segno = segno;
split->offset = 0;
split->length = logic_len;
split->host = -1;
split->is_local_read = true;
split->ext_file_uri = NULL;
file->split_num = 1;
file->splits = split;
file->logic_len = logic_len;
if ((locations != NULL) && (block_num > 0)) {
Block_Host_Index * host_index = update_data_dist_stat(context,
locations, block_num);
double_check_hdfs_metadata_logic_length(locations, block_num, logic_len);
// fake data locality
if (debug_fake_datalocality) {
for (int k = 0; k < block_num; k++) {
fprintf(fp, "block %d of file %d of relation %d is on ", k, segno,
rel_data->relid);
for (int j = 0; j < locations[k].numOfNodes; j++) {
char *hostname = pstrdup(locations[k].hosts[j]); /* locations[i].hosts[j]; */
fprintf(fp, "host No%d name:%s, ", host_index[k].hostIndex[j],
hostname);
}
fprintf(fp, "\n");
}
fflush(fp);
}
file->block_num = block_num;
file->locations = locations;
file->hostIDs = host_index;
// for hash, we need to add block number to total_split_count
context->total_split_count += block_num;
} else {
file->block_num = 0;
file->locations = NULL;
file->hostIDs = NULL;
}
rel_data->files = lappend(rel_data->files, file);
}
total_size += logic_len;
if (!context->keep_hash) {
MemSet(segfile_path, 0, filepath_maxlen);
}
}
systable_endscan(aoscan);
heap_close(pg_aoseg_rel, AccessShareLock);
}
pfree(segfile_path);
rel_data->total_size = total_size;
return;
}
/*
* ParquetGetSegFileDataLocation: fetch the data location of the
* segment files of the Parquet relation.
*/
static void ParquetGetSegFileDataLocation(Relation relation,
AppendOnlyEntry *aoEntry, Snapshot metadataSnapshot,
split_to_segment_mapping_context *context, int64 splitsize,
Relation_Data *rel_data, int* hitblocks,
int* allblocks, GpPolicy *targetPolicy) {
char *basepath;
char *segfile_path;
int filepath_maxlen;
int64 total_size = 0;
Relation pg_parquetseg_rel;
TupleDesc pg_parquetseg_dsc;
HeapTuple tuple;
SysScanDesc parquetscan;
basepath = relpath(relation->rd_node);
filepath_maxlen = strlen(basepath) + 9;
segfile_path = (char *) palloc0(filepath_maxlen);
pg_parquetseg_rel = heap_open(aoEntry->segrelid, AccessShareLock);
pg_parquetseg_dsc = RelationGetDescr(pg_parquetseg_rel);
parquetscan = systable_beginscan(pg_parquetseg_rel, InvalidOid, FALSE,
metadataSnapshot, 0, NULL);
while (HeapTupleIsValid(tuple = systable_getnext(parquetscan))) {
BlockLocation *locations;
int block_num = 0;
Relation_File *file;
int segno = DatumGetInt32(
fastgetattr(tuple, Anum_pg_parquetseg_segno, pg_parquetseg_dsc, NULL));
int64 logic_len = (int64) DatumGetFloat8(
fastgetattr(tuple, Anum_pg_parquetseg_eof, pg_parquetseg_dsc, NULL));
context->total_metadata_logic_len += logic_len;
bool isRelationHash = true;
if (targetPolicy->nattrs == 0) {
isRelationHash = false;
}
if (!context->keep_hash || !isRelationHash) {
FormatAOSegmentFileName(basepath, segno, -1, 0, &segno, segfile_path);
double hit_ratio = 0.0;
locations = fetch_hdfs_data_block_location(segfile_path, logic_len,
&block_num, relation->rd_node, segno, &hit_ratio);
*allblocks += block_num;
*hitblocks += block_num * hit_ratio;
//fake data locality need to recalculate logic length
if (debug_fake_datalocality) {
if ((locations != NULL) && (block_num > 0)) {
logic_len = 0;
for (int i = 0; i < block_num; i++) {
logic_len += locations[i].length;
}
}
}
//end fake
if ((locations != NULL) && (block_num > 0)) {
File_Split *splits;
int split_num;
int64 offset = 0;
Block_Host_Index * host_index = update_data_dist_stat(context,
locations, block_num);
double_check_hdfs_metadata_logic_length(locations, block_num, logic_len);
file = (Relation_File *) palloc(sizeof(Relation_File));
file->segno = segno;
file->block_num = block_num;
file->locations = locations;
file->hostIDs = host_index;
file->logic_len = logic_len;
// split equals to block
int realSplitNum = 0;
split_num = file->block_num;
splits = (File_Split *) palloc(sizeof(File_Split) * split_num);
for (realSplitNum = 0; realSplitNum < split_num; realSplitNum++) {
splits[realSplitNum].host = -1;
splits[realSplitNum].is_local_read = true;
splits[realSplitNum].offset = offset;
splits[realSplitNum].length = file->locations[realSplitNum].length;
splits[realSplitNum].logiceof = logic_len;
splits[realSplitNum].ext_file_uri = NULL;
if (logic_len - offset <= splits[realSplitNum].length) {
splits[realSplitNum].length = logic_len - offset;
realSplitNum++;
break;
}
offset += splits[realSplitNum].length;
}
file->split_num = realSplitNum;
file->splits = splits;
context->total_split_count += realSplitNum;
rel_data->files = lappend(rel_data->files, file);
}
} else {
FormatAOSegmentFileName(basepath, segno, -1, 0, &segno, segfile_path);
double hit_ratio = 0.0;
locations = fetch_hdfs_data_block_location(segfile_path, logic_len,
&block_num, relation->rd_node, segno, &hit_ratio);
*allblocks += block_num;
*hitblocks += block_num * hit_ratio;
File_Split *split = (File_Split *) palloc(sizeof(File_Split));
file = (Relation_File *) palloc0(sizeof(Relation_File));
file->segno = segno;
split->offset = 0;
split->length = logic_len;
split->logiceof = logic_len;
split->host = -1;
split->is_local_read = true;
split->ext_file_uri = NULL;
file->split_num = 1;
file->splits = split;
file->logic_len = logic_len;
if ((locations != NULL) && (block_num > 0)) {
Block_Host_Index * host_index = update_data_dist_stat(context,
locations, block_num);
double_check_hdfs_metadata_logic_length(locations, block_num, logic_len);
file->block_num = block_num;
file->locations = locations;
file->hostIDs = host_index;
// for hash, we need to add block number to total_split_count
context->total_split_count += block_num;
} else {
file->block_num = 0;
file->locations = NULL;
file->hostIDs = NULL;
}
rel_data->files = lappend(rel_data->files, file);
}
total_size += logic_len;
if (!context->keep_hash) {
MemSet(segfile_path, 0, filepath_maxlen);
}
}
systable_endscan(parquetscan);
heap_close(pg_parquetseg_rel, AccessShareLock);
pfree(segfile_path);
rel_data->total_size = total_size;
return;
}
static void InvokeHDFSProtocolBlockLocation(Oid procOid,
List *locs,
List **blockLocations)
{
ExtProtocolValidatorData *validator_data;
FmgrInfo *validator_udf;
FunctionCallInfoData fcinfo;
validator_data = (ExtProtocolValidatorData *)
palloc0 (sizeof(ExtProtocolValidatorData));
validator_udf = palloc(sizeof(FmgrInfo));
fmgr_info(procOid, validator_udf);
validator_data->type = T_ExtProtocolValidatorData;
validator_data->url_list = locs;
validator_data->format_opts = NULL;
validator_data->errmsg = NULL;
validator_data->direction = EXT_VALIDATE_READ;
validator_data->action = EXT_VALID_ACT_GETBLKLOC;
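/*
* The protocol's block-location UDF is invoked through the validator
* interface: the URI list is passed in via the call context, and the
* block locations come back through fcinfo.resultinfo as an
* ExtProtocolBlockLocationData.
*/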
InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
/* FmgrInfo */ validator_udf,
/* nArgs */ 0,
/* Call Context */ (Node *) validator_data,
/* ResultSetInfo */ NULL);
/* invoke validator. if this function returns - validation passed */
FunctionCallInvoke(&fcinfo);
ExtProtocolBlockLocationData *bls =
(ExtProtocolBlockLocationData *)(fcinfo.resultinfo);
/* debug output block location. */
if (bls != NULL)
{
ListCell *c;
int i = 0 ,j = 0;
foreach(c, bls->files)
{
blocklocation_file *blf = (blocklocation_file *)(lfirst(c));
elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks",
blf->file_uri, blf->block_num);
for ( i = 0 ; i < blf->block_num ; ++i )
{
BlockLocation *pbl = &(blf->locations[i]);
elog(DEBUG3, "DEBUG LOCATION for block %d : %d, "
INT64_FORMAT ", " INT64_FORMAT ", %d",
i,
pbl->corrupt, pbl->length, pbl->offset,
pbl->numOfNodes);
for ( j = 0 ; j < pbl->numOfNodes ; ++j )
{
elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s",
i,
pbl->hosts[j], pbl->names[j],
pbl->topologyPaths[j]);
}
}
}
}
elog(DEBUG3, "after invoking get block location API");
/* get location data from fcinfo.resultinfo. */
if (bls != NULL)
{
Assert(bls->type == T_ExtProtocolBlockLocationData);
while(list_length(bls->files) > 0)
{
void *v = lfirst(list_head(bls->files));
bls->files = list_delete_first(bls->files);
*blockLocations = lappend(*blockLocations, v);
}
}
pfree(validator_data);
pfree(validator_udf);
}
Oid
LookupCustomProtocolBlockLocationFunc(char *protoname)
{
List* funcname = NIL;
Oid procOid = InvalidOid;
Oid argList[1];
Oid returnOid;
char* new_func_name = (char *)palloc0(strlen(protoname) + 16);
sprintf(new_func_name, "%s_blocklocation", protoname);
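/*
* For example, the "hdfs" protocol resolves to a function named
* hdfs_blocklocation taking no arguments and returning void.
*/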
funcname = lappend(funcname, makeString(new_func_name));
returnOid = VOIDOID;
procOid = LookupFuncName(funcname, 0, argList, true);
if (!OidIsValid(procOid))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("protocol function %s was not found.",
new_func_name),
errhint("Create it with CREATE FUNCTION."),
errOmitLocation(true)));
/* check return type matches */
if (get_func_rettype(procOid) != returnOid)
ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("protocol function %s has an incorrect return type",
new_func_name),
errOmitLocation(true)));
/* check allowed volatility */
if (func_volatile(procOid) != PROVOLATILE_STABLE)
ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("protocol function %s is not declared STABLE.",
new_func_name),
errOmitLocation(true)));
pfree(new_func_name);
return procOid;
}
static void ExternalGetHdfsFileDataLocation(
Relation relation,
split_to_segment_mapping_context *context,
int64 splitsize,
Relation_Data *rel_data,
int* allblocks) {
ExtTableEntry *ext_entry = GetExtTableEntry(rel_data->relid);
Assert(ext_entry->locations != NIL);
int64 total_size = 0;
int segno = 1;
/*
* Step 1. get external HDFS location from URI.
*/
char* first_uri_str = (char *) strVal(lfirst(list_head(ext_entry->locations)));
/* We must have at least one location. */
Assert(first_uri_str != NULL);
Uri* uri = ParseExternalTableUri(first_uri_str);
bool isHdfs = false;
if (uri != NULL && is_hdfs_protocol(uri)) {
isHdfs = true;
}
Assert(isHdfs); /* Currently, we accept HDFS only. */
/*
* Step 2. Get function to call for getting location information. This work
* is done by validator function registered for this external protocol.
*/
Oid procOid = InvalidOid;
if (isHdfs) {
procOid = LookupCustomProtocolBlockLocationFunc("hdfs");
}
else
{
Assert(false);
}
/*
* Step 3. Call validator to get location data.
*/
/* Prepare function call parameter by passing into location string. This is
* only called at dispatcher side. */
List *bls = NULL; /* Block locations */
if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
{
InvokeHDFSProtocolBlockLocation(procOid, ext_entry->locations, &bls);
}
/*
* Step 4. Build data location info for optimization after this call.
*/
/* Go through each files */
ListCell *cbl = NULL;
foreach(cbl, bls)
{
blocklocation_file *f = (blocklocation_file *)lfirst(cbl);
BlockLocation *locations = f->locations;
int block_num = f->block_num;
int64 logic_len = 0;
*allblocks += block_num;
if ((locations != NULL) && (block_num > 0)) {
// calculate length for one specific file
for (int j = 0; j < block_num; ++j) {
logic_len += locations[j].length;
// locations[j].lowerBoundInc = NULL;
// locations[j].upperBoundExc = NULL;
}
total_size += logic_len;
Block_Host_Index * host_index = update_data_dist_stat(context,
locations, block_num);
Relation_File *file = (Relation_File *) palloc(sizeof(Relation_File));
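/*
* Derive the segment number from the numeric tail of the file URI when
* possible (e.g. a path ending in "/5" yields segno 5); otherwise fall
* back to a sequential counter.
*/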
if (atoi(strrchr(f->file_uri, '/') + 1) > 0)
file->segno = atoi(strrchr(f->file_uri, '/') + 1);
else
file->segno = segno++;
file->block_num = block_num;
file->locations = locations;
file->hostIDs = host_index;
file->logic_len = logic_len;
// do the split logic
int realSplitNum = 0;
int split_num = file->block_num;
int64 offset = 0;
File_Split *splits = (File_Split *) palloc(sizeof(File_Split) * split_num);
while (realSplitNum < split_num) {
splits[realSplitNum].host = -1;
splits[realSplitNum].is_local_read = true;
splits[realSplitNum].offset = offset;
splits[realSplitNum].length = file->locations[realSplitNum].length;
splits[realSplitNum].logiceof = logic_len;
splits[realSplitNum].ext_file_uri = pstrdup(f->file_uri);
if (logic_len - offset <= splits[realSplitNum].length) {
splits[realSplitNum].length = logic_len - offset;
++realSplitNum;
break;
}
offset += splits[realSplitNum].length;
++realSplitNum;
}
file->split_num = realSplitNum;
file->splits = splits;
context->total_split_count += realSplitNum;
rel_data->files = lappend(rel_data->files, file);
}
}
context->total_metadata_logic_len += total_size;
rel_data->total_size = total_size;
}
/*
* step 1: search segments with local read where the segment is not full after being assigned the block
* step 2: search segments with local read where the segment is not full before being assigned the block
* step 3: search segments which are not full after being assigned the block, charging the net_disk_ratio penalty
* step 4: if all the segments are full, select the segment with the smallest size after being assigned the block
* step 5: avoid assigning all of the full splits to vseg 0
* note: assigning a non-local block to a segment increases that segment's size by block size * net_disk_ratio
*/
static int select_random_host_algorithm(Relation_Assignment_Context *context,
int64 splitsize, int64 maxExtendedSizePerSegment, TargetSegmentIDMap *idMap,
Block_Host_Index **hostID, int fileindex, Oid partition_parent_oid, bool* isLocality) {
*isLocality = false;
bool isExceedVolume = false;
bool isExceedWholeSize = false;
bool isExceedPartitionTableSize = false;
//step1
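/*
* step 1: among the virtual segments local to one of the block's replica
* hosts, pick the least-loaded one that stays under the per-segment,
* whole-query and partition-table size limits.
*/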
int64 minvols = INT64_MAX;
int minindex = 0;
ListCell* lc;
for (int l = 0; l < (*hostID)->replica_num; l++) {
uint32_t key = (*hostID)->hostIndex[l];
PAIR pair = getHASHTABLENode(context->vseg_to_splits_map,
TYPCONVERT(void *, key));
if (pair == NULL) {
continue;
}
List* val = (List*) (pair->Value);
foreach(lc, val)
{
int j = lfirst_int(lc);
isExceedVolume = splitsize + context->vols[j] > maxExtendedSizePerSegment;
isExceedWholeSize = balance_on_whole_query_level
&& splitsize + context->totalvols_with_penalty[j]
> context->avg_size_of_whole_query;
int64** partitionvols_with_penalty=NULL;
if(partition_parent_oid > 0){
PAIR p = getHASHTABLENode(context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
partitionvols_with_penalty = (int64 **) (&(p->Value));
isExceedPartitionTableSize = balance_on_partition_table_level && splitsize
+ (*partitionvols_with_penalty)[j]
> context->avg_size_of_whole_partition_table;
}
else{
isExceedPartitionTableSize= false;
}
if (context->block_lessthan_vseg_round_robin_no >= 0 && context->vols[j] > 0) {
continue;
}
if ((!isExceedWholeSize || context->totalvols_with_penalty[j] == 0)
&& (!isExceedVolume || context->vols[j] == 0)
&& (!isExceedPartitionTableSize || (*partitionvols_with_penalty)[j] ==0)) {
*isLocality = true;
if (minvols > context->vols[j]) {
minvols = context->vols[j];
minindex = j;
} else if(minvols == context->vols[j]){
/*prefer insert host if exists*/
int inserthost = (*hostID)->insertHost;
if(inserthost == idMap->global_IDs[j]){
minindex = j;
}
else if (context->block_lessthan_vseg_round_robin_no >= 0) {
if (l == context->block_lessthan_vseg_round_robin_no) {
minindex = j;
context->block_lessthan_vseg_round_robin_no = (context->block_lessthan_vseg_round_robin_no+1)%(*hostID)->replica_num;
}
}
}
}
}
}
if (*isLocality) {
return minindex;
}
//step2
minvols = INT64_MAX;
bool isFound = false;
for (int j = 0; j < context->virtual_segment_num; j++) {
isExceedVolume = net_disk_ratio * splitsize + context->vols[j]
> maxExtendedSizePerSegment;
isExceedWholeSize = balance_on_whole_query_level
&& net_disk_ratio * splitsize + context->totalvols_with_penalty[j]
> context->avg_size_of_whole_query;
int64** partitionvols_with_penalty=NULL;
if(partition_parent_oid > 0){
PAIR p = getHASHTABLENode(context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
partitionvols_with_penalty = (int64 **) (&(p->Value));
isExceedPartitionTableSize = balance_on_partition_table_level &&
splitsize + (*partitionvols_with_penalty)[j]
> context->avg_size_of_whole_partition_table;
} else {
isExceedPartitionTableSize = false;
}
if ((!isExceedWholeSize || context->totalvols_with_penalty[j] == 0)
&& (!isExceedVolume || context->vols[j] == 0)
&& (!isExceedPartitionTableSize || (*partitionvols_with_penalty)[j] ==0)) {
isFound = true;
if (minvols > context->vols[j]) {
minvols = context->vols[j];
minindex = j;
} else if (minvols == context->vols[j]
&& context->totalvols_with_penalty[j]
< context->totalvols_with_penalty[minindex]*0.9) {
minindex = j;
}
}
}
if (isFound) {
return minindex;
}
//step3
minvols = INT64_MAX;
minindex = 0;
isFound = false;
for (int j = 0; j < context->virtual_segment_num; j++) {
bool isLocalSegment = false;
for (int l = 0; l < (*hostID)->replica_num; l++) {
if ((*hostID)->hostIndex[l] == idMap->global_IDs[j]) {
isLocalSegment = true;
if (balance_on_whole_query_level
&& context->totalvols_with_penalty[j] + splitsize
> context->avg_size_of_whole_query && context->totalvols_with_penalty[j] > 0) {
continue;
}
if (partition_parent_oid > 0) {
PAIR p = getHASHTABLENode(context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols_with_penalty = (int64 **) (&(p->Value));
if (balance_on_partition_table_level
&& splitsize + (*partitionvols_with_penalty)[j]
> context->avg_size_of_whole_partition_table
&& (*partitionvols_with_penalty)[j] != 0) {
continue;
}
}
if (context->vols[j] + splitsize < minvols) {
minvols = context->vols[j] + splitsize;
minindex = j;
*isLocality = true;
isFound = true;
}
else if (context->vols[j] + splitsize == minvols
&& context->totalvols_with_penalty[j]
< context->totalvols_with_penalty[minindex] * 0.9) {
minindex = j;
*isLocality = true;
isFound = true;
}
}
}
if (!isLocalSegment) {
if (balance_on_whole_query_level
&& context->totalvols_with_penalty[j] + net_disk_ratio * splitsize
> context->avg_size_of_whole_query
&& context->totalvols_with_penalty[j] > 0) {
continue;
}
if (partition_parent_oid > 0) {
PAIR p = getHASHTABLENode(context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols_with_penalty = (int64 **) (&(p->Value));
if (balance_on_partition_table_level
&& net_disk_ratio * splitsize + (*partitionvols_with_penalty)[j]
> context->avg_size_of_whole_partition_table
&& (*partitionvols_with_penalty)[j] != 0) {
continue;
}
}
if (context->vols[j] + net_disk_ratio * splitsize < minvols) {
minvols = context->vols[j] + net_disk_ratio * splitsize;
minindex = j;
*isLocality = false;
isFound = true;
} else if (context->vols[j] + net_disk_ratio * splitsize == minvols
&& context->totalvols_with_penalty[j]
< context->totalvols_with_penalty[minindex]*0.9) {
minindex = j;
*isLocality = false;
isFound = true;
}
}
}
if(isFound){
return minindex;
}
//step4
minvols = INT64_MAX;
minindex = 0;
for (int j = 0; j < context->virtual_segment_num; j++) {
bool isLocalSegment = false;
for (int l = 0; l < (*hostID)->replica_num; l++) {
if ((*hostID)->hostIndex[l] == idMap->global_IDs[j]) {
isLocalSegment = true;
if (context->vols[j] + splitsize < minvols) {
minvols = context->vols[j] + splitsize;
minindex = j;
*isLocality = true;
} else if (context->vols[j] + splitsize == minvols
&& context->totalvols_with_penalty[j]
< context->totalvols_with_penalty[minindex] *0.9) {
minindex = j;
*isLocality = true;
}
}
}
if (!isLocalSegment) {
if (context->vols[j] + net_disk_ratio * splitsize < minvols) {
minvols = context->vols[j] + net_disk_ratio * splitsize;
minindex = j;
*isLocality = false;
} else if (context->vols[j] + net_disk_ratio * splitsize == minvols
&& context->totalvols_with_penalty[j]
< context->totalvols_with_penalty[minindex] *0.9) {
minindex = j;
*isLocality = false;
}
}
}
if (debug_fake_datalocality) {
fprintf(fp,
"cur_size_of_whole_query is:%.0f, avg_size_of_whole_query is: %.3f",
context->totalvols_with_penalty[minindex] + net_disk_ratio * splitsize,
context->avg_size_of_whole_query);
}
return minindex;
}
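/*
 * assign_split_to_host
 * append a detailed file split to a host's assignment result, merging it into
 * the previous split when both belong to the same file and are adjacent.
 */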
static void assign_split_to_host(Host_Assignment_Result *result,
Detailed_File_Split *split) {
Detailed_File_Split *last_split;
if (split->length == 0) {
return;
}
/*
* for the first split.
*/
if (result->count == 0) {
result->splits[result->count++] = *split;
return;
}
/*
* we try to merge the new split into
* the old split.
*/
last_split = result->splits + result->count - 1;
if ((last_split->rel_oid == split->rel_oid)
&& (last_split->segno == split->segno)
&& (last_split->offset + last_split->length == split->offset)) {
last_split->length += split->length;
return;
}
/*
* cannot merge into the last split.
*/
if (result->count < result->max_size) {
result->splits[result->count++] = *split;
return;
}
/*
* not enough space.
*/
result->max_size <<= 1;
result->splits = (Detailed_File_Split *) repalloc(result->splits,
sizeof(Detailed_File_Split) * (result->max_size));
result->splits[result->count++] = *split;
return;
}
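/*
 * assign_splits_to_hosts
 * dispatch every detailed file split to the assignment result of its target host.
 */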
static void assign_splits_to_hosts(Split_Assignment_Result *result,
Detailed_File_Split *splits, int split_num) {
int i;
for (i = 0; i < split_num; i++) {
Detailed_File_Split *split = splits + i;
assign_split_to_host(result->host_assigns + split->host, split);
}
return;
}
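/*
 * search_map_node
 * find the SegFileSplitMapNode of a relation in the result list, or create one
 * (with an empty split list per host) and append it.
 */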
static List *
search_map_node(List *result, Oid rel_oid, int host_num,
SegFileSplitMapNode **found_map_node) {
SegFileSplitMapNode *map_node = NULL;
ListCell *lc;
int i;
foreach(lc, result)
{
SegFileSplitMapNode *split_map_node = (SegFileSplitMapNode *) lfirst(lc);
if (split_map_node->relid == rel_oid) {
*found_map_node = split_map_node;
return result;
}
}
map_node = makeNode(SegFileSplitMapNode);
map_node->relid = rel_oid;
map_node->splits = NIL;
for (i = 0; i < host_num; i++) {
map_node->splits = lappend(map_node->splits, NIL);
}
*found_map_node = map_node;
result = lappend(result, map_node);
return result;
}
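/*
 * post_process_assign_result
 * convert the per-host assignment result into a list of SegFileSplitMapNodes,
 * building one FileSplitNode for each non-empty split.
 */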
static List *
post_process_assign_result(Split_Assignment_Result *assign_result) {
List *final_result = NIL;
int i;
for (i = 0; i < assign_result->host_num; i++) {
Host_Assignment_Result *har = assign_result->host_assigns + i;
int j = 0;
Oid last_oid = InvalidOid;
SegFileSplitMapNode *last_mapnode = NULL;
while (j < har->count) {
Detailed_File_Split *split = har->splits + j;
ListCell *per_seg_splits;
bool empty_seg = false;
char *p = NULL;
FileSplit fileSplit = makeNode(FileSplitNode);
if (split->rel_oid != last_oid) {
last_oid = split->rel_oid;
final_result = search_map_node(final_result, last_oid,
assign_result->host_num, &last_mapnode);
}
fileSplit->segno = (split->segno - split->index) + 1;
fileSplit->logiceof = split->logiceof;
if (split->length <= 0) {
empty_seg = true;
}
fileSplit->offsets = split->offset;
fileSplit->lengths = split->length;
p = split->ext_file_uri_string;
fileSplit->ext_file_uri_string = p ? pstrdup(p) : (char *) NULL;
j += 1;
if (empty_seg) {
if (fileSplit->ext_file_uri_string) {
pfree(fileSplit->ext_file_uri_string);
}
pfree(fileSplit);
} else {
per_seg_splits = list_nth_cell((List *) (last_mapnode->splits), i);
lfirst(per_seg_splits) = lappend((List *) lfirst(per_seg_splits),
fileSplit);
}
}
}
return final_result;
}
/*
 * compare two relations by total size (larger first).
 */
static int compare_relation_size(const void *e1, const void *e2) {
Relation_Data **s1 = (Relation_Data **) e1;
Relation_Data **s2 = (Relation_Data **) e2;
/*
 * larger total size first.
 */
if ((*s1)->total_size < (*s2)->total_size) {
return 1;
}
if ((*s1)->total_size > (*s2)->total_size) {
return -1;
}
return 0;
}
/*
 * compare two files based on continuity.
 */
static int compare_file_continuity(const void *e1, const void *e2) {
Relation_File **s1 = (Relation_File **) e1;
Relation_File **s2 = (Relation_File **) e2;
/*
 * higher continuity ratio first.
 */
if ((*s1)->continue_ratio < (*s2)->continue_ratio) {
return 1;
}
if ((*s1)->continue_ratio > (*s2)->continue_ratio) {
return -1;
}
return 0;
}
/*
 * compare two files based on segno.
 */
static int compare_file_segno(const void *e1, const void *e2) {
Relation_File **s1 = (Relation_File **) e1;
Relation_File **s2 = (Relation_File **) e2;
/*
 * smaller segno first.
 */
if ((*s1)->segno < (*s2)->segno) {
return -1;
}
if ((*s1)->segno > (*s2)->segno) {
return 1;
}
return 0;
}
/*
 * compare two segment-fileno pairs.
 */
static int compare_container_segment(const void *e1, const void *e2) {
segmentFilenoPair *s1 = (segmentFilenoPair *) e1;
segmentFilenoPair *s2 = (segmentFilenoPair *) e2;
/*
 * smaller fileno first.
 */
if (s1->fileno < s2->fileno) {
return -1;
}
if (s1->fileno > s2->fileno) {
return 1;
}
return 0;
}
/*
* compare two detailed file splits.
*/
static int compare_detailed_file_split(const void *e1, const void *e2) {
Detailed_File_Split *s1 = (Detailed_File_Split *) e1;
Detailed_File_Split *s2 = (Detailed_File_Split *) e2;
/*
* host id first.
*/
if (s1->host < s2->host) {
return -1;
}
if (s1->host > s2->host) {
return 1;
}
/*
* relation id second.
*/
if (s1->rel_oid < s2->rel_oid) {
return -1;
}
if (s1->rel_oid > s2->rel_oid) {
return 1;
}
/*
* file id third.
*/
if (s1->segno < s2->segno) {
return -1;
}
if (s1->segno > s2->segno) {
return 1;
}
/*
* offset fourth.
*/
if (s1->offset < s2->offset) {
return -1;
}
if (s1->offset > s2->offset) {
return 1;
}
return 0;
}
/*
 * compare two host index entries by hostname.
 */
static int
compare_hostid(const void *e1, const void *e2)
{
Host_Index* s1 = (Host_Index*) e1;
Host_Index* s2 = (Host_Index*) e2;
return strcmp((s1)->hostname,(s2)->hostname);
}
/*
 * change_hash_virtual_segments_order
 * reorder the virtual segments so that the files of a hash-distributed
 * relation are read locally whenever possible.
 */
static void change_hash_virtual_segments_order(QueryResource ** resourcePtr,
Relation_Data *rel_data,
Relation_Assignment_Context *assignment_context_ptr,
TargetSegmentIDMap* idMap_ptr) {
// first we check if datalocality is one without changing vseg order
ListCell *lc_file;
bool datalocalityEqualsOne = true;
foreach(lc_file, rel_data->files)
{
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
for (int i = 0; i < rel_file->split_num; i++) {
int targethost = (rel_file->segno - 1)
% ((*assignment_context_ptr).virtual_segment_num);
for (int p = 0; p < rel_file->block_num; p++) {
bool islocal = false;
Block_Host_Index *hostID = rel_file->hostIDs + p;
for (int l = 0; l < hostID->replica_num; l++) {
if (hostID->hostIndex[l] == (*idMap_ptr).global_IDs[targethost]) {
islocal = true;
break;
}
}
if (!islocal) {
datalocalityEqualsOne = false;
}
}
}
}
if(datalocalityEqualsOne){
if (debug_print_split_alloc_result) {
elog(LOG, "didn't change virtual segments order");
}
return;
}
if (debug_print_split_alloc_result) {
elog(LOG, "change virtual segments order");
}
TargetSegmentIDMap idMap = *idMap_ptr;
Relation_Assignment_Context assignment_context = *assignment_context_ptr;
//ListCell *lc_file;
int fileCount = list_length(rel_data->files);
// empty table may have zero filecount
if (fileCount > 0) {
int *vs2fileno = (int *) palloc(
sizeof(int) * assignment_context.virtual_segment_num);
for (int i = 0; i < assignment_context.virtual_segment_num; i++) {
vs2fileno[i] = -1;
}
Relation_File** rel_file_vector = (Relation_File**) palloc(
sizeof(Relation_File*) * fileCount);
int i = 0;
foreach(lc_file, rel_data->files)
{
rel_file_vector[i++] = (Relation_File *) lfirst(lc_file);
}
qsort(rel_file_vector, fileCount, sizeof(Relation_File*),
compare_file_segno);
// fileCount = assignment_context.virtual_segment_num * constant;
// with parallel insert the constant can be greater than 1
int filesPerSegment = fileCount / assignment_context.virtual_segment_num;
int* numOfLocalRead = (int *) palloc(sizeof(int) * assignment_context.virtual_segment_num);
// assign files to the virtual segment with the most local reads, one file group per virtual segment;
// a file group is the set of files whose index has the same value modulo virtual_segment_num
ListCell* lc;
for (i = 0; i < assignment_context.virtual_segment_num; i++) {
int largestFileIndex = i;
int64 maxLogicLen = -1;
for (int j = 0; j < filesPerSegment; j++) {
if (rel_file_vector[i + j * assignment_context.virtual_segment_num]->logic_len
> maxLogicLen) {
largestFileIndex = i + j * assignment_context.virtual_segment_num;
maxLogicLen =
rel_file_vector[i + j * assignment_context.virtual_segment_num]->logic_len;
}
}
for (int j = 0; j < assignment_context.virtual_segment_num; j++) {
numOfLocalRead[j] = 0;
}
for(int k=0;k<rel_file_vector[largestFileIndex]->block_num;k++){
Block_Host_Index *hostID = rel_file_vector[largestFileIndex]->hostIDs + k;
for (int l = 0; l < hostID->replica_num; l++) {
uint32_t key = hostID->hostIndex[l];
PAIR pair = getHASHTABLENode(assignment_context_ptr->vseg_to_splits_map,TYPCONVERT(void *,key));
if( pair == NULL)
{
continue;
}
List* val = (List*)(pair->Value);
foreach(lc, val)
{
int j = lfirst_int(lc);
if (vs2fileno[j] == -1) {
numOfLocalRead[j] = numOfLocalRead[j] + 1;
}
}
}
}
int localMax = -1;
for (int j = 0; j < assignment_context.virtual_segment_num; j++) {
if (numOfLocalRead[j] > localMax && vs2fileno[j] == -1) {
// assign file group to the same host(container) index.
for (int k = 0; k < filesPerSegment; k++) {
rel_file_vector[i + k * assignment_context.virtual_segment_num]->segmentid = j;
}
localMax = numOfLocalRead[j];
}
}
if (debug_print_split_alloc_result) {
elog(LOG, "hash file segno %d 's max locality is %d",rel_file_vector[i]->segno,rel_file_vector[i]->segmentid);
}
vs2fileno[rel_file_vector[i]->segmentid] = rel_file_vector[i]->segno;
}
MemoryContext old = MemoryContextSwitchTo(TopMemoryContext);
int segmentCount = assignment_context.virtual_segment_num;
segmentFilenoPair* sfPairVector = (segmentFilenoPair *) palloc(
sizeof(segmentFilenoPair) * segmentCount);
int p = 0;
for (p = 0; p < segmentCount; p++) {
sfPairVector[p].segmentid = p;
sfPairVector[p].fileno = vs2fileno[p];
}
qsort(sfPairVector, segmentCount, sizeof(segmentFilenoPair),
compare_container_segment);
Segment** segmentsVector = (Segment **) palloc(
sizeof(Segment*) * segmentCount);
p = 0;
foreach (lc, (*resourcePtr)->segments)
{
Segment *info = (Segment *) lfirst(lc);
segmentsVector[p++] = info;
}
(*resourcePtr)->segments = NIL;
TargetSegmentIDMap tmpidMap;
tmpidMap.target_segment_num = idMap.target_segment_num;
tmpidMap.global_IDs = (int *) palloc(
sizeof(int) * tmpidMap.target_segment_num);
tmpidMap.hostname = (char **) palloc(
sizeof(char*) * tmpidMap.target_segment_num);
for (int l = 0; l < tmpidMap.target_segment_num; l++) {
tmpidMap.hostname[l] = (char *) palloc(
sizeof(char) * HOSTNAME_MAX_LENGTH);
}
for (int l = 0; l < tmpidMap.target_segment_num; l++) {
tmpidMap.global_IDs[l] = idMap.global_IDs[l];
strncpy(tmpidMap.hostname[l], idMap.hostname[l], HOSTNAME_MAX_LENGTH - 1);
}
for (p = 0; p < segmentCount; p++) {
segmentsVector[sfPairVector[p].segmentid]->segindex = p;
(*resourcePtr)->segments = lappend((*resourcePtr)->segments,
segmentsVector[sfPairVector[p].segmentid]);
(*idMap_ptr).global_IDs[p] =
tmpidMap.global_IDs[sfPairVector[p].segmentid];
strncpy((*idMap_ptr).hostname[p],
tmpidMap.hostname[sfPairVector[p].segmentid],
HOSTNAME_MAX_LENGTH-1);
if (debug_print_split_alloc_result) {
elog(LOG, "virtual segment No%d 's name is %s.",p, (*idMap_ptr).hostname[p]);
}
}
pfree(tmpidMap.global_IDs);
for (int l = 0; l < tmpidMap.target_segment_num; l++) {
pfree(tmpidMap.hostname[l]);
}
pfree(tmpidMap.hostname);
pfree(sfPairVector);
pfree(segmentsVector);
MemoryContextSwitchTo(old);
pfree(vs2fileno);
pfree(rel_file_vector);
pfree(numOfLocalRead);
}
}
/*
 * allocation_preparation
 * initialize the target segment ID map and the assignment context before
 * splits are allocated to virtual segments.
 */
static void allocation_preparation(List *hosts, TargetSegmentIDMap* idMap,
Relation_Assignment_Context* assignment_context,
split_to_segment_mapping_context *context) {
/*
* initialize the ID mapping.
*/
idMap->target_segment_num = list_length(hosts);
idMap->global_IDs = (int *) palloc(sizeof(int) * idMap->target_segment_num);
idMap->hostname = (char **) palloc(sizeof(char*) * idMap->target_segment_num);
for (int p = 0; p < idMap->target_segment_num; p++) {
idMap->hostname[p] = (char *) palloc(sizeof(char) * HOSTNAME_MAX_LENGTH);
}
int i = 0;
assignment_context->vseg_to_splits_map = createHASHTABLE(
context->datalocality_memorycontext, 2048,
HASHTABLE_SLOT_VOLUME_DEFAULT_MAX, HASHTABLE_KEYTYPE_UINT32,
NULL);
ListCell *lc;
foreach(lc, hosts)
{
VirtualSegmentNode *vsn = (VirtualSegmentNode *) lfirst(lc);
HostDataVolumeInfo *hdvInfo = search_host_in_stat_context(context,
vsn->hostname);
idMap->global_IDs[i] = hdvInfo->hashEntry->index;
// add vseg to hashtable
uint32_t key=idMap->global_IDs[i];
if(getHASHTABLENode(assignment_context->vseg_to_splits_map,TYPCONVERT(void *,key))==NULL){
setHASHTABLENode(assignment_context->vseg_to_splits_map, TYPCONVERT(void *,key), NIL, false);
}
PAIR p = getHASHTABLENode(assignment_context->vseg_to_splits_map,TYPCONVERT(void *,key));
List** val = (List **)(&(p->Value));
*val = lappend_int(*val, i);
hdvInfo->occur_count++;
strncpy(idMap->hostname[i], vsn->hostname, HOSTNAME_MAX_LENGTH-1);
if (debug_print_split_alloc_result) {
elog(LOG, "datalocality using segment No%d hostname/id: %s/%d",i,vsn->hostname,idMap->global_IDs[i]);
}
i++;
// fake data locality
if (debug_fake_datalocality) {
fprintf(fp, "virtual segment No%d: %s\n", i - 1,
vsn->hostname);
}
}
/*
* initialize the assignment context.
*/
assignment_context->virtual_segment_num = idMap->target_segment_num;
assignment_context->vols = (int64 *) palloc(
sizeof(int64) * assignment_context->virtual_segment_num);
assignment_context->totalvols = (int64 *) palloc(
sizeof(int64) * assignment_context->virtual_segment_num);
assignment_context->totalvols_with_penalty = (int64 *) palloc(
sizeof(int64) * assignment_context->virtual_segment_num);
assignment_context->continue_split_num = (int *) palloc(
sizeof(int) * assignment_context->virtual_segment_num);
assignment_context->split_num = (int *) palloc(
sizeof(int) * assignment_context->virtual_segment_num);
assignment_context->roundrobinIndex = 0;
assignment_context->total_split_num = 0;
assignment_context->avg_size_of_whole_query =0.0;
MemSet(assignment_context->totalvols, 0,
sizeof(int64) * assignment_context->virtual_segment_num);
MemSet(assignment_context->totalvols_with_penalty, 0,
sizeof(int64) * assignment_context->virtual_segment_num);
MemSet(assignment_context->continue_split_num, 0,
sizeof(int) * assignment_context->virtual_segment_num);
MemSet(assignment_context->split_num, 0,
sizeof(int) * assignment_context->virtual_segment_num);
}
/*
 *change_file_order_based_on_continuity
 * compute the contiguous-local-read ratio of every file and sort the files
 * by that ratio in descending order.
 */
static Relation_File** change_file_order_based_on_continuity(
Relation_Data *rel_data, TargetSegmentIDMap* idMap, int host_num,
int* fileCount, Relation_Assignment_Context *assignment_context) {
Relation_File** file_vector = NULL;
int* isBlockContinue = (int *) palloc(sizeof(int) * host_num);
for (int i = 0; i < host_num; i++) {
isBlockContinue[i] = 0;
}
*fileCount = 0;
ListCell* lc_file;
// compute the contiguous-local-read ratio of every file.
foreach(lc_file, rel_data->files)
{
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
if (rel_file->hostIDs == NULL) {
rel_file->continue_ratio = 0;
*fileCount = *fileCount + 1;
continue;
}
bool isBlocksBegin = true;
bool isLocalContinueBlockFound = false;
int file_total_block_count = 0;
int file_continue_block_count = 0;
ListCell* lc;
for (int i = 0; i < host_num; i++) {
isBlockContinue[i] = 0;
}
int continuityBeginIndex = 0;
file_total_block_count = rel_file->split_num;
for (int i = 0; i < rel_file->split_num; i++) {
if (isBlocksBegin) {
Block_Host_Index *hostID = rel_file->hostIDs + i;
for (int l = 0; l < hostID->replica_num; l++) {
uint32_t key = hostID->hostIndex[l];
PAIR pair = getHASHTABLENode(assignment_context->vseg_to_splits_map,
TYPCONVERT(void *, key));
if (pair == NULL) {
continue;
}
List* val = (List*) (pair->Value);
foreach(lc, val)
{
int j = lfirst_int(lc);
isBlockContinue[j]++;
isLocalContinueBlockFound = true;
isBlocksBegin = false;
}
}
} else {
Block_Host_Index *hostID = rel_file->hostIDs + i;
for (int l = 0; l < hostID->replica_num; l++) {
uint32_t key = hostID->hostIndex[l];
PAIR pair = getHASHTABLENode(assignment_context->vseg_to_splits_map,
TYPCONVERT(void *, key));
if (pair == NULL) {
continue;
}
List* val = (List*) (pair->Value);
foreach(lc, val)
{
int j = lfirst_int(lc);
if (isBlockContinue[j] == i - continuityBeginIndex) {
isBlockContinue[j]++;
isLocalContinueBlockFound = true;
}
}
}
}
if (!isLocalContinueBlockFound || i == rel_file->split_num - 1) {
int maxBlockContinue = 0;
for (int k = 0; k < host_num; k++) {
if (isBlockContinue[k] > maxBlockContinue) {
maxBlockContinue = isBlockContinue[k];
}
}
if (maxBlockContinue >= 2) {
file_continue_block_count += maxBlockContinue;
}
for (int k = 0; k < host_num; k++) {
isBlockContinue[k] = 0;
}
isBlocksBegin = true;
if (maxBlockContinue == 0) {
continuityBeginIndex = i + 1;
} else if (i != rel_file->split_num - 1) {
continuityBeginIndex = i;
i = i - 1;
}
}
isLocalContinueBlockFound = false;
}
rel_file->continue_ratio = (double) file_continue_block_count
/ file_total_block_count;
if (debug_print_split_alloc_result) {
elog(LOG, "file %d continuity ratio %f",rel_file->segno,rel_file->continue_ratio);
}
*fileCount = *fileCount + 1;
}
if (*fileCount > 0) {
file_vector = (Relation_File**) palloc(sizeof(Relation_File*) * *fileCount);
int fileindex = 0;
foreach(lc_file, rel_data->files)
{
file_vector[fileindex++] = (Relation_File *) lfirst(lc_file);
}
qsort(file_vector, *fileCount, sizeof(Relation_File*),
compare_file_continuity);
}
pfree(isBlockContinue);
return file_vector;
}
/*
 *set_maximum_segment_volume_parameter
 * compute the maximum (extended) data volume a virtual segment may hold for
 * this relation.
 */
static int64 set_maximum_segment_volume_parameter(Relation_Data *rel_data,
int vseg_num, double *maxSizePerSegment) {
int64 maxSizePerSegmentDiffBigVolume = 0;
int64 maxSizePerSegmentDiffSmallVolume = 0;
int64 maxSizePerSegmentDiffScalar = 0;
*maxSizePerSegment = rel_data->total_size / (double) vseg_num;
ListCell* lc_file;
int64 totalRelLastBlockSize = 0;
foreach(lc_file, rel_data->files)
{
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
for (int i = 0; i < rel_file->split_num; i++) {
if (i == rel_file->split_num - 1) {
totalRelLastBlockSize += rel_file->splits[i].length;
}
}
}
double bigVolumeRatio = 0.001;
double smallVolumeRatio = 0.05;
maxSizePerSegmentDiffBigVolume = *maxSizePerSegment * bigVolumeRatio;
maxSizePerSegmentDiffSmallVolume = totalRelLastBlockSize / (double) vseg_num
* smallVolumeRatio;
maxSizePerSegmentDiffScalar = 32 << 20;
// when curSize > maxSizePerSegment, we allow some segments to exceed the average volume,
// provided the excess is less than 0.001 * total relation size and less than 32M (for a
// big table such as 1T, 1T * 0.001 = 1G would otherwise lead to extreme data imbalance),
// or the excess is less than 0.05 * the sum of the sizes of all the last blocks of the
// relation, which we call maxSizePerSegmentDiffSmallVolume (consider 64 small files of
// 1.5M average size assigned to 16 segments: maxSizePerSegment is 1.5*4 = 6M and we
// allow an excess up to about 6 + 6*0.05 = 6.3M)
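// in short, the returned limit is:
// maxSizePerSegment + 1 + max(min(0.001 * maxSizePerSegment, 32M),
// 0.05 * totalRelLastBlockSize / vseg_num)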
if (maxSizePerSegmentDiffBigVolume > maxSizePerSegmentDiffScalar){
maxSizePerSegmentDiffBigVolume = maxSizePerSegmentDiffScalar;
}
if(maxSizePerSegmentDiffBigVolume > maxSizePerSegmentDiffSmallVolume){
return maxSizePerSegmentDiffBigVolume + (int64)(*maxSizePerSegment) + 1;
}
else{
return maxSizePerSegmentDiffSmallVolume + (int64)(*maxSizePerSegment) + 1;
}
}
/*
 *allocate_hash_relation
 */
static bool allocate_hash_relation(Relation_Data* rel_data,
Assignment_Log_Context* log_context, TargetSegmentIDMap* idMap,
Relation_Assignment_Context* assignment_context,
split_to_segment_mapping_context *context, bool parentIsHashExist, bool parentIsHash) {
/*the allocation unit for a hash relation is the file: all blocks of one file are assigned to one virtual segment*/
ListCell *lc_file;
int fileCount = 0;
Oid myrelid = rel_data->relid;
double relationDatalocality=1.0;
uint64_t before_allocate_hash = gettime_microsec();
foreach(lc_file, rel_data->files)
{
fileCount++;
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
for (int i = 0; i < rel_file->split_num; i++) {
int64 split_size = rel_file->splits[i].length;
int targethost = (rel_file->segno - 1) % (assignment_context->virtual_segment_num);
/*calculate keephash datalocality*/
/*for keep hash one file corresponds to one split*/
for (int p = 0; p < rel_file->block_num; p++) {
bool islocal = false;
Block_Host_Index *hostID = rel_file->hostIDs + p;
for (int l = 0; l < hostID->replica_num; l++) {
if (debug_print_split_alloc_result) {
elog(LOG, "file id is %d; vd id is %d",hostID->hostIndex[l],idMap->global_IDs[targethost]);
}
if (hostID->hostIndex[l] == idMap->global_IDs[targethost]) {
log_context->localDataSizePerRelation +=
rel_file->locations[p].length;
islocal = true;
break;
}
}
if (debug_print_split_alloc_result && !islocal) {
elog(LOG, "non local relation %u, file: %d, block: %d",myrelid,rel_file->segno,p);
}
}
log_context->totalDataSizePerRelation += split_size;
}
}
uint64_t after_allocate_hash = gettime_microsec();
int time_allocate_hash_firstfor = after_allocate_hash - before_allocate_hash;
if(log_context->totalDataSizePerRelation > 0){
relationDatalocality = (double) log_context->localDataSizePerRelation
/ (double) log_context->totalDataSizePerRelation;
}
double hash2RandomDatalocalityThreshold = 0.9;
/*for a partitioned hash table, whether to convert it to random distribution
 * is determined by the data locality of the first partition*/
if (parentIsHashExist) {
if (!parentIsHash) {
log_context->totalDataSizePerRelation = 0;
log_context->localDataSizePerRelation = 0;
return true;
}
}
else if((hash_to_random_flag == ENFORCE_HASH_TO_RANDOM ||
(relationDatalocality < hash2RandomDatalocalityThreshold && relationDatalocality >= 0 ))
&& hash_to_random_flag != ENFORCE_KEEP_HASH){
log_context->totalDataSizePerRelation =0;
log_context->localDataSizePerRelation =0;
return true;
}
fileCount =0;
foreach(lc_file, rel_data->files)
{
fileCount++;
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
for (int i = 0; i < rel_file->split_num; i++) {
int64 split_size = rel_file->splits[i].length;
Assert(rel_file->segno > 0);
int targethost = (rel_file->segno - 1)
% (assignment_context->virtual_segment_num);
rel_file->splits[i].host = targethost;
assignment_context->totalvols[targethost] += split_size;
assignment_context->split_num[targethost]++;
assignment_context->continue_split_num[targethost]++;
if (debug_print_split_alloc_result) {
elog(LOG, "file %d assigned to host %s",rel_file->segno,
idMap->hostname[targethost]);
}
}
assignment_context->total_split_num += rel_file->split_num;
}
uint64_t after_assigned_time = gettime_microsec();
int time_allocate_second_for = after_assigned_time - before_allocate_hash;
if( debug_fake_datalocality ){
fprintf(fp, "The time of allocate hash relation first for is : %d us.\n", time_allocate_hash_firstfor);
fprintf(fp, "The time of allocate hash relation is : %d us.\n", time_allocate_second_for);
}
return false;
}
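/*
 *allocate_random_relation
 * allocate the splits of a randomly distributed relation to virtual segments
 * block by block, preferring local and contiguous reads.
 */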
static void allocate_random_relation(Relation_Data* rel_data,
Assignment_Log_Context* log_context, TargetSegmentIDMap* idMap,
Relation_Assignment_Context* assignment_context,
split_to_segment_mapping_context *context) {
/*unlike a hash relation, the allocation unit for a random relation is the block*/
/*first set the maximum size per virtual segment;
*the limit may be exceeded, using different strategies for big and small tables
*/
double maxSizePerSegment = 0.0;
int64 maxExtendedSizePerSegment = set_maximum_segment_volume_parameter(rel_data,
assignment_context->virtual_segment_num, &maxSizePerSegment);
/* sort files by the ratio of contiguous local reads;
 * blocks of the file with maximum continuity are assigned first
 */
int fileCount = 0;
Oid myrelid = rel_data->relid;
Oid partition_parent_oid = rel_data->partition_parent_relid;
if(partition_parent_oid > 0){
PAIR pa = getHASHTABLENode(assignment_context->patition_parent_size_map,
TYPCONVERT(void *, partition_parent_oid));
int64* partitionParentAvgSize = (int64 *) (pa->Value);
assignment_context->avg_size_of_whole_partition_table = *partitionParentAvgSize;
if(debug_print_split_alloc_result){
elog(LOG, "partition table "INT64_FORMAT" of relation %u",
assignment_context->avg_size_of_whole_partition_table,partition_parent_oid);
}
}
uint64_t before_change_order = gettime_microsec();
Relation_File** file_vector = change_file_order_based_on_continuity(rel_data,
idMap, assignment_context->virtual_segment_num, &fileCount, assignment_context);
uint64_t after_change_order = gettime_microsec();
int change_file_order_time = after_change_order - before_change_order;
if ( debug_fake_datalocality ){
fprintf(fp, "The time of change_file_order_based_continuity is : %d us.\n", change_file_order_time);
}
/* a split is put into nonContinueLocalQueue when it breaks local continuity,
 * and into networkQueue when it requires a remote read
 */
List *networkQueue = NIL;
List *nonContinueLocalQueue = NIL;
int i=0;
//int j=0;
bool isExceedMaxSize = false;
bool isExceedPartitionTableSize = false;
bool isExceedWholeSize = false;
int* isBlockContinue = (int *) palloc(
sizeof(int) * assignment_context->virtual_segment_num);
/*find the insert host for each block*/
uint64_t before_run_find_insert_host = gettime_microsec();
int *hostOccurTimes = (int *) palloc(sizeof(int) * context->dds_context.size);
for (int fi = 0; fi < fileCount; fi++) {
Relation_File *rel_file = file_vector[fi];
/*for a hash file whose bucket number doesn't equal the segment number*/
if (rel_file->hostIDs == NULL) {
rel_file->splits[0].host = 0;
continue;
}
MemSet(hostOccurTimes, 0, sizeof(int) * context->dds_context.size);
for (i = 0; i < rel_file->split_num; i++) {
Block_Host_Index *hostID = rel_file->hostIDs + i;
for (int l = 0; l < hostID->replica_num; l++) {
uint32_t key = hostID->hostIndex[l];
hostOccurTimes[key]++;
}
}
int maxOccurTime = -1;
int inserthost = -1;
bool hostsWithSameOccurTimesExist = true;
for (int i = 0; i < context->dds_context.size; i++) {
if (hostOccurTimes[i] > maxOccurTime) {
maxOccurTime = hostOccurTimes[i];
inserthost = i;
hostsWithSameOccurTimesExist = false;
} else if (hostOccurTimes[i] == maxOccurTime) {
hostsWithSameOccurTimesExist = true;
}
}
/* currently we assume the insert host is the same for all blocks of a file.
 * this logic may change in the future, so the state is stored at block level rather than file level.
 * if hostsWithSameOccurTimesExist we cannot determine the insert host;
 * if maxOccurTime < 2 we cannot determine it either*/
if (maxOccurTime < rel_file->split_num || maxOccurTime < 2 || hostsWithSameOccurTimesExist) {
inserthost = -1;
}
for (i = 0; i < rel_file->split_num; i++) {
Block_Host_Index *hostID = rel_file->hostIDs + i;
hostID->insertHost = inserthost;
}
}
pfree(hostOccurTimes);
uint64_t end_run_find_insert_host = gettime_microsec();
int run_find_insert_host = end_run_find_insert_host - before_run_find_insert_host;
if(debug_datalocality_time){
elog(LOG, "find insert host time: %d us. \n", run_find_insert_host);
}
/*three stage allocation algorithm*/
for (int fi = 0; fi < fileCount; fi++) {
Relation_File *rel_file = file_vector[fi];
/*for a hash file whose bucket number doesn't equal the segment number*/
if (rel_file->hostIDs == NULL) {
rel_file->splits[0].host = 0;
assignment_context->total_split_num += 1;
continue;
}
bool isBlocksBegin = true;
bool isLocalContinueBlockFound = false;
ListCell *lc;
int beginIndex = 0;
for (i = 0; i < assignment_context->virtual_segment_num; i++) {
isBlockContinue[i] = 0;
}
/* we assign splits (blocks) to hosts based on continuity:
 * the length of the contiguous local block run determines
 * the final assignment (we prefer the longer run).
 */
for (i = 0; i < rel_file->split_num; i++) {
int64 split_size = rel_file->splits[i].length;
int64 currentSequenceSize = 0;
for (int r = beginIndex; r <= i; r++) {
currentSequenceSize += rel_file->splits[r].length;
}
/* the first block of a file doesn't need to consider continuity,
 * but the following blocks must.
 */
if (isBlocksBegin) {
Block_Host_Index *hostID = rel_file->hostIDs + i;
for (int l = 0; l < hostID->replica_num; l++) {
uint32_t key = hostID->hostIndex[l];
PAIR pair = getHASHTABLENode(assignment_context->vseg_to_splits_map,TYPCONVERT(void *,key));
if( pair == NULL)
{
continue;
}
List* val = (List*)(pair->Value);
foreach(lc, val)
{
int j = lfirst_int(lc);
isExceedMaxSize = currentSequenceSize + assignment_context->vols[j]
> maxExtendedSizePerSegment;
isExceedWholeSize = balance_on_whole_query_level
&& currentSequenceSize
+ assignment_context->totalvols_with_penalty[j]
> assignment_context->avg_size_of_whole_query;
int64** partitionvols_with_penalty=NULL;
if(partition_parent_oid > 0){
PAIR p = getHASHTABLENode(assignment_context->partitionvols_with_penalty_map,TYPCONVERT(void *,partition_parent_oid));
partitionvols_with_penalty = (int64 **)(&(p->Value));
isExceedPartitionTableSize = balance_on_partition_table_level && currentSequenceSize
+ (*partitionvols_with_penalty)[j]
> assignment_context->avg_size_of_whole_partition_table;
}else{
isExceedPartitionTableSize =false;
}
if ((!isExceedWholeSize
|| assignment_context->totalvols_with_penalty[j] == 0)
&& (!isExceedMaxSize
|| (i == beginIndex && assignment_context->vols[j] == 0))
&& (!isExceedPartitionTableSize || (*partitionvols_with_penalty)[j] ==0)) {
isBlockContinue[j]++;
isLocalContinueBlockFound = true;
isBlocksBegin = false;
}
}
}
} else {
Block_Host_Index *hostID = rel_file->hostIDs + i;
for (int l = 0; l < hostID->replica_num; l++) {
uint32_t key = hostID->hostIndex[l];
PAIR pair = getHASHTABLENode(assignment_context->vseg_to_splits_map,TYPCONVERT(void *,key));
if( pair == NULL)
{
continue;
}
List* val = (List*)(pair->Value);
foreach(lc, val)
{
int j = lfirst_int(lc);
isExceedMaxSize = currentSequenceSize + assignment_context->vols[j]
> maxExtendedSizePerSegment;
isExceedWholeSize = balance_on_whole_query_level
&& currentSequenceSize
+ assignment_context->totalvols_with_penalty[j]
> assignment_context->avg_size_of_whole_query;
int64** partitionvols_with_penalty = NULL;
if (partition_parent_oid > 0) {
PAIR p = getHASHTABLENode(
assignment_context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
partitionvols_with_penalty = (int64 **) (&(p->Value));
isExceedPartitionTableSize = balance_on_partition_table_level && currentSequenceSize
+ (*partitionvols_with_penalty)[j]
> assignment_context->avg_size_of_whole_partition_table;
} else {
isExceedPartitionTableSize = false;
}
if ((!isExceedWholeSize
|| assignment_context->totalvols_with_penalty[j] == 0)
&& (!isExceedMaxSize
|| (i == beginIndex && assignment_context->vols[j] == 0))
&& (!isExceedPartitionTableSize || (*partitionvols_with_penalty)[j] ==0)
&& isBlockContinue[j] == i - beginIndex) {
isBlockContinue[j]++;
isLocalContinueBlockFound = true;
}
}
}
}
/* no local & continue block found, then assign the former blocks.*/
if (!isLocalContinueBlockFound || i == rel_file->split_num - 1) {
int assignedVSeg = -1;
int maxBlockContinue = 0;
for (int k = 0; k < assignment_context->virtual_segment_num; k++) {
if (isBlockContinue[k] > maxBlockContinue) {
maxBlockContinue = isBlockContinue[k];
assignedVSeg = k;
} else if (isBlockContinue[k] > 0
&& isBlockContinue[k] == maxBlockContinue
&& assignment_context->vols[k]
< assignment_context->vols[assignedVSeg]*0.9) {
assignedVSeg = k;
}
}
/* if there is no local virtual segment to assign to,
 * push this split to nonContinueLocalQueue
 */
if (assignedVSeg == -1) {
Split_Block *onesplit = makeNode(Split_Block);
onesplit->fileIndex = fi;
onesplit->splitIndex = i;
nonContinueLocalQueue = lappend(nonContinueLocalQueue, onesplit);
beginIndex = i + 1;
}
/* a non-contiguous block is pushed into
 * nonContinueLocalQueue and processed later.
 */
else if (maxBlockContinue == 1 /*&& (i != rel_file->split_num - 1)*/) {
Split_Block *onesplit = makeNode(Split_Block);
onesplit->fileIndex = fi;
if (!isLocalContinueBlockFound) {
onesplit->splitIndex = i - 1;
nonContinueLocalQueue = lappend(nonContinueLocalQueue, onesplit);
beginIndex = i;
i = i - 1;
} else {
onesplit->splitIndex = i;
nonContinueLocalQueue = lappend(nonContinueLocalQueue, onesplit);
}
} else {
/*splits from beginIndex to i-1 should be assigned to assignedVSeg*/
for (int r = beginIndex; r < i; r++) {
assignment_context->vols[assignedVSeg] += rel_file->splits[r].length;
assignment_context->totalvols[assignedVSeg] += rel_file->splits[r].length;
assignment_context->totalvols_with_penalty[assignedVSeg] += rel_file->splits[r].length;
if (partition_parent_oid > 0) {
PAIR p = getHASHTABLENode(
assignment_context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols_with_penalty = (int64 **) (&(p->Value));
(*partitionvols_with_penalty)[assignedVSeg] +=
rel_file->splits[r].length;
p = getHASHTABLENode(assignment_context->partitionvols_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols = (int64 **) (&(p->Value));
(*partitionvols)[assignedVSeg] +=
rel_file->splits[r].length;
}
rel_file->splits[r].host = assignedVSeg;
if (debug_print_split_alloc_result) {
elog(LOG, "local2 split %d offset "INT64_FORMAT" of file %d is assigned to host %d",r,rel_file->splits[r].offset, rel_file->segno,assignedVSeg);
}
log_context->localDataSizePerRelation += rel_file->splits[r].length;
assignment_context->split_num[assignedVSeg]++;
assignment_context->continue_split_num[assignedVSeg]++;
}
if (!isLocalContinueBlockFound) {
beginIndex = i;
i = i - 1;
}
/* the last split is contiguous*/
else {
log_context->localDataSizePerRelation += split_size;
assignment_context->vols[assignedVSeg] += split_size;
assignment_context->totalvols[assignedVSeg] += split_size;
assignment_context->totalvols_with_penalty[assignedVSeg] += split_size;
if (partition_parent_oid > 0) {
PAIR p = getHASHTABLENode(
assignment_context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols_with_penalty = (int64 **) (&(p->Value));
(*partitionvols_with_penalty)[assignedVSeg] += split_size;
p = getHASHTABLENode(assignment_context->partitionvols_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols = (int64 **) (&(p->Value));
(*partitionvols)[assignedVSeg] += split_size;
}
if (debug_print_split_alloc_result) {
elog(LOG, "local4 split %d offset "INT64_FORMAT" of file %d is assigned to host %d",i,rel_file->splits[i].offset, rel_file->segno,assignedVSeg);
}
rel_file->splits[i].host = assignedVSeg;
assignment_context->continue_split_num[assignedVSeg]++;
assignment_context->split_num[assignedVSeg]++;
}
}
for (int r = 0; r < assignment_context->virtual_segment_num; r++) {
isBlockContinue[r] = 0;
}
isBlocksBegin = true;
}
isLocalContinueBlockFound = false;
}
for (i = 0; i < rel_file->split_num; i++) {
int64 split_size = rel_file->splits[i].length;
log_context->totalDataSizePerRelation += split_size;
}
assignment_context->total_split_num += rel_file->split_num;
}
uint64_t after_continue_block = gettime_microsec();
int time_of_continue = after_continue_block - after_change_order;
if ( debug_fake_datalocality ){
fprintf(fp, "The time of allocate continue block is : %d us.\n", time_of_continue);
fprintf(fp, "The size of nonContinueLocalQueue is : %d .\n", list_length(nonContinueLocalQueue));
}
/*process the non-contiguous local queue*/
ListCell *file_split;
foreach(file_split, nonContinueLocalQueue)
{
Split_Block *onesplit = (Split_Block *) lfirst(file_split);
Relation_File *cur_file = file_vector[onesplit->fileIndex];
int64 cur_split_size = cur_file->splits[onesplit->splitIndex].length;
Block_Host_Index *hostID = cur_file->hostIDs + onesplit->splitIndex;
bool isLocality = false;
int assignedVSeg = select_random_host_algorithm(assignment_context,
cur_split_size, maxExtendedSizePerSegment, idMap,
&hostID, onesplit->fileIndex, partition_parent_oid, &isLocality);
if (isLocality) {
assignment_context->vols[assignedVSeg] += cur_split_size;
assignment_context->totalvols[assignedVSeg] += cur_split_size;
assignment_context->totalvols_with_penalty[assignedVSeg] += cur_split_size;
if (partition_parent_oid > 0) {
PAIR p = getHASHTABLENode(
assignment_context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols_with_penalty = (int64 **) (&(p->Value));
(*partitionvols_with_penalty)[assignedVSeg] += cur_split_size;
p = getHASHTABLENode(assignment_context->partitionvols_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols = (int64 **) (&(p->Value));
(*partitionvols)[assignedVSeg] += cur_split_size;
}
log_context->localDataSizePerRelation += cur_split_size;
if (debug_print_split_alloc_result) {
elog(LOG, "local1 split %d offset "INT64_FORMAT"of file %d is assigned to host %d",onesplit->splitIndex,cur_file->splits[onesplit->splitIndex].offset ,cur_file->segno,assignedVSeg);
}
bool isSplitOfFileExistInVseg = false;
for (int splitindex = 0; splitindex < cur_file->split_num; splitindex++) {
if (assignedVSeg == cur_file->splits[splitindex].host) {
isSplitOfFileExistInVseg = true;
}
}
if(!isSplitOfFileExistInVseg){
assignment_context->continue_split_num[assignedVSeg]++;
}
assignment_context->split_num[assignedVSeg]++;
cur_file->splits[onesplit->splitIndex].host = assignedVSeg;
} else {
Split_Block *networksplit = makeNode(Split_Block);
networksplit->fileIndex = onesplit->fileIndex;
networksplit->splitIndex = onesplit->splitIndex;
networkQueue = lappend(networkQueue, networksplit);
}
}
uint64_t after_noncontinue_block = gettime_microsec();
int time_of_noncontinue = after_noncontinue_block - after_continue_block;
if (debug_fake_datalocality) {
fprintf(fp,
"maxExtendedSizePerSegment is:"INT64_FORMAT", avg_size_of_whole_query is: %.3f",
maxExtendedSizePerSegment, assignment_context->avg_size_of_whole_query);
fprintf(fp, "The time of allocate non continue block is : %d us.\n", time_of_noncontinue);
fprintf(fp, "The size of networkQueue is : %d .\n", list_length(networkQueue));
}
/*process the network queue*/
foreach(file_split, networkQueue)
{
Split_Block *onesplit = (Split_Block *) lfirst(file_split);
Relation_File *cur_file = file_vector[onesplit->fileIndex];
int64 cur_split_size = cur_file->splits[onesplit->splitIndex].length;
Block_Host_Index *hostID = cur_file->hostIDs + onesplit->splitIndex;
bool isLocality = false;
int assignedVSeg = select_random_host_algorithm(assignment_context,
cur_split_size, maxExtendedSizePerSegment, idMap,
&hostID, onesplit->fileIndex, partition_parent_oid, &isLocality);
if (isLocality) {
assignment_context->vols[assignedVSeg] += cur_split_size;
assignment_context->totalvols[assignedVSeg] += cur_split_size;
assignment_context->totalvols_with_penalty[assignedVSeg] += cur_split_size;
if (partition_parent_oid > 0) {
PAIR p = getHASHTABLENode(
assignment_context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols_with_penalty = (int64 **) (&(p->Value));
(*partitionvols_with_penalty)[assignedVSeg] += cur_split_size;
p = getHASHTABLENode(assignment_context->partitionvols_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols = (int64 **) (&(p->Value));
(*partitionvols)[assignedVSeg] += cur_split_size;
}
log_context->localDataSizePerRelation += cur_split_size;
if (debug_print_split_alloc_result) {
elog(LOG, "local10 split %d offset "INT64_FORMAT"of file %d is assigned to host %d",onesplit->splitIndex,cur_file->splits[onesplit->splitIndex].offset ,cur_file->segno,assignedVSeg);
}
} else {
int64 network_split_size = net_disk_ratio * cur_split_size;
double network_incre_size = (net_disk_ratio-1)*cur_split_size /assignment_context->virtual_segment_num;
if (datalocality_remedy_enable) {
int remedyVseg = remedy_non_localRead(onesplit->fileIndex,
onesplit->splitIndex, 0, file_vector, fileCount,
maxExtendedSizePerSegment, idMap, assignment_context);
if(remedyVseg != -1){
assignedVSeg = remedyVseg;
log_context->localDataSizePerRelation += cur_split_size;
network_split_size = cur_split_size;
network_incre_size =0;
}else{
cur_file->splits[onesplit->splitIndex].is_local_read =false;
}
}
assignment_context->vols[assignedVSeg] += network_split_size;
assignment_context->totalvols_with_penalty[assignedVSeg] +=
network_split_size;
if (partition_parent_oid > 0) {
PAIR p = getHASHTABLENode(
assignment_context->partitionvols_with_penalty_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols_with_penalty = (int64 **) (&(p->Value));
(*partitionvols_with_penalty)[assignedVSeg] += network_split_size;
p = getHASHTABLENode(assignment_context->partitionvols_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols = (int64 **) (&(p->Value));
(*partitionvols)[assignedVSeg] += cur_split_size;
}
assignment_context->totalvols[assignedVSeg] += cur_split_size;
maxExtendedSizePerSegment += network_incre_size;
assignment_context->avg_size_of_whole_query += network_incre_size;
if(partition_parent_oid > 0){
PAIR pa = getHASHTABLENode(assignment_context->patition_parent_size_map,
TYPCONVERT(void *, partition_parent_oid));
int64* partitionParentAvgSize = (int64 *) (pa->Value);
*partitionParentAvgSize += network_incre_size;
}
if (debug_print_split_alloc_result) {
elog(LOG, "non local10 split %d offset "INT64_FORMAT" of file %d is assigned to host %d",onesplit->splitIndex,cur_file->splits[onesplit->splitIndex].offset, cur_file->segno,assignedVSeg);
}
}
bool isSplitOfFileExistInVseg = false;
for (int splitindex = 0; splitindex < cur_file->split_num; splitindex++) {
if (assignedVSeg == cur_file->splits[splitindex].host) {
isSplitOfFileExistInVseg = true;
}
}
if (!isSplitOfFileExistInVseg) {
assignment_context->continue_split_num[assignedVSeg]++;
}
assignment_context->split_num[assignedVSeg]++;
cur_file->splits[onesplit->splitIndex].host = assignedVSeg;
}
uint64_t after_network_block = gettime_microsec();
int time_of_network = after_network_block- after_noncontinue_block;
if (debug_fake_datalocality) {
fprintf(fp,
"maxExtendedSizePerSegment is:"INT64_FORMAT", avg_size_of_whole_query is: %.3f",
maxExtendedSizePerSegment, assignment_context->avg_size_of_whole_query);
fprintf(fp, "The time of allocate non continue block is : %d us.\n", time_of_network);
}
if (debug_print_split_alloc_result) {
for (int j = 0; j < assignment_context->virtual_segment_num; j++) {
if (partition_parent_oid > 0) {
PAIR p = getHASHTABLENode(assignment_context->partitionvols_map,
TYPCONVERT(void *, partition_parent_oid));
int64** partitionvols_with_penalty = (int64 **) (&(p->Value));
elog(LOG, "for partition parent table %u: sub partition table size of of vseg %d "
"is "INT64_FORMAT"",partition_parent_oid,j, (*partitionvols_with_penalty)[j]);
}
}
int64 maxvsSize = 0;
int64 minvsSize = INT64_MAX;
int64 totalMaxvsSize = 0;
int64 totalMinvsSize = INT64_MAX;
int64 totalMaxvsSizePenalty = 0;
int64 totalMinvsSizePenalty = INT64_MAX;
for (int j = 0; j < assignment_context->virtual_segment_num; j++) {
if (maxvsSize < assignment_context->vols[j]) {
maxvsSize = assignment_context->vols[j];
}
if (minvsSize > assignment_context->vols[j]) {
minvsSize = assignment_context->vols[j];
}
if (totalMaxvsSize < assignment_context->totalvols[j]) {
totalMaxvsSize = assignment_context->totalvols[j];
}
if (totalMinvsSize > assignment_context->totalvols[j]) {
totalMinvsSize = assignment_context->totalvols[j];
}
if (totalMaxvsSizePenalty < assignment_context->totalvols_with_penalty[j]) {
totalMaxvsSizePenalty = assignment_context->totalvols_with_penalty[j];
}
if (totalMinvsSizePenalty > assignment_context->totalvols_with_penalty[j]) {
totalMinvsSizePenalty = assignment_context->totalvols_with_penalty[j];
}
elog(LOG, "size with penalty of vs%d is "INT64_FORMAT"",j, assignment_context->vols[j]);
elog(LOG, "total size of vs%d is "INT64_FORMAT"",j, assignment_context->totalvols[j]);
elog(LOG, "total size with penalty of vs%d is "INT64_FORMAT"",j, assignment_context->totalvols_with_penalty[j]);
}
elog(LOG, "avg,max,min volume of segments are"
" %f,"INT64_FORMAT","INT64_FORMAT" for relation %u.",
maxSizePerSegment,maxvsSize,minvsSize,myrelid);
elog(LOG, "total max,min volume of segments are "INT64_FORMAT","INT64_FORMAT" for relation %u.",
totalMaxvsSize,totalMinvsSize,myrelid);
elog(LOG, "total max,min volume with penalty of segments are "INT64_FORMAT","INT64_FORMAT" for relation %u.",
totalMaxvsSizePenalty,totalMinvsSizePenalty,myrelid);
}
/*we don't need to free file_vector[i]*/
if (fileCount > 0) {
pfree(file_vector);
}
pfree(isBlockContinue);
}
/*
 * remedy_non_localRead
 * when a block would be read remotely, try to swap it with an already-assigned
 * local split of similar size so that both can still be read locally.
 */
static int remedy_non_localRead(int fileIndex, int splitIndex, int parentPos,
Relation_File** file_vector, int fileCount, int64 maxExtendedSizePerSegment,
TargetSegmentIDMap* idMap,
Relation_Assignment_Context* assignment_context) {
Relation_File *cur_file = file_vector[fileIndex];
Block_Host_Index *hostID = cur_file->hostIDs + splitIndex;
for (int i = 0; i < fileCount; i++) {
Relation_File *former_file = file_vector[i];
for (int j = 0; j < former_file->split_num; j++) {
if (former_file->splits[j].host >= 0
&& idMap->global_IDs[former_file->splits[j].host]
== hostID->hostIndex[parentPos]) {
bool isDone = false;
int orivseg = former_file->splits[j].host;
int64 former_split_size = former_file->splits[j].length;
// after the swap with the current split, the vseg must not exceed its volume limit
bool isExceedMaxSizeCurSplit = cur_file->splits[splitIndex].length
- former_split_size + assignment_context->vols[orivseg]
> maxExtendedSizePerSegment;
bool isExceedWholeSizeCurSplit = balance_on_whole_query_level
&& cur_file->splits[splitIndex].length - former_split_size
+ assignment_context->totalvols_with_penalty[orivseg]
> assignment_context->avg_size_of_whole_query;
// the adjusted split should be of similar size to the current split
// and must also be a local-read split
if (isExceedMaxSizeCurSplit || isExceedWholeSizeCurSplit
|| former_split_size/(double)cur_file->splits[splitIndex].length>1.1
|| former_split_size/(double)cur_file->splits[splitIndex].length<0.9
|| !former_file->splits[j].is_local_read) {
continue;
}
for (int p = 0; p < 3; p++) {
Block_Host_Index *formerHostID = former_file->hostIDs + j;
if (formerHostID->hostIndex[p]
== hostID->hostIndex[parentPos]) {
continue;
}
for (int v = 0; v < assignment_context->virtual_segment_num; v++) {
if (idMap->global_IDs[v] == formerHostID->hostIndex[p]) {
bool isExceedMaxSize = former_split_size
+ assignment_context->vols[v] > maxExtendedSizePerSegment;
bool isExceedWholeSize = balance_on_whole_query_level
&& former_split_size
+ assignment_context->totalvols_with_penalty[v]
> assignment_context->avg_size_of_whole_query;
if (!isExceedWholeSize&& !isExceedMaxSize){
//removesplit
assignment_context->split_num[orivseg]--;
assignment_context->vols[orivseg] -= former_split_size;
assignment_context->totalvols[orivseg] -= former_split_size;
assignment_context->totalvols_with_penalty[orivseg] -=
former_split_size;
//insertsplit
former_file->splits[j].host = v;
assignment_context->split_num[v]++;
// we simply treat the adjusted split as a contiguous one
assignment_context->continue_split_num[v]++;
assignment_context->vols[v] += former_split_size;
assignment_context->totalvols[v] += former_split_size;
assignment_context->totalvols_with_penalty[v] +=
former_split_size;
isDone = true;
break;
}
}
}
if (isDone) {
return orivseg;
}
}
}
}
}
return -1;
}
/*
 * show data locality log information for performance tuning
 */
static void print_datalocality_overall_log_information(SplitAllocResult *result, List *virtual_segments, int relationCount,
Assignment_Log_Context *log_context, Relation_Assignment_Context* assignment_context,
split_to_segment_mapping_context *context) {
/*init Assignment_Log_Context*/
//log_context->totalDataSize = 0.0;
//log_context->datalocalityRatio = 0.0;
log_context->maxSegmentNumofHost = 0;
log_context->minSegmentNumofHost = INT_MAX;
log_context->avgSegmentNumofHost = 0;
log_context->numofDifferentHost = 0;
log_context->avgSizeOverall = 0;
log_context->maxSizeSegmentOverall = 0;
log_context->minSizeSegmentOverall = INT64_MAX;
log_context->avgSizeOverallPenalty = 0;
log_context->maxSizeSegmentOverallPenalty = 0;
log_context->minSizeSegmentOverallPenalty = INT64_MAX;
log_context->avgContinuityOverall = 0;
log_context->maxContinuityOverall = 0;
log_context->minContinuityOverall = DBL_MAX;
if (relationCount > 0 && log_context->totalDataSize > 0) {
log_context->datalocalityRatio = log_context->datalocalityRatio / log_context->totalDataSize;
}
/*compute avgSegmentNumofHost log information for performance tuning*/
ListCell *lc;
foreach(lc, virtual_segments)
{
VirtualSegmentNode *vsn = (VirtualSegmentNode *) lfirst(lc);
HostDataVolumeInfo *hdvInfo = search_host_in_stat_context(context,
vsn->hostname);
if (hdvInfo->occur_count > 0) {
if (hdvInfo->occur_count > log_context->maxSegmentNumofHost) {
log_context->maxSegmentNumofHost = hdvInfo->occur_count;
}
if (hdvInfo->occur_count < log_context->minSegmentNumofHost) {
log_context->minSegmentNumofHost = hdvInfo->occur_count;
}
}
}
for (int k = 0; k < context->dds_context.max_size; k++) {
if (context->dds_context.volInfos[k].occur_count > 0) {
log_context->avgSegmentNumofHost +=
context->dds_context.volInfos[k].occur_count;
log_context->numofDifferentHost++;
}
}
if (log_context->numofDifferentHost > 0) {
log_context->avgSegmentNumofHost /= log_context->numofDifferentHost;
}
for (int j = 0; j < assignment_context->virtual_segment_num; j++) {
if (log_context->maxSizeSegmentOverall < assignment_context->totalvols[j]) {
log_context->maxSizeSegmentOverall = assignment_context->totalvols[j];
}
if (log_context->minSizeSegmentOverall > assignment_context->totalvols[j]) {
log_context->minSizeSegmentOverall = assignment_context->totalvols[j];
}
log_context->avgSizeOverall += assignment_context->totalvols[j];
if (log_context->maxSizeSegmentOverallPenalty < assignment_context->totalvols_with_penalty[j]) {
log_context->maxSizeSegmentOverallPenalty = assignment_context->totalvols_with_penalty[j];
}
if (log_context->minSizeSegmentOverallPenalty > assignment_context->totalvols_with_penalty[j]) {
log_context->minSizeSegmentOverallPenalty = assignment_context->totalvols_with_penalty[j];
}
log_context->avgSizeOverallPenalty += assignment_context->totalvols_with_penalty[j];
double cur_continue_value = 0.0;
if (assignment_context->split_num[j] > 1) {
cur_continue_value = assignment_context->continue_split_num[j]
/ (double) assignment_context->split_num[j];
if(cur_continue_value > 1){
cur_continue_value = 1;
}
} else if (assignment_context->split_num[j] == 1) {
/*if there is only one split, we also consider it contiguous*/
cur_continue_value = 1;
}
if (log_context->maxContinuityOverall < cur_continue_value) {
log_context->maxContinuityOverall = cur_continue_value;
}
if (log_context->minContinuityOverall > cur_continue_value) {
log_context->minContinuityOverall = cur_continue_value;
}
log_context->avgContinuityOverall += cur_continue_value;
}
if (assignment_context->virtual_segment_num > 0) {
log_context->avgContinuityOverall /= assignment_context->virtual_segment_num;
log_context->avgSizeOverall /= assignment_context->virtual_segment_num;
log_context->avgSizeOverallPenalty /= assignment_context->virtual_segment_num;
}
/* print data locality result*/
elog(
DEBUG1, "data locality ratio: %.3f; virtual segment number: %d; "
"different host number: %d; virtual segment number per host(avg/min/max): (%d/%d/%d); "
"segment size(avg/min/max): (%.3f B/"INT64_FORMAT" B/"INT64_FORMAT" B); "
"segment size with penalty(avg/min/max): (%.3f B/"INT64_FORMAT" B/"INT64_FORMAT" B); continuity(avg/min/max): (%.3f/%.3f/%.3f)."
,log_context->datalocalityRatio,assignment_context->virtual_segment_num,log_context->numofDifferentHost,
log_context->avgSegmentNumofHost,log_context->minSegmentNumofHost,log_context->maxSegmentNumofHost,
log_context->avgSizeOverall,log_context->minSizeSegmentOverall,log_context->maxSizeSegmentOverall,
log_context->avgSizeOverallPenalty,log_context->minSizeSegmentOverallPenalty,log_context->maxSizeSegmentOverallPenalty,
log_context->avgContinuityOverall,log_context->minContinuityOverall,log_context->maxContinuityOverall
);
appendStringInfo(result->datalocalityInfo, "data locality ratio: %.3f; virtual segment number: %d; "
"different host number: %d; virtual segment number per host(avg/min/max): (%d/%d/%d); "
"segment size(avg/min/max): (%.3f B/"INT64_FORMAT" B/"INT64_FORMAT" B); "
"segment size with penalty(avg/min/max): (%.3f B/"INT64_FORMAT" B/"INT64_FORMAT" B); continuity(avg/min/max): (%.3f/%.3f/%.3f); "
,log_context->datalocalityRatio,assignment_context->virtual_segment_num,log_context->numofDifferentHost,
log_context->avgSegmentNumofHost,log_context->minSegmentNumofHost,log_context->maxSegmentNumofHost,
log_context->avgSizeOverall,log_context->minSizeSegmentOverall,log_context->maxSizeSegmentOverall,
log_context->avgSizeOverallPenalty,log_context->minSizeSegmentOverallPenalty,log_context->maxSizeSegmentOverallPenalty,
log_context->avgContinuityOverall,log_context->minContinuityOverall,log_context->maxContinuityOverall);
if (debug_fake_datalocality) {
fprintf(fp, "datalocality ratio: %.3f; virtual segments number: %d, "
"different host number: %d, segment number per host(avg/min/max): (%d/%d/%d); "
"segments size(avg/min/max): (%.3f/"INT64_FORMAT"/"INT64_FORMAT"); "
"segments size with penalty(avg/min/max): (%.3f/"INT64_FORMAT"/"INT64_FORMAT"); continuity(avg/min/max): (%.3f/%.3f/%.3f)."
,log_context->datalocalityRatio,assignment_context->virtual_segment_num,log_context->numofDifferentHost,
log_context->avgSegmentNumofHost,log_context->minSegmentNumofHost,log_context->maxSegmentNumofHost,
log_context->avgSizeOverall,log_context->minSizeSegmentOverall,log_context->maxSizeSegmentOverall,
log_context->avgSizeOverallPenalty,log_context->minSizeSegmentOverallPenalty,log_context->maxSizeSegmentOverallPenalty,
log_context->avgContinuityOverall,log_context->minContinuityOverall,log_context->maxContinuityOverall
);
fpratio = fopen("/tmp/datalocality_ratio", "w+");
fprintf(fpratio, "datalocality_ratio=%.3f\n",log_context->datalocalityRatio);
fprintf(fpratio, "virtual_segments_number=%d\n",assignment_context->virtual_segment_num);
if(log_context->minSegmentNumofHost > 0 ){
fprintf(fpratio, "segmentnumber_perhost_max/min=%.2f\n", (double)(log_context->maxSegmentNumofHost / log_context->minSegmentNumofHost));
}else{
fprintf(fpratio, "segmentnumber_perhost_max/min=" INT64_FORMAT "\n", INT64_MAX);
}
if(log_context->avgSegmentNumofHost > 0 ){
fprintf(fpratio, "segmentnumber_perhost_max/avg=%.2f\n", (double)(log_context->maxSegmentNumofHost / log_context->avgSegmentNumofHost));
}else{
fprintf(fpratio, "segmentnumber_perhost_max/avg=" INT64_FORMAT "\n", INT64_MAX);
}
if (log_context->minSizeSegmentOverall > 0){
fprintf(fpratio, "segments_size_max/min=%.5f\n", (double)log_context->maxSizeSegmentOverall / (double)log_context->minSizeSegmentOverall);
}else{
fprintf(fpratio, "segments_size_max/min=" INT64_FORMAT "\n", INT64_MAX);
}
if (log_context->avgSizeOverall > 0){
fprintf(fpratio, "segments_size_max/avg=%.5f\n", log_context->maxSizeSegmentOverall / log_context->avgSizeOverall);
}else{
fprintf(fpratio, "segments_size_max/avg=" INT64_FORMAT "\n", INT64_MAX);
}
if (log_context->minSizeSegmentOverallPenalty > 0){
fprintf(fpratio, "segments_size_penalty_max/min=%.5f\n",(double)log_context->maxSizeSegmentOverallPenalty / (double)log_context->minSizeSegmentOverallPenalty);
}else{
fprintf(fpratio, "segments_size_penalty_max/min=" INT64_FORMAT "\n", INT64_MAX);
}
if (log_context->avgSizeOverallPenalty > 0){
fprintf(fpratio, "segments_size_penalty_max/avg=%.5f\n",log_context->maxSizeSegmentOverallPenalty / log_context->avgSizeOverallPenalty);
}else{
fprintf(fpratio, "segments_size_penalty_max/avg=" INT64_FORMAT "\n", INT64_MAX);
}
if (log_context->minContinuityOverall > 0){
fprintf(fpratio, "continuity_max/min=%.5f\n",log_context->maxContinuityOverall / log_context->minContinuityOverall);
}else{
fprintf(fpratio, "continuity_max/min=" INT64_FORMAT "\n", INT64_MAX);
}
if (log_context->avgContinuityOverall > 0){
fprintf(fpratio, "continuity_max/avg=%.5f\n",log_context->maxContinuityOverall / log_context->avgContinuityOverall);
}else{
fprintf(fpratio, "continuity_max/avg=" INT64_FORMAT "\n", INT64_MAX);
}
fflush(fpratio);
fclose(fpratio);
fpratio = NULL;
}
}
/*
* check whether a relation is hash distributed
*/
static bool is_relation_hash(GpPolicy *targetPolicy) {
return targetPolicy->nattrs != 0;
}
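/*
* For illustration (not taken from this file): a table created with
* DISTRIBUTED BY (col) has targetPolicy->nattrs > 0, whereas a table created
* with DISTRIBUTED RANDOMLY has nattrs == 0 and is therefore treated as a
* random relation by the allocation code below.
*/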
/*
* caculate_per_relation_data_locality_result
*/
static void caculate_per_relation_data_locality_result(Relation_Data* rel_data,
Assignment_Log_Context* log_context,
Relation_Assignment_Context* assignment_context) {
double datalocalityRatioPerRelation = 0.0;
/* we need to log data locality of every relation*/
if (log_context->totalDataSizePerRelation > 0) {
datalocalityRatioPerRelation =
(double) log_context->localDataSizePerRelation
/ log_context->totalDataSizePerRelation;
}
log_context->datalocalityRatio +=
(double) log_context->localDataSizePerRelation;
if (debug_print_split_alloc_result) {
elog(
LOG, "datalocality relation:%u relation ratio: %f with %d virtual segments",
rel_data->relid,datalocalityRatioPerRelation,assignment_context->virtual_segment_num);
}
if (debug_fake_datalocality) {
fprintf(fp, "datalocality relation %u ratio is:%f \n", rel_data->relid,
datalocalityRatioPerRelation);
}
log_context->totalDataSize += log_context->totalDataSizePerRelation;
}
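/*
* Worked example (hypothetical numbers): for a relation with
* localDataSizePerRelation = 768 MB and totalDataSizePerRelation = 1024 MB,
* the per-relation ratio logged above is 0.75; the 768 MB is also accumulated
* into log_context->datalocalityRatio, which is later normalized by the
* overall totalDataSize in print_datalocality_overall_log_information().
*/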
/*
* combine all splits to Detailed_File_Split
*/
static void combine_all_splits(Detailed_File_Split **splits,
Relation_Assignment_Context* assignment_context, TargetSegmentIDMap* idMap,
Assignment_Log_Context* log_context,
split_to_segment_mapping_context* context) {
ListCell *lc;
*splits = (Detailed_File_Split *) palloc(
sizeof(Detailed_File_Split) * assignment_context->total_split_num);
int total_split_index = 0;
bool nonLocalExist = false;
int64 splitTotalLength = 0;
/* go through all splits again. combine all splits to Detailed_File_Split structure*/
foreach(lc, context->chsl_context.relations)
{
ListCell *lc_file;
Relation_Data *rel_data = (Relation_Data *) lfirst(lc);
bool isRelationHash = true;
GpPolicy *targetPolicy = NULL;
Oid myrelid = rel_data->relid;
targetPolicy = GpPolicyFetch(CurrentMemoryContext, myrelid);
if (targetPolicy->nattrs == 0) {
isRelationHash = false;
}
foreach(lc_file, rel_data->files)
{
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
for (int i = 0; i < rel_file->split_num; i++) {
/* fake data locality */
char *p = NULL;
if (debug_fake_datalocality) {
bool isLocalRead = false;
int localCount = 0;
if (isRelationHash && context->keep_hash
&& assignment_context->virtual_segment_num
== targetPolicy->bucketnum) {
for (int b = 0; b < rel_file->block_num; b++) {
Block_Host_Index *hostID = rel_file->hostIDs + b;
for (int l = 0; l < hostID->replica_num; l++) {
if (hostID->hostIndex[l]
== idMap->global_IDs[rel_file->splits[i].host]) {
localCount++;
break;
}
}
}
} else {/*if random*/
Block_Host_Index *hostID = rel_file->hostIDs + i;
for (int l = 0; l < hostID->replica_num; l++) {
if (hostID->hostIndex[l]
== idMap->global_IDs[rel_file->splits[i].host]) {
isLocalRead = true;
break;
}
}
}
if (localCount == rel_file->block_num || isLocalRead) {
fprintf(fp,
"split %d of file %d of relation %d is assigned to virtual segment No%d: Local Read .\n",
i, rel_file->segno, rel_data->relid, rel_file->splits[i].host);
} else if (localCount == 0 && !isLocalRead) {
fprintf(fp,
"split %d of file %d of relation %d is assigned to virtual segment No%d: Remote Read .\n",
i, rel_file->segno, rel_data->relid, rel_file->splits[i].host);
} else {
fprintf(fp,
"split %d of file %d of relation %d is assigned to virtual segment No%d: Local Read Ratio : %d / %d \n",
i, rel_file->segno, rel_data->relid, rel_file->splits[i].host,
localCount, rel_file->block_num);
}
}
/*double check when datalocality is 1.0*/
if (log_context->datalocalityRatio == 1.0 && !nonLocalExist) {
bool isLocal = false;
Block_Host_Index *hostID = rel_file->hostIDs + i;
if (hostID && rel_file->splits[i].host >= 0
&& rel_file->splits[i].host < idMap->target_segment_num) {
for (int l = 0; l < hostID->replica_num; l++) {
if (hostID->hostIndex[l]
== idMap->global_IDs[rel_file->splits[i].host]) {
isLocal = true;
break;
}
}
if (!isLocal) {
nonLocalExist = true;
elog(
LOG, "datalocality is not 1.0 when split%d of file %d.",i,rel_file->segno);
}
}
}
(*splits)[total_split_index].rel_oid = rel_data->relid;
(*splits)[total_split_index].segno = rel_file->segno;
(*splits)[total_split_index].index = 1;
(*splits)[total_split_index].host = rel_file->splits[i].host;
if ((*splits)[total_split_index].host == -1) {
(*splits)[total_split_index].host = 0;
}
(*splits)[total_split_index].offset = rel_file->splits[i].offset;
(*splits)[total_split_index].length = rel_file->splits[i].length;
(*splits)[total_split_index].logiceof = rel_file->logic_len;
p = rel_file->splits[i].ext_file_uri;
(*splits)[total_split_index].ext_file_uri_string = p ? pstrdup(p) : (char *) NULL;
total_split_index++;
splitTotalLength += rel_file->splits[i].length;
}
}
}
if(context->total_metadata_logic_len != splitTotalLength){
elog(ERROR, "total split length does not equal to metadata total logic length!");
}
}
/*
* The driver of the allocation algorithm.
*/
static List *
run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, QueryResource ** resourcePtr,
split_to_segment_mapping_context *context) {
uint64_t before_run_allocation = 0;
before_run_allocation = gettime_microsec();
List *alloc_result = NIL;
ListCell *lc;
TargetSegmentIDMap idMap;
Relation_Assignment_Context assignment_context;
Assignment_Log_Context log_context;
Split_Assignment_Result split_assign_result;
Relation_Data** rel_data_vector = NULL;
int relationCount = 0;
MemoryContextSwitchTo(context->datalocality_memorycontext);
/*before assigning splits to virtual segments, we index virtual segments on different hosts to different hash values*/
allocation_preparation(virtual_segments, &idMap, &assignment_context, context);
log_context.totalDataSize = 0.0;
log_context.datalocalityRatio = 0.0;
/*sort relations by size.*/
relationCount = list_length(context->chsl_context.relations);
if (relationCount > 0) {
rel_data_vector = (Relation_Data**) palloc(
sizeof(Relation_Data*) * relationCount);
int i = 0;
foreach(lc, context->chsl_context.relations)
{
rel_data_vector[i++] = (Relation_Data *) lfirst(lc);
}
qsort(rel_data_vector, relationCount, sizeof(Relation_Data*),
compare_relation_size);
}
assignment_context.patition_parent_size_map = createHASHTABLE(
context->datalocality_memorycontext, 16,
HASHTABLE_SLOT_VOLUME_DEFAULT_MAX, HASHTABLE_KEYTYPE_UINT32,
NULL);
assignment_context.partitionvols_with_penalty_map = createHASHTABLE(
context->datalocality_memorycontext, 16,
HASHTABLE_SLOT_VOLUME_DEFAULT_MAX, HASHTABLE_KEYTYPE_UINT32,
NULL);
assignment_context.partitionvols_map = createHASHTABLE(
context->datalocality_memorycontext, 16,
HASHTABLE_SLOT_VOLUME_DEFAULT_MAX, HASHTABLE_KEYTYPE_UINT32,
NULL);
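/*
* The three hash tables above are keyed by the partition parent OID
* (HASHTABLE_KEYTYPE_UINT32). Based on their names and on how they are
* initialized below: patition_parent_size_map accumulates the expected
* per-vseg data share of a partitioned table, while partitionvols_map and
* partitionvols_with_penalty_map each hold one int64 counter per virtual
* segment, presumably tracking the volume assigned (without and with the
* penalty) under that parent.
*/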
Relation inhrel;
inhrel = heap_open(InheritsRelationId, AccessShareLock);
cqContext *pcqCtx;
cqContext cqc;
HeapTuple inhtup;
/*calculate the average size per vseg for all the relations in a query
* and initialize the patition_parent_size_map*/
for (int relIndex = 0; relIndex < relationCount; relIndex++) {
Relation_Data *rel_data = rel_data_vector[relIndex];
pcqCtx = caql_beginscan(
caql_addrel(cqclr(&cqc), inhrel), cql("SELECT * FROM pg_inherits "
" WHERE inhrelid = :1 ", ObjectIdGetDatum(rel_data->relid)));
while (HeapTupleIsValid(inhtup = caql_getnext(pcqCtx))) {
int64 block_count=0;
if (rel_data->total_size == 0 && rel_data->files != NULL) {
rel_data->files = NULL;
}
ListCell* lc_file;
foreach(lc_file, rel_data->files)
{
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
block_count += rel_file->split_num;
}
rel_data->block_count = block_count;
Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inhtup);
Oid inhparent = inh->inhparent;
rel_data->partition_parent_relid = inhparent;
uint32_t key = (uint32_t) inhparent;
if (getHASHTABLENode(assignment_context.patition_parent_size_map,
TYPCONVERT(void *, key)) == NULL) {
int64 size=0;
setHASHTABLENode(assignment_context.patition_parent_size_map,
TYPCONVERT(void *, key),&size , false);
}
PAIR p = getHASHTABLENode(assignment_context.patition_parent_size_map,
TYPCONVERT(void *, key));
int64* val = (int64 *) (p->Value);
*val += (int64) (rel_data->total_size * 1.05
/ (double) assignment_context.virtual_segment_num);
if (getHASHTABLENode(assignment_context.partitionvols_with_penalty_map,
TYPCONVERT(void *, key)) == NULL) {
setHASHTABLENode(assignment_context.partitionvols_with_penalty_map,
TYPCONVERT(void *, key), NIL, false);
p = getHASHTABLENode(assignment_context.partitionvols_with_penalty_map,
TYPCONVERT(void *, key));
int64** partitionvols_with_penalty = (int64 **) (&(p->Value));
*partitionvols_with_penalty = (int64 *) palloc(
sizeof(int64) * assignment_context.virtual_segment_num);
MemSet(*partitionvols_with_penalty, 0,
sizeof(int64) * assignment_context.virtual_segment_num);
}
if (getHASHTABLENode(assignment_context.partitionvols_map,
TYPCONVERT(void *, key)) == NULL) {
setHASHTABLENode(assignment_context.partitionvols_map,
TYPCONVERT(void *, key), NIL, false);
p = getHASHTABLENode(assignment_context.partitionvols_map,
TYPCONVERT(void *, key));
int64** partitionvols = (int64 **) (&(p->Value));
*partitionvols = (int64 *) palloc(
sizeof(int64) * assignment_context.virtual_segment_num);
MemSet(*partitionvols, 0,
sizeof(int64) * assignment_context.virtual_segment_num);
}
break;
}
caql_endscan(pcqCtx);
assignment_context.avg_size_of_whole_query += rel_data->total_size;
}
heap_close(inhrel, AccessShareLock);
assignment_context.avg_size_of_whole_query /= (double) assignment_context.virtual_segment_num;
/* a vseg is allowed to take at most 1.05 times the average workload.*/
assignment_context.avg_size_of_whole_query *= 1.05;
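/*
* Hypothetical example: with 10 GB of total relation data and 8 virtual
* segments, avg_size_of_whole_query is 1.25 GB, and after the 5% slack it
* becomes roughly 1.31 GB per vseg.
*/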
if (debug_print_split_alloc_result) {
elog(LOG, "avg_size_of_whole_query is:%f",assignment_context.avg_size_of_whole_query);
}
int allocate_hash_or_random_time = 0;
bool vSegOrderChanged = false;
List* parentRelsType =NULL;
for (int relIndex = 0; relIndex < relationCount; relIndex++) {
log_context.localDataSizePerRelation = 0;
log_context.totalDataSizePerRelation = 0;
Relation_Data *rel_data = rel_data_vector[relIndex];
/*empty files need to be skipped*/
if (rel_data->total_size == 0 && rel_data->files != NULL) {
rel_data->files = NULL;
}
/* for each relation, set context.vols to zero */
MemSet(assignment_context.vols, 0,
sizeof(int64) * assignment_context.virtual_segment_num);
/* the whole query context may be keephash while some relation is random;
* in this case, we set isRelationHash=false when the relation is random.
*/
Oid myrelid = rel_data->relid;
GpPolicy *targetPolicy = NULL;
targetPolicy = GpPolicyFetch(CurrentMemoryContext, myrelid);
bool isRelationHash = is_relation_hash(targetPolicy);
int fileCountInRelation = list_length(rel_data->files);
bool FileCountBucketNumMismatch = false;
if (targetPolicy->bucketnum > 0) {
Relation rel = heap_open(rel_data->relid, NoLock);
if (!RelationIsExternal(rel))
{
FileCountBucketNumMismatch = fileCountInRelation %
targetPolicy->bucketnum == 0 ? false : true;
}
else
{
ListCell *lc_file;
int maxsegno = 0;
foreach(lc_file, rel_data->files)
{
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
if (rel_file->segno > maxsegno)
maxsegno = rel_file->segno;
}
FileCountBucketNumMismatch =
maxsegno > targetPolicy->bucketnum ? true : false;
}
heap_close(rel, NoLock);
}
if (isRelationHash && FileCountBucketNumMismatch && !allow_file_count_bucket_num_mismatch) {
elog(ERROR, "file count %d in catalog is not in proportion to the bucket "
"number %d of hash table with oid=%u, some data may be lost, if you "
"still want to continue the query by considering the table as random, set GUC "
"allow_file_count_bucket_num_mismatch to on and try again.",
fileCountInRelation, targetPolicy->bucketnum, myrelid);
}
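/*
* Example of the mismatch check (hypothetical numbers): a hash table with
* bucketnum = 6 and 12 files in the catalog is fine (12 % 6 == 0), whereas 8
* files would set FileCountBucketNumMismatch and either raise the error above
* or, with allow_file_count_bucket_num_mismatch on, make the table be treated
* as a random relation below.
*/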
/* change the virtual segment order when keeping hash.
* the order of idMap should also be changed.
* if the file count of the table is not equal to, or a multiple of,
* the bucket number, we should process it as a random table.
*/
if (isRelationHash && context->keep_hash
&& assignment_context.virtual_segment_num == targetPolicy->bucketnum
&& !vSegOrderChanged && !FileCountBucketNumMismatch) {
change_hash_virtual_segments_order(resourcePtr, rel_data,
&assignment_context, &idMap);
for (int p = 0; p < idMap.target_segment_num; p++) {
if (debug_fake_datalocality) {
fprintf(fp, "After resort virtual segment No%d: %s\n", p,
idMap.hostname[p]);
}
}
vSegOrderChanged = true;
}
if (debug_print_split_alloc_result) {
for (int w = 0; w < idMap.target_segment_num; w++) {
elog(LOG, "After resort datalocality using segment No%d hostname: %s,hostnameid: %d"
,w,idMap.hostname[w],idMap.global_IDs[w]);
}
}
assignment_context.block_lessthan_vseg_round_robin_no = -1;
if (rel_data->block_count > 0 && rel_data->block_count < assignment_context.virtual_segment_num) {
assignment_context.block_lessthan_vseg_round_robin_no = 0;
}
uint64_t before_run_allocate_hash_or_random = gettime_microsec();
/*allocate hash relation*/
if (isRelationHash) {
/*
* if the file count of the table is not equal to, or a multiple of,
* the bucket number, we should process it as a random table.
*/
if (context->keep_hash
&& assignment_context.virtual_segment_num== targetPolicy->bucketnum
&& !FileCountBucketNumMismatch) {
ListCell* parlc;
bool parentIsHashExist=false;
bool parentIsHash =false;
/*check whether the relation is a partition table that needs to be treated as a random relation*/
if (parentRelsType != NULL) {
foreach(parlc, parentRelsType)
{
CurrentRelType* prtype = (CurrentRelType *) lfirst(parlc);
if(prtype->relid == rel_data->partition_parent_relid || prtype->relid == rel_data->relid){
parentIsHashExist=true;
parentIsHash = prtype->isHash;
}
}
}
bool needToChangeHash2Random = false;
needToChangeHash2Random = allocate_hash_relation(rel_data,
&log_context, &idMap, &assignment_context, context, parentIsHashExist,parentIsHash);
if (!parentIsHashExist) {
/*for a partition table, whether to convert from hash to random is determined by the first partition.
* it is not needed by the planner, so it doesn't need to be in the global memory context*/
CurrentRelType* parentRelType = (CurrentRelType *) palloc(
sizeof(CurrentRelType));
parentRelType->relid = rel_data->partition_parent_relid;
parentRelType->isHash = !needToChangeHash2Random;
parentRelsType = lappend(parentRelsType, parentRelType);
}
MemoryContext cur_memorycontext;
cur_memorycontext = MemoryContextSwitchTo(context->old_memorycontext);
CurrentRelType* relType = (CurrentRelType *) palloc(sizeof(CurrentRelType));
relType->relid = rel_data->relid;
if (needToChangeHash2Random) {
relType->isHash = false;
} else {
relType->isHash = true;
}
result->relsType = lappend(result->relsType, relType);
MemoryContextSwitchTo(cur_memorycontext);
if (needToChangeHash2Random) {
allocate_random_relation(rel_data, &log_context, &idMap, &assignment_context, context);
}
}
/*allocate hash relation as a random relation*/
else{
MemoryContext cur_memorycontext;
cur_memorycontext = MemoryContextSwitchTo(context->old_memorycontext);
CurrentRelType* relType = (CurrentRelType *) palloc(
sizeof(CurrentRelType));
relType->relid = rel_data->relid;
relType->isHash = false;
result->relsType = lappend(result->relsType, relType);
MemoryContextSwitchTo(cur_memorycontext);
allocate_random_relation(rel_data, &log_context,&idMap, &assignment_context, context);
}
}
/*allocate random relation*/
else {
allocate_random_relation(rel_data, &log_context,&idMap, &assignment_context, context);
}
uint64_t after_run_allocate_hash_or_random = gettime_microsec();
allocate_hash_or_random_time = after_run_allocate_hash_or_random - before_run_allocate_hash_or_random;
caculate_per_relation_data_locality_result(rel_data, &log_context,&assignment_context);
}
print_datalocality_overall_log_information(result,virtual_segments, relationCount,
&log_context, &assignment_context, context);
if (relationCount > 0) {
pfree(rel_data_vector);
}
/* go through all splits again. combine all splits to Detailed_File_Split structure*/
Detailed_File_Split *splits =NULL;
combine_all_splits(&splits, &assignment_context, &idMap, &log_context,
context);
uint64_t after_run_allocation = 0;
after_run_allocation = gettime_microsec();
int elapsedTime = after_run_allocation - before_run_allocation;
if (debug_fake_datalocality) {
fprintf(fp, "datalocality ratio is:%f\n", log_context.datalocalityRatio);
fprintf(fp, "The time of run_allocation_algorithm is : %d us. \n", eclaspeTime);
fprintf(fp, "The time of run_allocate_hash_or_random is : %d us. \n", allocate_hash_or_random_time);
fflush(fp);
fclose(fp);
fp = NULL;
elog(ERROR, "Abort fake data locality!");
}
/*
* sort all the splits.
*/
qsort(splits, assignment_context.total_split_num, sizeof(Detailed_File_Split),
compare_detailed_file_split);
init_split_assignment_result(&split_assign_result,
assignment_context.virtual_segment_num);
assign_splits_to_hosts(&split_assign_result, splits, assignment_context.total_split_num);
MemoryContextSwitchTo(context->old_memorycontext);
alloc_result = post_process_assign_result(&split_assign_result);
uint64_t run_datalocality = 0;
run_datalocality = gettime_microsec();
int dl_overall_time = run_datalocality - before_run_allocation;
context->cal_datalocality_time_us = dl_overall_time;
if(debug_datalocality_time){
elog(LOG, "datalocality overall execution time: %d us. \n", dl_overall_time);
}
result->datalocalityTime = (double)(context->metadata_cache_time_us + context->alloc_resource_time_us + context->cal_datalocality_time_us)/ 1000;
appendStringInfo(result->datalocalityInfo, "DFS metadatacache: %.3f ms; resource allocation: %.3f ms; datalocality calculation: %.3f ms.",
(double)context->metadata_cache_time_us/1000, (double)context->alloc_resource_time_us/1000, (double)context->cal_datalocality_time_us/1000);
return alloc_result;
}
/*
* cleanup_allocation_algorithm: free all the resources
* used during the allocation algorithm.
*/
static void cleanup_allocation_algorithm(
split_to_segment_mapping_context *context) {
ListCell *lc;
foreach(lc, context->chsl_context.relations)
{
Relation_Data *rel_data = (Relation_Data *) lfirst(lc);
if ((rel_data->type == DATALOCALITY_APPENDONLY)
|| (rel_data->type == DATALOCALITY_PARQUET)) {
ListCell *lc_file;
foreach(lc_file, rel_data->files)
{
Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
if (rel_file->locations != NULL) {
free_hdfs_data_block_location(rel_file->locations,
rel_file->block_num);
}
}
}
}
if(DataLocalityMemoryContext){
MemoryContextResetAndDeleteChildren(DataLocalityMemoryContext);
}
return;
}
/*
* udf_collector_walker: the routine to find UDFs.
*/
bool udf_collector_walker(Node *node,
udf_collector_context *context) {
if (node == NULL) {
return false;
}
if (IsA(node, Query)) {
return query_tree_walker((Query *) node, udf_collector_walker,
(void *) context,
QTW_EXAMINE_RTES);
}
/*For Aggref, we don't consider it as udf.*/
if(IsA(node,FuncExpr)){
if(!IsBuildInFunction(((FuncExpr *) node)->funcid)){
context->udf_exist = true;
}
return false;
}
return expression_tree_walker(node, udf_collector_walker,
(void *) context);
}
/*
* find_udf: collect all udf, and store them into the udf_collector_context.
*/
void find_udf(Query *query, udf_collector_context *context) {
query_tree_walker(query, udf_collector_walker, (void *) context,
QTW_EXAMINE_RTES);
return;
}
/*
* calculate_planner_segment_num
* fixedVsegNum is used by PBE, since every execute should use the same number of vsegs.
*/
SplitAllocResult *
calculate_planner_segment_num(PlannedStmt *plannedstmt, Query *query,
QueryResourceLife resourceLife, int fixedVsegNum) {
SplitAllocResult *result = NULL;
QueryResource *resource = NULL;
List *virtual_segments = NIL;
List *alloc_result = NIL;
Node *planTree = plannedstmt->planTree;
GpPolicy *intoPolicy = plannedstmt->intoPolicy;
int sliceNum = plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1;
split_to_segment_mapping_context context;
context.chsl_context.relations = NIL;
int planner_segments = 0; /*virtual segments number for explain statement */
result = (SplitAllocResult *) palloc(sizeof(SplitAllocResult));
result->resource = NULL;
result->alloc_results = NIL;
result->relsType = NIL;
result->planner_segments = 0;
result->datalocalityInfo = makeStringInfo();
result->datalocalityTime = 0;
/* fake data locality */
if (debug_fake_datalocality) {
fp = fopen("/tmp/cdbdatalocality.result", "w+");
if (fp == NULL) {
elog(ERROR, "Could not open file!");
return result;
}
}
if (Gp_role != GP_ROLE_DISPATCH) {
result->resource = NULL;
result->alloc_results = NIL;
result->relsType = NIL;
result->planner_segments = 0;
return result;
}
PG_TRY();
{
init_datalocality_memory_context();
init_datalocality_context(plannedstmt, &context);
collect_range_tables(query, &(context.rtc_context));
collect_scan_rangetable(planTree, &(context.srtc_context));
bool isTableFunctionExists = false;
/*
* the number of virtual segments is determined by 5 factors:
* 1 bucket number of external table
* 2 whether function exists
* 3 bucket number of hash result relation
* 4 bucket number of hash "from" relation
* 5 data size of random "from" relation
*/
udf_collector_context udf_context;
udf_context.udf_exist = false;
find_udf(query, &udf_context);
isTableFunctionExists = udf_context.udf_exist;
/*convert range table list to oid list and check whether table function exists
*we keep a full range table list and a range table list without result relation separately
*/
convert_range_tables_to_oids_and_check_table_functions(
&(context.rtc_context.full_range_tables), &isTableFunctionExists,
context.datalocality_memorycontext);
convert_range_tables_to_oids_and_check_table_functions(
&(context.rtc_context.range_tables), &isTableFunctionExists,
context.datalocality_memorycontext);
/* set expected virtual segment number for hash table and external table*/
/* calculate hashSegNum, externTableSegNum, resultRelationHashSegNum */
check_keep_hash_and_external_table(&context, query, intoPolicy);
/*Table Function VSeg Number = default_segment_number(configured in GUC) if table function exists or gpfdist exists,
*0 Otherwise.
*/
if (isTableFunctionExists) {
context.tableFuncSegNum = GetUserDefinedFunctionVsegNum();
}
/* get block location and calculate relation size*/
get_block_locations_and_claculte_table_size(&context);
/*use inherit resource*/
if (resourceLife == QRL_INHERIT) {
if ( SPI_IsInPrepare() && (GetActiveQueryResource() == NULL) )
{
resource = NULL;
}
else
{
resource = AllocateResource(resourceLife, sliceNum, 0, 0, 0, NULL, 0);
}
if (resource != NULL) {
if ((context.keep_hash)
&& (list_length(resource->segments) != context.hashSegNum)) {
context.keep_hash = false;
}
}
}
/*allocate new resource*/
if (((resourceLife == QRL_INHERIT) && (resource == NULL))
|| (resourceLife == QRL_ONCE) || (resourceLife == QRL_NONE)) {
/*generate hostname-volume pairs to help RM choose hosts with
*maximum data locality (only when the vseg number is less than the host number)
*/
if(enable_prefer_list_to_rm){
context.host_context.size = context.dds_context.size;
MemoryContextSwitchTo(context.datalocality_memorycontext);
context.host_context.hostnameVolInfos = (HostnameVolumeInfo *) palloc(
sizeof(HostnameVolumeInfo) * context.host_context.size);
for (int i = 0; i < context.host_context.size; i++) {
MemSet(&(context.host_context.hostnameVolInfos[i].hostname), 0,
HOSTNAME_MAX_LENGTH);
strncpy(context.host_context.hostnameVolInfos[i].hostname,
context.dds_context.volInfos[i].hashEntry->key.hostname,
HOSTNAME_MAX_LENGTH-1);
context.host_context.hostnameVolInfos[i].datavolume = context.dds_context.volInfos[i].datavolume;
}
MemoryContextSwitchTo(context.old_memorycontext);
}else{
context.host_context.size = 0;
context.host_context.hostnameVolInfos = NULL;
}
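/*
* The hostnameVolInfos array built above is later handed to
* AllocateResource() so that the resource manager can prefer hosts that
* already hold more of the query's data; when enable_prefer_list_to_rm is
* off, an empty preference list is passed instead.
*/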
/* determine the random table segment number by the following 4 steps*/
/* Step1: we expect one split (block) to be processed by one virtual segment*/
context.randomSegNum = context.total_split_count;
/* Step2: combine segments when the splits are small*/
int64 min_split_size = min_datasize_to_combine_segment; /*default 128M*/
min_split_size <<= 20;
int expected_segment_num_with_minsize = (context.total_size + min_split_size - 1)
/ min_split_size;
if (context.randomSegNum > expected_segment_num_with_minsize) {
context.randomSegNum = expected_segment_num_with_minsize;
}
/* Step3: split segments when there are too many files (by default add one more segment per 100 files, configurable via GUC)*/
int expected_segment_num_with_max_filecount = (context.total_file_count
+ max_filecount_notto_split_segment - 1)
/ max_filecount_notto_split_segment;
if (context.randomSegNum < expected_segment_num_with_max_filecount) {
context.randomSegNum = expected_segment_num_with_max_filecount;
}
/* Step4: use at least one segment*/
if (context.randomSegNum < context.minimum_segment_num) {
context.randomSegNum = context.minimum_segment_num;
}
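/*
* Worked example of the four steps (hypothetical numbers, assuming the GUC
* defaults mentioned in the comments above): total_split_count = 40,
* total_size = 2 GB, total_file_count = 350, minimum_segment_num = 1.
* Step1 starts at 40; Step2 caps it at ceil(2048 MB / 128 MB) = 16;
* Step3 would raise the floor to ceil(350 / 100) = 4, which does not apply
* since 16 > 4; Step4 keeps 16 because it already exceeds the minimum.
*/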
int maxExpectedNonRandomSegNum = 0;
if (maxExpectedNonRandomSegNum < context.tableFuncSegNum)
maxExpectedNonRandomSegNum = context.tableFuncSegNum;
if (maxExpectedNonRandomSegNum < context.hashSegNum)
maxExpectedNonRandomSegNum = context.hashSegNum;
if (debug_fake_segmentnum){
fpsegnum = fopen("/tmp/segmentnumber", "w+");
fprintf(fpsegnum, "Default segment num : %d.\n", GetHashDistPartitionNum());
fprintf(fpsegnum, "\n");
fprintf(fpsegnum, "From random relation segment num : %d.\n", context.randomSegNum);
fprintf(fpsegnum, "Result relation hash segment num : %d.\n", context.resultRelationHashSegNum);
fprintf(fpsegnum, "\n");
fprintf(fpsegnum, "Table function segment num : %d.\n", context.tableFuncSegNum);
fprintf(fpsegnum, "Extern table segment num : %d.\n", context.externTableForceSegNum);
fprintf(fpsegnum, "From hash relation segment num : %d.\n", context.hashSegNum);
fprintf(fpsegnum, "MaxExpectedNonRandom segment num : %d.\n", maxExpectedNonRandomSegNum);
fprintf(fpsegnum, "\n");
}
int minTargetSegmentNumber = 0;
int maxTargetSegmentNumber = 0;
/* resultRelationHashSegNum has the highest priority*/
if (context.resultRelationHashSegNum != 0) {
if ((context.resultRelationHashSegNum < context.externTableForceSegNum
&& context.externTableForceSegNum != 0)
|| (context.resultRelationHashSegNum < context.externTableLocationSegNum)) {
/* bucket number of result table must be equal to or larger than
* location number of external table.*/
elog(ERROR, "bucket number of result hash table and external table should match each other");
}
maxTargetSegmentNumber = context.resultRelationHashSegNum;
minTargetSegmentNumber = context.resultRelationHashSegNum;
}
else if(context.externTableForceSegNum > 0){
/* the location number of an external table must not exceed the number of virtual segments (externTableForceSegNum)*/
if(context.externTableForceSegNum < context.externTableLocationSegNum){
elog(ERROR, "external table bucket number and location number should match each other");
}
maxTargetSegmentNumber = context.externTableForceSegNum;
minTargetSegmentNumber = context.externTableForceSegNum;
}
else if (maxExpectedNonRandomSegNum > 0) {
if (maxExpectedNonRandomSegNum == context.hashSegNum) {
/* in general, we keep bucket number of hash table equals to the number of virtual segments
* but this rule can be broken when there is a large random table in the range tables list
*/
context.hashSegNum =
context.hashSegNum < context.minimum_segment_num ?
context.minimum_segment_num : context.hashSegNum;
double considerRandomWhenHashExistRatio = 1.5;
/*if the size of the random table is > 1.5x that of the hash table, we consider relaxing the restriction on the hash bucket number*/
if (context.randomRelSize
> considerRandomWhenHashExistRatio * context.hashRelSize) {
if (context.randomSegNum < context.hashSegNum) {
context.randomSegNum = context.hashSegNum;
}
maxTargetSegmentNumber = context.randomSegNum;
minTargetSegmentNumber = context.minimum_segment_num;
} else {
maxTargetSegmentNumber = context.hashSegNum;
minTargetSegmentNumber = context.hashSegNum;
}
} else if (maxExpectedNonRandomSegNum == context.tableFuncSegNum) {
/* if there is a table function, we should at least use tableFuncSegNum virtual segments*/
context.tableFuncSegNum =
context.tableFuncSegNum < context.minimum_segment_num ?
context.minimum_segment_num : context.tableFuncSegNum;
if (context.randomSegNum < context.tableFuncSegNum) {
context.randomSegNum = context.tableFuncSegNum;
}
maxTargetSegmentNumber = context.randomSegNum;
minTargetSegmentNumber = context.minimum_segment_num;
}
} else {
maxTargetSegmentNumber = context.randomSegNum;
if(context.externTableLocationSegNum > 0 && maxTargetSegmentNumber < GetQueryVsegNum()){
maxTargetSegmentNumber = GetQueryVsegNum();
}
minTargetSegmentNumber = context.minimum_segment_num;
}
if (enforce_virtual_segment_number > 0) {
maxTargetSegmentNumber = enforce_virtual_segment_number;
minTargetSegmentNumber = enforce_virtual_segment_number;
}
/* in PBE mode, the execute should use the same vseg number. */
if(fixedVsegNum > 0 ){
maxTargetSegmentNumber = fixedVsegNum;
minTargetSegmentNumber = fixedVsegNum;
}
if(maxTargetSegmentNumber < minTargetSegmentNumber){
maxTargetSegmentNumber = minTargetSegmentNumber;
}
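/*
* At this point the target vseg range is fixed. Priority, from highest to
* lowest: resultRelationHashSegNum, externTableForceSegNum, then the
* hash/table-function/random estimates, with enforce_virtual_segment_number
* and fixedVsegNum (PBE) overriding everything. Hypothetical example: with
* hashSegNum = 16, randomSegNum = 10 and no result-relation or external-table
* constraint, both min and max become 16 unless the random data is more than
* 1.5x the hash data.
*/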
uint64_t before_rm_allocate_resource = gettime_microsec();
/* cost is used by RM to balance workload between hosts; the cost is at least one block size*/
int64 mincost = min_cost_for_each_query;
mincost <<= 20;
int64 queryCost = context.total_size < mincost ? mincost : context.total_size;
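/*
* Hypothetical example: if min_cost_for_each_query is 128 (MB), mincost
* becomes 128 MB after the shift, so a query touching only 50 MB of data is
* still reported to RM with a cost of 128 MB.
*/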
if (QRL_NONE != resourceLife) {
if (SPI_IsInPrepare())
{
resource = NULL;
/*
* prepare needs to get the resource quota from RM
* and pass the quota (planner_segments) to Orca or the Planner to generate the plan;
* the following executes(in PBE) should reallocate the same number
* of resources.
*/
uint32 seg_num;
uint32 seg_num_min;
uint32 seg_memory_mb;
double seg_core;
GetResourceQuota(maxTargetSegmentNumber,
minTargetSegmentNumber,
&seg_num,
&seg_num_min,
&seg_memory_mb,
&seg_core);
planner_segments = seg_num;
minTargetSegmentNumber = planner_segments;
maxTargetSegmentNumber = planner_segments;
}
else
{
resource = AllocateResource(QRL_ONCE, sliceNum, queryCost,
maxTargetSegmentNumber,
minTargetSegmentNumber,
context.host_context.hostnameVolInfos,
context.host_context.size);
}
}
/* for explain statements, we don't allocate resources physically*/
else {
uint32 seg_num, seg_num_min, seg_memory_mb;
double seg_core;
GetResourceQuota(maxTargetSegmentNumber, minTargetSegmentNumber, &seg_num,
&seg_num_min, &seg_memory_mb, &seg_core);
planner_segments = seg_num;
}
uint64_t after_rm_allocate_resource = gettime_microsec();
int elapsedTime = after_rm_allocate_resource - before_rm_allocate_resource;
context.alloc_resource_time_us = elapsedTime;
if(debug_datalocality_time){
elog(LOG, "rm allocate resource overall execution time: %d us. \n", eclaspeTime);
}
if (resource == NULL) {
result->resource = NULL;
result->alloc_results = NIL;
result->relsType = NIL;
result->planner_segments = planner_segments;
return result;
}
if (debug_fake_segmentnum){
fprintf(fpsegnum, "Target segment num Min: %d.\n", minTargetSegmentNumber);
fprintf(fpsegnum, "Target segment num Max: %d.\n", maxTargetSegmentNumber);
}
}
MemoryContextSwitchTo(context.datalocality_memorycontext);
virtual_segments = get_virtual_segments(resource);
int VirtualSegmentNumber = list_length(virtual_segments);
if (debug_fake_segmentnum){
fprintf(fpsegnum, "Real segment num : %d.\n", VirtualSegmentNumber);
fflush(fpsegnum);
fclose(fpsegnum);
fpsegnum = NULL;
elog(ERROR, "Abort fake segment number!");
}
/* for a normal query, if the virtual segment count equals 0, stop the query.*/
if (resourceLife != QRL_NONE && VirtualSegmentNumber == 0) {
elog(ERROR, "Could not allocate enough resource!");
}
MemoryContextSwitchTo(context.old_memorycontext);
/* data locality allocation algorithm*/
alloc_result = run_allocation_algorithm(result, virtual_segments, &resource, &context);
result->resource = resource;
result->alloc_results = alloc_result;
result->planner_segments = list_length(resource->segments);
}
PG_CATCH();
{
cleanup_allocation_algorithm(&context);
PG_RE_THROW();
}
PG_END_TRY();
cleanup_allocation_algorithm(&context);
if(debug_datalocality_time){
elog(ERROR, "Abort debug metadata, datalocality, rm Time.");
}
return result;
}