bug-hurd
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH] WIP: Fix ext2fs remount race condition


From: James Clarke
Subject: [PATCH] WIP: Fix ext2fs remount race condition
Date: Wed, 22 Jul 2015 01:46:48 +0100

---
 ext2fs/ext2fs.c          |  18 +++++++
 ext2fs/ext2fs.h          |   6 +++
 ext2fs/pager.c           |  35 +++++++++++++-
 fatfs/fatfs.h            |   2 +
 fatfs/pager.c            |  35 +++++++++++++-
 libdiskfs/disk-pager.c   |   3 +-
 libdiskfs/diskfs-pager.h |   1 +
 libpager/demuxer.c       | 122 +++++++++++++++++++++++++++++++++++++++++++----
 libpager/pager.h         |  23 ++++++++-
 9 files changed, 230 insertions(+), 15 deletions(-)

diff --git a/ext2fs/ext2fs.c b/ext2fs/ext2fs.c
index d0fdfe7..200c210 100644
--- a/ext2fs/ext2fs.c
+++ b/ext2fs/ext2fs.c
@@ -207,10 +207,28 @@ main (int argc, char **argv)
 error_t
 diskfs_reload_global_state ()
 {
+  error_t err;
+
   pokel_flush (&global_pokel);
   pager_flush (diskfs_disk_pager, 1);
+
+  /* Paging must specifically be inhibited; if not, a paging request
+     could be handled while sblock is still NULL.
+     While some RPCs are inhibited when this function is called by
+     libdiskfs, paging RPCs are still enabled. Even if we were to
+     inhibit paging RPCs, libpager has its own pool of workers to handle
+     requests asynchronously, which libports is unaware of, so requests
+     can be handled even after the relevant RPCs are disabled.  This is
+     all dealt with by {inhibit,resume}_disk_pager.  */
+  err = inhibit_disk_pager ();
+  if (err)
+    return err;
+
   sblock = NULL;
   get_hypermetadata ();
   map_hypermetadata ();
+
+  resume_disk_pager ();
+
   return 0;
 }
diff --git a/ext2fs/ext2fs.h b/ext2fs/ext2fs.h
index 96d8e9d..d5f400e 100644
--- a/ext2fs/ext2fs.h
+++ b/ext2fs/ext2fs.h
@@ -201,6 +201,12 @@ struct user_pager_info
 /* Set up the disk pager.  */
 void create_disk_pager (void);
 
+/* Inhibit the disk pager.  */
+error_t inhibit_disk_pager (void);
+
+/* Resume the disk pager.  */
+void resume_disk_pager (void);
+
 /* Call this when we should turn off caching so that unused memory object
    ports get freed.  */
 void drop_pager_softrefs (struct node *node);
diff --git a/ext2fs/pager.c b/ext2fs/pager.c
index b56c923..f41107f 100644
--- a/ext2fs/pager.c
+++ b/ext2fs/pager.c
@@ -34,6 +34,10 @@ struct port_bucket *disk_pager_bucket;
 /* A ports bucket to hold file pager ports.  */
 struct port_bucket *file_pager_bucket;
 
+/* Stores a reference to the requests instance used by the file pager so its
+   worker threads can be inhibited and resumed.  */
+struct requests *file_pager_requests;
+
 pthread_spinlock_t node_to_page_lock = PTHREAD_SPINLOCK_INITIALIZER;
 
 
@@ -1217,11 +1221,40 @@ create_disk_pager (void)
   file_pager_bucket = ports_create_bucket ();
 
   /* Start libpagers worker threads.  */
-  err = pager_start_workers (file_pager_bucket);
+  err = pager_start_workers (&file_pager_requests, file_pager_bucket);
   if (err)
     ext2_panic ("can't create libpager worker threads: %s", strerror (err));
 }
 
