| 86 | | |
|---|
| | 123 | # misc test utility classes & functions |
|---|
| | 124 | |
|---|
| | 125 | _currentcat = None |
|---|
| | 126 | |
|---|
| | 127 | class MemoryListHandler(logging.Handler): |
|---|
| | 128 | |
|---|
| | 129 | def __init__(self): |
|---|
| | 130 | logging.Handler.__init__(self, level=logging.DEBUG) |
|---|
| | 131 | self.log = [] |
|---|
| | 132 | |
|---|
| | 133 | def emit(self, record): |
|---|
| | 134 | print "Got record: %s" % record |
|---|
| | 135 | print "formatted as: %s" % self.format(record) |
|---|
| | 136 | self.log.append(self.format(record)) |
|---|
| | 137 | |
|---|
| | 138 | def print_log(self): |
|---|
| | 139 | print "\n".join(self.log) |
|---|
| | 140 | self.log = [] |
|---|
| | 141 | |
|---|
| | 142 | def get_log(self): |
|---|
| | 143 | log = self.log |
|---|
| | 144 | self.log = [] |
|---|
| | 145 | return log |
|---|
| | 146 | |
|---|
| | 147 | _memhandler = MemoryListHandler() |
|---|
| | 148 | |
|---|
| | 149 | def catch_validation_errors(widget, value): |
|---|
| | 150 | """Catch and unpack validation errors (for testing purposes).""" |
|---|
| | 151 | try: |
|---|
| | 152 | value = widget.validate(value) |
|---|
| | 153 | except validators.Invalid, errors: |
|---|
| | 154 | try: |
|---|
| | 155 | errors = errors.unpack_errors() |
|---|
| | 156 | except AttributeError: |
|---|
| | 157 | pass |
|---|
| | 158 | else: |
|---|
| | 159 | errors = {} |
|---|
| | 160 | return value, errors |
|---|
| | 161 | |
|---|
| | 162 | def capture_log(category): |
|---|
| | 163 | """Capture log for one category. |
|---|
| | 164 | |
|---|
| | 165 | The category can either be a single category (a string like 'foo.bar') |
|---|
| | 166 | or a list of them. You *must* call print_log() to reset when you're done. |
|---|
| | 167 | |
|---|
| | 168 | """ |
|---|
| | 169 | global _currentcat |
|---|
| | 170 | assert not _currentcat, "_currentcat not cleared. Use get_log to reset." |
|---|
| | 171 | if not isinstance(category, list) and not isinstance(category, tuple): |
|---|
| | 172 | category = [category] |
|---|
| | 173 | _currentcat = category |
|---|
| | 174 | for cat in category: |
|---|
| | 175 | log = logging.getLogger(cat) |
|---|
| | 176 | log.setLevel(logging.DEBUG) |
|---|
| | 177 | log.addHandler(_memhandler) |
|---|
| | 178 | |
|---|
| | 179 | def _reset_logging(): |
|---|
| | 180 | """Manage the resetting of the loggers.""" |
|---|
| | 181 | global _currentcat |
|---|
| | 182 | if not _currentcat: |
|---|
| | 183 | return |
|---|
| | 184 | for cat in _currentcat: |
|---|
| | 185 | log = logging.getLogger(cat) |
|---|
| | 186 | log.removeHandler(_memhandler) |
|---|
| | 187 | _currentcat = None |
|---|
| | 188 | |
|---|
| | 189 | def print_log(): |
|---|
| | 190 | """Print the log captured by capture_log to stdout. |
|---|
| | 191 | |
|---|
| | 192 | Resets that log and resets the temporarily added handlers. |
|---|
| | 193 | |
|---|
| | 194 | """ |
|---|
| | 195 | _reset_logging() |
|---|
| | 196 | _memhandler.print_log() |
|---|
| | 197 | |
|---|
| | 198 | def get_log(): |
|---|
| | 199 | """Return the list of log messages captured by capture_log. |
|---|
| | 200 | |
|---|
| | 201 | Resets that log and resets the temporarily added handlers. |
|---|
| | 202 | |
|---|
| | 203 | """ |
|---|
| | 204 | _reset_logging() |
|---|
| | 205 | return _memhandler.get_log() |
|---|
| | 206 | |
|---|
| | 207 | def sqlalchemy_cleanup(): |
|---|
| | 208 | database.metadata.clear() |
|---|
| | 209 | try: |
|---|
| | 210 | database.metadata.dispose() |
|---|
| | 211 | except AttributeError: # not threadlocal |
|---|
| | 212 | if database.metadata.bind: |
|---|
| | 213 | database.metadata.bind.dispose() |
|---|
| | 214 | database._engine = None |
|---|
| | 215 | sqlalchemy.orm.clear_mappers() |
|---|
| | 216 | |
|---|
| | 217 | |
|---|
| | 218 | # Base classes for unit test cases |
|---|
| | 219 | |
|---|
| | 220 | class TGTest(unittest.TestCase): |
|---|
| | 221 | """A WebTest enabled unit testing class. |
|---|
| | 222 | |
|---|
| | 223 | To use, subclass & set root to your controller object, or set app to a |
|---|
| | 224 | webtest.TestApp instance. |
|---|
| | 225 | |
|---|
| | 226 | In your tests, use self.app to make WebTest calls. |
|---|
| | 227 | |
|---|
| | 228 | """ |
|---|
| | 229 | root = None |
|---|
| | 230 | app = None |
|---|
| | 231 | stop_tg_only = False |
|---|
| | 232 | |
|---|
| | 233 | def setUp(self): |
|---|
| | 234 | """Set up the WebTest by starting the server. |
|---|
| | 235 | |
|---|
| | 236 | You should override this and make sure you have properly |
|---|
| | 237 | mounted a root for your server before calling super, |
|---|
| | 238 | or simply pass a root controller to super. |
|---|
| | 239 | Otherwise the Cherrypy filters for TurboGears will not be used. |
|---|
| | 240 | |
|---|
| | 241 | """ |
|---|
| | 242 | assert self.root or self.app, "Either self.root or self.app must be set" |
|---|
| | 243 | if not self.app: |
|---|
| | 244 | self.app = make_app(self.root) |
|---|
| | 245 | start_server() |
|---|
| | 246 | |
|---|
| | 247 | def tearDown(self): |
|---|
| | 248 | """Tear down the WebTest by stopping the server.""" |
|---|
| | 249 | stop_server(tg_only = self.stop_tg_only) |
|---|
| | 250 | |
|---|
| | 251 | def login_user(self, user): |
|---|
| | 252 | """Log a specified user object into the system.""" |
|---|
| | 253 | self.app.post(config.get('identity.failure_url'), dict( |
|---|
| | 254 | user_name=user.user_name, password=user.password, login='Login')) |
|---|
| | 255 | |
|---|
| | 256 | |
|---|
| | 257 | class BrowsingSession(object): |
|---|
| | 258 | |
|---|
| | 259 | def __init__(self): |
|---|
| | 260 | self.visit = None |
|---|
| | 261 | self.response, self.status = None, None |
|---|
| | 262 | self.cookie = Cookie.SimpleCookie() |
|---|
| | 263 | self.app = make_app() |
|---|
| | 264 | |
|---|
| | 265 | def goto(self, *args, **kwargs): |
|---|
| | 266 | if self.cookie: |
|---|
| | 267 | headers = kwargs.setdefault('headers', {}) |
|---|
| | 268 | headers['Cookie'] = self.cookie_encoded |
|---|
| | 269 | response = self.app.get(*args, **kwargs) |
|---|
| | 270 | |
|---|
| | 271 | # If we were given an encoding in the content type we should use it to |
|---|
| | 272 | # decode the response: |
|---|
| | 273 | ctype_parts = response.headers['Content-Type'].split(';') |
|---|
| | 274 | for parameter in ctype_parts[1:]: |
|---|
| | 275 | attribute, value = parameter.strip().split('=') |
|---|
| | 276 | try: |
|---|
| | 277 | self.unicode_response = response.body.decode(value) |
|---|
| | 278 | break |
|---|
| | 279 | except: |
|---|
| | 280 | # If the named encoding doesn't work then it doesn't work. We |
|---|
| | 281 | # just won't create the unicode_response field. |
|---|
| | 282 | pass |
|---|
| | 283 | |
|---|
| | 284 | self.response = response.body |
|---|
| | 285 | self.full_response = response |
|---|
| | 286 | self.status = response.status |
|---|
| | 287 | self.cookie = response.cookies_set |
|---|
| | 288 | self.cookie_encoded = response.headers.get('Set-Cookie', '') |
|---|
| | 289 | |
|---|
| | 290 | |
|---|
| | 291 | class DummySession: |
|---|
| | 292 | """A very simple dummy session.""" |
|---|
| | 293 | |
|---|
| | 294 | session_storage = dict |
|---|
| | 295 | to_be_loaded = None |
|---|
| | 296 | |
|---|
| | 297 | |
|---|
| | 298 | class DummyRequest: |
|---|
| | 299 | """A very simple dummy request.""" |
|---|
| | 300 | |
|---|
| | 301 | remote_host = "127.0.0.1" |
|---|
| | 302 | |
|---|
| | 303 | def __init__(self, method='GET', path='/', headers=None): |
|---|
| | 304 | self.headers = headers or {} |
|---|
| | 305 | self.method = method |
|---|
| | 306 | self.path = path |
|---|
| | 307 | self.base = '' |
|---|
| | 308 | self._session = DummySession() |
|---|
| | 309 | self.tg_template_enginename = config.get('tg.defaultview', 'kid') |
|---|
| | 310 | |
|---|
| | 311 | def purge__(self): |
|---|
| | 312 | pass |
|---|
| | 313 | |
|---|
| | 314 | |
|---|
| | 315 | class DummyResponse: |
|---|
| | 316 | """A very simple dummy response.""" |
|---|
| | 317 | |
|---|
| | 318 | headers = {} |
|---|
| | 319 | |
|---|
| | 320 | |
|---|
| | 321 | class AbstractDBTest(unittest.TestCase): |
|---|
| | 322 | """A database enabled unit testing class. |
|---|
| | 323 | |
|---|
| | 324 | Creates and destroys your database before and after each unit test. |
|---|
| | 325 | You must set the model attribute in order for this class to |
|---|
| | 326 | function correctly. |
|---|
| | 327 | |
|---|
| | 328 | """ |
|---|
| | 329 | model = None |
|---|
| | 330 | |
|---|
| | 331 | def setUp(self): |
|---|
| | 332 | raise NotImplementedError() |
|---|
| | 333 | |
|---|
| | 334 | def tearDown(self): |
|---|
| | 335 | raise NotImplementedError() |
|---|
| | 336 | |
|---|
| | 337 | |
|---|
| | 338 | class DBTestSO(AbstractDBTest): |
|---|
| | 339 | |
|---|
| | 340 | def _get_soClasses(self): |
|---|
| | 341 | try: |
|---|
| | 342 | return [self.model.__dict__[x] for x in self.model.soClasses] |
|---|
| | 343 | except AttributeError: |
|---|
| | 344 | return self.model.__dict__.values() |
|---|
| | 345 | |
|---|
| | 346 | def setUp(self): |
|---|
| | 347 | if not self.model: |
|---|
| | 348 | self.model = get_model() |
|---|
| | 349 | if not self.model: |
|---|
| | 350 | raise Exception("Unable to run database tests without a model") |
|---|
| | 351 | |
|---|
| | 352 | # list of constraints we will collect |
|---|
| | 353 | constraints = list() |
|---|
| | 354 | |
|---|
| | 355 | for item in self._get_soClasses(): |
|---|
| | 356 | if isinstance(item, types.TypeType) and issubclass(item, |
|---|
| | 357 | sqlobject.SQLObject) and item != sqlobject.SQLObject \ |
|---|
| | 358 | and item != InheritableSQLObject: |
|---|
| | 359 | # create table without applying constraints, collect |
|---|
| | 360 | # all the constaints for later creation. |
|---|
| | 361 | # see http://sqlobject.org/FAQ.html#mutually-referencing-tables |
|---|
| | 362 | # for more info |
|---|
| | 363 | collected_constraints = item.createTable(ifNotExists=True, |
|---|
| | 364 | applyConstraints=False) |
|---|
| | 365 | |
|---|
| | 366 | if collected_constraints: |
|---|
| | 367 | constraints.extend(collected_constraints) |
|---|
| | 368 | |
|---|
| | 369 | # now that all tables are created, add the constaints we collected |
|---|
| | 370 | for postponed_constraint in constraints: |
|---|
| | 371 | # item is the last processed item and we borrow its connection |
|---|
| | 372 | item._connection.query(postponed_constraint) |
|---|
| | 373 | |
|---|
| | 374 | def tearDown(self): |
|---|
| | 375 | database.rollback_all() |
|---|
| | 376 | for item in reversed(self._get_soClasses()): |
|---|
| | 377 | if isinstance(item, types.TypeType) and issubclass(item, |
|---|
| | 378 | sqlobject.SQLObject) and item != sqlobject.SQLObject \ |
|---|
| | 379 | and item != InheritableSQLObject: |
|---|
| | 380 | item.dropTable(ifExists=True, cascade=True) |
|---|
| | 381 | |
|---|
| | 382 | |
|---|
| | 383 | class DBTestSA(AbstractDBTest): |
|---|
| | 384 | |
|---|
| | 385 | def setUp(self): |
|---|
| | 386 | database.get_engine() |
|---|
| | 387 | database.metadata.create_all() |
|---|
| | 388 | |
|---|
| | 389 | def tearDown(self): |
|---|
| | 390 | database.metadata.drop_all() |
|---|
| | 391 | |
|---|
| | 392 | |
|---|
| | 393 | # Determine which class to use for "DBTest". Setup & teardown should behave |
|---|
| | 394 | # simularly regardless of which ORM you choose. |
|---|
| | 395 | if config.get("sqlobject.dburi"): |
|---|
| | 396 | DBTest = DBTestSO |
|---|
| | 397 | elif config.get("sqlalchemy.dburi"): |
|---|
| | 398 | DBTest = DBTestSA |
|---|
| | 399 | else: |
|---|
| | 400 | raise Exception("Unable to find sqlalchemy or sqlobject dburi") |
|---|
| | 401 | |
|---|
| | 402 | |
|---|
| | 403 | # deprecated functions kept for backward compability |
|---|
| | 404 | |
|---|
| | 405 | start_cp = deprecated('start_cp is superceded by start_server')(start_server) |
|---|
| | 406 | |
|---|
| | 407 | reset_cp = deprecated('reset_cp has been superceded by unmount.')(unmount) |
|---|
| 129 | | def make_wsgiapp(): |
|---|
| 130 | | """Return a WSGI application from cherrypy's root object.""" |
|---|
| 131 | | wsgiapp = cherrypy._cpwsgi.wsgiApp |
|---|
| 132 | | return wsgiapp |
|---|
| 133 | | |
|---|
| 134 | | |
|---|
| 135 | | def make_app(controller=None): |
|---|
| 136 | | """Return a WebTest.TestApp instance from Cherrypy. |
|---|
| 137 | | |
|---|
| 138 | | If a Controller object is provided, it will be mounted at the root level. |
|---|
| 139 | | If not, it'll look for an already mounted root. |
|---|
| 140 | | |
|---|
| 141 | | """ |
|---|
| 142 | | if controller: |
|---|
| 143 | | wsgiapp = mount(controller(), '/') |
|---|
| 144 | | else: |
|---|
| 145 | | wsgiapp = make_wsgiapp() |
|---|
| 146 | | return TestApp(wsgiapp) |
|---|
| 147 | | |
|---|
| 148 | | |
|---|
| 149 | | class TGTest(unittest.TestCase): |
|---|
| 150 | | """A WebTest enabled unit testing class. |
|---|
| 151 | | |
|---|
| 152 | | To use, subclass & set root to your controller object, or set app to a |
|---|
| 153 | | webtest.TestApp instance. |
|---|
| 154 | | |
|---|
| 155 | | In your tests, use self.app to make WebTest calls. |
|---|
| 156 | | """ |
|---|
| 157 | | |
|---|
| 158 | | root = None |
|---|
| 159 | | app = None |
|---|
| 160 | | stop_tg_only = False |
|---|
| 161 | | |
|---|
| 162 | | def setUp(self): |
|---|
| 163 | | """Set up the WebTest by starting the server. |
|---|
| 164 | | |
|---|
| 165 | | You should override this and make sure you have properly |
|---|
| 166 | | mounted a root for your server before calling super, |
|---|
| 167 | | or simply pass a root controller to super. |
|---|
| 168 | | Otherwise the Cherrypy filters for TurboGears will not be used. |
|---|
| 169 | | """ |
|---|
| 170 | | assert self.root or self.app, "Either self.root or self.app must be set" |
|---|
| 171 | | if not self.app: |
|---|
| 172 | | self.app = make_app(self.root) |
|---|
| 173 | | start_server() |
|---|
| 174 | | |
|---|
| 175 | | def tearDown(self): |
|---|
| 176 | | """Tear down the WebTest by stopping the server.""" |
|---|
| 177 | | stop_server(tg_only = self.stop_tg_only) |
|---|
| 178 | | |
|---|
| 179 | | def login_user(self, user): |
|---|
| 180 | | """Log a specified user object into the system.""" |
|---|
| 181 | | self.app.post(config.get('identity.failure_url'), dict( |
|---|
| 182 | | user_name=user.user_name, password=user.password, login='Login')) |
|---|
| 183 | | |
|---|
| 184 | | |
|---|
| 185 | | class BrowsingSession(object): |
|---|
| 186 | | |
|---|
| 187 | | def __init__(self): |
|---|
| 188 | | self.visit = None |
|---|
| 189 | | self.response, self.status = None, None |
|---|
| 190 | | self.cookie = Cookie.SimpleCookie() |
|---|
| 191 | | self.app = make_app() |
|---|
| 192 | | |
|---|
| 193 | | def goto(self, *args, **kwargs): |
|---|
| 194 | | if self.cookie: |
|---|
| 195 | | headers = kwargs.setdefault('headers', {}) |
|---|
| 196 | | headers['Cookie'] = self.cookie_encoded |
|---|
| 197 | | response = self.app.get(*args, **kwargs) |
|---|
| 198 | | |
|---|
| 199 | | # If we were given an encoding in the content type we should use it to |
|---|
| 200 | | # decode the response: |
|---|
| 201 | | ctype_parts = response.headers['Content-Type'].split(';') |
|---|
| 202 | | for parameter in ctype_parts[1:]: |
|---|
| 203 | | attribute, value = parameter.strip().split('=') |
|---|
| 204 | | try: |
|---|
| 205 | | self.unicode_response = response.body.decode(value) |
|---|
| 206 | | break |
|---|
| 207 | | except: |
|---|
| 208 | | # If the named encoding doesn't work then it doesn't work. We |
|---|
| 209 | | # just won't create the unicode_response field. |
|---|
| 210 | | pass |
|---|
| 211 | | |
|---|
| 212 | | self.response = response.body |
|---|
| 213 | | self.full_response = response |
|---|
| 214 | | self.status = response.status |
|---|
| 215 | | self.cookie = response.cookies_set |
|---|
| 216 | | self.cookie_encoded = response.headers.get('Set-Cookie', '') |
|---|
| 217 | | |
|---|
| 218 | | |
|---|
| 279 | | |
|---|
| 280 | | class AbstractDBTest(unittest.TestCase): |
|---|
| 281 | | """A database enabled unit testing class. |
|---|
| 282 | | |
|---|
| 283 | | Creates and destroys your database before and after each unit test. |
|---|
| 284 | | You must set the model attribute in order for this class to |
|---|
| 285 | | function correctly. |
|---|
| 286 | | """ |
|---|
| 287 | | model = None |
|---|
| 288 | | |
|---|
| 289 | | def setUp(self): |
|---|
| 290 | | raise NotImplementedError() |
|---|
| 291 | | |
|---|
| 292 | | def tearDown(self): |
|---|
| 293 | | raise NotImplementedError() |
|---|
| 294 | | |
|---|
| 295 | | class DBTestSO(AbstractDBTest): |
|---|
| 296 | | def _get_soClasses(self): |
|---|
| 297 | | try: |
|---|
| 298 | | return [self.model.__dict__[x] for x in self.model.soClasses] |
|---|
| 299 | | except AttributeError: |
|---|
| 300 | | return self.model.__dict__.values() |
|---|
| 301 | | |
|---|
| 302 | | def setUp(self): |
|---|
| 303 | | if not self.model: |
|---|
| 304 | | self.model = get_model() |
|---|
| 305 | | if not self.model: |
|---|
| 306 | | raise Exception("Unable to run database tests without a model") |
|---|
| 307 | | |
|---|
| 308 | | # list of constraints we will collect |
|---|
| 309 | | constraints = list() |
|---|
| 310 | | |
|---|
| 311 | | for item in self._get_soClasses(): |
|---|
| 312 | | if isinstance(item, types.TypeType) and issubclass(item, |
|---|
| 313 | | sqlobject.SQLObject) and item != sqlobject.SQLObject \ |
|---|
| 314 | | and item != InheritableSQLObject: |
|---|
| 315 | | # create table without applying constraints, collect |
|---|
| 316 | | # all the constaints for later creation. |
|---|
| 317 | | # see http://sqlobject.org/FAQ.html#mutually-referencing-tables |
|---|
| 318 | | # for more info |
|---|
| 319 | | collected_constraints = item.createTable(ifNotExists=True, |
|---|
| 320 | | applyConstraints=False) |
|---|
| 321 | | |
|---|
| 322 | | if collected_constraints: |
|---|
| 323 | | constraints.extend(collected_constraints) |
|---|
| 324 | | |
|---|
| 325 | | # now that all tables are created, add the constaints we collected |
|---|
| 326 | | for postponed_constraint in constraints: |
|---|
| 327 | | # item is the last processed item and we borrow its connection |
|---|
| 328 | | item._connection.query(postponed_constraint) |
|---|
| 329 | | |
|---|
| 330 | | |
|---|
| 331 | | def tearDown(self): |
|---|
| 332 | | database.rollback_all() |
|---|
| 333 | | for item in reversed(self._get_soClasses()): |
|---|
| 334 | | if isinstance(item, types.TypeType) and issubclass(item, |
|---|
| 335 | | sqlobject.SQLObject) and item != sqlobject.SQLObject \ |
|---|
| 336 | | and item != InheritableSQLObject: |
|---|
| 337 | | item.dropTable(ifExists=True, cascade=True) |
|---|
| 338 | | |
|---|
| 339 | | class DBTestSA(AbstractDBTest): |
|---|
| 340 | | def setUp(self): |
|---|
| 341 | | database.get_engine() |
|---|
| 342 | | database.metadata.create_all() |
|---|
| 343 | | |
|---|
| 344 | | def tearDown(self): |
|---|
| 345 | | database.metadata.drop_all() |
|---|
| 346 | | |
|---|
| 347 | | |
|---|
| 348 | | #Determine which class to use for "DBTest". Setup & teardown should behave |
|---|
| 349 | | #simularly regardless of which ORM you choose. |
|---|
| 350 | | if config.get("sqlobject.dburi"): |
|---|
| 351 | | DBTest = DBTestSO |
|---|
| 352 | | elif config.get("sqlalchemy.dburi"): |
|---|
| 353 | | DBTest = DBTestSA |
|---|
| 354 | | else: |
|---|
| 355 | | raise Exception("Unable to find sqlalchemy or sqlobject dburi") |
|---|
| 356 | | |
|---|
| 357 | | |
|---|
| 358 | | def unmount(): |
|---|
| 359 | | """Remove an application from the object traversal tree.""" |
|---|
| 360 | | # There's no clean way to remove a subtree under CP2, so the only use case |
|---|
| 361 | | # handled here is to remove the entire application. |
|---|
| 362 | | # Supposedly, you can do a partial unmount with CP3 using: |
|---|
| 363 | | # del cherrypy.tree.apps[path] |
|---|
| 364 | | cherrypy.root = None |
|---|
| 365 | | cherrypy.tree.mount_points = {} |
|---|
| 366 | | |
|---|
| 367 | | reset_cp = deprecated('reset_cp has been superceded by unmount.')(unmount) |
|---|
| 368 | | |
|---|
| 369 | | |
|---|
| 370 | | def mount(controller, path="/"): |
|---|
| 371 | | """Mount a controller at a path. Returns a wsgi application.""" |
|---|
| 372 | | if path == '/': |
|---|
| 373 | | cherrypy.root = controller |
|---|
| 374 | | else: |
|---|
| 375 | | cherrypy.tree.mount(controller, path) |
|---|
| 376 | | return make_wsgiapp() |
|---|
| 377 | | |
|---|
| 378 | | def catch_validation_errors(widget, value): |
|---|
| 379 | | """Catch and unpack validation errors (for testing purposes).""" |
|---|
| 380 | | try: |
|---|
| 381 | | value = widget.validate(value) |
|---|
| 382 | | except validators.Invalid, errors: |
|---|
| 383 | | try: |
|---|
| 384 | | errors = errors.unpack_errors() |
|---|
| 385 | | except AttributeError: |
|---|
| 386 | | pass |
|---|
| 387 | | else: |
|---|
| 388 | | errors = {} |
|---|
| 389 | | return value, errors |
|---|
| 390 | | |
|---|
| 391 | | |
|---|
| 392 | | class MemoryListHandler(logging.Handler): |
|---|
| 393 | | |
|---|
| 394 | | def __init__(self): |
|---|
| 395 | | logging.Handler.__init__(self, level=logging.DEBUG) |
|---|
| 396 | | self.log = [] |
|---|
| 397 | | |
|---|
| 398 | | def emit(self, record): |
|---|
| 399 | | print "Got record: %s" % record |
|---|
| 400 | | print "formatted as: %s" % self.format(record) |
|---|
| 401 | | self.log.append(self.format(record)) |
|---|
| 402 | | |
|---|
| 403 | | def print_log(self): |
|---|
| 404 | | print "\n".join(self.log) |
|---|
| 405 | | self.log = [] |
|---|
| 406 | | |
|---|
| 407 | | def get_log(self): |
|---|
| 408 | | log = self.log |
|---|
| 409 | | self.log = [] |
|---|
| 410 | | return log |
|---|
| 411 | | |
|---|
| 412 | | _memhandler = MemoryListHandler() |
|---|
| 413 | | |
|---|
| 414 | | |
|---|
| 415 | | _currentcat = None |
|---|
| 416 | | |
|---|
| 417 | | |
|---|
| 418 | | def capture_log(category): |
|---|
| 419 | | """Capture log for one category. |
|---|
| 420 | | |
|---|
| 421 | | The category can either be a single category (a string like 'foo.bar') |
|---|
| 422 | | or a list of them. You *must* call print_log() to reset when you're done. |
|---|
| 423 | | |
|---|
| 424 | | """ |
|---|
| 425 | | global _currentcat |
|---|
| 426 | | assert not _currentcat, "_currentcat not cleared. Use get_log to reset." |
|---|
| 427 | | if not isinstance(category, list) and not isinstance(category, tuple): |
|---|
| 428 | | category = [category] |
|---|
| 429 | | _currentcat = category |
|---|
| 430 | | for cat in category: |
|---|
| 431 | | log = logging.getLogger(cat) |
|---|
| 432 | | log.setLevel(logging.DEBUG) |
|---|
| 433 | | log.addHandler(_memhandler) |
|---|
| 434 | | |
|---|
| 435 | | |
|---|
| 436 | | def _reset_logging(): |
|---|
| 437 | | """Manage the resetting of the loggers.""" |
|---|
| 438 | | global _currentcat |
|---|
| 439 | | if not _currentcat: |
|---|
| 440 | | return |
|---|
| 441 | | for cat in _currentcat: |
|---|
| 442 | | log = logging.getLogger(cat) |
|---|
| 443 | | log.removeHandler(_memhandler) |
|---|
| 444 | | _currentcat = None |
|---|
| 445 | | |
|---|
| 446 | | |
|---|
| 447 | | def print_log(): |
|---|
| 448 | | """Print the log captured by capture_log to stdout. |
|---|
| 449 | | |
|---|
| 450 | | Resets that log and resets the temporarily added handlers. |
|---|
| 451 | | |
|---|
| 452 | | """ |
|---|
| 453 | | _reset_logging() |
|---|
| 454 | | _memhandler.print_log() |
|---|
| 455 | | |
|---|
| 456 | | |
|---|
| 457 | | def get_log(): |
|---|
| 458 | | """Return the list of log messages captured by capture_log. |
|---|
| 459 | | |
|---|
| 460 | | Resets that log and resets the temporarily added handlers. |
|---|
| 461 | | |
|---|
| 462 | | """ |
|---|
| 463 | | _reset_logging() |
|---|
| 464 | | return _memhandler.get_log() |
|---|
| 465 | | |
|---|
| 466 | | |
|---|
| 467 | | def sqlalchemy_cleanup(): |
|---|
| 468 | | database.metadata.clear() |
|---|
| 469 | | try: |
|---|
| 470 | | database.metadata.dispose() |
|---|
| 471 | | except AttributeError: # not threadlocal |
|---|
| 472 | | if database.metadata.bind: |
|---|
| 473 | | database.metadata.bind.dispose() |
|---|
| 474 | | database._engine = None |
|---|
| 475 | | sqlalchemy.orm.clear_mappers() |
|---|
| 476 | | |
|---|
| 477 | | |
|---|
| 478 | | __all__ = ["call", "create_request", "DBTest", "TGTest", |
|---|
| 479 | | "attach_identity", "set_identity_user", |
|---|
| 480 | | "capture_log", "print_log", "get_log", "sqlalchemy_cleanup", |
|---|
| 481 | | "make_wsgiapp", "make_app", "start_server", |
|---|
| 482 | | "stop_server", "mount", "unmount"] |
|---|
| | 480 | # Public API. We don't expose deprecated functions |
|---|
| | 481 | __all__ = [ |
|---|
| | 482 | "BrowsingSession", |
|---|
| | 483 | "DBTest", |
|---|
| | 484 | "DBTestSA", |
|---|
| | 485 | "DBTestSO", |
|---|
| | 486 | "DummyRequest" |
|---|
| | 487 | "DumyResponse", |
|---|
| | 488 | "DummySession", |
|---|
| | 489 | "TGTest", |
|---|
| | 490 | "capture_log", |
|---|
| | 491 | "make_app", |
|---|
| | 492 | "make_wsgiapp", |
|---|
| | 493 | "mount", |
|---|
| | 494 | "print_log", |
|---|
| | 495 | "get_log", |
|---|
| | 496 | "sqlalchemy_cleanup", |
|---|
| | 497 | "start_server", |
|---|
| | 498 | "stop_server", |
|---|
| | 499 | "unmount" |
|---|
| | 500 | ] |
|---|