eventlet coop added
--HG-- branch : sandbox
This commit is contained in:
170
src/publisher/multi_thread.py
Normal file
170
src/publisher/multi_thread.py
Normal file
@@ -0,0 +1,170 @@
|
||||
# Copyright (c) 2011 Andreas Balogh
|
||||
# See LICENSE for details.
|
||||
|
||||
""" prc_publish.multi_thread
|
||||
|
||||
Price Publisher
|
||||
"""
|
||||
|
||||
# imports
|
||||
|
||||
from Queue import Queue
|
||||
from argparse import ArgumentParser
|
||||
from threading import Thread, Event, Lock
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import select
|
||||
import sys
|
||||
import time
|
||||
|
||||
# monkey patch to allow for interruptable listener
|
||||
import multiprocessing.connection
|
||||
multiprocessing.connection.Listener.fileno = lambda self: self._listener._socket.fileno()
|
||||
|
||||
|
||||
LOG = logging.getLogger()
|
||||
|
||||
# definitions
|
||||
|
||||
class StoppableThread(Thread):
|
||||
"""Thread class with a stop() method. The thread itself has to check
|
||||
regularly for the stopped() condition."""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(StoppableThread, self).__init__(*args, **kwargs)
|
||||
self._stop = Event()
|
||||
|
||||
def stop(self):
|
||||
self._stop.set()
|
||||
|
||||
def stopped(self):
|
||||
return self._stop.isSet()
|
||||
|
||||
|
||||
def main(argv=None):
|
||||
if argv is None:
|
||||
argv = sys.argv
|
||||
LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:]))
|
||||
# parse options and arguments
|
||||
parser = ArgumentParser(description="Price Publisher")
|
||||
parser.add_argument("-f", "--file", dest="filename",
|
||||
help="read configuration from %(dest)s")
|
||||
parser.add_argument("-p", "--port", default=8001, type=int,
|
||||
help="server port [default: %(default)s")
|
||||
args = parser.parse_args()
|
||||
print args
|
||||
# create product dict
|
||||
threads = []
|
||||
prds = { }
|
||||
for n in range(10):
|
||||
key = "AB" + "{:04}".format(n)
|
||||
prds["AB" + key] = Pricer(key)
|
||||
# start one thread for price changes
|
||||
controller = Controller(prds)
|
||||
threads.append(controller)
|
||||
# main thread listens for connections
|
||||
address = ('localhost', 8010) # family is deduced to be 'AF_INET'
|
||||
listener = multiprocessing.connection.Listener(address)
|
||||
# start threads
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
|
||||
LOG.info('waiting for connection on %s', address)
|
||||
|
||||
try:
|
||||
while True:
|
||||
LOG.info("Waiting for accept...")
|
||||
r, w, e = select.select((listener,), (), (), 5)
|
||||
if listener in r:
|
||||
cx = listener.accept()
|
||||
LOG.info('connection accepted from %s', listener.last_accepted)
|
||||
# start one thread per client
|
||||
inq = Queue()
|
||||
pub = Publisher(inq, cx)
|
||||
with controller.pubqs_lock:
|
||||
controller.pubqs.append(inq)
|
||||
pub.daemon = True
|
||||
pub.start()
|
||||
threads.append(pub)
|
||||
finally:
|
||||
for thread in threads:
|
||||
thread.stop()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
LOG.info("done.")
|
||||
|
||||
|
||||
class Publisher(StoppableThread):
|
||||
THREAD_ID = 0
|
||||
def __init__(self, inq, cx, *args, **kwargs):
|
||||
super(Publisher, self).__init__(*args, **kwargs)
|
||||
self.inq = inq
|
||||
self.cx = cx
|
||||
LOG.info("Publisher created")
|
||||
self.daemon = True
|
||||
Publisher.THREAD_ID += 1
|
||||
self.name = "Pub-{}".format(Publisher.THREAD_ID)
|
||||
|
||||
def run(self):
|
||||
LOG.info("Publisher running")
|
||||
try:
|
||||
while not self.stopped():
|
||||
# what happens if socket closes during queue wait
|
||||
item = self.inq.get()
|
||||
# what happens if client does not pick up
|
||||
self.cx.send(item)
|
||||
# if connection closes
|
||||
except IOError, e:
|
||||
LOG.info(e)
|
||||
# make sure to close the socket
|
||||
finally:
|
||||
self.cx.close()
|
||||
LOG.info("Publisher terminated")
|
||||
|
||||
# no way found to simultaneously receive
|
||||
|
||||
def update_vola(old_vola):
|
||||
new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01)
|
||||
return new_vola
|
||||
|
||||
|
||||
class Controller(StoppableThread):
|
||||
THREAD_ID = 0
|
||||
def __init__(self, prds, *args, **kwargs):
|
||||
super(Controller, self).__init__(*args, **kwargs)
|
||||
self.prds = prds
|
||||
self.pubqs = []
|
||||
self.pubqs_lock = Lock()
|
||||
self.daemon = True
|
||||
Controller.THREAD_ID += 1
|
||||
self.name = "Control-{}".format(Controller.THREAD_ID)
|
||||
|
||||
def run(self):
|
||||
while not self.stopped():
|
||||
LOG.info("Price update cycle")
|
||||
Pricer.VOLA = update_vola(Pricer.VOLA)
|
||||
for prd in self.prds.values():
|
||||
prd.run()
|
||||
with self.pubqs_lock:
|
||||
for pubq in self.pubqs:
|
||||
pubq.put((prd.name, prd.prc))
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
class Pricer():
|
||||
VOLA = 0.01
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
self.prc = random.random() * 100.0
|
||||
self.daemon = True
|
||||
|
||||
def run(self):
|
||||
self.prc += random.choice((-1, +1)) * self.VOLA
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=logging.DEBUG,
|
||||
format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s',
|
||||
datefmt='%H:%M:%S')
|
||||
main()
|
||||
Reference in New Issue
Block a user