From 2b71a95807f4beef7052dc0caf9cb74fdcece795 Mon Sep 17 00:00:00 2001 From: baloan Date: Sat, 12 Mar 2011 00:08:17 +0100 Subject: [PATCH] writing price publisher using coop/threading models --HG-- branch : sandbox --- .hgignore | 3 + .pydevproject | 2 +- deploy.cmd | 9 -- src/circuits/auto_connect.py | 90 ++++++++++++++++ src/circuits/prc_simulator.py | 17 +++ src/circuits/primes.py | 86 +++++++++++++++ src/prc_publish/coop_gevent.py | 17 +++ src/prc_publish/coop_multitask.py | 125 ++++++++++++++++++++++ src/prc_publish/multi_thread.py | 168 ++++++++++++++++++++++++++++++ src/prc_publish/single_thread.py | 59 +++++++++++ src/zmq/primes.py | 55 ++++++++++ src/{ => zodb}/tutorial.py | 2 +- 12 files changed, 622 insertions(+), 11 deletions(-) delete mode 100644 deploy.cmd create mode 100644 src/circuits/auto_connect.py create mode 100644 src/circuits/prc_simulator.py create mode 100644 src/circuits/primes.py create mode 100644 src/prc_publish/coop_gevent.py create mode 100644 src/prc_publish/coop_multitask.py create mode 100644 src/prc_publish/multi_thread.py create mode 100644 src/prc_publish/single_thread.py create mode 100644 src/zmq/primes.py rename src/{ => zodb}/tutorial.py (89%) diff --git a/.hgignore b/.hgignore index 24b494e..dbd3d7f 100644 --- a/.hgignore +++ b/.hgignore @@ -2,3 +2,6 @@ syntax: glob .settings .project .pydevproject + +syntax: regexp +^world$ \ No newline at end of file diff --git a/.pydevproject b/.pydevproject index 4b6e256..8b60b2c 100644 --- a/.pydevproject +++ b/.pydevproject @@ -3,7 +3,7 @@ -python 2.5 +python 2.7 Default /sandbox/src diff --git a/deploy.cmd b/deploy.cmd deleted file mode 100644 index 78efbc3..0000000 --- a/deploy.cmd +++ /dev/null @@ -1,9 +0,0 @@ -@echo off -FOR %%f IN (src\*.py) DO CALL :conv %%f -goto :EOF - -:conv -echo "Deploying %1 -> O:\User\baloan\tmp\%~n1" -copy %1 O:\User\baloan\tmp\%~n1 >nul: -c:\apps\tofrodos\fromdos O:\User\baloan\tmp\%~n1 -goto :EOF diff --git a/src/circuits/auto_connect.py b/src/circuits/auto_connect.py new file mode 100644 index 0000000..12f8a4c --- /dev/null +++ b/src/circuits/auto_connect.py @@ -0,0 +1,90 @@ +# Copyright 2011 baloan +# See LICENSE for details. + +''' empty module ''' + +import logging +from circuits import Component, Debugger, Event, handler, Timer +from circuits.net.sockets import TCPServer, TCPClient +from circuits.net.sockets import Close, Connect, Write, Read +from circuits.tools import inspect, graph + +LOG = logging.getLogger() + +logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(levelname).4s %(message)s', + datefmt='%H:%M:%S') + +# definitions + +class Tick(Event): + def __init__(self, *args, **kwargs): + super(Tick, self).__init__(*args, **kwargs) + + +class AutoConnect(Component): + def __init__(self, host, port, *args, **kwargs): + super(AutoConnect, self).__init__(*args, **kwargs) + self.host = host + self.port = port + self.tcp = None + + def started(self, component, mode): + LOG.info("Started %s %s", component, mode) + self.tcp = TCPClient(channel=self.channel) + self.tcp.register(self) + print inspect(system) + print graph(system) + + def ready(self, e): + LOG.info("ready %s", e) + self.push(Connect(self.host, self.port), target=self.channel) + + def connected(self, host, port): + LOG.info("Connected %s %d", host, port) + # implement any negotiation here + + def error(self, data): + LOG.info("Error [%s]", data) + + def disconnected(self): + LOG.info("Disconnected") + self.tcp.unregister(self) + self.tcp = None + self.stop() + + def stopped(self): + Timer(1, Event(), target='start').register(self) + +class Parser(Component): + def __init__(self, *args, **kwargs): + super(Parser, self).__init__(*args, **kwargs) + self.buffer = "" + + @handler("read", "hmpf", target="biw") + def read_biw(self, data): + blcks = data.split("\x0c") + blcks[0].join(self.buffer) + for blk in blcks[:-1]: + flds = blk.split("\x00") + self.push(Tick(flds)) + self.buffer = blcks[-1] + + @handler("read", target="cos") + def read_cos(self, data): + blcks = data.split("\x0c") + blcks[0].join(self.buffer) + for blk in blcks[:-1]: + flds = blk.split("\x00") + self.push(Tick(flds)) + self.buffer = blcks[-1] + + def tick(self, flds): + LOG.info("Tick %s", "|".join(flds)) + + +if __name__ == "__main__": + system = AutoConnect("xapgu17a-uat.de.db.com", 2013, channel="biw") + TCPServer(8001) + Parser() + Debugger() + system.run(sleep=0.5) + + \ No newline at end of file diff --git a/src/circuits/prc_simulator.py b/src/circuits/prc_simulator.py new file mode 100644 index 0000000..aebb49e --- /dev/null +++ b/src/circuits/prc_simulator.py @@ -0,0 +1,17 @@ +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" circuits.prc_simulator """ + +# imports + +import logging + +# constants + +# globals + +LOG = logging.getLogger() + +# definitions + diff --git a/src/circuits/primes.py b/src/circuits/primes.py new file mode 100644 index 0000000..ebe1642 --- /dev/null +++ b/src/circuits/primes.py @@ -0,0 +1,86 @@ +# Copyright 2010 baloan +# See LICENSE for details. + +''' multiprocessing demo ''' + +# system imports + +from circuits import Component, Event, Timer, handler, Debugger +from math import sqrt +import logging + +# globals + +LOG = logging.getLogger() + +logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03d %(process)d:%(thread)d %(levelname).4s %(message)s', + datefmt='%H:%M:%S') + +# definitions + +class Print(Event): + 'Print Event' + +class Timeout(Event): + 'Timeout' + + +class Input(Component): + def started(self, component, mode): + LOG.info('started %s %s', component, mode) + for n in range(100): + LOG.info('put %i', n) + self.push(Event(n)) + LOG.info('worker done, ending') + Timer(5, Timeout('msg'), "timeout").register(self) + + def timeout(self, e): + LOG.info("timeout %s", e) + self.stop() + + def stopped(self, e): + pass + + +class Filter(Component): + def started(self, component, mode): + LOG.info('started %s %s', component, mode) + + def event(self, n): + LOG.info('Filter:event %i', n) + if self.is_prime(n): + LOG.info('Filter: sending %i', n) + self.push(Print(n)) + + def is_prime(self, n): + if n < 2: + return False + if n in (2, 3): + return True + for i in range(2, int(sqrt(n)) + 1): + if n % i == 0: + return False + return True + + +class Output(Component): + def started(self, component, mode): + LOG.info('started %s %s', component, mode) + + @handler('print') + def onPrint(self, n): + print '%i, ' % (n,), + + +def runInline(): + master = Input() + master += Filter() + master += Output() + master += Debugger() + master.run() + LOG.info('done.') + +if __name__ == '__main__': + runInline() + \ No newline at end of file diff --git a/src/prc_publish/coop_gevent.py b/src/prc_publish/coop_gevent.py new file mode 100644 index 0000000..cd13ae6 --- /dev/null +++ b/src/prc_publish/coop_gevent.py @@ -0,0 +1,17 @@ +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" prc_publish.coop_gevent """ + +# imports + +import logging +import datetime as dt + +# constants + +# globals + +LOG = logging.getLogger() + +# definitions diff --git a/src/prc_publish/coop_multitask.py b/src/prc_publish/coop_multitask.py new file mode 100644 index 0000000..52ffd80 --- /dev/null +++ b/src/prc_publish/coop_multitask.py @@ -0,0 +1,125 @@ +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" prc_publish.multitask + +Price Publisher +""" + +# imports + +from argparse import ArgumentParser +import logging +import multitask +import os +import random +import sys +import socket + +LOG = logging.getLogger() + +# definitions + +def main(argv=None): + if argv is None: + argv = sys.argv + LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) + # parse options and arguments + parser = ArgumentParser(description="Price Publisher") + parser.add_argument("-f", "--file", dest="filename", + help="read configuration from %(dest)s") + parser.add_argument("-p", "--port", default=8001, type=int, + help="server port [default: %(default)s") + args = parser.parse_args() + print args + # create product dict + prds = { } + pubqs = [] + for n in range(10): + key = "AB" + "{:04}".format(n) + prds["AB" + key] = Pricer(key) + # start one thread for price changes + multitask.add(controller(prds, pubqs)) + address = ('localhost', 8010) # family is deduced to be 'AF_INET' + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(address) + sock.listen(1) + multitask.add(listener(sock, pubqs)) + multitask.run() + +def listener(sock, pubqs): + while True: + cx, address = yield multitask.accept(sock) + LOG.info("accepting connection from %s", address) + inq = multitask.Queue() + pubqs.append(inq) + multitask.add(publisher(inq, cx)) + multitask.add(receiver(cx)) + + +def publisher(inq, cx): + LOG.info("Publisher running") + try: + while True: + # what happens if client does not pick up + item = yield inq.get() + yield cx.send(item) + # if connection closes + except IOError, e: + LOG.info(e) + # make sure to close the socket + finally: + cx.close() + LOG.info("Publisher terminated") + + +def receiver(cx): + LOG.info("Receiver running") + try: + while True: + # what happens if client does not pick up + s = yield cx.recv(4096) + if not s: + break + LOG.info(s) + # if connection closes + except IOError, e: + LOG.info(e) + # make sure to close the socket + finally: + cx.close() + LOG.info("Receiver terminated") + + + +def update_vola(old_vola): + new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) + return new_vola + + +def controller(prds, pubqs): + while True: + LOG.info("Price update cycle") + Pricer.VOLA = update_vola(Pricer.VOLA) + for prd in prds.values(): + prd.run() + for pubq in pubqs: + yield pubq.put((prd.name, prd.prc), timeout=5) + yield multitask.sleep(5) + +class Pricer(): + VOLA = 0.01 + def __init__(self, name): + self.name = name + self.prc = random.random() * 100.0 + self.daemon = True + + def run(self): + self.prc += random.choice((-1, +1)) * self.VOLA + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)i %(message)s', + datefmt='%H:%M:%S') + main() diff --git a/src/prc_publish/multi_thread.py b/src/prc_publish/multi_thread.py new file mode 100644 index 0000000..1b6e423 --- /dev/null +++ b/src/prc_publish/multi_thread.py @@ -0,0 +1,168 @@ +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" prc_publish.multi_thread + +Price Publisher +""" + +# imports + +from Queue import Queue +from argparse import ArgumentParser +from threading import Thread, Event, Lock +import logging +import os +import random +import select +import sys +import time + +# monkey patch to allow for interruptable listener +import multiprocessing.connection +multiprocessing.connection.Listener.fileno = lambda self: self._listener._socket.fileno() + + +LOG = logging.getLogger() + +# definitions + +class StoppableThread(Thread): + """Thread class with a stop() method. The thread itself has to check + regularly for the stopped() condition.""" + + def __init__(self, *args, **kwargs): + super(StoppableThread, self).__init__(*args, **kwargs) + self._stop = Event() + + def stop(self): + self._stop.set() + + def stopped(self): + return self._stop.isSet() + + +def main(argv=None): + if argv is None: + argv = sys.argv + LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) + # parse options and arguments + parser = ArgumentParser(description="Price Publisher") + parser.add_argument("-f", "--file", dest="filename", + help="read configuration from %(dest)s") + parser.add_argument("-p", "--port", default=8001, type=int, + help="server port [default: %(default)s") + args = parser.parse_args() + print args + # create product dict + threads = [] + prds = { } + for n in range(10): + key = "AB" + "{:04}".format(n) + prds["AB" + key] = Pricer(key) + # start one thread for price changes + controller = Controller(prds) + threads.append(controller) + # main thread listens for connections + address = ('localhost', 8010) # family is deduced to be 'AF_INET' + listener = multiprocessing.connection.Listener(address) + # start threads + for thread in threads: + thread.start() + + LOG.info('waiting for connection on %s', address) + + try: + while True: + LOG.info("Waiting for accept...") + r, w, e = select.select((listener,), (), (), 5) + if listener in r: + cx = listener.accept() + LOG.info('connection accepted from %s', listener.last_accepted) + # start one thread per client + inq = Queue() + pub = Publisher(inq, cx) + with controller.pubqs_lock: + controller.pubqs.append(inq) + pub.daemon = True + pub.start() + threads.append(pub) + finally: + for thread in threads: + thread.stop() + for thread in threads: + thread.join() + LOG.info("done.") + + +class Publisher(StoppableThread): + THREAD_ID = 0 + def __init__(self, inq, cx, *args, **kwargs): + super(Publisher, self).__init__(*args, **kwargs) + self.inq = inq + self.cx = cx + LOG.info("Publisher created") + self.daemon = True + Publisher.THREAD_ID += 1 + self.name = "Pub-{}".format(Publisher.THREAD_ID) + + def run(self): + LOG.info("Publisher running") + try: + while not self.stopped(): + # what happens if client does not pick up + item = self.inq.get() + self.cx.send(item) + # if connection closes + except IOError, e: + LOG.info(e) + # make sure to close the socket + finally: + self.cx.close() + LOG.info("Publisher terminated") + + +def update_vola(old_vola): + new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) + return new_vola + + +class Controller(StoppableThread): + THREAD_ID = 0 + def __init__(self, prds, *args, **kwargs): + super(Controller, self).__init__(*args, **kwargs) + self.prds = prds + self.pubqs = [] + self.pubqs_lock = Lock() + self.daemon = True + Controller.THREAD_ID += 1 + self.name = "Control-{}".format(Controller.THREAD_ID) + + def run(self): + while not self.stopped(): + LOG.info("Price update cycle") + Pricer.VOLA = update_vola(Pricer.VOLA) + for prd in self.prds.values(): + prd.run() + with self.pubqs_lock: + for pubq in self.pubqs: + pubq.put((prd.name, prd.prc)) + time.sleep(5) + + +class Pricer(): + VOLA = 0.01 + def __init__(self, name): + self.name = name + self.prc = random.random() * 100.0 + self.daemon = True + + def run(self): + self.prc += random.choice((-1, +1)) * self.VOLA + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s', + datefmt='%H:%M:%S') + main() diff --git a/src/prc_publish/single_thread.py b/src/prc_publish/single_thread.py new file mode 100644 index 0000000..bbcc3e7 --- /dev/null +++ b/src/prc_publish/single_thread.py @@ -0,0 +1,59 @@ +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" prc_publish.single_thread + +Price Publisher +""" + +# imports + +from argparse import ArgumentParser +import logging +import os +import random +import sys + +LOG = logging.getLogger() + +# definitions + +def main(argv=None): + if argv is None: + argv = sys.argv + LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) + # parse options and arguments + parser = ArgumentParser(description = "Price Publisher") + parser.add_argument("-f", "--file", dest="filename", + help="read configuration from FILENAME") + parser.add_argument("-p", "--port", default=8001, type=int, + help="server port [default: %(default)s") + args = parser.parse_args() + # start program + prc = Pricer("AB1234") + for n in range(100): + prc.run() + Pricer.VOLA = update_vola(Pricer.VOLA) + print "{:03}: {:.2f} {} {:.4f}".format(n, Pricer.VOLA, prc.name, prc.prc) + + +def update_vola(old_vola): + new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) + return new_vola + + +class Pricer(): + VOLA = 0.01 + def __init__(self, name): + self.name = name + self.prc = random.random() * 100.0 + + def run(self): + self.prc += random.choice((-1, +1)) * self.VOLA + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', + datefmt='%H:%M:%S') + main() diff --git a/src/zmq/primes.py b/src/zmq/primes.py new file mode 100644 index 0000000..cb5259c --- /dev/null +++ b/src/zmq/primes.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" zmq.primes """ + +# system imports + +import logging +import sys +import os +from optparse import OptionParser +import zmq + +# globals + +LOG = logging.getLogger() + +# definitions + +def main(argv = None): + if argv is None: + argv = sys.argv + script_name = os.path.basename(argv[0]) + LOG.info("starting '%s %s'", script_name, " ".join(argv[1:])) + # parse options and arguments + usage = "usage: %prog [options] arg" + version = "%prog 1.0" + parser = OptionParser(usage, version) + parser.add_option("-f", "--file", dest="filename", + help="read configuration from FILENAME") + parser.add_option("-p", "--port", default="8001", type="int", + help="server port [default: %default]") + (opts, args) = parser.parse_args() + if len(args) > 0: + parser.error("too many arguments") + # call method with appropriate arguments + ctx = zmq.Context() + s = ctx.socket(zmq.REP) + s.bind('tcp://localhost:12345') + loop = zmq.eventloop.ioloop.IOLoop.instance() + stream = zmq.eventloop.zmqstream.ZMQStream(s, loop) + def echo(msg): + stream.send_multipart(msg) + stream.on_recv(echo) + loop.start() + return 0 + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)d:%(thread)d %(message)s', + datefmt='%H:%M:%S') + rc = main() + sys.exit(rc) diff --git a/src/tutorial.py b/src/zodb/tutorial.py similarity index 89% rename from src/tutorial.py rename to src/zodb/tutorial.py index d8e8a56..6472bcd 100644 --- a/src/tutorial.py +++ b/src/zodb/tutorial.py @@ -24,7 +24,7 @@ class Account(Persistent): # Configuration -storage = FileStorage(r"e:\temp\data.zodb") +storage = FileStorage(r"e:\workspaces\zodb3\data.zodb") db = DB(storage) connection = db.open() root = connection.root()