Threading improvements (#125)

* Add a general purpose multi-threaded task manager (worker pool) for background tasks

Reimplemented screenshots off-loading using the new task manager.

* Largerly rewrite resource loading internals

They use the new task manager API now and should be generally more
robust.

* Made the game playable without threads again

* wait for resource async load task instead of intermediate state change

* remove dead code

* taskmgr: if creating a worker thread fails, try to make sure the others terminate
This commit is contained in:
Andrei Alexeyev 2018-05-25 09:01:07 +03:00 committed by GitHub
parent bb0f9ce0d4
commit 09946ebff9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 1001 additions and 356 deletions

View file

@ -103,43 +103,31 @@ void play_loop(const char *name) {
}
}
void reset_sounds(void) {
Resource *res;
Sound *snd;
static void* reset_sounds_callback(const char *name, Resource *res, void *arg) {
bool reset = (intptr_t)arg;
Sound *snd = res->data;
for(HashtableIterator *i = hashtable_iter(get_resource_table(RES_SFX));
hashtable_iter_next(i, 0, (void**)&res);
) {
if(!(snd = res->data)) {
continue;
if(snd) {
if(reset) {
snd->lastplayframe = 0;
}
snd->lastplayframe = 0;
if(snd->islooping) {
if(snd->islooping && (global.frames > snd->lastplayframe + LOOPTIMEOUTFRAMES || reset)) {
snd->islooping = false;
audio_backend_sound_stop_loop(snd->impl);
}
}
return NULL;
}
void reset_sounds(void) {
resource_for_each(RES_SFX, reset_sounds_callback, (void*)true);
list_foreach(&sound_queue, discard_enqueued_sound, NULL);
}
void update_sounds(void) {
Resource *res;
Sound *snd;
for(HashtableIterator *i = hashtable_iter(get_resource_table(RES_SFX));
hashtable_iter_next(i, 0, (void**)&res);
) {
if(!(snd = res->data)) {
continue;
}
if(snd->islooping && global.frames > snd->lastplayframe + LOOPTIMEOUTFRAMES) {
snd->islooping = false;
audio_backend_sound_stop_loop(snd->impl);
}
}
resource_for_each(RES_SFX, reset_sounds_callback, (void*)false);
for(struct enqueued_sound *s = sound_queue, *next; s; s = next) {
next = (struct enqueued_sound*)s->next;

View file

@ -7,6 +7,8 @@
*/
#pragma once
#include "taisei.h"
#include "dialog.h"
void dialog_reimu_stage1(Dialog *d);

View file

@ -175,43 +175,61 @@ void hashtable_free(Hashtable *ht) {
free(ht);
}
static HashtableElement* hashtable_get_internal(Hashtable *ht, void *key, hash_t hash) {
HashtableElement *elems = ht->table[hash & ht->hash_mask];
for(HashtableElement *e = elems; e; e = e->next) {
if(hash == e->hash && ht->cmp_func(key, e->key)) {
return e;
}
}
return NULL;
}
void* hashtable_get(Hashtable *ht, void *key) {
assert(ht != NULL);
hash_t hash = ht->hash_func(key);
HashtableElement *elem;
void *data;
hashtable_begin_read(ht);
HashtableElement *elems = ht->table[hash & ht->hash_mask];
for(HashtableElement *e = elems; e; e = e->next) {
if(hash == e->hash && ht->cmp_func(key, e->key)) {
hashtable_end_read(ht);
return e->data;
}
}
elem = hashtable_get_internal(ht, key, hash);
data = elem ? elem->data : NULL;
hashtable_end_read(ht);
return NULL;
return data;
}
void* hashtable_get_unsafe(Hashtable *ht, void *key) {
assert(ht != NULL);
hash_t hash = ht->hash_func(key);
HashtableElement *elems = ht->table[hash & ht->hash_mask];
HashtableElement *elem;
void *data;
for(HashtableElement *e = elems; e; e = e->next) {
if(hash == e->hash && ht->cmp_func(key, e->key)) {
return e->data;
}
}
elem = hashtable_get_internal(ht, key, hash);
data = elem ? elem->data : NULL;
return NULL;
return data;
}
static void hashtable_set_internal(Hashtable *ht, HashtableElement **table, size_t hash_mask, hash_t hash, void *key, void *data) {
static bool hashtable_set_internal(Hashtable *ht, HashtableElement **table, size_t hash_mask, hash_t hash, void *key, void *data, void* (*datafunc)(void*), bool allow_overwrite, void **val) {
size_t idx = hash & hash_mask;
HashtableElement *elems = table[idx], *elem;
void *result = NULL;
for(HashtableElement *e = elems; e; e = e->next) {
if(hash == e->hash && ht->cmp_func(key, e->key)) {
if(!allow_overwrite) {
if(val != NULL) {
*val = e->data;
}
return result;
}
if(ht->free_func) {
ht->free_func(e->key);
}
@ -222,6 +240,14 @@ static void hashtable_set_internal(Hashtable *ht, HashtableElement **table, size
}
}
if(datafunc != NULL) {
data = datafunc(data);
}
if(val != NULL) {
*val = data;
}
if(data) {
elem = malloc(sizeof(HashtableElement));
ht->copy_func(&elem->key, key);
@ -229,9 +255,11 @@ static void hashtable_set_internal(Hashtable *ht, HashtableElement **table, size
elem->data = data;
list_push(&elems, elem);
ht->num_elements++;
result = data;
}
table[idx] = elems;
return result;
}
static void hashtable_resize(Hashtable *ht, size_t new_size) {
@ -241,7 +269,7 @@ static void hashtable_resize(Hashtable *ht, size_t new_size) {
for(size_t i = 0; i < ht->table_size; ++i) {
for(HashtableElement *e = ht->table[i]; e; e = e->next) {
hashtable_set_internal(ht, new_table, new_hash_mask, e->hash, e->key, e->data);
hashtable_set_internal(ht, new_table, new_hash_mask, e->hash, e->key, e->data, NULL, true, NULL);
}
}
@ -264,7 +292,7 @@ void hashtable_set(Hashtable *ht, void *key, void *data) {
hash_t hash = ht->hash_func(key);
hashtable_begin_write(ht);
hashtable_set_internal(ht, ht->table, ht->hash_mask, hash, key, data);
hashtable_set_internal(ht, ht->table, ht->hash_mask, hash, key, data, NULL, true, NULL);
if(ht->num_elements == ht->table_size) {
hashtable_resize(ht, ht->table_size * 2);
@ -273,6 +301,22 @@ void hashtable_set(Hashtable *ht, void *key, void *data) {
hashtable_end_write(ht);
}
bool hashtable_try_set(Hashtable *ht, void *key, void *data, void* (*datafunc)(void*), void **val) {
assert(ht != NULL);
hash_t hash = ht->hash_func(key);
hashtable_begin_write(ht);
bool result = hashtable_set_internal(ht, ht->table, ht->hash_mask, hash, key, data, datafunc, false, val);
if(ht->num_elements == ht->table_size) {
hashtable_resize(ht, ht->table_size * 2);
}
hashtable_end_write(ht);
return result;
}
void hashtable_unset(Hashtable *ht, void *key) {
hashtable_set(ht, key, NULL);
}

View file

@ -38,6 +38,7 @@ void hashtable_free(Hashtable *ht);
void* hashtable_get(Hashtable *ht, void *key) attr_hot;
void* hashtable_get_unsafe(Hashtable *ht, void *key) attr_hot;
void hashtable_set(Hashtable *ht, void *key, void *data);
bool hashtable_try_set(Hashtable *ht, void *key, void *data, void* (*datafunc)(void*), void **val);
void hashtable_unset(Hashtable *ht, void *key);
void hashtable_unset_deferred(Hashtable *ht, void *key, ListContainer **list);
void hashtable_unset_deferred_now(Hashtable *ht, ListContainer **list);

View file

@ -56,6 +56,7 @@ void time_init(void) {
if(use_hires) {
if(!(paranoia = SDL_CreateMutex())) {
log_warn("Not using the system high resolution timer: SDL_CreateMutex() failed: %s", SDL_GetError());
use_hires = false;
return;
}

View file

@ -78,6 +78,8 @@ bool log_initialized(void);
#define log_fatal(...) _taisei_log_fatal(LOG_FATAL, __func__, __VA_ARGS__)
#define log_custom(lvl, ...) _taisei_log(lvl, false, __func__, __VA_ARGS__)
#define log_sdl_error(funcname) log_warn("%s() failed: %s", funcname, SDL_GetError())
//
// don't call these directly, use the macros
//

View file

@ -26,10 +26,13 @@
#include "version.h"
#include "credits.h"
#include "renderer/api.h"
#include "taskmanager.h"
static void taisei_shutdown(void) {
log_info("Shutting down");
taskmgr_global_shutdown();
if(!global.is_replay_verification) {
config_save();
progress_save();
@ -229,6 +232,7 @@ int main(int argc, char **argv) {
config_load();
init_sdl();
taskmgr_global_init();
time_init();
init_global(&a);
events_init();

View file

@ -84,6 +84,7 @@ taisei_src = files(
'stageobjects.c',
'stagetext.c',
'stageutils.c',
'taskmanager.c',
'transition.c',
'version.c',
'video.c',

View file

@ -15,6 +15,7 @@
#include "video.h"
#include "menu/mainmenu.h"
#include "events.h"
#include "taskmanager.h"
#include "texture.h"
#include "animation.h"
@ -42,70 +43,123 @@ ResourceHandler *_handlers[] = {
[RES_SHADER_PROGRAM] = NULL,
};
typedef enum ResourceStatus {
RES_STATUS_LOADING,
RES_STATUS_LOADED,
RES_STATUS_FAILED,
} ResourceStatus;
typedef struct InternalResource {
Resource res;
ResourceStatus status;
SDL_mutex *mutex;
SDL_cond *cond;
Task *async_task;
} InternalResource;
typedef struct ResourceAsyncLoadData {
InternalResource *ires;
char *path;
char *name;
ResourceFlags flags;
void *opaque;
} ResourceAsyncLoadData;
struct ResourceHandlerPrivate {
Hashtable *mapping;
};
Resources resources;
static SDL_threadID main_thread_id;
static SDL_threadID main_thread_id; // TODO: move this somewhere else
static inline ResourceHandler* get_handler(ResourceType type) {
return *(_handlers + type);
}
static void alloc_handler(ResourceHandler *h) {
assert(h != NULL);
h->mapping = hashtable_new_stringkeys();
h->async_load_data = hashtable_new_stringkeys();
static inline ResourceHandler* get_ires_handler(InternalResource *ires) {
return get_handler(ires->res.type);
}
static void unload_resource(Resource *res) {
if(!(res->flags & RESF_FAILED)) {
get_handler(res->type)->procs.unload(res->data);
}
free(res);
static void alloc_handler(ResourceHandler *h) {
assert(h != NULL);
h->private = calloc(1, sizeof(ResourceHandlerPrivate));
h->private->mapping = hashtable_new_stringkeys();
}
static const char* type_name(ResourceType type) {
return get_handler(type)->typename;
}
Resource* insert_resource(ResourceType type, const char *name, void *data, ResourceFlags flags, const char *source) {
assert(name != NULL);
assert(source != NULL);
static void* datafunc_begin_load_resource(void *arg) {
ResourceType type = (intptr_t)arg;
if(data == NULL) {
const char *typename = type_name(type);
if(!(flags & RESF_OPTIONAL)) {
log_fatal("Required %s '%s' couldn't be loaded", typename, name);
} else {
log_warn("Failed to load %s '%s'", typename, name);
InternalResource *ires = calloc(1, sizeof(InternalResource));
ires->res.type = type;
ires->status = RES_STATUS_LOADING;
ires->mutex = SDL_CreateMutex();
ires->cond = SDL_CreateCond();
return ires;
}
static bool try_begin_load_resource(ResourceType type, const char *name, InternalResource **out_ires) {
ResourceHandler *handler = get_handler(type);
return hashtable_try_set(handler->private->mapping, (char*)name, (void*)(uintptr_t)type, datafunc_begin_load_resource, (void**)out_ires);
}
static void load_resource_finish(InternalResource *ires, void *opaque, const char *path, const char *name, char *allocated_path, char *allocated_name, ResourceFlags flags);
static void finish_async_load(InternalResource *ires, ResourceAsyncLoadData *data) {
assert(ires == data->ires);
assert(ires->status == RES_STATUS_LOADING);
load_resource_finish(ires, data->opaque, data->path, data->name, data->path, data->name, data->flags);
SDL_CondBroadcast(data->ires->cond);
assert(ires->status != RES_STATUS_LOADING);
free(data);
}
static ResourceStatus wait_for_resource_load(InternalResource *ires) {
SDL_LockMutex(ires->mutex);
if(ires->async_task != NULL && SDL_ThreadID() == main_thread_id) {
assert(ires->status == RES_STATUS_LOADING);
ResourceAsyncLoadData *data;
Task *task = ires->async_task;
ires->async_task = NULL;
SDL_UnlockMutex(ires->mutex);
if(!task_finish(task, (void**)&data)) {
log_fatal("Internal error: ires->async_task failed");
}
SDL_LockMutex(ires->mutex);
if(ires->status == RES_STATUS_LOADING) {
finish_async_load(ires, data);
}
}
ResourceHandler *handler = get_handler(type);
Resource *oldres = hashtable_get_string(handler->mapping, name);
Resource *res = malloc(sizeof(Resource));
if(type == RES_MODEL || env_get("TAISEI_NOUNLOAD", false)) {
// FIXME: models can't be safely unloaded at runtime
flags |= RESF_PERMANENT;
while(ires->status == RES_STATUS_LOADING) {
SDL_CondWait(ires->cond, ires->mutex);
}
res->type = handler->type;
res->flags = flags;
res->data = data;
ResourceStatus status = ires->status;
SDL_UnlockMutex(ires->mutex);
if(oldres) {
log_warn("Replacing a previously loaded %s '%s'", type_name(type), name);
unload_resource(oldres);
return status;
}
static void unload_resource(InternalResource *ires) {
if(wait_for_resource_load(ires) == RES_STATUS_LOADED) {
get_handler(ires->res.type)->procs.unload(ires->res.data);
}
hashtable_set_string(handler->mapping, name, res);
if(data) {
log_info("Loaded %s '%s' from '%s' (%s)", type_name(handler->type), name, source,
(flags & RESF_PERMANENT) ? "permanent" : "transient");
}
return res;
SDL_DestroyCond(ires->cond);
SDL_DestroyMutex(ires->mutex);
free(ires);
}
static char* get_name(ResourceHandler *handler, const char *path) {
@ -116,104 +170,65 @@ static char* get_name(ResourceHandler *handler, const char *path) {
return resource_util_basename(handler->subdir, path);
}
typedef struct ResourceAsyncLoadData {
ResourceHandler *handler;
char *path;
char *name;
ResourceFlags flags;
void *opaque;
} ResourceAsyncLoadData;
static int load_resource_async_thread(void *vdata) {
static void* load_resource_async_task(void *vdata) {
ResourceAsyncLoadData *data = vdata;
SDL_SetThreadPriority(SDL_THREAD_PRIORITY_LOW);
data->opaque = data->handler->procs.begin_load(data->path, data->flags);
events_emit(TE_RESOURCE_ASYNC_LOADED, 0, data, NULL);
SDL_LockMutex(data->ires->mutex);
data->opaque = get_ires_handler(data->ires)->procs.begin_load(data->path, data->flags);
events_emit(TE_RESOURCE_ASYNC_LOADED, 0, data->ires, data);
SDL_UnlockMutex(data->ires->mutex);
return 0;
return data;
}
static Resource* load_resource_finish(void *opaque, ResourceHandler *handler, const char *path, const char *name, char *allocated_path, char *allocated_name, ResourceFlags flags);
static bool resource_asyncload_handler(SDL_Event *evt, void *arg) {
assert(SDL_ThreadID() == main_thread_id);
ResourceAsyncLoadData *data = evt->user.data1;
InternalResource *ires = evt->user.data1;
if(!data) {
SDL_LockMutex(ires->mutex);
Task *task = ires->async_task;
assert(!task || ires->status == RES_STATUS_LOADING);
ires->async_task = NULL;
SDL_UnlockMutex(ires->mutex);
if(task == NULL) {
return true;
}
char name[strlen(data->name) + 1];
strcpy(name, data->name);
ResourceAsyncLoadData *data, *verify_data;
load_resource_finish(data->opaque, data->handler, data->path, data->name, data->path, data->name, data->flags);
hashtable_unset(data->handler->async_load_data, name);
free(data);
if(!task_finish(task, (void**)&verify_data)) {
log_fatal("Internal error: data->ires->async_task failed");
}
SDL_LockMutex(ires->mutex);
if(ires->status == RES_STATUS_LOADING) {
data = evt->user.data2;
assert(data == verify_data);
finish_async_load(ires, data);
}
SDL_UnlockMutex(ires->mutex);
return true;
}
static void load_resource_async(ResourceHandler *handler, char *path, char *name, ResourceFlags flags) {
log_debug("Loading %s '%s' asynchronously", type_name(handler->type), name);
static void load_resource_async(InternalResource *ires, char *path, char *name, ResourceFlags flags) {
ResourceAsyncLoadData *data = malloc(sizeof(ResourceAsyncLoadData));
hashtable_set_string(handler->async_load_data, name, data);
data->handler = handler;
log_debug("Loading %s '%s' asynchronously", type_name(ires->res.type), name);
data->ires = ires;
data->path = path;
data->name = name;
data->flags = flags;
SDL_Thread *thread = SDL_CreateThread(load_resource_async_thread, __func__, data);
if(thread) {
SDL_DetachThread(thread);
} else {
log_warn("SDL_CreateThread() failed: %s", SDL_GetError());
log_warn("Falling back to synchronous loading. Use TAISEI_NOASYNC=1 to suppress this warning.");
load_resource_async_thread(data);
}
ires->async_task = taskmgr_global_submit((TaskParams) { load_resource_async_task, data });
}
static void update_async_load_state(void) {
SDL_Event evt;
uint32_t etype = MAKE_TAISEI_EVENT(TE_RESOURCE_ASYNC_LOADED);
while(SDL_PeepEvents(&evt, 1, SDL_GETEVENT, etype, etype)) {
resource_asyncload_handler(&evt, NULL);
}
}
static bool resource_check_async_load(ResourceHandler *handler, const char *name) {
if(SDL_ThreadID() == main_thread_id) {
update_async_load_state();
}
ResourceAsyncLoadData *data = hashtable_get_string(handler->async_load_data, name);
return data;
}
static void resource_wait_for_async_load(ResourceHandler *handler, const char *name) {
while(resource_check_async_load(handler, name));
}
static void resource_wait_for_all_async_loads(ResourceHandler *handler) {
char *key;
hashtable_lock(handler->async_load_data);
HashtableIterator *i = hashtable_iter(handler->async_load_data);
while(hashtable_iter_next(i, (void**)&key, NULL)) {
resource_check_async_load(handler, key);
}
hashtable_unlock(handler->async_load_data);
}
static Resource* load_resource(ResourceHandler *handler, const char *path, const char *name, ResourceFlags flags, bool async) {
Resource *res;
flags &= ~RESF_FAILED;
void load_resource(InternalResource *ires, const char *path, const char *name, ResourceFlags flags, bool async) {
ResourceHandler *handler = get_ires_handler(ires);
const char *typename = type_name(handler->type);
char *allocated_path = NULL;
char *allocated_name = NULL;
@ -237,7 +252,7 @@ static Resource* load_resource(ResourceHandler *handler, const char *path, const
log_warn("Failed to locate %s '%s'", typename, name);
}
flags |= RESF_FAILED;
ires->status = RES_STATUS_FAILED;
}
} else if(!name) {
name = allocated_name = get_name(handler, path);
@ -247,76 +262,78 @@ static Resource* load_resource(ResourceHandler *handler, const char *path, const
assert(handler->procs.check(path));
}
if(async) {
if(resource_check_async_load(handler, name)) {
return NULL;
}
} else {
resource_wait_for_async_load(handler, name);
}
res = hashtable_get_string(handler->mapping, name);
if(res) {
log_warn("%s '%s' is already loaded", typename, name);
free(allocated_name);
return res;
}
if(flags & RESF_FAILED) {
return load_resource_finish(NULL, handler, path, name, allocated_path, allocated_name, flags);
if(ires->status == RES_STATUS_FAILED) {
load_resource_finish(ires, NULL, path, name, allocated_path, allocated_name, flags);
return;
}
if(async) {
// these will be freed when loading is done
path = allocated_path ? allocated_path : strdup(path);
name = allocated_name ? allocated_name : strdup(name);
load_resource_async(handler, (char*)path, (char*)name, flags);
return NULL;
load_resource_async(ires, (char*)path, (char*)name, flags);
} else {
load_resource_finish(ires, handler->procs.begin_load(path, flags), path, name, allocated_path, allocated_name, flags);
}
return load_resource_finish(handler->procs.begin_load(path, flags), handler, path, name, allocated_path, allocated_name, flags);
}
static Resource* load_resource_finish(void *opaque, ResourceHandler *handler, const char *path, const char *name, char *allocated_path, char *allocated_name, ResourceFlags flags) {
void *raw = (flags & RESF_FAILED) ? NULL : handler->procs.end_load(opaque, path, flags);
static void finalize_resource(InternalResource *ires, const char *name, void *data, ResourceFlags flags, const char *source) {
assert(name != NULL);
assert(source != NULL);
if(raw == NULL) {
flags |= RESF_FAILED;
if(data == NULL) {
const char *typename = type_name(ires->res.type);
if(!(flags & RESF_OPTIONAL)) {
log_fatal("Required %s '%s' couldn't be loaded", typename, name);
} else {
log_warn("Failed to load %s '%s'", typename, name);
}
}
if(ires->res.type == RES_MODEL || env_get("TAISEI_NOUNLOAD", false)) {
// FIXME: models can't be safely unloaded at runtime
flags |= RESF_PERMANENT;
}
ires->res.flags = flags;
ires->res.data = data;
if(data) {
log_info("Loaded %s '%s' from '%s' (%s)", type_name(ires->res.type), name, source, (flags & RESF_PERMANENT) ? "permanent" : "transient");
}
ires->status = data ? RES_STATUS_LOADED : RES_STATUS_FAILED;
}
static void load_resource_finish(InternalResource *ires, void *opaque, const char *path, const char *name, char *allocated_path, char *allocated_name, ResourceFlags flags) {
void *raw = (ires->status == RES_STATUS_FAILED) ? NULL : get_ires_handler(ires)->procs.end_load(opaque, path, flags);
name = name ? name : "<name unknown>";
path = path ? path : "<path unknown>";
char *sp = vfs_repr(path, true);
Resource *res = insert_resource(handler->type, name, raw, flags, sp ? sp : path);
finalize_resource(ires, name, raw, flags, sp ? sp : path);
free(sp);
free(allocated_path);
free(allocated_name);
return res;
}
Resource* get_resource(ResourceType type, const char *name, ResourceFlags flags) {
ResourceHandler *handler = get_handler(type);
InternalResource *ires;
Resource *res;
if(flags & RESF_UNSAFE) {
res = hashtable_get_unsafe(handler->mapping, (void*)name);
flags &= ~RESF_UNSAFE;
} else {
res = hashtable_get(handler->mapping, (void*)name);
ires = hashtable_get_unsafe(get_handler(type)->private->mapping, (char*)name);
if(ires != NULL && ires->status == RES_STATUS_LOADED) {
return &ires->res;
}
}
if(res) {
return res;
}
if(try_begin_load_resource(type, name, &ires)) {
SDL_LockMutex(ires->mutex);
resource_wait_for_async_load(handler, name);
res = hashtable_get(handler->mapping, (void*)name);
if(!res) {
if(!(flags & RESF_PRELOAD)) {
log_warn("%s '%s' was not preloaded", type_name(type), name);
@ -325,15 +342,31 @@ Resource* get_resource(ResourceType type, const char *name, ResourceFlags flags)
}
}
res = load_resource(handler, NULL, name, flags, false);
}
load_resource(ires, NULL, name, flags, false);
SDL_CondBroadcast(ires->cond);
if(res && (flags & RESF_PERMANENT) && !(res->flags & (RESF_PERMANENT | RESF_FAILED))) {
log_debug("Promoted %s '%s' to permanent", type_name(type), name);
res->flags |= RESF_PERMANENT;
}
if(ires->status == RES_STATUS_FAILED) {
res = NULL;
} else {
assert(ires->status == RES_STATUS_LOADED);
assert(ires->res.data != NULL);
res = &ires->res;
}
return res;
SDL_UnlockMutex(ires->mutex);
return res;
} else {
ResourceStatus status = wait_for_resource_load(ires);
if(status == RES_STATUS_FAILED) {
return NULL;
}
assert(status == RES_STATUS_LOADED);
assert(ires->res.data != NULL);
return &ires->res;
}
}
void* get_resource_data(ResourceType type, const char *name, ResourceFlags flags) {
@ -346,22 +379,17 @@ void* get_resource_data(ResourceType type, const char *name, ResourceFlags flags
return NULL;
}
Hashtable* get_resource_table(ResourceType type) {
return get_handler(type)->mapping;
}
void preload_resource(ResourceType type, const char *name, ResourceFlags flags) {
if(env_get("TAISEI_NOPRELOAD", false))
return;
ResourceHandler *handler = get_handler(type);
InternalResource *ires;
if(hashtable_get_string(handler->mapping, name) ||
hashtable_get_string(handler->async_load_data, name)) {
return;
if(try_begin_load_resource(type, name, &ires)) {
SDL_LockMutex(ires->mutex);
load_resource(ires, NULL, name, flags | RESF_PRELOAD, !env_get("TAISEI_NOASYNC", false));
SDL_UnlockMutex(ires->mutex);
}
load_resource(handler, NULL, name, flags | RESF_PRELOAD, !env_get("TAISEI_NOASYNC", false));
}
void preload_resources(ResourceType type, ResourceFlags flags, const char *firstname, ...) {
@ -450,6 +478,24 @@ static void* preload_shaders(const char *path, void *arg) {
return NULL;
}
struct resource_for_each_arg {
void *(*callback)(const char *name, Resource *res, void *arg);
void *arg;
};
static void* resource_for_each_ht_adapter(void *key, void *data, void *varg) {
const char *name = key;
InternalResource *ires = data;
struct resource_for_each_arg *arg = varg;
return arg->callback(name, &ires->res, arg->arg);
}
void* resource_for_each(ResourceType type, void* (*callback)(const char *name, Resource *res, void *arg), void *arg) {
ResourceHandler *handler = get_handler(type);
struct resource_for_each_arg htarg = { callback, arg };
return hashtable_foreach(handler->private->mapping, resource_for_each_ht_adapter, &htarg);
}
void load_resources(void) {
menu_preload();
@ -463,31 +509,46 @@ void free_resources(bool all) {
for(ResourceType type = 0; type < RES_NUMTYPES; ++type) {
ResourceHandler *handler = get_handler(type);
resource_wait_for_all_async_loads(handler);
char *name;
Resource *res;
InternalResource *ires;
ListContainer *unset_list = NULL;
for(HashtableIterator *i = hashtable_iter(handler->mapping); hashtable_iter_next(i, (void**)&name, (void**)&res);) {
if(!all && res->flags & RESF_PERMANENT)
hashtable_lock(handler->private->mapping);
for(HashtableIterator *i = hashtable_iter(handler->private->mapping); hashtable_iter_next(i, (void**)&name, (void**)&ires);) {
if(!all && (ires->res.flags & RESF_PERMANENT)) {
continue;
}
attr_unused ResourceFlags flags = res->flags;
unload_resource(res);
log_debug("Unloaded %s '%s' (%s)", type_name(type), name,
(flags & RESF_PERMANENT) ? "permanent" : "transient"
);
list_push(&unset_list, list_wrap_container(name));
}
hashtable_unlock(handler->private->mapping);
for(ListContainer *c; (c = list_pop(&unset_list));) {
char *tmp = c->data;
char name[strlen(tmp) + 1];
strcpy(name, tmp);
ires = hashtable_get_string(handler->private->mapping, name);
attr_unused ResourceFlags flags = ires->res.flags;
if(!all) {
hashtable_unset_deferred(handler->mapping, name, &unset_list);
hashtable_unset_string(handler->private->mapping, name);
}
unload_resource(ires);
free(c);
log_debug(
"Unloaded %s '%s' (%s)",
type_name(type),
name,
(flags & RESF_PERMANENT) ? "permanent" : "transient"
);
}
if(all) {
hashtable_free(handler->mapping);
} else {
hashtable_unset_deferred_now(handler->mapping, &unset_list);
hashtable_free(handler->private->mapping);
free(handler->private);
}
}

View file

@ -30,7 +30,6 @@ typedef enum ResourceFlags {
RESF_PERMANENT = 2,
RESF_PRELOAD = 4,
RESF_UNSAFE = 8,
RESF_FAILED = 16,
} ResourceFlags;
#define RESF_DEFAULT 0
@ -62,6 +61,8 @@ typedef void* (*ResourceEndLoadProc)(void *opaque, const char *path, uint flags)
// Unloads a resource, freeing all allocated to it memory.
typedef void (*ResourceUnloadProc)(void *res);
typedef struct ResourceHandlerPrivate ResourceHandlerPrivate;
typedef struct ResourceHandler {
ResourceType type;
@ -77,8 +78,7 @@ typedef struct ResourceHandler {
ResourceUnloadProc unload;
} procs;
Hashtable *mapping;
Hashtable *async_load_data;
ResourceHandlerPrivate *private;
} ResourceHandler;
typedef struct Resource {
@ -93,10 +93,9 @@ void free_resources(bool all);
Resource* get_resource(ResourceType type, const char *name, ResourceFlags flags);
void* get_resource_data(ResourceType type, const char *name, ResourceFlags flags);
Resource* insert_resource(ResourceType type, const char *name, void *data, ResourceFlags flags, const char *source);
void preload_resource(ResourceType type, const char *name, ResourceFlags flags);
void preload_resources(ResourceType type, ResourceFlags flags, const char *firstname, ...) attr_sentinel;
Hashtable* get_resource_table(ResourceType type);
void* resource_for_each(ResourceType type, void* (*callback)(const char *name, Resource *res, void *arg), void *arg);
void resource_util_strip_ext(char *path);
char* resource_util_basename(const char *prefix, const char *path);

451
src/taskmanager.c Normal file
View file

@ -0,0 +1,451 @@
/*
* This software is licensed under the terms of the MIT-License
* See COPYING for further information.
* ---
* Copyright (c) 2011-2018, Lukas Weber <laochailan@web.de>.
* Copyright (c) 2012-2018, Andrei Alexeyev <akari@alienslab.net>.
*/
#include "taisei.h"
#include "taskmanager.h"
#include "list.h"
#include "util.h"
struct TaskManager {
Task *queue;
SDL_mutex *mutex;
SDL_cond *cond;
uint numthreads;
uint running : 1;
uint aborted : 1;
SDL_atomic_t numtasks;
SDL_ThreadPriority thread_prio;
SDL_Thread *threads[];
};
struct Task {
LIST_INTERFACE(Task);
task_func_t callback;
task_free_func_t userdata_free_callback;
void *userdata;
int prio;
SDL_mutex *mutex;
SDL_cond *cond;
TaskStatus status;
void *result;
uint disowned : 1;
uint in_queue : 1;
};
static TaskManager *g_taskmgr;
static void taskmgr_free(TaskManager *mgr) {
log_debug("%08lx freeing task manager %p", SDL_ThreadID(), (void*)mgr);
if(mgr->mutex != NULL) {
SDL_DestroyMutex(mgr->mutex);
}
if(mgr->cond != NULL) {
SDL_DestroyCond(mgr->cond);
}
free(mgr);
}
static void task_free(Task *task) {
assert(!task->in_queue);
assert(task->disowned);
log_debug("%08lx freeing task %p", SDL_ThreadID(), (void*)task);
if(task->userdata_free_callback != NULL) {
task->userdata_free_callback(task->userdata);
}
if(task->mutex != NULL) {
SDL_DestroyMutex(task->mutex);
}
if(task->cond != NULL) {
SDL_DestroyCond(task->cond);
}
free(task);
}
static int taskmgr_thread(void *arg) {
TaskManager *mgr = arg;
attr_unused SDL_threadID tid = SDL_ThreadID();
log_debug("%08lx stage 1", tid);
if(SDL_SetThreadPriority(mgr->thread_prio) < 0) {
log_sdl_error("SDL_SetThreadPriority");
}
bool running;
bool aborted;
do {
SDL_LockMutex(mgr->mutex);
running = mgr->running;
aborted = mgr->aborted;
if(!running && !aborted) {
SDL_CondWait(mgr->cond, mgr->mutex);
}
SDL_UnlockMutex(mgr->mutex);
} while(!running && !aborted);
log_debug("%08lx stage 2", tid);
while(running && !aborted) {
SDL_LockMutex(mgr->mutex);
Task *task = list_pop(&mgr->queue);
running = mgr->running;
aborted = mgr->aborted;
if(running && task == NULL && !aborted) {
log_debug("%08lx sleep: %i %i %p", tid, running, aborted, (void*)task);
SDL_CondWait(mgr->cond, mgr->mutex);
log_debug("%08lx wake", tid);
}
SDL_UnlockMutex(mgr->mutex);
if(task != NULL) {
log_debug("%08lx taking task %p", tid, (void*)task);
SDL_LockMutex(task->mutex);
bool task_disowned = task->disowned;
if(aborted && task->status == TASK_PENDING) {
task->status = TASK_CANCELLED;
}
if(task->status == TASK_PENDING) {
log_debug("%08lx task %p running", tid, (void*)task);
task->status = TASK_RUNNING;
SDL_UnlockMutex(task->mutex);
task->result = task->callback(task->userdata);
SDL_LockMutex(task->mutex);
assert(task->in_queue);
task->in_queue = false;
(void)SDL_AtomicDecRef(&mgr->numtasks);
log_debug("%08lx task %p done", tid, (void*)task);
if((task_disowned = task->disowned)) {
SDL_UnlockMutex(task->mutex);
task_free(task);
} else {
task->status = TASK_FINISHED;
SDL_CondBroadcast(task->cond);
SDL_UnlockMutex(task->mutex);
}
} else if(task->status == TASK_CANCELLED) {
assert(task->in_queue);
task->in_queue = false;
(void)SDL_AtomicDecRef(&mgr->numtasks);
SDL_UnlockMutex(task->mutex);
log_debug("%08lx task %p is cancelled", tid, (void*)task);
if(task_disowned) {
task_free(task);
}
} else {
UNREACHABLE;
}
}
}
log_debug("%08lx thread exiting", tid);
return 0;
}
TaskManager* taskmgr_create(uint numthreads, SDL_ThreadPriority prio, const char *name) {
int numcores = SDL_GetCPUCount();
uint maxthreads = numcores * 8;
if(numcores < 1) {
log_warn("SDL_GetCPUCount() returned %i, assuming 1", numcores);
numcores = 1;
}
if(numthreads == 0) {
numthreads = numcores * 4;
} else if(numthreads > maxthreads) {
log_warn("Number of threads capped to %i (%i requested)", maxthreads, numthreads);
numthreads = maxthreads;
}
TaskManager *mgr = calloc(1, sizeof(TaskManager) + numthreads * sizeof(SDL_Thread*));
if(!(mgr->mutex = SDL_CreateMutex())) {
log_sdl_error("SDL_CreateMutex");
goto fail;
}
if(!(mgr->cond = SDL_CreateCond())) {
log_sdl_error("SDL_CreateCond");
goto fail;
}
mgr->numthreads = numthreads;
mgr->thread_prio = prio;
for(uint i = 0; i < numthreads; ++i) {
int digits = i ? log10(i) + 1 : 0;
static const char *const prefix = "taskmgr";
char threadname[sizeof(prefix) + strlen(name) + digits + 2];
snprintf(threadname, sizeof(threadname), "%s:%s/%i", prefix, name, i);
if(!(mgr->threads[i] = SDL_CreateThread(taskmgr_thread, threadname, mgr))) {
log_sdl_error("SDL_CreateThread");
for(uint j = 0; j < i; ++j) {
SDL_DetachThread(mgr->threads[j]);
mgr->threads[j] = NULL;
}
SDL_LockMutex(mgr->mutex);
mgr->aborted = true;
SDL_CondBroadcast(mgr->cond);
SDL_UnlockMutex(mgr->mutex);
goto fail;
}
}
SDL_LockMutex(mgr->mutex);
mgr->running = true;
SDL_CondBroadcast(mgr->cond);
SDL_UnlockMutex(mgr->mutex);
log_debug(
"Created task manager %s (%p) with %u threads at priority %i",
name,
(void*)mgr,
mgr->numthreads,
prio
);
return mgr;
fail:
taskmgr_free(mgr);
return NULL;
}
static int task_prio_func(List *ltask) {
return ((Task*)ltask)->prio;
}
Task* taskmgr_submit(TaskManager *mgr, TaskParams params) {
assert(params.callback != NULL);
Task *task = calloc(1, sizeof(Task));
task->callback = params.callback;
task->userdata_free_callback = params.userdata_free_callback;
task->userdata = params.userdata;
task->prio = params.prio;
task->status = TASK_PENDING;
if(!(task->mutex = SDL_CreateMutex())) {
log_sdl_error("SDL_CreateMutex");
goto fail;
}
if(!(task->cond = SDL_CreateCond())) {
log_sdl_error("SDL_CreateCond");
goto fail;
}
SDL_LockMutex(mgr->mutex);
if(params.topmost) {
list_insert_at_priority_head(&mgr->queue, task, task->prio, task_prio_func);
} else {
list_insert_at_priority_tail(&mgr->queue, task, task->prio, task_prio_func);
}
task->in_queue = true;
SDL_AtomicIncRef(&mgr->numtasks);
SDL_CondSignal(mgr->cond);
SDL_UnlockMutex(mgr->mutex);
return task;
fail:
task_free(task);
return NULL;
}
uint taskmgr_remaining(TaskManager *mgr) {
return SDL_AtomicGet(&mgr->numtasks);
}
static void taskmgr_finalize_and_wait(TaskManager *mgr, bool abort) {
log_debug(
"%08lx [%p] waiting for %u tasks (abort = %i)",
SDL_ThreadID(),
(void*)mgr,
taskmgr_remaining(mgr),
abort
);
assert(mgr->running);
assert(!mgr->aborted);
SDL_LockMutex(mgr->mutex);
mgr->running = false;
mgr->aborted = abort;
SDL_CondBroadcast(mgr->cond);
SDL_UnlockMutex(mgr->mutex);
for(uint i = 0; i < mgr->numthreads; ++i) {
SDL_WaitThread(mgr->threads[i], NULL);
}
taskmgr_free(mgr);
}
void taskmgr_finish(TaskManager *mgr) {
taskmgr_finalize_and_wait(mgr, false);
}
void taskmgr_abort(TaskManager *mgr) {
taskmgr_finalize_and_wait(mgr, true);
}
TaskStatus task_status(Task *task) {
TaskStatus result = TASK_INVALID;
if(task != NULL) {
SDL_LockMutex(task->mutex);
result = task->status;
SDL_UnlockMutex(task->mutex);
}
return result;
}
bool task_wait(Task *task, void **result) {
bool success = false;
if(task == NULL) {
log_debug("%08lx task was null", SDL_ThreadID());
return success;
}
void *_result = NULL;
SDL_LockMutex(task->mutex);
log_debug("%08lx %p %i", SDL_ThreadID(), (void*)task, task->status);
if(task->status == TASK_CANCELLED) {
success = false;
} else if(task->status == TASK_FINISHED) {
success = true;
_result = task->result;
} else {
log_debug("%08lx %p sleep", SDL_ThreadID(), (void*)task);
SDL_CondWait(task->cond, task->mutex);
_result = task->result;
success = (task->status == TASK_FINISHED);
log_debug("%08lx %p wake %i %i", SDL_ThreadID(), (void*)task, task->status, success);
}
SDL_UnlockMutex(task->mutex);
if(success && result != NULL) {
*result = _result;
}
return success;
}
bool task_cancel(Task *task) {
bool success = false;
if(task == NULL) {
return success;
}
SDL_LockMutex(task->mutex);
if(task->status == TASK_PENDING) {
task->status = TASK_CANCELLED;
success = true;
}
SDL_UnlockMutex(task->mutex);
return success;
}
bool task_detach(Task *task) {
bool success = false;
bool task_in_queue;
if(task == NULL) {
return success;
}
SDL_LockMutex(task->mutex);
assert(!task->disowned);
task->disowned = true;
task_in_queue = task->in_queue;
success = true;
SDL_UnlockMutex(task->mutex);
if(!task_in_queue) {
task_free(task);
}
return success;
}
bool task_finish(Task *task, void **result) {
bool success = task_wait(task, result);
task_detach(task);
return success;
}
bool task_abort(Task *task) {
bool success = task_cancel(task);
task_detach(task);
return success;
}
void taskmgr_global_init(void) {
assert(g_taskmgr == NULL);
g_taskmgr = taskmgr_create(0, SDL_THREAD_PRIORITY_LOW, "global");
}
void taskmgr_global_shutdown(void) {
if(g_taskmgr != NULL) {
taskmgr_finish(g_taskmgr);
g_taskmgr = NULL;
}
}
Task* taskmgr_global_submit(TaskParams params) {
if(g_taskmgr == NULL) {
Task *t = calloc(1, sizeof(Task));
t->callback = params.callback;
t->userdata = params.userdata;
t->userdata_free_callback = params.userdata_free_callback;
t->result = params.callback(params.userdata);
return t;
}
return taskmgr_submit(g_taskmgr, params);
}

177
src/taskmanager.h Normal file
View file