added tk gui template (price feed client)

--HG--
branch : sandbox
This commit is contained in:
baloan
2018-10-19 21:15:53 +02:00
parent 6638d63e7c
commit fb5c9c405a
9 changed files with 971 additions and 748 deletions

View File

@@ -1,183 +0,0 @@
#!/usr/bin/env python
# Copyright (c) 2009 Andreas Balogh
# See LICENSE for details.
""" create MeGUI jobs for Panasonic MPG files """
# system imports
import logging
import sys
import os
import getopt
import re
import xml.dom.minidom as xdm
# local imports
# constants
# globals
LOG = logging.getLogger()
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s %(levelname).3s %(process)d:%(thread)d %(message)s",
datefmt="%H:%M:%S")
# definitions
class Usage(Exception):
pass
class Error(Exception):
pass
def main(argv = [__name__]):
try:
# check for parameters
LOG.info("starting '%s %s'", argv[0], " ".join(argv[1:]))
script_name = os.path.basename(argv[0])
try:
opts, args = getopt.getopt(argv[1:], "ht:j:z:", \
["help", "template", "job_dir", "temp_dir"])
except getopt.error, err:
raise Usage(err)
LOG.debug("opts: %s, args: %s", opts, args)
template = "job_divx_template.xml"
job_dir = "C:\\Program Files\\megui\\jobs"
temp_dir = "e:\\video"
for o, a in opts:
if o in ("-h", "--help"):
usage(script_name)
return 0
elif o in ("-t", "--template"):
template = a
elif o in ("-j", "--job_dir"):
job_dir = a
elif o in ("-z", "--temp_dir"):
temp_dir = a
src_dir = "D:\\Documents\\Raw\\Panasonic SDR-S7\\Video"
dest_dir = "D:\\Documents\\My Videos\\Balogh"
if len(args) == 2:
src_dir = args[0]
dest_dir = args[1]
elif len(args) == 1:
src_dir = args[0]
elif len(args) > 2:
raise Usage("too many arguments")
# call method with appropriate arguments
if src_dir and not os.path.exists(src_dir):
raise Error("Source directory not found [%s], aborting" % (src_dir, ))
if dest_dir and not os.path.exists(dest_dir):
LOG.warn("Destination directory not found [%s]", dest_dir)
LOG.info("Creating destination directory [%s]", dest_dir)
os.makedirs(dest_dir)
cli(src_dir, dest_dir, template, job_dir, temp_dir)
LOG.info("Done.")
return 0
except Error, err:
LOG.error(err)
return 1
except Usage, err:
LOG.error(err)
LOG.info("for usage use -h or --help")
return 2
def cli(src_dir, dest_dir, template, job_dir, tmp_dir):
""" command line interface """
if not os.path.exists(dest_dir):
LOG.error("Destination directory not found [%s]", dest_dir)
raise Error()
for mpg_fn in os.listdir(src_dir):
opts = { }
root, ext = os.path.splitext(mpg_fn)
# skip files which are not mpg video
if ext.lower() != ".mpg":
continue
# determine file paths
mpg_fp = os.path.join(src_dir, mpg_fn)
avi_fp = os.path.join(dest_dir, root + ".avi")
d2v_fp = os.path.join(tmp_dir, root + ".d2v")
opts["mpg_input"] = mpg_fp
opts["final_output"] = avi_fp
opts["d2v_output"] = d2v_fp
# if avi_fn doesn't exist then create job xml
if not os.path.exists(avi_fp):
create_megui_job(template, job_dir, opts)
def append_megui_joblist(jobname):
joblist = "c:\\program files\\megui\\joblists.xml"
# load the xml
inf = open(joblist,"r")
dom = xdm.parse(inf)
inf.close()
# add <string>job220</string> to <mainJobList>
JobListSerializer = dom.getElementsByTagName("JobListSerializer")[0]
mainJobList = JobListSerializer.getElementsByTagName("mainJobList")[0]
string = dom.createElement("string")
text = dom.createTextNode(jobname)
string.appendChild(text)
mainJobList.appendChild(string)
newline = dom.createTextNode("\n ")
mainJobList.appendChild(newline)
# save xml
outf = open(joblist,"w")
dom.writexml(outf)
outf.close()
def create_megui_job(template, job_dir, opts):
# replace in template: mpg_input, d2v_output, final_output, jobnum
jobnum = get_next_jobnumber(job_dir)
LOG.info("creating job %i...", jobnum)
jobfn = "job%i.xml" % (jobnum, )
jobname = "job%i" % (jobnum, )
opts["jobname"] = jobname
tplf = open(template, "r")
jobf = open(os.path.join(job_dir, jobfn), "w")
try:
for line in tplf:
# line = raw_line.rstrip("\n\r")
for k, v in opts.items():
kd = "".join(("$", k, "$"))
line = line.replace(kd, v)
jobf.write(line)
finally:
jobf.close()
tplf.close()
append_megui_joblist(jobname)
JOB_REO = re.compile("job(\d+)\.xml")
def get_next_jobnumber(job_dir):
max_jn = 0
for job_fn in os.listdir(job_dir):
job_mo = JOB_REO.match(job_fn)
if job_mo:
jn = int(job_mo.group(1))
if jn > max_jn:
max_jn = jn
return max_jn + 1
def usage(script_name):
print
print "usage: %s [-t template] [-j job_dir] [-z temp_dir] [src_dir [dst_dir]]" % (script_name,)
print """
src_dir source directory of original MPG files
dest_dir destination directory for compressed video
options:
-h, --help show this help message and exit
-t, --template job template file
-j, --job_dir megui job directory
-z, --temp_dir working directory for megui
"""
if __name__ == "__main__":
sys.exit(main(sys.argv))

View File

@@ -1,5 +0,0 @@
@echo off
rem determine directory of script. may be different from current working directory
for /F %%I IN ("%0") do set BIN_DIR=%%~dpI
rem expect python.exe in the PATH
python.exe %BIN_DIR%\create_jobs.py %*

View File

