coreutils
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH 1/8] split: accept new output --filter option


From: Jim Meyering
Subject: [PATCH 1/8] split: accept new output --filter option
Date: Sat, 30 Apr 2011 15:31:02 +0200

From: Karl Heuer <address@hidden>

* src/split.c (FIXME): FIXME
---
 src/split.c |  211 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 188 insertions(+), 23 deletions(-)

diff --git a/src/split.c b/src/split.c
index 65e44dd..7aab357 100644
--- a/src/split.c
+++ b/src/split.c
@@ -25,7 +25,9 @@
 #include <assert.h>
 #include <stdio.h>
 #include <getopt.h>
+#include <signal.h>
 #include <sys/types.h>
+#include <sys/wait.h>

 #include "system.h"
 #include "error.h"
@@ -35,6 +37,7 @@
 #include "full-write.h"
 #include "quote.h"
 #include "safe-read.h"
+#include "sig2str.h"
 #include "xfreopen.h"
 #include "xstrtol.h"

@@ -45,6 +48,21 @@
   proper_name_utf8 ("Torbjorn Granlund", "Torbj\303\266rn Granlund"), \
   proper_name ("Richard M. Stallman")

+/* Shell command to filter through, instead of creating files.  */
+static char const *filter_command;
+
+/* Process ID of the filter.  */
+static int filter_pid;
+
+/* Array of open pipes.  */
+static int *open_pipes;
+static size_t open_pipes_alloc;
+static size_t n_open_pipes;
+
+/* Blocked signals.  */
+static sigset_t oldblocked;
+static sigset_t newblocked;
+
 /* Base name of output files.  */
 static char const *outbase;

@@ -103,6 +121,7 @@ static struct option const longopts[] =
   {"unbuffered", no_argument, NULL, 'u'},
   {"suffix-length", required_argument, NULL, 'a'},
   {"numeric-suffixes", no_argument, NULL, 'd'},
+  {"filter", required_argument, NULL, 'f'},
   {"verbose", no_argument, NULL, VERBOSE_OPTION},
   {"-io-blksize", required_argument, NULL,
    IO_BLKSIZE_OPTION}, /* do not document */
