Files
py_sandbox/src/publisher/multi_thread.py
baloan 38e71583c4 publisher/subscriber implementations
--HG--
branch : sandbox
2011-03-14 11:31:19 +01:00

170 lines
5.1 KiB
Python

# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" prc_publish.multi_thread
Price Publisher
"""
# imports
from Queue import Empty, Queue
from argparse import ArgumentParser
from threading import Thread, Event, Lock
import logging
import os
import random
import select
import sys
import time
# Monkey patch: give Listener a fileno() method so that a Listener object can
# be handed to select.select().  The accept loop uses this to wait for a
# connection with a timeout instead of blocking in accept().
# NOTE(review): this reaches into the private attributes _listener and _socket
# of multiprocessing.connection -- re-check when changing Python versions.
import multiprocessing.connection
multiprocessing.connection.Listener.fileno = lambda self: self._listener._socket.fileno()
# getLogger() is called without a name, so LOG is the root logger.
LOG = logging.getLogger()
# definitions
class StoppableThread(Thread):
    """Thread with a cooperative stop flag.

    ``stop()`` only sets a flag; it never interrupts the thread.  The
    ``run()`` implementation of a subclass has to poll ``stopped()``
    regularly and return once it reports True.
    """

    def __init__(self, *args, **kwargs):
        """Create the thread; all arguments are passed on to ``Thread``."""
        super(StoppableThread, self).__init__(*args, **kwargs)
        # Deliberately not named ``_stop``: Python 3 versions of
        # threading.Thread use a private ``_stop()`` method internally, and
        # shadowing it with an Event makes ``join()`` fail with
        # "'Event' object is not callable".
        self._stop_event = Event()

    def stop(self):
        """Request the thread to finish; returns immediately."""
        self._stop_event.set()

    def stopped(self):
        """Return True once ``stop()`` has been called."""
        return self._stop_event.is_set()
def _prune_finished_publishers(controller, threads):
    """Forget publishers whose thread has ended, together with their queues.

    Without this the controller keeps filling the queue of every client that
    ever disconnected, and those queues grow without bound.
    """
    finished = [t for t in threads
                if isinstance(t, Publisher) and not t.is_alive()]
    if not finished:
        return
    with controller.pubqs_lock:
        for pub in finished:
            if pub.inq in controller.pubqs:
                controller.pubqs.remove(pub.inq)
    for pub in finished:
        threads.remove(pub)


def main(argv=None):
    """Run the price publisher until it is interrupted.

    Creates ten products, starts one Controller thread that updates their
    prices, then accepts client connections in the calling thread and starts
    one Publisher thread per client.

    argv -- full argument vector including the program name; defaults to
            ``sys.argv``.  ``--port`` selects the listening port (default
            8010, the port that used to be hard-coded).  ``--file`` is
            accepted but not used yet.

    The accept loop never returns normally; it ends with whatever exception
    interrupts it (e.g. KeyboardInterrupt) after the threads were stopped.
    """
    if argv is None:
        argv = sys.argv
    LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:]))
    # parse options and arguments
    parser = ArgumentParser(description="Price Publisher")
    parser.add_argument("-f", "--file", dest="filename",
                        help="read configuration from %(dest)s")
    parser.add_argument("-p", "--port", default=8010, type=int,
                        help="server port [default: %(default)s]")
    # parse the argv we were given, not implicitly sys.argv
    args = parser.parse_args(argv[1:])
    LOG.debug("parsed arguments: %s", args)
    # create product dict, keyed by product name ("AB0000" .. "AB0009")
    prds = {}
    for n in range(10):
        key = "AB{:04}".format(n)
        prds[key] = Pricer(key)
    # one thread for price changes
    controller = Controller(prds)
    threads = [controller]
    # main thread listens for connections
    address = ('localhost', args.port)  # family is deduced to be 'AF_INET'
    listener = multiprocessing.connection.Listener(address)
    controller.start()
    LOG.info('waiting for connection on %s', address)
    # Upper bound for waiting on one thread at shutdown.  A thread that is
    # still blocked (for instance on its queue or on a slow client) must not
    # hang the shutdown; the worker threads are daemons, so giving up on them
    # is safe.
    join_timeout = 6.0
    # acceptor in main thread
    try:
        while True:
            LOG.info("Waiting for accept...")
            # select with a timeout keeps the loop interruptible
            readable, _, _ = select.select((listener,), (), (), 5)
            _prune_finished_publishers(controller, threads)
            if listener not in readable:
                continue
            cx = listener.accept()
            LOG.info('connection accepted from %s', listener.last_accepted)
            # start one thread per client, fed through its own queue
            inq = Queue()
            pub = Publisher(inq, cx)
            with controller.pubqs_lock:
                controller.pubqs.append(inq)
            pub.daemon = True
            pub.start()
            threads.append(pub)
    finally:
        for thread in threads:
            thread.stop()
        listener.close()
        for thread in threads:
            thread.join(join_timeout)
        LOG.info("done.")
class Publisher(StoppableThread):
    """Forward items from a queue to one client connection.

    inq -- queue the items to publish are taken from
    cx  -- connection object; only ``send()`` and ``close()`` are used

    The thread ends when ``stop()`` was called or when sending raises
    IOError; the connection is closed in either case.
    """
    THREAD_ID = 0
    # How long to wait on the queue before re-checking the stop flag.
    POLL_SECONDS = 1.0

    def __init__(self, inq, cx, *args, **kwargs):
        super(Publisher, self).__init__(*args, **kwargs)
        self.inq = inq
        self.cx = cx
        LOG.info("Publisher created")
        self.daemon = True
        Publisher.THREAD_ID += 1
        self.name = "Pub-{}".format(Publisher.THREAD_ID)

    def run(self):
        """Send queued items to the client until stopped or disconnected."""
        LOG.info("Publisher running")
        try:
            while not self.stopped():
                # Wait with a timeout: a plain get() blocks forever on an
                # idle queue, so a stop() request would never be noticed.
                try:
                    item = self.inq.get(timeout=self.POLL_SECONDS)
                except Empty:
                    continue
                # NOTE: open question from the original author -- what
                # happens if the client does not pick up?
                self.cx.send(item)
        # if connection closes
        except IOError as e:
            LOG.info(e)
        # make sure to close the socket
        finally:
            self.cx.close()
            LOG.info("Publisher terminated")
    # NOTE: no way found to simultaneously receive on the same connection
def update_vola(old_vola, step=0.01, floor=0.01):
    """Return *old_vola* moved one random step up or down.

    old_vola -- current volatility
    step     -- size of the move; the direction is chosen at random
    floor    -- smallest value that is ever returned

    With the default arguments the result is ``old_vola +/- 0.01`` and
    never less than 0.01.
    """
    direction = random.choice((-1, +1))
    return max(old_vola + direction * step, floor)
class Controller(StoppableThread):
    """Periodically update all prices and hand them to the publisher queues.

    prds -- dict whose values are the products to update; each value needs
            ``run()``, ``name`` and ``prc``

    ``pubqs`` is the list of queues that receive every ``(name, price)``
    update.  Other threads must hold ``pubqs_lock`` while changing it.
    """
    THREAD_ID = 0
    # Pause between two update cycles.
    CYCLE_SECONDS = 5
    # The pause is slept in slices of this length so that stop() is noticed
    # quickly instead of only after a full cycle.
    POLL_SECONDS = 0.25

    def __init__(self, prds, *args, **kwargs):
        super(Controller, self).__init__(*args, **kwargs)
        self.prds = prds
        self.pubqs = []
        self.pubqs_lock = Lock()
        self.daemon = True
        Controller.THREAD_ID += 1
        self.name = "Control-{}".format(Controller.THREAD_ID)

    def run(self):
        """Run update cycles until ``stop()`` is called."""
        while not self.stopped():
            LOG.info("Price update cycle")
            # the volatility is shared by all products (class attribute)
            Pricer.VOLA = update_vola(Pricer.VOLA)
            for prd in self.prds.values():
                prd.run()
                with self.pubqs_lock:
                    for pubq in self.pubqs:
                        pubq.put((prd.name, prd.prc))
            self._pause()

    def _pause(self):
        """Sleep for CYCLE_SECONDS, returning early once stopped."""
        slices = int(round(float(self.CYCLE_SECONDS) / self.POLL_SECONDS))
        for _ in range(slices):
            if self.stopped():
                return
            time.sleep(self.POLL_SECONDS)
class Pricer(object):
    """Random-walk price of a single product.

    name -- product name, stored unchanged in ``name``

    ``prc`` starts at a random value in [0, 100).  ``VOLA`` is the step size
    of the walk; it is a class attribute read through ``self.VOLA``, so
    assigning ``Pricer.VOLA`` changes the step of every instance that does
    not set its own value.
    """
    VOLA = 0.01

    def __init__(self, name):
        self.name = name
        self.prc = random.random() * 100.0

    def __repr__(self):
        return "{}({!r}, prc={!r})".format(type(self).__name__,
                                           self.name, self.prc)

    def run(self):
        """Move the price one step of size VOLA up or down at random."""
        self.prc += random.choice((-1, +1)) * self.VOLA
if __name__ == '__main__':
    # Script entry point: log everything from DEBUG upwards with a
    # millisecond timestamp, the thread name and the function name, then
    # run the publisher.
    logging.basicConfig(
        datefmt='%H:%M:%S',
        format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(funcName)10s %(message)s',
        level=logging.DEBUG,
    )
    main()