pspp-cvs
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Pspp-cvs] Changes to pspp/src/sort.c


From: Ben Pfaff
Subject: [Pspp-cvs] Changes to pspp/src/sort.c
Date: Tue, 15 Mar 2005 01:04:14 -0500

Index: pspp/src/sort.c
diff -u pspp/src/sort.c:1.26 pspp/src/sort.c:1.27
--- pspp/src/sort.c:1.26        Mon Mar 14 06:54:40 2005
+++ pspp/src/sort.c     Tue Mar 15 06:04:10 2005
@@ -20,6 +20,7 @@
 #include <config.h>
 #include "sort.h"
 #include "error.h"
+#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <errno.h>
@@ -31,6 +32,7 @@
 #include "command.h"
 #include "error.h"
 #include "expressions/public.h"
+#include "glob.h"
 #include "lexer.h"
 #include "misc.h"
 #include "settings.h"
@@ -39,18 +41,6 @@
 #include "vfm.h"
 #include "vfmP.h"
 
-#if HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-
-#if HAVE_SYS_TYPES_H
-#include <sys/types.h>
-#endif
-
-#if HAVE_SYS_STAT_H
-#include <sys/stat.h>
-#endif
-
 #include "debug-print.h"
 
 /* Sort direction. */
@@ -75,6 +65,11 @@
     size_t crit_cnt;
   };
 
+/* These should only be changed for testing purposes. */
+static int min_buffers = 64;
+static int max_buffers = INT_MAX;
+static bool allow_internal_sort = true;
+
 static int compare_record (const struct ccase *, const struct ccase *,
                            const struct sort_criteria *);
 static struct casefile *do_internal_sort (struct casereader *,
@@ -87,7 +82,7 @@
 cmd_sort_cases (void)
 {
   struct sort_criteria *criteria;
-  int success;
+  bool success = false;
 
   lex_match (T_BY);
 
@@ -95,7 +90,30 @@
   if (criteria == NULL)
     return CMD_FAILURE;
 
+  if (test_mode && lex_match ('/')) 
+    {
+      if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
+          || !lex_force_int ())
+        goto done;
+
+      min_buffers = max_buffers = lex_integer ();
+      allow_internal_sort = false;
+      if (max_buffers < 2) 
+        {
+          msg (SE, _("Buffer limit must be at least 2."));
+          goto done;
+        }
+
+      lex_get ();
+    }
+
   success = sort_active_file_in_place (criteria);
+
+ done:
+  min_buffers = 64;
+  max_buffers = INT_MAX;
+  allow_internal_sort = true;
+  
   sort_destroy_criteria (criteria);
   return success ? lex_end_of_command () : CMD_FAILURE;
 }
