session: Properly accumulate event source timeouts

Keep track of when source timeouts are due and properly compare
against accumulated elapsed time between invocations. This prevents
sources with short timeouts from blocking other sources with longer
timeouts indefinitely.
This commit is contained in:
Daniel Elstner 2015-08-30 21:43:30 +02:00
parent 4399cc0f41
commit 62d7945f80
3 changed files with 97 additions and 56 deletions

View File

@ -718,7 +718,7 @@ struct sr_session {
GSList *datafeed_callbacks; GSList *datafeed_callbacks;
GSList *transforms; GSList *transforms;
struct sr_trigger *trigger; struct sr_trigger *trigger;
GTimeVal starttime;
gboolean running; gboolean running;
unsigned int num_sources; unsigned int num_sources;
@ -731,7 +731,6 @@ struct sr_session {
*/ */
struct source *sources; struct source *sources;
GPollFD *pollfds; GPollFD *pollfds;
int source_timeout;
/* /*
* These are our synchronization primitives for stopping the session in * These are our synchronization primitives for stopping the session in
@ -744,6 +743,9 @@ struct sr_session {
gboolean abort_session; gboolean abort_session;
}; };
SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
void *cb_data, gintptr poll_object, gboolean is_usb);
SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi, SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
const struct sr_datafeed_packet *packet); const struct sr_datafeed_packet *packet);
SR_PRIV int sr_session_stop_sync(struct sr_session *session); SR_PRIV int sr_session_stop_sync(struct sr_session *session);

View File

@ -45,7 +45,7 @@
*/ */
struct source { struct source {
int timeout; int64_t due;
sr_receive_data_callback cb; sr_receive_data_callback cb;
void *cb_data; void *cb_data;
@ -53,6 +53,9 @@ struct source {
* being polled and will be used to match the source when removing it again. * being polled and will be used to match the source when removing it again.
*/ */
gintptr poll_object; gintptr poll_object;
int timeout;
gboolean is_usb;
}; };
struct datafeed_callback { struct datafeed_callback {
@ -84,7 +87,6 @@ SR_API int sr_session_new(struct sr_context *ctx,
session = g_malloc0(sizeof(struct sr_session)); session = g_malloc0(sizeof(struct sr_session));
session->ctx = ctx; session->ctx = ctx;
session->source_timeout = -1;
session->running = FALSE; session->running = FALSE;
session->abort_session = FALSE; session->abort_session = FALSE;
g_mutex_init(&session->stop_mutex); g_mutex_init(&session->stop_mutex);
@ -383,17 +385,15 @@ static gboolean sr_session_check_aborted(struct sr_session *session)
* from within that scheduler. * from within that scheduler.
* *
* @param session The session to use. Must not be NULL. * @param session The session to use. Must not be NULL.
* @param block If TRUE, this call will wait for any of the session's
* sources to fire an event on the file descriptors, or
* any of their timeouts to activate. In other words, this
* can be used as a select loop.
* If FALSE, return immediately if none of the sources has
* events pending.
* @retval SR_OK Success. * @retval SR_OK Success.
* @retval SR_ERR Error occurred. * @retval SR_ERR Error occurred.
*/ */
static int sr_session_iteration(struct sr_session *session, gboolean block) static int sr_session_iteration(struct sr_session *session)
{ {
int64_t start_time;
int64_t stop_time;
int64_t min_due;
int64_t due;
unsigned int i; unsigned int i;
int ret, timeout; int ret, timeout;
int revents; int revents;
@ -401,14 +401,24 @@ static int sr_session_iteration(struct sr_session *session, gboolean block)
gboolean stopped; gboolean stopped;
struct source *source; struct source *source;
GPollFD *pollfd; GPollFD *pollfd;
gintptr poll_object;
#ifdef HAVE_LIBUSB_1_0 #ifdef HAVE_LIBUSB_1_0
int usb_timeout; int64_t usb_due;
struct timeval tv; struct timeval tv;
#endif #endif
if (session->num_sources <= 0) {
sr_session_check_aborted(session);
return SR_OK;
}
start_time = g_get_monotonic_time();
min_due = INT64_MAX;
timeout = block ? session->source_timeout : 0; for (i = 0; i < session->num_sources; ++i) {
if (session->sources[i].due < min_due)
min_due = session->sources[i].due;
}
#ifdef HAVE_LIBUSB_1_0 #ifdef HAVE_LIBUSB_1_0
usb_due = INT64_MAX;
if (session->ctx->usb_source_present) { if (session->ctx->usb_source_present) {
ret = libusb_get_next_timeout(session->ctx->libusb_ctx, &tv); ret = libusb_get_next_timeout(session->ctx->libusb_ctx, &tv);
if (ret < 0) { if (ret < 0) {
@ -416,11 +426,18 @@ static int sr_session_iteration(struct sr_session *session, gboolean block)
libusb_error_name(ret)); libusb_error_name(ret));
return SR_ERR; return SR_ERR;
} else if (ret == 1) { } else if (ret == 1) {
usb_timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; usb_due = start_time + tv.tv_usec
timeout = MIN(timeout, usb_timeout); + (int64_t)tv.tv_sec * G_USEC_PER_SEC;
if (usb_due < min_due)
min_due = usb_due;
} }
} }
#endif #endif
if (min_due > start_time)
timeout = (min_due == INT64_MAX) ? -1
: MIN((min_due - start_time + 999) / 1000, INT_MAX);
else
timeout = 0;
ret = g_poll(session->pollfds, session->num_sources, timeout); ret = g_poll(session->pollfds, session->num_sources, timeout);
#ifdef G_OS_UNIX #ifdef G_OS_UNIX
@ -434,6 +451,7 @@ static int sr_session_iteration(struct sr_session *session, gboolean block)
return SR_ERR; return SR_ERR;
} }
#endif #endif
stop_time = g_get_monotonic_time();
stop_checked = FALSE; stop_checked = FALSE;
stopped = FALSE; stopped = FALSE;
@ -441,17 +459,26 @@ static int sr_session_iteration(struct sr_session *session, gboolean block)
source = &session->sources[i]; source = &session->sources[i];
pollfd = &session->pollfds[i]; pollfd = &session->pollfds[i];
revents = (ret > 0) ? pollfd->revents : 0; revents = (ret > 0) ? pollfd->revents : 0;
due = source->due;
if (revents > 0 || (ret == 0 #ifdef HAVE_LIBUSB_1_0
&& session->source_timeout == source->timeout)) { if (source->is_usb && usb_due < due)
due = usb_due;
#endif
if (revents == 0 && stop_time < due)
continue;
/* /*
* Invoke the source's callback on an event, * The source may be gone after the callback returns,
* or if the poll timed out and this source * so access any data now that needs accessing.
* asked for that timeout. */
poll_object = source->poll_object;
if (source->timeout > 0)
source->due = stop_time + INT64_C(1000) * source->timeout;
pollfd->revents = 0;
/*
* Invoke the source's callback on an event or timeout.
*/ */
if (!source->cb(pollfd->fd, revents, source->cb_data)) if (!source->cb(pollfd->fd, revents, source->cb_data))
sr_session_source_remove(session, sr_session_source_remove(session, poll_object);
source->poll_object);
/* /*
* We want to take as little time as possible to stop * We want to take as little time as possible to stop
* the session if we have been told to do so. Therefore, * the session if we have been told to do so. Therefore,
@ -462,7 +489,8 @@ static int sr_session_iteration(struct sr_session *session, gboolean block)
stopped = sr_session_check_aborted(session); stopped = sr_session_check_aborted(session);
stop_checked = TRUE; stop_checked = TRUE;
} }
} /* Restart loop as the sources list may have changed. */
i = 0;
} }
if (!stop_checked) if (!stop_checked)
sr_session_check_aborted(session); sr_session_check_aborted(session);
@ -609,7 +637,7 @@ SR_API int sr_session_run(struct sr_session *session)
} else { } else {
/* Real sources, use g_poll() main loop. */ /* Real sources, use g_poll() main loop. */
while (session->num_sources) while (session->num_sources)
sr_session_iteration(session, TRUE); sr_session_iteration(session);
} }
return SR_OK; return SR_OK;
@ -830,13 +858,15 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
* ignored if 0. * ignored if 0.
* @param cb Callback function to add. Must not be NULL. * @param cb Callback function to add. Must not be NULL.
* @param cb_data Data for the callback function. Can be NULL. * @param cb_data Data for the callback function. Can be NULL.
* @param poll_object TODO. * @param poll_object Handle by which the source is identified
* @param is_usb TRUE for a libusb polling source
* *
* @retval SR_OK Success. * @retval SR_OK Success.
* @retval SR_ERR_ARG Invalid argument. * @retval SR_ERR_ARG Invalid argument.
*/ */
static int _sr_session_source_add(struct sr_session *session, GPollFD *pollfd, SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
int timeout, sr_receive_data_callback cb, void *cb_data, gintptr poll_object) GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
void *cb_data, gintptr poll_object, gboolean is_usb)
{ {
struct source *new_sources, *s; struct source *new_sources, *s;
GPollFD *new_pollfds; GPollFD *new_pollfds;
@ -855,17 +885,18 @@ static int _sr_session_source_add(struct sr_session *session, GPollFD *pollfd,
new_pollfds[session->num_sources] = *pollfd; new_pollfds[session->num_sources] = *pollfd;
s = &new_sources[session->num_sources++]; s = &new_sources[session->num_sources++];
s->timeout = timeout; if (timeout > 0)
s->due = g_get_monotonic_time() + INT64_C(1000) * timeout;
else
s->due = INT64_MAX;
s->cb = cb; s->cb = cb;
s->cb_data = cb_data; s->cb_data = cb_data;
s->poll_object = poll_object; s->poll_object = poll_object;
s->timeout = timeout;
s->is_usb = is_usb;
session->pollfds = new_pollfds; session->pollfds = new_pollfds;
session->sources = new_sources; session->sources = new_sources;
if (timeout != session->source_timeout && timeout > 0
&& (session->source_timeout == -1 || timeout < session->source_timeout))
session->source_timeout = timeout;
return SR_OK; return SR_OK;
} }
@ -893,7 +924,8 @@ SR_API int sr_session_source_add(struct sr_session *session, int fd,
p.events = events; p.events = events;
p.revents = 0; p.revents = 0;
return _sr_session_source_add(session, &p, timeout, cb, cb_data, (gintptr)fd); return sr_session_source_add_internal(session, &p, timeout,
cb, cb_data, fd, FALSE);
} }
/** /**
@ -914,8 +946,8 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session,
GPollFD *pollfd, int timeout, sr_receive_data_callback cb, GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
void *cb_data) void *cb_data)
{ {
return _sr_session_source_add(session, pollfd, timeout, cb, return sr_session_source_add_internal(session, pollfd, timeout,
cb_data, (gintptr)pollfd); cb, cb_data, (gintptr)pollfd, FALSE);
} }
/** /**
@ -946,8 +978,8 @@ SR_API int sr_session_source_add_channel(struct sr_session *session,
p.events = events; p.events = events;
p.revents = 0; p.revents = 0;
#endif #endif
return sr_session_source_add_internal(session, &p, timeout, cb,
return _sr_session_source_add(session, &p, timeout, cb, cb_data, (gintptr)channel); cb_data, (gintptr)channel, FALSE);
} }
/** /**

View File

@ -254,16 +254,23 @@ SR_PRIV int usb_source_add(struct sr_session *session, struct sr_context *ctx,
ctx->usb_pollfd.revents = 0; ctx->usb_pollfd.revents = 0;
ctx->usb_cb = cb; ctx->usb_cb = cb;
ctx->usb_cb_data = cb_data; ctx->usb_cb_data = cb_data;
sr_session_source_add_pollfd(session, &ctx->usb_pollfd, timeout, sr_session_source_add_internal(session, &ctx->usb_pollfd, timeout,
usb_callback, ctx); usb_callback, ctx, (gintptr)&ctx->usb_pollfd, TRUE);
#else #else
const struct libusb_pollfd **lupfd; const struct libusb_pollfd **lupfd;
unsigned int i; unsigned int i;
lupfd = libusb_get_pollfds(ctx->libusb_ctx); lupfd = libusb_get_pollfds(ctx->libusb_ctx);
for (i = 0; lupfd[i]; i++) for (i = 0; lupfd[i]; i++) {
sr_session_source_add(session, lupfd[i]->fd, lupfd[i]->events, GPollFD p;
timeout, cb, cb_data);
p.fd = lupfd[i]->fd;
p.events = lupfd[i]->events;
p.revents = 0;
sr_session_source_add_internal(session, &p, timeout,
cb, cb_data, p.fd, TRUE);
}
free(lupfd); free(lupfd);
#endif #endif
ctx->usb_source_present = TRUE; ctx->usb_source_present = TRUE;