writing price publisher using coop/threading models

--HG--
branch : sandbox
This commit is contained in:
baloan
2011-03-12 00:08:17 +01:00
parent 78d904b04f
commit 2b71a95807
12 changed files with 622 additions and 11 deletions

View File

@@ -2,3 +2,6 @@ syntax: glob
.settings
.project
.pydevproject
syntax: regexp
^world$

View File

@@ -3,7 +3,7 @@
<pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.5</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/sandbox/src</path>

View File

@@ -1,9 +0,0 @@
@echo off
FOR %%f IN (src\*.py) DO CALL :conv %%f
goto :EOF
:conv
echo "Deploying %1 -> O:\User\baloan\tmp\%~n1"
copy %1 O:\User\baloan\tmp\%~n1 >nul:
c:\apps\tofrodos\fromdos O:\User\baloan\tmp\%~n1
goto :EOF

View File

@@ -0,0 +1,90 @@
# Copyright 2011 baloan
# See LICENSE for details.
''' empty module '''
import logging
from circuits import Component, Debugger, Event, handler, Timer
from circuits.net.sockets import TCPServer, TCPClient
from circuits.net.sockets import Close, Connect, Write, Read
from circuits.tools import inspect, graph
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname).4s %(message)s',
datefmt='%H:%M:%S')
# definitions
class Tick(Event):
def __init__(self, *args, **kwargs):
super(Tick, self).__init__(*args, **kwargs)
class AutoConnect(Component):
def __init__(self, host, port, *args, **kwargs):
super(AutoConnect, self).__init__(*args, **kwargs)
self.host = host
self.port = port
self.tcp = None
def started(self, component, mode):
LOG.info("Started %s %s", component, mode)
self.tcp = TCPClient(channel=self.channel)
self.tcp.register(self)
print inspect(system)
print graph(system)
def ready(self, e):
LOG.info("ready %s", e)
self.push(Connect(self.host, self.port), target=self.channel)
def connected(self, host, port):
LOG.info("Connected %s %d", host, port)
# implement any negotiation here
def error(self, data):
LOG.info("Error [%s]", data)
def disconnected(self):
LOG.info("Disconnected")
self.tcp.unregister(self)
self.tcp = None
self.stop()
def stopped(self):
Timer(1, Event(), target='start').register(self)
class Parser(Component):
def __init__(self, *args, **kwargs):
super(Parser, self).__init__(*args, **kwargs)
self.buffer = ""
@handler("read", "hmpf", target="biw")
def read_biw(self, data):
blcks = data.split("\x0c")
blcks[0].join(self.buffer)
for blk in blcks[:-1]:
flds = blk.split("\x00")
self.push(Tick(flds))
self.buffer = blcks[-1]
@handler("read", target="cos")
def read_cos(self, data):
blcks = data.split("\x0c")
blcks[0].join(self.buffer)
for blk in blcks[:-1]:
flds = blk.split("\x00")
self.push(Tick(flds))
self.buffer = blcks[-1]
def tick(self, flds):
LOG.info("Tick %s", "|".join(flds))
if __name__ == "__main__":
system = AutoConnect("xapgu17a-uat.de.db.com", 2013, channel="biw") + TCPServer(8001) + Parser() + Debugger()
system.run(sleep=0.5)

View File

@@ -0,0 +1,17 @@
# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" circuits.prc_simulator """
# imports
import logging
# constants
# globals
LOG = logging.getLogger()
# definitions

86
src/circuits/primes.py Normal file
View File

@@ -0,0 +1,86 @@
# Copyright 2010 baloan
# See LICENSE for details.
''' multiprocessing demo '''
# system imports
from circuits import Component, Event, Timer, handler, Debugger
from math import sqrt
import logging
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03d %(process)d:%(thread)d %(levelname).4s %(message)s',
datefmt='%H:%M:%S')
# definitions
class Print(Event):
'Print Event'
class Timeout(Event):
'Timeout'
class Input(Component):
def started(self, component, mode):
LOG.info('started %s %s', component, mode)
for n in range(100):
LOG.info('put %i', n)
self.push(Event(n))
LOG.info('worker done, ending')
Timer(5, Timeout('msg'), "timeout").register(self)
def timeout(self, e):
LOG.info("timeout %s", e)
self.stop()
def stopped(self, e):
pass
class Filter(Component):
def started(self, component, mode):
LOG.info('started %s %s', component, mode)
def event(self, n):
LOG.info('Filter:event %i', n)
if self.is_prime(n):
LOG.info('Filter: sending %i', n)
self.push(Print(n))
def is_prime(self, n):
if n < 2:
return False
if n in (2, 3):
return True
for i in range(2, int(sqrt(n)) + 1):
if n % i == 0:
return False
return True
class Output(Component):
def started(self, component, mode):
LOG.info('started %s %s', component, mode)
@handler('print')
def onPrint(self, n):
print '%i, ' % (n,),
def runInline():
master = Input()
master += Filter()
master += Output()
master += Debugger()
master.run()
LOG.info('done.')
if __name__ == '__main__':
runInline()

