ui/input_async.c: fix synchronization scheme.

We now have entirely separate channels for each direction of
communication between the producer and consumer, rather than having the
two sides share a single state machine.
This commit is contained in:
Daniel Beer 2015-05-20 09:37:18 +12:00
parent 56318beed1
commit 5e47033700
1 changed files with 50 additions and 63 deletions

View File

@ -27,22 +27,6 @@
#define MAX_LINE_LENGTH 1024
/* Mailbox state machine works as follows:
*
* BLOCKED: no data is requested by the main thread. The only
* transition from this state is to RECEIVING at the request of the
* main thread.
*
* RECEIVING: main thread is waiting for data. Reader thread may
* change state to either READY or EOF.
*
* READY: a line of text is ready to be picked up from the mailbox.
* The only transitions are to either BLOCKED or RECEIVING, at the
* request of the main thread.
*
* EOF: end of input. No transitions from this state.
*/
typedef enum {
STATE_BLOCKED,
STATE_RECEIVING,
@ -51,9 +35,14 @@ typedef enum {
} mailbox_state_t;
struct mailbox {
thread_lock_t lock;
thread_cond_t cond;
mailbox_state_t state;
thread_lock_t lock_ack;
thread_cond_t cond_ack;
int ack;
thread_lock_t lock_text;
thread_cond_t cond_text;
int text_len;
int text_eof;
char text[MAX_LINE_LENGTH];
};
@ -67,17 +56,25 @@ static void handle_special(const char *text)
static void handle_command(const char *text)
{
/* Deliver the command to the mailbox if it's receiving. */
thread_lock_acquire(&linebox.lock);
int len = strlen(text);
if (linebox.state == STATE_RECEIVING) {
strncpy(linebox.text, text, sizeof(linebox.text));
linebox.text[sizeof(linebox.text) - 1] = 0;
linebox.state = STATE_READY;
}
if (len >= sizeof(linebox.text))
len = sizeof(linebox.text) - 1;
thread_lock_release(&linebox.lock);
thread_cond_notify(&linebox.cond);
/* Deliver the command to the mailbox */
thread_lock_acquire(&linebox.lock_text);
memcpy(linebox.text, text, len);
linebox.text[len] = 0;
linebox.text_len = len;
thread_lock_release(&linebox.lock_text);
thread_cond_notify(&linebox.cond_text);
/* Wait for ACK */
thread_lock_acquire(&linebox.lock_ack);
while (!linebox.ack)
thread_cond_wait(&linebox.cond_ack, &linebox.lock_ack);
linebox.ack = 0;
thread_lock_release(&linebox.lock_ack);
}
static void io_worker(void *thread_arg)
@ -101,22 +98,24 @@ static void io_worker(void *thread_arg)
handle_command(buf);
}
/* Wait for the next read request and deliver EOF */
thread_lock_acquire(&linebox.lock);
while (linebox.state != STATE_RECEIVING)
thread_cond_wait(&linebox.cond, &linebox.lock);
linebox.state = STATE_EOF;
thread_lock_release(&linebox.lock);
thread_cond_notify(&linebox.cond);
/* Deliver EOF */
thread_lock_acquire(&linebox.lock_text);
linebox.text_eof = 1;
thread_lock_release(&linebox.lock_text);
thread_cond_notify(&linebox.cond_text);
}
static int async_init(void)
{
thread_t thr;
thread_lock_init(&linebox.lock);
thread_cond_init(&linebox.cond);
linebox.state = STATE_BLOCKED;
thread_lock_init(&linebox.lock_text);
thread_lock_init(&linebox.lock_ack);
thread_cond_init(&linebox.cond_text);
thread_cond_init(&linebox.cond_ack);
linebox.text_len = -1;
linebox.text_eof = 0;
linebox.ack = 0;
if (thread_create(&thr, io_worker, NULL) < 0) {
fprintf(stderr, "async_init: failed to "
@ -131,38 +130,26 @@ static void async_exit(void) { }
static int async_read_command(char *buf, int max_len)
{
/* Upon entry to this function, the mailbox is either in the
* BLOCKED or EOF state. First, put us into the receiving
* state.
*/
thread_lock_acquire(&linebox.lock);
if (linebox.state == STATE_EOF) {
thread_lock_release(&linebox.lock);
return 1;
}
/* Wait for text or EOF */
thread_lock_acquire(&linebox.lock_text);
while (!linebox.text_eof && (linebox.text_len < 0))
thread_cond_wait(&linebox.cond_text, &linebox.lock_text);
linebox.state = STATE_RECEIVING;
thread_lock_release(&linebox.lock);
thread_cond_notify(&linebox.cond);
/* Now wait for the reader thread to change the mailbox state to
* either READY or EOF.
*/
thread_lock_acquire(&linebox.lock);
while (linebox.state == STATE_RECEIVING)
thread_cond_wait(&linebox.cond, &linebox.lock);
if (linebox.state == STATE_EOF) {
thread_lock_release(&linebox.lock);
if (linebox.text_eof) {
thread_lock_release(&linebox.lock_text);
return 1;
}
strncpy(buf, linebox.text, max_len);
buf[max_len - 1] = 0;
linebox.text_len = -1;
thread_lock_release(&linebox.lock_text);
linebox.state = STATE_BLOCKED;
thread_lock_release(&linebox.lock);
thread_cond_notify(&linebox.cond);
/* Send ACK */
thread_lock_acquire(&linebox.lock_ack);
linebox.ack = 1;
thread_lock_release(&linebox.lock_ack);
thread_cond_notify(&linebox.cond_ack);
return 0;
}