[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnue] r7309 - in trunk/gnue-appserver: . tests
From: |
johannes |
Subject: |
[gnue] r7309 - in trunk/gnue-appserver: . tests |
Date: |
Thu, 7 Apr 2005 06:54:39 -0500 (CDT) |
Author: johannes
Date: 2005-04-07 06:54:37 -0500 (Thu, 07 Apr 2005)
New Revision: 7309
Added:
trunk/gnue-appserver/tests/
trunk/gnue-appserver/tests/data.py
Log:
Added first unittest to appserver
Property changes on: trunk/gnue-appserver/tests
___________________________________________________________________
Name: svn:ignore
+ *.pyc
Added: trunk/gnue-appserver/tests/data.py
===================================================================
--- trunk/gnue-appserver/tests/data.py 2005-04-07 09:31:56 UTC (rev 7308)
+++ trunk/gnue-appserver/tests/data.py 2005-04-07 11:54:37 UTC (rev 7309)
@@ -0,0 +1,694 @@
+# GNU Enterprise Application Server - Unit Testing - data.py
+#
+# Copyright 2001-2005 Free Software Foundation
+#
+# This file is part of GNU Enterprise
+#
+# GNU Enterprise is free software; you can redistribute it
+# and/or modify it under the terms of the GNU General Public
+# License as published by the Free Software Foundation; either
+# version 2, or (at your option) any later version.
+#
+# GNU Enterprise is distributed in the hope that it will be
+# useful, but WITHOUT ANY WARRANTY; without even the implied
+# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public
+# License along with program; see the file COPYING. If not,
+# write to the Free Software Foundation, Inc., 59 Temple Place
+# - Suite 330, Boston, MA 02111-1307, USA.
+#
+# $Id$
+
+import unittest
+
+from gnue.appserver import data, geasConfiguration
+from gnue.common.apps import GClientApp
+from gnue.common.datasources import GConnections
+
+# =============================================================================
+# TestCase for _cache class
+# =============================================================================
+
+class CachingTestCase (unittest.TestCase):
+ """
+ TestCase for the low-level class _cache
+ """
+
+ # ---------------------------------------------------------------------------
+ # Test Case Setup
+ # ---------------------------------------------------------------------------
+
+ def setUp (self):
+ """
+ Create a new instance of the cache
+ """
+
+ self.cache = data._cache ()
+
+
+ # ---------------------------------------------------------------------------
+ # Is the cache cleaned well ?
+ # ---------------------------------------------------------------------------
+
+ def test_cacheClean (self):
+ """
+ Test if the cache is cleaned well
+ """
+
+ table = u'FOO'
+ rows = [u'100', u'200']
+
+ # Write data into clean cache. The cache should not be dirty at all
+ for row in rows:
+ self.cache.write (table, row, u'foo', 1, False)
+
+ result = self.__orderTables (self.cache.dirtyTables ())
+ self.assertEqual (result, [])
+ self.assertEqual (self.cache.inserted, {})
+ self.assertEqual (self.cache.deleted, {})
+
+ # Now write data to dirty cache, which will render the records as 'changed'
+ for row in rows:
+ self.cache.write (table, row, u'foo', 1, True)
+
+ self.cache.insertRecord (u'BAR', u'200')
+ self.cache.initialized (u'BAR', u'200')
+ self.cache.insertRecord (u'FOO', u'771')
+ self.cache.initialized (u'FOO', u'771')
+ self.cache.write (u'FOO', u'771', u'val', 5, True)
+
+ result = self.__orderTables (self.cache.dirtyTables ())
+ exp = [(u'BAR', u'200', 'initialized', [(u'gnue_id', u'200')]),
+ (u'FOO', u'100', 'changed', [(u'foo', 1)]),
+ (u'FOO', u'200', 'changed', [(u'foo', 1)]),
+ (u'FOO', u'771', 'inserted', [(u'gnue_id', u'771'), (u'val', 5)])]
+
+ self.assertEqual (result, exp)
+ self.assertEqual (self.cache.inserted, {u'BAR-200': (u'BAR', u'200'),
+ u'FOO-771': (u'FOO', u'771')})
+ self.assertEqual (self.cache.deleted, {})
+
+ # Cancelling the current atomic operation should make the cache clean again
+ self.cache.cancel ()
+
+ result = self.__orderTables (self.cache.dirtyTables ())
+ self.assertEqual (result, [])
+ self.assertEqual (self.cache.inserted, {})
+ self.assertEqual (self.cache.deleted, {})
+
+ # Insert another three records. One is changed into 'inserted' state,
+ # another one is left at 'initialized' and the last one is left at
+ # 'initializing'
+ self.cache.clear ()
+
+ self.cache.insertRecord (table, u'1')
+ self.cache.initialized (table, u'1')
+ self.cache.write (table, u'1', u'foo', 1, True)
+
+ self.cache.insertRecord (table, u'2')
+ self.cache.initialized (table, u'2')
+
+ self.cache.insertRecord (table, u'3')
+
+ exp = [(table, u'1', 'inserted', [(u'foo', 1), (u'gnue_id', u'1')]),
+ (table, u'2', 'initialized', [(u'gnue_id', u'2')]),
+ (table, u'3', 'initializing', [(u'gnue_id', u'3')])]
+ self.assertEqual (exp, self.__orderTables (self.cache.dirtyTables ()))
+
+ self.cache.makeClean (table, u'1')
+
+ # Now clear all 'clean' stuff
+ exp = [(table, u'2', 'initialized', [(u'gnue_id', u'2')]),
+ (table, u'3', 'initializing', [(u'gnue_id', u'3')])]
+ self.cache.clear (True)
+ self.assertEqual (exp, self.__orderTables (self.cache.dirtyTables ()))
+
+ # And now everything should really go away
+ self.cache.clear ()
+ self.assertEqual (self.cache.dirtyTables (), {})
+
+
+
+ # ---------------------------------------------------------------------------
+ # Perform insertions
+ # ---------------------------------------------------------------------------
+
+ def test_cacheInsertions (self):
+ """
+ Perform variouse test regarding data insertion
+ """
+
+ self.cache.clear ()
+
+ table = u'table'
+ row = u'100'
+ row2 = u'200'
+
+ # We cannot insert a record if it's already available
+ self.cache.write (table, row, u'foo', 1, True)
+ self.assertRaises (data.DuplicateRecordError, self.cache.insertRecord,
+ table, row)
+
+ self.cache.clear ()
+
+ # Insert a record, change and confirm it
+ self.cache.insertRecord (table, row)
+ self.cache.initialized (table, row)
+ self.cache.write (table, row, u'foo', 1, True)
+ self.cache.confirm ()
+
+ result = self.__orderTables (self.cache.dirtyTables ())
+ exp = [(table, row, 'inserted', [(u'foo', 1), (u'gnue_id', row)])]
+ self.assertEqual (result, exp)
+ self.assertEqual (self.cache.inserted, {u'table-100': (table, row)})
+ self.assertEqual (self.cache.deleted, {})
+
+ # change it again, and insert another row. We won't set the latter one to
+ # initialized so it does not get into 'inserted' state
+ self.cache.write (table, row, u'foo', 2, True)
+ self.cache.insertRecord (table, row2)
+ self.cache.write (table, row2, u'bar', 2, True)
+
+ result = self.__orderTables (self.cache.dirtyTables ())
+ exp = [(table, row, 'inserted', [(u'foo', 2), (u'gnue_id', row)]),
+ (table, row2, 'initializing', [(u'bar', 2), (u'gnue_id', row2)])]
+ self.assertEqual (result, exp)
+ self.assertEqual (self.cache.inserted, {u'table-100': (table, row),
+ u'table-200': (table, row2)})
+ self.assertEqual (self.cache.deleted, {})
+
+ # Now delete the second record
+ self.cache.deleteRecord (table, row2)
+
+ # After deleting the previously inserted record the only record left should
+ # be the inserted one
+ result = self.__orderTables (self.cache.dirtyTables ())
+ exp = [(table, row, 'inserted', [(u'foo', 2), (u'gnue_id', row)]),
+ (table, row2, 'deleted', [(u'bar', 2), (u'gnue_id', None)])]
+ self.assertEqual (result, exp)
+ self.assertEqual (self.cache.inserted, {u'table-100': (table, row)})
+ self.assertEqual (self.cache.deleted, {})
+
+ # cancel the atomic operation. This should give us the state from the last
+ # call of confirm ()
+ self.cache.cancel ()
+
+
+ result = self.__orderTables (self.cache.dirtyTables ())
+ exp = [(table, row, 'inserted', [(u'foo', 1), (u'gnue_id', row)])]
+ self.assertEqual (result, exp)
+ self.assertEqual (self.cache.inserted, {u'table-100': (table, row)})
+ self.assertEqual (self.cache.deleted, {})
+
+ # Now simulate a commit which uses makeClean () to make a given record
+ # clean
+ self.cache.makeClean (table, row)
+ self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), [])
+ self.assertEqual (self.cache.inserted, {})
+ self.assertEqual (self.cache.deleted, {})
+
+ # Add a new record, confirm it and then delete it
+ self.cache.clear ()
+
+ self.cache.insertRecord (table, row)
+ self.cache.initialized (table, row)
+ self.cache.write (table, row, u'name', 'foobar', True)
+ self.cache.confirm ()
+
+ exp = [(table, row, 'inserted', [(u'gnue_id', row), (u'name', 'foobar')])]
+ self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), exp)
+ self.assertEqual (self.cache.inserted, {u'table-100': (table, row)})
+
+ self.cache.deleteRecord (table, row)
+
+ exp = [(table, row, 'deleted', [(u'gnue_id', None), (u'name', 'foobar')])]
+ self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), exp)
+ self.assertEqual (self.cache.inserted, {})
+ self.assertEqual (self.cache.deleted, {})
+
+
+
+
+ # ---------------------------------------------------------------------------
+ # Perform tests with modifications of records
+ # ---------------------------------------------------------------------------
+
+ def test_cacheModification (self):
+ """
+ Perfrom tests with mixed modification operations
+ """
+
+ self.cache.clear ()
+
+ table = u'FOO'
+ row = u'100'
+
+ # First add some clean data to the cache
+ self.cache.write (table, row, u'foo', 1, False)
+
+ # Now change an 'existing' record and cancel that operation
+ self.cache.write (table, row, u'bar', 2, True)
+ self.cache.write (table, row, u'foo', 100, True)
+ self.cache.cancel ()
+
+ # The cached (current) value of 'foo' should be still the clean value, and
+ # there must not be any 'dirty' data atm
+ self.assertEqual (self.cache.read (table, row, u'foo'), 1)
+ self.assertEqual (self.cache.dirtyTables (), {})
+
+ # Change the record again; now it should have the state 'changed'
+ self.cache.write (table, row, u'foo', 100, True)
+ result = self.__orderTables (self.cache.dirtyTables ())
+ exp = [(table, row, 'changed', [(u'foo', 100)])]
+ self.assertEqual (result, exp)
+ self.cache.confirm ()
+
+ # Calling initialized () for a record which wasn't inserted before must
+ # raise an exception
+ self.assertRaises (data.StateChangeError, self.cache.initialized,
+ table, row)
+
+ # Now delete the record. It will change it's state and shows up in the
+ # deletion queue
+ self.cache.deleteRecord (table, row)
+ exp2 = [(table, row, 'deleted', [(u'foo', 100), (u'gnue_id', None)])]
+ self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), exp2)
+ self.assertEqual (self.cache.deleted, {u'FOO-100': (table, row)})
+
+ # Cancel the delete operation; we should be again at the point of the last
+ # confirmation
+ self.cache.cancel ()
+ result = self.__orderTables (self.cache.dirtyTables ())
+ self.assertEqual (result, exp)
+ self.assertEqual (self.cache.deleted, {})
+
+ # Make the changed record a clean record
+ self.cache.makeClean (table, row)
+ self.assertEqual (self.cache.dirtyTables (), {})
+
+ # Delting a record and afterwards adding the same key points out a bug in
+ # appserver
+ self.cache.deleteRecord (u'frog', u'dummy')
+ self.assertRaises (data.DuplicateRecordError, self.cache.insertRecord,
+ u'frog', u'dummy')
+ expected = [(u'frog', u'dummy', 'deleted', [(u'gnue_id', None)])]
+ self.assertEqual (self.__orderTables (self.cache.dirtyTables ()), expected)
+ self.assertEqual (self.cache.inserted, {})
+ self.assertEqual (self.cache.deleted, {u'frog-dummy': (u'frog', u'dummy')})
+
+ self.cache.cancel ()
+ self.assertEqual (self.cache.dirtyTables (), {})
+ self.assertEqual (self.cache.inserted, {})
+ self.assertEqual (self.cache.deleted, {})
+
+
+
+ # ---------------------------------------------------------------------------
+ # Prepare the given cache-ditctionary
+ # ---------------------------------------------------------------------------
+
+ def __orderTables (self, tables):
+ """
+ Convert the given cache-dictionary into a comparable order. This function
+ creates a sequence of quadruples built from tablename, rowid, current state
+ of that row in the cache and an ordered sequence of (field, value) pairs.
+ This sequence is sorted as well as the sequence of field-values.
+
+ @param tables: cache-dictionary of the form 'dict [table] [row] [field]'
+ @return: ordered sequence [(table, row, state, [(field, value), ...]), ...
]
+ """
+
+ result = []
+
+ for (table, rows) in tables.items ():
+ for (row, fields) in rows.items ():
+ fseq = fields.items ()
+ fseq.sort ()
+
+ result.append ((table, row, self.cache.state (table, row), fseq))
+
+ result.sort ()
+
+ return result
+
+
+# =============================================================================
+# TestCase for connection class
+# =============================================================================
+
+class ConnectionTestCase (unittest.TestCase):
+ """
+ TestCase for the connection class (= low level layer for sessions)
+ """
+
+ # ---------------------------------------------------------------------------
+ # Prepare the test case
+ # ---------------------------------------------------------------------------
+
+ def setUp (self):
+ """
+ Create two seperate connections instances (so we can simulate two
+ concurrent sessions) and log into both (using 'gnue' as username and
+ password).
+ """
+
+ app = GClientApp.GClientApp (application = 'appserver',
+ defaults = geasConfiguration.ConfigOptions)
+
+ # Fake the login to the default connection
+ c = app.connections.getConnection ('gnue')
+ c.parameters ['_username'] = 'gnue'
+ c.parameters ['_password'] = 'gnue'
+
+ conn = GConnections.GConnections (app.connections._location,
+ app.connections._loginHandler,
+ app.connections._loginOptions)
+ c2 = conn.getConnection ('gnue')
+ c2.parameters ['_username'] = 'gnue'
+ c2.parameters ['_password'] = 'gnue'
+
+ self.connection = data.connection (app.connections, 'gnue')
+ self.concurrent = data.connection (conn, 'gnue')
+
+ self.remove = []
+
+
+ # ---------------------------------------------------------------------------
+ # Run some tests on the connection instance
+ # ---------------------------------------------------------------------------
+
+ def testConnection (self):
+ """
+ Perform all tests
+ """
+
+ # Run an unordered query
+ content = {None: (u'address_person', None, None, [u'address_name'])}
+ rs = self.connection.query (content, None, None)
+
+ self.assertEqual (rs.count (), 4)
+ e = [u'James Tiberius Kirk', u'Spock', u'Leonard H. McCoy',
+ u'Pavel Andreievich Chekov']
+ e.sort ()
+ self.assertEqual (self.__rsToSeq (rs, [u'address_name'], True), e)
+
+ # Run an ordered query
+ content = {None: (u'address_person', None, None, [u'address_name'])}
+ rs = self.connection.query (content, None, [{'name': u'address_name'}])
+
+ self.assertEqual (rs.count (), 4)
+ e = [u'James Tiberius Kirk', u'Leonard H. McCoy',
+ u'Pavel Andreievich Chekov', u'Spock']
+ self.assertEqual (self.__rsToSeq (rs, [u'address_name']), e)
+
+ # Run the same query again, and using a non-prefetched field
+ rs = self.connection.query (content, None, [{'name': u'address_name',
+ 'descending': True}])
+ result = []
+ rec = rs.nextRecord ()
+ while rec:
+ result.append ((rec.getField (u'address_name'), rec.getField
+ (u'address_zip')))
+ rec = rs.nextRecord ()
+ rs.close ()
+
+ e = [(u'Spock', u'4711'),
+ (u'Pavel Andreievich Chekov', u'195251'),
+ (u'Leonard H. McCoy', u'39216'),
+ (u'James Tiberius Kirk', u'2002')]
+
+ self.assertEqual (result, e)
+
+ # Run the query with a simple condition
+ cond = ['like', ['field', u'address_zip'], ['const', '%1']]
+ rs = self.connection.query (content, cond, [{'name': u'address_name'}])
+
+ result = self.__rsToSeq (rs, [u'address_name'])
+ e = [u'Pavel Andreievich Chekov', u'Spock']
+ self.assertEqual (result, e)
+
+ # -------------------------------------------------------------------------
+ # Checking dirty reads
+
+ (orgCrew, orgData) = self.__getCrewMembers ()
+ expected = [(u'James Tiberius Kirk', u'US'),
+ (u'Leonard H. McCoy', u'US'),
+ (u'Pavel Andreievich Chekov', u'RU'),
+ (u'Spock', None)]
+
+ self.assertEqual (orgData, expected)
+
+ # Now, Doc McCoy moves to Austria
+ austria = u'0000000000000000000000200000000F'
+ orgCrew [1].putField (u'address_country', austria)
+
+ # Spock will leave (for the moment)
+ spocksId = orgCrew [-1].getField (u'gnue_id')
+ self.connection.deleteRecord (u'address_person', spocksId)
+
+ # And a new crew member signs on
+ new = self.connection.insertRecord (u'address_person')
+ new.initialized ()
+ new.putField (u'address_name', u'Foobar')
+ new.putField (u'address_zip', u'6890')
+ new.putField (u'address_country', austria)
+
+ # So running the previous query again
+ (newCrew, newData) = self.__getCrewMembers ()
+ expected = [(u'Foobar', u'AT'),
+ (u'James Tiberius Kirk', u'US'),
+ (u'Leonard H. McCoy', u'AT'),
+ (u'Pavel Andreievich Chekov', u'RU')]
+ self.assertEqual (expected, newData)
+
+ # Now, there's another session doing the same request
+ (secCrew, secData) = self.__getCrewMembers (self.concurrent)
+
+ # Since the changes made before are not yet commited the second connection
+ # should still get the original result
+ self.assertEqual (orgData, secData)
+
+ # Cancelling the running atomic operation should also yield the former
+ # situation
+ self.connection.cancelChanges ()
+ (newCrew, newData) = self.__getCrewMembers ()
+
+ self.assertEqual (orgData, newData)
+
+ # ---
+ # and again, Doc McCoy moves to Austria
+ austria = u'0000000000000000000000200000000F'
+ orgCrew [1].putField (u'address_country', austria)
+
+ # Spock will leave (for the moment)
+ spocksId = orgCrew [-1].getField (u'gnue_id')
+ self.connection.deleteRecord (u'address_person', spocksId)
+
+ # And a new crew member signs on
+ new = self.connection.insertRecord (u'address_person')
+ new.initialized ()
+ new.putField (u'address_name', u'Foobar')
+ new.putField (u'address_zip', u'6890')
+ new.putField (u'address_country', austria)
+
+ oz = self.connection.insertRecord (u'address_country')
+ oz.initialized ()
+ oz.putField (u'address_code', u'OZ')
+ oz.putField (u'address_name', u'Oz')
+
+ self.connection.confirmChanges ()
+
+ # make some more changes
+ new.putField (u'address_name', u'buzzing the foobar')
+ new.putField (u'address_children', 2)
+
+ self.connection.deleteRecord (u'address_country', oz.getField (u'gnue_id'))
+
+ # change zip of James T. Kirk
+ orgCrew [0].putField (u'address_zip', u'0815')
+
+ # and now cancel all that changes
+ self.connection.cancelChanges ()
+
+ (newCrew, newData) = self.__getCrewMembers ()
+ expected = [(u'Foobar', u'AT'),
+ (u'James Tiberius Kirk', u'US'),
+ (u'Leonard H. McCoy', u'AT'),
+ (u'Pavel Andreievich Chekov', u'RU')]
+ self.assertEqual (expected, newData)
+
+ # The address_country table should again contain the new country record
+ content = {None: (u'address_country', None, None, [u'address_name'])}
+ condition = {u'address_code': u'OZ'}
+ resultSet = self.connection.query (content, condition, None)
+ self.assertEqual ([u'Oz'], self.__rsToSeq (resultSet, [u'address_name']))
+
+ new.putField (u'address_country', oz.getField (u'gnue_id'))
+
+ # Doing a commit here means storing all changes, but also shows if data.py
+ # does a proper sorting of insertions. This is because we've created the
+ # new person before the new country (which is in fact a master record for
+ # the person and theirfor must be added *before* the person).
+ self.connection.commit ()
+
+ # Since the second session had no commit until now, no changes made by the
+ # first session should be visible at all.
+ (secCrew, secData) = self.__getCrewMembers (self.concurrent)
+ expected = [(u'James Tiberius Kirk', u'US'),
+ (u'Leonard H. McCoy', u'US'),
+ (u'Pavel Andreievich Chekov', u'RU'),
+ (u'Spock', None)]
+ self.assertEqual (expected, secData)
+
+ # After closing the second session's transaction the query should now
+ # return all changes made by the first session
+ self.concurrent.rollback ()
+
+ (secCrew, secData) = self.__getCrewMembers (self.concurrent)
+ expected = [(u'Foobar', u'OZ'),
+ (u'James Tiberius Kirk', u'US'),
+ (u'Leonard H. McCoy', u'AT'),
+ (u'Pavel Andreievich Chekov', u'RU')]
+ self.assertEqual (expected, secData)
+
+ self.remove = [(u'address_person', new.getField (u'gnue_id')),
+ (u'address_country', oz.getField (u'gnue_id'))]
+
+
+
+ # ---------------------------------------------------------------------------
+ # Restore to the initial situation in the test database
+ # ---------------------------------------------------------------------------
+
+ def tearDown (self):
+ """
+ Undo all changes made by the testcase
+ """
+
+ self.connection.rollback ()
+
+ # First we have to re-add the record for Spock
+ spock = self.connection.insertRecord (u'address_person')
+ spock.initialized ()
+ spock.putField (u'address_name', u'Spock')
+ spock.putField (u'address_street', u'Vulc Lane 1')
+ spock.putField (u'address_zip', u'4711')
+ spock.putField (u'address_city', u'Vulcane')
+ spock.putField (u'address_children', 0)
+ spock.putField (u'address_weight', 78.8)
+ spock.putField (u'address_born', u'2230-01-01')
+ spock.putField (u'address_meettime', u'6:30')
+ spock.putField (u'address_lastmeeting', u'2270-05-17 08:00')
+ spock.putField (u'address_human', False)
+ # This is a bit hackish cause we're changing the primary key
+ spock.putField (u'gnue_id', '00000000000000000000000000001101')
+
+ # Reset the country for McCoy to the US
+ mccoy = self.connection.findRecord (u'address_person',
+ u'00000000000000000000000000001102', [u'address_country'])
+ mccoy.putField (u'address_country', u'000000000000000000000020000000E3')
+
+ # And remove newly added records
+ for (table, row) in self.remove:
+ self.connection.deleteRecord (table, row)
+
+ self.connection.commit ()
+
+
+
+ # ---------------------------------------------------------------------------
+ # Create a list of records and it's data for all crew members
+ # ---------------------------------------------------------------------------
+
+ def __getCrewMembers (self, connection = None):
+ """
+ Create a list of records and it's data for all crew members
+
+ @param connection: if given this connection instance will be used. It
+ defaults to 'self.connection'.
+
+ @return: tuple (recordlist, datalist), where recordlist is a sequence of
+ data.record instances, and datalist is a sequence of tuples
+ (name, country-code), one per record found.
+ """
+
+ if connection is None:
+ connection = self.connection
+
+ # The following content definition drives 'address_person' as master table
+ # and joins 'address_country' in a left outer join, where 'address_name' is
+ # the onyle prefetched field for both tables. This forces data.py to do
+ # refetching for the joined table (cause we're accessing 'address_code').
+ content = {u't0': (u'address_person', None, None, [u'address_name']),
+ u't1': (u'address_country', u't0', u'address_country',
+ [u'address_name'])}
+
+ sortOrder = [{'name': u't0.address_name'}]
+ resultSet = connection.query (content, None, sortOrder)
+ recList = []
+ recData = []
+
+ rec = resultSet.nextRecord ()
+ while rec:
+ country = rec.getField (u'address_country')
+ if country is not None:
+ jRec = connection.findRecord (u'address_country', country, [])
+ cCode = jRec.getField (u'address_code')
+ else:
+ cCode = None
+
+ recData.append ((rec.getField (u'address_name'), cCode))
+
+ recList.append (rec)
+ rec = resultSet.nextRecord ()
+
+ resultSet.close ()
+
+ return (recList, recData)
+
+
+ # ---------------------------------------------------------------------------
+ # Convert a resultSet into a flat sequence
+ # ---------------------------------------------------------------------------
+
+ def __rsToSeq (self, rs, fields, reorder = False):
+ """
+ Flatten a resultSet into a sequence of tuples holding the requested field
+ values.
+
+ @param rs: the resultset to iterate
+ @param fields: sequence of fieldnames to extract
+ @param reorder: if True, the resulting sequence will be sorted
+
+ @return: sequence of value-tuples, one per record. If only one field is
+ requested, the result is a sequence of field-values (without tuples)
+ """
+
+ result = []
+ append = len (fields) == 1 and result.extend or result.append
+
+ rec = rs.nextRecord ()
+ while rec:
+ append ([rec.getField (f) for f in fields])
+ rec = rs.nextRecord ()
+
+ rs.close ()
+
+ if reorder:
+ result.sort ()
+
+ return result
+
+
+# =============================================================================
+# Main program
+# =============================================================================
+
+if __name__ == '__main__':
+ suite1 = unittest.makeSuite (CachingTestCase)
+ suite2 = unittest.makeSuite (ConnectionTestCase)
+ alltests = unittest.TestSuite ((suite1, suite2))
+
+ unittest.TextTestRunner (verbosity = 2).run (alltests)
Property changes on: trunk/gnue-appserver/tests/data.py
___________________________________________________________________
Name: svn:keywords
+ Id
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnue] r7309 - in trunk/gnue-appserver: . tests,
johannes <=