@@ -252,9 +270,7 @@
 struct casefile *
 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
 {
-  struct casefile *output;
-
-  output = do_internal_sort (reader, criteria);
+  struct casefile *output = do_internal_sort (reader, criteria);
   if (output == NULL)
     output = do_external_sort (reader, criteria);
   casereader_destroy (reader);
@@ -271,7 +287,7 @@
 static int compare_indexed_cases (const void *, const void *, void *);
 
 /* If the data is in memory, do an internal sort and return a new
-   casefile for the data. */
+   casefile for the data.  Otherwise, return a null pointer. */
 static struct casefile *
 do_internal_sort (struct casereader *reader,
                   const struct sort_criteria *criteria)
@@ -280,6 +296,9 @@
   struct casefile *dst;
   unsigned long case_cnt;
 
+  if (!allow_internal_sort)
+    return NULL;
+
   src = casereader_get_casefile (reader);
   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
     return NULL;
@@ -335,52 +354,29 @@
 
 /* External sort. */
 
-/* Maximum order of merge.  If you increase this, then you should
-   use a heap for comparing cases during merge.  */
-#define MAX_MERGE_ORDER                7
-
-/* Minimum total number of records for buffers. */
-#define MIN_BUFFER_TOTAL_SIZE_RECS     64
-
-/* Minimum single input buffer size, in bytes and records. */
-#define MIN_BUFFER_SIZE_BYTES  4096
-#define MIN_BUFFER_SIZE_RECS   16
-
-#if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
-#error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
-#endif
-
-/* Sorts initial runs A and B in decending order by length. */
-static int
-compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
-{
-  struct casefile *const *a = a_;
-  struct casefile *const *b = b_;
-  unsigned long a_case_cnt = casefile_get_case_cnt (*a);
-  unsigned long b_case_cnt = casefile_get_case_cnt (*b);
-  
-  return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt;
-}
+/* Maximum order of merge (external sort only).  The maximum
+   reasonable value is about 7.  Above that, it would be a good
+   idea to use a heap in merge_once() to select the minimum. */
+#define MAX_MERGE_ORDER 7
 
 /* Results of an external sort. */
 struct external_sort 
   {
     const struct sort_criteria *criteria; /* Sort criteria. */
     size_t value_cnt;                 /* Size of data in `union value's. */
-    struct casefile **initial_runs;   /* Array of initial runs. */
+    struct casefile **runs;           /* Array of initial runs. */
     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
   };
 
 /* Prototypes for helper functions. */
-static int write_initial_runs (struct external_sort *, struct casereader *);
-static int merge (struct external_sort *);
+static int write_runs (struct external_sort *, struct casereader *);
+static struct casefile *merge (struct external_sort *);
 static void destroy_external_sort (struct external_sort *);
 
-/* Performs an external sort of the active file according to the
-   specification in SCP.  Forms initial runs using a heap as a
-   reservoir.  Determines the optimum merge pattern via Huffman's
-   method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
-   according to that pattern. */
+/* Performs a stable external sort of the active file according
+   to the specification in SCP.  Forms initial runs using a heap
+   as a reservoir.  Merges the initial runs according to a
+   pattern that assures stability. */
 static struct casefile *
 do_external_sort (struct casereader *reader,
                   const struct sort_criteria *criteria)
@@ -394,11 +390,10 @@
   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
   xsrt->run_cap = 512;
   xsrt->run_cnt = 0;
-  xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
-  if (write_initial_runs (xsrt, reader) && merge (xsrt))
+  xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
+  if (write_runs (xsrt, reader))
     {
-      struct casefile *output = xsrt->initial_runs[0];
-      xsrt->initial_runs[0] = NULL;
+      struct casefile *output = merge (xsrt);
       destroy_external_sort (xsrt);
       return output;
     }
@@ -418,8 +413,8 @@
       int i;
       
       for (i = 0; i < xsrt->run_cnt; i++)
-        casefile_destroy (xsrt->initial_runs[i]);
-      free (xsrt->initial_runs);
+        casefile_destroy (xsrt->runs[i]);
+      free (xsrt->runs);
       free (xsrt);
     }
 }
@@ -431,6 +426,7 @@
   {
     int run;                    /* Run number of case. */
     struct ccase record;        /* Case data. */
+    size_t idx;                 /* Case number (for stability). */
   };
 
 /* Represents a set of initial runs during an external sort. */
@@ -455,7 +451,8 @@
 static const struct case_sink_class sort_sink_class;
 
 static void destroy_initial_run_state (struct initial_run_state *);
-static void process_case (struct initial_run_state *, const struct ccase *);
+static void process_case (struct initial_run_state *, const struct ccase *,
+                          size_t);
 static int allocate_cases (struct initial_run_state *);
 static void output_record (struct initial_run_state *);
 static void start_run (struct initial_run_state *);
@@ -467,10 +464,11 @@
 
 /* Reads cases from READER and composes initial runs in XSRT. */
 static int
