initial commit
This commit is contained in:
109
test/test_zcm.py
Normal file
109
test/test_zcm.py
Normal file
@@ -0,0 +1,109 @@
|
||||
#!/usr/bin/python3.5
|
||||
# encoding: utf8
|
||||
|
||||
"""Test zodb database class"""
|
||||
|
||||
from pprint import pprint
|
||||
import unittest
|
||||
import transaction
|
||||
import ZODB.POSException
|
||||
|
||||
import zcm
|
||||
|
||||
|
||||
FILE = r"..\var\unittest.zodb"
|
||||
|
||||
|
||||
class TestContext(unittest.TestCase):
|
||||
|
||||
def test_sync(self):
|
||||
print("sync")
|
||||
db = zcm.database(FILE)
|
||||
cx = db.open()
|
||||
cx.root.one = 2
|
||||
print(cx.root.one)
|
||||
with db.transaction() as cx2:
|
||||
_ = cx2.transaction_manager
|
||||
root = cx2.root()
|
||||
root['one'] = 3
|
||||
transaction.abort()
|
||||
print(cx.root.one)
|
||||
cx.root.one = 4
|
||||
print(cx.root.one)
|
||||
db.close()
|
||||
|
||||
def test_transaction(self):
|
||||
print("transaction")
|
||||
db = zcm.database(FILE)
|
||||
cx = db.open()
|
||||
cx.root.one = 2
|
||||
print(cx.root.one)
|
||||
with db.transaction() as cx2:
|
||||
_ = cx2.transaction_manager
|
||||
root = cx2.root()
|
||||
root['one'] = 3
|
||||
print(cx.root.one)
|
||||
cx.root.one = 4
|
||||
print(cx.root.one)
|
||||
cx3 = db.open()
|
||||
print(cx3.root.one)
|
||||
db.close()
|
||||
|
||||
def test_conflict(self):
|
||||
db = zcm.database(FILE)
|
||||
try:
|
||||
cx = db.open()
|
||||
cx.root.one = 2
|
||||
print(cx.root.one)
|
||||
with db.transaction() as cx2:
|
||||
_ = cx2.transaction_manager
|
||||
root = cx2.root()
|
||||
root['one'] = 3
|
||||
print(cx.root.one)
|
||||
cx.root.one = 4
|
||||
print(cx.root.one)
|
||||
with self.assertRaises(ZODB.POSException.ConflictError):
|
||||
transaction.commit()
|
||||
finally:
|
||||
db.close()
|
||||
db = zcm.database(FILE)
|
||||
cx = db.open()
|
||||
print(cx.root.one)
|
||||
db.close()
|
||||
|
||||
def test_connection(self):
|
||||
db = zcm.database(FILE)
|
||||
txm = transaction.TransactionManager()
|
||||
cx3 = db.open(txm)
|
||||
print(cx3.root.another)
|
||||
cx3.root.another = "no tx"
|
||||
print(cx3.root.another)
|
||||
with txm:
|
||||
cx3.root.another = "object"
|
||||
print(cx3.root.another)
|
||||
db.close()
|
||||
|
||||
def test_default_txm(self):
|
||||
print("default_txm")
|
||||
db = zcm.database(FILE)
|
||||
cx2, root = zcm.connection(db)
|
||||
cx3 = db.open()
|
||||
if cx2.transaction_manager is cx3.transaction_manager:
|
||||
print("cx3 identity")
|
||||
print(root.another)
|
||||
with cx2.transaction_manager:
|
||||
root.another = "more"
|
||||
print(cx3.root.another)
|
||||
with db.transaction() as cx4:
|
||||
cx4.root.another = "other"
|
||||
if cx4.transaction_manager == cx2.transaction_manager:
|
||||
print("cx4 same")
|
||||
else:
|
||||
print("cx4 differs")
|
||||
cx2.transaction_manager.abort()
|
||||
print(root.another)
|
||||
with cx3.transaction_manager:
|
||||
cx3.root.another = "object"
|
||||
print(root.another)
|
||||
print(cx3.root.another)
|
||||
db.close()
|
||||
Reference in New Issue
Block a user