@@ -1,154 +0,0 @@
<?xml version="1.0"?>
<TaggedJob xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<EncodingSpeed />
<Job xsi:type="IndexJob">
<Input>$mpg_input$</Input>
<Output>$d2v_output$</Output>
<FilesToDelete />
<LoadSources>false</LoadSources>
<DemuxVideo>false</DemuxVideo>
<DemuxMode>1</DemuxMode>
<AudioTracks>
<AudioTrackInfo>
<TrackInfo>
<Language />
</TrackInfo>
<Language />
<TrackIDx>c0</TrackIDx>
<TrackID>192</TrackID>
<ContainerType>MPEG-PS</ContainerType>
<Index>0</Index>
<Type>MP2</Type>
<NbChannels>2 channels</NbChannels>
<SamplingRate>48.0 KHz</SamplingRate>
</AudioTrackInfo>
</AudioTracks>
<PostprocessingProperties>
<AudioJobs>
<AudioJob>
<Input>::192::</Input>
<FilesToDelete />
<Settings xsi:type="MP3Settings">
<delay>0</delay>
<delayEnabled>false</delayEnabled>
<ForceDecodingViaDirectShow>false</ForceDecodingViaDirectShow>
<DownmixMode>StereoDownmix</DownmixMode>
<BitrateMode>ABR</BitrateMode>
<Bitrate>128</Bitrate>
<AutoGain>true</AutoGain>
<SampleRateType>0</SampleRateType>
<ApplyDRC>false</ApplyDRC>
<Normalize>100</Normalize>
<Quality>50</Quality>
</Settings>
<Delay>0</Delay>
<SizeBytes>0</SizeBytes>
<BitrateMode>CBR</BitrateMode>
</AudioJob>
</AudioJobs>
<DirectMuxAudio />
<AutoDeinterlace>false</AutoDeinterlace>
<AutoDeriveAR>false</AutoDeriveAR>
<SignalAR>true</SignalAR>
<AutoCrop>false</AutoCrop>
<KeepInputResolution>true</KeepInputResolution>
<PrerenderJob>false</PrerenderJob>
<HorizontalOutputResolution>640</HorizontalOutputResolution>
<ContainerTypeString>AVI</ContainerTypeString>
<OutputSize xsi:nil="true" />
<Splitting xsi:nil="true" />
<DAR xsi:nil="true" />
<AvsSettings>
<Mod16Method>none</Mod16Method>
<Resize>false</Resize>
<Template>&lt;input&gt;
Load_Stdcall_Plugin("C:\Program Files\megui\tools\yadif\yadif.dll")
Yadif(order=1)
&lt;crop&gt;
&lt;resize&gt;
&lt;denoise&gt;
</Template>
<ResizeMethod>Lanczos</ResizeMethod>
<DenoiseMethod>MinimalNoise</DenoiseMethod>
<Deinterlace>false</Deinterlace>
<Denoise>false</Denoise>
<IVTC>false</IVTC>
<MPEG2Deblock>false</MPEG2Deblock>
<ColourCorrect>true</ColourCorrect>
<DSS2>false</DSS2>
</AvsSettings>
<VideoSettings xsi:type="xvidSettings">
<EncodingMode>4</EncodingMode>
<BitrateQuantizer>1200</BitrateQuantizer>
<KeyframeInterval>250</KeyframeInterval>
<NbBframes>2</NbBframes>
<MinQuantizer>1</MinQuantizer>
<MaxQuantizer>31</MaxQuantizer>
<Turbo>true</Turbo>
<V4MV>false</V4MV>
<QPel>true</QPel>
<Trellis>true</Trellis>
<CreditsQuantizer>15</CreditsQuantizer>
<FourCCs>
<string>XVID</string>
<string>DIVX</string>
<string>DX50</string>
<string>MP4V</string>
</FourCCs>
<Logfile>.stats</Logfile>
<VideoName />
<CustomEncoderOptions />
<FourCC>0</FourCC>
<MaxNumberOfPasses>2</MaxNumberOfPasses>
<NbThreads>1</NbThreads>
<Quantizer>1200</Quantizer>
<MotionSearchPrecision>6</MotionSearchPrecision>
<VHQMode>4</VHQMode>
<MinPQuant>1</MinPQuant>
<MaxPQuant>31</MaxPQuant>
<MinBQuant>1</MinBQuant>
<MaxBQuant>31</MaxBQuant>
<BQuantRatio>162</BQuantRatio>
<BQuantOffset>0</BQuantOffset>
<KeyFrameBoost>100</KeyFrameBoost>
<KeyframeThreshold>1</KeyframeThreshold>
<KeyframeReduction>20</KeyframeReduction>
<OverflowControlStrength>5</OverflowControlStrength>
<MaxOverflowImprovement>5</MaxOverflowImprovement>
<MaxOverflowDegradation>5</MaxOverflowDegradation>
<HighBitrateDegradation>30</HighBitrateDegradation>
<LowBitrateImprovement>15</LowBitrateImprovement>
<ReactionDelayFactor>16</ReactionDelayFactor>
<AveragingPeriod>100</AveragingPeriod>
<FrameDropRatio>0</FrameDropRatio>
<RateControlBuffer>0</RateControlBuffer>
<XvidProfile>0</XvidProfile>
<VbvPeakRate>0</VbvPeakRate>
<VbvMaxRate>0</VbvMaxRate>
<VbvBuffer>0</VbvBuffer>
<PackedBitstream>false</PackedBitstream>
<GMC>false</GMC>
<ChromaMotion>true</ChromaMotion>
<ClosedGOP>true</ClosedGOP>
<VHQForBframes>true</VHQForBframes>
<AdaptiveQuant>true</AdaptiveQuant>
<Interlaced>false</Interlaced>
<BottomFieldFirst>true</BottomFieldFirst>
<LumiMasking>false</LumiMasking>
<BframeThreshold>0</BframeThreshold>
<QuantizerMatrix>H.263</QuantizerMatrix>
</VideoSettings>
<CustomAR>1</CustomAR>
<ChapterFile />
<FinalOutput>$final_output$</FinalOutput>
<DeviceOutputType />
<UseChaptersMarks>false</UseChaptersMarks>
</PostprocessingProperties>
</Job>
<RequiredJobNames />
<EnabledJobNames />
<Name>$jobname$</Name>
<Status>WAITING</Status>
<Start>0001-01-01T00:00:00</Start>
<End>0001-01-01T00:00:00</End>
</TaggedJob>

43
src/oauth-rest/get-xml.py Normal file
View File

@@ -0,0 +1,43 @@
#!python3
# encoding: utf8
# Copyright (C) 2017 Andreas Balogh
# Published under the MIT LICENCE, See LICENCE for details.
"""
An oauth authenticated web request
"""
import requests
import requests.auth
# Request a token
client_auth = requests.auth.HTTPBasicAuth('p-jcoLKBynTLew', 'gko_LXELoV07ZBNUXrvWZfzE3aI')
post_data = {"grant_type": "password", "username": "reddit_bot", "password": "snoo"}
headers = {"User-Agent": "ChangeMeClient/0.1 by YourUsername"}
response = requests.post("https://www.reddit.com/api/v1/access_token",
auth=client_auth, data=post_data, headers=headers)
response.json()
# Use the token
headers = {"Authorization": "bearer fhTdafZI-0ClEzzYORfBSCR7x3M",
"User-Agent": "ChangeMeClient/0.1 by YourUsername"}
response = requests.get("https://oauth.reddit.com/api/v1/me", headers=headers)
response.json()
import urllib.request
DATA = "grant_type=password&username={user}&password={pwd}".format([user, pwd])
req = urllib.request.Request(url="https://www.reddit.com/api/v1/access_token",
data=DATA,
method='PUT')
with urllib.request.urlopen(req) as f:
pass
print(f.status)
print(f.reason)
req = urllib.request.Request(url="https://www.reddit.com/api/v1/access_token")
req.add_header("Authorization", "bearer {0}".format(token))
with urllib.request.urlopen(req) as f:
pass
print(f.status)
print(f.reason)

311
src/pfc/pfc_asyncio.py Normal file
View File

