diff --git a/src/prc_publish/coop_gevent.py b/src/prc_publish/coop_gevent.py deleted file mode 100644 index cd13ae6..0000000 --- a/src/prc_publish/coop_gevent.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright (c) 2011 Andreas Balogh -# See LICENSE for details. - -""" prc_publish.coop_gevent """ - -# imports - -import logging -import datetime as dt - -# constants - -# globals - -LOG = logging.getLogger() - -# definitions diff --git a/src/publisher/coop_eventlet.py b/src/publisher/coop_eventlet.py new file mode 100644 index 0000000..fb8df7d --- /dev/null +++ b/src/publisher/coop_eventlet.py @@ -0,0 +1,133 @@ +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" prc_publish.eventlet + +Price Publisher +""" + +# imports + +from argparse import ArgumentParser +import eventlet +import logging +import os +import random +import sys + +LOG = logging.getLogger() + +# definitions + +def main(argv=None): + if argv is None: + argv = sys.argv + LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) + # parse options and arguments + parser = ArgumentParser(description="Price Publisher") + parser.add_argument("-f", "--file", dest="filename", + help="read configuration from %(dest)s") + parser.add_argument("-p", "--port", default=8001, type=int, + help="server port [default: %(default)s") + args = parser.parse_args() + print args + # create product dict + prds = { } + pubqs = [] + for n in range(10): + key = "AB" + "{:04}".format(n) + prds["AB" + key] = Pricer(key) + # start one thread for price changes + eventlet.spawn(controller, prds, pubqs) + address = ('localhost', 8010) + eventlet.spawn(listener, address, pubqs) + # main thread runs eventlet loop + while True: + eventlet.sleep(10) + + +def listener(address, pubqs): + sock = eventlet.listen(address) + while True: + LOG.info('waiting for connection on %s', address) + cx, remote = sock.accept() + LOG.info("accepting connection from %s", remote) + inq = eventlet.queue.Queue() + pubqs.append(inq) + eventlet.spawn(receiver, cx) + eventlet.spawn(publisher, pubqs, inq, cx) + + +def publisher(pubqs, inq, cx): + LOG.info("Publisher running") + try: + while True: + # what happens if client does not pick up + # what happens if client dies during queue wait + try: + with eventlet.Timeout(1): + item = inq.get() + # TODO: pickle + s = "{0[0]} {0[1]}\n\r".format(item) + cx.send(s) + except eventlet.Timeout: + # raises IOError if connection lost + LOG.debug(cx.fileno()) + # if connection closes + except IOError, e: + LOG.info(e) + # make sure to close the socket + finally: + cx.close() + pubqs.remove(inq) + LOG.info("Publisher terminated") + + +def receiver(cx): + LOG.info("Receiver running") + try: + while True: + # what happens if client does not pick up + s = cx.recv(4096) + if not s: + break + LOG.info(s) + # if connection closes + except IOError, e: + LOG.info(e) + # make sure to close the socket + finally: + cx.close() + LOG.info("Receiver terminated") + +def controller(prds, pubqs): + while True: + LOG.info("Price update cycle") + Pricer.VOLA = update_vola(Pricer.VOLA) + LOG.debug("controller: %i pubqs", len(pubqs)) + for prd in prds.values(): + prd.run() + for pubq in pubqs: + pubq.put((prd.name, prd.prc)) + eventlet.sleep(5) + +def update_vola(old_vola): + new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) + return new_vola + +class Pricer(object): + VOLA = 0.01 + def __init__(self, name): + self.name = name + self.prc = random.random() * 100.0 + self.daemon = True + + def run(self): + self.prc += random.choice((-1, +1)) * self.VOLA + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i %(levelname).4s %(funcName)10s: %(message)s', + datefmt='%H:%M:%S') + main() diff --git a/src/publisher/coop_gevent.py b/src/publisher/coop_gevent.py new file mode 100644 index 0000000..9e7ff81 --- /dev/null +++ b/src/publisher/coop_gevent.py @@ -0,0 +1,134 @@ +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" prc_publish.multi_thread + +Price Publisher +""" + +# imports + +from argparse import ArgumentParser +import gevent.socket +import gevent.queue +import gevent +import logging +import os +import random +import socket +import sys + +LOG = logging.getLogger() + +# definitions + +def main(argv=None): + if argv is None: + argv = sys.argv + LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) + # parse options and arguments + parser = ArgumentParser(description="Price Publisher") + parser.add_argument("-f", "--file", dest="filename", + help="read configuration from %(dest)s") + parser.add_argument("-p", "--port", default=8001, type=int, + help="server port [default: %(default)s") + args = parser.parse_args() + print args + # create product dict + threads = [] + prds = { } + pubqs = [] + for n in range(10): + key = "AB" + "{:04}".format(n) + prds["AB" + key] = Pricer(key) + # start one thread for price changes + gevent.spawn(controller, prds, pubqs) + address = ('localhost', 8010) + gevent.spawn(listener, address, pubqs) + # main thread runs gevent loop + while True: + gevent.sleep(10) + + +def listener(address, pubqs): + sock = gevent.socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(address) + sock.listen(1) + while True: + LOG.info('waiting for connection on %s', address) + cx, address = sock.accept() + LOG.info("accepting connection from %s", address) + inq = gevent.queue.Queue() + pubqs.append(inq) + gevent.spawn(receiver, cx) + gevent.spawn(publisher, pubqs, inq, cx) + + +def publisher(pubqs, inq, cx): + LOG.info("Publisher running") + try: + while True: + # what happens if client does not pick up + # what happens if client dies during queue wait + item = inq.get() + # TODO: pickle + s = "{0[0]} {0[1]}\n\r".format(item) + cx.send(s) + # if connection closes + except IOError, e: + LOG.info(e) + # make sure to close the socket + finally: + cx.close() + pubqs.remove(inq) + LOG.info("Publisher terminated") + + +def receiver(cx): + LOG.info("Receiver running") + try: + while True: + # what happens if client does not pick up + s = cx.recv(4096) + if not s: + break + LOG.info(s) + # if connection closes + except IOError, e: + LOG.info(e) + # make sure to close the socket + finally: + cx.close() + LOG.info("Receiver terminated") + +def controller(prds, pubqs): + while True: + LOG.info("Price update cycle") + Pricer.VOLA = update_vola(Pricer.VOLA) + LOG.debug("controller: %i pubqs", len(pubqs)) + for prd in prds.values(): + prd.run() + for pubq in pubqs: + pubq.put((prd.name, prd.prc)) + gevent.sleep(5) + +def update_vola(old_vola): + new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) + return new_vola + +class Pricer(): + VOLA = 0.01 + def __init__(self, name): + self.name = name + self.prc = random.random() * 100.0 + self.daemon = True + + def run(self): + self.prc += random.choice((-1, +1)) * self.VOLA + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s', + datefmt='%H:%M:%S') + main() diff --git a/src/publisher/coop_gevent_monkey.py b/src/publisher/coop_gevent_monkey.py new file mode 100644 index 0000000..0185d9f --- /dev/null +++ b/src/publisher/coop_gevent_monkey.py @@ -0,0 +1,141 @@ +# Copyright (c) 2011 Andreas Balogh +# See LICENSE for details. + +""" prc_publish.multi_thread + +Price Publisher +""" + +# imports + +from gevent import monkey +monkey.patch_all() + +from argparse import ArgumentParser +from gevent.queue import Queue +import gevent +import logging +import os +import random +import socket +import sys +import threading + +LOG = logging.getLogger() + +# definitions + +def main(argv=None): + if argv is None: + argv = sys.argv + LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) + # parse options and arguments + parser = ArgumentParser(description="Price Publisher") + parser.add_argument("-f", "--file", dest="filename", + help="read configuration from %(dest)s") + parser.add_argument("-p", "--port", default=8001, type=int, + help="server port [default: %(default)s") + args = parser.parse_args() + print args + # create product dict + threads = [] + prds = { } + pubqs = [] + for n in range(10): + key = "AB" + "{:04}".format(n) + prds["AB" + key] = Pricer(key) + # start one thread for price changes + t1 = threading.Thread(target=controller, args=(prds, pubqs)) + t1.start() + address = ('localhost', 8010) + t2 = threading.Thread(target=listener, args=(address, pubqs)) + t2.start() + # main thread runs gevent loop + while True: + gevent.sleep(10) + + +def listener(address, pubqs): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(address) + sock.listen(1) + while True: + LOG.info('waiting for connection on %s', address) + cx, address = sock.accept() + LOG.info("accepting connection from %s", address) + inq = Queue() + pubqs.append(inq) + rec = threading.Thread(target=receiver, args=(cx,)) + pub = threading.Thread(target=publisher, args=(pubqs, inq, cx)) + rec.start() + pub.start() + + +def publisher(pubqs, inq, cx): + LOG.info("Publisher running") + try: + while True: + # what happens if client does not pick up + # what happens if client dies during queue wait + item = inq.get() + # TODO: pickle + s = "{0[0]} {0[1]}\n\r".format(item) + cx.send(s) + # if connection closes + except IOError, e: + LOG.info(e) + # make sure to close the socket + finally: + cx.close() + pubqs.remove(inq) + LOG.info("Publisher terminated") + + +def receiver(cx): + LOG.info("Receiver running") + try: + while True: + # what happens if client does not pick up + s = cx.recv(4096) + if not s: + break + LOG.info(s) + # if connection closes + except IOError, e: + LOG.info(e) + # make sure to close the socket + finally: + cx.close() + LOG.info("Receiver terminated") + +def controller(prds, pubqs): + while True: + LOG.info("Price update cycle") + Pricer.VOLA = update_vola(Pricer.VOLA) + LOG.debug("controller: %i pubqs", len(pubqs)) + for prd in prds.values(): + prd.run() + for pubq in pubqs: + pubq.put((prd.name, prd.prc)) + gevent.sleep(5) + +def update_vola(old_vola): + new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) + return new_vola + +class Pricer(): + VOLA = 0.01 + def __init__(self, name): + self.name = name + self.prc = random.random() * 100.0 + self.daemon = True + + def run(self): + self.prc += random.choice((-1, +1)) * self.VOLA + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s', + datefmt='%H:%M:%S') + main() diff --git a/src/prc_publish/coop_multitask.py b/src/publisher/coop_multitask.py similarity index 73% rename from src/prc_publish/coop_multitask.py rename to src/publisher/coop_multitask.py index 52ffd80..84b4c7e 100644 --- a/src/prc_publish/coop_multitask.py +++ b/src/publisher/coop_multitask.py @@ -40,36 +40,46 @@ def main(argv=None): prds["AB" + key] = Pricer(key) # start one thread for price changes multitask.add(controller(prds, pubqs)) - address = ('localhost', 8010) # family is deduced to be 'AF_INET' - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(address) - sock.listen(1) - multitask.add(listener(sock, pubqs)) + address = ('localhost', 8010) + multitask.add(listener(address, pubqs)) multitask.run() -def listener(sock, pubqs): + +def listener(address, pubqs): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind(address) + sock.listen(50) while True: cx, address = yield multitask.accept(sock) LOG.info("accepting connection from %s", address) inq = multitask.Queue() pubqs.append(inq) - multitask.add(publisher(inq, cx)) multitask.add(receiver(cx)) + multitask.add(publisher(pubqs, inq, cx)) -def publisher(inq, cx): +def publisher(pubqs, inq, cx): LOG.info("Publisher running") try: while True: # what happens if client does not pick up - item = yield inq.get() - yield cx.send(item) + # what happens if client dies during queue wait + try: + item = yield inq.get(timeout=1) + # TODO: pickle + s = "{0[0]} {0[1]}\n\r".format(item) + # LOG.debug(s) + yield multitask.send(cx, s) + except multitask.Timeout: + # raises IOError if connection lost + cx.fileno() # if connection closes except IOError, e: LOG.info(e) # make sure to close the socket finally: cx.close() + pubqs.remove(inq) LOG.info("Publisher terminated") @@ -78,7 +88,7 @@ def receiver(cx): try: while True: # what happens if client does not pick up - s = yield cx.recv(4096) + s = yield multitask.recv(cx, 4096) if not s: break LOG.info(s) @@ -90,22 +100,19 @@ def receiver(cx): cx.close() LOG.info("Receiver terminated") - - -def update_vola(old_vola): - new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) - return new_vola - - def controller(prds, pubqs): while True: - LOG.info("Price update cycle") + LOG.info("controller: price update cycle, %i pubqs", len(pubqs)) Pricer.VOLA = update_vola(Pricer.VOLA) for prd in prds.values(): prd.run() for pubq in pubqs: - yield pubq.put((prd.name, prd.prc), timeout=5) - yield multitask.sleep(5) + yield pubq.put((prd.name, prd.prc)) + yield # multitask.sleep(5) + +def update_vola(old_vola): + new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) + return new_vola class Pricer(): VOLA = 0.01 @@ -120,6 +127,6 @@ class Pricer(): if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(process)i %(message)s', + format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s', datefmt='%H:%M:%S') main() diff --git a/src/prc_publish/multi_thread.py b/src/publisher/multi_thread.py similarity index 94% rename from src/prc_publish/multi_thread.py rename to src/publisher/multi_thread.py index 1b6e423..15381c4 100644 --- a/src/prc_publish/multi_thread.py +++ b/src/publisher/multi_thread.py @@ -110,8 +110,9 @@ class Publisher(StoppableThread): LOG.info("Publisher running") try: while not self.stopped(): - # what happens if client does not pick up + # what happens if socket closes during queue wait item = self.inq.get() + # what happens if client does not pick up self.cx.send(item) # if connection closes except IOError, e: @@ -121,6 +122,7 @@ class Publisher(StoppableThread): self.cx.close() LOG.info("Publisher terminated") +# no way found to simultaneously receive def update_vola(old_vola): new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) diff --git a/src/prc_publish/single_thread.py b/src/publisher/single_thread.py similarity index 100% rename from src/prc_publish/single_thread.py rename to src/publisher/single_thread.py