[async] pub/sub interface

This commit is contained in:
Milo Turner 2020-02-25 14:46:35 -05:00
parent 1bdb46d305
commit 3ba13abab5
2 changed files with 223 additions and 0 deletions

171
src/concurrent/pubsub.c Normal file
View File

@ -0,0 +1,171 @@
#include "pubsub.h"
#include "../util.h"
#include "../util/region.h"
static void pubsub_cleanup(struct pubsub*);
static void sub_cleanup(struct sub*);
static void msg_init(struct pubsub_msg* msg,
struct rgn* rgn, int type, size_t rc);
static void msg_release(struct pubsub_msg* msg);
struct pubsub* pubsub_new(struct rgn* init_rgn)
{
struct pubsub* src = ralloc_typed(init_rgn, struct pubsub, 1);
src->subs = NULL;
src->msg_rgn = NULL;
pthread_mutex_init(&src->mx, NULL);
rgn_pin(init_rgn, src, (void*) pubsub_cleanup);
return src;
}
struct sub* sub_new(struct rgn* rgn, struct pubsub* src)
{
struct sub* sub = ralloc_typed(rgn, struct sub, 1);
sub->src = src;
sub->msgs = NULL;
pthread_mutex_init(&sub->msgs_mx, NULL);
pthread_cond_init(&sub->msgs_cv, NULL);
// attach to doubly-linked list `src->subs`
pthread_mutex_lock(&src->mx);
struct sub* sub0 = src->subs;
if (sub0 != NULL) {
sub0->prev = src->subs = sub;
}
sub->next = sub0;
sub->prev = NULL;
src->subs = sub;
pthread_mutex_unlock(&src->mx);
rgn_pin(rgn, sub, (void*) sub_cleanup);
return sub;
}
static void pubsub_cleanup(struct pubsub* src)
{
ASSERT(pthread_mutex_trylock(&src->mx) == 0,
"Trying to clean up pubsub when mutex is locked!");
ASSERT(src->subs == NULL,
"Trying to clean up pubsub when there are still subscribers!");
pthread_mutex_destroy(&src->mx);
}
static void sub_cleanup(struct sub* sub)
{
ASSERT(pthread_mutex_trylock(&sub->msgs_mx) == 0,
"Trying to clean up sub when msgs mutex is locked!");
// remove from DLL
pthread_mutex_lock(&sub->src->mx);
struct sub* subp = sub->prev;
struct sub* subn = sub->next;
if (subp != NULL) {
subp->next = subn;
} else {
sub->src->subs = subn;
}
if (subn != NULL) {
subn->prev = subp;
}
pthread_mutex_unlock(&sub->src->mx);
// free un-received messages
for (struct sub_msg_list* msgs = sub->msgs; msgs != NULL; msgs = msgs->next) {
msg_release(msgs->msg);
}
pthread_cond_destroy(&sub->msgs_cv);
pthread_mutex_destroy(&sub->msgs_mx);
}
void* pubsub_begin_pub(struct pubsub* src, int type, size_t payload_size)
{
pthread_mutex_lock(&src->mx);
struct rgn* msg_rgn = rgn_bootstrap_new(SMALL);
struct pubsub_msg* msg =
ralloc(msg_rgn, sizeof(struct pubsub_msg) + payload_size);
size_t rc = 0;
for (struct sub* sub = src->subs; sub != NULL; sub = sub->next) {
pthread_mutex_lock(&sub->msgs_mx);
struct sub_msg_list* l =
ralloc_typed(msg_rgn, struct sub_msg_list, 1);
l->next = sub->msgs;
l->msg = msg;
sub->msgs = l;
rc++;
}
msg_init(msg, msg_rgn, type, rc);
src->msg_rgn = msg_rgn;
printf("[pubsub] sending message to %zu subscribers.\n", rc);
return msg->payload;
}
void pubsub_end_pub(struct pubsub* src)
{
for (struct sub* sub = src->subs; sub != NULL; sub = sub->next) {
pthread_cond_signal(&sub->msgs_cv);
pthread_mutex_unlock(&sub->msgs_mx);
}
src->msg_rgn = NULL;
pthread_mutex_unlock(&src->mx);
}
static void msg_init(
struct pubsub_msg* msg,
struct rgn* rgn, int type, size_t rc)
{
msg->rgn = rgn;
msg->type = type;
atomic_init(&msg->rc, rc);
}
static void msg_release(struct pubsub_msg* msg)
{
size_t rc = atomic_fetch_sub(&msg->rc, 1);
if (rc == 1) {
printf("[pubsub] destroying message object %p\n", msg);
rgn_cleanup(msg->rgn);
}
}
void sub_begin_recv(struct sub* sub)
{
pthread_mutex_lock(&sub->msgs_mx);
sub->msg_to_rel = NULL;
}
void sub_begin_recv_and_wait(struct sub* sub)
{
sub_begin_recv(sub);
if (sub->msgs == NULL) {
pthread_cond_wait(&sub->msgs_cv, &sub->msgs_mx);
}
}
void* sub_recv1(struct sub* sub, int* out_type)
{
if (sub->msg_to_rel != NULL) {
msg_release(sub->msg_to_rel);
sub->msg_to_rel = NULL;
}
if (sub->msgs == NULL) {
return NULL;
} else {
struct pubsub_msg* msg = sub->msgs->msg;
sub->msgs = sub->msgs->next;
sub->msg_to_rel = msg;
if (out_type != NULL) {
*out_type = msg->type;
}
printf("[pubsub] received message object %p\n", msg);
return msg->payload;
}
}
void sub_end_recv(struct sub* sub)
{
if (sub->msg_to_rel != NULL) {
msg_release(sub->msg_to_rel);
}
pthread_mutex_unlock(&sub->msgs_mx);
}

52
src/concurrent/pubsub.h Normal file
View File

@ -0,0 +1,52 @@
#pragma once
#include <stddef.h>
#include <pthread.h>
#include <stdatomic.h>
struct rgn;
struct sub;
struct pubsub_msg;
struct sub_msg_list;
struct pubsub {
struct sub* subs;
struct rgn* msg_rgn;
pthread_mutex_t mx;
};
struct pubsub_msg {
struct rgn* rgn;
volatile atomic_size_t rc;
int type;
char payload[0];
};
struct sub {
struct sub* next;
struct sub* prev;
struct pubsub* src;
struct sub_msg_list* msgs;
struct pubsub_msg* msg_to_rel;
pthread_cond_t msgs_cv;
pthread_mutex_t msgs_mx;
};
struct sub_msg_list {
struct sub_msg_list* next;
struct pubsub_msg* msg;
};
struct pubsub* pubsub_new(struct rgn* init_rgn);
struct sub* sub_new(struct rgn* rgn, struct pubsub* src);
void* pubsub_begin_pub(struct pubsub* src, int type, size_t payload_size);
void pubsub_end_pub(struct pubsub* src);
void sub_begin_recv(struct sub* sub);
void sub_begin_recv_and_wait(struct sub* sub);
void* sub_recv1(struct sub* sub, int* out_type);
void sub_end_recv(struct sub* sub);
#define pubsub_begin_pub_typed(_src, ty) \
((struct ty*) pubsub_begin_pub(_src, ty ## _TAG, sizeof(struct ty)))