@@ -0,0 +1,311 @@
#!python3
# Created 2018 Andreas Balogh
""" price feed client with asyncio
Coroutine based price feed reader. Parses messages, filters and prints.
tk GUI & async: https://stackoverflow.com/questions/49958180/using-async-await-keywords-with-tk-after-method-of-tkinter
"""
from argparse import ArgumentParser
import concurrent.futures
import asyncio
import datetime
import logging
import os
import re
import sys
import collections
import datetime as dt
from _socket import AF_INET
import queue
LOG = logging.getLogger()
ONE_SECOND = datetime.timedelta(seconds=1)
ONE_MINUTE = datetime.timedelta(seconds=60)
# queue for GUI to asyncio messages
class PriceFeedClient:
def __init__(self, host, port, pfcq, guiq, keycol):
self.guiq = guiq
self.pfcq = pfcq
self.host = host
self.port = port
self.keycol = int(keycol)
LOG.info(f"keycol: {keycol}")
self.loop = asyncio.get_event_loop()
# status
self.feed_statd = PriceFeedClient.__init_statd()
self.sym_statd = collections.defaultdict(PriceFeedClient.__init_statd)
self.tps_history = list()
# ticker feed
self.filter = None
self.ticker = list()
self.last_symbol = None
self.ticks_per_block = 10
# log subsystem
self.log_fp = None
self.log_stopdt = None
@staticmethod
def __init_statd():
statd = dict()
statd['last_tick'] = dt.datetime.now()
statd['sec_interval_ticks'] = 0
statd['prev_sec_interval_ticks'] = 0
statd['current_sec'] = statd['last_tick'].replace(microsecond=0)
statd['sec_count'] = 0
statd['ticks_per_sec'] = 0.0
return statd
async def stream_reader(self):
# (family, type, proto, canonname, sockaddr) = getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)
# AbstractEventLoop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)
host_details = await self.loop.getaddrinfo(self.host, self.port, family=AF_INET)
(_, _, _, _, sockaddr) = host_details[0]
LOG.info(f"host: {sockaddr[0]}, port: {sockaddr[1]}")
try:
reader, _ = await asyncio.open_connection(sockaddr[0], sockaddr[1])
except ConnectionRefusedError as e:
# LOG.exception(e)
self.guiq.put(("set_status", (f"{e}",), {}))
return
except:
raise
self.guiq.put(("set_status", (f"connected to host: {self.host}:{self.port}, IP: {sockaddr[0]}",), {}))
# tt = bytes.maketrans(b"\x00\x0c", b"|\n")
while True:
# read price feed line
try:
bytes_ = await reader.readuntil(separator=b'\x0c')
except concurrent.futures._base.CancelledError:
LOG.info("await reader.readuntil() aborted, terminating")
break
except Exception as e:
LOG.exception(e)
self.guiq.put(("set_status", (f"{e}",), {}))
break
line = bytes_.decode()
s = line.rstrip()
flds = s.split("\x00")
# calculate feed statistics
self.update_statd(self.feed_statd)
# calculate symbol statistics
if len(flds) > self.keycol:
symbol = flds[self.keycol]
statd = self.sym_statd[symbol]
self.update_statd(statd)
if len(self.ticker) < self.ticks_per_block or self.log_fp:
fldstr = "|".join(flds)
now = dt.datetime.now()
ts = now.strftime("%H:%M:%S.%f")
tick = ":".join((ts, fldstr))
if self.filter is not None:
mo = self.filter.search(tick)
if mo is not None:
self.dump_tick(tick)
self.last_symbol = symbol
else:
self.dump_tick(tick)
def dump_tick(self, tick):
if len(self.ticker) < self.ticks_per_block:
self.ticker.append(tick)
if self.log_fp:
line = "".join((tick, "\n"))
self.log_fp.write(line)
@staticmethod
def update_statd(statd):
now = dt.datetime.now()
now_sec = now.replace(microsecond=0)
statd['last_tick'] = now
# update ticks per sec
statd['sec_interval_ticks'] = statd['sec_interval_ticks'] + 1
if now_sec != statd['current_sec']:
if statd['sec_count'] < 60:
statd['sec_count'] = statd['sec_count'] + 1
a = statd['sec_count'] - 1
seconds_since_last = (now_sec - statd['current_sec']).total_seconds()
b = 1 / seconds_since_last
statd['ticks_per_sec'] = (a * statd['ticks_per_sec'] + b * statd['sec_interval_ticks']) / statd['sec_count']
statd['prev_sec_interval_ticks'] = statd['sec_interval_ticks']
statd['sec_interval_ticks'] = 0
statd['current_sec'] = now_sec
async def histogram_sender(self):
while True:
# tick update latency
latency_histogram = collections.defaultdict(int)
now = dt.datetime.now()
now_sec = now.replace(microsecond=0)
for statd in self.sym_statd.values():
ltd = now_sec - statd['current_sec']
latency = int(ltd.total_seconds())
latency_histogram[latency] = latency_histogram[latency] + 1
self.guiq.put(("upd_latency", (latency_histogram,), {}))
await asyncio.sleep(2)
LOG.info(f"done")
async def guiq_sender(self):
while True:
# update feed and symbols statistics once per sec
self.update_statd(self.feed_statd)
# for statd in self.sym_statd.values():
# self.update_statd(statd)
# tps history
self.tps_history.insert(0, self.feed_statd['prev_sec_interval_ticks'])
try:
del self.tps_history[120:]
except:
pass
self.guiq.put(("upd_tps_history", (self.tps_history,), {}))
# feed tick flow (in Hz)
statd = self.feed_statd
symbol_count = len(self.sym_statd)
tick_flow = statd['ticks_per_sec']
ticks_per_interval = statd['prev_sec_interval_ticks']
# print(f"{statd['last_tick']}, tps: {statd['ticks_per_sec']:,.0f}, flow: {tick_flow:,.2f}")
self.guiq.put(("upd_feed_stats", (symbol_count, tick_flow, ticks_per_interval, statd['last_tick']), {}))
# symbol tick flow
if len(self.ticker) > 0:
self.guiq.put(("update_ticker", (list(self.ticker),), {}))
# symbol stats
if self.filter and self.last_symbol:
statd = self.sym_statd[self.last_symbol]
tpm = statd['ticks_per_sec'] * 60
last_symbol_tick = statd['last_tick']
self.guiq.put(("upd_symbol_stats", (self.last_symbol, tpm, last_symbol_tick), {}))
self.ticker = list()
if self.log_fp:
rtd = self.log_stopdt - dt.datetime.now()
remaining = int(rtd.total_seconds())
if remaining > 0:
self.guiq.put(("set_log_remaining", (f"{remaining}",), {}))
else:
self.abort_log()
await asyncio.sleep(1)
LOG.info(f"done")
async def guiq_printer(self):
# for command line and debugging
while True:
try:
cmd, args, kwargs = self.guiq.get_nowait()
print(cmd, args, kwargs)
except queue.Empty:
pass
await asyncio.sleep(1)
async def pfcq_reader(self):
while True:
cmd, args, kwargs = await self.pfcq.get()
try:
fn = getattr(self, cmd)
fn(*args, **kwargs)
except Exception as e:
print(cmd, args, kwargs)
LOG.exception(e)
def set_filter(self, text):
if len(text) > 0:
try:
self.filter = re.compile(re.escape(text))
except:
self.filter = None
self.guiq.put(("clear_symbol_stats", (), {}))
else:
self.filter = None
self.guiq.put(("clear_symbol_stats", (), {}))
def set_ticks_per_block(self, n):
self.ticks_per_block = n
def start_log(self, fn, duration):
if self.log_fp is None:
try:
self.log_fp = open(fn, "w")
self.log_stopdt = dt.datetime.now() + dt.timedelta(seconds=duration)
LOG.debug(f"logging activated until {self.log_stopdt}")
self.guiq.put(("set_log_remaining", (f"{duration}",), {}))
# self.loop.set_debug(True)
except Exception as e:
LOG.exception(e)
self.guiq.put(("set_status", (f"{e}",), {}))
self.log_fp = None
self.guiq.put(("set_log_remaining", ("",), {}))
else:
LOG.warn(f"logging still active until {self.log_stopdt}")
def abort_log(self):
if self.log_fp:
# self.loop.set_debug(False)
self.log_fp.close()
self.log_fp = None
self.log_stopdt = None
self.guiq.put(("set_log_remaining", ("",), {}))
async def cancel_reader(loop, tasks, cancel_evt):
# wait for event from main thread to cancel
LOG.info(f"start cancel_reader, waiting for event")
# threading synchronisation primitives do block thus must run in an executor thread
await loop.run_in_executor(None, cancel_evt.wait)
LOG.info(f"cancelling stream_reader")
for task in tasks:
task.cancel()
def create_stream_reader(loop, cancel_evt, *args):
pfc = PriceFeedClient(*args)
# this function runs in the asyncio thread
tasks = [ loop.create_task(pfc.stream_reader()),
loop.create_task(pfc.guiq_sender()),
loop.create_task(pfc.histogram_sender()),
loop.create_task(pfc.pfcq_reader()) ]
LOG.info(f"created stream reader")
# since the tasks can not be returned as a result through call_soon_threadsafe
# and cancellation must occor from the asyncio thread
# we embed the task.cancel in an async function to wait for a threading Event triggered from the main GUI thread
loop.create_task(cancel_reader(loop, tasks, cancel_evt))
def asyncio_run(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
# https://docs.python.org/3.6/library/asyncio-eventloop.html?highlight=asyncgen#asyncio.AbstractEventLoop.shutdown_asyncgens
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
def cli(argv=None):
# command line interface
if argv is None:
argv = sys.argv
script_name = os.path.basename(argv[0])
LOG.info(f"Welcome to {script_name}, created 2009, updated 2018, Andreas Balogh")
# main
loop = asyncio.get_event_loop()
pfcq = asyncio.Queue()
guiq = queue.Queue()
pfc = PriceFeedClient("fracbxolappu2.de.db.com", 20354, pfcq, guiq, "1")
# pfc = PriceFeedClient("fracbxolappu8.de.db.com", 20377, q)
loop.create_task(pfc.stream_reader())
loop.create_task(pfc.guiq_printer())
loop.create_task(pfc.guiq_sender())
loop.create_task(pfc.histogram_sender())
asyncio_run(loop)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO,
format='%(asctime)s.%(msecs)03i [%(thread)i] %(levelname).4s %(funcName)10s: %(message)s',
datefmt='%H:%M:%S')
sys.exit(cli())

