From cd59e6eca13632f8fc749a63fb8ccbadbb77e21b Mon Sep 17 00:00:00 2001 From: Gerhard Sittig Date: Thu, 8 Jun 2017 20:13:31 +0200 Subject: [PATCH] input/csv: Send larger datafeed chunks, to speedup import The previous implementation sent one sigrok session datafeed packet per processed CSV line. This is rather inefficient for the CSV input module, and triggers a dramatic performance loss in the srzip output format. Communicate up to 128K samples within one datafeed packet. This fixes bug #695. Factor out repeated calculation of the unit size which is derived from the channel count. Fix a minor memory leak in an error path while we are here. (Other memory leaks in rare error paths remain with this commit.) Suggested-By: Elias Oenal --- src/input/csv.c | 93 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 66 insertions(+), 27 deletions(-) diff --git a/src/input/csv.c b/src/input/csv.c index 1d230fdc..a9e5f91f 100644 --- a/src/input/csv.c +++ b/src/input/csv.c @@ -26,6 +26,8 @@ #define LOG_PREFIX "input/csv" +#define DATAFEED_MAX_SAMPLES (128 * 1024) + /* * The CSV input module has the following options: * @@ -156,11 +158,12 @@ struct context { /* Format sample data is stored in single column mode. */ int format; - /* Size of the sample buffer. */ - size_t sample_buffer_size; + size_t sample_unit_size; /**!< Byte count for a single sample. */ + uint8_t *sample_buffer; /**!< Buffer for a single sample. */ - /* Buffer to store sample data. */ - uint8_t *sample_buffer; + uint8_t *datafeed_buffer; /**!< Queue for datafeed submission. */ + size_t datafeed_buf_size; + size_t datafeed_buf_fill; /* Current line number. */ size_t line_number; @@ -190,7 +193,7 @@ static int parse_binstr(const char *str, struct context *inc) } /* Clear buffer in order to set bits only. */ - memset(inc->sample_buffer, 0, (inc->num_channels + 7) >> 3); + memset(inc->sample_buffer, 0, inc->sample_unit_size); i = inc->first_channel; @@ -222,7 +225,7 @@ static int parse_hexstr(const char *str, struct context *inc) } /* Clear buffer in order to set bits only. */ - memset(inc->sample_buffer, 0, (inc->num_channels + 7) >> 3); + memset(inc->sample_buffer, 0, inc->sample_unit_size); /* Calculate the position of the first hexadecimal digit. */ i = inc->first_channel / 4; @@ -266,7 +269,7 @@ static int parse_octstr(const char *str, struct context *inc) } /* Clear buffer in order to set bits only. */ - memset(inc->sample_buffer, 0, (inc->num_channels + 7) >> 3); + memset(inc->sample_buffer, 0, inc->sample_unit_size); /* Calculate the position of the first octal digit. */ i = inc->first_channel / 3; @@ -349,7 +352,7 @@ static int parse_multi_columns(char **columns, struct context *inc) char *column; /* Clear buffer in order to set bits only. */ - memset(inc->sample_buffer, 0, (inc->num_channels + 7) >> 3); + memset(inc->sample_buffer, 0, inc->sample_unit_size); for (i = 0; i < inc->num_channels; i++) { column = columns[i]; @@ -391,26 +394,47 @@ static int parse_single_column(const char *column, struct context *inc) return res; } -static int send_samples(const struct sr_dev_inst *sdi, uint8_t *buffer, - gsize buffer_size, gsize count) +static int flush_samples(const struct sr_input *in) { + struct context *inc; struct sr_datafeed_packet packet; struct sr_datafeed_logic logic; - int res; - gsize i; + int rc; + inc = in->priv; + if (!inc->datafeed_buf_fill) + return SR_OK; + + memset(&packet, 0, sizeof(packet)); + memset(&logic, 0, sizeof(logic)); packet.type = SR_DF_LOGIC; packet.payload = &logic; - logic.unitsize = buffer_size; - logic.length = buffer_size; - logic.data = buffer; + logic.unitsize = inc->sample_unit_size; + logic.length = inc->datafeed_buf_fill; + logic.data = inc->datafeed_buffer; - for (i = 0; i < count; i++) { - res = sr_session_send(sdi, &packet); - if (res != SR_OK) - return res; + rc = sr_session_send(in->sdi, &packet); + if (rc != SR_OK) + return rc; + + inc->datafeed_buf_fill = 0; + return SR_OK; +} + +static int queue_samples(const struct sr_input *in) +{ + struct context *inc; + int rc; + + inc = in->priv; + + inc->datafeed_buf_fill += inc->sample_unit_size; + if (inc->datafeed_buf_fill == inc->datafeed_buf_size) { + rc = flush_samples(in); + if (rc != SR_OK) + return rc; } - + inc->sample_buffer = &inc->datafeed_buffer[inc->datafeed_buf_fill]; return SR_OK; } @@ -593,11 +617,19 @@ static int initial_parse(const struct sr_input *in, GString *buf) g_string_free(channel_name, TRUE); /* - * Calculate the minimum buffer size to store the sample data of the - * channels. + * Calculate the minimum buffer size to store the set of samples + * of all channels (unit size). Determine a larger buffer size + * for datafeed submission that is a multiple of the unit size. + * Allocate the larger buffer, and have the "sample buffer" point + * to a location within that large buffer. */ - inc->sample_buffer_size = (inc->num_channels + 7) >> 3; - inc->sample_buffer = g_malloc(inc->sample_buffer_size); + inc->sample_unit_size = (inc->num_channels + 7) / 8; + inc->datafeed_buf_size = DATAFEED_MAX_SAMPLES; + inc->datafeed_buf_size /= inc->sample_unit_size; + inc->datafeed_buf_size *= inc->sample_unit_size; + inc->datafeed_buffer = g_malloc(inc->datafeed_buf_size); + inc->datafeed_buf_fill = 0; + inc->sample_buffer = &inc->datafeed_buffer[inc->datafeed_buf_fill]; out: if (columns) @@ -783,12 +815,13 @@ static int process_buffer(struct sr_input *in, gboolean is_eof) } /* Send sample data to the session bus. */ - ret = send_samples(in->sdi, inc->sample_buffer, - inc->sample_buffer_size, 1); + ret = queue_samples(in); if (ret != SR_OK) { sr_err("Sending samples failed."); + g_strfreev(columns); return SR_ERR; } + g_strfreev(columns); } g_strfreev(lines); @@ -832,6 +865,12 @@ static int end(struct sr_input *in) ret = process_buffer(in, TRUE); else ret = SR_OK; + if (ret != SR_OK) + return ret; + + ret = flush_samples(in); + if (ret != SR_OK) + return ret; inc = in->priv; if (inc->started) @@ -853,7 +892,7 @@ static void cleanup(struct sr_input *in) g_string_free(inc->comment, TRUE); g_free(inc->termination); - g_free(inc->sample_buffer); + g_free(inc->datafeed_buffer); } static int reset(struct sr_input *in)