bug-coreutils

Re: [PATCH] sort: Add --threads option, which parallelizes internal sort


From: Chen Guo
Subject: Re: [PATCH] sort: Add --threads option, which parallelizes internal sort.
Date: Fri, 09 Oct 2009 18:33:09 -0700
User-agent: Thunderbird 2.0.0.23 (X11/20090817)

Hi all,
I originally sent this only to Jim because I had no clue how to respond to the thread properly; I hope I'm doing it right this time. The three columns in the timings below are elapsed, user, and system time, in that order, and my test file was built by cat'ing /dev/random, 100 bytes at a time.

************************

Hi Jim,

With enormous help from Professor Eggert, and building on Glen's code, I've modified parts of sort to improve parallelization. For example:

1,000,000 records of 100 bytes each, sorted in the LC_ALL locale on gcc14, which has 2x4x3.0 GHz Xeons.

My implementation:
T=16
1.24 4.16 0.45
1.18 4.46 0.32
1.27 4.56 0.43
1.23 4.54 0.39
T=8
1.34 4.66 0.24
1.24 4.53 0.22
1.28 4.74 0.20
1.20 4.53 0.18
T=4
1.82 4.41 0.15
1.55 4.32 0.13
2.11 4.43 0.13
1.71 4.46 0.12
T=2
2.56 4.26 0.13
2.55 4.30 0.10
2.54 4.28 0.11
2.55 4.30 0.10
T=1
4.39 4.26 0.12
4.39 4.29 0.10
4.38 4.26 0.11
4.40 4.26 0.12

Glen's:
T=16
1.53 4.32 0.16
1.48 4.30 0.15
1.49 4.40 0.18
1.49 4.28 0.17
T=8
1.48 4.44 0.14
1.54 4.34 0.14
1.48 4.39 0.16
1.55 4.39 0.16
T=4
1.76 4.37 0.11
1.74 4.36 0.13
1.74 4.46 0.13
1.74 4.39 0.12
T=2
2.55 4.30 0.12
2.54 4.27 0.12
2.55 4.30 0.11
2.55 4.28 0.12
T=1
4.38 4.27 0.11
4.40 4.29 0.10
4.40 4.29 0.10
4.40 4.29 0.11


With the new algorithm, user time goes up, but wall-clock time goes down and the degree of parallelization goes up. In the build used for the tests above, my implementation falls back to Glen's code when nthreads <= 2; judging by the results, that threshold should probably be raised to nthreads <= 4. Either way, from nthreads == 5 onwards the new implementation gives faster wall-clock times than the current multithreaded version.
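The post does not define "degree of parallelization"; one common reading is total CPU time (user + sys) divided by wall-clock time, i.e. how many CPUs were busy on average. A small sketch of that arithmetic, using the T=8 rows from the tables above (the struct and function names are made up for illustration):

```c
#include <stddef.h>

/* One timing run: elapsed (wall-clock), user and system time.  */
struct timing
{
  double elapsed, user, sys;
};

/* Return total CPU time divided by total wall-clock time over NRUNS
   runs, i.e. the average number of CPUs kept busy.  */
static double
parallelism (struct timing const *runs, size_t nruns)
{
  double elapsed = 0, cpu = 0;
  for (size_t i = 0; i < nruns; i++)
    {
      elapsed += runs[i].elapsed;
      cpu += runs[i].user + runs[i].sys;
    }
  return cpu / elapsed;
}

/* The T=8 measurements posted above.  */
static struct timing const new_t8[] = {
  {1.34, 4.66, 0.24}, {1.24, 4.53, 0.22}, {1.28, 4.74, 0.20}, {1.20, 4.53, 0.18}
};
static struct timing const glen_t8[] = {
  {1.48, 4.44, 0.14}, {1.54, 4.34, 0.14}, {1.48, 4.39, 0.16}, {1.55, 4.39, 0.16}
};
```

With these rows, parallelism (new_t8, 4) comes to about 3.81 and parallelism (glen_t8, 4) to about 3.00.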

Professor Eggert and I believe the inherent problem with the current multithreaded version is that the processing phase of merge sort, the merge itself, is difficult to parallelize. I instead use a quicksort algorithm in which the processing portion of the sort (splitting the data around a pivot) uses all available threads.
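For reference, the single-threaded building block is Dijkstra's three-way ("Dutch National Flag") partition: one pass splits the data into <pivot, ==pivot and >pivot blocks, and only the two outer blocks need further sorting. A minimal sketch on an int array; this is the textbook version, not the patch's code, which works on reversed arrays of struct line and calls compare ():

```c
#include <stddef.h>

/* Three-way partition of A[0..N) around PIVOT.  Afterwards
   A[0..*LT) < PIVOT, A[*LT..*GT) == PIVOT and A[*GT..N) > PIVOT.  */
static void
partition3 (int *a, size_t n, int pivot, size_t *lt, size_t *gt)
{
  size_t lo = 0, i = 0, hi = n;
  while (i < hi)
    {
      if (a[i] < pivot)
        {
          int t = a[lo]; a[lo] = a[i]; a[i] = t;
          lo++, i++;
        }
      else if (a[i] > pivot)
        {
          hi--;
          int t = a[hi]; a[hi] = a[i]; a[i] = t;
        }
      else
        i++;
    }
  *lt = lo;
  *gt = hi;
}

/* Quicksort that recurses only on the < and > blocks; the == block
   is already in its final position.  */
static void
quicksort3 (int *a, size_t n)
{
  if (n < 2)
    return;
  size_t lt, gt;
  partition3 (a, n, a[n / 2], &lt, &gt);  /* pivot is copied by value */
  quicksort3 (a, lt);
  quicksort3 (a + gt, n - gt);
}
```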

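In exchange for the improved parallelization, 3*sizeof (struct line) bytes of RAM are now used per line instead of 1.5*sizeof (struct line) (the line array itself plus a distribution buffer with two slots per line), and struct lines are copied more often.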

The algorithm is a multithreaded take on the Dutch National Flag quicksort algorithm. Let's say we're running with 4 threads:

The lines are divided into 4 equal pieces, one per thread (the last thread also takes any remainder). First, each thread copies its lines into its own region of a buffer according to how they compare to the pivot:

+---------------------------+-----------------------------+
|              <---  >pivot | ==pivot  --->   <--- <pivot |
+---------------------------+-----------------------------+
nlines*sizeof(struct line)   nlines*sizeof(struct line)
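The per-thread copy can be sketched as follows. Each thread gets a private region of 2*n slots for its n lines, so no locking is needed: >pivot items grow leftward from the middle of the region, ==pivot items grow rightward from the middle, and <pivot items grow leftward from the right end (these last two cannot collide, since together they number at most n). This is a simplified illustration with ints and forward indexing, modeled loosely on the patch's distribute (); the names are hypothetical:

```c
#include <stddef.h>

struct counts
{
  size_t nless, neq, ngrt;
};

/* Copy the N ints at SRC into BUF (2*N slots) using the layout in the
   diagram and return how many went into each category.  */
static struct counts
distribute_chunk (int const *src, size_t n, int *buf, int pivot)
{
  int *grt = buf + n;       /* next >pivot slot is grt[-1] */
  int *eq = buf + n;        /* next ==pivot slot is eq[0] */
  int *less = buf + 2 * n;  /* next <pivot slot is less[-1] */
  for (size_t i = 0; i < n; i++)
    {
      if (src[i] < pivot)
        *--less = src[i];
      else if (src[i] == pivot)
        *eq++ = src[i];
      else
        *--grt = src[i];
    }
  struct counts c = { (size_t) (buf + 2 * n - less),
                      (size_t) (eq - (buf + n)),
                      (size_t) (buf + n - grt) };
  return c;
}
```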

At this point, the main thread does the bookkeeping: it counts how many lines each thread placed in each category. From these counts we can derive the destination of every line in the buffer, and the threads then copy their lines back into the original array in parallel. The final result looks like this:

+-------------------------------------------+-----------
|  >pivot  |  >pivot  |  >pivot  |  >pivot  | ==pivot
|   from   |   from   |   from   |   from   |  from      ...etc
| thread 1 | thread 2 | thread 3 | thread 4 | thread 1
+-------------------------------------------+-----------
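The bookkeeping step amounts to a prefix sum over the per-thread counts. Because the resulting destination ranges do not overlap, each thread can then copy its own lines into place without locking, as copy_lines () does in the patch. A sketch in the address order of the diagram above, with hypothetical names:

```c
#include <stddef.h>

struct counts
{
  size_t nless, neq, ngrt;
};

/* Start index, in the output, of one thread's lines in each category.  */
struct dests
{
  size_t grt, eq, less;
};

/* Lay out every >pivot line (thread 0 first), then every ==pivot line,
   then every <pivot line, and record where each thread's share starts.  */
static void
compute_dests (struct counts const *c, size_t nthreads, struct dests *d)
{
  size_t total_grt = 0, total_eq = 0;
  for (size_t t = 0; t < nthreads; t++)
    {
      total_grt += c[t].ngrt;
      total_eq += c[t].neq;
    }

  size_t grt = 0, eq = total_grt, less = total_grt + total_eq;
  for (size_t t = 0; t < nthreads; t++)
    {
      d[t].grt = grt;
      d[t].eq = eq;
      d[t].less = less;
      grt += c[t].ngrt;
      eq += c[t].neq;
      less += c[t].nless;
    }
}
```

For example, with per-thread counts (nless, neq, ngrt) of (1, 2, 3), (4, 0, 1), (2, 2, 2) and (0, 1, 4), the ==pivot block starts at index 10 and the <pivot block at index 15.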

The improvement over Glen's code is modest, but parallelization is up and wall-clock time no longer increases as the number of threads grows. The code is rather disorganized right now, but if you guys are interested and think this is a viable implementation of threaded sort, I'd be more than glad to clean it up and post it.

***********************

Since then, Jim has gotten back to me and given me the go-ahead, so here it is. There is currently a small bug where a couple of output lines lose their newline characters, but other than that it produces the same output as the current sort:

address@hidden:~/testing$ ./current --threads=8 sort.c > sorted1
address@hidden:~/testing$ ./sortorig sort.c > sorted2
address@hidden:~/testing$ diff -a sorted1 sorted2
1711c1711,1712
<       errno = saved_errno;      errno = saved_errno;
---
>       errno = saved_errno;
>       errno = saved_errno;


diff --git a/src/sort.c b/src/sort.c
old mode 100644
new mode 100755
index f48d727..d618a78
--- a/src/sort.c
+++ b/src/sort.c
@@ -23,6 +23,7 @@
#include <config.h>

#include <getopt.h>
+#include <pthread.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <signal.h>
@@ -32,6 +33,7 @@
#include "filevercmp.h"
#include "hash.h"
#include "md5.h"
+#include "nproc.h"
#include "physmem.h"
#include "posixver.h"
#include "quote.h"
@@ -83,6 +85,13 @@ struct rlimit { size_t rlim_cur; };
# define OPEN_MAX 20
#endif

+/* Heuristic value for the number of lines for which it is worth
+   creating a subthread, during an internal merge sort, on a machine
+   that has processors galore.  Currently this number is just a guess.
+   This value must be at least 4.  We don't know of any machine where
+   this number has any practical effect.  */
+enum { SUBTHREAD_LINES_HEURISTIC = 4 };
+
#define UCHAR_LIM (UCHAR_MAX + 1)

#ifndef DEFAULT_TMPDIR
@@ -289,8 +298,6 @@ static char const *compress_program;
   number are present, temp files will be used. */
static unsigned int nmerge = NMERGE_DEFAULT;

-static void sortlines_temp (struct line *, size_t, struct line *);
-
/* Report MESSAGE for FILE, then clean up and exit.
   If FILE is null, it represents standard output.  */

@@ -380,6 +387,7 @@ Other options:\n\
  -t, --field-separator=SEP  use SEP instead of non-blank to blank transition\n\
  -T, --temporary-directory=DIR  use DIR for temporaries, not $TMPDIR or %s;\n\
                              multiple options specify multiple directories\n\
+      --threads=N           use no more than N threads to improve parallelism\n\
  -u, --unique              with -c, check for strict ordering;\n\
                              without -c, output only the first of an equal run\n\
"), DEFAULT_TMPDIR);
@@ -423,7 +431,8 @@ enum
  FILES0_FROM_OPTION,
  NMERGE_OPTION,
  RANDOM_SOURCE_OPTION,
-  SORT_OPTION
+  SORT_OPTION,
+  THREADS_OPTION
};

static char const short_options[] = "-bcCdfgik:mMno:rRsS:t:T:uVy:z";
@@ -455,6 +464,7 @@ static struct option const long_options[] =
  {"temporary-directory", required_argument, NULL, 'T'},
  {"unique", no_argument, NULL, 'u'},
  {"zero-terminated", no_argument, NULL, 'z'},
+  {"threads", required_argument, NULL, THREADS_OPTION},
  {GETOPT_HELP_OPTION_DECL},
  {GETOPT_VERSION_OPTION_DECL},
  {NULL, 0, NULL, 0},
@@ -1253,6 +1263,21 @@ specify_sort_size (int oi, char c, char const *s)
  xstrtol_fatal (e, oi, c, long_options, s);
}

+/* Specify the number of threads to spawn during internal sort.  */
+static unsigned long int
+specify_nthreads (int oi, char c, char const *s)
+{
+  unsigned long int nthreads;
+  enum strtol_error e = xstrtoul (s, NULL, 10, &nthreads, "");
+  if (e == LONGINT_OVERFLOW)
+    return ULONG_MAX;
+  if (e != LONGINT_OK)
+    xstrtol_fatal (e, oi, c, long_options, s);
+  if (nthreads == 0)
+    error (SORT_FAILURE, 0, _("number of threads must be nonzero"));
+  return nthreads;
+}
+
/* Return the default sort size.  */
static size_t
default_sort_size (void)
@@ -2441,10 +2466,13 @@ mergefiles (struct sortfile *files, size_t ntemps, size_t nfiles,
   NHI must be positive, and HI - NHI must equal T - (NLO + NHI).  */

static inline void
-mergelines (struct line *t,
-        struct line const *lo, size_t nlo,
-        struct line const *hi, size_t nhi)
+mergelines (struct line *t, size_t nlines,
+            struct line const *restrict lo)
{
+  size_t nlo = nlines / 2;
+  size_t nhi = nlines - nlo;
+  struct line const *hi = t - nlo;
+
  for (;;)
    if (compare (lo - 1, hi - 1) <= 0)
      {
@@ -2471,28 +2499,70 @@ mergelines (struct line *t,
      }
}

-/* Sort the array LINES with NLINES members, using TEMP for temporary space.
-   NLINES must be at least 2.
+static void sortlines_merge (struct line *restrict, size_t, struct line *restrict,
+                     unsigned long int, bool);
+
+/* Thread arguments for sortlines_thread.  */
+struct merge_args
+{
+  struct line *lines;
+  size_t nlines;
+  struct line *temp;
+  unsigned long int nthreads;
+  bool to_temp;
+};
+
+/* Like sortlines, except with a signature acceptable to pthread_create. */
+static void *
+sortlines_merge_thread (void *data)
+{
+  struct merge_args const *args = data;
+  sortlines_merge (args->lines, args->nlines, args->temp, args->nthreads,
+               args->to_temp);
+  return NULL;
+}
+
+/* Sort the array LINES with NLINES members, using TEMP for temporary space,
+   spawning NTHREADS threads.  NLINES must be at least 2.
   The input and output arrays are in reverse order, and LINES and
   TEMP point just past the end of their respective arrays.

+   If TO_TEMP, place the result in TEMP (trashing LINES in the
+   process); otherwise, place the result back into LINES so that it is
+   an in-place sort (trashing TEMP in the process).
+
   Use a recursive divide-and-conquer algorithm, in the style
-   suggested by Knuth volume 3 (2nd edition), exercise 5.2.4-23.  Use
-   the optimization suggested by exercise 5.2.4-10; this requires room
-   for only 1.5*N lines, rather than the usual 2*N lines.  Knuth
-   writes that this memory optimization was originally published by
-   D. A. Bell, Comp J. 1 (1958), 75.  */
+   suggested by Knuth volume 3 (2nd edition), exercise 5.2.4-23.  If
+   multithreaded, this requires that TEMP contain NLINES entries; if
+   singlethreaded, use the optimization suggested by Knuth exercise
+   5.2.4-10, which requires room for only 1.5*N lines, rather than the
+   usual 2*N lines.  Knuth writes that this memory optimization was
+   originally published by D. A. Bell, Comp J. 1 (1958), 75.
+
+   This function is inline so that its tests for multithreadedness and
+   inplacedness can be optimized away in common cases.  */

static void
-sortlines (struct line *lines, size_t nlines, struct line *temp)
+sortlines_merge (struct line *restrict lines, size_t nlines,
+             struct line *restrict temp,
+                 unsigned long int nthreads, bool to_temp)
{
  if (nlines == 2)
    {
-      if (0 < compare (&lines[-1], &lines[-2]))
+      /* Declare `swap' as int, not bool, to work around a bug
+ <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
+     in the IBM xlc 6.0.0.0 compiler in 64-bit mode.  */
+      int swap = (0 < compare (&lines[-1], &lines[-2]));
+      if (to_temp)
    {
-      struct line tmp = lines[-1];
+      temp[-1] = lines[-1 - swap];
+      temp[-2] = lines[-2 + swap];
+    }
+      else if (swap)
+    {
+      temp[-1] = lines[-1];
      lines[-1] = lines[-2];
-      lines[-2] = tmp;
+      lines[-2] = temp[-1];
    }
    }
  else
@@ -2501,46 +2571,359 @@ sortlines (struct line *lines, size_t nlines, struct line *temp)
      size_t nhi = nlines - nlo;
      struct line *lo = lines;
      struct line *hi = lines - nlo;
-      struct line *sorted_lo = temp;

-      sortlines (hi, nhi, temp);
-      if (1 < nlo)
-    sortlines_temp (lo, nlo, sorted_lo);
+      unsigned long int child_subthreads = nthreads / 2;
+      unsigned long int my_subthreads = nthreads - child_subthreads;
+      pthread_t thread;
+ struct merge_args args = {hi, nhi, temp - nlo, child_subthreads, to_temp};
+
+      if (child_subthreads != 0 && SUBTHREAD_LINES_HEURISTIC <= nlines
+ && pthread_create (&thread, NULL, sortlines_merge_thread, &args) == 0)
+    {
+      /* Guarantee that nlo and nhi are each at least 2.  */
+      verify (4 <= SUBTHREAD_LINES_HEURISTIC);
+
+      sortlines_merge (lo, nlo, temp, my_subthreads, !to_temp);
+      pthread_join (thread, NULL);
+    }
      else
-    sorted_lo[-1] = lo[-1];
+    {
+      sortlines_merge (hi, nhi, temp - (to_temp ? nlo : 0), 1, to_temp);
+      if (1 < nlo)
+        sortlines_merge (lo, nlo, temp, 1, !to_temp);
+      else if (!to_temp)
+        temp[-1] = lo[-1];
+    }

-      mergelines (lines, sorted_lo, nlo, hi, nhi);
+      struct line *dest;
+      struct line const *sorted_lo;
+      if (to_temp)
+    {
+      dest = temp;
+      sorted_lo = lines;
+    }
+      else
+    {
+      dest = lines;
+      sorted_lo = temp;
+    }
+      mergelines (dest, nlines, sorted_lo);
    }
}

-/* Like sortlines (LINES, NLINES, TEMP), except output into TEMP
-   rather than sorting in place.  */
+
+struct sortlines_args
+{
+  struct line *lines;
+  size_t nlines;
+  int nthreads;
+  struct line* temp;
+};
+
+static void *
+sortlines_merge_thread2 (void *data)
+{
+  struct sortlines_args const *args = data;
+ sortlines_merge (args->lines, args->nlines, args->temp, args->nthreads, 0);
+  return NULL;
+}
+
+static inline void swap_lines (struct line* line1, struct line* line2)
+{
+  struct line tmp = *line1;
+  *line1 = *line2;
+  *line2 = tmp;
+}
+
+// swap = 0: just return pointer
+// swap = 1: arrange first, middle last in order
+static struct line*
+medianof3 (struct line* first, struct line* middle, struct line* last,
+           int swap)
+{
+  if (0 < compare (middle, first))            // f < m
+    {
+      if (0 < compare (first, last))          // l < f < m
+        {
+          if (swap)
+            {
+              struct line tmp = *middle;
+              *middle = *first;
+              *first = *last;
+              *last = tmp;
+            }
+          else
+            return first;
+        }
+      else if (0 < compare (middle, last))    // f < l < m
+        {
+          if (swap)
+            swap_lines (middle, last);
+          else
+            return last;
+        }
+      // Else, first < middle < last. Perfect.
+    }
+  else                                        // m < f
+    {
+      if (0 < compare (last, first))          // m < f < l
+        {
+          if (swap)
+            swap_lines (middle, first);
+          else
+            return first;
+        }
+      else if (0 < compare (last, middle))    // m < l < f
+        {
+          if (swap)
+            {
+              struct line tmp = *middle;
+              *middle = *last;
+              *last = *first;
+              *first = tmp;
+            }
+          else
+            return last;
+        }
+      else if (swap)                          // l < m < f
+        swap_lines (first, last);
+    }
+  return middle;
+}
+
+
+struct dist_args
+{
+  struct line* iter;
+  size_t nlines;
+  struct line* buf;
+  struct line* pivot;
+};
+
+struct counters
+{
+  size_t nless;
+  size_t neq;
+  size_t ngrt;
+};
+
+static struct counters*
+distribute (struct line *iter, size_t nlines, struct line* buf,
+            struct line* pivot)
+{
+  struct line* bufless = buf;
+  struct line* bufeq = buf - nlines;
+  struct line* bufgrt = bufeq;
+  size_t i = 0;
+  while (i++ < nlines)
+    {
+      int result = compare (pivot, --iter);
+      if (result > 0)
+          *(--bufless) = *iter;
+      else if (result == 0)
+          *(bufeq++) = *iter;
+      else
+          *(--bufgrt) = *iter;
+    }
+
+  struct counters* ret = malloc (sizeof (struct counters));
+  ret->nless = buf - bufless;
+  ret->neq = bufeq - buf + nlines;
+  ret->ngrt = buf - nlines- bufgrt;
+  return ret;
+}
+
+static void *
+distribute_thread (void *data)
+{
+  struct dist_args const *args = data;
+ pthread_exit(distribute (args->iter, args->nlines, args->buf, args->pivot));
+  return NULL;
+}
+
+
+struct copy_args
+{
+  struct line* buf;
+  struct line* destless;
+  struct line* desteq;
+  struct line* destgrt;
+  size_t nlines;
+  size_t nless;
+  size_t neq;
+};
+
+static void *
+copy_lines (void* data)
+{
+  struct copy_args* args = data;
+  struct line *bufeq = args->buf - args->nlines;
+  struct line *bufgrt = bufeq;
+  size_t ngrt = args->nlines - args->nless - args->neq;
+  while (args->nless-- != 0)
+    *(--args->destless) = *(--args->buf);
+  while (args->neq-- != 0)
+    *(--args->desteq) = *(bufeq++);
+  while (ngrt-- != 0)
+    *(--args->destgrt) = *(--bufgrt);
+  return NULL;
+}
+
+
+static void
+sortlines (struct line *restrict, size_t, size_t, struct line *restrict);
+
+static void *
+sortlines_thread (void *data)
+{
+  struct sortlines_args const *args = data;
+  sortlines (args->lines, args->nlines, args->nthreads, args->temp);
+  return NULL;
+}

static void
-sortlines_temp (struct line *lines, size_t nlines, struct line *temp)
+sortlines (struct line *restrict lines, size_t nlines, size_t nthreads,
+           struct line *restrict buf)
{
  if (nlines == 2)
    {
-      /* Declare `swap' as int, not bool, to work around a bug
- <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
-     in the IBM xlc 6.0.0.0 compiler in 64-bit mode.  */
-      int swap = (0 < compare (&lines[-1], &lines[-2]));
-      temp[-1] = lines[-1 - swap];
-      temp[-2] = lines[-2 + swap];
+      if (0 < compare (&lines[-1], &lines[-2]))
+    {
+      struct line tmp = lines[-1];
+      lines[-1] = lines[-2];
+      lines[-2] = tmp;
+    }
    }
-  else
+  else if (nlines > 2)
    {
-      size_t nlo = nlines / 2;
-      size_t nhi = nlines - nlo;
-      struct line *lo = lines;
-      struct line *hi = lines - nlo;
-      struct line *sorted_hi = temp - nlo;
+      // The array is backwards, so right is first, left is last
+      struct line pivot;
+      size_t mid_index = -((nlines+1) / 2);
+      struct line* middle = &lines[mid_index];
+      struct line* right = &lines[-1];
+      struct line* left = &lines[-nlines];
+      if (nlines < 200)
+        {
+          pivot = *(medianof3 (right, middle, left, 1));
+          if (nlines == 3)
+            return;
+        }
+      else
+        {
+          // srand(time(0)) is called in sort()
+          middle = medianof3 (right, middle, left, 0);
+          struct line* middle2 = medianof3 (left + rand() % nlines,
+                                            left + rand() % nlines,
+                                            right - rand() % nlines, 0);
+          struct line* middle3 = medianof3 (left + rand() % nlines,
+                                            right - rand() % nlines,
+                                            right - rand() % nlines, 0);
+          pivot = *(medianof3 (middle2, middle, middle3, 0));
+        }

-      sortlines_temp (hi, nhi, sorted_hi);
-      if (1 < nlo)
-    sortlines (lo, nlo, temp);
+      // For each of nthread subsections of array, construct a <, ==, and >
+      // section.
+      pthread_t* threads = malloc ((nthreads-1)*sizeof(pthread_t));
+      size_t threadlines = nlines/nthreads;
+      size_t* nless_array = malloc (nthreads*sizeof(size_t));
+      size_t* neq_array = malloc (nthreads*sizeof(size_t));
+      size_t* ngrt_array = malloc (nthreads*sizeof(size_t));
+ struct dist_args* d_args = malloc ((nthreads-1)*sizeof(struct dist_args));
+
+      size_t i = 0;
+      for (; i < nthreads-1; i++)
+        {
+          d_args[i].iter = lines - i*threadlines;
+          d_args[i].nlines = threadlines;
+          d_args[i].buf = buf - 2*i*threadlines;
+          d_args[i].pivot = &pivot;
+ pthread_create (&threads[i], NULL, distribute_thread, &d_args[i]);
+        }
+ struct counters* temp = distribute (lines - i*threadlines, nlines - i*threadlines,
+                                          buf - 2*i*threadlines, &pivot);
+      nless_array[nthreads-1] = temp->nless;
+      neq_array[nthreads-1] = temp->neq;
+      ngrt_array[nthreads-1] = temp->ngrt;
+      free(temp);
+      for (i = 0; i < nthreads-1; i++)
+        {
+          pthread_join (threads[i], (void**) &temp);
+          nless_array[i] = temp->nless;
+          neq_array[i] = temp->neq;
+          ngrt_array[i] = temp->ngrt;
+          free(temp);
+        }
+      free (d_args);

-      mergelines (temp, lo, nlo, sorted_hi, nhi);
+      size_t n1 = 0;
+      size_t n2 = 0;
+      for (i = 0; i < nthreads; i++)
+        {
+          n1 += nless_array[i];
+          n2 += ngrt_array[i];
+        }
+      struct line* destless = lines;
+      struct line* desteq = lines - n1;
+      struct line* destgrt = lines - nlines + n2;
+      struct copy_args* c_args = malloc(nthreads*sizeof(struct copy_args));
+      for (i = 0; i < nthreads-1; i++)
+        {
+          c_args[i].buf = buf - 2*i*threadlines;
+          c_args[i].destless = destless;
+          c_args[i].desteq = desteq;
+          c_args[i].destgrt = destgrt;
+          c_args[i].nlines = threadlines;
+          c_args[i].nless = nless_array[i];
+          c_args[i].neq = neq_array[i];
+          pthread_create (&threads[i], NULL, copy_lines, &c_args[i]);
+          destless -= nless_array[i];
+          desteq -= neq_array[i];
+          destgrt -= ngrt_array[i];
+        }
+      c_args[i].buf = buf - 2*i*threadlines;
+      c_args[i].destless = destless;
+      c_args[i].desteq = desteq;
+      c_args[i].destgrt = destgrt;
+      c_args[i].nlines = nlines - i*threadlines;
+      c_args[i].nless = nless_array[i];
+      c_args[i].neq = neq_array[i];
+      copy_lines (&c_args[i]);
+      for (i = 0; i < nthreads-1; i++)
+        pthread_join(threads[i], NULL);
+
+      free (threads);
+      free (nless_array);
+      free (neq_array);
+      free (ngrt_array);
+      free (c_args);
+
+      if (nthreads > 1 && n1 > 1 && n2 > 1)
+        {
+          size_t child_threads = (n1 * nthreads / nlines) + 0.5;
+          if (child_threads == 0)
+            child_threads = 1;
+          struct sortlines_args args = {lines, n1, child_threads, buf};
+          pthread_t new_thread;
+
+          if (child_threads < 3)
+ pthread_create (&new_thread, NULL, sortlines_merge_thread2, &args);
+          else
+            pthread_create (&new_thread, NULL, sortlines_thread, &args);
+          if (nthreads - child_threads < 3)
+              sortlines_merge (lines-nlines+n2, n2, buf - 2*n1,
+                               nthreads - child_threads, 0);
+          else
+              sortlines (lines-nlines+n2, n2, nthreads-child_threads,
+                         buf - 2*n1);
+          pthread_join (new_thread, NULL);
+        }
+      else
+        {
+          if (n1 > 1)
+              sortlines_merge (lines, n1, buf, 1, 0);
+          if (n2 > 1)
+              sortlines_merge (lines - nlines + n2, n2, buf - n1, 1, 0);
+        }
    }
}

@@ -2746,7 +3129,8 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
/* Sort NFILES FILES onto OUTPUT_FILE. */

static void
-sort (char * const *files, size_t nfiles, char const *output_file)
+sort (char * const *files, size_t nfiles, char const *output_file,
+      unsigned long int nthreads)
{
  struct buffer buf;
  size_t ntemps = 0;
@@ -2754,14 +3138,19 @@ sort (char * const *files, size_t nfiles, char const *output_file)

  buf.alloc = 0;

+  srand (time (0));
+
  while (nfiles)
    {
      char const *temp_output;
      char const *file = *files;
      FILE *fp = xfopen (file, "r");
      FILE *tfp;
-      size_t bytes_per_line = (2 * sizeof (struct line)
-                   - sizeof (struct line) / 2);
+
+      /* If singlethreaded, the merge uses the memory optimization
+     suggested in Knuth exercise 5.2.4-10; see sortlines.  */
+      size_t bytes_per_line = 3*sizeof (struct line);
+                  // - (1 < nthreads ? 0 : sizeof (struct line) / 2));

      if (! buf.alloc)
    initbuf (&buf, bytes_per_line,
@@ -2789,7 +3178,12 @@ sort (char * const *files, size_t nfiles, char const *output_file)
      line = buffer_linelim (&buf);
      linebase = line - buf.nlines;
      if (1 < buf.nlines)
-        sortlines (line, buf.nlines, linebase);
+            {
+              if (nthreads > 2)
+                sortlines (line, buf.nlines, nthreads, linebase);
+              else
+ sortlines_merge (line, buf.nlines, linebase, nthreads, false);
+            }
      if (buf.eof && !nfiles && !ntemps && !buf.left)
        {
          xfclose (fp, file);
@@ -2802,7 +3196,6 @@ sort (char * const *files, size_t nfiles, char const *output_file)
          ++ntemps;
          temp_output = create_temp (&tfp, NULL);
        }
-
      do
        {
          line--;
@@ -3041,6 +3434,7 @@ main (int argc, char **argv)
  bool mergeonly = false;
  char *random_source = NULL;
  bool need_random = false;
+  unsigned long int nthreads = 0;
  size_t nfiles = 0;
  bool posixly_correct = (getenv ("POSIXLY_CORRECT") != NULL);
  bool obsolete_usage = (posix2_version () < 200112);
@@ -3051,6 +3445,7 @@ main (int argc, char **argv)

  initialize_main (&argc, &argv);
  set_program_name (argv[0]);
+//remember: originally LC_ALL, switch to 'C' as needed
  setlocale (LC_ALL, "");
  bindtextdomain (PACKAGE, LOCALEDIR);
  textdomain (PACKAGE);
@@ -3363,6 +3758,10 @@ main (int argc, char **argv)
      add_temp_dir (optarg);
      break;

+    case THREADS_OPTION:
+      nthreads = specify_nthreads (oi, c, optarg);
+      break;
+
    case 'u':
      unique = true;
      break;
@@ -3508,6 +3907,9 @@ main (int argc, char **argv)

  if (need_random)
    {
+      /* Threading does not lock the randread_source structure, so
+     downgrade to one thread to avoid race conditions. */
+      nthreads = 1;
      randread_source = randread_new (random_source, MD5_DIGEST_SIZE);
      if (! randread_source)
    die (_("open failed"), random_source);
@@ -3562,7 +3964,20 @@ main (int argc, char **argv)
      IF_LINT (free (sortfiles));
    }
  else
-    sort (files, nfiles, outfile);
+    {
+      if (!nthreads)
+    {
+      /* The default NTHREADS is 2 ** floor (log2 (number of processors)).
+         If comparisons do not vary in cost and threads are
+         scheduled evenly, with the current algorithm there is no
+         performance advantage to using a number of threads that
+         is not a power of 2.  */
+      unsigned long int np2 = num_processors () / 2;
+      for (nthreads = 1; nthreads <= np2; nthreads *= 2)
+        continue;
+    }
+      sort (files, nfiles, outfile, nthreads);
+    }

  if (have_read_stdin && fclose (stdin) == EOF)
    die (_("close failed"), "-");
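The default thread count chosen at the end of main () in the patch above is 2 ** floor (log2 (number of processors)). The loop can be checked in isolation; in this standalone copy the processor count is a parameter instead of the result of num_processors (), and the function name is hypothetical:

```c
/* Return the largest power of 2 that is <= NPROC, or 1 if NPROC is 0,
   using the same loop as the patch.  */
static unsigned long int
default_nthreads (unsigned long int nproc)
{
  unsigned long int np2 = nproc / 2;
  unsigned long int nthreads;
  for (nthreads = 1; nthreads <= np2; nthreads *= 2)
    continue;
  return nthreads;
}
```

For example, 8 processors give 8 threads and 6 processors give 4.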