617
src/pfc/pfc_gui.py Normal file
View File

@@ -0,0 +1,617 @@
#!python3
# Created 2018 Andreas Balogh
""" XOL price feed client
Coroutine based multithreaded XOL price feed reader.
Features:
* quick one click switching between feeds
* displays messages in ticker window
* allows to filter in ticker text
* logging facility. Saves price feed to disk.
* tick flow history
* product tick latency histogram
See also:
http://effbot.org/tkinterbook/
https://infohost.nmt.edu/tcc/help/pubs/tkinter/web/index.html
https://tkdocs.com/tutorial/index.html
How to install?
1. install Anaconda from Automated Software Distribution, potentially one version of the Windows Runtime Library is needed
2. set proxy environment variables (optional)
set http_proxy=http://surf-proxy.intranet.db.com:8080
set https_proxy=http://surf-proxy.intranet.db.com:8080
3. create configuration file in c:/Users/<username>/pricefeed.ini
[defaults]
# see https://docs.python.org/3.6/library/os.path.html#os.path.expanduser for details
logpath=~\Documents
duration=10
[feedname1]
host=fracbxolappu2.de.db.com
port=20354
keycol=1
[feedname2]
...
"""
import asyncio
import configparser
import logging
from pathlib import Path
import queue
import sys
import re
import threading
from tkinter import ttk
import datetime as dt
import pfc_asyncio
import tkinter as tk
import tkinter.font as tkFont
import tkinter.messagebox
import tkinter.filedialog
LOG = logging.getLogger()
class TpsHistory(tk.Canvas):
def __init__(self, parent, *args, **kwargs):
super().__init__(parent, *args, **kwargs)
self.bar_w = 2
def plot(self, tps_list):
w = self.winfo_width() - 4
h = self.winfo_height() - 4
# LOG.debug(f"size {w}x{h} pixels")
xd0 = 20
yd0 = 10
xd1 = w - 5
yd1 = h - 15
# LOG.debug(f"border ({xa0},{ya0}), ({xa1},{ya1})")
tps_max = max(tps_list)
axis_max = int(tps_max / 1000) + 1
diag_max = axis_max * 1000
self.delete("frame")
self.delete("axis")
self.create_rectangle(xd0, yd0, xd1 + 1, yd1 + 1, tags="frame")
self.create_text(xd0 - 10, yd0, justify=tk.RIGHT, text=f"{axis_max}k", tags="axis")
self.create_text(xd0 - 10, yd1, justify=tk.RIGHT, text="0", tags="axis")
for i in (90, 60, 30, 0):
self.create_text(xd1 - i * self.bar_w, yd1 + 10, justify=tk.CENTER, text=f"{i}", tags="axis")
self.delete("bar")
# print(f"tps_list has {len(tps_list)} item")
xa0 = xd0 + 1
ya0 = yd0 + 1
xa1 = xd1
ya1 = yd1
for n, tps in enumerate(tps_list):
xb0 = xa1 - (n + 1) * self.bar_w + 1
if xb0 <= xa0:
break
xb1 = xa1 - n * self.bar_w
yb0 = ya1 - max(0, (tps / diag_max) * (ya1 - ya0))
yb1 = ya1
# print(xb0, xb1, yb0, yb1)
if yb0 != yb1:
self.create_rectangle(xb0, yb0, xb1 + 1, yb1 + 1, tags="bar", width=0, fill="blue")
class LatencyHistogram(tk.Canvas):
def __init__(self, parent, *args, **kwargs):
super().__init__(parent, *args, **kwargs)
self.bar_w = 1
def plot(self, histogram):
tps_list = [0] * 210
for s, n in histogram.items():
if s >= 210:
continue
tps_list[s] = n
# diagram geometry
w = self.winfo_width() - 4
h = self.winfo_height() - 4
# LOG.debug(f"size {w}x{h} pixels")
xd0 = 20
yd0 = 10
xd1 = w - 5
yd1 = h - 15
# LOG.debug(f"border ({xa0},{ya0}), ({xa1},{ya1})")
tps_max = max(tps_list)
axis_max = int(tps_max / 1000) + 1
diag_max = axis_max * 1000
self.delete("frame")
self.delete("axis")
self.create_rectangle(xd0, yd0, xd1 + 1, yd1 + 1, tags="frame")
self.create_text(xd0 - 10, yd0, justify=tk.RIGHT, text=f"{axis_max}k", tags="axis")
self.create_text(xd0 - 10, yd1, justify=tk.RIGHT, text="0", tags="axis")
for i in (0, 30, 60, 90, 120, 150, 180):
self.create_text(xd0 + i * self.bar_w, yd1 + 10, justify=tk.CENTER, text=f"{i}", tags="axis")
self.delete("bar")
# print(f"tps_list has {len(tps_list)} item")
xa0 = xd0 + 1
ya0 = yd0 + 1
xa1 = xd1
ya1 = yd1
for n, tps in enumerate(tps_list):
xb0 = xa0 + n * self.bar_w
xb1 = xa0 + (n + 1) * self.bar_w - 1
if xb1 >= xa1:
continue
yb0 = ya1 - max(0, (tps / diag_max) * (ya1 - ya0))
yb1 = ya1
# print(xb0, xb1, yb0, yb1)
if yb0 != yb1:
self.create_rectangle(xb0, yb0, xb1 + 1, yb1 + 1, tags="bar", width=0, fill="red")
class LatencyCumulative(tk.Canvas):
def __init__(self, parent, *args, **kwargs):
super().__init__(parent, *args, **kwargs)
self.bar_w = 1
def draw_axis(self):
pass
def plot(self, histogram):
tps_list = [0] * 210
for s, n in histogram.items():
if s >= 210:
continue
tps_list[s] = n
# diagram geometry
w = self.winfo_width() - 4
h = self.winfo_height() - 4
# LOG.debug(f"size {w}x{h} pixels")
xd0 = 20
yd0 = 10
xd1 = w - 5
yd1 = h - 15
# LOG.debug(f"border ({xa0},{ya0}), ({xa1},{ya1})")
tps_max = max(tps_list)
axis_max = int(tps_max / 1000) + 1
diag_max = axis_max * 1000
self.delete("axis")
self.create_rectangle(xd0, yd0, xd1 + 1, yd1 + 1, tags="axis")
self.create_text(xd0 - 10, yd0, justify=tk.RIGHT, text=f"{axis_max}k", tags="axis")
self.create_text(xd0 - 10, yd1, justify=tk.RIGHT, text="0", tags="axis")
for i in (0, 30, 60, 90, 120, 150, 180):
self.create_text(xd0 + i * self.bar_w, yd1 + 10, justify=tk.CENTER, text=f"{i}", tags="axis")
self.delete("bar")
# print(f"tps_list has {len(tps_list)} item")
xa0 = xd0 + 1
ya0 = yd0 + 1
xa1 = xd1
ya1 = yd1
for n, tps in enumerate(tps_list):
xb0 = xa0 + n * self.bar_w
xb1 = xa0 + (n + 1) * self.bar_w - 1
if xb1 >= xa1:
continue
yb0 = ya1 - max(0, (tps / diag_max) * (ya1 - ya0))
yb1 = ya1
# print(xb0, xb1, yb0, yb1)
if yb0 != yb1:
self.create_rectangle(xb0, yb0, xb1 + 1, yb1 + 1, tags="bar", width=0, fill="red")
class LogAs(tk.Toplevel):
# see http://effbot.org/tkinterbook/tkinter-dialog-windows.htm
def __init__(self, parent, feed, **kwargs):
super().__init__(parent, **kwargs)
self.transient(parent)
self.parent = parent
self.title("Log as")
# setup widgets
self.path = tk.StringVar()
self.duration = tk.StringVar()
self.__create_widgets()
now = dt.datetime.now()
# can't use tt = str.maketrans(r"/\", "__")
# r"\" is not a valid string literal (even a raw string cannot end in an odd number of backslashes).
# Specifically, a raw literal cannot end in a single backslash (since the backslash would escape the following quote character).
# see https://docs.python.org/3.6/reference/lexical_analysis.html#literals
tt = str.maketrans(r"\/", "__")
fn = f"{feed}-{now:%Y%m%d}-{now:%H%M%S}.log".translate(tt)
fp = Path(parent.defaults["logpath"], fn).expanduser()
self.path.set(str(fp))
self.duration.set(parent.defaults["duration"])
# housekeeping
self.protocol("WM_DELETE_WINDOW", self.cancel)
self.geometry("+{}+{}".format(parent.winfo_rootx() + 50, parent.winfo_rooty() + 50))
self.wait_window(self)
def __create_widgets(self):
# destination selection
self.fr1 = ttk.Frame(self)
self.fr1.pack(fill=tk.BOTH, expand=1, padx=5, pady=5)
vcmd = (self.register(self.validate_numeric), '%P')
self.en11 = ttk.Entry(self.fr1, width=60, textvariable=self.path, validate='all', validatecommand=vcmd)
self.en11.pack(side=tk.LEFT)
self.bu11 = ttk.Button(self.fr1, text="Browse", command=self.browse)
self.bu11.pack(side=tk.LEFT)
# duration
self.fr2 = ttk.Frame(self)
self.fr2.pack()
self.la21 = ttk.Label(self.fr2, text='Duration [s]')
self.la21.pack(side=tk.LEFT)
self.en21 = ttk.Entry(self.fr2, width=20, textvariable=self.duration)
self.en21.pack(side=tk.RIGHT)
# start
self.fr3 = ttk.Frame(self)
self.fr3.pack()
self.bu31 = ttk.Button(self.fr3, text="Cancel", command=self.cancel)
self.bu31.pack(side=tk.LEFT)
self.bu32 = ttk.Button(self.fr3, text="Start", command=self.start)
self.bu32.pack(side=tk.LEFT)
def validate_numeric(self, new_text):
return new_text.isdigit()
def browse(self):
fp = Path(self.path.get())
fn = tkinter.filedialog.asksaveasfilename(defaultextension=".log", filetypes=(("LOG", "*.log"),),
initialdir=fp.parent, initialfile=fp.name,
parent=self, title="Save log as")
if len(fn) > 0:
self.path.set(fn)
def start(self):
fn = self.path.get()
duration = int(self.duration.get())
self.parent.loop.call_soon_threadsafe(self.parent.pfcq.put_nowait, ("start_log", (fn, duration), {}))
self.cancel()
def cancel(self):
self.parent.focus_set()
self.destroy()
class App(tk.Tk):
def __init__(self, loop):
super().__init__()
self.option_add('*tearOff', False)
self.title("XOL price feed monitor")
self.protocol('WM_DELETE_WINDOW', self.file_exit)
self.text_lines = 15
self.__create_menubar()
self.__create_widgets()
self.pfcq = asyncio.Queue(loop=loop)
self.guiq = queue.Queue()
self.cancel_evt = None
self.active_feed = -1
self.loop = loop
self.feeds = dict()
self.defaults = { "duration": 5,
"logpath": r"~\Documents" }
self.last_symbol_tick = None
# start asynchronous initialization
self.la2['text'] = "Loading configuration..."
self.update_idletasks()
self.after_idle(self.load_config)
def __create_menubar(self):
self.mb = tk.Menu(self)
self.menu_file = tk.Menu(self.mb)
self.menu_about = tk.Menu(self.mb)
self.mb.add_cascade(menu=self.menu_file, label='Pricefeed')
self.mb.add_cascade(menu=self.menu_about, label='Help')
self.menu_file.add_command(label="Log as...", command=self.file_saveas)
self.menu_file.add_command(label='Exit', command=self.file_exit)
self.menu_about.add_command(label='About...', command=self.help_about)
self['menu'] = self.mb
def __create_widgets(self):
# feed selection list
self.fr0 = ttk.LabelFrame(self, padding=2, borderwidth="2p", text="Feeds")
self.fr0.pack(side=tk.LEFT, anchor=tk.N, fill=tk.Y, padx=5, pady=5)
# entry line
self.lb1 = tk.Listbox(self.fr0, selectmode=tk.SINGLE, exportselection=False, activestyle='none')
self.lb1.pack(fill=tk.BOTH, expand=1)
# feed select and filter
self.fr1 = ttk.Frame(self, padding=5)
self.fr1.pack(side=tk.LEFT, fill=tk.BOTH, expand=1)
# entry line
self.fr11 = ttk.Frame(self.fr1, padding=5)
self.fr11.pack(fill=tk.X)
# self.cb1 = ttk.Combobox(self.fr1)
# self.bu1 = ttk.Button(self.fr11, text="...", command=self.edit_feeds)
# self.bu1.pack(side=tk.LEFT)
self.la1 = ttk.Label(self.fr11, text='Search pattern ')
self.la1.pack(side=tk.LEFT)
self.filter = tk.StringVar()
vcmd = (self.register(self.filter_change), '%V', '%s', '%P')
self.en1 = ttk.Entry(self.fr11, validate='all', validatecommand=vcmd, textvariable=self.filter)
self.en1.pack(side=tk.LEFT)
self.scroll_lock = tk.IntVar()
self.cb11 = ttk.Checkbutton(self.fr11, text="Scroll lock", variable=self.scroll_lock)
self.cb11.pack(side=tk.RIGHT)
self.scroll_lock.set(0)
# status line
self.la2 = ttk.Label(self.fr1, text='Label2')
self.la2.pack(anchor=tk.W)
self.te1 = tk.Text(self.fr1, width=80, height=self.text_lines, wrap=tk.NONE)
self.te1.pack(fill=tk.BOTH, expand=1)
self.te1.bind("<Configure>", self.__resize_text)
# properties frame
self.fr2 = ttk.Frame(self, padding=5)
self.fr2.pack(side=tk.LEFT, anchor=tk.N, fill=tk.Y)
# properties panel
self.fr3 = ttk.LabelFrame(self.fr2, padding=2, borderwidth="2p", text="Feed")
self.fr3.pack(side=tk.TOP)
# property row
self.fr31 = ttk.Frame(self.fr3, padding=2)
self.fr31.pack(fill=tk.X)
self.la31 = ttk.Label(self.fr31, text='Symbol count')
self.la31.pack(side=tk.LEFT)
self.en31 = ttk.Entry(self.fr31, justify=tk.RIGHT)
self.en31.pack(side=tk.RIGHT)
# property row
self.fr32 = ttk.Frame(self.fr3, padding=2)
self.fr32.pack(fill=tk.X)
self.la32 = ttk.Label(self.fr32, text='tps (60sec avg)')
self.la32.pack(side=tk.LEFT)
self.en32 = ttk.Entry(self.fr32, justify=tk.RIGHT)
self.en32.pack(side=tk.RIGHT)
# property row
self.fr33 = ttk.Frame(self.fr3, padding=2)
self.fr33.pack(fill=tk.X)
self.la33 = ttk.Label(self.fr33, text='tps (last sec)')
self.la33.pack(side=tk.LEFT)
self.en33 = ttk.Entry(self.fr33, justify=tk.RIGHT)
self.en33.pack(side=tk.RIGHT)
# property row
self.fr34 = ttk.Frame(self.fr3, padding=2)
self.fr34.pack(fill=tk.X)
self.la34 = ttk.Label(self.fr34, text='Last tick')
self.la34.pack(side=tk.LEFT)
self.en34 = ttk.Entry(self.fr34, justify=tk.RIGHT)
self.en34.pack(side=tk.RIGHT)
# properties panel
self.fr6 = ttk.LabelFrame(self.fr2, borderwidth="2p", text="Tick history")
self.fr6.pack(side=tk.TOP, fill=tk.X)
self.ca61 = TpsHistory(self.fr6, width=100, height=100)
self.ca61.pack()
# properties panel
self.fr7 = ttk.LabelFrame(self.fr2, borderwidth="2p", text="Latency histogram")
self.fr7.pack(side=tk.TOP, fill=tk.X)
self.ca71 = LatencyHistogram(self.fr7, width=100, height=100)
self.ca71.pack()
# properties panel
self.fr4 = ttk.LabelFrame(self.fr2, padding=2, borderwidth="2p", text="Filtered")
self.fr4.pack(side=tk.TOP, fill=tk.X)
# property row
self.fr41 = ttk.Frame(self.fr4, padding=2)
self.fr41.pack(fill=tk.X)
self.la41 = ttk.Label(self.fr41, text='Symbol')
self.la41.pack(side=tk.LEFT)
self.en41 = ttk.Entry(self.fr41)
self.en41.pack(side=tk.RIGHT)
# property row
self.fr42 = ttk.Frame(self.fr4, padding=2)
self.fr42.pack(fill=tk.X)
self.la42 = ttk.Label(self.fr42, text='tpm (60sec avg)')
self.la42.pack(side=tk.LEFT)
self.en42 = ttk.Entry(self.fr42, justify=tk.RIGHT)
self.en42.pack(side=tk.RIGHT)
# property row
self.fr43 = ttk.Frame(self.fr4, padding=2)
self.fr43.pack(fill=tk.X)
self.la43 = ttk.Label(self.fr43, text='Since last tick')
self.la43.pack(side=tk.LEFT)
self.en43 = ttk.Entry(self.fr43, justify=tk.RIGHT)
self.en43.pack(side=tk.RIGHT)
# properties panel
self.fr5 = ttk.LabelFrame(self.fr2, padding=2, borderwidth="2p", text="Log")
self.fr5.pack(side=tk.TOP, fill=tk.X)
# property row
self.fr51 = ttk.Frame(self.fr5, padding=2)
self.fr51.pack(fill=tk.X)
self.la51 = ttk.Label(self.fr51, text='Remaining [s]')
self.la51.pack(side=tk.LEFT)
self.en51 = ttk.Entry(self.fr51, justify=tk.RIGHT)
self.en51.pack(side=tk.RIGHT)
# property row
self.fr52 = ttk.Frame(self.fr5, padding=2)
self.fr52.pack()
self.bu52a = ttk.Button(self.fr52, text='LogAs...', command=self.file_saveas)
self.bu52a.pack(side=tk.LEFT)
self.bu52b = ttk.Button(self.fr52, text='Abort', command=self.abort_log)
self.bu52b.pack(side=tk.LEFT)
def edit_feeds(self):
LOG.info("feeds")
def file_saveas(self):
feed = self.lb1.get(tk.ACTIVE)
self.logas = LogAs(self, feed)
def abort_log(self):
self.loop.call_soon_threadsafe(self.pfcq.put_nowait, ("abort_log", (), {}))
def file_exit(self):
self.stop_pricefeed()
# allow threads to terminate
self.after(100, self.quit)
def help_about(self):
tk.messagebox.showinfo("About", "Created by Andreas Balogh, 2018", parent=self)
def __resize_text(self, event):
# see https://stackoverflow.com/questions/49411359/how-to-get-the-height-of-the-tkinter-text-widget-after-resizing
font = tkFont.nametofont(self.te1.cget("font"))
width = int(event.width / font.measure("0"))
height = int(event.height / font.metrics('linespace')) + 1
LOG.debug(f"new size {width}x{height} chars, {event.width}x{event.height} pixels")
self.text_lines = height
self.loop.call_soon_threadsafe(self.pfcq.put_nowait, ("set_ticks_per_block", (self.text_lines,), {}))
if False:
print(event)
print(font.actual())
print(font.measure("0"))
print(font.metrics())
def filter_change(self, event, old, new):
if old != new:
# LOG.debug(f"event: {event}, old: {old}, new: {new}")
self.loop.call_soon_threadsafe(self.pfcq.put_nowait, ("set_filter", (new,), {}))
return True
def populate_feeds(self):
for feed in self.feeds:
self.lb1.insert(tk.END, feed)
self.lb1.selection_set(0)
self.lb1.activate(0)
def monitor_feed_change(self):
# switch to current price feed
current_feed = self.lb1.get(tk.ACTIVE)
if self.active_feed != current_feed:
LOG.info(f"Price feed changed to '{current_feed}'...")
self.stop_pricefeed()
self.start_pricefeed()
self.active_feed = current_feed
self.after(500, self.monitor_feed_change)
def update_ticker(self, ticks):
lock = self.scroll_lock.get()
if not lock:
for tick in ticks:
self.te1.insert("1.0", f"{tick}\n")
n = self.text_lines + 1
self.te1.delete(f"{n}.0", tk.END)
def set_status(self, s):
self.la2['text'] = s
def upd_tps_history(self, tps_history):
self.ca61.config(width=self.fr6.winfo_width() - 15, height=100)
self.ca61.plot(tps_history)
def upd_latency(self, latency_histogram):
self.ca71.config(width=self.fr7.winfo_width() - 15, height=100)
self.ca71.plot(latency_histogram)
def upd_feed_stats(self, symbol_count, ticks_per_sec, ticks_per_interval, last_tick):
App.set_entry(self.en31, f"{symbol_count:,}")
App.set_entry(self.en32, f"{ticks_per_sec:,.0f}")
App.set_entry(self.en33, f"{ticks_per_interval:,.0f}")
App.set_entry(self.en34, f"{last_tick:%H:%M:%S.%f}")
if self.last_symbol_tick:
since_last_symbol_tick = dt.datetime.now() - self.last_symbol_tick
App.set_entry(self.en43, f"{since_last_symbol_tick.seconds} sec")
def upd_symbol_stats(self, symbol, ticks_per_min, last_symbol_tick):
App.set_entry(self.en41, f"{symbol}")
App.set_entry(self.en42, f"{ticks_per_min:,.1f}")
self.last_symbol_tick = last_symbol_tick
App.set_entry(self.en43, f"< 1 sec")
def clear_symbol_stats(self):
App.set_entry(self.en41, "")
App.set_entry(self.en42, "")
App.set_entry(self.en43, "")
self.last_symbol_tick = None
def set_log_remaining(self, s):
App.set_entry(self.en51, s)
if len(s) > 0:
self.bu52a.state(["disabled"])
self.bu52b.state(["!disabled"])
else:
self.bu52a.state(["!disabled"])
self.bu52b.state(["disabled"])
@staticmethod
def set_entry(w, text):
w.delete(0, tk.END)
w.insert(0, text)
def guiq_reader(self):
# process price feed data
while True:
try:
cmd, args, kwargs = self.guiq.get_nowait()
except queue.Empty:
break
try:
fn = getattr(self, cmd)
fn(*args, **kwargs)
except Exception as e:
print(cmd, args, kwargs)
LOG.exception(e)
self.after(500, self.guiq_reader)
def start_pricefeed(self):
feed_name = self.lb1.get(tk.ACTIVE)
feed = self.feeds[feed_name]
host_port = (feed['host'], int(feed['port']))
keycol = feed["keycol"]
self.set_status(f"Starting pricefeed {feed_name} {host_port}...")
self.cancel_evt = threading.Event()
self.loop.call_soon_threadsafe(pfc_asyncio.create_stream_reader, self.loop, self.cancel_evt, *host_port, self.pfcq, self.guiq, keycol)
# sync variables with new feed
self.scroll_lock.set(0)
filter_ = self.filter.get()
self.loop.call_soon_threadsafe(self.pfcq.put_nowait, ("set_filter", (filter_,), {}))
# delete ticker screen
self.te1.delete("1.0", tk.END)
self.set_log_remaining("")
self.after(500, self.guiq_reader)
def stop_pricefeed(self):
if self.cancel_evt is not None:
print(f"stopping pricefeed")
self.cancel_evt.set()
def load_config(self):
fp = Path(r'~\pricefeed.ini').expanduser()
config = configparser.ConfigParser()
config.read(str(fp))
for feed_name in config.sections():
sd = config[feed_name]
if feed_name == "defaults":
if "logpath" in sd:
self.defaults["logpath"] = sd["logpath"]
if "duration" in sd:
if sd["duration"].isdigit():
self.defaults["duration"] = sd["duration"]
else:
LOG.error(f"{sd['duration']} not numeric, using default")
else:
feed = dict()
feed['host'] = sd['host']
feed['port'] = sd['port']
feed['keycol'] = sd['keycol']
self.feeds[feed_name] = feed
self.populate_feeds()
self.after_idle(self.monitor_feed_change)
def gui():
# create asyncio loop
loop = asyncio.new_event_loop()
# setup tk.Tk() and toplevel window
root = App(loop)
# start async loop in separate thread
threading.Thread(target=pfc_asyncio.asyncio_run, args=(loop,)).start()
# tk GUI loop in main thread
root.mainloop()
# stop asyncio
loop.call_soon_threadsafe(loop.stop)
# destructor for tk resources
root.destroy()
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i [%(thread)i] %(levelname).4s %(funcName)10s: %(message)s',
datefmt='%H:%M:%S')
sys.exit(gui())

