[async] use pthread library in 'msgq'

This commit is contained in:
Milo Turner 2020-02-19 20:15:56 -05:00
parent 0d68825b70
commit bb8eda00e4
2 changed files with 18 additions and 2 deletions

View File

@ -1,24 +1,33 @@
#include "msgq.h" #include "msgq.h"
#include "../util.h"
static void msgq_cleanup(struct msgq* mq); static void msgq_cleanup(struct msgq* mq);
struct msgq* msgq_new(struct rgn* init_rgn) struct msgq* msgq_new(struct rgn* init_rgn)
{ {
struct msgq* mq = ralloc_typed(init_rgn, struct msgq, 1); struct msgq* mq = ralloc_typed(init_rgn, struct msgq, 1);
rgn_pin(init_rgn, mq, (void*) msgq_cleanup);
rgn_init(&mq->rgn, SMALL); rgn_init(&mq->rgn, SMALL);
mq->head = mq->tail = NULL; mq->head = mq->tail = NULL;
rgn_pin(init_rgn, mq, (void*) msgq_cleanup); pthread_mutex_init(&mq->mx, NULL);
pthread_cond_init(&mq->cv, NULL);
return mq; return mq;
} }
static void msgq_cleanup(struct msgq* mq) static void msgq_cleanup(struct msgq* mq)
{ {
msgq_begin_recv(mq); ASSERT(pthread_mutex_trylock(&mq->mx) == 0,
"Trying to clean up msgq when mutex is locked!");
pthread_cond_destroy(&mq->cv);
pthread_mutex_destroy(&mq->mx);
rgn_cleanup(&mq->rgn); rgn_cleanup(&mq->rgn);
} }
void* msgq_begin_send(struct msgq* mq, int type, size_t payload_size) void* msgq_begin_send(struct msgq* mq, int type, size_t payload_size)
{ {
pthread_mutex_lock(&mq->mx);
struct msgq_msg* msg = ralloc(&mq->rgn, sizeof(struct msgq_msg) + payload_size); struct msgq_msg* msg = ralloc(&mq->rgn, sizeof(struct msgq_msg) + payload_size);
msg->next = NULL; msg->next = NULL;
msg->type = type; msg->type = type;
@ -33,10 +42,13 @@ void* msgq_begin_send(struct msgq* mq, int type, size_t payload_size)
void msgq_end_send(struct msgq* mq) void msgq_end_send(struct msgq* mq)
{ {
pthread_cond_broadcast(&mq->cv);
pthread_mutex_unlock(&mq->mx);
} }
void msgq_begin_recv(struct msgq* mq) void msgq_begin_recv(struct msgq* mq)
{ {
pthread_mutex_lock(&mq->mx);
} }
void* msgq_recv1(struct msgq* mq, int* out_type) void* msgq_recv1(struct msgq* mq, int* out_type)
@ -59,4 +71,5 @@ void msgq_end_recv(struct msgq* mq)
{ {
mq->head = mq->tail = NULL; mq->head = mq->tail = NULL;
rgn_clear(&mq->rgn); rgn_clear(&mq->rgn);
pthread_mutex_unlock(&mq->mx);
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "../util/region.h" #include "../util/region.h"
#include <pthread.h>
struct msgq_msg; struct msgq_msg;
@ -8,6 +9,8 @@ struct msgq {
struct rgn rgn; struct rgn rgn;
struct msgq_msg* head; struct msgq_msg* head;
struct msgq_msg* tail; struct msgq_msg* tail;
pthread_mutex_t mx;
pthread_cond_t cv;
}; };
struct msgq_msg { struct msgq_msg {