bug-coreutils
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: feature request: gzip/bzip support for sort


From: Dan Hipschman
Subject: Re: feature request: gzip/bzip support for sort
Date: Sat, 13 Jan 2007 17:19:22 -0800
User-agent: Mutt/1.5.9i

On Sat, Jan 13, 2007 at 10:36:05PM +0100, Jim Meyering wrote:
> Craig Macdonald <address@hidden> wrote:
> > On some occasions, I have the need to sort extremely large files, but
> > which compress well using programs such as gzip or bzip.
> ...
> 
> This task has been on the TODO list for some time:
> 
>   sort: Compress temporary files when doing large external sort/merges.
>     This improves performance when you can compress/uncompress faster than
>     you can read/write, which is common in these days of fast CPUs.
>     suggestion from Charles Randall on 2001-08-10
> 
> Just waiting for someone to work on it.

OK, here's the patch.  I ran the coreutils tests for sort on builds with
and without -DHAVE_LZO1X_H; for each build, with and without the
--compress-temps flag; and for each of those, with and without -S 1k.
All tests passed in all eight runs.  I also tested it on some larger
files:

$ head -n 50000 book_1.01.pgn > foo.txt
$ ll -h foo.txt
-rw-r--r--  1 dsh dsh 1.7M 2007-01-13 16:46 foo.txt
$ diff <(sort foo.txt ) <(~/temp/sort foo.txt )
$ diff <(sort foo.txt ) <(~/temp/sort --compress-temps foo.txt )
$ diff <(sort foo.txt ) <(~/temp/sort -S 1k --compress-temps foo.txt )

It seems to be doing its job as evidenced by this test:

$ ll -h book_1.01.pgn
-r--r--r--  1 dsh dsh 82M 2006-01-05 21:02 book_1.01.pgn
$ sort -S 1M book_1.01.pgn >/dev/null & sleep 5 ; ll -h /tmp
total 1.9M
-rw-------  1 dsh dsh 615K 2007-01-13 16:52 sortbDBc6F
-rw-------  1 dsh dsh 619K 2007-01-13 16:52 sortOfovkJ
-rw-------  1 dsh dsh 597K 2007-01-13 16:52 sortVkG1qc

$ ~/temp/sort -S 1M --compress-temps book_1.01.pgn >/dev/null & sleep 5 ; ll -h /tmp
total 1.7M
-rw-------  1 dsh dsh 264K 2007-01-13 16:53 sort0TlO2R
-rw-------  1 dsh dsh 233K 2007-01-13 16:53 sortFJyThy
-rw-------  1 dsh dsh 265K 2007-01-13 16:53 sortqDXeox
-rw-------  1 dsh dsh 282K 2007-01-13 16:53 sortsaJy2c
-rw-------  1 dsh dsh 275K 2007-01-13 16:53 sortTATWST
-rw-------  1 dsh dsh 265K 2007-01-13 16:53 sortuLB3Zc

Notice that the temp files are smaller, but there are more of them.  I
wondered about this and it just turns out that the sort I built from cvs
is running faster than my system's sort for whatever reason:

$ ~/temp/sort -S 1M book_1.01.pgn >/dev/null & sleep 5 ; ll -h /tmp
total 4.3M
-rw-------  1 dsh dsh 615K 2007-01-13 16:58 sort7XHWgF
-rw-------  1 dsh dsh 612K 2007-01-13 16:58 sortjYzlh3
-rw-------  1 dsh dsh 619K 2007-01-13 16:58 sortpkOT3Z
-rw-------  1 dsh dsh 597K 2007-01-13 16:58 sortQSLNvP
-rw-------  1 dsh dsh 620K 2007-01-13 16:58 sortqZojlc
-rw-------  1 dsh dsh 613K 2007-01-13 16:58 sortuYd7kv
-rw-------  1 dsh dsh 614K 2007-01-13 16:58 sortvDI9Gl

I haven't done the autoconf stuff to check for lzo and haven't
updated the documentation yet since I'd like to hear if this patch
is acceptable before doing any of that stuff.

Thanks,
Dan