-write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
+write_runs (struct external_sort *xsrt, struct casereader *reader)
 {
   struct initial_run_state *irs;
   struct ccase c;
+  size_t idx = 0;
   int success = 0;
 
   /* Allocate memory for cases. */
@@ -489,7 +487,7 @@
   /* Create initial runs. */
   start_run (irs);
   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
-    process_case (irs, &c);
+    process_case (irs, &c, idx++);
   while (irs->okay && irs->record_cnt > 0)
     output_record (irs);
   end_run (irs);
@@ -504,18 +502,19 @@
 
 /* Add a single case to an initial run. */
 static void
-process_case (struct initial_run_state *irs, const struct ccase *c)
+process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
 {
-  struct record_run *new_record_run;
+  struct record_run *rr;
 
   /* Compose record_run for this run and add to heap. */
   assert (irs->record_cnt < irs->record_cap - 1);
-  new_record_run = irs->records + irs->record_cnt++;
-  case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt);
-  new_record_run->run = irs->run;
+  rr = irs->records + irs->record_cnt++;
+  case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
+  rr->run = irs->run;
+  rr->idx = idx;
   if (!case_is_null (&irs->last_output)
       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
-    new_record_run->run = irs->run + 1;
+    rr->run = irs->run + 1;
   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
              compare_record_run_minheap, irs);
 
@@ -557,6 +556,8 @@
                       + irs->xsrt->value_cnt * sizeof (union value)
                       + 4 * sizeof (void *));
   max_cases = get_max_workspace() / approx_case_cost;
+  if (max_cases > max_buffers)
+    max_cases = max_buffers;
   irs->records = malloc (sizeof *irs->records * max_cases);
   for (i = 0; i < max_cases; i++)
     if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
@@ -567,12 +568,12 @@
   irs->record_cap = max_cases;
 
   /* Fail if we didn't allocate an acceptable number of cases. */
-  if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
+  if (irs->records == NULL || max_cases < min_buffers)
     {
       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
                 "cases of %d bytes each.  (PSPP workspace is currently "
                 "restricted to a maximum of %d KB.)"),
-          MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
+          min_buffers, approx_case_cost, get_max_workspace() / 1024);
       return 0;
     }
   return 1;
@@ -612,16 +613,18 @@
 }
 
 /* Compares record-run tuples A and B on run number first, then
-   on the current record according to SCP. */
+   on record, then on case index. */
 static int
 compare_record_run (const struct record_run *a,
                     const struct record_run *b,
                     struct initial_run_state *irs)
 {
-  if (a->run != b->run)
-    return a->run > b->run ? 1 : -1;
-  else
-    return compare_record (&a->record, &b->record, irs->xsrt->criteria);
+  int result = a->run < b->run ? -1 : a->run > b->run;
+  if (result == 0)
+    result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
+  if (result == 0)
+    result = a->idx < b->idx ? -1 : a->idx > b->idx;
+  return result;
 }
 
 /* Compares record-run tuples A and B on run number first, then
@@ -653,14 +656,14 @@
   /* Record initial run. */
   if (irs->casefile != NULL) 
     {
+      casefile_sleep (irs->casefile);
       if (xsrt->run_cnt >= xsrt->run_cap) 
         {
           xsrt->run_cap *= 2;
-          xsrt->initial_runs
-            = xrealloc (xsrt->initial_runs,
-                        sizeof *xsrt->initial_runs * xsrt->run_cap);
+          xsrt->runs = xrealloc (xsrt->runs,
+                                 sizeof *xsrt->runs * xsrt->run_cap);
         }
-      xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile;
+      xsrt->runs[xsrt->run_cnt++] = irs->casefile;
       irs->casefile = NULL; 
     }
 }
@@ -705,193 +708,128 @@
 
 /* Merging. */
 
