| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /*------------------------------------------------------------------------- |
| * |
| * dispatcher_mgt.c |
| * |
| * Dispatcher management is part of execution codes. |
| */ |
| |
| #include "postgres.h" |
| |
| #include "cdb/dispatcher.h" |
| #include "cdb/dispatcher_mgt.h" |
| #include "cdb/executormgr.h" |
| #include "cdb/workermgr.h" |
| #include "cdb/cdbvars.h" |
| #include "miscadmin.h" /* TODO: GetAuthenticatedUserId */ |
| |
| /* for poll */ |
| #ifdef HAVE_POLL_H |
| #include <poll.h> |
| #endif |
| #ifdef HAVE_SYS_POLL_H |
| #include <sys/poll.h> |
| #endif |
| #include "cdb/cdbconn.h" /* SOCK_ERRNO */ |
| |
| typedef enum DispMgtConstant { |
| DISPMGT_POLL_TIME = 2 * 1000, |
| } DispMgtConstant; |
| |
| /* |
| * QueryExecutorInGroupIterator/QueryExecutorGroupIterator |
| */ |
| typedef struct QueryExecutorInGroupIterator |
| { |
| int executor_id; |
| bool skip_stopped_executor; |
| } QueryExecutorInGroupIterator; |
| |
| typedef struct QueryExecutorGroupIterator |
| { |
| struct QueryExecutorTeam *team; |
| int group_id; |
| } QueryExecutorGroupIterator; |
| |
| /* Iterate all of groups. */ |
| static void dispmgt_init_query_executor_group_iterator(QueryExecutorTeam *team, |
| QueryExecutorGroupIterator *iterator); |
| static QueryExecutorGroup *dispmgt_get_query_executor_group_iterator( |
| QueryExecutorGroupIterator *iterator); |
| |
| |
| |
| /* |
| * dispmgt_init_query_executor_group_iterator |
| * Use stack allocate QueryExecutorGroupIterator to simplify error handler. |
| */ |
| static void |
| dispmgt_init_query_executor_group_iterator(QueryExecutorTeam *team, |
| QueryExecutorGroupIterator *iterator) |
| { |
| iterator->team = team; |
| iterator->group_id = 0; |
| } |
| |
| /* |
| * dispmgt_get_query_executor_group_iterator |
| */ |
| static QueryExecutorGroup * |
| dispmgt_get_query_executor_group_iterator( |
| QueryExecutorGroupIterator *iterator) |
| { |
| if (iterator->group_id >= iterator->team->query_executor_group_num) |
| return NULL; |
| |
| return &iterator->team->query_executor_groups[iterator->group_id++]; |
| } |
| |
| int |
| dispmgt_get_group_num(QueryExecutorTeam *team) |
| { |
| return team->query_executor_group_num; |
| } |
| |
| static void |
| dispmgt_init_query_executor_in_group_iterator(QueryExecutorGroup *group, |
| QueryExecutorInGroupIterator *iterator, |
| bool skip_stopped_executor) |
| { |
| iterator->executor_id = 0; |
| iterator->skip_stopped_executor = skip_stopped_executor; |
| } |
| |
| /* |
| * dispmgt_get_query_executor_in_group_iterator |
| */ |
| static struct QueryExecutor * |
| dispmgt_get_query_executor_in_group_iterator(QueryExecutorGroup *group, |
| QueryExecutorInGroupIterator *iterator) |
| { |
| for (; |
| iterator->executor_id < group->query_executor_num; |
| iterator->executor_id++) |
| { |
| struct QueryExecutor *executor = group->query_executors[iterator->executor_id]; |
| |
| if (iterator->skip_stopped_executor && |
| executormgr_is_stop(executor)) |
| continue; |
| |
| if (!executormgr_is_executor_valid(executor)) |
| continue; |
| |
| /* Increase index or we may return same object in the next iterate. */ |
| iterator->executor_id++; |
| return executor; |
| } |
| |
| return NULL; |
| } |
| |
| void |
| dispmgt_init_query_executor_iterator(QueryExecutorTeam *team, |
| QueryExecutorIterator *iterator) |
| { |
| iterator->team = team; |
| iterator->group_id = 0; |
| iterator->executor_id = 0; |
| } |
| |
| struct QueryExecutor * |
| dispmgt_get_query_executor_iterator(QueryExecutorIterator *iterator, bool mayContainInvalidExecutor) |
| { |
| QueryExecutorGroup *group; |
| struct QueryExecutor *executor = NULL; |
| |
| do { |
| if (iterator->group_id >= iterator->team->query_executor_group_num) |
| break; |
| |
| group = &iterator->team->query_executor_groups[iterator->group_id]; |
| |
| if (iterator->executor_id >= group->query_executor_num) |
| { |
| /* No more executor in this group, go to next group. */ |
| iterator->group_id++; |
| iterator->executor_id = 0; |
| continue; |
| } |
| |
| executor = group->query_executors[iterator->executor_id++]; |
| if (mayContainInvalidExecutor && !executormgr_is_executor_valid(executor)) |
| continue; |
| else |
| return executor; |
| } while (1); |
| |
| return NULL; |
| } |
| |
| List * |
| dispmgt_takeover_segment_conns(QueryExecutorTeam *team) |
| { |
| QueryExecutorIterator iterator; |
| struct QueryExecutor *executor; |
| List *segment_conns = NIL; |
| |
| dispmgt_init_query_executor_iterator(team, &iterator); |
| while ((executor = dispmgt_get_query_executor_iterator(&iterator, true))) |
| { |
| segment_conns = lappend(segment_conns, executormgr_takeover_segment_conns(executor)); |
| } |
| |
| return segment_conns; |
| } |
| |
| void |
| dispmgt_free_takeoved_segment_conns(List *takeoved_segment_conns) |
| { |
| ListCell *lc; |
| |
| foreach(lc, takeoved_segment_conns) |
| { |
| struct SegmentDatabaseDescriptor *desc = lfirst(lc); |
| |
| executormgr_free_takeovered_segment_conn(desc); |
| } |
| |
| list_free(takeoved_segment_conns); |
| } |
| |
| static QueryExecutorTeam * |
| dispmgt_create_executor_team(int group_num) |
| { |
| QueryExecutorTeam *team = palloc0(sizeof(QueryExecutorTeam)); |
| |
| team->query_executor_group_num = group_num; |
| team->query_executor_groups = palloc0(sizeof(QueryExecutorGroup) * group_num); |
| |
| return team; |
| } |
| |
| static bool |
| dispmgt_create_executor_group(QueryExecutorTeam *team, QueryExecutorGroup *group, int executor_num) |
| { |
| int i; |
| |
| group->team = team; |
| group->query_executor_num = executor_num; |
| group->query_executors = palloc0(sizeof(struct QueryExecutor *) * executor_num); |
| |
| for (i = 0; i < group->query_executor_num; i++) |
| group->query_executors[i] = executormgr_create_executor(); |
| |
| group->fds = palloc0(sizeof(struct pollfd) * group->query_executor_num); |
| return true; |
| } |
| |
| /* |
| * dispmgt_create_dispmgt_state |
| * Create data structure for workermgr(threads). |
| */ |
| QueryExecutorTeam * |
| dispmgt_create_dispmgt_state(struct DispatchData *data, |
| int threads_num, |
| int total_executors_num, |
| int avg_executors_per_thread) |
| { |
| QueryExecutorTeam *team; |
| QueryExecutorGroup *group; |
| QueryExecutorGroupIterator group_iterator; |
| |
| /* |
| * Create the workermgr data structure based on our schedule. |
| */ |
| team = dispmgt_create_executor_team(threads_num); |
| team->refDispatchData = data; |
| |
| dispmgt_init_query_executor_group_iterator(team, &group_iterator); |
| while ((group = dispmgt_get_query_executor_group_iterator(&group_iterator)) != NULL) |
| { |
| int executors_num_for_this_group = Min(total_executors_num, avg_executors_per_thread); |
| |
| total_executors_num -= executors_num_for_this_group; |
| dispmgt_create_executor_group(team, group, executors_num_for_this_group); |
| } |
| Assert(total_executors_num == 0); |
| |
| return team; |
| } |
| |
| /* |
| * dispmgt_thread_func_run |
| * This function is called for dispatching and monitoring the executors in one |
| * group. |
| * |
| * Error/Resource boundary: This function should not free memory and does not |
| * stop the thread too. It only cancel or |
| */ |
| static void |
| dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) |
| { |
| QueryExecutorInGroupIterator iterator; |
| struct DispatchData *data = group->team->refDispatchData; |
| struct QueryExecutor *executor; |
| struct QueryExecutor *err_handle_executor = NULL; |
| |
| /* Assume the connections are already set up. */ |
| dispmgt_init_query_executor_in_group_iterator(group, &iterator, false); |
| while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL) |
| { |
| if (err_handle_executor == NULL) |
| err_handle_executor = executor; |
| |
| if (workermgr_should_query_stop(state)) { |
| write_log("%s(): query is canceled before dispatching. " |
| "Will exit and clean up.", __func__); |
| err_handle_executor = executor; |
| goto error_cleanup; |
| } |
| |
| if (!executormgr_dispatch_and_run(data, executor)) { |
| write_log("%s(): query cannot dispatch and run. " |
| "Will exit and clean up.", __func__); |
| err_handle_executor = executor; |
| goto error_cleanup; |
| } |
| } |
| |
| /* Poll executors. */ |
| while (1) |
| { |
| int nfds = 0; |
| int n; |
| int cur_fds_idx = 0; |
| |
| /* Check global state to abort query, this let poll process easier. */ |
| if (workermgr_should_query_stop(state)){ |
| write_log("%s(): query is canceled before polling executors." |
| "Will exit and clean up.", __func__); |
| goto error_cleanup; |
| } |
| /* Skip the stopped executor make the logic easy to understand. */ |
| dispmgt_init_query_executor_in_group_iterator(group, &iterator, true); |
| while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL) |
| { |
| if (!executormgr_check_segment_status(executor)) |
| { |
| write_log("%s(): detected one segment (Global ID: %d) is down, " |
| "so abort the query that is running or will run on it", |
| __func__, executormgr_get_segment_ID(executor)); |
| err_handle_executor = executor; |
| goto error_cleanup; |
| } |
| |
| /* |
| * The fds array may shorter than executor array. |
| * DO NOT mark executor stop! |
| */ |
| group->fds[nfds].fd = executormgr_get_fd(executor); |
| group->fds[nfds].events = POLLIN; |
| nfds++; |
| } |
| |
| /* No need to work! */ |
| if (nfds == 0) |
| goto thread_return; |
| |
| /* |
| * Use the following strategy to process poll results. |
| * 1. continue if poll was interrupted |
| * 2. 'poll' error should be treated as a query stop error |
| * 2. Lost connection check can be |
| * 3. check the executor state if timeout too many times |
| * 4. check executor returns and stop them if executor finish |
| */ |
| n = poll(group->fds, nfds, DISPMGT_POLL_TIME); |
| |
| if (n < 0 && SOCK_ERRNO == EINTR) |
| continue; |
| |
| if (n < 0) |
| { |
| /* |
| * System call poll error is only caused by program bug or system |
| * resources unavailable. In this case, fail the query is okay. |
| */ |
| write_log("%s(): poll() failed with errno: %d. " |
| "Will exit and clean up.", __func__, SOCK_ERRNO); |
| goto error_cleanup; |
| } |
| |
| if (n == 0) |
| { |
| /* |
| * Network problem and long time query may get here. |
| * Check network problem is expensive. |
| * Long time query only concerns cancel query, check it later in the |
| * loop start. |
| */ |
| continue; |
| } |
| |
| /* Someone returns, check it. */ |
| dispmgt_init_query_executor_in_group_iterator(group, &iterator, true); |
| while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL) |
| { |
| int __MAYBE_UNUSED sockfd; |
| |
| sockfd = executormgr_get_fd(executor); |
| /* TODO: is that safe to call Assert() in a thread ? */ |
| Assert(group->fds[cur_fds_idx].fd == sockfd); |
| if (!(group->fds[cur_fds_idx++].revents & POLLIN)) |
| continue; |
| |
| if (!executormgr_consume(executor)) { |
| write_log("%s(): fail to consume data. " |
| "Will exit and clean up.", __func__); |
| err_handle_executor = executor; |
| goto error_cleanup; |
| } |
| } |
| } |
| |
| error_cleanup: |
| /* |
| * Cleanup rules: |
| * 1. query cancel, result error, and poll error: mark the executor stop. |
| * 2. connection error: mark the gang error. Set by workermgr_mark_executor_error(). |
| */ |
| workermgr_set_state_cancel(state); |
| dispmgt_init_query_executor_in_group_iterator(group, &iterator, false); |
| while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL) |
| { |
| if (!executormgr_is_stop(executor)) |
| { |
| /* |
| * Executor is running, cancel it. If connection error occurs let |
| * following code cope with it. |
| */ |
| executormgr_cancel(executor); |
| } |
| |
| /* Executor stopped but no error. */ |
| if (!executormgr_has_error(executor)) |
| continue; |
| } |
| |
| /* |
| * Previously query failed, probably in this executor. We need to |
| * set error code here if it has not been set although the executor |
| * is probably fine. This let the main process for the query proceed |
| * to cancel the query in its thread also, without waiting for a long |
| * time. We expect the error code have been set previously before jumping |
| * to error_cleanup. The code below could be the last defence. |
| */ |
| executormgr_seterrcode_if_needed(err_handle_executor); |
| |
| thread_return: |
| return; |
| } |
| |
| void |
| dispmgt_dispatch_and_run(struct WorkerMgrState *state, |
| struct QueryExecutorTeam *team) |
| { |
| QueryExecutorGroupIterator group_iterator; |
| struct QueryExecutorGroup *query_executor_group; |
| List *tasks = NIL; |
| |
| /* Construct a list of work for each of threads. */ |
| |
| dispmgt_init_query_executor_group_iterator(team, &group_iterator); |
| while ((query_executor_group = dispmgt_get_query_executor_group_iterator(&group_iterator)) != NULL) |
| tasks = lappend(tasks, (Task) query_executor_group); |
| |
| workermgr_submit_job(state, tasks, (WorkerMgrTaskCallback) dispmgt_thread_func_run); |
| } |
| |
| /* |
| * dispmgt_create_concurrent_connect_state |
| */ |
| static List * |
| dispmgt_create_concurrent_connect_state(List *executors, int executors_num_per_thread) |
| { |
| int threads_num = 1; |
| int executors_num = list_length(executors); |
| int left_executors_num = list_length(executors); |
| List *tasks = NIL; |
| int i, j; |
| |
| /* Compute threads_num */ |
| if (executors_num_per_thread > executors_num) |
| threads_num = 1; |
| else |
| threads_num = (executors_num / executors_num_per_thread) + ((executors_num % executors_num_per_thread == 0) ? 0 : 1); |
| |
| /* Create the List of List of info. */ |
| for (i = 0; i < threads_num; i++) |
| { |
| List *task = NIL; |
| |
| for (j = 0; j < executors_num_per_thread && left_executors_num > 0; j++) |
| { |
| task = lappend(task, list_nth(executors, i * executors_num_per_thread + j)); |
| left_executors_num--; |
| } |
| |
| tasks = lappend(tasks, task); |
| } |
| |
| return tasks; |
| } |
| |
| static void |
| dispmgt_free_concurrent_connect_state(List *tasks) |
| { |
| ListCell *lc; |
| |
| foreach(lc, tasks) |
| list_free(lfirst(lc)); |
| |
| list_free(tasks); |
| } |
| |
| |
| static void |
| dispmgt_thread_func_connect(List *executor_info, struct WorkerMgrState *state) |
| { |
| ListCell *lc; |
| |
| foreach(lc, executor_info) |
| { |
| ConcurrentConnectExecutorInfo *info = lfirst(lc); |
| |
| if (workermgr_should_query_stop(state)) |
| break; |
| |
| if (!executormgr_connect(info->desc, info->executor, info->is_writer, info->is_superuser)) |
| break; |
| } |
| |
| return; |
| } |
| |
| /* |
| * dispmgt_concurrent_connect |
| */ |
| bool |
| dispmgt_concurrent_connect(List *executors, int executors_num_per_thread) |
| { |
| struct WorkerMgrState *state = NULL; |
| List *tasks; |
| volatile bool has_error = false; |
| |
| Assert(list_length(executors) > 0); |
| tasks = dispmgt_create_concurrent_connect_state(executors, executors_num_per_thread); |
| state = workermgr_create_workermgr_state(list_length(tasks)); |
| |
| PG_TRY(); |
| { |
| workermgr_submit_job(state, tasks, (WorkerMgrTaskCallback) dispmgt_thread_func_connect); |
| workermgr_wait_job(state); |
| } |
| PG_CATCH(); |
| { |
| workermgr_cancel_job(state); |
| /* We have to clean up the executors. */ |
| dispmgt_free_concurrent_connect_state(tasks); |
| workermgr_free_workermgr_state(state); |
| |
| FlushErrorState(); |
| has_error = true; |
| } |
| PG_END_TRY(); |
| |
| /* Error occured and cleaned up. */ |
| if (has_error) |
| return false; |
| |
| CHECK_FOR_INTERRUPTS(); |
| dispmgt_free_concurrent_connect_state(tasks); |
| workermgr_free_workermgr_state(state); |
| |
| return true; |
| } |
| |
| ConcurrentConnectExecutorInfo * |
| dispmgt_build_preconnect_info(struct Segment *segment, |
| bool is_writer, |
| struct QueryExecutor *executor, |
| struct DispatchData *data, |
| struct DispatchSlice *slice, |
| struct DispatchTask *task) |
| { |
| ConcurrentConnectExecutorInfo *info = palloc0(sizeof(ConcurrentConnectExecutorInfo)); |
| |
| info->is_writer = is_writer; |
| info->is_superuser = superuser_arg(GetAuthenticatedUserId()); |
| info->executor = executor; |
| info->data = data; |
| info->slice = slice; |
| info->task = task; |
| info->desc = executormgr_prepare_connect(segment, is_writer); |
| |
| return info; |
| } |
| |
| void |
| dispmgt_free_preconnect_info(ConcurrentConnectExecutorInfo *info) |
| { |
| pfree(info); |
| } |
| |
| |