@@ -170,6 +189,7 @@ Mandatory arguments to long options are mandatory for short 
options too.\n\
   -C, --line-bytes=SIZE   put at most SIZE bytes of lines per output file\n\
   -d, --numeric-suffixes  use numeric suffixes instead of alphabetic\n\
   -e, --elide-empty-files  do not generate empty output files with `-n'\n\
+  -f, --filter=COMMAND    write to shell COMMAND; file name is $FILE\n\
   -l, --lines=NUMBER      put NUMBER lines per output file\n\
   -n, --number=CHUNKS     generate CHUNKS output files.  See below\n\
   -u, --unbuffered        immediately copy input to output with `-n r/...'\n\
@@ -256,10 +276,116 @@ next_file_name (void)
 static int
 create (const char *name)
 {
-  if (verbose)
-    fprintf (stdout, _("creating file %s\n"), quote (name));
-  return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
-               (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH));
+  if (!filter_command)
+    {
+      if (verbose)
+        fprintf (stdout, _("creating file %s\n"), quote (name));
+      return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
+                   (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | 
S_IWOTH));
+    }
+  else
+    {
+      int fd_pair[2];
+      pid_t child_pid;
+      char const *shell_prog = getenv ("SHELL");
+      if (shell_prog == NULL)
+        shell_prog = "/bin/sh";
+      if (setenv ("FILE", name, 1) != 0)
+        error (EXIT_FAILURE, errno, "cannot set environment variable");
+      if (verbose)
+        fprintf (stdout, _("executing with FILE=%s\n"), quote (name));
+      if (pipe (fd_pair) != 0)
+        error (EXIT_FAILURE, errno, _("cannot create pipe"));
+      child_pid = fork ();
+      if (child_pid == 0)
+        {
+          /* This is the child process.  If an error occurs here, the
+             parent will eventually learn about it after doing a wait,
+             at which time it will emit its own error message.  */
+          int j;
+          /* We have to close any pipes that were opened during an
+             earlier call, otherwise this process will be holding a
+             write-pipe that will prevent the earlier process from
+             reading an EOF on the corresponding read-pipe.  */
+          for (j = 0; j < n_open_pipes; ++j)
+            if (close (open_pipes[j]) != 0)
+              error (EXIT_FAILURE, errno, _("closing prior pipe"));
+          if (close (fd_pair[1]))
+            error (EXIT_FAILURE, errno, _("closing output pipe"));
+          if (fd_pair[0] != STDIN_FILENO)
+            {
+              if (dup2 (fd_pair[0], STDIN_FILENO) != STDIN_FILENO)
+                error (EXIT_FAILURE, errno, _("moving input pipe"));
+              if (close (fd_pair[0]) != 0)
+                error (EXIT_FAILURE, errno, _("closing input pipe"));
+            }
+          sigprocmask (SIG_SETMASK, &oldblocked, NULL);
+          execl (shell_prog, last_component (shell_prog), "-c",
+                 filter_command, (char *) NULL);
+          error (EXIT_FAILURE, errno, "%s", shell_prog);
+        }
+      if (child_pid == -1)
+        error (EXIT_FAILURE, errno, _("cannot spawn new process"));
+      if (close (fd_pair[0]) != 0)
+        error (EXIT_FAILURE, errno, _("closing input pipe"));
+      filter_pid = child_pid;
+      if (n_open_pipes == open_pipes_alloc)
+        open_pipes = x2nrealloc (open_pipes, &open_pipes_alloc,
+                                 sizeof *open_pipes);
+      open_pipes[n_open_pipes++] = fd_pair[1];
+      return fd_pair[1];
+    }
+}
+
+/* Close the output file, and do any associated cleanup.
+   If FP and FD are both specified, they refer to the same open file;
+   in this case FP is closed, but FD is still used in cleanup.  */
+static void
+closeout (FILE *fp, int fd, pid_t pid, char const *name)
+{
+  if (fp != NULL && fclose (fp) != 0 && errno != EPIPE)
+    error (EXIT_FAILURE, errno, "%s", name);
+  if (fd >= 0)
+    {
+      if (fp == NULL && close (fd) < 0)
+        error (EXIT_FAILURE, errno, "%s", name);
+      int j;
+      for (j = 0; j < n_open_pipes; ++j)
+        if (open_pipes[j] == fd) {
+          open_pipes[j] = open_pipes[--n_open_pipes];
+          break;
+        }
+    }
+  if (pid > 0)
+    {
+      int wstatus = 0;
+      if (waitpid (pid, &wstatus, 0) == -1 && errno != ECHILD)
+        error (EXIT_FAILURE, errno, _("waiting for child process"));
+      if (WIFSIGNALED (wstatus))
+        {
+          int sig = WTERMSIG (wstatus);
+          if (sig != SIGPIPE)
+            {
+              char signame[SIG2STR_MAX+1];
+              if (sig2str (sig, signame) != 0)
+                sprintf (signame, "%d", sig);
+              error (sig + 128, 0,
+                     _("with FILE=%s, signal %s (%s) from command: %s"),
+                     name, signame, strsignal (sig), filter_command);
+            }
+        }
+      else if (WIFEXITED (wstatus))
+        {
+          int ex = WEXITSTATUS (wstatus);
+          if (ex != 0)
+            error (ex, 0, _("with FILE=%s, exit %d from command: %s"),
+                   name, ex, filter_command);
+        }
+      else
+        /* shouldn't happen.  */
+        error (EXIT_FAILURE, 0,
+               _("unknown status from command (0x%X)"), wstatus);
+    }
 }

 /* Write BYTES bytes at BP to an output file.
@@ -273,13 +399,12 @@ cwrite (bool new_file_flag, const char *bp, size_t bytes)
     {
       if (!bp && bytes == 0 && elide_empty_files)
         return;
-      if (output_desc >= 0 && close (output_desc) < 0)
-        error (EXIT_FAILURE, errno, "%s", outfile);
+      closeout (NULL, output_desc, filter_pid, outfile);
       next_file_name ();
       if ((output_desc = create (outfile)) < 0)
         error (EXIT_FAILURE, errno, "%s", outfile);
     }
-  if (full_write (output_desc, bp, bytes) != bytes)
+  if (full_write (output_desc, bp, bytes) != bytes && errno != EPIPE)
     error (EXIT_FAILURE, errno, "%s", outfile);
 }

@@ -501,7 +626,8 @@ lines_chunk_split (uintmax_t k, uintmax_t n, char *buf, 
size_t bufsize,
               /* We don't use the stdout buffer here since we're writing
                  large chunks from an existing file, so it's more efficient
                  to write out directly.  */