-/* State of merging initial runs. */
-struct merge_state 
-  {
-    struct external_sort *xsrt; /* External sort state. */
-    struct ccase *cases;        /* Buffers. */
-    size_t case_cnt;            /* Number of buffers. */
-  };
-
-struct run;
-static struct casefile *merge_once (struct merge_state *,
+static int choose_merge (struct casefile *runs[], int run_cnt, int order);
+static struct casefile *merge_once (struct external_sort *,
                                     struct casefile *[], size_t);
-static int mod (int, int);
 
-/* Performs a series of P-way merges of initial runs. */
-static int
+/* Repeatedly merges run until only one is left,
+   and returns the final casefile.  */
+static struct casefile *
 merge (struct external_sort *xsrt)
 {
-  struct merge_state mrg;       /* State of merge. */
-  size_t approx_case_cost;      /* Approximate memory cost of one case. */
-  int max_order;                /* Maximum order of merge. */
-  size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
-  int success = 0;
-  int i;
-
-  mrg.xsrt = xsrt;
-
-  /* Allocate as many cases as possible into cases. */
-  approx_case_cost = (sizeof *mrg.cases
-                      + xsrt->value_cnt * sizeof (union value)
-                      + 4 * sizeof (void *));
-  mrg.case_cnt = get_max_workspace() / approx_case_cost;
-  mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
-  if (mrg.cases == NULL)
-    goto done;
-  for (i = 0; i < mrg.case_cnt; i++) 
-    if (!case_try_create (&mrg.cases[i], xsrt->value_cnt)) 
-      {
-        mrg.case_cnt = i;
-        break;
-      }
-  if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
-    {
-      msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
-                "cases of %d bytes each.  (PSPP workspace is currently "
-                "restricted to a maximum of %d KB.)"),
-          MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
-      return 0;
-    }
-
-  /* Determine maximum order of merge. */
-  max_order = MAX_MERGE_ORDER;
-  if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
-    max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
-  else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value)
-           < MIN_BUFFER_SIZE_BYTES)
-    max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES
-                                / (xsrt->value_cnt * sizeof (union value)));
-  if (max_order < 2)
-    max_order = 2;
-  if (max_order > xsrt->run_cnt)
-    max_order = xsrt->run_cnt;
-
-  /* Repeatedly merge the P shortest existing runs until only one run
-     is left. */
-  make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
-             compare_initial_runs, NULL);
-  dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
-
-  assert (max_order > 0);
-  assert (max_order <= 2
-          || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
   while (xsrt->run_cnt > 1)
     {
-      struct casefile *output_run;
-      int order;
-      int i;
-
-      /* Choose order of merge (max_order after first merge). */
-      order = max_order - dummy_run_cnt;
-      dummy_run_cnt = 0;
-
-      /* Choose runs to merge. */
-      assert (xsrt->run_cnt >= order);
-      for (i = 0; i < order; i++) 
-        pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
-                  sizeof *xsrt->initial_runs,
-                  compare_initial_runs, NULL); 
-          
-      /* Merge runs. */
-      output_run = merge_once (&mrg,
-                               xsrt->initial_runs + xsrt->run_cnt, order);
-      if (output_run == NULL)
-        goto done;
-      
-      /* Add output run to heap. */
-      xsrt->initial_runs[xsrt->run_cnt++] = output_run;
-      push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
-                 compare_initial_runs, NULL);
+      int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
+      int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
+      xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
+      remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
+                    idx + 1, order - 1);
+      xsrt->run_cnt -= order - 1;
     }
-
-  /* Exactly one run is left, which contains the entire sorted
-     file.  We could use it to find a total case count. */
   assert (xsrt->run_cnt == 1);
-
-  success = 1;
-
- done:
-  for (i = 0; i < mrg.case_cnt; i++)
-    case_destroy (&mrg.cases[i]);
-  free (mrg.cases);
-
-  return success;
+  xsrt->run_cnt = 0;
+  return xsrt->runs[0];
 }
 
-/* Modulo function as defined by Knuth. */
+/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
+   and returns the index of the first one.
+
+   For stability, we must merge only consecutive runs.  For
+   efficiency, we choose the shortest consecutive sequence of
+   runs. */
 static int
