#!/usr/bin/python3.5
# encoding: utf8
"""Test zodb database class"""

from pprint import pprint
import unittest

import transaction
import ZODB.POSException

import zcm

# Relative, Windows-style path of the database file shared by every test.
FILE = r"..\var\unittest.zodb"

# NOTE(review): this module was recovered from whitespace-flattened source.
# The extent of each `with` body was inferred from the parallel structure of
# the tests -- confirm against version control if the history is available.


class TestContext(unittest.TestCase):
    """Exploratory tests for connections and transactions on a zcm database.

    Most tests print the values they observe instead of asserting them;
    only ``test_conflict`` makes an assertion.  Every test closes the
    database in a ``finally`` clause so that a failing statement does not
    leave the database open for the tests that run afterwards.
    """

    def test_sync(self):
        """Print what a connection sees after a second connection writes.

        The first connection uses the default transaction manager; the
        second comes from ``db.transaction()``.  The default transaction is
        aborted before the first connection is read again.
        """
        print("sync")
        db = zcm.database(FILE)
        try:
            cx = db.open()
            cx.root.one = 2
            print(cx.root.one)
            with db.transaction() as cx2:
                _ = cx2.transaction_manager
                root = cx2.root()
                root['one'] = 3
            # Discards the pending `one = 2` held by the default transaction.
            transaction.abort()
            print(cx.root.one)
            cx.root.one = 4
            print(cx.root.one)
        finally:
            db.close()

    def test_transaction(self):
        """Print values seen by several connections around a second write.

        Same sequence as ``test_sync`` but without aborting the default
        transaction; a third connection is opened at the end and read.
        """
        print("transaction")
        db = zcm.database(FILE)
        try:
            cx = db.open()
            cx.root.one = 2
            print(cx.root.one)
            with db.transaction() as cx2:
                _ = cx2.transaction_manager
                root = cx2.root()
                root['one'] = 3
            print(cx.root.one)
            cx.root.one = 4
            print(cx.root.one)
            cx3 = db.open()
            print(cx3.root.one)
        finally:
            db.close()

    def test_conflict(self):
        """Expect ``ConflictError`` when two connections change the same key.

        ``one`` is modified through the default-transaction connection and,
        in between, through a ``db.transaction()`` connection (presumably
        committed when that ``with`` block exits -- confirm in zcm).  The
        commit of the default transaction must then raise.  The database is
        reopened afterwards and the stored value printed.
        """
        db = zcm.database(FILE)
        try:
            cx = db.open()
            cx.root.one = 2
            print(cx.root.one)
            with db.transaction() as cx2:
                _ = cx2.transaction_manager
                root = cx2.root()
                root['one'] = 3
            print(cx.root.one)
            cx.root.one = 4
            print(cx.root.one)
            with self.assertRaises(ZODB.POSException.ConflictError):
                transaction.commit()
        finally:
            db.close()

        db = zcm.database(FILE)
        try:
            cx = db.open()
            print(cx.root.one)
        finally:
            db.close()

    def test_connection(self):
        """Print ``another`` on a connection bound to its own manager.

        The attribute is read first, then assigned outside and inside a
        ``with txm:`` block.
        """
        db = zcm.database(FILE)
        try:
            txm = transaction.TransactionManager()
            cx3 = db.open(txm)
            # NOTE(review): `another` is read before this test assigns it, so
            # this line appears to rely on a value already being stored.
            print(cx3.root.another)
            cx3.root.another = "no tx"
            print(cx3.root.another)
            with txm:
                cx3.root.another = "object"
            print(cx3.root.another)
        finally:
            db.close()

    def test_default_txm(self):
        """Print how connections relate to each other's transaction manager.

        Compares the manager of a ``zcm.connection(db)`` connection with
        those of a plain ``db.open()`` connection and of a
        ``db.transaction()`` connection, printing ``another`` after each
        write.
        """
        print("default_txm")
        db = zcm.database(FILE)
        try:
            cx2, root = zcm.connection(db)
            cx3 = db.open()
            if cx2.transaction_manager is cx3.transaction_manager:
                print("cx3 identity")
            print(root.another)
            with cx2.transaction_manager:
                root.another = "more"
            print(cx3.root.another)
            with db.transaction() as cx4:
                cx4.root.another = "other"
                if cx4.transaction_manager == cx2.transaction_manager:
                    print("cx4 same")
                else:
                    print("cx4 differs")
            cx2.transaction_manager.abort()

            print(root.another)
            with cx3.transaction_manager:
                cx3.root.another = "object"
            print(root.another)
            print(cx3.root.another)
        finally:
            db.close()