From 3ba13abab5724efca1fda302d86f107b43c99b17 Mon Sep 17 00:00:00 2001 From: Milo Turner Date: Tue, 25 Feb 2020 14:46:35 -0500 Subject: [PATCH] [async] pub/sub interface --- src/concurrent/pubsub.c | 171 ++++++++++++++++++++++++++++++++++++++++ src/concurrent/pubsub.h | 52 ++++++++++++ 2 files changed, 223 insertions(+) create mode 100644 src/concurrent/pubsub.c create mode 100644 src/concurrent/pubsub.h diff --git a/src/concurrent/pubsub.c b/src/concurrent/pubsub.c new file mode 100644 index 0000000..c5d58b0 --- /dev/null +++ b/src/concurrent/pubsub.c @@ -0,0 +1,171 @@ +#include "pubsub.h" +#include "../util.h" +#include "../util/region.h" + +static void pubsub_cleanup(struct pubsub*); +static void sub_cleanup(struct sub*); +static void msg_init(struct pubsub_msg* msg, + struct rgn* rgn, int type, size_t rc); +static void msg_release(struct pubsub_msg* msg); + +struct pubsub* pubsub_new(struct rgn* init_rgn) +{ + struct pubsub* src = ralloc_typed(init_rgn, struct pubsub, 1); + src->subs = NULL; + src->msg_rgn = NULL; + pthread_mutex_init(&src->mx, NULL); + rgn_pin(init_rgn, src, (void*) pubsub_cleanup); + return src; +} + +struct sub* sub_new(struct rgn* rgn, struct pubsub* src) +{ + struct sub* sub = ralloc_typed(rgn, struct sub, 1); + sub->src = src; + sub->msgs = NULL; + pthread_mutex_init(&sub->msgs_mx, NULL); + pthread_cond_init(&sub->msgs_cv, NULL); + + // attach to doubly-linked list `src->subs` + pthread_mutex_lock(&src->mx); + struct sub* sub0 = src->subs; + if (sub0 != NULL) { + sub0->prev = src->subs = sub; + } + sub->next = sub0; + sub->prev = NULL; + src->subs = sub; + pthread_mutex_unlock(&src->mx); + + rgn_pin(rgn, sub, (void*) sub_cleanup); + return sub; +} + +static void pubsub_cleanup(struct pubsub* src) +{ + ASSERT(pthread_mutex_trylock(&src->mx) == 0, + "Trying to clean up pubsub when mutex is locked!"); + ASSERT(src->subs == NULL, + "Trying to clean up pubsub when there are still subscribers!"); + pthread_mutex_destroy(&src->mx); +} + +static void sub_cleanup(struct sub* sub) +{ + ASSERT(pthread_mutex_trylock(&sub->msgs_mx) == 0, + "Trying to clean up sub when msgs mutex is locked!"); + + // remove from DLL + pthread_mutex_lock(&sub->src->mx); + struct sub* subp = sub->prev; + struct sub* subn = sub->next; + if (subp != NULL) { + subp->next = subn; + } else { + sub->src->subs = subn; + } + if (subn != NULL) { + subn->prev = subp; + } + pthread_mutex_unlock(&sub->src->mx); + + // free un-received messages + for (struct sub_msg_list* msgs = sub->msgs; msgs != NULL; msgs = msgs->next) { + msg_release(msgs->msg); + } + + pthread_cond_destroy(&sub->msgs_cv); + pthread_mutex_destroy(&sub->msgs_mx); +} + +void* pubsub_begin_pub(struct pubsub* src, int type, size_t payload_size) +{ + pthread_mutex_lock(&src->mx); + struct rgn* msg_rgn = rgn_bootstrap_new(SMALL); + struct pubsub_msg* msg = + ralloc(msg_rgn, sizeof(struct pubsub_msg) + payload_size); + size_t rc = 0; + for (struct sub* sub = src->subs; sub != NULL; sub = sub->next) { + pthread_mutex_lock(&sub->msgs_mx); + struct sub_msg_list* l = + ralloc_typed(msg_rgn, struct sub_msg_list, 1); + l->next = sub->msgs; + l->msg = msg; + sub->msgs = l; + rc++; + } + msg_init(msg, msg_rgn, type, rc); + src->msg_rgn = msg_rgn; + printf("[pubsub] sending message to %zu subscribers.\n", rc); + return msg->payload; +} + +void pubsub_end_pub(struct pubsub* src) +{ + for (struct sub* sub = src->subs; sub != NULL; sub = sub->next) { + pthread_cond_signal(&sub->msgs_cv); + pthread_mutex_unlock(&sub->msgs_mx); + } + src->msg_rgn = NULL; + pthread_mutex_unlock(&src->mx); +} + +static void msg_init( + struct pubsub_msg* msg, + struct rgn* rgn, int type, size_t rc) +{ + msg->rgn = rgn; + msg->type = type; + atomic_init(&msg->rc, rc); +} + +static void msg_release(struct pubsub_msg* msg) +{ + size_t rc = atomic_fetch_sub(&msg->rc, 1); + if (rc == 1) { + printf("[pubsub] destroying message object %p\n", msg); + rgn_cleanup(msg->rgn); + } +} + +void sub_begin_recv(struct sub* sub) +{ + pthread_mutex_lock(&sub->msgs_mx); + sub->msg_to_rel = NULL; +} + +void sub_begin_recv_and_wait(struct sub* sub) +{ + sub_begin_recv(sub); + if (sub->msgs == NULL) { + pthread_cond_wait(&sub->msgs_cv, &sub->msgs_mx); + } +} + +void* sub_recv1(struct sub* sub, int* out_type) +{ + if (sub->msg_to_rel != NULL) { + msg_release(sub->msg_to_rel); + sub->msg_to_rel = NULL; + } + if (sub->msgs == NULL) { + return NULL; + } else { + struct pubsub_msg* msg = sub->msgs->msg; + sub->msgs = sub->msgs->next; + sub->msg_to_rel = msg; + if (out_type != NULL) { + *out_type = msg->type; + } + printf("[pubsub] received message object %p\n", msg); + return msg->payload; + } +} + +void sub_end_recv(struct sub* sub) +{ + if (sub->msg_to_rel != NULL) { + msg_release(sub->msg_to_rel); + } + pthread_mutex_unlock(&sub->msgs_mx); +} diff --git a/src/concurrent/pubsub.h b/src/concurrent/pubsub.h new file mode 100644 index 0000000..2c513ef --- /dev/null +++ b/src/concurrent/pubsub.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include +#include + +struct rgn; +struct sub; +struct pubsub_msg; +struct sub_msg_list; + +struct pubsub { + struct sub* subs; + struct rgn* msg_rgn; + pthread_mutex_t mx; +}; + +struct pubsub_msg { + struct rgn* rgn; + volatile atomic_size_t rc; + int type; + char payload[0]; +}; + +struct sub { + struct sub* next; + struct sub* prev; + struct pubsub* src; + struct sub_msg_list* msgs; + struct pubsub_msg* msg_to_rel; + pthread_cond_t msgs_cv; + pthread_mutex_t msgs_mx; +}; + +struct sub_msg_list { + struct sub_msg_list* next; + struct pubsub_msg* msg; +}; + +struct pubsub* pubsub_new(struct rgn* init_rgn); +struct sub* sub_new(struct rgn* rgn, struct pubsub* src); + +void* pubsub_begin_pub(struct pubsub* src, int type, size_t payload_size); +void pubsub_end_pub(struct pubsub* src); + +void sub_begin_recv(struct sub* sub); +void sub_begin_recv_and_wait(struct sub* sub); +void* sub_recv1(struct sub* sub, int* out_type); +void sub_end_recv(struct sub* sub); + +#define pubsub_begin_pub_typed(_src, ty) \ + ((struct ty*) pubsub_begin_pub(_src, ty ## _TAG, sizeof(struct ty)))