gnokii-users
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: smsd, postgresql and large numbers of messages - bugs?


From: drbob
Subject: Re: smsd, postgresql and large numbers of messages - bugs?
Date: Sun, 27 Mar 2011 02:12:42 +0000 (UTC)
User-agent: Pan/0.134 (Wait for Me; Unknown)

On Sat, 26 Mar 2011 20:08:05 +0000, drbob wrote:

> 
> Perhaps better would be to select all the unprocessed rows (without the
> FOR UPDATE), then within the for loop that iterates over all the
> unprocessed items issue a SELECT FOR UPDATE for the individual message
> row being dealt with at that time (with the NOWAIT option so we can move
> on to the next message if it's already locked). We'd also need to
> recheck the processed attribute in case another instance of smsd already
> got to that row.
> 
> I believe this would allow multiple instances of smsd to work
> concurrently on a large message queue.
> 
> Again, any comments welcome.

OK, I've no idea if these patches will work, but here goes - they are TOTALLY
UNTESTED (I don't have access to a test machine for gnokii right now), though
they do compile (there could well be errors in the new SQL).

They try to make the changes described above.

I'll try to actually test the code in the morning and report back here.

As always, any comments are welcome.

Patch against 0.6.30:

--- pq.c        2007-05-07 19:05:51.000000000 +0100
+++ pq.c.new    2011-03-27 02:37:44.000000000 +0100
@@ -164,12 +164,9 @@
 
   buf = g_string_sized_new (128);
 
-  res1 = PQexec (connOut, "BEGIN");
-  PQclear (res1);
-
   g_string_sprintf (buf, "SELECT id, number, text, dreport FROM %s.outbox \
                           WHERE processed='f' AND localtime(0) >= not_before \
-                          AND localtime(0) <= not_after %s FOR UPDATE",
+                          AND localtime(0) <= not_after %s",
                           schema, phnStr->str);
   g_string_free (phnStr, TRUE);
 
@@ -187,8 +184,35 @@
 
   for (i = 0; i < PQntuples (res1); i++)
   {
+    res2 = PQexec (connOut, "BEGIN");
+    if (!res2 || PQresultStatus (res2) != PGRES_COMMAND_OK)
+    {
+      g_print (_("%d: BEGIN command failed.\n"), __LINE__);
+      gn_log_xdebug ("%s\n", buf->str);
+      g_print (_("Error: %s\n"), PQerrorMessage (connOut));
+      PQclear(res2);
+      continue;
+    }
+    PQclear(res2);
+    g_string_printf (buf,"SELECT id, number, text, dreport FROM %s.outbox \
+                          WHERE processed='f' AND id=%s AND localtime(0) >= 
not_before \
+                          AND localtime(0) <= not_after FOR UPDATE NOWAIT",
+                          schema,PQgetvalue (res1, i, 0));
+
+    res2 = PQexec (connOut, buf->str);
+    if (!res2 || PQresultStatus (res2) != PGRES_COMMAND_OK || PQntuples(res2) 
< 1)
+    {
+      g_print (_("%d: SELECT FOR UPDATE (row lock) command failed.\n"), 
__LINE__);
+      gn_log_xdebug ("%s\n", buf->str);
+      g_print (_("Error: %s\n"), PQerrorMessage (connOut));
+      PQclear(res2);
+      res2 = PQexec (connOut, "COMMIT");
+      PQclear (res2);
+      continue;
+    }
+
     gn_sms sms;
-    
+   
     gn_sms_default_submit (&sms);
     memset (&sms.remote.number, 0, sizeof (sms.remote.number));
     sms.delivery_report = atoi (PQgetvalue (res1, i, 3));
@@ -230,12 +254,18 @@
     }
 
     PQclear (res2);
+
+    res2 = PQexec (connOut, "COMMIT");
+    if (!res2 || PQresultStatus (res2) != PGRES_COMMAND_OK)
+    {
+      g_print (_("%d: COMMIT command failed.\n"), __LINE__);
+      gn_log_xdebug ("%s\n", buf->str);
+      g_print (_("Error: %s\n"), PQerrorMessage (connOut));
+    }
+    PQclear(res2);
   }
 
   PQclear (res1);
-
-  res1 = PQexec(connOut, "COMMIT");
-
   g_string_free(buf, TRUE);
-  PQclear (res1);
-}
+
+}
\ No newline at end of file

=======================================================================

Patch against git master:

--- pq.c        2011-03-26 22:01:31.000000000 +0000
+++ pq2.c       2011-03-26 22:33:52.000000000 +0000
@@ -360,7 +360,7 @@
 
   g_string_printf (buf, "SELECT id, number, text, dreport FROM %s.outbox \
                          WHERE processed='f' AND localtime(0) >= not_before \
-                         AND localtime(0) <= not_after %s FOR UPDATE",
+                         AND localtime(0) <= not_after %s",
                    schema, phnStr->str);
   g_string_free (phnStr, TRUE);
 
@@ -376,7 +376,34 @@
   }
 
   for (i = 0; i < PQntuples (res1); i++)
-  {
+  {    
+    res2 = PQexec (connOut, "BEGIN");
+    if (!res2 || PQresultStatus (res2) != PGRES_COMMAND_OK)
+    {
+      g_print (_("%d: BEGIN command failed.\n"), __LINE__);   
+      gn_log_xdebug ("%s\n", buf->str);
+      g_print (_("Error: %s\n"), PQerrorMessage (connOut));
+      PQclear(res2);
+      continue;
+    }
+    PQclear(res2);
+    g_string_printf (buf,"SELECT id, number, text, dreport FROM %s.outbox \
+                          WHERE processed='f' AND id=%s AND localtime(0) >= 
not_before \
+                          AND localtime(0) <= not_after FOR UPDATE NOWAIT",
+                          schema,PQgetvalue (res1, i, 0));
+
+    res2 = PQexec (connOut, buf->str);
+    if (!res2 || PQresultStatus (res2) != PGRES_COMMAND_OK || PQntuples(res2) 
< 1)
+    {
+      g_print (_("%d: SELECT FOR UPDATE (row lock) command failed.\n"), 
__LINE__);   
+      gn_log_xdebug ("%s\n", buf->str);
+      g_print (_("Error: %s\n"), PQerrorMessage (connOut));
+      PQclear(res2);
+      res2 = PQexec (connOut, "COMMIT");
+      PQclear (res2);
+      continue;
+    }
+
     gn_sms sms;
     
     gn_sms_default_submit (&sms);
@@ -419,8 +446,16 @@
       gn_log_xdebug ("%s\n", buf->str);
       g_print (_("Error: %s\n"), PQerrorMessage (connOut));
     }
-
     PQclear (res2);
+       
+    res2 = PQexec (connOut, "COMMIT");
+    if (!res2 || PQresultStatus (res2) != PGRES_COMMAND_OK)
+    {
+      g_print (_("%d: COMMIT command failed.\n"), __LINE__);   
+      gn_log_xdebug ("%s\n", buf->str);
+      g_print (_("Error: %s\n"), PQerrorMessage (connOut));      
+    }
+    PQclear(res2);
   }
 
   PQclear (res1);






reply via email to

[Prev in Thread] Current Thread [Next in Thread]