+error_t
+inhibit_disk_pager (void)
+{
+  error_t err;
+
+  /* Inhibiting RPCs is not sufficient, nor is it in fact necessary.
+     Since libpager has its own pool of workers, requests can still be
+     handled after RPCs have been inhibited, so pager_inhibit_workers
+     must be used. In fact, RPCs will not be inhibited; the requests
+     will just queue up inside libpager, and will be handled once the
+     workers are resumed.
+     The file pager can rely on the disk pager, so inhibit the file
+     pager first.  */
+
+  err = pager_inhibit_workers (file_pager_requests);
+  if (err)
+    return err;
+
+  err = pager_inhibit_workers (diskfs_disk_pager_requests);
+  return err;
+}
+
+void
+resume_disk_pager (void)
+{
+  pager_resume_workers (diskfs_disk_pager_requests);
+  pager_resume_workers (file_pager_requests);
+}
+
 /* Call this to create a FILE_DATA pager and return a send right.
    NODE must be locked.  */
 mach_port_t
diff --git a/fatfs/fatfs.h b/fatfs/fatfs.h
index 3c3d836..54c3426 100644
--- a/fatfs/fatfs.h
+++ b/fatfs/fatfs.h
@@ -121,6 +121,8 @@ extern struct dirrect dr_root_node;
 void drop_pager_softrefs (struct node *);
 void allow_pager_softrefs (struct node *);
 void create_fat_pager (void);
+error_t inhibit_fat_pager (void);
+void resume_fat_pager (void);
 
 void flush_node_pager (struct node *node);
 
diff --git a/fatfs/pager.c b/fatfs/pager.c
index 10d1fc9..3d860d1 100644
--- a/fatfs/pager.c
+++ b/fatfs/pager.c
@@ -29,6 +29,10 @@ struct port_bucket *disk_pager_bucket;
 /* A ports bucket to hold file pager ports.  */
 struct port_bucket *file_pager_bucket;
 
+/* Stores a reference to the requests instance used by the file pager so its
+   worker threads can be inhibited and resumed.  */
+struct requests *file_pager_requests;
+
 /* Mapped image of the FAT.  */
 void *fat_image;
 
@@ -776,11 +780,40 @@ create_fat_pager (void)
   file_pager_bucket = ports_create_bucket ();
 
   /* Start libpagers worker threads.  */
-  err = pager_start_workers (file_pager_bucket);
+  err = pager_start_workers (&file_pager_requests, file_pager_bucket);
   if (err)
     error (2, err, "can't create libpager worker threads");
 }
 
+error_t
+inhibit_fat_pager (void)
+{
+  error_t err;
+
+  /* Inhibiting RPCs is not sufficient, nor is it in fact necessary.
+     Since libpager has its own pool of workers, requests can still be
+     handled after RPCs have been inhibited, so pager_inhibit_workers
+     must be used. In fact, RPCs will not be inhibited; the requests
+     will just queue up inside libpager, and will be handled once the
+     workers are resumed.
+     The file pager can rely on the disk pager, so inhibit the file
+     pager first.  */
+
+  err = pager_inhibit_workers (file_pager_requests);
+  if (err)
+    return err;
+
+  err = pager_inhibit_workers (diskfs_disk_pager_requests);
+  return err;
+}
+
+void
+resume_fat_pager (void)
+{
+  pager_resume_workers (diskfs_disk_pager_requests);
+  pager_resume_workers (file_pager_requests);
+}
+
 /* Call this to create a FILE_DATA pager and return a send right.
    NODE must be locked.  */
 mach_port_t
diff --git a/libdiskfs/disk-pager.c b/libdiskfs/disk-pager.c
index 008aa2d..2cb33d4 100644
--- a/libdiskfs/disk-pager.c
+++ b/libdiskfs/disk-pager.c
@@ -24,6 +24,7 @@
 __thread struct disk_image_user *diskfs_exception_diu;
 
 struct pager *diskfs_disk_pager;
+struct requests *diskfs_disk_pager_requests;
 
 static void fault_handler (int sig, long int sigcode, struct sigcontext *scp);
 static struct hurd_signal_preemptor preemptor =
