[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Maposmatic-dev] [PATCH maposmatic 2/3] Revamp the job renderer module w
From: |
Maxime Petazzoni |
Subject: |
[Maposmatic-dev] [PATCH maposmatic 2/3] Revamp the job renderer module with timeout support |
Date: |
Fri, 5 Feb 2010 09:20:30 +0100 |
The render module, originally a simple extract of the render_job
function from the old daemon, has been rewritten to expose a more
sensible API and to provide self-contained timeout support.
Two classes are now available, exposing the same public API for seamless
use of either of the two:
* JobRenderer is a simple, blocking job renderer. It can be threaded
if the caller decides to call start() instead of run().
* TimingOutJobRenderer, as its name suggests, is a timing out job
renderer that makes use of the threading capability of JobRenderer
to handle a timeout on the rendering process and kill the rendering
thread, whatever it is doing, if the given timeout is reached.
The render module also now exposes a few public RESULT_ constants that
can be used to identify the result of a job rendering. This is used in
the daemon to infer an appropriate resultmsg.
As a standalone process, the job renderer now takes an optional second
argument: the timeout, in seconds.
.../scripts/wrapper.py scripts/render.py <jobid> [timeout]
As of now, the daemon is fully usable in production with the same level
of functionality as before.
Signed-off-by: Maxime Petazzoni <address@hidden>
---
scripts/daemon.py | 72 ++++++++++++---------
scripts/render.py | 187 +++++++++++++++++++++++++++++++++++++++--------------
2 files changed, 179 insertions(+), 80 deletions(-)
diff --git a/scripts/daemon.py b/scripts/daemon.py
index 1639034..95d3327 100755
--- a/scripts/daemon.py
+++ b/scripts/daemon.py
@@ -23,29 +23,38 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
-import time
import sys
import threading
+import time
import render
from www.maposmatic.models import MapRenderingJob
from www.settings import LOG
from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_MAX_SIZE_GB
-RESULT_SUCCESSFULL = 'ok'
-RESULT_INTERRUPTED = 'rendering interrupted'
-RESULT_FAILED = 'rendering failed'
-RESULT_CANCELED = 'rendering took too long, canceled'
+_DEFAULT_CLEAN_FREQUENCY = 20 # Clean thread polling frequency, in
+ # seconds.
+_DEFAULT_POLL_FREQUENCY = 10 # Daemon job polling frequency, in seconds
+
+_RESULT_MSGS = {
+ render.RESULT_SUCCESS: 'ok',
+ render.RESULT_KEYBOARD_INTERRUPT: 'rendering interrupted',
+ render.RESULT_RENDERING_EXCEPTION: 'rendering failed',
+ render.RESULT_TIMEOUT_REACHED: 'rendering took too long, canceled'
+}
class MapOSMaticDaemon:
"""
This is a basic rendering daemon, base class for the different
implementations of rendering scheduling. By default, it acts as a
standalone, single-process MapOSMatic rendering daemon.
+
+ It of course uses the TimingOutJobRenderer to ensure no long-lasting job
+ stalls the queue.
"""
- def __init__(self, frequency):
- self.frequency = 10
+ def __init__(self, frequency=_DEFAULT_POLL_FREQUENCY):
+ self.frequency = frequency
LOG.info("MapOSMatic rendering daemon started.")
self.rollback_orphaned_jobs()
@@ -73,31 +82,32 @@ class MapOSMaticDaemon:
LOG.info("MapOSMatic rendering daemon terminating.")
def dispatch(self, job):
- """Dispatch the given job. In this simple single-process daemon, this
- is as simple as rendering it."""
- self.render(job)
+ """In this simple single-process daemon, dispatching is as easy as
+ calling the render() method. Subclasses probably want to overload this
+ method too and implement a more clever dispatching mechanism.
+
+ Args:
+ job (MapRenderingJob): the job to process and render.
- def render(self, job):
- """Render the given job, handling the different rendering outcomes
- (success or failures)."""
+ Returns True if the rendering was successful, False otherwise.
+ """
- LOG.info("Rendering job #%d '%s'..." %
- (job.id, job.maptitle))
- job.start_rendering()
+ return self.render(job, 'maposmaticd_%d_' % os.getpid())
+
+ def render(self, job, prefix=None):
+ """Render the given job using a timing out job renderer.
- ret = render.render_job(job)
- if ret == 0:
- msg = RESULT_SUCCESSFULL
- LOG.info("Finished rendering of job #%d." % job.id)
- elif ret == 1:
- msg = RESULT_INTERRUPTED
- LOG.info("Rendering of job #%d interrupted!" % job.id)
- else:
- msg = RESULT_FAILED
- LOG.info("Rendering of job #%d failed (exception occurred)!" %
- job.id)
+ Args:
+ job (MapRenderingJob): the job to process and render.
+
+ Returns True if the rendering was successful, False otherwise.
+ """
- job.end_rendering(msg)
+ renderer = render.TimingOutJobRenderer(job, prefix=prefix)
+ job.start_rendering()
+ ret = renderer.run()
+ job.end_rendering(_RESULT_MSGS[ret])
+ return ret == 0
class RenderingsGarbageCollector(threading.Thread):
@@ -107,7 +117,7 @@ class RenderingsGarbageCollector(threading.Thread):
of RENDERING_RESULT_MAX_SIZE_GB.
"""
- def __init__(self, frequency=20):
+ def __init__(self, frequency=_DEFAULT_CLEAN_FREQUENCY):
threading.Thread.__init__(self)
self.frequency = frequency
@@ -217,8 +227,8 @@ if __name__ == '__main__':
"Please use a valid RENDERING_RESULT_PATH.")
sys.exit(1)
- daemon = MapOSMaticDaemon(10)
- cleaner = RenderingsGarbageCollector(20)
+ cleaner = RenderingsGarbageCollector()
+ daemon = MapOSMaticDaemon()
cleaner.start()
daemon.serve()
diff --git a/scripts/render.py b/scripts/render.py
index 32ae347..ea370f6 100755
--- a/scripts/render.py
+++ b/scripts/render.py
@@ -25,77 +25,166 @@
import Image
import os
import sys
+import threading
from ocitysmap.coords import BoundingBox
from ocitysmap.street_index import OCitySMap
from www.maposmatic.models import MapRenderingJob
-from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_FORMATS
+from www.settings import LOG
from www.settings import OCITYSMAP_CFG_PATH
+from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_FORMATS
-def render_job(job, prefix=None):
- """Renders the given job, encapsulating all processing errors and
- exceptions.
+RESULT_SUCCESS = 0
+RESULT_KEYBOARD_INTERRUPT = 1
+RESULT_RENDERING_EXCEPTION = 2
+RESULT_TIMEOUT_REACHED = 3
- This does not affect the job entry in the database in any way. It's the
- responsibility of the caller to do maintain the job status in the database.
+class TimingOutJobRenderer:
+ """
+ The TimingOutJobRenderer is a wrapper around JobRenderer implementing
+ timeout management. It uses JobRenderer as a thread, and tries to join it
+ for the given timeout. If the timeout is reached, the thread is suspended,
+ cleaned up and killed.
- Returns:
- * 0 on success;
- * 1 on ^C;
- * 2 on a rendering exception from OCitySMap.
+ The TimingOutJobRenderer has exactly the same API as the non-threading
+ JobRenderer, so it can be used in place of JobRenderer very easily.
"""
- if job.administrative_city is None:
- bbox = BoundingBox(job.lat_upper_left, job.lon_upper_left,
- job.lat_bottom_right, job.lon_bottom_right)
- renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
- map_areas_prefix=prefix,
- boundingbox=bbox,
- language=job.map_language)
- else:
- renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
- map_areas_prefix=prefix,
- osmid=job.administrative_osmid,
- language=job.map_language)
-
- prefix = os.path.join(RENDERING_RESULT_PATH, job.files_prefix())
+ def __init__(self, job, timeout=1200, prefix=None):
+ """Initializes this TimingOutJobRenderer with a given job and a
timeout.
+
+ Args:
+ job (MapRenderingJob): the job to render.
+ timeout (int): a timeout, in seconds (defaults to 20 minutes).
+ prefix (string): renderer map_areas table prefix.
+ """
+
+ self.__timeout = timeout
+ self.__thread = JobRenderer(job, prefix)
+
+ def run(self):
+ """Renders the job using a JobRendered, encapsulating all processing
+ errors and exceptions, with the addition here of a processing timeout.
+
+ Returns one of the RESULT_ constants.
+ """
+
+ self.__thread.start()
+ self.__thread.join(self.__timeout)
+
+ # If the thread is no longer alive, the timeout was not reached and all
+ # is well.
+ if not self.__thread.isAlive():
+ return self.__thread.result
+
+ LOG.info("Rendering of job #%d took too long (timeout reached)!" %
+ self.__thread.job.id)
+
+ # Remove the job files
+ self.__thread.job.remove_all_files()
+
+ # Kill the thread and return TIMEOUT_REACHED
+ del self.__thread
+ return RESULT_TIMEOUT_REACHED
+
+class JobRenderer(threading.Thread):
+ """
+ A simple, blocking job rendered. It can be used as a thread, or directly in
+ the main processing path of the caller if it chooses to call run()
+ directly.
+ """
+
+ def __init__(self, job, prefix):
+ threading.Thread.__init__(self)
+ self.job = job
+ self.prefix = prefix
+ self.result = None
+
+ def run(self):
+ """Renders the given job, encapsulating all processing errors and
+ exceptions.
+
+ This does not affect the job entry in the database in any way. It's the
+ responsibility of the caller to do maintain the job status in the
+ database.
+
+ Returns one of the RESULT_ constants.
+ """
+
+ LOG.info("Rendering job #%d '%s'..." % (self.job.id,
self.job.maptitle))
+
+ if self.job.administrative_city is None:
+ bbox = BoundingBox(self.job.lat_upper_left,
+ self.job.lon_upper_left,
+ self.job.lat_bottom_right,
+ self.job.lon_bottom_right)
+ renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
+ map_areas_prefix=self.prefix,
+ boundingbox=bbox,
+ language=self.job.map_language)
+ else:
+ renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
+ map_areas_prefix=self.prefix,
+ osmid=self.job.administrative_osmid,
+ language=self.job.map_language)
+
+ prefix = os.path.join(RENDERING_RESULT_PATH, self.job.files_prefix())
+
+ try:
+ # Render the map in all RENDERING_RESULT_FORMATS
+ result = renderer.render_map_into_files(self.job.maptitle, prefix,
+ RENDERING_RESULT_FORMATS,
+ 'zoom:16')
+
+ # Render the index in all RENDERING_RESULT_FORMATS, using the
+ # same map size.
+ renderer.render_index(self.job.maptitle, prefix,
+ RENDERING_RESULT_FORMATS,
+ result.width, result.height)
+
+ # Create thumbnail
+ if 'png' in RENDERING_RESULT_FORMATS:
+ img = Image.open(prefix + '.png')
+ img.thumbnail((200, 200), Image.ANTIALIAS)
+ img.save(prefix + '_small.png')
+
+ self.result = RESULT_SUCCESS
+ LOG.info("Finished rendering of job #%d." % self.job.id)
+ except KeyboardInterrupt:
+ self.result = RESULT_KEYBOARD_INTERRUPT
+ LOG.info("Rendering of job #%d interrupted!" % self.job.id)
+ except:
+ self.result = RESULT_RENDERING_EXCEPTION
+ LOG.info("Rendering of job #%d failed (exception occurred)!" %
+ self.job.id)
+
+ # Remove the job files if the rendering was not successful.
+ if self.result:
+ self.job.remove_all_files()
+
+ return self.result
- try:
- # Render the map in all RENDERING_RESULT_FORMATS
- result = renderer.render_map_into_files(job.maptitle, prefix,
- RENDERING_RESULT_FORMATS,
- 'zoom:16')
-
- # Render the index in all RENDERING_RESULT_FORMATS, using the
- # same map size.
- renderer.render_index(job.maptitle, prefix, RENDERING_RESULT_FORMATS,
- result.width, result.height)
-
- # Create thumbnail
- if 'png' in RENDERING_RESULT_FORMATS:
- img = Image.open(prefix + '.png')
- img.thumbnail((200, 200), Image.ANTIALIAS)
- img.save(prefix + '_small.png')
-
- return 0
- except KeyboardInterrupt:
- return 1
- except:
- return 2
if __name__ == '__main__':
def usage():
- sys.stderr.write('usage: %s <jobid>' % sys.argv[0])
+ sys.stderr.write('usage: %s <jobid> [timeout]\n' % sys.argv[0])
- if len(sys.argv) != 2:
+ if len(sys.argv) < 2 or len(sys.argv) > 3:
usage()
sys.exit(3)
try:
jobid = int(sys.argv[1])
job = MapRenderingJob.objects.get(id=jobid)
+
if job:
- sys.exit(render_job(job, 'renderer_%d' % os.getpid()))
+ prefix = 'renderer_%d_' % os.getpid()
+ if len(sys.argv) == 3:
+ renderer = TimingOutJobRenderer(job, int(sys.argv[2]), prefix)
+ else:
+ renderer = JobRenderer(job, prefix)
+
+ sys.exit(renderer.run())
else:
sys.stderr.write('Job #%d not found!' % jobid)
sys.exit(4)
--
1.6.3.3.277.g88938c