blob: 2a31d292ca1e1a6a30782f9d0e2fb7966031dbb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* workermgr.c
* Worker manager is designed to delivery a general thread-based task framework.
*
* From the callee sight, there is a 'job' to run and needs some threads.
* Each job contains some groups, which is schedule unit. Each group corresponds
* to a thread in the worker manager. In each group, there are some tasks. The
* tasks in one group have to be same property. But each group may have
* different property.
*/
#include "postgres.h"
#include <pthread.h>
#include "cdb/workermgr.h"
#include "cdb/cdbgang.h" /* gp_pthread_create */
#include "miscadmin.h" /* TODO: InterruptPending */
#include "utils/faultinjector.h"
/*
* This structure abstract the general job.
*/
typedef struct WorkerMgrThread {
bool started;
int thread_errno;
int thread_ret;
pthread_t thread;
/* Argument passed to thread. */
struct WorkerMgrState *state;
WorkerMgrTaskCallback func;
Task task;
} WorkerMgrThread;
typedef struct WorkerMgrState {
/* Control flags */
volatile bool cancel;
int threads_num;
WorkerMgrThread threads[0];
} WorkerMgrState;
typedef struct WorkerMgrThreadIterator {
int thread_id;
} WorkerMgrThreadIterator;
static void workermgr_init_thread_iterator(WorkerMgrState *state, WorkerMgrThreadIterator *iterator);
static WorkerMgrThread *workermgr_get_thread_iterator(WorkerMgrState *state, WorkerMgrThreadIterator *iterator);
static void *workermgr_thread_func(void *arg);
static void workermgr_join(WorkerMgrState *state);
/*
* workermgr_create_workermgr_state
* Create data structures for threads_num of threads.
*/
WorkerMgrState *
workermgr_create_workermgr_state(int threads_num)
{
WorkerMgrState *state;
/* Allocate the threads control data structure. */
state = palloc0(sizeof(WorkerMgrState) + threads_num * sizeof(WorkerMgrThread));
state->threads_num = threads_num;
return state;
}
void
workermgr_free_workermgr_state(WorkerMgrState *state)
{
pfree(state);
}
/*
* workermgr_submit_job
* Error/Resource boundary: This function should not free memory. All of the
* other resources should be released.
*/
bool
workermgr_submit_job(WorkerMgrState *state,
List *tasks,
WorkerMgrTaskCallback func)
{
WorkerMgrThreadIterator thread_iterator;
WorkerMgrThread *worker_mgr_thread;
int i = 0;
workermgr_init_thread_iterator(state, &thread_iterator);
while ((worker_mgr_thread = workermgr_get_thread_iterator(state, &thread_iterator)) != NULL)
{
worker_mgr_thread->state = state;
worker_mgr_thread->task = (Task) list_nth(tasks, i);
i++;
worker_mgr_thread->func = func;
#ifdef FAULT_INJECTOR
FaultInjector_InjectFaultIfSet(
WorkerManagerSubmitJob,
DDLNotSpecified,
"", // databaseName
""); // tableName
#endif
worker_mgr_thread->thread_ret = gp_pthread_create(&worker_mgr_thread->thread, workermgr_thread_func, worker_mgr_thread, "submit_plan_to_qe");
if (worker_mgr_thread->thread_ret)
goto error_cleanup;
worker_mgr_thread->started = true;
}
return true;
error_cleanup:
/* cleanup */
state->cancel = true;
workermgr_join(state);
CHECK_FOR_INTERRUPTS();
return false;
}
void
workermgr_set_state_cancel(WorkerMgrState *state)
{
state->cancel = true;
}
/*
* workermgr_cancel_job
*/
void
workermgr_cancel_job(WorkerMgrState *state)
{
state->cancel = true;
workermgr_join(state);
CHECK_FOR_INTERRUPTS();
}
void
workermgr_wait_job(WorkerMgrState *state)
{
state->cancel = false;
workermgr_join(state);
}
static void
workermgr_init_thread_iterator(WorkerMgrState *state, WorkerMgrThreadIterator *iterator)
{
iterator->thread_id = 0;
}
static WorkerMgrThread *
workermgr_get_thread_iterator(WorkerMgrState *state, WorkerMgrThreadIterator *iterator)
{
if (iterator->thread_id >= state->threads_num)
return NULL;
return &state->threads[iterator->thread_id++];
}
/*
* workermgr_thread_func
*/
static void *
workermgr_thread_func(void *arg)
{
WorkerMgrThread *thread = (WorkerMgrThread *) arg;
thread->func(thread->task, thread->state);
return NULL;
}
bool
workermgr_should_query_stop(WorkerMgrState *state)
{
if (state->cancel)
return true;
/* Die/Cancel/Lost Client */
if (InterruptPending)
return true;
return false;
}
/*
* workermgr_thread_cleanup
* TODO: pthread_join may not be able to interrupt, so change it to a loop!
*/
static void
workermgr_thread_cleanup(WorkerMgrThread *thread)
{
if (thread->started)
{
pthread_join(thread->thread, NULL);
thread->started = false;
}
}
/*
* workermgr_join
* Cleanup the threads, set state->cancel properly before calling. This
* function have to be safe even if it was interrupted and get into again.
*/
static void
workermgr_join(WorkerMgrState *state)
{
WorkerMgrThreadIterator iterator;
WorkerMgrThread *thread;
workermgr_init_thread_iterator(state, &iterator);
while ((thread = workermgr_get_thread_iterator(state, &iterator)) != NULL)
{
workermgr_thread_cleanup(thread);
}
}