diff --git a/src/concurrent/msgq.c b/src/concurrent/msgq.c index 766ff51..55de30f 100644 --- a/src/concurrent/msgq.c +++ b/src/concurrent/msgq.c @@ -1,24 +1,33 @@ #include "msgq.h" +#include "../util.h" static void msgq_cleanup(struct msgq* mq); struct msgq* msgq_new(struct rgn* init_rgn) { struct msgq* mq = ralloc_typed(init_rgn, struct msgq, 1); + rgn_pin(init_rgn, mq, (void*) msgq_cleanup); + rgn_init(&mq->rgn, SMALL); mq->head = mq->tail = NULL; - rgn_pin(init_rgn, mq, (void*) msgq_cleanup); + pthread_mutex_init(&mq->mx, NULL); + pthread_cond_init(&mq->cv, NULL); return mq; } static void msgq_cleanup(struct msgq* mq) { - msgq_begin_recv(mq); + ASSERT(pthread_mutex_trylock(&mq->mx) == 0, + "Trying to clean up msgq when mutex is locked!"); + + pthread_cond_destroy(&mq->cv); + pthread_mutex_destroy(&mq->mx); rgn_cleanup(&mq->rgn); } void* msgq_begin_send(struct msgq* mq, int type, size_t payload_size) { + pthread_mutex_lock(&mq->mx); struct msgq_msg* msg = ralloc(&mq->rgn, sizeof(struct msgq_msg) + payload_size); msg->next = NULL; msg->type = type; @@ -33,10 +42,13 @@ void* msgq_begin_send(struct msgq* mq, int type, size_t payload_size) void msgq_end_send(struct msgq* mq) { + pthread_cond_broadcast(&mq->cv); + pthread_mutex_unlock(&mq->mx); } void msgq_begin_recv(struct msgq* mq) { + pthread_mutex_lock(&mq->mx); } void* msgq_recv1(struct msgq* mq, int* out_type) @@ -59,4 +71,5 @@ void msgq_end_recv(struct msgq* mq) { mq->head = mq->tail = NULL; rgn_clear(&mq->rgn); + pthread_mutex_unlock(&mq->mx); } diff --git a/src/concurrent/msgq.h b/src/concurrent/msgq.h index 633f8e2..6cc4ea0 100644 --- a/src/concurrent/msgq.h +++ b/src/concurrent/msgq.h @@ -1,6 +1,7 @@ #pragma once #include "../util/region.h" +#include struct msgq_msg; @@ -8,6 +9,8 @@ struct msgq { struct rgn rgn; struct msgq_msg* head; struct msgq_msg* tail; + pthread_mutex_t mx; + pthread_cond_t cv; }; struct msgq_msg {