View File

@@ -0,0 +1,17 @@
# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" prc_publish.coop_gevent """
# imports
import logging
import datetime as dt
# constants
# globals
LOG = logging.getLogger()
# definitions

View File

@@ -0,0 +1,125 @@
# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" prc_publish.multitask
Price Publisher
"""
# imports
from argparse import ArgumentParser
import logging
import multitask
import os
import random
import sys
import socket
LOG = logging.getLogger()
# definitions
def main(argv=None):
if argv is None:
argv = sys.argv
LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:]))
# parse options and arguments
parser = ArgumentParser(description="Price Publisher")
parser.add_argument("-f", "--file", dest="filename",
help="read configuration from %(dest)s")
parser.add_argument("-p", "--port", default=8001, type=int,
help="server port [default: %(default)s")
args = parser.parse_args()
print args
# create product dict
prds = { }
pubqs = []
for n in range(10):
key = "AB" + "{:04}".format(n)
prds["AB" + key] = Pricer(key)
# start one thread for price changes
multitask.add(controller(prds, pubqs))
address = ('localhost', 8010) # family is deduced to be 'AF_INET'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(address)
sock.listen(1)
multitask.add(listener(sock, pubqs))
multitask.run()
def listener(sock, pubqs):
while True:
cx, address = yield multitask.accept(sock)
LOG.info("accepting connection from %s", address)
inq = multitask.Queue()
pubqs.append(inq)
multitask.add(publisher(inq, cx))
multitask.add(receiver(cx))
def publisher(inq, cx):
LOG.info("Publisher running")
try:
while True:
# what happens if client does not pick up
item = yield inq.get()
yield cx.send(item)
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
LOG.info("Publisher terminated")
def receiver(cx):
LOG.info("Receiver running")
try:
while True:
# what happens if client does not pick up
s = yield cx.recv(4096)
if not s:
break
LOG.info(s)
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
LOG.info("Receiver terminated")
def update_vola(old_vola):
new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01)
return new_vola
def controller(prds, pubqs):
while True:
LOG.info("Price update cycle")
Pricer.VOLA = update_vola(Pricer.VOLA)
for prd in prds.values():
prd.run()
for pubq in pubqs:
yield pubq.put((prd.name, prd.prc), timeout=5)
yield multitask.sleep(5)
class Pricer():
VOLA = 0.01
def __init__(self, name):
self.name = name
self.prc = random.random() * 100.0
self.daemon = True
def run(self):
self.prc += random.choice((-1, +1)) * self.VOLA
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)i %(message)s',
datefmt='%H:%M:%S')
main()

View File

@@ -0,0 +1,168 @@
# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" prc_publish.multi_thread
Price Publisher
"""
# imports
from Queue import Queue
from argparse import ArgumentParser
from threading import Thread, Event, Lock
import logging
import os
import random
import select
import sys
import time
# monkey patch to allow for interruptable listener
import multiprocessing.connection
multiprocessing.connection.Listener.fileno = lambda self: self._listener._socket.fileno()
LOG = logging.getLogger()
# definitions
class StoppableThread(Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, *args, **kwargs):
super(StoppableThread, self).__init__(*args, **kwargs)
self._stop = Event()
def stop(self):
self._stop.set()
def stopped(self):
return self._stop.isSet()
def main(argv=None):
if argv is None:
argv = sys.argv
LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:]))
# parse options and arguments
parser = ArgumentParser(description="Price Publisher")
parser.add_argument("-f", "--file", dest="filename",
help="read configuration from %(dest)s")
parser.add_argument("-p", "--port", default=8001, type=int,
help="server port [default: %(default)s")
args = parser.parse_args()
print args
# create product dict
threads = []
prds = { }
for n in range(10):
key = "AB" + "{:04}".format(n)
prds["AB" + key] = Pricer(key)
# start one thread for price changes
controller = Controller(prds)
threads.append(controller)
# main thread listens for connections
address = ('localhost', 8010) # family is deduced to be 'AF_INET'
listener = multiprocessing.connection.Listener(address)
# start threads
for thread in threads:
thread.start()
LOG.info('waiting for connection on %s', address)
try:
while True:
LOG.info("Waiting for accept...")
r, w, e = select.select((listener,), (), (), 5)
if listener in r:
cx = listener.accept()
LOG.info('connection accepted from %s', listener.last_accepted)
# start one thread per client
inq = Queue()
pub = Publisher(inq, cx)
with controller.pubqs_lock:
controller.pubqs.append(inq)
pub.daemon = True
pub.start()
threads.append(pub)
finally:
for thread in threads:
thread.stop()
for thread in threads:
thread.join()
LOG.info("done.")
class Publisher(StoppableThread):
THREAD_ID = 0
def __init__(self, inq, cx, *args, **kwargs):
super(Publisher, self).__init__(*args, **kwargs)
self.inq = inq
self.cx = cx
LOG.info("Publisher created")
self.daemon = True
Publisher.THREAD_ID += 1
self.name = "Pub-{}".format(Publisher.THREAD_ID)
def run(self):
LOG.info("Publisher running")
try:
while not self.stopped():
# what happens if client does not pick up
item = self.inq.get()
self.cx.send(item)
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
self.cx.close()
LOG.info("Publisher terminated")
def update_vola(old_vola):
new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01)
return new_vola
class Controller(StoppableThread):
THREAD_ID = 0
def __init__(self, prds, *args, **kwargs):
super(Controller, self).__init__(*args, **kwargs)
self.prds = prds
self.pubqs = []
self.pubqs_lock = Lock()
self.daemon = True
Controller.THREAD_ID += 1
self.name = "Control-{}".format(Controller.THREAD_ID)
def run(self):
while not self.stopped():
LOG.info("Price update cycle")
Pricer.VOLA = update_vola(Pricer.VOLA)
for prd in self.prds.values():
prd.run()
with self.pubqs_lock:
for pubq in self.pubqs:
pubq.put((prd.name, prd.prc))
time.sleep(5)
class Pricer():
VOLA = 0.01
def __init__(self, name):
self.name = name
self.prc = random.random() * 100.0
self.daemon = True
def run(self):
self.prc += random.choice((-1, +1)) * self.VOLA
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s',
datefmt='%H:%M:%S')
main()

