session: Make sr_session_run() optional

Introduce a new API function sr_session_stopped_callback_set()
which can be used to receive notification when a session stops
running. This allows applications to integrate libsigrok event
processing with their own main loop, instead of blocking in
sr_session_run().
This commit is contained in:
Daniel Elstner 2015-10-09 02:35:47 +02:00
parent 45d835edc7
commit 5de0fc55a6
4 changed files with 280 additions and 100 deletions

View File

@ -1157,14 +1157,6 @@ struct sr_dev_driver {
void *context; void *context;
}; };
/**
* @struct sr_session
*
* Opaque data structure representing a libsigrok session. None of the fields
* of this structure are meant to be accessed directly.
*/
struct sr_session;
/** Serial port descriptor. */ /** Serial port descriptor. */
struct sr_serial_port { struct sr_serial_port {
/** The OS dependent name of the serial port. */ /** The OS dependent name of the serial port. */

View File

@ -98,8 +98,10 @@ SR_API const struct sr_key_info *sr_key_info_name_get(int keytype, const char *k
/*--- session.c -------------------------------------------------------------*/ /*--- session.c -------------------------------------------------------------*/
typedef void (*sr_session_stopped_callback)(void *data);
typedef void (*sr_datafeed_callback)(const struct sr_dev_inst *sdi, typedef void (*sr_datafeed_callback)(const struct sr_dev_inst *sdi,
const struct sr_datafeed_packet *packet, void *cb_data); const struct sr_datafeed_packet *packet, void *cb_data);
SR_API struct sr_trigger *sr_session_trigger_get(struct sr_session *session); SR_API struct sr_trigger *sr_session_trigger_get(struct sr_session *session);
/* Session setup */ /* Session setup */
@ -122,6 +124,10 @@ SR_API int sr_session_datafeed_callback_add(struct sr_session *session,
SR_API int sr_session_start(struct sr_session *session); SR_API int sr_session_start(struct sr_session *session);
SR_API int sr_session_run(struct sr_session *session); SR_API int sr_session_run(struct sr_session *session);
SR_API int sr_session_stop(struct sr_session *session); SR_API int sr_session_stop(struct sr_session *session);
SR_API int sr_session_is_running(struct sr_session *session);
SR_API int sr_session_stopped_callback_set(struct sr_session *session,
sr_session_stopped_callback cb, void *cb_data);
SR_API int sr_session_source_add(struct sr_session *session, int fd, SR_API int sr_session_source_add(struct sr_session *session, int fd,
int events, int timeout, sr_receive_data_callback cb, void *cb_data); int events, int timeout, sr_receive_data_callback cb, void *cb_data);
SR_API int sr_session_source_add_pollfd(struct sr_session *session, SR_API int sr_session_source_add_pollfd(struct sr_session *session,

View File

@ -701,6 +701,11 @@ struct sr_session {
GSList *transforms; GSList *transforms;
struct sr_trigger *trigger; struct sr_trigger *trigger;
/** Callback to invoke on session stop. */
sr_session_stopped_callback stopped_callback;
/** User data to be passed to the session stop callback. */
void *stopped_cb_data;
/** Mutex protecting the main context pointer and ownership flag. */ /** Mutex protecting the main context pointer and ownership flag. */
GMutex main_mutex; GMutex main_mutex;
/** Context of the session main loop. */ /** Context of the session main loop. */
@ -715,6 +720,8 @@ struct sr_session {
GHashTable *event_sources; GHashTable *event_sources;
/** Session main loop. */ /** Session main loop. */
GMainLoop *main_loop; GMainLoop *main_loop;
/** ID of idle source for dispatching the session stop notification. */
unsigned int stop_check_id;
}; };
SR_PRIV int sr_session_source_add_internal(struct sr_session *session, SR_PRIV int sr_session_source_add_internal(struct sr_session *session,

View File

@ -52,6 +52,7 @@ struct datafeed_callback {
}; };
/** Custom GLib event source for generic descriptor I/O. /** Custom GLib event source for generic descriptor I/O.
* @see https://developer.gnome.org/glib/stable/glib-The-Main-Event-Loop.html
* @internal * @internal
*/ */
struct fd_source { struct fd_source {
@ -68,6 +69,7 @@ struct fd_source {
}; };
/** FD event source prepare() method. /** FD event source prepare() method.
* This is called immediately before poll().
*/ */
static gboolean fd_source_prepare(GSource *source, int *timeout) static gboolean fd_source_prepare(GSource *source, int *timeout)
{ {
@ -94,6 +96,7 @@ static gboolean fd_source_prepare(GSource *source, int *timeout)
} }
/** FD event source check() method. /** FD event source check() method.
* This is called after poll() returns to check whether an event fired.
*/ */
static gboolean fd_source_check(GSource *source) static gboolean fd_source_check(GSource *source)
{ {
@ -108,6 +111,7 @@ static gboolean fd_source_check(GSource *source)
} }
/** FD event source dispatch() method. /** FD event source dispatch() method.
* This is called if either prepare() or check() returned TRUE.
*/ */
static gboolean fd_source_dispatch(GSource *source, static gboolean fd_source_dispatch(GSource *source,
GSourceFunc callback, void *user_data) GSourceFunc callback, void *user_data)
@ -342,7 +346,7 @@ SR_API int sr_session_dev_add(struct sr_session *session,
/* If sdi->driver is NULL, this is a virtual device. */ /* If sdi->driver is NULL, this is a virtual device. */
if (!sdi->driver) { if (!sdi->driver) {
/* Just add the device, don't run dev_open(). */ /* Just add the device, don't run dev_open(). */
session->devs = g_slist_append(session->devs, (gpointer)sdi); session->devs = g_slist_append(session->devs, sdi);
sdi->session = session; sdi->session = session;
return SR_OK; return SR_OK;
} }
@ -353,9 +357,12 @@ SR_API int sr_session_dev_add(struct sr_session *session,
return SR_ERR_BUG; return SR_ERR_BUG;
} }
session->devs = g_slist_append(session->devs, (gpointer)sdi); session->devs = g_slist_append(session->devs, sdi);
sdi->session = session; sdi->session = session;
/* TODO: This is invalid if the session runs in a different thread.
* The usage semantics and restrictions need to be documented.
*/
if (session->running) { if (session->running) {
/* Adding a device to a running session. Commit settings /* Adding a device to a running session. Commit settings
* and start acquisition on that device now. */ * and start acquisition on that device now. */
@ -366,7 +373,7 @@ SR_API int sr_session_dev_add(struct sr_session *session,
return ret; return ret;
} }
if ((ret = sdi->driver->dev_acquisition_start(sdi, if ((ret = sdi->driver->dev_acquisition_start(sdi,
(void *)sdi)) != SR_OK) { sdi)) != SR_OK) {
sr_err("Failed to start acquisition of device in " sr_err("Failed to start acquisition of device in "
"running session (%s)", sr_strerror(ret)); "running session (%s)", sr_strerror(ret));
return ret; return ret;
@ -551,11 +558,11 @@ static int set_main_context(struct sr_session *session)
{ {
GMainContext *def_context; GMainContext *def_context;
/* May happen if sr_session_start() is called again after /* May happen if sr_session_start() is called a second time
* sr_session_run(), but the session hasn't been stopped yet. * while the session is still running.
*/ */
if (session->main_loop) { if (session->main_context) {
sr_err("Cannot set main context; main loop already created."); sr_err("Main context already set.");
return SR_ERR; return SR_ERR;
} }
@ -620,13 +627,99 @@ static int unset_main_context(struct sr_session *session)
return ret; return ret;
} }
static unsigned int session_source_attach(struct sr_session *session,
GSource *source)
{
unsigned int id = 0;
g_mutex_lock(&session->main_mutex);
if (session->main_context)
id = g_source_attach(source, session->main_context);
else
sr_err("Cannot add event source without main context.");
g_mutex_unlock(&session->main_mutex);
return id;
}
/* Idle handler; invoked when the number of registered event sources
* for a running session drops to zero.
*/
static gboolean delayed_stop_check(void *data)
{
struct sr_session *session;
session = data;
session->stop_check_id = 0;
/* Session already ended? */
if (!session->running)
return G_SOURCE_REMOVE;
/* New event sources may have been installed in the meantime. */
if (g_hash_table_size(session->event_sources) != 0)
return G_SOURCE_REMOVE;
session->running = FALSE;
unset_main_context(session);
sr_info("Stopped.");
/* This indicates a bug in user code, since it is not valid to
* restart or destroy a session while it may still be running.
*/
if (!session->main_loop && !session->stopped_callback) {
sr_err("BUG: Session stop left unhandled.");
return G_SOURCE_REMOVE;
}
if (session->main_loop)
g_main_loop_quit(session->main_loop);
if (session->stopped_callback)
(*session->stopped_callback)(session->stopped_cb_data);
return G_SOURCE_REMOVE;
}
static int stop_check_later(struct sr_session *session)
{
GSource *source;
unsigned int source_id;
if (session->stop_check_id != 0)
return SR_OK; /* idle handler already installed */
source = g_idle_source_new();
g_source_set_callback(source, &delayed_stop_check, session, NULL);
source_id = session_source_attach(session, source);
session->stop_check_id = source_id;
g_source_unref(source);
return (source_id != 0) ? SR_OK : SR_ERR;
}
/** /**
* Start a session. * Start a session.
* *
* When this function returns with a status code indicating success, the
* session is running. Use sr_session_stopped_callback_set() to receive
* notification upon completion, or call sr_session_run() to block until
* the session stops.
*
* Session events will be processed in the context of the current thread.
* If a thread-default GLib main context has been set, and is not owned by
* any other thread, it will be used. Otherwise, libsigrok will create its
* own main context for the current thread.
*
* @param session The session to use. Must not be NULL. * @param session The session to use. Must not be NULL.
* *
* @retval SR_OK Success. * @retval SR_OK Success.
* @retval SR_ERR_ARG Invalid session passed. * @retval SR_ERR_ARG Invalid session passed.
* @retval SR_ERR Other error.
* *
* @since 0.4.0 * @since 0.4.0
*/ */
@ -634,8 +727,8 @@ SR_API int sr_session_start(struct sr_session *session)
{ {
struct sr_dev_inst *sdi; struct sr_dev_inst *sdi;
struct sr_channel *ch; struct sr_channel *ch;
GSList *l, *c; GSList *l, *c, *lend;
int enabled_channels, ret; int ret;
if (!session) { if (!session) {
sr_err("%s: session was NULL", __func__); sr_err("%s: session was NULL", __func__);
@ -648,63 +741,97 @@ SR_API int sr_session_start(struct sr_session *session)
return SR_ERR_ARG; return SR_ERR_ARG;
} }
if (session->trigger && verify_trigger(session->trigger) != SR_OK) if (session->running) {
sr_err("Cannot (re-)start session while it is still running.");
return SR_ERR; return SR_ERR;
}
if (session->trigger) {
ret = verify_trigger(session->trigger);
if (ret != SR_OK)
return ret;
}
/* Check enabled channels and commit settings of all devices. */
for (l = session->devs; l; l = l->next) {
sdi = l->data;
for (c = sdi->channels; c; c = c->next) {
ch = c->data;
if (ch->enabled)
break;
}
if (!c) {
sr_err("%s device %s has no enabled channels.",
sdi->driver->name, sdi->connection_id);
return SR_ERR;
}
ret = sr_config_commit(sdi);
if (ret != SR_OK) {
sr_err("Failed to commit %s device %s settings "
"before starting acquisition.",
sdi->driver->name, sdi->connection_id);
return ret;
}
}
ret = set_main_context(session); ret = set_main_context(session);
if (ret != SR_OK) if (ret != SR_OK)
return ret; return ret;
session->running = TRUE;
sr_info("Starting."); sr_info("Starting.");
session->running = TRUE;
/* Have all devices start acquisition. */
for (l = session->devs; l; l = l->next) { for (l = session->devs; l; l = l->next) {
sdi = l->data; sdi = l->data;
enabled_channels = 0; ret = sdi->driver->dev_acquisition_start(sdi, sdi);
for (c = sdi->channels; c; c = c->next) { if (ret != SR_OK) {
ch = c->data; sr_err("Could not start %s device %s acquisition.",
if (ch->enabled) { sdi->driver->name, sdi->connection_id);
enabled_channels++;
break;
}
}
if (enabled_channels == 0) {
ret = SR_ERR;
sr_err("%s using connection %s has no enabled channels!",
sdi->driver->name, sdi->connection_id);
break;
}
if ((ret = sr_config_commit(sdi)) != SR_OK) {
sr_err("Failed to commit device settings before "
"starting acquisition (%s)", sr_strerror(ret));
break;
}
if ((ret = sdi->driver->dev_acquisition_start(sdi, sdi)) != SR_OK) {
sr_err("%s: could not start an acquisition "
"(%s)", __func__, sr_strerror(ret));
break; break;
} }
} }
if (ret != SR_OK) { if (ret != SR_OK) {
unset_main_context(session); /* If there are multiple devices, some of them may already have
* started successfully. Stop them now before returning. */
lend = l->next;
for (l = session->devs; l != lend; l = l->next) {
sdi = l->data;
if (sdi->driver->dev_acquisition_stop)
sdi->driver->dev_acquisition_stop(sdi, sdi);
}
/* TODO: Handle delayed stops. Need to iterate the event
* sources... */
session->running = FALSE; session->running = FALSE;
}
/* TODO: What if there are multiple devices? Which return code? */
return ret; unset_main_context(session);
return ret;
}
if (g_hash_table_size(session->event_sources) == 0)
stop_check_later(session);
return SR_OK;
} }
/** /**
* Run a session. * Block until the running session stops.
*
* This is a convenience function which creates a GLib main loop and runs
* it to process session events until the session stops.
*
* Instead of using this function, applications may run their own GLib main
* loop, and use sr_session_stopped_callback_set() to receive notification
* when the session finished running.
* *
* @param session The session to use. Must not be NULL. * @param session The session to use. Must not be NULL.
* *
* @retval SR_OK Success. * @retval SR_OK Success.
* @retval SR_ERR_ARG Invalid session passed. * @retval SR_ERR_ARG Invalid session passed.
* @retval SR_ERR Error during event processing. * @retval SR_ERR Other error.
* *
* @since 0.4.0 * @since 0.4.0
*/ */
@ -714,30 +841,24 @@ SR_API int sr_session_run(struct sr_session *session)
sr_err("%s: session was NULL", __func__); sr_err("%s: session was NULL", __func__);
return SR_ERR_ARG; return SR_ERR_ARG;
} }
if (!session->devs) { if (!session->running) {
/* TODO: Actually the case? */ sr_err("No session running.");
sr_err("%s: session->devs was NULL; a session " return SR_ERR;
"cannot be run without devices.", __func__);
return SR_ERR_ARG;
} }
if (session->main_loop) { if (session->main_loop) {
sr_err("Main loop already created."); sr_err("Main loop already created.");
return SR_ERR; return SR_ERR;
} }
if (g_hash_table_size(session->event_sources) == 0) {
sr_warn("No event sources, returning immediately.");
return SR_OK;
}
g_mutex_lock(&session->main_mutex); g_mutex_lock(&session->main_mutex);
if (!session->main_context) { if (!session->main_context) {
sr_err("Cannot run without main context."); sr_err("Cannot run without main context.");
g_mutex_unlock(&session->main_mutex); g_mutex_unlock(&session->main_mutex);
return SR_ERR; return SR_ERR;
} }
sr_info("Running.");
session->main_loop = g_main_loop_new(session->main_context, FALSE); session->main_loop = g_main_loop_new(session->main_context, FALSE);
g_mutex_unlock(&session->main_mutex); g_mutex_unlock(&session->main_mutex);
g_main_loop_run(session->main_loop); g_main_loop_run(session->main_loop);
@ -766,7 +887,6 @@ static gboolean session_stop_sync(void *user_data)
if (sdi->driver && sdi->driver->dev_acquisition_stop) if (sdi->driver && sdi->driver->dev_acquisition_stop)
sdi->driver->dev_acquisition_stop(sdi, sdi); sdi->driver->dev_acquisition_stop(sdi, sdi);
} }
session->running = FALSE;
return G_SOURCE_REMOVE; return G_SOURCE_REMOVE;
} }
@ -774,39 +894,107 @@ static gboolean session_stop_sync(void *user_data)
/** /**
* Stop a session. * Stop a session.
* *
* The session is stopped immediately, with all acquisition sessions being * This requests the drivers of each device participating in the session to
* stopped and hardware drivers cleaned up. * abort the acquisition as soon as possible. Even after this function returns,
* event processing still continues until all devices have actually stopped.
* *
* If the session is run in a separate thread, this function will not block * Use sr_session_stopped_callback_set() to receive notification when the event
* until the session is finished executing. It is the caller's responsibility * processing finished.
* to wait for the session thread to return before assuming that the session is *
* completely decommissioned. * This function is reentrant. That is, it may be called from a different
* thread than the one executing the session, as long as it can be ensured
* that the session object is valid.
*
* If the session is not running, sr_session_stop() silently does nothing.
* *
* @param session The session to use. Must not be NULL. * @param session The session to use. Must not be NULL.
* *
* @retval SR_OK Success. * @retval SR_OK Success.
* @retval SR_ERR_ARG Invalid session passed. * @retval SR_ERR_ARG Invalid session passed.
* @retval SR_ERR Other error.
* *
* @since 0.4.0 * @since 0.4.0
*/ */
SR_API int sr_session_stop(struct sr_session *session) SR_API int sr_session_stop(struct sr_session *session)
{ {
GMainContext *main_context;
if (!session) { if (!session) {
sr_err("%s: session was NULL", __func__); sr_err("%s: session was NULL", __func__);
return SR_ERR_ARG; return SR_ERR_ARG;
} }
g_mutex_lock(&session->main_mutex); g_mutex_lock(&session->main_mutex);
if (session->main_context) { main_context = (session->main_context)
g_main_context_invoke(session->main_context, ? g_main_context_ref(session->main_context)
&session_stop_sync, session); : NULL;
} else {
sr_err("No main context set; already stopped?");
}
g_mutex_unlock(&session->main_mutex); g_mutex_unlock(&session->main_mutex);
return unset_main_context(session); if (!main_context) {
sr_dbg("No main context set; already stopped?");
/* Not an error; as it would be racy. */
return SR_OK;
}
g_main_context_invoke(main_context, &session_stop_sync, session);
g_main_context_unref(main_context);
return SR_OK;
}
/**
* Return whether the session is currently running.
*
* Note that this function should be called from the same thread
* the session was started in.
*
* @param session The session to use. Must not be NULL.
*
* @retval TRUE Session is running.
* @retval FALSE Session is not running.
* @retval SR_ERR_ARG Invalid session passed.
*
* @since 0.4.0
*/
SR_API int sr_session_is_running(struct sr_session *session)
{
if (!session) {
sr_err("%s: session was NULL", __func__);
return SR_ERR_ARG;
}
return session->running;
}
/**
* Set the callback to be invoked after a session stopped running.
*
* Install a callback to receive notification when a session run stopped.
* This can be used to integrate session execution with an existing main
* loop, without having to block in sr_session_run().
*
* Note that the callback will be invoked in the context of the thread
* that calls sr_session_start().
*
* @param session The session to use. Must not be NULL.
* @param cb The callback to invoke on session stop. May be NULL to unset.
* @param cb_data User data pointer to be passed to the callback.
*
* @retval SR_OK Success.
* @retval SR_ERR_ARG Invalid session passed.
*
* @since 0.4.0
*/
SR_API int sr_session_stopped_callback_set(struct sr_session *session,
sr_session_stopped_callback cb, void *cb_data)
{
if (!session) {
sr_err("%s: session was NULL", __func__);
return SR_ERR_ARG;
}
session->stopped_callback = cb;
session->stopped_cb_data = cb_data;
return SR_OK;
} }
/** /**
@ -957,7 +1145,6 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
SR_PRIV int sr_session_source_add_internal(struct sr_session *session, SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
void *key, GSource *source) void *key, GSource *source)
{ {
int ret;
/* /*
* This must not ever happen, since the source has already been * This must not ever happen, since the source has already been
* created and its finalize() method will remove the key for the * created and its finalize() method will remove the key for the
@ -970,20 +1157,10 @@ SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
} }
g_hash_table_insert(session->event_sources, key, source); g_hash_table_insert(session->event_sources, key, source);
g_mutex_lock(&session->main_mutex); if (session_source_attach(session, source) == 0)
return SR_ERR;
if (session->main_context) { return SR_OK;
if (g_source_attach(source, session->main_context) > 0)
ret = SR_OK;
else
ret = SR_ERR;
} else {
sr_err("Cannot add event source without main context.");
ret = SR_ERR;
}
g_mutex_unlock(&session->main_mutex);
return ret;
} }
SR_PRIV int sr_session_fd_source_add(struct sr_session *session, SR_PRIV int sr_session_fd_source_add(struct sr_session *session,
@ -1197,6 +1374,7 @@ SR_API int sr_session_source_remove_channel(struct sr_session *session,
* *
* @retval SR_OK Success. * @retval SR_OK Success.
* @retval SR_ERR_BUG Event source for @a key does not match @a source. * @retval SR_ERR_BUG Event source for @a key does not match @a source.
* @retval SR_ERR Other error.
*/ */
SR_PRIV int sr_session_source_destroyed(struct sr_session *session, SR_PRIV int sr_session_source_destroyed(struct sr_session *session,
void *key, GSource *source) void *key, GSource *source)
@ -1218,18 +1396,15 @@ SR_PRIV int sr_session_source_destroyed(struct sr_session *session,
return SR_ERR_BUG; return SR_ERR_BUG;
} }
g_hash_table_remove(session->event_sources, key); g_hash_table_remove(session->event_sources, key);
/*
* Quit the main loop if we just removed the last event source. if (g_hash_table_size(session->event_sources) > 0)
* TODO: This may need an idle callback depending on when event return SR_OK;
* sources are finalized. (The issue is remove followed by add
* within the same main loop iteration.) /* If no event sources are left, consider the acquisition finished.
* This is pretty crude, as it requires all event sources to be
* registered via the libsigrok API.
*/ */
if (session->main_loop return stop_check_later(session);
&& g_hash_table_size(session->event_sources) == 0) {
sr_dbg("Stopping main loop...");
g_main_loop_quit(session->main_loop);
}
return SR_OK;
} }
static void copy_src(struct sr_config *src, struct sr_datafeed_meta *meta_copy) static void copy_src(struct sr_config *src, struct sr_datafeed_meta *meta_copy)