diff --git a/src/publisher/coop_eventlet.py b/src/publisher/coop_eventlet.py index fb8df7d..edb07b8 100644 --- a/src/publisher/coop_eventlet.py +++ b/src/publisher/coop_eventlet.py @@ -14,6 +14,7 @@ import logging import os import random import sys +import cPickle as pickle LOG = logging.getLogger() @@ -67,12 +68,12 @@ def publisher(pubqs, inq, cx): try: with eventlet.Timeout(1): item = inq.get() - # TODO: pickle - s = "{0[0]} {0[1]}\n\r".format(item) + s = pickle.dumps(item, pickle.HIGHEST_PROTOCOL) + # s = "{0[0]} {0[1]}\n\r".format(item) cx.send(s) except eventlet.Timeout: # raises IOError if connection lost - LOG.debug(cx.fileno()) + cx.fileno() # if connection closes except IOError, e: LOG.info(e) @@ -102,9 +103,8 @@ def receiver(cx): def controller(prds, pubqs): while True: - LOG.info("Price update cycle") + LOG.info("controller: price update cycle, %i pubqs", len(pubqs)) Pricer.VOLA = update_vola(Pricer.VOLA) - LOG.debug("controller: %i pubqs", len(pubqs)) for prd in prds.values(): prd.run() for pubq in pubqs: @@ -120,10 +120,9 @@ class Pricer(object): def __init__(self, name): self.name = name self.prc = random.random() * 100.0 - self.daemon = True def run(self): - self.prc += random.choice((-1, +1)) * self.VOLA + self.prc += random.choice((-1, +1)) * self.prc * self.VOLA if __name__ == '__main__': diff --git a/src/publisher/coop_gevent.py b/src/publisher/coop_gevent.py index 9e7ff81..b94abd5 100644 --- a/src/publisher/coop_gevent.py +++ b/src/publisher/coop_gevent.py @@ -121,7 +121,6 @@ class Pricer(): def __init__(self, name): self.name = name self.prc = random.random() * 100.0 - self.daemon = True def run(self): self.prc += random.choice((-1, +1)) * self.VOLA diff --git a/src/publisher/coop_multitask.py b/src/publisher/coop_multitask.py index 84b4c7e..6a594a4 100644 --- a/src/publisher/coop_multitask.py +++ b/src/publisher/coop_multitask.py @@ -119,7 +119,6 @@ class Pricer(): def __init__(self, name): self.name = name self.prc = random.random() * 100.0 - self.daemon = True def run(self): self.prc += random.choice((-1, +1)) * self.VOLA diff --git a/src/publisher/multi_thread.py b/src/publisher/multi_thread.py index 15381c4..2fa461b 100644 --- a/src/publisher/multi_thread.py +++ b/src/publisher/multi_thread.py @@ -71,7 +71,7 @@ def main(argv=None): thread.start() LOG.info('waiting for connection on %s', address) - + # acceptor in main thread try: while True: LOG.info("Waiting for accept...") @@ -157,7 +157,6 @@ class Pricer(): def __init__(self, name): self.name = name self.prc = random.random() * 100.0 - self.daemon = True def run(self): self.prc += random.choice((-1, +1)) * self.VOLA @@ -165,6 +164,6 @@ class Pricer(): if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s', + format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(funcName)10s %(message)s', datefmt='%H:%M:%S') main() diff --git a/src/subscriber/coop_eventlet.py b/src/subscriber/coop_eventlet.py new file mode 100644 index 0000000..87e599f --- /dev/null +++ b/src/subscriber/coop_eventlet.py @@ -0,0 +1,58 @@ +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" prc_publish.eventlet + +Price Publisher +""" + +# imports + +from argparse import ArgumentParser +import eventlet +import logging +import os +from eventlet.green import socket +import sys +import cPickle as pickle + +LOG = logging.getLogger() + +# definitions + +def main(argv=None): + if argv is None: + argv = sys.argv + LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) + # parse options and arguments + parser = ArgumentParser(description="Price Publisher") + parser.add_argument("-f", "--file", dest="filename", + help="read configuration from %(dest)s") + parser.add_argument("-p", "--port", default=8001, type=int, + help="server port [default: %(default)s") + args = parser.parse_args() + print args + # application + address = ('localhost', 8010) + while True: + subscriber(address) + eventlet.sleep(1) + +def subscriber(address): + try: + LOG.info("connecting to %s", address) + cx = eventlet.connect(address) + except socket.error, e: + LOG.error("%i %s", e.errno, e) + return + while True: + s = cx.recv(4096) + item = pickle.loads(s) + LOG.info("%s", item) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i %(levelname).4s %(funcName)10s: %(message)s', + datefmt='%H:%M:%S') + main()