diff --git a/src/megui/create_jobs.py b/src/megui/create_jobs.py deleted file mode 100644 index 63a4026..0000000 --- a/src/megui/create_jobs.py +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2009 Andreas Balogh -# See LICENSE for details. - -""" create MeGUI jobs for Panasonic MPG files """ - -# system imports - -import logging -import sys -import os -import getopt -import re -import xml.dom.minidom as xdm - -# local imports - -# constants - -# globals - -LOG = logging.getLogger() - -logging.basicConfig(level=logging.DEBUG, - format="%(asctime)s %(levelname).3s %(process)d:%(thread)d %(message)s", - datefmt="%H:%M:%S") - -# definitions - -class Usage(Exception): - pass - -class Error(Exception): - pass - -def main(argv = [__name__]): - try: - # check for parameters - LOG.info("starting '%s %s'", argv[0], " ".join(argv[1:])) - script_name = os.path.basename(argv[0]) - try: - opts, args = getopt.getopt(argv[1:], "ht:j:z:", \ - ["help", "template", "job_dir", "temp_dir"]) - except getopt.error, err: - raise Usage(err) - LOG.debug("opts: %s, args: %s", opts, args) - template = "job_divx_template.xml" - job_dir = "C:\\Program Files\\megui\\jobs" - temp_dir = "e:\\video" - for o, a in opts: - if o in ("-h", "--help"): - usage(script_name) - return 0 - elif o in ("-t", "--template"): - template = a - elif o in ("-j", "--job_dir"): - job_dir = a - elif o in ("-z", "--temp_dir"): - temp_dir = a - src_dir = "D:\\Documents\\Raw\\Panasonic SDR-S7\\Video" - dest_dir = "D:\\Documents\\My Videos\\Balogh" - if len(args) == 2: - src_dir = args[0] - dest_dir = args[1] - elif len(args) == 1: - src_dir = args[0] - elif len(args) > 2: - raise Usage("too many arguments") - # call method with appropriate arguments - if src_dir and not os.path.exists(src_dir): - raise Error("Source directory not found [%s], aborting" % (src_dir, )) - if dest_dir and not os.path.exists(dest_dir): - LOG.warn("Destination directory not found [%s]", dest_dir) - LOG.info("Creating destination directory [%s]", dest_dir) - os.makedirs(dest_dir) - cli(src_dir, dest_dir, template, job_dir, temp_dir) - LOG.info("Done.") - return 0 - except Error, err: - LOG.error(err) - return 1 - except Usage, err: - LOG.error(err) - LOG.info("for usage use -h or --help") - return 2 - - -def cli(src_dir, dest_dir, template, job_dir, tmp_dir): - """ command line interface """ - if not os.path.exists(dest_dir): - LOG.error("Destination directory not found [%s]", dest_dir) - raise Error() - for mpg_fn in os.listdir(src_dir): - opts = { } - root, ext = os.path.splitext(mpg_fn) - # skip files which are not mpg video - if ext.lower() != ".mpg": - continue - # determine file paths - mpg_fp = os.path.join(src_dir, mpg_fn) - avi_fp = os.path.join(dest_dir, root + ".avi") - d2v_fp = os.path.join(tmp_dir, root + ".d2v") - opts["mpg_input"] = mpg_fp - opts["final_output"] = avi_fp - opts["d2v_output"] = d2v_fp - # if avi_fn doesn't exist then create job xml - if not os.path.exists(avi_fp): - create_megui_job(template, job_dir, opts) - - -def append_megui_joblist(jobname): - joblist = "c:\\program files\\megui\\joblists.xml" - # load the xml - inf = open(joblist,"r") - dom = xdm.parse(inf) - inf.close() - # add job220 to - JobListSerializer = dom.getElementsByTagName("JobListSerializer")[0] - mainJobList = JobListSerializer.getElementsByTagName("mainJobList")[0] - string = dom.createElement("string") - text = dom.createTextNode(jobname) - string.appendChild(text) - mainJobList.appendChild(string) - newline = dom.createTextNode("\n ") - mainJobList.appendChild(newline) - # save xml - outf = open(joblist,"w") - dom.writexml(outf) - outf.close() - - -def create_megui_job(template, job_dir, opts): - # replace in template: mpg_input, d2v_output, final_output, jobnum - jobnum = get_next_jobnumber(job_dir) - LOG.info("creating job %i...", jobnum) - jobfn = "job%i.xml" % (jobnum, ) - jobname = "job%i" % (jobnum, ) - opts["jobname"] = jobname - tplf = open(template, "r") - jobf = open(os.path.join(job_dir, jobfn), "w") - try: - for line in tplf: - # line = raw_line.rstrip("\n\r") - for k, v in opts.items(): - kd = "".join(("$", k, "$")) - line = line.replace(kd, v) - jobf.write(line) - finally: - jobf.close() - tplf.close() - append_megui_joblist(jobname) - -JOB_REO = re.compile("job(\d+)\.xml") - -def get_next_jobnumber(job_dir): - max_jn = 0 - for job_fn in os.listdir(job_dir): - job_mo = JOB_REO.match(job_fn) - if job_mo: - jn = int(job_mo.group(1)) - if jn > max_jn: - max_jn = jn - return max_jn + 1 - - -def usage(script_name): - print - print "usage: %s [-t template] [-j job_dir] [-z temp_dir] [src_dir [dst_dir]]" % (script_name,) - print """ - src_dir source directory of original MPG files - dest_dir destination directory for compressed video -options: - -h, --help show this help message and exit - -t, --template job template file - -j, --job_dir megui job directory - -z, --temp_dir working directory for megui -""" - - -if __name__ == "__main__": - sys.exit(main(sys.argv)) - \ No newline at end of file diff --git a/src/megui/create_megui_jobs.bat b/src/megui/create_megui_jobs.bat deleted file mode 100644 index 783d07b..0000000 --- a/src/megui/create_megui_jobs.bat +++ /dev/null @@ -1,5 +0,0 @@ -@echo off -rem determine directory of script. may be different from current working directory -for /F %%I IN ("%0") do set BIN_DIR=%%~dpI -rem expect python.exe in the PATH -python.exe %BIN_DIR%\create_jobs.py %* diff --git a/src/megui/job_divx_template.xml b/src/megui/job_divx_template.xml deleted file mode 100644 index 78f409a..0000000 --- a/src/megui/job_divx_template.xml +++ /dev/null @@ -1,154 +0,0 @@ - - - - - $mpg_input$ - $d2v_output$ - - false - false - 1 - - - - - - - c0 - 192 - MPEG-PS - 0 - MP2 - 2 channels - 48.0 KHz - - - - - - ::192:: - - - 0 - false - false - StereoDownmix - ABR - 128 - true - 0 - false - 100 - 50 - - 0 - 0 - CBR - - - - false - false - true - false - true - false - 640 - AVI - - - - - none - false - - Lanczos - MinimalNoise - false - false - false - false - true - false - - - 4 - 1200 - 250 - 2 - 1 - 31 - true - false - true - true - 15 - - XVID - DIVX - DX50 - MP4V - - .stats - - - 0 - 2 - 1 - 1200 - 6 - 4 - 1 - 31 - 1 - 31 - 162 - 0 - 100 - 1 - 20 - 5 - 5 - 5 - 30 - 15 - 16 - 100 - 0 - 0 - 0 - 0 - 0 - 0 - false - false - true - true - true - true - false - true - false - 0 - H.263 - - 1 - - $final_output$ - - false - - - - - $jobname$ - WAITING - 0001-01-01T00:00:00 - 0001-01-01T00:00:00 - \ No newline at end of file diff --git a/src/oauth-rest/get-xml.py b/src/oauth-rest/get-xml.py new file mode 100644 index 0000000..1f533c6 --- /dev/null +++ b/src/oauth-rest/get-xml.py @@ -0,0 +1,43 @@ +#!python3 +# encoding: utf8 + +# Copyright (C) 2017 Andreas Balogh +# Published under the MIT LICENCE, See LICENCE for details. + +""" +An oauth authenticated web request +""" + +import requests +import requests.auth + +# Request a token +client_auth = requests.auth.HTTPBasicAuth('p-jcoLKBynTLew', 'gko_LXELoV07ZBNUXrvWZfzE3aI') +post_data = {"grant_type": "password", "username": "reddit_bot", "password": "snoo"} +headers = {"User-Agent": "ChangeMeClient/0.1 by YourUsername"} +response = requests.post("https://www.reddit.com/api/v1/access_token", + auth=client_auth, data=post_data, headers=headers) +response.json() + +# Use the token +headers = {"Authorization": "bearer fhTdafZI-0ClEzzYORfBSCR7x3M", + "User-Agent": "ChangeMeClient/0.1 by YourUsername"} +response = requests.get("https://oauth.reddit.com/api/v1/me", headers=headers) +response.json() + + +import urllib.request +DATA = "grant_type=password&username={user}&password={pwd}".format([user, pwd]) +req = urllib.request.Request(url="https://www.reddit.com/api/v1/access_token", + data=DATA, + method='PUT') +with urllib.request.urlopen(req) as f: + pass +print(f.status) +print(f.reason) +req = urllib.request.Request(url="https://www.reddit.com/api/v1/access_token") +req.add_header("Authorization", "bearer {0}".format(token)) +with urllib.request.urlopen(req) as f: + pass +print(f.status) +print(f.reason) diff --git a/src/pfc/pfc_asyncio.py b/src/pfc/pfc_asyncio.py new file mode 100644 index 0000000..1dde3e3 --- /dev/null +++ b/src/pfc/pfc_asyncio.py @@ -0,0 +1,311 @@ +#!python3 +# Created 2018 Andreas Balogh + +""" price feed client with asyncio + +Coroutine based price feed reader. Parses messages, filters and prints. + +tk GUI & async: https://stackoverflow.com/questions/49958180/using-async-await-keywords-with-tk-after-method-of-tkinter +""" + +from argparse import ArgumentParser +import concurrent.futures +import asyncio +import datetime +import logging +import os +import re +import sys +import collections +import datetime as dt +from _socket import AF_INET +import queue + +LOG = logging.getLogger() + +ONE_SECOND = datetime.timedelta(seconds=1) +ONE_MINUTE = datetime.timedelta(seconds=60) + +# queue for GUI to asyncio messages + + +class PriceFeedClient: + + def __init__(self, host, port, pfcq, guiq, keycol): + self.guiq = guiq + self.pfcq = pfcq + self.host = host + self.port = port + self.keycol = int(keycol) + LOG.info(f"keycol: {keycol}") + self.loop = asyncio.get_event_loop() + # status + self.feed_statd = PriceFeedClient.__init_statd() + self.sym_statd = collections.defaultdict(PriceFeedClient.__init_statd) + self.tps_history = list() + # ticker feed + self.filter = None + self.ticker = list() + self.last_symbol = None + self.ticks_per_block = 10 + # log subsystem + self.log_fp = None + self.log_stopdt = None + + @staticmethod + def __init_statd(): + statd = dict() + statd['last_tick'] = dt.datetime.now() + statd['sec_interval_ticks'] = 0 + statd['prev_sec_interval_ticks'] = 0 + statd['current_sec'] = statd['last_tick'].replace(microsecond=0) + statd['sec_count'] = 0 + statd['ticks_per_sec'] = 0.0 + return statd + + async def stream_reader(self): + # (family, type, proto, canonname, sockaddr) = getaddrinfo(host, port, family=0, type=0, proto=0, flags=0) + # AbstractEventLoop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0) + host_details = await self.loop.getaddrinfo(self.host, self.port, family=AF_INET) + (_, _, _, _, sockaddr) = host_details[0] + LOG.info(f"host: {sockaddr[0]}, port: {sockaddr[1]}") + try: + reader, _ = await asyncio.open_connection(sockaddr[0], sockaddr[1]) + except ConnectionRefusedError as e: + # LOG.exception(e) + self.guiq.put(("set_status", (f"{e}",), {})) + return + except: + raise + self.guiq.put(("set_status", (f"connected to host: {self.host}:{self.port}, IP: {sockaddr[0]}",), {})) + # tt = bytes.maketrans(b"\x00\x0c", b"|\n") + while True: + # read price feed line + try: + bytes_ = await reader.readuntil(separator=b'\x0c') + except concurrent.futures._base.CancelledError: + LOG.info("await reader.readuntil() aborted, terminating") + break + except Exception as e: + LOG.exception(e) + self.guiq.put(("set_status", (f"{e}",), {})) + break + line = bytes_.decode() + s = line.rstrip() + flds = s.split("\x00") + # calculate feed statistics + self.update_statd(self.feed_statd) + # calculate symbol statistics + if len(flds) > self.keycol: + symbol = flds[self.keycol] + statd = self.sym_statd[symbol] + self.update_statd(statd) + if len(self.ticker) < self.ticks_per_block or self.log_fp: + fldstr = "|".join(flds) + now = dt.datetime.now() + ts = now.strftime("%H:%M:%S.%f") + tick = ":".join((ts, fldstr)) + if self.filter is not None: + mo = self.filter.search(tick) + if mo is not None: + self.dump_tick(tick) + self.last_symbol = symbol + else: + self.dump_tick(tick) + + def dump_tick(self, tick): + if len(self.ticker) < self.ticks_per_block: + self.ticker.append(tick) + if self.log_fp: + line = "".join((tick, "\n")) + self.log_fp.write(line) + + @staticmethod + def update_statd(statd): + now = dt.datetime.now() + now_sec = now.replace(microsecond=0) + statd['last_tick'] = now + # update ticks per sec + statd['sec_interval_ticks'] = statd['sec_interval_ticks'] + 1 + if now_sec != statd['current_sec']: + if statd['sec_count'] < 60: + statd['sec_count'] = statd['sec_count'] + 1 + a = statd['sec_count'] - 1 + seconds_since_last = (now_sec - statd['current_sec']).total_seconds() + b = 1 / seconds_since_last + statd['ticks_per_sec'] = (a * statd['ticks_per_sec'] + b * statd['sec_interval_ticks']) / statd['sec_count'] + statd['prev_sec_interval_ticks'] = statd['sec_interval_ticks'] + statd['sec_interval_ticks'] = 0 + statd['current_sec'] = now_sec + + async def histogram_sender(self): + while True: + # tick update latency + latency_histogram = collections.defaultdict(int) + now = dt.datetime.now() + now_sec = now.replace(microsecond=0) + for statd in self.sym_statd.values(): + ltd = now_sec - statd['current_sec'] + latency = int(ltd.total_seconds()) + latency_histogram[latency] = latency_histogram[latency] + 1 + self.guiq.put(("upd_latency", (latency_histogram,), {})) + await asyncio.sleep(2) + LOG.info(f"done") + + async def guiq_sender(self): + while True: + # update feed and symbols statistics once per sec + self.update_statd(self.feed_statd) + # for statd in self.sym_statd.values(): + # self.update_statd(statd) + # tps history + self.tps_history.insert(0, self.feed_statd['prev_sec_interval_ticks']) + try: + del self.tps_history[120:] + except: + pass + self.guiq.put(("upd_tps_history", (self.tps_history,), {})) + # feed tick flow (in Hz) + statd = self.feed_statd + symbol_count = len(self.sym_statd) + tick_flow = statd['ticks_per_sec'] + ticks_per_interval = statd['prev_sec_interval_ticks'] + # print(f"{statd['last_tick']}, tps: {statd['ticks_per_sec']:,.0f}, flow: {tick_flow:,.2f}") + self.guiq.put(("upd_feed_stats", (symbol_count, tick_flow, ticks_per_interval, statd['last_tick']), {})) + # symbol tick flow + if len(self.ticker) > 0: + self.guiq.put(("update_ticker", (list(self.ticker),), {})) + # symbol stats + if self.filter and self.last_symbol: + statd = self.sym_statd[self.last_symbol] + tpm = statd['ticks_per_sec'] * 60 + last_symbol_tick = statd['last_tick'] + self.guiq.put(("upd_symbol_stats", (self.last_symbol, tpm, last_symbol_tick), {})) + self.ticker = list() + if self.log_fp: + rtd = self.log_stopdt - dt.datetime.now() + remaining = int(rtd.total_seconds()) + if remaining > 0: + self.guiq.put(("set_log_remaining", (f"{remaining}",), {})) + else: + self.abort_log() + await asyncio.sleep(1) + LOG.info(f"done") + + async def guiq_printer(self): + # for command line and debugging + while True: + try: + cmd, args, kwargs = self.guiq.get_nowait() + print(cmd, args, kwargs) + except queue.Empty: + pass + await asyncio.sleep(1) + + async def pfcq_reader(self): + while True: + cmd, args, kwargs = await self.pfcq.get() + try: + fn = getattr(self, cmd) + fn(*args, **kwargs) + except Exception as e: + print(cmd, args, kwargs) + LOG.exception(e) + + def set_filter(self, text): + if len(text) > 0: + try: + self.filter = re.compile(re.escape(text)) + except: + self.filter = None + self.guiq.put(("clear_symbol_stats", (), {})) + else: + self.filter = None + self.guiq.put(("clear_symbol_stats", (), {})) + + def set_ticks_per_block(self, n): + self.ticks_per_block = n + + def start_log(self, fn, duration): + if self.log_fp is None: + try: + self.log_fp = open(fn, "w") + self.log_stopdt = dt.datetime.now() + dt.timedelta(seconds=duration) + LOG.debug(f"logging activated until {self.log_stopdt}") + self.guiq.put(("set_log_remaining", (f"{duration}",), {})) + # self.loop.set_debug(True) + except Exception as e: + LOG.exception(e) + self.guiq.put(("set_status", (f"{e}",), {})) + self.log_fp = None + self.guiq.put(("set_log_remaining", ("",), {})) + else: + LOG.warn(f"logging still active until {self.log_stopdt}") + + def abort_log(self): + if self.log_fp: + # self.loop.set_debug(False) + self.log_fp.close() + self.log_fp = None + self.log_stopdt = None + self.guiq.put(("set_log_remaining", ("",), {})) + + +async def cancel_reader(loop, tasks, cancel_evt): + # wait for event from main thread to cancel + LOG.info(f"start cancel_reader, waiting for event") + # threading synchronisation primitives do block thus must run in an executor thread + await loop.run_in_executor(None, cancel_evt.wait) + LOG.info(f"cancelling stream_reader") + for task in tasks: + task.cancel() + + +def create_stream_reader(loop, cancel_evt, *args): + pfc = PriceFeedClient(*args) + # this function runs in the asyncio thread + tasks = [ loop.create_task(pfc.stream_reader()), + loop.create_task(pfc.guiq_sender()), + loop.create_task(pfc.histogram_sender()), + loop.create_task(pfc.pfcq_reader()) ] + LOG.info(f"created stream reader") + # since the tasks can not be returned as a result through call_soon_threadsafe + # and cancellation must occor from the asyncio thread + # we embed the task.cancel in an async function to wait for a threading Event triggered from the main GUI thread + loop.create_task(cancel_reader(loop, tasks, cancel_evt)) + + +def asyncio_run(loop): + asyncio.set_event_loop(loop) + try: + loop.run_forever() + finally: + # https://docs.python.org/3.6/library/asyncio-eventloop.html?highlight=asyncgen#asyncio.AbstractEventLoop.shutdown_asyncgens + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + + +def cli(argv=None): + # command line interface + if argv is None: + argv = sys.argv + script_name = os.path.basename(argv[0]) + LOG.info(f"Welcome to {script_name}, created 2009, updated 2018, Andreas Balogh") + # main + loop = asyncio.get_event_loop() + pfcq = asyncio.Queue() + guiq = queue.Queue() + pfc = PriceFeedClient("fracbxolappu2.de.db.com", 20354, pfcq, guiq, "1") + # pfc = PriceFeedClient("fracbxolappu8.de.db.com", 20377, q) + loop.create_task(pfc.stream_reader()) + loop.create_task(pfc.guiq_printer()) + loop.create_task(pfc.guiq_sender()) + loop.create_task(pfc.histogram_sender()) + asyncio_run(loop) + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, + format='%(asctime)s.%(msecs)03i [%(thread)i] %(levelname).4s %(funcName)10s: %(message)s', + datefmt='%H:%M:%S') + sys.exit(cli()) diff --git a/src/pfc/pfc_gui.py b/src/pfc/pfc_gui.py new file mode 100644 index 0000000..048ee54 --- /dev/null +++ b/src/pfc/pfc_gui.py @@ -0,0 +1,617 @@ +#!python3 +# Created 2018 Andreas Balogh + +""" XOL price feed client + +Coroutine based multithreaded XOL price feed reader. +Features: +* quick one click switching between feeds +* displays messages in ticker window +* allows to filter in ticker text +* logging facility. Saves price feed to disk. +* tick flow history +* product tick latency histogram + +See also: +http://effbot.org/tkinterbook/ +https://infohost.nmt.edu/tcc/help/pubs/tkinter/web/index.html +https://tkdocs.com/tutorial/index.html + +How to install? +1. install Anaconda from Automated Software Distribution, potentially one version of the Windows Runtime Library is needed +2. set proxy environment variables (optional) + set http_proxy=http://surf-proxy.intranet.db.com:8080 + set https_proxy=http://surf-proxy.intranet.db.com:8080 +3. create configuration file in c:/Users//pricefeed.ini + [defaults] + # see https://docs.python.org/3.6/library/os.path.html#os.path.expanduser for details + logpath=~\Documents + duration=10 + + [feedname1] + host=fracbxolappu2.de.db.com + port=20354 + keycol=1 + + [feedname2] + ... +""" + +import asyncio +import configparser +import logging +from pathlib import Path +import queue +import sys +import re +import threading +from tkinter import ttk + +import datetime as dt +import pfc_asyncio +import tkinter as tk +import tkinter.font as tkFont +import tkinter.messagebox +import tkinter.filedialog + +LOG = logging.getLogger() + + +class TpsHistory(tk.Canvas): + + def __init__(self, parent, *args, **kwargs): + super().__init__(parent, *args, **kwargs) + self.bar_w = 2 + + def plot(self, tps_list): + w = self.winfo_width() - 4 + h = self.winfo_height() - 4 + # LOG.debug(f"size {w}x{h} pixels") + xd0 = 20 + yd0 = 10 + xd1 = w - 5 + yd1 = h - 15 + # LOG.debug(f"border ({xa0},{ya0}), ({xa1},{ya1})") + tps_max = max(tps_list) + axis_max = int(tps_max / 1000) + 1 + diag_max = axis_max * 1000 + self.delete("frame") + self.delete("axis") + self.create_rectangle(xd0, yd0, xd1 + 1, yd1 + 1, tags="frame") + self.create_text(xd0 - 10, yd0, justify=tk.RIGHT, text=f"{axis_max}k", tags="axis") + self.create_text(xd0 - 10, yd1, justify=tk.RIGHT, text="0", tags="axis") + for i in (90, 60, 30, 0): + self.create_text(xd1 - i * self.bar_w, yd1 + 10, justify=tk.CENTER, text=f"{i}", tags="axis") + self.delete("bar") + # print(f"tps_list has {len(tps_list)} item") + xa0 = xd0 + 1 + ya0 = yd0 + 1 + xa1 = xd1 + ya1 = yd1 + for n, tps in enumerate(tps_list): + xb0 = xa1 - (n + 1) * self.bar_w + 1 + if xb0 <= xa0: + break + xb1 = xa1 - n * self.bar_w + yb0 = ya1 - max(0, (tps / diag_max) * (ya1 - ya0)) + yb1 = ya1 + # print(xb0, xb1, yb0, yb1) + if yb0 != yb1: + self.create_rectangle(xb0, yb0, xb1 + 1, yb1 + 1, tags="bar", width=0, fill="blue") + + +class LatencyHistogram(tk.Canvas): + + def __init__(self, parent, *args, **kwargs): + super().__init__(parent, *args, **kwargs) + self.bar_w = 1 + + def plot(self, histogram): + tps_list = [0] * 210 + for s, n in histogram.items(): + if s >= 210: + continue + tps_list[s] = n + # diagram geometry + w = self.winfo_width() - 4 + h = self.winfo_height() - 4 + # LOG.debug(f"size {w}x{h} pixels") + xd0 = 20 + yd0 = 10 + xd1 = w - 5 + yd1 = h - 15 + # LOG.debug(f"border ({xa0},{ya0}), ({xa1},{ya1})") + tps_max = max(tps_list) + axis_max = int(tps_max / 1000) + 1 + diag_max = axis_max * 1000 + self.delete("frame") + self.delete("axis") + self.create_rectangle(xd0, yd0, xd1 + 1, yd1 + 1, tags="frame") + self.create_text(xd0 - 10, yd0, justify=tk.RIGHT, text=f"{axis_max}k", tags="axis") + self.create_text(xd0 - 10, yd1, justify=tk.RIGHT, text="0", tags="axis") + for i in (0, 30, 60, 90, 120, 150, 180): + self.create_text(xd0 + i * self.bar_w, yd1 + 10, justify=tk.CENTER, text=f"{i}", tags="axis") + self.delete("bar") + # print(f"tps_list has {len(tps_list)} item") + xa0 = xd0 + 1 + ya0 = yd0 + 1 + xa1 = xd1 + ya1 = yd1 + for n, tps in enumerate(tps_list): + xb0 = xa0 + n * self.bar_w + xb1 = xa0 + (n + 1) * self.bar_w - 1 + if xb1 >= xa1: + continue + yb0 = ya1 - max(0, (tps / diag_max) * (ya1 - ya0)) + yb1 = ya1 + # print(xb0, xb1, yb0, yb1) + if yb0 != yb1: + self.create_rectangle(xb0, yb0, xb1 + 1, yb1 + 1, tags="bar", width=0, fill="red") + + +class LatencyCumulative(tk.Canvas): + + def __init__(self, parent, *args, **kwargs): + super().__init__(parent, *args, **kwargs) + self.bar_w = 1 + + def draw_axis(self): + pass + + def plot(self, histogram): + tps_list = [0] * 210 + for s, n in histogram.items(): + if s >= 210: + continue + tps_list[s] = n + # diagram geometry + w = self.winfo_width() - 4 + h = self.winfo_height() - 4 + # LOG.debug(f"size {w}x{h} pixels") + xd0 = 20 + yd0 = 10 + xd1 = w - 5 + yd1 = h - 15 + # LOG.debug(f"border ({xa0},{ya0}), ({xa1},{ya1})") + tps_max = max(tps_list) + axis_max = int(tps_max / 1000) + 1 + diag_max = axis_max * 1000 + self.delete("axis") + self.create_rectangle(xd0, yd0, xd1 + 1, yd1 + 1, tags="axis") + self.create_text(xd0 - 10, yd0, justify=tk.RIGHT, text=f"{axis_max}k", tags="axis") + self.create_text(xd0 - 10, yd1, justify=tk.RIGHT, text="0", tags="axis") + for i in (0, 30, 60, 90, 120, 150, 180): + self.create_text(xd0 + i * self.bar_w, yd1 + 10, justify=tk.CENTER, text=f"{i}", tags="axis") + self.delete("bar") + # print(f"tps_list has {len(tps_list)} item") + xa0 = xd0 + 1 + ya0 = yd0 + 1 + xa1 = xd1 + ya1 = yd1 + for n, tps in enumerate(tps_list): + xb0 = xa0 + n * self.bar_w + xb1 = xa0 + (n + 1) * self.bar_w - 1 + if xb1 >= xa1: + continue + yb0 = ya1 - max(0, (tps / diag_max) * (ya1 - ya0)) + yb1 = ya1 + # print(xb0, xb1, yb0, yb1) + if yb0 != yb1: + self.create_rectangle(xb0, yb0, xb1 + 1, yb1 + 1, tags="bar", width=0, fill="red") + + +class LogAs(tk.Toplevel): + + # see http://effbot.org/tkinterbook/tkinter-dialog-windows.htm + def __init__(self, parent, feed, **kwargs): + super().__init__(parent, **kwargs) + self.transient(parent) + self.parent = parent + self.title("Log as") + # setup widgets + self.path = tk.StringVar() + self.duration = tk.StringVar() + self.__create_widgets() + now = dt.datetime.now() + # can't use tt = str.maketrans(r"/\", "__") + # r"\" is not a valid string literal (even a raw string cannot end in an odd number of backslashes). + # Specifically, a raw literal cannot end in a single backslash (since the backslash would escape the following quote character). + # see https://docs.python.org/3.6/reference/lexical_analysis.html#literals + tt = str.maketrans(r"\/", "__") + fn = f"{feed}-{now:%Y%m%d}-{now:%H%M%S}.log".translate(tt) + fp = Path(parent.defaults["logpath"], fn).expanduser() + self.path.set(str(fp)) + self.duration.set(parent.defaults["duration"]) + # housekeeping + self.protocol("WM_DELETE_WINDOW", self.cancel) + self.geometry("+{}+{}".format(parent.winfo_rootx() + 50, parent.winfo_rooty() + 50)) + self.wait_window(self) + + def __create_widgets(self): + # destination selection + self.fr1 = ttk.Frame(self) + self.fr1.pack(fill=tk.BOTH, expand=1, padx=5, pady=5) + vcmd = (self.register(self.validate_numeric), '%P') + self.en11 = ttk.Entry(self.fr1, width=60, textvariable=self.path, validate='all', validatecommand=vcmd) + self.en11.pack(side=tk.LEFT) + self.bu11 = ttk.Button(self.fr1, text="Browse", command=self.browse) + self.bu11.pack(side=tk.LEFT) + # duration + self.fr2 = ttk.Frame(self) + self.fr2.pack() + self.la21 = ttk.Label(self.fr2, text='Duration [s]') + self.la21.pack(side=tk.LEFT) + self.en21 = ttk.Entry(self.fr2, width=20, textvariable=self.duration) + self.en21.pack(side=tk.RIGHT) + # start + self.fr3 = ttk.Frame(self) + self.fr3.pack() + self.bu31 = ttk.Button(self.fr3, text="Cancel", command=self.cancel) + self.bu31.pack(side=tk.LEFT) + self.bu32 = ttk.Button(self.fr3, text="Start", command=self.start) + self.bu32.pack(side=tk.LEFT) + + def validate_numeric(self, new_text): + return new_text.isdigit() + + def browse(self): + fp = Path(self.path.get()) + fn = tkinter.filedialog.asksaveasfilename(defaultextension=".log", filetypes=(("LOG", "*.log"),), + initialdir=fp.parent, initialfile=fp.name, + parent=self, title="Save log as") + if len(fn) > 0: + self.path.set(fn) + + def start(self): + fn = self.path.get() + duration = int(self.duration.get()) + self.parent.loop.call_soon_threadsafe(self.parent.pfcq.put_nowait, ("start_log", (fn, duration), {})) + self.cancel() + + def cancel(self): + self.parent.focus_set() + self.destroy() + + +class App(tk.Tk): + + def __init__(self, loop): + super().__init__() + self.option_add('*tearOff', False) + self.title("XOL price feed monitor") + self.protocol('WM_DELETE_WINDOW', self.file_exit) + self.text_lines = 15 + self.__create_menubar() + self.__create_widgets() + self.pfcq = asyncio.Queue(loop=loop) + self.guiq = queue.Queue() + self.cancel_evt = None + self.active_feed = -1 + self.loop = loop + self.feeds = dict() + self.defaults = { "duration": 5, + "logpath": r"~\Documents" } + self.last_symbol_tick = None + # start asynchronous initialization + self.la2['text'] = "Loading configuration..." + self.update_idletasks() + self.after_idle(self.load_config) + + def __create_menubar(self): + self.mb = tk.Menu(self) + self.menu_file = tk.Menu(self.mb) + self.menu_about = tk.Menu(self.mb) + self.mb.add_cascade(menu=self.menu_file, label='Pricefeed') + self.mb.add_cascade(menu=self.menu_about, label='Help') + self.menu_file.add_command(label="Log as...", command=self.file_saveas) + self.menu_file.add_command(label='Exit', command=self.file_exit) + self.menu_about.add_command(label='About...', command=self.help_about) + self['menu'] = self.mb + + def __create_widgets(self): + # feed selection list + self.fr0 = ttk.LabelFrame(self, padding=2, borderwidth="2p", text="Feeds") + self.fr0.pack(side=tk.LEFT, anchor=tk.N, fill=tk.Y, padx=5, pady=5) + # entry line + self.lb1 = tk.Listbox(self.fr0, selectmode=tk.SINGLE, exportselection=False, activestyle='none') + self.lb1.pack(fill=tk.BOTH, expand=1) + # feed select and filter + self.fr1 = ttk.Frame(self, padding=5) + self.fr1.pack(side=tk.LEFT, fill=tk.BOTH, expand=1) + # entry line + self.fr11 = ttk.Frame(self.fr1, padding=5) + self.fr11.pack(fill=tk.X) + # self.cb1 = ttk.Combobox(self.fr1) + # self.bu1 = ttk.Button(self.fr11, text="...", command=self.edit_feeds) + # self.bu1.pack(side=tk.LEFT) + self.la1 = ttk.Label(self.fr11, text='Search pattern ') + self.la1.pack(side=tk.LEFT) + self.filter = tk.StringVar() + vcmd = (self.register(self.filter_change), '%V', '%s', '%P') + self.en1 = ttk.Entry(self.fr11, validate='all', validatecommand=vcmd, textvariable=self.filter) + self.en1.pack(side=tk.LEFT) + self.scroll_lock = tk.IntVar() + self.cb11 = ttk.Checkbutton(self.fr11, text="Scroll lock", variable=self.scroll_lock) + self.cb11.pack(side=tk.RIGHT) + self.scroll_lock.set(0) + # status line + self.la2 = ttk.Label(self.fr1, text='Label2') + self.la2.pack(anchor=tk.W) + self.te1 = tk.Text(self.fr1, width=80, height=self.text_lines, wrap=tk.NONE) + self.te1.pack(fill=tk.BOTH, expand=1) + self.te1.bind("", self.__resize_text) + # properties frame + self.fr2 = ttk.Frame(self, padding=5) + self.fr2.pack(side=tk.LEFT, anchor=tk.N, fill=tk.Y) + # properties panel + self.fr3 = ttk.LabelFrame(self.fr2, padding=2, borderwidth="2p", text="Feed") + self.fr3.pack(side=tk.TOP) + # property row + self.fr31 = ttk.Frame(self.fr3, padding=2) + self.fr31.pack(fill=tk.X) + self.la31 = ttk.Label(self.fr31, text='Symbol count') + self.la31.pack(side=tk.LEFT) + self.en31 = ttk.Entry(self.fr31, justify=tk.RIGHT) + self.en31.pack(side=tk.RIGHT) + # property row + self.fr32 = ttk.Frame(self.fr3, padding=2) + self.fr32.pack(fill=tk.X) + self.la32 = ttk.Label(self.fr32, text='tps (60sec avg)') + self.la32.pack(side=tk.LEFT) + self.en32 = ttk.Entry(self.fr32, justify=tk.RIGHT) + self.en32.pack(side=tk.RIGHT) + # property row + self.fr33 = ttk.Frame(self.fr3, padding=2) + self.fr33.pack(fill=tk.X) + self.la33 = ttk.Label(self.fr33, text='tps (last sec)') + self.la33.pack(side=tk.LEFT) + self.en33 = ttk.Entry(self.fr33, justify=tk.RIGHT) + self.en33.pack(side=tk.RIGHT) + # property row + self.fr34 = ttk.Frame(self.fr3, padding=2) + self.fr34.pack(fill=tk.X) + self.la34 = ttk.Label(self.fr34, text='Last tick') + self.la34.pack(side=tk.LEFT) + self.en34 = ttk.Entry(self.fr34, justify=tk.RIGHT) + self.en34.pack(side=tk.RIGHT) + # properties panel + self.fr6 = ttk.LabelFrame(self.fr2, borderwidth="2p", text="Tick history") + self.fr6.pack(side=tk.TOP, fill=tk.X) + self.ca61 = TpsHistory(self.fr6, width=100, height=100) + self.ca61.pack() + # properties panel + self.fr7 = ttk.LabelFrame(self.fr2, borderwidth="2p", text="Latency histogram") + self.fr7.pack(side=tk.TOP, fill=tk.X) + self.ca71 = LatencyHistogram(self.fr7, width=100, height=100) + self.ca71.pack() + # properties panel + self.fr4 = ttk.LabelFrame(self.fr2, padding=2, borderwidth="2p", text="Filtered") + self.fr4.pack(side=tk.TOP, fill=tk.X) + # property row + self.fr41 = ttk.Frame(self.fr4, padding=2) + self.fr41.pack(fill=tk.X) + self.la41 = ttk.Label(self.fr41, text='Symbol') + self.la41.pack(side=tk.LEFT) + self.en41 = ttk.Entry(self.fr41) + self.en41.pack(side=tk.RIGHT) + # property row + self.fr42 = ttk.Frame(self.fr4, padding=2) + self.fr42.pack(fill=tk.X) + self.la42 = ttk.Label(self.fr42, text='tpm (60sec avg)') + self.la42.pack(side=tk.LEFT) + self.en42 = ttk.Entry(self.fr42, justify=tk.RIGHT) + self.en42.pack(side=tk.RIGHT) + # property row + self.fr43 = ttk.Frame(self.fr4, padding=2) + self.fr43.pack(fill=tk.X) + self.la43 = ttk.Label(self.fr43, text='Since last tick') + self.la43.pack(side=tk.LEFT) + self.en43 = ttk.Entry(self.fr43, justify=tk.RIGHT) + self.en43.pack(side=tk.RIGHT) + # properties panel + self.fr5 = ttk.LabelFrame(self.fr2, padding=2, borderwidth="2p", text="Log") + self.fr5.pack(side=tk.TOP, fill=tk.X) + # property row + self.fr51 = ttk.Frame(self.fr5, padding=2) + self.fr51.pack(fill=tk.X) + self.la51 = ttk.Label(self.fr51, text='Remaining [s]') + self.la51.pack(side=tk.LEFT) + self.en51 = ttk.Entry(self.fr51, justify=tk.RIGHT) + self.en51.pack(side=tk.RIGHT) + # property row + self.fr52 = ttk.Frame(self.fr5, padding=2) + self.fr52.pack() + self.bu52a = ttk.Button(self.fr52, text='LogAs...', command=self.file_saveas) + self.bu52a.pack(side=tk.LEFT) + self.bu52b = ttk.Button(self.fr52, text='Abort', command=self.abort_log) + self.bu52b.pack(side=tk.LEFT) + + def edit_feeds(self): + LOG.info("feeds") + + def file_saveas(self): + feed = self.lb1.get(tk.ACTIVE) + self.logas = LogAs(self, feed) + + def abort_log(self): + self.loop.call_soon_threadsafe(self.pfcq.put_nowait, ("abort_log", (), {})) + + def file_exit(self): + self.stop_pricefeed() + # allow threads to terminate + self.after(100, self.quit) + + def help_about(self): + tk.messagebox.showinfo("About", "Created by Andreas Balogh, 2018", parent=self) + + def __resize_text(self, event): + # see https://stackoverflow.com/questions/49411359/how-to-get-the-height-of-the-tkinter-text-widget-after-resizing + font = tkFont.nametofont(self.te1.cget("font")) + width = int(event.width / font.measure("0")) + height = int(event.height / font.metrics('linespace')) + 1 + LOG.debug(f"new size {width}x{height} chars, {event.width}x{event.height} pixels") + self.text_lines = height + self.loop.call_soon_threadsafe(self.pfcq.put_nowait, ("set_ticks_per_block", (self.text_lines,), {})) + if False: + print(event) + print(font.actual()) + print(font.measure("0")) + print(font.metrics()) + + def filter_change(self, event, old, new): + if old != new: + # LOG.debug(f"event: {event}, old: {old}, new: {new}") + self.loop.call_soon_threadsafe(self.pfcq.put_nowait, ("set_filter", (new,), {})) + return True + + def populate_feeds(self): + for feed in self.feeds: + self.lb1.insert(tk.END, feed) + self.lb1.selection_set(0) + self.lb1.activate(0) + + def monitor_feed_change(self): + # switch to current price feed + current_feed = self.lb1.get(tk.ACTIVE) + if self.active_feed != current_feed: + LOG.info(f"Price feed changed to '{current_feed}'...") + self.stop_pricefeed() + self.start_pricefeed() + self.active_feed = current_feed + self.after(500, self.monitor_feed_change) + + def update_ticker(self, ticks): + lock = self.scroll_lock.get() + if not lock: + for tick in ticks: + self.te1.insert("1.0", f"{tick}\n") + n = self.text_lines + 1 + self.te1.delete(f"{n}.0", tk.END) + + def set_status(self, s): + self.la2['text'] = s + + def upd_tps_history(self, tps_history): + self.ca61.config(width=self.fr6.winfo_width() - 15, height=100) + self.ca61.plot(tps_history) + + def upd_latency(self, latency_histogram): + self.ca71.config(width=self.fr7.winfo_width() - 15, height=100) + self.ca71.plot(latency_histogram) + + def upd_feed_stats(self, symbol_count, ticks_per_sec, ticks_per_interval, last_tick): + App.set_entry(self.en31, f"{symbol_count:,}") + App.set_entry(self.en32, f"{ticks_per_sec:,.0f}") + App.set_entry(self.en33, f"{ticks_per_interval:,.0f}") + App.set_entry(self.en34, f"{last_tick:%H:%M:%S.%f}") + if self.last_symbol_tick: + since_last_symbol_tick = dt.datetime.now() - self.last_symbol_tick + App.set_entry(self.en43, f"{since_last_symbol_tick.seconds} sec") + + def upd_symbol_stats(self, symbol, ticks_per_min, last_symbol_tick): + App.set_entry(self.en41, f"{symbol}") + App.set_entry(self.en42, f"{ticks_per_min:,.1f}") + self.last_symbol_tick = last_symbol_tick + App.set_entry(self.en43, f"< 1 sec") + + def clear_symbol_stats(self): + App.set_entry(self.en41, "") + App.set_entry(self.en42, "") + App.set_entry(self.en43, "") + self.last_symbol_tick = None + + def set_log_remaining(self, s): + App.set_entry(self.en51, s) + if len(s) > 0: + self.bu52a.state(["disabled"]) + self.bu52b.state(["!disabled"]) + else: + self.bu52a.state(["!disabled"]) + self.bu52b.state(["disabled"]) + + @staticmethod + def set_entry(w, text): + w.delete(0, tk.END) + w.insert(0, text) + + def guiq_reader(self): + # process price feed data + while True: + try: + cmd, args, kwargs = self.guiq.get_nowait() + except queue.Empty: + break + try: + fn = getattr(self, cmd) + fn(*args, **kwargs) + except Exception as e: + print(cmd, args, kwargs) + LOG.exception(e) + self.after(500, self.guiq_reader) + + def start_pricefeed(self): + feed_name = self.lb1.get(tk.ACTIVE) + feed = self.feeds[feed_name] + host_port = (feed['host'], int(feed['port'])) + keycol = feed["keycol"] + self.set_status(f"Starting pricefeed {feed_name} {host_port}...") + self.cancel_evt = threading.Event() + self.loop.call_soon_threadsafe(pfc_asyncio.create_stream_reader, self.loop, self.cancel_evt, *host_port, self.pfcq, self.guiq, keycol) + # sync variables with new feed + self.scroll_lock.set(0) + filter_ = self.filter.get() + self.loop.call_soon_threadsafe(self.pfcq.put_nowait, ("set_filter", (filter_,), {})) + # delete ticker screen + self.te1.delete("1.0", tk.END) + self.set_log_remaining("") + self.after(500, self.guiq_reader) + + def stop_pricefeed(self): + if self.cancel_evt is not None: + print(f"stopping pricefeed") + self.cancel_evt.set() + + def load_config(self): + fp = Path(r'~\pricefeed.ini').expanduser() + config = configparser.ConfigParser() + config.read(str(fp)) + for feed_name in config.sections(): + sd = config[feed_name] + if feed_name == "defaults": + if "logpath" in sd: + self.defaults["logpath"] = sd["logpath"] + if "duration" in sd: + if sd["duration"].isdigit(): + self.defaults["duration"] = sd["duration"] + else: + LOG.error(f"{sd['duration']} not numeric, using default") + else: + feed = dict() + feed['host'] = sd['host'] + feed['port'] = sd['port'] + feed['keycol'] = sd['keycol'] + self.feeds[feed_name] = feed + self.populate_feeds() + self.after_idle(self.monitor_feed_change) + + +def gui(): + # create asyncio loop + loop = asyncio.new_event_loop() + # setup tk.Tk() and toplevel window + root = App(loop) + # start async loop in separate thread + threading.Thread(target=pfc_asyncio.asyncio_run, args=(loop,)).start() + # tk GUI loop in main thread + root.mainloop() + # stop asyncio + loop.call_soon_threadsafe(loop.stop) + # destructor for tk resources + root.destroy() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s.%(msecs)03i [%(thread)i] %(levelname).4s %(funcName)10s: %(message)s', + datefmt='%H:%M:%S') + sys.exit(gui()) diff --git a/src/publisher/coop_eventlet.py b/src/publisher/coop_eventlet.py deleted file mode 100644 index edb07b8..0000000 --- a/src/publisher/coop_eventlet.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright (c) 2011 Andreas Balogh -# See LICENSE for details. - -""" prc_publish.eventlet - -Price Publisher -""" - -# imports - -from argparse import ArgumentParser -import eventlet -import logging -import os -import random -import sys -import cPickle as pickle - -LOG = logging.getLogger() - -# definitions - -def main(argv=None): - if argv is None: - argv = sys.argv - LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) - # parse options and arguments - parser = ArgumentParser(description="Price Publisher") - parser.add_argument("-f", "--file", dest="filename", - help="read configuration from %(dest)s") - parser.add_argument("-p", "--port", default=8001, type=int, - help="server port [default: %(default)s") - args = parser.parse_args() - print args - # create product dict - prds = { } - pubqs = [] - for n in range(10): - key = "AB" + "{:04}".format(n) - prds["AB" + key] = Pricer(key) - # start one thread for price changes - eventlet.spawn(controller, prds, pubqs) - address = ('localhost', 8010) - eventlet.spawn(listener, address, pubqs) - # main thread runs eventlet loop - while True: - eventlet.sleep(10) - - -def listener(address, pubqs): - sock = eventlet.listen(address) - while True: - LOG.info('waiting for connection on %s', address) - cx, remote = sock.accept() - LOG.info("accepting connection from %s", remote) - inq = eventlet.queue.Queue() - pubqs.append(inq) - eventlet.spawn(receiver, cx) - eventlet.spawn(publisher, pubqs, inq, cx) - - -def publisher(pubqs, inq, cx): - LOG.info("Publisher running") - try: - while True: - # what happens if client does not pick up - # what happens if client dies during queue wait - try: - with eventlet.Timeout(1): - item = inq.get() - s = pickle.dumps(item, pickle.HIGHEST_PROTOCOL) - # s = "{0[0]} {0[1]}\n\r".format(item) - cx.send(s) - except eventlet.Timeout: - # raises IOError if connection lost - cx.fileno() - # if connection closes - except IOError, e: - LOG.info(e) - # make sure to close the socket - finally: - cx.close() - pubqs.remove(inq) - LOG.info("Publisher terminated") - - -def receiver(cx): - LOG.info("Receiver running") - try: - while True: - # what happens if client does not pick up - s = cx.recv(4096) - if not s: - break - LOG.info(s) - # if connection closes - except IOError, e: - LOG.info(e) - # make sure to close the socket - finally: - cx.close() - LOG.info("Receiver terminated") - -def controller(prds, pubqs): - while True: - LOG.info("controller: price update cycle, %i pubqs", len(pubqs)) - Pricer.VOLA = update_vola(Pricer.VOLA) - for prd in prds.values(): - prd.run() - for pubq in pubqs: - pubq.put((prd.name, prd.prc)) - eventlet.sleep(5) - -def update_vola(old_vola): - new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) - return new_vola - -class Pricer(object): - VOLA = 0.01 - def __init__(self, name): - self.name = name - self.prc = random.random() * 100.0 - - def run(self): - self.prc += random.choice((-1, +1)) * self.prc * self.VOLA - - -if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(funcName)10s: %(message)s', - datefmt='%H:%M:%S') - main() diff --git a/src/publisher/coop_gevent.py b/src/publisher/coop_gevent.py deleted file mode 100644 index b94abd5..0000000 --- a/src/publisher/coop_gevent.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright (c) 2011 Andreas Balogh -# See LICENSE for details. - -""" prc_publish.multi_thread - -Price Publisher -""" - -# imports - -from argparse import ArgumentParser -import gevent.socket -import gevent.queue -import gevent -import logging -import os -import random -import socket -import sys - -LOG = logging.getLogger() - -# definitions - -def main(argv=None): - if argv is None: - argv = sys.argv - LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) - # parse options and arguments - parser = ArgumentParser(description="Price Publisher") - parser.add_argument("-f", "--file", dest="filename", - help="read configuration from %(dest)s") - parser.add_argument("-p", "--port", default=8001, type=int, - help="server port [default: %(default)s") - args = parser.parse_args() - print args - # create product dict - threads = [] - prds = { } - pubqs = [] - for n in range(10): - key = "AB" + "{:04}".format(n) - prds["AB" + key] = Pricer(key) - # start one thread for price changes - gevent.spawn(controller, prds, pubqs) - address = ('localhost', 8010) - gevent.spawn(listener, address, pubqs) - # main thread runs gevent loop - while True: - gevent.sleep(10) - - -def listener(address, pubqs): - sock = gevent.socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(address) - sock.listen(1) - while True: - LOG.info('waiting for connection on %s', address) - cx, address = sock.accept() - LOG.info("accepting connection from %s", address) - inq = gevent.queue.Queue() - pubqs.append(inq) - gevent.spawn(receiver, cx) - gevent.spawn(publisher, pubqs, inq, cx) - - -def publisher(pubqs, inq, cx): - LOG.info("Publisher running") - try: - while True: - # what happens if client does not pick up - # what happens if client dies during queue wait - item = inq.get() - # TODO: pickle - s = "{0[0]} {0[1]}\n\r".format(item) - cx.send(s) - # if connection closes - except IOError, e: - LOG.info(e) - # make sure to close the socket - finally: - cx.close() - pubqs.remove(inq) - LOG.info("Publisher terminated") - - -def receiver(cx): - LOG.info("Receiver running") - try: - while True: - # what happens if client does not pick up - s = cx.recv(4096) - if not s: - break - LOG.info(s) - # if connection closes - except IOError, e: - LOG.info(e) - # make sure to close the socket - finally: - cx.close() - LOG.info("Receiver terminated") - -def controller(prds, pubqs): - while True: - LOG.info("Price update cycle") - Pricer.VOLA = update_vola(Pricer.VOLA) - LOG.debug("controller: %i pubqs", len(pubqs)) - for prd in prds.values(): - prd.run() - for pubq in pubqs: - pubq.put((prd.name, prd.prc)) - gevent.sleep(5) - -def update_vola(old_vola): - new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) - return new_vola - -class Pricer(): - VOLA = 0.01 - def __init__(self, name): - self.name = name - self.prc = random.random() * 100.0 - - def run(self): - self.prc += random.choice((-1, +1)) * self.VOLA - - -if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s', - datefmt='%H:%M:%S') - main() diff --git a/src/publisher/coop_gevent_monkey.py b/src/publisher/coop_gevent_monkey.py deleted file mode 100644 index 0185d9f..0000000 --- a/src/publisher/coop_gevent_monkey.py +++ /dev/null @@ -1,141 +0,0 @@ -# Copyright (c) 2011 Andreas Balogh -# See LICENSE for details. - -""" prc_publish.multi_thread - -Price Publisher -""" - -# imports - -from gevent import monkey -monkey.patch_all() - -from argparse import ArgumentParser -from gevent.queue import Queue -import gevent -import logging -import os -import random -import socket -import sys -import threading - -LOG = logging.getLogger() - -# definitions - -def main(argv=None): - if argv is None: - argv = sys.argv - LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:])) - # parse options and arguments - parser = ArgumentParser(description="Price Publisher") - parser.add_argument("-f", "--file", dest="filename", - help="read configuration from %(dest)s") - parser.add_argument("-p", "--port", default=8001, type=int, - help="server port [default: %(default)s") - args = parser.parse_args() - print args - # create product dict - threads = [] - prds = { } - pubqs = [] - for n in range(10): - key = "AB" + "{:04}".format(n) - prds["AB" + key] = Pricer(key) - # start one thread for price changes - t1 = threading.Thread(target=controller, args=(prds, pubqs)) - t1.start() - address = ('localhost', 8010) - t2 = threading.Thread(target=listener, args=(address, pubqs)) - t2.start() - # main thread runs gevent loop - while True: - gevent.sleep(10) - - -def listener(address, pubqs): - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.bind(address) - sock.listen(1) - while True: - LOG.info('waiting for connection on %s', address) - cx, address = sock.accept() - LOG.info("accepting connection from %s", address) - inq = Queue() - pubqs.append(inq) - rec = threading.Thread(target=receiver, args=(cx,)) - pub = threading.Thread(target=publisher, args=(pubqs, inq, cx)) - rec.start() - pub.start() - - -def publisher(pubqs, inq, cx): - LOG.info("Publisher running") - try: - while True: - # what happens if client does not pick up - # what happens if client dies during queue wait - item = inq.get() - # TODO: pickle - s = "{0[0]} {0[1]}\n\r".format(item) - cx.send(s) - # if connection closes - except IOError, e: - LOG.info(e) - # make sure to close the socket - finally: - cx.close() - pubqs.remove(inq) - LOG.info("Publisher terminated") - - -def receiver(cx): - LOG.info("Receiver running") - try: - while True: - # what happens if client does not pick up - s = cx.recv(4096) - if not s: - break - LOG.info(s) - # if connection closes - except IOError, e: - LOG.info(e) - # make sure to close the socket - finally: - cx.close() - LOG.info("Receiver terminated") - -def controller(prds, pubqs): - while True: - LOG.info("Price update cycle") - Pricer.VOLA = update_vola(Pricer.VOLA) - LOG.debug("controller: %i pubqs", len(pubqs)) - for prd in prds.values(): - prd.run() - for pubq in pubqs: - pubq.put((prd.name, prd.prc)) - gevent.sleep(5) - -def update_vola(old_vola): - new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01) - return new_vola - -class Pricer(): - VOLA = 0.01 - def __init__(self, name): - self.name = name - self.prc = random.random() * 100.0 - self.daemon = True - - def run(self): - self.prc += random.choice((-1, +1)) * self.VOLA - - -if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG, - format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s', - datefmt='%H:%M:%S') - main()