View File

@@ -1,132 +0,0 @@
# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" prc_publish.eventlet
Price Publisher
"""
# imports
from argparse import ArgumentParser
import eventlet
import logging
import os
import random
import sys
import cPickle as pickle
LOG = logging.getLogger()
# definitions
def main(argv=None):
if argv is None:
argv = sys.argv
LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:]))
# parse options and arguments
parser = ArgumentParser(description="Price Publisher")
parser.add_argument("-f", "--file", dest="filename",
help="read configuration from %(dest)s")
parser.add_argument("-p", "--port", default=8001, type=int,
help="server port [default: %(default)s")
args = parser.parse_args()
print args
# create product dict
prds = { }
pubqs = []
for n in range(10):
key = "AB" + "{:04}".format(n)
prds["AB" + key] = Pricer(key)
# start one thread for price changes
eventlet.spawn(controller, prds, pubqs)
address = ('localhost', 8010)
eventlet.spawn(listener, address, pubqs)
# main thread runs eventlet loop
while True:
eventlet.sleep(10)
def listener(address, pubqs):
sock = eventlet.listen(address)
while True:
LOG.info('waiting for connection on %s', address)
cx, remote = sock.accept()
LOG.info("accepting connection from %s", remote)
inq = eventlet.queue.Queue()
pubqs.append(inq)
eventlet.spawn(receiver, cx)
eventlet.spawn(publisher, pubqs, inq, cx)
def publisher(pubqs, inq, cx):
LOG.info("Publisher running")
try:
while True:
# what happens if client does not pick up
# what happens if client dies during queue wait
try:
with eventlet.Timeout(1):
item = inq.get()
s = pickle.dumps(item, pickle.HIGHEST_PROTOCOL)
# s = "{0[0]} {0[1]}\n\r".format(item)
cx.send(s)
except eventlet.Timeout:
# raises IOError if connection lost
cx.fileno()
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
pubqs.remove(inq)
LOG.info("Publisher terminated")
def receiver(cx):
LOG.info("Receiver running")
try:
while True:
# what happens if client does not pick up
s = cx.recv(4096)
if not s:
break
LOG.info(s)
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
LOG.info("Receiver terminated")
def controller(prds, pubqs):
while True:
LOG.info("controller: price update cycle, %i pubqs", len(pubqs))
Pricer.VOLA = update_vola(Pricer.VOLA)
for prd in prds.values():
prd.run()
for pubq in pubqs:
pubq.put((prd.name, prd.prc))
eventlet.sleep(5)
def update_vola(old_vola):
new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01)
return new_vola
class Pricer(object):
VOLA = 0.01
def __init__(self, name):
self.name = name
self.prc = random.random() * 100.0
def run(self):
self.prc += random.choice((-1, +1)) * self.prc * self.VOLA
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(funcName)10s: %(message)s',
datefmt='%H:%M:%S')
main()

View File

@@ -1,133 +0,0 @@
# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" prc_publish.multi_thread
Price Publisher
"""
# imports
from argparse import ArgumentParser
import gevent.socket
import gevent.queue
import gevent
import logging
import os
import random
import socket
import sys
LOG = logging.getLogger()
# definitions
def main(argv=None):
if argv is None:
argv = sys.argv
LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:]))
# parse options and arguments
parser = ArgumentParser(description="Price Publisher")
parser.add_argument("-f", "--file", dest="filename",
help="read configuration from %(dest)s")
parser.add_argument("-p", "--port", default=8001, type=int,
help="server port [default: %(default)s")
args = parser.parse_args()
print args
# create product dict
threads = []
prds = { }
pubqs = []
for n in range(10):
key = "AB" + "{:04}".format(n)
prds["AB" + key] = Pricer(key)
# start one thread for price changes
gevent.spawn(controller, prds, pubqs)
address = ('localhost', 8010)
gevent.spawn(listener, address, pubqs)
# main thread runs gevent loop
while True:
gevent.sleep(10)
def listener(address, pubqs):
sock = gevent.socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(address)
sock.listen(1)
while True:
LOG.info('waiting for connection on %s', address)
cx, address = sock.accept()
LOG.info("accepting connection from %s", address)
inq = gevent.queue.Queue()
pubqs.append(inq)
gevent.spawn(receiver, cx)
gevent.spawn(publisher, pubqs, inq, cx)
def publisher(pubqs, inq, cx):
LOG.info("Publisher running")
try:
while True:
# what happens if client does not pick up
# what happens if client dies during queue wait
item = inq.get()
# TODO: pickle
s = "{0[0]} {0[1]}\n\r".format(item)
cx.send(s)
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
pubqs.remove(inq)
LOG.info("Publisher terminated")
def receiver(cx):
LOG.info("Receiver running")
try:
while True:
# what happens if client does not pick up
s = cx.recv(4096)
if not s:
break
LOG.info(s)
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
LOG.info("Receiver terminated")
def controller(prds, pubqs):
while True:
LOG.info("Price update cycle")
Pricer.VOLA = update_vola(Pricer.VOLA)
LOG.debug("controller: %i pubqs", len(pubqs))
for prd in prds.values():
prd.run()
for pubq in pubqs:
pubq.put((prd.name, prd.prc))
gevent.sleep(5)
def update_vola(old_vola):
new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01)
return new_vola
class Pricer():
VOLA = 0.01
def __init__(self, name):
self.name = name
self.prc = random.random() * 100.0
def run(self):
self.prc += random.choice((-1, +1)) * self.VOLA
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s',
datefmt='%H:%M:%S')
main()