Index: sort.c
===================================================================
RCS file: /sources/coreutils/coreutils/src/sort.c,v
retrieving revision 1.344
diff -p -u -r1.344 sort.c
--- sort.c      13 Dec 2006 21:27:05 -0000      1.344
+++ sort.c      14 Jan 2007 00:38:01 -0000
@@ -50,6 +50,10 @@ struct rlimit { size_t rlim_cur; };
 # define getrlimit(Resource, Rlp) (-1)
 #endif
 
+#if HAVE_LZO1X_H
+# include <lzo1x.h>
+#endif
+
 /* The official name of this program (e.g., no `g' prefix).  */
 #define PROGRAM_NAME "sort"
 
@@ -261,6 +265,12 @@ static bool have_read_stdin;
 /* List of key field comparisons to be tried.  */
 static struct keyfield *keylist;
 
+/* Flag to compress temp files. */
+static bool compress_temps;
+
+/* The number of chars to allocate for compression buffers. */
+static size_t compress_buf_size = 16384;
+
 static void sortlines_temp (struct line *, size_t, struct line *);
 
 /* Report MESSAGE for FILE, then clean up and exit.
@@ -328,6 +338,7 @@ Other options:\n\
                               multiple options specify multiple directories\n\
   -u, --unique              with -c, check for strict ordering;\n\
                               without -c, output only the first of an equal run\n\
+      --compress-temps      compress temporary output to save space\n\
 "), DEFAULT_TMPDIR);
       fputs (_("\
   -z, --zero-terminated     end lines with 0 byte, not newline\n\
@@ -364,7 +375,8 @@ native byte values.\n\
    non-character as a pseudo short option, starting with CHAR_MAX + 1.  */
 enum
 {
-  RANDOM_SOURCE_OPTION = CHAR_MAX + 1
+  RANDOM_SOURCE_OPTION = CHAR_MAX + 1,
+  COMPRESS_TEMPS_OPTION
 };
 
 static char const short_options[] = "-bcdfgik:mMno:rRsS:t:T:uy:z";
@@ -373,6 +385,7 @@ static struct option const long_options[
 {
   {"ignore-leading-blanks", no_argument, NULL, 'b'},
   {"check", no_argument, NULL, 'c'},
+  {"compress-temps", no_argument, NULL, COMPRESS_TEMPS_OPTION},
   {"dictionary-order", no_argument, NULL, 'd'},
   {"ignore-case", no_argument, NULL, 'f'},
   {"general-numeric-sort", no_argument, NULL, 'g'},
@@ -399,10 +412,19 @@ static struct option const long_options[
 /* The set of signals that are caught.  */
 static sigset_t caught_signals;
 
+/* Buffer for compressing output to temp files. */
+struct compress_buf
+{
+  char *pos, *lim;
+  char buf[1];  /* Actual size is compress_buf_size */
+};
+
 /* The list of temporary files. */
 struct tempnode
 {
   struct tempnode *volatile next;
+  struct compress_buf *compress_buf;
+  bool compress; /* Should we try to compress? */
   char name[1];  /* Actual size is 1 + file name length.  */
 };
 static struct tempnode *volatile temphead;
@@ -439,6 +461,8 @@ create_temp_file (FILE **pfp)
   memcpy (file, temp_dir, len);
   memcpy (file + len, slashbase, sizeof slashbase);
   node->next = NULL;
+  node->compress_buf = NULL;
+  node->compress = compress_temps;
   if (++temp_dir_index == temp_dir_count)
     temp_dir_index = 0;
 
@@ -521,6 +545,15 @@ write_bytes (const char *buf, size_t n_b
     die (_("write failed"), output_file);
 }
 
+static size_t
+read_bytes (void *buf, size_t n_bytes, FILE *fp, const char *input_file)
+{
+  size_t n = fread (buf, 1, n_bytes, fp);
+  if (ferror (fp))
+    die (_("read failed"), input_file);
+  return n;
+}
+
 /* Append DIR to the array of temporary directory names.  */
 static void
 add_temp_dir (char const *dir)
@@ -558,9 +591,237 @@ zaptemp (const char *name)
     error (0, unlink_errno, _("warning: cannot remove: %s"), name);
   if (! next)
     temptail = pnode;
+  free (node->compress_buf);
   free (node);
 }
 
+/* Find a temp file by name */
+
+static struct tempnode *
+find_temp (const char *tfname)
+{
+  struct tempnode *node = NULL;
+
+  if (temptail)
+    {
+      /* It's often the last one created. */
+      node = (struct tempnode *)
+        ((char *) temptail - offsetof (struct tempnode, next));
+
+      /* But not always. */
+      if (tfname != node->name)
+        {
+          node = temphead;
+          while (node && tfname != node->name)
+            node = node->next;
+        }
+    }
+
+  return node;
+}
+
+/* Get the buffer associated with this temp file or create it if necessary. */
+
+static struct compress_buf *
+get_compression_buffer (const char *tfname)
+{
+  struct compress_buf *cbuf;
+  struct tempnode *node;
+
+  if (!compress_temps || !(node = find_temp (tfname)))
+    return NULL;
+
+  if (node->compress && !node->compress_buf)
+    {
+      cbuf = malloc (offsetof (struct compress_buf, buf) + compress_buf_size);
+      if (!cbuf)
+        {
+          node->compress = false;
+          return NULL;
+        }
+      cbuf->pos = cbuf->buf;
+      cbuf->lim = cbuf->buf + compress_buf_size;
+      node->compress_buf = cbuf;
+    }
+
+  return node->compress_buf;
+}
+
+#if HAVE_LZO1X_H
+static lzo_bytep
+compress_out (void)
+{
+  static lzo_bytep out;
+  if (!out)
+    out = xmalloc (compress_buf_size + compress_buf_size / 16 + 64 + 3);
+  return out;
+}
+
+static lzo_bytep
+compress_wrkmem (void)
+{
+  static lzo_bytep wrkmem;
+  if (!wrkmem)
+    wrkmem = xmalloc (LZO1X_1_MEM_COMPRESS);
+  return wrkmem;
+}
+#endif
+
+/* Actually write the compressed data to disk.  */
+
+static void
+flush_compression (FILE *fp, const char *file, struct compress_buf *cbuf)
+{
+#if HAVE_LZO1X_H
+  lzo_bytep out = compress_out ();
+  lzo_bytep wrkmem = compress_wrkmem ();
+  lzo_uint in_len = (lzo_uint) (cbuf->pos - cbuf->buf);
+  lzo_uint out_len;
+
+  lzo1x_1_compress ((lzo_bytep) cbuf->buf, in_len, out, &out_len, wrkmem);
+  write_bytes ((char *) &in_len, sizeof in_len, fp, file);
+  write_bytes ((char *) &out_len, sizeof out_len, fp, file);
+  if (in_len <= out_len)
+    write_bytes (cbuf->buf, in_len, fp, file);
+  else
+    write_bytes ((char *) out, out_len, fp, file);
+
+  cbuf->pos = cbuf->buf;
+
+#endif
+}
+
+static void
+load_compression_buffer (FILE *fp, const char *fname,
+                         struct compress_buf *cbuf)
+{
+#if HAVE_LZO1X_H
+  lzo_bytep out = compress_out ();
+  lzo_uint in_len;
+  lzo_uint out_len;
+  size_t n;
+  bool eof;
+
+  n = read_bytes (&in_len, sizeof in_len, fp, fname);
+  if (feof (fp) && n == 0)
+    return;
+
+  eof = (n != sizeof in_len);
+  if (!eof)
+    eof = (read_bytes (&out_len, sizeof out_len, fp, fname) != sizeof out_len);
+
+  if (!eof)
+    {
+      cbuf->pos = cbuf->lim - in_len;
+      if (in_len <= out_len)
+        {
+          eof = (read_bytes (cbuf->pos, in_len, fp, fname) != in_len);
+        }
+      else
+        {
+          lzo_uint new_len;
+          eof = (read_bytes (out, out_len, fp, fname) != out_len);
+          if (!eof)
+            lzo1x_decompress (out, out_len,
+                              (lzo_bytep) cbuf->pos, &new_len, NULL);
+        }
+    }
+
+  if (eof)
+    die (_("premature EOF in temporary file"), fname);
+
+#endif
+}
+
+/* Saves input until the buffer is full.  Then it compresses the buffer
+   and writes it to disk.  */
+
+static void
+compress_and_write_bytes (const char *buf, size_t n_bytes,
+                          FILE *fp, const char *output_file,
+                          struct compress_buf *cbuf)
+{
+  const char *rest = buf;
+  size_t left = n_bytes;
+
+  if (!cbuf)
+    {
+      write_bytes (buf, n_bytes, fp, output_file);
+      return;
+    }
+
+  while (0 < left)
+    {
+      size_t nbuf = (size_t) (cbuf->lim - cbuf->pos);
+      size_t n2cp = MIN (left, nbuf);
+
+      memcpy (cbuf->pos, rest, n2cp);
+      rest += n2cp;
+      left -= n2cp;
+      cbuf->pos += n2cp;
+
+      if (cbuf->pos == cbuf->lim)
+        flush_compression (fp, output_file, cbuf);
+    }
+}
+
+/* Read up to NCHARS decompressed bytes from a file.
+   Returns the number of bytes read.  */
+
+static size_t
+read_and_decompress_bytes (void *buf, size_t nchars,
+                           FILE *fp, const char *fname,
+                           struct compress_buf *cbuf)
+{
+  char *rest = buf;
+  size_t left = nchars;
+
+  if (!cbuf)
+    {
+      return read_bytes (buf, nchars, fp, fname);
+    }
+
+  while (0 < left)
+    {
+      size_t nbuf;
+      size_t n2cp;
+
+      if (cbuf->pos == cbuf->lim)
+        {
+          if (feof (fp))
+            return nchars - left;
+          load_compression_buffer (fp, fname, cbuf);
+        }
+
+      nbuf = (size_t) (cbuf->lim - cbuf->pos);
+      n2cp = MIN (left, nbuf);
+
+      memcpy (rest, cbuf->pos, n2cp);
+      rest += n2cp;
+      left -= n2cp;
+      cbuf->pos += n2cp;
+    }
+
+  return nchars;
+}
+
+/* Flush anything still waiting for compression and xfclose the file. */
+
+static void
+flush_compression_and_xfclose (FILE *fp, char const *file,
+                               struct compress_buf *cbuf)
+{
+  if (cbuf)
+    {
+      flush_compression (fp, file, cbuf);
+
+      /* This is necessary so the buffer will be reloaded */
+      /* when the file is reopened. */
+      cbuf->pos = cbuf->lim;
+    }
+  xfclose (fp, file);
+}
+
 #if HAVE_NL_LANGINFO
 
 static int
@@ -980,10 +1241,13 @@ fillbuf (struct buffer *buf, FILE *fp, c
   char eol = eolchar;
   size_t line_bytes = buf->line_bytes;
   size_t mergesize = merge_buffer_size - MIN_MERGE_BUFFER_SIZE;
+  struct compress_buf *cbuf;
 
   if (buf->eof)
     return false;
 
+  cbuf = get_compression_buffer (file);
+
   if (buf->used != buf->left)
     {
       memmove (buf->buf, buf->buf + buf->used - buf->left, buf->left);
@@ -1007,23 +1271,19 @@ fillbuf (struct buffer *buf, FILE *fp, c
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */
          size_t readsize = (avail - 1) / (line_bytes + 1);
-         size_t bytes_read = fread (ptr, 1, readsize, fp);
+          size_t bytes_read =
+            read_and_decompress_bytes (ptr, readsize, fp, file, cbuf);
          char *ptrlim = ptr + bytes_read;
          char *p;
          avail -= bytes_read;
 
          if (bytes_read != readsize)
            {
-             if (ferror (fp))
-               die (_("read failed"), file);
-             if (feof (fp))
-               {
-                 buf->eof = true;
-                 if (buf->buf == ptrlim)
-                   return false;
-                 if (ptrlim[-1] != eol)
-                   *ptrlim++ = eol;
-               }
+              buf->eof = true;
+              if (buf->buf == ptrlim)
+                return false;
+              if (ptrlim[-1] != eol)
+                *ptrlim++ = eol;
            }
 
          /* Find and record each line in the just-read input.  */
@@ -1593,6 +1853,7 @@ mergefps (char **files, size_t ntemps, s
   struct buffer buffer[NMERGE];        /* Input buffers for each file. */
   struct line saved;           /* Saved line storage for unique check. */
   struct line const *savedline = NULL;
+  struct compress_buf *cbuf;
                                /* &saved if there is a saved line. */
   size_t savealloc = 0;                /* Size allocated for the saved line. */
   struct line const *cur[NMERGE]; /* Current line in each line table. */
@@ -1606,6 +1867,8 @@ mergefps (char **files, size_t ntemps, s
   struct keyfield const *key = keylist;
   saved.text = NULL;
 
+  cbuf = get_compression_buffer (output_file);
+
   /* Read initial lines from each input file. */
   for (i = 0; i < nfiles; )
     {
@@ -1659,7 +1922,8 @@ mergefps (char **files, size_t ntemps, s
          if (savedline && compare (savedline, smallest))
            {
              savedline = NULL;
-             write_bytes (saved.text, saved.length, ofp, output_file);
+             compress_and_write_bytes (saved.text, saved.length,
+                                        ofp, output_file, cbuf);
            }
          if (!savedline)
            {
@@ -1688,7 +1952,8 @@ mergefps (char **files, size_t ntemps, s
            }
        }
       else
-       write_bytes (smallest->text, smallest->length, ofp, output_file);
+       compress_and_write_bytes (smallest->text, smallest->length,
+                                  ofp, output_file, cbuf);
 
       /* Check if we need to read more lines into core. */
       if (base[ord[0]] < smallest)
@@ -1759,11 +2024,12 @@ mergefps (char **files, size_t ntemps, s
 
   if (unique && savedline)
     {
-      write_bytes (saved.text, saved.length, ofp, output_file);
+      compress_and_write_bytes (saved.text, saved.length,
+                                ofp, output_file, cbuf);
       free (saved.text);
     }
 
-  xfclose (ofp, output_file);
+  flush_compression_and_xfclose (ofp, output_file, cbuf);
 }
 
 /* Merge into T the two sorted arrays of lines LO (with NLO members)
@@ -2034,6 +2300,7 @@ sort (char * const *files, size_t nfiles
        {
          struct line *line;
          struct line *linebase;
+          struct compress_buf *cbuf;
 
          if (buf.eof && nfiles
              && (bytes_per_line + 1
@@ -2056,24 +2323,27 @@ sort (char * const *files, size_t nfiles
              tfp = xfopen (output_file, "w");
              temp_output = output_file;
              output_file_created = true;
+              cbuf = NULL;
            }
          else
            {
              ++ntemps;
              temp_output = create_temp_file (&tfp);
+              cbuf = get_compression_buffer (temp_output);
            }
 
          do
            {
              line--;
-             write_bytes (line->text, line->length, tfp, temp_output);
+              compress_and_write_bytes (line->text, line->length,
+                                        tfp, temp_output, cbuf);
              if (unique)
                while (linebase < line && compare (line, line - 1) == 0)
                  line--;
            }
          while (linebase < line);
 
-         xfclose (tfp, temp_output);
+          flush_compression_and_xfclose (tfp, temp_output, cbuf);
 
          if (output_file_created)
            goto finish;
@@ -2463,6 +2733,10 @@ main (int argc, char **argv)
          checkonly = true;
          break;
 
+        case COMPRESS_TEMPS_OPTION:
+          compress_temps = true;
+          break;
+
        case 'k':
          key = key_init (&key_buf);
 
@@ -2607,6 +2881,12 @@ main (int argc, char **argv)
        }
     }
 
+#if HAVE_LZO1X_H
+  if (compress_temps)
+    if (lzo_init () != LZO_E_OK)
+      compress_temps = false;   /* Guess again. */
+#endif
+
   /* Inheritance of global options to individual keys. */
   for (key = keylist; key; key = key->next)
     {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]