>From b64d5063e26c0f3485d8342a2d5501f655f1063e Mon Sep 17 00:00:00 2001 From: Assaf Gordon Date: Wed, 6 Mar 2013 18:25:49 -0500 Subject: [PATCH] shuf: use reservoir-sampling when possible * src/shuf.c: Use reservoir-sampling when the number of output lines is known (by using '-n X' parameter). read_input_reservoir_sampling() - read lines from input file, and keep only K lines in memory, replacing lines with decreasing probability. prepare_shuf_lines() - convert reservoir lines to a usable structure. main() - if the number of lines is known, use reservoir-sampling instead of reading entire input file. --- src/shuf.c | 171 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 167 insertions(+), 4 deletions(-) diff --git a/src/shuf.c b/src/shuf.c index 71ac3e6..27982e5 100644 --- a/src/shuf.c +++ b/src/shuf.c @@ -25,6 +25,7 @@ #include "error.h" #include "fadvise.h" #include "getopt.h" +#include "linebuffer.h" #include "quote.h" #include "quotearg.h" #include "randint.h" @@ -81,7 +82,8 @@ With no FILE, or when FILE is -, read standard input.\n\ non-character as a pseudo short option, starting with CHAR_MAX + 1. */ enum { - RANDOM_SOURCE_OPTION = CHAR_MAX + 1 + RANDOM_SOURCE_OPTION = CHAR_MAX + 1, + DEV_DEBUG_OPTION }; static struct option const long_opts[] = @@ -92,11 +94,31 @@ static struct option const long_opts[] = {"output", required_argument, NULL, 'o'}, {"random-source", required_argument, NULL, RANDOM_SOURCE_OPTION}, {"zero-terminated", no_argument, NULL, 'z'}, + {"-debug", no_argument, NULL, DEV_DEBUG_OPTION}, {GETOPT_HELP_OPTION_DECL}, {GETOPT_VERSION_OPTION_DECL}, {0, 0, 0, 0}, }; +/* debugging for developers. Enables devmsg(). */ +static bool dev_debug = false; + +/* Like error(0, 0, ...), but without an implicit newline. + Also a noop unless the global DEV_DEBUG is set. + TODO: Replace with variadic macro in system.h or + move to a separate module. */ +static inline void +devmsg (char const *fmt, ...) +{ + if (dev_debug) + { + va_list ap; + va_start (ap, fmt); + vfprintf (stderr, fmt, ap); + va_end (ap); + } +} + static bool input_numbers_option_used (size_t lo_input, size_t hi_input) { @@ -135,6 +157,123 @@ next_line (char *line, char eolbyte, size_t n) return p + 1; } +/* Covnerts a 'struct linebuffer[]' into a shuf-compatible '**lines' buffer. + + The 'in_lines[]' is an array of n elements of 'struct linebuffer' . + + 'out_lines' will be initialized as so: + *outlines - points to an array of '*char' + (*outlines)[0] - points to the entire buffer (containing all lines). + (*outlines)[1] - points to the beginning of the second line + (inside the buffer of (*outlines)[0] ). + (*outlines)[K-1] - points to the beginning of the last line + (inside the buffer of (*outlines)[0] ). + (*outlines)[K] - points to the position one byte past the end + of the last line. + + All lines include the line-terminator character (either NUL or \n). + Length of strings should be decuded by subtracting pointers, not with strlen. + (see write_permuted_output() for the expected usage). + */ +static void +prepare_shuf_lines (struct linebuffer *in_lines, size_t n, char ***out_lines, + char eolbyte) +{ + size_t i; + size_t size = 0; + char* p; + char* buf; + + for (i = 0; i < n; ++i) + size += in_lines[i].length; + + p = buf = xmalloc (size); + *out_lines = xnmalloc (n+1, sizeof (**out_lines)); + for (i = 0; i < n; ++i) + { + (*out_lines)[i] = p; + memcpy (p, in_lines[i].buffer, in_lines[i].length); + p += in_lines[i].length; + } + (*out_lines)[i] = p; + + devmsg ("-- reservoir lines (begin)--\n"); + devmsg (buf); + devmsg ("-- reservoir lines (end)--\n"); +} + + +static size_t +read_input_reservoir_sampling (FILE *in, char eolbyte, char ***pline, size_t k, + struct randint_source *s) +{ + size_t i; + size_t n_lines=0; + struct linebuffer line; + struct linebuffer *rsrv = XCALLOC (k, struct linebuffer); /* init reservoir*/ + + devmsg ("--reservoir_sampling--\n"); + + initbuffer (&line); + while (readlinebuffer_delim (&line, in, eolbyte)!=NULL) + { + if ( n_lines < k ) + { + /* Read first K lines into reservoir */ + + if (dev_debug) + { + fprintf (stderr,"filling reservoir, input line %zu of %zu: '", + n_lines+1, k); + fwrite (line.buffer, sizeof (char), line.length-1, stderr); + fprintf (stderr, "'\n"); + } + + rsrv[n_lines] = line; + initbuffer (&line); /* next line-read will allocate a new buffer */ + + + } + else + { + /* Read the rest of the lines, with decreasing probability of updating + the reservoir */ + randint j = randint_choose (s, n_lines+1); + if ( j < k ) + { + if (dev_debug) + { + fprintf (stderr,"Replacing reservoir sample %zu with " \ + "line %zu '", j, n_lines); + fwrite (line.buffer, sizeof (char), line.length-1, stderr); + fprintf (stderr, "'\n"); + } + + rsrv[j] = line; + initbuffer (&line);/* next line-read will allocate a new buffer */ + + } + } + + ++n_lines; + } + freebuffer(&line); + + /* no more input lines, or an input error */ + if (ferror (in)) + error (EXIT_FAILURE, errno, _("read error")); + + /* Convert internal 'struct linebuffer' to shuf-compatible '**line' */ + prepare_shuf_lines (rsrv, MIN (k, n_lines), pline, eolbyte); + + /* free reservoir */ + for (i = 0; i < k; ++i) + freebuffer (&rsrv[i]); + free (rsrv); + + return MIN (k, n_lines); +} + /* Read data from file IN. Input lines are delimited by EOLBYTE; silently append a trailing EOLBYTE if the file ends in some other byte. Store a pointer to the resulting array of lines into *PLINE. @@ -209,12 +348,13 @@ main (int argc, char **argv) char *random_source = NULL; char eolbyte = '\n'; char **input_lines = NULL; + bool use_reservoir_sampling = false; int optc; int n_operands; char **operand; size_t n_lines; - char **line; + char **line = NULL; struct randint_source *randint_source; size_t *permutation; @@ -295,6 +435,10 @@ main (int argc, char **argv) eolbyte = '\0'; break; + case DEV_DEBUG_OPTION: + dev_debug = true; + break; + case_GETOPT_HELP_CHAR; case_GETOPT_VERSION_CHAR (PROGRAM_NAME, AUTHORS); default: @@ -341,8 +485,16 @@ main (int argc, char **argv) fadvise (stdin, FADVISE_SEQUENTIAL); - n_lines = read_input (stdin, eolbyte, &input_lines); - line = input_lines; + if (head_lines != SIZE_MAX) + { + use_reservoir_sampling = true; + n_lines = SIZE_MAX; /* unknown number of input lines, for now */ + } + else + { + n_lines = read_input (stdin, eolbyte, &input_lines); + line = input_lines; + } } head_lines = MIN (head_lines, n_lines); @@ -352,6 +504,17 @@ main (int argc, char **argv) if (! randint_source) error (EXIT_FAILURE, errno, "%s", quotearg_colon (random_source)); + if (use_reservoir_sampling) + { + /* Instead of reading the entire file into 'line', + use reservoir-sampling to store just "head_lines" random lines. */ + n_lines = read_input_reservoir_sampling (stdin, eolbyte, + &input_lines, head_lines, + randint_source); + line = input_lines; + head_lines = MIN (head_lines, n_lines); + } + /* Close stdin now, rather than earlier, so that randint_all_new doesn't have to worry about opening something other than stdin. */ -- 1.7.7.4