View File

@@ -1,141 +0,0 @@
# Copyright (c) 2011 Andreas Balogh
# See LICENSE for details.
""" prc_publish.multi_thread
Price Publisher
"""
# imports
from gevent import monkey
monkey.patch_all()
from argparse import ArgumentParser
from gevent.queue import Queue
import gevent
import logging
import os
import random
import socket
import sys
import threading
LOG = logging.getLogger()
# definitions
def main(argv=None):
if argv is None:
argv = sys.argv
LOG.info("starting '%s %s'", os.path.basename(argv[0]), " ".join(argv[1:]))
# parse options and arguments
parser = ArgumentParser(description="Price Publisher")
parser.add_argument("-f", "--file", dest="filename",
help="read configuration from %(dest)s")
parser.add_argument("-p", "--port", default=8001, type=int,
help="server port [default: %(default)s")
args = parser.parse_args()
print args
# create product dict
threads = []
prds = { }
pubqs = []
for n in range(10):
key = "AB" + "{:04}".format(n)
prds["AB" + key] = Pricer(key)
# start one thread for price changes
t1 = threading.Thread(target=controller, args=(prds, pubqs))
t1.start()
address = ('localhost', 8010)
t2 = threading.Thread(target=listener, args=(address, pubqs))
t2.start()
# main thread runs gevent loop
while True:
gevent.sleep(10)
def listener(address, pubqs):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(address)
sock.listen(1)
while True:
LOG.info('waiting for connection on %s', address)
cx, address = sock.accept()
LOG.info("accepting connection from %s", address)
inq = Queue()
pubqs.append(inq)
rec = threading.Thread(target=receiver, args=(cx,))
pub = threading.Thread(target=publisher, args=(pubqs, inq, cx))
rec.start()
pub.start()
def publisher(pubqs, inq, cx):
LOG.info("Publisher running")
try:
while True:
# what happens if client does not pick up
# what happens if client dies during queue wait
item = inq.get()
# TODO: pickle
s = "{0[0]} {0[1]}\n\r".format(item)
cx.send(s)
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
pubqs.remove(inq)
LOG.info("Publisher terminated")
def receiver(cx):
LOG.info("Receiver running")
try:
while True:
# what happens if client does not pick up
s = cx.recv(4096)
if not s:
break
LOG.info(s)
# if connection closes
except IOError, e:
LOG.info(e)
# make sure to close the socket
finally:
cx.close()
LOG.info("Receiver terminated")
def controller(prds, pubqs):
while True:
LOG.info("Price update cycle")
Pricer.VOLA = update_vola(Pricer.VOLA)
LOG.debug("controller: %i pubqs", len(pubqs))
for prd in prds.values():
prd.run()
for pubq in pubqs:
pubq.put((prd.name, prd.prc))
gevent.sleep(5)
def update_vola(old_vola):
new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01)
return new_vola
class Pricer():
VOLA = 0.01
def __init__(self, name):
self.name = name
self.prc = random.random() * 100.0
self.daemon = True
def run(self):
self.prc += random.choice((-1, +1)) * self.VOLA
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s.%(msecs)03i %(levelname).4s %(threadName)10s %(message)s',
datefmt='%H:%M:%S')
main()