110 lines
2.9 KiB
Python
110 lines
2.9 KiB
Python
#!/usr/bin/python3.5
|
|
# encoding: utf8
|
|
|
|
"""Test zodb database class"""
|
|
|
|
from pprint import pprint
|
|
import unittest
|
|
import transaction
|
|
import ZODB.POSException
|
|
|
|
import zcm
|
|
|
|
|
|
# Path handed to zcm.database() by every test in this module.  It is relative
# to the current working directory and spelled with backslash separators.
# NOTE(review): on a POSIX system the backslashes are ordinary filename
# characters, so this would not resolve to ../var -- confirm the tests are
# only meant to run on Windows.
FILE = r"..\var\unittest.zodb"
|
|
|
|
|
|
class TestContext(unittest.TestCase):
    """Exercise connections and transactions of a ``zcm.database``.

    Every test opens the database stored at ``FILE``, works with one or more
    connections and closes the database again.  Apart from the
    ``ConflictError`` expectation in ``test_conflict`` the tests only print
    the values they read; they do not assert on them.

    The database is always closed in a ``finally`` clause so that a failing
    step cannot leave it open for the tests that run afterwards.

    NOTE(review): the listing this class was restored from had lost its
    indentation, so the extent of each ``with``/``if`` body below is
    inferred -- confirm against the original file.
    """

    def test_sync(self):
        """Print ``root.one`` around a ``transaction.abort()`` call.

        The first connection holds an uncommitted ``2`` while a second
        connection obtained from ``db.transaction()`` stores ``3`` under the
        same key.
        """
        print("sync")
        db = zcm.database(FILE)
        try:
            cx = db.open()
            cx.root.one = 2
            print(cx.root.one)
            with db.transaction() as cx2:
                # Attribute access only; the value itself is not used.
                _ = cx2.transaction_manager
                root = cx2.root()
                root['one'] = 3
            # Abort on the module-level transaction API, then show what the
            # first connection sees afterwards.
            transaction.abort()
            print(cx.root.one)
            cx.root.one = 4
            print(cx.root.one)
        finally:
            db.close()

    def test_transaction(self):
        """Print ``root.one`` from the first and from a fresh connection.

        Same setup as ``test_sync`` but without the abort: after the second
        connection stored ``3`` the value is printed through the first
        connection and, finally, through a newly opened one.
        """
        print("transaction")
        db = zcm.database(FILE)
        try:
            cx = db.open()
            cx.root.one = 2
            print(cx.root.one)
            with db.transaction() as cx2:
                # Attribute access only; the value itself is not used.
                _ = cx2.transaction_manager
                root = cx2.root()
                root['one'] = 3
            print(cx.root.one)
            cx.root.one = 4
            print(cx.root.one)
            cx3 = db.open()
            print(cx3.root.one)
        finally:
            db.close()

    def test_conflict(self):
        """Expect ``ConflictError`` when committing over a concurrent write.

        The first connection modifies ``root.one`` before and after a second
        connection stores its own value; ``transaction.commit()`` must then
        raise ``ZODB.POSException.ConflictError``.  The database is reopened
        afterwards to print the stored value.
        """
        db = zcm.database(FILE)
        try:
            cx = db.open()
            cx.root.one = 2
            print(cx.root.one)
            with db.transaction() as cx2:
                # Attribute access only; the value itself is not used.
                _ = cx2.transaction_manager
                root = cx2.root()
                root['one'] = 3
            print(cx.root.one)
            cx.root.one = 4
            print(cx.root.one)
            with self.assertRaises(ZODB.POSException.ConflictError):
                transaction.commit()
        finally:
            db.close()

        # Second pass on a freshly opened database: print what is stored.
        db = zcm.database(FILE)
        try:
            cx = db.open()
            print(cx.root.one)
        finally:
            db.close()

    def test_connection(self):
        """Write ``root.another`` through a connection with its own manager.

        The connection is opened with an explicitly created
        ``transaction.TransactionManager``; one assignment happens outside
        and one inside a ``with txm:`` block.
        """
        db = zcm.database(FILE)
        try:
            txm = transaction.TransactionManager()
            cx3 = db.open(txm)
            # NOTE(review): ``another`` is read before this test writes it,
            # so it presumably has to exist in FILE already -- confirm.
            print(cx3.root.another)
            cx3.root.another = "no tx"
            print(cx3.root.another)
            with txm:
                cx3.root.another = "object"
            print(cx3.root.another)
        finally:
            db.close()

    def test_default_txm(self):
        """Compare transaction managers of differently obtained connections.

        Connections come from ``zcm.connection(db)``, ``db.open()`` and
        ``db.transaction()``; the test prints whether their transaction
        managers coincide and what ``root.another`` reads as while it is
        written through each of them.
        """
        print("default_txm")
        db = zcm.database(FILE)
        try:
            cx2, root = zcm.connection(db)
            cx3 = db.open()
            if cx2.transaction_manager is cx3.transaction_manager:
                print("cx3 identity")
            # NOTE(review): ``another`` is read before this test writes it,
            # so it presumably has to exist in FILE already -- confirm.
            print(root.another)
            with cx2.transaction_manager:
                root.another = "more"
            print(cx3.root.another)
            with db.transaction() as cx4:
                cx4.root.another = "other"
                if cx4.transaction_manager == cx2.transaction_manager:
                    print("cx4 same")
                else:
                    print("cx4 differs")
            cx2.transaction_manager.abort()
            print(root.another)
            with cx3.transaction_manager:
                cx3.root.another = "object"
            print(root.another)
            print(cx3.root.another)
        finally:
            db.close()
|