input: Add sdi_ready flag to struct sr_input.

When an input module instance has received enough input to fully
populate the struct sr_dev_inst, sdi_ready is set to TRUE and its
receive() method returns immediately. Any remaining received data
is buffered until the next time the function is called.
This commit is contained in:
Bert Vermeulen 2014-09-22 15:22:41 +02:00
parent 66c91e2509
commit d018181331
7 changed files with 138 additions and 86 deletions

View File

@ -64,7 +64,7 @@ static int init(struct sr_input *in, GHashTable *options)
return SR_OK;
}
static int receive(const struct sr_input *in, GString *buf)
static int receive(struct sr_input *in, GString *buf)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
@ -80,16 +80,25 @@ static int receive(const struct sr_input *in, GString *buf)
num_channels = g_slist_length(in->sdi->channels);
if (!in->sdi_ready) {
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
inc->started = TRUE;
packet.type = SR_DF_META;
packet.payload = &meta;
src = sr_config_new(SR_CONF_SAMPLERATE, g_variant_new_uint64(inc->samplerate));
meta.config = g_slist_append(NULL, src);
sr_session_send(in->sdi, &packet);
sr_config_free(src);
if (inc->samplerate) {
packet.type = SR_DF_META;
packet.payload = &meta;
src = sr_config_new(SR_CONF_SAMPLERATE, g_variant_new_uint64(inc->samplerate));
meta.config = g_slist_append(NULL, src);
sr_session_send(in->sdi, &packet);
sr_config_free(src);
}
inc->started = TRUE;
}
packet.type = SR_DF_LOGIC;
@ -113,10 +122,12 @@ static int receive(const struct sr_input *in, GString *buf)
static int cleanup(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct context *inc;
struct sr_datafeed_packet packet;
inc = in->priv;
if (!inc)
return SR_OK;
if (inc->started) {
packet.type = SR_DF_END;

View File

@ -76,7 +76,7 @@ static int init(struct sr_input *in, GHashTable *options)
return SR_OK;
}
static int receive(const struct sr_input *in, GString *buf)
static int receive(struct sr_input *in, GString *buf)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
@ -92,16 +92,25 @@ static int receive(const struct sr_input *in, GString *buf)
num_channels = g_slist_length(in->sdi->channels);
if (!in->sdi_ready) {
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
inc->started = TRUE;
packet.type = SR_DF_META;
packet.payload = &meta;
src = sr_config_new(SR_CONF_SAMPLERATE, g_variant_new_uint64(inc->samplerate));
meta.config = g_slist_append(NULL, src);
sr_session_send(in->sdi, &packet);
sr_config_free(src);
if (inc->samplerate) {
packet.type = SR_DF_META;
packet.payload = &meta;
src = sr_config_new(SR_CONF_SAMPLERATE, g_variant_new_uint64(inc->samplerate));
meta.config = g_slist_append(NULL, src);
sr_session_send(in->sdi, &packet);
sr_config_free(src);
}
inc->started = TRUE;
}
packet.type = SR_DF_LOGIC;
@ -125,10 +134,12 @@ static int receive(const struct sr_input *in, GString *buf)
static int cleanup(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct context *inc;
struct sr_datafeed_packet packet;
inc = in->priv;
if (!inc)
return SR_OK;
if (inc->started) {
packet.type = SR_DF_END;

View File

@ -589,11 +589,11 @@ static int initial_receive(const struct sr_input *in)
if (!(termination = get_line_termination(in->buf)))
/* Don't have a full line yet. */
return SR_OK_CONTINUE;
return SR_ERR_NA;
if (!(p = g_strrstr_len(in->buf->str, in->buf->len, termination)))
/* Don't have a full line yet. */
return SR_OK_CONTINUE;
return SR_ERR_NA;
len = p - in->buf->str - 1;
new_buf = g_string_new_len(in->buf->str, len);
g_string_append_c(new_buf, '\0');
@ -610,7 +610,7 @@ static int initial_receive(const struct sr_input *in)
return ret;
}
static int receive(const struct sr_input *in, GString *buf)
static int receive(struct sr_input *in, GString *buf)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
@ -625,14 +625,18 @@ static int receive(const struct sr_input *in, GString *buf)
inc = in->priv;
if (!inc->termination) {
ret = initial_receive(in);
if (ret == SR_OK_CONTINUE)
if ((ret = initial_receive(in)) == SR_ERR_NA)
/* Not enough data yet. */
return SR_OK_CONTINUE;
return SR_OK;
else if (ret != SR_OK)
return SR_ERR;
inc->started = TRUE;
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
if (inc->samplerate) {
@ -644,6 +648,8 @@ static int receive(const struct sr_input *in, GString *buf)
sr_session_send(in->sdi, &packet);
sr_config_free(src);
}
inc->started = TRUE;
}
if (!(p = g_strrstr_len(in->buf->str, in->buf->len, inc->termination)))

View File

@ -495,11 +495,18 @@ SR_API int sr_input_scan_file(const char *filename, const struct sr_input **in)
* Return the input instance's (virtual) device instance. This can be
* used to find out the number of channels and other information.
*
* If the device instance has not yet been fully populated by the input
* module, NULL is returned. This indicates the module needs more data
* to identify the number of channels and so on.
*
* @since 0.4.0
*/
SR_API struct sr_dev_inst *sr_input_dev_inst_get(const struct sr_input *in)
{
return in->sdi;
if (in->sdi_ready)
return in->sdi;
else
return NULL;
}
/**
@ -508,12 +515,18 @@ SR_API struct sr_dev_inst *sr_input_dev_inst_get(const struct sr_input *in)
* When an input module instance is created with sr_input_new(), this
* function is used to feed data to the instance.
*
* As enough data gets fed into this function to completely populate
* the device instance associated with this input instance, this is
* guaranteed to return the moment it's ready. This gives the caller
* the chance to examine the device instance, attach session callbacks
* and on so.
*
* @since 0.4.0
*/
SR_API int sr_input_send(const struct sr_input *in, GString *buf)
{
sr_spew("Sending %d bytes to %s module.", buf->len, in->module->id);
return in->module->receive(in, buf);
return in->module->receive((struct sr_input *)in, buf);
}
/**
@ -533,6 +546,10 @@ SR_API int sr_input_free(const struct sr_input *in)
ret = in->module->cleanup((struct sr_input *)in);
if (in->sdi)
sr_dev_inst_free(in->sdi);
if (in->buf->len > 64) {
/* That seems more than just some sub-unitsize leftover... */
sr_warn("Found %d unprocessed bytes at free time.", in->buf->len);
}
if (in->buf)
g_string_free(in->buf, TRUE);
g_free((gpointer)in);

View File

@ -409,8 +409,6 @@ static int init(struct sr_input *in, GHashTable *options)
char name[16];
struct context *inc;
inc = g_malloc0(sizeof(struct context));
num_channels = g_variant_get_int32(g_hash_table_lookup(options, "numchannels"));
if (num_channels < 1) {
sr_err("Invalid value for numchannels: must be at least 1.");
@ -420,6 +418,7 @@ static int init(struct sr_input *in, GHashTable *options)
sr_err("No more than 64 channels supported.");
return SR_ERR_ARG;
}
inc = in->priv = g_malloc0(sizeof(struct context));
inc->maxchannels = num_channels;
inc->downsample = g_variant_get_int32(g_hash_table_lookup(options, "downsample"));
@ -458,7 +457,7 @@ static gboolean have_header(GString *buf)
return FALSE;
}
static int receive(const struct sr_input *in, GString *buf)
static int receive(struct sr_input *in, GString *buf)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
@ -477,8 +476,13 @@ static int receive(const struct sr_input *in, GString *buf)
/* There was a header in there, but it was malformed. */
return SR_ERR;
in->sdi_ready = TRUE;
/* sdi is ready, notify frontend. */
return SR_OK;
}
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
inc->started = TRUE;
packet.type = SR_DF_META;
packet.payload = &meta;
@ -487,6 +491,8 @@ static int receive(const struct sr_input *in, GString *buf)
meta.config = g_slist_append(NULL, src);
sr_session_send(in->sdi, &packet);
sr_config_free(src);
inc->started = TRUE;
}
while ((p = g_strrstr_len(in->buf->str, in->buf->len, "\n"))) {

View File

@ -43,6 +43,7 @@
#define WAVE_FORMAT_EXTENSIBLE 0xfffe
struct context {
gboolean started;
int fmt_code;
uint64_t samplerate;
int samplesize;
@ -57,7 +58,7 @@ static int parse_wav_header(GString *buf, struct context *inc)
unsigned int fmt_code, samplesize, num_channels, unitsize;
if (buf->len < MIN_DATA_CHUNK_OFFSET)
return SR_OK_CONTINUE;
return SR_ERR_NA;
fmt_code = RL16(buf->str + 20);
samplerate = RL32(buf->str + 24);
@ -82,7 +83,7 @@ static int parse_wav_header(GString *buf, struct context *inc)
} else if (fmt_code == WAVE_FORMAT_EXTENSIBLE) {
if (buf->len < 70)
/* Not enough for extensible header and next chunk. */
return SR_OK_CONTINUE;
return SR_ERR_NA;
if (RL16(buf->str + 16) != 40) {
sr_err("WAV extensible format chunk must be 40 bytes.");
@ -150,6 +151,7 @@ static int init(struct sr_input *in, GHashTable *options)
(void)options;
in->sdi = sr_dev_inst_new(SR_ST_ACTIVE, NULL, NULL, NULL);
in->priv = g_malloc0(sizeof(struct context));
return SR_OK;
}
@ -177,42 +179,6 @@ static int find_data_chunk(GString *buf, int initial_offset)
return offset;
}
static int initial_receive(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
struct sr_channel *ch;
struct sr_config *src;
struct context *inc;
int ret, i;
char channelname[8];
if (!in->buf)
/* Shouldn't happen. */
return SR_ERR;
inc = in->priv = g_malloc(sizeof(struct context));
if ((ret = parse_wav_header(in->buf, inc)) != SR_OK)
return ret;
for (i = 0; i < inc->num_channels; i++) {
snprintf(channelname, 8, "CH%d", i + 1);
ch = sr_channel_new(i, SR_CHANNEL_ANALOG, TRUE, channelname);
in->sdi->channels = g_slist_append(in->sdi->channels, ch);
}
std_session_send_df_header(in->sdi, LOG_PREFIX);
packet.type = SR_DF_META;
packet.payload = &meta;
src = sr_config_new(SR_CONF_SAMPLERATE, g_variant_new_uint64(inc->samplerate));
meta.config = g_slist_append(NULL, src);
sr_session_send(in->sdi, &packet);
sr_config_free(src);
return SR_OK;
}
static void send_chunk(const struct sr_input *in, int offset, int num_samples)
{
struct sr_datafeed_packet packet;
@ -269,25 +235,58 @@ static void send_chunk(const struct sr_input *in, int offset, int num_samples)
sr_session_send(in->sdi, &packet);
}
static int receive(const struct sr_input *in, GString *buf)
static int receive(struct sr_input *in, GString *buf)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
struct sr_channel *ch;
struct sr_config *src;
struct context *inc;
int offset, chunk_samples, total_samples, processed, max_chunk_samples, num_samples, i;
int offset, chunk_samples, total_samples, processed, max_chunk_samples;
int num_samples, ret, i;
char channelname[8];
g_string_append_len(in->buf, buf->str, buf->len);
if (!in->priv) {
if (initial_receive((struct sr_input *)in) != SR_OK)
return SR_ERR;
if (in->buf->len < MIN_DATA_CHUNK_OFFSET) {
/*
* Don't even get started until there's enough room
* for the data segment to start.
*/
return SR_OK;
}
if (in->buf->len < MIN_DATA_CHUNK_OFFSET) {
/*
* Don't even try until there's enough room
* for the data segment to start.
*/
return SR_OK;
}
inc = in->priv;
if (!in->sdi_ready) {
if ((ret = parse_wav_header(in->buf, inc)) == SR_ERR_NA)
/* Not enough data yet. */
return SR_OK;
else if (ret != SR_OK)
return ret;
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
if (!inc->started) {
for (i = 0; i < inc->num_channels; i++) {
snprintf(channelname, 8, "CH%d", i + 1);
ch = sr_channel_new(i, SR_CHANNEL_ANALOG, TRUE, channelname);
in->sdi->channels = g_slist_append(in->sdi->channels, ch);
}
std_session_send_df_header(in->sdi, LOG_PREFIX);
packet.type = SR_DF_META;
packet.payload = &meta;
src = sr_config_new(SR_CONF_SAMPLERATE, g_variant_new_uint64(inc->samplerate));
meta.config = g_slist_append(NULL, src);
sr_session_send(in->sdi, &packet);
sr_config_free(src);
inc->started = TRUE;
}
if (!inc->found_data) {
/* Skip past size of 'fmt ' chunk. */
@ -334,15 +333,16 @@ static int receive(const struct sr_input *in, GString *buf)
static int cleanup(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct context *inc;
if (in->priv) {
inc = in->priv;
if (inc->started) {
/* End of stream. */
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
g_free(in->priv);
in->priv = NULL;
}
g_free(in->priv);
in->priv = NULL;
return SR_OK;
}

View File

@ -211,6 +211,7 @@ struct sr_input {
const struct sr_input_module *module;
GString *buf;
struct sr_dev_inst *sdi;
gboolean sdi_ready;
void *priv;
};
@ -306,7 +307,7 @@ struct sr_input_module {
* @retval SR_OK Success
* @retval other Negative error code.
*/
int (*receive) (const struct sr_input *in, GString *buf);
int (*receive) (struct sr_input *in, GString *buf);
/**
* This function is called after the caller is finished using