thread: add over-engineered wrapper for thread management

This commit is contained in:
Andrei Alexeyev 2023-03-28 22:53:56 +02:00
parent 427d2ca32c
commit e957f11790
No known key found for this signature in database
GPG key ID: 72D26128040B9690
14 changed files with 387 additions and 58 deletions

View file

@ -13,11 +13,12 @@
#include "global.h"
#include "video.h"
#include "vfs/public.h"
#include "thread.h"
struct evloop_s evloop;
void eventloop_enter(void *context, LogicFrameFunc frame_logic, RenderFrameFunc frame_render, PostLoopFunc on_leave, uint target_fps) {
assert(is_main_thread());
assert(thread_current_is_main());
assume(evloop.stack_ptr < evloop.stack + EVLOOP_STACK_SIZE - 1);
LoopFrame *frame;
@ -39,7 +40,7 @@ void eventloop_enter(void *context, LogicFrameFunc frame_logic, RenderFrameFunc
}
void eventloop_leave(void) {
assert(is_main_thread());
assert(thread_current_is_main());
assume(evloop.stack_ptr != NULL);
LoopFrame *frame = evloop.stack_ptr;

View file

@ -11,10 +11,11 @@
#include "eventloop_private.h"
#include "util.h"
#include "framerate.h"
#include "thread.h"
#include "global.h"
void eventloop_run(void) {
assert(is_main_thread());
assert(thread_current_is_main());
if(evloop.stack_ptr == NULL) {
return;

View file

@ -10,6 +10,7 @@
#include "util.h"
#include "hirestime.h"
#include "thread.h"
static bool use_hires;
static hrtime_t time_current;
@ -81,7 +82,7 @@ void time_shutdown(void) {
hrtime_t time_get(void) {
if(use_hires) {
assert(is_main_thread());
assert(thread_current_is_main());
time_update();
return time_current;
}

View file

@ -15,6 +15,7 @@
#include "util.h"
#include "list.h"
#include "util/strbuf.h"
#include "thread.h"
typedef struct Logger {
LIST_INTERFACE(struct Logger);
@ -50,7 +51,7 @@ static struct {
} buffers;
struct {
SDL_Thread *thread;
Thread *thread;
SDL_mutex *mutex;
SDL_cond *cond;
LIST_ANCHOR(QueuedLogEntry) queue;
@ -103,7 +104,7 @@ static const char* level_prefix(LogLevel lvl) { INDEX_MAP(level_prefix_map, lvl)
static const char* level_name(LogLevel lvl) { INDEX_MAP(level_name_map, lvl) }
attr_unused static const char* level_ansi_style_code(LogLevel lvl) { INDEX_MAP(level_ansi_style_map, lvl) }
static void log_queue_shutdown(bool force_sync);
static void log_queue_shutdown_internal(bool force_sync);
noreturn static void log_abort(const char *msg) {
#ifdef LOG_FATAL_MSGBOX
@ -122,7 +123,7 @@ noreturn static void log_abort(const char *msg) {
}
#endif
log_queue_shutdown(true);
log_queue_shutdown_internal(true);
log_shutdown();
// abort() doesn't clean up, but it lets us get a backtrace, which is more useful
@ -311,6 +312,10 @@ static void log_internal(LogLevel lvl, const char *funcname, const char *filenam
.time = SDL_GetTicks(),
};
if(entry.thread) {
thread_incref(entry.thread);
}
if(logging.queue.thread) {
log_dispatch_async(&entry);
} else {
@ -361,7 +366,7 @@ static void *delete_logger(List **loggers, List *logger, void *arg) {
return NULL;
}
static int log_queue_thread(void *a) {
static void *log_queue_thread(void *a) {
SDL_mutex *mtx = logging.queue.mutex;
SDL_cond *cond = logging.queue.cond;
@ -386,7 +391,7 @@ static int log_queue_thread(void *a) {
}
SDL_UnlockMutex(mtx);
return 0;
return NULL;
}
static void log_queue_init(void) {
@ -404,13 +409,15 @@ static void log_queue_init(void) {
return;
}
if(!(logging.queue.thread = SDL_CreateThread(log_queue_thread, "Log queue", NULL))) {
log_sdl_error(LOG_ERROR, "SDL_CreateThread");
logging.queue.thread = thread_create("Log queue", log_queue_thread, NULL, THREAD_PRIO_LOW);
if(!logging.queue.thread) {
log_error("Failed to create log queue thread");
return;
}
}
static void log_queue_shutdown(bool force_sync) {
static void log_queue_shutdown_internal(bool force_sync) {
if(!logging.queue.thread) {
return;
}
@ -419,7 +426,7 @@ static void log_queue_shutdown(bool force_sync) {
logging.queue.shutdown = env_get("TAISEI_LOG_ASYNC_FAST_SHUTDOWN", false) && !force_sync ? 2 : 1;
SDL_CondBroadcast(logging.queue.cond);
SDL_UnlockMutex(logging.queue.mutex);
SDL_WaitThread(logging.queue.thread, NULL);
thread_wait(logging.queue.thread);
logging.queue.thread = NULL;
SDL_DestroyMutex(logging.queue.mutex);
logging.queue.mutex = NULL;
@ -427,6 +434,10 @@ static void log_queue_shutdown(bool force_sync) {
logging.queue.cond = NULL;
}
void log_queue_shutdown(void) {
log_queue_shutdown_internal(false);
}
void log_init(LogLevel lvls) {
logging.enabled_log_levels = lvls;
logging.mutex = SDL_CreateMutex();
@ -436,7 +447,7 @@ void log_init(LogLevel lvls) {
void log_shutdown(void) {
logging.initialized = false;
log_queue_shutdown(false);
log_queue_shutdown_internal(false);
list_foreach(&logging.outputs, delete_logger, NULL);
SDL_DestroyMutex(logging.mutex);
strbuf_free(&logging.buffers.pre_format);

View file

@ -122,6 +122,7 @@ extern Formatter log_formatter_file;
extern Formatter log_formatter_console;
void log_init(LogLevel lvls);
void log_queue_shutdown(void);
void log_shutdown(void);
void log_add_output(LogLevel levels, SDL_RWops *output, Formatter *formatter) attr_nonnull(3);
void log_backtrace(LogLevel lvl);

View file

@ -57,11 +57,11 @@ static void taisei_shutdown(void) {
events_shutdown();
time_shutdown();
coroutines_shutdown();
log_queue_shutdown();
thread_shutdown();
log_info("Good bye");
SDL_Quit();
log_shutdown();
SDL_Quit();
}
static void init_log(void) {
@ -160,8 +160,6 @@ static void init_sdl(void) {
SDL_SetHintWithPriority(SDL_HINT_EMSCRIPTEN_ASYNCIFY, "0", SDL_HINT_OVERRIDE);
#endif
main_thread_id = SDL_ThreadID();
SDL_LogPriority sdl_logprio = env_get("TAISEI_SDL_LOG", 0);
if(sdl_logprio >= SDL_LOG_PRIORITY_VERBOSE) {
@ -282,6 +280,7 @@ int main(int argc, char **argv) {
auto ctx = ALLOC(MainContext);
setlocale(LC_ALL, "C");
thread_init();
init_log();
stageinfo_init(); // cli_args depends on this

View file

@ -94,6 +94,7 @@ taisei_src = files(
'stageutils.c',
'stats.c',
'taskmanager.c',
'thread.c',
'transition.c',
'version.c',
'video.c',

View file

@ -727,7 +727,7 @@ static ResourceStatus pump_or_wait_for_resource_load_nolock(
if(load_state) {
ResourceStatus dep_status = pump_dependencies(load_state);
if(!pump_only && is_main_thread()) {
if(!pump_only && thread_current_is_main()) {
InternalResource *persistent = ires_get_persistent(ires);
if(dep_status == RES_STATUS_LOADING) {
@ -763,7 +763,7 @@ static ResourceStatus pump_or_wait_for_resource_load_nolock(
while(ires->status == RES_STATUS_LOADING && !pump_only) {
// If we get to this point, then we're waiting for a resource that's awaiting finalization on the main thread.
// If we *are* the main thread, there's no excuse for us to wait; we should've finalized the resource ourselves.
assert(!is_main_thread());
assert(!thread_current_is_main());
ires_cond_wait(ires);
}
@ -870,7 +870,7 @@ retry:
case LOAD_CONT: {
ResourceStatus dep_status;
if(is_main_thread()) {
if(thread_current_is_main()) {
dep_status = pump_dependencies(st);
if(dep_status == RES_STATUS_LOADING) {
@ -900,7 +900,7 @@ retry:
}
case LOAD_CONT_ON_MAIN:
if(pump_dependencies(st) == RES_STATUS_LOADING || !is_main_thread()) {
if(pump_dependencies(st) == RES_STATUS_LOADING || !thread_current_is_main()) {
lstate_set_ready_to_finalize(st);
ires_cond_broadcast(ires);
events_emit(TE_RESOURCE_ASYNC_LOADED, 0, ires, NULL);
@ -930,7 +930,7 @@ static SDLCALL int filter_asyncload_event(void *vires, SDL_Event *event) {
}
static bool unload_resource(InternalResource *ires) {
assert(is_main_thread());
assert(thread_current_is_main());
ResourceHandler *handler = get_ires_handler(ires);
const char *tname = handler->typename;
@ -1016,7 +1016,7 @@ static bool should_defer_load(InternalResLoadState *st) {
}
static bool resource_asyncload_handler(SDL_Event *evt, void *arg) {
assert(is_main_thread());
assert(thread_current_is_main());
InternalResource *ires = evt->user.data1;
InternalResLoadState *st = ires->load;

View file

@ -20,8 +20,7 @@ struct TaskManager {
uint running : 1;
uint aborted : 1;
SDL_atomic_t numtasks;
SDL_ThreadPriority thread_prio;
SDL_Thread *threads[];
Thread *threads[];
};
struct Task {
@ -71,14 +70,10 @@ static void task_free(Task *task) {
mem_free(task);
}
static int taskmgr_thread(void *arg) {
static void *taskmgr_thread(void *arg) {
TaskManager *mgr = arg;
attr_unused SDL_threadID tid = SDL_ThreadID();
if(SDL_SetThreadPriority(mgr->thread_prio) < 0) {
log_sdl_error(LOG_WARN, "SDL_SetThreadPriority");
}
bool running;
bool aborted;
@ -152,10 +147,10 @@ static int taskmgr_thread(void *arg) {
}
}
return 0;
return NULL;
}
TaskManager *taskmgr_create(uint numthreads, SDL_ThreadPriority prio, const char *name) {
TaskManager *taskmgr_create(uint numthreads, ThreadPriority prio, const char *name) {
int numcores = SDL_GetCPUCount();
if(numcores < 1) {
@ -185,19 +180,16 @@ TaskManager *taskmgr_create(uint numthreads, SDL_ThreadPriority prio, const char
}
mgr->numthreads = numthreads;
mgr->thread_prio = prio;
for(uint i = 0; i < numthreads; ++i) {
int digits = i ? log10(i) + 1 : 0;
int digits = i ? log10(i) + 1 : 1;
static const char *const prefix = "taskmgr";
char threadname[sizeof(prefix) + strlen(name) + digits + 2];
snprintf(threadname, sizeof(threadname), "%s:%s/%i", prefix, name, i);
if(!(mgr->threads[i] = SDL_CreateThread(taskmgr_thread, threadname, mgr))) {
log_sdl_error(LOG_WARN, "SDL_CreateThread");
if(!(mgr->threads[i] = thread_create(threadname, taskmgr_thread, mgr, prio))) {
for(uint j = 0; j < i; ++j) {
SDL_DetachThread(mgr->threads[j]);
thread_decref(mgr->threads[j]);
mgr->threads[j] = NULL;
}
@ -297,7 +289,7 @@ static void taskmgr_finalize_and_wait(TaskManager *mgr, bool do_abort) {
SDL_UnlockMutex(mgr->mutex);
for(uint i = 0; i < mgr->numthreads; ++i) {
SDL_WaitThread(mgr->threads[i], NULL);
thread_wait(mgr->threads[i]);
}
taskmgr_free(mgr);

View file

@ -9,7 +9,7 @@
#pragma once
#include "taisei.h"
#include <SDL.h>
#include "thread.h"
typedef struct TaskManager TaskManager;
typedef struct Task Task;
@ -51,7 +51,7 @@ typedef struct TaskParams {
/**
* Priority of the task. Lower values mean higher priority. Higher priority tasks are added
* to the queue ahead of the lower priority ones, and thus will start execute sooner. Note
* to the queue ahead of the lower priority ones, and thus will start executing sooner. Note
* that this affects only the pending tasks. A task that already began executing cannot be
* interrupted, regardless of its priority.
*/
@ -74,7 +74,7 @@ typedef struct TaskParams {
* On success, returns a pointer to the created TaskManager.
* On failure, returns NULL.
*/
TaskManager *taskmgr_create(uint numthreads, SDL_ThreadPriority prio, const char *name)
TaskManager *taskmgr_create(uint numthreads, ThreadPriority prio, const char *name)
attr_nodiscard attr_returns_max_aligned attr_nonnull(3);
/**

218
src/thread.c Normal file
View file

@ -0,0 +1,218 @@
/*
* This software is licensed under the terms of the MIT License.
* See COPYING for further information.
* ---
* Copyright (c) 2011-2019, Lukas Weber <laochailan@web.de>.
* Copyright (c) 2012-2019, Andrei Alexeyev <akari@taisei-project.org>.
*/
#include "taisei.h"
#include "thread.h"
#include "log.h"
#include "hashtable.h"
#include <SDL_thread.h>
static_assert(THREAD_PRIO_LOW == (int)SDL_THREAD_PRIORITY_LOW);
static_assert(THREAD_PRIO_NORMAL == (int)SDL_THREAD_PRIORITY_NORMAL);
static_assert(THREAD_PRIO_HIGH == (int)SDL_THREAD_PRIORITY_HIGH);
static_assert(THREAD_PRIO_CRITICAL == (int)SDL_THREAD_PRIORITY_TIME_CRITICAL);
enum ThreadState {
THREAD_STATE_EXECUTING,
THREAD_STATE_DONE,
THREAD_STATE_CLEANING_UP,
};
struct Thread {
SDL_Thread *sdlthread;
void *data;
ThreadID id;
SDL_atomic_t refcount;
SDL_atomic_t state;
char name[];
};
static struct {
ht_int2ptr_ts_t id_to_thread;
ThreadID main_id;
} threads;
void thread_init(void) {
threads.main_id = SDL_ThreadID();
ht_create(&threads.id_to_thread);
}
void thread_shutdown(void) {
ht_int2ptr_ts_iter_t iter;
ht_iter_begin(&threads.id_to_thread, &iter);
for(;iter.has_data; ht_iter_next(&iter)) {
Thread *thrd = iter.value;
int nref = SDL_AtomicGet(&thrd->refcount);
log_error("Thread '%s' had non-zero refcount of %i", thrd->name, nref);
SDL_Thread *sdlthread = SDL_AtomicSetPtr((void**)&thrd->sdlthread, NULL);
if(sdlthread) {
SDL_DetachThread(sdlthread);
}
}
ht_iter_end(&iter);
ht_destroy(&threads.id_to_thread);
}
static void thread_finalize(Thread *thrd) {
assert(thrd->sdlthread == NULL);
ht_unset(&threads.id_to_thread, thrd->id);
mem_free(thrd);
}
static void thread_try_finalize(Thread *thrd) {
if(SDL_AtomicCAS(&thrd->state, THREAD_STATE_DONE, THREAD_STATE_CLEANING_UP)) {
thread_finalize(thrd);
}
}
static void thread_try_detach(Thread *thrd) {
SDL_Thread *sdlthread = SDL_AtomicSetPtr((void**)&thrd->sdlthread, NULL);
if(sdlthread) {
SDL_DetachThread(sdlthread);
}
}
void thread_incref(Thread *thrd) {
SDL_AtomicIncRef(&thrd->refcount);
}
static void thread_decref_internal(Thread *thrd, bool try_detach) {
int prev_refcount = SDL_AtomicAdd(&thrd->refcount, -1);
assert(prev_refcount > 0);
if(prev_refcount == 1) {
thread_try_finalize(thrd);
}
}
void thread_decref(Thread *thrd) {
thread_decref_internal(thrd, true);
}
typedef struct ThreadCreateData {
Thread *thrd;
ThreadProc proc;
void *userdata;
ThreadPriority prio;
SDL_sem *init_sem;
} ThreadCreateData;
static int SDLCALL sdlthread_entry(void *data) {
auto tcd = *(ThreadCreateData*)data;
ThreadID id = SDL_ThreadID();
tcd.thrd->id = id;
ht_set(&threads.id_to_thread, id, tcd.thrd);
SDL_SemPost(tcd.init_sem);
if(SDL_SetThreadPriority(tcd.prio) < 0) {
log_sdl_error(LOG_WARN, "SDL_SetThreadPriority");
}
tcd.thrd->data = tcd.proc(tcd.userdata);
attr_unused bool cas_ok;
cas_ok = SDL_AtomicCAS(&tcd.thrd->state, THREAD_STATE_EXECUTING, THREAD_STATE_DONE);
assert(cas_ok);
if(SDL_AtomicGet(&tcd.thrd->refcount) < 1) {
thread_try_detach(tcd.thrd);
thread_try_finalize(tcd.thrd);
}
return 0;
}
Thread *thread_create(const char *name, ThreadProc proc, void *userdata, ThreadPriority prio) {
size_t nsize = strlen(name) + 1;
auto thrd = ALLOC_FLEX(Thread, nsize);
memcpy(thrd->name, name, nsize);
thrd->refcount.value = 1;
thrd->state.value = THREAD_STATE_EXECUTING;
ThreadCreateData tcd = {
.prio = prio,
.proc = proc,
.thrd = thrd,
.userdata = userdata,
.init_sem = SDL_CreateSemaphore(0),
};
if(UNLIKELY(!tcd.init_sem)) {
log_sdl_error(LOG_ERROR, "SDL_CreateSemaphore");
goto fail;
}
thrd->sdlthread = SDL_CreateThread(sdlthread_entry, name, &tcd);
if(UNLIKELY(!thrd->sdlthread)) {
log_sdl_error(LOG_ERROR, "SDL_CreateThread");
goto fail;
}
SDL_SemWait(tcd.init_sem);
SDL_DestroySemaphore(tcd.init_sem);
return thrd;
fail:
SDL_DestroySemaphore(tcd.init_sem);
mem_free(thrd);
return NULL;
}
Thread *thread_get_current(void) {
return ht_get(&threads.id_to_thread, thread_get_current_id(), NULL);
}
ThreadID thread_get_id(Thread *thrd) {
return thrd->id;
}
ThreadID thread_get_current_id(void) {
return SDL_ThreadID();
}
ThreadID thread_get_main_id(void) {
return threads.main_id;
}
bool thread_current_is_main(void) {
return thread_get_current_id() == threads.main_id;
}
const char *thread_get_name(Thread *thrd) {
return thrd->name;
}
void *thread_wait(Thread *thrd) {
SDL_Thread *sdlthread = SDL_AtomicSetPtr((void**)&thrd->sdlthread, NULL);
if(sdlthread) {
SDL_WaitThread(sdlthread, NULL);
}
void *r = thrd->data;
thread_decref_internal(thrd, false);
return r;
}
bool thread_get_result(Thread *thrd, void **result) {
if(SDL_AtomicGet(&thrd->state) == THREAD_STATE_EXECUTING) {
return false;
}
if(result) {
*result = thrd->data;
}
return true;
}

118
src/thread.h Normal file
View file

@ -0,0 +1,118 @@
/*
* This software is licensed under the terms of the MIT License.
* See COPYING for further information.
* ---
* Copyright (c) 2011-2019, Lukas Weber <laochailan@web.de>.
* Copyright (c) 2012-2019, Andrei Alexeyev <akari@taisei-project.org>.
*/
#pragma once
#include "taisei.h"
void thread_init(void);
void thread_shutdown(void);
typedef struct Thread Thread;
typedef uint64_t ThreadID;
typedef void *(*ThreadProc)(void *userdata);
typedef enum ThreadPriority {
THREAD_PRIO_LOW,
THREAD_PRIO_NORMAL,
THREAD_PRIO_HIGH,
THREAD_PRIO_CRITICAL,
} ThreadPriority;
/*
* Creates and starts executing a "managed" thread.
* Other threads not started by this function are known as "foreign", including the main thread.
*
* Thread objects have a reference count, which starts at 1.
* You can manipulate it via thread_incref() and thread_decref(); thread_wait() also decrements the
* reference count.
*
* Returns NULL if threads are not supported.
*/
Thread *thread_create(const char *name, ThreadProc proc, void *userdata, ThreadPriority prio)
attr_returns_max_aligned
attr_nonnull(1, 2);
/*
* Increments the thread's reference count.
*/
void thread_incref(Thread *thrd)
attr_nonnull_all;
/*
* Decrements the thread's reference count, releasing resources if it hits zero.
*
* If the reference count hits zero while the thread is running, the cleanup is delayed until after
* it's done executing, iff the reference count stays at 0 by then. Thus this is somewhat
* equivalent to pthread_detach, except it can be canceled by incrementing the refcount while the
* thread is still running. That is only guaranteed to be safe when doing so from within that
* thread, however.
*/
void thread_decref(Thread *thrd)
attr_nonnull_all;
/*
* Returns the executing managed Thread.
* For foreign threads this function returns NULL.
*/
Thread *thread_get_current(void);
/*
* Returns a unique ID of the Thread.
* The meaning of this value is system-specific.
*/
ThreadID thread_get_id(Thread *thrd)
attr_nonnull_all;
/*
* Returns a unique ID of the currently executing thread.
* This works even if the thread is foreign.
* For managed threads, this function returns the same value as thread_get_id(thread_get_current())
*/
ThreadID thread_get_current_id(void)
attr_pure;
/*
* Returns the unique id of the main thread.
*/
ThreadID thread_get_main_id(void)
attr_pure;
/*
* Returns true if the executing thread is the main thread.
*/
bool thread_current_is_main(void)
attr_pure;
/*
* Returns the name assigned to a managed thread at creation.
*/
const char *thread_get_name(Thread *thrd)
attr_returns_nonnull
attr_nonnull_all;
/*
* Waits for a thread to finish executing.
* Returns the entry point's return value.
* The thread can't be referenced again after this function returns.
*
* You must eventually wait on all threads you create, or detach them via thread_decref().
* Not doing so may result in a "zombie" thread preventing the main program from exiting.
*
* WARNING: This function also decrements the thread's reference count after the wait is done.
* You should not wait on a "detached" thread.
*/
void *thread_wait(Thread *thrd)
attr_nonnull_all;
/*
* Returns true if the thread has finished executing, and stores its return value into `result`.
*/
bool thread_get_result(Thread *thrd, void **result)
attr_nodiscard
attr_nonnull(1);

View file

@ -22,14 +22,3 @@ void inherit_missing_pointers(uint num, void *dest[num], void *const base[num])
}
}
}
SDL_threadID main_thread_id = 0;
bool is_main_thread(void) {
if(main_thread_id == 0) {
return true;
}
SDL_threadID tid = SDL_ThreadID();
return main_thread_id == tid;
}

View file

@ -12,7 +12,6 @@
#include <SDL.h>
void inherit_missing_pointers(uint num, void *dest[num], void *const base[num]) attr_nonnull(2, 3);
bool is_main_thread(void);
typedef union FloatBits {
float val;
@ -40,6 +39,4 @@ INLINE double bits_to_double(uint64_t i) {
return ((DoubleBits) { .bits = i }).val;
}
extern SDL_threadID main_thread_id;
#define ARRAY_SIZE(arr) (sizeof(arr)/sizeof(*(arr)))