View File

@@ -0,0 +1,59 @@
# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" prc_publish.single_thread
Price Publisher
"""
# imports
from argparse import ArgumentParser
import logging
import os
import random
import sys
LOG = logging.getLogger()
# definitions
def main(argv=None):
if argv is None:
argv = sys.argv
LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:]))
# parse options and arguments
parser = ArgumentParser(description = "Price Publisher")
parser.add_argument("-f", "--file", dest="filename",
help="read configuration from FILENAME")
parser.add_argument("-p", "--port", default=8001, type=int,
help="server port [default: %(default)s")
args = parser.parse_args()
# start program
prc = Pricer("AB1234")
for n in range(100):
prc.run()
Pricer.VOLA = update_vola(Pricer.VOLA)
print "{:03}: {:.2f} {} {:.4f}".format(n, Pricer.VOLA, prc.name, prc.prc)
def update_vola(old_vola):
new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01)
return new_vola
class Pricer():
VOLA = 0.01
def __init__(self, name):
self.name = name
self.prc = random.random() * 100.0
def run(self):
self.prc += random.choice((-1, +1)) * self.VOLA
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
main()

55
src/zmq/primes.py Normal file
View File

@@ -0,0 +1,55 @@
#!/usr/bin/env python
# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" zmq.primes """
# system imports
import logging
import sys
import os
from optparse import OptionParser
import zmq
# globals
LOG = logging.getLogger()
# definitions
def main(argv = None):
if argv is None:
argv = sys.argv
script_name = os.path.basename(argv[0])
LOG.info("starting '%s %s'", script_name, " ".join(argv[1:]))
# parse options and arguments
usage = "usage: %prog [options] arg"
version = "%prog 1.0"
parser = OptionParser(usage, version)
parser.add_option("-f", "--file", dest="filename",
help="read configuration from FILENAME")
parser.add_option("-p", "--port", default="8001", type="int",
help="server port [default: %default]")
(opts, args) = parser.parse_args()
if len(args) > 0:
parser.error("too many arguments")
# call method with appropriate arguments
ctx = zmq.Context()
s = ctx.socket(zmq.REP)
s.bind('tcp://localhost:12345')
loop = zmq.eventloop.ioloop.IOLoop.instance()
stream = zmq.eventloop.zmqstream.ZMQStream(s, loop)
def echo(msg):
stream.send_multipart(msg)
stream.on_recv(echo)
loop.start()
return 0
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s',
datefmt='%H:%M:%S')
rc = main()
sys.exit(rc)

View File

@@ -24,7 +24,7 @@ class Account(Persistent):
# Configuration
storage = FileStorage(r"e:\temp\data.zodb")
storage = FileStorage(r"e:\workspaces\zodb3\data.zodb")
db = DB(storage)
connection = db.open()
root = connection.root()