commit-gnue
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnue] r7309 - in trunk/gnue-appserver: . tests


From: johannes
Subject: [gnue] r7309 - in trunk/gnue-appserver: . tests
Date: Thu, 7 Apr 2005 06:54:39 -0500 (CDT)

Author: johannes
Date: 2005-04-07 06:54:37 -0500 (Thu, 07 Apr 2005)
New Revision: 7309

Added:
   trunk/gnue-appserver/tests/
   trunk/gnue-appserver/tests/data.py
Log:
Added first unittest to appserver



Property changes on: trunk/gnue-appserver/tests
___________________________________________________________________
Name: svn:ignore
   + *.pyc


Added: trunk/gnue-appserver/tests/data.py
===================================================================
--- trunk/gnue-appserver/tests/data.py  2005-04-07 09:31:56 UTC (rev 7308)
+++ trunk/gnue-appserver/tests/data.py  2005-04-07 11:54:37 UTC (rev 7309)
@@ -0,0 +1,694 @@
+# GNU Enterprise Application Server - Unit Testing - data.py
+#
+# Copyright 2001-2005 Free Software Foundation
+#
+# This file is part of GNU Enterprise
+#
+# GNU Enterprise is free software; you can redistribute it
+# and/or modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2, or (at your option) any later version.
+#
+# GNU Enterprise is distributed in the hope that it will be
+# useful, but WITHOUT ANY WARRANTY; without even the implied
+# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with program; see the file COPYING. If not,
+# write to the Free Software Foundation, Inc., 59 Temple Place
+# - Suite 330, Boston, MA 02111-1307, USA.
+#
+# $Id$
+
+import unittest
+
+from gnue.appserver import data, geasConfiguration
+from gnue.common.apps import GClientApp
+from gnue.common.datasources import GConnections
+
+# =============================================================================
+# TestCase for _cache class
+# =============================================================================
+
+class CachingTestCase (unittest.TestCase):
+  """
+  TestCase for the low-level class _cache
+  """
+
+  # ---------------------------------------------------------------------------
+  # Test Case Setup
+  # ---------------------------------------------------------------------------
+
+  def setUp (self):
+    """
+    Create a new instance of the cache
+    """
+
+    self.cache = data._cache ()
+
+
+  # ---------------------------------------------------------------------------
+  # Is the cache cleaned well ?
+  # ---------------------------------------------------------------------------
+
+  def test_cacheClean (self):
+    """
+    Test if the cache is cleaned well
+    """
+
+    table = u'FOO'
+    rows  = [u'100', u'200']
+
+    # Write data into clean cache. The cache should not be dirty at all
+    for row in rows:
+      self.cache.write (table, row, u'foo', 1, False)
+
+    result = self.__orderTables (self.cache.dirtyTables ())
+    self.assertEqual (result, [])
+    self.assertEqual (self.cache.inserted, {})
+    self.assertEqual (self.cache.deleted, {})
+
+    # Now write data to dirty cache, which will render the records as 'changed'
+    for row in rows:
+      self.cache.write (table, row, u'foo', 1, True)
+
+    self.cache.insertRecord (u'BAR', u'200')
+    self.cache.initialized (u'BAR', u'200')
+    self.cache.insertRecord (u'FOO', u'771')
+    self.cache.initialized (u'FOO', u'771')
+    self.cache.write (u'FOO', u'771', u'val', 5, True)
+
+    result = self.__orderTables (self.cache.dirtyTables ())
+    exp = [(u'BAR', u'200', 'initialized', [(u'gnue_id', u'200')]),
+           (u'FOO', u'100', 'changed', [(u'foo', 1)]),
+           (u'FOO', u'200', 'changed', [(u'foo', 1)]),
+           (u'FOO', u'771', 'inserted', [(u'gnue_id', u'771'), (u'val', 5)])]
+
+    self.assertEqual (result, exp)
+    self.assertEqual (self.cache.inserted, {u'BAR-200': (u'BAR', u'200'),
+        u'FOO-771': (u'FOO', u'771')})
+    self.assertEqual (self.cache.deleted, {})
+
+    # Cancelling the current atomic operation should make the cache clean again
+    self.cache.cancel ()
+
+    result = self.__orderTables (self.cache.dirtyTables ())
+    self.assertEqual (result, [])
+    self.assertEqual (self.cache.inserted, {})
+    self.assertEqual (self.cache.deleted, {})
+
+    # Insert another three records. One is changed into 'inserted' state,
+    # another one is left at 'initialized' and the last one is left at
+    # 'initializing'
+    self.cache.clear ()
+
+    self.cache.insertRecord (table, u'1')
+    self.cache.initialized (table, u'1')
+    self.cache.write (table, u'1', u'foo', 1, True)
+
+    self.cache.insertRecord (table, u'2')
+    self.cache.initialized (table, u'2')
+
+    self.cache.insertRecord (table, u'3')
+
+    exp = [(table, u'1', 'inserted', [(u'foo', 1), (u'gnue_id', u'1')]),
+           (table, u'2', 'initialized', [(u'gnue_id', u'2')]),
+           (table, u'3', 'initializing', [(u'gnue_id', u'3')])]
+    self.assertEqual (exp, self.__orderTables (self.cache.dirtyTables ()))
+
+    self.cache.makeClean (table, u'1')
+
+    # Now clear all 'clean' stuff
+    exp = [(table, u'2', 'initialized', [(u'gnue_id', u'2')]),
+           (table, u'3', 'initializing', [(u'gnue_id', u'3')])]
+    self.cache.clear (True)
+    self.assertEqual (exp, self.__orderTables (self.cache.dirtyTables ()))
+
+    # And now everything should really go away
+    self.cache.clear ()
+    self.assertEqual (self.cache.dirtyTables (), {})
+
+
+    
+  # ---------------------------------------------------------------------------
+  # Perform insertions
+  # ---------------------------------------------------------------------------
+
+  def test_cacheInsertions (self):
+    """
+    Perform various tests regarding data insertion
+    """
+
+    self.cache.clear ()
+
+    table = u'table'
+    row   = u'100'
+    row2  = u'200'
+
+    # We cannot insert a record if it's already available
+    self.cache.write (table, row, u'foo', 1, True)
+    self.assertRaises (data.DuplicateRecordError, self.cache.insertRecord,
+        table, row)
+
+    self.cache.clear ()
+
+    # Insert a record, change and confirm it
+    self.cache.insertRecord (table, row)
+    self.cache.initialized (table, row)
+    self.cache.write (table, row, u'foo', 1, True)
+    self.cache.confirm ()
+
+    result = self.__orderTables (self.cache.dirtyTables ())
+    exp = [(table, row, 'inserted', [(u'foo', 1), (u'gnue_id', row)])]
+    self.assertEqual (result, exp)
+    self.assertEqual (self.cache.inserted, {u'table-100': (table, row)})
+    self.assertEqual (self.cache.deleted, {})
+
+    # change it again, and insert another row. We won't set the latter one to
+    # initialized so it does not get into 'inserted' state
+    self.cache.write (table, row, u'foo', 2, True)
+    self.cache.insertRecord (table, row2)
+    self.cache.write (table, row2, u'bar', 2, True)
+
+    result = self.__orderTables (self.cache.dirtyTables ())
+    exp = [(table, row, 'inserted', [(u'foo', 2), (u'gnue_id', row)]),
+           (table, row2, 'initializing', [(u'bar', 2), (u'gnue_id', row2)])]
+    self.assertEqual (result, exp)
+    self.assertEqual (self.cache.inserted, {u'table-100': (table, row),
+                                            u'table-200': (table, row2)})
+    self.assertEqual (self.cache.deleted, {})
+
+    # Now delete the second record
+    self.cache.deleteRecord (table, row2)
+
+    # After deleting the second record only the first one may remain in
+    # cache.inserted; the second is still reported dirty, in state 'deleted'
+    result = self.__orderTables (self.cache.dirtyTables ())
+    exp = [(table, row, 'inserted', [(u'foo', 2), (u'gnue_id', row)]),
+           (table, row2, 'deleted', [(u'bar', 2), (u'gnue_id', None)])]
+    self.assertEqual (result, exp)
+    self.assertEqual (self.cache.inserted, {u'table-100': (table, row)})
+    self.assertEqual (self.cache.deleted, {})
+
+    # cancel the atomic operation. This should give us the state from the last
+    # call of confirm ()
+    self.cache.cancel ()
+
+
+    result = self.__orderTables (self.cache.dirtyTables ())
+    exp = [(table, row, 'inserted', [(u'foo', 1), (u'gnue_id', row)])]
+    self.assertEqual (result, exp)
+    self.assertEqual (self.cache.inserted, {u'table-100': (table, row)})
+    self.assertEqual (self.cache.deleted, {})
+
+    # Now simulate a commit which uses makeClean () to make a given record
+    # clean
+    self.cache.makeClean (table, row)
+    self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), [])
+    self.assertEqual (self.cache.inserted, {})
+    self.assertEqual (self.cache.deleted, {})
+
+    # Add a new record, confirm it and then delete it
+    self.cache.clear ()
+
+    self.cache.insertRecord (table, row)
+    self.cache.initialized (table, row)
+    self.cache.write (table, row, u'name', 'foobar', True)
+    self.cache.confirm ()
+
+    exp = [(table, row, 'inserted', [(u'gnue_id', row), (u'name', 'foobar')])]
+    self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), exp)
+    self.assertEqual (self.cache.inserted, {u'table-100': (table, row)})
+
+    self.cache.deleteRecord (table, row)
+
+    exp = [(table, row, 'deleted', [(u'gnue_id', None), (u'name', 'foobar')])]
+    self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), exp)
+    self.assertEqual (self.cache.inserted, {})
+    self.assertEqual (self.cache.deleted, {})
+
+
+
+
+  # ---------------------------------------------------------------------------
+  # Perform tests with modifications of records
+  # ---------------------------------------------------------------------------
+
+  def test_cacheModification (self):
+    """
+    Perform tests with mixed modification operations
+    """
+
+    self.cache.clear ()
+
+    table = u'FOO'
+    row   = u'100'
+
+    # First add some clean data to the cache
+    self.cache.write (table, row, u'foo', 1, False)
+
+    # Now change an 'existing' record and cancel that operation
+    self.cache.write (table, row, u'bar', 2, True)
+    self.cache.write (table, row, u'foo', 100, True)
+    self.cache.cancel ()
+
+    # The cached (current) value of 'foo' should be still the clean value, and
+    # there must not be any 'dirty' data atm
+    self.assertEqual (self.cache.read (table, row, u'foo'), 1)
+    self.assertEqual (self.cache.dirtyTables (), {})
+
+    # Change the record again; now it should have the state 'changed'
+    self.cache.write (table, row, u'foo', 100, True)
+    result = self.__orderTables (self.cache.dirtyTables ())
+    exp = [(table, row, 'changed', [(u'foo', 100)])]
+    self.assertEqual (result, exp)
+    self.cache.confirm ()
+
+    # Calling initialized () for a record which wasn't inserted before must
+    # raise an exception
+    self.assertRaises (data.StateChangeError, self.cache.initialized,
+                       table, row)
+
+    # Now delete the record. It will change its state and show up in the
+    # deletion queue
+    self.cache.deleteRecord (table, row)
+    exp2 = [(table, row, 'deleted', [(u'foo', 100), (u'gnue_id', None)])]
+    self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), exp2)
+    self.assertEqual (self.cache.deleted, {u'FOO-100': (table, row)})
+
+    # Cancel the delete operation; we should be again at the point of the last
+    # confirmation
+    self.cache.cancel ()
+    result = self.__orderTables (self.cache.dirtyTables ())
+    self.assertEqual (result, exp)
+    self.assertEqual (self.cache.deleted, {})
+
+    # Make the changed record a clean record
+    self.cache.makeClean (table, row)
+    self.assertEqual (self.cache.dirtyTables (), {})
+
+    # Deleting a record and afterwards adding the same key points out a bug in
+    # appserver
+    self.cache.deleteRecord (u'frog', u'dummy')
+    self.assertRaises (data.DuplicateRecordError, self.cache.insertRecord,
+                       u'frog', u'dummy')
+    expected = [(u'frog', u'dummy', 'deleted', [(u'gnue_id', None)])]
+    self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), expected)
+    self.assertEqual (self.cache.inserted, {})
+    self.assertEqual (self.cache.deleted, {u'frog-dummy': (u'frog', u'dummy')})
+
+    self.cache.cancel ()
+    self.assertEqual (self.cache.dirtyTables (), {})
+    self.assertEqual (self.cache.inserted, {})
+    self.assertEqual (self.cache.deleted, {})
+
+
+
+  # ---------------------------------------------------------------------------
+  # Prepare the given cache-dictionary
+  # ---------------------------------------------------------------------------
+
+  def __orderTables (self, tables):
+    """
+    Convert the given cache-dictionary into a comparable order. This function
+    creates a sequence of quadruples built from tablename, rowid, current state
+    of that row in the cache and an ordered sequence of (field, value) pairs.
+    This sequence is sorted as well as the sequence of field-values.
+
+    @param tables: cache-dictionary of the form 'dict [table] [row] [field]'
+    @return: ordered sequence [(table, row, state, [(field, value), ...]), ... ]
+    """
+
+    result = []
+ 
+    for (table, rows) in tables.items ():
+      for (row, fields) in rows.items ():
+        fseq = fields.items ()
+        fseq.sort ()
+
+        result.append ((table, row, self.cache.state (table, row), fseq))
+
+    result.sort ()
+
+    return result
+
+
+# =============================================================================
+# TestCase for connection class
+# =============================================================================
+
+class ConnectionTestCase (unittest.TestCase):
+  """
+  TestCase for the connection class (= low level layer for sessions)
+  """
+
+  # ---------------------------------------------------------------------------
+  # Prepare the test case
+  # ---------------------------------------------------------------------------
+
+  def setUp (self):
+    """
+    Create two separate connection instances (so we can simulate two
+    concurrent sessions) and log into both (using 'gnue' as username and
+    password).
+    """
+
+    app = GClientApp.GClientApp (application = 'appserver',
+                                 defaults = geasConfiguration.ConfigOptions)
+
+    # Fake the login to the default connection
+    c = app.connections.getConnection ('gnue')
+    c.parameters ['_username'] = 'gnue'
+    c.parameters ['_password'] = 'gnue'
+
+    conn = GConnections.GConnections (app.connections._location,
+                                      app.connections._loginHandler,
+                                      app.connections._loginOptions)
+    c2 = conn.getConnection ('gnue')
+    c2.parameters ['_username'] = 'gnue'
+    c2.parameters ['_password'] = 'gnue'
+
+    self.connection = data.connection (app.connections, 'gnue')
+    self.concurrent = data.connection (conn, 'gnue')
+
+    self.remove = []
+
+
+  # ---------------------------------------------------------------------------
+  # Run some tests on the connection instance
+  # ---------------------------------------------------------------------------
+
+  def testConnection (self):
+    """
+    Perform all tests
+    """
+
+    # Run an unordered query
+    content = {None: (u'address_person', None, None, [u'address_name'])}
+    rs = self.connection.query (content, None, None)
+
+    self.assertEqual (rs.count (), 4)
+    e = [u'James Tiberius Kirk', u'Spock', u'Leonard H. McCoy',
+         u'Pavel Andreievich Chekov']
+    e.sort ()
+    self.assertEqual (self.__rsToSeq (rs, [u'address_name'], True), e)
+
+    # Run an ordered query
+    content = {None: (u'address_person', None, None, [u'address_name'])}
+    rs = self.connection.query (content, None, [{'name': u'address_name'}])
+
+    self.assertEqual (rs.count (), 4)
+    e = [u'James Tiberius Kirk', u'Leonard H. McCoy',
+         u'Pavel Andreievich Chekov', u'Spock']
+    self.assertEqual (self.__rsToSeq (rs, [u'address_name']), e)
+
+    # Run the same query again, and using a non-prefetched field
+    rs = self.connection.query (content, None, [{'name': u'address_name',
+                                                 'descending': True}])
+    result = []
+    rec = rs.nextRecord ()
+    while rec:
+      result.append ((rec.getField (u'address_name'), rec.getField
+                     (u'address_zip')))
+      rec = rs.nextRecord ()
+    rs.close ()
+
+    e = [(u'Spock', u'4711'),
+         (u'Pavel Andreievich Chekov', u'195251'),
+         (u'Leonard H. McCoy', u'39216'),
+         (u'James Tiberius Kirk', u'2002')]
+
+    self.assertEqual (result, e)
+
+    # Run the query with a simple condition
+    cond = ['like', ['field', u'address_zip'], ['const', '%1']]
+    rs = self.connection.query (content, cond, [{'name': u'address_name'}])
+
+    result = self.__rsToSeq (rs, [u'address_name'])
+    e = [u'Pavel Andreievich Chekov', u'Spock']
+    self.assertEqual (result, e)
+
+    # -------------------------------------------------------------------------
+    # Checking dirty reads
+
+    (orgCrew, orgData) = self.__getCrewMembers ()
+    expected = [(u'James Tiberius Kirk', u'US'),
+                (u'Leonard H. McCoy', u'US'),
+                (u'Pavel Andreievich Chekov', u'RU'),
+                (u'Spock', None)]
+
+    self.assertEqual (orgData, expected)
+
+    # Now, Doc McCoy moves to Austria
+    austria = u'0000000000000000000000200000000F'
+    orgCrew [1].putField (u'address_country', austria)
+    
+    # Spock will leave (for the moment)
+    spocksId = orgCrew [-1].getField (u'gnue_id')
+    self.connection.deleteRecord (u'address_person', spocksId)
+
+    # And a new crew member signs on
+    new = self.connection.insertRecord (u'address_person')
+    new.initialized ()
+    new.putField (u'address_name', u'Foobar')
+    new.putField (u'address_zip', u'6890')
+    new.putField (u'address_country', austria)
+
+    # So running the previous query again
+    (newCrew, newData) = self.__getCrewMembers ()
+    expected = [(u'Foobar', u'AT'),
+                (u'James Tiberius Kirk', u'US'),
+                (u'Leonard H. McCoy', u'AT'),
+                (u'Pavel Andreievich Chekov', u'RU')]
+    self.assertEqual (expected, newData)
+
+    # Now, there's another session doing the same request
+    (secCrew, secData) = self.__getCrewMembers (self.concurrent)
+
+    # Since the changes made before are not yet committed the second connection
+    # should still get the original result
+    self.assertEqual (orgData, secData)
+
+    # Cancelling the running atomic operation should also yield the former
+    # situation 
+    self.connection.cancelChanges ()
+    (newCrew, newData) = self.__getCrewMembers ()
+
+    self.assertEqual (orgData, newData)
+
+    # ---
+    # and again, Doc McCoy moves to Austria
+    austria = u'0000000000000000000000200000000F'
+    orgCrew [1].putField (u'address_country', austria)
+    
+    # Spock will leave (for the moment)
+    spocksId = orgCrew [-1].getField (u'gnue_id')
+    self.connection.deleteRecord (u'address_person', spocksId)
+
+    # And a new crew member signs on
+    new = self.connection.insertRecord (u'address_person')
+    new.initialized ()
+    new.putField (u'address_name', u'Foobar')
+    new.putField (u'address_zip', u'6890')
+    new.putField (u'address_country', austria)
+
+    oz = self.connection.insertRecord (u'address_country')
+    oz.initialized ()
+    oz.putField (u'address_code', u'OZ')
+    oz.putField (u'address_name', u'Oz')
+
+    self.connection.confirmChanges ()
+
+    # make some more changes
+    new.putField (u'address_name', u'buzzing the foobar')
+    new.putField (u'address_children', 2)
+
+    self.connection.deleteRecord (u'address_country', oz.getField (u'gnue_id'))
+
+    # change zip of James T. Kirk
+    orgCrew [0].putField (u'address_zip', u'0815')
+
+    # and now cancel all those changes
+    self.connection.cancelChanges ()
+
+    (newCrew, newData) = self.__getCrewMembers ()
+    expected = [(u'Foobar', u'AT'),
+                (u'James Tiberius Kirk', u'US'),
+                (u'Leonard H. McCoy', u'AT'),
+                (u'Pavel Andreievich Chekov', u'RU')]
+    self.assertEqual (expected, newData)
+
+    # The address_country table should again contain the new country record
+    content   = {None: (u'address_country', None, None, [u'address_name'])}
+    condition = {u'address_code': u'OZ'}
+    resultSet = self.connection.query (content, condition, None)
+    self.assertEqual ([u'Oz'], self.__rsToSeq (resultSet, [u'address_name']))
+
+    new.putField (u'address_country', oz.getField (u'gnue_id'))
+
+    # Doing a commit here means storing all changes, but also shows if data.py
+    # does a proper sorting of insertions. This is because we've created the
+    # new person before the new country (which is in fact a master record for
+    # the person and therefore must be added *before* the person).
+    self.connection.commit ()
+
+    # Since the second session had no commit until now, no changes made by the
+    # first session should be visible at all.
+    (secCrew, secData) = self.__getCrewMembers (self.concurrent)
+    expected = [(u'James Tiberius Kirk', u'US'),
+                (u'Leonard H. McCoy', u'US'),
+                (u'Pavel Andreievich Chekov', u'RU'),
+                (u'Spock', None)]
+    self.assertEqual (expected, secData)
+
+    # After closing the second session's transaction the query should now
+    # return all changes made by the first session
+    self.concurrent.rollback ()
+
+    (secCrew, secData) = self.__getCrewMembers (self.concurrent)
+    expected = [(u'Foobar', u'OZ'),
+                (u'James Tiberius Kirk', u'US'),
+                (u'Leonard H. McCoy', u'AT'),
+                (u'Pavel Andreievich Chekov', u'RU')]
+    self.assertEqual (expected, secData)
+
+    self.remove = [(u'address_person', new.getField (u'gnue_id')),
+                   (u'address_country', oz.getField (u'gnue_id'))]
+
+
+
+  # ---------------------------------------------------------------------------
+  # Restore to the initial situation in the test database
+  # ---------------------------------------------------------------------------
+
+  def tearDown (self):
+    """
+    Undo all changes made by the testcase
+    """
+
+    self.connection.rollback ()
+
+    # First we have to re-add the record for Spock
+    spock = self.connection.insertRecord (u'address_person')
+    spock.initialized ()
+    spock.putField (u'address_name', u'Spock')
+    spock.putField (u'address_street', u'Vulc Lane 1')
+    spock.putField (u'address_zip', u'4711')
+    spock.putField (u'address_city', u'Vulcane')
+    spock.putField (u'address_children', 0)
+    spock.putField (u'address_weight', 78.8)
+    spock.putField (u'address_born', u'2230-01-01')
+    spock.putField (u'address_meettime', u'6:30')
+    spock.putField (u'address_lastmeeting', u'2270-05-17 08:00')
+    spock.putField (u'address_human', False)
+    # This is a bit hackish cause we're changing the primary key
+    spock.putField (u'gnue_id', '00000000000000000000000000001101')
+
+    # Reset the country for McCoy to the US
+    mccoy = self.connection.findRecord (u'address_person',
+                     u'00000000000000000000000000001102', [u'address_country'])
+    mccoy.putField (u'address_country', u'000000000000000000000020000000E3')
+
+    # And remove newly added records
+    for (table, row) in self.remove:
+      self.connection.deleteRecord (table, row)
+
+    self.connection.commit ()
+
+
+
+  # ---------------------------------------------------------------------------
+  # Create a list of records and their data for all crew members
+  # ---------------------------------------------------------------------------
+
+  def __getCrewMembers (self, connection = None):
+    """
+    Create a list of records and their data for all crew members
+
+    @param connection: if given this connection instance will be used. It
+        defaults to 'self.connection'.
+
+    @return: tuple (recordlist, datalist), where recordlist is a sequence of
+        data.record instances, and datalist is a sequence of tuples
+        (name, country-code), one per record found.
+    """
+
+    if connection is None:
+      connection = self.connection
+
+    # The following content definition drives 'address_person' as master table
+    # and joins 'address_country' in a left outer join, where 'address_name' is
+    # the only prefetched field for both tables. This forces data.py to do
+    # refetching for the joined table (cause we're accessing 'address_code').
+    content = {u't0': (u'address_person', None, None, [u'address_name']),
+               u't1': (u'address_country', u't0', u'address_country',
+                      [u'address_name'])}
+
+    sortOrder = [{'name': u't0.address_name'}]
+    resultSet = connection.query (content, None, sortOrder)
+    recList   = []
+    recData   = []
+
+    rec = resultSet.nextRecord ()
+    while rec:
+      country = rec.getField (u'address_country')
+      if country is not None:
+        jRec  = connection.findRecord (u'address_country', country, [])
+        cCode = jRec.getField (u'address_code')
+      else:
+        cCode = None
+
+      recData.append ((rec.getField (u'address_name'), cCode))
+
+      recList.append (rec)
+      rec = resultSet.nextRecord ()
+
+    resultSet.close ()
+
+    return (recList, recData)
+
+
+  # ---------------------------------------------------------------------------
+  # Convert a resultSet into a flat sequence
+  # ---------------------------------------------------------------------------
+
+  def __rsToSeq (self, rs, fields, reorder = False):
+    """
+    Flatten a resultSet into a sequence of lists holding the requested field
+    values. The resultSet is closed after all records have been fetched.
+
+    @param rs: the resultset to iterate
+    @param fields: sequence of fieldnames to extract
+    @param reorder: if True, the resulting sequence will be sorted
+
+    @return: sequence of value-lists, one per record. If only one field is
+        requested, the result is a flat sequence of field-values (no lists)
+    """
+
+    result = []
+    append = len (fields) == 1 and result.extend or result.append
+
+    rec = rs.nextRecord ()
+    while rec:
+      append ([rec.getField (f) for f in fields])
+      rec = rs.nextRecord ()
+
+    rs.close ()
+
+    if reorder:
+      result.sort ()
+
+    return result
+
+
+# =============================================================================
+# Main program
+# =============================================================================
+
+if __name__ == '__main__':
+  suite1 = unittest.makeSuite (CachingTestCase)
+  suite2 = unittest.makeSuite (ConnectionTestCase)
+  alltests = unittest.TestSuite ((suite1, suite2))
+
+  unittest.TextTestRunner (verbosity = 2).run (alltests)


Property changes on: trunk/gnue-appserver/tests/data.py
___________________________________________________________________
Name: svn:keywords
   + Id




reply via email to

[Prev in Thread] Current Thread [Next in Thread]