session: Unify handling of I/O and timer sources
Handle I/O sources and timer ("dummy") sources within the same polling loop, so that both may be used together. Slightly change the API to improve consistency: a timeout value of -1 now disables the timeout, and 0 makes the source always time out immediately. The "dummy" sources already behaved that way, although it wasn't documented as such. Make sure that I/O events are processed preferentially: Skip any timeout callbacks if an I/O event occurred within the same poll iteration. This applies to both timer/idle sources and timeouts of I/O sources. Do not create dummy GPollFDs for timer/idle sources. Instead, split the sources array into an I/O section and a timer section, and create corresponding GPollFDs only for the I/O section. Use GArray to simplify the handling of the dynamic arrays.
This commit is contained in:
parent
62d7945f80
commit
faa5d7d997
|
@ -721,16 +721,15 @@ struct sr_session {
|
|||
|
||||
gboolean running;
|
||||
|
||||
unsigned int num_sources;
|
||||
|
||||
/*
|
||||
* Both "sources" and "pollfds" are of the same size and contain pairs
|
||||
* of descriptor and callback function. We can not embed the GPollFD
|
||||
* into the source struct since we want to be able to pass the array
|
||||
* of all poll descriptors to g_poll().
|
||||
* Each I/O source has an entry with the same index in both "sources"
|
||||
* and "pollfds". The "sources" array may be larger than "pollfds",
|
||||
* in which case the excess sources are pure timer sources.
|
||||
* We can not embed the GPollFD into the source struct since we want
|
||||
* to be able to pass the array of all poll descriptors to g_poll().
|
||||
*/
|
||||
struct source *sources;
|
||||
GPollFD *pollfds;
|
||||
GArray *sources;
|
||||
GArray *pollfds;
|
||||
|
||||
/*
|
||||
* These are our synchronization primitives for stopping the session in
|
||||
|
|
236
src/session.c
236
src/session.c
|
@ -45,7 +45,8 @@
|
|||
*/
|
||||
|
||||
struct source {
|
||||
int64_t due;
|
||||
int64_t timeout; /* microseconds */
|
||||
int64_t due; /* microseconds */
|
||||
sr_receive_data_callback cb;
|
||||
void *cb_data;
|
||||
|
||||
|
@ -54,8 +55,8 @@ struct source {
|
|||
*/
|
||||
gintptr poll_object;
|
||||
|
||||
int timeout;
|
||||
gboolean is_usb;
|
||||
gboolean triggered;
|
||||
};
|
||||
|
||||
struct datafeed_callback {
|
||||
|
@ -87,8 +88,10 @@ SR_API int sr_session_new(struct sr_context *ctx,
|
|||
session = g_malloc0(sizeof(struct sr_session));
|
||||
|
||||
session->ctx = ctx;
|
||||
session->running = FALSE;
|
||||
session->abort_session = FALSE;
|
||||
|
||||
session->sources = g_array_new(FALSE, FALSE, sizeof(struct source));
|
||||
session->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
|
||||
|
||||
g_mutex_init(&session->stop_mutex);
|
||||
|
||||
*new_session = session;
|
||||
|
@ -121,6 +124,9 @@ SR_API int sr_session_destroy(struct sr_session *session)
|
|||
|
||||
g_slist_free_full(session->owned_devs, (GDestroyNotify)sr_dev_inst_free);
|
||||
|
||||
g_array_unref(session->pollfds);
|
||||
g_array_unref(session->sources);
|
||||
|
||||
g_free(session);
|
||||
|
||||
return SR_OK;
|
||||
|
@ -377,12 +383,10 @@ static gboolean sr_session_check_aborted(struct sr_session *session)
|
|||
return stop;
|
||||
}
|
||||
|
||||
static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object);
|
||||
|
||||
/**
|
||||
* Call every device in the current session's callback.
|
||||
*
|
||||
* For sessions not driven by select loops such as sr_session_run(),
|
||||
* but driven by another scheduler, this can be used to poll the devices
|
||||
* from within that scheduler.
|
||||
* Poll the session's event sources.
|
||||
*
|
||||
* @param session The session to use. Must not be NULL.
|
||||
* @retval SR_OK Success.
|
||||
|
@ -390,15 +394,13 @@ static gboolean sr_session_check_aborted(struct sr_session *session)
|
|||
*/
|
||||
static int sr_session_iteration(struct sr_session *session)
|
||||
{
|
||||
int64_t start_time;
|
||||
int64_t stop_time;
|
||||
int64_t min_due;
|
||||
int64_t due;
|
||||
int64_t start_time, stop_time, min_due, due;
|
||||
int timeout_ms;
|
||||
unsigned int i;
|
||||
int ret, timeout;
|
||||
int ret;
|
||||
int fd;
|
||||
int revents;
|
||||
gboolean stop_checked;
|
||||
gboolean stopped;
|
||||
gboolean triggered, stopped;
|
||||
struct source *source;
|
||||
GPollFD *pollfd;
|
||||
gintptr poll_object;
|
||||
|
@ -406,16 +408,18 @@ static int sr_session_iteration(struct sr_session *session)
|
|||
int64_t usb_due;
|
||||
struct timeval tv;
|
||||
#endif
|
||||
if (session->num_sources <= 0) {
|
||||
if (session->sources->len == 0) {
|
||||
sr_session_check_aborted(session);
|
||||
return SR_OK;
|
||||
}
|
||||
start_time = g_get_monotonic_time();
|
||||
min_due = INT64_MAX;
|
||||
|
||||
for (i = 0; i < session->num_sources; ++i) {
|
||||
if (session->sources[i].due < min_due)
|
||||
min_due = session->sources[i].due;
|
||||
for (i = 0; i < session->sources->len; ++i) {
|
||||
source = &g_array_index(session->sources, struct source, i);
|
||||
if (source->due < min_due)
|
||||
min_due = source->due;
|
||||
source->triggered = FALSE;
|
||||
}
|
||||
#ifdef HAVE_LIBUSB_1_0
|
||||
usb_due = INT64_MAX;
|
||||
|
@ -433,13 +437,15 @@ static int sr_session_iteration(struct sr_session *session)
|
|||
}
|
||||
}
|
||||
#endif
|
||||
if (min_due > start_time)
|
||||
timeout = (min_due == INT64_MAX) ? -1
|
||||
: MIN((min_due - start_time + 999) / 1000, INT_MAX);
|
||||
if (min_due == INT64_MAX)
|
||||
timeout_ms = -1;
|
||||
else if (min_due > start_time)
|
||||
timeout_ms = MIN((min_due - start_time + 999) / 1000, INT_MAX);
|
||||
else
|
||||
timeout = 0;
|
||||
timeout_ms = 0;
|
||||
|
||||
ret = g_poll(session->pollfds, session->num_sources, timeout);
|
||||
ret = g_poll((GPollFD *)session->pollfds->data,
|
||||
session->pollfds->len, timeout_ms);
|
||||
#ifdef G_OS_UNIX
|
||||
if (ret < 0 && errno != EINTR) {
|
||||
sr_err("Error in poll: %s", g_strerror(errno));
|
||||
|
@ -452,13 +458,27 @@ static int sr_session_iteration(struct sr_session *session)
|
|||
}
|
||||
#endif
|
||||
stop_time = g_get_monotonic_time();
|
||||
stop_checked = FALSE;
|
||||
triggered = FALSE;
|
||||
stopped = FALSE;
|
||||
|
||||
for (i = 0; i < session->num_sources; i++) {
|
||||
source = &session->sources[i];
|
||||
pollfd = &session->pollfds[i];
|
||||
revents = (ret > 0) ? pollfd->revents : 0;
|
||||
for (i = 0; i < session->sources->len; ++i) {
|
||||
source = &g_array_index(session->sources, struct source, i);
|
||||
if (source->triggered)
|
||||
continue; /* already handled */
|
||||
|
||||
poll_object = source->poll_object;
|
||||
fd = (int)poll_object;
|
||||
revents = 0;
|
||||
|
||||
if (i < session->pollfds->len) {
|
||||
pollfd = &g_array_index(session->pollfds, GPollFD, i);
|
||||
fd = pollfd->fd;
|
||||
if (ret > 0)
|
||||
revents = pollfd->revents;
|
||||
}
|
||||
if (ret > 0 && revents == 0)
|
||||
continue; /* skip timeouts if any I/O event occurred */
|
||||
|
||||
due = source->due;
|
||||
#ifdef HAVE_LIBUSB_1_0
|
||||
if (source->is_usb && usb_due < due)
|
||||
|
@ -470,29 +490,30 @@ static int sr_session_iteration(struct sr_session *session)
|
|||
* The source may be gone after the callback returns,
|
||||
* so access any data now that needs accessing.
|
||||
*/
|
||||
poll_object = source->poll_object;
|
||||
if (source->timeout > 0)
|
||||
source->due = stop_time + INT64_C(1000) * source->timeout;
|
||||
pollfd->revents = 0;
|
||||
if (source->timeout >= 0)
|
||||
source->due = stop_time + source->timeout;
|
||||
source->triggered = TRUE;
|
||||
triggered = TRUE;
|
||||
/*
|
||||
* Invoke the source's callback on an event or timeout.
|
||||
*/
|
||||
if (!source->cb(pollfd->fd, revents, source->cb_data))
|
||||
sr_session_source_remove(session, poll_object);
|
||||
if (!source->cb(fd, revents, source->cb_data))
|
||||
_sr_session_source_remove(session, poll_object);
|
||||
/*
|
||||
* We want to take as little time as possible to stop
|
||||
* the session if we have been told to do so. Therefore,
|
||||
* we check the flag after processing every source, not
|
||||
* just once per main event loop.
|
||||
*/
|
||||
if (!stopped) {
|
||||
if (!stopped)
|
||||
stopped = sr_session_check_aborted(session);
|
||||
stop_checked = TRUE;
|
||||
}
|
||||
|
||||
/* Restart loop as the sources list may have changed. */
|
||||
i = 0;
|
||||
}
|
||||
if (!stop_checked)
|
||||
|
||||
/* Check for abort at least once per iteration. */
|
||||
if (!triggered)
|
||||
sr_session_check_aborted(session);
|
||||
|
||||
return SR_OK;
|
||||
|
@ -609,11 +630,14 @@ SR_API int sr_session_start(struct sr_session *session)
|
|||
*
|
||||
* @retval SR_OK Success.
|
||||
* @retval SR_ERR_ARG Invalid session passed.
|
||||
* @retval SR_ERR Error during event processing.
|
||||
*
|
||||
* @since 0.4.0
|
||||
*/
|
||||
SR_API int sr_session_run(struct sr_session *session)
|
||||
{
|
||||
int ret;
|
||||
|
||||
if (!session) {
|
||||
sr_err("%s: session was NULL", __func__);
|
||||
return SR_ERR_ARG;
|
||||
|
@ -629,17 +653,12 @@ SR_API int sr_session_run(struct sr_session *session)
|
|||
|
||||
sr_info("Running.");
|
||||
|
||||
/* Do we have real sources? */
|
||||
if (session->num_sources == 1 && session->pollfds[0].fd == -1) {
|
||||
/* Dummy source, freewheel over it. */
|
||||
while (session->num_sources)
|
||||
session->sources[0].cb(-1, 0, session->sources[0].cb_data);
|
||||
} else {
|
||||
/* Real sources, use g_poll() main loop. */
|
||||
while (session->num_sources)
|
||||
sr_session_iteration(session);
|
||||
/* Poll event sources until none are left. */
|
||||
while (session->sources->len > 0) {
|
||||
ret = sr_session_iteration(session);
|
||||
if (ret != SR_OK)
|
||||
return ret;
|
||||
}
|
||||
|
||||
return SR_OK;
|
||||
}
|
||||
|
||||
|
@ -854,8 +873,8 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
|
|||
*
|
||||
* @param session The session to use. Must not be NULL.
|
||||
* @param pollfd The GPollFD.
|
||||
* @param[in] timeout Max time to wait before the callback is called,
|
||||
* ignored if 0.
|
||||
* @param[in] timeout Max time in ms to wait before the callback is called,
|
||||
* or -1 to wait indefinitely.
|
||||
* @param cb Callback function to add. Must not be NULL.
|
||||
* @param cb_data Data for the callback function. Can be NULL.
|
||||
* @param poll_object Handle by which the source is identified
|
||||
|
@ -868,34 +887,36 @@ SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
|
|||
GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
|
||||
void *cb_data, gintptr poll_object, gboolean is_usb)
|
||||
{
|
||||
struct source *new_sources, *s;
|
||||
GPollFD *new_pollfds;
|
||||
struct source src;
|
||||
|
||||
if (!cb) {
|
||||
sr_err("%s: cb was NULL", __func__);
|
||||
return SR_ERR_ARG;
|
||||
}
|
||||
|
||||
/* Note: cb_data can be NULL, that's not a bug. */
|
||||
|
||||
new_pollfds = g_realloc(session->pollfds,
|
||||
sizeof(GPollFD) * (session->num_sources + 1));
|
||||
new_sources = g_realloc(session->sources, sizeof(struct source) *
|
||||
(session->num_sources + 1));
|
||||
src.cb = cb;
|
||||
src.cb_data = cb_data;
|
||||
src.poll_object = poll_object;
|
||||
src.is_usb = is_usb;
|
||||
src.triggered = FALSE;
|
||||
|
||||
new_pollfds[session->num_sources] = *pollfd;
|
||||
s = &new_sources[session->num_sources++];
|
||||
if (timeout > 0)
|
||||
s->due = g_get_monotonic_time() + INT64_C(1000) * timeout;
|
||||
else
|
||||
s->due = INT64_MAX;
|
||||
s->cb = cb;
|
||||
s->cb_data = cb_data;
|
||||
s->poll_object = poll_object;
|
||||
s->timeout = timeout;
|
||||
s->is_usb = is_usb;
|
||||
session->pollfds = new_pollfds;
|
||||
session->sources = new_sources;
|
||||
if (timeout >= 0) {
|
||||
src.timeout = INT64_C(1000) * timeout;
|
||||
src.due = g_get_monotonic_time() + src.timeout;
|
||||
} else {
|
||||
src.timeout = -1;
|
||||
src.due = INT64_MAX;
|
||||
}
|
||||
|
||||
if (pollfd) {
|
||||
/* I/O source */
|
||||
g_array_insert_val(session->sources, session->pollfds->len, src);
|
||||
g_array_append_vals(session->pollfds, pollfd, 1);
|
||||
} else {
|
||||
/* Timer source */
|
||||
g_array_append_val(session->sources, src);
|
||||
}
|
||||
|
||||
return SR_OK;
|
||||
}
|
||||
|
@ -906,7 +927,8 @@ SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
|
|||
* @param session The session to use. Must not be NULL.
|
||||
* @param fd The file descriptor.
|
||||
* @param events Events to check for.
|
||||
* @param timeout Max time to wait before the callback is called, ignored if 0.
|
||||
* @param timeout Max time in ms to wait before the callback is called,
|
||||
* or -1 to wait indefinitely.
|
||||
* @param cb Callback function to add. Must not be NULL.
|
||||
* @param cb_data Data for the callback function. Can be NULL.
|
||||
*
|
||||
|
@ -920,20 +942,25 @@ SR_API int sr_session_source_add(struct sr_session *session, int fd,
|
|||
{
|
||||
GPollFD p;
|
||||
|
||||
if (fd < 0 && timeout < 0) {
|
||||
sr_err("Timer source without timeout would block indefinitely");
|
||||
return SR_ERR_ARG;
|
||||
}
|
||||
p.fd = fd;
|
||||
p.events = events;
|
||||
p.revents = 0;
|
||||
|
||||
return sr_session_source_add_internal(session, &p, timeout,
|
||||
cb, cb_data, fd, FALSE);
|
||||
return sr_session_source_add_internal(session,
|
||||
(fd < 0) ? NULL : &p, timeout, cb, cb_data, fd, FALSE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an event source for a GPollFD.
|
||||
*
|
||||
* @param session The session to use. Must not be NULL.
|
||||
* @param pollfd The GPollFD.
|
||||
* @param timeout Max time to wait before the callback is called, ignored if 0.
|
||||
* @param pollfd The GPollFD. Must not be NULL.
|
||||
* @param timeout Max time in ms to wait before the callback is called,
|
||||
* or -1 to wait indefinitely.
|
||||
* @param cb Callback function to add. Must not be NULL.
|
||||
* @param cb_data Data for the callback function. Can be NULL.
|
||||
*
|
||||
|
@ -946,6 +973,10 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session,
|
|||
GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
|
||||
void *cb_data)
|
||||
{
|
||||
if (!pollfd) {
|
||||
sr_err("%s: pollfd was NULL", __func__);
|
||||
return SR_ERR_ARG;
|
||||
}
|
||||
return sr_session_source_add_internal(session, pollfd, timeout,
|
||||
cb, cb_data, (gintptr)pollfd, FALSE);
|
||||
}
|
||||
|
@ -956,7 +987,8 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session,
|
|||
* @param session The session to use. Must not be NULL.
|
||||
* @param channel The GIOChannel.
|
||||
* @param events Events to poll on.
|
||||
* @param timeout Max time to wait before the callback is called, ignored if 0.
|
||||
* @param timeout Max time in ms to wait before the callback is called,
|
||||
* or -1 to wait indefinitely.
|
||||
* @param cb Callback function to add. Must not be NULL.
|
||||
* @param cb_data Data for the callback function. Can be NULL.
|
||||
*
|
||||
|
@ -971,7 +1003,7 @@ SR_API int sr_session_source_add_channel(struct sr_session *session,
|
|||
{
|
||||
GPollFD p;
|
||||
|
||||
#ifdef _WIN32
|
||||
#ifdef G_OS_WIN32
|
||||
g_io_channel_win32_make_pollfd(channel, events, &p);
|
||||
#else
|
||||
p.fd = g_io_channel_unix_get_fd(channel);
|
||||
|
@ -994,34 +1026,18 @@ SR_API int sr_session_source_add_channel(struct sr_session *session,
|
|||
*/
|
||||
static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object)
|
||||
{
|
||||
unsigned int old;
|
||||
unsigned int i;
|
||||
|
||||
if (!session->sources || !session->num_sources) {
|
||||
sr_err("%s: sources was NULL", __func__);
|
||||
return SR_ERR_BUG;
|
||||
}
|
||||
for (i = 0; i < session->sources->len; ++i) {
|
||||
if (g_array_index(session->sources, struct source, i)
|
||||
.poll_object == poll_object) {
|
||||
|
||||
for (old = 0; old < session->num_sources; old++) {
|
||||
if (session->sources[old].poll_object == poll_object)
|
||||
g_array_remove_index(session->sources, i);
|
||||
if (i < session->pollfds->len)
|
||||
g_array_remove_index(session->pollfds, i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* fd not found, nothing to do */
|
||||
if (old == session->num_sources)
|
||||
return SR_OK;
|
||||
|
||||
session->num_sources--;
|
||||
|
||||
if (old != session->num_sources) {
|
||||
memmove(&session->pollfds[old], &session->pollfds[old + 1],
|
||||
(session->num_sources - old) * sizeof(GPollFD));
|
||||
memmove(&session->sources[old], &session->sources[old + 1],
|
||||
(session->num_sources - old) * sizeof(struct source));
|
||||
}
|
||||
|
||||
session->pollfds = g_realloc(session->pollfds, sizeof(GPollFD) * session->num_sources);
|
||||
session->sources = g_realloc(session->sources, sizeof(struct source) * session->num_sources);
|
||||
|
||||
return SR_OK;
|
||||
}
|
||||
|
||||
|
@ -1039,7 +1055,7 @@ static int _sr_session_source_remove(struct sr_session *session, gintptr poll_ob
|
|||
*/
|
||||
SR_API int sr_session_source_remove(struct sr_session *session, int fd)
|
||||
{
|
||||
return _sr_session_source_remove(session, (gintptr)fd);
|
||||
return _sr_session_source_remove(session, fd);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1047,7 +1063,7 @@ SR_API int sr_session_source_remove(struct sr_session *session, int fd)
|
|||
*
|
||||
* @param session The session to use. Must not be NULL.
|
||||
* @param pollfd The poll descriptor for which the source should be removed.
|
||||
*
|
||||
* Must not be NULL.
|
||||
* @return SR_OK upon success, SR_ERR_ARG upon invalid arguments, or
|
||||
* SR_ERR_MALLOC upon memory allocation errors, SR_ERR_BUG upon
|
||||
* internal errors.
|
||||
|
@ -1057,6 +1073,10 @@ SR_API int sr_session_source_remove(struct sr_session *session, int fd)
|
|||
SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
|
||||
GPollFD *pollfd)
|
||||
{
|
||||
if (!pollfd) {
|
||||
sr_err("%s: pollfd was NULL", __func__);
|
||||
return SR_ERR_ARG;
|
||||
}
|
||||
return _sr_session_source_remove(session, (gintptr)pollfd);
|
||||
}
|
||||
|
||||
|
@ -1065,7 +1085,7 @@ SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
|
|||
*
|
||||
* @param session The session to use. Must not be NULL.
|
||||
* @param channel The channel for which the source should be removed.
|
||||
*
|
||||
* Must not be NULL.
|
||||
* @retval SR_OK Success.
|
||||
* @retval SR_ERR_ARG Invalid argument.
|
||||
* @return SR_ERR_BUG Internal error.
|
||||
|
@ -1075,6 +1095,10 @@ SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
|
|||
SR_API int sr_session_source_remove_channel(struct sr_session *session,
|
||||
GIOChannel *channel)
|
||||
{
|
||||
if (!channel) {
|
||||
sr_err("%s: channel was NULL", __func__);
|
||||
return SR_ERR_ARG;
|
||||
}
|
||||
return _sr_session_source_remove(session, (gintptr)channel);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue