bug-coreutils

Re: feature request: gzip/bzip support for sort


From: Dan Hipschman
Subject: Re: feature request: gzip/bzip support for sort
Date: Sun, 21 Jan 2007 20:55:13 -0800
User-agent: Mutt/1.5.9i

I have a good feeling about this one :-)  I think I've addressed
everything except adding libz.  I just don't think I have the time to do
that right now, and I think it can wait.  I can look into writing some
test cases in a bit.

2007-01-21  Jim Meyering  <address@hidden>

        * src/sort.c (MAX_FORK_TRIES_COMPRESS, MAX_FORK_TRIES_DECOMPRESS):
        In pipe_fork callers, use these named constants, not "2" and "8".
        (proctab, nprocs): Declare to be "static".
        (pipe_fork) [lint]: Initialize local, pid,
        to avoid unwarranted may-be-used-uninitialized warning.
        (create_temp): Use the active voice.  Describe parameters, too.

2007-01-21  James Youngman  <address@hidden>

        Centralize all the uses of sigprocmask().  Don't restore an invalid
        saved mask.
        * src/sort.c (cs_enter, cs_leave): New functions for protecting
        code sequences against signal delivery.
        (exit_cleanup): Use cs_enter and cs_leave instead of
        calling sigprocmask directly.
        (create_temp_file, pipe_fork, zaptemp): Likewise.

2007-01-21  Dan Hipschman  <address@hidden>

        Add compression of temp files to sort.
        * NEWS: Mention this.
        * bootstrap.conf: Import findprog.
        * configure.ac: Add AC_FUNC_FORK.
        * doc/coreutils.texi: Document GNUSORT_COMPRESSOR environment
        variable.
        * src/sort.c (compress_program): New global, holds the name of the
        external compression program.
        (struct sortfile): New type used by mergefps and friends instead
        of filenames to hold PIDs of compressor processes.
        (proctab): New global, holds compressor PIDs on which to wait.
        (enum procstate, struct procnode): New types used by proctab.
        (proctab_hasher, proctab_comparator): New functions for proctab.
        (nprocs): New global, number of forked but unreaped children.
        (reap, reap_some): New functions, wait for/clean up forked processes.
        (register_proc, update_proc, wait_proc): New functions for adding,
        modifying and removing proctab entries.
        (create_temp_file): Change parameter type to pointer to file
        descriptor, and return type to pointer to struct tempnode.
        (dup2_or_die): New function used in create_temp and open_temp.
        (pipe_fork): New function, creates a pipe and child process.
        (create_temp): New function, creates a temp file and possibly
        starts a compression program through which we filter output.
        (open_temp): New function, opens a compressed temp file and starts
        a decompression process through which to filter the input.
        (mergefps): Change FILES parameter type to struct sortfile array
        and update access accordingly.  Use open_temp and reap_some.
        (avoid_trashing_input, merge): Change FILES parameter like
        mergefps and call create_temp instead of create_temp_file.
        (sort): Call create_temp instead of create_temp_file.
        Use reap_some.
        (avoid_trashing_input, merge, sort, main): Adapt to mergefps.
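
For readers who want the shape of the compression path before reading
the diff, here is a minimal standalone sketch of the pipe/fork/exec
pattern that the pipe_fork and create_temp entries describe.  It is not
code from the patch: write_through_filter is a hypothetical helper, it
has no retry loop and no proctab, and it waits for the child right away
instead of reaping it later.  It assumes PROG follows the contract from
the documentation hunk (filter standard input to standard output when
run with no arguments).

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper, not part of the patch: write LEN bytes of DATA
   to PATH, filtered through PROG.  Return 0 on success, -1 on failure.
   A robust caller would also arrange to survive SIGPIPE in case PROG
   exits before reading its input.  */
static int
write_through_filter (const char *prog, const char *path,
                      const char *data, size_t len)
{
  int pipefds[2];
  int status;
  int ok = 1;
  pid_t pid;
  int outfd = open (path, O_WRONLY | O_CREAT | O_TRUNC, 0600);

  if (outfd < 0)
    return -1;
  if (pipe (pipefds) < 0)
    {
      close (outfd);
      return -1;
    }

  pid = fork ();
  if (pid < 0)
    {
      close (outfd);
      close (pipefds[0]);
      close (pipefds[1]);
      return -1;
    }

  if (pid == 0)
    {
      /* Child: stdin is the pipe's read end, stdout is the file.  */
      close (pipefds[1]);
      if (dup2 (pipefds[0], STDIN_FILENO) < 0
          || dup2 (outfd, STDOUT_FILENO) < 0)
        _exit (127);
      close (pipefds[0]);
      close (outfd);
      execlp (prog, prog, (char *) NULL);
      _exit (127);              /* Only reached if exec failed.  */
    }

  /* Parent: keep only the write end of the pipe.  */
  close (outfd);
  close (pipefds[0]);

  while (len > 0)
    {
      ssize_t n = write (pipefds[1], data, len);
      if (n < 0)
        {
          if (errno == EINTR)
            continue;
          ok = 0;
          break;
        }
      data += n;
      len -= (size_t) n;
    }

  /* Closing the write end is what signals end of input to PROG.  */
  close (pipefds[1]);

  if (waitpid (pid, &status, 0) < 0)
    return -1;
  if (! ok || ! WIFEXITED (status) || WEXITSTATUS (status) != 0)
    return -1;
  return 0;
}
```

With gzip on PATH, a call such as
write_through_filter ("gzip", "demo.gz", buf, len) should leave a gzip
stream in demo.gz.  The patch differs mainly in that it does not wait
here; it records the PID and waits only when the temp file is reopened.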

Index: NEWS
===================================================================
RCS file: /sources/coreutils/coreutils/NEWS,v
retrieving revision 1.467
diff -p -u -r1.467 NEWS
--- NEWS        18 Jan 2007 09:18:21 -0000      1.467
+++ NEWS        22 Jan 2007 03:44:29 -0000
@@ -29,6 +29,11 @@ GNU coreutils NEWS                      
 
   "rm --interactive=never F" no longer prompts for an unwritable F
 
+** New features
+
+  sort can now compress temporary files to improve performance of
+  very large sorts.
+
 
 * Noteworthy changes in release 6.7 (2006-12-08) [stable]
 
Index: bootstrap.conf
===================================================================
RCS file: /sources/coreutils/coreutils/bootstrap.conf,v
retrieving revision 1.15
diff -p -u -r1.15 bootstrap.conf
--- bootstrap.conf      15 Jan 2007 10:33:52 -0000      1.15
+++ bootstrap.conf      22 Jan 2007 03:44:29 -0000
@@ -43,7 +43,8 @@ gnulib_modules="
        config-h configmake
        closeout cycle-check d-ino d-type diacrit dirfd dirname dup2
        error euidaccess exclude exitfail fchdir fcntl fcntl-safer fdl
-       file-type fileblocks filemode filenamecat fnmatch-gnu fopen-safer
+       file-type fileblocks filemode filenamecat findprog fnmatch-gnu
+       fopen-safer
        fprintftime fsusage ftruncate fts getdate getgroups gethrxtime
        getline getloadavg getndelim2 getopt getpagesize getpass-gnu
        gettext gettime gettimeofday getugroups getusershell gnupload
Index: configure.ac
===================================================================
RCS file: /sources/coreutils/coreutils/configure.ac,v
retrieving revision 1.102
diff -p -u -r1.102 configure.ac
--- configure.ac        28 Dec 2006 08:18:17 -0000      1.102
+++ configure.ac        22 Jan 2007 03:44:29 -0000
@@ -39,6 +39,8 @@ gl_EARLY
 gl_INIT
 coreutils_MACROS
 
+AC_FUNC_FORK
+
 AC_CHECK_FUNCS(uname,
        OPTIONAL_BIN_PROGS="$OPTIONAL_BIN_PROGS uname\$(EXEEXT)"
        MAN="$MAN uname.1")
Index: doc/coreutils.texi
===================================================================
RCS file: /sources/coreutils/coreutils/doc/coreutils.texi,v
retrieving revision 1.363
diff -p -u -r1.363 coreutils.texi
--- doc/coreutils.texi  4 Jan 2007 11:04:46 -0000       1.363
+++ doc/coreutils.texi  22 Jan 2007 03:44:30 -0000
@@ -3411,6 +3411,19 @@ value as the directory for temporary fil
 @option{--temporary-directory} (@option{-T}) option in turn overrides
 the environment variable.
 
address@hidden GNUSORT_COMPRESSOR
+To improve performance when sorting very large files, GNU sort will,
+by default, try to compress temporary files with the program
address@hidden  The environment variable @env{GNUSORT_COMPRESSOR} can be
+set to the name of another program to be used.  The program specified
+must compress standard input to standard output when no arguments are
+given to it, and it must decompress standard input to standard output
+when the @option{-d} argument is given to it.  If the program exits
+with nonzero status, sort will terminate with an error.  To disable
+compression of temporary files, set the variable to the empty string.
+Whitespace and the backslash character should not appear in the
+program name.  They are reserved for future use.
+
 
 The following options affect the ordering of output lines.  They may be
 specified globally or as part of a specific key field.  If no key
Index: src/sort.c
===================================================================
RCS file: /sources/coreutils/coreutils/src/sort.c,v
retrieving revision 1.346
diff -p -u -r1.346 sort.c
--- src/sort.c  20 Jan 2007 09:36:45 -0000      1.346
+++ src/sort.c  22 Jan 2007 03:44:30 -0000
@@ -1,5 +1,5 @@
 /* sort - sort lines of text (with all kinds of options).
-   Copyright (C) 1988, 1991-2006 Free Software Foundation, Inc.
+   Copyright (C) 1988, 1991-2007 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -25,10 +25,13 @@
 
 #include <getopt.h>
 #include <sys/types.h>
+#include <sys/wait.h>
 #include <signal.h>
 #include "system.h"
 #include "error.h"
+#include "findprog.h"
 #include "hard-locale.h"
+#include "hash.h"
 #include "inttostr.h"
 #include "md5.h"
 #include "physmem.h"
@@ -63,7 +66,8 @@ struct rlimit { size_t rlim_cur; };
    present.  */
 #ifndef SA_NOCLDSTOP
 # define SA_NOCLDSTOP 0
-# define sigprocmask(How, Set, Oset) /* empty */
+/* No sigprocmask.  Always 'return' zero. */
+# define sigprocmask(How, Set, Oset) (0)
 # define sigset_t int
 # if ! HAVE_SIGINTERRUPT
 #  define siginterrupt(sig, flag) /* empty */
@@ -92,6 +96,20 @@ enum
     SORT_FAILURE = 2
   };
 
+enum
+  {
+    /* The number of times we should try to fork a compression process
+       (we retry if the fork call fails).  We don't _need_ to compress
+       temp files, this is just to reduce disk access, so this number
+       can be small.  */
+    MAX_FORK_TRIES_COMPRESS = 2,
+
+    /* The number of times we should try to fork a decompression process.
+       If we can't fork a decompression process, we can't sort, so this
+       number should be big.  */
+    MAX_FORK_TRIES_DECOMPRESS = 8
+  };
+
 /* The representation of the decimal point in the current locale.  */
 static int decimal_point;
 
@@ -261,6 +279,9 @@ static bool have_read_stdin;
 /* List of key field comparisons to be tried.  */
 static struct keyfield *keylist;
 
+/* Program used to (de)compress temp files.  Must accept -d.  */
+static const char *compress_program;
+
 static void sortlines_temp (struct line *, size_t, struct line *);
 
 /* Report MESSAGE for FILE, then clean up and exit.
@@ -399,15 +420,207 @@ static struct option const long_options[
 /* The set of signals that are caught.  */
 static sigset_t caught_signals;
 
+/* Critical section status.  */
+struct cs_status
+{
+  bool valid;
+  sigset_t sigs;
+};
+
+/* Enter a critical section.  */
+static struct cs_status
+cs_enter (void)
+{
+  struct cs_status status;
+  status.valid = (sigprocmask (SIG_BLOCK, &caught_signals, &status.sigs) == 0);
+  return status;
+}
+
+/* Leave a critical section.  */
+static void
+cs_leave (struct cs_status status)
+{
+  if (status.valid)
+    {
+      /* Ignore failure when restoring the signal mask. */
+      sigprocmask (SIG_SETMASK, &status.sigs, NULL);
+    }
+}
+
 /* The list of temporary files. */
 struct tempnode
 {
   struct tempnode *volatile next;
+  pid_t pid;     /* If compressed, the pid of compressor, else zero */
   char name[1];  /* Actual size is 1 + file name length.  */
 };
 static struct tempnode *volatile temphead;
 static struct tempnode *volatile *temptail = &temphead;
 
+struct sortfile
+{
+  char *name;
+  pid_t pid;     /* If compressed, the pid of compressor, else zero */
+};
+
+/* A table where we store compression process states.  We clean up all
+   processes in a timely manner so as not to exhaust system resources,
+   so we store the info on whether the process is still running, or has
+   been reaped here.  */
+static Hash_table *proctab;
+
+enum { INIT_PROCTAB_SIZE = 47 };
+
+enum procstate { ALIVE, ZOMBIE };
+
+/* A proctab entry.  The COUNT field is there in case we fork a new
+   compression process that has the same PID as an old zombie process
+   that is still in the table (because the process to decompress the
+   temp file it was associated with hasn't started yet).  */
+struct procnode
+{
+  pid_t pid;
+  enum procstate state;
+  size_t count;
+};
+
+static size_t
+proctab_hasher (const void *entry, size_t tabsize)
+{
+  const struct procnode *node = entry;
+  return node->pid % tabsize;
+}
+
+static bool
+proctab_comparator (const void *e1, const void *e2)
+{
+  const struct procnode *n1 = e1, *n2 = e2;
+  return n1->pid == n2->pid;
+}
+
+/* The total number of forked processes (compressors and decompressors)
+   that have not been reaped yet. */
+static size_t nprocs;
+
+/* The number of child processes we'll allow before we try to reap some. */
+enum { MAX_PROCS_BEFORE_REAP = 2 };
+
+/* If 0 < PID, wait for the child process with that PID to exit.
+   If PID is -1, clean up a random child process which has finished and
+   return the process ID of that child.  If PID is -1 and no processes
+   have quit yet, return 0 without waiting.  */
+
+static pid_t
+reap (pid_t pid)
+{
+  int status;
+  pid_t cpid = waitpid (pid, &status, pid < 0 ? WNOHANG : 0);
+
+  if (cpid < 0)
+    error (SORT_FAILURE, errno, _("waiting for %s [-d]"),
+           compress_program);
+  else if (0 < cpid)
+    {
+      if (! WIFEXITED (status) || WEXITSTATUS (status))
+       error (SORT_FAILURE, 0, _("%s [-d] terminated abnormally"),
+              compress_program);
+      --nprocs;
+    }
+
+  return cpid;
+}
+
+/* Add the PID of a running compression process to proctab, or update
+   the entry COUNT and STATE fields if it's already there.  This also
+   creates the table for us the first time it's called.  */
+
+static void
+register_proc (pid_t pid)
+{
+  struct procnode test, *node;
+
+  if (! proctab)
+    {
+      proctab = hash_initialize (INIT_PROCTAB_SIZE, NULL,
+                                proctab_hasher,
+                                proctab_comparator,
+                                free);
+      if (! proctab)
+       xalloc_die ();
+    }
+
+  test.pid = pid;
+  node = hash_lookup (proctab, &test);
+  if (node)
+    {
+      node->state = ALIVE;
+      ++node->count;
+    }
+  else
+    {
+      node = xmalloc (sizeof *node);
+      node->pid = pid;
+      node->state = ALIVE;
+      node->count = 1;
+      hash_insert (proctab, node);
+    }
+}
+
+/* This is called when we reap a random process.  We don't know
+   whether we have reaped a compression process or a decompression
+   process until we look in the table.  If there's an ALIVE entry for
+   it, then we have reaped a compression process, so change the state
+   to ZOMBIE.  Otherwise, it's a decompression process, so ignore it.  */
+
+static void
+update_proc (pid_t pid)
+{
+  struct procnode test, *node;
+
+  test.pid = pid;
+  node = hash_lookup (proctab, &test);
+  if (node)
+    node->state = ZOMBIE;
+}
+
+/* This is for when we need to wait for a compression process to exit.
+   If it has a ZOMBIE entry in the table then it's already dead and has
+   been reaped.  Note that if there's an ALIVE entry for it, it still may
+   already have died and been reaped if a second process was created with
+   the same PID.  This is probably exceedingly rare, but to be on the safe
+   side we will have to wait for any compression process with this PID.  */
+
+static void
+wait_proc (pid_t pid)
+{
+  struct procnode test, *node;
+
+  test.pid = pid;
+  node = hash_lookup (proctab, &test);
+  if (node->state == ALIVE)
+    reap (pid);
+
+  node->state = ZOMBIE;
+  if (! --node->count)
+    {
+      hash_delete (proctab, node);
+      free (node);
+    }
+}
+
+/* Keep reaping finished children as long as there are more to reap.
+   This doesn't block waiting for any of them, it only reaps those
+   that are already dead.  */
+
+static void
+reap_some (void)
+{
+  pid_t pid;
+
+  while (0 < nprocs && (pid = reap (-1)))
+    update_proc (pid);
+}
+
 /* Clean up any remaining temporary files.  */
 
 static void
@@ -429,24 +642,22 @@ exit_cleanup (void)
     {
       /* Clean up any remaining temporary files in a critical section so
         that a signal handler does not try to clean them too.  */
-      sigset_t oldset;
-      sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+      struct cs_status cs = cs_enter ();
       cleanup ();
-      sigprocmask (SIG_SETMASK, &oldset, NULL);
+      cs_leave (cs);
     }
 
   close_stdout ();
 }
 
-/* Create a new temporary file, returning its newly allocated name.
-   Store into *PFP a stream open for writing.  */
+/* Create a new temporary file, returning its newly allocated tempnode.
+   Store into *PFD the file descriptor open for writing.  */
 
-static char *
-create_temp_file (FILE **pfp)
+static struct tempnode *
+create_temp_file (int *pfd)
 {
   static char const slashbase[] = "/sortXXXXXX";
   static size_t temp_dir_index;
-  sigset_t oldset;
   int fd;
   int saved_errno;
   char const *temp_dir = temp_dirs[temp_dir_index];
@@ -454,15 +665,17 @@ create_temp_file (FILE **pfp)
   struct tempnode *node =
     xmalloc (offsetof (struct tempnode, name) + len + sizeof slashbase);
   char *file = node->name;
+  struct cs_status cs;
 
   memcpy (file, temp_dir, len);
   memcpy (file + len, slashbase, sizeof slashbase);
   node->next = NULL;
+  node->pid = 0;
   if (++temp_dir_index == temp_dir_count)
     temp_dir_index = 0;
 
   /* Create the temporary file in a critical section, to avoid races.  */
-  sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+  cs = cs_enter ();
   fd = mkstemp (file);
   if (0 <= fd)
     {
@@ -470,13 +683,14 @@ create_temp_file (FILE **pfp)
       temptail = &node->next;
     }
   saved_errno = errno;
-  sigprocmask (SIG_SETMASK, &oldset, NULL);
+  cs_leave (cs);
   errno = saved_errno;
 
-  if (fd < 0 || (*pfp = fdopen (fd, "w")) == NULL)
+  if (fd < 0)
     die (_("cannot create temporary file"), file);
 
-  return file;
+  *pfd = fd;
+  return node;
 }
 
 /* Return a stream for FILE, opened with mode HOW.  A null FILE means
@@ -534,6 +748,197 @@ xfclose (FILE *fp, char const *file)
 }
 
 static void
+dup2_or_die (int oldfd, int newfd)
+{
+  if (dup2 (oldfd, newfd) < 0)
+    error (SORT_FAILURE, errno, _("dup2 failed"));
+}
+
+/* Fork a child process for piping to and do common cleanup.  The
+   TRIES parameter tells us how many times to try to fork before
+   giving up.  Return the PID of the child or -1 if fork failed.  */
+
+static pid_t
+pipe_fork (int pipefds[2], size_t tries)
+{
+#if HAVE_WORKING_FORK
+  struct tempnode *saved_temphead;
+  int saved_errno;
+  unsigned int wait_retry = 1;
+  pid_t pid IF_LINT (= -1);
+  struct cs_status cs;
+
+  if (pipe (pipefds) < 0)
+    return -1;
+
+  while (tries--)
+    {
+      /* This is so the child process won't delete our temp files
+        if it receives a signal before exec-ing.  */
+      cs = cs_enter ();
+      saved_temphead = temphead;
+      temphead = NULL;
+
+      pid = fork ();
+      saved_errno = errno;
+      if (pid)
+       temphead = saved_temphead;
+
+      cs_leave (cs);
+      errno = saved_errno;
+
+      if (0 <= pid || errno != EAGAIN)
+       break;
+      else
+       {
+         sleep (wait_retry);
+         wait_retry *= 2;
+         reap_some ();
+       }
+    }
+
+  if (pid < 0)
+    {
+      close (pipefds[0]);
+      close (pipefds[1]);
+    }
+  else if (pid == 0)
+    {
+      close (STDIN_FILENO);
+      close (STDOUT_FILENO);
+    }
+  else
+    ++nprocs;
+
+  return pid;
+
+#else  /* ! HAVE_WORKING_FORK */
+  return -1;
+#endif
+}
+
+/* Create a temporary file and start a compression program to filter output
+   to that file.  Set *PFP to the file handle and, if PPID is non-NULL,
+   set *PPID to the PID of the newly-created process.  */
+
+static char *
+create_temp (FILE **pfp, pid_t *ppid)
+{
+  static bool compress_program_known;
+  int tempfd;
+  struct tempnode *node = create_temp_file (&tempfd);
+  char *name = node->name;
+
+  if (! compress_program_known)
+    {
+      compress_program = getenv ("GNUSORT_COMPRESSOR");
+      if (compress_program == NULL)
+       {
+         static const char *default_program = "gzip";
+         const char *path_program = find_in_path (default_program);
+
+         if (path_program != default_program)
+           {
+             if (access (path_program, X_OK) == 0)
+               compress_program = path_program;
+             else
+               free ((char *) path_program);
+           }
+       }
+      else if (*compress_program == '\0')
+       compress_program = NULL;
+
+      compress_program_known = true;
+    }
+
+  if (compress_program)
+    {
+      int pipefds[2];
+
+      node->pid = pipe_fork (pipefds, MAX_FORK_TRIES_COMPRESS);
+      if (0 < node->pid)
+       {
+         close (tempfd);
+         close (pipefds[0]);
+         tempfd = pipefds[1];
+
+         register_proc (node->pid);
+       }
+      else if (node->pid == 0)
+       {
+         close (pipefds[1]);
+         dup2_or_die (tempfd, STDOUT_FILENO);
+         close (tempfd);
+         dup2_or_die (pipefds[0], STDIN_FILENO);
+         close (pipefds[0]);
+
+         if (execlp (compress_program, compress_program,
+                     (char *) NULL) < 0)
+           error (SORT_FAILURE, errno, _("couldn't execute %s"),
+                  compress_program);
+       }
+      else
+       node->pid = 0;
+    }
+
+  *pfp = fdopen (tempfd, "w");
+  if (! *pfp)
+    die (_("couldn't create temporary file"), name);
+
+  if (ppid)
+    *ppid = node->pid;
+
+  return name;
+}
+
+/* Open a compressed temp file and start a decompression process through
+   which to filter the input.  PID must be the valid process ID of the
+   process used to compress the file.  */
+
+static FILE *
+open_temp (const char *name, pid_t pid)
+{
+  int tempfd, pipefds[2];
+  pid_t child_pid;
+  FILE *fp;
+
+  wait_proc (pid);
+
+  tempfd = open (name, O_RDONLY);
+  if (tempfd < 0)
+    die (_("couldn't open temporary file"), name);
+
+  child_pid = pipe_fork (pipefds, MAX_FORK_TRIES_DECOMPRESS);
+  if (0 < child_pid)
+    {
+      close (tempfd);
+      close (pipefds[1]);
+    }
+  else if (child_pid == 0)
+    {
+      close (pipefds[0]);
+      dup2_or_die (tempfd, STDIN_FILENO);
+      close (tempfd);
+      dup2_or_die (pipefds[1], STDOUT_FILENO);
+      close (pipefds[1]);
+
+      if (execlp (compress_program, compress_program,
+                 "-d", (char *) NULL) < 0)
+       error (SORT_FAILURE, errno, _("couldn't execute %s -d"),
+              compress_program);
+    }
+  else
+    error (SORT_FAILURE, errno, _("couldn't create process for %s -d"),
+          compress_program);
+
+  fp = fdopen (pipefds[0], "r");
+  if (! fp)
+    die (_("couldn't create temporary file"), name);
+
+  return fp;
+}
+
+static void
 write_bytes (const char *buf, size_t n_bytes, FILE *fp, const char *output_file)
 {
   if (fwrite (buf, 1, n_bytes, fp) != n_bytes)
@@ -558,20 +963,20 @@ zaptemp (const char *name)
   struct tempnode *volatile *pnode;
   struct tempnode *node;
   struct tempnode *next;
-  sigset_t oldset;
   int unlink_status;
   int unlink_errno = 0;
+  struct cs_status cs;
 
   for (pnode = &temphead; (node = *pnode)->name != name; pnode = &node->next)
     continue;
 
   /* Unlink the temporary file in a critical section to avoid races.  */
   next = node->next;
-  sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+  cs = cs_enter ();
   unlink_status = unlink (name);
   unlink_errno = errno;
   *pnode = next;
-  sigprocmask (SIG_SETMASK, &oldset, NULL);
+  cs_leave (cs);
 
   if (unlink_status != 0)
     error (0, unlink_errno, _("warning: cannot remove: %s"), name);
@@ -1605,7 +2010,7 @@ check (char const *file_name)
    file has not been opened yet (or written to, if standard output).  */
 
 static void
-mergefps (char **files, size_t ntemps, size_t nfiles,
+mergefps (struct sortfile *files, size_t ntemps, size_t nfiles,
          FILE *ofp, char const *output_file)
 {
   FILE *fps[NMERGE];           /* Input streams for each file.  */
@@ -1628,10 +2033,12 @@ mergefps (char **files, size_t ntemps, s
   /* Read initial lines from each input file. */
   for (i = 0; i < nfiles; )
     {
-      fps[i] = xfopen (files[i], "r");
+      fps[i] = (files[i].pid
+               ? open_temp (files[i].name, files[i].pid)
+               : xfopen (files[i].name, "r"));
       initbuf (&buffer[i], sizeof (struct line),
               MAX (merge_buffer_size, sort_size / nfiles));
-      if (fillbuf (&buffer[i], fps[i], files[i]))
+      if (fillbuf (&buffer[i], fps[i], files[i].name))
        {
          struct line const *linelim = buffer_linelim (&buffer[i]);
          cur[i] = linelim - 1;
@@ -1641,11 +2048,11 @@ mergefps (char **files, size_t ntemps, s
       else
        {
          /* fps[i] is empty; eliminate it from future consideration.  */
-         xfclose (fps[i], files[i]);
+         xfclose (fps[i], files[i].name);
          if (i < ntemps)
            {
              ntemps--;
-             zaptemp (files[i]);
+             zaptemp (files[i].name);
            }
          free (buffer[i].buf);
          --nfiles;
@@ -1714,7 +2121,7 @@ mergefps (char **files, size_t ntemps, s
        cur[ord[0]] = smallest - 1;
       else
        {
-         if (fillbuf (&buffer[ord[0]], fps[ord[0]], files[ord[0]]))
+         if (fillbuf (&buffer[ord[0]], fps[ord[0]], files[ord[0]].name))
            {
              struct line const *linelim = buffer_linelim (&buffer[ord[0]]);
              cur[ord[0]] = linelim - 1;
@@ -1727,11 +2134,11 @@ mergefps (char **files, size_t ntemps, s
                if (ord[i] > ord[0])
                  --ord[i];
              --nfiles;
-             xfclose (fps[ord[0]], files[ord[0]]);
+             xfclose (fps[ord[0]], files[ord[0]].name);
              if (ord[0] < ntemps)
                {
                  ntemps--;
-                 zaptemp (files[ord[0]]);
+                 zaptemp (files[ord[0]].name);
                }
              free (buffer[ord[0]].buf);
              for (i = ord[0]; i < nfiles; ++i)
@@ -1774,6 +2181,10 @@ mergefps (char **files, size_t ntemps, s
          ord[j] = ord[j + 1];
        ord[count_of_smaller_lines] = ord0;
       }
+
+      /* Free up some resources every once in a while.  */
+      if (MAX_PROCS_BEFORE_REAP < nprocs)
+       reap_some ();
     }
 
   if (unique && savedline)
@@ -1912,8 +2323,8 @@ sortlines_temp (struct line *lines, size
    common cases.  */
 
 static size_t
-avoid_trashing_input (char **files, size_t ntemps, size_t nfiles,
-                     char const *outfile)
+avoid_trashing_input (struct sortfile *files, size_t ntemps,
+                     size_t nfiles, char const *outfile)
 {
   size_t i;
   bool got_outstat = false;
@@ -1921,11 +2332,11 @@ avoid_trashing_input (char **files, size
 
   for (i = ntemps; i < nfiles; i++)
     {
-      bool is_stdin = STREQ (files[i], "-");
+      bool is_stdin = STREQ (files[i].name, "-");
       bool same;
       struct stat instat;
 
-      if (outfile && STREQ (outfile, files[i]) && !is_stdin)
+      if (outfile && STREQ (outfile, files[i].name) && !is_stdin)
        same = true;
       else
        {
@@ -1941,7 +2352,7 @@ avoid_trashing_input (char **files, size
 
          same = (((is_stdin
                    ? fstat (STDIN_FILENO, &instat)
-                   : stat (files[i], &instat))
+                   : stat (files[i].name, &instat))
                   == 0)
                  && SAME_INODE (instat, outstat));
        }
@@ -1949,9 +2360,11 @@ avoid_trashing_input (char **files, size
       if (same)
        {
          FILE *tftp;
-         char *temp = create_temp_file (&tftp);
-         mergefps (&files[i], 0, nfiles - i, tftp, temp);
-         files[i] = temp;
+         pid_t pid;
+         char *temp = create_temp (&tftp, &pid);
+         mergefps (&files[i], 0, nfiles - i, tftp, temp);
+         files[i].name = temp;
+         files[i].pid = pid;
          return i + 1;
        }
     }
@@ -1965,7 +2378,8 @@ avoid_trashing_input (char **files, size
    OUTPUT_FILE; a null OUTPUT_FILE stands for standard output.  */
 
 static void
-merge (char **files, size_t ntemps, size_t nfiles, char const *output_file)
+merge (struct sortfile *files, size_t ntemps, size_t nfiles,
+       char const *output_file)
 {
   while (NMERGE < nfiles)
     {
@@ -1986,11 +2400,13 @@ merge (char **files, size_t ntemps, size
       for (out = in = 0; out < nfiles / NMERGE; out++, in += NMERGE)
        {
          FILE *tfp;
-         char *temp = create_temp_file (&tfp);
+         pid_t pid;
+         char *temp = create_temp (&tfp, &pid);
          size_t nt = MIN (ntemps, NMERGE);
          ntemps -= nt;
          mergefps (&files[in], nt, NMERGE, tfp, temp);
-         files[out] = temp;
+         files[out].name = temp;
+         files[out].pid = pid;
        }
 
       remainder = nfiles - in;
@@ -2003,11 +2419,13 @@ merge (char **files, size_t ntemps, size
             files as possible, to avoid needless I/O.  */
          size_t nshortmerge = remainder - cheap_slots + 1;
          FILE *tfp;
-         char *temp = create_temp_file (&tfp);
+         pid_t pid;
+         char *temp = create_temp (&tfp, &pid);
          size_t nt = MIN (ntemps, nshortmerge);
          ntemps -= nt;
          mergefps (&files[in], nt, nshortmerge, tfp, temp);
-         files[out++] = temp;
+         files[out].name = temp;
+         files[out++].pid = pid;
          in += nshortmerge;
        }
 
@@ -2079,7 +2497,7 @@ sort (char * const *files, size_t nfiles
          else
            {
              ++ntemps;
-             temp_output = create_temp_file (&tfp);
+             temp_output = create_temp (&tfp, NULL);
            }
 
          do
@@ -2094,6 +2512,10 @@ sort (char * const *files, size_t nfiles
 
          xfclose (tfp, temp_output);
 
+         /* Free up some resources every once in a while.  */
+         if (MAX_PROCS_BEFORE_REAP < nprocs)
+           reap_some ();
+
          if (output_file_created)
            goto finish;
        }
@@ -2107,10 +2529,11 @@ sort (char * const *files, size_t nfiles
     {
       size_t i;
       struct tempnode *node = temphead;
-      char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
+      struct sortfile *tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
       for (i = 0; node; i++)
        {
-         tempfiles[i] = node->name;
+         tempfiles[i].name = node->name;
+         tempfiles[i].pid = node->pid;
          node = node->next;
        }
       merge (tempfiles, ntemps, ntemps, output_file);
@@ -2717,7 +3140,15 @@ main (int argc, char **argv)
     }
 
   if (mergeonly)
-    merge (files, 0, nfiles, outfile);
+    {
+      struct sortfile *sortfiles = xcalloc (nfiles, sizeof *sortfiles);
+      size_t i;
+
+      for (i = 0; i < nfiles; ++i)
+       sortfiles[i].name = files[i];
+
+      merge (sortfiles, 0, nfiles, outfile);
+    }
   else
     sort (files, nfiles, outfile);
 



