input: Add sr_input_end().

This signifies to the module instance no more input will come. This
will cause the module to process any data it may have buffered. The
SR_DF_END packet will also typically be sent at this time.
This commit is contained in:
Bert Vermeulen 2014-09-23 11:12:33 +02:00
parent 89da5b3b54
commit 7066fd4660
9 changed files with 210 additions and 106 deletions

View File

@ -137,6 +137,7 @@ SR_API int sr_input_scan_buffer(GString *buf, const struct sr_input **in);
SR_API int sr_input_scan_file(const char *filename, const struct sr_input **in);
SR_API struct sr_dev_inst *sr_input_dev_inst_get(const struct sr_input *in);
SR_API int sr_input_send(const struct sr_input *in, GString *buf);
SR_API int sr_input_end(const struct sr_input *in);
SR_API int sr_input_free(const struct sr_input *in);
/*--- output/output.c -------------------------------------------------------*/

View File

@ -246,6 +246,10 @@ static int sanity_check_all_input_modules(void)
sr_err("No receive in module %d ('%s').", i, d);
errors++;
}
if (!inputs[i]->end) {
sr_err("No end in module %d ('%s').", i, d);
errors++;
}
if (errors == 0)
continue;

View File

@ -64,7 +64,7 @@ static int init(struct sr_input *in, GHashTable *options)
return SR_OK;
}
static int receive(struct sr_input *in, GString *buf)
static int process_buffer(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
@ -72,20 +72,9 @@ static int receive(struct sr_input *in, GString *buf)
struct sr_config *src;
struct context *inc;
gsize chunk_size, i;
int chunk, num_channels;
int chunk;
inc = in->priv;
g_string_append_len(in->buf, buf->str, buf->len);
num_channels = g_slist_length(in->sdi->channels);
if (!in->sdi_ready) {
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
@ -103,7 +92,7 @@ static int receive(struct sr_input *in, GString *buf)
packet.type = SR_DF_LOGIC;
packet.payload = &logic;
logic.unitsize = (num_channels + 7) / 8;
logic.unitsize = (g_slist_length(in->sdi->channels) + 7) / 8;
/* Cut off at multiple of unitsize. */
chunk_size = in->buf->len / logic.unitsize * logic.unitsize;
@ -120,21 +109,41 @@ static int receive(struct sr_input *in, GString *buf)
return SR_OK;
}
static int cleanup(struct sr_input *in)
static int receive(struct sr_input *in, GString *buf)
{
int ret;
g_string_append_len(in->buf, buf->str, buf->len);
if (!in->sdi_ready) {
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
ret = process_buffer(in);
return ret;
}
static int end(struct sr_input *in)
{
struct context *inc;
struct sr_datafeed_packet packet;
int ret;
if (in->sdi_ready)
ret = process_buffer(in);
else
ret = SR_OK;
inc = in->priv;
if (!inc)
return SR_OK;
if (inc->started) {
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
return SR_OK;
return ret;
}
static struct sr_option options[] = {
@ -160,5 +169,5 @@ SR_PRIV struct sr_input_module input_binary = {
.options = get_options,
.init = init,
.receive = receive,
.cleanup = cleanup,
.end = end,
};

View File

@ -76,7 +76,7 @@ static int init(struct sr_input *in, GHashTable *options)
return SR_OK;
}
static int receive(struct sr_input *in, GString *buf)
static int process_buffer(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
@ -84,20 +84,9 @@ static int receive(struct sr_input *in, GString *buf)
struct sr_config *src;
struct context *inc;
gsize chunk_size, i;
int chunk, num_channels;
int chunk;
inc = in->priv;
g_string_append_len(in->buf, buf->str, buf->len);
num_channels = g_slist_length(in->sdi->channels);
if (!in->sdi_ready) {
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
@ -115,7 +104,7 @@ static int receive(struct sr_input *in, GString *buf)
packet.type = SR_DF_LOGIC;
packet.payload = &logic;
logic.unitsize = (num_channels + 7) / 8;
logic.unitsize = (g_slist_length(in->sdi->channels) + 7) / 8;
/* Cut off at multiple of unitsize. */
chunk_size = in->buf->len / logic.unitsize * logic.unitsize;
@ -132,21 +121,41 @@ static int receive(struct sr_input *in, GString *buf)
return SR_OK;
}
static int cleanup(struct sr_input *in)
static int receive(struct sr_input *in, GString *buf)
{
int ret;
g_string_append_len(in->buf, buf->str, buf->len);
if (!in->sdi_ready) {
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
ret = process_buffer(in);
return ret;
}
static int end(struct sr_input *in)
{
struct context *inc;
struct sr_datafeed_packet packet;
int ret;
if (in->sdi_ready)
ret = process_buffer(in);
else
ret = SR_OK;
inc = in->priv;
if (!inc)
return SR_OK;
if (inc->started) {
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
return SR_OK;
return ret;
}
static struct sr_option options[] = {
@ -174,5 +183,5 @@ SR_PRIV struct sr_input_module input_chronovu_la8 = {
.format_match = format_match,
.init = init,
.receive = receive,
.cleanup = cleanup,
.end = end,
};

View File

@ -610,7 +610,7 @@ static int initial_receive(const struct sr_input *in)
return ret;
}
static int receive(struct sr_input *in, GString *buf)
static int process_buffer(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
@ -621,21 +621,7 @@ static int receive(struct sr_input *in, GString *buf)
int max_columns, ret, l;
char *p, **lines, **columns;
g_string_append_len(in->buf, buf->str, buf->len);
inc = in->priv;
if (!inc->termination) {
if ((ret = initial_receive(in)) == SR_ERR_NA)
/* Not enough data yet. */
return SR_OK;
else if (ret != SR_OK)
return SR_ERR;
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
@ -723,24 +709,61 @@ static int receive(struct sr_input *in, GString *buf)
g_strfreev(lines);
g_string_erase(in->buf, 0, p - in->buf->str + 1);
return SR_OK;
return ret;
}
static int cleanup(struct sr_input *in)
static int receive(struct sr_input *in, GString *buf)
{
struct context *inc;
int ret;
g_string_append_len(in->buf, buf->str, buf->len);
inc = in->priv;
if (!inc->termination) {
if ((ret = initial_receive(in)) == SR_ERR_NA)
/* Not enough data yet. */
return SR_OK;
else if (ret != SR_OK)
return SR_ERR;
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
ret = process_buffer(in);
return ret;
}
static int end(struct sr_input *in)
{
struct context *inc;
struct sr_datafeed_packet packet;
int ret;
if (in->sdi_ready)
ret = process_buffer(in);
else
ret = SR_OK;
inc = in->priv;
if (!inc)
return SR_OK;
if (inc->started) {
/* End of stream. */
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
return ret;
}
static int cleanup(struct sr_input *in)
{
struct context *inc;
inc = in->priv;
if (inc->delimiter)
g_string_free(inc->delimiter, TRUE);
@ -795,5 +818,6 @@ SR_PRIV struct sr_input_module input_csv = {
.format_match = format_match,
.init = init,
.receive = receive,
.end = end,
.cleanup = cleanup,
};

View File

@ -529,6 +529,20 @@ SR_API int sr_input_send(const struct sr_input *in, GString *buf)
return in->module->receive((struct sr_input *)in, buf);
}
/**
* Signal the input module no more data will come.
*
* This will cause the module to process any data it may have buffered.
* The SR_DF_END packet will also typically be sent at this time.
*
* @since 0.4.0
*/
SR_API int sr_input_end(const struct sr_input *in)
{
sr_spew("Calling end() on %s module.", in->module->id);
return in->module->end((struct sr_input *)in);
}
/**
* Free the specified input instance and all associated resources.
*

View File

@ -457,7 +457,7 @@ static gboolean have_header(GString *buf)
return FALSE;
}
static int receive(struct sr_input *in, GString *buf)
static int process_buffer(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
@ -466,21 +466,7 @@ static int receive(struct sr_input *in, GString *buf)
uint64_t samplerate;
char *p;
g_string_append_len(in->buf, buf->str, buf->len);
inc = in->priv;
if (!inc->got_header) {
if (!have_header(in->buf))
return SR_OK;
if (!parse_header(in, in->buf) != SR_OK)
/* There was a header in there, but it was malformed. */
return SR_ERR;
in->sdi_ready = TRUE;
/* sdi is ready, notify frontend. */
return SR_OK;
}
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
@ -506,18 +492,56 @@ static int receive(struct sr_input *in, GString *buf)
return SR_OK;
}
static int cleanup(struct sr_input *in)
static int receive(struct sr_input *in, GString *buf)
{
struct context *inc;
int ret;
g_string_append_len(in->buf, buf->str, buf->len);
inc = in->priv;
if (!inc->got_header) {
if (!have_header(in->buf))
return SR_OK;
if (!parse_header(in, in->buf) != SR_OK)
/* There was a header in there, but it was malformed. */
return SR_ERR;
in->sdi_ready = TRUE;
/* sdi is ready, notify frontend. */
return SR_OK;
}
ret = process_buffer(in);
return ret;
}
static int end(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct context *inc;
int ret;
if (in->sdi_ready)
ret = process_buffer(in);
else
ret = SR_OK;
inc = in->priv;
if (inc->started) {
/* End of stream. */
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
return ret;
}
static int cleanup(struct sr_input *in)
{
struct context *inc;
inc = in->priv;
g_slist_free_full(inc->channels, free_channel);
return SR_OK;
@ -552,5 +576,6 @@ SR_PRIV struct sr_input_module input_vcd = {
.format_match = format_match,
.init = init,
.receive = receive,
.end = end,
.cleanup = cleanup,
};

View File

@ -235,40 +235,18 @@ static void send_chunk(const struct sr_input *in, int offset, int num_samples)
sr_session_send(in->sdi, &packet);
}
static int receive(struct sr_input *in, GString *buf)
static int process_buffer(struct sr_input *in)
{
struct context *inc;
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
struct sr_channel *ch;
struct sr_config *src;
struct context *inc;
int offset, chunk_samples, total_samples, processed, max_chunk_samples;
int num_samples, ret, i;
int num_samples, i;
char channelname[8];
g_string_append_len(in->buf, buf->str, buf->len);
if (in->buf->len < MIN_DATA_CHUNK_OFFSET) {
/*
* Don't even try until there's enough room
* for the data segment to start.
*/
return SR_OK;
}
inc = in->priv;
if (!in->sdi_ready) {
if ((ret = parse_wav_header(in->buf, inc)) == SR_ERR_NA)
/* Not enough data yet. */
return SR_OK;
else if (ret != SR_OK)
return ret;
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
if (!inc->started) {
for (i = 0; i < inc->num_channels; i++) {
snprintf(channelname, 8, "CH%d", i + 1);
@ -330,19 +308,57 @@ static int receive(struct sr_input *in, GString *buf)
return SR_OK;
}
static int cleanup(struct sr_input *in)
static int receive(struct sr_input *in, GString *buf)
{
struct context *inc;
int ret;
g_string_append_len(in->buf, buf->str, buf->len);
if (in->buf->len < MIN_DATA_CHUNK_OFFSET) {
/*
* Don't even try until there's enough room
* for the data segment to start.
*/
return SR_OK;
}
inc = in->priv;
if (!in->sdi_ready) {
if ((ret = parse_wav_header(in->buf, inc)) == SR_ERR_NA)
/* Not enough data yet. */
return SR_OK;
else if (ret != SR_OK)
return ret;
/* sdi is ready, notify frontend. */
in->sdi_ready = TRUE;
return SR_OK;
}
ret = process_buffer(in);
return SR_OK;
}
static int end(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct context *inc;
int ret;
if (in->sdi_ready)
ret = process_buffer(in);
else
ret = SR_OK;
inc = in->priv;
if (inc->started) {
/* End of stream. */
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
return SR_OK;
return ret;
}
SR_PRIV struct sr_input_module input_wav = {
@ -353,6 +369,6 @@ SR_PRIV struct sr_input_module input_wav = {
.format_match = format_match,
.init = init,
.receive = receive,
.cleanup = cleanup,
.end = end,
};

View File

@ -309,6 +309,8 @@ struct sr_input_module {
*/
int (*receive) (struct sr_input *in, GString *buf);
int (*end) (struct sr_input *in);
/**
* This function is called after the caller is finished using
* the input module, and can be used to free any internal