-              if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
+              if (full_write (STDOUT_FILENO, bp, to_write) != to_write
+                  && errno != EPIPE)
                 error (EXIT_FAILURE, errno, "%s", _("write error"));
             }
           else
@@ -564,7 +690,7 @@ bytes_chunk_extract (uintmax_t k, uintmax_t n, char *buf, 
size_t bufsize,
         error (EXIT_FAILURE, errno, "%s", infile);
       else if (n_read == 0)
         break; /* eof.  */
-      if (full_write (STDOUT_FILENO, buf, n_read) != n_read)
+      if (full_write (STDOUT_FILENO, buf, n_read) != n_read && errno != EPIPE)
         error (EXIT_FAILURE, errno, "%s", quote ("-"));
       start += n_read;
     }
@@ -575,6 +701,7 @@ typedef struct of_info
   char *of_name;
   int ofd;
   FILE *ofile;
+  int opid;
 } of_t;

 enum
@@ -637,14 +764,17 @@ ofile_open (of_t *files, size_t i_check, size_t nfiles)
                 error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
             }

-          if (fclose (files[i_reopen].ofile) != 0)
+          if (fclose (files[i_reopen].ofile) != 0 && errno != EPIPE)
             error (EXIT_FAILURE, errno, "%s", files[i_reopen].of_name);
+          files[i_reopen].ofile = NULL;
           files[i_reopen].ofd = OFD_APPEND;
         }

       files[i_check].ofd = fd;
       if (!(files[i_check].ofile = fdopen (fd, "a")))
         error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
+      files[i_check].opid = filter_pid;
+      filter_pid = 0;
     }

   return file_limit;
@@ -658,6 +788,7 @@ ofile_open (of_t *files, size_t i_check, size_t nfiles)
 static void
 lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
 {
+  bool wrapped = false;
   bool file_limit;
   size_t i_file;
   of_t *files IF_LINT (= NULL);
@@ -678,6 +809,7 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t 
bufsize)
           files[i_file].of_name = xstrdup (outfile);
           files[i_file].ofd = OFD_NEW;
           files[i_file].ofile = NULL;
+          files[i_file].opid = 0;
         }
       i_file = 0;
       file_limit = false;
@@ -715,10 +847,12 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t 
bufsize)
             {
               if (line_no == k && unbuffered)
                 {
-                  if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
+                  if (full_write (STDOUT_FILENO, bp, to_write) != to_write
+                      && errno != EPIPE)
                     error (EXIT_FAILURE, errno, "%s", _("write error"));
                 }
-              else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1)
+              else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1
+                       && errno != EPIPE)
                 {
                   clearerr (stdout); /* To silence close_stdout().  */
                   error (EXIT_FAILURE, errno, "%s", _("write error"));
@@ -734,19 +868,25 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t 
bufsize)
                 {
                   /* Note writing to fd, rather than flushing the FILE gives
                      an 8% performance benefit, due to reduced data copying.  
*/
-                  if (full_write (files[i_file].ofd, bp, to_write) != to_write)
+                  if (full_write (files[i_file].ofd, bp, to_write) != to_write
+                      && errno != EPIPE)
                     error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
                 }
