INVOKE_TASK_DELAYED; CANCEL_TASK_WHEN; event robustness fixes

This commit is contained in:
Andrei Alexeyev 2019-07-22 22:07:25 +03:00
parent 156d8147b1
commit 5b0b676c0e
No known key found for this signature in database
GPG key ID: 363707CD4C7FE8A4
3 changed files with 166 additions and 35 deletions

View file

@ -32,7 +32,7 @@
struct CoTask {
LIST_INTERFACE(CoTask);
List event_subscriber_chain;
// List event_subscriber_chain;
koishi_coroutine_t ko;
BoxedEntity bound_ent;
@ -40,6 +40,8 @@ struct CoTask {
CoTaskFunc func;
void *arg;
} finalizer;
uint32_t unique_id;
};
struct CoSched {
@ -52,6 +54,23 @@ static size_t num_tasks_in_use;
CoSched *_cosched_global;
BoxedTask cotask_box(CoTask *task) {
return (BoxedTask) {
.ptr = (uintptr_t)task,
.unique_id = task->unique_id,
};
}
CoTask *cotask_unbox(BoxedTask box) {
CoTask *task = (void*)box.ptr;
if(task->unique_id == box.unique_id) {
return task;
}
return NULL;
}
CoTask *cotask_new(CoTaskFunc func) {
CoTask *task;
++num_tasks_in_use;
@ -74,10 +93,15 @@ CoTask *cotask_new(CoTaskFunc func) {
);
}
static uint32_t unique_counter = 0;
task->unique_id = ++unique_counter;
assert(unique_counter != 0);
return task;
}
void cotask_free(CoTask *task) {
task->unique_id = 0;
memset(&task->bound_ent, 0, sizeof(task->bound_ent));
memset(&task->finalizer, 0, sizeof(task->finalizer));
alist_push(&task_pool, task);
@ -95,6 +119,16 @@ CoTask *cotask_active(void) {
return CASTPTR_ASSUME_ALIGNED((char*)koishi_active() - offsetof(CoTask, ko), CoTask);
}
bool cotask_cancel(CoTask *task) {
if(cotask_status(task) != CO_STATUS_DEAD) {
koishi_kill(&task->ko);
assert(cotask_status(task) == CO_STATUS_DEAD);
return true;
}
return false;
}
void *cotask_resume(CoTask *task, void *arg) {
if(task->bound_ent.ent && !ent_unbox(task->bound_ent)) {
koishi_kill(&task->ko);
@ -124,10 +158,26 @@ bool cotask_wait_event(CoEvent *evt, void *arg) {
struct {
uint32_t unique_id;
uint32_t num_signaled;
uint16_t num_signaled;
} snapshot = { evt->unique_id, evt->num_signaled };
alist_append(&evt->subscribers, &cotask_active()->event_subscriber_chain);
evt->num_subscribers++;
assert(evt->num_subscribers != 0);
if(evt->num_subscribers >= evt->num_subscribers_allocated) {
if(!evt->num_subscribers_allocated) {
evt->num_subscribers_allocated = 4;
} else {
evt->num_subscribers_allocated *= 2;
assert(evt->num_subscribers_allocated != 0);
}
evt->subscribers = realloc(evt->subscribers, sizeof(*evt->subscribers) * evt->num_subscribers_allocated);
}
evt->subscribers[evt->num_subscribers - 1] = cotask_box(cotask_active());
// alist_append(&evt->subscribers, &cotask_active()->event_subscriber_chain);
while(true) {
EVT_DEBUG("[%p]", (void*)evt);
@ -171,19 +221,46 @@ void cotask_set_finalizer(CoTask *task, CoTaskFunc finalizer, void *arg) {
void coevent_init(CoEvent *evt) {
static uint32_t g_uid;
evt->unique_id = ++g_uid;
evt->num_signaled = 0;
*evt = (CoEvent) { .unique_id = ++g_uid };
assert(g_uid != 0);
}
static void coevent_wake_subscribers(CoEvent *evt) {
for(List *node = evt->subscribers.first; node; node = node->next) {
#if 0
List *subs_head = evt->subscribers.first;
evt->subscribers.first = evt->subscribers.last = NULL;
// NOTE: might need to copy the list into an array before iterating, to make sure nothing can corrupt the chain...
for(List *node = subs_head, *next; node; node = next) {
next = node->next;
CoTask *task = CASTPTR_ASSUME_ALIGNED((char*)node - offsetof(CoTask, event_subscriber_chain), CoTask);
TASK_DEBUG("Resume CoEvent{%p} subscriber %p", (void*)evt, (void*)task);
cotask_resume(task, NULL);
if(cotask_status(task) != CO_STATUS_DEAD) {
TASK_DEBUG("Resume CoEvent{%p} subscriber %p", (void*)evt, (void*)task);
cotask_resume(task, NULL);
}
}
#endif
if(!evt->num_subscribers) {
return;
}
evt->subscribers.first = evt->subscribers.last = NULL;
assert(evt->num_subscribers);
BoxedTask subs_snapshot[evt->num_subscribers];
memcpy(subs_snapshot, evt->subscribers, sizeof(subs_snapshot));
evt->num_subscribers = 0;
for(int i = 0; i < ARRAY_SIZE(subs_snapshot); ++i) {
CoTask *task = cotask_unbox(subs_snapshot[i]);
if(task && cotask_status(task) != CO_STATUS_DEAD) {
EVT_DEBUG("Resume CoEvent{%p} subscriber %p", (void*)evt, (void*)task);
cotask_resume(task, NULL);
}
}
}
void coevent_signal(CoEvent *evt) {
@ -201,6 +278,9 @@ void coevent_signal_once(CoEvent *evt) {
void coevent_cancel(CoEvent* evt) {
evt->num_signaled = evt->unique_id = 0;
coevent_wake_subscribers(evt);
evt->num_subscribers_allocated = 0;
free(evt->subscribers);
evt->subscribers = NULL;
}
CoSched *cosched_new(void) {
@ -208,18 +288,12 @@ CoSched *cosched_new(void) {
return sched;
}
void *cosched_new_task(CoSched *sched, CoTaskFunc func, void *arg) {
CoTask *cosched_new_task(CoSched *sched, CoTaskFunc func, void *arg) {
CoTask *task = cotask_new(func);
void *ret = cotask_resume(task, arg);
if(cotask_status(task) == CO_STATUS_DEAD) {
cotask_free(task);
} else {
assert(cotask_status(task) == CO_STATUS_SUSPENDED);
alist_append(&sched->pending_tasks, task);
}
return ret;
cotask_resume(task, arg);
assert(cotask_status(task) == CO_STATUS_SUSPENDED || cotask_status(task) == CO_STATUS_DEAD);
alist_append(&sched->pending_tasks, task);
return task;
}
uint cosched_run_tasks(CoSched *sched) {
@ -267,3 +341,11 @@ void coroutines_shutdown(void) {
free(task);
}
}
DEFINE_EXTERN_TASK(_cancel_task_helper) {
CoTask *task = cotask_unbox(ARGS.task);
if(task) {
cotask_cancel(task);
}
}

View file

@ -27,10 +27,20 @@ typedef enum CoStatus {
CO_STATUS_DEAD = KOISHI_DEAD,
} CoStatus;
typedef struct CoEvent {
ListAnchor subscribers;
typedef struct BoxedTask {
uintptr_t ptr;
uint32_t unique_id;
uint32_t num_signaled;
} BoxedTask;
typedef struct CoEvent {
// ListAnchor subscribers;
// FIXME: Is there a better way than a dynamic array?
// An intrusive linked list just isn't robust enough.
BoxedTask *subscribers;
uint32_t unique_id;
uint16_t num_signaled;
uint8_t num_subscribers;
uint8_t num_subscribers_allocated;
} CoEvent;
void coroutines_init(void);
@ -38,6 +48,7 @@ void coroutines_shutdown(void);
CoTask *cotask_new(CoTaskFunc func);
void cotask_free(CoTask *task);
bool cotask_cancel(CoTask *task);
void *cotask_resume(CoTask *task, void *arg);
void *cotask_yield(void *arg);
bool cotask_wait_event(CoEvent *evt, void *arg);
@ -46,13 +57,16 @@ CoTask *cotask_active(void);
void cotask_bind_to_entity(CoTask *task, EntityInterface *ent);
void cotask_set_finalizer(CoTask *task, CoTaskFunc finalizer, void *arg);
BoxedTask cotask_box(CoTask *task);
CoTask *cotask_unbox(BoxedTask box);
void coevent_init(CoEvent *evt);
void coevent_signal(CoEvent *evt);
void coevent_signal_once(CoEvent *evt);
void coevent_cancel(CoEvent *evt);
CoSched *cosched_new(void);
void *cosched_new_task(CoSched *sched, CoTaskFunc func, void *arg); // creates and runs the task, returns whatever it yields, schedules it for resume on cosched_run_tasks if it's still alive
CoTask *cosched_new_task(CoSched *sched, CoTaskFunc func, void *arg); // creates and runs the task, schedules it for resume on cosched_run_tasks if it's still alive
uint cosched_run_tasks(CoSched *sched); // returns number of tasks ran
void cosched_free(CoSched *sched);
@ -61,6 +75,9 @@ static inline attr_must_inline void cosched_set_invoke_target(CoSched *sched) {
#define TASK_ARGS_NAME(name) COARGS_##name
#define TASK_ARGS(name) struct TASK_ARGS_NAME(name)
#define TASK_ARGSDELAY_NAME(name) COARGSDELAY_##name
#define TASK_ARGSDELAY(name) struct TASK_ARGSDELAY_NAME(name)
#define TASK_ARGSCOND_NAME(name) COARGSCOND_##name
#define TASK_ARGSCOND(name) struct TASK_ARGSCOND_NAME(name)
@ -77,10 +94,14 @@ static inline attr_must_inline void cosched_set_invoke_target(CoSched *sched) {
linkage char COTASK_UNUSED_CHECK_##name; \
/* user-defined type of args struct */ \
struct TASK_ARGS_NAME(name) argstruct; \
/* type of internal args struct for INVOKE_TASK_DELAYED */ \
struct TASK_ARGSDELAY_NAME(name) { TASK_ARGS(name) real_args; int delay; }; \
/* type of internal args struct for INVOKE_TASK_WHEN */ \
struct TASK_ARGSCOND_NAME(name) { TASK_ARGS(name) real_args; CoEvent *event; }; \
/* task entry point for INVOKE_TASK */ \
attr_unused linkage void *COTASKTHUNK_##name(void *arg); \
/* task entry point for INVOKE_TASK_DELAYED */ \
attr_unused linkage void *COTASKTHUNKDELAY_##name(void *arg); \
/* task entry point for INVOKE_TASK_WHEN */ \
attr_unused linkage void *COTASKTHUNKCOND_##name(void *arg) /* require semicolon */ \
@ -96,6 +117,19 @@ static inline attr_must_inline void cosched_set_invoke_target(CoSched *sched) {
/* exit coroutine */ \
return NULL; \
} \
/* task entry point for INVOKE_TASK_DELAYED */ \
attr_unused linkage void *COTASKTHUNKDELAY_##name(void *arg) { \
/* copy args to our coroutine stack so that they're valid after caller returns */ \
TASK_ARGSDELAY(name) args_copy = *(TASK_ARGSDELAY(name)*)arg; \
/* wait out the delay */ \
WAIT(args_copy.delay); \
/* call prologue */ \
COTASKPROLOGUE_##name(&args_copy.real_args); \
/* call body */ \
COTASK_##name(&args_copy.real_args); \
/* exit coroutine */ \
return NULL; \
} \
/* task entry point for INVOKE_TASK_WHEN */ \
attr_unused linkage void *COTASKTHUNKCOND_##name(void *arg) { \
/* copy args to our coroutine stack so that they're valid after caller returns */ \
@ -189,17 +223,32 @@ static inline attr_must_inline void cosched_set_invoke_target(CoSched *sched) {
/* begin finalizer body definition */ \
static void COTASKFINALIZER_##name(TASK_ARGS(name) *_cotask_args)
#define INVOKE_TASK(name, ...) do { \
(void)COTASK_UNUSED_CHECK_##name; \
TASK_ARGS(name) _coargs = { __VA_ARGS__ }; \
cosched_new_task(_cosched_global, COTASKTHUNK_##name, &_coargs); \
} while(0)
#define INVOKE_TASK(name, ...) ( \
(void)COTASK_UNUSED_CHECK_##name, \
cosched_new_task(_cosched_global, COTASKTHUNK_##name, \
&(TASK_ARGS(name)) { __VA_ARGS__ } \
) \
)
#define INVOKE_TASK_WHEN(_event, name, ...) do { \
(void)COTASK_UNUSED_CHECK_##name; \
TASK_ARGSCOND(name) _coargs = { .real_args = { __VA_ARGS__ }, .event = (_event) }; \
cosched_new_task(_cosched_global, COTASKTHUNKCOND_##name, &_coargs); \
} while(0)
#define INVOKE_TASK_WHEN(_event, name, ...) ( \
(void)COTASK_UNUSED_CHECK_##name, \
cosched_new_task(_cosched_global, COTASKTHUNKCOND_##name, \
&(TASK_ARGSCOND(name)) { .real_args = { __VA_ARGS__ }, .event = (_event) } \
) \
)
#define INVOKE_TASK_DELAYED(_delay, name, ...) ( \
(void)COTASK_UNUSED_CHECK_##name, \
cosched_new_task(_cosched_global, COTASKTHUNKDELAY_##name, \
&(TASK_ARGSDELAY(name)) { .real_args = { __VA_ARGS__ }, .delay = (_delay) } \
) \
)
DECLARE_EXTERN_TASK(_cancel_task_helper, { BoxedTask task; });
#define CANCEL_TASK_WHEN(_event, _task) INVOKE_TASK_WHEN(_event, _cancel_task_helper, _task)
#define THIS_TASK cotask_box(cotask_active())
#define YIELD cotask_yield(NULL)
#define WAIT(delay) do { int _delay = (delay); while(_delay-- > 0) YIELD; } while(0)

View file

@ -150,7 +150,7 @@ TASK(stage_main, { int ignored; }) {
log_debug("test 2! %i", global.timer);
for(;;) {
INVOKE_TASK(test_enemy, 9000, CMPLX(VIEWPORT_W, VIEWPORT_H) * 0.5, 3*I);
INVOKE_TASK_DELAYED(60, test_enemy, 9000, CMPLX(VIEWPORT_W, VIEWPORT_H) * 0.5, 3*I);
WAIT(1000);
}
}