bug-coreutils

Re: [PATCH] sort: add --threads option to parallelize internal sort.


From: Chen Guo
Subject: Re: [PATCH] sort: add --threads option to parallelize internal sort.
Date: Sat, 13 Mar 2010 04:17:16 -0800 (PST)

Hi Padraig,
    Your changes look fine to me. Attached, and inlined at the bottom,
is the resubmission with your changes. I also realized there was both a
function and a struct named merge_node, so the function is now
mergelines_node.

    About the spinlock thing... I did some tinkering with mutexes. The
tests ran on a dual-die Xeon with 8 cores total, with the cap on nthreads
removed. The input was a 96 MB file of 1M lines, 100 bytes each. I ran
one set using spinlocks to lock struct merge_node, then another using
mutexes.

with spinlock:
bash-3.2$ for i in {1..5}; do env time -f "%e" ./sort --parallel=8 -S300M randL > /dev/null; done
1.29
1.32
1.28
1.30
1.32
bash-3.2$ for i in {1..5}; do env time -f "%e" ./sort --parallel=10 -S300M randL > /dev/null; done
3.19
1.53
4.85
1.48
1.59
bash-3.2$ for i in {1..5}; do env time -f "%e" ./sort --parallel=16 -S300M randL > /dev/null; done
3.64
8.98
4.13
2.81
2.85

with mutex:
bash-3.2$ for i in {1..5}; do env time -f "%e" ./sort --parallel=8 -S300M randL > /dev/null; done
1.57
1.55
1.59
1.47
1.55
bash-3.2$ for i in {1..5}; do env time -f "%e" ./sort --parallel=10 -S300M randL > /dev/null; done
1.49
1.48
1.46
1.52
1.45
bash-3.2$ for i in {1..5}; do env time -f "%e" ./sort --parallel=16 -S300M randL > /dev/null; done
1.44
1.51
1.46
1.48
1.46
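
To put a number on the spread, here is a small C sketch, not part of the
patch, that computes the mean and the max-minus-min spread of a set of
timings. The names mean, spread, spin_16 and mutex_16 are made up for
illustration; the figures are the --parallel=16 runs copied from above.

```c
#include <assert.h>
#include <stddef.h>

/* Arithmetic mean of the N samples in X.  N must be nonzero.  */
double
mean (double const *x, size_t n)
{
  assert (n > 0);
  double sum = 0;
  for (size_t i = 0; i < n; i++)
    sum += x[i];
  return sum / n;
}

/* Difference between the slowest and fastest of the N samples in X.
   N must be nonzero.  */
double
spread (double const *x, size_t n)
{
  assert (n > 0);
  double lo = x[0];
  double hi = x[0];
  for (size_t i = 1; i < n; i++)
    {
      if (x[i] < lo)
        lo = x[i];
      if (hi < x[i])
        hi = x[i];
    }
  return hi - lo;
}

/* Elapsed seconds for --parallel=16, copied from the runs above.  */
double const spin_16[] = { 3.64, 8.98, 4.13, 2.81, 2.85 };
double const mutex_16[] = { 1.44, 1.51, 1.46, 1.48, 1.46 };
```

With these inputs the spinlock runs average about 4.48 s with a spread of
6.17 s, while the mutex runs average 1.47 s with a spread of 0.07 s.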

Hence you see why we've used spinlocks and capped the number of threads.
When I go home after finals next week I'll run tests on my dad's i7
machine, which has more logical cores than physical cores. From what I
remember, running 8 threads there was still faster on average than
running 4, but the variance was quite high.
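
For anyone who wants to repeat the comparison, here is a rough sketch of
how the node lock could be switched at build time. NODE_USE_MUTEX,
node_lock_t, the node_lock* macros, struct counter, bump and contend are
all hypothetical names for illustration; the patch itself hard-codes
pthread_spinlock_t in struct merge_node.

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical build-time switch; the patch always uses spinlocks.  */
#ifndef NODE_USE_MUTEX
# define NODE_USE_MUTEX 0
#endif

#if NODE_USE_MUTEX
typedef pthread_mutex_t node_lock_t;
# define node_lock_init(l) pthread_mutex_init (l, NULL)
# define node_lock(l) pthread_mutex_lock (l)
# define node_unlock(l) pthread_mutex_unlock (l)
# define node_lock_destroy(l) pthread_mutex_destroy (l)
#else
typedef pthread_spinlock_t node_lock_t;
# define node_lock_init(l) pthread_spin_init (l, PTHREAD_PROCESS_PRIVATE)
# define node_lock(l) pthread_spin_lock (l)
# define node_unlock(l) pthread_spin_unlock (l)
# define node_lock_destroy(l) pthread_spin_destroy (l)
#endif

/* Shared state for the contention demo.  */
struct counter
{
  node_lock_t lock;
  unsigned long int value;
  unsigned long int iters;
};

/* Thread body: bump the shared counter ITERS times under the lock.  */
static void *
bump (void *data)
{
  struct counter *c = data;
  for (unsigned long int i = 0; i < c->iters; i++)
    {
      node_lock (&c->lock);
      c->value++;
      node_unlock (&c->lock);
    }
  return NULL;
}

/* Run NTHREADS threads that each take the lock ITERS times, and
   return the final counter value.  */
unsigned long int
contend (unsigned int nthreads, unsigned long int iters)
{
  enum { MAX_THREADS = 64 };
  assert (nthreads <= MAX_THREADS);
  pthread_t tid[MAX_THREADS];
  struct counter c = { .value = 0, .iters = iters };

  if (node_lock_init (&c.lock) != 0)
    abort ();
  for (unsigned int i = 0; i < nthreads; i++)
    if (pthread_create (&tid[i], NULL, bump, &c) != 0)
      abort ();
  for (unsigned int i = 0; i < nthreads; i++)
    pthread_join (tid[i], NULL);
  node_lock_destroy (&c.lock);
  return c.value;
}
```

Timing contend under each setting, with more threads than cores, is one
way to look for the slowdown seen above, though a bare counter is a much
shorter critical section than a merge step, so the effect may differ.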

As for threaded external sort, originally Joey just called sort with
(nthreads / number of disks) threads for each disk. In this case, there
won't be competing spinlocks even with nthreads threads for each disk.
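
The even split described above is plain integer division. A sketch, with
made-up function names, and with the floor of one thread per disk being
an assumption rather than something taken from Joey's code:

```c
#include <assert.h>
#include <stddef.h>

/* Threads given to each disk's sort when NTHREADS is split evenly
   over NDISKS.  Assumption: every disk gets at least one thread.  */
unsigned long int
threads_per_disk (unsigned long int nthreads, size_t ndisks)
{
  assert (ndisks > 0);
  unsigned long int share = nthreads / ndisks;
  return share ? share : 1;
}

/* Threads left idle by the even split, or 0 if the one-thread floor
   already uses up NTHREADS.  */
unsigned long int
threads_left_over (unsigned long int nthreads, size_t ndisks)
{
  unsigned long int used = threads_per_disk (nthreads, ndisks) * ndisks;
  return used < nthreads ? nthreads - used : 0;
}
```

For example, 8 threads over 3 disks gives 2 threads per disk and leaves
2 threads unused, which is part of what makes load balancing attractive.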

However, he's been looking at load balancing (I jumped in on that just 
today, too) and it looks like the best approach there is to have a global
pool of threads, with no restriction on which threads work on which disk.

At first he did this really cool thing, dynamic thread allocation, where if
sorting on one disk has finished that thread'll go and help with the sorting
on another disk.

Joey got it working, but I think he decided it was too complicated, and
we're working on a simpler implementation right now, with that pool of
globally shared threads I talked about earlier. As it progresses I'll see if
we can't cram more threads in there.
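
To make the global pool idea concrete, here is a minimal sketch under
stated assumptions; it is not Joey's implementation. Each job stands for
a chunk to be sorted on some disk, and any thread may claim any job, so
no thread is tied to one disk. struct job, struct pool, pool_claim,
worker and run_pool are hypothetical names, and the "sort" is a stand-in
that only marks the job done.

```c
#define _POSIX_C_SOURCE 200112L
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdlib.h>

/* One unit of work: a chunk to sort that lives on disk DISK.  */
struct job
{
  int disk;
  int done;
};

/* Jobs shared by all threads, regardless of disk.  */
struct pool
{
  struct job *jobs;
  size_t njobs;
  size_t next;            /* Index of the next unclaimed job.  */
  pthread_mutex_t mutex;  /* Protects NEXT.  */
};

/* Claim the next unclaimed job, or return NULL if none remain.  */
static struct job *
pool_claim (struct pool *p)
{
  struct job *j = NULL;
  pthread_mutex_lock (&p->mutex);
  if (p->next < p->njobs)
    j = &p->jobs[p->next++];
  pthread_mutex_unlock (&p->mutex);
  return j;
}

/* Thread body: keep claiming jobs until the pool is drained.  */
static void *
worker (void *data)
{
  struct pool *p = data;
  struct job *j;
  while ((j = pool_claim (p)))
    j->done = 1;          /* Stand-in for sorting the chunk.  */
  return NULL;
}

/* Process JOBS with NTHREADS threads and return how many were done.  */
size_t
run_pool (struct job *jobs, size_t njobs, unsigned int nthreads)
{
  enum { MAX_THREADS = 64 };
  assert (0 < nthreads && nthreads <= MAX_THREADS);
  pthread_t tid[MAX_THREADS];
  struct pool p = { .jobs = jobs, .njobs = njobs, .next = 0 };

  if (pthread_mutex_init (&p.mutex, NULL) != 0)
    abort ();
  for (unsigned int i = 0; i < nthreads; i++)
    if (pthread_create (&tid[i], NULL, worker, &p) != 0)
      abort ();
  for (unsigned int i = 0; i < nthreads; i++)
    pthread_join (tid[i], NULL);
  pthread_mutex_destroy (&p.mutex);

  size_t done = 0;
  for (size_t i = 0; i < njobs; i++)
    done += jobs[i].done;
  return done;
}
```

A thread that finishes early simply claims the next job, whichever disk
it is on, which gets the effect of the dynamic allocation scheme without
having to migrate threads between per-disk groups.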



From 1f6d349d564fdb6e5b8ed22b80847e68ddbbc263 Mon Sep 17 00:00:00 2001
From: Chen Guo <address@hidden>
Date: Fri, 5 Mar 2010 20:18:08 -0800
Subject: [PATCH] sort: add --parallel option to parallelize internal sort.
 This patch is by Gene Auyeung, Chris Dickens, Chen Guo, and Mike
 Nichols, based off of a patch by Paul Eggert, Glen Lenker, et
 al, with a basic heap implementation based off of the GDSL heap,
 originally by Nicolas Darnis.

On a dual-die, 8-core Intel Xeon, initial results show that sorting
with 8 threads is almost 4X as fast as sorting with 1 thread.
Single-threaded sorting has also been improved over the current
implementation.

Sorting a 96M file on the aforementioned machine:
1 thread : 5.1s
2 threads: 2.87s
4 threads: 1.75s
8 threads: 1.31s

* bootstrap.conf: Add heap, pthread.
* coreutils.texi (sort): Document --parallel option.
* gl/lib/heap.c: New file. Very basic heap implementation.
* gl/lib/heap.h: New file.
* gl/modules/heap: New file.
* src/Makefile.am: Add LIB_PTHREAD.
* src/sort.c: Include heap.h, nproc.h, pthread.h.
(MAX_MERGE): New macro.
(SUBTHREAD_LINES_HEURISTIC, PARALLEL_OPTION): New constants.
(MERGE_END, MERGE_ROOT): New constants.
(struct merge_node): New struct.
(struct merge_node_queue): New struct.
(sortlines_temp): Remove declaration.
(usage, long_options, main): New option, --parallel.
(specify_nthreads): New function.
(mergelines): New signature, to emphasize the fact that the HI area
must be part of the destination.  All callers changed.
(sequential_sort): New function, renamed from sortlines. Merge in
the functionality of sortlines_temp.
(compare_nodes): New function.
(lock_node, unlock_node): New functions.
(queue_destroy): New function.
(queue_init): New function.
(queue_insert): New function.
(queue_pop): New function.
(write_unique): New function.
(mergelines_node): New function.
(check_insert): New function.
(update_parent): New function.
(merge_loop): New function.
(sortlines): Rewrite to support and use parallelism, with a new
signature. All callers changed.
(struct thread_args): New struct.
(sortlines_thread): New function.
(sortlines_temp): Remove.
(sort): New argument NTHREADS. All uses changed. Output moved to
mergelines_node.
(main): Disable threading if we are sorting at random.
* tests/Makefile.am (TESTS): Add misc/sort-benchmark-random.
* tests/misc/sort-benchmark-random: New file.
---
 bootstrap.conf                   |    2 +
 doc/coreutils.texi               |   15 +
 gl/lib/heap.c                    |  159 +++++++++++
 gl/lib/heap.h                    |   34 +++
 gl/modules/heap                  |   21 ++
 gnulib                           |    2 +-
 src/Makefile.am                  |    3 +
 src/sort.c                       |  578 ++++++++++++++++++++++++++++++++++----
 tests/Makefile.am                |    1 +
 tests/misc/sort-benchmark-random |   57 ++++
 10 files changed, 809 insertions(+), 63 deletions(-)
 create mode 100644 gl/lib/heap.c
 create mode 100644 gl/lib/heap.h
 create mode 100644 gl/modules/heap
 create mode 100644 tests/misc/sort-benchmark-random

diff --git a/bootstrap.conf b/bootstrap.conf
index 9cdf984..034720d 100644
--- a/bootstrap.conf
+++ b/bootstrap.conf
@@ -123,6 +123,7 @@ gnulib_modules="
   hard-locale
   hash
   hash-pjw
+  heap
   host-os
   human
   idcache
@@ -171,6 +172,7 @@ gnulib_modules="
   priv-set
   progname
   propername
+  pthread
   putenv
   quote
   quotearg
diff --git a/doc/coreutils.texi b/doc/coreutils.texi
index 34ccf5a..b29739f 100644
--- a/doc/coreutils.texi
+++ b/doc/coreutils.texi
@@ -4061,6 +4061,14 @@ have a large sort or merge that is I/O-bound, you can often improve
 performance by using this option to specify directories on different
 disks and controllers.
 
+@item --parallel=@var{n}
+@opindex --parallel
+@cindex multithreaded sort
+Limit the number of sorts run in parallel to @var{n}. By default,
+@var{n} is set to the number of available processors, and values
+greater than that are reduced to that limit. Also see
+@ref{nproc invocation}.
+
 @item -u
 @itemx --unique
 @opindex -u
@@ -4151,6 +4159,13 @@ sort -n -r
 @end example
 
 @item
+Run no more than 4 sorts concurrently, using a buffer size of 10M.
+
+@example
+sort --parallel=4 -S 10M
+@end example
+
+@item
 Sort alphabetically, omitting the first and second fields
 and the blanks at the start of the third field.
 This uses a single key composed of the characters beginning
diff --git a/gl/lib/heap.c b/gl/lib/heap.c
new file mode 100644
index 0000000..a37224f
--- /dev/null
+++ b/gl/lib/heap.c
@@ -0,0 +1,159 @@
+/* Barebones heap implementation supporting only insert and pop.
+
+   Copyright (C) 2010 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
+
+/* Full implementation: GDSL (http://gna.org/projects/gdsl/) by Nicolas
+   Darnis <address@hidden>. */
+
+#include <config.h>
+
+#include "heap.h"
+#include "stdlib--.h"
+#include "xalloc.h"
+
+static int heap_default_compare (const void *, const void *);
+static size_t heapify_down (void **, size_t, size_t,
+                            int (*)(const void *, const void *));
+static void heapify_up (void **, size_t,
+                        int (*)(const void *, const void *));
+
+
+/* Allocate memory for the heap. */
+
+struct heap *
+heap_alloc (int (*compare)(const void *, const void *), size_t n_reserve)
+{
+  struct heap *heap;
+  void *xmalloc_ret = xmalloc (sizeof *heap);
+  heap = (struct heap *) xmalloc_ret;
+  if (!heap)
+    return NULL;
+
+  if (n_reserve <= 0)
+    n_reserve = 1;
+
+  xmalloc_ret = xmalloc (n_reserve * sizeof *(heap->array));
+  heap->array = (void **) xmalloc_ret;
+  if (!heap->array)
+    {
+      free (heap);
+      return NULL;
+    }
+
+  heap->array[0] = NULL;
+  heap->capacity = n_reserve;
+  heap->count = 0;
+  heap->compare = compare ? compare : heap_default_compare;
+
+  return heap;
+}
+
+
+static int
+heap_default_compare (const void *a, const void *b)
+{
+  return 0;
+}
+
+
+void
+heap_free (struct heap *heap)
+{
+  free (heap->array);
+  free (heap);
+}
+
+/* Insert element into heap. */
+
+int
+heap_insert (struct heap *heap, void *item)
+{
+  if (heap->capacity - 1 <= heap->count)
+    {
+      size_t new_size = (2 + heap->count) * sizeof *(heap->array);
+      void *realloc_ret = xrealloc (heap->array, new_size);
+      heap->array = (void **) realloc_ret;
+      heap->capacity = (2 + heap->count);
+
+      if (!heap->array)
+        return -1;
+    }
+
+  heap->array[++heap->count] = item;
+  heapify_up (heap->array, heap->count, heap->compare);
+
+  return 0;
+}
+
+/* Pop top element off heap. */
+
+void *
+heap_remove_top (struct heap *heap)
+{
+  if (heap->count == 0)
+    return NULL;
+
+  void *top = heap->array[1];
+  heap->array[1] = heap->array[heap->count--];
+  heapify_down (heap->array, heap->count, 1, heap->compare);
+
+  return top;
+}
+
+/* Move element down into appropriate position in heap. */
+
+static size_t
+heapify_down (void **array, size_t count, size_t initial,
+              int (*compare)(const void *, const void *))
+{
+  void *element = array[initial];
+
+  size_t parent = initial;
+  while (parent <= count / 2)
+    {
+      size_t child = 2 * parent;
+
+      if (child < count && compare (array[child], array[child+1]) < 0)
+        child++;
+
+      if (compare (array[child], element) <= 0)
+        break;
+
+      array[parent] = array[child];
+      parent = child;
+    }
+
+  array[parent] = element;
+  return parent;
+}
+
+/* Move element up into appropriate position in heap. */
+
+static void
+heapify_up (void **array, size_t count,
+            int (*compare)(const void *, const void *))
+{
+  size_t k = count;
+  void *new_element = array[k];
+
+  while (k != 1 && compare (array[k/2], new_element) <= 0)
+    {
+      array[k] = array[k/2];
+      k /= 2;
+    }
+
+  array[k] = new_element;
+}
diff --git a/gl/lib/heap.h b/gl/lib/heap.h
new file mode 100644
index 0000000..0ea516a
--- /dev/null
+++ b/gl/lib/heap.h
@@ -0,0 +1,34 @@
+/* Barebones heap implementation supporting only insert and pop.
+
+   Copyright (C) 2010 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
+
+/* Full implementation: GDSL (http://gna.org/projects/gdsl/) by Nicolas
+   Darnis <address@hidden>. Adapted by Gene Auyeung. */
+
+#include <stddef.h>
+
+struct heap
+{
+  void **array;     /* array[0] is not used */
+  size_t capacity;  /* Array size */
+  size_t count;     /* Used as index to last element. Also is num of items. */
+  int (*compare)(const void *, const void *);
+};
+
+struct heap *heap_alloc (int (*)(const void *, const void *), size_t);
+void heap_free (struct heap *);
+int heap_insert (struct heap *heap, void *item);
+void *heap_remove_top (struct heap *heap);
diff --git a/gl/modules/heap b/gl/modules/heap
new file mode 100644
index 0000000..d8d061b
--- /dev/null
+++ b/gl/modules/heap
@@ -0,0 +1,21 @@
+Description:
+Binary heap with minimal number of methods. Used in sort.
+
+Files:
+lib/heap.c
+lib/heap.h
+
+Depends-on:
+
+configure.ac:
+
+Makefile.am:
+lib_SOURCES += heap.c heap.h
+
+Include:
+
+License:
+GPL
+
+Maintainer:
+Gene Auyeung
diff --git a/gnulib b/gnulib
index 80cd995..e961f4c 160000
--- a/gnulib
+++ b/gnulib
@@ -1 +1 @@
-Subproject commit 80cd995cdcbf4b9ded895a43621a11f11806ad8d
+Subproject commit e961f4c04b6bbd1bf6f98c91fa0e6ae1d7a8eea9
diff --git a/src/Makefile.am b/src/Makefile.am
index ecb42a8..f44974e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -391,6 +391,9 @@ who_LDADD += $(GETADDRINFO_LIB)
 hostname_LDADD += $(GETHOSTNAME_LIB)
 uname_LDADD += $(GETHOSTNAME_LIB)
 
+# for pthread
+sort_LDADD += $(LIB_PTHREAD)
+
 $(PROGRAMS): ../lib/libcoreutils.a
 
 # Get the release year from ../lib/version-etc.c.
diff --git a/src/sort.c b/src/sort.c
index 5a937dd..08f4919 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -23,6 +23,7 @@
 #include <config.h>
 
 #include <getopt.h>
+#include <pthread.h>
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <signal.h>
@@ -32,8 +33,10 @@
 #include "filevercmp.h"
 #include "hard-locale.h"
 #include "hash.h"
+#include "heap.h"
 #include "ignore-value.h"
 #include "md5.h"
+#include "nproc.h"
 #include "physmem.h"
 #include "posixver.h"
 #include "quote.h"
@@ -92,6 +95,18 @@ struct rlimit { size_t rlim_cur; };
 # define DEFAULT_TMPDIR "/tmp"
 #endif
 
+/* Maximum number of lines to merge every time a NODE is taken from
+   the MERGE_QUEUE.  Node is at level LEVEL in the binary merge tree,
+   and is responsible for merging TOTAL lines. */
+#define MAX_MERGE(total, level) ((total) / ((2 << (level)) * (2 << (level))) + 1)
+
+/* Heuristic value for the number of lines for which it is worth
+   creating a subthread, during an internal merge sort, on a machine
+   that has processors galore.  Currently this number is just a guess.
+   This value must be at least 4.  We don't know of any machine where
+   this number has any practical effect.  */
+enum { SUBTHREAD_LINES_HEURISTIC = 4 };
+
 /* Exit statuses.  */
 enum
   {
@@ -118,6 +133,15 @@ enum
     MAX_FORK_TRIES_DECOMPRESS = 9
   };
 
+enum
+  {
+    /* Level of the end-of-merge node, one level above the root. */
+    MERGE_END = 0,
+
+    /* Level of the root node in merge tree. */
+    MERGE_ROOT = 1
+  };
+
 /* The representation of the decimal point in the current locale.  */
 static int decimal_point;
 
@@ -194,6 +218,31 @@ struct month
   int val;
 };
 
+/* Binary merge tree node. */
+struct merge_node
+{
+  struct line *lo;              /* Lines to merge from LO child node. */
+  struct line *hi;              /* Lines to merge from HI child node. */
+  struct line *end_lo;          /* End of available lines from LO. */
+  struct line *end_hi;          /* End of available lines from HI. */
+  struct line **dest;           /* Pointer to destination of merge. */
+  size_t nlo;                   /* Total lines remaining from LO. */
+  size_t nhi;                   /* Total lines remaining from HI. */
+  size_t level;                 /* Level in merge tree. */
+  struct merge_node *parent;    /* Parent node. */
+  bool queued;                  /* Node is already in heap. */
+  pthread_spinlock_t *lock;     /* Lock for node operations. */
+};
+
+/* Priority queue of merge nodes. */
+struct merge_node_queue
+{
+  struct heap *priority_queue;  /* Priority queue of merge tree nodes. */
+  pthread_mutex_t mutex;        /* Lock for queue operations. */
+  pthread_cond_t cond;          /* Conditional wait for empty queue to populate
+                                   when popping. */
+};
+
 /* FIXME: None of these tables work with multibyte character sets.
    Also, there are many other bugs when handling multibyte characters.
    One way to fix this is to rewrite `sort' to use wide characters
@@ -295,8 +344,6 @@ static char const *compress_program;
    number are present, temp files will be used. */
 static unsigned int nmerge = NMERGE_DEFAULT;
 
-static void sortlines_temp (struct line *, size_t, struct line *);
-
 /* Report MESSAGE for FILE, then clean up and exit.
    If FILE is null, it represents standard output.  */
 
@@ -389,6 +436,7 @@ Other options:\n\
  -t, --field-separator=SEP  use SEP instead of non-blank to blank transition\n\
   -T, --temporary-directory=DIR  use DIR for temporaries, not $TMPDIR or %s;\n\
                               multiple options specify multiple directories\n\
+      --parallel=N          Limit the number of sorts run concurrently to N\n\
   -u, --unique              with -c, check for strict ordering;\n\
                               without -c, output only the first of an equal run\n\
 "), DEFAULT_TMPDIR);
@@ -432,7 +480,8 @@ enum
   FILES0_FROM_OPTION,
   NMERGE_OPTION,
   RANDOM_SOURCE_OPTION,
-  SORT_OPTION
+  SORT_OPTION,
+  PARALLEL_OPTION
 };
 
 static char const short_options[] = "-bcCdfghik:mMno:rRsS:t:T:uVy:z";
@@ -465,6 +514,7 @@ static struct option const long_options[] =
   {"temporary-directory", required_argument, NULL, 'T'},
   {"unique", no_argument, NULL, 'u'},
   {"zero-terminated", no_argument, NULL, 'z'},
+  {"parallel", required_argument, NULL, PARALLEL_OPTION},
   {GETOPT_HELP_OPTION_DECL},
   {GETOPT_VERSION_OPTION_DECL},
   {NULL, 0, NULL, 0},
@@ -1112,8 +1162,9 @@ open_temp (const char *name, pid_t pid)
 }
 
 static void
-write_bytes (const char *buf, size_t n_bytes, FILE *fp, const char *output_file)
+write_bytes (char *buf, size_t n_bytes, FILE *fp, const char *output_file)
 {
+  *(buf + n_bytes - 1) = eolchar;
   if (fwrite (buf, 1, n_bytes, fp) != n_bytes)
     die (_("write failed"), output_file);
 }
@@ -1329,6 +1380,22 @@ specify_sort_size (int oi, char c, char const *s)
   xstrtol_fatal (e, oi, c, long_options, s);
 }
 
+/* Specify the number of threads to spawn during internal sort.  */
+static unsigned long int
+specify_nthreads (int oi, char c, char const *s)
+{
+  unsigned long int nthreads;
+  enum strtol_error e = xstrtoul (s, NULL, 10, &nthreads, "");
+  if (e == LONGINT_OVERFLOW)
+    return ULONG_MAX;
+  if (e != LONGINT_OK)
+    xstrtol_fatal (e, oi, c, long_options, s);
+  if (nthreads == 0)
+    error (SORT_FAILURE, 0, _("number in parallel must be nonzero"));
+  return nthreads;
+}
+
+
 /* Return the default sort size.  */
 static size_t
 default_sort_size (void)
@@ -1673,13 +1740,17 @@ fillbuf (struct buffer *buf, FILE *fp, char const *file)
                   if (buf->buf == ptrlim)
                     return false;
                   if (ptrlim[-1] != eol)
-                    *ptrlim++ = eol;
+                    *ptrlim++ = '\0';
                 }
             }
 
           /* Find and record each line in the just-read input.  */
           while ((p = memchr (ptr, eol, ptrlim - ptr)))
             {
+              /* Delimit the line with NUL. This eliminates the need to
+                 replace the last byte with NUL, then putting the eolchar
+                 back before calling strcoll. */
+              *p = '\0';
               ptr = p + 1;
               line--;
               line->text = line_start;
@@ -2143,7 +2214,7 @@ keycompare (const struct line *a, const struct line *b)
                     }
                 }
 
-              diff = xmemcoll (copy_a, new_len_a, copy_b, new_len_b);
+              diff = xmemcoll0 (copy_a, new_len_a, copy_b, new_len_b);
 
               if (sizeof buf < size)
                 free (copy_a);
@@ -2153,7 +2224,7 @@ keycompare (const struct line *a, const struct line *b)
           else if (lenb == 0)
             goto greater;
           else
-            diff = xmemcoll (texta, lena, textb, lenb);
+            diff = xmemcoll0 (texta, lena, textb, lenb);
         }
       else if (ignore)
         {
@@ -2274,7 +2345,7 @@ compare (const struct line *a, const struct line *b)
   else if (blen == 0)
     diff = 1;
   else if (hard_LC_COLLATE)
-    diff = xmemcoll (a->text, alen, b->text, blen);
+    diff = xmemcoll0 (a->text, alen, b->text, blen);
   else if (! (diff = memcmp (a->text, b->text, MIN (alen, blen))))
     diff = alen < blen ? -1 : alen != blen;
 
@@ -2615,25 +2686,28 @@ mergefiles (struct sortfile *files, size_t ntemps, size_t nfiles,
   return nopened;
 }
 
-/* Merge into T the two sorted arrays of lines LO (with NLO members)
-   and HI (with NHI members).  T, LO, and HI point just past their
-   respective arrays, and the arrays are in reverse order.  NLO and
-   NHI must be positive, and HI - NHI must equal T - (NLO + NHI).  */
+/* Merge into T (of size NLINES) the two sorted arrays of lines
+   LO (with NLINES / 2 members), and
+   T - (NLINES / 2) (with NLINES - NLINES / 2 members).
+   T and LO point just past their respective arrays, and the arrays
+   are in reverse order.  NLINES must be at least 2.  */
 
 static inline void
-mergelines (struct line *t,
-            struct line const *lo, size_t nlo,
-            struct line const *hi, size_t nhi)
+mergelines (struct line *restrict t, size_t nlines,
+            struct line const *restrict lo)
 {
+  size_t nlo = nlines / 2;
+  size_t nhi = nlines - nlo;
+  struct line *hi = t - nlo;
+
   for (;;)
     if (compare (lo - 1, hi - 1) <= 0)
       {
         *--t = *--lo;
         if (! --nlo)
           {
-            /* HI - NHI equalled T - (NLO + NHI) when this function
-               began.  Therefore HI must equal T now, and there is no
-               need to copy from HI to T.  */
+            /* HI must equal T now, and there is no need to copy from
+               HI to T. */
             return;
           }
       }
@@ -2664,15 +2738,25 @@ mergelines (struct line *t,
    D. A. Bell, Comp J. 1 (1958), 75.  */
 
 static void
-sortlines (struct line *lines, size_t nlines, struct line *temp)
+sequential_sort (struct line *restrict lines, size_t nlines,
+                 struct line *restrict temp, bool to_temp)
 {
   if (nlines == 2)
     {
-      if (0 < compare (&lines[-1], &lines[-2]))
+      /* Declare `swap' as int, not bool, to work around a bug
+         <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
+         in the IBM xlc 6.0.0.0 compiler in 64-bit mode.  */
+      int swap = (0 < compare (&lines[-1], &lines[-2]));
+      if (to_temp)
         {
-          struct line tmp = lines[-1];
+          temp[-1] = lines[-1 - swap];
+          temp[-2] = lines[-2 + swap];
+        }
+      else if (swap)
+        {
+          temp[-1] = lines[-1];
           lines[-1] = lines[-2];
-          lines[-2] = tmp;
+          lines[-2] = temp[-1];
         }
     }
   else
@@ -2681,46 +2765,382 @@ sortlines (struct line *lines, size_t nlines, struct line *temp)
       size_t nhi = nlines - nlo;
       struct line *lo = lines;
       struct line *hi = lines - nlo;
-      struct line *sorted_lo = temp;
 
-      sortlines (hi, nhi, temp);
+      sequential_sort (hi, nhi, temp - (to_temp ? nlo : 0), to_temp);
       if (1 < nlo)
-        sortlines_temp (lo, nlo, sorted_lo);
+        sequential_sort (lo, nlo, temp, !to_temp);
+      else if (!to_temp)
+        temp[-1] = lo[-1];
+
+      struct line *dest;
+      struct line const *sorted_lo;
+      if (to_temp)
+        {
+          dest = temp;
+          sorted_lo = lines;
+        }
       else
-        sorted_lo[-1] = lo[-1];
+        {
+          dest = lines;
+          sorted_lo = temp;
+        }
+      mergelines (dest, nlines, sorted_lo);
+    }
+}
+
+/* Compare two NODEs for priority. The NODE with the higher (numerically
+   lower) level has priority. If tie, the NODE with the most remaining
+   lines has priority. */
+
+static int
+compare_nodes (const void *a, const void *b)
+{
+  const struct merge_node *nodea = (const struct merge_node *) a;
+  const struct merge_node *nodeb = (const struct merge_node *) b;
+  if (nodea->level == nodeb->level)
+      return (nodea->nlo + nodea->nhi) < (nodeb->nlo + nodeb->nhi);
+  return nodea->level < nodeb->level;
+}
+
+/* Lock a merge tree NODE. */
 
-      mergelines (lines, sorted_lo, nlo, hi, nhi);
+static inline void
+lock_node (struct merge_node *const restrict node)
+{
+  pthread_spin_lock (node->lock);
+}
+
+/* Unlock a merge tree NODE. */
+
+static inline void
+unlock_node (struct merge_node *const restrict node)
+{
+  pthread_spin_unlock (node->lock);
+}
+
+/* Destroy merge QUEUE. */
+
+static inline void
+queue_destroy (struct merge_node_queue *const restrict queue)
+{
+  heap_free (queue->priority_queue);
+  pthread_cond_destroy (&queue->cond);
+  pthread_mutex_destroy (&queue->mutex);
+}
+
+/* Initialize merge QUEUE, allocating space for a maximum of RESERVE nodes.
+   Though it's highly unlikely all nodes are in the heap at the same time,
+   RESERVE should accommodate all of them. Counting a NULL dummy head for the
+   heap, RESERVE should be 2 * NTHREADS. */
+
+static inline void
+queue_init (struct merge_node_queue *const restrict queue, size_t reserve)
+{
+  queue->priority_queue = (struct heap *) heap_alloc (compare_nodes, reserve);
+  pthread_mutex_init (&queue->mutex, NULL);
+  pthread_cond_init (&queue->cond, NULL);
+}
+
+/* Insert NODE into priority QUEUE. Assume caller either holds lock on NODE
+   or does not need to lock NODE. */
+
+static inline void
+queue_insert (struct merge_node_queue *const restrict queue,
+              struct merge_node *const restrict node)
+{
+  pthread_mutex_lock (&queue->mutex);
+  heap_insert (queue->priority_queue, node);
+  node->queued = true;
+  pthread_mutex_unlock (&queue->mutex);
+  pthread_cond_signal (&queue->cond);
+}
+
+/* Pop NODE off priority QUEUE. Guarantee a non-null, spinlocked NODE. */
+
+static inline struct merge_node *
+queue_pop (struct merge_node_queue *const restrict queue)
+{
+  struct merge_node *node = NULL;
+
+  while (!node)
+    {
+      pthread_mutex_lock (&queue->mutex);
+      if (queue->priority_queue->count)
+        node = (struct merge_node *) heap_remove_top (queue->priority_queue);
+      else
+        {
+          /* Go into conditional wait if no NODE is immediately available.  */
+          pthread_cond_wait (&queue->cond, &queue->mutex);
+        }
+      pthread_mutex_unlock (&queue->mutex);
     }
+  lock_node (node);
+  node->queued = false;
+  return node;
 }
 
-/* Like sortlines (LINES, NLINES, TEMP), except output into TEMP
-   rather than sorting in place.  */
+/* If UNIQUE is set, checks to make sure line isn't a duplicate before
+   outputting. If UNIQUE is not set, output the passed in line. Note that
+   this function does not actually save the line, nor any key information,
+   thus is only appropriate for internal sort. */
 
-static void
-sortlines_temp (struct line *lines, size_t nlines, struct line *temp)
+static inline void
+write_unique (struct line *const restrict line, FILE *tfp,
+              const char *temp_output)
 {
-  if (nlines == 2)
+  static struct line *saved = NULL;
+
+  if (!unique)
+    write_bytes (line->text, line->length, tfp, temp_output);
+  else if (!saved || compare (line, saved))
     {
-      /* Declare `swap' as int, not bool, to work around a bug
-         <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
-         in the IBM xlc 6.0.0.0 compiler in 64-bit mode.  */
-      int swap = (0 < compare (&lines[-1], &lines[-2]));
-      temp[-1] = lines[-1 - swap];
-      temp[-2] = lines[-2 + swap];
+      saved = line;
+      write_bytes (line->text, line->length, tfp, temp_output);
+    }
+}
+
+/* Merge the lines currently available to a NODE in the binary
+   merge tree, up to a maximum specified by MAX_MERGE. */
+
+static inline size_t
+mergelines_node (struct merge_node *const restrict node, size_t total_lines,
+                 FILE *tfp, const char *temp_output)
+{
+  struct line *lo_orig = node->lo;
+  struct line *hi_orig = node->hi;
+  size_t to_merge = MAX_MERGE (total_lines, node->level);
+  size_t merged_lo;
+  size_t merged_hi;
+
+  if (node->level > MERGE_ROOT)
+    {
+      /* Merge to destination buffer. */
+      struct line *dest = *node->dest;
+      while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
+        if (compare (node->lo - 1, node->hi - 1) <= 0)
+          *--dest = *--node->lo;
+        else
+          *--dest = *--node->hi;
+
+      merged_lo = lo_orig - node->lo;
+      merged_hi = hi_orig - node->hi;
+
+      if (node->nhi == merged_hi)
+        while (node->lo != node->end_lo && to_merge--)
+          *--dest = *--node->lo;
+      else if (node->nlo == merged_lo)
+        while (node->hi != node->end_hi && to_merge--)
+          *--dest = *--node->hi;
     }
   else
     {
-      size_t nlo = nlines / 2;
-      size_t nhi = nlines - nlo;
-      struct line *lo = lines;
-      struct line *hi = lines - nlo;
-      struct line *sorted_hi = temp - nlo;
+      /* Merge directly to output. */
+      while (node->lo != node->end_lo && node->hi != node->end_hi && to_merge--)
+        {
+          if (compare (node->lo - 1, node->hi - 1) <= 0)
+            write_unique (--node->lo, tfp, temp_output);
+          else
+            write_unique (--node->hi, tfp, temp_output);
+        }
+
+      merged_lo = lo_orig - node->lo;
+      merged_hi = hi_orig - node->hi;
+
+      if (node->nhi == merged_hi)
+        {
+          while (node->lo != node->end_lo && to_merge--)
+            write_unique (--node->lo, tfp, temp_output);
+        }
+      else if (node->nlo == merged_lo)
+        {
+          while (node->hi != node->end_hi && to_merge--)
+            write_unique (--node->hi, tfp, temp_output);
+        }
+      node->dest -= lo_orig - node->lo + hi_orig - node->hi;
+    }
+
+  /* Update NODE. */
+  merged_lo = lo_orig - node->lo;
+  merged_hi = hi_orig - node->hi;
+  node->nlo -= merged_lo;
+  node->nhi -= merged_hi;
+  return merged_lo + merged_hi;
+}
+
+/* Insert NODE into QUEUE if it passes insertion checks. */
+
+static inline void
+check_insert (struct merge_node *node,
+              struct merge_node_queue *const restrict queue)
+{
+  size_t lo_avail = node->lo - node->end_lo;
+  size_t hi_avail = node->hi - node->end_hi;
+
+  /* Conditions for insertion:
+     1. NODE is not already in heap.
+     2. NODE has available lines from both its children, OR one child has
+          available lines, but the other has exhausted all its lines. */
+  if ((!node->queued)
+      && ((lo_avail && (hi_avail || !(node->nhi)))
+          || (hi_avail && !(node->nlo))))
+    {
+      queue_insert (queue, node);
+    }
+}
+
+/* Update parent merge tree NODE. */
+
+static inline void
+update_parent (struct merge_node *const restrict node, size_t merged,
+               struct merge_node_queue *const restrict queue)
+{
+  if (node->level > MERGE_ROOT)
+    {
+      lock_node (node->parent);
+      *node->dest -= merged;
+      check_insert (node->parent, queue);
+      unlock_node (node->parent);
+    }
+  else if (node->nlo + node->nhi == 0)
+    {
+      /* If the MERGE_ROOT NODE has finished merging, insert the
+         MERGE_END node.  */
+      queue_insert (queue, node->parent);
+    }
+}
+
+/* Repeatedly pop QUEUE for a NODE with lines to merge, and merge at least
+   some of those lines, until the MERGE_END node is popped. */
+
+static void
+merge_loop (struct merge_node_queue *const restrict queue,
+            const size_t total_lines, FILE *tfp, const char *temp_output)
+{
+  while (1)
+    {
+      struct merge_node *node = queue_pop (queue);
+
+      if (node->level == MERGE_END)
+        {
+          unlock_node (node);
+          /* Reinsert so other threads can pop it. */
+          queue_insert (queue, node);
+          break;
+        }
+      size_t merged_lines = merge_node (node, total_lines, tfp, temp_output);
+      check_insert (node, queue);
+      update_parent (node, merged_lines, queue);
+
+      unlock_node (node);
+    }
+}
+
+
+static void sortlines (struct line *restrict, struct line *restrict,
+                       unsigned long int, const size_t,
+                       struct merge_node *const restrict, bool,
+                       struct merge_node_queue *const restrict,
+                       FILE *, const char *);
+
+/* Thread arguments for sortlines_thread. */
+
+struct thread_args
+{
+  struct line *lines;
+  struct line *dest;
+  unsigned long int nthreads;
+  const size_t total_lines;
+  struct merge_node *const restrict parent;
+  bool lo_child;
+  struct merge_node_queue *const restrict merge_queue;
+  FILE *tfp;
+  const char *output_temp;
+};
+
+/* Like sortlines, except with a signature acceptable to pthread_create.  */
 
-      sortlines_temp (hi, nhi, sorted_hi);
+static void *
+sortlines_thread (void *data)
+{
+  struct thread_args const *args = data;
+  sortlines (args->lines, args->dest, args->nthreads, args->total_lines,
+             args->parent, args->lo_child, args->merge_queue,
+             args->tfp, args->output_temp);
+  return NULL;
+}
+
+/* There are three phases to the algorithm: node creation, sequential sort,
+   and binary merge.
+
+   During node creation, sortlines recursively visits each node in the
+   binary merge tree and creates a NODE structure corresponding to all the
+   future line merging NODE is responsible for. For each call to
+   sortlines, half the available threads are assigned to each recursive
+   call, until a leaf node having only 1 available thread is reached.
+
+   Each leaf node then performs two sequential sorts, one on each half of
+   the lines it is responsible for. It records in its NODE structure that
+   there are two sorted sublists available to merge from, and inserts its
+   NODE into the priority queue.
+
+   The binary merge phase then begins. Each thread drops into a loop
+   where the thread retrieves a NODE from the priority queue, merges lines
+   available to that NODE, and potentially inserts NODE or its parent back
+   into the queue if there are sufficient available lines for them to
+   merge. This continues until all lines at all nodes of the merge tree
+   have been merged. */
+
+static void
+sortlines (struct line *restrict lines, struct line *restrict dest,
+           unsigned long int nthreads, const size_t total_lines,
+           struct merge_node *const restrict parent, bool lo_child,
+           struct merge_node_queue *const restrict merge_queue,
+           FILE *tfp, const char *temp_output)
+{
+  /* Create merge tree NODE. */
+  size_t nlines = (lo_child)? parent->nlo : parent->nhi;
+  size_t nlo = nlines / 2;
+  size_t nhi = nlines - nlo;
+  struct line *lo = dest - total_lines;
+  struct line *hi = lo - nlo;
+  struct line **parent_end = (lo_child)? &parent->end_lo : &parent->end_hi;
+  pthread_spinlock_t lock;
+  pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE);
+  struct merge_node node = {lo, hi, lo, hi, parent_end, nlo, nhi,
+                            parent->level + 1, parent, false, &lock};
+
+  /* Calculate thread arguments. */
+  unsigned long int lo_threads = nthreads / 2;
+  unsigned long int hi_threads = nthreads - lo_threads;
+  pthread_t thread;
+  struct thread_args args = {lines, lo, lo_threads, total_lines, &node,
+                             true, merge_queue, tfp, temp_output};
+
+  if (nthreads > 1 && SUBTHREAD_LINES_HEURISTIC <= nlines
+      && pthread_create (&thread, NULL, sortlines_thread, &args) == 0)
+    {
+      sortlines (lines - nlo, hi, hi_threads, total_lines, &node, false,
+                 merge_queue, tfp, temp_output);
+      pthread_join (thread, NULL);
+    }
+  else
+    {
+      /* Nthreads = 1, this is a leaf NODE, or pthread_create failed.
+         Sort with 1 thread. */
+      struct line *temp = lines - total_lines;
+      if (1 < nhi)
+        sequential_sort (lines - nlo, nhi, temp - nlo / 2, false);
       if (1 < nlo)
-        sortlines (lo, nlo, temp);
+        sequential_sort (lines, nlo, temp, false);
 
-      mergelines (temp, lo, nlo, sorted_hi, nhi);
+      /* Update merge NODE. No need to lock yet. */
+      node.lo = lines;
+      node.hi = lines - nlo;
+      node.end_lo = lines - nlo;
+      node.end_hi = lines - nlo - nhi;
+
+      queue_insert (merge_queue, &node);
+      merge_loop (merge_queue, total_lines, tfp, temp_output);
     }
 }
 
@@ -2926,7 +3346,8 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
 /* Sort NFILES FILES onto OUTPUT_FILE. */
 
 static void
-sort (char * const *files, size_t nfiles, char const *output_file)
+sort (char * const *files, size_t nfiles, char const *output_file,
+      unsigned long int nthreads)
 {
   struct buffer buf;
   size_t ntemps = 0;
@@ -2940,8 +3361,22 @@ sort (char * const *files, size_t nfiles, char const *output_file)
       char const *file = *files;
       FILE *fp = xfopen (file, "r");
       FILE *tfp;
-      size_t bytes_per_line = (2 * sizeof (struct line)
-                               - sizeof (struct line) / 2);
+
+      size_t bytes_per_line;
+      if (nthreads > 1)
+        {
+          /* Get log P. */
+          unsigned long int tmp = 1;
+          size_t mult = 1;
+          while (tmp < nthreads)
+            {
+              tmp *= 2;
+              mult++;
+            }
+          bytes_per_line = (mult * sizeof (struct line));
+        }
+      else
+        bytes_per_line = sizeof (struct line) * 3 / 2;
 
       if (! buf.alloc)
         initbuf (&buf, bytes_per_line,
@@ -2953,7 +3388,6 @@ sort (char * const *files, size_t nfiles, char const *output_file)
       while (fillbuf (&buf, fp, file))
         {
           struct line *line;
-          struct line *linebase;
 
           if (buf.eof && nfiles
               && (bytes_per_line + 1
@@ -2967,9 +3401,6 @@ sort (char * const *files, size_t nfiles, char const *output_file)
             }
 
           line = buffer_linelim (&buf);
-          linebase = line - buf.nlines;
-          if (1 < buf.nlines)
-            sortlines (line, buf.nlines, linebase);
           if (buf.eof && !nfiles && !ntemps && !buf.left)
             {
               xfclose (fp, file);
@@ -2982,16 +3413,23 @@ sort (char * const *files, size_t nfiles, char const *output_file)
               ++ntemps;
               temp_output = create_temp (&tfp, NULL);
             }
-
-          do
+          if (1 < buf.nlines)
             {
-              line--;
-              write_bytes (line->text, line->length, tfp, temp_output);
-              if (unique)
-                while (linebase < line && compare (line, line - 1) == 0)
-                  line--;
+              struct merge_node_queue merge_queue;
+              queue_init (&merge_queue, 2 * nthreads);
+
+              pthread_spinlock_t lock;
+              pthread_spin_init (&lock, PTHREAD_PROCESS_PRIVATE);
+              struct merge_node node =
+                {NULL, NULL, NULL, NULL, NULL, buf.nlines,
+                 buf.nlines, MERGE_END, NULL, false, &lock};
+
+              sortlines (line, line, nthreads, buf.nlines, &node, true,
+                         &merge_queue, tfp, temp_output);
+              queue_destroy (&merge_queue);
             }
-          while (linebase < line);
+          else
+            write_unique (line - 1, tfp, temp_output);
 
           xfclose (tfp, temp_output);
 
@@ -3227,6 +3665,7 @@ main (int argc, char **argv)
   bool mergeonly = false;
   char *random_source = NULL;
   bool need_random = false;
+  unsigned long int nthreads = 0;
   size_t nfiles = 0;
   bool posixly_correct = (getenv ("POSIXLY_CORRECT") != NULL);
   bool obsolete_usage = (posix2_version () < 200112);
@@ -3553,6 +3992,10 @@ main (int argc, char **argv)
           add_temp_dir (optarg);
           break;
 
+        case PARALLEL_OPTION:
+          nthreads = specify_nthreads (oi, c, optarg);
+          break;
+
         case 'u':
           unique = true;
           break;
@@ -3701,6 +4144,9 @@ main (int argc, char **argv)
 
   if (need_random)
     {
+      /* Threading does not lock the randread_source structure, so
+         downgrade to one thread to avoid race conditions. */
+      nthreads = 1;
       randread_source = randread_new (random_source, MD5_DIGEST_SIZE);
       if (! randread_source)
         die (_("open failed"), random_source);
@@ -3755,7 +4201,15 @@ main (int argc, char **argv)
       IF_LINT (free (sortfiles));
     }
   else
-    sort (files, nfiles, outfile);
+    {
+      /* If NTHREADS > number of cores on the machine, spinlocking
+         could be wasteful.  */
+      unsigned long int np2 = num_processors (NPROC_CURRENT_OVERRIDABLE);
+      if (!nthreads || nthreads > np2)
+        nthreads = np2;
+
+      sort (files, nfiles, outfile, nthreads);
+    }
 
   if (have_read_stdin && fclose (stdin) == EOF)
     die (_("close failed"), "-");
diff --git a/tests/Makefile.am b/tests/Makefile.am
index db1610d..26c2203 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -220,6 +220,7 @@ TESTS =                        \
   misc/shred-remove                \
   misc/shuf                    \
   misc/sort                    \
+  misc/sort-benchmark-random            \
   misc/sort-compress                \
   misc/sort-continue                \
   misc/sort-files0-from                \
diff --git a/tests/misc/sort-benchmark-random b/tests/misc/sort-benchmark-random
new file mode 100644
index 0000000..0c18f9f
--- /dev/null
+++ b/tests/misc/sort-benchmark-random
@@ -0,0 +1,57 @@
+#!/bin/sh
+# Benchmark sort on randomly generated data.
+
+# Copyright (C) 2009 Free Software Foundation, Inc.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+# Written by Glen Lenker.
+
+if test "$VERBOSE" = yes; then
+  set -x
+  sort --version
+fi
+
+. $srcdir/test-lib.sh
+
+very_expensive_
+
+perl -e '
+my $num_lines = 500000;
+my $length = 100;
+
+for (my $i=0; $i < $num_lines; $i++)
+{
+    for (my $j=0; $j < $length; $j++)
+    {
+      printf "%c", 32 + rand(94);
+    }
+    print "\n";
+}' > in || framework_failure
+
+# We need to generate a lot of data for sort to show a noticeable
+# improvement in performance. Sorting it in Perl may take a while.
+
+perl -e '
+open (FILE, "<in");
+my @list = <FILE>;
+print sort(@list);
+close (FILE);
+' > exp || framework_failure
+
+time sort in > out || fail=1
+
+compare out exp || fail=1
+
+Exit $fail
-- 
1.6.3.3
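
For anyone reading the patch, the insertion test in check_insert can be looked at in isolation. The following is only a sketch, not part of the patch: the function name should_insert and its flat argument list are hypothetical, standing in for the fields read from struct merge_node (queued, the lo/hi lines currently available, and the nlo/nhi counts still to be merged from each child).

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical standalone version of the predicate in check_insert.
   QUEUED    - node is already in the priority queue
   LO_AVAIL  - lines currently available from the lo child
   HI_AVAIL  - lines currently available from the hi child
   NLO, NHI  - lines from each child that remain to be merged  */
static bool
should_insert (bool queued, size_t lo_avail, size_t hi_avail,
               size_t nlo, size_t nhi)
{
  /* Insert only if not already queued, and either both children have
     lines available, or one has lines while the other is exhausted.  */
  return (!queued
          && ((lo_avail && (hi_avail || nhi == 0))
              || (hi_avail && nlo == 0)));
}
```

So a node with lines available on only one side stays out of the queue while the other side still has lines outstanding, and goes in once that side is exhausted.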

Attachment: pq.patch
Description: Binary data
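
A note on the buffer sizing in sort(): when nthreads > 1 the patch reserves mult * sizeof (struct line) bytes per line, where the loop makes mult equal to 1 plus the base-2 logarithm of nthreads, rounded up. Below is a minimal sketch of just that loop, pulled out into a helper. The name merge_buffer_multiplier is hypothetical and does not appear in the patch, and the sketch assumes nthreads has already been capped to the processor count as main() does.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper, not in the patch: returns the multiplier that
   sort() computes inline for the nthreads > 1 case.  */
static size_t
merge_buffer_multiplier (unsigned long int nthreads)
{
  /* Assumption: nthreads is small (capped by main); a value above
     ULONG_MAX / 2 would make TMP wrap around.  */
  assert (1 <= nthreads);

  unsigned long int tmp = 1;
  size_t mult = 1;

  /* Double TMP until it reaches NTHREADS, counting the doublings.  */
  while (tmp < nthreads)
    {
      tmp *= 2;
      mult++;
    }
  return mult;
}
```

With 8 threads this gives 4, so 4 * sizeof (struct line) per line, compared with sizeof (struct line) * 3 / 2 in the single-threaded branch.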