-              else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1)
+              else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1
+                       && errno != EPIPE)
                 error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
               if (file_limit)
                 {
-                  if (fclose (files[i_file].ofile) != 0)
+                  if (fclose (files[i_file].ofile) != 0 && errno != EPIPE)
                     error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
+                  files[i_file].ofile = NULL;
                   files[i_file].ofd = OFD_APPEND;
                 }
               if (next && ++i_file == n)
-                i_file = 0;
+                {
+                  wrapped = true;
+                  i_file = 0;
+                }
             }

           bp = bp_out;
@@ -757,11 +897,18 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t 
bufsize)
      and to signal any waiting fifo consumers.
      Also, close any open file descriptors.
      FIXME: Should we do this before EXIT_FAILURE?  */
-  for (i_file = 0; !k && !elide_empty_files && i_file < n; i_file++)
+  if (!k)
     {
-      file_limit |= ofile_open (files, i_file, n);
-      if (fclose (files[i_file].ofile) != 0)
-        error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
+      int ceiling = (wrapped ? n : i_file);
+      for (i_file = 0; i_file < n; i_file++)
+        {
+          if (i_file >= ceiling && !elide_empty_files)
+            file_limit |= ofile_open (files, i_file, n);
+          if (files[i_file].ofd >= 0)
+            closeout (files[i_file].ofile, files[i_file].ofd,
+                      files[i_file].opid, files[i_file].of_name);
+          files[i_file].ofd = OFD_APPEND;
+        }
     }
 }

@@ -824,7 +971,8 @@ main (int argc, char **argv)
       int this_optind = optind ? optind : 1;
       char *slash;

-      c = getopt_long (argc, argv, "0123456789C:a:b:del:n:u", longopts, NULL);
+      c = getopt_long (argc, argv, "0123456789C:a:b:def:l:n:u",
+                       longopts, NULL);
       if (c == -1)
         break;

@@ -955,6 +1103,10 @@ main (int argc, char **argv)
           elide_empty_files = true;
           break;

+        case 'f':
+          filter_command = optarg;
+          break;
+
         case IO_BLKSIZE_OPTION:
           {
             uintmax_t tmp_blk_size;
@@ -1048,6 +1200,18 @@ main (int argc, char **argv)

   buf = ptr_align (xmalloc (in_blk_size + 1 + page_size - 1), page_size);

+  /* When filtering, closure of one pipe must not terminate the process,
+     as there may still be other streams expecting input from us.  */
+  sigemptyset (&newblocked);
+  if (filter_command)
+    {
+      struct sigaction act;
+      sigaction (SIGPIPE, NULL, &act);
+      if (act.sa_handler != SIG_IGN)
+        sigaddset (&newblocked, SIGPIPE);
+    }
+  sigprocmask (SIG_BLOCK, &newblocked, &oldblocked);
+
   switch (split_type)
     {
     case type_digits:
@@ -1084,10 +1248,11 @@ main (int argc, char **argv)
       abort ();
     }

+  sigprocmask (SIG_SETMASK, &oldblocked, NULL);
+
   if (close (STDIN_FILENO) != 0)
     error (EXIT_FAILURE, errno, "%s", infile);
-  if (output_desc >= 0 && close (output_desc) < 0)
-    error (EXIT_FAILURE, errno, "%s", outfile);
+  closeout (NULL, output_desc, filter_pid, outfile);

   exit (EXIT_SUCCESS);
 }
-- 
1.7.5.134.g1c08b




reply via email to

[Prev in Thread] Current Thread [Next in Thread]