@@ -43,7 +44,7 @@ diskfs_start_disk_pager (struct user_pager_info *upi,
   mach_port_t disk_pager_port;
 
   /* Start libpagers worker threads.  */
-  err = pager_start_workers (pager_bucket);
+  err = pager_start_workers (&diskfs_disk_pager_requests, pager_bucket);
   if (err)
     error (2, err, "creating pager worker threads failed");
 
diff --git a/libdiskfs/diskfs-pager.h b/libdiskfs/diskfs-pager.h
index a253069..db99f9ff 100644
--- a/libdiskfs/diskfs-pager.h
+++ b/libdiskfs/diskfs-pager.h
@@ -40,6 +40,7 @@ extern void diskfs_start_disk_pager (struct user_pager_info 
*info,
                                     size_t size, void **image);
 
 extern struct pager *diskfs_disk_pager;
+extern struct requests *diskfs_disk_pager_requests;
 
 struct disk_image_user
   {
diff --git a/libpager/demuxer.c b/libpager/demuxer.c
index 4dd3cd8..524705c 100644
--- a/libpager/demuxer.c
+++ b/libpager/demuxer.c
@@ -71,9 +71,16 @@ struct worker
 struct requests
 {
   struct port_bucket *bucket;
-  struct queue queue;
+  /* Normally, both queues are the same.  However, when the workers are
+     inhibited, a new queue_in is created, but queue_out is left as the
+     old value, so the workers drain queue_out but do not receive new
+     requests.  */
+  struct queue *queue_in;      /* the queue to add to */
+  struct queue *queue_out;     /* the queue to take from */
   int asleep;
   pthread_cond_t wakeup;
+  pthread_cond_t inhibit_wakeup;
+  pthread_cond_t resume_wakeup;
   pthread_mutex_t lock;
   struct worker workers[WORKER_COUNT];
 };
@@ -108,10 +115,10 @@ pager_demuxer (struct requests *requests,
 
   pthread_mutex_lock (&requests->lock);
 
-  queue_enqueue (&requests->queue, &r->item);
+  queue_enqueue (requests->queue_in, &r->item);
 
-  /* Awake worker.  */
-  if (requests->asleep > 0)
+  /* Awake worker, but only if not inhibited.  */
+  if (requests->asleep > 0 && requests->queue_in == requests->queue_out)
       pthread_cond_signal (&requests->wakeup);
 
   pthread_mutex_unlock (&requests->lock);
@@ -186,9 +193,11 @@ worker_func (void *arg)
 
     get_request_locked:
       /* ... get a request from the global queue instead.  */
-      while ((r = queue_dequeue (&requests->queue)) == NULL)
+      while ((r = queue_dequeue (requests->queue_out)) == NULL)
        {
          requests->asleep += 1;
+         if (requests->asleep == WORKER_COUNT)
+           pthread_cond_broadcast (&requests->inhibit_wakeup);
          pthread_cond_wait (&requests->wakeup, &requests->lock);
          requests->asleep -= 1;
        }
@@ -298,27 +307,48 @@ service_paging_requests (void *arg)
 
 /* Start the worker threads libpager uses to service requests.  */
 error_t
-pager_start_workers (struct port_bucket *pager_bucket)
+pager_start_workers (struct requests **out_requests,
+                    struct port_bucket *pager_bucket)
 {
   error_t err;
   int i;
   pthread_t t;
   struct requests *requests;
 
+  if (out_requests == NULL)
+    /* Return rather than using goto done, since that would dereference
+       out_requests.  */
+    return EINVAL;
+
   requests = malloc (sizeof *requests);
   if (requests == NULL)
-    return ENOMEM;
+    {
+      err = ENOMEM;
+      goto done;
+    }
 
   requests->bucket = pager_bucket;
   requests->asleep = 0;
-  queue_init (&requests->queue);
+
+  requests->queue_in = malloc (sizeof *requests->queue_in);
+  if (requests->queue_in == NULL)
+    {
+      err = ENOMEM;
+      goto done;
+    }
+  queue_init (requests->queue_in);
+  /* Until the workers are inhibited, both queues are the same.  */
+  requests->queue_out = requests->queue_in;
+
   pthread_cond_init (&requests->wakeup, NULL);
+  pthread_cond_init (&requests->inhibit_wakeup, NULL);
+  pthread_cond_init (&requests->resume_wakeup, NULL);
   pthread_mutex_init (&requests->lock, NULL);
 
   /* Make a thread to service paging requests.  */
   err = pthread_create (&t, NULL, service_paging_requests, requests);
   if (err)
-    return err;
+    goto done;
   pthread_detach (t);
 
   for (i = 0; i < WORKER_COUNT; i++)
@@ -329,9 +359,81 @@ pager_start_workers (struct port_bucket *pager_bucket)
 
       err = pthread_create (&t, NULL, &worker_func, &requests->workers[i]);
       if (err)
-       return err;
+       goto done;
       pthread_detach (t);
     }
 
+done:
+  if (err)
+    *out_requests = NULL;
+  else
+    *out_requests = requests;
+
   return err;
 }
+
+error_t
+pager_inhibit_workers (struct requests *requests)
+{
+  error_t err;
+
+  pthread_mutex_lock (&requests->lock);
+
+  /* Check the workers are not already inhibited nor in the process of
+     being inhibited, and only create a new queue if necessary;
+     otherwise the queued requests would be discarded, and queue_out
+     would be leaked.  */
+  if (requests->queue_out == requests->queue_in)
+    {
+      /* Any new paging requests will go into a new queue.  */
+      struct queue *new_queue = malloc (sizeof *new_queue);
+      if (new_queue == NULL)
+       {
+         err = ENOMEM;
+         goto done_locked;
+       }
+      queue_init (new_queue);
+      requests->queue_in = new_queue;
+    }
+
+  /* Wait until all the workers are asleep, as then the old queue and
+     all individual worker queues have been drained.  */
+  while (requests->asleep < WORKER_COUNT)
+    pthread_cond_wait (&requests->inhibit_wakeup, &requests->lock);
+
+  pthread_cond_broadcast (&requests->resume_wakeup);
+
+done_locked:
+  pthread_mutex_unlock (&requests->lock);
+  return err;
+}
+
+void
+pager_resume_workers (struct requests *requests)
+{
+  pthread_mutex_lock (&requests->lock);
+
+  /* Check the workers are inhibited/being inhibited.  */
+  if (requests->queue_out != requests->queue_in)
+    {
+      /* If the inhibiting has not yet finished (the old queue has not
+        drained), wait for it to do so.  */
+      while (requests->asleep < WORKER_COUNT)
+       pthread_cond_wait (&requests->resume_wakeup, &requests->lock);
+
+      /* Another resume may have run in the meantime, in which case the
+        old queue has already been freed, so queue_out should not be
+        freed and updated to be queue_in.  */
+      if (requests->queue_out != requests->queue_in)
+       {
+         /* The queue has been drained and will no longer be used.  */
+         free (requests->queue_out);
+         requests->queue_out = requests->queue_in;
+         /* We need to wake up all workers, as there could be multiple
+            requests in the new queue.  */
+         pthread_cond_broadcast (&requests->wakeup);
+       }
+    }
+
+  pthread_mutex_unlock (&requests->lock);
+}
diff --git a/libpager/pager.h b/libpager/pager.h
index fe34238..8a99c30 100644
--- a/libpager/pager.h
+++ b/libpager/pager.h
@@ -25,8 +25,27 @@
    scope.  */
 struct user_pager_info;
 
-/* Start the worker threads libpager uses to service requests.  */
-error_t pager_start_workers (struct port_bucket *pager_bucket);
+struct requests;
+
+/* Start the worker threads libpager uses to service requests. If no
+   error is returned, *requests will be a valid pointer, else it will be
+   set to NULL. Returns EINVAL if requests is NULL.  */
+error_t
+pager_start_workers (struct requests **requests,
+                    struct port_bucket *pager_bucket);
+
+/* Inhibits the worker threads libpager uses to service requests,
+   blocking until all requests sent before this function is called have
+   finished. Note that RPCs will not be inhibited, so new requests will
+   queue up, but will not be handled until the workers are resumed. If
+   RPCs should be inhibited as well, call ports_inhibit_bucket_rpcs with
+   the bucket used to create the workers before calling this.  */
+error_t
+pager_inhibit_workers (struct requests *requests);
+
+/* Resumes the worker threads libpager uses to service requests.  */
+void
+pager_resume_workers (struct requests *requests);
 
 /* Create a new pager.  The pager will have a port created for it
    (using libports, in BUCKET) and will be immediately ready
-- 
2.4.6




reply via email to

[Prev in Thread] Current Thread [Next in Thread]