-mod (int x, int y)
+choose_merge (struct casefile *runs[], int run_cnt, int order) 
 {
-  if (y == 0)
-    return x;
-  else if (x == 0)
-    return 0;
-  else if (x > 0 && y > 0)
-    return x % y;
-  else if (x < 0 && y > 0)
-    return y - (-x) % y;
-
-  abort ();
+  int min_idx, min_sum;
+  int cur_idx, cur_sum;
+  int i;
+
+  /* Sum up the length of the first ORDER runs. */
+  cur_sum = 0;
+  for (i = 0; i < order; i++)
+    cur_sum += casefile_get_case_cnt (runs[i]);
+
+  /* Find the shortest group of ORDER runs,
+     using a running total for efficiency. */
+  min_idx = 0;
+  min_sum = cur_sum;
+  for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
+    {
+      cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
+      cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
+      if (cur_sum < min_sum)
+        {
+          min_sum = cur_sum;
+          min_idx = cur_idx;
+        }
+    }
+
+  return min_idx;
 }
 
-/* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
-   new run.  Returns nonzero only if successful.  Adds an entry
-   to MRG->xsrt->runs for the output file if and only if the
-   output file is actually created.  Always deletes all the input
-   files. */
+/* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
+   new run, and returns the new run. */
 static struct casefile *
-merge_once (struct merge_state *mrg,
-            struct casefile *input_runs[],
+merge_once (struct external_sort *xsrt,
+            struct casefile **const input_files,
             size_t run_cnt)
 {
-  struct casereader *input_readers[MAX_MERGE_ORDER];
-  struct ccase input_cases[MAX_MERGE_ORDER];
-  struct casefile *output_casefile = NULL;
+  struct run
+    {
+      struct casefile *file;
+      struct casereader *reader;
+      struct ccase ccase;
+    }
+  *runs;
+
+  struct casefile *output = NULL;
   int i;
 
+  /* Open input files. */
+  runs = xmalloc (sizeof *runs * run_cnt);
   for (i = 0; i < run_cnt; i++) 
     {
-      input_readers[i] = casefile_get_destructive_reader (input_runs[i]);
-      if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
+      struct run *r = &runs[i];
+      r->file = input_files[i];
+      r->reader = casefile_get_destructive_reader (r->file);
+      if (!casereader_read_xfer (r->reader, &r->ccase))
         {
           run_cnt--;
           i--;
         }
     }
-  
-  output_casefile = casefile_create (mrg->xsrt->value_cnt);
-  casefile_to_disk (output_casefile);
+
+  /* Create output file. */
+  output = casefile_create (xsrt->value_cnt);
+  casefile_to_disk (output);
 
   /* Merge. */
   while (run_cnt > 0) 
     {
-      size_t min_idx;
-
+      struct run *min_run, *run;
+      
       /* Find minimum. */
-      min_idx = 0;
-      for (i = 1; i < run_cnt; i++)
-       if (compare_record (&input_cases[i], &input_cases[min_idx],
-                            mrg->xsrt->criteria) < 0)
-          min_idx = i;
+      min_run = runs;
+      for (run = runs + 1; run < runs + run_cnt; run++)
+       if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
+          min_run = run;
 
       /* Write minimum to output file. */
-      casefile_append_xfer (output_casefile, &input_cases[min_idx]);
+      casefile_append_xfer (output, &min_run->ccase);
 
-      if (!casereader_read_xfer (input_readers[min_idx],
-                                 &input_cases[min_idx]))
+      /* Read another case from minimum run. */
+      if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
         {
-          casereader_destroy (input_readers[min_idx]);
-          casefile_destroy (input_runs[min_idx]);
+          casereader_destroy (min_run->reader);
+          casefile_destroy (min_run->file);
 
+          remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
           run_cnt--;
-          input_runs[min_idx] = input_runs[run_cnt];
-          input_readers[min_idx] = input_readers[run_cnt];
-          input_cases[min_idx] = input_cases[run_cnt];
         } 
     }
 
-  casefile_sleep (output_casefile);
+  casefile_sleep (output);
+  free (runs);
 
-  return output_casefile;
+  return output;
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]